]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Merge branch 'fix-stream' into pr-fix-stream
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 14
23dcd469
AD
def _recv_exact(sock, count):
    """Read `count` bytes from `sock`, looping over short reads.

    `sock.recv(n)` may legitimately return fewer than `n` bytes, so keep
    asking until the requested amount has arrived.  Stops early only when
    `recv` returns an empty result (the peer closed the connection), in
    which case whatever was received so far is returned.

    Returns a bytearray.
    """
    data = bytearray()
    while len(data) < count:
        piece = sock.recv(count - len(data))
        if not piece:  # End of stream: give back what we have.
            break
        data.extend(piece)
    return data


def recv_chunk(sock):  # -> bytearray:
    """Read one HTTP "chunked" transfer-encoding chunk from `sock`.

    The first read is 8 bytes, which is enough to hold a chunk-size line
    of up to six hex digits (0xffffff, i.e. 16MiB) plus its CRLF.  The
    payload is then completed from the socket and the CRLF pair that
    terminates the chunk is consumed, so that the next call starts on the
    next chunk-size line.

    Returns the chunk payload as a bytearray.  An empty bytearray is
    returned when no chunk-size line is found at the start of the data
    (including when the socket returned nothing at all).

    Raises ValueError if the chunk-size line is not valid hexadecimal;
    exceptions raised by `sock.recv` propagate unchanged.

    NOTE(review): for payloads of 2 bytes or fewer the initial 8-byte read
    can still run past the end of the chunk if the next chunk is already
    queued on the socket; those extra bytes are dropped, as before.
    Fixing that needs a buffer that survives between calls.
    """
    buf = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = buf.find(b'\r\n')  # Find the end of the HTTP chunk-size line.

    if crlf <= 0:  # No size line at the start of the data.
        return bytearray()

    remaining = int(buf[:crlf], 16)  # Decode the chunk size.

    # Whatever followed the size line in the initial read is the start of
    # the payload (and, for very short chunks, part of its trailing CRLF).
    payload = bytearray(buf[crlf + 2:])

    missing = remaining - len(payload)
    if missing > 0:
        # There is more to read in the chunk; a single recv() is not
        # guaranteed to deliver all of it.
        payload.extend(_recv_exact(sock, missing))

    # Consume the part of the trailing CRLF pair that has not been read
    # yet.  Skipped when the payload itself came up short (end of stream).
    trailer_seen = len(payload) - remaining
    if 0 <= trailer_seen < 2:
        _recv_exact(sock, 2 - trailer_seen)

    # Drop any trailer bytes that arrived with the initial read.
    del payload[remaining:]
    return payload
26938000
AD
43
44## recv_chunk()
45
46
dd648a25
MV
class TwitterJSONIter(object):
    """Iterable over the JSON messages of an open streaming HTTP response.

    `handle` is the response object returned by `urlopen`; `uri` and
    `arg_data` are stored as given.  `block` selects blocking reads and
    `timeout` (seconds, as passed to `select.select`) enables the
    timeout-aware read path.

    Iterating yields, in addition to the decoded messages:

    * `None` when `block` is false and no complete message is buffered;
    * `{'timeout': True}` when `timeout` is set and either nothing became
      readable within `timeout` seconds or more than `timeout` seconds
      have passed since the last message was yielded;
    * `{'hangup': True}` when `block` is true and a read left the buffer
      empty.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        # Function-scope import: only this method needs it.
        import codecs

        # Dig the raw socket out of the response object; the attribute
        # path differs between Python 3 and Python 2.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(self.block and not self.timeout)
        buf = u''
        json_decoder = json.JSONDecoder()
        # Incremental decoding keeps the bytes of a multi-byte UTF-8
        # character that is split across two HTTP chunks until the rest
        # arrives, instead of raising UnicodeDecodeError on the first half.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        timer = time.time()
        while True:
            # Strip whitespace (including keep-alive delimiters) so that an
            # empty buffer after a read can be recognised as a hangup.
            buf = buf.lstrip()
            try:
                res, ptr = json_decoder.raw_decode(buf)
            except ValueError:
                # No complete JSON document buffered yet.
                if not self.block:
                    yield None
            else:
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer = time.time()
                continue
            try:
                if self.timeout:
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if ready_to_read[0]:
                        # This is a non-blocking read.
                        buf += utf8_decoder.decode(bytes(recv_chunk(sock)))
                        if time.time() - timer > self.timeout:
                            yield {'timeout': True}
                    else:
                        yield {'timeout': True}
                else:
                    buf += utf8_decoder.decode(bytes(recv_chunk(sock)))
                if not buf and self.block:
                    yield {'hangup': True}
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer.
                if (not self.block or self.timeout) and (e.errno == 2):
                    pass
                else:
                    raise
2300838f 92
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open `req` and return an iterator over the streamed JSON messages.

    An `HTTPError` raised while opening the request is re-raised as a
    `TwitterHTTPError` carrying `uri`, the format name 'json' and
    `arg_data`.  `block` and `timeout` are handed on to `TwitterJSONIter`.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)

    messages = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(messages)
99
class TwitterStreamCallWithTimeout(TwitterCall):
    """Call class whose responses are read in blocking mode with a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used; the instance's own
        # `timeout` attribute is what gets forwarded.
        stream = handle_stream_response(
            req, uri, arg_data, block=True, timeout=self.timeout)
        return stream
dd648a25
MV
103
class TwitterStreamCall(TwitterCall):
    """Call class whose responses are read in blocking mode, no timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used here.
        stream = handle_stream_response(req, uri, arg_data, block=True)
        return stream
2300838f
MV
107
class TwitterStreamCallNonBlocking(TwitterCall):
    """Call class whose responses are read in non-blocking mode."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used here.
        stream = handle_stream_response(req, uri, arg_data, block=False)
        return stream
dd648a25
MV
111
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used in much the same way as the Twitter class, except that
    calling a method returns an iterator that yields objects decoded
    from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding tweets for as long as the stream
    delivers them. An HTTP error while opening the stream is raised as
    a TwitterHTTPError.

    The `block` parameter controls whether the stream is blocking. The
    default is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter is forwarded to the underlying call object.
    In blocking mode a truthy `timeout` selects the timeout-aware call
    class, whose iterator yields ``{'timeout': True}`` when the timeout
    elapses.
    """

    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        # The API version is the only fixed leading component of the URI.
        uriparts = (str(api_version),)

        # Choose the call class that implements the requested read mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=call_cls,
            secure=secure,
            uriparts=uriparts,
            timeout=timeout,
            gzip=False)