]>
Commit | Line | Data |
---|---|---|
1 | import sys | |
2 | PY_3_OR_HIGHER = sys.version_info >= (3, 0) | |
3 | ||
4 | if PY_3_OR_HIGHER: | |
5 | import urllib.request as urllib_request | |
6 | import urllib.error as urllib_error | |
7 | else: | |
8 | import urllib2 as urllib_request | |
9 | import urllib2 as urllib_error | |
10 | import json | |
11 | from ssl import SSLError | |
12 | import socket | |
13 | import codecs | |
14 | import sys, select, time | |
15 | ||
16 | from .api import TwitterCall, wrap_response, TwitterHTTPError | |
17 | ||
CRLF = b'\r\n'
MIN_SOCK_TIMEOUT = 0.0  # Apparently select with zero wait is okay!
MAX_SOCK_TIMEOUT = 10.0
HEARTBEAT_TIMEOUT = 90.0

# Sentinel dicts yielded by the stream iterator to signal non-data events
# (see TwitterStream docstring for how callers should interpret them).
Timeout = {'timeout': True}
Hangup = {'hangup': True}
DecodeError = {'hangup': True, 'decode_error': True}
HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}
27 | ||
28 | ||
class HttpChunkDecoder(object):
    """Incremental decoder for HTTP/1.1 chunked transfer encoding.

    decode() may be called with arbitrarily split input; bytes belonging
    to an incomplete chunk are buffered until the next call.
    """

    def __init__(self):
        self.buf = bytearray()   # unconsumed bytes carried across calls
        self.munch_crlf = False  # True when a chunk's trailing CRLF is pending

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """Consume *data* and return (payload, end_of_stream, decode_error).

        payload       -- bytearray of chunk bodies completed by this call
        end_of_stream -- True once the zero-length terminating chunk is seen
        decode_error  -- True if a chunk-size header failed to parse as hex
        """
        chunks = []
        buf = self.buf
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        buf.extend(data)
        # Scan with an offset instead of re-slicing buf after every chunk:
        # the old `buf = buf[n:]` pattern copied the whole remaining buffer
        # per chunk (quadratic over many small chunks).
        pos = 0
        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(buf) - pos >= 2:
                    pos += 2
                    munch_crlf = False
                else:
                    break

            header_end_pos = buf.find(b'\r\n', pos)
            if header_end_pos == -1:
                break

            header = buf[pos:header_end_pos]
            data_start_pos = header_end_pos + 2
            try:
                # Chunk-size line is the body length in hexadecimal.
                chunk_len = int(header.decode('ascii'), 16)
            except ValueError:
                decode_error = True
                break

            if chunk_len == 0:
                # Zero-length chunk terminates the stream.
                end_of_stream = True
                break

            data_end_pos = data_start_pos + chunk_len

            if len(buf) >= data_end_pos:
                chunks.append(buf[data_start_pos:data_end_pos])
                pos = data_end_pos
                munch_crlf = True
            else:
                # Chunk body not fully received yet; wait for more data.
                break
        del buf[:pos]  # single O(n) trim of everything consumed this call
        self.buf = buf
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
79 | ||
80 | ||
class JsonDecoder(object):
    """Incrementally extract complete JSON values from a text stream."""

    def __init__(self):
        self.buf = u""  # text left over from previous calls
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """Append *data* to the buffer and return all fully-parsed values."""
        objects = []
        pending = self.buf + data
        while True:
            # Leading whitespace would make raw_decode fail spuriously.
            pending = pending.lstrip()
            try:
                obj, consumed = self.raw_decode(pending)
            except ValueError:
                # Remainder is an incomplete value; keep it for next time.
                break
            pending = pending[consumed:]
            objects.append(obj)
        self.buf = pending
        return objects
100 | ||
101 | ||
class Timer(object):
    """Tracks elapsed wall-clock time against an optional timeout."""

    def __init__(self, timeout):
        # A timeout of None means the timer never expires.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the countdown from the current moment."""
        self.time = time.time()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        if self.timeout is None:
            return False
        elapsed = time.time() - self.time
        if elapsed <= self.timeout:
            return False
        self.reset()
        return True
122 | ||
123 | ||
class SockReader(object):
    """Polls a socket-like object with select() and reads what's available."""

    def __init__(self, sock, sock_timeout):
        self.sock = sock                  # file-like socket exposing read()/fileno()
        self.sock_timeout = sock_timeout  # max seconds to wait in select()

    def read(self):
        """Return available bytes, or an empty bytearray if none arrived in time."""
        try:
            readable, _, _ = select.select([self.sock], [], [], self.sock_timeout)
            if readable:
                return self.sock.read()
        except SSLError as e:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if e.errno != 2:
                raise
        return bytearray()
139 | ||
140 | ||
class TwitterJSONIter(object):
    """Iterator over a Twitter streaming-API HTTP response.

    Yields decoded JSON objects (wrapped via wrap_response) interleaved
    with the module-level sentinel dicts Timeout / Hangup / DecodeError /
    HeartbeatTimeout, per the modes documented on TwitterStream.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        # handle: urllib response object for the already-open streaming request.
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.timeout_token = Timeout
        self.timeout = None
        self.heartbeat_timeout = HEARTBEAT_TIMEOUT
        if timeout and timeout > 0:
            # Positive timeout: yield the Timeout sentinel between data.
            self.timeout = float(timeout)
        elif not (block or timeout):
            # Fully non-blocking mode: yield None immediately when idle.
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT
        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)

    def __iter__(self):
        # Use the shortest relevant wait so select() wakes in time for
        # whichever timer would fire first.
        timeouts = [t for t in (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
                    if t is not None]
        sock_timeout = min(*timeouts)
        # Reach into the response object for the raw socket; the private
        # attribute path differs between Python 2 and 3.
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        headers = self.handle.headers
        # Decode pipeline: raw socket -> dechunked bytes -> UTF-8 text -> JSON.
        sock_reader = SockReader(sock, sock_timeout)
        chunk_decoder = HttpChunkDecoder()
        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        json_decoder = JsonDecoder()
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)

        while True:
            # Decode all the things:
            data = sock_reader.read()
            dechunked_data, end_of_stream, decode_error = chunk_decoder.decode(data)
            unicode_data = utf8_decoder.decode(dechunked_data)
            json_data = json_decoder.decode(unicode_data)

            # Yield data-like things:
            for json_obj in json_data:
                yield wrap_response(json_obj, headers)

            # Reset timers: any bytes count as a heartbeat; only complete
            # JSON objects count as data for the caller-visible timeout.
            if dechunked_data:
                heartbeat_timer.reset()
            if json_data:
                timer.reset()

            # Yield timeouts and special things:
            if end_of_stream:
                yield Hangup
                break
            if decode_error:
                yield DecodeError
                break
            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                # In non-blocking mode timeout_token is None, so this yields
                # None immediately; otherwise it yields the Timeout sentinel.
                yield self.timeout_token
201 | ||
202 | ||
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """Open *req* as a streaming connection and return a JSON-object iterator.

    HTTP-level failures are re-raised as TwitterHTTPError.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout, heartbeat_timeout))
209 | ||
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    Similarly, if the stream does not produce heartbeats for more than
    90 seconds, the iterator yields `{'hangup': True,
    'heartbeat_timeout': True}`, and raises `StopIteration` if
    iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, then the iterator will yield either
    stream data or `{'timeout': True}` within the timeout period. This
    is useful if you want your program to do other stuff in between
    waiting for tweets.

    The `block` parameter sets the stream to be fully non-blocking. In
    this mode, the iterator always yields immediately. It returns
    stream data, or `None`. Note that `timeout` supersedes this
    argument, so it should also be set `None` to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        uriparts = (str(api_version),)

        # Closes over block/timeout/heartbeat_timeout so every endpoint
        # call opens the stream with this instance's settings.
        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)