]>
Commit | Line | Data |
---|---|---|
dd648a25 MV |
1 | try: |
2 | import urllib.request as urllib_request | |
3 | import urllib.error as urllib_error | |
b8fd1206 | 4 | import io |
dd648a25 MV |
5 | except ImportError: |
6 | import urllib2 as urllib_request | |
7 | import urllib2 as urllib_error | |
8 | import json | |
2300838f | 9 | from ssl import SSLError |
67ddbde4 | 10 | import socket |
effd06bb | 11 | import sys, select, time |
dd648a25 | 12 | |
2983f31e | 13 | from .api import TwitterCall, wrap_response, TwitterHTTPError |
dd648a25 | 14 | |
# Interpreter feature gates, evaluated once at import time.
PY_3_OR_HIGHER = sys.version_info[0] >= 3
PY_27_OR_HIGHER = PY_3_OR_HIGHER or sys.version_info[:2] >= (2, 7)

# Marker objects yielded by the stream iterator in place of decoded data.
Timeout = {'timeout': True}
Hangup = {'hangup': True}
# A heartbeat timeout also counts as a hang-up, so it carries both keys.
HeartbeatTimeout = {'heartbeat_timeout': True, 'hangup': True}
d3915c61 | 21 | |
class ChunkDecodeError(Exception):
    """Raised when the size header of an HTTP chunk cannot be decoded."""
23dcd469 | 24 | |
def recv_chunk(sock):  # -> bytearray:
    """
    Read a single HTTP/1.1 chunk from ``sock`` and return its payload.

    A chunk on the wire is ``<hex size>CRLF<payload>CRLF``.  Exactly one
    chunk is consumed, including its trailing CRLF, so the next call
    starts at the next chunk header.

    Returns a ``bytearray`` holding the payload.  It is empty for the
    zero-length chunk that terminates the stream, and also when the peer
    had already closed the connection before any header byte arrived.

    Raises ``ChunkDecodeError`` if no valid chunk size is found within the
    first 8 bytes (at most 6 hex digits, i.e. chunks of up to 16MiB, plus
    the CRLF), or if the connection closes part-way through the payload.
    """
    crlf_mark = b'\r\n'

    # The shortest complete chunk is the 5-byte terminator b'0\r\n\r\n', so
    # an initial read of 5 bytes can never run into the following chunk.
    header = sock.recv(5)
    if not header:
        return bytearray()  # Connection closed cleanly between chunks.

    crlf = header.find(crlf_mark)
    # Longer size fields (or a short first read) are completed one byte at
    # a time so that nothing past the header's CRLF is consumed here.
    while crlf < 0 and len(header) < 8:
        more = sock.recv(1)
        if not more:
            break
        header += more
        crlf = header.find(crlf_mark)

    # find() gives -1 when the CRLF is missing and 0 when no size digits
    # precede it; neither is a usable header.
    if crlf < 1:
        raise ChunkDecodeError()

    try:
        size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
    except ValueError:
        raise ChunkDecodeError()
    if size < 0:
        raise ChunkDecodeError()

    # Bytes already read past the size line: the start of the payload and,
    # for very small chunks, part or all of the trailing CRLF.
    rest = header[crlf + 2:]
    chunk = bytearray(size)
    filled = min(len(rest), size)
    chunk[:filled] = rest[:filled]
    trailer_seen = len(rest) - filled

    # A single recv may return less than requested, so keep reading until
    # the payload is complete.
    if PY_27_OR_HIGHER:  # When possible, use less memory by reading directly into the buffer.
        view = memoryview(chunk)
        while filled < size:
            received = sock.recv_into(view[filled:])
            if not received:
                raise ChunkDecodeError()  # Closed in the middle of the payload.
            filled += received
    else:  # less efficient for python2.6 compatibility
        while filled < size:
            piece = sock.recv(size - filled)
            if not piece:
                raise ChunkDecodeError()  # Closed in the middle of the payload.
            chunk[filled:filled + len(piece)] = piece
            filled += len(piece)

    # Read the trailing CRLF pair (or what is left of it) and throw it away.
    pending = 2 - trailer_seen
    while pending > 0:
        piece = sock.recv(pending)
        if not piece:
            break  # Payload is complete; the next call reports the closure.
        pending -= len(piece)

    return chunk
26938000 | 53 | |
26938000 | 54 | |
class Timer(object):
    """
    Track whether more than ``timeout`` seconds have passed since the
    last reset.  A ``timeout`` of None means the timer never expires.
    """

    # Use a monotonic clock where the interpreter provides one, so that
    # adjustments to the system clock cannot cause spurious or missed
    # expirations.  Older interpreters fall back to wall-clock time.
    _clock = staticmethod(getattr(time, 'monotonic', time.time))

    def __init__(self, timeout):
        # If timeout is None, we never expire.
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the measurement from the current instant."""
        self.time = self._clock()

    def expired(self):
        """
        If expired, reset the timer and return True.
        """
        if self.timeout is None:
            return False
        if self._clock() - self.time > self.timeout:
            self.reset()
            return True
        return False
74 | ||
75 | ||
class TwitterJSONIter(object):
    """
    Iterable over the JSON objects arriving on an open streaming response.

    ``handle`` is the opened HTTP response; ``uri`` and ``arg_data`` are
    stored on the instance.  ``block``, ``timeout`` and
    ``heartbeat_timeout`` select the waiting behaviour of the iterator; a
    falsy ``timeout`` or ``heartbeat_timeout`` is stored as None, any
    other value as a float.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def __iter__(self):
        """
        Yield wrapped responses decoded from the stream.

        Besides data, the generator yields ``Timeout`` when the timeout
        timer expires, ``HeartbeatTimeout`` (then stops) when the
        heartbeat timer expires, ``Hangup`` (then stops) when an empty
        chunk is received, and ``None`` when neither ``block`` nor
        ``timeout`` is set and no complete object is buffered.
        """
        import codecs  # Local import: only needed once iteration starts.

        actually_block = self.block and not self.timeout
        # Only compare the limits that are actually set: min() cannot order
        # None against a float on Python 3.
        if actually_block:
            limits = [t for t in (self.timeout, self.heartbeat_timeout)
                      if t is not None]
            sock_timeout = min(limits) if limits else None
        else:
            sock_timeout = None
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_block)
        buf = ''
        raw_decode = json.JSONDecoder().raw_decode
        # Decode incrementally so that a multi-byte character split across
        # two chunks is held back until its remaining bytes arrive.
        decode = codecs.getincrementaldecoder('utf-8')().decode
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            buf = buf.lstrip()  # Remove any keep-alive delimiters
            try:
                res, ptr = raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                # No complete JSON object buffered yet.
                if not self.block and not self.timeout:
                    yield None
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                heartbeat_timer.reset()
                continue

            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            try:
                if not buf and sock_timeout:
                    # Wait for data, but wake up in time to check the timers.
                    ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                    if not ready_to_read:
                        continue
                chunk = recv_chunk(sock)
                if not chunk:
                    # An empty chunk carries no data: treat it as a hang-up.
                    yield Hangup
                    break
                buf += decode(bytes(chunk))
                heartbeat_timer.reset()
            except SSLError as e:
                # Code 2 is error from a non-blocking read of an empty buffer.
                if e.errno != 2:
                    raise
2300838f | 131 | |
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open ``req`` and return an iterator over the resulting stream.

    An HTTP error raised while opening the request is re-raised as
    ``TwitterHTTPError``.  Otherwise the opened response is wrapped in a
    ``TwitterJSONIter`` configured with ``block``, ``timeout`` and
    ``heartbeat_timeout``, and its iterator is returned.
    """
    try:
        handle = urllib_request.urlopen(req,)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    else:
        stream = TwitterJSONIter(
            handle, uri, arg_data, block, timeout, heartbeat_timeout)
        return iter(stream)
effd06bb | 138 | |
class TwitterStream(TwitterCall):
    """
    Interface to the Twitter Stream API.

    Use it much like the Twitter class, except that calling a method
    returns an iterator which yields objects decoded from the stream.
    For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    Iteration continues until the TCP connection breaks. At that point
    the iterator yields `{'hangup': True}`, and raises `StopIteration`
    if iterated again.

    Likewise, when the stream produces no heartbeat for longer than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    `timeout` bounds the time between yields. When it is nonzero, the
    iterator yields either stream data or `{'timeout': True}` within
    the timeout period, which lets the program do other work while
    waiting for tweets.

    Setting `block` to False makes the stream fully non-blocking: the
    iterator always yields immediately, producing stream data or
    `None`. Because `timeout` supersedes `block`, `timeout` must also
    be left as `None` to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):

        class TwitterStreamCall(TwitterCall):
            # Closes over block, timeout and heartbeat_timeout so that every
            # call made through this stream hands them to the stream handler.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout, when given, wins over the stream-wide one.
                effective_timeout = _timeout or timeout
                return handle_stream_response(
                    req, uri, arg_data, block,
                    effective_timeout, heartbeat_timeout)

        version_parts = (str(api_version),)
        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=version_parts, timeout=timeout, gzip=False)