]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Clean up documentation and README
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
f0603331
MV
1# encoding: utf-8
2from __future__ import unicode_literals
3
30af6c0f
MV
4import sys
5PY_3_OR_HIGHER = sys.version_info >= (3, 0)
6
7if PY_3_OR_HIGHER:
dd648a25
MV
8 import urllib.request as urllib_request
9 import urllib.error as urllib_error
30af6c0f 10else:
dd648a25
MV
11 import urllib2 as urllib_request
12 import urllib2 as urllib_error
13import json
2300838f 14from ssl import SSLError
67ddbde4 15import socket
30af6c0f 16import codecs
effd06bb 17import sys, select, time
dd648a25 18
2983f31e 19from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 20
# Line terminator used by HTTP chunked framing.
CRLF = b'\r\n'

# select() wait, in seconds, used in fully non-blocking mode.
MIN_SOCK_TIMEOUT = 0.0  # Apparently select with zero wait is okay!
# Upper bound, in seconds, on a single select() wait on the stream socket.
MAX_SOCK_TIMEOUT = 10.0
# Default seconds without any received data before the stream is dropped.
HEARTBEAT_TIMEOUT = 90.0

# Sentinel messages yielded by the stream iterator in place of real data.
Timeout = {'timeout': True}
Hangup = {'hangup': True}
DecodeError = {'hangup': True, 'decode_error': True}
HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}
26938000 30
e84abb1c 31
class HttpChunkDecoder(object):
    """Incremental decoder for HTTP/1.1 chunked transfer encoding.

    Feed raw bytes read from the socket to :meth:`decode`. It returns the
    payload of every chunk completed so far and keeps any partial chunk
    buffered until more bytes arrive.
    """

    def __init__(self):
        # Received bytes not yet consumed as complete chunks.
        self.buf = bytearray()
        # True while the CRLF that terminates the previous chunk's data is
        # still expected at the front of the buffer.
        self.munch_crlf = False

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """Decode as many complete chunks as the buffered bytes allow.

        :param data: newly received bytes (may be empty).
        :returns: ``(payload, end_of_stream, decode_error)``.

            - ``payload`` is a bytearray holding the data of every chunk
              completed by this call.
            - ``end_of_stream`` is True once the zero-length last chunk has
              been seen.
            - ``decode_error`` is True if the framing is malformed: an
              unparsable chunk-size line, or chunk data not followed by CRLF.
        """
        chunks = []
        buf = self.buf
        buf.extend(data)
        # Read offset into buf. Consumed bytes are dropped once at the end
        # rather than re-slicing (copying) the buffer after every chunk.
        pos = 0
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(buf) - pos < 2:
                    break
                if buf[pos:pos + 2] != CRLF:
                    # Chunk data must be followed by CRLF; anything else
                    # means the framing is out of sync.
                    decode_error = True
                    break
                pos += 2
                munch_crlf = False

            header_end_pos = buf.find(CRLF, pos)
            if header_end_pos == -1:
                break

            # Header line is: chunk-size [ ";" chunk-ext ]. Extensions carry
            # no payload and are ignored.
            header = buf[pos:header_end_pos].split(b';', 1)[0]
            try:
                chunk_len = int(header.decode('ascii'), 16)
            except ValueError:
                # Also covers UnicodeDecodeError (a ValueError subclass).
                decode_error = True
                break

            if chunk_len == 0:
                end_of_stream = True
                break

            data_start_pos = header_end_pos + 2
            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                # Chunk not fully received yet; keep its header buffered.
                break
            chunks.append(buf[data_start_pos:data_end_pos])
            pos = data_end_pos
            munch_crlf = True

        # Drop everything consumed; self.buf is the same object as buf.
        del buf[:pos]
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
0d92536c 82
30af6c0f 83
class JsonDecoder(object):
    """Incrementally parse concatenated JSON values out of a text stream.

    Text is fed in arbitrary pieces. Every complete value found is returned,
    and any trailing incomplete text is kept for the next call.
    """

    def __init__(self):
        # Text received but not yet parsed into a complete JSON value.
        self.buf = ""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """Append ``data`` and return a list of every complete JSON value."""
        pending = self.buf + data
        decoded = []
        while True:
            # raw_decode rejects leading whitespace, so strip it first.
            pending = pending.lstrip()
            try:
                obj, end = self.raw_decode(pending)
            except ValueError:
                # Empty or incomplete text: wait for more input.
                break
            decoded.append(obj)
            pending = pending[end:]
        self.buf = pending
        return decoded
26938000 103
26938000 104
class Timer(object):
    """Interval timer used to detect stream timeouts.

    ``timeout`` is in seconds. If it is ``None``, the timer never expires.
    """

    # Use a monotonic clock so that wall-clock adjustments (NTP steps, manual
    # changes) can neither fire nor suppress a timeout. Python 2 has no
    # time.monotonic, so fall back to time.time there.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the interval from the current moment."""
        self.time = self._clock()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        if self.timeout is None:
            return False
        elif self._clock() - self.time > self.timeout:
            self.reset()
            return True
        return False
125
126
class SockReader(object):
    """Poll a socket with select() and read whatever is available.

    ``sock_timeout`` is the maximum number of seconds one :meth:`read`
    waits in select().
    """

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def read(self):
        """Return the bytes available, or an empty bytearray if none arrived."""
        try:
            readable, _, _ = select.select([self.sock], [], [], self.sock_timeout)
            if not readable:
                return bytearray()
            return self.sock.read()
        except SSLError as err:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if err.errno != 2:
                raise
            return bytearray()
142
143
class TwitterJSONIter(object):
    """Iterate over the JSON messages of a streaming HTTP response.

    Each message is wrapped with :func:`wrap_response`. In between messages
    the iterator may yield sentinel values:

    - ``self.timeout_token``: no message within ``timeout`` seconds.
    - ``Hangup``, ``DecodeError`` or ``HeartbeatTimeout``: the stream has
      ended, and iteration stops afterwards.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        # Value yielded when the idle timer expires (None in non-blocking mode).
        self.timeout_token = Timeout
        # Seconds without a message before timeout_token is yielded; None = never.
        self.timeout = None
        # Seconds without any received data before the stream is dropped.
        self.heartbeat_timeout = HEARTBEAT_TIMEOUT
        if timeout and timeout > 0:
            self.timeout = float(timeout)
        elif not block and not timeout:
            # Fully non-blocking: poll without waiting and yield None when idle.
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT
        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)

    def __iter__(self):
        # Wait in select() no longer than the shortest active deadline.
        usable = [t for t in (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
                  if t is not None]
        sock_timeout = min(*usable)

        # Reach through the response object (private attributes) to the
        # underlying socket so it can be polled with select().
        if PY_3_OR_HIGHER:
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        headers = self.handle.headers

        reader = SockReader(sock, sock_timeout)
        dechunker = HttpChunkDecoder()
        text_decoder = codecs.getincrementaldecoder("utf-8")()
        parser = JsonDecoder()
        idle_timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)

        while True:
            # Pipeline: socket bytes -> de-chunked bytes -> text -> JSON values.
            raw = reader.read()
            payload, end_of_stream, decode_error = dechunker.decode(raw)
            messages = parser.decode(text_decoder.decode(payload))

            for message in messages:
                yield wrap_response(message, headers)

            # Any de-chunked bytes, even ones that complete no JSON value,
            # keep the heartbeat alive. Only real messages reset the idle timer.
            if payload:
                heartbeat_timer.reset()
            if messages:
                idle_timer.reset()

            # Terminal conditions: yield a sentinel, then end the iteration.
            if end_of_stream:
                yield Hangup
                return
            if decode_error:
                yield DecodeError
                return
            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                return
            if idle_timer.expired():
                yield self.timeout_token
03d8511c 204
2300838f 205
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """Open the streaming request ``req`` and return an iterator over its messages.

    ``block``, ``timeout`` and ``heartbeat_timeout`` are passed through to
    :class:`TwitterJSONIter`. An HTTP error status from the server is
    re-raised as :class:`TwitterHTTPError`.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
effd06bb 212
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            # ...do something with this tweet...

    By default the ``TwitterStream`` object uses
    [public streams](https://dev.twitter.com/docs/streaming-apis/streams/public).
    If you want to use one of the other
    [streaming APIs](https://dev.twitter.com/docs/streaming-apis), specify the URL
    manually:

    - [Public streams](https://dev.twitter.com/docs/streaming-apis/streams/public): stream.twitter.com
    - [User streams](https://dev.twitter.com/docs/streaming-apis/streams/user): userstream.twitter.com
    - [Site streams](https://dev.twitter.com/docs/streaming-apis/streams/site): sitestream.twitter.com

    Note that you need the proper
    [permissions](https://dev.twitter.com/docs/application-permission-model) to
    access these streams. For example, to read direct messages your
    [application](https://dev.twitter.com/apps) needs the "Read, Write & Direct
    Messages" permission.

    The following example demonstrates how to retrieve all new direct messages
    from the user stream::

        auth = OAuth(
            consumer_key='[your consumer key]',
            consumer_secret='[your consumer secret]',
            token='[your token]',
            token_secret='[your token secret]'
        )
        twitter_userstream = TwitterStream(auth=auth, domain='userstream.twitter.com')
        for msg in twitter_userstream.user():
            if 'direct_message' in msg:
                print(msg['direct_message']['text'])

    The iterator will yield until the stream ends. When the server
    terminates the stream, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again. If the stream's chunked
    framing cannot be decoded, it instead yields
    `{'hangup': True, 'decode_error': True}` and stops.

    Similarly, if no data at all arrives for more than `heartbeat_timeout`
    seconds (90 by default; non-positive values also mean 90), the iterator
    yields `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is a positive number of seconds, then the iterator will
    yield either stream data or `{'timeout': True}` within the timeout
    period. This is useful if you want your program to do other stuff in
    between waiting for tweets.

    Passing `block=False` makes the stream fully non-blocking. In this
    mode, the iterator always yields immediately: it returns stream data,
    or `None`. Note that `timeout` supersedes this argument, so it should
    also be left as `None` (or 0) to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        # Passed to TwitterCall as the leading URI part(s).
        uriparts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            # Overrides TwitterCall._handle_response so every response is
            # handed to handle_stream_response. A truthy per-call _timeout
            # takes precedence over the stream-level timeout.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)