2 import urllib
.request
as urllib_request
3 import urllib
.error
as urllib_error
6 import urllib2
as urllib_request
7 import urllib2
as urllib_error
9 from ssl
import SSLError
11 import sys
, select
, time
13 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
# Interpreter feature gates: recv_chunk() checks the first before using
# memoryview/recv_into, and the stream iterator checks the second to pick
# the attribute path that leads to the raw socket.
PY_27_OR_HIGHER = sys.version_info[:2] >= (2, 7)
PY_3_OR_HIGHER = sys.version_info[0] >= 3

# Marker dictionaries the stream iterator yields in place of a decoded
# message (see the TwitterStream docstring for when each one appears).
Timeout = {'timeout': True}
Hangup = {'hangup': True}
HeartbeatTimeout = {'heartbeat_timeout': True, 'hangup': True}
class ChunkDecodeError(Exception):
    """Raised by `recv_chunk` while decoding the header of an HTTP chunk."""
def _recv_into_exactly(sock, chunk, offset):
    """
    Fill `chunk[offset:]` from `sock`, looping over short reads.

    Returns the index one past the last byte stored. It is smaller than
    `len(chunk)` only when a read returned nothing before the buffer was
    full.
    """
    size = len(chunk)
    if PY_27_OR_HIGHER:  # When possible, use less memory by reading directly into the buffer.
        view = memoryview(chunk)
        while offset < size:
            received = sock.recv_into(view[offset:])
            if not received:  # Nothing more is coming; stop instead of spinning.
                break
            offset += received
    else:  # less efficient for python2.6 compatibility
        while offset < size:
            piece = sock.recv(size - offset)
            if not piece:
                break
            chunk[offset:offset + len(piece)] = piece
            offset += len(piece)
    return offset


def recv_chunk(sock):  # -> bytearray:
    """
    Read one HTTP chunk from `sock` and return its payload as a bytearray.

    The chunk-size line is looked for in an initial read of at most 8 bytes.
    Whatever follows the size line in that read is the start of the payload;
    the rest of the payload and the trailing CRLF are then read from the
    socket, repeating the read until every expected byte has arrived.

    Raises `ChunkDecodeError` when the initial read does not contain a
    chunk-size line. If the peer stops sending before the whole payload has
    arrived, only the bytes actually received are returned.

    Limitation: bytes belonging to a *following* chunk that happen to arrive
    in the initial 8-byte read cannot be pushed back and are dropped.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the end of the HTTP chunk size.

    if crlf <= 0:  # No CRLF in the bytes read, or no size digits before it.
        raise ChunkDecodeError()

    size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
    chunk = bytearray(size)
    leftover = header[crlf + 2:]  # Everything after the header's CRLF pair.

    # Copy the part of the payload that came along with the header, then
    # keep reading until the payload is complete.
    have = min(len(leftover), size)
    chunk[:have] = leftover[:have]
    have = _recv_into_exactly(sock, chunk, have)

    # Throw away the trailing CRLF pair, minus whatever part of it was
    # already consumed together with the header (small chunks).
    trailer_left = 2 - min(2, max(0, len(leftover) - size))
    while trailer_left > 0:
        discarded = sock.recv(trailer_left)
        if not discarded:
            break
        trailer_left -= len(discarded)

    if have < size:  # Truncated chunk: do not hand back zero padding.
        return chunk[:have]
    return chunk
class Timer(object):
    """
    Expiry timer measured with `time.time()`.

    `timeout` is the interval in seconds; `None` disables expiry entirely.
    """

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the interval from the current time."""
        self.time = time.time()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        if self.timeout is None:
            return False
        elif time.time() - self.time > self.timeout:
            self.reset()
            return True
        return False
class TwitterJSONIter(object):
    """
    Iterable over the JSON messages of an open streaming response.

    `handle` is the opened response object; its underlying socket is read
    chunk by chunk with `recv_chunk`. `block`, `timeout` and
    `heartbeat_timeout` select between blocking reads, reads bounded by a
    timeout, and fully non-blocking reads. A falsy `timeout` or
    `heartbeat_timeout` disables the corresponding timer.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def __iter__(self):
        # NOTE(review): the loop/try scaffolding of this method was rebuilt
        # around the surviving statements — confirm against version control.
        actually_block = self.block and not self.timeout
        # Only the timeouts that are actually set take part in min(); comparing
        # None with a float raises TypeError on Python 3.
        configured = [t for t in (self.timeout, self.heartbeat_timeout) if t is not None]
        sock_timeout = min(configured) if actually_block and configured else None
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_block)
        buf = ''
        raw_decode = json.JSONDecoder().raw_decode
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            buf = buf.lstrip()  # Remove any keep-alive delimiters
            try:
                res, ptr = raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                # No complete JSON document buffered yet.
                if not self.block and not self.timeout:
                    yield None
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                heartbeat_timer.reset()
                continue

            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            try:
                if not buf and sock_timeout:
                    # Wait for data, but never longer than the timeout, so
                    # the timers above get a chance to fire.
                    ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                    if not ready_to_read:
                        continue
                buf += recv_chunk(sock).decode('utf-8')
                # NOTE(review): presumably an empty chunk with nothing
                # buffered marks the end of the stream — confirm.
                if not buf:
                    yield Hangup
                    break
                heartbeat_timer.reset()
            except SSLError as e:
                # Code 2 is error from a non-blocking read of an empty buffer.
                if e.errno != 2:
                    raise
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open `req` and return an iterator over the messages of the response.

    An `HTTPError` raised while opening the request is re-raised as
    `TwitterHTTPError`. The remaining arguments are passed unchanged to
    `TwitterJSONIter`.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
class TwitterStream(TwitterCall):
    """
    Interface to the Twitter Stream API.

    It is used in much the same way as the Twitter class, except that
    calling a method returns an iterator that yields objects decoded from
    the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding until the TCP connection breaks. When that
    happens it yields `{'hangup': True}` and raises `StopIteration` if it
    is iterated again.

    Likewise, if the stream produces no heartbeat for longer than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}` and raises
    `StopIteration` if it is iterated again.

    The `timeout` parameter bounds the time between yields. When it is
    nonzero, the iterator yields either stream data or `{'timeout': True}`
    within the timeout period. This is useful when the program has other
    work to do in between waiting for tweets.

    Passing `block=False` makes the stream fully non-blocking: the iterator
    always yields immediately, giving either stream data or `None`. Note
    that `timeout` supersedes this argument, so `timeout` must also be left
    as `None` to use this mode.
    """

    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):

        class TwitterStreamCall(TwitterCall):
            # Defined here so that it closes over block, timeout and
            # heartbeat_timeout from the enclosing constructor call.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                effective_timeout = _timeout or timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=(str(api_version),),
            timeout=timeout,
            gzip=False)