X-Git-Url: https://jfr.im/git/z_archive/twitter.git/blobdiff_plain/8ac8f6bb94866160116f5419ba8899ff787e877c..5a412b39408a2154c0f3537eb0b7f681d5744af5:/twitter/api.py diff --git a/twitter/api.py b/twitter/api.py index 6cfc650..bed6275 100644 --- a/twitter/api.py +++ b/twitter/api.py @@ -1,3 +1,6 @@ +# encoding: utf-8 +from __future__ import unicode_literals + try: import urllib.request as urllib_request import urllib.error as urllib_error @@ -5,17 +8,32 @@ except ImportError: import urllib2 as urllib_request import urllib2 as urllib_error -from twitter.twitter_globals import POST_ACTIONS -from twitter.auth import NoAuth +try: + from cStringIO import StringIO +except ImportError: + from io import BytesIO as StringIO + +from .twitter_globals import POST_ACTIONS +from .auth import NoAuth + +import re +import gzip + +try: + import http.client as http_client +except ImportError: + import httplib as http_client try: import json except ImportError: import simplejson as json + class _DEFAULT(object): pass + class TwitterError(Exception): """ Base Exception thrown by the Twitter object when there is a @@ -23,6 +41,7 @@ class TwitterError(Exception): """ pass + class TwitterHTTPError(TwitterError): """ Exception thrown by the Twitter object when there is an @@ -33,13 +52,28 @@ class TwitterHTTPError(TwitterError): self.uri = uri self.format = format self.uriparts = uriparts + try: + data = self.e.fp.read() + except http_client.IncompleteRead as e: + # can't read the error text + # let's try some of it + data = e.partial + if self.e.headers.get('Content-Encoding') == 'gzip': + buf = StringIO(data) + f = gzip.GzipFile(fileobj=buf) + self.response_data = f.read() + else: + self.response_data = data + super(TwitterHTTPError, self).__init__(str(self)) def __str__(self): + fmt = ("." + self.format) if self.format else "" return ( - "Twitter sent status %i for URL: %s.%s using parameters: " - "(%s)\ndetails: %s" %( - self.e.code, self.uri, self.format, self.uriparts, - self.e.fp.read())) + "Twitter sent status %i for URL: %s%s using parameters: " + "(%s)\ndetails: %s" % ( + self.e.code, self.uri, fmt, self.uriparts, + self.response_data)) + class TwitterResponse(object): """ @@ -49,49 +83,57 @@ class TwitterResponse(object): `headers` gives you access to the response headers as an httplib.HTTPHeaders instance. You can do - `response.headers.getheader('h')` to retrieve a header. + `response.headers.get('h')` to retrieve a header. """ - def __init__(self, headers): - self.headers = headers @property def rate_limit_remaining(self): """ Remaining requests in the current rate-limit. """ - return int(self.headers.getheader('X-RateLimit-Remaining')) + return int(self.headers.get('X-Rate-Limit-Remaining', "0")) + + @property + def rate_limit_limit(self): + """ + The rate limit ceiling for that given request. + """ + return int(self.headers.get('X-Rate-Limit-Limit', "0")) @property def rate_limit_reset(self): """ Time in UTC epoch seconds when the rate limit will reset. """ - return int(self.headers.getheader('X-RateLimit-Reset')) + return int(self.headers.get('X-Rate-Limit-Reset', "0")) -def wrap_response(response, headers): - response_typ = type(response) - if response_typ is bool: - # HURF DURF MY NAME IS PYTHON AND I CAN'T SUBCLASS bool. - response_typ = int +class TwitterDictResponse(dict, TwitterResponse): + pass - class WrappedTwitterResponse(response_typ, TwitterResponse): - __doc__ = TwitterResponse.__doc__ - def __init__(self, response): - if response_typ is not int: - response_typ.__init__(self, response) - TwitterResponse.__init__(self, headers) +class TwitterListResponse(list, TwitterResponse): + pass - return WrappedTwitterResponse(response) +def wrap_response(response, headers): + response_typ = type(response) + if response_typ is dict: + res = TwitterDictResponse(response) + res.headers = headers + elif response_typ is list: + res = TwitterListResponse(response) + res.headers = headers + else: + res = response + return res class TwitterCall(object): def __init__( - self, auth, format, domain, callable_cls, uri="", - uriparts=None, secure=True): + self, auth, format, domain, callable_cls, uri="", + uriparts=None, secure=True, timeout=None, gzip=False): self.auth = auth self.format = format self.domain = domain @@ -99,6 +141,8 @@ class TwitterCall(object): self.uri = uri self.uriparts = uriparts self.secure = secure + self.timeout = timeout + self.gzip = gzip def __getattr__(self, k): try: @@ -107,9 +151,9 @@ class TwitterCall(object): def extend_call(arg): return self.callable_cls( auth=self.auth, format=self.format, domain=self.domain, - callable_cls=self.callable_cls, uriparts=self.uriparts \ - + (arg,), - secure=self.secure) + callable_cls=self.callable_cls, timeout=self.timeout, + secure=self.secure, gzip=self.gzip, + uriparts=self.uriparts + (arg,)) if k == "_": return extend_call else: @@ -124,17 +168,28 @@ class TwitterCall(object): uriparts.append(str(kwargs.pop(uripart, uripart))) uri = '/'.join(uriparts) - method = "GET" - for action in POST_ACTIONS: - if uri.endswith(action): - method = "POST" - break + method = kwargs.pop('_method', None) + if not method: + method = "GET" + for action in POST_ACTIONS: + if re.search("%s(/\d+)?$" % action, uri): + method = "POST" + break # If an id kwarg is present and there is no id to fill in in # the list of uriparts, assume the id goes at the end. id = kwargs.pop('id', None) if id: - uri += "/%s" %(id) + uri += "/%s" % (id) + + # If an _id kwarg is present, this is treated as id as a CGI + # param. + _id = kwargs.pop('_id', None) + if _id: + kwargs['id'] = _id + + # If an _timeout is specified in kwargs, use it + _timeout = kwargs.pop('_timeout', None) secure_str = '' if self.secure: @@ -142,37 +197,93 @@ class TwitterCall(object): dot = "" if self.format: dot = "." - uriBase = "http%s://%s/%s%s%s" %( - secure_str, self.domain, uri, dot, self.format) + uriBase = "http%s://%s/%s%s%s" % ( + secure_str, self.domain, uri, dot, self.format) + + # Catch media arguments to handle oauth query differently for multipart + media = None + for arg in ['media[]', 'banner', 'image']: + if arg in kwargs: + media = kwargs.pop(arg) + # Check if argument tells whether img is already base64 encoded + b64_convert = True + if "_base64" in kwargs: + b64_convert = not kwargs.pop("_base64") + if b64_convert: + import base64 + media = base64.b64encode(media) + mediafield = arg + break - headers = {} + headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict() + body = None + arg_data = None if self.auth: headers.update(self.auth.generate_headers()) - arg_data = self.auth.encode_params(uriBase, method, kwargs) - if method == 'GET': + # Use urlencoded oauth args with no params when sending media + # via multipart and send it directly via uri even for post + arg_data = self.auth.encode_params( + uriBase, method, {} if media else kwargs) + if method == 'GET' or media: uriBase += '?' + arg_data - body = None else: body = arg_data.encode('utf8') + # Handle query as multipart when sending media + if media: + BOUNDARY = "###Python-Twitter###" + bod = [] + bod.append('--' + BOUNDARY) + bod.append( + 'Content-Disposition: form-data; name="%s"' % mediafield) + bod.append('Content-Transfer-Encoding: base64') + bod.append('') + bod.append(media) + for k, v in kwargs.items(): + bod.append('--' + BOUNDARY) + bod.append('Content-Disposition: form-data; name="%s"' % k) + bod.append('') + bod.append(v) + bod.append('--' + BOUNDARY + '--') + body = '\r\n'.join(bod) + headers['Content-Type'] = \ + 'multipart/form-data; boundary=%s' % BOUNDARY + req = urllib_request.Request(uriBase, body, headers) - return self._handle_response(req, uri, arg_data) + return self._handle_response(req, uri, arg_data, _timeout) - def _handle_response(self, req, uri, arg_data): + def _handle_response(self, req, uri, arg_data, _timeout=None): + kwargs = {} + if _timeout: + kwargs['timeout'] = _timeout try: - handle = urllib_request.urlopen(req) + handle = urllib_request.urlopen(req, **kwargs) + if handle.headers['Content-Type'] in ['image/jpeg', 'image/png']: + return handle + try: + data = handle.read() + except http_client.IncompleteRead as e: + # Even if we don't get all the bytes we should have there + # may be a complete response in e.partial + data = e.partial + if handle.info().get('Content-Encoding') == 'gzip': + # Handle gzip decompression + buf = StringIO(data) + f = gzip.GzipFile(fileobj=buf) + data = f.read() if "json" == self.format: - res = json.loads(handle.read().decode('utf8')) + res = json.loads(data.decode('utf8')) return wrap_response(res, handle.headers) else: return wrap_response( - handle.read().decode('utf8'), handle.headers) + data.decode('utf8'), handle.headers) except urllib_error.HTTPError as e: if (e.code == 304): return [] else: raise TwitterHTTPError(e, uri, self.format, arg_data) + class Twitter(TwitterCall): """ The minimalist yet fully featured Twitter API class. @@ -180,43 +291,66 @@ class Twitter(TwitterCall): Get RESTful data by accessing members of this class. The result is decoded python objects (lists and dicts). - The Twitter API is documented here: + The Twitter API is documented at: http://dev.twitter.com/doc Examples:: - twitter = Twitter( - auth=OAuth(token, token_key, con_secret, con_secret_key))) + t = Twitter( + auth=OAuth(token, token_key, con_secret, con_secret_key))) - # Get the public timeline - twitter.statuses.public_timeline() + # Get your "home" timeline + t.statuses.home_timeline() - # Get a particular friend's timeline - twitter.statuses.friends_timeline(id="billybob") + # Get a particular friend's tweets + t.statuses.user_timeline(user_id="billybob") - # Also supported (but totally weird) - twitter.statuses.friends_timeline.billybob() + # Update your status + t.statuses.update( + status="Using @sixohsix's sweet Python Twitter Tools.") - # Send a direct message - twitter.direct_messages.new( - user="billybob", - text="I think yer swell!") + # Send a direct message + t.direct_messages.new( + user="billybob", + text="I think yer swell!") - # Get the members of a particular list of a particular friend - twitter.user.listname.members(user="billybob", listname="billysbuds") + # Get the members of tamtar's list "Things That Are Rad" + t._("tamtar")._("things-that-are-rad").members() + # Note how the magic `_` method can be used to insert data + # into the middle of a call. You can also use replacement: + t.user.list.members(user="tamtar", list="things-that-are-rad") - Searching Twitter:: + # An *optional* `_timeout` parameter can also be used for API + # calls which take much more time than normal or twitter stops + # responding for some reasone + t.users.lookup( + screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES), \ + _timeout=1) + + # Overriding Method: GET/POST + # you should not need to use this method as this library properly + # detects whether GET or POST should be used, Nevertheless + # to force a particular method, use `_method` + t.statuses.oembed(_id=1234567890, _method='GET') - twitter_search = Twitter(domain="search.twitter.com") + # Send a tweet with an image included (or set your banner or logo similarily) + # By just reading your image from the web or a file in a string: + with open("example.png", "rb") as imagefile: + params = {"media[]": imagefile.read(), "status": "PTT"} + t.statuses.update_with_media(**params) + # Or by sending a base64 encoded image: + params = {"media[]": base64_image, "status": "PTT", "_base64": True} + t.statuses.update_with_media(**params) - # Find the latest search trends - twitter_search.trends() - # Search for the latest News on #gaza - twitter_search.search(q="#gaza") + + Searching Twitter:: + + # Search for the latest tweets about #pycon + t.search.tweets(q="#pycon") Using the data returned @@ -225,13 +359,13 @@ class Twitter(TwitterCall): Twitter API calls return decoded JSON. This is converted into a bunch of Python lists, dicts, ints, and strings. For example:: - x = twitter.statuses.public_timeline() + x = twitter.statuses.home_timeline() - # The first 'tweet' in the timeline - x[0] + # The first 'tweet' in the timeline + x[0] - # The screen name of the user who wrote the first 'tweet' - x[0]['user']['screen_name'] + # The screen name of the user who wrote the first 'tweet' + x[0]['user']['screen_name'] Getting raw XML data @@ -240,16 +374,16 @@ class Twitter(TwitterCall): If you prefer to get your Twitter data in XML format, pass format="xml" to the Twitter object when you instantiate it:: - twitter = Twitter(format="xml") + twitter = Twitter(format="xml") - The output will not be parsed in any way. It will be a raw string - of XML. + The output will not be parsed in any way. It will be a raw string + of XML. """ def __init__( - self, format="json", - domain="api.twitter.com", secure=True, auth=None, - api_version=_DEFAULT): + self, format="json", + domain="api.twitter.com", secure=True, auth=None, + api_version=_DEFAULT): """ Create a new twitter API connector. @@ -262,26 +396,22 @@ class Twitter(TwitterCall): `domain` lets you change the domain you are connecting. By - default it's `api.twitter.com` but `search.twitter.com` may be - useful too. + default it's `api.twitter.com`. If `secure` is False you will connect with HTTP instead of HTTPS. `api_version` is used to set the base uri. By default it's - '1'. If you are using "search.twitter.com" set this to None. + '1.1'. """ if not auth: auth = NoAuth() if (format not in ("json", "xml", "")): - raise ValueError("Unknown data format '%s'" %(format)) + raise ValueError("Unknown data format '%s'" % (format)) if api_version is _DEFAULT: - if domain == 'api.twitter.com': - api_version = '1' - else: - api_version = None + api_version = '1.1' uriparts = () if api_version: