]> jfr.im git - z_archive/twitter.git/blob - twitter/api.py
Adapt code to archive DMs
[z_archive/twitter.git] / twitter / api.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 except ImportError:
5 import urllib2 as urllib_request
6 import urllib2 as urllib_error
7
8 try:
9 from cStringIO import StringIO
10 except ImportError:
11 from io import BytesIO as StringIO
12
13 from .twitter_globals import POST_ACTIONS
14 from .auth import NoAuth
15
16 import re
17 import gzip
18
19 try:
20 import http.client as http_client
21 except ImportError:
22 import httplib as http_client
23
24 import json
25
26
class _DEFAULT(object):
    """
    Sentinel type meaning "no value was supplied".

    Used as a default argument where `None` is itself a meaningful
    value, so the two cases can be told apart with an `is` check.
    """
29
class TwitterError(Exception):
    """
    Root of the exception hierarchy used by the Twitter object.

    Raised, directly or through a subclass, when something goes
    wrong while interacting with the API.  Catch this type to handle
    every error this module reports.
    """
36
class TwitterHTTPError(TwitterError):
    """
    Exception thrown by the Twitter object when there is an
    HTTP error interacting with twitter.com.

    Attributes:

    `e` is the underlying HTTP error object; its `fp`, `headers` and
    `code` attributes are read by this class.

    `uri`, `format` and `uriparts` are stored exactly as given and
    are only used to build the message returned by `str()`.

    `response_data` is the body of the error response.  It is
    gunzipped when the response declares a gzip Content-Encoding and
    the payload can actually be decompressed; otherwise it is the raw
    data that was read.
    """
    def __init__(self, e, uri, format, uriparts):
        self.e = e
        self.uri = uri
        self.format = format
        self.uriparts = uriparts
        try:
            data = self.e.fp.read()
        except http_client.IncompleteRead as incomplete:
            # The error body was cut short; keep whatever did arrive.
            # (A distinct name avoids shadowing the `e` parameter.)
            data = incomplete.partial
        if self.e.headers.get('Content-Encoding') == 'gzip':
            # Imported here so the block does not rely on a new
            # module-level import.
            import zlib
            try:
                buf = StringIO(data)
                self.response_data = gzip.GzipFile(fileobj=buf).read()
            except (IOError, EOFError, zlib.error):
                # A truncated or corrupt gzip stream must not raise from
                # inside the constructor: that would hide the HTTP error
                # being reported.  Fall back to the raw data instead.
                self.response_data = data
        else:
            self.response_data = data

    def __str__(self):
        fmt = ("." + self.format) if self.format else ""
        return (
            "Twitter sent status %i for URL: %s%s using parameters: "
            "(%s)\ndetails: %s" %(
                self.e.code, self.uri, fmt, self.uriparts,
                self.response_data))
67
class TwitterResponse(object):
    """
    Response from a twitter request. Behaves like a list or a string
    (depending on requested format) but it has a few other interesting
    attributes.

    `headers` is the response headers object handed to the
    constructor.  It is queried with a mapping-style `get`, so you can
    do `response.headers.get('h')` to retrieve a header.

    The `rate_limit_*` properties read the matching `X-Rate-Limit-*`
    header and convert it to an int; a missing header yields 0.
    """
    def __init__(self, headers):
        self.headers = headers

    @property
    def rate_limit_remaining(self):
        """
        Remaining requests in the current rate-limit.
        """
        remaining = self.headers.get('X-Rate-Limit-Remaining', "0")
        return int(remaining)

    @property
    def rate_limit_limit(self):
        """
        The rate limit ceiling for that given request.
        """
        ceiling = self.headers.get('X-Rate-Limit-Limit', "0")
        return int(ceiling)

    @property
    def rate_limit_reset(self):
        """
        Time in UTC epoch seconds when the rate limit will reset.
        """
        reset_at = self.headers.get('X-Rate-Limit-Reset', "0")
        return int(reset_at)
101
102
def wrap_response(response, headers):
    """
    Attach response headers to a decoded API response.

    Returns an object that is an instance of both the response's own
    type and `TwitterResponse`, so it can be used like the plain value
    while also exposing `headers` and the rate-limit properties.

    Two kinds of value are returned unchanged, without headers:
    `str` instances (as before) and `None`, whose type cannot be
    subclassed.
    """
    response_typ = type(response)
    if response_typ is bool:
        # bool cannot be subclassed; int is the closest usable base.
        response_typ = int
    elif response_typ is str:
        return response
    elif response is None:
        # NoneType is not an acceptable base type, so a null response
        # cannot be wrapped; hand it back as-is instead of raising.
        return response

    class WrappedTwitterResponse(response_typ, TwitterResponse):
        __doc__ = TwitterResponse.__doc__

        def __init__(self, response, headers):
            # Immutable types (int, float, ...) are fully built in
            # __new__ and inherit object.__init__, which rejects extra
            # arguments on Python 3.  Only call the base initialiser
            # when the type really defines one (list, dict).
            if response_typ.__init__ is not object.__init__:
                response_typ.__init__(self, response)
            TwitterResponse.__init__(self, headers)

        def __new__(cls, response, headers):
            return response_typ.__new__(cls, response)

    return WrappedTwitterResponse(response, headers)
121
122
123
class TwitterCall(object):
    """
    A partially built call to the API.

    Attribute access appends a path segment and returns a new call
    object (built with `callable_cls`); calling the object performs
    the HTTP request and returns the decoded response.

    `auth` must provide `generate_headers()` and
    `encode_params(url, method, params)`.  `format` is appended to the
    URL as an extension when non-empty, and "json" responses are
    parsed.  `domain` is the host to contact, `uriparts` the path
    segments collected so far, and `secure` selects https over http.
    `uri` is stored but not otherwise used by this class.
    """

    def __init__(
        self, auth, format, domain, callable_cls, uri="",
        uriparts=None, secure=True):
        self.auth = auth
        self.format = format
        self.domain = domain
        self.callable_cls = callable_cls
        self.uri = uri
        # Normalise the default: a None here would break both attribute
        # chaining and the URL construction in __call__.
        self.uriparts = () if uriparts is None else uriparts
        self.secure = secure

    def __getattr__(self, k):
        """
        Return a new call object with `k` appended to the path.

        The special name `_` returns a function taking the segment as
        an argument, for segments that are not valid identifiers.
        """
        # __getattr__ only runs once normal attribute lookup has already
        # failed, so there is nothing else to try before extending.
        def extend_call(arg):
            return self.callable_cls(
                auth=self.auth, format=self.format, domain=self.domain,
                callable_cls=self.callable_cls,
                uriparts=tuple(self.uriparts) + (arg,),
                secure=self.secure)
        if k == "_":
            return extend_call
        return extend_call(k)

    def __call__(self, **kwargs):
        """
        Perform the request described by this call object.

        Keyword arguments become request parameters, except:

        * a keyword named like a path segment replaces that segment;
        * `_method` forces the HTTP method (otherwise POST is used when
          the path ends in one of POST_ACTIONS, optionally followed by
          a numeric id, and GET in every other case);
        * `id` is appended to the path;
        * `_id` is sent as the request parameter `id`;
        * `_timeout` is passed to the underlying URL open call.
        """
        # Build the uri.  If a part matches a keyword argument, use the
        # supplied value; otherwise just use the part itself.
        uriparts = [str(kwargs.pop(part, part)) for part in self.uriparts]
        uri = '/'.join(uriparts)

        method = kwargs.pop('_method', None)
        if not method:
            method = "GET"
            for action in POST_ACTIONS:
                # Raw string: same pattern as before, without relying on
                # the invalid "\d" escape in a normal string literal.
                if re.search(r"%s(/\d+)?$" % action, uri):
                    method = "POST"
                    break

        # If an id kwarg is present and there is no id to fill in in
        # the list of uriparts, assume the id goes at the end.
        id_part = kwargs.pop('id', None)
        if id_part:
            uri += "/%s" %(id_part)

        # If an _id kwarg is present, this is treated as id as a CGI
        # param.
        _id = kwargs.pop('_id', None)
        if _id:
            kwargs['id'] = _id

        # If an _timeout is specified in kwargs, use it
        _timeout = kwargs.pop('_timeout', None)

        scheme = "https" if self.secure else "http"
        # An empty or missing format adds nothing to the URL (previously
        # a format of None leaked the text "None" into it).
        extension = ("." + self.format) if self.format else ""
        uriBase = "%s://%s/%s%s" %(scheme, self.domain, uri, extension)

        # Without an auth object the parameters could not be encoded and
        # the request body was left undefined; fall back to NoAuth, the
        # same substitute Twitter.__init__ uses.
        auth = self.auth or NoAuth()
        headers = {'Accept-Encoding': 'gzip'}
        headers.update(auth.generate_headers())
        arg_data = auth.encode_params(uriBase, method, kwargs)
        if method == 'GET':
            uriBase += '?' + arg_data
            body = None
        else:
            body = arg_data.encode('utf8')

        req = urllib_request.Request(uriBase, body, headers)
        return self._handle_response(req, uri, arg_data, _timeout)

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """
        Send `req` and decode the reply.

        Returns the open response handle for JPEG/PNG replies, `[]` for
        an HTTP 304, and otherwise the wrapped response: parsed JSON
        when the format is "json", else the decoded text.  Any other
        HTTP error is raised as TwitterHTTPError.
        """
        kwargs = {}
        if _timeout:
            kwargs['timeout'] = _timeout
        try:
            handle = urllib_request.urlopen(req, **kwargs)
            response_headers = handle.headers
            if response_headers['Content-Type'] in ['image/jpeg', 'image/png']:
                # Binary payload: the caller reads (and closes) the handle.
                return handle
            try:
                data = handle.read()
            except http_client.IncompleteRead as partial_read:
                # Even if we don't get all the bytes we should have there
                # may be a complete response in the partial data.
                data = partial_read.partial
            finally:
                # The body has been consumed; release the connection.
                handle.close()
            if response_headers.get('Content-Encoding') == 'gzip':
                # Handle gzip decompression
                buf = StringIO(data)
                data = gzip.GzipFile(fileobj=buf).read()
            if "json" == self.format:
                res = json.loads(data.decode('utf8'))
                return wrap_response(res, response_headers)
            return wrap_response(data.decode('utf8'), response_headers)
        except urllib_error.HTTPError as e:
            if e.code == 304:
                return []
            raise TwitterHTTPError(e, uri, self.format, arg_data)
236
class Twitter(TwitterCall):
    """
    The minimalist yet fully featured Twitter API class.

    Get RESTful data by accessing members of this class. The result
    is decoded python objects (lists and dicts).

    The Twitter API is documented at:

      http://dev.twitter.com/doc


    Examples::

      t = Twitter(
          auth=OAuth(token, token_key, con_secret, con_secret_key))

      # Get your "home" timeline
      t.statuses.home_timeline()

      # Get a particular friend's timeline
      t.statuses.friends_timeline(id="billybob")

      # Also supported (but totally weird)
      t.statuses.friends_timeline.billybob()

      # Update your status
      t.statuses.update(
          status="Using @sixohsix's sweet Python Twitter Tools.")

      # Send a direct message
      t.direct_messages.new(
          user="billybob",
          text="I think yer swell!")

      # Get the members of tamtar's list "Things That Are Rad"
      t._("tamtar")._("things-that-are-rad").members()

      # Note how the magic `_` method can be used to insert data
      # into the middle of a call. You can also use replacement:
      t.user.list.members(user="tamtar", list="things-that-are-rad")

      # An *optional* `_timeout` parameter can also be used for API
      # calls which take much more time than normal, or when twitter
      # stops responding for some reason
      t.users.lookup(
          screen_name=','.join(A_LIST_OF_100_SCREEN_NAMES),
          _timeout=1)


    Searching Twitter::

      # Search for the latest tweets about #pycon
      t.search.tweets(q="#pycon")


    Using the data returned
    -----------------------

    Twitter API calls return decoded JSON. This is converted into
    a bunch of Python lists, dicts, ints, and strings. For example::

      x = twitter.statuses.home_timeline()

      # The first 'tweet' in the timeline
      x[0]

      # The screen name of the user who wrote the first 'tweet'
      x[0]['user']['screen_name']


    Getting raw XML data
    --------------------

    If you prefer to get your Twitter data in XML format, pass
    format="xml" to the Twitter object when you instantiate it::

      twitter = Twitter(format="xml")

    The output will not be parsed in any way. It will be a raw string
    of XML.

    """
    def __init__(
        self, format="json",
        domain="api.twitter.com", secure=True, auth=None,
        api_version=_DEFAULT):
        """
        Create a new twitter API connector.

        Pass an `auth` parameter to use the credentials of a specific
        user. Generally you'll want to pass an `OAuth`
        instance::

            twitter = Twitter(auth=OAuth(
                    token, token_secret, consumer_key, consumer_secret))

        When `auth` is omitted (or falsy) a `NoAuth` instance is used.

        `format` must be "json", "xml" or the empty string; any other
        value raises ValueError.

        `domain` lets you change the domain you are connecting. By
        default it's `api.twitter.com` but `search.twitter.com` may be
        useful too.

        If `secure` is False you will connect with HTTP instead of
        HTTPS.

        `api_version` is used to set the base uri. By default it's
        '1.1'. Pass None (or another falsy value) to leave the version
        out of the uri, e.g. when using "search.twitter.com".
        """
        auth = auth or NoAuth()

        supported_formats = ("json", "xml", "")
        if format not in supported_formats:
            raise ValueError("Unknown data format '%s'" %(format))

        # The sentinel distinguishes "not given" from an explicit None,
        # which means "no version segment at all".
        version = '1.1' if api_version is _DEFAULT else api_version
        base_parts = (str(version),) if version else ()

        TwitterCall.__init__(
            self, auth=auth, format=format, domain=domain,
            callable_cls=TwitterCall,
            secure=secure, uriparts=base_parts)
363
364
# Public names exported by this module.
__all__ = [
    "Twitter",
    "TwitterError",
    "TwitterHTTPError",
    "TwitterResponse",
]