import re
import gzip
+import httplib
try:
import json
self.uri = uri
self.format = format
self.uriparts = uriparts
- self.response_data = self.e.fp.read()
+ try:
+ data = self.e.fp.read()
+ except httplib.IncompleteRead, e:
+ # can't read the error text
+ # let's try some of it
+ data = e.partial
+ if self.e.headers['Content-Encoding'] == 'gzip':
+ buf = StringIO(data)
+ f = gzip.GzipFile(fileobj=buf)
+ self.response_data = f.read()
+ else:
+ self.response_data = data
def __str__(self):
fmt = ("." + self.format) if self.format else ""
"""
Remaining requests in the current rate-limit.
"""
- return int(self.headers.get('X-RateLimit-Remaining', "0"))
+ return int(self.headers.get('X-Rate-Limit-Remaining', "0"))
+
+ @property
+ def rate_limit_limit(self):
+ """
+ The rate limit ceiling for that given request.
+ """
+ return int(self.headers.get('X-Rate-Limit-Limit', "0"))
@property
def rate_limit_reset(self):
"""
Time in UTC epoch seconds when the rate limit will reset.
"""
- return int(self.headers.get('X-RateLimit-Reset', "0"))
+ return int(self.headers.get('X-Rate-Limit-Reset', "0"))
def wrap_response(response, headers):
if id:
uri += "/%s" %(id)
+ # If an _id kwarg is present, it is passed as the `id` CGI
+ # parameter.
+ _id = kwargs.pop('_id', None)
+ if _id:
+ kwargs['id'] = _id
+
+ # If an _timeout is specified in kwargs, use it
+ _timeout = kwargs.pop('_timeout', None)
+
secure_str = ''
if self.secure:
secure_str = 's'
body = arg_data.encode('utf8')
req = urllib_request.Request(uriBase, body, headers)
- return self._handle_response(req, uri, arg_data)
+ return self._handle_response(req, uri, arg_data, _timeout)
- def _handle_response(self, req, uri, arg_data):
+ def _handle_response(self, req, uri, arg_data, _timeout=None):
+ kwargs = {}
+ if _timeout:
+ kwargs['timeout'] = _timeout
try:
- handle = urllib_request.urlopen(req)
+ handle = urllib_request.urlopen(req, **kwargs)
if handle.headers['Content-Type'] in ['image/jpeg', 'image/png']:
return handle
- elif handle.info().get('Content-Encoding') == 'gzip':
+ try:
+ data = handle.read()
+ except httplib.IncompleteRead, e:
+ # Even if we don't get all the bytes we should have there
+ # may be a complete response in e.partial
+ data = e.partial
+ if handle.info().get('Content-Encoding') == 'gzip':
# Handle gzip decompression
- buf = StringIO(handle.read())
+ buf = StringIO(data)
f = gzip.GzipFile(fileobj=buf)
data = f.read()
- else:
- data = handle.read()
-
if "json" == self.format:
res = json.loads(data.decode('utf8'))
return wrap_response(res, handle.headers)
t = Twitter(
auth=OAuth(token, token_key, con_secret, con_secret_key))
- # Get the public timeline
- t.statuses.public_timeline()
+ # Get your "home" timeline
+ t.statuses.home_timeline()
# Get a particular friend's timeline
t.statuses.friends_timeline(id="billybob")
# into the middle of a call. You can also use replacement:
t.user.list.members(user="tamtar", list="things-that-are-rad")
+ # An *optional* `_timeout` parameter can also be used for API
+ # calls which take much more time than normal or twitter stops
+ # responding for some reason
+ t.users.lookup(
+ screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES), \
+ _timeout=1)
- Searching Twitter::
- twitter_search = Twitter(domain="search.twitter.com")
- # Find the latest search trends
- twitter_search.trends()
+ Searching Twitter::
- # Search for the latest News on #gaza
- twitter_search.search(q="#gaza")
+ # Search for the latest tweets about #pycon
+ t.search.tweets(q="#pycon")
Using the data returned
Twitter API calls return decoded JSON. This is converted into
a bunch of Python lists, dicts, ints, and strings. For example::
- x = twitter.statuses.public_timeline()
+ x = twitter.statuses.home_timeline()
# The first 'tweet' in the timeline
x[0]
if api_version is _DEFAULT:
if domain == 'api.twitter.com':
- api_version = '1'
+ api_version = '1.1'
else:
api_version = None