try:
    import urllib.request as urllib_request
    import urllib.error as urllib_error
except ImportError:
    import urllib2 as urllib_request
    import urllib2 as urllib_error
import codecs
import json
import select
import socket
import sys
import time
from ssl import SSLError

from .api import TwitterCall, wrap_response, TwitterHTTPError
def recv_chunk(sock):  # -> bytearray:
    """Read one HTTP/1.1 chunk from *sock* and return its payload.

    The chunk-size line is parsed from an initial 8-byte read, so chunk
    sizes up to 16 MiB (0xffffff) are supported.  The remainder of the
    payload is then read, and the CRLF pair that terminates the chunk is
    consumed and discarded.

    :param sock: a connected socket-like object providing ``recv(n)``.
    :returns: the chunk payload as a ``bytearray``; empty when the initial
        read contains no chunk-size line (e.g. the peer closed the
        connection and ``recv`` returned nothing).
    :raises ValueError: if the chunk-size field is not valid hexadecimal.

    NOTE(review): any bytes the initial 8-byte read captures beyond this
    chunk's trailing CRLF (the start of the next chunk) are discarded.
    """
    buf = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = buf.find(b'\r\n')  # Find the HTTP chunk size.
    if crlf <= 0:  # No chunk-size line in this read.
        return bytearray()

    size = int(buf[:crlf], 16)  # Decode the chunk size.
    start = crlf + 2  # Skip the chunk-size line's CRLF pair.

    # Part of the payload -- all of it for tiny chunks such as a keep-alive
    # delimiter -- may have arrived together with the chunk-size line.
    chunk = bytearray(buf[start:start + size])
    # How much of the trailing CRLF pair that initial read also captured.
    trailer_seen = min(2, max(0, len(buf) - start - size))

    # recv() may return fewer bytes than requested, so keep reading until the
    # payload is complete or the peer closes the connection.
    while len(chunk) < size:
        data = sock.recv(size - len(chunk))
        if not data:
            break
        chunk += data

    if trailer_seen < 2:
        sock.recv(2 - trailer_seen)  # Consume the rest of the trailing CRLF pair.
    return chunk
class TwitterJSONIter(object):
    """Iterate over the JSON messages of a Twitter streaming HTTP response.

    Messages are decoded from the socket underneath ``handle`` and yielded
    through ``wrap_response`` together with the response headers.  Besides
    decoded messages, iteration can yield these markers:

    * ``None`` -- non-blocking mode only, when no complete message is
      buffered yet;
    * ``{'timeout': True}`` -- when ``timeout`` is set and either no data
      arrived within ``timeout`` seconds, or more than ``timeout`` seconds
      have passed since the last decoded message;
    * ``{'hangup': True}`` -- blocking mode, when a read returned no data;
      iteration ends after this marker.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        """Store the response and the options that control iteration.

        :param handle: the response returned by ``urlopen``; its underlying
            socket is read directly during iteration.
        :param uri: the requested URI.
        :param arg_data: the request arguments.
        :param block: whether socket reads should block.
        :param timeout: seconds to wait for data before yielding a timeout
            marker; a falsy value disables timeout handling.
        """
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        # Reach through the response's file object to the raw socket; the
        # attribute chain differs between Python 2 and Python 3.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # With a timeout, readiness is polled with select(), so the socket
        # itself is switched to non-blocking mode.
        sock.setblocking(self.block and not self.timeout)

        # HTTP chunk boundaries may split a multi-byte UTF-8 sequence, so
        # decode incrementally rather than chunk by chunk.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        json_decoder = json.JSONDecoder()
        buf = ''
        timer = time.time()  # When the last message was decoded.

        while True:
            # raw_decode() rejects leading whitespace, so drop the CRLF
            # delimiters and keep-alive newlines between messages first.
            buf = buf.lstrip()
            try:
                res, ptr = json_decoder.raw_decode(buf)
            except ValueError:
                # No complete JSON document is buffered yet.
                if not self.block:
                    yield None
            else:
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer = time.time()
                continue

            try:
                if self.timeout:
                    readable = select.select([sock], [], [], self.timeout)[0]
                    if not readable:
                        # Nothing arrived within the timeout window.  No read
                        # happened, so an empty buffer is not a hangup.
                        yield {'timeout': True}
                        continue
                buf += utf8_decoder.decode(recv_chunk(sock))
                if self.timeout and time.time() - timer > self.timeout:
                    # Data arrived, but still no full message for too long.
                    yield {'timeout': True}
                if not buf and self.block:
                    # The read produced no data: treat the stream as closed.
                    yield {'hangup': True}
                    return
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer
                # (errno 2 is ssl.SSL_ERROR_WANT_READ); anything else is real.
                if not ((not self.block or self.timeout) and e.errno == 2):
                    raise
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open the streaming request *req* and return an iterator over it.

    :param req: the prepared request passed to ``urlopen``.
    :param uri: the requested URI (used in error reporting).
    :param arg_data: the request arguments (used in error reporting).
    :param block: whether the stream iterator reads in blocking mode.
    :param timeout: stream timeout in seconds forwarded to the iterator.
    :returns: an iterator over the messages of the stream.
    :raises TwitterHTTPError: if opening the request fails with an HTTP
        error status.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout=timeout))
class TwitterStreamCallWithTimeout(TwitterCall):
    """Call class for blocking streams that carry a timeout.

    Responses are handed to ``handle_stream_response`` in blocking mode,
    using this call's ``timeout`` attribute as the stream timeout.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The per-request ``_timeout`` argument is ignored in favour of the
        # instance attribute.
        stream_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data,
            block=True,
            timeout=stream_timeout,
        )
class TwitterStreamCall(TwitterCall):
    """Call class for plain blocking streams; no stream timeout is passed."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # ``_timeout`` is accepted for signature compatibility only.
        blocking = True
        return handle_stream_response(req, uri, arg_data, block=blocking)
class TwitterStreamCallNonBlocking(TwitterCall):
    """Call class for non-blocking streams; no stream timeout is passed."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # ``_timeout`` is accepted for signature compatibility only.
        blocking = False
        return handle_stream_response(req, uri, arg_data, block=blocking)
class TwitterStream(TwitterStreamCall):
    """
    The TwitterStream object is an interface to the Twitter Stream API
    (stream.twitter.com). It can be used much like the Twitter class,
    except that calling a method returns an iterator that yields objects
    decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields tweets until the stream breaks. If opening the
    stream fails with an HTTP error, a TwitterHTTPError is raised. In
    blocking mode, a dropped connection is reported by yielding
    ``{'hangup': True}``.

    The `block` parameter controls whether the stream is blocking. The
    default is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    When `block` is True and `timeout` (in seconds) is set, the iterator
    yields ``{'timeout': True}`` whenever it has waited longer than
    `timeout` seconds without receiving a message.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        # Choose how responses are consumed: non-blocking, blocking with a
        # timeout, or plain blocking.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)