]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Implementation of multipart/form-data that handles more imports and is more conformant
[z_archive/twitter.git] / twitter / stream.py
1 # encoding: utf-8
2 from __future__ import unicode_literals
3
4 from .util import PY_3_OR_HIGHER
5
6 if PY_3_OR_HIGHER:
7 import urllib.request as urllib_request
8 import urllib.error as urllib_error
9 else:
10 import urllib2 as urllib_request
11 import urllib2 as urllib_error
12 import json
13 from ssl import SSLError
14 import socket
15 import codecs
16 import sys, select, time
17
18 from .api import TwitterCall, wrap_response, TwitterHTTPError
19
# Line terminator used when parsing HTTP chunk headers.
CRLF = b'\r\n'
# Socket wait used in fully non-blocking mode (no block, no timeout).
MIN_SOCK_TIMEOUT = 0.0  # Apparently select with zero wait is okay!
# Upper bound on a single select() wait, so timers are checked regularly.
MAX_SOCK_TIMEOUT = 10.0
# Default number of seconds without any stream data before giving up.
HEARTBEAT_TIMEOUT = 90.0

# Sentinel dicts yielded by the stream iterator in place of stream data.
# Every sentinel that ends the stream carries 'hangup': True.
Timeout = {'timeout': True}
Hangup = {'hangup': True}
DecodeError = {'hangup': True, 'decode_error': True}
HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}
29
30
class HttpChunkDecoder(object):
    """
    Incremental decoder for an HTTP body sent with chunked transfer
    encoding.

    Raw bytes are fed to `decode` as they arrive from the socket; any
    incomplete chunk is kept in `self.buf` until more data shows up.
    """

    def __init__(self):
        # Bytes received but not yet consumed as complete chunks.
        self.buf = bytearray()
        # True when the CRLF that terminates the previous chunk's data
        # still has to be skipped.
        self.munch_crlf = False

    def decode(self, data):
        """
        Feed `data` (bytes or bytearray) into the decoder.

        Returns a tuple `(payload, end_of_stream, decode_error)`:

        - `payload` is a bytearray with the data of every chunk that
          became complete with this call (possibly empty);
        - `end_of_stream` is True once the zero-length terminating
          chunk has been seen;
        - `decode_error` is True if a chunk header does not carry a
          valid, non-negative hexadecimal size.
        """
        chunks = []
        buf = self.buf
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        buf.extend(data)
        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(buf) < 2:
                    break
                buf = buf[2:]
                munch_crlf = False

            header_end_pos = buf.find(CRLF)
            if header_end_pos == -1:
                # The chunk header has not fully arrived yet.
                break

            # A chunk header is "size[;extension...]"; only the size
            # matters here, so drop any chunk extensions before parsing.
            size_field = buf[:header_end_pos].split(b';', 1)[0]
            data_start_pos = header_end_pos + 2
            try:
                chunk_len = int(size_field.decode('ascii'), 16)
            except ValueError:
                decode_error = True
                break

            if chunk_len < 0:
                # int() accepts a leading minus sign, but a negative size
                # would produce nonsense slice bounds below.
                decode_error = True
                break

            if chunk_len == 0:
                end_of_stream = True
                break

            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                # The chunk data has not fully arrived yet.
                break

            chunks.append(buf[data_start_pos:data_end_pos])
            buf = buf[data_end_pos:]
            munch_crlf = True
        self.buf = buf
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
81
82
class JsonDecoder(object):
    """
    Incremental decoder that splits a stream of text into the JSON
    values it contains.
    """

    def __init__(self):
        # Text received so far that does not yet form a complete value.
        self.buf = ""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """
        Append `data` to the pending text and return the list of JSON
        values that can now be decoded, in order. Whatever is left over
        (stripped of leading whitespace) is kept for the next call.
        """
        pending = (self.buf + data).lstrip()
        decoded = []
        while True:
            try:
                value, consumed = self.raw_decode(pending)
            except ValueError:
                # Nothing decodable is left (empty or incomplete text).
                break
            decoded.append(value)
            pending = pending[consumed:].lstrip()
        self.buf = pending
        return decoded
102
103
class Timer(object):
    """
    Simple wall-clock timer based on `time.time()`.

    A `timeout` of None means the timer never expires.
    """

    def __init__(self, timeout):
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the timer from the current time."""
        self.time = time.time()

    def expired(self):
        """
        Return True, and restart the timer, if more than `timeout`
        seconds have passed since the last reset. Otherwise return False.
        """
        if self.timeout is None:
            return False
        elapsed = time.time() - self.time
        has_expired = elapsed > self.timeout
        if has_expired:
            self.reset()
        return has_expired
124
125
class SockReader(object):
    """
    Reads from a socket, waiting at most `sock_timeout` seconds for it
    to become readable.
    """

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def read(self):
        """
        Return whatever `sock.read()` gives once the socket is readable,
        or an empty bytearray if nothing arrived within the timeout.
        """
        nothing = bytearray()
        try:
            readable, _, _ = select.select(
                [self.sock], [], [], self.sock_timeout)
            return self.sock.read() if readable else nothing
        except SSLError as err:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if err.errno == 2:
                return nothing
            raise
141
142
class TwitterJSONIter(object):
    """
    Iterable over the JSON objects arriving on an open streaming
    response.

    Besides stream data, iteration may yield the `Timeout` token (or
    None in non-blocking mode) and, right before stopping, one of
    `Hangup`, `DecodeError` or `HeartbeatTimeout`.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data

        if timeout and timeout > 0:
            # Explicit timeout: yield the Timeout token when it elapses.
            self.timeout_token = Timeout
            self.timeout = float(timeout)
        elif block or timeout:
            # Blocking mode: the data timer never expires.
            self.timeout_token = Timeout
            self.timeout = None
        else:
            # Non-blocking mode: poll the socket and yield None when idle.
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT

        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)
        else:
            self.heartbeat_timeout = HEARTBEAT_TIMEOUT

    def __iter__(self):
        # Never wait on the socket longer than the shortest active timer.
        candidates = (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
        timeouts = [value for value in candidates if value is not None]
        sock_timeout = min(*timeouts)

        # Reach through the response object to the underlying socket.
        if PY_3_OR_HIGHER:
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        headers = self.handle.headers
        reader = SockReader(sock, sock_timeout)
        dechunker = HttpChunkDecoder()
        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        json_decoder = JsonDecoder()
        data_timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)

        while True:
            # Pipeline: raw bytes -> dechunked bytes -> text -> JSON objects.
            raw = reader.read()
            payload, end_of_stream, decode_error = dechunker.decode(raw)
            text = utf8_decoder.decode(payload)
            messages = json_decoder.decode(text)

            for message in messages:
                yield wrap_response(message, headers)

            # Any dechunked bytes count as a heartbeat; only complete JSON
            # objects count as data.
            if payload:
                heartbeat_timer.reset()
            if messages:
                data_timer.reset()

            # Conditions that end the stream, in priority order.
            final_token = None
            if end_of_stream:
                final_token = Hangup
            elif decode_error:
                final_token = DecodeError
            elif heartbeat_timer.expired():
                final_token = HeartbeatTimeout
            if final_token is not None:
                yield final_token
                return

            if data_timer.expired():
                yield self.timeout_token
203
204
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open the streaming request `req` and return an iterator over the
    objects decoded from the response.

    An HTTP error from the server is re-raised as `TwitterHTTPError`.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    stream = TwitterJSONIter(
        response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
211
class TwitterStream(TwitterCall):
    """
    Interface to the Twitter Stream API.

    It is used much like the Twitter class, except that calling a
    method returns an iterator yielding the objects decoded from the
    stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            # ...do something with this tweet...

    By default the ``TwitterStream`` object uses
    [public streams](https://dev.twitter.com/docs/streaming-apis/streams/public).
    To use one of the other
    [streaming APIs](https://dev.twitter.com/docs/streaming-apis), pass
    the matching domain explicitly:

    - [Public streams](https://dev.twitter.com/docs/streaming-apis/streams/public): stream.twitter.com
    - [User streams](https://dev.twitter.com/docs/streaming-apis/streams/user): userstream.twitter.com
    - [Site streams](https://dev.twitter.com/docs/streaming-apis/streams/site): sitestream.twitter.com

    Note that you need the proper
    [permissions](https://dev.twitter.com/docs/application-permission-model)
    to access these streams. E.g. for direct messages your
    [application](https://dev.twitter.com/apps) needs the "Read, Write &
    Direct Messages" permission.

    The following example retrieves all new direct messages from the
    user stream::

        auth = OAuth(
            consumer_key='[your consumer key]',
            consumer_secret='[your consumer secret]',
            token='[your token]',
            token_secret='[your token secret]'
        )
        twitter_userstream = TwitterStream(auth=auth, domain='userstream.twitter.com')
        for msg in twitter_userstream.user():
            if 'direct_message' in msg:
                print(msg['direct_message']['text'])

    The iterator yields until the TCP connection breaks. When that
    happens it yields `{'hangup': True}` and raises `StopIteration` if
    iterated again.

    Similarly, if the stream produces no heartbeat for longer than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}` and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between yields.
    If it is nonzero, the iterator yields either stream data or
    `{'timeout': True}` within the timeout period. This is useful if
    your program has other work to do while waiting for tweets.

    Setting `block` to False makes the stream fully non-blocking: the
    iterator always yields immediately, either stream data or `None`.
    Note that `timeout` supersedes this argument, so it must be left
    as `None` to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        default_timeout = timeout

        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout takes precedence over the one given
                # to the TwitterStream constructor.
                effective_timeout = _timeout or default_timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=(str(api_version),),
            timeout=timeout,
            gzip=False)