jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
yield Hangup and break on Stream SSLError (close #284)
[z_archive/twitter.git] / twitter / stream.py
1 # encoding: utf-8
2 from __future__ import unicode_literals
3
4 from .util import PY_3_OR_HIGHER
5
6 if PY_3_OR_HIGHER:
7 import urllib.request as urllib_request
8 import urllib.error as urllib_error
9 else:
10 import urllib2 as urllib_request
11 import urllib2 as urllib_error
12 import json
13 from ssl import SSLError
14 import socket
15 import codecs
16 import sys, select, time
17
18 from .api import TwitterCall, wrap_response, TwitterHTTPError
19
# Line terminator searched for when locating the end of an HTTP chunk header.
CRLF = b'\r\n'
# Socket wait (in seconds) used for the non-blocking mode of the stream iterator.
MIN_SOCK_TIMEOUT = 0.0  # Apparently select with zero wait is okay!
# Upper bound (in seconds) on how long a single socket wait may last.
MAX_SOCK_TIMEOUT = 10.0
# Default heartbeat timeout (in seconds) used when the caller supplies none.
HEARTBEAT_TIMEOUT = 90.0

# Sentinel messages yielded by the stream iterator in place of stream data.
Timeout = {'timeout': True}
Hangup = {'hangup': True}
DecodeError = {'hangup': True, 'decode_error': True}
HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}
29
30
class HttpChunkDecoder(object):
    """
    Incremental decoder for an HTTP chunked byte stream.

    Bytes that do not yet form a complete chunk are kept in ``buf``
    between calls, so data may be fed in arbitrarily sized pieces.
    """

    def __init__(self):
        # Bytes received so far that have not been consumed yet.
        self.buf = bytearray()
        # True when the two bytes trailing the previous chunk body still
        # have to be dropped from the front of the buffer.
        self.munch_crlf = False

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """
        Feed `data` to the decoder and return every chunk body completed
        so far.

        Returns a 3-tuple: the concatenated chunk bodies as a bytearray,
        a flag set when a zero-length chunk was seen, and a flag set when
        a chunk-size header could not be parsed as a hexadecimal number.
        """
        pending = self.buf
        pending.extend(data)
        skip_crlf = self.munch_crlf
        payloads = []
        finished = False
        malformed = False

        while True:
            if skip_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(pending) < 2:
                    break
                pending = pending[2:]
                skip_crlf = False

            size_end = pending.find(CRLF)
            if size_end == -1:
                # The chunk-size header has not fully arrived yet.
                break

            body_start = size_end + 2
            try:
                size = int(pending[:size_end].decode('ascii'), 16)
            except ValueError:
                malformed = True
                break

            if size == 0:
                # A zero-length chunk marks the end of the stream.
                finished = True
                break

            body_end = body_start + size
            if body_end > len(pending):
                # The chunk body is still incomplete; wait for more data.
                break

            payloads.append(pending[body_start:body_end])
            pending = pending[body_end:]
            skip_crlf = True

        self.buf, self.munch_crlf = pending, skip_crlf
        return bytearray().join(payloads), finished, malformed
81
82
class JsonDecoder(object):
    """
    Incremental JSON decoder for a stream of concatenated documents.

    Text that does not yet parse as a complete document is kept in
    ``buf`` and retried once more text has been appended.
    """

    def __init__(self):
        self.buf = ""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """
        Append `data` to the buffered text and return the list of JSON
        values that can be parsed from its front, in order.
        """
        remaining = self.buf + data
        documents = []
        while True:
            # Leading whitespace (e.g. separators between documents) is
            # dropped before every parse attempt.
            remaining = remaining.lstrip()
            try:
                document, consumed = self.raw_decode(remaining)
            except ValueError:
                # Nothing parseable at the front; keep it for the next call.
                break
            documents.append(document)
            remaining = remaining[consumed:]
        self.buf = remaining
        return documents
102
103
class Timer(object):
    """
    Restartable timer based on ``time.time()``.

    A `timeout` of None produces a timer that never expires.
    """

    def __init__(self, timeout):
        self.timeout = timeout
        self.reset()

    def reset(self):
        """
        Restart the timer from the current time.
        """
        self.time = time.time()

    def expired(self):
        """
        Return True and restart the timer if more than `timeout` seconds
        have passed since the last reset; otherwise return False.
        """
        if self.timeout is not None and time.time() - self.time > self.timeout:
            self.reset()
            return True
        return False
124
125
class SockReader(object):
    """
    Reads from a socket, waiting at most `sock_timeout` seconds for data
    to become available.
    """

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def read(self):
        """
        Return whatever ``sock.read()`` gives once the socket is readable,
        or an empty bytearray if nothing became readable in time.

        An SSLError with errno 2 is treated as "no data"; any other
        SSLError is re-raised.
        """
        try:
            readable, _, _ = select.select(
                [self.sock], [], [], self.sock_timeout)
            if not readable:
                return bytearray()
            return self.sock.read()
        except SSLError as err:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if err.errno == 2:
                return bytearray()
            raise
141
142
class TwitterJSONIter(object):
    """
    Iterable over the JSON objects carried by an open streaming response.

    Besides stream data, iteration may yield the module's sentinel
    messages (timeout, hangup, decode error, heartbeat timeout).
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data

        if timeout and timeout > 0:
            # A positive timeout: yield the Timeout sentinel at that pace.
            self.timeout = float(timeout)
            self.timeout_token = Timeout
        elif not block and not timeout:
            # Non-blocking mode: poll the socket and yield None when idle.
            self.timeout = MIN_SOCK_TIMEOUT
            self.timeout_token = None
        else:
            # No data timeout at all.
            self.timeout = None
            self.timeout_token = Timeout

        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)
        else:
            self.heartbeat_timeout = HEARTBEAT_TIMEOUT

    def __iter__(self):
        # Wait on the socket no longer than the shortest configured timeout.
        candidates = (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
        sock_timeout = min(t for t in candidates if t is not None)

        # Dig the raw socket out of the response object.
        if PY_3_OR_HIGHER:
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        headers = self.handle.headers
        sock_reader = SockReader(sock, sock_timeout)
        chunk_decoder = HttpChunkDecoder()
        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        json_decoder = JsonDecoder()
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)

        while True:
            # Pipeline: socket bytes -> chunk bodies -> text -> JSON values.
            try:
                raw = sock_reader.read()
            except SSLError:
                yield Hangup
                return
            payload, end_of_stream, decode_error = chunk_decoder.decode(raw)
            text = utf8_decoder.decode(payload)
            messages = json_decoder.decode(text)

            # Hand out everything that was decoded in this round.
            for message in messages:
                yield wrap_response(message, headers)

            # Any chunk data counts as a heartbeat; only decoded JSON
            # restarts the data timer.
            if payload:
                heartbeat_timer.reset()
            if messages:
                timer.reset()

            # Conditions that end the iteration, in order of precedence.
            if end_of_stream:
                final_message = Hangup
            elif decode_error:
                final_message = DecodeError
            elif heartbeat_timer.expired():
                final_message = HeartbeatTimeout
            else:
                final_message = None
            if final_message is not None:
                yield final_message
                return

            if timer.expired():
                yield self.timeout_token
207
208
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open `req` and return an iterator over the decoded stream.

    An HTTP error raised while opening the request is re-raised as a
    TwitterHTTPError.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    stream = TwitterJSONIter(
        response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
215
class TwitterStream(TwitterCall):
    """
    Interface to the Twitter Stream API.

    A TwitterStream is used much like the Twitter class, except that
    calling a method returns an iterator yielding the objects decoded
    from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            # ...do something with this tweet...

    By default the ``TwitterStream`` object uses
    [public streams](https://dev.twitter.com/docs/streaming-apis/streams/public).
    To use one of the other
    [streaming APIs](https://dev.twitter.com/docs/streaming-apis), pass the
    domain explicitly:

    - [Public streams](https://dev.twitter.com/docs/streaming-apis/streams/public): stream.twitter.com
    - [User streams](https://dev.twitter.com/docs/streaming-apis/streams/user): userstream.twitter.com
    - [Site streams](https://dev.twitter.com/docs/streaming-apis/streams/site): sitestream.twitter.com

    Accessing these streams requires the proper
    [permissions](https://dev.twitter.com/docs/application-permission-model).
    For direct messages, for instance, your
    [application](https://dev.twitter.com/apps) needs the "Read, Write & Direct
    Messages" permission.

    The following example retrieves all new direct messages from the
    user stream::

        auth = OAuth(
            consumer_key='[your consumer key]',
            consumer_secret='[your consumer secret]',
            token='[your token]',
            token_secret='[your token secret]'
        )
        twitter_userstream = TwitterStream(auth=auth, domain='userstream.twitter.com')
        for msg in twitter_userstream.user():
            if 'direct_message' in msg:
                print(msg['direct_message']['text'])

    The iterator yields until the TCP connection breaks. When that
    happens it yields `{'hangup': True}` and raises `StopIteration` if
    iterated again.

    Similarly, if the stream produces no heartbeat for longer than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}` and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between yields.
    If it is nonzero, the iterator yields either stream data or
    `{'timeout': True}` within the timeout period. This is useful when
    your program has other work to do while waiting for tweets.

    Setting the `block` parameter to a false value makes the stream
    fully non-blocking. In this mode the iterator always yields
    immediately, returning stream data or `None`. Note that `timeout`
    supersedes this argument, so it should also be set to `None` to use
    this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        version_parts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            # Routes every call made through this object to the streaming
            # response handler, capturing the constructor's settings.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout, when truthy, wins over the
                # constructor-level one.
                effective_timeout = _timeout or timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=version_parts,
            timeout=timeout,
            gzip=False)