2 import urllib
.request
as urllib_request
3 import urllib
.error
as urllib_error
6 import urllib2
as urllib_request
7 import urllib2
as urllib_error
9 from ssl
import SSLError
11 import sys
, select
, time
13 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
def _recv_exact(sock, count):
    """Read `count` bytes from `sock`, looping over short reads.

    A single ``recv`` may return fewer bytes than requested, so keep reading
    until the requested amount has arrived. Stops early only when ``recv``
    returns an empty result (peer closed the connection). Any exception
    raised by ``recv`` propagates unchanged.
    """
    data = bytearray()
    while len(data) < count:
        piece = sock.recv(count - len(data))
        if not piece:
            break
        data += piece
    return data


def recv_chunk(sock):  # -> bytearray:
    """Read one HTTP chunk (chunked transfer coding) from `sock`.

    The first read fetches up to 8 bytes, which is expected to contain the
    hexadecimal chunk size followed by CRLF and possibly the beginning of the
    chunk body.

    Returns a bytearray holding the chunk payload, or an empty bytearray when
    the first read does not start with a chunk-size line.

    Raises ValueError if the size field is not valid hexadecimal; errors from
    ``sock.recv`` propagate to the caller.

    NOTE: for very small chunks the initial 8-byte read can run past the
    chunk's trailing CRLF; any such extra bytes are discarded.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the HTTP chunk size.

    if crlf <= 0:  # No chunk length at the start of this read.
        return bytearray()

    size = int(header[:crlf], 16)  # Decode the chunk size.
    start = crlf + 2  # Add in the length of the header's CRLF pair.

    # Part (or all) of the body may already have arrived with the header.
    chunk = bytearray(header[start:start + size])
    # Fetch whatever is still missing; a single recv may come up short.
    chunk += _recv_exact(sock, size - len(chunk))

    # Bytes of the trailing CRLF pair that the first read already consumed.
    trailer_seen = max(0, len(header) - start - size)
    if trailer_seen < 2:
        _recv_exact(sock, 2 - trailer_seen)  # Read the rest of it. Throw it away.

    return chunk
# TwitterJSONIter: iterator over a streaming HTTP response that yields decoded
# JSON messages.
#
# As far as the visible code shows, iteration:
#   * takes the raw socket from the response handle (the attribute path differs
#     between Python 3 and Python 2), enables SO_KEEPALIVE on it, and leaves it
#     blocking only when `block` is true and no `timeout` is set;
#   * decodes objects from a text buffer with json.JSONDecoder.raw_decode and
#     yields each one through wrap_response together with the response headers;
#   * appends more data with recv_chunk(sock).decode('utf-8'), in one path after
#     waiting on select.select with `self.timeout`;
#   * yields the marker dicts {'timeout': True} and {'hangup': True}
#     (the latter when the buffer is empty and `self.block` is set).
#
# NOTE(review): this view of the class is incomplete -- the assignments of
# self.handle / self.block, the method header for the iteration code and the
# loop / try scaffolding are not visible here. Confirm against the full file
# before changing behavior.
43 class TwitterJSONIter(object):
45 def __init__(self
, handle
, uri
, arg_data
, block
=True, timeout
=None):
48 self
.arg_data
= arg_data
50 self
.timeout
= timeout
54 sock
= self
.handle
.fp
.raw
._sock
if sys
.version_info
>= (3, 0) else self
.handle
.fp
._sock
.fp
._sock
55 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
, 1)
56 sock
.setblocking(self
.block
and not self
.timeout
)
58 json_decoder
= json
.JSONDecoder()
63 res
, ptr
= json_decoder
.raw_decode(buf
)
65 yield wrap_response(res
, self
.handle
.headers
)
68 except ValueError as e
:
72 buf
= buf
.lstrip() # Remove any keep-alive delimiters to detect hangups.
74 ready_to_read
= select
.select([sock
], [], [], self
.timeout
)
76 buf
+= recv_chunk(sock
).decode('utf-8') # This is a non-blocking read.
77 if time
.time() - timer
> self
.timeout
:
78 yield {'timeout': True}
80 yield {'timeout': True}
82 buf
+= recv_chunk(sock
).decode('utf-8')
83 if not buf
and self
.block
:
84 yield {'hangup': True}
86 # Error from a non-blocking read of an empty buffer.
87 if (not self
.block
or self
.timeout
) and (e
.errno
== 2): pass
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open `req` and return an iterator over the streamed JSON messages.

    The request is opened with ``urllib_request.urlopen``; the resulting
    response handle is wrapped in a TwitterJSONIter configured with `block`
    and `timeout`, and an iterator over it is returned.

    Raises TwitterHTTPError (built from the HTTPError, `uri`, the 'json'
    format and `arg_data`) when opening the request fails with an HTTPError.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)

    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
class TwitterStreamCallWithTimeout(TwitterCall):
    """TwitterCall variant that opens a blocking stream with a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is not used here; the stream timeout comes from
        # self.timeout instead.
        stream_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=stream_timeout)
class TwitterStreamCall(TwitterCall):
    """TwitterCall variant that opens a blocking stream."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is not used here; no timeout is passed on to the stream.
        return handle_stream_response(
            req, uri, arg_data, block=True)
class TwitterStreamCallNonBlocking(TwitterCall):
    """TwitterCall variant that opens a non-blocking stream."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is not used here; no timeout is passed on to the stream.
        return handle_stream_response(
            req, uri, arg_data, block=False)
# TwitterStream: entry point for the streaming API, built on TwitterStreamCall.
#
# As far as the visible code shows, the constructor extends `uriparts` with the
# API version string, picks one of TwitterStreamCallWithTimeout,
# TwitterStreamCall or TwitterStreamCallNonBlocking as `call_cls`, and then
# delegates to TwitterStreamCall.__init__ with format="json", the chosen
# callable_cls, and gzip=False.
#
# NOTE(review): the conditions that choose between the three call classes and
# the initial value of `uriparts` are not visible in this view; presumably the
# choice depends on `block` and `timeout` -- confirm against the full file.
109 class TwitterStream(TwitterStreamCall
):
111 The TwitterStream object is an interface to the Twitter Stream API
112 (stream.twitter.com). This can be used pretty much the same as the
113 Twitter class except the result of calling a method will be an
114 iterator that yields objects decoded from the stream. For
117 twitter_stream = TwitterStream(auth=OAuth(...))
118 iterator = twitter_stream.statuses.sample()
120 for tweet in iterator:
121 ...do something with this tweet...
123 The iterator will yield tweets forever and ever (until the stream
124 breaks at which point it raises a TwitterHTTPError.)
126 The `block` parameter controls if the stream is blocking. Default
127 is blocking (True). When set to False, the iterator will
128 occasionally yield None when there is no available message.
131 self
, domain
="stream.twitter.com", secure
=True, auth
=None,
132 api_version
='1.1', block
=True, timeout
=None):
134 uriparts
+= (str(api_version
),)
138 call_cls
= TwitterStreamCallWithTimeout
140 call_cls
= TwitterStreamCall
142 call_cls
= TwitterStreamCallNonBlocking
144 TwitterStreamCall
.__init
__(
145 self
, auth
=auth
, format
="json", domain
=domain
,
146 callable_cls
=call_cls
,
147 secure
=secure
, uriparts
=uriparts
, timeout
=timeout
, gzip
=False)