]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
hangup should happen also in noblock mode
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 14
6129a21e
MV
def recv_chunk_old(sock): # -> bytearray:
    """
    Read one chunk of an HTTP chunked-encoded body from `sock`.

    Compatible with Python 2.6, but less efficient: the payload is built by
    appending the result of each `recv` call.

    Returns the chunk payload as a bytearray, without the size line and
    without the trailing CRLF. An empty bytearray is returned when no chunk
    size could be read (connection closed, or no CRLF within the first 8
    bytes) and for the zero-length chunk that ends the stream.

    Raises ValueError if the size line is not a hexadecimal number. Errors
    raised by `sock.recv` itself are not caught here.
    """
    # The shortest possible chunk is the terminator '0\r\n\r\n' (5 bytes), so
    # an initial read of 5 bytes can never run into the following chunk.
    header = sock.recv(5)
    crlf = header.find(b'\r\n')

    # Longer size lines (up to 0xffffff, i.e. 8 bytes with the CRLF) and short
    # reads are completed one byte at a time so nothing past the CRLF is taken.
    while crlf < 0 and header and len(header) < 8:
        more = sock.recv(1)
        if not more: # Connection closed inside the size line.
            break
        header += more
        crlf = header.find(b'\r\n')

    if crlf <= 0: # No size found.
        return bytearray()

    remaining = int(header[:crlf], 16) # Decode the chunk size.
    start = crlf + 2 # Add in the length of the header's CRLF pair.

    # Part of the payload may have arrived together with the size line.
    chunk = bytearray(header[start:start + remaining])

    # recv() may return fewer bytes than requested: loop until complete.
    while len(chunk) < remaining:
        data = sock.recv(remaining - len(chunk))
        if not data: # Connection closed in the middle of the chunk.
            break
        chunk += data

    # Discard the trailing CRLF pair, less whatever part of it was already
    # consumed by the initial read.
    pending = 2 - max(0, len(header) - start - remaining)
    while pending > 0:
        data = sock.recv(pending)
        if not data:
            break
        pending -= len(data)

    return chunk
45
46## recv_chunk_old()
47
def recv_chunk_new(sock): # -> bytearray:
    """
    Read one chunk of an HTTP chunked-encoded body from `sock`.

    Compatible with Python 2.7+. The payload is received directly into a
    pre-sized bytearray through a memoryview, avoiding intermediate copies.

    Returns the chunk payload as a bytearray, without the size line and
    without the trailing CRLF. An empty bytearray is returned when no chunk
    size could be read (connection closed, or no CRLF within the first 8
    bytes) and for the zero-length chunk that ends the stream.

    Raises ValueError if the size line is not a hexadecimal number. Errors
    raised by the socket calls themselves are not caught here.
    """
    # The shortest possible chunk is the terminator '0\r\n\r\n' (5 bytes), so
    # an initial read of 5 bytes can never run into the following chunk.
    header = sock.recv(5)
    crlf = header.find(b'\r\n')

    # Longer size lines (up to 0xffffff, i.e. 8 bytes with the CRLF) and short
    # reads are completed one byte at a time so nothing past the CRLF is taken.
    while crlf < 0 and header and len(header) < 8:
        more = sock.recv(1)
        if not more: # Connection closed inside the size line.
            break
        header += more
        crlf = header.find(b'\r\n')

    if crlf <= 0: # No size found.
        return bytearray()

    size = int(header[:crlf], 16) # Decode the chunk size. Rarely exceeds 8KiB.
    start = crlf + 2 # Add in the length of the header's CRLF pair.
    chunk = bytearray(size)

    # Part of the payload may have arrived together with the size line.
    inline = header[start:start + size]
    filled = len(inline)
    chunk[:filled] = inline

    # recv_into() may deliver fewer bytes than the buffer holds: keep filling
    # the rest of the bytearray until the whole chunk has arrived.
    view = memoryview(chunk)
    while filled < size:
        received = sock.recv_into(view[filled:])
        if not received: # Connection closed in the middle of the chunk.
            chunk = chunk[:filled] # Drop the part that was never filled.
            break
        filled += received

    # Discard the trailing CRLF pair, less whatever part of it was already
    # consumed by the initial read.
    pending = 2 - max(0, len(header) - start - size)
    while pending > 0:
        data = sock.recv(pending)
        if not data:
            break
        pending -= len(data)

    return chunk
26938000 77
6129a21e 78## recv_chunk_new()
26938000 79
6129a21e
MV
# Pick the chunk reader for the running interpreter. sys.version_info is
# compared as a plain tuple because its named attributes (.major, .minor)
# only exist from Python 2.7 on, so using them would fail on 2.6 -- the
# very version the fallback implementation is meant for.
if sys.version_info >= (2, 7):
    recv_chunk = recv_chunk_new
else:
    recv_chunk = recv_chunk_old
26938000 84
dd648a25
MV
class TwitterJSONIter(object):
    """
    Iterable over the JSON messages of an open streaming HTTP response.

    `handle` is the response object whose underlying socket is read chunk by
    chunk. `uri` and `arg_data` are stored on the instance but are not used
    by the iteration itself. `block` and `timeout` select the read mode: the
    socket is left blocking only when `block` is true and no `timeout` is set.

    Iterating yields:
      * each decoded message, passed through `wrap_response` together with
        the response headers;
      * None whenever no complete message is buffered yet, unless the
        iterator is blocking without a timeout;
      * {'timeout': True} when a `timeout` is set and nothing became readable
        within it;
      * {'hangup': True} once, after which iteration stops, when the stream
        has ended.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        # Imported here so this block does not depend on the import section.
        import codecs

        # The raw socket sits at a different place in the response object
        # depending on the major Python version.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(self.block and not self.timeout)
        buf = ''
        json_decoder = json.JSONDecoder()
        # A chunk boundary may fall inside a multi-byte UTF-8 sequence; the
        # incremental decoder keeps the incomplete tail until the next chunk
        # instead of raising UnicodeDecodeError.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        timer = time.time()
        while True:
            try:
                buf = buf.lstrip()
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                continue
            except ValueError:
                # No complete JSON document buffered yet.
                if self.block and not self.timeout:
                    pass
                else:
                    yield None
            try:
                buf = buf.lstrip() # Remove any keep-alive delimiters to detect hangups.
                if self.timeout and not buf: # This is a non-blocking read.
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if not ready_to_read[0] and time.time() - timer > self.timeout:
                        yield {'timeout': True}
                        continue
                    timer = time.time()
                chunk = recv_chunk(sock)
                # An empty chunk means the stream is over. Testing the chunk
                # rather than the accumulated buffer also ends the iteration
                # when the connection drops in the middle of a message,
                # instead of looping forever on empty reads.
                if not chunk:
                    yield {'hangup': True}
                    break
                buf += utf8_decoder.decode(bytes(chunk))
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer
                # (errno 2 is SSL_ERROR_WANT_READ).
                if (not self.block or self.timeout) and (e.errno == 2):
                    pass
                else:
                    raise
2300838f 128
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open `req` and return an iterator over the messages of the response.

    The response is wrapped in a TwitterJSONIter configured with `block` and
    `timeout`. An HTTPError raised while opening the request is re-raised as
    a TwitterHTTPError built from the error, `uri`, the 'json' format and
    `arg_data`.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    messages = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(messages)
135
class TwitterStreamCallWithTimeout(TwitterCall):
    """
    TwitterCall whose responses are read as a blocking stream with a timeout.
    """
    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The `_timeout` argument is not used; the instance attribute
        # `self.timeout` is forwarded instead (presumably set by TwitterCall,
        # which is defined outside this file -- confirm there).
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=self.timeout)
dd648a25
MV
139
class TwitterStreamCall(TwitterCall):
    """
    TwitterCall whose responses are read as a blocking stream, no timeout.
    """
    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The `_timeout` argument is not used by this handler.
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=None)
2300838f
MV
143
class TwitterStreamCallNonBlocking(TwitterCall):
    """
    TwitterCall whose responses are read as a non-blocking stream.
    """
    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The `_timeout` argument is not used by this handler.
        return handle_stream_response(
            req, uri, arg_data, block=False, timeout=None)
dd648a25
MV
147
class TwitterStream(TwitterStreamCall):
    """
    The TwitterStream object is an interface to the Twitter Stream API
    (stream.twitter.com). It is used much like the Twitter class, except
    that calling a method returns an iterator that yields the objects
    decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding messages for as long as the stream lasts.
    When the stream ends it yields `{'hangup': True}` once and then stops.
    A TwitterHTTPError is raised if the HTTP request itself fails.

    The `block` parameter controls whether the stream is blocking. Default
    is blocking (True). When set to False, the iterator yields None
    whenever there is no available message.

    The `timeout` parameter only affects a blocking stream. When set, the
    iterator yields `{'timeout': True}` if nothing becomes readable within
    `timeout` seconds, and it also yields None whenever there is no
    available message.
    """
    def __init__(
        self, domain="stream.twitter.com", secure=True, auth=None,
        api_version='1.1', block=True, timeout=None):
        # Choose the call class matching the requested read mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls, secure=secure,
            uriparts=(str(api_version),), timeout=timeout, gzip=False)