2 PY_3_OR_HIGHER
= sys
.version_info
>= (3, 0)
5 import urllib
.request
as urllib_request
6 import urllib
.error
as urllib_error
8 import urllib2
as urllib_request
9 import urllib2
as urllib_error
11 from ssl
import SSLError
14 import sys
, select
, time
16 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
19 MIN_SOCK_TIMEOUT
= 0.0 # Apparenty select with zero wait is okay!
20 MAX_SOCK_TIMEOUT
= 10.0
21 HEARTBEAT_TIMEOUT
= 90.0
23 Timeout
= {'timeout': True}
24 Hangup
= {'hangup': True}
25 DecodeError
= {'hangup': True, 'decode_error': True}
26 HeartbeatTimeout
= {'hangup': True, 'heartbeat_timeout': True}
29 class HttpChunkDecoder(object):
32 self
.buf
= bytearray()
33 self
.munch_crlf
= False
35 def decode(self
, data
): # -> (bytearray, end_of_stream, decode_error)
38 munch_crlf
= self
.munch_crlf
44 # Dang, Twitter, you crazy. Twitter only sends a terminating
45 # CRLF at the beginning of the *next* message.
52 header_end_pos
= buf
.find(CRLF
)
53 if header_end_pos
== -1:
56 header
= buf
[:header_end_pos
]
57 data_start_pos
= header_end_pos
+ 2
59 chunk_len
= int(header
.decode('ascii'), 16)
68 data_end_pos
= data_start_pos
+ chunk_len
70 if len(buf
) >= data_end_pos
:
71 chunks
.append(buf
[data_start_pos
:data_end_pos
])
72 buf
= buf
[data_end_pos
:]
77 self
.munch_crlf
= munch_crlf
78 return bytearray().join(chunks
), end_of_stream
, decode_error
81 class JsonDecoder(object):
85 self
.raw_decode
= json
.JSONDecoder().raw_decode
87 def decode(self
, data
):
93 res
, ptr
= self
.raw_decode(buf
)
104 def __init__(self
, timeout
):
105 # If timeout is None, we never expire.
106 self
.timeout
= timeout
110 self
.time
= time
.time()
114 If expired, reset the timer and return True.
116 if self
.timeout
is None:
118 elif time
.time() - self
.time
> self
.timeout
:
124 class SockReader(object):
125 def __init__(self
, sock
, sock_timeout
):
127 self
.sock_timeout
= sock_timeout
131 ready_to_read
= select
.select([self
.sock
], [], [], self
.sock_timeout
)[0]
133 return self
.sock
.read()
134 except SSLError
as e
:
135 # Code 2 is error from a non-blocking read of an empty buffer.
141 class TwitterJSONIter(object):
143 def __init__(self
, handle
, uri
, arg_data
, block
, timeout
, heartbeat_timeout
):
146 self
.arg_data
= arg_data
147 self
.timeout_token
= Timeout
149 self
.heartbeat_timeout
= HEARTBEAT_TIMEOUT
150 if timeout
and timeout
> 0:
151 self
.timeout
= float(timeout
)
152 elif not (block
or timeout
):
153 self
.timeout_token
= None
154 self
.timeout
= MIN_SOCK_TIMEOUT
155 if heartbeat_timeout
and heartbeat_timeout
> 0:
156 self
.heartbeat_timeout
= float(heartbeat_timeout
)
159 timeouts
= [t
for t
in (self
.timeout
, self
.heartbeat_timeout
, MAX_SOCK_TIMEOUT
)
161 sock_timeout
= min(*timeouts
)
162 sock
= self
.handle
.fp
.raw
._sock
if PY_3_OR_HIGHER
else self
.handle
.fp
._sock
.fp
._sock
163 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
, 1)
164 headers
= self
.handle
.headers
165 sock_reader
= SockReader(sock
, sock_timeout
)
166 chunk_decoder
= HttpChunkDecoder()
167 utf8_decoder
= codecs
.getincrementaldecoder("utf-8")()
168 json_decoder
= JsonDecoder()
169 timer
= Timer(self
.timeout
)
170 heartbeat_timer
= Timer(self
.heartbeat_timeout
)
173 # Decode all the things:
174 data
= sock_reader
.read()
175 dechunked_data
, end_of_stream
, decode_error
= chunk_decoder
.decode(data
)
176 unicode_data
= utf8_decoder
.decode(dechunked_data
)
177 json_data
= json_decoder
.decode(unicode_data
)
179 # Yield data-like things:
180 for json_obj
in json_data
:
181 yield wrap_response(json_obj
, headers
)
185 heartbeat_timer
.reset()
189 # Yield timeouts and special things:
196 if heartbeat_timer
.expired():
197 yield HeartbeatTimeout
200 yield self
.timeout_token
203 def handle_stream_response(req
, uri
, arg_data
, block
, timeout
, heartbeat_timeout
):
205 handle
= urllib_request
.urlopen(req
,)
206 except urllib_error
.HTTPError
as e
:
207 raise TwitterHTTPError(e
, uri
, 'json', arg_data
)
208 return iter(TwitterJSONIter(handle
, uri
, arg_data
, block
, timeout
, heartbeat_timeout
))
210 class TwitterStream(TwitterCall
):
212 The TwitterStream object is an interface to the Twitter Stream
213 API. This can be used pretty much the same as the Twitter class
214 except the result of calling a method will be an iterator that
215 yields objects decoded from the stream. For example::
217 twitter_stream = TwitterStream(auth=OAuth(...))
218 iterator = twitter_stream.statuses.sample()
220 for tweet in iterator:
221 ...do something with this tweet...
223 The iterator will yield until the TCP connection breaks. When the
224 connection breaks, the iterator yields `{'hangup': True}`, and
225 raises `StopIteration` if iterated again.
227 Similarly, if the stream does not produce heartbeats for more than
228 90 seconds, the iterator yields `{'hangup': True,
229 'heartbeat_timeout': True}`, and raises `StopIteration` if
232 The `timeout` parameter controls the maximum time between
233 yields. If it is nonzero, then the iterator will yield either
234 stream data or `{'timeout': True}` within the timeout period. This
235 is useful if you want your program to do other stuff in between
238 The `block` parameter sets the stream to be fully non-blocking. In
239 this mode, the iterator always yields immediately. It returns
240 stream data, or `None`. Note that `timeout` supercedes this
241 argument, so it should also be set `None` to use this mode.
243 def __init__(self
, domain
="stream.twitter.com", secure
=True, auth
=None,
244 api_version
='1.1', block
=True, timeout
=None,
245 heartbeat_timeout
=90.0):
246 uriparts
= (str(api_version
),)
248 class TwitterStreamCall(TwitterCall
):
249 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
250 return handle_stream_response(
251 req
, uri
, arg_data
, block
,
252 _timeout
or timeout
, heartbeat_timeout
)
254 TwitterCall
.__init
__(
255 self
, auth
=auth
, format
="json", domain
=domain
,
256 callable_cls
=TwitterStreamCall
,
257 secure
=secure
, uriparts
=uriparts
, timeout
=timeout
, gzip
=False)