]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Handle HTTP chunks that only contain keep-alive delimiters.
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 14
23dcd469
AD
def _recv_exactly(sock, count):  # -> bytearray:
    """Read `count` bytes from `sock`, looping over short reads.

    A single `recv()` may return fewer bytes than requested, so keep
    reading until `count` bytes have arrived. Fewer bytes are returned
    only when `recv()` yields an empty result (the peer closed the
    connection). A `count` of zero or less performs no read at all.
    """
    data = bytearray()
    while len(data) < count:
        piece = sock.recv(count - len(data))
        if not piece:  # Empty read: nothing more will arrive.
            break
        data += piece
    return data


def recv_chunk(sock):  # -> bytearray:
    """Read one chunk of an HTTP chunked-transfer body from `sock`.

    The first read fetches up to 8 bytes, which must contain the
    hexadecimal chunk size followed by CRLF. The chunk payload is then
    assembled from the rest of that first read plus as many further reads
    as are needed, and the CRLF that terminates the chunk is consumed.

    Returns the chunk payload as a bytearray. An empty bytearray is
    returned when no chunk size is found in the first read (including an
    empty read). If the connection closes in the middle of a chunk, the
    bytes received so far are returned.

    Raises ValueError if the size field is not valid hexadecimal; any
    exception raised by `sock.recv()` propagates unchanged.
    """
    buf = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = buf.find(b'\r\n')  # Find the end of the HTTP chunk size.

    if crlf <= 0:  # No chunk size in these bytes: nothing to decode.
        return bytearray()

    remaining = int(buf[:crlf], 16)  # Decode the chunk size.
    start = crlf + 2  # Skip the size field and its CRLF pair.
    end = len(buf) - start  # Bytes after the header already in `buf`.

    if end < remaining:
        # Only part of the payload arrived with the header; fetch the rest,
        # tolerating recv() handing it over in several pieces.
        chunk = bytearray(buf[start:])
        chunk += _recv_exactly(sock, remaining - end)
        if len(chunk) == remaining:
            _recv_exactly(sock, 2)  # Trailing CRLF pair. Throw it away.
    else:
        # E.g. an HTTP chunk with just a keep-alive delimiter: the whole
        # payload is already in `buf`.
        chunk = bytearray(buf[start:start + remaining])
        # Consume whatever part of the trailing CRLF was NOT in `buf`, so
        # the next call starts on a chunk-size line. Bytes in `buf` beyond
        # the trailing CRLF cannot be pushed back and are dropped.
        _recv_exactly(sock, 2 - (end - remaining))

    return chunk
26938000
AD
39
40## recv_chunk()
41
42
dd648a25
MV
class TwitterJSONIter(object):
    """Iterable that decodes JSON documents from a streaming HTTP response.

    Iterating reads HTTP chunks straight from the response's socket,
    accumulates them as text and yields each complete JSON document
    (wrapped with `wrap_response`) as soon as it can be decoded.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        # `handle` is the open HTTP response; `uri` and `arg_data` are only
        # stored here. `block` and `timeout` select the read mode used by
        # __iter__.
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        # Reach through the response object to the underlying socket; the
        # attribute path differs between Python 3 and Python 2.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # Blocking reads are used only for block mode without a timeout.
        sock.setblocking(self.block and not self.timeout)

        pending = u''  # Decoded text not yet consumed by the JSON decoder.
        decoder = json.JSONDecoder()
        last_message = time.time()  # When the last document was yielded.

        while True:
            # First try to decode a complete document from what we have.
            try:
                pending = pending.lstrip()
                document, consumed = decoder.raw_decode(pending)
                pending = pending[consumed:]
                yield wrap_response(document, self.handle.headers)
                last_message = time.time()
                continue
            except ValueError:
                # Nothing decodable yet. Non-blocking consumers get a None
                # so they regain control; blocking consumers just wait.
                if not self.block:
                    yield None

            # Then pull more data from the socket.
            try:
                # Remove any keep-alive delimiters to detect hangups.
                pending = pending.lstrip()
                if not self.timeout:
                    pending += recv_chunk(sock).decode('utf-8')
                else:
                    readable = select.select([sock], [], [], self.timeout)[0]
                    if not readable:
                        yield {'timeout': True}
                    else:
                        # This is a non-blocking read.
                        pending += recv_chunk(sock).decode('utf-8')
                        if time.time() - last_message > self.timeout:
                            yield {'timeout': True}
                if self.block and not pending:
                    yield {'hangup': True}
            except SSLError as err:
                # Error from a non-blocking read of an empty buffer is
                # expected (errno 2); anything else is re-raised.
                tolerated = (not self.block or self.timeout) and (err.errno == 2)
                if not tolerated:
                    raise
2300838f 89
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open the streaming request and return an iterator over its messages.

    `req` is passed to `urlopen`. If that fails with an HTTP error, it is
    re-raised as a TwitterHTTPError carrying `uri` and `arg_data`.
    Otherwise the open response is wrapped in a TwitterJSONIter configured
    with `block` and `timeout`, and its iterator is returned.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)

    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
96
class TwitterStreamCallWithTimeout(TwitterCall):
    """Stream call that reads in block mode using the instance's timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The per-call `_timeout` argument is not used; the timeout stored
        # on the instance is forwarded instead.
        stream_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=stream_timeout)
dd648a25
MV
100
class TwitterStreamCall(TwitterCall):
    """Stream call that reads in block mode without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not forwarded to the stream handler.
        return handle_stream_response(
            req, uri, arg_data, block=True)
2300838f
MV
104
class TwitterStreamCallNonBlocking(TwitterCall):
    """Stream call that reads in non-blocking mode."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not forwarded to the stream handler.
        return handle_stream_response(
            req, uri, arg_data, block=False)
dd648a25
MV
108
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used pretty much like the Twitter class, except that calling
    a method returns an iterator that yields objects decoded from the
    stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding decoded objects as they arrive. If the
    stream request itself fails with an HTTP error, a TwitterHTTPError
    is raised.

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter only affects the choice of call class when
    `block` is true: a truthy `timeout` selects the timeout-aware stream
    call. It is also passed on to the base class initializer.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        # The API version is the only fixed URI component.
        uriparts = (str(api_version),)

        # Pick the call class that implements the requested read mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=call_cls,
            secure=secure,
            uriparts=uriparts,
            timeout=timeout,
            gzip=False)