]>
Commit | Line | Data |
---|---|---|
1 | try: | |
2 | import urllib.request as urllib_request | |
3 | import urllib.error as urllib_error | |
4 | import io | |
5 | except ImportError: | |
6 | import urllib2 as urllib_request | |
7 | import urllib2 as urllib_error | |
8 | import json | |
9 | from ssl import SSLError | |
10 | import socket | |
11 | import sys, select, time | |
12 | ||
13 | from .api import TwitterCall, wrap_response, TwitterHTTPError | |
14 | ||
def recv_chunk_old(sock):  # -> bytearray:
    """
    Read one HTTP chunk from sock and return its payload as a bytearray.

    Compatible with Python 2.6, but less efficient. Returns an empty
    bytearray when no chunk-size header could be found.
    """
    buf = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = buf.find(b'\r\n')  # Find the HTTP chunk size.

    if crlf > 0:  # If there is a length, then process it

        remaining = int(buf[:crlf], 16)  # Decode the chunk size.

        start = crlf + 2  # Add in the length of the header's CRLF pair.
        end = len(buf) - start

        chunk = bytearray(remaining)

        if remaining <= 2:  # E.g. an HTTP chunk with just a keep-alive delimiter or end of stream (0).
            chunk[:remaining] = buf[start:start + remaining]
        # There are several edge cases (remaining == [3-6]) as the chunk size exceeds the length
        # of the initial read of 8 bytes. With Twitter, these do not, in practice, occur. The
        # shortest JSON message starts with '{"limit":{'. Hence, it exceeds in size the edge cases
        # and eliminates the need to address them.
        else:  # There is more to read in the chunk.
            chunk[:end] = buf[start:]
            received = end
            # recv() may legally return fewer bytes than requested, so keep
            # reading until the whole chunk has arrived. (A single recv()
            # call here could silently return a truncated chunk.)
            while received < remaining:
                data = sock.recv(remaining - received)
                if not data:  # Hangup mid-chunk: return what we have.
                    del chunk[received:]
                    break
                chunk[received:received + len(data)] = data
                received += len(data)
            sock.recv(2)  # Read the trailing CRLF pair. Throw it away.

        return chunk

    return bytearray()
45 | ||
46 | ## recv_chunk_old() | |
47 | ||
def recv_chunk_new(sock):  # -> bytearray:
    """
    Read one HTTP chunk from sock and return its payload as a bytearray.

    Compatible with Python 2.7+; uses recv_into() on a memoryview so the
    chunk body lands directly in the pre-sized bytearray without copies.
    Returns an empty bytearray when no chunk-size header could be found.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the HTTP chunk size.

    if crlf > 0:  # If there is a length, then process it

        size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
        chunk = bytearray(size)
        start = crlf + 2  # Add in the length of the header's CRLF pair.

        if size <= 3:  # E.g. an HTTP chunk with just a keep-alive delimiter or end of stream (0).
            chunk[:size] = header[start:start + size]
        # There are several edge cases (size == [4-6]) as the chunk size exceeds the length
        # of the initial read of 8 bytes. With Twitter, these do not, in practice, occur. The
        # shortest JSON message starts with '{"limit":{'. Hence, it exceeds in size the edge cases
        # and eliminates the need to address them.
        else:  # There is more to read in the chunk.
            end = len(header) - start
            chunk[:end] = header[start:]
            pos = end
            view = memoryview(chunk)  # Zero-copy window for recv_into().
            # recv_into() may fill only part of the view, so loop until the
            # chunk is complete. (A single call here could leave the tail of
            # the pre-sized bytearray as NUL padding.)
            while pos < size:
                received = sock.recv_into(view[pos:])
                if not received:  # Hangup mid-chunk: return what we have.
                    chunk = chunk[:pos]
                    break
                pos += received
            sock.recv(2)  # Read the trailing CRLF pair. Throw it away.

        return chunk

    return bytearray()
77 | ||
78 | ## recv_chunk_new() | |
79 | ||
# Select the chunk reader once at import time. NOTE: sys.version_info only
# gained named attributes such as ".major" in Python 2.7/3.1, so attribute
# access would itself crash on the very Python 2.6 interpreters that
# recv_chunk_old exists for -- compare the version tuple directly instead.
if sys.version_info >= (2, 7):
    recv_chunk = recv_chunk_new
else:
    recv_chunk = recv_chunk_old
84 | ||
class TwitterJSONIter(object):
    """
    Iterate over JSON objects arriving on a chunked streaming HTTP
    response. Each decoded object is yielded wrapped via wrap_response();
    depending on the block/timeout mode, the sentinels None,
    {'timeout': True} and {'hangup': True} may also be yielded.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        # handle: the urlopen() response object whose raw socket is read.
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block      # False selects non-blocking socket reads.
        self.timeout = timeout  # Seconds; enables select()-based waiting.


    def __iter__(self):
        # Dig the raw socket out of the response; the attribute path differs
        # between the Python 2 and Python 3 urllib implementations.
        sock = self.handle.fp.raw._sock if sys.version_info >= (3, 0) else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # Blocking mode only when block=True and no timeout was requested.
        sock.setblocking(self.block and not self.timeout)
        buf = ''
        json_decoder = json.JSONDecoder()
        timer = time.time()
        while True:
            try:
                buf = buf.lstrip()
                # raw_decode() returns the first JSON object in buf plus the
                # offset just past it; keep the remainder for the next pass.
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                continue
            except ValueError as e:
                # buf holds no complete JSON object yet: fall through and
                # read more data. Non-blocking/timeout callers get a None
                # so they are not starved while waiting.
                if self.block and not self.timeout: pass
                else: yield None
            try:
                buf = buf.lstrip()  # Remove any keep-alive delimiters to detect hangups.
                if self.timeout and not buf:  # This is a non-blocking read.
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if not ready_to_read[0] and time.time() - timer > self.timeout:
                        yield {'timeout': True}
                        continue
                    timer = time.time()
                buf += recv_chunk(sock).decode('utf-8')
                if not buf and self.block:
                    # An empty read on a blocking socket means the server
                    # closed the stream.
                    yield {'hangup': True}
                    break
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer.
                if (not self.block or self.timeout) and (e.errno == 2): pass
                else: raise
128 | ||
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open the streaming request `req` and return an iterator of decoded
    JSON messages. An HTTPError from the open is re-raised as a
    TwitterHTTPError carrying uri/arg_data context.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    else:
        return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout=timeout))
135 | ||
class TwitterStreamCallWithTimeout(TwitterCall):
    """Blocking streaming call that gives up waiting after self.timeout."""
    def _handle_response(self, req, uri, arg_data, _timeout=None):
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=self.timeout)
139 | ||
class TwitterStreamCall(TwitterCall):
    """Streaming call whose iterator blocks until data arrives."""
    def _handle_response(self, req, uri, arg_data, _timeout=None):
        return handle_stream_response(
            req, uri, arg_data, block=True)
143 | ||
class TwitterStreamCallNonBlocking(TwitterCall):
    """Streaming call whose iterator yields None instead of blocking."""
    def _handle_response(self, req, uri, arg_data, _timeout=None):
        return handle_stream_response(
            req, uri, arg_data, block=False)
147 | ||
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com). Use it
    much like the Twitter class, except that calling a method returns
    an iterator that yields objects decoded from the stream. For
    example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields tweets forever and ever (until the stream
    breaks, at which point it raises a TwitterHTTPError).

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.
    """
    def __init__(
        self, domain="stream.twitter.com", secure=True, auth=None,
        api_version='1.1', block=True, timeout=None):
        # Choose the call class matching the requested blocking behaviour.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        uriparts = (str(api_version),)
        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)