]> jfr.im git - z_archive/twitter.git/blobdiff - twitter/api.py
Clarify the comment about edge cases.
[z_archive/twitter.git] / twitter / api.py
index d51015b4fe98a6c06a28fdbe06b720a11396bd6f..0e4377d930b01d2a2a0c9bc82c4f8aa8731a0c49 100644 (file)
-"""
-Attempting to patch to accommodate API like the list interface.
-Note: Make sure not to use keyword substitutions that have the same name
-as an argument that will get encoded.
-"""
-
-from base64 import b64encode
-from urllib import urlencode
+try:
+    import urllib.request as urllib_request
+    import urllib.error as urllib_error
+except ImportError:
+    import urllib2 as urllib_request
+    import urllib2 as urllib_error
+
+try:
+    from cStringIO import StringIO
+except ImportError:
+    from io import BytesIO as StringIO
+
+from .twitter_globals import POST_ACTIONS
+from .auth import NoAuth
+
+import re
+import gzip
+
+try:
+    import http.client as http_client
+except ImportError:
+    import httplib as http_client
+
+try:
+    import json
+except ImportError:
+    import simplejson as json
 
 
-import urllib2
 
 
-from exceptions import Exception
+class _DEFAULT(object):
+    pass
 
 
-from twitter.twitter_globals import POST_ACTIONS
+class TwitterError(Exception):
+    """
+    Base Exception thrown by the Twitter object when there is a
+    general error interacting with the API.
+    """
+    pass
 
 
-def _py26OrGreater():
-    import sys
-    return sys.hexversion > 0x20600f0
+class TwitterHTTPError(TwitterError):
+    """
+    Exception thrown by the Twitter object when there is an
+    HTTP error interacting with twitter.com.
+    """
+    def __init__(self, e, uri, format, uriparts):
+        self.e = e
+        self.uri = uri
+        self.format = format
+        self.uriparts = uriparts
+        try:
+            data = self.e.fp.read()
+        except http_client.IncompleteRead as e:
+            # can't read the error text
+            # let's try some of it
+            data = e.partial
+        if self.e.headers.get('Content-Encoding') == 'gzip':
+            buf = StringIO(data)
+            f = gzip.GzipFile(fileobj=buf)
+            self.response_data = f.read()
+        else:
+            self.response_data = data
 
 
-if _py26OrGreater():
-    import json
-else:
-    import simplejson as json
+    def __str__(self):
+        fmt = ("." + self.format) if self.format else ""
+        return (
+            "Twitter sent status %i for URL: %s%s using parameters: "
+            "(%s)\ndetails: %s" %(
+                self.e.code, self.uri, fmt, self.uriparts,
+                self.response_data))
 
 
-class TwitterError(Exception):
+class TwitterResponse(object):
     """
     """
-    Exception thrown by the Twitter object when there is an
-    error interacting with twitter.com.
+    Response from a twitter request. Behaves like a list or a string
+    (depending on requested format) but it has a few other interesting
+    attributes.
+
+    `headers` gives you access to the response headers as an
+    httplib.HTTPHeaders instance. You can do
+    `response.headers.get('h')` to retrieve a header.
     """
     """
-    pass
+    def __init__(self, headers):
+        self.headers = headers
+
+    @property
+    def rate_limit_remaining(self):
+        """
+        Remaining requests in the current rate-limit.
+        """
+        return int(self.headers.get('X-Rate-Limit-Remaining', "0"))
+
+    @property
+    def rate_limit_limit(self):
+        """
+        The rate limit ceiling for that given request.
+        """
+        return int(self.headers.get('X-Rate-Limit-Limit', "0"))
+
+    @property
+    def rate_limit_reset(self):
+        """
+        Time in UTC epoch seconds when the rate limit will reset.
+        """
+        return int(self.headers.get('X-Rate-Limit-Reset', "0"))
+
+
+def wrap_response(response, headers):
+    response_typ = type(response)
+    if response_typ is bool:
+        # HURF DURF MY NAME IS PYTHON AND I CAN'T SUBCLASS bool.
+        response_typ = int
+    elif response_typ is str:
+        return response
+
+    class WrappedTwitterResponse(response_typ, TwitterResponse):
+        __doc__ = TwitterResponse.__doc__
+
+        def __init__(self, response, headers):
+            response_typ.__init__(self, response)
+            TwitterResponse.__init__(self, headers)
+        def __new__(cls, response, headers):
+            return response_typ.__new__(cls, response)
+
+    return WrappedTwitterResponse(response, headers)
+
+
 
 class TwitterCall(object):
 
 class TwitterCall(object):
+
     def __init__(
     def __init__(
-        self, username, password, format, domain, uri="", agent=None,  uriparts=()):
-        self.username = username
-        self.password = password
+        self, auth, format, domain, callable_cls, uri="",
+        uriparts=None, secure=True, timeout=None, gzip=False):
+        self.auth = auth
         self.format = format
         self.domain = domain
         self.format = format
         self.domain = domain
+        self.callable_cls = callable_cls
         self.uri = uri
         self.uri = uri
-        self.agent = agent
         self.uriparts = uriparts
         self.uriparts = uriparts
-        
+        self.secure = secure
+        self.timeout = timeout
+        self.gzip = gzip
+
     def __getattr__(self, k):
         try:
             return object.__getattr__(self, k)
         except AttributeError:
     def __getattr__(self, k):
         try:
             return object.__getattr__(self, k)
         except AttributeError:
-            """Instead of incrementally building the uri string, now we
-            just append to uriparts.  We'll build the uri later."""
-            return TwitterCall(
-                self.username, self.password, self.format, self.domain,
-                self.uri, self.agent, self.uriparts + (k,))
+            def extend_call(arg):
+                return self.callable_cls(
+                    auth=self.auth, format=self.format, domain=self.domain,
+                    callable_cls=self.callable_cls, timeout=self.timeout,
+                    secure=self.secure, gzip=self.gzip,
+                    uriparts=self.uriparts + (arg,))
+            if k == "_":
+                return extend_call
+            else:
+                return extend_call(k)
+
     def __call__(self, **kwargs):
     def __call__(self, **kwargs):
-        #build the uri
-        uri = self.uri
+        # Build the uri.
+        uriparts = []
         for uripart in self.uriparts:
         for uripart in self.uriparts:
-            #if this part matches a keyword argument, use the supplied value
-            #otherwise, just use the part
-            uri = uri + "/" + kwargs.pop(uripart,uripart)
-
-        method = "GET"
-        for action in POST_ACTIONS:
-            if uri.endswith(action):
-                method = "POST"
-                if (self.agent):
-                    kwargs["source"] = self.agent
-                break
-
-        """This handles a special case. It isn't really needed anymore because now
-        we can insert an id value (or any other value) at the end of the
-        uri (or anywhere else).
-        However we can leave it for backward compatibility."""
+            # If this part matches a keyword argument, use the
+            # supplied value otherwise, just use the part.
+            uriparts.append(str(kwargs.pop(uripart, uripart)))
+        uri = '/'.join(uriparts)
+
+        method = kwargs.pop('_method', None)
+        if not method:
+            method = "GET"
+            for action in POST_ACTIONS:
+                if re.search("%s(/\d+)?$" % action, uri):
+                    method = "POST"
+                    break
+
+        # If an id kwarg is present and there is no id to fill in in
+        # the list of uriparts, assume the id goes at the end.
         id = kwargs.pop('id', None)
         if id:
             uri += "/%s" %(id)
 
         id = kwargs.pop('id', None)
         if id:
             uri += "/%s" %(id)
 
-        argStr = ""
-        argData = None
-        encoded_kwargs = urlencode(kwargs.items())
-        if (method == "GET"):
-            if kwargs:
-                argStr = "?%s" %(encoded_kwargs)
-        else:
-            argData = encoded_kwargs
-
-        headers = {}
-        if (self.agent):
-            headers["X-Twitter-Client"] = self.agent
-        if (self.username):
-            headers["Authorization"] = "Basic " + b64encode("%s:%s" %(
-                self.username, self.password))
-
-        req = urllib2.Request(
-                "http://%s/%s.%s%s" %(self.domain, uri, self.format, argStr),
-                argData, headers
-            )
+        # If an _id kwarg is present, this is treated as id as a CGI
+        # param.
+        _id = kwargs.pop('_id', None)
+        if _id:
+            kwargs['id'] = _id
+
+        # If an _timeout is specified in kwargs, use it
+        _timeout = kwargs.pop('_timeout', None)
+
+        secure_str = ''
+        if self.secure:
+            secure_str = 's'
+        dot = ""
+        if self.format:
+            dot = "."
+        uriBase = "http%s://%s/%s%s%s" %(
+                    secure_str, self.domain, uri, dot, self.format)
+
+        headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
+        body = None; arg_data = None
+        if self.auth:
+            headers.update(self.auth.generate_headers())
+            arg_data = self.auth.encode_params(uriBase, method, kwargs)
+            if method == 'GET':
+                uriBase += '?' + arg_data
+            else:
+                body = arg_data.encode('utf8')
+
+        req = urllib_request.Request(uriBase, body, headers)
+        return self._handle_response(req, uri, arg_data, _timeout)
+
+    def _handle_response(self, req, uri, arg_data, _timeout=None):
+        kwargs = {}
+        if _timeout:
+            kwargs['timeout'] = _timeout
         try:
         try:
-            handle = urllib2.urlopen(req)
+            handle = urllib_request.urlopen(req, **kwargs)
+            if handle.headers['Content-Type'] in ['image/jpeg', 'image/png']:
+                return handle
+            try:
+                data = handle.read()
+            except http_client.IncompleteRead as e:
+                # Even if we don't get all the bytes we should have there
+                # may be a complete response in e.partial
+                data = e.partial
+            if handle.info().get('Content-Encoding') == 'gzip':
+                # Handle gzip decompression
+                buf = StringIO(data)
+                f = gzip.GzipFile(fileobj=buf)
+                data = f.read()
             if "json" == self.format:
             if "json" == self.format:
-                return json.loads(handle.read())
+                res = json.loads(data.decode('utf8'))
+                return wrap_response(res, handle.headers)
             else:
             else:
-                return handle.read()
-        except urllib2.HTTPError, e:
+                return wrap_response(
+                    data.decode('utf8'), handle.headers)
+        except urllib_error.HTTPError as e:
             if (e.code == 304):
                 return []
             else:
             if (e.code == 304):
                 return []
             else:
-                raise TwitterError(
-                    "Twitter sent status %i for URL: %s.%s using parameters: (%s)\ndetails: %s" %(
-                        e.code, uri, self.format, encoded_kwargs, e.fp.read()))
+                raise TwitterHTTPError(e, uri, self.format, arg_data)
 
 class Twitter(TwitterCall):
     """
 
 class Twitter(TwitterCall):
     """
@@ -114,75 +246,125 @@ class Twitter(TwitterCall):
     Get RESTful data by accessing members of this class. The result
     is decoded python objects (lists and dicts).
 
     Get RESTful data by accessing members of this class. The result
     is decoded python objects (lists and dicts).
 
-    The Twitter API is documented here:
+    The Twitter API is documented at:
+
+      http://dev.twitter.com/doc
 
 
-      http://apiwiki.twitter.com/
-      http://groups.google.com/group/twitter-development-talk/web/api-documentation
 
     Examples::
 
 
     Examples::
 
-      twitter = Twitter("hello@foo.com", "password123")
+        t = Twitter(
+            auth=OAuth(token, token_key, con_secret, con_secret_key)))
+
+        # Get your "home" timeline
+        t.statuses.home_timeline()
 
 
-      # Get the public timeline
-      twitter.statuses.public_timeline()
+        # Get a particular friend's timeline
+        t.statuses.friends_timeline(id="billybob")
 
 
-      # Get a particular friend's timeline
-      twitter.statuses.friends_timeline(id="billybob")
+        # Also supported (but totally weird)
+        t.statuses.friends_timeline.billybob()
 
 
-      # Also supported (but totally weird)
-      twitter.statuses.friends_timeline.billybob()
+        # Update your status
+        t.statuses.update(
+            status="Using @sixohsix's sweet Python Twitter Tools.")
+
+        # Send a direct message
+        t.direct_messages.new(
+            user="billybob",
+            text="I think yer swell!")
+
+        # Get the members of tamtar's list "Things That Are Rad"
+        t._("tamtar")._("things-that-are-rad").members()
+
+        # Note how the magic `_` method can be used to insert data
+        # into the middle of a call. You can also use replacement:
+        t.user.list.members(user="tamtar", list="things-that-are-rad")
+
+        # An *optional* `_timeout` parameter can also be used for API
+        # calls which take much more time than normal or twitter stops
+        # responding for some reasone
+        t.users.lookup(
+            screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES), \
+            _timeout=1)
 
 
-      # Send a direct message
-      twitter.direct_messages.new(
-          user="billybob",
-          text="I think yer swell!")
 
 
-      # Get the members of a particular list of a particular friend
-      twitter.user.listname.members(user="billybob", listname="billysbuds")
 
     Searching Twitter::
 
 
     Searching Twitter::
 
-      twitter_search = Twitter(domain="search.twitter.com")
+        # Search for the latest tweets about #pycon
+        t.search.tweets(q="#pycon")
 
 
-      # Find the latest search trends
-      twitter_search.trends()
 
 
-      # Search for the latest News on #gaza
-      twitter_search.search(q="#gaza")
+    Using the data returned
+    -----------------------
 
 
-    Using the data returned::
+    Twitter API calls return decoded JSON. This is converted into
+    a bunch of Python lists, dicts, ints, and strings. For example::
 
 
-      Twitter API calls return decoded JSON. This is converted into
-      a bunch of Python lists, dicts, ints, and strings. For example,
+        x = twitter.statuses.home_timeline()
 
 
-      x = twitter.statuses.public_timeline()
+        # The first 'tweet' in the timeline
+        x[0]
 
 
-      # The first 'tweet' in the timeline
-      x[0]
+        # The screen name of the user who wrote the first 'tweet'
+        x[0]['user']['screen_name']
 
 
-      # The screen name of the user who wrote the first 'tweet'
-      x[0]['user']['screen_name']
 
 
-    Getting raw XML data::
+    Getting raw XML data
+    --------------------
 
 
-      If you prefer to get your Twitter data in XML format, pass
-      format="xml" to the Twitter object when you instantiate it:
+    If you prefer to get your Twitter data in XML format, pass
+    format="xml" to the Twitter object when you instantiate it::
 
 
-      twitter = Twitter(format="xml")
+        twitter = Twitter(format="xml")
+
+    The output will not be parsed in any way. It will be a raw string
+    of XML.
 
 
-      The output will not be parsed in any way. It will be a raw string
-      of XML.
     """
     def __init__(
     """
     def __init__(
-        self, email=None, password=None, format="json", domain="twitter.com",
-        agent=None):
+        self, format="json",
+        domain="api.twitter.com", secure=True, auth=None,
+        api_version=_DEFAULT):
         """
         """
-        Create a new twitter API connector using the specified
-        credentials (email and password). Format specifies the output
-        format ("json" (default) or "xml").
+        Create a new twitter API connector.
+
+        Pass an `auth` parameter to use the credentials of a specific
+        user. Generally you'll want to pass an `OAuth`
+        instance::
+
+            twitter = Twitter(auth=OAuth(
+                    token, token_secret, consumer_key, consumer_secret))
+
+
+        `domain` lets you change the domain you are connecting. By
+        default it's `api.twitter.com` but `search.twitter.com` may be
+        useful too.
+
+        If `secure` is False you will connect with HTTP instead of
+        HTTPS.
+
+        `api_version` is used to set the base uri. By default it's
+        '1'. If you are using "search.twitter.com" set this to None.
         """
         """
-        if (format not in ("json", "xml")):
-            raise TwitterError("Unknown data format '%s'" %(format))
-        TwitterCall.__init__(self, email, password, format, domain, "", agent,  ())
+        if not auth:
+            auth = NoAuth()
+
+        if (format not in ("json", "xml", "")):
+            raise ValueError("Unknown data format '%s'" %(format))
+
+        if api_version is _DEFAULT:
+            api_version = '1.1'
+
+        uriparts = ()
+        if api_version:
+            uriparts += (str(api_version),)
+
+        TwitterCall.__init__(
+            self, auth=auth, format=format, domain=domain,
+            callable_cls=TwitterCall,
+            secure=secure, uriparts=uriparts)
+
 
 
-__all__ = ["Twitter", "TwitterError"]
+__all__ = ["Twitter", "TwitterError", "TwitterHTTPError", "TwitterResponse"]