2 import urllib
.request
as urllib_request
3 import urllib
.error
as urllib_error
5 import urllib2
as urllib_request
6 import urllib2
as urllib_error
9 from ssl
import SSLError
11 import sys
, select
, time
13 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
# True on any Python 3.x interpreter; used to pick the attribute path that
# leads to the response's raw socket.
PY_3_OR_HIGHER = sys.version_info[0] >= 3

# Marker messages yielded by the stream iterator in place of real data.
Timeout = dict(timeout=True)
Hangup = dict(hangup=True)
HeartbeatTimeout = dict(heartbeat_timeout=True, hangup=True)
class ChunkDecodeError(Exception):
    """Raised by recv_chunk when no CRLF-terminated chunk header is found."""
class EndOfStream(Exception):
    """Signals that the chunked stream has ended.

    Caught alongside ChunkDecodeError by TwitterJSONIter.__iter__.
    """
class SocketShim(io.IOBase):
    """
    Adapts a raw socket to fit the IO protocol.

    Provides just enough of the raw-stream interface for the socket to be
    wrapped in an ``io.BufferedReader``.
    """

    def __init__(self, sock):
        self.sock = sock

    def readable(self):
        # io.BufferedReader refuses raw streams that are not readable, and
        # io.IOBase.readable() returns False by default.
        return True

    def read(self, size):
        # NOTE(review): only SSL sockets expose .read(); a plain socket.socket
        # does not. CPython's BufferedReader fills its buffer via readinto().
        return self.sock.read(size)

    def readinto(self, buf):
        # Receive straight into the caller's buffer; returns the byte count.
        return self.sock.recv_into(buf)
def recv_chunk(reader):  # -> bytearray:
    """Read one HTTP/1.1 chunked-transfer-encoding chunk from *reader*.

    *reader* must support ``peek`` and ``read`` (in this module, an
    ``io.BufferedReader`` wrapping the stream socket). The chunk header
    (hexadecimal size, optional ``;extensions``, CRLF) and the CRLF that
    follows the payload are consumed; only the payload is returned.

    Raises:
        ChunkDecodeError: no CRLF-terminated header within the first 12
            bytes, or the size field is not valid hexadecimal.
        EndOfStream: the terminating zero-size chunk was received, or the
            stream reached EOF before the payload was complete.
    """
    # `range` rather than `xrange`: the latter does not exist on Python 3.
    for headerlen in range(12):
        header = reader.peek(headerlen)[:headerlen]
        if header.endswith(CRLF):
            break
    else:
        raise ChunkDecodeError()

    # Chunk extensions (";name=value") may follow the size; ignore them.
    size_field = header[:-len(CRLF)].split(b';', 1)[0]
    try:
        size = int(size_field, 16)  # Decode the chunk size
    except ValueError:
        raise ChunkDecodeError()
    reader.read(headerlen)  # Ditch the header

    if size == 0:
        raise EndOfStream()

    chunk = bytearray()
    while len(chunk) < size:
        data = reader.read(size - len(chunk))
        if not data:
            # EOF mid-chunk: read() keeps returning b'' and the loop would
            # otherwise never terminate.
            raise EndOfStream()
        chunk.extend(data)

    reader.read(2)  # Ditch remaining CRLF

    return chunk
class Timer(object):
    """Measure wall-clock time elapsed against an optional timeout."""

    def __init__(self, timeout):
        # A timeout of None means the timer never expires.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the measurement from the current time."""
        self.time = time.time()

    def expired(self):
        """
        If expired, reset the timer and return True.

        Otherwise return False. A timer without a timeout never expires.
        """
        if self.timeout is None:
            return False
        elapsed = time.time() - self.time
        if not elapsed > self.timeout:
            return False
        self.reset()
        return True
class TwitterJSONIter(object):
    """Iterate over the JSON messages of an open streaming HTTP response.

    Iterating yields, as events occur:

    * each decoded message, passed through ``wrap_response`` together with
      the response headers;
    * ``None`` whenever no complete message is buffered, but only in fully
      non-blocking mode (``block`` false and no ``timeout``);
    * ``Timeout`` each time ``timeout`` seconds pass without data being
      yielded;
    * ``HeartbeatTimeout`` when nothing at all (not even a keep-alive chunk)
      has been received for ``heartbeat_timeout`` seconds, after which
      iteration stops;
    * ``Hangup`` when the chunked stream ends or cannot be decoded, after
      which iteration stops.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        # Falsy values (None, 0) disable the corresponding timeout.
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def _select_timeout(self):
        """Return the timeout passed to ``select.select`` while waiting for data.

        Returns 0.0 (poll) in fully non-blocking mode so that iteration never
        waits. Otherwise returns the smaller of ``timeout`` and
        ``heartbeat_timeout``, ignoring unset ones, so that ``select`` wakes
        in time to report either expiry. Returns ``None`` (wait indefinitely)
        when neither is set.
        """
        if not self.block and not self.timeout:
            return 0.0
        # Never hand None to min(): on Python 3, comparing None with a float
        # raises TypeError, and on Python 2 None wins and select blocks forever.
        timeouts = [t for t in (self.timeout, self.heartbeat_timeout) if t is not None]
        return min(timeouts) if timeouts else None

    def __iter__(self):
        import codecs  # local import: incremental UTF-8 decoder below

        actually_block = self.block and not self.timeout
        sock_timeout = self._select_timeout()
        # Reach into urllib's private attributes for the underlying socket;
        # the attribute path differs between Python 2 and Python 3.
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_block)
        reader = io.BufferedReader(SocketShim(sock))
        # Chunk boundaries fall at arbitrary byte offsets, so a multi-byte
        # UTF-8 character can be split across two chunks. The incremental
        # decoder holds the partial sequence instead of raising.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        buf = ''
        raw_decode = json.JSONDecoder().raw_decode
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            buf = buf.lstrip()  # Remove any keep-alive delimiters
            try:
                res, ptr = raw_decode(buf)
            except ValueError:
                # No complete JSON document buffered yet.
                if not self.block and not self.timeout:
                    yield None
            else:
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                heartbeat_timer.reset()
                # Drain any further buffered messages before reading again.
                continue

            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            try:
                ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                if not ready_to_read:
                    continue
                # NOTE(review): on a non-blocking plain (non-SSL) socket, a
                # chunk that arrives only partially surfaces as
                # BlockingIOError, which is not handled here.
                received = recv_chunk(reader)
                buf += utf8_decoder.decode(bytes(received))
                # Any received chunk, keep-alives included, counts as a heartbeat.
                heartbeat_timer.reset()
            except (ChunkDecodeError, EndOfStream):
                yield Hangup
                break
            except SSLError as e:
                # Code 2 is error from a non-blocking read of an empty buffer.
                if e.errno != 2:
                    raise
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """Open *req* and return an iterator over the decoded JSON stream.

    An ``HTTPError`` from ``urlopen`` is re-raised as ``TwitterHTTPError``
    carrying *uri* and *arg_data*. The remaining arguments are forwarded
    unchanged to ``TwitterJSONIter``.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as err:
        raise TwitterHTTPError(err, uri, 'json', arg_data)
    stream = TwitterJSONIter(handle, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. It can be used much like the Twitter class, except that
    calling a method returns an iterator that yields objects decoded
    from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    Similarly, if the stream does not produce heartbeats for more than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, then the iterator will yield either
    stream data or `{'timeout': True}` within the timeout period. This
    is useful if you want your program to do other work while it
    waits for stream data.

    Setting `block` to False makes the stream fully non-blocking. In
    this mode, the iterator always yields immediately: it returns
    stream data, or `None`. Note that `timeout` supersedes this
    argument, so it must also be left as `None` to use this mode.
    """

    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        uriparts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            # Presumably TwitterCall invokes _handle_response to turn an
            # issued request into a result; this override returns a
            # streaming iterator built from this constructor's settings.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A truthy per-call timeout overrides the stream-wide one.
                effective_timeout = _timeout if _timeout else timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=uriparts,
            timeout=timeout,
            gzip=False,
        )