try:
    # Python 3 module layout.
    import urllib.request as urllib_request
    import urllib.error as urllib_error
except ImportError:
    # Python 2 fallback: urllib2 provides both the opener and the errors.
    import urllib2 as urllib_request
    import urllib2 as urllib_error

import json
import select
import socket
import sys
import time
from ssl import SSLError

from .api import TwitterCall, wrap_response, TwitterHTTPError
# True when the interpreter supports memoryview / recv_into on bytearrays
# (Python 2.7 and every Python 3 release).
python27_3 = sys.version_info >= (2, 7)


def _fill_chunk(sock, chunk, filled):
    """Read from *sock* into *chunk* starting at offset *filled*.

    Keeps reading until the buffer is full or the peer stops sending (a
    read returns nothing). Returns the number of valid bytes in *chunk*.
    """
    size = len(chunk)
    if python27_3:
        # When possible, use less memory by reading directly into the buffer.
        view = memoryview(chunk)
        while filled < size:
            received = sock.recv_into(view[filled:])
            if not received:
                break
            filled += received
    else:
        # Less efficient, for python2.6 compatibility.
        while filled < size:
            piece = sock.recv(size - filled)
            if not piece:
                break
            chunk[filled:filled + len(piece)] = piece
            filled += len(piece)
    return filled


def recv_chunk(sock):  # -> bytearray:
    """Read one HTTP chunk (chunked transfer coding) from *sock*.

    The first read fetches up to 8 bytes, which is expected to contain the
    hexadecimal chunk size followed by CRLF and possibly the start of the
    chunk data.

    Returns a bytearray holding the chunk data. An empty bytearray is
    returned when no size line is found in the first read (for example the
    socket returned no data) or when the chunk size is 0 (end of stream).
    If the peer stops sending in the middle of a chunk, the bytes received
    so far are returned.

    NOTE(review): when the first read returns bytes that belong to the
    *next* chunk (possible only for chunks shorter than 4 bytes), those
    bytes are discarded, as there is no way to push them back.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the end of the HTTP chunk size.
    if crlf <= 0:  # No size line at the start of the data.
        return bytearray()

    size = int(header[:crlf], 16)  # Decode the hexadecimal chunk size.
    if size == 0:  # End of stream; nothing further to consume.
        return bytearray()

    start = crlf + 2  # Skip the size line's CRLF pair.
    chunk = bytearray(size)

    # Copy whatever part of the chunk data arrived with the size line. The
    # slice is clamped to `size`, so short chunks (including sizes 4-6,
    # which fit inside the 8-byte read together with part of the trailing
    # CRLF) never over-fill the buffer.
    head_body = header[start:start + size]
    chunk[:len(head_body)] = head_body

    # A single read may return fewer bytes than requested, so loop until
    # the chunk is complete.
    filled = _fill_chunk(sock, chunk, len(head_body))
    if filled < size:
        return chunk[:filled]  # Peer stopped sending mid-chunk.

    # Consume the chunk's trailing CRLF, minus whatever part of it was
    # already swallowed by the initial 8-byte read.
    trailer_seen = max(0, len(header) - (start + size))
    missing = 2 - trailer_seen
    while missing > 0:
        piece = sock.recv(missing)
        if not piece:
            break
        missing -= len(piece)

    return chunk
class TwitterJSONIter(object):
    """Iterate over the JSON documents arriving on a streaming HTTP response.

    Iterating yields, in order of arrival:

    * the result of ``wrap_response(obj, handle.headers)`` for every JSON
      document decoded from the stream;
    * ``None`` when `block` is false and the buffered data does not (yet)
      contain a complete JSON document;
    * ``{'timeout': True}`` when `timeout` is set and no data became
      readable within that many seconds;
    * ``{'hangup': True}`` once, right before iteration stops, when a read
      produces no data.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        """Store the response handle and the streaming options.

        `handle` is the object returned by ``urlopen``; its underlying
        socket and its ``headers`` are used while iterating. `uri` and
        `arg_data` are only stored on the instance. `block` selects
        blocking reads, `timeout` is the number of seconds to wait for
        data (passed to ``select``) before a timeout marker is yielded.
        """
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        # Reach through the response object to the raw socket; the private
        # attribute path differs between Python 3 and Python 2.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # With a timeout the socket is non-blocking and select() does the
        # waiting instead.
        sock.setblocking(self.block and not self.timeout)

        buf = ''
        json_decoder = json.JSONDecoder()
        timer = time.time()  # Time of the last attempt to read data.

        while True:
            # First drain every complete JSON document already buffered.
            buf = buf.lstrip()
            try:
                res, ptr = json_decoder.raw_decode(buf)
            except ValueError:
                # Buffer is empty or holds an incomplete document.
                if not self.block:
                    yield None
            else:
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                continue

            # Then fetch more data from the socket.
            try:
                buf = buf.lstrip()  # Remove any keep-alive delimiters to detect hangups.
                if self.timeout and not buf:  # This is a non-blocking read.
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if not ready_to_read[0] and time.time() - timer > self.timeout:
                        yield {'timeout': True}
                        continue
                timer = time.time()
                buf += recv_chunk(sock).decode('utf-8')
                if not buf:
                    yield {'hangup': True}
                    break
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer
                # (errno 2 is ssl.SSL_ERROR_WANT_READ): just try again.
                if (not self.block or self.timeout) and (e.errno == 2):
                    pass
                else:
                    raise
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open the streaming request and return an iterator over its messages.

    `req` is opened with ``urllib_request.urlopen``. An ``HTTPError``
    raised while opening is re-raised as ``TwitterHTTPError`` built from
    the error, `uri`, the ``'json'`` format and `arg_data`. On success the
    response handle is wrapped in a ``TwitterJSONIter`` configured with
    `block` and `timeout`, and an iterator over it is returned.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout=timeout))
class TwitterStreamCallWithTimeout(TwitterCall):
    """TwitterCall variant that streams in blocking mode with a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return a blocking stream iterator that uses ``self.timeout``.

        `_timeout` is accepted but not used; the timeout passed on is the
        instance attribute (presumably set by TwitterCall -- verify).
        """
        stream_options = dict(block=True, timeout=self.timeout)
        return handle_stream_response(req, uri, arg_data, **stream_options)
class TwitterStreamCall(TwitterCall):
    """TwitterCall variant that streams in blocking mode without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return a blocking stream iterator for the request.

        `_timeout` is accepted but not used.
        """
        blocking = True
        return handle_stream_response(req, uri, arg_data, block=blocking)
class TwitterStreamCallNonBlocking(TwitterCall):
    """TwitterCall variant that streams in non-blocking mode."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return a non-blocking stream iterator for the request.

        `_timeout` is accepted but not used.
        """
        blocking = False
        return handle_stream_response(req, uri, arg_data, block=blocking)
class TwitterStream(TwitterStreamCall):
    """
    The TwitterStream object is an interface to the Twitter Stream API
    (stream.twitter.com). This can be used pretty much the same as the
    Twitter class except the result of calling a method will be an
    iterator that yields objects decoded from the stream. For
    example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield tweets forever and ever (until the stream
    breaks at which point it raises a TwitterHTTPError.)

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter only selects a different call class when
    `block` is true: a truthy `timeout` makes calls go through
    TwitterStreamCallWithTimeout instead of TwitterStreamCall.
    """

    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        # Every request path starts with the API version.
        uriparts = ()
        uriparts += (str(api_version),)

        # Pick the call class matching the requested blocking behaviour.
        if block:
            if timeout:
                call_cls = TwitterStreamCallWithTimeout
            else:
                call_cls = TwitterStreamCall
        else:
            call_cls = TwitterStreamCallNonBlocking

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)