2 import urllib
.request
as urllib_request
3 import urllib
.error
as urllib_error
6 import urllib2
as urllib_request
7 import urllib2
as urllib_error
9 from ssl
import SSLError
11 import sys
, select
, time
13 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
def recv_chunk_old(sock):  # -> bytearray:
    """Read one HTTP/1.1 chunk from ``sock`` and return its payload.

    Compatible with Python 2.6, but less efficient than ``recv_chunk_new``:
    the payload is assembled from ``recv`` results instead of being received
    in place.

    The chunk-size line is expected within the first ``recv(8)`` (sizes up to
    16MiB, ``0xffffff``). Bytes of that first read past the size line are used
    as payload first, but never more than the announced size, so a trailing
    CRLF that arrived early is not mistaken for payload. The rest of the
    payload and of the trailing CRLF are then read from the socket. Each of
    those reads is looped, because ``recv`` may return fewer bytes than asked.

    Limitation: if the first read holds no size line, or already contains
    bytes of the *next* chunk (possible only for very small payloads such as
    keep-alives), those bytes cannot be pushed back and are lost.

    :param sock: connected socket-like object providing ``recv``.
    :returns: the chunk payload as a ``bytearray``. It is empty when no chunk
        size could be parsed, and shorter than announced if the peer closed
        the connection mid-chunk.
    :raises ValueError: if the size line is not valid hexadecimal.
    """
    buf = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = buf.find(b'\r\n')  # Find the HTTP chunk size.
    if crlf <= 0:
        # No size line in this read (or an empty one): nothing to decode.
        return bytearray()

    size = int(buf[:crlf], 16)  # Decode the chunk size.
    extra = buf[crlf + 2:]  # Bytes already read past the size line's CRLF.

    # Use at most `size` bytes from the first read...
    chunk = bytearray(extra[:size])
    # ...then keep reading, since recv may return a short read.
    while len(chunk) < size:
        piece = sock.recv(size - len(chunk))
        if not piece:
            return chunk  # Peer closed the connection mid-chunk.
        chunk += piece

    # Consume whatever part of the trailing CRLF the first read did not.
    pending = 2 - min(max(len(extra) - size, 0), 2)
    while pending > 0:
        piece = sock.recv(pending)
        if not piece:
            break
        pending -= len(piece)

    return chunk
def recv_chunk_new(sock):  # -> bytearray:
    """Read one HTTP/1.1 chunk from ``sock`` and return its payload.

    Compatible with Python 2.7+. The payload is received directly into the
    result through a ``memoryview`` (``sock.recv_into``), which avoids
    intermediate copies.

    The chunk-size line is expected within the first ``recv(8)`` (sizes up to
    16MiB, ``0xffffff``). Bytes of that first read past the size line are used
    as payload first, but never more than the announced size, so a trailing
    CRLF that arrived early (e.g. a 4-byte payload) is not mistaken for
    payload. The rest of the payload and of the trailing CRLF are then read
    from the socket. Each of those reads is looped, because
    ``recv_into``/``recv`` may return fewer bytes than requested.

    Limitation: if the first read holds no size line, or already contains
    bytes of the *next* chunk (possible only for very small payloads such as
    keep-alives), those bytes cannot be pushed back and are lost.

    :param sock: connected socket-like object providing ``recv`` and
        ``recv_into``.
    :returns: the chunk payload as a ``bytearray``. It is empty when no chunk
        size could be parsed, and shorter than announced if the peer closed
        the connection mid-chunk.
    :raises ValueError: if the size line is not valid hexadecimal.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the HTTP chunk size.
    if crlf <= 0:
        # No size line in this read (or an empty one): nothing to decode.
        return bytearray()

    size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
    body = header[crlf + 2:]  # Bytes already read past the size line's CRLF.

    chunk = bytearray(size)
    received = min(len(body), size)
    chunk[:received] = body[:received]

    # recv_into may fill only part of the view; keep reading until complete.
    view = memoryview(chunk)
    while received < size:
        count = sock.recv_into(view[received:])
        if not count:
            return chunk[:received]  # Peer closed the connection mid-chunk.
        received += count
    del view  # Release the buffer export so callers may resize the result.

    # Consume whatever part of the trailing CRLF the first read did not.
    pending = 2 - min(max(len(body) - size, 0), 2)
    while pending > 0:
        piece = sock.recv(pending)
        if not piece:
            break
        pending -= len(piece)

    return chunk
# Pick the chunk reader for this interpreter. sys.version_info only gained
# named attributes (.major/.minor) in Python 2.7; on 2.6 it is a plain tuple.
# Slicing it keeps this check working on 2.6, the very version that
# recv_chunk_old exists for.
if sys.version_info[:2] >= (2, 7):
    recv_chunk = recv_chunk_new
else:
    recv_chunk = recv_chunk_old
# TwitterJSONIter: iterates over JSON messages read from a streaming HTTP
# response `handle` (handle_stream_response passes it the result of urlopen).
#
# Behaviour visible in this fragment:
#   * __init__ takes handle, uri, arg_data, block=True, timeout=None and
#     stores (at least) arg_data and timeout on the instance.
#   * Obtains the raw socket through private attributes of the response:
#     handle.fp.raw._sock on Python 3, handle.fp._sock.fp._sock on Python 2.
#     It enables SO_KEEPALIVE and calls
#     setblocking(self.block and not self.timeout), so the socket blocks only
#     when `block` is truthy and no timeout is set.
#   * Decodes one JSON value at a time from a text buffer with
#     json.JSONDecoder().raw_decode(buf) and yields
#     wrap_response(res, self.handle.headers).
#   * On ValueError from decoding (presumably incomplete data), it strips
#     leading whitespace (keep-alive delimiters) and appends another chunk via
#     recv_chunk(sock).decode('utf-8').
#   * One read path first waits with select.select([sock], [], [],
#     self.timeout) and yields {'timeout': True} when time.time() - timer
#     exceeds self.timeout. An `else` branch also yields {'timeout': True}
#     (presumably when select reports nothing ready — confirm).
#   * Yields {'hangup': True} when the buffer is still empty and self.block
#     is truthy.
#   * An SSLError with errno == 2 is passed over when not blocking or when a
#     timeout is set (errno 2 is presumably SSL_ERROR_WANT_READ — confirm). The
#     handling of other SSLErrors is not shown here.
#
# NOTE(review): this view of the class is incomplete. The `def __iter__`
# header, the storage of handle/uri/block, the initialisation of `buf` and
# `timer`, the read loop and several branch lines are not shown.
# NOTE(review): `socket` and `json` are used here but are not among the
# visible imports — confirm they are imported at module level.
# NOTE(review): relies on private attributes of the urlopen response
# (`fp.raw._sock`, `fp._sock.fp._sock`), which may change between Python
# versions — verify on each supported interpreter.
85 class TwitterJSONIter(object):
87 def __init__(self
, handle
, uri
, arg_data
, block
=True, timeout
=None):
90 self
.arg_data
= arg_data
92 self
.timeout
= timeout
96 sock
= self
.handle
.fp
.raw
._sock
if sys
.version_info
>= (3, 0) else self
.handle
.fp
._sock
.fp
._sock
97 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_KEEPALIVE
, 1)
98 sock
.setblocking(self
.block
and not self
.timeout
)
100 json_decoder
= json
.JSONDecoder()
105 res
, ptr
= json_decoder
.raw_decode(buf
)
107 yield wrap_response(res
, self
.handle
.headers
)
110 except ValueError as e
:
114 buf
= buf
.lstrip() # Remove any keep-alive delimiters to detect hangups.
116 ready_to_read
= select
.select([sock
], [], [], self
.timeout
)
118 buf
+= recv_chunk(sock
).decode('utf-8') # This is a non-blocking read.
119 if time
.time() - timer
> self
.timeout
:
120 yield {'timeout': True}
121 else: yield {'timeout': True}
123 buf
+= recv_chunk(sock
).decode('utf-8')
124 if not buf
and self
.block
:
125 yield {'hangup': True}
127 except SSLError
as e
:
128 # Error from a non-blocking read of an empty buffer.
129 if (not self
.block
or self
.timeout
) and (e
.errno
== 2): pass
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open ``req`` and return an iterator over the streamed JSON messages.

    :param req: request object passed straight to ``urllib_request.urlopen``.
    :param uri: request URI. Used here only for error reporting.
    :param arg_data: request argument data. Used here only for error reporting.
    :param block: forwarded to ``TwitterJSONIter`` as ``block``.
    :param timeout: forwarded to ``TwitterJSONIter`` as ``timeout``.
    :returns: ``iter()`` of a ``TwitterJSONIter`` wrapping the opened response.
    :raises TwitterHTTPError: when ``urlopen`` raises ``urllib_error.HTTPError``.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        # Re-wrap the HTTP failure in the library's own error type.
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    else:
        json_iter = TwitterJSONIter(response, uri, arg_data,
                                    block=block, timeout=timeout)
        return iter(json_iter)
class TwitterStreamCallWithTimeout(TwitterCall):
    """TwitterCall whose responses are consumed as a blocking stream with a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The per-call `_timeout` is accepted but unused; the stream timeout
        # comes from the instance attribute `self.timeout` instead.
        return handle_stream_response(
            req=req,
            uri=uri,
            arg_data=arg_data,
            block=True,
            timeout=self.timeout,
        )
class TwitterStreamCall(TwitterCall):
    """TwitterCall whose responses are consumed as a blocking stream (no timeout)."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted for interface compatibility but unused. No
        # timeout is forwarded, so handle_stream_response keeps timeout=None.
        return handle_stream_response(
            req=req,
            uri=uri,
            arg_data=arg_data,
            block=True,
        )
class TwitterStreamCallNonBlocking(TwitterCall):
    """TwitterCall whose responses are consumed as a non-blocking stream."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted for interface compatibility but unused. No
        # timeout is forwarded, so handle_stream_response keeps timeout=None.
        return handle_stream_response(
            req=req,
            uri=uri,
            arg_data=arg_data,
            block=False,
        )
# TwitterStream: public entry point for the streaming API.
#
# Visible constructor behaviour:
#   * Takes domain="stream.twitter.com", secure=True, auth=None,
#     api_version='1.1', block=True and timeout=None.
#   * Appends str(api_version) to `uriparts`.
#   * Selects `call_cls` from TwitterStreamCallWithTimeout, TwitterStreamCall
#     and TwitterStreamCallNonBlocking. The selecting conditions are not
#     visible here; presumably they are keyed on `timeout` and then `block`
#     — confirm.
#   * Calls TwitterStreamCall.__init__ with format="json",
#     callable_cls=call_cls and gzip=False, forwarding auth, domain, secure,
#     uriparts and timeout.
#
# NOTE(review): the docstring says a broken stream raises TwitterHTTPError.
# In the visible code, TwitterHTTPError is raised by handle_stream_response
# only when urlopen fails with HTTPError, while the stream iterator yields
# {'hangup': True} / {'timeout': True} markers. Confirm and update the
# docstring accordingly.
# NOTE(review): this view is incomplete. The docstring delimiters, the
# `def __init__` header and the initialisation of `uriparts` are not shown.
151 class TwitterStream(TwitterStreamCall
):
153 The TwitterStream object is an interface to the Twitter Stream API
154 (stream.twitter.com). This can be used pretty much the same as the
155 Twitter class except the result of calling a method will be an
156 iterator that yields objects decoded from the stream. For
159 twitter_stream = TwitterStream(auth=OAuth(...))
160 iterator = twitter_stream.statuses.sample()
162 for tweet in iterator:
163 ...do something with this tweet...
165 The iterator will yield tweets forever and ever (until the stream
166 breaks at which point it raises a TwitterHTTPError.)
168 The `block` parameter controls if the stream is blocking. Default
169 is blocking (True). When set to False, the iterator will
170 occasionally yield None when there is no available message.
173 self
, domain
="stream.twitter.com", secure
=True, auth
=None,
174 api_version
='1.1', block
=True, timeout
=None):
176 uriparts
+= (str(api_version
),)
180 call_cls
= TwitterStreamCallWithTimeout
182 call_cls
= TwitterStreamCall
184 call_cls
= TwitterStreamCallNonBlocking
186 TwitterStreamCall
.__init
__(
187 self
, auth
=auth
, format
="json", domain
=domain
,
188 callable_cls
=call_cls
,
189 secure
=secure
, uriparts
=uriparts
, timeout
=timeout
, gzip
=False)