2 PY_3_OR_HIGHER
= sys
.version_info
>= (3, 0)
5 import urllib
.request
as urllib_request
6 import urllib
.error
as urllib_error
8 import urllib2
as urllib_request
9 import urllib2
as urllib_error
11 from ssl
import SSLError
15 import sys
, select
, time
17 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
21 Timeout
= {'timeout': True}
22 Hangup
= {'hangup': True}
23 HeartbeatTimeout
= {'heartbeat_timeout': True, 'hangup': True}
25 class ChunkDecodeError(Exception):
28 class EndOfStream(Exception):
31 range = range if PY_3_OR_HIGHER
else xrange
34 class HttpDeChunker(object):
37 self
.buf
= bytearray()
39 def extend(self
, data
):
42 def read_chunks(self
): # -> [bytearray]
46 header_end_pos
= buf
.find(CRLF
)
47 if header_end_pos
== -1:
50 header
= buf
[:header_end_pos
]
51 data_start_pos
= header_end_pos
+ 2
53 chunk_len
= int(header
.decode('ascii'), 16)
55 raise ChunkDecodeError()
60 data_end_pos
= data_start_pos
+ chunk_len
62 if len(buf
) > data_end_pos
+ 2:
63 chunks
.append(buf
[data_start_pos
:data_end_pos
])
64 buf
= buf
[data_end_pos
+ 2:]
71 class JsonDeChunker(object):
75 self
.raw_decode
= json
.JSONDecoder().raw_decode
77 def extend(self
, data
):
80 def read_json_chunks(self
):
86 res
, ptr
= self
.raw_decode(buf
)
96 def __init__(self
, timeout
):
97 # If timeout is None, we never expire.
98 self
.timeout
= timeout
102 self
.time
= time
.time()
106 If expired, reset the timer and return True.
108 if self
.timeout
is None:
110 elif time
.time() - self
.time
> self
.timeout
:
116 class TwitterJSONIter(object):
118 def __init__(self
, handle
, uri
, arg_data
, block
, timeout
, heartbeat_timeout
):
121 self
.arg_data
= arg_data
123 self
.timeout
= float(timeout
) if timeout
else None
124 self
.heartbeat_timeout
= float(heartbeat_timeout
) if heartbeat_timeout
else None
128 actually_block
= self
.block
and not self
.timeout
129 sock_timeout
= min(self
.timeout
or 1000000, self
.heartbeat_timeout
)
130 sock
= self
.handle
.fp
.raw
._sock
if PY_3_OR_HIGHER
else self
.handle
.fp
._sock
.fp
._sock
131 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
, 1)
132 sock
.setblocking(actually_block
)
133 headers
= self
.handle
.headers
134 dechunker
= HttpDeChunker()
135 utf8decoder
= codecs
.getincrementaldecoder("utf-8")()
136 json_dechunker
= JsonDeChunker()
137 timer
= Timer(self
.timeout
)
138 heartbeat_timer
= Timer(self
.heartbeat_timeout
)
140 json_chunks
= json_dechunker
.read_json_chunks()
141 for json
in json_chunks
:
142 yield wrap_response(json
, headers
)
145 heartbeat_timer
.reset()
147 if not self
.block
and not self
.timeout
:
149 if heartbeat_timer
.expired():
150 yield HeartbeatTimeout
156 ready_to_read
= select
.select([sock
], [], [], sock_timeout
)[0]
157 if not ready_to_read
:
160 except SSLError
as e
:
161 # Code 2 is error from a non-blocking read of an empty buffer.
166 dechunker
.extend(data
)
169 chunks
= dechunker
.read_chunks()
170 except (ChunkDecodeError
, EndOfStream
):
176 json_dechunker
.extend(utf8decoder
.decode(chunk
))
178 heartbeat_timer
.reset()
180 def handle_stream_response(req
, uri
, arg_data
, block
, timeout
, heartbeat_timeout
):
182 handle
= urllib_request
.urlopen(req
,)
183 except urllib_error
.HTTPError
as e
:
184 raise TwitterHTTPError(e
, uri
, 'json', arg_data
)
185 return iter(TwitterJSONIter(handle
, uri
, arg_data
, block
, timeout
, heartbeat_timeout
))
187 class TwitterStream(TwitterCall
):
189 The TwitterStream object is an interface to the Twitter Stream
190 API. This can be used pretty much the same as the Twitter class
191 except the result of calling a method will be an iterator that
192 yields objects decoded from the stream. For example::
194 twitter_stream = TwitterStream(auth=OAuth(...))
195 iterator = twitter_stream.statuses.sample()
197 for tweet in iterator:
198 ...do something with this tweet...
200 The iterator will yield until the TCP connection breaks. When the
201 connection breaks, the iterator yields `{'hangup': True}`, and
202 raises `StopIteration` if iterated again.
204 Similarly, if the stream does not produce heartbeats for more than
205 90 seconds, the iterator yields `{'hangup': True,
206 'heartbeat_timeout': True}`, and raises `StopIteration` if
209 The `timeout` parameter controls the maximum time between
210 yields. If it is nonzero, then the iterator will yield either
211 stream data or `{'timeout': True}` within the timeout period. This
212 is useful if you want your program to do other stuff in between
215 The `block` parameter sets the stream to be fully non-blocking. In
216 this mode, the iterator always yields immediately. It returns
217 stream data, or `None`. Note that `timeout` supercedes this
218 argument, so it should also be set `None` to use this mode.
220 def __init__(self
, domain
="stream.twitter.com", secure
=True, auth
=None,
221 api_version
='1.1', block
=True, timeout
=None,
222 heartbeat_timeout
=90.0):
223 uriparts
= (str(api_version
),)
225 class TwitterStreamCall(TwitterCall
):
226 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
227 return handle_stream_response(
228 req
, uri
, arg_data
, block
,
229 _timeout
or timeout
, heartbeat_timeout
)
231 TwitterCall
.__init
__(
232 self
, auth
=auth
, format
="json", domain
=domain
,
233 callable_cls
=TwitterStreamCall
,
234 secure
=secure
, uriparts
=uriparts
, timeout
=timeout
, gzip
=False)