]>
Commit | Line | Data |
---|---|---|
dd648a25 MV |
1 | try: |
2 | import urllib.request as urllib_request | |
3 | import urllib.error as urllib_error | |
4 | except ImportError: | |
5 | import urllib2 as urllib_request | |
6 | import urllib2 as urllib_error | |
e84abb1c | 7 | import io |
dd648a25 | 8 | import json |
2300838f | 9 | from ssl import SSLError |
67ddbde4 | 10 | import socket |
effd06bb | 11 | import sys, select, time |
dd648a25 | 12 | |
2983f31e | 13 | from .api import TwitterCall, wrap_response, TwitterHTTPError |
dd648a25 | 14 | |
d3915c61 MV |
# True when running under a Python 3.x (or newer) interpreter.
PY_3_OR_HIGHER = sys.version_info[0] >= 3

# Line terminator that ends an HTTP chunk-size header.
CRLF = b'\r\n'

# Sentinel messages yielded by TwitterJSONIter in place of stream data.
Timeout = dict(timeout=True)
Hangup = dict(hangup=True)
HeartbeatTimeout = dict(heartbeat_timeout=True, hangup=True)
d3915c61 | 22 | |
dcece3a6 MV |
class ChunkDecodeError(Exception):
    """Raised by recv_chunk when no CRLF-terminated chunk header is found."""
23dcd469 | 25 | |
e84abb1c MV |
class EndOfStream(Exception):
    """Raised by recv_chunk when a zero-size chunk marks the end of the stream."""
26938000 | 28 | |
f560656d MV |
# Python 2 compatibility: iterate lazily with xrange instead of building lists.
if PY_3_OR_HIGHER:
    range = range
else:
    range = xrange  # noqa: F821 -- only defined on Python 2
30 | ||
e84abb1c MV |
class SocketShim(io.RawIOBase):
    """
    Adapts a raw socket to the raw-stream protocol used by io.BufferedReader.

    All reads go through readinto(), which delegates to sock.recv_into().
    read() and readall() are inherited from io.RawIOBase and built on
    readinto(). This works for SSL sockets and for plain socket.socket
    objects; the latter have no read() method of their own.
    """
    def __init__(self, sock):
        self.sock = sock

    def readable(self):
        return True

    def readinto(self, buf):
        # Number of bytes received; 0 means the peer closed the connection.
        return self.sock.recv_into(buf)
43 | ||
def recv_chunk(reader):  # -> bytearray:
    """
    Read one chunk of an HTTP chunked-transfer-encoded body.

    `reader` must support peek() and read(), as io.BufferedReader does.
    The chunk header is the hexadecimal chunk size followed by CRLF. It is
    located by peeking at up to 11 bytes before anything is consumed.

    Returns the chunk payload as a bytearray. The CRLF that follows the
    payload is consumed and discarded.

    Raises:
        ChunkDecodeError: no CRLF-terminated header was found in the peeked
            bytes, or the size field is not a non-negative hex number.
        EndOfStream: a zero-size (terminating) chunk was read, or the
            reader hit end-of-file before the whole payload arrived.
    """
    # NOTE(review): BufferedReader.peek() may return fewer bytes than asked
    # for, so a header split across socket reads may be reported as
    # ChunkDecodeError -- confirm whether this matters in practice.
    for headerlen in range(12):
        header = reader.peek(headerlen)[:headerlen]
        if header.endswith(CRLF):
            break
    else:
        raise ChunkDecodeError()

    try:
        size = int(header, 16)  # Decode the chunk size; int() ignores the CRLF
    except ValueError:
        raise ChunkDecodeError()
    if size < 0:
        # int() accepts a sign, but HTTP chunk sizes are plain hex digits.
        raise ChunkDecodeError()
    reader.read(headerlen)  # Ditch the header

    if size == 0:
        raise EndOfStream()

    chunk = bytearray()
    while len(chunk) < size:
        data = reader.read(size - len(chunk))
        if not data:
            # Nothing more can be read (EOF): without this check the loop
            # would call read() forever on a closed connection.
            raise EndOfStream()
        chunk.extend(data)

    reader.read(2)  # Ditch remaining CRLF

    return chunk
26938000 | 66 | |
26938000 | 67 | |
d3915c61 MV |
class Timer(object):
    """
    Tracks elapsed wall-clock time against an optional timeout in seconds.

    A timeout of None means the timer never expires.
    """
    def __init__(self, timeout):
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the timer from the current time."""
        self.time = time.time()

    def expired(self):
        """
        Return True, and restart the timer, once more than `timeout` seconds
        have elapsed since the last reset. Otherwise return False.
        """
        if self.timeout is not None:
            elapsed = time.time() - self.time
            if elapsed > self.timeout:
                self.reset()
                return True
        return False
87 | ||
88 | ||
dd648a25 MV |
class TwitterJSONIter(object):
    """
    Iterator over the JSON messages of a chunked streaming HTTP response.

    Each decoded message is yielded as wrap_response(message, headers).
    Between messages the iterator may yield the module sentinels Timeout,
    Hangup or HeartbeatTimeout. When both `block` and `timeout` are falsy
    (non-blocking polling mode), it yields None whenever no complete
    message is available yet.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        # Falsy values (None, 0) disable the corresponding timer.
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def __iter__(self):
        import codecs  # local import keeps this block self-contained

        # A timeout forces non-blocking reads so the timer can be checked.
        actually_block = self.block and not self.timeout
        # Blocking reads wait in select() for at most the heartbeat timeout,
        # so a silent stream is still noticed. With no heartbeat timeout
        # there is nothing to wait for. Calling min() with None would raise
        # TypeError on Python 3.
        if actually_block and self.heartbeat_timeout:
            sock_timeout = min(self.timeout or 1000000, self.heartbeat_timeout)
        else:
            sock_timeout = None
        # The underlying socket is reached through different attributes on
        # Python 2 and Python 3.
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_block)
        reader = io.BufferedReader(SocketShim(sock))
        buf = ''
        raw_decode = json.JSONDecoder().raw_decode
        # Chunk boundaries need not fall on UTF-8 character boundaries. The
        # incremental decoder holds back the bytes of a split multi-byte
        # character until the rest arrives, instead of raising
        # UnicodeDecodeError.
        decode_utf8 = codecs.getincrementaldecoder('utf-8')().decode
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            buf = buf.lstrip()  # Remove any keep-alive delimiters
            try:
                res, ptr = raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                # No complete JSON value can be decoded from buf yet.
                if not self.block and not self.timeout:
                    yield None
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                heartbeat_timer.reset()
                continue

            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            try:
                # NOTE(review): select() only sees the raw socket. Bytes that
                # were already read ahead into `reader`'s buffer do not make
                # the socket readable, so such a chunk may wait for the next
                # socket activity -- confirm.
                # NOTE: with a timeout set, the socket is non-blocking and
                # sock_timeout is None. This loop then polls, with the
                # SSLError below swallowed, until data or the timeout arrives.
                if sock_timeout:
                    ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                    if not ready_to_read:
                        continue
                received = recv_chunk(reader)
                buf += decode_utf8(bytes(received))
                if received:
                    heartbeat_timer.reset()
            except (ChunkDecodeError, EndOfStream):
                yield Hangup
                break
            except SSLError as e:
                # Code 2 is error from a non-blocking read of an empty buffer.
                if e.errno != 2:
                    raise
2300838f | 147 | |
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
    """
    Open the streaming request and return an iterator over its messages.

    An HTTPError raised by urlopen is re-raised as TwitterHTTPError.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as err:
        raise TwitterHTTPError(err, uri, 'json', arg_data)
    json_iter = TwitterJSONIter(
        response, uri, arg_data, block, timeout, heartbeat_timeout)
    return iter(json_iter)
effd06bb | 154 | |
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. This can be used pretty much the same as the Twitter class
    except the result of calling a method will be an iterator that
    yields objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    Similarly, if the stream does not produce heartbeats for more than
    `heartbeat_timeout` seconds (90 by default), the iterator yields
    `{'hangup': True, 'heartbeat_timeout': True}`, and raises
    `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, then the iterator will yield either
    stream data or `{'timeout': True}` within the timeout period. This
    is useful if you want your program to do other stuff in between
    waiting for tweets.

    The `block` parameter controls blocking. Set `block=False` to make
    the stream fully non-blocking. In this mode, the iterator always
    yields immediately. It returns stream data, or `None`. Note that
    `timeout` supersedes this argument, so it should also be set to
    `None` to use this mode.
    """
    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
                 api_version='1.1', block=True, timeout=None,
                 heartbeat_timeout=90.0):
        uriparts = (str(api_version),)

        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call _timeout, when truthy, overrides the
                # stream-wide timeout.
                return handle_stream_response(
                    req, uri, arg_data, block,
                    _timeout or timeout, heartbeat_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)