2 import urllib
.request
as urllib_request
3 import urllib
.error
as urllib_error
6 import urllib2
as urllib_request
7 import urllib2
as urllib_error
9 from ssl
import SSLError
11 import sys
, select
, time
13 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
def recv_chunk(sock):  # -> bytearray:
    """Read one chunk of an HTTP chunked-transfer body from *sock*.

    The socket is switched to blocking mode for the duration of the read so
    that the whole chunk is consumed, and its previous timeout is restored
    before returning, even when an exception is raised.

    :param sock: a connected socket-like object providing ``recv``,
        ``gettimeout``, ``setblocking`` and ``settimeout``.
    :returns: the chunk payload as a ``bytearray``. An empty ``bytearray``
        is returned when no chunk-size line could be read (for example the
        peer closed the connection) or when the chunk has size zero.
    :raises ValueError: if the chunk-size line is not valid hexadecimal.
    """
    timeout = sock.gettimeout()
    sock.setblocking(True)  # Read the whole HTTP chunk.
    try:
        # Read the size line one byte at a time. Grabbing a fixed 10 bytes
        # could swallow the payload of a short chunk (and the start of the
        # next one), which cannot be pushed back onto the socket.
        # 10 bytes covers an up to 4GiB chunk size (0xffffffff) plus CRLF.
        header = bytearray()
        while len(header) < 10 and not header.endswith(b'\r\n'):
            byte = sock.recv(1)
            if not byte:  # Peer closed the connection.
                break
            header += byte

        crlf = header.find(b'\r\n')  # Find the HTTP chunk size.
        if crlf <= 0:
            return bytearray()

        remaining = int(bytes(header[:crlf]), 16)  # Decode the chunk size.

        # recv() may return fewer bytes than requested, so keep reading
        # until the whole payload has arrived or the peer hangs up.
        chunk = bytearray()
        while len(chunk) < remaining:
            data = sock.recv(remaining - len(chunk))
            if not data:
                break
            chunk += data

        # Read the trailing CRLF pair. Throw it away.
        trailer = 2
        while trailer > 0:
            data = sock.recv(trailer)
            if not data:
                break
            trailer -= len(data)

        return chunk
    finally:
        sock.settimeout(timeout)
# Iterator over JSON documents decoded from a streaming HTTP response.
# Text obtained from the socket through recv_chunk() is appended to a buffer
# and decoded with json.JSONDecoder.raw_decode(); each decoded document is
# yielded through wrap_response() together with the response headers.
# The dicts {'timeout': True} and {'hangup': True} are yielded as in-band
# status markers.
# NOTE(review): several control-flow lines of this class are not visible in
# this excerpt; the comments below describe only the visible statements.
42 class TwitterJSONIter(object):
44 def __init__(self
, handle
, uri
, arg_data
, block
=True, timeout
=None):
47 self
.arg_data
= arg_data
49 self
.timeout
= timeout
# Reach through the response object to the underlying socket; the attribute
# path differs between Python 3 and earlier versions.
53 sock
= self
.handle
.fp
.raw
._sock
if sys
.version_info
>= (3, 0) else self
.handle
.fp
._sock
.fp
._sock
# Turn on SO_KEEPALIVE for the streaming connection.
54 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
, 1)
# The socket is left blocking only when `block` is set and no timeout is set.
55 sock
.setblocking(self
.block
and not self
.timeout
) # not (not self.block or self.timeout)
57 json_decoder
= json
.JSONDecoder()
# raw_decode() returns the decoded document and the index in `buf` where it
# ended.
62 res
, ptr
= json_decoder
.raw_decode(buf
)
64 yield wrap_response(res
, self
.handle
.headers
)
67 except ValueError as e
:
73 if self
.timeout
: # this is a non-blocking read (ie, it will return if any data is available)
# Wait up to self.timeout seconds for the socket to become readable.
75 ready_to_read
= select
.select([sock
], [], [], self
.timeout
)
77 buf
+= recv_chunk(sock
).decode('utf-8')
# A timeout marker is emitted when more than self.timeout seconds have
# passed since `timer` (assigned outside the visible lines -- TODO confirm
# where it is reset).
78 if time
.time() - timer
> self
.timeout
:
79 yield {'timeout': True}
81 yield {'timeout': True}
83 buf
+= recv_chunk(sock
).decode('utf-8')
# An empty buffer on a blocking stream is reported as a hangup.
84 if not buf
and self
.block
:
85 yield {'hangup': True}
# This condition singles out errno 2 on non-blocking or timed reads.
# NOTE(review): the branch bodies are not visible in this excerpt.
87 if (not self
.block
or self
.timeout
) and (e
.errno
== 2):
88 # Apparently this means there was nothing in the socket buf
# urllib HTTP errors are re-raised as TwitterHTTPError carrying the URI and
# the request arguments.
92 except urllib_error
.HTTPError
as e
:
93 raise TwitterHTTPError(e
, self
.uri
, 'json', self
.arg_data
)
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open *req* and return an iterator over the streamed response.

    The request is opened with ``urllib_request.urlopen`` and the resulting
    response handle is wrapped in a ``TwitterJSONIter``.

    :param req: the request object passed to ``urlopen``.
    :param uri: request URI, forwarded to ``TwitterJSONIter``.
    :param arg_data: request arguments, forwarded to ``TwitterJSONIter``.
    :param block: blocking flag, forwarded to ``TwitterJSONIter``.
    :param timeout: optional timeout, forwarded to ``TwitterJSONIter``.
    :returns: ``iter()`` applied to the ``TwitterJSONIter`` instance.
    """
    response = urllib_request.urlopen(req)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
class TwitterStreamCallWithTimeout(TwitterCall):
    """TwitterCall variant whose responses are blocking streams with a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return a blocking stream iterator that uses ``self.timeout``.

        The ``_timeout`` argument is accepted but not used.
        """
        stream = handle_stream_response(
            req, uri, arg_data, block=True, timeout=self.timeout)
        return stream
class TwitterStreamCall(TwitterCall):
    """TwitterCall variant whose responses are blocking streams."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return a blocking stream iterator for the request.

        The ``_timeout`` argument is accepted but not used.
        """
        stream = handle_stream_response(req, uri, arg_data, block=True)
        return stream
class TwitterStreamCallNonBlocking(TwitterCall):
    """TwitterCall variant whose responses are non-blocking streams."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return a non-blocking stream iterator for the request.

        The ``_timeout`` argument is accepted but not used.
        """
        stream = handle_stream_response(req, uri, arg_data, block=False)
        return stream
# Entry point for the streaming API: a TwitterStreamCall whose initializer
# picks one of the stream call classes and passes it on as `callable_cls`.
# NOTE(review): the docstring delimiters, the initializer's `def` line and
# the conditions that select the call class are not visible in this excerpt.
111 class TwitterStream(TwitterStreamCall
):
113 The TwitterStream object is an interface to the Twitter Stream API
114 (stream.twitter.com). This can be used pretty much the same as the
115 Twitter class except the result of calling a method will be an
116 iterator that yields objects decoded from the stream. For
119 twitter_stream = TwitterStream(auth=OAuth(...))
120 iterator = twitter_stream.statuses.sample()
122 for tweet in iterator:
123 ...do something with this tweet...
125 The iterator will yield tweets forever and ever (until the stream
126 breaks at which point it raises a TwitterHTTPError.)
128 The `block` parameter controls if the stream is blocking. Default
129 is blocking (True). When set to False, the iterator will
130 occasionally yield None when there is no available message.
133 self
, domain
="stream.twitter.com", secure
=True, auth
=None,
134 api_version
='1.1', block
=True, timeout
=None):
# The API version is appended to the URI parts as a string.
136 uriparts
+= (str(api_version
),)
# One of three call classes is chosen: with timeout, blocking, or
# non-blocking.
140 call_cls
= TwitterStreamCallWithTimeout
142 call_cls
= TwitterStreamCall
144 call_cls
= TwitterStreamCallNonBlocking
# Delegate to the base initializer with the JSON format, the chosen call
# class, and gzip disabled.
146 TwitterStreamCall
.__init
__(
147 self
, auth
=auth
, format
="json", domain
=domain
,
148 callable_cls
=call_cls
,
149 secure
=secure
, uriparts
=uriparts
, timeout
=timeout
, gzip
=False)