X-Git-Url: https://jfr.im/git/z_archive/twitter.git/blobdiff_plain/cd7ce713e9f11559092b58ed368a47a85d1146fe..3b35b923f0cf80d88e198bfcefdab8b6b25e1fcc:/twitter/api.py diff --git a/twitter/api.py b/twitter/api.py index f3d6612..9914ce6 100644 --- a/twitter/api.py +++ b/twitter/api.py @@ -1,103 +1,233 @@ - -from base64 import encodestring -from urllib import urlencode - -import urllib2 - -from exceptions import Exception +try: + import urllib.request as urllib_request + import urllib.error as urllib_error +except ImportError: + import urllib2 as urllib_request + import urllib2 as urllib_error + +try: + from cStringIO import StringIO +except ImportError: + from io import BytesIO as StringIO from twitter.twitter_globals import POST_ACTIONS +from twitter.auth import NoAuth -def _py26OrGreater(): - import sys - return sys.hexversion > 0x20600f0 +import re +import gzip +import httplib -if _py26OrGreater(): +try: import json -else: +except ImportError: import simplejson as json + +class _DEFAULT(object): + pass + + class TwitterError(Exception): """ - Exception thrown by the Twitter object when there is an - error interacting with twitter.com. + Base Exception thrown by the Twitter object when there is a + general error interacting with the API. """ pass + +class TwitterHTTPError(TwitterError): + """ + Exception thrown by the Twitter object when there is an + HTTP error interacting with twitter.com. + """ + def __init__(self, e, uri, format, uriparts): + self.e = e + self.uri = uri + self.format = format + self.uriparts = uriparts + if self.e.headers['Content-Encoding'] == 'gzip': + buf = StringIO(self.e.fp.read()) + f = gzip.GzipFile(fileobj=buf) + self.response_data = f.read() + else: + self.response_data = self.e.fp.read() + + def __str__(self): + fmt = ("." + self.format) if self.format else "" + return ( + "Twitter sent status %i for URL: %s%s using parameters: " + "(%s)\ndetails: %s" % ( + self.e.code, self.uri, fmt, self.uriparts, + self.response_data)) + + +class TwitterResponse(object): + """ + Response from a twitter request. Behaves like a list or a string + (depending on requested format) but it has a few other interesting + attributes. + + `headers` gives you access to the response headers as an + httplib.HTTPHeaders instance. You can do + `response.headers.get('h')` to retrieve a header. + """ + def __init__(self, headers): + self.headers = headers + + @property + def rate_limit_remaining(self): + """ + Remaining requests in the current rate-limit. + """ + return int(self.headers.get('X-Rate-Limit-Remaining', "0")) + + @property + def rate_limit_limit(self): + """ + The rate limit ceiling for that given request. + """ + return int(self.headers.get('X-Rate-Limit-Limit', "0")) + + @property + def rate_limit_reset(self): + """ + Time in UTC epoch seconds when the rate limit will reset. + """ + return int(self.headers.get('X-Rate-Limit-Reset', "0")) + + +def wrap_response(response, headers): + response_typ = type(response) + if response_typ is bool: + # HURF DURF MY NAME IS PYTHON AND I CAN'T SUBCLASS bool. + response_typ = int + + class WrappedTwitterResponse(response_typ, TwitterResponse): + __doc__ = TwitterResponse.__doc__ + + def __init__(self, response, headers): + response_typ.__init__(self, response) + TwitterResponse.__init__(self, headers) + + def __new__(cls, response, headers): + return response_typ.__new__(cls, response) + + return WrappedTwitterResponse(response, headers) + + class TwitterCall(object): + def __init__( - self, username, password, format, domain, uri="", agent=None, - encoded_args=None, secure=True): - self.username = username - self.password = password + self, auth, format, domain, callable_cls, uri="", + uriparts=None, secure=True): + self.auth = auth self.format = format self.domain = domain + self.callable_cls = callable_cls self.uri = uri - self.agent = agent - self.encoded_args = encoded_args + self.uriparts = uriparts self.secure = secure def __getattr__(self, k): try: return object.__getattr__(self, k) except AttributeError: - return TwitterCall( - self.username, self.password, self.format, self.domain, - self.uri + "/" + k, self.agent, self.encoded_args) + def extend_call(arg): + return self.callable_cls( + auth=self.auth, format=self.format, domain=self.domain, + callable_cls=self.callable_cls, uriparts=self.uriparts + + (arg,), + secure=self.secure) + if k == "_": + return extend_call + else: + return extend_call(k) def __call__(self, **kwargs): - uri = self.uri - method = "GET" - for action in POST_ACTIONS: - if self.uri.endswith(action): - method = "POST" - if (self.agent): - kwargs["source"] = self.agent - break - - if (not self.encoded_args): - if kwargs.has_key('id'): - uri += "/%s" %(kwargs['id']) - - self.encoded_args = urlencode(kwargs.items()) - - argStr = "" - argData = None - if (method == "GET"): - if self.encoded_args: - argStr = "?%s" %(self.encoded_args) - else: - argData = self.encoded_args - - headers = {} - if (self.agent): - headers["X-Twitter-Client"] = self.agent - if (self.username): - headers["Authorization"] = "Basic " + encodestring("%s:%s" %( - self.username, self.password)).strip('\n') + # Build the uri. + uriparts = [] + for uripart in self.uriparts: + # If this part matches a keyword argument, use the + # supplied value otherwise, just use the part. + uriparts.append(str(kwargs.pop(uripart, uripart))) + uri = '/'.join(uriparts) + + method = kwargs.pop('_method', None) + if not method: + method = "GET" + for action in POST_ACTIONS: + if re.search("%s(/\d+)?$" % action, uri): + method = "POST" + break + + # If an id kwarg is present and there is no id to fill in in + # the list of uriparts, assume the id goes at the end. + id = kwargs.pop('id', None) + if id: + uri += "/%s" % (id) + + # If an _id kwarg is present, this is treated as id as a CGI + # param. + _id = kwargs.pop('_id', None) + if _id: + kwargs['id'] = _id + + # If an _timeout is specified in kwargs, use it + _timeout = kwargs.pop('_timeout', None) secure_str = '' if self.secure: secure_str = 's' + dot = "" + if self.format: + dot = "." + uriBase = "http%s://%s/%s%s%s" % ( + secure_str, self.domain, uri, dot, self.format) + + headers = {'Accept-Encoding': 'gzip'} + if self.auth: + headers.update(self.auth.generate_headers()) + arg_data = self.auth.encode_params(uriBase, method, kwargs) + if method == 'GET': + uriBase += '?' + arg_data + body = None + else: + body = arg_data.encode('utf8') - req = urllib2.Request( - "http%s://%s/%s.%s%s" %( - secure_str, self.domain, uri, self.format, argStr), - argData, headers) - + req = urllib_request.Request(uriBase, body, headers) + return self._handle_response(req, uri, arg_data, _timeout) + + def _handle_response(self, req, uri, arg_data, _timeout=None): + kwargs = {} + if _timeout: + kwargs['timeout'] = _timeout try: - handle = urllib2.urlopen(req) + handle = urllib_request.urlopen(req, **kwargs) + if handle.headers['Content-Type'] in ['image/jpeg', 'image/png']: + return handle + try: + data = handle.read() + except httplib.IncompleteRead as e: + # Even if we don't get all the bytes we should have there + # may be a complete response in e.partial + data = e.partial + if handle.info().get('Content-Encoding') == 'gzip': + # Handle gzip decompression + buf = StringIO(data) + f = gzip.GzipFile(fileobj=buf) + data = f.read() if "json" == self.format: - return json.loads(handle.read()) + res = json.loads(data.decode('utf8')) + return wrap_response(res, handle.headers) else: - return handle.read() - except urllib2.HTTPError, e: + return wrap_response( + data.decode('utf8'), handle.headers) + except urllib_error.HTTPError as e: if (e.code == 304): return [] else: - raise TwitterError( - "Twitter sent status %i for URL: %s.%s using parameters: (%s)\ndetails: %s" %( - e.code, uri, self.format, self.encoded_args, e.fp.read())) + raise TwitterHTTPError(e, uri, self.format, arg_data) + class Twitter(TwitterCall): """ @@ -106,74 +236,127 @@ class Twitter(TwitterCall): Get RESTful data by accessing members of this class. The result is decoded python objects (lists and dicts). - The Twitter API is documented here: + The Twitter API is documented at: + + http://dev.twitter.com/doc - http://apiwiki.twitter.com/ - http://groups.google.com/group/twitter-development-talk/web/api-documentation Examples:: - twitter = Twitter("hello@foo.com", "password123") + t = Twitter( + auth=OAuth(token, token_key, con_secret, con_secret_key))) + + # Get your "home" timeline + t.statuses.home_timeline() + + # Get a particular friend's timeline + t.statuses.friends_timeline(id="billybob") + + # Also supported (but totally weird) + t.statuses.friends_timeline.billybob() - # Get the public timeline - twitter.statuses.public_timeline() + # Update your status + t.statuses.update( + status="Using @sixohsix's sweet Python Twitter Tools.") - # Get a particular friend's timeline - twitter.statuses.friends_timeline(id="billybob") + # Send a direct message + t.direct_messages.new( + user="billybob", + text="I think yer swell!") + + # Get the members of tamtar's list "Things That Are Rad" + t._("tamtar")._("things-that-are-rad").members() + + # Note how the magic `_` method can be used to insert data + # into the middle of a call. You can also use replacement: + t.user.list.members(user="tamtar", list="things-that-are-rad") + + # An *optional* `_timeout` parameter can also be used for API + # calls which take much more time than normal or twitter stops + # responding for some reasone + t.users.lookup( + screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES), \ + _timeout=1) - # Also supported (but totally weird) - twitter.statuses.friends_timeline.billybob() - # Send a direct message - twitter.direct_messages.new( - user="billybob", - text="I think yer swell!") Searching Twitter:: - twitter_search = Twitter(domain="search.twitter.com") + # Search for the latest tweets about #pycon + t.search.tweets(q="#pycon") + - # Find the latest search trends - twitter_search.trends() + Using the data returned + ----------------------- - # Search for the latest News on #gaza - twitter_search.search(q="#gaza") + Twitter API calls return decoded JSON. This is converted into + a bunch of Python lists, dicts, ints, and strings. For example:: - Using the data returned:: + x = twitter.statuses.home_timeline() - Twitter API calls return decoded JSON. This is converted into - a bunch of Python lists, dicts, ints, and strings. For example, + # The first 'tweet' in the timeline + x[0] - x = twitter.statuses.public_timeline() + # The screen name of the user who wrote the first 'tweet' + x[0]['user']['screen_name'] - # The first 'tweet' in the timeline - x[0] - # The screen name of the user who wrote the first 'tweet' - x[0]['user']['screen_name'] + Getting raw XML data + -------------------- - Getting raw XML data:: + If you prefer to get your Twitter data in XML format, pass + format="xml" to the Twitter object when you instantiate it:: - If you prefer to get your Twitter data in XML format, pass - format="xml" to the Twitter object when you instantiate it: + twitter = Twitter(format="xml") - twitter = Twitter(format="xml") + The output will not be parsed in any way. It will be a raw string + of XML. - The output will not be parsed in any way. It will be a raw string - of XML. """ def __init__( - self, email=None, password=None, format="json", domain="twitter.com", - agent=None, secure=True): + self, format="json", + domain="api.twitter.com", secure=True, auth=None, + api_version=_DEFAULT): """ - Create a new twitter API connector using the specified - credentials (email and password). Format specifies the output - format ("json" (default) or "xml"). + Create a new twitter API connector. + + Pass an `auth` parameter to use the credentials of a specific + user. Generally you'll want to pass an `OAuth` + instance:: + + twitter = Twitter(auth=OAuth( + token, token_secret, consumer_key, consumer_secret)) + + + `domain` lets you change the domain you are connecting. By + default it's `api.twitter.com`. + + If `secure` is False you will connect with HTTP instead of + HTTPS. + + `api_version` is used to set the base uri. By default it's + '1.1'. """ - if (format not in ("json", "xml")): - raise TwitterError("Unknown data format '%s'" %(format)) + if not auth: + auth = NoAuth() + + if (format not in ("json", "xml", "")): + raise ValueError("Unknown data format '%s'" % (format)) + + if api_version is _DEFAULT: + if domain == 'api.twitter.com': + api_version = '1.1' + else: + api_version = None + + uriparts = () + if api_version: + uriparts += (str(api_version),) + TwitterCall.__init__( - self, email, password, format, domain, "", agent, - secure=secure) + self, auth=auth, format=format, domain=domain, + callable_cls=TwitterCall, + secure=secure, uriparts=uriparts) + -__all__ = ["Twitter", "TwitterError"] +__all__ = ["Twitter", "TwitterError", "TwitterHTTPError", "TwitterResponse"]