]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Merge pull request #205 from sixohsix/stream_cleanup
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 except ImportError:
5 import urllib2 as urllib_request
6 import urllib2 as urllib_error
7 import io
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
# True when running under Python 3; selects between py2/py3 code paths below.
PY_3_OR_HIGHER = sys.version_info[0] >= 3

# Terminator of the size line that precedes every HTTP chunk.
CRLF = b'\r\n'

# Sentinel messages yielded by the stream iterator in place of stream data.
Timeout = dict(timeout=True)
Hangup = dict(hangup=True)
HeartbeatTimeout = dict(heartbeat_timeout=True, hangup=True)
22
class ChunkDecodeError(Exception):
    """Raised when the size line of an HTTP chunk cannot be decoded."""
25
class EndOfStream(Exception):
    """Raised by the chunk reader when the chunked HTTP stream has ended."""
28
# Use the lazy integer range on both interpreters: xrange on Python 2,
# the builtin range on Python 3.
if PY_3_OR_HIGHER:
    range = range
else:
    range = xrange  # noqa: F821 -- only defined on Python 2
30
class SocketShim(io.IOBase):
    """
    Adapt a raw socket to the file-like protocol that io.BufferedReader
    expects as its underlying raw stream.

    Only reading is supported.
    """

    def __init__(self, sock):
        self.sock = sock

    def readable(self):
        return True

    def read(self, size):
        # Use recv() rather than read(): plain (non-SSL) sockets have no
        # read() method, while both plain and SSL sockets provide recv().
        # This also mirrors readinto() below, which uses recv_into().
        return self.sock.recv(size)

    def readinto(self, buf):
        # BufferedReader fills its internal buffer through this method;
        # the return value is the byte count reported by recv_into().
        return self.sock.recv_into(buf)
43
def recv_chunk(reader): # -> bytearray:
    """
    Read one chunk of an HTTP "chunked" transfer-encoded body from `reader`.

    `reader` must provide peek() and read() in the style of
    io.BufferedReader. A chunk is a hexadecimal size line terminated by
    CRLF, followed by the payload and a trailing CRLF.

    Returns the chunk payload as a bytearray.

    Raises:
        ChunkDecodeError: no CRLF-terminated size line is visible within the
            first 11 bytes, or the size line is not valid hexadecimal.
        EndOfStream: the zero-size terminating chunk was received, or the
            connection closed before the payload was complete.
    """
    # Look ahead (without consuming) for the CRLF that ends the size line.
    # NOTE(review): BufferedReader.peek() does at most one raw read and may
    # return fewer bytes than requested, so a size line split across two
    # socket reads is likely reported as ChunkDecodeError -- confirm.
    for headerlen in range(12):
        header = reader.peek(headerlen)[:headerlen]
        if header.endswith(CRLF):
            break
    else:
        raise ChunkDecodeError()

    try:
        size = int(header, 16)  # Decode the chunk size
    except ValueError:
        # A garbled size line is a decode failure, not a crash.
        raise ChunkDecodeError()
    reader.read(headerlen)  # Ditch the header

    if size == 0:
        raise EndOfStream()

    chunk = bytearray()
    while len(chunk) < size:
        data = reader.read(size - len(chunk))
        if not data:
            # The peer closed the connection mid-chunk. Reading again would
            # return b'' forever and spin this loop, so report end of stream.
            raise EndOfStream()
        chunk.extend(data)

    reader.read(2)  # Ditch remaining CRLF

    return chunk
66
67
class Timer(object):
    """
    Track whether more than `timeout` seconds have passed since the last
    reset.

    Intervals are measured with time.monotonic when the interpreter has it,
    so that stepping the system clock (NTP corrections, manual changes)
    cannot fire or suppress an expiry. It falls back to time.time() where
    time.monotonic is unavailable (Python 2).
    """

    # Prefer a clock that never goes backwards; time.monotonic is Python 3.3+.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the countdown from the current instant."""
        self.time = self._clock()

    def expired(self):
        """
        Return True if more than `timeout` seconds have elapsed since the
        last reset, and restart the countdown in that case.

        Return False otherwise, and always when `timeout` is None.
        """
        if self.timeout is None:
            return False
        if self._clock() - self.time > self.timeout:
            self.reset()
            return True
        return False
87
88
class TwitterJSONIter(object):
    """
    Iterate over JSON messages read from a chunked Twitter streaming response.

    Each step yields one of:
    - a decoded message, wrapped with the response headers;
    - one of the sentinels Timeout, Hangup or HeartbeatTimeout;
    - None, in fully non-blocking mode, when no complete message is
      buffered yet.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        # handle: HTTP response object; its underlying socket (via .fp) and
        # its .headers are used by __iter__.
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        # Falsy timeouts (None, 0) are normalised to None, meaning "never".
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def __iter__(self):
        import codecs

        actually_block = self.block and not self.timeout
        # In blocking mode, wait on select() for at most the heartbeat
        # interval so that a silent connection is still noticed. self.timeout
        # is always unset when actually_block is true. Without a heartbeat
        # timeout there is nothing to wake up for, so read directly; note
        # that min() with None would raise TypeError on Python 3.
        if actually_block and self.heartbeat_timeout:
            sock_timeout = min(1000000, self.heartbeat_timeout)
        else:
            sock_timeout = None
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_block)
        reader = io.BufferedReader(SocketShim(sock))
        # Decode UTF-8 incrementally: a multi-byte character may be split
        # across two chunks, which a per-chunk .decode('utf-8') would reject.
        utf8_decode = codecs.getincrementaldecoder('utf-8')().decode
        buf = ''
        raw_decode = json.JSONDecoder().raw_decode
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            buf = buf.lstrip()  # Remove any keep-alive delimiters
            try:
                res, ptr = raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                # No complete JSON document is buffered yet.
                if not self.block and not self.timeout:
                    yield None
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                heartbeat_timer.reset()
                continue

            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            try:
                if sock_timeout:
                    # NOTE(review): select() only sees data still in the
                    # kernel socket buffer. Bytes already pulled into
                    # `reader` (or held by the SSL layer) are invisible to
                    # it, so a second buffered chunk may wait here until
                    # more data arrives -- confirm.
                    ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                    if not ready_to_read:
                        continue
                received = recv_chunk(reader)
                buf += utf8_decode(received)
                if received:
                    heartbeat_timer.reset()
            except (ChunkDecodeError, EndOfStream):
                yield Hangup
                break
            except SSLError as e:
                # Code 2 is error from a non-blocking read of an empty buffer.
                if e.errno != 2:
                    raise
147
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open the streaming request `req` and return an iterator over its messages.

    An HTTPError from urlopen() is re-raised as TwitterHTTPError. On success,
    the response is wrapped in a TwitterJSONIter configured with `block`,
    `timeout` and `heartbeat_timeout`.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as err:
        raise TwitterHTTPError(err, uri, 'json', arg_data)
    stream = TwitterJSONIter(
        response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
154
class TwitterStream(TwitterCall):
    """
    Interface to the Twitter Streaming API.

    Use it like the Twitter class, except that calling an API method
    returns an iterator over objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding until the TCP connection breaks. It then
    yields `{'hangup': True}` and raises `StopIteration` if iterated
    again.

    Likewise, if the stream produces no data for more than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}` and raises
    `StopIteration` if iterated again.

    The `timeout` parameter bounds the time between yields. If it is
    nonzero, the iterator yields either stream data or `{'timeout': True}`
    within the timeout period. This lets your program do other work while
    waiting for tweets.

    Passing `block=False` makes the stream fully non-blocking: the iterator
    always yields immediately, returning either stream data or `None`.
    `timeout` supersedes `block`, so leave `timeout` as `None` to use this
    mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):

        class TwitterStreamCall(TwitterCall):
            # Streaming endpoints return an iterator instead of a single
            # decoded response. A truthy per-call `_timeout` takes
            # precedence over the stream-wide `timeout`.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                effective_timeout = _timeout or timeout
                return handle_stream_response(
                    req, uri, arg_data, block, effective_timeout,
                    heartbeat_timeout)

        TwitterCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=(str(api_version),),
            timeout=timeout,
            gzip=False,
        )