except ImportError:
from io import BytesIO as StringIO
-from twitter.twitter_globals import POST_ACTIONS
-from twitter.auth import NoAuth
+from .twitter_globals import POST_ACTIONS
+from .auth import NoAuth
import re
import gzip
+try:
+ import http.client as http_client
+except ImportError:
+ import httplib as http_client
+
try:
import json
except ImportError:
import simplejson as json
+
class _DEFAULT(object):
pass
self.uri = uri
self.format = format
self.uriparts = uriparts
- if self.e.headers['Content-Encoding'] == 'gzip':
- buf = StringIO(self.e.fp.read())
+ try:
+ data = self.e.fp.read()
+ except http_client.IncompleteRead as e:
+ # can't read the error text
+ # let's try some of it
+ data = e.partial
+ if self.e.headers.get('Content-Encoding') == 'gzip':
+ buf = StringIO(data)
f = gzip.GzipFile(fileobj=buf)
self.response_data = f.read()
else:
- self.response_data = self.e.fp.read()
+ self.response_data = data
def __str__(self):
fmt = ("." + self.format) if self.format else ""
"""
Remaining requests in the current rate-limit.
"""
- return int(self.headers.get('X-RateLimit-Remaining', "0"))
+ return int(self.headers.get('X-Rate-Limit-Remaining', "0"))
+
+ @property
+ def rate_limit_limit(self):
+ """
+ The rate limit ceiling for that given request.
+ """
+ return int(self.headers.get('X-Rate-Limit-Limit', "0"))
@property
def rate_limit_reset(self):
"""
Time in UTC epoch seconds when the rate limit will reset.
"""
- return int(self.headers.get('X-RateLimit-Reset', "0"))
+ return int(self.headers.get('X-Rate-Limit-Reset', "0"))
def wrap_response(response, headers):
if response_typ is bool:
# HURF DURF MY NAME IS PYTHON AND I CAN'T SUBCLASS bool.
response_typ = int
+ elif response_typ is str:
+ return response
class WrappedTwitterResponse(response_typ, TwitterResponse):
__doc__ = TwitterResponse.__doc__
def __new__(cls, response, headers):
return response_typ.__new__(cls, response)
-
return WrappedTwitterResponse(response, headers)
def __init__(
self, auth, format, domain, callable_cls, uri="",
- uriparts=None, secure=True):
+ uriparts=None, secure=True, timeout=None, gzip=False):
self.auth = auth
self.format = format
self.domain = domain
self.uri = uri
self.uriparts = uriparts
self.secure = secure
+ self.timeout = timeout
+ self.gzip = gzip
def __getattr__(self, k):
try:
def extend_call(arg):
return self.callable_cls(
auth=self.auth, format=self.format, domain=self.domain,
- callable_cls=self.callable_cls, uriparts=self.uriparts \
- + (arg,),
- secure=self.secure)
+ callable_cls=self.callable_cls, timeout=self.timeout,
+ secure=self.secure, gzip=self.gzip,
+ uriparts=self.uriparts + (arg,))
if k == "_":
return extend_call
else:
if id:
uri += "/%s" %(id)
+ # If an _id kwarg is present, it is passed on as the `id` CGI
+ # param.
+ _id = kwargs.pop('_id', None)
+ if _id:
+ kwargs['id'] = _id
+
+ # If an _timeout is specified in kwargs, use it
+ _timeout = kwargs.pop('_timeout', None)
+
secure_str = ''
if self.secure:
secure_str = 's'
uriBase = "http%s://%s/%s%s%s" %(
secure_str, self.domain, uri, dot, self.format)
- headers = {'Accept-Encoding': 'gzip'}
+ headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
+ body = None; arg_data = None
if self.auth:
headers.update(self.auth.generate_headers())
arg_data = self.auth.encode_params(uriBase, method, kwargs)
if method == 'GET':
uriBase += '?' + arg_data
- body = None
else:
body = arg_data.encode('utf8')
req = urllib_request.Request(uriBase, body, headers)
- return self._handle_response(req, uri, arg_data)
+ return self._handle_response(req, uri, arg_data, _timeout)
- def _handle_response(self, req, uri, arg_data):
+ def _handle_response(self, req, uri, arg_data, _timeout=None):
+ kwargs = {}
+ if _timeout:
+ kwargs['timeout'] = _timeout
try:
- handle = urllib_request.urlopen(req)
+ handle = urllib_request.urlopen(req, **kwargs)
if handle.headers['Content-Type'] in ['image/jpeg', 'image/png']:
return handle
- elif handle.info().get('Content-Encoding') == 'gzip':
+ try:
+ data = handle.read()
+ except http_client.IncompleteRead as e:
+ # Even if we don't get all the bytes we should have there
+ # may be a complete response in e.partial
+ data = e.partial
+ if handle.info().get('Content-Encoding') == 'gzip':
# Handle gzip decompression
- buf = StringIO(handle.read())
+ buf = StringIO(data)
f = gzip.GzipFile(fileobj=buf)
data = f.read()
- else:
- data = handle.read()
-
if "json" == self.format:
res = json.loads(data.decode('utf8'))
return wrap_response(res, handle.headers)
t = Twitter(
auth=OAuth(token, token_key, con_secret, con_secret_key)))
- # Get the public timeline
- t.statuses.public_timeline()
+ # Get your "home" timeline
+ t.statuses.home_timeline()
# Get a particular friend's timeline
t.statuses.friends_timeline(id="billybob")
# into the middle of a call. You can also use replacement:
t.user.list.members(user="tamtar", list="things-that-are-rad")
+ # An *optional* `_timeout` parameter can also be used for API
+ # calls which take much more time than normal, or when Twitter
+ # stops responding for some reason
+ t.users.lookup(
+ screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES), \
+ _timeout=1)
- Searching Twitter::
- twitter_search = Twitter(domain="search.twitter.com")
- # Find the latest search trends
- twitter_search.trends()
+ Searching Twitter::
- # Search for the latest News on #gaza
- twitter_search.search(q="#gaza")
+ # Search for the latest tweets about #pycon
+ t.search.tweets(q="#pycon")
Using the data returned
Twitter API calls return decoded JSON. This is converted into
a bunch of Python lists, dicts, ints, and strings. For example::
- x = twitter.statuses.public_timeline()
+ x = twitter.statuses.home_timeline()
# The first 'tweet' in the timeline
x[0]
raise ValueError("Unknown data format '%s'" %(format))
if api_version is _DEFAULT:
- if domain == 'api.twitter.com':
- api_version = '1'
- else:
- api_version = None
+ api_version = '1.1'
uriparts = ()
if api_version: