]> jfr.im git - z_archive/twitter.git/blob - twitter/api.py
715e91d35314a1ddc2d24f9648571d3a49417c70
[z_archive/twitter.git] / twitter / api.py
1 # encoding: utf-8
2 from __future__ import unicode_literals
3
4 try:
5 import urllib.request as urllib_request
6 import urllib.error as urllib_error
7 except ImportError:
8 import urllib2 as urllib_request
9 import urllib2 as urllib_error
10
11 try:
12 from cStringIO import StringIO
13 except ImportError:
14 from io import BytesIO as StringIO
15
16 from .twitter_globals import POST_ACTIONS
17 from .auth import NoAuth
18
19 import re
20 import gzip
21
22 try:
23 import http.client as http_client
24 except ImportError:
25 import httplib as http_client
26
27 try:
28 import json
29 except ImportError:
30 import simplejson as json
31
32
class _DEFAULT(object):
    """
    Marker class used as a "no value supplied" default for keyword
    arguments, so that an explicit falsy value (``None``, ``""``) can be
    told apart from an omitted argument via an identity check.
    """
35
36
class TwitterError(Exception):
    """
    Root of the exceptions raised by the Twitter object when something
    goes wrong, in general, while interacting with the API.
    """
43
44
class TwitterHTTPError(TwitterError):
    """
    Raised by the Twitter object when an HTTP request to twitter.com
    comes back with an error status.

    The original HTTP error is kept as ``e``; the (gunzipped, if needed)
    raw error body is available as ``response_data``.
    """
    def __init__(self, e, uri, format, uriparts):
        self.e = e
        self.uri = uri
        self.format = format
        self.uriparts = uriparts

        try:
            payload = self.e.fp.read()
        except http_client.IncompleteRead as incomplete:
            # The error body was cut short; keep whatever did arrive.
            payload = incomplete.partial

        gzipped = self.e.headers.get('Content-Encoding') == 'gzip'
        if gzipped:
            payload = gzip.GzipFile(fileobj=StringIO(payload)).read()
        self.response_data = payload

        # The message is rendered by __str__, which needs the attributes
        # set above, so the base class is initialised last.
        super(TwitterHTTPError, self).__init__(str(self))

    def __str__(self):
        suffix = ""
        if self.format:
            suffix = "." + self.format
        template = (
            "Twitter sent status %i for URL: %s%s using parameters: "
            "(%s)\ndetails: %s")
        values = (
            self.e.code, self.uri, suffix, self.uriparts,
            self.response_data)
        return template % values
76
77
class TwitterResponse(object):
    """
    Mixin for objects returned from a twitter request. The concrete
    response behaves like a list or a dict (depending on the decoded
    payload) and additionally carries the HTTP response headers.

    `headers` is the header object of the HTTP response; use
    `response.headers.get('h')` to look up a header. The properties
    below read the rate-limit headers and fall back to 0 when a header
    is absent.
    """

    @property
    def rate_limit_remaining(self):
        """
        Number of requests still allowed in the current rate-limit window.
        """
        raw_value = self.headers.get('X-Rate-Limit-Remaining', "0")
        return int(raw_value)

    @property
    def rate_limit_limit(self):
        """
        Rate-limit ceiling that applies to the request that was made.
        """
        raw_value = self.headers.get('X-Rate-Limit-Limit', "0")
        return int(raw_value)

    @property
    def rate_limit_reset(self):
        """
        UTC epoch seconds at which the rate-limit window resets.
        """
        raw_value = self.headers.get('X-Rate-Limit-Reset', "0")
        return int(raw_value)
109
110
class TwitterDictResponse(dict, TwitterResponse):
    """A ``dict`` that also exposes the ``TwitterResponse`` properties."""
113
114
class TwitterListResponse(list, TwitterResponse):
    """A ``list`` that also exposes the ``TwitterResponse`` properties."""
117
118
def wrap_response(response, headers):
    """
    Attach `headers` to a decoded response.

    A plain ``dict`` becomes a ``TwitterDictResponse`` and a plain
    ``list`` a ``TwitterListResponse``, each with a ``headers``
    attribute. Any other value (including dict/list subclasses) is
    returned unchanged and `headers` is ignored.
    """
    kind = type(response)
    # Exact type match on purpose: subclasses are passed through as-is.
    if kind is not dict and kind is not list:
        return response

    wrapper_cls = TwitterDictResponse if kind is dict else TwitterListResponse
    wrapped = wrapper_cls(response)
    wrapped.headers = headers
    return wrapped
130
def method_for_uri(uri):
    """
    Guess the HTTP method for `uri`.

    Returns "POST" when the uri ends with one of the names in
    ``POST_ACTIONS``, optionally followed by a numeric path segment
    (``/<digits>``); returns "GET" otherwise.
    """
    for action in POST_ACTIONS:
        # Raw string: "\d" in a normal literal is an invalid string escape
        # and triggers a warning on current Python 3 releases.
        if re.search(r"%s(/\d+)?$" % action, uri):
            return "POST"
    return "GET"
138
class TwitterCall(object):
    """
    One node in a chain of attribute accesses that spells out an API uri.

    Accessing an unknown attribute returns a new `callable_cls` instance
    whose `uriparts` has the attribute name appended; calling the object
    builds the request from `uriparts` and the keyword arguments and
    sends it.
    """

    def __init__(
            self, auth, format, domain, callable_cls, uri="",
            uriparts=None, secure=True, timeout=None, gzip=False):
        self.auth = auth
        self.format = format
        self.domain = domain
        self.callable_cls = callable_cls
        self.uri = uri
        # A missing value is normalised to an empty tuple so that
        # __getattr__ can concatenate and __call__ can iterate.
        self.uriparts = () if uriparts is None else uriparts
        self.secure = secure
        self.timeout = timeout
        self.gzip = gzip

    def __getattr__(self, k):
        """
        Return a new call object with `k` appended to the uri parts.

        The special name ``_`` returns a function taking the part to
        append, for parts that are not valid Python identifiers.
        """
        # __getattr__ only runs after normal attribute lookup has failed,
        # so there is nothing else to try before extending the chain.
        def extend_call(arg):
            return self.callable_cls(
                auth=self.auth, format=self.format, domain=self.domain,
                callable_cls=self.callable_cls, timeout=self.timeout,
                secure=self.secure, gzip=self.gzip,
                uriparts=self.uriparts + (arg,))
        if k == "_":
            return extend_call
        return extend_call(k)

    @staticmethod
    def _to_bytes(value):
        """Return `value` as bytes; non-bytes are formatted and UTF-8 encoded."""
        if isinstance(value, bytes):
            return value
        return ("%s" % (value,)).encode('utf8')

    def __call__(self, **kwargs):
        """
        Build the request described by this call chain and send it.

        Special keyword arguments: ``_method`` forces the HTTP method,
        ``id`` is appended to the uri path, ``_id`` is sent as the ``id``
        parameter, ``_timeout`` is a per-call timeout, and one of
        ``media[]`` / ``banner`` / ``image`` switches to a multipart
        upload (``_base64=True`` marks the media as already base64
        encoded). Everything else is passed on as request parameters.
        """
        # Build the uri.
        uriparts = []
        for uripart in self.uriparts:
            # If this part matches a keyword argument, use the
            # supplied value otherwise, just use the part.
            uriparts.append(str(kwargs.pop(uripart, uripart)))
        uri = '/'.join(uriparts)

        method = kwargs.pop('_method', None) or method_for_uri(uri)

        # If an id kwarg is present and there is no id to fill in in
        # the list of uriparts, assume the id goes at the end.
        resource_id = kwargs.pop('id', None)
        if resource_id:
            uri += "/%s" % (resource_id,)

        # If an _id kwarg is present, this is treated as id as a CGI
        # param.
        _id = kwargs.pop('_id', None)
        if _id:
            kwargs['id'] = _id

        # If an _timeout is specified in kwargs, use it
        _timeout = kwargs.pop('_timeout', None)

        secure_str = 's' if self.secure else ''
        dot = "." if self.format else ""
        uriBase = "http%s://%s/%s%s%s" % (
            secure_str, self.domain, uri, dot, self.format)

        # Catch media arguments to handle oauth query differently for
        # multipart
        media = None
        mediafield = None
        for arg in ('media[]', 'banner', 'image'):
            if arg in kwargs:
                media = kwargs.pop(arg)
                mediafield = arg
                # `_base64=True` tells us the image is already encoded.
                if not kwargs.pop("_base64", False):
                    import base64
                    media = base64.b64encode(media)
                break

        headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
        body = None
        arg_data = None
        if self.auth:
            headers.update(self.auth.generate_headers())
            # Use urlencoded oauth args with no params when sending media
            # via multipart and send it directly via uri even for post
            arg_data = self.auth.encode_params(
                uriBase, method, {} if media else kwargs)
            if method == 'GET' or media:
                uriBase += '?' + arg_data
            else:
                body = arg_data.encode('utf8')

        # Handle query as multipart when sending media
        if media:
            BOUNDARY = "###Python-Twitter###"
            to_bytes = self._to_bytes
            delimiter = b'--' + to_bytes(BOUNDARY)
            # Every part is bytes: b64encode() returns bytes on Python 3
            # and the request body has to be bytes there as well.
            bod = [
                delimiter,
                to_bytes(
                    'Content-Disposition: form-data; name="%s"' % mediafield),
                b'Content-Transfer-Encoding: base64',
                b'',
                to_bytes(media),
            ]
            for k, v in kwargs.items():
                bod.append(delimiter)
                bod.append(to_bytes(
                    'Content-Disposition: form-data; name="%s"' % k))
                bod.append(b'')
                bod.append(to_bytes(v))
            bod.append(delimiter + b'--')
            body = b'\r\n'.join(bod)
            headers['Content-Type'] = \
                'multipart/form-data; boundary=%s' % BOUNDARY

        req = urllib_request.Request(uriBase, body, headers)
        return self._handle_response(req, uri, arg_data, _timeout)

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """
        Send `req` and decode the response.

        Returns the open response object for JPEG/PNG content, an empty
        list for HTTP 304, and otherwise the decoded body (parsed JSON
        when the format is "json", text otherwise) passed through
        `wrap_response`. Other HTTP errors raise `TwitterHTTPError`.
        """
        kwargs = {}
        # A per-call `_timeout` wins; otherwise use the constructor's.
        timeout = _timeout or self.timeout
        if timeout:
            kwargs['timeout'] = timeout

        try:
            handle = urllib_request.urlopen(req, **kwargs)
        except urllib_error.HTTPError as e:
            if e.code == 304:
                return []
            raise TwitterHTTPError(e, uri, self.format, arg_data)

        response_headers = handle.headers
        # .get() so that a response without Content-Type is not an error.
        if response_headers.get('Content-Type') in ('image/jpeg', 'image/png'):
            # The caller reads the image itself, so leave the handle open.
            return handle

        try:
            data = handle.read()
        except http_client.IncompleteRead as incomplete:
            # Even if we don't get all the bytes we should have there
            # may be a complete response in the partial data.
            data = incomplete.partial
        finally:
            handle.close()

        if response_headers.get('Content-Encoding') == 'gzip':
            # Handle gzip decompression
            data = gzip.GzipFile(fileobj=StringIO(data)).read()

        text = data.decode('utf8')
        if "json" == self.format:
            return wrap_response(json.loads(text), response_headers)
        return wrap_response(text, response_headers)
286
287
class Twitter(TwitterCall):
    """
    The minimalist yet fully featured Twitter API class.

    Get RESTful data by accessing members of this class. The result
    is decoded python objects (lists and dicts).

    The Twitter API is documented at:

      http://dev.twitter.com/doc


    Examples::

        from twitter import *

        t = Twitter(
            auth=OAuth(token, token_key, con_secret, con_secret_key))

        # Get your "home" timeline
        t.statuses.home_timeline()

        # Get a particular friend's timeline
        t.statuses.user_timeline(screen_name="billybob")

        # to pass in GET/POST parameters, such as `count`
        t.statuses.home_timeline(count=5)

        # to pass in the GET/POST parameter `id` you need to use `_id`
        t.statuses.oembed(_id=1234567890)

        # Update your status
        t.statuses.update(
            status="Using @sixohsix's sweet Python Twitter Tools.")

        # Send a direct message
        t.direct_messages.new(
            user="billybob",
            text="I think yer swell!")

        # Get the members of tamtar's list "Things That Are Rad"
        t._("tamtar")._("things-that-are-rad").members()

        # Note how the magic `_` method can be used to insert data
        # into the middle of a call. You can also use replacement:
        t.user.list.members(user="tamtar", list="things-that-are-rad")

        # An *optional* `_timeout` parameter can also be used for API
        # calls which take much more time than normal or twitter stops
        # responding for some reason:
        t.users.lookup(
            screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES),
            _timeout=1)

        # Overriding Method: GET/POST
        # you should not need to use this method as this library properly
        # detects whether GET or POST should be used, Nevertheless
        # to force a particular method, use `_method`
        t.statuses.oembed(_id=1234567890, _method='GET')

        # Send a tweet with an image included (or set your banner or
        # logo similarly) by just reading your image from the web or a
        # file in a string:
        with open("example.png", "rb") as imagefile:
            params = {"media[]": imagefile.read(), "status": "PTT"}
        t.statuses.update_with_media(**params)

        # Or by sending a base64 encoded image:
        params = {"media[]": base64_image, "status": "PTT", "_base64": True}
        t.statuses.update_with_media(**params)


    Searching Twitter::

        # Search for the latest tweets about #pycon
        t.search.tweets(q="#pycon")


    Using the data returned
    -----------------------

    Twitter API calls return decoded JSON. This is converted into
    a bunch of Python lists, dicts, ints, and strings. For example::

        x = twitter.statuses.home_timeline()

        # The first 'tweet' in the timeline
        x[0]

        # The screen name of the user who wrote the first 'tweet'
        x[0]['user']['screen_name']


    Getting raw XML data
    --------------------

    If you prefer to get your Twitter data in XML format, pass
    format="xml" to the Twitter object when you instantiate it::

        twitter = Twitter(format="xml")

    The output will not be parsed in any way. It will be a raw string
    of XML.

    """
    def __init__(
            self, format="json",
            domain="api.twitter.com", secure=True, auth=None,
            api_version=_DEFAULT):
        """
        Create a new twitter API connector.

        Pass an `auth` parameter to use the credentials of a specific
        user. Generally you'll want to pass an `OAuth`
        instance::

            twitter = Twitter(auth=OAuth(
                token, token_secret, consumer_key, consumer_secret))


        `domain` lets you change the domain you are connecting. By
        default it's `api.twitter.com`.

        If `secure` is False you will connect with HTTP instead of
        HTTPS.

        `api_version` is used to set the base uri. By default it's
        '1.1'. A falsy value leaves the version out of the uri.

        Raises ValueError when `format` is not "json", "xml" or "".
        """
        auth = auth or NoAuth()

        if format not in ("json", "xml", ""):
            raise ValueError("Unknown data format '%s'" % (format))

        # The sentinel distinguishes "not given" from an explicit falsy
        # value, which means "no version prefix".
        version = '1.1' if api_version is _DEFAULT else api_version
        base_parts = (str(version),) if version else ()

        TwitterCall.__init__(
            self, auth=auth, format=format, domain=domain,
            callable_cls=TwitterCall,
            secure=secure, uriparts=base_parts)
433
434
# Names exported by `from twitter.api import *`.
__all__ = [
    "Twitter",
    "TwitterError",
    "TwitterHTTPError",
    "TwitterResponse",
]