2 from __future__
import unicode_literals
5 PY_3_OR_HIGHER
= sys
.version_info
>= (3, 0)
8 import urllib
.request
as urllib_request
9 import urllib
.error
as urllib_error
11 import urllib2
as urllib_request
12 import urllib2
as urllib_error
14 from ssl
import SSLError
17 import sys
, select
, time
19 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
22 MIN_SOCK_TIMEOUT
= 0.0 # Apparenty select with zero wait is okay!
23 MAX_SOCK_TIMEOUT
= 10.0
24 HEARTBEAT_TIMEOUT
= 90.0
26 Timeout
= {'timeout': True}
27 Hangup
= {'hangup': True}
28 DecodeError
= {'hangup': True, 'decode_error': True}
29 HeartbeatTimeout
= {'hangup': True, 'heartbeat_timeout': True}
32 class HttpChunkDecoder(object):
35 self
.buf
= bytearray()
36 self
.munch_crlf
= False
38 def decode(self
, data
): # -> (bytearray, end_of_stream, decode_error)
41 munch_crlf
= self
.munch_crlf
47 # Dang, Twitter, you crazy. Twitter only sends a terminating
48 # CRLF at the beginning of the *next* message.
55 header_end_pos
= buf
.find(CRLF
)
56 if header_end_pos
== -1:
59 header
= buf
[:header_end_pos
]
60 data_start_pos
= header_end_pos
+ 2
62 chunk_len
= int(header
.decode('ascii'), 16)
71 data_end_pos
= data_start_pos
+ chunk_len
73 if len(buf
) >= data_end_pos
:
74 chunks
.append(buf
[data_start_pos
:data_end_pos
])
75 buf
= buf
[data_end_pos
:]
80 self
.munch_crlf
= munch_crlf
81 return bytearray().join(chunks
), end_of_stream
, decode_error
84 class JsonDecoder(object):
88 self
.raw_decode
= json
.JSONDecoder().raw_decode
90 def decode(self
, data
):
96 res
, ptr
= self
.raw_decode(buf
)
107 def __init__(self
, timeout
):
108 # If timeout is None, we never expire.
109 self
.timeout
= timeout
113 self
.time
= time
.time()
117 If expired, reset the timer and return True.
119 if self
.timeout
is None:
121 elif time
.time() - self
.time
> self
.timeout
:
127 class SockReader(object):
128 def __init__(self
, sock
, sock_timeout
):
130 self
.sock_timeout
= sock_timeout
134 ready_to_read
= select
.select([self
.sock
], [], [], self
.sock_timeout
)[0]
136 return self
.sock
.read()
137 except SSLError
as e
:
138 # Code 2 is error from a non-blocking read of an empty buffer.
144 class TwitterJSONIter(object):
146 def __init__(self
, handle
, uri
, arg_data
, block
, timeout
, heartbeat_timeout
):
149 self
.arg_data
= arg_data
150 self
.timeout_token
= Timeout
152 self
.heartbeat_timeout
= HEARTBEAT_TIMEOUT
153 if timeout
and timeout
> 0:
154 self
.timeout
= float(timeout
)
155 elif not (block
or timeout
):
156 self
.timeout_token
= None
157 self
.timeout
= MIN_SOCK_TIMEOUT
158 if heartbeat_timeout
and heartbeat_timeout
> 0:
159 self
.heartbeat_timeout
= float(heartbeat_timeout
)
162 timeouts
= [t
for t
in (self
.timeout
, self
.heartbeat_timeout
, MAX_SOCK_TIMEOUT
)
164 sock_timeout
= min(*timeouts
)
165 sock
= self
.handle
.fp
.raw
._sock
if PY_3_OR_HIGHER
else self
.handle
.fp
._sock
.fp
._sock
166 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
, 1)
167 headers
= self
.handle
.headers
168 sock_reader
= SockReader(sock
, sock_timeout
)
169 chunk_decoder
= HttpChunkDecoder()
170 utf8_decoder
= codecs
.getincrementaldecoder("utf-8")()
171 json_decoder
= JsonDecoder()
172 timer
= Timer(self
.timeout
)
173 heartbeat_timer
= Timer(self
.heartbeat_timeout
)
176 # Decode all the things:
177 data
= sock_reader
.read()
178 dechunked_data
, end_of_stream
, decode_error
= chunk_decoder
.decode(data
)
179 unicode_data
= utf8_decoder
.decode(dechunked_data
)
180 json_data
= json_decoder
.decode(unicode_data
)
182 # Yield data-like things:
183 for json_obj
in json_data
:
184 yield wrap_response(json_obj
, headers
)
188 heartbeat_timer
.reset()
192 # Yield timeouts and special things:
199 if heartbeat_timer
.expired():
200 yield HeartbeatTimeout
203 yield self
.timeout_token
206 def handle_stream_response(req
, uri
, arg_data
, block
, timeout
, heartbeat_timeout
):
208 handle
= urllib_request
.urlopen(req
,)
209 except urllib_error
.HTTPError
as e
:
210 raise TwitterHTTPError(e
, uri
, 'json', arg_data
)
211 return iter(TwitterJSONIter(handle
, uri
, arg_data
, block
, timeout
, heartbeat_timeout
))
213 class TwitterStream(TwitterCall
):
215 The TwitterStream object is an interface to the Twitter Stream
216 API. This can be used pretty much the same as the Twitter class
217 except the result of calling a method will be an iterator that
218 yields objects decoded from the stream. For example::
220 twitter_stream = TwitterStream(auth=OAuth(...))
221 iterator = twitter_stream.statuses.sample()
223 for tweet in iterator:
224 ...do something with this tweet...
226 The iterator will yield until the TCP connection breaks. When the
227 connection breaks, the iterator yields `{'hangup': True}`, and
228 raises `StopIteration` if iterated again.
230 Similarly, if the stream does not produce heartbeats for more than
231 90 seconds, the iterator yields `{'hangup': True,
232 'heartbeat_timeout': True}`, and raises `StopIteration` if
235 The `timeout` parameter controls the maximum time between
236 yields. If it is nonzero, then the iterator will yield either
237 stream data or `{'timeout': True}` within the timeout period. This
238 is useful if you want your program to do other stuff in between
241 The `block` parameter sets the stream to be fully non-blocking. In
242 this mode, the iterator always yields immediately. It returns
243 stream data, or `None`. Note that `timeout` supercedes this
244 argument, so it should also be set `None` to use this mode.
246 def __init__(self
, domain
="stream.twitter.com", secure
=True, auth
=None,
247 api_version
='1.1', block
=True, timeout
=None,
248 heartbeat_timeout
=90.0):
249 uriparts
= (str(api_version
),)
251 class TwitterStreamCall(TwitterCall
):
252 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
253 return handle_stream_response(
254 req
, uri
, arg_data
, block
,
255 _timeout
or timeout
, heartbeat_timeout
)
257 TwitterCall
.__init
__(
258 self
, auth
=auth
, format
="json", domain
=domain
,
259 callable_cls
=TwitterStreamCall
,
260 secure
=secure
, uriparts
=uriparts
, timeout
=timeout
, gzip
=False)