]>
Commit | Line | Data |
---|---|---|
dd648a25 MV |
1 | try: |
2 | import urllib.request as urllib_request | |
3 | import urllib.error as urllib_error | |
b8fd1206 | 4 | import io |
dd648a25 MV |
5 | except ImportError: |
6 | import urllib2 as urllib_request | |
7 | import urllib2 as urllib_error | |
8 | import json | |
2300838f | 9 | from ssl import SSLError |
67ddbde4 | 10 | import socket |
effd06bb | 11 | import sys, select, time |
dd648a25 | 12 | |
2983f31e | 13 | from .api import TwitterCall, wrap_response, TwitterHTTPError |
dd648a25 | 14 | |
23dcd469 AD |
def _recv_exactly(sock, count):  # -> bytearray:
    """Read `count` bytes from `sock`, looping over short reads.

    A single `recv()` may return fewer bytes than requested, so keep reading
    until `count` bytes have arrived or an empty read signals that the peer
    closed the connection. The result is shorter than `count` only in the
    latter case.
    """
    data = bytearray()
    while len(data) < count:
        piece = sock.recv(count - len(data))
        if not piece:  # Peer closed the connection mid-read.
            break
        data += piece
    return data


def recv_chunk(sock):  # -> bytearray:
    """Read one HTTP "chunked" transfer-encoding chunk from `sock`.

    Returns the chunk payload as a bytearray. An empty bytearray is returned
    when no chunk-size line is found at the start of the first read (this
    includes the socket returning no data) and for a zero-length chunk.

    Raises ValueError if the chunk-size field is not valid hexadecimal. Any
    error raised by `sock.recv()` (e.g. a would-block error on a non-blocking
    socket) propagates to the caller.

    NOTE(review): the first read asks for up to 8 bytes, so a very small
    chunk may pull in bytes that belong to the next chunk; those bytes are
    dropped, exactly as before. Fixing that needs state kept across calls.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the end of the HTTP chunk size.
    if crlf <= 0:  # No size field at the start of the read.
        return bytearray()

    remaining = int(header[:crlf], 16)  # Decode the chunk size.
    start = crlf + 2  # Skip the chunk-size line's CRLF pair.

    # Payload bytes that arrived together with the chunk-size line.
    chunk = bytearray(header[start:start + remaining])
    if len(chunk) < remaining:
        # recv() may deliver the rest in several pieces; collect them all.
        chunk += _recv_exactly(sock, remaining - len(chunk))

    # Each chunk ends with a CRLF pair. Consume whatever part of it was not
    # already swallowed by the first read so the next call starts on a
    # chunk-size line.
    trailer_seen = max(len(header) - start - remaining, 0)
    if trailer_seen < 2:
        _recv_exactly(sock, 2 - trailer_seen)

    return chunk
26938000 AD |
39 | |
40 | ## recv_chunk() | |
41 | ||
42 | ||
dd648a25 MV |
class TwitterJSONIter(object):
    """Iterable that decodes a stream of JSON objects from an open HTTP response.

    `handle` is the response object returned by `urlopen`; its underlying
    socket is read chunk by chunk and every complete JSON value found in the
    accumulated text is yielded (wrapped with the response headers).

    Besides decoded objects the iterator can yield:

    * `None` when `block` is false and no complete object is buffered;
    * `{'timeout': True}` when `timeout` is set and nothing became readable
      within `timeout` seconds, or a read completed but more than `timeout`
      seconds have passed since the last decoded object;
    * `{'hangup': True}` when `block` is true and a read returned no data;
      iteration ends after that.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        # Local import so this block does not depend on the file's import list.
        import codecs

        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # With a timeout the socket is non-blocking; select() does the waiting.
        sock.setblocking(self.block and not self.timeout)

        # An incremental decoder keeps the tail of a multi-byte UTF-8
        # sequence that is split across two chunks instead of raising.
        decode = codecs.getincrementaldecoder('utf-8')().decode

        buf = u''
        json_decoder = json.JSONDecoder()
        timer = time.time()
        while True:
            try:
                buf = buf.lstrip()
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer = time.time()
                continue
            except ValueError:
                # No complete JSON value buffered yet.
                if not self.block:
                    yield None
            try:
                buf = buf.lstrip()  # Remove any keep-alive delimiters to detect hangups.
                chunk = None  # Stays None when no read is attempted this round.
                if self.timeout:
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if ready_to_read[0]:
                        chunk = recv_chunk(sock)  # This is a non-blocking read.
                        buf += decode(bytes(chunk))
                        if time.time() - timer > self.timeout:
                            yield {'timeout': True}
                    else:
                        yield {'timeout': True}
                else:
                    chunk = recv_chunk(sock)
                    buf += decode(bytes(chunk))
                # Report a hangup only when a read really happened and came
                # back empty. Once that happens every later read would be
                # empty as well, so stop instead of spinning forever.
                if chunk is not None and not chunk and self.block:
                    yield {'hangup': True}
                    return
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer.
                if (not self.block or self.timeout) and (e.errno == 2):
                    pass
                else:
                    raise
2300838f | 89 | |
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open `req` and return an iterator over the JSON objects it streams.

    An HTTP error raised while opening the request is re-raised as a
    TwitterHTTPError built from the error, `uri`, the 'json' format and
    `arg_data`. `block` and `timeout` are handed to TwitterJSONIter.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)

    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
96 | ||
class TwitterStreamCallWithTimeout(TwitterCall):
    """Call type that streams the response in blocking mode with a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but unused; the stream timeout is taken
        # from the instance's own `timeout` attribute.
        stream_options = {'block': True, 'timeout': self.timeout}
        return handle_stream_response(req, uri, arg_data, **stream_options)
dd648a25 MV |
100 | |
class TwitterStreamCall(TwitterCall):
    """Call type that streams the response in blocking mode, without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but unused; no timeout is passed on.
        blocking = True
        return handle_stream_response(req, uri, arg_data, blocking)
2300838f MV |
104 | |
class TwitterStreamCallNonBlocking(TwitterCall):
    """Call type that streams the response in non-blocking mode."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but unused; no timeout is passed on.
        blocking = False
        return handle_stream_response(req, uri, arg_data, blocking)
dd648a25 MV |
108 | |
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used much like the Twitter class, except that calling a method
    returns an iterator yielding objects decoded from the stream. For
    example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding tweets for as long as the stream stays
    open. An HTTP error while opening the stream is raised as a
    TwitterHTTPError.

    The `block` parameter selects blocking (True, the default) or
    non-blocking operation. In non-blocking mode the iterator yields None
    whenever no complete message is available.

    The `timeout` parameter only matters in blocking mode: when it is set,
    the iterator yields ``{'timeout': True}`` if nothing arrives within
    that many seconds.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        # The API version is the only fixed URI component.
        uriparts = (str(api_version),)

        # Pick the call class matching the requested blocking behaviour.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=call_cls,
            secure=secure,
            uriparts=uriparts,
            timeout=timeout,
            gzip=False)