]>
Commit | Line | Data |
---|---|---|
dd648a25 MV |
1 | try: |
2 | import urllib.request as urllib_request | |
3 | import urllib.error as urllib_error | |
b8fd1206 | 4 | import io |
dd648a25 MV |
5 | except ImportError: |
6 | import urllib2 as urllib_request | |
7 | import urllib2 as urllib_error | |
8 | import json | |
2300838f | 9 | from ssl import SSLError |
67ddbde4 | 10 | import socket |
effd06bb | 11 | import sys, select, time |
dd648a25 | 12 | |
2983f31e | 13 | from .api import TwitterCall, wrap_response, TwitterHTTPError |
dd648a25 | 14 | |
d3915c61 MV |
# Interpreter feature flags used to select version-specific code paths.
PY_27_OR_HIGHER = sys.version_info[:2] >= (2, 7)
PY_3_OR_HIGHER = sys.version_info[0] >= 3

# Sentinel messages the stream iterator yields in place of tweet data.
Timeout = dict(timeout=True)  # Yielded when the configured timeout expires.
Hangup = dict(hangup=True)  # Yielded once when the stream ends; iteration stops after it.
20 | ||
21 | ||
def _recv_into_chunk(sock, chunk, filled):
    """Fill ``chunk[filled:]`` from ``sock``; return how many bytes of ``chunk`` are now filled.

    Loops because a single ``recv``/``recv_into`` call may return fewer bytes
    than requested. Stops early when ``sock`` reports end of stream (a
    zero-length read).
    """
    size = len(chunk)
    if PY_27_OR_HIGHER:  # When possible, use less memory by reading directly into the buffer.
        view = memoryview(chunk)  # Create a view into the bytearray to hold the rest of the chunk.
        while filled < size:
            count = sock.recv_into(view[filled:])
            if not count:
                break  # Peer closed the connection mid-chunk.
            filled += count
    else:  # less efficient for python2.6 compatibility
        while filled < size:
            data = sock.recv(size - filled)
            if not data:
                break  # Peer closed the connection mid-chunk.
            chunk[filled:filled + len(data)] = data
            filled += len(data)
    return filled


def _discard_bytes(sock, count):
    """Read and drop ``count`` bytes from ``sock``, stopping early at end of stream."""
    while count > 0:
        data = sock.recv(count)
        if not data:
            break
        count -= len(data)


def recv_chunk(sock):  # -> bytearray:
    """Read one HTTP chunked-transfer-encoding chunk from ``sock``.

    A chunk is framed as ``<hex size>\\r\\n<size data bytes>\\r\\n``. Only the
    data bytes are returned; the size line and the trailing CRLF pair are
    consumed and discarded.

    Returns an empty ``bytearray`` when the first read holds no size line
    (nothing was read, or no CRLF appears in the first 8 bytes). A size of 0
    also yields an empty ``bytearray``. If the socket reaches end of stream
    before the chunk is complete, only the data received so far is returned.

    Raises ``ValueError`` if the size line is not valid hexadecimal.

    NOTE(review): on a non-blocking socket the read loops can raise mid-chunk
    if the rest of the chunk has not arrived yet; the partial chunk is then
    lost.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the HTTP chunk size.
    if crlf <= 0:  # No length in what was read: nothing to process.
        return bytearray()

    size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
    extra = header[crlf + 2:]  # Bytes read past the size line's CRLF pair.

    # The first read may already hold part of the data. For short chunks it
    # may hold all of the data and some or all of the trailing CRLF.
    # Copy only the data part; the bytearray keeps its exact size.
    chunk = bytearray(size)
    head = extra[:size]
    chunk[:len(head)] = head

    filled = _recv_into_chunk(sock, chunk, len(head))
    if filled < size:  # End of stream mid-chunk: return only what arrived.
        return chunk[:filled]

    # Read only the part of the trailing CRLF pair that the first read missed.
    # NOTE(review): for chunks of fewer than 3 data bytes, the 8-byte first
    # read can run past this CRLF into the next chunk; those bytes are dropped.
    _discard_bytes(sock, 2 - min(2, len(extra) - len(head)))
    return chunk
26938000 | 52 | |
26938000 | 53 | |
d3915c61 MV |
class Timer(object):
    """Wall-clock timer that reports when ``timeout`` seconds have elapsed.

    A ``timeout`` of ``None`` means the timer is always expired.
    """

    def __init__(self, timeout):
        self.timeout = timeout  # Seconds, or None for "always expired".
        self.reset()

    def reset(self):
        """Restart the timer from the current time."""
        self.time = time.time()

    def expired(self):
        """
        Return True if the timer has expired, restarting it as a side effect;
        otherwise return False. Always True when ``timeout`` is None.
        """
        if self.timeout is None:
            return True
        has_expired = time.time() - self.time > self.timeout
        if has_expired:
            self.reset()
        return has_expired
73 | ||
74 | ||
dd648a25 MV |
class TwitterJSONIter(object):
    """Iterator over the JSON messages of an open Twitter streaming response.

    ``handle`` is the response object returned by ``urlopen``; ``uri`` and
    ``arg_data`` describe the request that produced it. ``block`` and
    ``timeout`` choose between blocking, non-blocking and timed reads (see
    ``__iter__``).
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        """Yield messages decoded from the stream.

        Yields:

        - each complete JSON value found in the stream, passed through
          ``wrap_response`` together with the response headers;
        - ``Timeout`` when a ``timeout`` is set and it expired without a
          message; in non-blocking mode without a timeout, ``None`` instead;
        - ``Hangup`` when a read returns no data and nothing is buffered,
          after which iteration stops.

        Raises ``SSLError`` for any SSL error other than errno 2 on a
        non-blocking socket.
        """
        import codecs  # Local import: only needed for the incremental decoder.

        # A timeout forces non-blocking reads, paced by select() below.
        actually_blocking = self.block and not self.timeout
        # Reach through urllib's response wrappers to the socket; these are
        # private attributes whose layout differs between Python 2 and 3.
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_blocking)
        buf = ''
        json_decoder = json.JSONDecoder()
        # HTTP chunk boundaries need not coincide with UTF-8 character
        # boundaries. A multi-byte character can be split across two chunks,
        # which a per-chunk .decode('utf-8') rejects with UnicodeDecodeError.
        # The incremental decoder holds back an incomplete trailing sequence
        # until the next chunk completes it.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        timer = Timer(self.timeout)
        timeout_token = Timeout if self.timeout else None
        while True:
            buf = buf.lstrip()  # Remove any keep-alive delimiters to detect hangups.
            try:
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                pass  # No complete JSON value buffered yet; read more below.
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                continue
            try:
                if self.timeout and not buf:  # This is a non-blocking read.
                    ready_to_read = select.select([sock], [], [], self.timeout)[0]
                    if not ready_to_read and timer.expired():
                        yield timeout_token
                        continue
                chunk = recv_chunk(sock)
                buf += utf8_decoder.decode(chunk)
                # An empty chunk with nothing buffered is treated as a hangup.
                # The raw chunk is checked as well: a chunk holding only the
                # first bytes of a split character decodes to '' but is data.
                # NOTE(review): if the stream ends while buf holds an
                # incomplete value, Hangup is never yielded — confirm.
                if not buf and not chunk:
                    yield Hangup
                    break
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer
                # (presumably SSL_ERROR_WANT_READ, errno 2 — confirm).
                if not actually_blocking and (e.errno == 2):
                    if timer.expired():
                        yield timeout_token
                else:
                    raise
2300838f | 121 | |
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open a streaming request and return an iterator over its messages.

    ``req`` is passed to ``urlopen``. An ``HTTPError`` raised while opening
    it is re-raised as ``TwitterHTTPError`` carrying ``uri`` and
    ``arg_data``. ``block`` and ``timeout`` are forwarded to
    ``TwitterJSONIter``.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
128 | ||
class TwitterStream(TwitterCall):
    """
    Interface to the Twitter Streaming API.

    It is used much like the ``Twitter`` class, except that calling an API
    method returns an iterator that yields objects decoded from the
    stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between yields.
    If it is nonzero, the iterator yields either stream data or
    `{'timeout': True}`. This is useful if your program needs to do
    other work while waiting for tweets.

    Passing `block=False` makes the stream non-blocking. In this mode
    the iterator always yields immediately, returning either stream data
    or `None`. Note that `timeout` supersedes this argument, so `timeout`
    must be left as `None` for this mode to apply.

    `api_version` becomes the first URI path component; responses are
    always requested as JSON with gzip disabled.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        version_parts = (str(api_version),)
        default_timeout = float(timeout) if timeout else None
        stream_block = block

        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout wins; otherwise use the stream default.
                effective_timeout = _timeout or default_timeout
                return handle_stream_response(
                    req, uri, arg_data,
                    block=stream_block, timeout=effective_timeout)

        TwitterCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=version_parts,
            timeout=default_timeout,
            gzip=False)