]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Merge pull request #212 from justinclift/friends_timeline_update
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
30af6c0f
MV
1import sys
2PY_3_OR_HIGHER = sys.version_info >= (3, 0)
3
4if PY_3_OR_HIGHER:
dd648a25
MV
5 import urllib.request as urllib_request
6 import urllib.error as urllib_error
30af6c0f 7else:
dd648a25
MV
8 import urllib2 as urllib_request
9 import urllib2 as urllib_error
10import json
2300838f 11from ssl import SSLError
67ddbde4 12import socket
30af6c0f 13import codecs
effd06bb 14import sys, select, time
dd648a25 15
2983f31e 16from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 17
# Line terminator used by HTTP chunked transfer encoding.
CRLF = b'\r\n'
# Socket wait used in non-blocking mode: select() with a zero timeout is
# valid and simply polls without waiting.
MIN_SOCK_TIMEOUT = 0.0
# Upper bound on how long a single select() call waits for data.
MAX_SOCK_TIMEOUT = 10.0
# Default seconds without any stream data before the stream is abandoned.
HEARTBEAT_TIMEOUT = 90.0

# Sentinel objects yielded by the stream iterator. They are yielded as these
# exact objects, so callers can compare against them by identity.
Timeout = {'timeout': True}
Hangup = {'hangup': True}
DecodeError = {'hangup': True, 'decode_error': True}
HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}
26938000 27
e84abb1c 28
class HttpChunkDecoder(object):
    """Incremental decoder for an HTTP/1.1 ``Transfer-Encoding: chunked`` body.

    Bytes may be fed in arbitrary slices through :meth:`decode`; an
    incomplete chunk is buffered until the rest of it arrives.
    """

    # Characters allowed in the chunk-size field (RFC 7230, section 4.1).
    _HEX_DIGITS = frozenset('0123456789abcdefABCDEF')

    def __init__(self):
        # Bytes received but not yet consumed as part of a complete chunk.
        self.buf = bytearray()
        # True while the CRLF that terminates the previous chunk's data is
        # still expected; it may only arrive with a later read.
        self.munch_crlf = False

    @classmethod
    def _parse_chunk_size(cls, header):
        """Return the size encoded in a chunk header, or None if malformed.

        Chunk extensions (``;name=value``) are ignored. Only plain hex digits
        are accepted: ``int(x, 16)`` alone would also allow signs, ``0x``
        prefixes and underscores, none of which are valid here.
        """
        size_field = header.split(b';', 1)[0]
        try:
            size_text = size_field.decode('ascii').strip()
        except UnicodeDecodeError:
            return None
        if not size_text or not cls._HEX_DIGITS.issuperset(size_text):
            return None
        return int(size_text, 16)

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """Feed ``data`` to the decoder.

        Returns a tuple ``(payload, end_of_stream, decode_error)``:

        * ``payload`` -- bytearray with the data of every chunk completed by
          this call, concatenated;
        * ``end_of_stream`` -- True once the terminating zero-length chunk
          has been seen;
        * ``decode_error`` -- True when a chunk header, or the CRLF that must
          follow a chunk's data, is malformed.
        """
        chunks = []
        buf = self.buf
        buf.extend(data)
        pos = 0  # offset of the first unconsumed byte in buf
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        while True:
            if munch_crlf:
                if len(buf) - pos < 2:
                    break  # the CRLF has not arrived yet
                if buf[pos:pos + 2] != CRLF:
                    decode_error = True
                    break
                pos += 2
                munch_crlf = False

            header_end_pos = buf.find(CRLF, pos)
            if header_end_pos == -1:
                break  # incomplete header

            chunk_len = self._parse_chunk_size(buf[pos:header_end_pos])
            if chunk_len is None:
                decode_error = True
                break
            if chunk_len == 0:
                end_of_stream = True
                break

            data_start_pos = header_end_pos + 2
            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                break  # incomplete chunk data; wait for more bytes

            chunks.append(buf[data_start_pos:data_end_pos])
            pos = data_end_pos
            munch_crlf = True

        # Drop consumed bytes once, rather than re-slicing after every chunk.
        self.buf = buf[pos:]
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
0d92536c 79
30af6c0f 80
class JsonDecoder(object):
    """Incrementally decode a text stream of concatenated JSON values."""

    def __init__(self):
        # Text received but not yet decoded into a complete JSON value.
        self.buf = u""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """Append ``data`` and return, in order, every JSON value now complete.

        Whitespace between values is skipped. Whatever cannot be decoded yet,
        such as a partially received value, stays buffered (with its leading
        whitespace stripped) for the next call.
        """
        pending = self.buf + data
        decoded = []
        while True:
            pending = pending.lstrip()
            try:
                value, end = self.raw_decode(pending)
            except ValueError:
                break
            pending = pending[end:]
            decoded.append(value)
        self.buf = pending
        return decoded
26938000 100
26938000 101
class Timer(object):
    """Report when ``timeout`` seconds have passed since the last reset.

    A monotonic clock is used when available, so adjustments to the system
    wall clock can neither trigger nor suppress a timeout.
    """

    # time.monotonic exists on Python 3.3+; fall back to time.time otherwise.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the countdown from now."""
        self.time = self._clock()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        if self.timeout is None:
            return False
        if self._clock() - self.time > self.timeout:
            self.reset()
            return True
        return False
122
123
aa19e2be
MV
class SockReader(object):
    """Read from a (possibly SSL-wrapped) socket, waiting at most ``sock_timeout`` seconds."""

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def read(self):
        """Return the data available on the socket, or an empty bytearray.

        Waits up to ``sock_timeout`` seconds for the socket to become
        readable. An empty bytearray is returned when nothing arrived in
        time, or when the SSL layer reports a non-blocking read of an empty
        buffer.
        """
        try:
            # An SSL socket may already hold decrypted bytes that select()
            # cannot see (e.g. the rest of a record larger than one read()).
            # Drain those first instead of waiting for new network traffic.
            pending = getattr(self.sock, 'pending', None)
            if pending is not None and pending():
                return self.sock.read()
            ready_to_read = select.select([self.sock], [], [], self.sock_timeout)[0]
            if ready_to_read:
                return self.sock.read()
        except SSLError as e:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if e.errno != 2:
                raise
        return bytearray()
139
140
dd648a25
MV
class TwitterJSONIter(object):
    """Iterate over the JSON objects of a chunked streaming HTTP response.

    Each decoded JSON object is yielded wrapped by ``wrap_response``. The
    module-level sentinels are yielded as well:

    * ``Hangup`` when the server sends the terminating zero-length chunk,
    * ``DecodeError`` when the chunked framing cannot be decoded,
    * ``HeartbeatTimeout`` when no data at all arrives within
      ``heartbeat_timeout`` seconds,
    * the timeout token (``Timeout``, or ``None`` in non-blocking mode) when
      no JSON object arrives within ``timeout`` seconds.

    The first three end the iteration. The underlying HTTP response is
    closed when the iteration ends or the generator is closed.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.timeout_token = Timeout
        # None means no timeout token is ever yielded.
        self.timeout = None
        self.heartbeat_timeout = HEARTBEAT_TIMEOUT
        if timeout and timeout > 0:
            self.timeout = float(timeout)
        elif not (block or timeout):
            # Non-blocking mode: poll the socket without waiting and yield
            # None whenever there is nothing else to report.
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT
        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)

    def __iter__(self):
        try:
            # Wake up at least as often as the shortest timeout requires.
            timeouts = [t for t in (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
                        if t is not None]
            sock_timeout = min(timeouts)
            # The raw socket sits behind private urllib attributes whose path
            # differs between Python 2 and Python 3.
            sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            headers = self.handle.headers
            sock_reader = SockReader(sock, sock_timeout)
            chunk_decoder = HttpChunkDecoder()
            utf8_decoder = codecs.getincrementaldecoder("utf-8")()
            json_decoder = JsonDecoder()
            timer = Timer(self.timeout)
            heartbeat_timer = Timer(self.heartbeat_timeout)

            while True:
                # Decode all the things: socket bytes -> chunk payload ->
                # unicode text -> JSON objects.
                data = sock_reader.read()
                dechunked_data, end_of_stream, decode_error = chunk_decoder.decode(data)
                unicode_data = utf8_decoder.decode(dechunked_data)
                json_data = json_decoder.decode(unicode_data)

                # Yield data-like things:
                for json_obj in json_data:
                    yield wrap_response(json_obj, headers)

                # Any payload (including keep-alive whitespace) counts as a
                # heartbeat; only complete JSON objects reset the timeout.
                if dechunked_data:
                    heartbeat_timer.reset()
                if json_data:
                    timer.reset()

                # Yield timeouts and special things:
                if end_of_stream:
                    yield Hangup
                    break
                if decode_error:
                    yield DecodeError
                    break
                if heartbeat_timer.expired():
                    yield HeartbeatTimeout
                    break
                if timer.expired():
                    yield self.timeout_token
        finally:
            # Release the connection once the stream is over or the consumer
            # stops iterating (generator closed or garbage-collected).
            self.handle.close()
03d8511c 201
2300838f 202
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """Open ``req`` and return an iterator over the JSON stream it produces.

    An HTTP error response is re-raised as ``TwitterHTTPError``.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    stream = TwitterJSONIter(handle, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
effd06bb 209
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields until the server ends the stream. At that point
    the iterator yields `{'hangup': True}`, and raises `StopIteration` if
    iterated again.

    Similarly, if the stream produces no data (not even keep-alive
    heartbeats) for more than `heartbeat_timeout` seconds (90 by default),
    the iterator yields `{'hangup': True, 'heartbeat_timeout': True}`, and
    raises `StopIteration` if iterated again. If the chunked HTTP framing
    of the stream cannot be decoded, the iterator yields
    `{'hangup': True, 'decode_error': True}` and stops in the same way.

    The `timeout` parameter controls the maximum time between
    yields. If it is positive, then the iterator will yield either
    stream data or `{'timeout': True}` within the timeout period. This
    is useful if you want your program to do other stuff in between
    waiting for tweets.

    Setting the `block` parameter to `False` makes the stream fully
    non-blocking. In this mode, the iterator always yields immediately,
    either stream data or `None`. Note that a positive `timeout`
    supersedes this argument, so `timeout` must be left as `None` (or 0)
    to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        uriparts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout, when given, overrides the stream-wide one.
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)