]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Directly read the balance of the HTTP chunk into the byte array. This eliminates...
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 14
23dcd469
AD
def _recv_exactly(sock, view):
    """
    Fill `view` from `sock`, looping over short reads.

    Returns the number of bytes actually stored, which is less than
    `len(view)` only when the peer closed the connection first.

    If the socket is non-blocking and no data is available yet, this waits
    (without a time limit) for the socket to become readable rather than
    abandoning a partially read chunk, which would desynchronise the
    stream.
    """
    import errno  # Local import: errno is not among the module-level imports.

    filled = 0
    while filled < len(view):
        try:
            count = sock.recv_into(view[filled:])
        except socket.error as e:
            if isinstance(e, SSLError):
                would_block = e.errno == 2  # SSL "want read".
            else:
                would_block = e.errno in (errno.EAGAIN, errno.EWOULDBLOCK)
            if not would_block:
                raise
            select.select([sock], [], [])  # Wait for the rest of the data.
            continue
        if not count:  # Peer closed the connection.
            break
        filled += count
    return filled


def recv_chunk(sock): # -> bytearray:
    """
    Read one HTTP chunk (chunked transfer encoding) from `sock`.

    The first read fetches up to 8 bytes, enough for a chunk-size line of
    up to six hex digits (0xffffff) plus its CRLF. Whatever part of the
    chunk body arrived with that read is kept, the balance is read
    directly into the result buffer, and the chunk's trailing CRLF is
    consumed.

    Returns the chunk body as a bytearray. An empty bytearray is returned
    when no chunk-size line is found in the first read (e.g. the socket
    was closed) and for a zero-length chunk. If the peer closes the
    connection mid-chunk, only the bytes actually received are returned.

    Limitation: if the first read returns bytes beyond this chunk's
    trailing CRLF (possible only for chunks of 0-2 bytes), those extra
    bytes are discarded.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the end of the chunk-size line.
    if crlf <= 0:  # No length available.
        return bytearray()

    size = int(header[:crlf], 16)  # Decode the chunk size.
    start = crlf + 2  # Skip the size line's CRLF pair.
    chunk = bytearray(size)

    # Copy the part of the body that came in with the header. Bounding the
    # slice by `size` keeps the trailing CRLF out of the result for every
    # chunk size, including the 4-6 byte edge cases.
    head = header[start:start + size]
    received = len(head)
    chunk[:received] = head

    if received < size:
        # A single recv_into() may deliver less than requested, so loop
        # until the whole chunk has arrived.
        received += _recv_exactly(sock, memoryview(chunk)[received:])
        if received < size:  # Connection closed mid-chunk.
            return chunk[:received]

    # Consume whatever part of the trailing CRLF was not in the header.
    missing = 2 - max(0, len(header) - start - size)
    if missing > 0:
        _recv_exactly(sock, memoryview(bytearray(missing)))

    return chunk
26938000
AD
42
43## recv_chunk()
44
45
dd648a25
MV
class TwitterJSONIter(object):
    """
    Iterator over the JSON messages of an open streaming HTTP response.

    `handle` is the response object returned by `urlopen`; `uri` and
    `arg_data` are stored on the instance. `block` selects blocking
    (True) or non-blocking (False) reads, and `timeout`, when set, is the
    number of seconds to wait for data before reporting a timeout.

    Iterating yields each decoded message wrapped by `wrap_response`, and
    additionally:

    - `None` when `block` is false and no complete message is buffered;
    - `{'timeout': True}` when `timeout` is set and either no data arrived
      within `timeout` seconds or more than `timeout` seconds have passed
      since the last message;
    - `{'hangup': True}`, after which iteration stops, when `block` is true
      and an empty chunk is received while nothing is buffered.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        import codecs  # Local import: codecs is not among the module-level imports.

        sock = self.handle.fp.raw._sock if sys.version_info >= (3, 0) else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(self.block and not self.timeout)
        buf = ''
        json_decoder = json.JSONDecoder()
        # An incremental decoder holds back an incomplete multi-byte
        # character until the next chunk arrives instead of raising
        # UnicodeDecodeError when a character straddles two chunks.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        timer = time.time()
        while True:
            try:
                buf = buf.lstrip()
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer = time.time()
                continue
            except ValueError:
                # No complete message is buffered yet.
                if not self.block:
                    yield None
            try:
                buf = buf.lstrip()  # Remove any keep-alive delimiters to detect hangups.
                if self.timeout:
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if not ready_to_read[0]:
                        yield {'timeout': True}
                        # Nothing was read, so an empty buffer here is not
                        # a hangup; go back to waiting for data.
                        continue
                chunk = recv_chunk(sock)
                buf += utf8_decoder.decode(bytes(chunk))
                if self.timeout and time.time() - timer > self.timeout:
                    yield {'timeout': True}
                # `not chunk` keeps a chunk holding only part of a
                # multi-byte character from being mistaken for a hangup.
                if not buf and not chunk and self.block:
                    yield {'hangup': True}
                    break
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer.
                if (not self.block or self.timeout) and (e.errno == 2):
                    pass
                else:
                    raise
2300838f 92
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open `req` and return an iterator over the messages of the response.

    An HTTP error raised while opening the request is re-raised as a
    `TwitterHTTPError` built from the error, `uri` and `arg_data`.
    `block` and `timeout` are handed to `TwitterJSONIter` unchanged.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    else:
        stream = TwitterJSONIter(
            response, uri, arg_data, block, timeout=timeout)
        return iter(stream)
99
class TwitterStreamCallWithTimeout(TwitterCall):
    """Call class whose responses are blocking streams using `self.timeout`."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but unused; the instance's own timeout
        # is what gets passed on.
        return handle_stream_response(
            req, uri, arg_data, True, self.timeout)
dd648a25
MV
103
class TwitterStreamCall(TwitterCall):
    """Call class whose responses are blocking streams without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but unused.
        return handle_stream_response(req, uri, arg_data, True)
2300838f
MV
107
class TwitterStreamCallNonBlocking(TwitterCall):
    """Call class whose responses are non-blocking streams."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but unused.
        return handle_stream_response(req, uri, arg_data, False)
dd648a25
MV
111
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used much like the Twitter class, except that calling a method
    returns an iterator yielding the objects decoded from the stream.
    For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    If the request cannot be opened because of an HTTP error, a
    TwitterHTTPError is raised. A blocking iterator keeps yielding
    messages until the stream hangs up, at which point it yields
    `{'hangup': True}` and stops.

    The `block` parameter controls whether the stream is blocking; the
    default is blocking (True). When it is False, the iterator yields
    None whenever no message is available.

    The `timeout` parameter only takes effect on a blocking stream. When
    it is set, the iterator yields `{'timeout': True}` if no data arrives
    within that many seconds.
    """
    def __init__(
        self, domain="stream.twitter.com", secure=True, auth=None,
        api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        # Choose the call class matching the requested read mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=call_cls,
            secure=secure,
            uriparts=uriparts,
            timeout=timeout,
            gzip=False)