2 import urllib
.request
as urllib_request
3 import urllib
.error
as urllib_error
6 import urllib2
as urllib_request
7 import urllib2
as urllib_error
9 from ssl
import SSLError
11 import sys
, select
, time
13 from .api
import TwitterCall
, wrap_response
, TwitterHTTPError
def recv_chunk_old(sock):  # -> bytearray:
    """Read one chunk of an HTTP chunked-transfer body from ``sock``.

    Compatible with Python 2.6, but less efficient than ``recv_chunk_new``:
    the payload is grown with ``recv`` instead of being read in place.

    The chunk-size line and the start of the payload come from a first
    8-byte read. ``recv`` may return fewer bytes than requested, so the rest
    of the payload and the trailing CRLF pair are read in loops.

    Returns the payload as a ``bytearray``. The result is empty when the
    first read holds no CRLF-terminated size line, which includes the case
    where the peer has closed the connection. It is shorter than announced
    if the peer closes mid-chunk.

    Note: for payloads shorter than 3 bytes (e.g. a keep-alive delimiter or
    the end-of-stream chunk) the 8-byte probe can run past the trailing CRLF
    into the next chunk. Those bytes cannot be pushed back and are lost.
    """
    buf = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = buf.find(b'\r\n')  # Find the HTTP chunk size.
    if crlf <= 0:  # No size line in this read: nothing to decode.
        return bytearray()

    size = int(buf[:crlf], 16)  # Decode the chunk size.
    start = crlf + 2  # Skip the CRLF pair that ends the size line.

    # Whatever part of the payload already arrived with the probe.
    chunk = bytearray(buf[start:start + size])
    # Bytes of the trailing CRLF already consumed by the probe (negative when
    # the payload itself is still incomplete).
    overflow = len(buf) - start - size

    # recv() may return short reads; keep going until the payload is complete.
    while len(chunk) < size:
        data = sock.recv(size - len(chunk))
        if not data:  # Peer closed the connection mid-chunk.
            return chunk
        chunk += data

    # Read whatever part of the trailing CRLF pair is still unread and throw it away.
    pending = 2 - max(0, overflow)
    while pending > 0:
        data = sock.recv(pending)
        if not data:
            break
        pending -= len(data)

    return chunk
def recv_chunk_new(sock):  # -> bytearray:
    """Read one chunk of an HTTP chunked-transfer body from ``sock``.

    Compatible with Python 2.7+. The payload is read in place into a
    preallocated ``bytearray`` through a ``memoryview`` and ``recv_into``.

    The chunk-size line and the start of the payload come from a first
    8-byte read. ``recv_into``/``recv`` may deliver fewer bytes than
    requested, so the rest of the payload and the trailing CRLF pair are
    read in loops.

    Returns the payload as a ``bytearray``. The result is empty when the
    first read holds no CRLF-terminated size line, which includes the case
    where the peer has closed the connection. It is truncated to the bytes
    actually received if the peer closes mid-chunk.

    Note: for payloads shorter than 3 bytes (e.g. a keep-alive delimiter or
    the end-of-stream chunk) the 8-byte probe can run past the trailing CRLF
    into the next chunk. Those bytes cannot be pushed back and are lost.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the HTTP chunk size.
    if crlf <= 0:  # No size line in this read: nothing to decode.
        return bytearray()

    size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
    start = crlf + 2  # Skip the CRLF pair that ends the size line.

    chunk = bytearray(size)  # Preallocated so the rest can be read in place.
    prefix = header[start:start + size]  # Payload bytes that came with the probe.
    filled = len(prefix)
    chunk[:filled] = prefix
    # Bytes of the trailing CRLF already consumed by the probe (negative when
    # the payload itself is still incomplete).
    overflow = len(header) - start - size

    # A view into the bytearray receives the rest of the chunk; recv_into()
    # may fill less than asked for, so loop until the payload is complete.
    view = memoryview(chunk)
    while filled < size:
        received = sock.recv_into(view[filled:])
        if not received:  # Peer closed the connection mid-chunk.
            return chunk[:filled]
        filled += received

    # Read whatever part of the trailing CRLF pair is still unread and throw it away.
    pending = 2 - max(0, overflow)
    while pending > 0:
        tail = sock.recv(pending)
        if not tail:
            break
        pending -= len(tail)

    return chunk
# Select the chunk reader once, at import time. ``sys.version_info`` only has
# named fields (``.major``/``.minor``) from Python 2.7 onwards, so compare a
# tuple slice instead. Otherwise Python 2.6 would crash here before ever
# reaching the ``recv_chunk_old`` fallback written for it.
if sys.version_info[:2] >= (2, 7):
    recv_chunk = recv_chunk_new
else:
    recv_chunk = recv_chunk_old
class TwitterJSONIter(object):
    """Iterator over the JSON messages of a streaming HTTP response.

    The response body is read chunk by chunk straight from the underlying
    socket (via ``recv_chunk``). Each complete object is decoded with
    ``json.JSONDecoder.raw_decode`` and yielded through ``wrap_response``
    together with the response headers. Besides decoded messages the
    iterator can yield:

    * ``None`` when no complete message is buffered and the stream is
      non-blocking or has a timeout;
    * ``{'timeout': True}`` when ``timeout`` is set and nothing became
      readable within that many seconds;
    * ``{'hangup': True}`` when a read returns no data and nothing is
      buffered, after which iteration ends.

    SSL errors other than the non-blocking "want read" case are re-raised.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle  # Response object; its socket and headers are used.
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout  # Seconds to wait in select(); falsy disables it.

    def __iter__(self):
        import codecs  # Only needed for the incremental UTF-8 decoder below.

        # Read from the raw socket beneath the response wrappers; the
        # attribute path differs between Python 2 and Python 3.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # Only a plain blocking stream uses a blocking socket; with a timeout
        # the waiting is done by select() below instead.
        sock.setblocking(self.block and not self.timeout)

        # An HTTP chunk boundary can split a multi-byte UTF-8 sequence, so
        # decode incrementally rather than chunk by chunk.
        utf8 = codecs.getincrementaldecoder('utf-8')()
        json_decoder = json.JSONDecoder()
        buf = ''
        timer = time.time()  # Time of the last decoded message.

        while True:
            buf = buf.lstrip()
            try:
                res, ptr = json_decoder.raw_decode(buf)
            except ValueError:
                # No complete JSON object buffered yet.
                if not self.block or self.timeout:
                    yield None
            else:
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer = time.time()
                continue  # Drain every buffered message before reading again.

            try:
                buf = buf.lstrip()  # Remove any keep-alive delimiters to detect hangups.
                if self.timeout and not buf:  # This is a non-blocking read.
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if not ready_to_read[0] and time.time() - timer > self.timeout:
                        yield {'timeout': True}
                        continue
                data = recv_chunk(sock)
                buf += utf8.decode(bytes(data))
                if not data and not buf:
                    yield {'hangup': True}
                    break
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer (errno 2,
                # i.e. ssl.SSL_ERROR_WANT_READ) is expected and ignored.
                # Anything else is a real failure and must not be swallowed.
                if not ((not self.block or self.timeout) and e.errno == 2):
                    raise
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open the streaming request ``req`` and return an iterator over it.

    An ``HTTPError`` raised by ``urlopen`` is converted into a
    ``TwitterHTTPError`` carrying ``uri`` and ``arg_data``. On success the
    response is wrapped in a ``TwitterJSONIter`` configured with ``block``
    and ``timeout``.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as err:
        raise TwitterHTTPError(err, uri, 'json', arg_data)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
class TwitterStreamCallWithTimeout(TwitterCall):
    """Call object whose responses are blocking streams with a read timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # ``_timeout`` is ignored. The stream timeout is read from
        # ``self.timeout``, presumably stored by TwitterCall (confirm there).
        stream_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=stream_timeout)
class TwitterStreamCall(TwitterCall):
    """Call object whose responses are plain blocking streams."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # Blocking stream without a read timeout; ``_timeout`` is ignored.
        return handle_stream_response(
            req, uri, arg_data,
            block=True,
        )
class TwitterStreamCallNonBlocking(TwitterCall):
    """Call object whose responses are non-blocking streams."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # Non-blocking stream without a read timeout; ``_timeout`` is ignored.
        return handle_stream_response(
            req, uri, arg_data,
            block=False,
        )
class TwitterStream(TwitterStreamCall):
    """
    The TwitterStream object is an interface to the Twitter Stream API
    (stream.twitter.com). It can be used much like the Twitter class,
    except that calling a method returns an iterator that yields objects
    decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding tweets for as long as the stream stays
    open. If the initial request fails with an HTTP error, a
    TwitterHTTPError is raised. When a read from the stream returns no data
    and nothing is buffered (typically because the server hung up), the
    iterator yields ``{'hangup': True}``.

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter (in seconds) applies to blocking streams: when
    no data arrives within that time, the iterator yields
    ``{'timeout': True}``.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        # Pick the call class that configures how responses are streamed.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)