2 import urllib
.request
as urllib_request
3 import urllib
.error
as urllib_error
6 import urllib2
as urllib_request
7 import urllib2
as urllib_error
9 from ssl
import SSLError
11 import sys
, select
, time
13 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
15 class TwitterJSONIter(object):
17 def __init__(self
, handle
, uri
, arg_data
, block
=True, timeout
=None):
18 self
.decoder
= json
.JSONDecoder()
22 self
.timeout
= timeout
23 self
.timer
= time
.time()
27 if sys
.version_info
>= (3, 0):
28 sock
= self
.handle
.fp
.raw
._sock
30 sock
= self
.handle
.fp
._sock
.fp
._sock
31 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
, 1)
32 if not self
.block
or self
.timeout
:
33 sock
.setblocking(False)
36 utf8_buf
= self
.buf
.decode('utf8').lstrip()
37 res
, ptr
= self
.decoder
.raw_decode(utf8_buf
)
38 self
.buf
= utf8_buf
[ptr
:].encode('utf8')
39 yield wrap_response(res
, self
.handle
.headers
)
40 self
.timer
= time
.time()
42 except ValueError as e
:
47 except urllib_error
.HTTPError
as e
:
48 raise TwitterHTTPError(e
, uri
, self
.format
, arg_data
)
49 # this is a non-blocking read (ie, it will return if any data is available)
52 ready_to_read
= select
.select([sock
], [], [], self
.timeout
)
54 self
.buf
+= sock
.recv(1024)
55 if time
.time() - self
.timer
> self
.timeout
:
56 yield {"timeout":True}
58 yield {"timeout":True}
60 self
.buf
+= sock
.recv(1024)
62 if (not self
.block
or self
.timeout
) and (e
.errno
== 2):
63 # Apparently this means there was nothing in the socket buf
68 def handle_stream_response(req
, uri
, arg_data
, block
, timeout
=None):
69 handle
= urllib_request
.urlopen(req
,)
70 return iter(TwitterJSONIter(handle
, uri
, arg_data
, block
, timeout
=timeout
))
72 class TwitterStreamCallWithTimeout(TwitterCall
):
73 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
74 return handle_stream_response(req
, uri
, arg_data
, block
=True, timeout
=self
.timeout
)
76 class TwitterStreamCall(TwitterCall
):
77 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
78 return handle_stream_response(req
, uri
, arg_data
, block
=True)
80 class TwitterStreamCallNonBlocking(TwitterCall
):
81 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
82 return handle_stream_response(req
, uri
, arg_data
, block
=False)
84 class TwitterStream(TwitterStreamCall
):
86 The TwitterStream object is an interface to the Twitter Stream API
87 (stream.twitter.com). This can be used pretty much the same as the
88 Twitter class except the result of calling a method will be an
89 iterator that yields objects decoded from the stream. For
92 twitter_stream = TwitterStream(auth=OAuth(...))
93 iterator = twitter_stream.statuses.sample()
95 for tweet in iterator:
96 ...do something with this tweet...
98 The iterator will yield tweets forever and ever (until the stream
99 breaks at which point it raises a TwitterHTTPError.)
101 The `block` parameter controls if the stream is blocking. Default
102 is blocking (True). When set to False, the iterator will
103 occasionally yield None when there is no available message.
106 self
, domain
="stream.twitter.com", secure
=True, auth
=None,
107 api_version
='1.1', block
=True, timeout
=None):
109 uriparts
+= (str(api_version
),)
113 call_cls
= TwitterStreamCallWithTimeout
115 call_cls
= TwitterStreamCall
117 call_cls
= TwitterStreamCallNonBlocking
119 TwitterStreamCall
.__init
__(
120 self
, auth
=auth
, format
="json", domain
=domain
,
121 callable_cls
=call_cls
,
122 secure
=secure
, uriparts
=uriparts
, timeout
=timeout
)