]>
Commit | Line | Data |
---|---|---|
30af6c0f MV |
1 | import sys |
2 | PY_3_OR_HIGHER = sys.version_info >= (3, 0) | |
3 | ||
4 | if PY_3_OR_HIGHER: | |
dd648a25 MV |
5 | import urllib.request as urllib_request |
6 | import urllib.error as urllib_error | |
30af6c0f | 7 | else: |
dd648a25 MV |
8 | import urllib2 as urllib_request |
9 | import urllib2 as urllib_error | |
10 | import json | |
2300838f | 11 | from ssl import SSLError |
67ddbde4 | 12 | import socket |
30af6c0f | 13 | import codecs |
effd06bb | 14 | import sys, select, time |
dd648a25 | 15 | |
2983f31e | 16 | from .api import TwitterCall, wrap_response, TwitterHTTPError |
dd648a25 | 17 | |
f560656d | 18 | CRLF = b'\r\n' |
f38ed662 MV |
19 | MIN_SOCK_TIMEOUT = 0.0 # Apparently select with zero wait is okay! |
20 | MAX_SOCK_TIMEOUT = 10.0 | |
43778459 | 21 | HEARTBEAT_TIMEOUT = 90.0 |
f560656d | 22 | |
d3915c61 MV |
23 | Timeout = {'timeout': True} |
24 | Hangup = {'hangup': True} | |
aa19e2be MV |
25 | DecodeError = {'hangup': True, 'decode_error': True} |
26 | HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True} | |
26938000 | 27 | |
e84abb1c | 28 | |
class HttpChunkDecoder(object):
    """
    Incremental decoder for an HTTP "chunked" transfer-encoded body.

    Raw bytes are fed to `decode` as they arrive. Every complete chunk
    found so far is returned; a trailing partial chunk stays in
    `self.buf` until a later call supplies the rest of it.
    """

    def __init__(self):
        # Bytes received but not yet consumed as part of a complete chunk.
        self.buf = bytearray()
        # True while the CRLF that terminates the most recently returned
        # chunk still has to be dropped from the front of the buffer.
        self.munch_crlf = False

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """
        Feed `data` to the decoder and return what can be decoded so far.

        Returns a 3-tuple `(payload, end_of_stream, decode_error)`:

        - `payload` is a bytearray with the concatenated bodies of all
          chunks completed by this call (possibly empty).
        - `end_of_stream` is True when a zero-length chunk was seen.
        - `decode_error` is True when a chunk header is not a valid
          non-negative hexadecimal size.
        """
        chunks = []
        buf = self.buf
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        buf.extend(data)
        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(buf) < 2:
                    break
                del buf[:2]
                munch_crlf = False

            header_end_pos = buf.find(CRLF)
            if header_end_pos == -1:
                # The chunk header has not fully arrived yet.
                break

            # A chunk header is "<hex size>[;extension...]"; only the size
            # matters here, so anything after the first ';' is ignored.
            size_field = buf[:header_end_pos].split(b';', 1)[0]
            data_start_pos = header_end_pos + 2
            try:
                chunk_len = int(size_field.decode('ascii'), 16)
            except ValueError:
                decode_error = True
                break
            if chunk_len < 0:
                # int() happily parses "-5"; a negative size is never valid
                # and would corrupt the buffer bookkeeping below.
                decode_error = True
                break

            if chunk_len == 0:
                end_of_stream = True
                break

            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                # The chunk body is still incomplete; wait for more data.
                break

            chunks.append(buf[data_start_pos:data_end_pos])
            # Trim in place rather than copying the remainder of the buffer.
            del buf[:data_end_pos]
            munch_crlf = True

        self.buf = buf
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
0d92536c | 79 | |
30af6c0f | 80 | |
class JsonDecoder(object):
    """
    Incremental decoder for a stream of concatenated JSON documents.

    Text is fed to `decode` piece by piece. Every complete JSON value
    found is returned, and the undecodable remainder is kept in
    `self.buf` for the next call.
    """

    def __init__(self):
        # Text received so far that does not yet form a complete JSON value.
        self.buf = u""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """
        Append `data` to the pending text and return the list of JSON
        values that can now be decoded from its front (possibly empty).
        """
        pending = (self.buf + data).lstrip()
        decoded = []
        while True:
            try:
                value, consumed = self.raw_decode(pending)
            except ValueError:
                # Nothing (more) decodable yet; keep the rest for later.
                break
            decoded.append(value)
            # Whitespace between documents is dropped before the next
            # attempt.
            pending = pending[consumed:].lstrip()
        self.buf = pending
        return decoded
26938000 | 100 | |
26938000 | 101 | |
class Timer(object):
    """
    Simple wall-clock timer that reports when `timeout` seconds have
    passed since it was last reset.
    """

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the countdown from the current time."""
        self.time = time.time()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        limit = self.timeout
        if limit is not None and time.time() - self.time > limit:
            self.reset()
            return True
        return False
122 | ||
123 | ||
aa19e2be MV |
class SockReader(object):
    """
    Reads from a socket, waiting at most `sock_timeout` seconds for it
    to become readable.
    """

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def read(self):
        """
        Return whatever `sock.read()` gives once the socket is readable,
        or an empty bytearray when nothing is available in time.
        """
        try:
            readable, _, _ = select.select(
                [self.sock], [], [], self.sock_timeout)
            return self.sock.read() if readable else bytearray()
        except SSLError as err:
            # Code 2 (ssl.SSL_ERROR_WANT_READ) is the error from a
            # non-blocking read of an empty buffer; treat it as "no data".
            if err.errno == 2:
                return bytearray()
            raise
139 | ||
140 | ||
dd648a25 MV |
class TwitterJSONIter(object):
    """
    Iterable over the JSON objects carried by an open streaming HTTP
    response.

    Iterating reads from the response's underlying socket, undoes the
    chunked transfer encoding, decodes the bytes as UTF-8 and yields each
    JSON object wrapped by `wrap_response`. Besides data, the iterator
    may yield the `Hangup`, `DecodeError` and `HeartbeatTimeout` tokens
    (after which it stops) and the timeout token (after which it carries
    on).
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data

        if timeout and timeout > 0:
            # Positive timeout: yield the Timeout token whenever it lapses.
            self.timeout_token = Timeout
            self.timeout = float(timeout)
        elif block or timeout:
            # Blocking mode (or a non-positive timeout): never time out.
            self.timeout_token = Timeout
            self.timeout = None
        else:
            # Non-blocking mode: use the minimum timeout and yield None
            # when it lapses.
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT

        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)
        else:
            self.heartbeat_timeout = HEARTBEAT_TIMEOUT

    def __iter__(self):
        # The socket wait is the smallest of the configured timeouts,
        # capped at MAX_SOCK_TIMEOUT.
        limits = [self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT]
        sock_timeout = min(*[limit for limit in limits if limit is not None])

        # The raw socket lives in a different place on Python 2 and 3.
        if PY_3_OR_HIGHER:
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        headers = self.handle.headers

        reader = SockReader(sock, sock_timeout)
        dechunker = HttpChunkDecoder()
        utf8 = codecs.getincrementaldecoder("utf-8")()
        json_stream = JsonDecoder()
        idle_timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)

        while True:
            # Decode all the things:
            raw = reader.read()
            payload, end_of_stream, decode_error = dechunker.decode(raw)
            text = utf8.decode(payload)
            objects = json_stream.decode(text)

            # Yield data-like things:
            for obj in objects:
                yield wrap_response(obj, headers)

            # Any dechunked bytes count as a heartbeat; only decoded JSON
            # objects count as data for the caller's timeout.
            if payload:
                heartbeat_timer.reset()
            if objects:
                idle_timer.reset()

            # Yield timeouts and special things:
            if end_of_stream:
                yield Hangup
                return
            if decode_error:
                yield DecodeError
                return
            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                return
            if idle_timer.expired():
                yield self.timeout_token
03d8511c | 201 | |
2300838f | 202 | |
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open `req` and return an iterator over the streamed JSON objects.

    An `HTTPError` raised while opening the request is re-raised as a
    `TwitterHTTPError` built from the error, `uri` and `arg_data`.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    else:
        stream = TwitterJSONIter(
            handle, uri, arg_data, block, timeout, heartbeat_timeout)
        return iter(stream)
effd06bb | 209 | |
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. It is used pretty much like the Twitter class, except that
    calling a method returns an iterator that yields objects decoded
    from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding until the TCP connection breaks. When
    that happens it yields `{'hangup': True}` and raises
    `StopIteration` if iterated again.

    Likewise, if the stream produces no heartbeat for more than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}` and raises
    `StopIteration` if iterated again.

    The `timeout` parameter bounds the time between yields. If it is
    nonzero, the iterator yields either stream data or
    `{'timeout': True}` within the timeout period. This is useful if
    your program needs to do other work while waiting for tweets.

    Setting `block` to False makes the stream fully non-blocking: the
    iterator always yields immediately, returning stream data or
    `None`. Note that `timeout` supersedes this argument, so it should
    also be left as `None` to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        uriparts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            # Hands the response to the streaming handler, using the
            # block/timeout/heartbeat settings captured from the
            # enclosing constructor call.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout, when truthy, wins over the stream's.
                effective_timeout = _timeout or timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        call_options = dict(
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=uriparts,
            timeout=timeout,
            gzip=False,
        )
        TwitterCall.__init__(self, **call_options)