]> jfr.im git - z_archive/twitter.git/blob - twitter/api.py
Simplify further. New args to example script. Update documentation.
[z_archive/twitter.git] / twitter / api.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 except ImportError:
5 import urllib2 as urllib_request
6 import urllib2 as urllib_error
7
8 try:
9 from cStringIO import StringIO
10 except ImportError:
11 from io import BytesIO as StringIO
12
13 from .twitter_globals import POST_ACTIONS
14 from .auth import NoAuth
15
16 import re
17 import gzip
18
19 try:
20 import http.client as http_client
21 except ImportError:
22 import httplib as http_client
23
24 try:
25 import json
26 except ImportError:
27 import simplejson as json
28
29
30 class _DEFAULT(object):
31 pass
32
class TwitterError(Exception):
    """
    Root of the exception hierarchy raised by the Twitter object.

    Raised (directly or through a subclass) when something goes
    wrong while interacting with the API.
    """
39
class TwitterHTTPError(TwitterError):
    """
    Exception thrown by the Twitter object when there is an
    HTTP error interacting with twitter.com.

    Attributes set by the constructor:

    `e` is the underlying HTTP error object (its `fp`, `headers` and
    `code` attributes are read here).

    `uri` and `format` identify the request that failed.

    `uriparts` is whatever the caller wants reported as the request
    parameters in the error message.

    `response_data` is the body of the error response, gunzipped when
    the response declares `Content-Encoding: gzip` and the payload can
    actually be decompressed; otherwise the raw bytes that were read.
    """
    def __init__(self, e, uri, format, uriparts):
        self.e = e
        self.uri = uri
        self.format = format
        self.uriparts = uriparts
        try:
            data = self.e.fp.read()
        except http_client.IncompleteRead as incomplete:
            # can't read the whole error text; keep whatever arrived.
            # (A distinct name is used so the `e` argument is not
            # shadowed by the except clause.)
            data = incomplete.partial
        if self.e.headers.get('Content-Encoding') == 'gzip':
            # Local import: zlib is only needed to name its error type.
            import zlib
            try:
                data = gzip.GzipFile(fileobj=StringIO(data)).read()
            except (IOError, EOFError, zlib.error):
                # A truncated or corrupt gzip body must not replace the
                # HTTP error being reported with a decompression error;
                # fall back to the raw bytes instead.
                pass
        self.response_data = data
        super(TwitterHTTPError, self).__init__(str(self))

    def __str__(self):
        """Describe the failed request: status, URL, parameters, body."""
        fmt = ("." + self.format) if self.format else ""
        return (
            "Twitter sent status %i for URL: %s%s using parameters: "
            "(%s)\ndetails: %s" %(
                self.e.code, self.uri, fmt, self.uriparts,
                self.response_data))
71
class TwitterResponse(object):
    """
    Response from a twitter request. Behaves like a list or a string
    (depending on requested format) but it has a few other interesting
    attributes.

    `headers` holds the response headers object handed to the
    constructor. You can do `response.headers.get('h')` to retrieve
    a header.
    """
    def __init__(self, headers):
        self.headers = headers

    @property
    def rate_limit_remaining(self):
        """
        Number of requests still allowed in the current rate-limit
        window (0 when the header is absent).
        """
        raw_value = self.headers.get('X-Rate-Limit-Remaining', "0")
        return int(raw_value)

    @property
    def rate_limit_limit(self):
        """
        Rate-limit ceiling reported for this request (0 when the
        header is absent).
        """
        raw_value = self.headers.get('X-Rate-Limit-Limit', "0")
        return int(raw_value)

    @property
    def rate_limit_reset(self):
        """
        UTC epoch seconds at which the rate limit resets (0 when the
        header is absent).
        """
        raw_value = self.headers.get('X-Rate-Limit-Reset', "0")
        return int(raw_value)
105
106
def wrap_response(response, headers):
    """
    Attach response `headers` to a decoded API `response`.

    The result is an instance of a class created on the fly that
    inherits from both the response's own type and `TwitterResponse`,
    so it still behaves like the decoded value while exposing
    `headers` and the rate-limit properties.

    Values that cannot carry the extra attributes are returned
    unchanged: `str` responses (as before) and `None`, whose type
    cannot be subclassed. `bool` cannot be subclassed either, so
    booleans are wrapped as `int`.
    """
    response_typ = type(response)
    if response_typ is bool:
        # bool is not subclassable; its int value is wrapped instead.
        response_typ = int
    elif response is None or response_typ is str:
        return response

    class WrappedTwitterResponse(response_typ, TwitterResponse):
        __doc__ = TwitterResponse.__doc__

        def __new__(cls, response, headers):
            return response_typ.__new__(cls, response)

        def __init__(self, response, headers):
            # Immutable types (int, float, ...) are fully built by
            # __new__ and inherit object.__init__, which refuses extra
            # arguments on Python 3. Only call the base initialiser
            # when the type really has its own (dict, list).
            if response_typ.__init__ is not object.__init__:
                response_typ.__init__(self, response)
            TwitterResponse.__init__(self, headers)

    return WrappedTwitterResponse(response, headers)
125
126
127
class TwitterCall(object):
    """
    One node in a chain of attribute accesses that is eventually
    called to perform an HTTP request.

    Accessing an unknown attribute returns a new `callable_cls`
    instance whose `uriparts` has the attribute name appended; calling
    the object joins `uriparts` with "/" to build the request path.

    Constructor arguments:

    `auth` must provide `generate_headers()` and
    `encode_params(base_url, method, params)`.

    `format` is appended to the URL after a "." when non-empty, and
    "json" responses are decoded with the json module.

    `domain` and `secure` select the host and http/https scheme.

    `callable_cls` is the class instantiated for chained attributes.

    `uri` is stored but not otherwise used by this class.

    `uriparts` is the sequence of path components collected so far.

    `timeout` is used for requests that do not pass `_timeout`.

    `gzip`, when true, sends an `Accept-Encoding: gzip` header.
    """

    def __init__(
        self, auth, format, domain, callable_cls, uri="",
        uriparts=None, secure=True, timeout=None, gzip=False):
        self.auth = auth
        self.format = format
        self.domain = domain
        self.callable_cls = callable_cls
        self.uri = uri
        # The default of None used to break both attribute chaining
        # (None + tuple) and calling (iterating None); always keep a
        # tuple so those operations work.
        self.uriparts = () if uriparts is None else tuple(uriparts)
        self.secure = secure
        self.timeout = timeout
        self.gzip = gzip

    def __getattr__(self, k):
        """
        Return a new call object with `k` appended to the URI parts.

        The special name `_` returns a function instead, so that an
        arbitrary value can be appended: `t._("some-value")`.
        """
        # __getattr__ is only invoked once normal lookup has already
        # failed, so there is nothing else to try first.
        def extend_call(arg):
            return self.callable_cls(
                auth=self.auth, format=self.format, domain=self.domain,
                callable_cls=self.callable_cls, timeout=self.timeout,
                secure=self.secure, gzip=self.gzip,
                uriparts=self.uriparts + (arg,))
        if k == "_":
            return extend_call
        return extend_call(k)

    def __call__(self, **kwargs):
        """
        Perform the request described by the collected URI parts.

        Keyword arguments are sent as request parameters, except:

        * a kwarg named like one of the URI parts replaces that part;
        * `_method` forces the HTTP method (otherwise POST is used when
          the path ends with one of POST_ACTIONS, optionally followed
          by a numeric id, and GET in every other case);
        * `id` is appended to the path;
        * `_id` is sent as the `id` request parameter;
        * `_timeout` overrides the timeout for this request;
        * the first of `media[]`, `banner`, `image` that is present is
          sent as multipart/form-data together with the remaining
          parameters.

        Returns whatever `_handle_response` returns.
        """
        # Build the uri. If a part matches a keyword argument, use the
        # supplied value, otherwise just use the part.
        uriparts = [str(kwargs.pop(part, part)) for part in self.uriparts]
        uri = '/'.join(uriparts)

        method = kwargs.pop('_method', None)
        if not method:
            method = "GET"
            for action in POST_ACTIONS:
                # Raw string: "\d" in a plain literal is an invalid
                # escape sequence.
                if re.search(r"%s(/\d+)?$" % action, uri):
                    method = "POST"
                    break

        # If an id kwarg is present and there is no id to fill in in
        # the list of uriparts, assume the id goes at the end.
        trailing_id = kwargs.pop('id', None)
        if trailing_id:
            uri += "/%s" %(trailing_id)

        # If an _id kwarg is present, this is treated as id as a CGI
        # param.
        _id = kwargs.pop('_id', None)
        if _id:
            kwargs['id'] = _id

        # If an _timeout is specified in kwargs, use it
        _timeout = kwargs.pop('_timeout', None)

        secure_str = 's' if self.secure else ''
        dot = "." if self.format else ""
        uriBase = "http%s://%s/%s%s%s" %(
            secure_str, self.domain, uri, dot, self.format)

        # Catch media arguments to handle oauth query differently for
        # multipart
        media = None
        mediafield = None
        for arg in ['media[]', 'banner', 'image']:
            if arg in kwargs:
                media = kwargs.pop(arg)
                mediafield = arg
                break

        headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
        body = None
        arg_data = None
        if self.auth:
            headers.update(self.auth.generate_headers())
            # Use urlencoded oauth args with no params when sending media
            # via multipart and send it directly via uri even for post
            arg_data = self.auth.encode_params(
                uriBase, method, {} if media else kwargs)
            if method == 'GET' or media:
                uriBase += '?' + arg_data
            else:
                body = arg_data.encode('utf8')

        # Handle query as multipart when sending media
        if media:
            BOUNDARY = "###Python-Twitter###"
            bod = []
            bod.append('--' + BOUNDARY)
            bod.append('Content-Disposition: form-data; name="%s"' %
                       mediafield)
            bod.append('')
            bod.append(media)
            for k, v in kwargs.items():
                bod.append('--' + BOUNDARY)
                bod.append('Content-Disposition: form-data; name="%s"' % k)
                bod.append('')
                bod.append(v)
            bod.append('--' + BOUNDARY + '--')
            # The request body has to be bytes: urllib rejects str data
            # on Python 3, and binary media cannot be joined with text.
            body = b'\r\n'.join(_multipart_bytes(part) for part in bod)
            headers['Content-Type'] = \
                'multipart/form-data; boundary=%s' % BOUNDARY

        req = urllib_request.Request(uriBase, body, headers)
        return self._handle_response(req, uri, arg_data, _timeout)

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """
        Send `req` and decode the response.

        Returns the open response handle untouched for image/jpeg and
        image/png content, the wrapped decoded JSON when the format is
        "json", the wrapped UTF-8 decoded text otherwise, and an empty
        list for an HTTP 304 answer.

        Raises TwitterHTTPError for any other HTTP error; `uri` and
        `arg_data` are only used to describe the request in that error.
        """
        kwargs = {}
        # A per-call _timeout wins; otherwise fall back to the timeout
        # given to the constructor, which used to be silently ignored.
        timeout = _timeout or self.timeout
        if timeout:
            kwargs['timeout'] = timeout
        try:
            handle = urllib_request.urlopen(req, **kwargs)
            if handle.headers['Content-Type'] in ['image/jpeg', 'image/png']:
                return handle
            try:
                data = handle.read()
            except http_client.IncompleteRead as e:
                # Even if we don't get all the bytes we should have there
                # may be a complete response in e.partial
                data = e.partial
            if handle.info().get('Content-Encoding') == 'gzip':
                # Handle gzip decompression
                buf = StringIO(data)
                f = gzip.GzipFile(fileobj=buf)
                data = f.read()
            if "json" == self.format:
                res = json.loads(data.decode('utf8'))
                return wrap_response(res, handle.headers)
            else:
                return wrap_response(
                    data.decode('utf8'), handle.headers)
        except urllib_error.HTTPError as e:
            if (e.code == 304):
                return []
            else:
                raise TwitterHTTPError(e, uri, self.format, arg_data)


def _multipart_bytes(part):
    """Return one line of a multipart body as bytes (text as UTF-8)."""
    if isinstance(part, bytes):
        return part
    if not hasattr(part, 'encode'):
        # Non-string parameter values (e.g. numbers) are sent as text.
        part = str(part)
    return part.encode('utf8')
271
class Twitter(TwitterCall):
    """
    The minimalist yet fully featured Twitter API class.

    Get RESTful data by accessing members of this class. The result
    is decoded python objects (lists and dicts).

    The Twitter API is documented at:

      http://dev.twitter.com/doc


    Examples::

        t = Twitter(
            auth=OAuth(token, token_key, con_secret, con_secret_key))

        # Get your "home" timeline
        t.statuses.home_timeline()

        # Get a particular friend's tweets
        t.statuses.user_timeline(screen_name="billybob")

        # Update your status
        t.statuses.update(
            status="Using @sixohsix's sweet Python Twitter Tools.")

        # Send a direct message
        t.direct_messages.new(
            user="billybob",
            text="I think yer swell!")

        # Get the members of tamtar's list "Things That Are Rad"
        t._("tamtar")._("things-that-are-rad").members()

        # Note how the magic `_` method can be used to insert data
        # into the middle of a call. You can also use replacement:
        t.user.list.members(user="tamtar", list="things-that-are-rad")

        # An *optional* `_timeout` parameter can also be used for API
        # calls which take much more time than normal or when twitter
        # stops responding for some reason
        t.users.lookup(
            screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES),
            _timeout=1)



    Searching Twitter::

        # Search for the latest tweets about #pycon
        t.search.tweets(q="#pycon")


    Using the data returned
    -----------------------

    Twitter API calls return decoded JSON. This is converted into
    a bunch of Python lists, dicts, ints, and strings. For example::

        x = twitter.statuses.home_timeline()

        # The first 'tweet' in the timeline
        x[0]

        # The screen name of the user who wrote the first 'tweet'
        x[0]['user']['screen_name']


    Getting raw XML data
    --------------------

    If you prefer to get your Twitter data in XML format, pass
    format="xml" to the Twitter object when you instantiate it::

        twitter = Twitter(format="xml")

    The output will not be parsed in any way. It will be a raw string
    of XML.

    """
    def __init__(
        self, format="json",
        domain="api.twitter.com", secure=True, auth=None,
        api_version=_DEFAULT):
        """
        Create a new twitter API connector.

        Pass an `auth` parameter to use the credentials of a specific
        user. Generally you'll want to pass an `OAuth`
        instance::

            twitter = Twitter(auth=OAuth(
                    token, token_secret, consumer_key, consumer_secret))

        When `auth` is omitted (or falsy) a `NoAuth` instance is used.

        `format` must be "json", "xml" or the empty string; anything
        else raises `ValueError`.

        `domain` lets you change the domain you are connecting. By
        default it's `api.twitter.com` but `search.twitter.com` may be
        useful too.

        If `secure` is False you will connect with HTTP instead of
        HTTPS.

        `api_version` is used to set the base uri. By default it's
        '1.1'. Pass None (or any other false value) to leave the
        version out of the uri, e.g. when using "search.twitter.com".
        """
        if not auth:
            auth = NoAuth()

        if (format not in ("json", "xml", "")):
            raise ValueError("Unknown data format '%s'" %(format))

        # _DEFAULT (rather than None) marks "not given", because None
        # is itself meaningful: it means "no version in the uri".
        if api_version is _DEFAULT:
            api_version = '1.1'

        uriparts = ()
        if api_version:
            uriparts += (str(api_version),)

        TwitterCall.__init__(
            self, auth=auth, format=format, domain=domain,
            callable_cls=TwitterCall,
            secure=secure, uriparts=uriparts)
395
396
# Names exported by `from <this module> import *`.
__all__ = [
    "Twitter",
    "TwitterError",
    "TwitterHTTPError",
    "TwitterResponse",
]