# encoding: utf-8
-from __future__ import unicode_literals
+from __future__ import unicode_literals, print_function
+
+from .util import PY_3_OR_HIGHER, actually_bytes
try:
import urllib.request as urllib_request
import re
import sys
import gzip
+from time import sleep, time
try:
import http.client as http_client
if self.e.headers.get('Content-Encoding') == 'gzip':
buf = StringIO(data)
f = gzip.GzipFile(fileobj=buf)
- self.response_data = f.read()
+ data = f.read()
+ if len(data) == 0:
+ data = {}
+ elif "json" == self.format:
+ data = json.loads(data.decode('utf8'))
else:
- self.response_data = data
+ data = data.decode('utf8')
+ self.response_data = data
super(TwitterHTTPError, self).__init__(str(self))
def __str__(self):
return "POST"
return "GET"
+
+def build_uri(orig_uriparts, kwargs):
+ """
+ Build the URI from the original uriparts and kwargs. Modifies kwargs.
+ """
+ uriparts = []
+ for uripart in orig_uriparts:
+ # If this part matches a keyword argument (starting with _), use
+ # the supplied value. Otherwise, just use the part.
+ if uripart.startswith("_"):
+ part = (str(kwargs.pop(uripart, uripart)))
+ else:
+ part = uripart
+ uriparts.append(part)
+ uri = '/'.join(uriparts)
+
+ # If an id kwarg is present and there is no id to fill in in
+ # the list of uriparts, assume the id goes at the end.
+ id = kwargs.pop('id', None)
+ if id:
+ uri += "/%s" % (id)
+
+ return uri
+
+
class TwitterCall(object):
+ TWITTER_UNAVAILABLE_WAIT = 30 # delay after HTTP codes 502, 503 or 504
+
def __init__(
self, auth, format, domain, callable_cls, uri="",
- uriparts=None, secure=True, timeout=None, gzip=False):
+ uriparts=None, secure=True, timeout=None, gzip=False, retry=False):
self.auth = auth
self.format = format
self.domain = domain
self.secure = secure
self.timeout = timeout
self.gzip = gzip
+ self.retry = retry
def __getattr__(self, k):
try:
return self.callable_cls(
auth=self.auth, format=self.format, domain=self.domain,
callable_cls=self.callable_cls, timeout=self.timeout,
- secure=self.secure, gzip=self.gzip,
+ secure=self.secure, gzip=self.gzip, retry=self.retry,
uriparts=self.uriparts + (arg,))
if k == "_":
return extend_call
return extend_call(k)
def __call__(self, **kwargs):
- # Build the uri.
- uriparts = []
- for uripart in self.uriparts:
- # If this part matches a keyword argument, use the
- # supplied value otherwise, just use the part.
- uriparts.append(str(kwargs.pop(uripart, uripart)))
- uri = '/'.join(uriparts)
-
+ kwargs = dict(kwargs)
+ uri = build_uri(self.uriparts, kwargs)
method = kwargs.pop('_method', None) or method_for_uri(uri)
-
- # If an id kwarg is present and there is no id to fill in in
- # the list of uriparts, assume the id goes at the end.
- id = kwargs.pop('id', None)
- if id:
- uri += "/%s" % (id)
+ domain = self.domain
# If an _id kwarg is present, this is treated as id as a CGI
# param.
dot = ""
if self.format:
dot = "."
- uriBase = "http%s://%s/%s%s%s" % (
- secure_str, self.domain, uri, dot, self.format)
+ url_base = "http%s://%s/%s%s%s" % (
+ secure_str, domain, uri, dot, self.format)
# Check if argument tells whether img is already base64 encoded
- b64_convert = True
- if "_base64" in kwargs:
- b64_convert = not kwargs.pop("_base64")
+ b64_convert = not kwargs.pop("_base64", False)
if b64_convert:
import base64
# Catch media arguments to handle oauth query differently for multipart
media = None
- for arg in ['media[]']:
- if arg in kwargs:
- media = kwargs.pop(arg)
- if b64_convert:
- media = base64.b64encode(media)
- if sys.version_info >= (3, 0):
- media = str(media, 'utf8')
- mediafield = arg
- break
+ if 'media' in kwargs:
+ mediafield = 'media'
+ media = kwargs.pop('media')
+ media_raw = True
+ elif 'media[]' in kwargs:
+ mediafield = 'media[]'
+ media = kwargs.pop('media[]')
+ if b64_convert:
+ media = base64.b64encode(media)
+ media_raw = False
# Catch media arguments that are not accepted through multipart
# and are not yet base64 encoded
# Use urlencoded oauth args with no params when sending media
# via multipart and send it directly via uri even for post
arg_data = self.auth.encode_params(
- uriBase, method, {} if media else kwargs)
+ url_base, method, {} if media else kwargs)
if method == 'GET' or media:
- uriBase += '?' + arg_data
+ url_base += '?' + arg_data
else:
- body = arg_data.encode('utf8')
+ body = arg_data.encode('utf-8')
# Handle query as multipart when sending media
if media:
- BOUNDARY = "###Python-Twitter###"
+ BOUNDARY = b"###Python-Twitter###"
bod = []
- bod.append('--' + BOUNDARY)
+ bod.append(b'--' + BOUNDARY)
bod.append(
- 'Content-Disposition: form-data; name="%s"' % mediafield)
- bod.append('Content-Transfer-Encoding: base64')
- bod.append('')
- bod.append(media)
+ b'Content-Disposition: form-data; name="'
+ + actually_bytes(mediafield)
+ + b'"')
+ bod.append(b'Content-Type: application/octet-stream')
+ if not media_raw:
+ bod.append(b'Content-Transfer-Encoding: base64')
+ bod.append(b'')
+ bod.append(actually_bytes(media))
for k, v in kwargs.items():
- bod.append('--' + BOUNDARY)
- bod.append('Content-Disposition: form-data; name="%s"' % k)
- bod.append('')
+ k = actually_bytes(k)
+ v = actually_bytes(v)
+ bod.append(b'--' + BOUNDARY)
+ bod.append(b'Content-Disposition: form-data; name="' + k + b'"')
+ bod.append(b'Content-Type: text/plain;charset=utf-8')
+ bod.append(b'')
bod.append(v)
- bod.append('--' + BOUNDARY + '--')
- body = '\r\n'.join(bod).encode('utf8')
+ bod.append(b'--' + BOUNDARY + b'--')
+ bod.append(b'')
+ bod.append(b'')
+ body = b'\r\n'.join(bod)
+ # print(body.decode('utf-8', errors='ignore'))
headers['Content-Type'] = \
- 'multipart/form-data; boundary=%s' % BOUNDARY
+ b'multipart/form-data; boundary=' + BOUNDARY
- if sys.version_info[:2] == (2, 7):
- uriBase = uriBase.encode("utf-8")
+ if not PY_3_OR_HIGHER:
+ url_base = url_base.encode("utf-8")
for k in headers:
- headers[k.encode('utf-8')] = headers.pop(k)
+ headers[actually_bytes(k)] = actually_bytes(headers.pop(k))
- req = urllib_request.Request(uriBase, body, headers)
- return self._handle_response(req, uri, arg_data, _timeout)
+ req = urllib_request.Request(url_base, data=body, headers=headers)
+ if self.retry:
+ return self._handle_response_with_retry(req, uri, arg_data, _timeout)
+ else:
+ return self._handle_response(req, uri, arg_data, _timeout)
def _handle_response(self, req, uri, arg_data, _timeout=None):
kwargs = {}
else:
raise TwitterHTTPError(e, uri, self.format, arg_data)
+ def _handle_response_with_retry(self, req, uri, arg_data, _timeout=None):
+ retry = self.retry
+ while retry:
+ try:
+ return self._handle_response(req, uri, arg_data, _timeout)
+ except TwitterHTTPError as e:
+ if e.e.code == 429:
+ # API rate limit reached
+ reset = int(e.e.headers.get('X-Rate-Limit-Reset', time() + 30))
+ delay = int(reset - time() + 2) # add some extra margin
+ print("API rate limit reached; waiting for %ds..." % delay, file=sys.stderr)
+ elif e.e.code in (502, 503, 504):
+ delay = self.TWITTER_UNAVAILABLE_WAIT
+ print("Service unavailable; waiting for %ds..." % delay, file=sys.stderr)
+ else:
+ raise
+ if isinstance(retry, int) and not isinstance(retry, bool):
+ if retry <= 0:
+ raise
+ retry -= 1
+ sleep(delay)
+
class Twitter(TwitterCall):
"""
from twitter import *
t = Twitter(
- auth=OAuth(token, token_key, con_secret, con_secret_key)))
+ auth=OAuth(token, token_key, con_secret, con_secret_key))
# Get your "home" timeline
t.statuses.home_timeline()
text="I think yer swell!")
# Get the members of tamtar's list "Things That Are Rad"
- t._("tamtar")._("things-that-are-rad").members()
-
- # Note how the magic `_` method can be used to insert data
- # into the middle of a call. You can also use replacement:
- t.user.list.members(user="tamtar", list="things-that-are-rad")
+ t.lists.members(owner_screen_name="tamtar", slug="things-that-are-rad")
# An *optional* `_timeout` parameter can also be used for API
# calls which take much more time than normal or twitter stops
# Send a tweet with an image included (or set your banner or logo similarily)
# by just reading your image from the web or a file in a string:
- # Note that the text sent as status along with the picture must be unicode.
- status = u"PTT ★" # or with python 3: status = "PTT ★"
+ status = "PTT ★"
with open("example.png", "rb") as imagefile:
params = {"media[]": imagefile.read(), "status": status}
t.statuses.update_with_media(**params)
def __init__(
self, format="json",
domain="api.twitter.com", secure=True, auth=None,
- api_version=_DEFAULT):
+ api_version=_DEFAULT, retry=False):
"""
Create a new twitter API connector.
`api_version` is used to set the base uri. By default it's
'1.1'.
+
+ If `retry` is True, API rate limits will automatically be
+ handled by waiting until the next reset, as indicated by
+ the X-Rate-Limit-Reset HTTP header. If retry is an integer,
+ it defines the number of retries attempted.
"""
if not auth:
auth = NoAuth()
TwitterCall.__init__(
self, auth=auth, format=format, domain=domain,
callable_cls=TwitterCall,
- secure=secure, uriparts=uriparts)
+ secure=secure, uriparts=uriparts, retry=retry)
__all__ = ["Twitter", "TwitterError", "TwitterHTTPError", "TwitterResponse"]