]>
Commit | Line | Data |
---|---|---|
f0603331 MV |
1 | # encoding: utf-8 |
2 | from __future__ import unicode_literals | |
3 | ||
30af6c0f MV |
4 | import sys |
5 | PY_3_OR_HIGHER = sys.version_info >= (3, 0) | |
6 | ||
7 | if PY_3_OR_HIGHER: | |
dd648a25 MV |
8 | import urllib.request as urllib_request |
9 | import urllib.error as urllib_error | |
30af6c0f | 10 | else: |
dd648a25 MV |
11 | import urllib2 as urllib_request |
12 | import urllib2 as urllib_error | |
13 | import json | |
2300838f | 14 | from ssl import SSLError |
67ddbde4 | 15 | import socket |
30af6c0f | 16 | import codecs |
effd06bb | 17 | import sys, select, time |
dd648a25 | 18 | |
2983f31e | 19 | from .api import TwitterCall, wrap_response, TwitterHTTPError |
dd648a25 | 20 | |
# Line terminator used by the HTTP chunked-encoding framing.
CRLF = b'\r\n'
# Socket wait used for fully non-blocking iteration.
MIN_SOCK_TIMEOUT = 0.0  # Apparently select with zero wait is okay!
# Upper bound on a single select() wait, so timers are re-checked regularly.
MAX_SOCK_TIMEOUT = 10.0
# Default number of seconds without any stream data before giving up.
HEARTBEAT_TIMEOUT = 90.0

# Sentinel objects yielded by the stream iterator in place of stream data.
# Yielded when no JSON object arrived within the user-supplied timeout.
Timeout = {'timeout': True}
# Yielded when the chunked stream signals its end (zero-length chunk).
Hangup = {'hangup': True}
# Yielded when a chunk header cannot be parsed; iteration then stops.
DecodeError = {'hangup': True, 'decode_error': True}
# Yielded when no data at all arrived within the heartbeat timeout.
HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}
26938000 | 30 | |
e84abb1c | 31 | |
class HttpChunkDecoder(object):
    """
    Incremental decoder for an HTTP "chunked" transfer-encoded byte stream.

    Feed raw bytes to `decode` as they arrive. Complete chunks are
    returned as soon as they are available; incomplete input is kept in
    `self.buf` until a later call supplies the rest.
    """

    # Delimiter that terminates chunk headers and chunk payloads.
    _CRLF = b'\r\n'

    def __init__(self):
        # Bytes received but not yet consumed as complete chunks.
        self.buf = bytearray()
        # True when the CRLF that terminates the previous chunk's payload
        # still has to be skipped before the next chunk header.
        self.munch_crlf = False

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """
        Append `data` to the internal buffer and extract complete chunks.

        Returns a tuple `(payload, end_of_stream, decode_error)`:

        - `payload`: bytearray with the concatenated payload of every
          chunk completed by this call (possibly empty).
        - `end_of_stream`: True if the zero-length terminating chunk
          was seen.
        - `decode_error`: True if a chunk header does not hold a valid,
          non-negative hexadecimal size.
        """
        crlf = self._CRLF
        crlf_len = len(crlf)
        chunks = []
        buf = self.buf
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        buf.extend(data)
        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(buf) < crlf_len:
                    break
                buf = buf[crlf_len:]
                munch_crlf = False

            header_end_pos = buf.find(crlf)
            if header_end_pos == -1:
                # Chunk header not fully received yet.
                break

            # A chunk header is "chunk-size [;chunk-ext]": only the part
            # before the first ';' is the hexadecimal size.
            size_field = buf[:header_end_pos].split(b';', 1)[0]
            data_start_pos = header_end_pos + crlf_len
            try:
                chunk_len = int(size_field.decode('ascii'), 16)
            except ValueError:
                decode_error = True
                break
            if chunk_len < 0:
                # int() accepts a leading '-'; a negative size would make
                # the slices below silently corrupt the buffer.
                decode_error = True
                break

            if chunk_len == 0:
                end_of_stream = True
                break

            data_end_pos = data_start_pos + chunk_len

            if len(buf) < data_end_pos:
                # Payload not fully received yet; wait for more data.
                break
            chunks.append(buf[data_start_pos:data_end_pos])
            buf = buf[data_end_pos:]
            munch_crlf = True
        self.buf = buf
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
0d92536c | 82 | |
30af6c0f | 83 | |
class JsonDecoder(object):
    """
    Incremental decoder for a stream of concatenated JSON documents.

    Text that does not yet form a complete document is kept in
    `self.buf` and retried when more text arrives.
    """

    def __init__(self):
        # Text received so far that has not been decoded.
        self.buf = ""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """
        Append `data` to the pending text and return a list with every
        JSON value that can be decoded from the front of it, in order.
        """
        pending = (self.buf + data).lstrip()
        decoded = []
        try:
            while True:
                value, end = self.raw_decode(pending)
                decoded.append(value)
                pending = pending[end:].lstrip()
        except ValueError:
            # Nothing (more) decodable at the front of the pending text:
            # keep it for the next call.
            pass
        self.buf = pending
        return decoded
26938000 | 103 | |
26938000 | 104 | |
class Timer(object):
    """
    Simple restartable timeout tracker.

    `timeout` is a number of seconds, or None for a timer that never
    expires.
    """

    # Use a monotonic clock when the runtime has one so that wall-clock
    # adjustments (NTP steps, manual changes) cannot cause spurious or
    # missed expirations; fall back to time.time() otherwise (Python 2).
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the timer from the current clock reading."""
        self.time = self._clock()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        if self.timeout is None:
            return False
        if self._clock() - self.time > self.timeout:
            self.reset()
            return True
        return False
125 | ||
126 | ||
aa19e2be MV |
class SockReader(object):
    """
    Reads from a socket-like object, waiting at most `sock_timeout`
    seconds for it to become readable.
    """

    def __init__(self, sock, sock_timeout):
        self.sock_timeout = sock_timeout
        self.sock = sock

    def read(self):
        """
        Return whatever `self.sock.read()` yields if the socket becomes
        readable within the timeout, otherwise an empty bytearray.
        """
        try:
            readable, _, _ = select.select(
                [self.sock], [], [], self.sock_timeout)
            if not readable:
                return bytearray()
            return self.sock.read()
        except SSLError as err:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if err.errno == 2:
                return bytearray()
            raise
142 | ||
143 | ||
dd648a25 MV |
class TwitterJSONIter(object):
    """
    Iterable over the JSON objects of an open streaming HTTP response.

    Besides decoded stream objects, iteration may yield the module-level
    sentinel dicts `Timeout`, `Hangup`, `DecodeError` and
    `HeartbeatTimeout` (or None in non-blocking mode).
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        """
        `handle` is the open HTTP response; `uri` and `arg_data` are only
        stored. `block`, `timeout` and `heartbeat_timeout` select the
        timing behaviour as described inline below.
        """
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        # Defaults: yield the Timeout sentinel on expiry, but never expire.
        self.timeout_token = Timeout
        self.timeout = None
        self.heartbeat_timeout = HEARTBEAT_TIMEOUT
        if timeout and timeout > 0:
            self.timeout = float(timeout)
        elif not (block or timeout):
            # Non-blocking mode: poll the socket without waiting and yield
            # None instead of the Timeout sentinel when nothing is ready.
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT
        # Only a positive heartbeat_timeout overrides the default.
        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)

    def __iter__(self):
        """
        Generator: read, de-chunk, UTF-8 decode and JSON-decode the stream,
        yielding wrapped objects and the timeout/hangup sentinels.
        """
        # Wait on the socket no longer than the shortest configured timeout,
        # capped by MAX_SOCK_TIMEOUT, so the timers below get checked.
        timeouts = [t for t in (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
                    if t is not None]
        sock_timeout = min(*timeouts)
        # Reach through the response object to the underlying socket; the
        # attribute path differs between Python 3 and Python 2.
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        headers = self.handle.headers
        sock_reader = SockReader(sock, sock_timeout)
        chunk_decoder = HttpChunkDecoder()
        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        json_decoder = JsonDecoder()
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)

        # NOTE(review): self.handle is not closed when this loop ends --
        # confirm whether the caller is expected to close it.
        while True:
            # Decode all the things:
            data = sock_reader.read()
            dechunked_data, end_of_stream, decode_error = chunk_decoder.decode(data)
            # NOTE(review): invalid UTF-8 would presumably raise
            # UnicodeDecodeError here rather than yield DecodeError -- confirm.
            unicode_data = utf8_decoder.decode(dechunked_data)
            json_data = json_decoder.decode(unicode_data)

            # Yield data-like things:
            for json_obj in json_data:
                yield wrap_response(json_obj, headers)

            # Reset timers:
            # Any payload bytes count as a heartbeat; only complete JSON
            # objects reset the user-facing timeout.
            if dechunked_data:
                heartbeat_timer.reset()
            if json_data:
                timer.reset()

            # Yield timeouts and special things:
            if end_of_stream:
                yield Hangup
                break
            if decode_error:
                yield DecodeError
                break
            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            # A plain timeout does not end iteration; the loop keeps reading.
            if timer.expired():
                yield self.timeout_token
03d8511c | 204 | |
2300838f | 205 | |
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open `req` and return an iterator over the objects of the resulting
    stream.

    An HTTP error raised while opening the request is re-raised as
    `TwitterHTTPError`. `block`, `timeout` and `heartbeat_timeout` are
    passed through to `TwitterJSONIter`.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    stream = TwitterJSONIter(
        response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
effd06bb | 212 | |
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. It is used much like the Twitter class, except that calling a
    method returns an iterator that yields objects decoded from the
    stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields until the stream ends. At that point it yields
    `{'hangup': True}`, and raises `StopIteration` if iterated again.

    Similarly, if the stream produces no data for more than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between yields.
    If it is nonzero, the iterator yields either stream data or
    `{'timeout': True}` within the timeout period. This is useful if
    your program needs to do other work while waiting for tweets.

    Passing `block=False` makes the stream fully non-blocking. In this
    mode the iterator always yields immediately, returning stream data
    or `None`. Note that `timeout` supersedes this argument, so
    `timeout` must also be `None` to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        version_parts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            # Closes over block, timeout and heartbeat_timeout from the
            # enclosing constructor call.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout takes precedence over the default.
                effective_timeout = _timeout or timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=version_parts,
            timeout=timeout,
            gzip=False)