X-Git-Url: https://jfr.im/git/z_archive/twitter.git/blobdiff_plain/effd06bbbcddfac2783e4d028569ca9fad426216..17b9ff10207340026b876eb623660f2c79bfe85d:/twitter/api.py diff --git a/twitter/api.py b/twitter/api.py index c0f791f..bb1ca6c 100644 --- a/twitter/api.py +++ b/twitter/api.py @@ -1,3 +1,8 @@ +# encoding: utf-8 +from __future__ import unicode_literals, print_function + +from .util import PY_3_OR_HIGHER, actually_bytes + try: import urllib.request as urllib_request import urllib.error as urllib_error @@ -14,19 +19,25 @@ from .twitter_globals import POST_ACTIONS from .auth import NoAuth import re +import sys import gzip +from time import sleep, time try: import http.client as http_client except ImportError: import httplib as http_client -import json +try: + import json +except ImportError: + import simplejson as json class _DEFAULT(object): pass + class TwitterError(Exception): """ Base Exception thrown by the Twitter object when there is a @@ -34,6 +45,7 @@ class TwitterError(Exception): """ pass + class TwitterHTTPError(TwitterError): """ Exception thrown by the Twitter object when there is an @@ -53,18 +65,25 @@ class TwitterHTTPError(TwitterError): if self.e.headers.get('Content-Encoding') == 'gzip': buf = StringIO(data) f = gzip.GzipFile(fileobj=buf) - self.response_data = f.read() + data = f.read() + if len(data) == 0: + data = {} + elif "json" == self.format: + data = json.loads(data.decode('utf8')) else: - self.response_data = data + data = data.decode('utf8') + self.response_data = data + super(TwitterHTTPError, self).__init__(str(self)) def __str__(self): fmt = ("." + self.format) if self.format else "" return ( "Twitter sent status %i for URL: %s%s using parameters: " - "(%s)\ndetails: %s" %( + "(%s)\ndetails: %s" % ( self.e.code, self.uri, fmt, self.uriparts, self.response_data)) + class TwitterResponse(object): """ Response from a twitter request. Behaves like a list or a string @@ -75,8 +94,6 @@ class TwitterResponse(object): httplib.HTTPHeaders instance. You can do `response.headers.get('h')` to retrieve a header. """ - def __init__(self, headers): - self.headers = headers @property def rate_limit_remaining(self): @@ -100,32 +117,66 @@ class TwitterResponse(object): return int(self.headers.get('X-Rate-Limit-Reset', "0")) +class TwitterDictResponse(dict, TwitterResponse): + pass + + +class TwitterListResponse(list, TwitterResponse): + pass + + def wrap_response(response, headers): response_typ = type(response) - if response_typ is bool: - # HURF DURF MY NAME IS PYTHON AND I CAN'T SUBCLASS bool. - response_typ = int - elif response_typ is str: - return response + if response_typ is dict: + res = TwitterDictResponse(response) + res.headers = headers + elif response_typ is list: + res = TwitterListResponse(response) + res.headers = headers + else: + res = response + return res + + +POST_ACTIONS_RE = re.compile('(' + '|'.join(POST_ACTIONS) + r')(/\d+)?$') + +def method_for_uri(uri): + if POST_ACTIONS_RE.search(uri): + return "POST" + return "GET" - class WrappedTwitterResponse(response_typ, TwitterResponse): - __doc__ = TwitterResponse.__doc__ - def __init__(self, response, headers): - response_typ.__init__(self, response) - TwitterResponse.__init__(self, headers) - def __new__(cls, response, headers): - return response_typ.__new__(cls, response) +def build_uri(orig_uriparts, kwargs): + """ + Build the URI from the original uriparts and kwargs. Modifies kwargs. + """ + uriparts = [] + for uripart in orig_uriparts: + # If this part matches a keyword argument (starting with _), use + # the supplied value. Otherwise, just use the part. + if uripart.startswith("_"): + part = (str(kwargs.pop(uripart, uripart))) + else: + part = uripart + uriparts.append(part) + uri = '/'.join(uriparts) - return WrappedTwitterResponse(response, headers) + # If an id kwarg is present and there is no id to fill in in + # the list of uriparts, assume the id goes at the end. + id = kwargs.pop('id', None) + if id: + uri += "/%s" % (id) + return uri class TwitterCall(object): + TWITTER_UNAVAILABLE_WAIT = 30 # delay after HTTP codes 502, 503 or 504 + def __init__( - self, auth, format, domain, callable_cls, uri="", - uriparts=None, secure=True, timeout=None): + self, auth, format, domain, callable_cls, uri="", + uriparts=None, secure=True, timeout=None, gzip=False, retry=False): self.auth = auth self.format = format self.domain = domain @@ -134,6 +185,8 @@ class TwitterCall(object): self.uriparts = uriparts self.secure = secure self.timeout = timeout + self.gzip = gzip + self.retry = retry def __getattr__(self, k): try: @@ -142,36 +195,19 @@ class TwitterCall(object): def extend_call(arg): return self.callable_cls( auth=self.auth, format=self.format, domain=self.domain, - callable_cls=self.callable_cls, timeout=self.timeout, uriparts=self.uriparts \ - + (arg,), - secure=self.secure) + callable_cls=self.callable_cls, timeout=self.timeout, + secure=self.secure, gzip=self.gzip, retry=self.retry, + uriparts=self.uriparts + (arg,)) if k == "_": return extend_call else: return extend_call(k) def __call__(self, **kwargs): - # Build the uri. - uriparts = [] - for uripart in self.uriparts: - # If this part matches a keyword argument, use the - # supplied value otherwise, just use the part. - uriparts.append(str(kwargs.pop(uripart, uripart))) - uri = '/'.join(uriparts) - - method = kwargs.pop('_method', None) - if not method: - method = "GET" - for action in POST_ACTIONS: - if re.search("%s(/\d+)?$" % action, uri): - method = "POST" - break - - # If an id kwarg is present and there is no id to fill in in - # the list of uriparts, assume the id goes at the end. - id = kwargs.pop('id', None) - if id: - uri += "/%s" %(id) + kwargs = dict(kwargs) + uri = build_uri(self.uriparts, kwargs) + method = kwargs.pop('_method', None) or method_for_uri(uri) + domain = self.domain # If an _id kwarg is present, this is treated as id as a CGI # param. @@ -188,21 +224,88 @@ class TwitterCall(object): dot = "" if self.format: dot = "." - uriBase = "http%s://%s/%s%s%s" %( - secure_str, self.domain, uri, dot, self.format) - - headers = {'Accept-Encoding': 'gzip'} + url_base = "http%s://%s/%s%s%s" % ( + secure_str, domain, uri, dot, self.format) + + # Check if argument tells whether img is already base64 encoded + b64_convert = not kwargs.pop("_base64", False) + if b64_convert: + import base64 + + # Catch media arguments to handle oauth query differently for multipart + media = None + if 'media' in kwargs: + mediafield = 'media' + media = kwargs.pop('media') + media_raw = True + elif 'media[]' in kwargs: + mediafield = 'media[]' + media = kwargs.pop('media[]') + if b64_convert: + media = base64.b64encode(media) + media_raw = False + + # Catch media arguments that are not accepted through multipart + # and are not yet base64 encoded + if b64_convert: + for arg in ['banner', 'image']: + if arg in kwargs: + kwargs[arg] = base64.b64encode(kwargs[arg]) + + headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict() + body = None + arg_data = None if self.auth: headers.update(self.auth.generate_headers()) - arg_data = self.auth.encode_params(uriBase, method, kwargs) - if method == 'GET': - uriBase += '?' + arg_data - body = None + # Use urlencoded oauth args with no params when sending media + # via multipart and send it directly via uri even for post + arg_data = self.auth.encode_params( + url_base, method, {} if media else kwargs) + if method == 'GET' or media: + url_base += '?' + arg_data else: - body = arg_data.encode('utf8') - - req = urllib_request.Request(uriBase, body, headers) - return self._handle_response(req, uri, arg_data, _timeout) + body = arg_data.encode('utf-8') + + # Handle query as multipart when sending media + if media: + BOUNDARY = b"###Python-Twitter###" + bod = [] + bod.append(b'--' + BOUNDARY) + bod.append( + b'Content-Disposition: form-data; name="' + + actually_bytes(mediafield) + + b'"') + bod.append(b'Content-Type: application/octet-stream') + if not media_raw: + bod.append(b'Content-Transfer-Encoding: base64') + bod.append(b'') + bod.append(actually_bytes(media)) + for k, v in kwargs.items(): + k = actually_bytes(k) + v = actually_bytes(v) + bod.append(b'--' + BOUNDARY) + bod.append(b'Content-Disposition: form-data; name="' + k + b'"') + bod.append(b'Content-Type: text/plain;charset=utf-8') + bod.append(b'') + bod.append(v) + bod.append(b'--' + BOUNDARY + b'--') + bod.append(b'') + bod.append(b'') + body = b'\r\n'.join(bod) + # print(body.decode('utf-8', errors='ignore')) + headers['Content-Type'] = \ + b'multipart/form-data; boundary=' + BOUNDARY + + if not PY_3_OR_HIGHER: + url_base = url_base.encode("utf-8") + for k in headers: + headers[actually_bytes(k)] = actually_bytes(headers.pop(k)) + + req = urllib_request.Request(url_base, data=body, headers=headers) + if self.retry: + return self._handle_response_with_retry(req, uri, arg_data, _timeout) + else: + return self._handle_response(req, uri, arg_data, _timeout) def _handle_response(self, req, uri, arg_data, _timeout=None): kwargs = {} @@ -223,7 +326,9 @@ class TwitterCall(object): buf = StringIO(data) f = gzip.GzipFile(fileobj=buf) data = f.read() - if "json" == self.format: + if len(data) == 0: + return wrap_response({}, handle.headers) + elif "json" == self.format: res = json.loads(data.decode('utf8')) return wrap_response(res, handle.headers) else: @@ -235,6 +340,29 @@ class TwitterCall(object): else: raise TwitterHTTPError(e, uri, self.format, arg_data) + def _handle_response_with_retry(self, req, uri, arg_data, _timeout=None): + retry = self.retry + while retry: + try: + return self._handle_response(req, uri, arg_data, _timeout) + except TwitterHTTPError as e: + if e.e.code == 429: + # API rate limit reached + reset = int(e.e.headers.get('X-Rate-Limit-Reset', time() + 30)) + delay = int(reset - time() + 2) # add some extra margin + print("API rate limit reached; waiting for %ds..." % delay, file=sys.stderr) + elif e.e.code in (502, 503, 504): + delay = self.TWITTER_UNAVAILABLE_WAIT + print("Service unavailable; waiting for %ds..." % delay, file=sys.stderr) + else: + raise + if isinstance(retry, int) and not isinstance(retry, bool): + if retry <= 0: + raise + retry -= 1 + sleep(delay) + + class Twitter(TwitterCall): """ The minimalist yet fully featured Twitter API class. @@ -249,17 +377,22 @@ class Twitter(TwitterCall): Examples:: + from twitter import * + t = Twitter( - auth=OAuth(token, token_key, con_secret, con_secret_key))) + auth=OAuth(token, token_key, con_secret, con_secret_key)) # Get your "home" timeline t.statuses.home_timeline() # Get a particular friend's timeline - t.statuses.friends_timeline(id="billybob") + t.statuses.user_timeline(screen_name="billybob") + + # to pass in GET/POST parameters, such as `count` + t.statuses.home_timeline(count=5) - # Also supported (but totally weird) - t.statuses.friends_timeline.billybob() + # to pass in the GET/POST parameter `id` you need to use `_id` + t.statuses.oembed(_id=1234567890) # Update your status t.statuses.update( @@ -271,19 +404,31 @@ class Twitter(TwitterCall): text="I think yer swell!") # Get the members of tamtar's list "Things That Are Rad" - t._("tamtar")._("things-that-are-rad").members() - - # Note how the magic `_` method can be used to insert data - # into the middle of a call. You can also use replacement: - t.user.list.members(user="tamtar", list="things-that-are-rad") + t.lists.members(owner_screen_name="tamtar", slug="things-that-are-rad") # An *optional* `_timeout` parameter can also be used for API # calls which take much more time than normal or twitter stops - # responding for some reasone + # responding for some reason: t.users.lookup( screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES), \ _timeout=1) + # Overriding Method: GET/POST + # you should not need to use this method as this library properly + # detects whether GET or POST should be used, Nevertheless + # to force a particular method, use `_method` + t.statuses.oembed(_id=1234567890, _method='GET') + + # Send a tweet with an image included (or set your banner or logo similarily) + # by just reading your image from the web or a file in a string: + status = "PTT ★" + with open("example.png", "rb") as imagefile: + params = {"media[]": imagefile.read(), "status": status} + t.statuses.update_with_media(**params) + + # Or by sending a base64 encoded image: + params = {"media[]": base64_image, "status": status, "_base64": True} + t.statuses.update_with_media(**params) Searching Twitter:: @@ -320,9 +465,9 @@ class Twitter(TwitterCall): """ def __init__( - self, format="json", - domain="api.twitter.com", secure=True, auth=None, - api_version=_DEFAULT): + self, format="json", + domain="api.twitter.com", secure=True, auth=None, + api_version=_DEFAULT, retry=False): """ Create a new twitter API connector. @@ -335,20 +480,24 @@ class Twitter(TwitterCall): `domain` lets you change the domain you are connecting. By - default it's `api.twitter.com` but `search.twitter.com` may be - useful too. + default it's `api.twitter.com`. If `secure` is False you will connect with HTTP instead of HTTPS. `api_version` is used to set the base uri. By default it's - '1'. If you are using "search.twitter.com" set this to None. + '1.1'. + + If `retry` is True, API rate limits will automatically be + handled by waiting until the next reset, as indicated by + the X-Rate-Limit-Reset HTTP header. If retry is an integer, + it defines the number of retries attempted. """ if not auth: auth = NoAuth() if (format not in ("json", "xml", "")): - raise ValueError("Unknown data format '%s'" %(format)) + raise ValueError("Unknown data format '%s'" % (format)) if api_version is _DEFAULT: api_version = '1.1' @@ -360,7 +509,7 @@ class Twitter(TwitterCall): TwitterCall.__init__( self, auth=auth, format=format, domain=domain, callable_cls=TwitterCall, - secure=secure, uriparts=uriparts) + secure=secure, uriparts=uriparts, retry=retry) __all__ = ["Twitter", "TwitterError", "TwitterHTTPError", "TwitterResponse"]