2 import urllib
.request
as urllib_request
3 import urllib
.error
as urllib_error
6 import urllib2
as urllib_request
7 import urllib2
as urllib_error
9 from ssl
import SSLError
11 import sys
, select
, time
13 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
15 class TwitterJSONIter(object):
17 def __init__(self
, handle
, uri
, arg_data
, block
=True, timeout
=None):
18 self
.decoder
= json
.JSONDecoder()
21 self
.arg_data
= arg_data
24 self
.timeout
= timeout
25 self
.timer
= time
.time()
28 def recv_chunk(self
, sock
):
31 crlf
= buf
.find(b
'\r\n') # Find the HTTP chunk size.
33 remaining
= int(buf
[:crlf
].decode(), 16) # Decode the chunk size.
34 chunk
= bytearray(buf
[crlf
+ 2:]) # Create the chunk buffer.
35 remaining
-= len(chunk
)
38 balance
= sock
.recv(remaining
+ 2) # Add the length of the chunk's CRLF pair.
41 remaining
-= len(balance
)
42 # If possible, remove the trailing CRLF pair. (This precludes an extra trip through the JSON parser.)
43 if remaining
== -2 and chunk
[-2] == 0x0d and chunk
[-1] == 0x0a:
50 if sys
.version_info
>= (3, 0):
51 sock
= self
.handle
.fp
.raw
._sock
53 sock
= self
.handle
.fp
._sock
.fp
._sock
54 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
, 1)
55 if not self
.block
or self
.timeout
:
56 sock
.setblocking(False)
59 utf8_buf
= self
.buf
.decode('utf8').lstrip()
60 res
, ptr
= self
.decoder
.raw_decode(utf8_buf
)
61 self
.buf
= utf8_buf
[ptr
:].encode('utf8')
62 yield wrap_response(res
, self
.handle
.headers
)
63 self
.timer
= time
.time()
65 except ValueError as e
:
70 # this is a non-blocking read (ie, it will return if any data is available)
73 ready_to_read
= select
.select([sock
], [], [], self
.timeout
)
75 self
.buf
+= self
.recv_chunk(sock
)
76 if time
.time() - self
.timer
> self
.timeout
:
77 yield {"timeout":True}
79 yield {"timeout":True}
81 self
.buf
+= self
.recv_chunk(sock
)
83 if (not self
.block
or self
.timeout
) and (e
.errno
== 2):
84 # Apparently this means there was nothing in the socket buf
88 except urllib_error
.HTTPError
as e
:
89 raise TwitterHTTPError(e
, self
.uri
, 'json', self
.arg_data
)
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open the streaming request and return an iterator over its messages.

    ``req`` is handed to ``urllib_request.urlopen``. The resulting handle,
    together with ``uri``, ``arg_data``, ``block`` and ``timeout``, is used
    to build a ``TwitterJSONIter``; the value returned is ``iter()`` of that
    object.

    Any exception raised by ``urlopen`` propagates to the caller unchanged.
    """
    stream_handle = urllib_request.urlopen(req)
    json_iter = TwitterJSONIter(
        stream_handle, uri, arg_data, block, timeout=timeout)
    return iter(json_iter)
class TwitterStreamCallWithTimeout(TwitterCall):
    """Stream call whose responses are read in blocking mode with a timeout.

    The timeout forwarded to ``handle_stream_response`` is the instance's
    own ``self.timeout`` attribute.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return the streaming iterator for ``req``.

        The ``_timeout`` argument is accepted but not consulted here;
        ``self.timeout`` is what gets passed on.
        """
        timeout = self.timeout
        return handle_stream_response(
            req,
            uri,
            arg_data,
            block=True,
            timeout=timeout,
        )
class TwitterStreamCall(TwitterCall):
    """Stream call whose responses are read in blocking mode.

    No timeout is forwarded, so ``handle_stream_response`` falls back to
    its own default for that parameter.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return the streaming iterator for ``req``.

        The ``_timeout`` argument is accepted but not used here.
        """
        blocking = True
        return handle_stream_response(req, uri, arg_data, block=blocking)
class TwitterStreamCallNonBlocking(TwitterCall):
    """Stream call whose responses are read in non-blocking mode.

    No timeout is forwarded, so ``handle_stream_response`` falls back to
    its own default for that parameter.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return the streaming iterator for ``req``.

        The ``_timeout`` argument is accepted but not used here.
        """
        blocking = False
        return handle_stream_response(req, uri, arg_data, block=blocking)
107 class TwitterStream(TwitterStreamCall
):
109 The TwitterStream object is an interface to the Twitter Stream API
110 (stream.twitter.com). This can be used pretty much the same as the
111 Twitter class except the result of calling a method will be an
112 iterator that yields objects decoded from the stream. For
115 twitter_stream = TwitterStream(auth=OAuth(...))
116 iterator = twitter_stream.statuses.sample()
118 for tweet in iterator:
119 ...do something with this tweet...
121 The iterator will yield tweets forever and ever (until the stream
122 breaks at which point it raises a TwitterHTTPError.)
124 The `block` parameter controls if the stream is blocking. Default
125 is blocking (True). When set to False, the iterator will
126 occasionally yield None when there is no available message.
129 self
, domain
="stream.twitter.com", secure
=True, auth
=None,
130 api_version
='1.1', block
=True, timeout
=None):
132 uriparts
+= (str(api_version
),)
136 call_cls
= TwitterStreamCallWithTimeout
138 call_cls
= TwitterStreamCall
140 call_cls
= TwitterStreamCallNonBlocking
142 TwitterStreamCall
.__init
__(
143 self
, auth
=auth
, format
="json", domain
=domain
,
144 callable_cls
=call_cls
,
145 secure
=secure
, uriparts
=uriparts
, timeout
=timeout
, gzip
=False)