]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Further simplification in progress.
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 14
d3915c61
MV
# Interpreter feature flags used to select compatible code paths.
PY_27_OR_HIGHER = sys.version_info[:2] >= (2, 7)
PY_3_OR_HIGHER = sys.version_info[:2] >= (3, 0)

# Control tokens the stream iterator yields in place of messages.
Timeout = dict(timeout=True)
Hangup = dict(hangup=True)
HeartbeatTimeout = dict(heartbeat_timeout=True, hangup=True)
d3915c61 21
dcece3a6
MV
class ChunkDecodeError(Exception):
    """Raised by recv_chunk when an HTTP chunk-size header is malformed."""
23dcd469 24
def recv_chunk(sock):  # -> bytearray:
    """
    Read a single HTTP/1.1 transfer-encoding chunk from `sock`.

    :param sock: a connected socket-like object providing `recv` (and
        `recv_into` on Python 2.7+).
    :returns: the chunk payload as a bytearray. The result is empty when
        the peer has closed the connection (the header read returns no
        bytes) or when the zero-length terminating chunk is read, so
        callers can treat an empty result as a hangup. If the connection
        closes part-way through a chunk, only the bytes actually received
        are returned.
    :raises ChunkDecodeError: if the first 8 bytes do not contain a
        hexadecimal chunk size followed by CRLF.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    if not header:  # recv() returns no bytes once the peer has closed.
        return bytearray()

    crlf = header.find(b'\r\n')  # Find the end of the HTTP chunk size.
    # find() returns -1 when no CRLF is present and 0 when there are no size
    # digits before it; either way the header is malformed.
    if crlf <= 0:
        raise ChunkDecodeError()
    try:
        size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
    except ValueError:
        raise ChunkDecodeError()

    chunk = bytearray(size)
    start = crlf + 2  # Add in the length of the header's CRLF pair.

    if size <= 3:  # E.g. an HTTP chunk with just a keep-alive delimiter or end of stream (0).
        chunk[:size] = header[start:start + size]
        # NOTE(review): any bytes of the 8-byte header read beyond this
        # chunk (e.g. the start of a following chunk) are discarded here.
        # There are several edge cases (size == [4-6]) as the chunk size exceeds the length
        # of the initial read of 8 bytes. With Twitter, these do not, in practice, occur. The
        # shortest JSON message starts with '{"limit":{'. Hence, it exceeds in size the edge cases
        # and eliminates the need to address them.
    else:  # There is more to read in the chunk.
        filled = len(header) - start
        chunk[:filled] = header[start:]
        # recv()/recv_into() may return fewer bytes than requested, so keep
        # reading until the chunk is complete or the peer closes (0 bytes).
        if PY_27_OR_HIGHER:  # When possible, use less memory by reading directly into the buffer.
            view = memoryview(chunk)
            while filled < size:
                received = sock.recv_into(view[filled:])
                if not received:
                    break
                filled += received
        else:  # less efficient for python2.6 compatibility
            while filled < size:
                data = sock.recv(size - filled)
                if not data:
                    break
                chunk[filled:filled + len(data)] = data
                filled += len(data)
        if filled < size:  # The connection closed mid-chunk; drop the unfilled tail.
            return chunk[:filled]
        sock.recv(2)  # Read the trailing CRLF pair. Throw it away.

    return chunk
26938000 53
26938000 54
d3915c61
MV
class Timer(object):
    """Measure wall-clock time elapsed against an optional timeout."""

    def __init__(self, timeout):
        # A timeout of None means the timer never expires.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the timer from the current time."""
        self.time = time.time()

    def expired(self):
        """
        Return True, restarting the timer, once more than `timeout` seconds
        have passed since the last reset; otherwise return False.
        """
        if self.timeout is not None:
            elapsed = time.time() - self.time
            if elapsed > self.timeout:
                self.reset()
                return True
        return False
74
75
dd648a25
MV
class TwitterJSONIter(object):
    """
    Iterator over the JSON messages of a streaming API response.

    Reads HTTP chunks directly from the response's underlying socket,
    decodes them as UTF-8 JSON and yields each message wrapped (via
    `wrap_response`) with the response headers. Between messages it can
    also yield:

    * `Timeout` when `timeout` seconds pass without a message;
    * `HeartbeatTimeout` when `heartbeat_timeout` seconds pass without any
      data, after which iteration stops;
    * `Hangup` when the connection closes, after which iteration stops;
    * `None` in fully non-blocking mode (`block` false and no `timeout`)
      whenever no complete message is buffered.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        # Falsy values (None, 0) disable the corresponding timeout.
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def __iter__(self):
        import codecs  # Only this method needs it.

        actually_block = self.block and not self.timeout
        if self.block or self.timeout:
            # Wait in select() no longer than the nearest deadline so the
            # timers below get a chance to fire. Only real (non-None)
            # deadlines go to min(): comparing None with a float raises
            # TypeError on Python 3.
            deadlines = [t for t in (self.timeout, self.heartbeat_timeout) if t]
            sock_timeout = min(deadlines) if deadlines else None
        else:
            # Fully non-blocking mode must always yield immediately: no select().
            sock_timeout = None
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_block)
        # An SSL socket may already hold decrypted bytes that select() cannot
        # see; pending() (when the socket has it) reports them.
        pending = getattr(sock, 'pending', None)
        # A multi-byte UTF-8 sequence may be split across two chunks, so
        # decode incrementally rather than chunk by chunk.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        buf = ''
        raw_decode = json.JSONDecoder().raw_decode
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            buf = buf.lstrip()  # Remove any keep-alive delimiters
            try:
                res, ptr = raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                # No complete message buffered yet.
                if not self.block and not self.timeout:
                    yield None
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                heartbeat_timer.reset()
                continue

            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            try:
                if not buf and sock_timeout and not (pending and pending()):
                    ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                    if not ready_to_read:
                        continue  # Loop round so the timers are re-checked.
                chunk = recv_chunk(sock)
                # Judge hangup on what was received, not on `buf`: a partial
                # message left in `buf` must not hide a closed connection.
                if not chunk:
                    yield Hangup
                    break
                buf += utf8_decoder.decode(bytes(chunk))
                heartbeat_timer.reset()
            except SSLError as e:
                # Code 2 is error from a non-blocking read of an empty buffer.
                if e.errno != 2:
                    raise
2300838f 131
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open the streaming request `req` and return an iterator over its messages.

    An HTTPError raised by urlopen is re-raised as TwitterHTTPError, built
    from the error, `uri`, the format 'json' and `arg_data`. The remaining
    arguments are passed straight through to TwitterJSONIter.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    stream = TwitterJSONIter(handle=response, uri=uri, arg_data=arg_data,
                             block=block, timeout=timeout,
                             heartbeat_timeout=heartbeat_timeout)
    return iter(stream)
effd06bb 138
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    Similarly, if the stream does not produce heartbeats for more than
    `heartbeat_timeout` seconds (90 by default; a falsy value such as
    `None` disables this check), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, then the iterator will yield either
    stream data or `{'timeout': True}` within the timeout period. This
    is useful if you want your program to do other stuff in between
    waiting for tweets.

    Setting `block=False` makes the stream fully non-blocking. In this
    mode, the iterator always yields immediately. It returns stream
    data, or `None`. Note that `timeout` supersedes this argument, so
    it must also be left as `None` to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        uriparts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            # Calls made through this stream return a message iterator
            # (via handle_stream_response) instead of a single response.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A truthy per-call `_timeout` overrides the stream-wide one.
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        # NOTE(review): gzip=False presumably keeps the response body
        # uncompressed, since the stream iterator reads raw socket bytes
        # and decodes them as UTF-8 JSON -- confirm against TwitterCall.
        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)