import sys
PY_3_OR_HIGHER = sys.version_info >= (3, 0)

if PY_3_OR_HIGHER:
    import urllib.request as urllib_request
    import urllib.error as urllib_error
else:
    import urllib2 as urllib_request
    import urllib2 as urllib_error
import json
from ssl import SSLError
import socket
import codecs
import select, time

from .api import TwitterCall, wrap_response, TwitterHTTPError

CRLF = b'\r\n'
MIN_SOCK_TIMEOUT = 0.0  # Apparently select with zero wait is okay!
MAX_SOCK_TIMEOUT = 10.0
HEARTBEAT_TIMEOUT = 90.0

# Sentinel objects yielded by the stream iterator to signal timeouts and
# hangups, in addition to decoded stream data.
Timeout = {'timeout': True}
Hangup = {'hangup': True}
DecodeError = {'hangup': True, 'decode_error': True}
HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}


class HttpChunkDecoder(object):
    """Incremental decoder for an HTTP chunked transfer-encoded byte stream."""

    def __init__(self):
        self.buf = bytearray()
        self.munch_crlf = False

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        chunks = []
        buf = self.buf
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        buf.extend(data)
        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(buf) >= 2:
                    buf = buf[2:]
                    munch_crlf = False
                else:
                    break

            header_end_pos = buf.find(CRLF)
            if header_end_pos == -1:
                break

            header = buf[:header_end_pos]
            data_start_pos = header_end_pos + 2
            try:
                chunk_len = int(header.decode('ascii'), 16)
            except ValueError:
                decode_error = True
                break

            if chunk_len == 0:
                end_of_stream = True
                break

            data_end_pos = data_start_pos + chunk_len

            if len(buf) >= data_end_pos:
                chunks.append(buf[data_start_pos:data_end_pos])
                buf = buf[data_end_pos:]
                munch_crlf = True
            else:
                break
        self.buf = buf
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error


class JsonDecoder(object):
    """Incremental decoder for a stream of concatenated JSON objects."""

    def __init__(self):
        self.buf = u""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        chunks = []
        buf = self.buf + data
        while True:
            try:
                buf = buf.lstrip()
                res, ptr = self.raw_decode(buf)
                buf = buf[ptr:]
                chunks.append(res)
            except ValueError:
                break
        self.buf = buf
        return chunks


class Timer(object):

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        self.time = time.time()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        if self.timeout is None:
            return False
        elif time.time() - self.time > self.timeout:
            self.reset()
            return True
        return False


class SockReader(object):
    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def read(self):
        try:
            ready_to_read = select.select([self.sock], [], [], self.sock_timeout)[0]
            if ready_to_read:
                return self.sock.read()
        except SSLError as e:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if e.errno != 2:
                raise
        return bytearray()


class TwitterJSONIter(object):
    """Iterate over a streaming HTTP response, decoding chunked data into
    JSON objects and yielding them along with timeout/hangup sentinels."""

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.timeout_token = Timeout
        self.timeout = None
        self.heartbeat_timeout = HEARTBEAT_TIMEOUT
        if timeout and timeout > 0:
            self.timeout = float(timeout)
        elif not (block or timeout):
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT
        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)

    def __iter__(self):
        timeouts = [t for t in (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
                    if t is not None]
        sock_timeout = min(*timeouts)
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        headers = self.handle.headers
        sock_reader = SockReader(sock, sock_timeout)
        chunk_decoder = HttpChunkDecoder()
        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        json_decoder = JsonDecoder()
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)

        while True:
            # Decode all the things:
            data = sock_reader.read()
            dechunked_data, end_of_stream, decode_error = chunk_decoder.decode(data)
            unicode_data = utf8_decoder.decode(dechunked_data)
            json_data = json_decoder.decode(unicode_data)

            # Yield data-like things:
            for json_obj in json_data:
                yield wrap_response(json_obj, headers)

            # Reset timers:
            if dechunked_data:
                heartbeat_timer.reset()
            if json_data:
                timer.reset()

            # Yield timeouts and special things:
            if end_of_stream:
                yield Hangup
                break
            if decode_error:
                yield DecodeError
                break
            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield self.timeout_token


def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout, heartbeat_timeout))

class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. It can be used almost exactly like the Twitter class, except
    that calling a method returns an iterator that yields objects
    decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    Similarly, if the stream does not produce heartbeats for more than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, the iterator will yield either stream
    data or `{'timeout': True}` within the timeout period. This is
    useful if you want your program to do other work while waiting
    for tweets.

    Setting `block` to `False` makes the stream fully non-blocking: the
    iterator always yields immediately, returning either stream data or
    `None`. Note that `timeout` supersedes this argument, so it should
    also be left as `None` to use this mode.
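
    For instance, a minimal sketch of a consumer loop that handles these
    control values (using the `Timeout`, `Hangup`, `DecodeError` and
    `HeartbeatTimeout` sentinels defined at the top of this module) might
    look like::

        for msg in twitter_stream.statuses.sample():
            if msg is None:
                continue              # non-blocking mode; nothing arrived yet
            elif msg is Timeout:
                continue              # no data within `timeout` seconds
            elif msg is Hangup or msg is HeartbeatTimeout or msg is DecodeError:
                break                 # the stream is over; reconnect if desired
            else:
                print(msg)            # a decoded stream object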
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        uriparts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)