]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Version 1.17.0
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
f0603331
MV
1# encoding: utf-8
2from __future__ import unicode_literals
3
2b533cdc 4from .util import PY_3_OR_HIGHER
30af6c0f
MV
5
6if PY_3_OR_HIGHER:
dd648a25
MV
7 import urllib.request as urllib_request
8 import urllib.error as urllib_error
30af6c0f 9else:
dd648a25
MV
10 import urllib2 as urllib_request
11 import urllib2 as urllib_error
12import json
2300838f 13from ssl import SSLError
67ddbde4 14import socket
30af6c0f 15import codecs
effd06bb 16import sys, select, time
dd648a25 17
2983f31e 18from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 19
f560656d 20CRLF = b'\r\n'
f38ed662
MV
21MIN_SOCK_TIMEOUT = 0.0 # Apparently select with zero wait is okay!
22MAX_SOCK_TIMEOUT = 10.0
43778459 23HEARTBEAT_TIMEOUT = 90.0
f560656d 24
d3915c61
MV
25Timeout = {'timeout': True}
26Hangup = {'hangup': True}
aa19e2be
MV
27DecodeError = {'hangup': True, 'decode_error': True}
28HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}
26938000 29
e84abb1c 30
class HttpChunkDecoder(object):
    """
    Incremental decoder for an HTTP ``Transfer-Encoding: chunked`` body.

    Raw bytes are fed to `decode` as they arrive from the socket; any
    incomplete chunk is kept in `self.buf` until a later call completes
    it.
    """

    def __init__(self):
        # Bytes received but not yet consumed by the decoder.
        self.buf = bytearray()
        # True when the CRLF that terminates the previous chunk's data
        # still has to be skipped before the next chunk header.
        self.munch_crlf = False

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """
        Append `data` to the internal buffer and decode every complete
        chunk it now contains.

        Returns a tuple `(payload, end_of_stream, decode_error)`:

        - `payload` is a bytearray with the concatenated data of all the
          chunks completed by this call (possibly empty);
        - `end_of_stream` is True when the zero-length terminating chunk
          was seen;
        - `decode_error` is True when a chunk-size line could not be
          parsed as a non-negative hexadecimal number.
        """
        buf = self.buf
        buf.extend(data)
        munch_crlf = self.munch_crlf
        chunks = []
        end_of_stream = False
        decode_error = False
        # Offset of the first unconsumed byte. The buffer is trimmed once
        # at the end rather than being re-copied after every chunk.
        pos = 0

        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(buf) - pos < 2:
                    break
                pos += 2
                munch_crlf = False

            header_end_pos = buf.find(CRLF, pos)
            if header_end_pos == -1:
                # The chunk-size line is not complete yet.
                break

            # A chunk-size line is `chunk-size [ ";" chunk-ext ]`; only
            # the size before the first ';' matters here.
            size_field = buf[pos:header_end_pos].split(b';', 1)[0]
            try:
                chunk_len = int(size_field.decode('ascii'), 16)
            except ValueError:
                # Also covers UnicodeDecodeError (a ValueError subclass).
                decode_error = True
                break
            if chunk_len < 0:
                # int() accepts a leading '-'; a negative size is garbage.
                decode_error = True
                break
            if chunk_len == 0:
                end_of_stream = True
                break

            data_start_pos = header_end_pos + 2
            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                # The chunk data has not fully arrived; retry on the next
                # call with the header still in the buffer.
                break

            chunks.append(buf[data_start_pos:data_end_pos])
            pos = data_end_pos
            munch_crlf = True

        # Drop everything that was consumed; keep the unparsed tail.
        del buf[:pos]
        self.buf = buf
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
0d92536c 81
30af6c0f 82
class JsonDecoder(object):
    """
    Incremental decoder for a stream of concatenated JSON documents.

    Text is fed to `decode` as it arrives; complete documents are
    returned and any trailing partial document is kept in `self.buf`
    until more text completes it.
    """

    def __init__(self):
        # Text received so far that does not yet form a complete document.
        self.buf = ""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """
        Append `data` to the pending text and return the list of JSON
        values that can now be decoded, in order (possibly empty).

        Leading whitespace before each document is skipped. Decoding
        stops at the first position that does not parse; the text from
        there on is kept for the next call.
        """
        buf = self.buf + data
        end = len(buf)
        chunks = []
        # Walk the buffer with a cursor rather than re-slicing it after
        # every document, so a burst of documents is not copied repeatedly.
        pos = 0
        while True:
            while pos < end and buf[pos].isspace():
                pos += 1
            try:
                res, pos = self.raw_decode(buf, pos)
            except ValueError:
                # Incomplete (or invalid) document: wait for more data.
                break
            chunks.append(res)
        self.buf = buf[pos:]
        return chunks
26938000 102
26938000 103
class Timer(object):
    """
    Simple re-arming interval timer.

    `expired()` reports whether more than `timeout` seconds have passed
    since the timer was created or last reset.
    """

    # Use a monotonic clock where the interpreter has one, so wall-clock
    # adjustments cannot trigger or suppress a timeout. Python 2 has no
    # `time.monotonic`, so fall back to `time.time` there.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the interval from the current moment."""
        self.time = self._clock()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        if self.timeout is None:
            return False
        if self._clock() - self.time > self.timeout:
            self.reset()
            return True
        return False
124
125
aa19e2be
MV
class SockReader(object):
    """
    Reads whatever is currently available from a socket, waiting at most
    `sock_timeout` seconds for data to show up.
    """

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def _has_buffered_data(self):
        """Return True if the socket reports already-buffered bytes."""
        # SSL sockets expose pending(): the number of decrypted bytes
        # held inside the SSL layer. select() cannot see those, so they
        # must be checked separately. Sockets without pending() are
        # treated as having nothing buffered.
        pending = getattr(self.sock, 'pending', None)
        return bool(pending is not None and pending())

    def read(self):
        """
        Return the bytes obtained from one `sock.read()` call, or an
        empty bytearray if nothing became readable within the timeout.

        An SSLError with errno 2 is treated as "no data"; any other
        SSLError is re-raised.
        """
        try:
            # Drain data already buffered by the SSL layer before waiting
            # on the OS-level socket, otherwise the tail of a record can
            # sit unread until more network traffic arrives.
            if (self._has_buffered_data() or
                    select.select([self.sock], [], [], self.sock_timeout)[0]):
                return self.sock.read()
        except SSLError as e:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if e.errno != 2:
                raise
        return bytearray()
141
142
dd648a25
MV
class TwitterJSONIter(object):
    """
    Iterable over the JSON objects of an open streaming HTTP response.

    Iterating yields wrapped JSON objects as they are decoded, plus the
    module-level marker dicts (`Timeout`, `Hangup`, `DecodeError`,
    `HeartbeatTimeout`) or `None` to signal timeouts and stream
    termination.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data

        # Defaults: no data timeout, yield `Timeout` markers, standard
        # heartbeat interval.
        self.heartbeat_timeout = HEARTBEAT_TIMEOUT
        self.timeout_token = Timeout
        self.timeout = None

        if timeout and timeout > 0:
            self.timeout = float(timeout)
        elif not block and not timeout:
            # Fully non-blocking mode: poll the socket without waiting
            # and yield None instead of a timeout marker.
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT

        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)

    def __iter__(self):
        # The socket wait is bounded by the shortest applicable timeout.
        candidates = (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
        sock_timeout = min([t for t in candidates if t is not None])

        # Reach through the response object to the underlying socket.
        if PY_3_OR_HIGHER:
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        headers = self.handle.headers

        # Decoding pipeline: socket -> HTTP chunks -> UTF-8 -> JSON.
        reader = SockReader(sock, sock_timeout)
        dechunker = HttpChunkDecoder()
        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        json_decoder = JsonDecoder()
        data_timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)

        while True:
            raw = reader.read()
            payload, end_of_stream, decode_error = dechunker.decode(raw)
            text = utf8_decoder.decode(payload)
            objects = json_decoder.decode(text)

            for obj in objects:
                yield wrap_response(obj, headers)

            # Any dechunked bytes count as a heartbeat; only decoded
            # JSON objects count as data for the caller's timeout.
            if payload:
                heartbeat_timer.reset()
            if objects:
                data_timer.reset()

            # Terminal conditions, checked in priority order. The
            # heartbeat timer is only consulted when the stream has not
            # already ended or failed to decode.
            final_marker = None
            if end_of_stream:
                final_marker = Hangup
            elif decode_error:
                final_marker = DecodeError
            elif heartbeat_timer.expired():
                final_marker = HeartbeatTimeout
            if final_marker is not None:
                yield final_marker
                return

            if data_timer.expired():
                yield self.timeout_token
03d8511c 203
2300838f 204
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open the streaming request `req` and return an iterator over its
    decoded contents.

    An HTTP error raised while opening the request is re-raised as
    `TwitterHTTPError`. Otherwise the response is wrapped in a
    `TwitterJSONIter` configured with `block`, `timeout` and
    `heartbeat_timeout`, and an iterator over it is returned.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    stream = TwitterJSONIter(
        response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
effd06bb 211
class TwitterStream(TwitterCall):
    """
    Interface to the Twitter Stream API.

    A TwitterStream is used in much the same way as the Twitter class,
    except that calling a method returns an iterator that yields the
    objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            # ...do something with this tweet...

    By default the ``TwitterStream`` object uses
    [public streams](https://dev.twitter.com/docs/streaming-apis/streams/public).
    To use one of the other
    [streaming APIs](https://dev.twitter.com/docs/streaming-apis), pass the
    matching domain explicitly:

    - [Public streams](https://dev.twitter.com/docs/streaming-apis/streams/public): stream.twitter.com
    - [User streams](https://dev.twitter.com/docs/streaming-apis/streams/user): userstream.twitter.com
    - [Site streams](https://dev.twitter.com/docs/streaming-apis/streams/site): sitestream.twitter.com

    Accessing these streams requires the proper
    [permissions](https://dev.twitter.com/docs/application-permission-model).
    E.g. for direct messages your
    [application](https://dev.twitter.com/apps) needs the "Read, Write & Direct
    Messages" permission.

    The following example retrieves all new direct messages from the
    user stream::

        auth = OAuth(
            consumer_key='[your consumer key]',
            consumer_secret='[your consumer secret]',
            token='[your token]',
            token_secret='[your token secret]'
        )
        twitter_userstream = TwitterStream(auth=auth, domain='userstream.twitter.com')
        for msg in twitter_userstream.user():
            if 'direct_message' in msg:
                print(msg['direct_message']['text'])

    The iterator keeps yielding until the TCP connection breaks. When
    the connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    Similarly, if the stream does not produce heartbeats for more than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between yields.
    If it is nonzero, the iterator yields either stream data or
    `{'timeout': True}` within the timeout period. This is useful if
    you want your program to do other stuff in between waiting for
    tweets.

    The `block` parameter, when false, makes the stream fully
    non-blocking. In this mode the iterator always yields immediately,
    returning stream data or `None`. Note that `timeout` supersedes this
    argument, so it should also be set to `None` to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        version_parts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            # Closes over this constructor's `block`, `timeout` and
            # `heartbeat_timeout` so every call made through this
            # stream object shares them.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call `_timeout` takes precedence over the
                # constructor-level `timeout` when it is truthy.
                effective_timeout = _timeout or timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        call_options = dict(
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=version_parts,
            timeout=timeout,
            gzip=False,
        )
        TwitterCall.__init__(self, **call_options)
d3915c61 288 callable_cls=TwitterStreamCall,
86318060 289 secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)