]>
Commit | Line | Data |
---|---|---|
dd648a25 MV |
1 | try: |
2 | import urllib.request as urllib_request | |
3 | import urllib.error as urllib_error | |
b8fd1206 | 4 | import io |
dd648a25 MV |
5 | except ImportError: |
6 | import urllib2 as urllib_request | |
7 | import urllib2 as urllib_error | |
8 | import json | |
2300838f | 9 | from ssl import SSLError |
67ddbde4 | 10 | import socket |
effd06bb | 11 | import sys, select, time |
dd648a25 | 12 | |
2983f31e | 13 | from .api import TwitterCall, wrap_response, TwitterHTTPError |
dd648a25 | 14 | |
26938000 AD |
def recv_chunk(sock):
    """
    Read one chunk of an HTTP "chunked" transfer-encoded body from `sock`.

    A chunk on the wire is a hexadecimal size line terminated by CRLF,
    followed by that many bytes of data and a trailing CRLF.

    `sock` only needs a `recv(n)` method that returns at most `n` bytes and
    an empty value once the peer has closed the connection.

    Returns a bytearray holding the chunk data (empty for a zero-length
    chunk), or b'' when no usable size line could be read: the connection
    was closed, the size line was empty, or no CRLF was found within the
    first 10 bytes.

    Raises ValueError if the size line is not a valid non-negative
    hexadecimal number.

    Note: if `recv` raises part-way through a chunk (for example on a
    non-blocking socket), the bytes consumed so far are discarded.
    """
    # 8 hex digits (chunks up to 0xffffffff bytes) plus the CRLF pair.
    max_header = 10

    # Read the size line one byte at a time so that nothing beyond the CRLF
    # is consumed. Bytes cannot be pushed back into a socket, and a greedy
    # read would swallow the start of the next chunk when this one is small.
    header = bytearray()
    while not header.endswith(b'\r\n'):
        if len(header) >= max_header:
            return b''
        byte = sock.recv(1)
        if not byte:
            return b''
        header += byte

    size_text = header[:-2]
    if not size_text:
        return b''

    size = int(size_text.decode('ascii'), 16)  # Decode the chunk size.
    if size < 0:
        raise ValueError("negative HTTP chunk size: %d" % size)

    # recv() may legitimately return fewer bytes than requested, so keep
    # reading until the whole chunk has arrived or the peer hangs up.
    chunk = bytearray()
    while len(chunk) < size:
        data = sock.recv(size - len(chunk))
        if not data:
            break
        chunk += data

    # Read the trailing CRLF pair and throw it away.
    discarded = 0
    while discarded < 2:
        data = sock.recv(2 - discarded)
        if not data:
            break
        discarded += len(data)

    return chunk
33 | ||
34 | ## recv_chunk() | |
35 | ||
36 | ||
dd648a25 MV |
class TwitterJSONIter(object):
    """
    Iterate over the JSON documents arriving on a streaming HTTP response.

    Bytes are pulled from the response's underlying socket one HTTP chunk
    at a time with `recv_chunk` and accumulated in `self.buf`. Each
    complete JSON document found at the front of the buffer is passed
    through `wrap_response` and yielded.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        """
        `handle` is the open HTTP response; its socket and headers are used
        by `__iter__`. `uri` and `arg_data` are only used when building a
        TwitterHTTPError. `block` and `timeout` select the read mode.
        """
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout
        self.decoder = json.JSONDecoder()
        self.buf = b""
        # Reference point for the timeout check; reset after each
        # document has been handed to the consumer.
        self.timer = time.time()

    def __iter__(self):
        """
        Yield decoded documents from the stream.

        Besides wrapped responses, the generator yields None when `block`
        is false and the buffer holds no complete document, and
        {"timeout": True} when a timeout is configured and either no data
        became readable within `timeout` seconds or data arrived more than
        `timeout` seconds after the timer was last reset.
        """
        # The raw socket is reached through private attributes whose
        # layout differs between Python 2 and Python 3.
        on_py3 = sys.version_info >= (3, 0)
        if on_py3:
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        wants_nonblocking = not self.block or self.timeout
        if wants_nonblocking:
            sock.setblocking(False)

        while True:
            # Step 1: try to peel one JSON document off the buffer.
            try:
                text = self.buf.decode('utf8').lstrip()
                document, consumed = self.decoder.raw_decode(text)
                self.buf = text[consumed:].encode('utf8')
                yield wrap_response(document, self.handle.headers)
                self.timer = time.time()
                continue
            except ValueError:
                # No complete document buffered yet. In non-blocking mode
                # report that to the consumer before reading more.
                if not self.block:
                    yield None

            # Step 2: pull more bytes from the socket.
            try:
                if not self.timeout:
                    self.buf += recv_chunk(sock)
                else:
                    readable = select.select([sock], [], [], self.timeout)[0]
                    if not readable:
                        yield {"timeout":True}
                    else:
                        self.buf += recv_chunk(sock)
                        elapsed = time.time() - self.timer
                        if elapsed > self.timeout:
                            yield {"timeout":True}
            except SSLError as e:
                # On a non-blocking socket errno 2 apparently means there
                # was nothing in the socket buffer; anything else is real.
                if (self.block and not self.timeout) or e.errno != 2:
                    raise
            except urllib_error.HTTPError as e:
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
2300838f | 91 | |
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open the streaming request `req` and return an iterator over the
    documents it delivers.

    `uri` and `arg_data` are used for error reporting; `block` and
    `timeout` are forwarded to TwitterJSONIter.

    Raises TwitterHTTPError if opening the request fails with an HTTP
    error status.
    """
    # urlopen() is where an HTTP error status surfaces, so the HTTPError
    # has to be translated here for callers to see TwitterHTTPError.
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout=timeout))
95 | ||
class TwitterStreamCallWithTimeout(TwitterCall):
    """
    Stream call that reads in blocking mode with a timeout.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The `_timeout` argument is ignored; the timeout stored on the
        # instance is forwarded instead (presumably set by TwitterCall
        # from its `timeout` keyword -- confirm in api.py).
        stream_options = dict(block=True, timeout=self.timeout)
        return handle_stream_response(req, uri, arg_data, **stream_options)
dd648a25 MV |
99 | |
class TwitterStreamCall(TwitterCall):
    """
    Stream call that reads in blocking mode without a timeout.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted for signature compatibility but unused.
        blocking = True
        return handle_stream_response(req, uri, arg_data, block=blocking)
2300838f MV |
103 | |
class TwitterStreamCallNonBlocking(TwitterCall):
    """
    Stream call that reads in non-blocking mode.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted for signature compatibility but unused.
        blocking = False
        return handle_stream_response(req, uri, arg_data, block=blocking)
dd648a25 MV |
107 | |
class TwitterStream(TwitterStreamCall):
    """
    The TwitterStream object is an interface to the Twitter Stream API
    (stream.twitter.com). It is used in much the same way as the Twitter
    class, except that calling a method returns an iterator that yields
    objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield tweets forever and ever (until the stream
    breaks at which point it raises a TwitterHTTPError.)

    The `block` parameter controls whether the stream is blocking. The
    default is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter only selects a different read mode when
    `block` is true. In that mode the iterator may yield
    ``{"timeout": True}`` when no message arrives in time.
    """

    def __init__(
        self, domain="stream.twitter.com", secure=True, auth=None,
        api_version='1.1', block=True, timeout=None):
        # The API version is the first (and only initial) URI component.
        version_path = (str(api_version),)

        # Pick the call class that implements the requested read mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=version_path, timeout=timeout,
            gzip=False)