jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Restore Python 2.6 compatibility by restoring the old version of recv_chunk, only...
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
def recv_chunk_old(sock):  # -> bytearray:
    """
    Read one HTTP/1.1 chunk from ``sock`` and return its payload.

    Compatible with Python 2.6 (no ``memoryview``), but less efficient than
    ``recv_chunk_new`` because the payload is assembled from ``recv`` calls.

    The chunk-size line is searched for in an initial read of at most 8
    bytes, so chunk sizes up to 0xffffff are supported. Returns an empty
    bytearray in two cases:

    * the initial read holds no chunk-size line (e.g. the peer closed the
      connection);
    * the chunk is the terminating zero-size chunk.

    If the peer closes the connection in the middle of the payload, the
    bytes received so far are returned.

    Limitation: when the payload is tiny (at most 2 bytes with a one-digit
    size line, e.g. a keep-alive CRLF), the initial 8-byte read may also
    consume the first byte(s) of the *next* chunk. Those bytes are lost.
    Avoiding that would require a read buffer kept between calls.
    """
    buf = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = buf.find(b'\r\n')  # Find the HTTP chunk size.

    if crlf > 0:  # If there is a length, then process it
        remaining = int(buf[:crlf], 16)  # Decode the chunk size.
        start = crlf + 2  # Add in the length of the header's CRLF pair.

        # The initial read may already hold part (or all) of the payload,
        # and even some of the CRLF pair that terminates it.
        chunk = bytearray(buf[start:start + remaining])
        trailer = len(buf) - start - len(chunk)  # CRLF bytes already read.

        # recv() may return fewer bytes than requested: keep reading until
        # the whole payload has arrived or the peer closes the connection.
        while len(chunk) < remaining:
            data = sock.recv(remaining - len(chunk))
            if not data:
                return chunk
            chunk += data

        # Read whatever is left of the trailing CRLF pair. Throw it away.
        while trailer < 2:
            tail = sock.recv(2 - trailer)
            if not tail:
                break
            trailer += len(tail)

        return chunk

    return bytearray()

## recv_chunk_old()
47
def recv_chunk_new(sock):  # -> bytearray:
    """
    Read one HTTP/1.1 chunk from ``sock`` and return its payload.

    Compatible with Python 2.7+: the payload is received in place into a
    preallocated bytearray through a ``memoryview`` and ``recv_into``.

    The chunk-size line is searched for in an initial read of at most 8
    bytes, so chunk sizes up to 0xffffff are supported. Returns an empty
    bytearray in two cases:

    * the initial read holds no chunk-size line (e.g. the peer closed the
      connection);
    * the chunk is the terminating zero-size chunk.

    If the peer closes the connection in the middle of the payload, only the
    bytes received so far are returned.

    Limitation: when the payload is tiny (at most 2 bytes with a one-digit
    size line, e.g. a keep-alive CRLF), the initial 8-byte read may also
    consume the first byte(s) of the *next* chunk. Those bytes are lost.
    Avoiding that would require a read buffer kept between calls.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the HTTP chunk size.

    if crlf > 0:  # If there is a length, then process it
        size = int(header[:crlf], 16)  # Decode the chunk size.
        start = crlf + 2  # Add in the length of the header's CRLF pair.
        chunk = bytearray(size)

        # The header read may already hold part (or all) of the payload, and
        # even some of the CRLF pair that terminates it.
        body = header[start:start + size]
        received = len(body)
        chunk[:received] = body  # Same length: the bytearray is not resized.
        trailer = len(header) - start - received  # CRLF bytes already read.

        if received < size:  # There is more to read in the chunk.
            # recv_into() may fill only part of the view: loop until the
            # payload is complete instead of leaving zero bytes behind.
            view = memoryview(chunk)
            while received < size:
                count = sock.recv_into(view[received:])
                if not count:  # Connection closed mid-chunk.
                    return chunk[:received]
                received += count

        # Read whatever is left of the trailing CRLF pair. Throw it away.
        while trailer < 2:
            tail = sock.recv(2 - trailer)
            if not tail:
                break
            trailer += len(tail)

        return chunk

    return bytearray()

## recv_chunk_new()
79
# recv_chunk_new needs memoryview and recv_into (Python 2.7+); older
# interpreters fall back to recv_chunk_old.
# Compare version_info as a plain tuple: the named attributes (.major,
# .minor) only exist from Python 2.7 on. On 2.6 the former check raised
# AttributeError, i.e. on exactly the version this fallback exists for.
if sys.version_info[:2] >= (2, 7):
    recv_chunk = recv_chunk_new
else:
    recv_chunk = recv_chunk_old
84
class TwitterJSONIter(object):
    """
    Iterable over the JSON messages of an open Twitter streaming response.

    HTTP chunks are read directly from the response's underlying socket with
    ``recv_chunk``. They are decoded as UTF-8 and each complete JSON value is
    yielded, wrapped with ``wrap_response``, as soon as it has been received.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        """
        Yield decoded messages, interleaved with these sentinels:

        * ``None`` each time no complete message is buffered, when ``block``
          is false;
        * ``{'timeout': True}`` when ``timeout`` is set and either nothing
          became readable within ``timeout`` seconds, or data arrived but no
          message has been decoded for more than ``timeout`` seconds;
        * ``{'hangup': True}`` (after which iteration stops) when a read
          without a timeout returns nothing while the buffer is empty and
          ``block`` is true.

        ``SSLError`` is re-raised unless it is the errno-2 error of a
        non-blocking read (``block`` false or ``timeout`` set).
        """
        import codecs  # Local import so this block does not depend on the module header.

        # NOTE(review): bytes the response's buffered reader may already have
        # read ahead of the headers are not seen by reading the raw socket --
        # confirm this cannot drop the start of the stream.
        sock = self.handle.fp.raw._sock if sys.version_info >= (3, 0) else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # With a timeout the socket is non-blocking; select() does the waiting.
        sock.setblocking(self.block and not self.timeout)
        # Chunk boundaries need not fall on UTF-8 character boundaries. The
        # incremental decoder holds back a multi-byte character that is split
        # across two chunks instead of raising UnicodeDecodeError.
        decode = codecs.getincrementaldecoder('utf-8')().decode
        buf = ''
        json_decoder = json.JSONDecoder()
        timer = time.time()
        while True:
            try:
                buf = buf.lstrip()  # raw_decode() rejects leading whitespace.
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer = time.time()
                continue
            except ValueError:
                # No complete JSON value buffered yet.
                if not self.block:
                    yield None
            try:
                buf = buf.lstrip()  # Remove any keep-alive delimiters to detect hangups.
                if self.timeout:
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if ready_to_read[0]:
                        buf += decode(bytes(recv_chunk(sock)))  # This is a non-blocking read.
                        if time.time() - timer > self.timeout:
                            yield {'timeout': True}
                    else:
                        yield {'timeout': True}
                else:
                    buf += decode(bytes(recv_chunk(sock)))
                    if not buf and self.block:
                        yield {'hangup': True}
                        break
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer
                # (errno 2, i.e. ssl.SSL_ERROR_WANT_READ).
                if (not self.block or self.timeout) and (e.errno == 2):
                    pass
                else:
                    raise
131
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open the streaming request ``req`` and return an iterator over its messages.

    An ``HTTPError`` raised while opening the request is re-raised as a
    ``TwitterHTTPError`` carrying ``uri`` and ``arg_data``. ``block`` and
    ``timeout`` are handed unchanged to ``TwitterJSONIter``.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
138
class TwitterStreamCallWithTimeout(TwitterCall):
    """TwitterCall whose responses are blocking streams bounded by ``self.timeout``."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The per-call _timeout argument is ignored; the instance's timeout applies.
        stream_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=stream_timeout)
142
class TwitterStreamCall(TwitterCall):
    """TwitterCall whose responses are blocking streams without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # Blocking reads with no timeout; the per-call _timeout is ignored.
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=None)
146
class TwitterStreamCallNonBlocking(TwitterCall):
    """TwitterCall whose responses are non-blocking streams."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # Non-blocking reads, no timeout; the per-call _timeout is ignored.
        return handle_stream_response(
            req, uri, arg_data, block=False, timeout=None)
150
class TwitterStream(TwitterStreamCall):
    """
    The TwitterStream object is an interface to the Twitter Stream API
    (stream.twitter.com). This can be used pretty much the same as the
    Twitter class except the result of calling a method will be an
    iterator that yields objects decoded from the stream. For
    example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    Opening the stream raises a TwitterHTTPError if the server answers
    with an HTTP error. Once open, the iterator yields tweets until the
    connection ends. In blocking mode without a timeout, a hangup is
    reported by yielding ``{'hangup': True}``, after which the iterator
    stops.

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter (in seconds) affects reading only when `block`
    is True. The iterator then yields ``{'timeout': True}`` in two cases:

    * no data arrives within `timeout` seconds;
    * data arrives but no complete message has been decoded for more than
      `timeout` seconds.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        # Pick the call class that sets the reading mode of the stream.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)
187 self, auth=auth, format="json", domain=domain,
188 callable_cls=call_cls,
189 secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)