]>
Commit | Line | Data |
---|---|---|
1 | # encoding: utf-8 | |
2 | from __future__ import unicode_literals | |
3 | ||
4 | from .util import PY_3_OR_HIGHER | |
5 | ||
6 | if PY_3_OR_HIGHER: | |
7 | import urllib.request as urllib_request | |
8 | import urllib.error as urllib_error | |
9 | else: | |
10 | import urllib2 as urllib_request | |
11 | import urllib2 as urllib_error | |
12 | import json | |
13 | from ssl import SSLError | |
14 | import socket | |
15 | import codecs | |
16 | import sys, select, time | |
17 | ||
18 | from .api import TwitterCall, wrap_response, TwitterHTTPError | |
19 | ||
20 | CRLF = b'\r\n' | |
21 | MIN_SOCK_TIMEOUT = 0.0 # Apparently select with zero wait is okay! Used as the read timeout in non-blocking mode. | |
22 | MAX_SOCK_TIMEOUT = 10.0 | |
23 | HEARTBEAT_TIMEOUT = 90.0 | |
24 | ||
25 | Timeout = {'timeout': True} | |
26 | Hangup = {'hangup': True} | |
27 | DecodeError = {'hangup': True, 'decode_error': True} | |
28 | HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True} | |
29 | ||
30 | ||
class HttpChunkDecoder(object):
    """
    Incremental decoder for an HTTP "chunked" transfer-encoded body.

    Feed raw bytes to `decode` as they arrive from the socket. Bytes
    that do not yet form a complete chunk are kept in `self.buf` until
    a later call supplies the rest.
    """

    # Delimiter that terminates a chunk-size line and trails chunk data.
    _CRLF = b'\r\n'

    def __init__(self):
        # Bytes received so far that have not been fully decoded.
        self.buf = bytearray()
        # True when the CRLF trailing the previous chunk's data still
        # has to be skipped before the next chunk-size line is parsed.
        self.munch_crlf = False

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """
        Append `data` to the internal buffer and decode every complete
        chunk now available.

        Returns a tuple `(payload, end_of_stream, decode_error)`:

        - `payload` is a bytearray with the concatenated data of all
          chunks completed by this call (possibly empty).
        - `end_of_stream` is True when the zero-length terminating
          chunk was seen.
        - `decode_error` is True when a chunk-size line is not a valid
          non-negative hexadecimal number.
        """
        chunks = []
        buf = self.buf
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        buf.extend(data)
        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(buf) < 2:
                    break
                del buf[:2]
                munch_crlf = False

            header_end_pos = buf.find(self._CRLF)
            if header_end_pos == -1:
                # The chunk-size line has not fully arrived yet.
                break

            try:
                chunk_len = int(buf[:header_end_pos].decode('ascii'), 16)
            except ValueError:
                # Covers both non-hex text and non-ASCII bytes
                # (UnicodeDecodeError is a ValueError subclass).
                decode_error = True
                break

            if chunk_len < 0:
                # int() happily parses "-5"; a negative size would make
                # the slicing below return garbage, so treat it as an
                # error instead.
                decode_error = True
                break

            if chunk_len == 0:
                end_of_stream = True
                break

            data_start_pos = header_end_pos + 2
            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                # Chunk data is incomplete; wait for more bytes.
                break

            chunks.append(buf[data_start_pos:data_end_pos])
            # Drop the consumed bytes in place rather than copying the
            # remainder of the buffer into a new object.
            del buf[:data_end_pos]
            munch_crlf = True
        self.buf = buf
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
81 | ||
82 | ||
class JsonDecoder(object):
    """
    Incremental decoder that extracts whole JSON documents from a text
    stream, holding back any trailing partial document in `self.buf`.
    """

    def __init__(self):
        # Text received so far that does not yet parse as a document.
        self.buf = ""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """
        Append `data` to the pending text and return a list of every
        JSON value that can now be parsed from its front, in order.
        Whatever cannot be parsed yet stays buffered for the next call.
        """
        pending = self.buf + data
        decoded = []
        parse = self.raw_decode
        while True:
            # raw_decode does not tolerate leading whitespace.
            pending = pending.lstrip()
            try:
                value, end = parse(pending)
            except ValueError:
                # Empty or incomplete input: stop and keep the rest.
                break
            pending = pending[end:]
            decoded.append(value)
        self.buf = pending
        return decoded
102 | ||
103 | ||
class Timer(object):
    """
    Simple interval timer that reports when `timeout` seconds have
    passed since it was created or last reset.
    """

    # Measure intervals with a monotonic clock where one exists, so that
    # system clock adjustments cannot cause spurious or missed timeouts.
    # Python 2 has no time.monotonic, hence the fallback to time.time.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """
        Restart the interval from the current clock reading.
        """
        self.time = self._clock()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        if self.timeout is None:
            return False
        if self._clock() - self.time > self.timeout:
            self.reset()
            return True
        return False
124 | ||
125 | ||
class SockReader(object):
    """
    Reads from a socket-like object, waiting at most `sock_timeout`
    seconds for it to become readable.
    """

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def read(self):
        """
        Return whatever `self.sock.read()` yields once select reports
        the socket readable; return an empty bytearray if it does not
        become readable in time or the SSL layer has nothing buffered.
        Any other SSLError is re-raised.
        """
        nothing = bytearray()
        try:
            readable = select.select(
                [self.sock], [], [], self.sock_timeout)[0]
            return self.sock.read() if readable else nothing
        except SSLError as err:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if err.errno == 2:
                return nothing
            raise
141 | ||
142 | ||
class TwitterJSONIter(object):
    """
    Iterable over the JSON messages arriving on an open streaming HTTP
    response, interleaved with timeout and hangup marker values.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        """
        `handle` is the open HTTP response. A positive `timeout` makes
        the iterator yield the `Timeout` marker after that many seconds
        without a message; with a falsy `timeout` and falsy `block` the
        iterator runs non-blocking and yields `None` instead. A
        non-positive or missing `heartbeat_timeout` falls back to
        `HEARTBEAT_TIMEOUT`.
        """
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data

        if timeout and timeout > 0:
            self.timeout_token = Timeout
            self.timeout = float(timeout)
        elif not (block or timeout):
            # Non-blocking mode: poll the socket and yield None when idle.
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT
        else:
            # Blocking with no message timeout.
            self.timeout_token = Timeout
            self.timeout = None

        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)
        else:
            self.heartbeat_timeout = HEARTBEAT_TIMEOUT

    def __iter__(self):
        """
        Generate wrapped JSON messages as they are decoded. Yields
        `self.timeout_token` when the message timer expires, and yields
        one of `Hangup`, `DecodeError` or `HeartbeatTimeout` just before
        the generator finishes.
        """
        # Wait on the socket no longer than the shortest active timeout.
        limits = [limit for limit
                  in (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
                  if limit is not None]
        sock_timeout = min(*limits)

        if PY_3_OR_HIGHER:
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        headers = self.handle.headers

        reader = SockReader(sock, sock_timeout)
        dechunker = HttpChunkDecoder()
        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        json_decoder = JsonDecoder()
        message_timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)

        while True:
            # Pipeline: socket bytes -> de-chunked bytes -> text -> JSON.
            try:
                raw = reader.read()
            except SSLError:
                yield Hangup
                return
            payload, end_of_stream, decode_error = dechunker.decode(raw)
            messages = json_decoder.decode(utf8_decoder.decode(payload))

            for message in messages:
                yield wrap_response(message, headers)

            # Any decoded bytes count as a heartbeat; only complete
            # messages restart the message timer.
            if payload:
                heartbeat_timer.reset()
            if messages:
                message_timer.reset()

            # At most one terminal marker is produced, checked in order.
            if end_of_stream:
                final_marker = Hangup
            elif decode_error:
                final_marker = DecodeError
            elif heartbeat_timer.expired():
                final_marker = HeartbeatTimeout
            else:
                final_marker = None

            if final_marker is not None:
                yield final_marker
                return

            if message_timer.expired():
                yield self.timeout_token
207 | ||
208 | ||
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open `req` and return an iterator over the streamed JSON messages.

    An HTTP error raised while opening the request is re-raised as
    `TwitterHTTPError`.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    stream = TwitterJSONIter(
        response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
215 | ||
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            # ...do something with this tweet...

    By default the ``TwitterStream`` object uses
    [public streams](https://dev.twitter.com/docs/streaming-apis/streams/public).
    If you want to use one of the other
    [streaming APIs](https://dev.twitter.com/docs/streaming-apis), specify the
    domain manually:

    - [Public streams](https://dev.twitter.com/docs/streaming-apis/streams/public): stream.twitter.com
    - [User streams](https://dev.twitter.com/docs/streaming-apis/streams/user): userstream.twitter.com
    - [Site streams](https://dev.twitter.com/docs/streaming-apis/streams/site): sitestream.twitter.com

    Note that you require the proper
    [permissions](https://dev.twitter.com/docs/application-permission-model) to
    access these streams. E.g. for direct messages your
    [application](https://dev.twitter.com/apps) needs the "Read, Write & Direct
    Messages" permission.

    The following example demonstrates how to retrieve all new direct messages
    from the user stream::

        auth = OAuth(
            consumer_key='[your consumer key]',
            consumer_secret='[your consumer secret]',
            token='[your token]',
            token_secret='[your token secret]'
        )
        twitter_userstream = TwitterStream(auth=auth, domain='userstream.twitter.com')
        for msg in twitter_userstream.user():
            if 'direct_message' in msg:
                print(msg['direct_message']['text'])

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    Similarly, if the stream does not produce any data for more than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, then the iterator will yield either
    stream data or `{'timeout': True}` within the timeout period. This
    is useful if you want your program to do other stuff in between
    waiting for tweets.

    Setting the `block` parameter to `False` makes the stream fully
    non-blocking. In this mode, the iterator always yields
    immediately. It returns stream data, or `None`. Note that
    `timeout` supersedes this argument, so it should also be set to
    `None` to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=HEARTBEAT_TIMEOUT):
        uriparts = (str(api_version),)

        # Defined per instance so that the call objects created for this
        # stream close over its block/timeout/heartbeat settings.
        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call _timeout takes precedence over the
                # stream-wide timeout when it is truthy.
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)