jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Further simplification in progress.
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
# Interpreter feature flags used to pick compatible code paths.
PY_27_OR_HIGHER = sys.version_info[:2] >= (2, 7)
PY_3_OR_HIGHER = sys.version_info[0] >= 3

# Sentinel messages the stream iterator yields instead of decoded data.
Timeout = dict(timeout=True)
Hangup = dict(hangup=True)
HeartbeatTimeout = dict(heartbeat_timeout=True, hangup=True)
21
class ChunkDecodeError(Exception):
    """Raised when an HTTP chunk-size line cannot be found in the stream."""
24
def recv_chunk(sock):  # -> bytearray:
    """Read one HTTP/1.1 chunk from `sock` and return its payload.

    Returns an empty bytearray when the peer has closed the connection
    (recv() returned no data) or when the zero-length terminating chunk is
    read; callers treat an empty result as a hangup.

    Raises ChunkDecodeError if no CRLF-terminated chunk size appears in the
    first 8 bytes read, or if the connection closes before the payload is
    complete. Any bytes of a *following* chunk that the first 8-byte read
    happens to contain are not preserved.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    if not header:
        return bytearray()  # Connection closed by the peer.

    crlf = header.find(b'\r\n')  # Find the HTTP chunk size.
    # find() returns -1 when there is no CRLF and 0 when the size field is empty.
    if crlf < 1:
        raise ChunkDecodeError()

    size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
    start = crlf + 2  # Skip the CRLF that ends the size line.
    stop = start + size

    chunk = bytearray(size)
    head = header[start:stop]  # Payload bytes the first read already returned.
    have = len(head)
    chunk[:have] = head  # Same-length slice assignment: never resizes chunk.
    # How much of the payload's trailing CRLF the first read also consumed (0-2).
    trailer_read = min(2, max(0, len(header) - stop))

    # recv()/recv_into() may return fewer bytes than requested, so loop until
    # the whole payload has arrived instead of leaving a zero-filled tail.
    if have < size:
        if PY_27_OR_HIGHER:  # When possible, read directly into the buffer.
            view = memoryview(chunk)
            while have < size:
                got = sock.recv_into(view[have:])
                if not got:
                    raise ChunkDecodeError()  # Closed mid-chunk.
                have += got
        else:  # Less efficient path for python2.6 compatibility.
            while have < size:
                part = sock.recv(size - have)
                if not part:
                    raise ChunkDecodeError()  # Closed mid-chunk.
                chunk[have:have + len(part)] = part
                have += len(part)

    # Consume only the part of the trailing CRLF not already read, so the
    # next call starts exactly at the next chunk's size line.
    trailer_left = 2 - trailer_read
    while trailer_left > 0:
        got = sock.recv(trailer_left)
        if not got:
            break  # Connection closed; the next call reports the hangup.
        trailer_left -= len(got)

    return chunk
53
54
class Timer(object):
    """Measure wall-clock time elapsed against an optional timeout.

    A timeout of None means the timer never expires.
    """

    def __init__(self, timeout):
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the timer from the current time."""
        self.time = time.time()

    def expired(self):
        """
        Return True, restarting the timer, once more than `timeout`
        seconds have elapsed; otherwise return False.
        """
        if self.timeout is not None and time.time() - self.time > self.timeout:
            self.reset()
            return True
        return False
74
75
class TwitterJSONIter(object):
    """Iterator over the JSON messages of a chunked Twitter stream response.

    Besides decoded messages (passed through wrap_response together with the
    response headers), iteration may yield:

    * None -- fully non-blocking mode (block false, no timeout) and no
      complete message is buffered yet;
    * Timeout -- `timeout` seconds passed without a message;
    * HeartbeatTimeout -- `heartbeat_timeout` seconds passed without any
      chunk arriving; iteration then stops;
    * Hangup -- the connection was closed or the final chunk was read;
      iteration then stops.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        # Falsy values (None, 0) are stored as None, which disables the timer.
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def __iter__(self):
        import codecs

        # Fully non-blocking mode: never wait on the socket, yield None when idle.
        nonblocking = not self.block and not self.timeout
        if nonblocking:
            sock_timeout = None
        else:
            # Wait no longer than the nearest enabled timer so Timeout and
            # HeartbeatTimeout get reported. None entries are filtered out
            # because min(None, x) raises TypeError on Python 3.
            waits = [t for t in (self.timeout, self.heartbeat_timeout) if t]
            sock_timeout = min(waits) if waits else None

        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # select() bounds the wait before each chunk, so the reads themselves
        # may block; this keeps a chunk from being abandoned half-read.
        sock.setblocking(not nonblocking)
        # SSL sockets can hold already-decrypted bytes that select() won't report.
        pending = getattr(sock, 'pending', None)
        buf = ''
        raw_decode = json.JSONDecoder().raw_decode
        # Incremental decoding tolerates UTF-8 sequences split across chunks.
        utf8_decode = codecs.getincrementaldecoder('utf-8')().decode
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            buf = buf.lstrip()  # Remove any keep-alive delimiters
            try:
                res, ptr = raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                # No complete JSON object is buffered yet.
                if nonblocking:
                    yield None
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                heartbeat_timer.reset()
                continue

            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            try:
                # Reaching here means buf lacks a complete message, so wait
                # for more data (bounded by sock_timeout) whether or not buf
                # already holds a partial one.
                if sock_timeout and not (pending and pending()):
                    ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                    if not ready_to_read:
                        continue
                data = recv_chunk(sock)
                # Test the chunk, not buf: buf may still hold a partial message.
                if not data:
                    yield Hangup
                    break
                heartbeat_timer.reset()
                buf += utf8_decode(bytes(data))
            except SSLError as e:
                # Code 2 is error from a non-blocking read of an empty buffer.
                if e.errno != 2:
                    raise
131
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """Open a streaming request and return an iterator over its messages.

    An HTTPError raised while opening the request is re-raised as
    TwitterHTTPError.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    stream = TwitterJSONIter(handle, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(stream)
138
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    Similarly, if the stream does not produce heartbeats for more than
    `heartbeat_timeout` seconds (90 by default; a falsy value disables
    the check), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, then the iterator will yield either
    stream data or `{'timeout': True}` within the timeout period. This
    is useful if you want your program to do other stuff in between
    waiting for tweets.

    Setting the `block` parameter to False makes the stream fully
    non-blocking. In this mode, the iterator always yields immediately:
    either stream data or `None`. Note that `timeout` supersedes this
    argument, so it must be left as `None` (or 0) to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        uriparts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            # Route responses through the streaming handler; a truthy
            # per-call _timeout overrides the stream-wide timeout.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)