jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Okay. It works now. Omg.
[z_archive/twitter.git] / twitter / stream.py
1 import sys
2 PY_3_OR_HIGHER = sys.version_info >= (3, 0)
3
4 if PY_3_OR_HIGHER:
5 import urllib.request as urllib_request
6 import urllib.error as urllib_error
7 else:
8 import urllib2 as urllib_request
9 import urllib2 as urllib_error
10 import json
11 from ssl import SSLError
12 import socket
13 import io
14 import codecs
15 import sys, select, time
16
17 from .api import TwitterCall, wrap_response, TwitterHTTPError
18
# Delimiter used by the HTTP chunked transfer coding.
CRLF = b'\r\n'

# Sentinel messages yielded by TwitterJSONIter alongside stream data.
# The same dict objects are yielded every time, so callers can test for
# them by identity (e.g. ``msg is Hangup``) as well as by content.
Timeout = dict(timeout=True)
Hangup = dict(hangup=True)
HeartbeatTimeout = dict(heartbeat_timeout=True, hangup=True)
24
class ChunkDecodeError(Exception):
    """Raised when HTTP chunked framing cannot be parsed (e.g. a bad chunk-size line)."""
27
class EndOfStream(Exception):
    """Raised when the zero-length chunk that terminates a chunked body is reached."""
30
# Module-level alias: the lazy range type on both Python 2 and Python 3.
if PY_3_OR_HIGHER:
    range = range
else:
    range = xrange  # noqa: F821 -- only evaluated on Python 2
32
33
class HttpDeChunker(object):
    """Incrementally decode an HTTP/1.1 ``Transfer-Encoding: chunked`` body.

    Raw bytes from the socket are appended with `extend`; `read_chunks` then
    returns the payload of every chunk that is complete so far and keeps any
    partial data buffered for the next call.
    """

    def __init__(self):
        # Raw bytes received but not yet consumed.
        self.buf = bytearray()
        # True once a chunk's payload has been returned but the CRLF that
        # ends it has not been consumed yet.  Consuming that CRLF lazily lets
        # a chunk be delivered as soon as its payload has arrived, even when
        # the trailing CRLF only shows up later (e.g. with the next chunk).
        self.awaiting_crlf = False

    def extend(self, data):
        """Append raw bytes received from the connection."""
        self.buf.extend(data)

    def read_chunks(self):  # -> [bytearray]
        """Return the payloads of all chunks completed so far.

        Returns:
            A list of ``bytearray`` payloads, possibly empty.

        Raises:
            ChunkDecodeError: a chunk-size line is not a non-negative
                hexadecimal number, or a payload is not followed by CRLF.
            EndOfStream: the zero-length terminating chunk was reached.

        If payloads were already decoded in this call when the terminating
        chunk or malformed data is reached, they are returned first and the
        offending bytes stay buffered, so the exception is raised by the
        next call instead of discarding good data.
        """
        chunks = []
        buf = self.buf
        failure = None  # exception class to raise if nothing can be returned
        while True:
            if self.awaiting_crlf:
                if len(buf) < 2:
                    break  # trailing CRLF of the previous chunk not here yet
                if buf[:2] != CRLF:
                    failure = ChunkDecodeError
                    break
                buf = buf[2:]
                self.awaiting_crlf = False

            header_end_pos = buf.find(CRLF)
            if header_end_pos == -1:
                break  # chunk-size line incomplete

            # The size may be followed by ";name=value" chunk extensions.
            size_field = buf[:header_end_pos].split(b';', 1)[0].strip()
            try:
                chunk_len = int(size_field.decode('ascii'), 16)
            except ValueError:
                failure = ChunkDecodeError
                break
            if chunk_len < 0:
                # int() happily parses "-5"; a negative size is malformed.
                failure = ChunkDecodeError
                break
            if chunk_len == 0:
                failure = EndOfStream
                break

            data_start_pos = header_end_pos + 2
            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                break  # payload not fully received yet
            chunks.append(buf[data_start_pos:data_end_pos])
            buf = buf[data_end_pos:]
            self.awaiting_crlf = True

        self.buf = buf
        if failure is not None and not chunks:
            raise failure()
        return chunks
69
70
class JsonDeChunker(object):
    """Split an accumulating text buffer into complete JSON documents.

    Decoded text is appended with `extend`; `read_json_chunks` pulls every
    complete top-level JSON value off the front of the buffer, skipping
    whitespace between values, and leaves any incomplete tail buffered.
    """

    def __init__(self):
        self.buf = u""
        self.raw_decode = json.JSONDecoder().raw_decode

    def extend(self, data):
        """Append decoded text to the pending buffer."""
        self.buf += data

    def read_json_chunks(self):
        """Return a list of every JSON value that can be fully decoded now.

        Leading whitespace is discarded; decoding stops at the first point
        where the decoder raises ValueError (typically an incomplete value),
        and the remaining text is kept for the next call.
        """
        decoded = []
        remaining = self.buf.lstrip()
        while True:
            try:
                value, end = self.raw_decode(remaining)
            except ValueError:
                break
            decoded.append(value)
            remaining = remaining[end:].lstrip()
        self.buf = remaining
        return decoded
93
94
class Timer(object):
    """Track elapsed time against an optional timeout (in seconds)."""

    # Elapsed-time measurements must not jump when the wall clock is
    # adjusted, so prefer time.monotonic (Python 3.3+); fall back to
    # time.time where it does not exist (Python 2).
    _now = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the timer from the current instant."""
        self.time = self._now()

    def expired(self):
        """
        If expired, reset the timer and return True.

        Always returns False when the timeout is None.
        """
        if self.timeout is None:
            return False
        if self._now() - self.time > self.timeout:
            self.reset()
            return True
        return False
114
115
class TwitterJSONIter(object):
    """Iterate over the JSON messages of a streaming Twitter response.

    The response socket is read directly: the HTTP chunked transfer coding
    is undone with HttpDeChunker, the bytes are decoded as UTF-8
    incrementally, and complete JSON documents are split off with
    JsonDeChunker.  Each document is yielded wrapped by `wrap_response`.
    The iterator may also yield `None` (fully non-blocking mode),
    `Timeout`, `HeartbeatTimeout` or `Hangup`; the last two end iteration.
    """

    # Upper bound on the number of bytes requested per socket read.
    READ_SIZE = 65536

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        # Falsy values (None or 0) disable the corresponding timer.
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def _socket(self):
        """Return the socket object underneath the urllib response handle."""
        # NOTE(review): on Python 3 the handle's buffered reader may already
        # hold body bytes read while the headers were parsed; reading the raw
        # socket bypasses that buffer.  Confirm this cannot drop data.
        if PY_3_OR_HIGHER:
            return self.handle.fp.raw._sock
        return self.handle.fp._sock.fp._sock

    def _select_timeout(self):
        """Return how long one wait for data may block (None = no limit)."""
        if not self.block and not self.timeout:
            # Fully non-blocking mode must never wait on the network.
            return 0.0
        limits = [t for t in (self.timeout, self.heartbeat_timeout) if t is not None]
        return min(limits) if limits else None

    def _read(self, sock, wait):
        """Read whatever bytes are available on `sock`.

        Returns the bytes read, ``b''`` when nothing became available within
        `wait` seconds, or ``None`` when the peer closed the connection.
        """
        # An SSL socket can hold already-decrypted bytes that select() cannot
        # see, so only wait in select() when nothing is pending.
        pending = getattr(sock, 'pending', None)
        if not (pending and pending()):
            if not select.select([sock], [], [], wait)[0]:
                return b''
        try:
            # recv() exists on both plain and SSL sockets; read() does not.
            data = sock.recv(self.READ_SIZE)
        except SSLError as e:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if e.errno != 2:
                raise
            return b''
        # A readable socket that yields no bytes has reached end of file.
        return data if data else None

    def __iter__(self):
        sock = self._socket()
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # Only a blocking stream without a timeout may block inside recv();
        # otherwise control must return here to yield Timeout / None.
        sock.setblocking(self.block and not self.timeout)
        wait = self._select_timeout()
        headers = self.handle.headers
        chunk_decoder = HttpDeChunker()
        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        json_decoder = JsonDeChunker()
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            messages = json_decoder.read_json_chunks()
            for message in messages:
                yield wrap_response(message, headers)
            if messages:
                timer.reset()
                heartbeat_timer.reset()

            if not self.block and not self.timeout:
                yield None
            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            data = self._read(sock, wait)
            if data is None:
                # Connection closed by the server.
                yield Hangup
                break
            if not data:
                continue

            chunk_decoder.extend(data)
            try:
                chunks = chunk_decoder.read_chunks()
            except (ChunkDecodeError, EndOfStream):
                yield Hangup
                break

            for chunk in chunks:
                if chunk:
                    json_decoder.extend(utf8_decoder.decode(chunk))
            if chunks:
                # Any chunk, even a whitespace keep-alive, counts as a heartbeat.
                heartbeat_timer.reset()
179
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """Open the streaming request `req` and return an iterator over its messages.

    An HTTP error status from urlopen is re-raised as TwitterHTTPError.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as err:
        raise TwitterHTTPError(err, uri, 'json', arg_data)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
186
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    Similarly, if the stream produces no data (not even keep-alive
    heartbeats) for more than `heartbeat_timeout` seconds (90 by
    default), the iterator yields `{'hangup': True,
    'heartbeat_timeout': True}`, and raises `StopIteration` if
    iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, then the iterator will yield either
    stream data or `{'timeout': True}` within the timeout period. This
    is useful if you want your program to do other stuff in between
    waiting for tweets.

    Passing `block=False` makes the stream fully non-blocking. In
    this mode, the iterator always yields immediately. It returns
    stream data, or `None`. Note that `timeout` supersedes this
    argument, so `timeout` must also be left as `None` (or 0) to use
    this mode.

    Other parameters: `domain` is the host to connect to, `api_version`
    becomes the first component of every request path, and `secure` and
    `auth` are passed through to TwitterCall.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        uriparts = (str(api_version),)

        # A TwitterCall subclass created per instance so that its response
        # handler closes over this stream's block / timeout / heartbeat
        # settings.
        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A truthy `_timeout` passed by the caller overrides the
                # stream-wide `timeout`.
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)