]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
yield Hangup and break on Stream SSLError (close #284)
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
f0603331
MV
1# encoding: utf-8
2from __future__ import unicode_literals
3
2b533cdc 4from .util import PY_3_OR_HIGHER
30af6c0f
MV
5
6if PY_3_OR_HIGHER:
dd648a25
MV
7 import urllib.request as urllib_request
8 import urllib.error as urllib_error
30af6c0f 9else:
dd648a25
MV
10 import urllib2 as urllib_request
11 import urllib2 as urllib_error
12import json
2300838f 13from ssl import SSLError
67ddbde4 14import socket
30af6c0f 15import codecs
effd06bb 16import sys, select, time
dd648a25 17
2983f31e 18from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 19
# Line terminator used by HTTP chunked transfer encoding.
CRLF = b'\r\n'

# Bounds (in seconds) for the select() wait used when polling the stream
# socket. Apparently select with zero wait is okay, so the floor is 0.
MIN_SOCK_TIMEOUT = 0.0
MAX_SOCK_TIMEOUT = 10.0
# Default number of seconds without any stream data before giving up.
HEARTBEAT_TIMEOUT = 90.0

# Sentinel messages yielded by the stream iterator in place of data.
Timeout = {"timeout": True}
Hangup = {"hangup": True}
DecodeError = {"hangup": True, "decode_error": True}
HeartbeatTimeout = {"hangup": True, "heartbeat_timeout": True}
26938000 29
e84abb1c 30
class HttpChunkDecoder(object):
    """Incrementally decode an HTTP/1.1 chunked transfer-encoded body.

    Feed raw bytes to ``decode``; bytes that do not yet form a complete
    chunk are kept in ``self.buf`` until the next call.
    """

    # Characters allowed in the size field of a chunk-size line.
    _HEX_DIGITS = '0123456789abcdefABCDEF'

    def __init__(self):
        self.buf = bytearray()
        # True while the CRLF that ends the previous chunk's data has not
        # been consumed yet.
        self.munch_crlf = False

    @classmethod
    def _parse_chunk_size(cls, header):
        """Return the size encoded in a chunk-size line, or None if malformed.

        Chunk extensions (``;name=value`` after the size, RFC 7230 section
        4.1) are ignored. Surrounding whitespace is tolerated. Anything other
        than hex digits in the size itself -- a sign, a ``0x`` prefix or
        underscores, all of which ``int(..., 16)`` would accept -- is
        rejected.
        """
        size_field = header.split(b';', 1)[0].strip()
        try:
            size_text = size_field.decode('ascii')
        except UnicodeDecodeError:
            return None
        if not size_text or size_text.lstrip(cls._HEX_DIGITS):
            return None
        return int(size_text, 16)

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """Consume *data* and return ``(payload, end_of_stream, decode_error)``.

        ``payload`` is the concatenated data of every chunk completed by this
        call. ``end_of_stream`` is True once the zero-length terminating
        chunk is seen. ``decode_error`` is True if a chunk-size line could
        not be parsed.
        """
        chunks = []
        buf = self.buf
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        buf.extend(data)
        while True:
            if munch_crlf:
                # The CRLF ending a chunk's data is consumed lazily: Twitter
                # only sends it at the beginning of the *next* message.
                if len(buf) >= 2:
                    buf = buf[2:]
                    munch_crlf = False
                else:
                    break

            header_end_pos = buf.find(CRLF)
            if header_end_pos == -1:
                break

            chunk_len = self._parse_chunk_size(buf[:header_end_pos])
            if chunk_len is None:
                decode_error = True
                break

            if chunk_len == 0:
                end_of_stream = True
                break

            data_start_pos = header_end_pos + 2
            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                # Chunk data has not fully arrived yet.
                break
            chunks.append(buf[data_start_pos:data_end_pos])
            buf = buf[data_end_pos:]
            munch_crlf = True
        self.buf = buf
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
0d92536c 81
30af6c0f 82
class JsonDecoder(object):
    """Incrementally parse whitespace-separated JSON values from text.

    Text that does not (yet) parse as a complete JSON value is kept in
    ``self.buf`` and retried when more text arrives.
    """

    def __init__(self):
        self.buf = ""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """Append *data* and return a list of every complete value parsed."""
        pending = self.buf + data
        decoded = []
        while True:
            # raw_decode rejects leading whitespace, so drop it first.
            pending = pending.lstrip()
            try:
                value, end = self.raw_decode(pending)
            except ValueError:
                # Empty or not yet a complete JSON value: wait for more.
                break
            decoded.append(value)
            pending = pending[end:]
        self.buf = pending
        return decoded
26938000 102
26938000 103
class Timer(object):
    """Track elapsed time against an optional timeout.

    Elapsed time is measured with ``time.monotonic`` where available, so
    changes to the system clock cannot trigger or suppress an expiry.
    Pythons without it (Python 2) fall back to ``time.time``.
    """

    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the timer from now."""
        self.time = self._clock()

    def expired(self):
        """
        If expired, reset the timer and return True.

        A timer whose timeout is None never expires.
        """
        if self.timeout is None:
            return False
        elif self._clock() - self.time > self.timeout:
            self.reset()
            return True
        return False
124
125
aa19e2be
MV
class SockReader(object):
    """Read available bytes from a socket, waiting at most ``sock_timeout``
    seconds for new data to arrive."""

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def _has_buffered_data(self):
        """Return True if the SSL layer already holds decrypted bytes.

        ``select()`` only watches the raw socket, so bytes the SSL layer has
        decrypted but not yet returned would otherwise sit unread until more
        data arrives on the wire. Sockets without ``pending()`` report False.
        """
        pending = getattr(self.sock, 'pending', None)
        return pending is not None and pending() > 0

    def read(self):
        """Return bytes read from the socket, or an empty bytearray.

        An empty bytearray is returned when nothing became readable within
        ``sock_timeout``, or when the read fails with SSL error code 2.

        :raises SSLError: for any other SSL error code.
        """
        try:
            if self._has_buffered_data() or select.select(
                    [self.sock], [], [], self.sock_timeout)[0]:
                return self.sock.read()
        except SSLError as e:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if e.errno != 2:
                raise
        return bytearray()
141
142
dd648a25
MV
class TwitterJSONIter(object):
    """Iterate over the JSON messages of an open Twitter streaming response.

    Decoded messages are yielded wrapped with ``wrap_response``. In between,
    the iterator may yield ``self.timeout_token`` (``Timeout``, or ``None``
    in non-blocking mode). It ends after yielding ``Hangup``,
    ``DecodeError`` or ``HeartbeatTimeout``.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        """
        :param handle: response object returned by ``urlopen``.
        :param uri: request URI; stored on the instance.
        :param arg_data: request argument data; stored on the instance.
        :param block: if false and no positive ``timeout`` is given, poll the
            socket without waiting and yield ``None`` when nothing arrived.
        :param timeout: if positive, the maximum number of seconds between
            yields; ``Timeout`` is yielded when no message arrives in time.
        :param heartbeat_timeout: if positive, seconds without any stream
            data before ``HeartbeatTimeout`` is yielded; otherwise
            ``HEARTBEAT_TIMEOUT`` is used.
        """
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.timeout_token = Timeout
        self.timeout = None
        self.heartbeat_timeout = HEARTBEAT_TIMEOUT
        if timeout and timeout > 0:
            self.timeout = float(timeout)
        elif not (block or timeout):
            # Non-blocking mode: poll without waiting, yield None when idle.
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT
        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)

    def __iter__(self):
        """Yield messages and sentinels until the stream hangs up.

        The response handle is closed when iteration ends, whether by a
        hangup sentinel, an exception, or the consumer closing the generator.
        """
        timeouts = [t for t in (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
                    if t is not None]
        # Never wait in select() longer than the shortest timeout we track.
        sock_timeout = min(timeouts)
        try:
            sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            headers = self.handle.headers
            sock_reader = SockReader(sock, sock_timeout)
            chunk_decoder = HttpChunkDecoder()
            utf8_decoder = codecs.getincrementaldecoder("utf-8")()
            json_decoder = JsonDecoder()
            timer = Timer(self.timeout)
            heartbeat_timer = Timer(self.heartbeat_timeout)

            while True:
                # Decode all the things:
                try:
                    data = sock_reader.read()
                except SSLError:
                    yield Hangup
                    break
                dechunked_data, end_of_stream, decode_error = chunk_decoder.decode(data)
                unicode_data = utf8_decoder.decode(dechunked_data)
                json_data = json_decoder.decode(unicode_data)

                # Yield data-like things:
                for json_obj in json_data:
                    yield wrap_response(json_obj, headers)

                # Reset timers: any bytes count as a heartbeat, while only
                # complete messages count against the yield timeout.
                if dechunked_data:
                    heartbeat_timer.reset()
                if json_data:
                    timer.reset()

                # Yield timeouts and special things:
                if end_of_stream:
                    yield Hangup
                    break
                if decode_error:
                    yield DecodeError
                    break
                if heartbeat_timer.expired():
                    yield HeartbeatTimeout
                    break
                if timer.expired():
                    yield self.timeout_token
        finally:
            # Release the connection instead of waiting for garbage collection.
            self.handle.close()
03d8511c 207
2300838f 208
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """Open the streaming request *req* and return an iterator over it.

    ``urllib`` HTTP errors are re-raised as ``TwitterHTTPError``; the other
    arguments are passed through unchanged to ``TwitterJSONIter``.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout,
                             heartbeat_timeout)
    return iter(stream)
effd06bb 215
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            # ...do something with this tweet...

    By default the ``TwitterStream`` object uses
    [public streams](https://dev.twitter.com/docs/streaming-apis/streams/public).
    If you want to use one of the other
    [streaming APIs](https://dev.twitter.com/docs/streaming-apis), specify the URL
    manually:

    - [Public streams](https://dev.twitter.com/docs/streaming-apis/streams/public): stream.twitter.com
    - [User streams](https://dev.twitter.com/docs/streaming-apis/streams/user): userstream.twitter.com
    - [Site streams](https://dev.twitter.com/docs/streaming-apis/streams/site): sitestream.twitter.com

    Note that you require the proper
    [permissions](https://dev.twitter.com/docs/application-permission-model) to
    access these streams. For example, for direct messages your
    [application](https://dev.twitter.com/apps) needs the "Read, Write & Direct
    Messages" permission.

    The following example demonstrates how to retrieve all new direct messages
    from the user stream::

        auth = OAuth(
            consumer_key='[your consumer key]',
            consumer_secret='[your consumer secret]',
            token='[your token]',
            token_secret='[your token secret]'
        )
        twitter_userstream = TwitterStream(auth=auth, domain='userstream.twitter.com')
        for msg in twitter_userstream.user():
            if 'direct_message' in msg:
                print(msg['direct_message']['text'])

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    Similarly, if the stream does not produce heartbeats for more than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again. If the stream contains data that
    cannot be decoded, the iterator yields
    `{'hangup': True, 'decode_error': True}` and stops in the same way.

    The `timeout` parameter controls the maximum time between
    yields. If it is a positive number, then the iterator will yield either
    stream data or `{'timeout': True}` within the timeout period. This
    is useful if you want your program to do other stuff in between
    waiting for tweets.

    Setting `block=False` makes the stream fully non-blocking. In
    this mode, the iterator always yields immediately. It returns
    stream data, or `None`. Note that a positive `timeout` supersedes
    this argument, so leave `timeout` as `None` to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        uriparts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call _timeout, when given, overrides the stream-wide
                # timeout passed to TwitterStream.
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)