from __future__ import unicode_literals

from .util import PY_3_OR_HIGHER

if PY_3_OR_HIGHER:
    import urllib.request as urllib_request
    import urllib.error as urllib_error
else:
    import urllib2 as urllib_request
    import urllib2 as urllib_error
import codecs
import json
import select
import socket
import sys
import time
from ssl import SSLError

from .api import TwitterCall, wrap_response, TwitterHTTPError
# Timeouts, in seconds.  The socket wait used by the stream iterator is the
# smallest of the caller's timeout, the heartbeat timeout and
# MAX_SOCK_TIMEOUT; MIN_SOCK_TIMEOUT is used for fully non-blocking reads.
MIN_SOCK_TIMEOUT = 0.0  # Apparently select with zero wait is okay!
MAX_SOCK_TIMEOUT = 10.0
HEARTBEAT_TIMEOUT = 90.0

# Marker dictionaries yielded by the stream iterator in place of stream data.
Timeout = {'timeout': True}
Hangup = {'hangup': True}
DecodeError = {'hangup': True, 'decode_error': True}
HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}
class HttpChunkDecoder(object):
    """
    Incremental decoder for an HTTP chunked transfer-encoded byte stream.

    Bytes are fed in through `decode()` in whatever pieces they arrive;
    incomplete chunks are buffered on the instance until the rest shows up.
    """
    # NOTE(review): the loop scaffolding, the ValueError handling and the
    # zero-length-chunk branch were restored from a damaged listing --
    # confirm against the pristine file.

    # Terminator of chunk-size lines and chunk payloads.  Kept on the class
    # so the decoder does not depend on a module-level constant.
    _CRLF = b'\r\n'

    def __init__(self):
        # Bytes received but not yet consumed as complete chunks.
        self.buf = bytearray()
        # True when the CRLF that terminates the previous chunk's payload
        # still has to be skipped.
        self.munch_crlf = False

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """
        Feed `data` (bytes-like) to the decoder.

        Returns a 3-tuple:
          - a bytearray with the payload of every chunk completed so far,
          - True if the zero-length terminating chunk was seen,
          - True if a chunk-size line could not be parsed as hexadecimal.
        """
        chunks = []
        buf = self.buf
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        buf.extend(data)
        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(buf) >= 2:
                    buf = buf[2:]
                    munch_crlf = False
                else:
                    break

            header_end_pos = buf.find(self._CRLF)
            if header_end_pos == -1:
                # The chunk-size line has not fully arrived yet.
                break

            header = buf[:header_end_pos]
            data_start_pos = header_end_pos + 2
            try:
                chunk_len = int(header.decode('ascii'), 16)
            except ValueError:
                # Also covers UnicodeDecodeError (a ValueError subclass).
                decode_error = True
                break

            if chunk_len == 0:
                end_of_stream = True
                break

            data_end_pos = data_start_pos + chunk_len

            if len(buf) >= data_end_pos:
                chunks.append(buf[data_start_pos:data_end_pos])
                buf = buf[data_end_pos:]
                munch_crlf = True
            else:
                # Payload incomplete; keep the header buffered and wait.
                break
        self.buf = buf
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
class JsonDecoder(object):
    """
    Incremental decoder for a stream of concatenated JSON documents.

    Text is fed in through `decode()`; a trailing document that is not yet
    complete stays buffered on the instance until more text arrives.
    """
    # NOTE(review): the buffer handling and the loop around raw_decode were
    # restored from a damaged listing -- confirm against the pristine file.

    def __init__(self):
        # Text received but not yet decoded.
        self.buf = ""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """
        Feed the text `data` to the decoder and return a list with every
        JSON value that could be decoded so far (possibly empty).
        """
        chunks = []
        buf = self.buf + data
        while True:
            try:
                # raw_decode does not skip leading whitespace itself.
                buf = buf.lstrip()
                res, ptr = self.raw_decode(buf)
                buf = buf[ptr:]
                chunks.append(res)
            except ValueError:
                # Nothing (more) decodable yet; wait for further input.
                break
        self.buf = buf
        return chunks
class Timer(object):
    """
    Stopwatch that reports when more than `timeout` seconds have elapsed
    since it was last reset.
    """
    # NOTE(review): the class header and the `reset`/`expired` method
    # headers were missing from a damaged listing; their names were taken
    # from the call sites -- confirm against the pristine file.

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """
        Restart the timer from the current time.
        """
        self.time = time.time()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        if self.timeout is None:
            return False
        elif time.time() - self.time > self.timeout:
            self.reset()
            return True
        return False
class SockReader(object):
    """
    Reads from a socket-like object, waiting at most `sock_timeout` seconds
    for data to become available.
    """
    # NOTE(review): the `read` method header, the readiness check and the
    # errno test were restored from a damaged listing -- confirm against the
    # pristine file.

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def read(self):
        """
        Return whatever `sock.read()` yields if the socket becomes readable
        within the timeout, otherwise an empty bytearray.

        Raises SSLError for any SSL failure other than error code 2.
        """
        try:
            ready_to_read = select.select([self.sock], [], [], self.sock_timeout)[0]
            if ready_to_read:
                return self.sock.read()
        except SSLError as e:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if e.errno != 2:
                raise
        return bytearray()
class TwitterJSONIter(object):
    """
    Iterable over the JSON objects arriving on an open streaming response.

    Besides decoded stream objects, iteration can yield the module-level
    marker dictionaries (`Timeout`, `Hangup`, `DecodeError`,
    `HeartbeatTimeout`), or `None` in fully non-blocking mode.
    """
    # NOTE(review): the `__iter__` header, the read loop, the `None` filter
    # on the timeouts and several break/yield branches were restored from a
    # damaged listing -- confirm against the pristine file.

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        """
        `handle` is the open response object; `uri` and `arg_data` are
        stored as given.  `timeout`, `block` and `heartbeat_timeout` select
        the behaviour described below.
        """
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        # What to yield when the timer expires without any stream data.
        self.timeout_token = Timeout
        # None means "never time out".
        self.timeout = None
        self.heartbeat_timeout = HEARTBEAT_TIMEOUT
        if timeout and timeout > 0:
            self.timeout = float(timeout)
        elif not (block or timeout):
            # Fully non-blocking: poll the socket and yield None when idle.
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT
        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)

    def __iter__(self):
        """
        Generator: read from the socket, dechunk, UTF-8 decode and JSON
        decode, yielding each decoded object wrapped by `wrap_response`.
        Iteration ends after yielding one of the hangup markers.
        """
        # Never wait on the socket longer than the shortest deadline.
        timeouts = [t for t in (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
                    if t is not None]
        sock_timeout = min(timeouts)
        # Reach through the response object's private attributes to the
        # underlying socket; the attribute path differs between Python 2
        # and Python 3.
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        headers = self.handle.headers
        sock_reader = SockReader(sock, sock_timeout)
        chunk_decoder = HttpChunkDecoder()
        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        json_decoder = JsonDecoder()
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)

        while True:
            # Decode all the things:
            try:
                data = sock_reader.read()
            except SSLError:
                yield Hangup
                break
            dechunked_data, end_of_stream, decode_error = chunk_decoder.decode(data)
            unicode_data = utf8_decoder.decode(dechunked_data)
            json_data = json_decoder.decode(unicode_data)

            # Yield data-like things:
            for json_obj in json_data:
                yield wrap_response(json_obj, headers)

            # Reset timers: any payload bytes count as a heartbeat, but only
            # complete JSON objects count as data for the caller's timeout.
            if dechunked_data:
                heartbeat_timer.reset()
            if json_data:
                timer.reset()

            # Yield timeouts and special things:
            if end_of_stream:
                yield Hangup
                break
            if decode_error:
                yield DecodeError
                break
            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield self.timeout_token
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open the streaming request `req` and return an iterator over its
    decoded contents.

    `uri` and `arg_data` are passed along for error reporting; `block`,
    `timeout` and `heartbeat_timeout` are forwarded to `TwitterJSONIter`.

    Raises TwitterHTTPError if opening the request fails with an HTTP error.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout, heartbeat_timeout))
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            # ...do something with this tweet...

    Per default the ``TwitterStream`` object uses
    [public streams](https://dev.twitter.com/docs/streaming-apis/streams/public).
    If you want to use one of the other
    [streaming APIs](https://dev.twitter.com/docs/streaming-apis), specify the URL
    manually:

    - [Public streams](https://dev.twitter.com/docs/streaming-apis/streams/public): stream.twitter.com
    - [User streams](https://dev.twitter.com/docs/streaming-apis/streams/user): userstream.twitter.com
    - [Site streams](https://dev.twitter.com/docs/streaming-apis/streams/site): sitestream.twitter.com

    Note that you require the proper
    [permissions](https://dev.twitter.com/docs/application-permission-model) to
    access these streams. E.g. for direct messages your
    [application](https://dev.twitter.com/apps) needs the "Read, Write & Direct
    Messages" permission.

    The following example demonstrates how to retrieve all new direct messages
    from the user stream::

        auth = OAuth(
            consumer_key='[your consumer key]',
            consumer_secret='[your consumer secret]',
            token='[your token]',
            token_secret='[your token secret]'
        )
        twitter_userstream = TwitterStream(auth=auth, domain='userstream.twitter.com')
        for msg in twitter_userstream.user():
            if 'direct_message' in msg:
                print(msg['direct_message']['text'])

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    Similarly, if the stream does not produce heartbeats for more than
    90 seconds, the iterator yields `{'hangup': True,
    'heartbeat_timeout': True}`, and raises `StopIteration` if
    iterated again. The interval can be changed with the
    `heartbeat_timeout` parameter (in seconds).

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, then the iterator will yield either
    stream data or `{'timeout': True}` within the timeout period. This
    is useful if you want your program to do other stuff in between
    waiting for stream data.

    Setting the `block` parameter to False makes the stream fully
    non-blocking. In this mode, the iterator always yields immediately. It
    returns stream data, or `None`. Note that `timeout` supersedes this
    argument, so it should also be set `None` to use this mode.
    """
    # NOTE(review): a few docstring sentences were cut off in a damaged
    # listing and have been completed -- confirm wording against the
    # pristine file.

    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        uriparts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            # Closes over `block`, `timeout` and `heartbeat_timeout` so that
            # every call made through this stream object shares them.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)