2 from __future__
import unicode_literals
4 from .util
import PY_3_OR_HIGHER
7 import urllib
.request
as urllib_request
8 import urllib
.error
as urllib_error
10 import urllib2
as urllib_request
11 import urllib2
as urllib_error
13 from ssl
import SSLError
16 import sys
, select
, time
18 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
# Socket wait bounds, in seconds.  MIN_SOCK_TIMEOUT is the wait used for
# fully non-blocking streams; MAX_SOCK_TIMEOUT caps any single wait on
# the socket.
MIN_SOCK_TIMEOUT = 0.0  # Apparently select with zero wait is okay!
MAX_SOCK_TIMEOUT = 10.0

# Default heartbeat window, in seconds, used when the caller does not
# supply a positive heartbeat timeout.
HEARTBEAT_TIMEOUT = 90.0

# Marker dicts yielded by the stream iterator in place of stream data.
# Every flag listed is set to True; the hangup variants carry the
# 'hangup' flag plus a flag naming the reason.
Timeout = dict.fromkeys(['timeout'], True)
Hangup = dict.fromkeys(['hangup'], True)
DecodeError = dict.fromkeys(['hangup', 'decode_error'], True)
HeartbeatTimeout = dict.fromkeys(['hangup', 'heartbeat_timeout'], True)
31 class HttpChunkDecoder(object):
34 self
.buf
= bytearray()
35 self
.munch_crlf
= False
37 def decode(self
, data
): # -> (bytearray, end_of_stream, decode_error)
40 munch_crlf
= self
.munch_crlf
46 # Dang, Twitter, you crazy. Twitter only sends a terminating
47 # CRLF at the beginning of the *next* message.
54 header_end_pos
= buf
.find(CRLF
)
55 if header_end_pos
== -1:
58 header
= buf
[:header_end_pos
]
59 data_start_pos
= header_end_pos
+ 2
61 chunk_len
= int(header
.decode('ascii'), 16)
70 data_end_pos
= data_start_pos
+ chunk_len
72 if len(buf
) >= data_end_pos
:
73 chunks
.append(buf
[data_start_pos
:data_end_pos
])
74 buf
= buf
[data_end_pos
:]
79 self
.munch_crlf
= munch_crlf
80 return bytearray().join(chunks
), end_of_stream
, decode_error
83 class JsonDecoder(object):
87 self
.raw_decode
= json
.JSONDecoder().raw_decode
89 def decode(self
, data
):
95 res
, ptr
= self
.raw_decode(buf
)
106 def __init__(self
, timeout
):
107 # If timeout is None, we never expire.
108 self
.timeout
= timeout
112 self
.time
= time
.time()
116 If expired, reset the timer and return True.
118 if self
.timeout
is None:
120 elif time
.time() - self
.time
> self
.timeout
:
126 class SockReader(object):
127 def __init__(self
, sock
, sock_timeout
):
129 self
.sock_timeout
= sock_timeout
133 ready_to_read
= select
.select([self
.sock
], [], [], self
.sock_timeout
)[0]
135 return self
.sock
.read()
136 except SSLError
as e
:
137 # Code 2 is error from a non-blocking read of an empty buffer.
143 class TwitterJSONIter(object):
145 def __init__(self
, handle
, uri
, arg_data
, block
, timeout
, heartbeat_timeout
):
148 self
.arg_data
= arg_data
149 self
.timeout_token
= Timeout
151 self
.heartbeat_timeout
= HEARTBEAT_TIMEOUT
152 if timeout
and timeout
> 0:
153 self
.timeout
= float(timeout
)
154 elif not (block
or timeout
):
155 self
.timeout_token
= None
156 self
.timeout
= MIN_SOCK_TIMEOUT
157 if heartbeat_timeout
and heartbeat_timeout
> 0:
158 self
.heartbeat_timeout
= float(heartbeat_timeout
)
161 timeouts
= [t
for t
in (self
.timeout
, self
.heartbeat_timeout
, MAX_SOCK_TIMEOUT
)
163 sock_timeout
= min(*timeouts
)
164 sock
= self
.handle
.fp
.raw
._sock
if PY_3_OR_HIGHER
else self
.handle
.fp
._sock
.fp
._sock
165 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
, 1)
166 headers
= self
.handle
.headers
167 sock_reader
= SockReader(sock
, sock_timeout
)
168 chunk_decoder
= HttpChunkDecoder()
169 utf8_decoder
= codecs
.getincrementaldecoder("utf-8")()
170 json_decoder
= JsonDecoder()
171 timer
= Timer(self
.timeout
)
172 heartbeat_timer
= Timer(self
.heartbeat_timeout
)
175 # Decode all the things:
176 data
= sock_reader
.read()
177 dechunked_data
, end_of_stream
, decode_error
= chunk_decoder
.decode(data
)
178 unicode_data
= utf8_decoder
.decode(dechunked_data
)
179 json_data
= json_decoder
.decode(unicode_data
)
181 # Yield data-like things:
182 for json_obj
in json_data
:
183 yield wrap_response(json_obj
, headers
)
187 heartbeat_timer
.reset()
191 # Yield timeouts and special things:
198 if heartbeat_timer
.expired():
199 yield HeartbeatTimeout
202 yield self
.timeout_token
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open the streaming request and return an iterator over its messages.

    `req` is opened with `urllib_request.urlopen`. The resulting handle,
    together with `uri`, `arg_data`, `block`, `timeout` and
    `heartbeat_timeout`, is handed unchanged to `TwitterJSONIter`, and
    an iterator over that object is returned.

    Raises `TwitterHTTPError` (built from the underlying error, `uri`,
    the 'json' format and `arg_data`) when opening the request fails
    with an HTTP error.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        # Re-raise as the library's own error type so callers only have
        # to deal with one exception class.
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    else:
        stream = TwitterJSONIter(
            handle, uri, arg_data, block, timeout, heartbeat_timeout)
        return iter(stream)
class TwitterStream(TwitterCall):
    """
    Interface to the Twitter Stream API.

    A ``TwitterStream`` is used pretty much the same way as the Twitter
    class, except that the result of calling a method is an iterator
    that yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            # ...do something with this tweet...

    By default the ``TwitterStream`` object uses the
    [public streams](https://dev.twitter.com/docs/streaming-apis/streams/public).
    To use one of the other
    [streaming APIs](https://dev.twitter.com/docs/streaming-apis), pass
    its host as the `domain` argument:

    - [Public streams](https://dev.twitter.com/docs/streaming-apis/streams/public): stream.twitter.com
    - [User streams](https://dev.twitter.com/docs/streaming-apis/streams/user): userstream.twitter.com
    - [Site streams](https://dev.twitter.com/docs/streaming-apis/streams/site): sitestream.twitter.com

    Note that you need the proper
    [permissions](https://dev.twitter.com/docs/application-permission-model)
    to access these streams. E.g. for direct messages your
    [application](https://dev.twitter.com/apps) needs the "Read, Write &
    Direct Messages" permission.

    The following example retrieves all new direct messages from the
    user stream::

        auth = OAuth(
            consumer_key='[your consumer key]',
            consumer_secret='[your consumer secret]',
            token='[your token]',
            token_secret='[your token secret]'
        )
        twitter_userstream = TwitterStream(auth=auth, domain='userstream.twitter.com')
        for msg in twitter_userstream.user():
            if 'direct_message' in msg:
                print(msg['direct_message']['text'])

    The iterator keeps yielding until the TCP connection breaks. When
    that happens it yields `{'hangup': True}`, and raises
    `StopIteration` if iterated again.

    Similarly, if the stream produces no heartbeat for longer than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between yields.
    If it is nonzero, the iterator yields either stream data or
    `{'timeout': True}` within the timeout period. This is useful when
    your program has other work to do in between stream items.

    Passing a false `block` makes the stream fully non-blocking: the
    iterator always yields immediately, returning stream data or
    `None`. Note that `timeout` supersedes this argument, so `timeout`
    must also be left as `None` to use this mode.
    """

    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        # The streaming options are not stored on the instance; the call
        # class below captures them by closure, so every call made
        # through this object streams with the same settings.
        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A truthy per-call timeout wins over the stream-wide one.
                effective_timeout = _timeout if _timeout else timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        call_options = dict(
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=(str(api_version),),
            timeout=timeout,
            gzip=False,
        )
        TwitterCall.__init__(self, **call_options)