]> jfr.im git - z_archive/twitter.git/blob - twitter/api.py
be0ef43f3add9f2e9c67a335e004f227b83906c3
[z_archive/twitter.git] / twitter / api.py
1 # encoding: utf-8
2 from __future__ import unicode_literals
3
4 try:
5 import urllib.request as urllib_request
6 import urllib.error as urllib_error
7 except ImportError:
8 import urllib2 as urllib_request
9 import urllib2 as urllib_error
10
11 try:
12 from cStringIO import StringIO
13 except ImportError:
14 from io import BytesIO as StringIO
15
16 from .twitter_globals import POST_ACTIONS
17 from .auth import NoAuth
18
19 import re
20 import gzip
21
22 try:
23 import http.client as http_client
24 except ImportError:
25 import httplib as http_client
26
27 try:
28 import json
29 except ImportError:
30 import simplejson as json
31
32
class _DEFAULT(object):
    """
    Sentinel type used as a default argument value so that "argument
    not supplied" can be told apart from an explicit falsy value.
    """
35
36
class TwitterError(Exception):
    """
    Root of the exception hierarchy raised by the Twitter object.

    Signals a general failure while interacting with the API; catch
    this class to handle every error defined in this module.
    """
43
44
class TwitterHTTPError(TwitterError):
    """
    Raised by the Twitter object when twitter.com answers a request
    with an HTTP error.

    The wrapped HTTP error is kept in `e`; the (gunzipped, if needed)
    body of the error response is kept in `response_data`.
    """
    def __init__(self, e, uri, format, uriparts):
        """
        `e` is the underlying HTTP error; its `fp`, `headers` and `code`
        attributes are read. `uri`, `format` and `uriparts` are stored
        unchanged and only used to build the error message.
        """
        self.e = e
        self.uri = uri
        self.format = format
        self.uriparts = uriparts
        try:
            payload = self.e.fp.read()
        except http_client.IncompleteRead as incomplete:
            # The error body was cut short; keep whatever did arrive.
            payload = incomplete.partial
        gzipped = self.e.headers.get('Content-Encoding') == 'gzip'
        if gzipped:
            payload = gzip.GzipFile(fileobj=StringIO(payload)).read()
        self.response_data = payload
        # The message relies on the attributes assigned above, so the
        # base class is initialised last.
        super(TwitterHTTPError, self).__init__(str(self))

    def __str__(self):
        """
        Describe the failure: status code, URL, parameters and the
        body of the error response.
        """
        suffix = ""
        if self.format:
            suffix = "." + self.format
        template = (
            "Twitter sent status %i for URL: %s%s using parameters: "
            "(%s)\ndetails: %s")
        return template % (
            self.e.code, self.uri, suffix, self.uriparts,
            self.response_data)
76
77
class TwitterResponse(object):
    """
    Mixin for the value returned by a twitter request. The concrete
    object behaves like a list or a string (depending on requested
    format) and additionally exposes the attributes below.

    `headers` gives you access to the response headers as an
    httplib.HTTPHeaders instance. You can do
    `response.headers.get('h')` to retrieve a header.
    """

    def _int_header(self, name):
        """
        Return response header `name` as an int, treating a missing
        header as 0.
        """
        return int(self.headers.get(name, "0"))

    @property
    def rate_limit_remaining(self):
        """
        Remaining requests in the current rate-limit.
        """
        return self._int_header('X-Rate-Limit-Remaining')

    @property
    def rate_limit_limit(self):
        """
        The rate limit ceiling for that given request.
        """
        return self._int_header('X-Rate-Limit-Limit')

    @property
    def rate_limit_reset(self):
        """
        Time in UTC epoch seconds when the rate limit will reset.
        """
        return self._int_header('X-Rate-Limit-Reset')
109
110
class TwitterDictResponse(dict, TwitterResponse):
    """
    A dict that also carries the TwitterResponse header accessors.
    """
113
114
class TwitterListResponse(list, TwitterResponse):
    """
    A list that also carries the TwitterResponse header accessors.
    """
117
118
def wrap_response(response, headers):
    """
    Attach `headers` to a decoded response.

    A plain dict or a plain list is copied into the matching
    Twitter*Response class and gets a `headers` attribute. Anything
    else (including subclasses of dict and list) is returned unchanged
    and `headers` is dropped.
    """
    kind = type(response)
    # Exact type match on purpose: only plain dicts and lists are wrapped.
    if kind is not dict and kind is not list:
        return response
    wrapper_cls = TwitterDictResponse if kind is dict else TwitterListResponse
    wrapped = wrapper_cls(response)
    wrapped.headers = headers
    return wrapped
130
131
class TwitterCall(object):
    """
    One link in a chain that builds, and finally performs, a request.

    Attribute access appends a path segment and returns a new call
    object created through `callable_cls`; calling the object
    assembles the URL, chooses the HTTP method, lets `auth` sign the
    request and sends it.
    """

    def __init__(
            self, auth, format, domain, callable_cls, uri="",
            uriparts=None, secure=True, timeout=None, gzip=False):
        """
        `auth` supplies `generate_headers()` and `encode_params()`; a
        falsy value sends the request unsigned and without parameters.
        `format` is appended to the URL as an extension ("" for none)
        and selects JSON decoding when it is "json".
        `domain` is the host to contact, over HTTPS unless `secure` is
        false.
        `callable_cls` is the class instantiated for every extended
        call.
        `uriparts` is the tuple of path segments collected so far;
        None means "no segments yet".
        `timeout` is stored and handed on to extended calls.
        `gzip` asks the server for a gzip-compressed response.
        """
        self.auth = auth
        self.format = format
        self.domain = domain
        self.callable_cls = callable_cls
        self.uri = uri
        # Normalise the None default so that extending or calling a
        # freshly built object does not fail on `None + (arg,)`.
        self.uriparts = () if uriparts is None else uriparts
        self.secure = secure
        self.timeout = timeout
        self.gzip = gzip

    def __getattr__(self, k):
        """
        Return a new call whose path is extended by `k`.

        The special name `_` returns a function that takes the segment
        as an argument instead, for segments that are not valid Python
        identifiers.

        Raises AttributeError for dunder names, so that protocol probes
        made by `copy`, `pickle`, `hasattr` and similar are answered
        normally instead of being turned into API calls.
        """
        # Python only calls __getattr__ after regular lookup has failed,
        # so every name arriving here is either a probe for a special
        # method or a path segment.
        if k.startswith('__') and k.endswith('__'):
            raise AttributeError(k)

        def extend_call(arg):
            return self.callable_cls(
                auth=self.auth, format=self.format, domain=self.domain,
                callable_cls=self.callable_cls, timeout=self.timeout,
                secure=self.secure, gzip=self.gzip,
                uriparts=self.uriparts + (arg,))
        if k == "_":
            return extend_call
        return extend_call(k)

    @staticmethod
    def _to_bytes(value):
        """Return `value` as bytes, UTF-8 encoding anything that is not."""
        if isinstance(value, bytes):
            return value
        if not hasattr(value, 'encode'):
            value = '%s' % (value,)
        return value.encode('utf8')

    def __call__(self, **kwargs):
        """
        Build the request described by this call and send it.

        Keyword arguments become request parameters, except:

        - a keyword named like one of the path segments replaces that
          segment with its value;
        - `_method` forces the HTTP method (otherwise POST is used when
          the path ends in one of POST_ACTIONS, GET if not);
        - `id` is appended to the path, `_id` is sent as the `id`
          parameter;
        - `_timeout` is the timeout for this request only;
        - `media[]`, `banner` or `image` switches the request body to
          multipart/form-data.

        Returns whatever `_handle_response` returns.
        """
        # Build the uri.
        uriparts = []
        for uripart in self.uriparts:
            # If this part matches a keyword argument, use the
            # supplied value otherwise, just use the part.
            uriparts.append(str(kwargs.pop(uripart, uripart)))
        uri = '/'.join(uriparts)

        method = kwargs.pop('_method', None)
        if not method:
            method = "GET"
            for action in POST_ACTIONS:
                # Raw string: "\d" is not a valid escape in a normal
                # string literal.
                if re.search(r"%s(/\d+)?$" % action, uri):
                    method = "POST"
                    break

        # If an id kwarg is present and there is no id to fill in in
        # the list of uriparts, assume the id goes at the end.
        id = kwargs.pop('id', None)
        if id:
            uri += "/%s" % (id)

        # If an _id kwarg is present, this is treated as id as a CGI
        # param.
        _id = kwargs.pop('_id', None)
        if _id:
            kwargs['id'] = _id

        # If an _timeout is specified in kwargs, use it
        _timeout = kwargs.pop('_timeout', None)

        secure_str = 's' if self.secure else ''
        dot = "." if self.format else ""
        uriBase = "http%s://%s/%s%s%s" % (
            secure_str, self.domain, uri, dot, self.format)

        # Catch media arguments to handle oauth query differently for
        # multipart
        media = None
        mediafield = None
        for arg in ['media[]', 'banner', 'image']:
            if arg in kwargs:
                media = kwargs.pop(arg)
                mediafield = arg
                break

        headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
        body = None
        arg_data = None
        if self.auth:
            headers.update(self.auth.generate_headers())
            # Use urlencoded oauth args with no params when sending media
            # via multipart and send it directly via uri even for post
            arg_data = self.auth.encode_params(
                uriBase, method, {} if media else kwargs)
            if method == 'GET' or media:
                uriBase += '?' + arg_data
            else:
                body = arg_data.encode('utf8')

        # Handle query as multipart when sending media
        if media:
            BOUNDARY = "###Python-Twitter###"
            bod = [
                '--' + BOUNDARY,
                'Content-Disposition: form-data; name="%s"' % mediafield,
                '',
                media,
            ]
            for k, v in kwargs.items():
                bod.extend([
                    '--' + BOUNDARY,
                    'Content-Disposition: form-data; name="%s"' % k,
                    '',
                    v,
                ])
            bod.append('--' + BOUNDARY + '--')
            # The body has to be bytes: the media is usually binary and
            # cannot be joined with text, and urllib refuses a text
            # body for a request.
            body = b'\r\n'.join(self._to_bytes(part) for part in bod)
            headers['Content-Type'] = \
                'multipart/form-data; boundary=%s' % BOUNDARY

        req = urllib_request.Request(uriBase, body, headers)
        return self._handle_response(req, uri, arg_data, _timeout)

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """
        Send `req` and decode the answer.

        Returns the open response object for JPEG/PNG content, the
        wrapped decoded JSON when the format is "json", the body as
        text for any other format, and an empty list for a 304 status.

        Raises TwitterHTTPError for every other HTTP error; `uri` and
        `arg_data` are only used to describe the request in that error.
        """
        # NOTE(review): only the per-call `_timeout` is applied here;
        # `self.timeout` is stored and propagated but not used by this
        # method.
        kwargs = {}
        if _timeout:
            kwargs['timeout'] = _timeout
        try:
            handle = urllib_request.urlopen(req, **kwargs)
            response_headers = handle.headers
            if response_headers['Content-Type'] in [
                    'image/jpeg', 'image/png']:
                # Binary images are handed back unread; the caller owns
                # (and should close) the handle.
                return handle
            try:
                try:
                    data = handle.read()
                except http_client.IncompleteRead as e:
                    # Even if we don't get all the bytes we should have
                    # there may be a complete response in e.partial
                    data = e.partial
            finally:
                # The body is fully consumed (or unreadable) by now, so
                # release the connection instead of waiting for the
                # garbage collector.
                handle.close()
            if response_headers.get('Content-Encoding') == 'gzip':
                # Handle gzip decompression
                buf = StringIO(data)
                f = gzip.GzipFile(fileobj=buf)
                data = f.read()
            if "json" == self.format:
                res = json.loads(data.decode('utf8'))
                return wrap_response(res, response_headers)
            return wrap_response(data.decode('utf8'), response_headers)
        except urllib_error.HTTPError as e:
            if e.code == 304:
                return []
            raise TwitterHTTPError(e, uri, self.format, arg_data)
277
278
class Twitter(TwitterCall):
    """
    The minimalist yet fully featured Twitter API class.

    Get RESTful data by accessing members of this class. The result
    is decoded python objects (lists and dicts).

    The Twitter API is documented at:

      http://dev.twitter.com/doc


    Examples::

        t = Twitter(
            auth=OAuth(token, token_key, con_secret, con_secret_key))

        # Get your "home" timeline
        t.statuses.home_timeline()

        # Get a particular friend's tweets
        t.statuses.user_timeline(user_id="billybob")

        # Update your status
        t.statuses.update(
            status="Using @sixohsix's sweet Python Twitter Tools.")

        # Send a direct message
        t.direct_messages.new(
            user="billybob",
            text="I think yer swell!")

        # Get the members of tamtar's list "Things That Are Rad"
        t._("tamtar")._("things-that-are-rad").members()

        # Note how the magic `_` method can be used to insert data
        # into the middle of a call. You can also use replacement:
        t.user.list.members(user="tamtar", list="things-that-are-rad")

        # An *optional* `_timeout` parameter can also be used for API
        # calls which take much more time than normal or twitter stops
        # responding for some reason
        t.users.lookup(
            screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES),
            _timeout=1)



    Searching Twitter::

        # Search for the latest tweets about #pycon
        t.search.tweets(q="#pycon")


    Using the data returned
    -----------------------

    Twitter API calls return decoded JSON. This is converted into
    a bunch of Python lists, dicts, ints, and strings. For example::

        x = twitter.statuses.home_timeline()

        # The first 'tweet' in the timeline
        x[0]

        # The screen name of the user who wrote the first 'tweet'
        x[0]['user']['screen_name']


    Getting raw XML data
    --------------------

    If you prefer to get your Twitter data in XML format, pass
    format="xml" to the Twitter object when you instantiate it::

        twitter = Twitter(format="xml")

    The output will not be parsed in any way. It will be a raw string
    of XML.

    """
    def __init__(
            self, format="json",
            domain="api.twitter.com", secure=True, auth=None,
            api_version=_DEFAULT):
        """
        Create a new twitter API connector.

        Pass an `auth` parameter to use the credentials of a specific
        user. Generally you'll want to pass an `OAuth`
        instance::

            twitter = Twitter(auth=OAuth(
                token, token_secret, consumer_key, consumer_secret))


        `format` must be "json", "xml" or ""; anything else raises
        ValueError.

        `domain` lets you change the domain you are connecting. By
        default it's `api.twitter.com`.

        If `secure` is False you will connect with HTTP instead of
        HTTPS.

        `api_version` is used to set the base uri. By default it's
        '1.1'. A falsy value leaves the version out of the uri.
        """
        # A falsy `auth` means unauthenticated access.
        auth = auth or NoAuth()

        supported_formats = ("json", "xml", "")
        if format not in supported_formats:
            raise ValueError("Unknown data format '%s'" % format)

        # The sentinel distinguishes "not given" from an explicit
        # None/"" that asks for no version prefix.
        version = '1.1' if api_version is _DEFAULT else api_version
        base_parts = (str(version),) if version else ()

        TwitterCall.__init__(
            self, auth=auth, format=format, domain=domain,
            callable_cls=TwitterCall,
            secure=secure, uriparts=base_parts)
401
402
# Public names exported by a star-import of this module.
__all__ = [
    "Twitter",
    "TwitterError",
    "TwitterHTTPError",
    "TwitterResponse",
]