2 import urllib
.request
as urllib_request
3 import urllib
.error
as urllib_error
6 import urllib2
as urllib_request
7 import urllib2
as urllib_error
9 from ssl
import SSLError
11 import sys
, select
, time
13 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
16 buf
= sock
.recv(10) # Scan for an up to a 4GiB chunk size (0xffffffff).
18 crlf
= buf
.find(b
'\r\n') # Find the HTTP chunk size.
20 remaining
= int(buf
[:crlf
], 16) # Decode the chunk size.
21 chunk
= bytearray(remaining
) # Create the chunk buffer.
23 start
= crlf
+ 2 # Add in the length of the header's CRLF pair.
24 end
= len(buf
) - start
26 chunk
[:end
] = buf
[start
:]
27 chunk
[end
:] = sock
.recv(remaining
- end
)
29 sock
.recv(2) # Read the trailing CRLF pair. Throw it away.
37 class TwitterJSONIter(object):
39 def __init__(self
, handle
, uri
, arg_data
, block
=True, timeout
=None):
40 self
.decoder
= json
.JSONDecoder()
43 self
.arg_data
= arg_data
46 self
.timeout
= timeout
47 self
.timer
= time
.time()
51 if sys
.version_info
>= (3, 0):
52 sock
= self
.handle
.fp
.raw
._sock
54 sock
= self
.handle
.fp
._sock
.fp
._sock
55 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
, 1)
56 if not self
.block
or self
.timeout
:
57 sock
.setblocking(False)
60 utf8_buf
= self
.buf
.decode('utf8').lstrip()
61 res
, ptr
= self
.decoder
.raw_decode(utf8_buf
)
62 self
.buf
= utf8_buf
[ptr
:].encode('utf8')
63 yield wrap_response(res
, self
.handle
.headers
)
64 self
.timer
= time
.time()
66 except ValueError as e
:
71 # this is a non-blocking read (ie, it will return if any data is available)
74 ready_to_read
= select
.select([sock
], [], [], self
.timeout
)
76 self
.buf
+= recv_chunk(sock
)
77 if time
.time() - self
.timer
> self
.timeout
:
78 yield {"timeout":True}
80 yield {"timeout":True}
82 self
.buf
+= recv_chunk(sock
)
84 if (not self
.block
or self
.timeout
) and (e
.errno
== 2):
85 # Apparently this means there was nothing in the socket buf
89 except urllib_error
.HTTPError
as e
:
90 raise TwitterHTTPError(e
, self
.uri
, 'json', self
.arg_data
)
92 def handle_stream_response(req
, uri
, arg_data
, block
, timeout
=None):
93 handle
= urllib_request
.urlopen(req
,)
94 return iter(TwitterJSONIter(handle
, uri
, arg_data
, block
, timeout
=timeout
))
96 class TwitterStreamCallWithTimeout(TwitterCall
):
97 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
98 return handle_stream_response(req
, uri
, arg_data
, block
=True, timeout
=self
.timeout
)
100 class TwitterStreamCall(TwitterCall
):
101 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
102 return handle_stream_response(req
, uri
, arg_data
, block
=True)
104 class TwitterStreamCallNonBlocking(TwitterCall
):
105 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
106 return handle_stream_response(req
, uri
, arg_data
, block
=False)
108 class TwitterStream(TwitterStreamCall
):
110 The TwitterStream object is an interface to the Twitter Stream API
111 (stream.twitter.com). This can be used pretty much the same as the
112 Twitter class except the result of calling a method will be an
113 iterator that yields objects decoded from the stream. For
116 twitter_stream = TwitterStream(auth=OAuth(...))
117 iterator = twitter_stream.statuses.sample()
119 for tweet in iterator:
120 ...do something with this tweet...
122 The iterator will yield tweets forever and ever (until the stream
123 breaks at which point it raises a TwitterHTTPError.)
125 The `block` parameter controls if the stream is blocking. Default
126 is blocking (True). When set to False, the iterator will
127 occasionally yield None when there is no available message.
130 self
, domain
="stream.twitter.com", secure
=True, auth
=None,
131 api_version
='1.1', block
=True, timeout
=None):
133 uriparts
+= (str(api_version
),)
137 call_cls
= TwitterStreamCallWithTimeout
139 call_cls
= TwitterStreamCall
141 call_cls
= TwitterStreamCallNonBlocking
143 TwitterStreamCall
.__init
__(
144 self
, auth
=auth
, format
="json", domain
=domain
,
145 callable_cls
=call_cls
,
146 secure
=secure
, uriparts
=uriparts
, timeout
=timeout
, gzip
=False)