2 import urllib
.request
as urllib_request
3 import urllib
.error
as urllib_error
6 import urllib2
as urllib_request
7 import urllib2
as urllib_error
9 from ssl
import SSLError
11 import sys
, select
, time
13 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
15 class TwitterJSONIter(object):
17 def __init__(self
, handle
, uri
, arg_data
, block
=True, timeout
=None):
18 self
.decoder
= json
.JSONDecoder()
21 self
.arg_data
= arg_data
24 self
.timeout
= timeout
25 self
.timer
= time
.time()
29 if sys
.version_info
>= (3, 0):
30 sock
= self
.handle
.fp
.raw
._sock
32 sock
= self
.handle
.fp
._sock
.fp
._sock
33 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
, 1)
34 if not self
.block
or self
.timeout
:
35 sock
.setblocking(False)
38 utf8_buf
= self
.buf
.decode('utf8').lstrip()
39 if utf8_buf
and utf8_buf
[0] != '{': # Remove the hex delimiter length and extra whitespace.
40 utf8_buf
= utf8_buf
.lstrip('0123456789abcdefABCDEF')
41 utf8_buf
= utf8_buf
.lstrip()
42 res
, ptr
= self
.decoder
.raw_decode(utf8_buf
)
43 self
.buf
= utf8_buf
[ptr
:].encode('utf8')
44 yield wrap_response(res
, self
.handle
.headers
)
45 self
.timer
= time
.time()
47 except ValueError as e
:
52 except urllib_error
.HTTPError
as e
: # Probably unnecessary, no dynamic url calls in the try block.
53 raise TwitterHTTPError(e
, self
.uri
, 'json', self
.arg_data
)
54 # this is a non-blocking read (ie, it will return if any data is available)
57 ready_to_read
= select
.select([sock
], [], [], self
.timeout
)
59 self
.buf
+= sock
.recv(1024)
60 if time
.time() - self
.timer
> self
.timeout
:
61 yield {"timeout":True}
63 yield {"timeout":True}
65 self
.buf
+= sock
.recv(1024) # As tweets are typically longer than 1KB, consider increasing this size.
67 if (not self
.block
or self
.timeout
) and (e
.errno
== 2):
68 # Apparently this means there was nothing in the socket buf
72 except urllib_error
.HTTPError
as e
:
73 raise TwitterHTTPError(e
, self
.uri
, 'json', self
.arg_data
)
75 def handle_stream_response(req
, uri
, arg_data
, block
, timeout
=None):
76 handle
= urllib_request
.urlopen(req
,)
77 return iter(TwitterJSONIter(handle
, uri
, arg_data
, block
, timeout
=timeout
))
79 class TwitterStreamCallWithTimeout(TwitterCall
):
80 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
81 return handle_stream_response(req
, uri
, arg_data
, block
=True, timeout
=self
.timeout
)
83 class TwitterStreamCall(TwitterCall
):
84 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
85 return handle_stream_response(req
, uri
, arg_data
, block
=True)
87 class TwitterStreamCallNonBlocking(TwitterCall
):
88 def _handle_response(self
, req
, uri
, arg_data
, _timeout
=None):
89 return handle_stream_response(req
, uri
, arg_data
, block
=False)
91 class TwitterStream(TwitterStreamCall
):
93 The TwitterStream object is an interface to the Twitter Stream API
94 (stream.twitter.com). This can be used pretty much the same as the
95 Twitter class except the result of calling a method will be an
96 iterator that yields objects decoded from the stream. For
99 twitter_stream = TwitterStream(auth=OAuth(...))
100 iterator = twitter_stream.statuses.sample()
102 for tweet in iterator:
103 ...do something with this tweet...
105 The iterator will yield tweets forever and ever (until the stream
106 breaks at which point it raises a TwitterHTTPError.)
108 The `block` parameter controls if the stream is blocking. Default
109 is blocking (True). When set to False, the iterator will
110 occasionally yield None when there is no available message.
113 self
, domain
="stream.twitter.com", secure
=True, auth
=None,
114 api_version
='1.1', block
=True, timeout
=None):
116 uriparts
+= (str(api_version
),)
120 call_cls
= TwitterStreamCallWithTimeout
122 call_cls
= TwitterStreamCall
124 call_cls
= TwitterStreamCallNonBlocking
126 TwitterStreamCall
.__init
__(
127 self
, auth
=auth
, format
="json", domain
=domain
,
128 callable_cls
=call_cls
,
129 secure
=secure
, uriparts
=uriparts
, timeout
=timeout
, gzip
=False)