]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Let's use the stdlib io system for great sanity.
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 except ImportError:
5 import urllib2 as urllib_request
6 import urllib2 as urllib_error
7 import io
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
# Line terminator that ends a chunk-size header in a chunked HTTP body.
CRLF = b'\r\n'

# True when running on Python 3 or newer.
PY_3_OR_HIGHER = sys.version_info[0] >= 3

# Sentinel values yielded by the stream iterator in place of stream data.
Timeout = dict(timeout=True)
Hangup = dict(hangup=True)
HeartbeatTimeout = dict(heartbeat_timeout=True, hangup=True)
22
class ChunkDecodeError(Exception):
    """
    Raised when a chunk header cannot be decoded from the stream.
    """
25
class EndOfStream(Exception):
    """
    Raised when the stream signals that no more chunks will follow.
    """
28
class SocketShim(io.IOBase):
    """
    Adapts a raw socket to fit the IO protocol.

    Only the read side is provided: the wrapped socket is exposed as a
    readable raw stream so that it can be handed to `io.BufferedReader`.
    """
    def __init__(self, sock):
        # The socket-like object that all reads are delegated to.
        self.sock = sock

    def readable(self):
        """Report that this stream supports reading."""
        return True

    def read(self, size):
        """Receive and return up to `size` bytes from the socket."""
        # Plain sockets have no read() method (only SSL sockets do);
        # recv() is available on both kinds of socket.
        return self.sock.recv(size)

    def readinto(self, buf):
        """Receive bytes into `buf` and return the number of bytes stored."""
        return self.sock.recv_into(buf)
41
def recv_chunk(reader): # -> bytearray:
    """
    Read one chunk of a chunked HTTP body from `reader` and return its payload.

    `reader` must provide `peek(n)` and `read(n)` returning bytes (for
    example an `io.BufferedReader`).

    Raises `ChunkDecodeError` if no CRLF-terminated size header is found
    within the first 11 bytes, or if the header is not a hexadecimal
    number.  Raises `EndOfStream` on the zero-length terminating chunk, or
    if the reader runs out of data before the whole chunk was received.
    """
    # `range` exists on both Python 2 and 3; `xrange` is Python 2 only.
    for headerlen in range(12):
        # Peek so nothing is consumed until the header length is known.
        header = reader.peek(headerlen)[:headerlen]
        if header.endswith(CRLF):
            break
    else:
        raise ChunkDecodeError()

    try:
        size = int(header, 16) # Decode the chunk size
    except ValueError:
        # A non-hexadecimal header is a decoding failure like any other.
        raise ChunkDecodeError()
    reader.read(headerlen) # Ditch the header

    if size == 0:
        raise EndOfStream()

    chunk = bytearray()
    while len(chunk) < size:
        remainder = size - len(chunk)
        data = reader.read(remainder)
        if data == b'':
            # EOF in the middle of a chunk: without this check the loop
            # would spin forever, since nothing more will ever arrive.
            raise EndOfStream()
        chunk.extend(data)

    reader.read(2) # Ditch remaining CRLF

    return chunk
64
65
class Timer(object):
    """
    Tracks whether `timeout` seconds have passed since the last reset.
    """
    def __init__(self, timeout):
        # A timeout of None means the timer never expires.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the countdown from the current time."""
        self.time = time.time()

    def expired(self):
        """
        Return True and restart the timer if the timeout has elapsed;
        otherwise return False.
        """
        if self.timeout is None:
            return False
        elapsed = time.time() - self.time
        has_lapsed = elapsed > self.timeout
        if has_lapsed:
            self.reset()
        return has_lapsed
85
86
class TwitterJSONIter(object):
    """
    Iterable over the JSON documents delivered by a streaming HTTP response.

    Iterating yields, depending on the configuration and stream state:

    - `wrap_response(document, headers)` for every decoded JSON document;
    - `None` when neither `block` nor `timeout` is set and no complete
      document is buffered yet;
    - `Timeout` when `timeout` seconds pass without a document;
    - `HeartbeatTimeout`, after which iteration ends, when
      `heartbeat_timeout` seconds pass without receiving any data;
    - `Hangup`, after which iteration ends, when a chunk cannot be decoded
      or the stream ends.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        # Falsy values (None, 0) disable the corresponding timer.
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def __iter__(self):
        # Imported here so the block does not rely on a module-level import.
        import codecs

        actually_block = self.block and not self.timeout
        # min() over a None operand evaluates to None on Python 2 but raises
        # TypeError on Python 3.  Keep the Python 2 outcome (no select()
        # wait) whenever either timeout is unset instead of crashing.
        if (actually_block and self.timeout is not None
                and self.heartbeat_timeout is not None):
            sock_timeout = min(self.timeout, self.heartbeat_timeout)
        else:
            sock_timeout = None
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_block)
        reader = io.BufferedReader(SocketShim(sock))
        buf = ''
        raw_decode = json.JSONDecoder().raw_decode
        # An incremental decoder holds back a multi-byte UTF-8 sequence that
        # is split across two chunks instead of raising UnicodeDecodeError.
        decode_utf8 = codecs.getincrementaldecoder('utf-8')().decode
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            buf = buf.lstrip() # Remove any keep-alive delimiters
            try:
                res, ptr = raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                # No complete document buffered yet.  In fully non-blocking
                # mode report that right away, then go on to read more.
                if not self.block and not self.timeout:
                    yield None
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                heartbeat_timer.reset()
                continue

            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            try:
                if sock_timeout:
                    ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                    if not ready_to_read:
                        continue
                received = recv_chunk(reader)
                buf += decode_utf8(bytes(received))
                if received:
                    heartbeat_timer.reset()
            except (ChunkDecodeError, EndOfStream):
                yield Hangup
                break
            except SSLError as e:
                # Code 2 is error from a non-blocking read of an empty buffer.
                if e.errno != 2:
                    raise
145
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open `req` and return an iterator over the documents of the response.

    An HTTP error raised while opening the request is re-raised as
    `TwitterHTTPError`; otherwise the response is wrapped in a
    `TwitterJSONIter` and its iterator is returned.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    stream = TwitterJSONIter(
        response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
152
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. It is used much like the Twitter class, except that calling
    a method returns an iterator that yields objects decoded from the
    stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding until the connection breaks. When that
    happens it yields `{'hangup': True}`, and raises `StopIteration`
    if iterated again.

    Similarly, if the stream produces no data for more than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, the iterator yields either stream data
    or `{'timeout': True}` within the timeout period. This is useful
    when the program has other work to do while waiting for tweets.

    Setting the `block` parameter to a false value makes the stream
    fully non-blocking. In this mode the iterator always yields
    immediately, returning either stream data or `None`. Note that
    `timeout` supersedes this argument, so it must also be `None` to
    use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        version_parts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            # Closes over block, timeout and heartbeat_timeout so that every
            # call made through this stream object shares those settings.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout, when given, wins over the stream's own.
                effective_timeout = _timeout or timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=version_parts, timeout=timeout, gzip=False)