2 import urllib
.request
as urllib_request
3 import urllib
.error
as urllib_error
5 import urllib2
as urllib_request
6 import urllib2
as urllib_error
9 from ssl
import SSLError
11 import sys
, select
, time
13 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
# True on Python 3 and later; used to pick between the py2 and py3 code paths.
PY_3_OR_HIGHER = sys.version_info[0] >= 3

# Marker objects yielded by the stream iterator instead of decoded data.
Timeout = {'timeout': True}
Hangup = {'hangup': True}
HeartbeatTimeout = {'heartbeat_timeout': True, 'hangup': True}
class ChunkDecodeError(Exception):
    """Raised when no CRLF-terminated chunk-size header can be found."""
class EndOfStream(Exception):
    """Signals the end of the chunked stream; handled alongside ChunkDecodeError."""
# Python 2's ``range`` builds a list; alias the lazy ``xrange`` there so the
# rest of the module can use ``range`` uniformly.
range = xrange if not PY_3_OR_HIGHER else range
class SocketShim(io.IOBase):
    """
    Adapts a raw socket to fit the IO protocol.

    The shim exposes the minimum a buffered reader needs from its raw
    stream: ``readable()`` and ``readinto()``, plus a convenience ``read()``.
    """
    # NOTE(review): ``readable`` and the ``read`` header were not visible in
    # the reviewed excerpt and were reconstructed -- confirm against the file.

    def __init__(self, sock):
        """Wrap *sock*, an object providing ``recv`` and ``recv_into``."""
        self.sock = sock

    def readable(self):
        """Report the stream as readable so buffered readers accept it."""
        return True

    def read(self, size):
        """Receive and return at most *size* bytes from the socket."""
        # Plain sockets have no ``read`` method (only SSL sockets do), so
        # use ``recv``, which both kinds provide.
        return self.sock.recv(size)

    def readinto(self, buf):
        """Receive bytes into *buf* and return how many were stored."""
        return self.sock.recv_into(buf)
def recv_chunk(reader): # -> bytearray:
    """
    Read one chunk of a chunked-transfer stream from *reader*.

    *reader* must provide ``peek(n)`` and ``read(n)`` (for example an
    ``io.BufferedReader``).  Returns the chunk payload as a ``bytearray``.

    Raises ChunkDecodeError if no valid CRLF-terminated hexadecimal size
    header is found, and EndOfStream on a zero-length chunk or if the
    stream ends before the announced number of bytes has arrived.
    """
    # NOTE(review): the ``break``/``else``, the zero-size check, the
    # ``chunk`` initialisation and the final ``return`` were not visible in
    # the reviewed excerpt and were reconstructed -- confirm against the file.

    # Look for the CRLF that ends the size header without consuming input.
    for headerlen in range(12):
        header = reader.peek(headerlen)[:headerlen]
        if header.endswith(CRLF):
            break
    else:
        raise ChunkDecodeError()

    try:
        size = int(header, 16) # Decode the chunk size
    except ValueError:
        # A header that is not a hex number is a framing error, not a
        # programming error; report it like any other undecodable chunk.
        raise ChunkDecodeError()
    reader.read(headerlen) # Ditch the header

    if size == 0:
        raise EndOfStream()

    chunk = bytearray()
    while len(chunk) < size:
        remainder = size - len(chunk)
        data = reader.read(remainder)
        if not data:
            # An empty read means the peer closed the connection mid-chunk;
            # without this check the loop would never terminate.
            raise EndOfStream()
        chunk.extend(data)

    reader.read(2) # Ditch remaining CRLF

    return chunk
class Timer(object):
    """Tells whether a fixed interval has elapsed since the last reset."""
    # NOTE(review): the class header and the ``reset``/``expired`` headers
    # were not visible in the reviewed excerpt and were reconstructed --
    # confirm against the file.

    def __init__(self, timeout):
        # A timeout of None means this timer never expires.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the interval from the current time."""
        self.time = time.time()

    def expired(self):
        """
        Return True, and restart the timer, if the interval has elapsed.
        """
        if self.timeout is not None:
            elapsed = time.time() - self.time
            if elapsed > self.timeout:
                self.reset()
                return True
        return False
class TwitterJSONIter(object):
    """
    Iterable over the JSON objects arriving on a streaming HTTP response.

    Iterating yields each decoded object wrapped by ``wrap_response``.  It
    may also yield the marker dicts ``Timeout``, ``HeartbeatTimeout`` and
    ``Hangup``, or ``None`` when neither ``block`` nor ``timeout`` is set
    and no complete object is available yet.  Iteration stops after
    ``HeartbeatTimeout`` or ``Hangup``.
    """
    # NOTE(review): several control-flow lines (``while``/``try``/``else``/
    # ``break``/``continue`` and three attribute assignments) were not
    # visible in the reviewed excerpt and were reconstructed -- confirm
    # against the file.

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        """
        Store the open response *handle* and the streaming options.

        Falsy *timeout* / *heartbeat_timeout* values are stored as None
        (disabled); anything else is converted to float.
        """
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def __iter__(self):
        # Imported here so the method does not rely on a module-level import.
        import codecs

        actually_block = self.block and not self.timeout
        # Only wait in select() when blocking with a heartbeat limit.  With
        # the heartbeat disabled there is nothing to compare against, and
        # min(<number>, None) raises TypeError on Python 3.
        if actually_block and self.heartbeat_timeout is not None:
            sock_timeout = min(self.timeout or 1000000, self.heartbeat_timeout)
        else:
            sock_timeout = None
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_block)
        reader = io.BufferedReader(SocketShim(sock))
        buf = ''
        raw_decode = json.JSONDecoder().raw_decode
        # An incremental decoder keeps a multi-byte UTF-8 sequence that is
        # split across two chunks instead of failing on the first half.
        decode_utf8 = codecs.getincrementaldecoder('utf-8')().decode
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            buf = buf.lstrip() # Remove any keep-alive delimiters
            try:
                res, ptr = raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                # No complete JSON object buffered yet.
                if not self.block and not self.timeout:
                    yield None
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                heartbeat_timer.reset()
                continue

            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            try:
                if sock_timeout:
                    ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                    if not ready_to_read:
                        continue
                received = recv_chunk(reader)
                buf += decode_utf8(bytes(received))
                if received:
                    heartbeat_timer.reset()
            except (ChunkDecodeError, EndOfStream):
                yield Hangup
                break
            except SSLError as e:
                # Code 2 is error from a non-blocking read of an empty buffer.
                if e.errno != 2:
                    raise
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open *req* and return an iterator over the objects in the response.

    An ``HTTPError`` raised while opening the request is re-raised as
    ``TwitterHTTPError`` carrying *uri* and *arg_data*.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    stream = TwitterJSONIter(
        response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
class TwitterStream(TwitterCall):
    """
    Interface to the Twitter Stream API.

    A TwitterStream is used much like the Twitter class, except that
    calling a method returns an iterator that yields objects decoded
    from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields until the TCP connection breaks. When that
    happens it yields `{'hangup': True}` and raises `StopIteration` if
    iterated again.

    Likewise, if the stream produces no heartbeat for longer than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}` and raises
    `StopIteration` if iterated again.

    The `timeout` parameter bounds the time between yields. When it is
    nonzero, the iterator yields either stream data or
    `{'timeout': True}` within that period, which lets the program do
    other work between yields.

    Setting `block` to False makes the stream fully non-blocking: the
    iterator always yields immediately, returning stream data or `None`.
    Note that `timeout` supersedes this argument, so `timeout` must also
    be `None` to use this mode.
    """

    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        class TwitterStreamCall(TwitterCall):
            # Closes over the constructor arguments so every call made
            # through this stream uses the same streaming options.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout, when given, wins over the stream's.
                effective_timeout = _timeout or timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        uriparts = (str(api_version),)

        TwitterCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=uriparts,
            timeout=timeout,
            gzip=False)