]> jfr.im git - z_archive/twitter.git/blob - twitter/api.py
Unit test for previous silly regression
[z_archive/twitter.git] / twitter / api.py
1 # encoding: utf-8
2 from __future__ import unicode_literals, print_function
3
4 try:
5 import urllib.request as urllib_request
6 import urllib.error as urllib_error
7 except ImportError:
8 import urllib2 as urllib_request
9 import urllib2 as urllib_error
10
11 try:
12 from cStringIO import StringIO
13 except ImportError:
14 from io import BytesIO as StringIO
15
16 from .twitter_globals import POST_ACTIONS
17 from .auth import NoAuth
18
19 import re
20 import sys
21 import gzip
22 from time import sleep, time
23
24 try:
25 import http.client as http_client
26 except ImportError:
27 import httplib as http_client
28
29 try:
30 import json
31 except ImportError:
32 import simplejson as json
33
34
class _DEFAULT(object):
    """
    Sentinel type meaning "the caller did not supply this argument".

    It is compared by identity (``value is _DEFAULT``) and is never
    instantiated by this module; using a dedicated class lets ``None``
    and other falsy values remain meaningful argument values.
    """
37
38
class TwitterError(Exception):
    """
    Base Exception thrown by the Twitter object when there is a
    general error interacting with the API.
    """
    pass


class TwitterHTTPError(TwitterError):
    """
    Exception thrown by the Twitter object when there is an
    HTTP error interacting with twitter.com.

    Attributes set by the constructor:

    `e` is the underlying HTTP error object (its `fp`, `headers` and
    `code` attributes are read here).
    `uri`, `format` and `uriparts` are stored as given and only used
    to build the message.
    `response_data` is the body of the error response: gunzipped when
    the response declares gzip encoding and the body can be decoded,
    otherwise the raw bytes that were received.
    """
    def __init__(self, e, uri, format, uriparts):
        # Local import: zlib is only needed to name zlib.error below.
        import zlib

        self.e = e
        self.uri = uri
        self.format = format
        self.uriparts = uriparts
        try:
            data = self.e.fp.read()
        except http_client.IncompleteRead as incomplete:
            # can't read the whole error text;
            # keep the part that did arrive
            data = incomplete.partial
        if self.e.headers.get('Content-Encoding') == 'gzip':
            try:
                data = gzip.GzipFile(fileobj=StringIO(data)).read()
            except (IOError, EOFError, zlib.error):
                # A truncated or corrupt gzip body must not replace the
                # HTTP error being reported with a decompression error,
                # so fall back to the raw bytes.
                pass
        self.response_data = data
        super(TwitterHTTPError, self).__init__(str(self))

    def __str__(self):
        fmt = ("." + self.format) if self.format else ""
        return (
            "Twitter sent status %i for URL: %s%s using parameters: "
            "(%s)\ndetails: %s" % (
                self.e.code, self.uri, fmt, self.uriparts,
                self.response_data))
78
79
class TwitterResponse(object):
    """
    Mixin for values returned from a twitter request.  The concrete
    response behaves like a list or a dict (depending on what the API
    returned) and additionally exposes the rate-limit information
    below.

    Instances are expected to carry a `headers` attribute holding the
    response headers; anything offering `.get(name, default)` works.
    You can do `response.headers.get('h')` to retrieve a header.
    """

    @property
    def rate_limit_remaining(self):
        """
        Remaining requests in the current rate-limit window
        (0 when the header is absent).
        """
        remaining = self.headers.get('X-Rate-Limit-Remaining', "0")
        return int(remaining)

    @property
    def rate_limit_limit(self):
        """
        The rate limit ceiling for that given request
        (0 when the header is absent).
        """
        ceiling = self.headers.get('X-Rate-Limit-Limit', "0")
        return int(ceiling)

    @property
    def rate_limit_reset(self):
        """
        Time in UTC epoch seconds when the rate limit will reset
        (0 when the header is absent).
        """
        reset_at = self.headers.get('X-Rate-Limit-Reset', "0")
        return int(reset_at)
111
112
class TwitterDictResponse(dict, TwitterResponse):
    """
    A dict that also offers the rate-limit properties of
    TwitterResponse.  The `headers` attribute those properties read is
    assigned by whoever creates the instance.
    """
115
116
class TwitterListResponse(list, TwitterResponse):
    """
    A list that also offers the rate-limit properties of
    TwitterResponse.  The `headers` attribute those properties read is
    assigned by whoever creates the instance.
    """
119
120
def wrap_response(response, headers):
    """
    Attach the response `headers` to a decoded API `response`.

    A plain dict becomes a TwitterDictResponse and a plain list becomes
    a TwitterListResponse, each with `.headers` set.  Any other value
    (for example a string of raw data) is returned unchanged and
    without headers.
    """
    kind = type(response)
    # Exact type comparison on purpose: only plain dicts and lists are
    # wrapped, subclasses fall through untouched.
    if kind is dict:
        wrapped = TwitterDictResponse(response)
    elif kind is list:
        wrapped = TwitterListResponse(response)
    else:
        return response
    wrapped.headers = headers
    return wrapped
132
133
# Matches a URI that ends with one of the POST_ACTIONS entries,
# optionally followed by a slash and a run of digits.
POST_ACTIONS_RE = re.compile(
    '(' + '|'.join(POST_ACTIONS) + r')(/\d+)?$')


def method_for_uri(uri):
    """
    Return the HTTP method to use for `uri`: "POST" when the URI
    matches POST_ACTIONS_RE, "GET" otherwise.
    """
    return "POST" if POST_ACTIONS_RE.search(uri) else "GET"
140
141
def build_uri(orig_uriparts, kwargs):
    """
    Build the URI from the original uriparts and kwargs.  Modifies kwargs.

    Parts are joined with "/".  A part starting with "_" is a
    placeholder: when a keyword argument of the same name exists it is
    removed from `kwargs` and its string form is used instead of the
    part.  A truthy `id` keyword argument is removed from `kwargs` and
    appended as the final path segment.
    """
    # Placeholders are swapped for the caller's value (falling back to
    # the part itself); every other part is used verbatim.
    resolved = [
        str(kwargs.pop(piece, piece)) if piece.startswith("_") else piece
        for piece in orig_uriparts
    ]
    uri = '/'.join(resolved)

    # If an id kwarg is present and there is no id to fill in in
    # the list of uriparts, assume the id goes at the end.
    trailing_id = kwargs.pop('id', None)
    if not trailing_id:
        return uri
    return uri + "/%s" % trailing_id
164
165
class TwitterCall(object):
    """
    One step in a chained Twitter API call.

    Attribute access (``call.statuses.home_timeline``) returns a new
    `callable_cls` instance with the attribute name appended to
    `uriparts`.  Calling the object builds the URI from `uriparts` and
    the keyword arguments and performs the HTTP request.
    """

    TWITTER_UNAVAILABLE_WAIT = 30  # delay after HTTP codes 502, 503 or 504

    def __init__(
            self, auth, format, domain, callable_cls, uri="",
            uriparts=None, secure=True, timeout=None, gzip=False,
            retry=False):
        """
        Store the settings of the call; no request is made here.

        `auth` must provide `generate_headers()` and
        `encode_params(base_url, method, params)`.
        `format` is appended to the URL as an extension and, when it is
        "json", makes the response body be decoded as JSON.
        `callable_cls` is the class instantiated for chained attributes.
        `uriparts` is the tuple of path segments collected so far.
        `secure` selects https over http.
        `gzip` makes the request advertise ``Accept-Encoding: gzip``.
        `retry` enables waiting and retrying on HTTP 429/502/503/504:
        True retries without limit, an integer gives the number of
        retries.
        `uri` and `timeout` are stored (and `timeout` is handed on to
        chained calls) but are not read when this class sends a request;
        use the per-call `_timeout` keyword for that.
        """
        self.auth = auth
        self.format = format
        self.domain = domain
        self.callable_cls = callable_cls
        self.uri = uri
        self.uriparts = uriparts
        self.secure = secure
        self.timeout = timeout
        self.gzip = gzip
        self.retry = retry

    def __getattr__(self, k):
        """
        Return a new call whose uriparts are extended by `k`.

        The special name `_` returns a function instead, so that
        arbitrary text can be inserted: ``t._("some-user")``.
        """
        # Python only calls __getattr__ after the normal lookup failed,
        # so every name that reaches this point is a URI part.
        def extend_call(arg):
            return self.callable_cls(
                auth=self.auth, format=self.format, domain=self.domain,
                callable_cls=self.callable_cls, timeout=self.timeout,
                secure=self.secure, gzip=self.gzip, retry=self.retry,
                uriparts=self.uriparts + (arg,))
        if k == "_":
            return extend_call
        return extend_call(k)

    @staticmethod
    def _to_bytes(value):
        """Return `value` as bytes: text is UTF-8 encoded, other
        non-bytes values are converted with str() first."""
        if isinstance(value, bytes):
            return value
        if not hasattr(value, 'encode'):
            value = str(value)
        return value.encode('utf-8')

    @classmethod
    def _encode_multipart(cls, mediafield, media, fields):
        """
        Build a multipart/form-data request body.

        `mediafield` is the form name of the media part, `media` its
        content and `fields` a dict of the remaining form fields.
        Returns a `(body, content_type)` tuple where `body` is bytes and
        `content_type` is the matching Content-Type header value.
        """
        boundary = b"###Python-Twitter###"
        to_bytes = cls._to_bytes
        # Everything is converted to bytes before joining: mixing text
        # and bytes here raises TypeError on Python 3.
        # NOTE(review): the base64 transfer-encoding line is sent for
        # both 'media' and 'media[]', although only 'media[]' is base64
        # encoded by __call__ -- confirm this is what the API expects.
        parts = [
            b'--' + boundary,
            b'Content-Disposition: form-data; name="'
            + to_bytes(mediafield) + b'"',
            b'Content-Transfer-Encoding: base64',
            b'',
            to_bytes(media),
        ]
        for key, value in fields.items():
            parts.extend([
                b'--' + boundary,
                b'Content-Disposition: form-data; name="'
                + to_bytes(key) + b'"',
                b'',
                to_bytes(value),
            ])
        parts.append(b'--' + boundary + b'--')
        # The header needs the boundary as text; formatting the bytes
        # object directly would yield "b'###...'" on Python 3.
        content_type = (
            'multipart/form-data; boundary=%s' % boundary.decode('utf-8'))
        return b'\r\n'.join(parts), content_type

    def __call__(self, **kwargs):
        """
        Perform the API request described by `uriparts` and `kwargs`.

        Keyword arguments become request parameters, except for these
        control arguments which are consumed here: `_method` (force the
        HTTP method), `_id` (sent as the `id` parameter), `_timeout`
        (timeout for this request), `_base64` (media is already base64
        encoded), plus whatever `build_uri` removes to fill the path.
        `media` / `media[]` switch the request to multipart/form-data.

        Returns the value produced by `_handle_response`.
        """
        kwargs = dict(kwargs)
        uri = build_uri(self.uriparts, kwargs)
        method = kwargs.pop('_method', None) or method_for_uri(uri)

        # If an _id kwarg is present, this is treated as id as a CGI
        # param.
        _id = kwargs.pop('_id', None)
        if _id:
            kwargs['id'] = _id

        # If an _timeout is specified in kwargs, use it
        _timeout = kwargs.pop('_timeout', None)

        uriBase = "http%s://%s/%s%s%s" % (
            's' if self.secure else '', self.domain, uri,
            '.' if self.format else '', self.format)

        # Check if argument tells whether img is already base64 encoded
        b64_convert = not kwargs.pop("_base64", False)
        if b64_convert:
            import base64

        # Catch media arguments to handle oauth query differently for
        # multipart
        media = None
        if 'media' in kwargs:
            mediafield = 'media'
            media = kwargs.pop('media')
        elif 'media[]' in kwargs:
            mediafield = 'media[]'
            media = kwargs.pop('media[]')
            if b64_convert:
                # Stays bytes; _encode_multipart accepts bytes and text.
                media = base64.b64encode(media)

        # Catch media arguments that are not accepted through multipart
        # and are not yet base64 encoded
        if b64_convert:
            for arg in ['banner', 'image']:
                if arg in kwargs:
                    kwargs[arg] = base64.b64encode(kwargs[arg])

        headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
        body = None
        arg_data = None
        if self.auth:
            headers.update(self.auth.generate_headers())
            # Use urlencoded oauth args with no params when sending media
            # via multipart and send it directly via uri even for post
            arg_data = self.auth.encode_params(
                uriBase, method, {} if media else kwargs)
            if method == 'GET' or media:
                uriBase += '?' + arg_data
            else:
                body = arg_data.encode('utf8')

        # Handle query as multipart when sending media
        if media:
            body, headers['Content-Type'] = self._encode_multipart(
                mediafield, media, kwargs)

        if sys.version_info < (3, 0):
            uriBase = uriBase.encode("utf-8")
            # Build a new dict instead of changing `headers` while
            # iterating over it.
            headers = dict(
                (name.encode('utf-8'), value)
                for name, value in headers.items())

        req = urllib_request.Request(uriBase, body, headers)
        if self.retry:
            return self._handle_response_with_retry(
                req, uri, arg_data, _timeout)
        else:
            return self._handle_response(req, uri, arg_data, _timeout)

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """
        Send `req` and decode the answer.

        Returns the open response object for JPEG/PNG content, an empty
        wrapped dict for an empty body, the wrapped decoded JSON when
        the format is "json", otherwise the wrapped UTF-8 text.  HTTP
        status 304 yields an empty list.

        Raises TwitterHTTPError for every other HTTP error; `uri` and
        `arg_data` are only used to describe the request in that error.
        """
        kwargs = {}
        if _timeout:
            kwargs['timeout'] = _timeout
        try:
            handle = urllib_request.urlopen(req, **kwargs)
            if handle.headers['Content-Type'] in ['image/jpeg', 'image/png']:
                # The caller reads the image itself, so leave it open.
                return handle
            try:
                data = handle.read()
            except http_client.IncompleteRead as incomplete:
                # Even if we don't get all the bytes we should have there
                # may be a complete response in the partial data
                data = incomplete.partial
            finally:
                # The body has been consumed; release the connection.
                handle.close()
            if handle.info().get('Content-Encoding') == 'gzip':
                # Handle gzip decompression
                data = gzip.GzipFile(fileobj=StringIO(data)).read()
            if len(data) == 0:
                return wrap_response({}, handle.headers)
            elif "json" == self.format:
                res = json.loads(data.decode('utf8'))
                return wrap_response(res, handle.headers)
            else:
                return wrap_response(
                    data.decode('utf8'), handle.headers)
        except urllib_error.HTTPError as e:
            if (e.code == 304):
                return []
            else:
                raise TwitterHTTPError(e, uri, self.format, arg_data)

    def _handle_response_with_retry(self, req, uri, arg_data, _timeout=None):
        """
        Like `_handle_response`, but wait and try again after HTTP 429
        (rate limit) and 502/503/504 (service unavailable).

        `self.retry` controls the number of retries: True retries until
        the request succeeds, an integer N allows N retries after the
        first attempt and then re-raises the last TwitterHTTPError.
        Any other TwitterHTTPError is raised immediately.
        """
        retry = self.retry
        if not retry:
            return self._handle_response(req, uri, arg_data, _timeout)
        # bool is a subclass of int, so True must be excluded explicitly
        # or it would be counted down like the number 1.
        limited = isinstance(retry, int) and not isinstance(retry, bool)
        while True:
            try:
                return self._handle_response(req, uri, arg_data, _timeout)
            except TwitterHTTPError as e:
                if e.e.code == 429:
                    # API rate limit reached
                    reset = int(e.e.headers.get(
                        'X-Rate-Limit-Reset', time() + 30))
                    delay = int(reset - time() + 2)  # add some extra margin
                    if delay <= 0:
                        # The announced reset time is already over (for
                        # example because of clock skew); sleep() rejects
                        # negative values, so use the default wait.
                        delay = self.TWITTER_UNAVAILABLE_WAIT
                    print("API rate limit reached; waiting for %ds..." % delay, file=sys.stderr)
                elif e.e.code in (502, 503, 504):
                    delay = self.TWITTER_UNAVAILABLE_WAIT
                    print("Service unavailable; waiting for %ds..." % delay, file=sys.stderr)
                else:
                    raise
                if limited:
                    if retry <= 0:
                        # Retries used up: report the last error instead
                        # of silently returning None.
                        raise
                    retry -= 1
                sleep(delay)
349
350
class Twitter(TwitterCall):
    """
    The minimalist yet fully featured Twitter API class.

    Get RESTful data by accessing members of this class. The result
    is decoded python objects (lists and dicts).

    The Twitter API is documented at:

      http://dev.twitter.com/doc


    Examples::

        from twitter import *

        t = Twitter(
            auth=OAuth(token, token_key, con_secret, con_secret_key))

        # Get your "home" timeline
        t.statuses.home_timeline()

        # Get a particular friend's timeline
        t.statuses.user_timeline(screen_name="billybob")

        # to pass in GET/POST parameters, such as `count`
        t.statuses.home_timeline(count=5)

        # to pass in the GET/POST parameter `id` you need to use `_id`
        t.statuses.oembed(_id=1234567890)

        # Update your status
        t.statuses.update(
            status="Using @sixohsix's sweet Python Twitter Tools.")

        # Send a direct message
        t.direct_messages.new(
            user="billybob",
            text="I think yer swell!")

        # Get the members of tamtar's list "Things That Are Rad"
        t._("tamtar")._("things-that-are-rad").members()

        # Note how the magic `_` method can be used to insert data
        # into the middle of a call. You can also use replacement:
        t.user.list.members(user="tamtar", list="things-that-are-rad")

        # An *optional* `_timeout` parameter can also be used for API
        # calls which take much more time than normal or twitter stops
        # responding for some reason:
        t.users.lookup(
            screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES),
            _timeout=1)

        # Overriding Method: GET/POST
        # you should not need to use this method as this library properly
        # detects whether GET or POST should be used, Nevertheless
        # to force a particular method, use `_method`
        t.statuses.oembed(_id=1234567890, _method='GET')

        # Send a tweet with an image included (or set your banner or
        # logo similarly) by just reading your image from the web or a
        # file in a string:
        status = "PTT ★"
        with open("example.png", "rb") as imagefile:
            params = {"media[]": imagefile.read(), "status": status}
        t.statuses.update_with_media(**params)

        # Or by sending a base64 encoded image:
        params = {"media[]": base64_image, "status": status, "_base64": True}
        t.statuses.update_with_media(**params)


    Searching Twitter::

        # Search for the latest tweets about #pycon
        t.search.tweets(q="#pycon")


    Using the data returned
    -----------------------

    Twitter API calls return decoded JSON. This is converted into
    a bunch of Python lists, dicts, ints, and strings. For example::

        x = twitter.statuses.home_timeline()

        # The first 'tweet' in the timeline
        x[0]

        # The screen name of the user who wrote the first 'tweet'
        x[0]['user']['screen_name']


    Getting raw XML data
    --------------------

    If you prefer to get your Twitter data in XML format, pass
    format="xml" to the Twitter object when you instantiate it::

        twitter = Twitter(format="xml")

    The output will not be parsed in any way. It will be a raw string
    of XML.

    """
    def __init__(
            self, format="json",
            domain="api.twitter.com", secure=True, auth=None,
            api_version=_DEFAULT, retry=False):
        """
        Create a new twitter API connector.

        Pass an `auth` parameter to use the credentials of a specific
        user. Generally you'll want to pass an `OAuth`
        instance::

            twitter = Twitter(auth=OAuth(
                    token, token_secret, consumer_key, consumer_secret))

        Without `auth`, a `NoAuth` instance is used.

        `format` must be "json", "xml" or ""; anything else raises
        ValueError.

        `domain` lets you change the domain you are connecting. By
        default it's `api.twitter.com`.

        If `secure` is False you will connect with HTTP instead of
        HTTPS.

        `api_version` is used to set the base uri. By default it's
        '1.1'. A falsy value leaves the version out of the uri.

        If `retry` is True, API rate limits will automatically be
        handled by waiting until the next reset, as indicated by
        the X-Rate-Limit-Reset HTTP header. If retry is an integer,
        it defines the number of retries attempted.
        """
        auth = auth or NoAuth()

        if format not in ("json", "xml", ""):
            raise ValueError("Unknown data format '%s'" % (format))

        # The sentinel tells "not given" apart from an explicit falsy
        # value, which means "no version segment at all".
        version = '1.1' if api_version is _DEFAULT else api_version
        base_parts = (str(version),) if version else ()

        TwitterCall.__init__(
            self,
            auth=auth,
            format=format,
            domain=domain,
            callable_cls=TwitterCall,
            secure=secure,
            uriparts=base_parts,
            retry=retry)
502
503
504 __all__ = ["Twitter", "TwitterError", "TwitterHTTPError", "TwitterResponse"]