+# encoding: utf-8
+from __future__ import unicode_literals, print_function
+
+from .util import PY_3_OR_HIGHER, actually_bytes
+
try:
import urllib.request as urllib_request
import urllib.error as urllib_error
except ImportError:
from io import BytesIO as StringIO
-from twitter.twitter_globals import POST_ACTIONS
-from twitter.auth import NoAuth
+from .twitter_globals import POST_ACTIONS
+from .auth import NoAuth
import re
+import sys
import gzip
-import httplib
+from time import sleep, time
+
+try:
+ import http.client as http_client
+except ImportError:
+ import httplib as http_client
try:
import json
except ImportError:
import simplejson as json
+
class _DEFAULT(object):
    """Sentinel type meaning "argument not supplied" (distinct from None)."""
+
class TwitterError(Exception):
    """
    Base exception for errors raised by the Twitter object.

    More specific failures (such as TwitterHTTPError) derive from this
    class, so catching it covers all of them.
    """
+
class TwitterHTTPError(TwitterError):
"""
Exception thrown by the Twitter object when there is an
self.uriparts = uriparts
try:
data = self.e.fp.read()
- except httplib.IncompleteRead, e:
+ except http_client.IncompleteRead as e:
# can't read the error text
# let's try some of it
data = e.partial
- if self.e.headers['Content-Encoding'] == 'gzip':
+ if self.e.headers.get('Content-Encoding') == 'gzip':
buf = StringIO(data)
f = gzip.GzipFile(fileobj=buf)
- self.response_data = f.read()
+ data = f.read()
+ if len(data) == 0:
+ data = {}
+ elif "json" == self.format:
+ data = json.loads(data.decode('utf8'))
else:
- self.response_data = data
+ data = data.decode('utf8')
+ self.response_data = data
+ super(TwitterHTTPError, self).__init__(str(self))
def __str__(self):
fmt = ("." + self.format) if self.format else ""
return (
"Twitter sent status %i for URL: %s%s using parameters: "
- "(%s)\ndetails: %s" %(
+ "(%s)\ndetails: %s" % (
self.e.code, self.uri, fmt, self.uriparts,
self.response_data))
+
class TwitterResponse(object):
"""
Response from a twitter request. Behaves like a list or a string
httplib.HTTPHeaders instance. You can do
`response.headers.get('h')` to retrieve a header.
"""
- def __init__(self, headers):
- self.headers = headers
@property
def rate_limit_remaining(self):
return int(self.headers.get('X-Rate-Limit-Reset', "0"))
class TwitterDictResponse(dict, TwitterResponse):
    """
    A dict that also carries the TwitterResponse header accessors.

    Instances are created by `wrap_response`, which sets `headers` on them.
    """
+
+
class TwitterListResponse(list, TwitterResponse):
    """
    A list that also carries the TwitterResponse header accessors.

    Instances are created by `wrap_response`, which sets `headers` on them.
    """
+
+
def wrap_response(response, headers):
    """
    Attach the HTTP `headers` to a decoded API response.

    Plain dicts and lists (exact types, not subclasses) are copied into
    TwitterDictResponse / TwitterListResponse objects whose `headers`
    attribute is set to `headers`.  Any other value is returned as-is,
    without headers.
    """
    kind = type(response)
    if kind is dict:
        wrapped = TwitterDictResponse(response)
    elif kind is list:
        wrapped = TwitterListResponse(response)
    else:
        # Other types are handed back untouched, so the headers are
        # simply not available on them.
        return response
    wrapped.headers = headers
    return wrapped
+
+
# Matches a URI whose final path component is one of the POST_ACTIONS,
# optionally followed by a numeric component (e.g. ".../destroy/123").
POST_ACTIONS_RE = re.compile('(' + '|'.join(POST_ACTIONS) + r')(/\d+)?$')


def method_for_uri(uri):
    """
    Return the HTTP method to use for `uri`.

    "POST" when the URI ends in one of the known POST actions (see
    POST_ACTIONS_RE), "GET" otherwise.
    """
    return "POST" if POST_ACTIONS_RE.search(uri) else "GET"
- def __init__(self, response, headers):
- response_typ.__init__(self, response)
- TwitterResponse.__init__(self, headers)
- def __new__(cls, response, headers):
- return response_typ.__new__(cls, response)
def build_uri(orig_uriparts, kwargs):
    """
    Assemble the request path from `orig_uriparts`, consuming `kwargs`.

    Each part that starts with "_" is replaced by the keyword argument of
    the same name when one was supplied (the argument is removed from
    `kwargs`); all other parts are used verbatim.  The parts are joined
    with "/".  An `id` keyword argument is always removed from `kwargs`
    and, when truthy, appended as a final path component.

    Note: `kwargs` is modified in place.
    """
    resolved = []
    for piece in orig_uriparts:
        if piece.startswith("_"):
            # Placeholder part: substitute the caller's value if present,
            # otherwise keep the placeholder text itself.
            piece = str(kwargs.pop(piece, piece))
        resolved.append(piece)
    path = '/'.join(resolved)

    # No slot for the id in the parts list, so it goes at the end.
    trailing_id = kwargs.pop('id', None)
    if not trailing_id:
        return path
    return path + "/%s" % (trailing_id)
class TwitterCall(object):
+ TWITTER_UNAVAILABLE_WAIT = 30 # delay after HTTP codes 502, 503 or 504
+
def __init__(
- self, auth, format, domain, callable_cls, uri="",
- uriparts=None, secure=True):
+ self, auth, format, domain, callable_cls, uri="",
+ uriparts=None, secure=True, timeout=None, gzip=False, retry=False):
self.auth = auth
self.format = format
self.domain = domain
self.uri = uri
self.uriparts = uriparts
self.secure = secure
+ self.timeout = timeout
+ self.gzip = gzip
+ self.retry = retry
def __getattr__(self, k):
try:
def extend_call(arg):
return self.callable_cls(
auth=self.auth, format=self.format, domain=self.domain,
- callable_cls=self.callable_cls, uriparts=self.uriparts \
- + (arg,),
- secure=self.secure)
+ callable_cls=self.callable_cls, timeout=self.timeout,
+ secure=self.secure, gzip=self.gzip, retry=self.retry,
+ uriparts=self.uriparts + (arg,))
if k == "_":
return extend_call
else:
return extend_call(k)
def __call__(self, **kwargs):
- # Build the uri.
- uriparts = []
- for uripart in self.uriparts:
- # If this part matches a keyword argument, use the
- # supplied value otherwise, just use the part.
- uriparts.append(str(kwargs.pop(uripart, uripart)))
- uri = '/'.join(uriparts)
-
- method = kwargs.pop('_method', None)
- if not method:
- method = "GET"
- for action in POST_ACTIONS:
- if re.search("%s(/\d+)?$" % action, uri):
- method = "POST"
- break
-
- # If an id kwarg is present and there is no id to fill in in
- # the list of uriparts, assume the id goes at the end.
- id = kwargs.pop('id', None)
- if id:
- uri += "/%s" %(id)
+ kwargs = dict(kwargs)
+ uri = build_uri(self.uriparts, kwargs)
+ method = kwargs.pop('_method', None) or method_for_uri(uri)
+ domain = self.domain
# If an _id kwarg is present, this is treated as id as a CGI
# param.
dot = ""
if self.format:
dot = "."
- uriBase = "http%s://%s/%s%s%s" %(
- secure_str, self.domain, uri, dot, self.format)
-
- headers = {'Accept-Encoding': 'gzip'}
+ url_base = "http%s://%s/%s%s%s" % (
+ secure_str, domain, uri, dot, self.format)
+
+ # Check if argument tells whether img is already base64 encoded
+ b64_convert = not kwargs.pop("_base64", False)
+ if b64_convert:
+ import base64
+
+ # Catch media arguments to handle oauth query differently for multipart
+ media = None
+ if 'media' in kwargs:
+ mediafield = 'media'
+ media = kwargs.pop('media')
+ media_raw = True
+ elif 'media[]' in kwargs:
+ mediafield = 'media[]'
+ media = kwargs.pop('media[]')
+ if b64_convert:
+ media = base64.b64encode(media)
+ media_raw = False
+
+ # Catch media arguments that are not accepted through multipart
+ # and are not yet base64 encoded
+ if b64_convert:
+ for arg in ['banner', 'image']:
+ if arg in kwargs:
+ kwargs[arg] = base64.b64encode(kwargs[arg])
+
+ headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
+ body = None
+ arg_data = None
if self.auth:
headers.update(self.auth.generate_headers())
- arg_data = self.auth.encode_params(uriBase, method, kwargs)
- if method == 'GET':
- uriBase += '?' + arg_data
- body = None
+ # Use urlencoded oauth args with no params when sending media
+ # via multipart and send it directly via uri even for post
+ arg_data = self.auth.encode_params(
+ url_base, method, {} if media else kwargs)
+ if method == 'GET' or media:
+ url_base += '?' + arg_data
else:
- body = arg_data.encode('utf8')
-
- req = urllib_request.Request(uriBase, body, headers)
- return self._handle_response(req, uri, arg_data, _timeout)
+ body = arg_data.encode('utf-8')
+
+ # Handle query as multipart when sending media
+ if media:
+ BOUNDARY = b"###Python-Twitter###"
+ bod = []
+ bod.append(b'--' + BOUNDARY)
+ bod.append(
+ b'Content-Disposition: form-data; name="'
+ + actually_bytes(mediafield)
+ + b'"')
+ bod.append(b'Content-Type: application/octet-stream')
+ if not media_raw:
+ bod.append(b'Content-Transfer-Encoding: base64')
+ bod.append(b'')
+ bod.append(actually_bytes(media))
+ for k, v in kwargs.items():
+ k = actually_bytes(k)
+ v = actually_bytes(v)
+ bod.append(b'--' + BOUNDARY)
+ bod.append(b'Content-Disposition: form-data; name="' + k + b'"')
+ bod.append(b'Content-Type: text/plain;charset=utf-8')
+ bod.append(b'')
+ bod.append(v)
+ bod.append(b'--' + BOUNDARY + b'--')
+ bod.append(b'')
+ bod.append(b'')
+ body = b'\r\n'.join(bod)
+ # print(body.decode('utf-8', errors='ignore'))
+ headers['Content-Type'] = \
+ b'multipart/form-data; boundary=' + BOUNDARY
+
+ if not PY_3_OR_HIGHER:
+ url_base = url_base.encode("utf-8")
+ for k in headers:
+ headers[actually_bytes(k)] = actually_bytes(headers.pop(k))
+
+ req = urllib_request.Request(url_base, data=body, headers=headers)
+ if self.retry:
+ return self._handle_response_with_retry(req, uri, arg_data, _timeout)
+ else:
+ return self._handle_response(req, uri, arg_data, _timeout)
def _handle_response(self, req, uri, arg_data, _timeout=None):
kwargs = {}
return handle
try:
data = handle.read()
- except httplib.IncompleteRead, e:
+ except http_client.IncompleteRead as e:
# Even if we don't get all the bytes we should have there
# may be a complete response in e.partial
data = e.partial
buf = StringIO(data)
f = gzip.GzipFile(fileobj=buf)
data = f.read()
- if "json" == self.format:
+ if len(data) == 0:
+ return wrap_response({}, handle.headers)
+ elif "json" == self.format:
res = json.loads(data.decode('utf8'))
return wrap_response(res, handle.headers)
else:
else:
raise TwitterHTTPError(e, uri, self.format, arg_data)
def _handle_response_with_retry(self, req, uri, arg_data, _timeout=None):
    """
    Call `_handle_response`, retrying on rate-limit and outage errors.

    A TwitterHTTPError with status 429 waits until the time given by the
    `X-Rate-Limit-Reset` header (plus a small margin); 502, 503 and 504
    wait `TWITTER_UNAVAILABLE_WAIT` seconds.  Any other TwitterHTTPError
    is re-raised immediately.

    `self.retry` controls the budget: a plain integer is the number of
    retries allowed after the first attempt; any other truthy value
    retries indefinitely; a falsy value allows no retries.  When the
    budget is exhausted the last TwitterHTTPError is re-raised (the
    previous implementation fell out of its loop and returned None).
    """
    retry = self.retry
    if isinstance(retry, int) and not isinstance(retry, bool):
        remaining = retry
    elif retry:
        remaining = None  # unlimited
    else:
        remaining = 0

    while True:
        try:
            return self._handle_response(req, uri, arg_data, _timeout)
        except TwitterHTTPError as e:
            code = e.e.code
            if code == 429:
                # API rate limit reached
                fallback_reset = time() + 30
                try:
                    reset = int(e.e.headers.get(
                        'X-Rate-Limit-Reset', fallback_reset))
                except (TypeError, ValueError):
                    # Unparseable header: don't mask the HTTP error
                    # with a ValueError, just use the fallback.
                    reset = fallback_reset
                delay = int(reset - time() + 2)  # add some extra margin
                if delay <= 0:
                    # Reset time already passed by the local clock
                    # (e.g. clock skew).  sleep() rejects negative
                    # values and retrying at once would hammer the API.
                    delay = self.TWITTER_UNAVAILABLE_WAIT
                message = "API rate limit reached; waiting for %ds..."
            elif code in (502, 503, 504):
                delay = self.TWITTER_UNAVAILABLE_WAIT
                message = "Service unavailable; waiting for %ds..."
            else:
                raise

            if remaining is not None:
                if remaining <= 0:
                    # Budget spent: surface the last error.
                    raise
                remaining -= 1

            print(message % delay, file=sys.stderr)
            sleep(delay)
+
+
class Twitter(TwitterCall):
"""
The minimalist yet fully featured Twitter API class.
Examples::
+ from twitter import *
+
t = Twitter(
- auth=OAuth(token, token_key, con_secret, con_secret_key)))
+ auth=OAuth(token, token_key, con_secret, con_secret_key))
# Get your "home" timeline
t.statuses.home_timeline()
# Get a particular friend's timeline
- t.statuses.friends_timeline(id="billybob")
+ t.statuses.user_timeline(screen_name="billybob")
- # Also supported (but totally weird)
- t.statuses.friends_timeline.billybob()
+ # to pass in GET/POST parameters, such as `count`
+ t.statuses.home_timeline(count=5)
+
+ # to pass in the GET/POST parameter `id` you need to use `_id`
+ t.statuses.oembed(_id=1234567890)
# Update your status
t.statuses.update(
text="I think yer swell!")
# Get the members of tamtar's list "Things That Are Rad"
- t._("tamtar")._("things-that-are-rad").members()
-
- # Note how the magic `_` method can be used to insert data
- # into the middle of a call. You can also use replacement:
- t.user.list.members(user="tamtar", list="things-that-are-rad")
+ t.lists.members(owner_screen_name="tamtar", slug="things-that-are-rad")
# An *optional* `_timeout` parameter can also be used for API
# calls which take much more time than normal or twitter stops
- # responding for some reasone
+ # responding for some reason:
t.users.lookup(
screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES), \
_timeout=1)
+ # Overriding Method: GET/POST
+ # You should not need to use this, as the library properly
+ # detects whether GET or POST should be used. Nevertheless,
+ # to force a particular method, use `_method`
+ t.statuses.oembed(_id=1234567890, _method='GET')
+
+ # Send a tweet with an image included (or set your banner or logo similarly)
+ # by just reading your image from the web or a file in a string:
+ status = "PTT ★"
+ with open("example.png", "rb") as imagefile:
+ params = {"media[]": imagefile.read(), "status": status}
+ t.statuses.update_with_media(**params)
+
+ # Or by sending a base64 encoded image:
+ params = {"media[]": base64_image, "status": status, "_base64": True}
+ t.statuses.update_with_media(**params)
Searching Twitter::
"""
def __init__(
- self, format="json",
- domain="api.twitter.com", secure=True, auth=None,
- api_version=_DEFAULT):
+ self, format="json",
+ domain="api.twitter.com", secure=True, auth=None,
+ api_version=_DEFAULT, retry=False):
"""
Create a new twitter API connector.
`domain` lets you change the domain you are connecting. By
- default it's `api.twitter.com` but `search.twitter.com` may be
- useful too.
+ default it's `api.twitter.com`.
If `secure` is False you will connect with HTTP instead of
HTTPS.
`api_version` is used to set the base uri. By default it's
- '1'. If you are using "search.twitter.com" set this to None.
+ '1.1'.
+
+ If `retry` is True, API rate limits will automatically be
+ handled by waiting until the next reset, as indicated by
+ the X-Rate-Limit-Reset HTTP header. If retry is an integer,
+ it defines the number of retries attempted.
"""
if not auth:
auth = NoAuth()
if (format not in ("json", "xml", "")):
- raise ValueError("Unknown data format '%s'" %(format))
+ raise ValueError("Unknown data format '%s'" % (format))
if api_version is _DEFAULT:
- if domain == 'api.twitter.com':
- api_version = '1.1'
- else:
- api_version = None
+ api_version = '1.1'
uriparts = ()
if api_version:
TwitterCall.__init__(
self, auth=auth, format=format, domain=domain,
callable_cls=TwitterCall,
- secure=secure, uriparts=uriparts)
+ secure=secure, uriparts=uriparts, retry=retry)
# Names exported by `from <this module> import *`.
__all__ = ["Twitter", "TwitterError", "TwitterHTTPError", "TwitterResponse"]