jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Version 1.14.0
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4except ImportError:
5 import urllib2 as urllib_request
6 import urllib2 as urllib_error
e84abb1c 7import io
dd648a25 8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 14
d3915c61
MV
# True when the interpreter's major version is 3 or later.
PY_3_OR_HIGHER = sys.version_info[0] >= 3

# Byte sequence that terminates a chunk-size header line.
CRLF = b'\r\n'

# Sentinel dicts yielded by the stream iterator in place of decoded data.
Timeout = {
    'timeout': True,
}
Hangup = {
    'hangup': True,
}
HeartbeatTimeout = {
    'heartbeat_timeout': True,
    'hangup': True,
}
d3915c61 22
dcece3a6
MV
class ChunkDecodeError(Exception):
    """Raised when a chunk of the stream cannot be decoded."""
23dcd469 25
e84abb1c
MV
class EndOfStream(Exception):
    """Signals that the stream has ended."""
26938000 28
f560656d
MV
# Bind `range` to the lazy implementation on both major versions
# (Python 2 spells it `xrange`).
if PY_3_OR_HIGHER:
    range = range
else:
    range = xrange
30
e84abb1c
MV
class SocketShim(io.IOBase):
    """
    Adapts a raw socket to fit the IO protocol.

    Only the read side is implemented, which is enough for the socket to
    be wrapped in an `io.BufferedReader`.
    """
    def __init__(self, sock):
        # The wrapped socket; it must provide `recv` and `recv_into`.
        self.sock = sock

    def readable(self):
        """Report that this object supports reading."""
        return True

    def read(self, size):
        """
        Receive and return up to `size` bytes from the socket.

        `recv` is used because it is available on both plain and
        SSL-wrapped sockets, whereas `read` only exists on SSL sockets.
        """
        return self.sock.recv(size)

    def readinto(self, buf):
        """Receive bytes directly into `buf` and return how many were stored."""
        return self.sock.recv_into(buf)
43
def recv_chunk(reader):  # -> bytearray:
    """
    Read one chunk of a chunked-transfer-encoded stream from `reader`.

    `reader` must provide `peek(n)` and `read(n)` (for example an
    `io.BufferedReader`). The chunk-size header is located by peeking, so
    nothing is consumed unless a valid header was found.

    Returns the chunk payload as a `bytearray`.

    Raises `ChunkDecodeError` if no CRLF-terminated header shows up within
    the first 11 bytes, or if the header is not a non-negative hexadecimal
    number. Raises `EndOfStream` on the terminating zero-size chunk, or if
    the reader runs out of data before the chunk is complete.
    """
    for headerlen in range(12):
        header = reader.peek(headerlen)[:headerlen]
        if header.endswith(CRLF):
            break
    else:
        raise ChunkDecodeError()

    try:
        size = int(header, 16)  # Decode the chunk size
    except ValueError:
        # An empty or non-hex size line is a framing error, not a crash.
        raise ChunkDecodeError()
    if size < 0:
        raise ChunkDecodeError()
    reader.read(headerlen)  # Ditch the header

    if size == 0:
        raise EndOfStream()

    chunk = bytearray()
    while len(chunk) < size:
        data = reader.read(size - len(chunk))
        if not data:
            # The reader has nothing more to give; without this check the
            # loop would spin forever on a connection closed mid-chunk.
            raise EndOfStream()
        chunk.extend(data)

    reader.read(2)  # Ditch remaining CRLF

    return chunk
26938000 66
26938000 67
d3915c61
MV
class Timer(object):
    """Tracks whether `timeout` seconds have passed since the last reset."""

    def __init__(self, timeout):
        # A timeout of None means the timer never expires.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the countdown from the current time."""
        self.time = time.time()

    def expired(self):
        """
        Return True and restart the timer if the timeout has elapsed;
        otherwise return False.
        """
        limit = self.timeout
        if limit is not None and time.time() - self.time > limit:
            self.reset()
            return True
        return False
87
88
dd648a25
MV
class TwitterJSONIter(object):
    """
    Iterable over the JSON documents carried by a chunked streaming
    HTTP response.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        """
        Store the response handle and the iteration settings.

        `handle` is the open HTTP response; `uri` and `arg_data` are kept
        for reference. `block`, `timeout` and `heartbeat_timeout` select
        the iteration mode; a falsy timeout (None or 0) disables the
        corresponding timer.
        """
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def __iter__(self):
        """
        Generate items from the stream until the connection ends.

        Yields, depending on mode and events:

        - the result of `wrap_response` for every JSON document decoded;
        - `None` when neither `block` nor `timeout` is set and the buffer
          does not currently decode as JSON;
        - `Timeout` each time `timeout` seconds pass without a document;
        - `HeartbeatTimeout`, then stops, when `heartbeat_timeout` seconds
          pass without any data being received;
        - `Hangup`, then stops, when a chunk cannot be decoded or the
          stream ends.
        """
        actually_block = self.block and not self.timeout
        # Only wait in select() when blocking with a heartbeat limit.
        # Guarding against a None heartbeat avoids comparing None with a
        # number, which raises TypeError on Python 3.
        if actually_block and self.heartbeat_timeout is not None:
            sock_timeout = min(self.timeout or 1000000, self.heartbeat_timeout)
        else:
            sock_timeout = None
        # Dig the underlying socket out of the response object; the
        # attribute path differs between Python 2 and Python 3.
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_block)
        reader = io.BufferedReader(SocketShim(sock))
        buf = ''
        raw_decode = json.JSONDecoder().raw_decode
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            buf = buf.lstrip()  # Remove any keep-alive delimiters
            try:
                res, ptr = raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                # Nothing decodable is buffered yet. In fully non-blocking
                # mode report that immediately, then go on to read more.
                if not self.block and not self.timeout:
                    yield None
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                heartbeat_timer.reset()
                continue

            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            try:
                if sock_timeout:
                    ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                    if not ready_to_read:
                        continue
                received = recv_chunk(reader)
                # NOTE(review): each chunk is decoded on its own; this
                # assumes a multi-byte UTF-8 sequence is never split
                # across two chunks -- confirm.
                buf += received.decode('utf-8')
                if received:
                    heartbeat_timer.reset()
            except (ChunkDecodeError, EndOfStream):
                yield Hangup
                break
            except SSLError as e:
                # Code 2 is error from a non-blocking read of an empty buffer.
                if e.errno != 2:
                    raise
2300838f 147
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open `req` and return an iterator over the items of the response.

    An HTTP error raised while opening the request is re-raised as
    `TwitterHTTPError`, built from the error, `uri`, the format name
    'json' and `arg_data`.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    stream = TwitterJSONIter(
        handle=response,
        uri=uri,
        arg_data=arg_data,
        block=block,
        timeout=timeout,
        heartbeat_timeout=heartbeat_timeout,
    )
    return iter(stream)
effd06bb 154
class TwitterStream(TwitterCall):
    """
    Interface to the Twitter Stream API.

    It is used in much the same way as the Twitter class, except that
    calling a method returns an iterator that yields the objects decoded
    from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding until the TCP connection breaks. At that
    point it yields `{'hangup': True}` and raises `StopIteration` if it
    is iterated again.

    Likewise, if the stream produces no heartbeat for longer than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}` and raises
    `StopIteration` if it is iterated again.

    The `timeout` parameter is the maximum time between yields. When it
    is nonzero, the iterator yields either stream data or
    `{'timeout': True}` within the timeout period, which is useful if
    your program needs to do other work while waiting for tweets.

    Setting the `block` parameter to a false value makes the stream fully
    non-blocking: the iterator always yields immediately, returning
    either stream data or `None`. Note that `timeout` supersedes this
    argument, so `timeout` must also be `None` to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        version_parts = (str(api_version),)
        default_timeout = timeout

        class TwitterStreamCall(TwitterCall):
            # Hands each response to the streaming handler, using the
            # settings captured from the enclosing constructor call.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout takes precedence over the default.
                effective_timeout = _timeout or default_timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=version_parts,
            timeout=timeout,
            gzip=False,
        )
86318060 202 secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)