]> jfr.im git - z_archive/twitter.git/blob - twitter/api.py
4c20a338247225110a2270f9174c0fe676070741
[z_archive/twitter.git] / twitter / api.py
1 # encoding: utf-8
2 from __future__ import unicode_literals
3
4 try:
5 import urllib.request as urllib_request
6 import urllib.error as urllib_error
7 except ImportError:
8 import urllib2 as urllib_request
9 import urllib2 as urllib_error
10
11 try:
12 from cStringIO import StringIO
13 except ImportError:
14 from io import BytesIO as StringIO
15
16 from .twitter_globals import POST_ACTIONS
17 from .auth import NoAuth
18
19 import re
20 import gzip
21
22 try:
23 import http.client as http_client
24 except ImportError:
25 import httplib as http_client
26
27 try:
28 import json
29 except ImportError:
30 import simplejson as json
31
32
class _DEFAULT(object):
    """
    Sentinel type meaning "argument not supplied".

    The class object itself is used as a default parameter value and
    recognised with an identity (`is`) check, which keeps `None` and
    other falsy values available as real arguments.
    """
35
36
class TwitterError(Exception):
    """
    Root of the exception hierarchy raised by the Twitter object.

    Raised for general errors while interacting with the API; more
    specific failures derive from this class, so catching it catches
    them too.
    """
43
44
class TwitterHTTPError(TwitterError):
    """
    Exception thrown by the Twitter object when there is an
    HTTP error interacting with twitter.com.

    Attributes set by the constructor:

    `e` is the underlying HTTP error object; its `fp`, `headers` and
    `code` attributes are read here.

    `uri`, `format` and `uriparts` are stored as given and are only
    used to build the error message.

    `response_data` is the body of the error response as read from
    `e.fp`, gunzipped when the response declares gzip encoding.
    """
    def __init__(self, e, uri, format, uriparts):
        # Local import: zlib is not among this module's top-level imports.
        import zlib

        self.e = e
        self.uri = uri
        self.format = format
        self.uriparts = uriparts
        try:
            data = self.e.fp.read()
        except http_client.IncompleteRead as incomplete:
            # Can't read the whole error text; keep whatever arrived.
            # (Named `incomplete` so the `e` parameter is not shadowed
            # and then unbound when the except clause ends.)
            data = incomplete.partial
        if self.e.headers.get('Content-Encoding') == 'gzip':
            try:
                data = gzip.GzipFile(fileobj=StringIO(data)).read()
            except (IOError, EOFError, zlib.error):
                # A truncated or corrupt gzip body must not mask the
                # HTTP error being reported: keep the raw bytes.
                pass
        self.response_data = data
        super(TwitterHTTPError, self).__init__(str(self))

    def __str__(self):
        """Describe the failed request: status, URL, parameters, body."""
        fmt = ("." + self.format) if self.format else ""
        return (
            "Twitter sent status %i for URL: %s%s using parameters: "
            "(%s)\ndetails: %s" % (
                self.e.code, self.uri, fmt, self.uriparts,
                self.response_data))
76
77
class TwitterResponse(object):
    """
    Mixin giving a decoded Twitter response access to rate-limit data.

    Instances are expected to carry a `headers` attribute holding the
    response headers; anything with a dict-style `get(name, default)`
    works, so `response.headers.get('h')` retrieves header `h`.

    Each rate-limit property falls back to 0 when its header is absent.
    """

    @property
    def rate_limit_remaining(self):
        """
        Remaining requests in the current rate-limit window.
        """
        remaining = self.headers.get('X-Rate-Limit-Remaining', "0")
        return int(remaining)

    @property
    def rate_limit_limit(self):
        """
        The rate limit ceiling for the request that was made.
        """
        ceiling = self.headers.get('X-Rate-Limit-Limit', "0")
        return int(ceiling)

    @property
    def rate_limit_reset(self):
        """
        Time, in UTC epoch seconds, at which the rate limit resets.
        """
        reset_at = self.headers.get('X-Rate-Limit-Reset', "0")
        return int(reset_at)
109
110
class TwitterDictResponse(dict, TwitterResponse):
    """A `dict` response that also exposes the TwitterResponse properties."""
113
114
class TwitterListResponse(list, TwitterResponse):
    """A `list` response that also exposes the TwitterResponse properties."""
117
118
def wrap_response(response, headers):
    """
    Attach `headers` to a decoded response where possible.

    A value whose type is exactly `dict` or exactly `list` is copied
    into a TwitterDictResponse / TwitterListResponse and gets `headers`
    stored on its `headers` attribute.  Any other value (including
    instances of dict/list subclasses) is returned untouched and
    `headers` is discarded.
    """
    kind = type(response)
    if kind is not dict and kind is not list:
        return response

    wrapper_cls = TwitterDictResponse if kind is dict else TwitterListResponse
    wrapped = wrapper_cls(response)
    wrapped.headers = headers
    return wrapped
130
def method_for_uri(uri, post_actions=None):
    """
    Return the HTTP method ("GET" or "POST") to use for `uri`.

    The method is "POST" when `uri` ends with one of the POST actions,
    optionally followed by a single all-digit path segment (e.g.
    ".../destroy/1234"); otherwise it is "GET".

    `post_actions` is an iterable of action strings to test against.
    Each one is interpolated into a regular expression as-is.  When
    omitted, the module-level POST_ACTIONS collection is used.
    """
    if post_actions is None:
        post_actions = POST_ACTIONS
    for action in post_actions:
        # Raw string: "\d" in a plain literal is an invalid escape.
        if re.search(r"%s(/\d+)?$" % action, uri):
            return "POST"
    return "GET"
138
def _to_bytes(value):
    """Return `value` as bytes, UTF-8 encoding its text form if needed."""
    if isinstance(value, bytes):
        return value
    return ("%s" % (value,)).encode('utf8')


def _encode_multipart(mediafield, media, params, boundary):
    """
    Build a multipart/form-data request body as bytes.

    The first part is named `mediafield`, carries `media` and is marked
    as base64 transfer-encoded; one further part is emitted per item of
    the `params` mapping.  Parts are joined with CRLF and delimited by
    `boundary`.
    """
    delimiter = b'--' + _to_bytes(boundary)
    lines = [
        delimiter,
        _to_bytes('Content-Disposition: form-data; name="%s"' % mediafield),
        b'Content-Transfer-Encoding: base64',
        b'',
        _to_bytes(media),
    ]
    for name, value in params.items():
        lines.extend([
            delimiter,
            _to_bytes('Content-Disposition: form-data; name="%s"' % name),
            b'',
            _to_bytes(value),
        ])
    lines.append(delimiter + b'--')
    # Everything is bytes by now: base64.b64encode returns bytes on
    # Python 3, so joining with a text separator would raise TypeError.
    return b'\r\n'.join(lines)


class TwitterCall(object):
    """
    One node in a chain of attribute lookups that describes an API URL.

    Accessing an attribute returns a new `callable_cls` instance whose
    `uriparts` has the attribute name appended; calling the object
    joins the parts into a URL, sends the request and returns the
    decoded response.
    """

    def __init__(
            self, auth, format, domain, callable_cls, uri="",
            uriparts=None, secure=True, timeout=None, gzip=False):
        """
        Store the settings shared by every call derived from this one.

        `auth` must provide `generate_headers()` and `encode_params()`
        (it is skipped entirely when falsy).  `format` is the URL
        suffix ("json", "xml" or ""), `domain` the host name, and
        `callable_cls` the class instantiated for each child node.
        `uriparts` is the tuple of path components collected so far.
        `secure` selects https over http and `gzip` asks the server for
        gzip-compressed responses.
        """
        self.auth = auth
        self.format = format
        self.domain = domain
        self.callable_cls = callable_cls
        self.uri = uri
        # An omitted `uriparts` means "no path components yet".  Storing
        # None here would break both attribute chaining (None + tuple)
        # and calling (iterating None).
        self.uriparts = () if uriparts is None else uriparts
        self.secure = secure
        # NOTE(review): `timeout` is stored and handed on to child calls
        # but _handle_response only honours the per-call `_timeout`.
        self.timeout = timeout
        self.gzip = gzip

    def __getattr__(self, k):
        """
        Return a child call with `k` appended to the URI parts.

        The special name `_` returns a function instead, so arbitrary
        values can be inserted as a path component: `t._("name")`.
        """
        # __getattr__ only runs after normal attribute lookup has
        # failed, so no further lookup is attempted here.
        def extend_call(arg):
            return self.callable_cls(
                auth=self.auth, format=self.format, domain=self.domain,
                callable_cls=self.callable_cls, timeout=self.timeout,
                secure=self.secure, gzip=self.gzip,
                uriparts=self.uriparts + (arg,))
        if k == "_":
            return extend_call
        return extend_call(k)

    def __call__(self, **kwargs):
        """
        Perform the API request described by this object.

        Keyword arguments become request parameters, except for:

        - a keyword named like one of the URI parts, whose value
          replaces that part in the URL;
        - `id`, appended to the URL path (use `_id` to send `id` as a
          regular parameter instead);
        - `_method`, forcing the HTTP method;
        - `_timeout`, passed to the underlying urlopen call;
        - `_base64`, true when media arguments are already base64
          encoded;
        - `media[]` / `image`, sent as a multipart/form-data upload.

        Returns whatever `_handle_response` returns.
        """
        # Local import: base64 is not among the module-level imports.
        import base64

        # Build the uri.
        uriparts = []
        for uripart in self.uriparts:
            # If this part matches a keyword argument, use the
            # supplied value otherwise, just use the part.
            uriparts.append(str(kwargs.pop(uripart, uripart)))
        uri = '/'.join(uriparts)

        method = kwargs.pop('_method', None) or method_for_uri(uri)

        # If an id kwarg is present and there is no id to fill in in
        # the list of uriparts, assume the id goes at the end.
        id_suffix = kwargs.pop('id', None)
        if id_suffix:
            uri += "/%s" % (id_suffix)

        # If an _id kwarg is present, this is treated as id as a CGI
        # param.
        _id = kwargs.pop('_id', None)
        if _id:
            kwargs['id'] = _id

        # If an _timeout is specified in kwargs, use it
        _timeout = kwargs.pop('_timeout', None)

        secure_str = 's' if self.secure else ''
        dot = "." if self.format else ""
        uriBase = "http%s://%s/%s%s%s" % (
            secure_str, self.domain, uri, dot, self.format)

        # Check if argument tells whether img is already base64 encoded
        b64_convert = True
        if "_base64" in kwargs:
            b64_convert = not kwargs.pop("_base64")

        # Catch media arguments to handle oauth query differently for multipart
        media = None
        mediafield = None
        for arg in ['media[]', 'image']:
            if arg in kwargs:
                media = kwargs.pop(arg)
                if b64_convert:
                    media = base64.b64encode(media)
                mediafield = arg
                break

        # Catch media arguments that are not accepted through multipart
        # and are not yet base64 encoded
        if b64_convert:
            for arg in ['banner']:
                if arg in kwargs:
                    kwargs[arg] = base64.b64encode(kwargs[arg])

        headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
        body = None
        arg_data = None
        if self.auth:
            headers.update(self.auth.generate_headers())
            # Use urlencoded oauth args with no params when sending media
            # via multipart and send it directly via uri even for post
            arg_data = self.auth.encode_params(
                uriBase, method, {} if media else kwargs)
            if method == 'GET' or media:
                uriBase += '?' + arg_data
            else:
                body = arg_data.encode('utf8')

        # Handle query as multipart when sending media
        if media:
            BOUNDARY = "###Python-Twitter###"
            body = _encode_multipart(mediafield, media, kwargs, BOUNDARY)
            headers['Content-Type'] = \
                'multipart/form-data; boundary=%s' % BOUNDARY

        req = urllib_request.Request(uriBase, body, headers)
        return self._handle_response(req, uri, arg_data, _timeout)

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """
        Send `req` and decode the response.

        Returns the open response handle for JPEG/PNG content types, an
        empty wrapped dict for an empty body, the parsed JSON when the
        format is "json", and the UTF-8 decoded text otherwise.  An
        HTTP 304 yields an empty list; any other HTTP error is raised
        as TwitterHTTPError built from `uri`, the format and
        `arg_data`.
        """
        kwargs = {}
        if _timeout:
            kwargs['timeout'] = _timeout
        try:
            handle = urllib_request.urlopen(req, **kwargs)
            if handle.headers['Content-Type'] in ['image/jpeg', 'image/png']:
                return handle
            try:
                data = handle.read()
            except http_client.IncompleteRead as e:
                # Even if we don't get all the bytes we should have there
                # may be a complete response in e.partial
                data = e.partial
            if handle.info().get('Content-Encoding') == 'gzip':
                # Handle gzip decompression
                buf = StringIO(data)
                f = gzip.GzipFile(fileobj=buf)
                data = f.read()
            if len(data) == 0:
                return wrap_response({}, handle.headers)
            elif "json" == self.format:
                res = json.loads(data.decode('utf8'))
                return wrap_response(res, handle.headers)
            else:
                return wrap_response(
                    data.decode('utf8'), handle.headers)
        except urllib_error.HTTPError as e:
            if (e.code == 304):
                return []
            else:
                raise TwitterHTTPError(e, uri, self.format, arg_data)
297
298
class Twitter(TwitterCall):
    """
    The minimalist yet fully featured Twitter API class.

    Get RESTful data by accessing members of this class. The result
    is decoded python objects (lists and dicts).

    The Twitter API is documented at:

      http://dev.twitter.com/doc


    Examples::

        from twitter import *

        t = Twitter(
            auth=OAuth(token, token_key, con_secret, con_secret_key))

        # Get your "home" timeline
        t.statuses.home_timeline()

        # Get a particular friend's timeline
        t.statuses.user_timeline(screen_name="billybob")

        # to pass in GET/POST parameters, such as `count`
        t.statuses.home_timeline(count=5)

        # to pass in the GET/POST parameter `id` you need to use `_id`
        t.statuses.oembed(_id=1234567890)

        # Update your status
        t.statuses.update(
            status="Using @sixohsix's sweet Python Twitter Tools.")

        # Send a direct message
        t.direct_messages.new(
            user="billybob",
            text="I think yer swell!")

        # Get the members of tamtar's list "Things That Are Rad"
        t._("tamtar")._("things-that-are-rad").members()

        # Note how the magic `_` method can be used to insert data
        # into the middle of a call. You can also use replacement:
        t.user.list.members(user="tamtar", list="things-that-are-rad")

        # An *optional* `_timeout` parameter can also be used for API
        # calls which take much more time than normal or twitter stops
        # responding for some reason:
        t.users.lookup(
            screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES),
            _timeout=1)

        # Overriding Method: GET/POST
        # you should not need to use this method as this library properly
        # detects whether GET or POST should be used, Nevertheless
        # to force a particular method, use `_method`
        t.statuses.oembed(_id=1234567890, _method='GET')

        # Send a tweet with an image included (or set your banner or
        # logo similarly) by just reading your image from the web or a
        # file in a string:
        with open("example.png", "rb") as imagefile:
            params = {"media[]": imagefile.read(), "status": "PTT"}
        t.statuses.update_with_media(**params)

        # Or by sending a base64 encoded image:
        params = {"media[]": base64_image, "status": "PTT", "_base64": True}
        t.statuses.update_with_media(**params)


    Searching Twitter::

        # Search for the latest tweets about #pycon
        t.search.tweets(q="#pycon")


    Using the data returned
    -----------------------

    Twitter API calls return decoded JSON. This is converted into
    a bunch of Python lists, dicts, ints, and strings. For example::

        x = twitter.statuses.home_timeline()

        # The first 'tweet' in the timeline
        x[0]

        # The screen name of the user who wrote the first 'tweet'
        x[0]['user']['screen_name']


    Getting raw XML data
    --------------------

    If you prefer to get your Twitter data in XML format, pass
    format="xml" to the Twitter object when you instantiate it::

        twitter = Twitter(format="xml")

    The output will not be parsed in any way. It will be a raw string
    of XML.

    """
    def __init__(
            self, format="json",
            domain="api.twitter.com", secure=True, auth=None,
            api_version=_DEFAULT):
        """
        Create a new twitter API connector.

        Pass an `auth` parameter to use the credentials of a specific
        user. Generally you'll want to pass an `OAuth`
        instance::

            twitter = Twitter(auth=OAuth(
                token, token_secret, consumer_key, consumer_secret))

        When `auth` is omitted (or falsy) a `NoAuth` instance is used.

        `format` must be "json", "xml" or the empty string; anything
        else raises ValueError.

        `domain` lets you change the domain you are connecting. By
        default it's `api.twitter.com`.

        If `secure` is False you will connect with HTTP instead of
        HTTPS.

        `api_version` is used to set the base uri. By default it's
        '1.1'. A falsy value leaves the version out of the uri.
        """
        auth = auth or NoAuth()

        if format not in ("json", "xml", ""):
            raise ValueError("Unknown data format '%s'" % (format))

        version = '1.1' if api_version is _DEFAULT else api_version
        # The version, when there is one, is the first path component.
        base_parts = (str(version),) if version else ()

        TwitterCall.__init__(
            self, auth=auth, format=format, domain=domain,
            callable_cls=TwitterCall,
            secure=secure, uriparts=base_parts)
444
445
# Public names exported by a star-import of this module.
__all__ = [
    "Twitter",
    "TwitterError",
    "TwitterHTTPError",
    "TwitterResponse",
]