]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Hangup should also happen in non-blocking mode
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
def recv_chunk_old(sock):  # -> bytearray:
    """
    Read one HTTP/1.1 chunk from ``sock`` and return its payload.

    Compatible with Python 2.6, but less efficient: the payload is
    accumulated from successive ``recv`` calls instead of being received
    in place.

    The chunk-size line is read one byte at a time, so no byte belonging to
    the following chunk is ever consumed. The payload and the trailing CRLF
    are read in loops because ``recv`` may return fewer bytes than requested.

    Returns an empty bytearray when the connection is closed before a size
    line arrives, when no CRLF-terminated size (at most 6 hex digits, i.e.
    up to 0xffffff) is found within 8 bytes, or for the terminating
    zero-size chunk. If the connection closes mid-payload, the bytes
    received so far are returned. Socket errors (e.g. from a non-blocking
    read with no data) propagate. A size line that is not hexadecimal
    raises ValueError.
    """
    header = bytearray()
    while len(header) < 8 and not header.endswith(b'\r\n'):
        octet = sock.recv(1)
        if not octet:  # Peer closed the connection.
            return bytearray()
        header += octet

    if len(header) <= 2 or not header.endswith(b'\r\n'):
        # Empty size field, or no CRLF within the first 8 bytes.
        return bytearray()

    size = int(bytes(header[:-2]), 16)  # Decode the hex chunk size.
    chunk = bytearray()
    while len(chunk) < size:
        data = sock.recv(size - len(chunk))
        if not data:  # Connection closed mid-chunk: return what arrived.
            return chunk
        chunk += data

    pending = 2  # Read the CRLF that terminates the chunk data and discard it.
    while pending:
        tail = sock.recv(pending)
        if not tail:
            break
        pending -= len(tail)

    return chunk

## recv_chunk_old()
47
def recv_chunk_new(sock):  # -> bytearray:
    """
    Read one HTTP/1.1 chunk from ``sock`` and return its payload.

    Compatible with Python 2.7+. The payload is received directly into a
    pre-sized bytearray through a memoryview.

    The chunk-size line is read one byte at a time, so no byte belonging to
    the following chunk is ever consumed. The payload and the trailing CRLF
    are read in loops because ``recv``/``recv_into`` may return fewer bytes
    than requested.

    Returns an empty bytearray when the connection is closed before a size
    line arrives, when no CRLF-terminated size (at most 6 hex digits, i.e.
    up to 0xffffff) is found within 8 bytes, or for the terminating
    zero-size chunk. If the connection closes mid-payload, the bytes
    received so far are returned. Socket errors (e.g. from a non-blocking
    read with no data) propagate. A size line that is not hexadecimal
    raises ValueError.
    """
    header = bytearray()
    while len(header) < 8 and not header.endswith(b'\r\n'):
        octet = sock.recv(1)
        if not octet:  # Peer closed the connection.
            return bytearray()
        header += octet

    if len(header) <= 2 or not header.endswith(b'\r\n'):
        # Empty size field, or no CRLF within the first 8 bytes.
        return bytearray()

    size = int(bytes(header[:-2]), 16)  # Decode the hex chunk size.
    chunk = bytearray(size)
    view = memoryview(chunk)  # Lets recv_into write at an offset without copying.
    received = 0
    while received < size:
        count = sock.recv_into(view[received:])
        if not count:  # Connection closed mid-chunk: return what arrived.
            return chunk[:received]
        received += count

    pending = 2  # Read the CRLF that terminates the chunk data and discard it.
    while pending:
        tail = sock.recv(pending)
        if not tail:
            break
        pending -= len(tail)

    return chunk

## recv_chunk_new()
79
# Pick the chunk reader for this interpreter. sys.version_info only gained
# named attributes (.major/.minor) in Python 2.7, so slice it instead.
# Attribute access would raise AttributeError on Python 2.6, the very
# version the fallback exists for.
if sys.version_info[:2] >= (2, 7):
    recv_chunk = recv_chunk_new
else:
    recv_chunk = recv_chunk_old
84
class TwitterJSONIter(object):
    """
    Iterate over the JSON messages carried by an open streaming response.

    ``handle`` is the response object opened by ``urlopen``. ``__iter__``
    reads HTTP chunks straight from its underlying socket and decodes
    messages with ``json.JSONDecoder.raw_decode``. ``uri`` and ``arg_data``
    are only stored on the instance.

    The socket is blocking only when ``block`` is true and no ``timeout``
    is given. Otherwise it is non-blocking. When a ``timeout`` is given,
    ``select`` waits up to that many seconds for data.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        """
        Generate the messages of the stream.

        Each complete JSON value is yielded as
        ``wrap_response(value, self.handle.headers)``. In addition:

        * unless the socket is blocking (``block`` true, no ``timeout``),
          ``None`` is yielded whenever no complete message is buffered;
        * with a ``timeout``, ``{'timeout': True}`` is yielded when ``select``
          reports no data and more than ``timeout`` seconds have passed
          since the last read attempt;
        * ``{'hangup': True}`` is yielded, and iteration ends, once
          ``recv_chunk`` returns no data (e.g. the server closed the
          connection).
        """
        import codecs  # Only needed for the incremental UTF-8 decoder.

        sock = self.handle.fp.raw._sock if sys.version_info >= (3, 0) else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # The socket blocks only in plain blocking mode. With a timeout,
        # select() below does the waiting instead.
        blocking = self.block and not self.timeout
        sock.setblocking(blocking)
        # Decode incrementally: a multi-byte UTF-8 sequence may be split
        # across two HTTP chunks.
        decode = codecs.getincrementaldecoder('utf-8')().decode
        buf = ''
        json_decoder = json.JSONDecoder()
        timer = time.time()
        while True:
            try:
                buf = buf.lstrip()
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                continue
            except ValueError:
                # buf does not hold a complete JSON value yet.
                if not blocking:
                    yield None
            try:
                buf = buf.lstrip()  # Remove any keep-alive delimiters.
                if self.timeout and not buf:
                    # The socket is non-blocking here; wait for data with select().
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if not ready_to_read[0] and time.time() - timer > self.timeout:
                        yield {'timeout': True}
                        continue
                timer = time.time()
                chunk = recv_chunk(sock)
                if not chunk:
                    # No chunk data came back (e.g. the peer closed the
                    # connection). Report the hangup even if an incomplete
                    # message is buffered: it can never be completed, and
                    # retrying would spin on a dead socket.
                    yield {'hangup': True}
                    break
                buf += decode(bytes(chunk))
            except SSLError as e:
                # errno 2 is SSL_ERROR_WANT_READ: a non-blocking read found
                # no data yet, so simply go around the loop again.
                if not blocking and e.errno == 2:
                    pass
                else:
                    raise
128
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open ``req`` and return an iterator over the stream's decoded messages.

    An ``HTTPError`` raised by ``urlopen`` is re-raised as a
    ``TwitterHTTPError`` carrying ``uri`` and ``arg_data``. ``block`` and
    ``timeout`` are handed to ``TwitterJSONIter`` unchanged.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
135
class TwitterStreamCallWithTimeout(TwitterCall):
    """TwitterCall whose responses are read as a blocking stream with a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The _timeout argument is ignored; the instance's timeout
        # attribute is forwarded instead.
        stream_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=stream_timeout)
139
class TwitterStreamCall(TwitterCall):
    """TwitterCall whose responses are read as a blocking stream without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # _timeout is accepted but ignored.
        return handle_stream_response(
            req=req, uri=uri, arg_data=arg_data, block=True)
143
class TwitterStreamCallNonBlocking(TwitterCall):
    """TwitterCall whose responses are read as a non-blocking stream."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # _timeout is accepted but ignored; no timeout is forwarded.
        return handle_stream_response(
            req=req, uri=uri, arg_data=arg_data, block=False)
147
class TwitterStream(TwitterStreamCall):
    """
    The TwitterStream object is an interface to the Twitter Stream API
    (stream.twitter.com). This can be used pretty much the same as the
    Twitter class except the result of calling a method will be an
    iterator that yields objects decoded from the stream. For
    example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields decoded messages until the stream hangs up. It then
    yields ``{'hangup': True}`` and stops. An HTTP error response received
    when the stream is opened is raised as a TwitterHTTPError.

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter (seconds) is used by the iterator only when
    `block` is True. The iterator then also yields None whenever no
    complete message is available. It yields ``{'timeout': True}`` when no
    data arrives within `timeout` seconds.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        # Choose the call class that drives handle_stream_response in the
        # requested reading mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)
184 self, auth=auth, format="json", domain=domain,
185 callable_cls=call_cls,
186 secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)