]> jfr.im git - z_archive/twitter.git/blob - twitter/api.py
Use user_timeline, which is far less rate limited, for tests
[z_archive/twitter.git] / twitter / api.py
1 # encoding: utf-8
2 from __future__ import unicode_literals
3
4 try:
5 import urllib.request as urllib_request
6 import urllib.error as urllib_error
7 except ImportError:
8 import urllib2 as urllib_request
9 import urllib2 as urllib_error
10
11 try:
12 from cStringIO import StringIO
13 except ImportError:
14 from io import BytesIO as StringIO
15
16 from .twitter_globals import POST_ACTIONS
17 from .auth import NoAuth
18
19 import re
20 import sys
21 import gzip
22
23 try:
24 import http.client as http_client
25 except ImportError:
26 import httplib as http_client
27
28 try:
29 import json
30 except ImportError:
31 import simplejson as json
32
33
class _DEFAULT(object):
    """
    Marker class used as a "no value supplied" default, so that an
    explicit `None` can be told apart from an omitted argument.
    """
36
37
class TwitterError(Exception):
    """
    Root of the exception hierarchy raised by the Twitter object for
    any general failure while talking to the API.
    """
44
45
class TwitterHTTPError(TwitterError):
    """
    Exception thrown by the Twitter object when there is an
    HTTP error interacting with twitter.com.

    Attributes set by the constructor:

    `e` is the underlying HTTP error object; its `fp`, `headers` and
    `code` attributes are read here.

    `uri`, `format` and `uriparts` are stored as given and are only
    used to build the message in `__str__`.

    `response_data` is the error body as read from `e.fp`. When the
    response declares `Content-Encoding: gzip` the body is
    decompressed; if it cannot be decompressed the raw bytes are kept.
    """
    def __init__(self, e, uri, format, uriparts):
        self.e = e
        self.uri = uri
        self.format = format
        self.uriparts = uriparts
        try:
            data = self.e.fp.read()
        except http_client.IncompleteRead as incomplete:
            # Can't read the whole error text; keep whatever arrived.
            # (A distinct name is used so the `e` parameter is not
            # rebound by the except clause.)
            data = incomplete.partial
        if self.e.headers.get('Content-Encoding') == 'gzip':
            import zlib
            try:
                data = gzip.GzipFile(fileobj=StringIO(data)).read()
            except (IOError, EOFError, zlib.error):
                # A truncated or malformed gzip body must not raise from
                # inside this constructor, which would hide the HTTP
                # error being reported. Fall back to the raw bytes.
                pass
        self.response_data = data
        super(TwitterHTTPError, self).__init__(str(self))

    def __str__(self):
        """Return a message with the status, URL, parameters and body."""
        fmt = ("." + self.format) if self.format else ""
        return (
            "Twitter sent status %i for URL: %s%s using parameters: "
            "(%s)\ndetails: %s" % (
                self.e.code, self.uri, fmt, self.uriparts,
                self.response_data))
77
78
class TwitterResponse(object):
    """
    Mix-in for objects returned from a twitter request. The concrete
    response behaves like a list or a dict (depending on the decoded
    payload) and additionally exposes the rate-limit information
    carried in the response headers.

    This class does not define `headers` itself; the properties below
    read `self.headers`, which must be assigned on the instance. You
    can do `response.headers.get('h')` to retrieve a header.
    """

    @property
    def rate_limit_remaining(self):
        """
        Number of requests still allowed in the current rate-limit
        window (0 when the header is absent).
        """
        remaining = self.headers.get('X-Rate-Limit-Remaining', "0")
        return int(remaining)

    @property
    def rate_limit_limit(self):
        """
        Ceiling of the rate limit for the request that was made
        (0 when the header is absent).
        """
        ceiling = self.headers.get('X-Rate-Limit-Limit', "0")
        return int(ceiling)

    @property
    def rate_limit_reset(self):
        """
        UTC epoch seconds at which the rate limit resets
        (0 when the header is absent).
        """
        reset_at = self.headers.get('X-Rate-Limit-Reset', "0")
        return int(reset_at)
110
111
class TwitterDictResponse(dict, TwitterResponse):
    """A `dict` payload that also carries the TwitterResponse properties."""
114
115
class TwitterListResponse(list, TwitterResponse):
    """A `list` payload that also carries the TwitterResponse properties."""
118
119
def wrap_response(response, headers):
    """
    Attach `headers` to a decoded response.

    A plain `dict` becomes a TwitterDictResponse and a plain `list`
    becomes a TwitterListResponse, each with `headers` set as an
    attribute. The check is on the exact type, so subclasses of
    dict/list and every other kind of value are returned unchanged
    (and without headers).
    """
    kind = type(response)
    if kind is dict:
        wrapped = TwitterDictResponse(response)
    elif kind is list:
        wrapped = TwitterListResponse(response)
    else:
        # Nothing to wrap: hand the value back untouched.
        return response
    wrapped.headers = headers
    return wrapped
131
132
# Matches a URI that ends in one of the POST_ACTIONS names, optionally
# followed by "/<digits>".
POST_ACTIONS_RE = re.compile(
    ''.join(['(', '|'.join(POST_ACTIONS), r')(/\d+)?$']))
134
def method_for_uri(uri):
    """
    Pick the HTTP method for `uri`: "POST" when POST_ACTIONS_RE finds a
    match in it, "GET" otherwise.
    """
    return "POST" if POST_ACTIONS_RE.search(uri) else "GET"
139
class TwitterCall(object):
    """
    One step in a chain of attribute accesses that accumulates the parts
    of an API URI and performs the HTTP request when finally called.

    Accessing an unknown attribute returns a new `callable_cls` instance
    whose `uriparts` has the attribute name appended. The special
    attribute `_` instead returns a function taking the value to append,
    which allows parts that are not valid Python identifiers.
    """

    def __init__(
            self, auth, format, domain, callable_cls, uri="",
            uriparts=None, secure=True, timeout=None, gzip=False):
        """
        Store the call configuration.

        `auth` must provide `generate_headers()` and `encode_params()`
        (both are used by `__call__`). `format` is appended to the URI
        as an extension when non-empty. `callable_cls` is the class
        instantiated for each chained attribute access. `secure` selects
        https over http. `gzip` adds an `Accept-Encoding: gzip` request
        header. `timeout` is stored and passed on to chained calls; the
        request made by this class only uses the per-call `_timeout`
        keyword.
        """
        self.auth = auth
        self.format = format
        self.domain = domain
        self.callable_cls = callable_cls
        self.uri = uri
        # Normalise the default to an empty tuple: both attribute
        # chaining (tuple concatenation) and calling (iteration) need a
        # sequence and would fail on None.
        self.uriparts = () if uriparts is None else uriparts
        self.secure = secure
        self.timeout = timeout
        self.gzip = gzip

    def __getattr__(self, k):
        """
        Return a new call object extended by `k`, or, for `k == "_"`,
        a function that extends the call by its argument.

        Python only invokes this hook after normal attribute lookup has
        failed, so there is nothing else to try first.
        """
        def extend_call(arg):
            return self.callable_cls(
                auth=self.auth, format=self.format, domain=self.domain,
                callable_cls=self.callable_cls, timeout=self.timeout,
                secure=self.secure, gzip=self.gzip,
                uriparts=self.uriparts + (arg,))
        if k == "_":
            return extend_call
        return extend_call(k)

    def __call__(self, **kwargs):
        """
        Build the request from the accumulated uriparts and `kwargs`,
        send it and return the result of `_handle_response`.

        Keyword arguments with special meaning (all are removed from
        the parameters before they are encoded):

        - any key equal to a uripart: its value replaces that part;
        - `_method`: force the HTTP method instead of detecting it;
        - `id`: appended to the URI path when truthy;
        - `_id`: sent as the `id` request parameter when truthy;
        - `_timeout`: timeout for this request;
        - `_base64`: when true, media/banner/image values are taken to
          be base64 encoded already and are not encoded again;
        - `media[]`: sent as a multipart/form-data body.
        """
        # Build the uri.
        uriparts = []
        for uripart in self.uriparts:
            # If this part matches a keyword argument, use the
            # supplied value otherwise, just use the part.
            uriparts.append(str(kwargs.pop(uripart, uripart)))
        uri = '/'.join(uriparts)

        method = kwargs.pop('_method', None) or method_for_uri(uri)

        # If an id kwarg is present and there is no id to fill in in
        # the list of uriparts, assume the id goes at the end.
        id_ = kwargs.pop('id', None)
        if id_:
            uri += "/%s" % (id_)

        # If an _id kwarg is present, this is treated as id as a CGI
        # param.
        _id = kwargs.pop('_id', None)
        if _id:
            kwargs['id'] = _id

        # If an _timeout is specified in kwargs, use it
        _timeout = kwargs.pop('_timeout', None)

        secure_str = 's' if self.secure else ''
        dot = "." if self.format else ""
        uriBase = "http%s://%s/%s%s%s" % (
            secure_str, self.domain, uri, dot, self.format)

        # Check if argument tells whether img is already base64 encoded
        b64_convert = not kwargs.pop("_base64", False)
        if b64_convert:
            import base64

        # Catch media arguments to handle oauth query differently for
        # multipart
        media = None
        for arg in ['media[]']:
            if arg in kwargs:
                media = kwargs.pop(arg)
                if b64_convert:
                    media = base64.b64encode(media)
                    if sys.version_info >= (3, 0):
                        media = str(media, 'utf8')
                mediafield = arg
                break

        # Catch media arguments that are not accepted through multipart
        # and are not yet base64 encoded
        if b64_convert:
            for arg in ['banner', 'image']:
                if arg in kwargs:
                    kwargs[arg] = base64.b64encode(kwargs[arg])

        headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
        body = None
        arg_data = None
        if self.auth:
            headers.update(self.auth.generate_headers())
            # Use urlencoded oauth args with no params when sending media
            # via multipart and send it directly via uri even for post
            arg_data = self.auth.encode_params(
                uriBase, method, {} if media else kwargs)
            if method == 'GET' or media:
                uriBase += '?' + arg_data
            else:
                body = arg_data.encode('utf8')

        # Handle query as multipart when sending media
        if media:
            BOUNDARY = "###Python-Twitter###"
            bod = []
            bod.append('--' + BOUNDARY)
            bod.append(
                'Content-Disposition: form-data; name="%s"' % mediafield)
            bod.append('Content-Transfer-Encoding: base64')
            bod.append('')
            bod.append(media)
            for k, v in kwargs.items():
                bod.append('--' + BOUNDARY)
                bod.append('Content-Disposition: form-data; name="%s"' % k)
                bod.append('')
                if sys.version_info[:2] <= (2, 7):
                    try:
                        v = v.decode("utf-8")
                    except (AttributeError, UnicodeError):
                        # Not a byte string, or not decodable as UTF-8:
                        # use the value as it is.
                        pass
                bod.append(v)
            bod.append('--' + BOUNDARY + '--')
            body = '\r\n'.join(bod).encode('utf8')
            headers['Content-Type'] = \
                'multipart/form-data; boundary=%s' % BOUNDARY

        if sys.version_info[:2] <= (2, 7):
            uriBase = uriBase.encode("utf-8")
            # Build a new dict instead of popping and re-inserting keys
            # while iterating over the same dict.
            headers = dict(
                (k.encode('utf-8'), v) for k, v in headers.items())

        req = urllib_request.Request(uriBase, body, headers)
        return self._handle_response(req, uri, arg_data, _timeout)

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """
        Open `req` and convert the response.

        Returns the raw response handle when the content type is
        `image/jpeg` or `image/png`; an empty wrapped dict when the body
        is empty; the parsed JSON (wrapped with the response headers)
        when `self.format` is "json"; otherwise the body decoded as
        UTF-8 text. An HTTP 304 yields an empty list.

        Raises TwitterHTTPError for any other HTTP error; `uri`,
        `self.format` and `arg_data` are passed along for its message.
        """
        kwargs = {}
        if _timeout:
            kwargs['timeout'] = _timeout
        try:
            handle = urllib_request.urlopen(req, **kwargs)
            if handle.headers['Content-Type'] in ['image/jpeg', 'image/png']:
                return handle
            try:
                data = handle.read()
            except http_client.IncompleteRead as e:
                # Even if we don't get all the bytes we should have there
                # may be a complete response in e.partial
                data = e.partial
            if handle.info().get('Content-Encoding') == 'gzip':
                # Handle gzip decompression
                buf = StringIO(data)
                f = gzip.GzipFile(fileobj=buf)
                data = f.read()
            if len(data) == 0:
                return wrap_response({}, handle.headers)
            elif "json" == self.format:
                res = json.loads(data.decode('utf8'))
                return wrap_response(res, handle.headers)
            else:
                return wrap_response(
                    data.decode('utf8'), handle.headers)
        except urllib_error.HTTPError as e:
            if (e.code == 304):
                return []
            else:
                raise TwitterHTTPError(e, uri, self.format, arg_data)
310
311
class Twitter(TwitterCall):
    """
    The minimalist yet fully featured Twitter API class.

    Get RESTful data by accessing members of this class. The result
    is decoded python objects (lists and dicts).

    The Twitter API is documented at:

      http://dev.twitter.com/doc


    Examples::

        from twitter import *

        t = Twitter(
            auth=OAuth(token, token_key, con_secret, con_secret_key))

        # Get your "home" timeline
        t.statuses.home_timeline()

        # Get a particular friend's timeline
        t.statuses.user_timeline(screen_name="billybob")

        # to pass in GET/POST parameters, such as `count`
        t.statuses.home_timeline(count=5)

        # to pass in the GET/POST parameter `id` you need to use `_id`
        t.statuses.oembed(_id=1234567890)

        # Update your status
        t.statuses.update(
            status="Using @sixohsix's sweet Python Twitter Tools.")

        # Send a direct message
        t.direct_messages.new(
            user="billybob",
            text="I think yer swell!")

        # Get the members of tamtar's list "Things That Are Rad"
        t._("tamtar")._("things-that-are-rad").members()

        # Note how the magic `_` method can be used to insert data
        # into the middle of a call. You can also use replacement:
        t.user.list.members(user="tamtar", list="things-that-are-rad")

        # An *optional* `_timeout` parameter can also be used for API
        # calls which take much more time than normal or twitter stops
        # responding for some reason:
        t.users.lookup(
            screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES),
            _timeout=1)

        # Overriding Method: GET/POST
        # you should not need to use this method as this library properly
        # detects whether GET or POST should be used, Nevertheless
        # to force a particular method, use `_method`
        t.statuses.oembed(_id=1234567890, _method='GET')

        # Send a tweet with an image included (or set your banner or
        # logo similarly) by just reading your image from the web or a
        # file in a string:
        status = "PTT ★"
        with open("example.png", "rb") as imagefile:
            params = {"media[]": imagefile.read(), "status": status}
        t.statuses.update_with_media(**params)

        # Or by sending a base64 encoded image:
        params = {"media[]": base64_image, "status": status, "_base64": True}
        t.statuses.update_with_media(**params)


    Searching Twitter::

        # Search for the latest tweets about #pycon
        t.search.tweets(q="#pycon")


    Using the data returned
    -----------------------

    Twitter API calls return decoded JSON. This is converted into
    a bunch of Python lists, dicts, ints, and strings. For example::

        x = twitter.statuses.home_timeline()

        # The first 'tweet' in the timeline
        x[0]

        # The screen name of the user who wrote the first 'tweet'
        x[0]['user']['screen_name']


    Getting raw XML data
    --------------------

    If you prefer to get your Twitter data in XML format, pass
    format="xml" to the Twitter object when you instantiate it::

        twitter = Twitter(format="xml")

    The output will not be parsed in any way. It will be a raw string
    of XML.

    """
    def __init__(
            self, format="json",
            domain="api.twitter.com", secure=True, auth=None,
            api_version=_DEFAULT):
        """
        Create a new twitter API connector.

        Pass an `auth` parameter to use the credentials of a specific
        user. Generally you'll want to pass an `OAuth`
        instance::

            twitter = Twitter(auth=OAuth(
                    token, token_secret, consumer_key, consumer_secret))

        When `auth` is omitted (or falsy) a `NoAuth` instance is used.

        `format` must be "json", "xml" or the empty string; any other
        value raises ValueError.

        `domain` lets you change the domain you are connecting. By
        default it's `api.twitter.com`.

        If `secure` is False you will connect with HTTP instead of
        HTTPS.

        `api_version` is used to set the base uri. By default it's
        '1.1'. A falsy value leaves the version out of the uri.
        """
        auth = auth or NoAuth()

        if format not in ("json", "xml", ""):
            raise ValueError("Unknown data format '%s'" % (format))

        # Only an omitted api_version selects the built-in default; an
        # explicit falsy value means "no version segment".
        version = '1.1' if api_version is _DEFAULT else api_version
        base_parts = (str(version),) if version else ()

        TwitterCall.__init__(
            self, auth=auth, format=format, domain=domain,
            callable_cls=TwitterCall,
            secure=secure, uriparts=base_parts)
458
459
# Public names exported by a star-import of this module.
__all__ = [
    "Twitter",
    "TwitterError",
    "TwitterHTTPError",
    "TwitterResponse",
]