]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Improve OAuth2 app-only code and documentation
[z_archive/twitter.git] / twitter / stream.py
1 # encoding: utf-8
2 from __future__ import unicode_literals
3
4 import sys
5 PY_3_OR_HIGHER = sys.version_info >= (3, 0)
6
7 if PY_3_OR_HIGHER:
8 import urllib.request as urllib_request
9 import urllib.error as urllib_error
10 else:
11 import urllib2 as urllib_request
12 import urllib2 as urllib_error
13 import json
14 from ssl import SSLError
15 import socket
16 import codecs
17 import sys, select, time
18
19 from .api import TwitterCall, wrap_response, TwitterHTTPError
20
# Line terminator used by HTTP chunked transfer coding.
CRLF = b'\r\n'

# Bounds, in seconds, on how long select() waits while polling the stream
# socket. A zero wait makes select() return immediately; the non-blocking
# stream mode uses exactly that.
MIN_SOCK_TIMEOUT = 0.0
MAX_SOCK_TIMEOUT = 10.0

# Default number of seconds without any stream bytes before the iterator
# reports a heartbeat timeout and stops.
HEARTBEAT_TIMEOUT = 90.0

# Status tokens yielded by the stream iterator in place of decoded data.
Timeout = dict.fromkeys(('timeout',), True)
Hangup = dict.fromkeys(('hangup',), True)
DecodeError = dict.fromkeys(('hangup', 'decode_error'), True)
HeartbeatTimeout = dict.fromkeys(('hangup', 'heartbeat_timeout'), True)
30
31
class HttpChunkDecoder(object):
    """Incremental decoder for HTTP/1.1 chunked transfer coding.

    Raw bytes read from the socket are fed to :meth:`decode`, which returns
    the payload of every chunk completed so far. Bytes belonging to a chunk
    that has not fully arrived are buffered until the next call.
    """

    def __init__(self):
        # Bytes received but not yet consumed as part of a complete chunk.
        self.buf = bytearray()
        # True while the CRLF that follows the previous chunk's data is
        # still owed (it may only arrive with the next read).
        self.munch_crlf = False

    def decode(self, data):
        """Feed *data*; return ``(payload, end_of_stream, decode_error)``.

        ``payload`` is a bytearray with the concatenated data of every chunk
        completed by this call. ``end_of_stream`` is True once the
        zero-length terminating chunk is seen. ``decode_error`` is True when
        a chunk-size line is not a non-negative hexadecimal number; decoding
        stops at that point.
        """
        chunks = []
        buf = self.buf
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        buf.extend(data)
        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(buf) >= 2:
                    del buf[:2]
                    munch_crlf = False
                else:
                    break

            header_end_pos = buf.find(CRLF)
            if header_end_pos == -1:
                break

            # A chunk-size line may carry ";name=value" chunk extensions
            # (RFC 7230 section 4.1.1); only the hex size before them counts.
            size_field = buf[:header_end_pos].split(b';', 1)[0]
            data_start_pos = header_end_pos + 2
            try:
                chunk_len = int(size_field.decode('ascii'), 16)
            except ValueError:
                decode_error = True
                break
            if chunk_len < 0:
                # int() accepts a leading '-', which is never a valid size
                # and would otherwise make the slicing below go backwards.
                decode_error = True
                break

            if chunk_len == 0:
                end_of_stream = True
                break

            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                # Chunk data not fully received yet; wait for more bytes.
                break
            chunks.append(buf[data_start_pos:data_end_pos])
            # Drop consumed bytes in place rather than copying the remainder.
            del buf[:data_end_pos]
            munch_crlf = True
        self.buf = buf
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
82
83
class JsonDecoder(object):
    """Incrementally decode a text stream of concatenated JSON values.

    Text passed to :meth:`decode` is appended to an internal buffer. Every
    complete value at the front of the buffer is parsed and returned; any
    trailing, not-yet-parseable text is kept for the next call.
    """

    def __init__(self):
        # Text received but not yet decoded (always left-stripped).
        self.buf = ""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """Append *data* and return a list of the JSON values now complete.

        Whitespace between values is skipped. Parsing stops at the first
        point where ``raw_decode`` raises ``ValueError``; the rest of the
        text is buffered.
        """
        pending = self.buf + data
        decoded = []
        parse = self.raw_decode
        while True:
            pending = pending.lstrip()
            try:
                value, end = parse(pending)
            except ValueError:
                break
            decoded.append(value)
            pending = pending[end:]
        self.buf = pending
        return decoded
103
104
class Timer(object):
    """Interval timer that reports (and restarts) once ``timeout`` elapses.

    A ``timeout`` of None means the timer never expires.
    """

    # Use a monotonic clock so wall-clock adjustments (NTP steps, manual
    # clock changes) cannot fire or stall the timer. Python 2 has no
    # time.monotonic, so fall back to time.time there.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the interval from the current instant."""
        self.time = self._clock()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        if self.timeout is None:
            return False
        if self._clock() - self.time > self.timeout:
            self.reset()
            return True
        return False
125
126
class SockReader(object):
    """Read whatever bytes a socket can deliver within ``sock_timeout``.

    Each :meth:`read` waits at most ``sock_timeout`` seconds for data and
    returns the bytes read, or an empty bytearray if nothing arrived.
    """

    # Read size for plain (non-SSL) sockets, which have recv() but no
    # read(); matches the default length of ssl.SSLSocket.read().
    recv_size = 1024

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def _read_now(self):
        # Read bytes that are already known to be available.
        sock = self.sock
        if hasattr(sock, 'read'):
            return sock.read()
        return sock.recv(self.recv_size)

    def read(self):
        try:
            # An SSL socket can hold already-decrypted bytes that select()
            # cannot see (they have left the kernel buffer). Check pending()
            # first, or that data would wait for the next network packet.
            pending = getattr(self.sock, 'pending', None)
            if pending is not None and pending():
                return self._read_now()
            ready_to_read = select.select([self.sock], [], [], self.sock_timeout)[0]
            if ready_to_read:
                return self._read_now()
        except SSLError as e:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if e.errno != 2:
                raise
        return bytearray()
142
143
class TwitterJSONIter(object):
    """Turn a streaming HTTP response into a sequence of JSON objects.

    Iterating reads the raw socket underneath *handle*, undoes the chunked
    transfer coding, decodes UTF-8, and yields each complete JSON object
    wrapped by ``wrap_response`` together with the response headers.
    Interleaved with the data it may yield:

    * ``self.timeout_token`` (``Timeout``, or ``None`` in non-blocking mode)
      when ``self.timeout`` seconds pass without a complete JSON object;
    * ``Hangup``, ``DecodeError`` or ``HeartbeatTimeout``, after which the
      iteration ends.

    The response handle is closed when iteration ends or the generator is
    closed.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.timeout_token = Timeout
        self.timeout = None
        self.heartbeat_timeout = HEARTBEAT_TIMEOUT
        if timeout and timeout > 0:
            self.timeout = float(timeout)
        elif not (block or timeout):
            # Fully non-blocking: poll without waiting and yield None when
            # no data is available.
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT
        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)

    def __iter__(self):
        """Yield stream objects and status tokens (see the class docstring)."""
        try:
            # Never wait in select() longer than the shortest deadline, so
            # both timers are checked promptly; MAX_SOCK_TIMEOUT caps it.
            timeouts = [t for t in (self.timeout, self.heartbeat_timeout,
                                    MAX_SOCK_TIMEOUT)
                        if t is not None]
            sock_timeout = min(timeouts)
            # Reach through the urllib response to the underlying socket so
            # it can be polled with select(); the path differs by version.
            sock = (self.handle.fp.raw._sock if PY_3_OR_HIGHER
                    else self.handle.fp._sock.fp._sock)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            headers = self.handle.headers
            sock_reader = SockReader(sock, sock_timeout)
            chunk_decoder = HttpChunkDecoder()
            # Incremental, so a multi-byte character split across reads is
            # held back until it is complete.
            utf8_decoder = codecs.getincrementaldecoder("utf-8")()
            json_decoder = JsonDecoder()
            timer = Timer(self.timeout)
            heartbeat_timer = Timer(self.heartbeat_timeout)

            while True:
                # Decode all the things:
                data = sock_reader.read()
                dechunked_data, end_of_stream, decode_error = chunk_decoder.decode(data)
                unicode_data = utf8_decoder.decode(dechunked_data)
                json_data = json_decoder.decode(unicode_data)

                # Yield data-like things:
                for json_obj in json_data:
                    yield wrap_response(json_obj, headers)

                # Reset timers: any payload bytes count as a heartbeat, even
                # if they do not complete a JSON object; only whole objects
                # reset the user-facing timeout.
                if dechunked_data:
                    heartbeat_timer.reset()
                if json_data:
                    timer.reset()

                # Yield timeouts and special things:
                if end_of_stream:
                    yield Hangup
                    break
                if decode_error:
                    yield DecodeError
                    break
                if heartbeat_timer.expired():
                    yield HeartbeatTimeout
                    break
                if timer.expired():
                    yield self.timeout_token
        finally:
            # Release the connection once the stream is over, or when the
            # consumer abandons the iterator (generator close / collection).
            self.handle.close()
204
205
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """Open the streaming request *req* and return an iterator over it.

    An ``HTTPError`` raised by ``urlopen`` is re-raised as
    ``TwitterHTTPError`` (format ``'json'``). On success the response is
    wrapped in a ``TwitterJSONIter`` configured with *block*, *timeout* and
    *heartbeat_timeout*, and an iterator over that wrapper is returned.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    json_iter = TwitterJSONIter(
        response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(json_iter)
212
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding until the stream ends. When the server
    ends the stream (sends the terminating zero-length chunk), the
    iterator yields `{'hangup': True}`, and raises `StopIteration` if
    iterated again. If the chunked stream cannot be decoded, it yields
    `{'hangup': True, 'decode_error': True}` and stops the same way.

    If the stream produces no data at all (not even heartbeats) for more
    than `heartbeat_timeout` seconds (90 by default; non-positive values
    fall back to the default), the iterator yields `{'hangup': True,
    'heartbeat_timeout': True}`, and raises `StopIteration` if iterated
    again. A connection that goes silent without the terminating chunk is
    reported this way.

    The `timeout` parameter controls the maximum time between
    yields. If it is a positive number of seconds, then the iterator
    will yield either stream data or `{'timeout': True}` within the
    timeout period. This is useful if you want your program to do
    other stuff in between waiting for tweets.

    Passing `block=False` makes the stream fully non-blocking. In
    this mode, the iterator always yields immediately. It returns
    stream data, or `None`. Note that a positive `timeout` supersedes
    this argument, so leave `timeout` as `None` (or 0) to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=HEARTBEAT_TIMEOUT):
        uriparts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call `_timeout` overrides the stream-wide `timeout`.
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        # NOTE(review): gzip is presumably disabled because the stream is
        # parsed straight off the raw socket (chunked -> UTF-8 -> JSON) with
        # no decompression step; confirm against TwitterCall's gzip handling.
        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)
258 self, auth=auth, format="json", domain=domain,
259 callable_cls=TwitterStreamCall,
260 secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)