]>
jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
try:
    import urllib.request as urllib_request
    import urllib.error as urllib_error
except ImportError:  # Python 2
    import urllib2 as urllib_request
    import urllib2 as urllib_error
import codecs
import json
import select
import socket
import sys
import time
from ssl import SSLError

from .api import TwitterCall, wrap_response, TwitterHTTPError
# Interpreter version flags, evaluated once at import time.
PY_27_OR_HIGHER = sys.version_info[:2] >= (2, 7)
PY_3_OR_HIGHER = sys.version_info[:2] >= (3, 0)

# Sentinel values the stream iterator yields instead of decoded messages:
# Timeout when no message arrived in time, Hangup when the connection broke.
Timeout = {'timeout': True}
Hangup = {'hangup': True}
def _recv_fill(sock, buf, offset):
    """Fill ``buf[offset:]`` from *sock*, retrying after short reads.

    ``recv``/``recv_into`` may return fewer bytes than requested, so one call
    is not enough to read a whole chunk. Stops early only when a read returns
    no data (the peer closed the connection).

    Returns the number of leading bytes of *buf* that now hold data; this is
    ``len(buf)`` unless the connection closed first.
    """
    try:
        view = memoryview(buf)
    except NameError:  # Python 2.6 has no memoryview.
        view = None
    size = len(buf)
    while offset < size:
        if view is not None:
            # Read directly into the bytearray to avoid an intermediate copy.
            received = sock.recv_into(view[offset:])
        else:  # Less efficient fallback for Python 2.6 compatibility.
            data = sock.recv(size - offset)
            received = len(data)
            buf[offset:offset + received] = data
        if not received:
            break
        offset += received
    return offset


def _recv_discard(sock, count):
    """Read and throw away up to *count* bytes from *sock*, retrying short reads."""
    while count > 0:
        data = sock.recv(count)
        if not data:  # Peer closed the connection.
            break
        count -= len(data)


def recv_chunk(sock):  # -> bytearray:
    """Read one chunk of an HTTP chunked-transfer body from *sock*.

    The first ``sock.recv(8)`` must contain the hexadecimal chunk-size line
    and its CRLF (enough for sizes up to 0xffffff). The payload is returned
    without the framing; the CRLF that terminates the payload is consumed
    and discarded.

    Returns an empty ``bytearray`` when no size line is found in the initial
    read (e.g. the peer closed the connection and ``recv`` returned b'').
    If the connection closes part-way through a chunk, only the bytes that
    actually arrived are returned.

    NOTE(review): for chunks of at most 3 bytes the 8-byte initial read can
    also contain the start of the next chunk; those bytes cannot be pushed
    back into the socket and are lost. Fixing that needs buffering across
    calls.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the HTTP chunk size.

    if crlf <= 0:  # No length in the initial read.
        return bytearray()

    size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
    start = crlf + 2  # Skip the CRLF pair that ends the size line.

    # The initial read may already hold some or all of the payload, and even
    # part of the trailing CRLF. Take at most `size` bytes so that, for small
    # chunks, the trailing CRLF is never copied into the payload.
    prefix = header[start:start + size]
    chunk = bytearray(size)
    chunk[:len(prefix)] = prefix
    filled = _recv_fill(sock, chunk, len(prefix))
    if filled < size:  # The peer closed mid-chunk; return only what arrived.
        return chunk[:filled]

    # Consume whatever part of the payload's trailing CRLF was not in `header`.
    crlf_seen = len(header) - start - len(prefix)
    _recv_discard(sock, 2 - crlf_seen)
    return chunk
class Timer(object):
    """Track whether `timeout` seconds have passed since the last reset."""

    # Prefer a monotonic clock so that system clock changes (NTP corrections,
    # manual adjustments) cannot stall or prematurely trigger timeouts.
    # Python 2 has no time.monotonic, so fall back to time.time there.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, timeout):
        # If timeout is None, we always expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the timer from the current time."""
        self.time = self._clock()

    def expired(self):
        """
        If expired, reset the timer and return True.

        Always returns True when `timeout` is None. Otherwise returns True
        only if more than `timeout` seconds have elapsed since the last
        reset, and False if not.
        """
        if self.timeout is None:
            return True
        elif self._clock() - self.time > self.timeout:
            self.reset()
            return True
        return False
class TwitterJSONIter(object):
    """Iterable over the JSON messages of an open streaming HTTP response.

    Iterating yields, in order of arrival:

    * each decoded JSON message, passed through ``wrap_response`` together
      with the response headers;
    * the ``Timeout`` token (when a ``timeout`` is set) if no message arrived
      before the timer expired, or ``None`` in non-blocking mode when no data
      is ready;
    * ``Hangup`` once the connection delivers no more data, after which the
      iteration stops.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        # handle: the response from urlopen(); its socket and headers are used.
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        # Waiting with select() for up to `timeout` needs a non-blocking
        # socket, so only block when requested AND no timeout is set.
        actually_blocking = self.block and not self.timeout
        # Reach through the urllib response wrappers to the raw socket; the
        # attribute chain differs between Python 2 and 3.
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_blocking)
        # HTTP chunk boundaries need not fall on character boundaries. An
        # incremental decoder carries an incomplete trailing UTF-8 sequence
        # over to the next chunk instead of raising UnicodeDecodeError.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        json_decoder = json.JSONDecoder()
        timer = Timer(self.timeout)
        timeout_token = Timeout if self.timeout else None
        buf = ''
        while True:
            buf = buf.lstrip()  # Remove any keep-alive delimiters to detect hangups.
            try:
                res, ptr = json_decoder.raw_decode(buf)
            except ValueError:
                pass  # No complete message buffered yet; read more below.
            else:
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                continue
            try:
                if self.timeout and not buf:  # This is a non-blocking read.
                    ready_to_read = select.select([sock], [], [], self.timeout)[0]
                    if not ready_to_read and timer.expired():
                        yield timeout_token
                        continue
                chunk = recv_chunk(sock)
                buf += utf8_decoder.decode(chunk)
                # An empty chunk means the stream ended or the connection was
                # closed. Testing the raw chunk (not `buf`) also ends the
                # iteration when the connection drops mid-message, instead of
                # re-reading an exhausted socket forever.
                if not chunk:
                    yield Hangup
                    break
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer
                # (errno 2 is SSL_ERROR_WANT_READ).
                if not actually_blocking and (e.errno == 2):
                    yield timeout_token
                else:
                    raise
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open the streaming request *req* and return an iterator over its messages.

    An ``HTTPError`` raised while opening the request is re-raised as
    ``TwitterHTTPError`` built from the error, *uri*, the 'json' format and
    *arg_data*. On success, the response is wrapped in a ``TwitterJSONIter``
    configured with *block* and *timeout*.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    stream_iter = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream_iter)
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, then the iterator will yield either
    stream data or `{'timeout': True}`. This is useful if you want
    your program to do other stuff in between waiting for tweets.

    Setting the `block` parameter to False makes the stream non-blocking.
    In this mode, the iterator always yields immediately. It returns
    stream data, or `None`. Note that `timeout` supersedes this argument,
    so it should also be set to `None` to use this mode.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        # A falsy timeout (None or 0) disables timeouts entirely.
        stream_timeout = float(timeout) if timeout else None
        stream_block = block

        # Each call made through this object uses this class, which routes
        # the HTTP response into the streaming iterator with the settings
        # captured above.
        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout, when truthy, overrides the stream default.
                effective_timeout = _timeout or stream_timeout
                return handle_stream_response(
                    req, uri, arg_data,
                    block=stream_block, timeout=effective_timeout)

        TwitterCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=(str(api_version),),
            timeout=stream_timeout,
            gzip=False)