]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Version 1.12.1
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 14
23dcd469
AD
def _recv_exactly(sock, size):
    """Read ``size`` bytes from ``sock``, looping over short reads.

    ``sock.recv(n)`` may legitimately return fewer than ``n`` bytes (for
    example when a chunk spans several TLS records), so keep reading until
    ``size`` bytes have been collected. Fewer bytes are returned only if the
    peer closes the connection first (``recv`` returns an empty string).

    Once part of a chunk has been consumed, the rest must be read too, or the
    chunk framing is lost. So when a non-blocking SSL socket reports that no
    data is available yet (``SSLError`` with errno 2, SSL_ERROR_WANT_READ),
    wait with ``select`` until the socket is readable and retry.
    """
    data = bytearray()
    while len(data) < size:
        try:
            piece = sock.recv(size - len(data))
        except SSLError as e:
            if e.errno != 2:
                raise
            select.select([sock], [], [])
            continue
        if not piece:
            break  # Connection closed mid-read.
        data += piece
    return data


def recv_chunk(sock): # -> bytearray:
    """Read one HTTP/1.1 chunk from ``sock`` and return its payload.

    A chunk is framed as: hex size [";" extensions] CRLF, payload, CRLF.
    The size line is read one byte at a time so that nothing belonging to
    the payload, or to the following chunk, is consumed by accident. The
    payload and its trailing CRLF are then read in full, however many
    ``recv`` calls that takes.

    Returns an empty bytearray when:
    - the connection is closed before a size line is complete;
    - the size line is empty (a bare CRLF);
    - no CRLF appears within 256 bytes;
    - the chunk has size zero (the last chunk).

    Raises whatever ``sock.recv`` raises on the very first byte. For example,
    a non-blocking SSL socket with no data raises ``SSLError`` errno 2, so the
    caller can tell "nothing to read yet" before anything has been consumed.
    Raises ``ValueError`` if the size field is not hexadecimal.
    """
    header = bytearray(sock.recv(1))
    if not header:
        return bytearray()  # Connection closed.

    while not header.endswith(b'\r\n'):
        if len(header) >= 256:
            return bytearray()  # No plausible size line; give up on this read.
        more = _recv_exactly(sock, 1)
        if not more:
            return bytearray()  # Connection closed mid-header.
        header += more

    size_line = bytes(header[:-2])
    if not size_line:
        return bytearray()  # Bare CRLF: no chunk size present.
    size = int(size_line.split(b';', 1)[0], 16)  # Drop any chunk extensions.

    chunk = _recv_exactly(sock, size)
    _recv_exactly(sock, 2)  # Read the trailing CRLF pair. Throw it away.
    return chunk
26938000
AD
43
44## recv_chunk()
45
46
dd648a25
MV
class TwitterJSONIter(object):
    """Iterate over the JSON messages carried by a Twitter streaming response.

    ``handle`` is the open response returned by ``urlopen``. Its underlying
    socket is read directly, one HTTP chunk at a time. Every complete JSON
    value found in the accumulated text is yielded wrapped by
    ``wrap_response`` together with the response headers.

    Besides decoded messages the iterator may yield these markers:

    * ``None`` -- only when ``block`` is False: no complete message is
      buffered yet.
    * ``{'timeout': True}`` -- only when ``timeout`` is set. It is yielded in
      two cases: ``select`` waited ``timeout`` seconds and nothing became
      readable; or data arrived but no complete message has been produced
      for more than ``timeout`` seconds.
    * ``{'hangup': True}`` -- the read returned nothing and the buffer is
      empty (the connection was closed). Iteration then stops.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri            # Stored for reference; not read by this class.
        self.arg_data = arg_data  # Stored for reference; not read by this class.
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        import codecs  # Local import: the module header does not import codecs.

        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # Timeout mode waits with select(), so the socket itself must not block.
        sock.setblocking(self.block and not self.timeout)
        # SSL sockets may hold already-decrypted bytes that select() cannot
        # see; pending() reports them. Plain sockets have no such method.
        pending = getattr(sock, 'pending', None)
        # A chunk boundary may split a multi-byte UTF-8 sequence, so decode
        # incrementally rather than decoding each chunk on its own.
        utf8 = codecs.getincrementaldecoder('utf-8')()
        buf = ''
        json_decoder = json.JSONDecoder()
        timer = time.time()
        while True:
            try:
                buf = buf.lstrip()
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer = time.time()
                continue
            except ValueError:
                # No complete JSON value is buffered yet.
                if not self.block:
                    yield None
            try:
                buf = buf.lstrip()  # Remove any keep-alive delimiters to detect hangups.
                if self.timeout:
                    if not (pending and pending()):
                        ready_to_read, _, _ = select.select([sock], [], [], self.timeout)
                        if not ready_to_read:
                            # Nothing was read, so this is an idle period, not a
                            # hangup: report it and wait again instead of falling
                            # through to the hangup check below.
                            yield {'timeout': True}
                            continue
                    buf += utf8.decode(bytes(recv_chunk(sock)))
                    # Something (e.g. keep-alive newlines) arrived, but no full
                    # message has been produced for longer than the timeout.
                    if time.time() - timer > self.timeout:
                        yield {'timeout': True}
                else:
                    buf += utf8.decode(bytes(recv_chunk(sock)))
                if not buf:
                    yield {'hangup': True}
                    break
            except SSLError as e:
                # errno 2 (SSL_ERROR_WANT_READ): a non-blocking read found no data.
                if (not self.block or self.timeout) and e.errno == 2:
                    pass
                else:
                    raise
2300838f 93
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open ``req`` and return an iterator over the streamed JSON messages.

    An ``HTTPError`` raised while opening the request is re-raised as
    ``TwitterHTTPError`` (with ``uri``, the format ``'json'`` and
    ``arg_data``). ``block`` and ``timeout`` are handed to
    ``TwitterJSONIter`` unchanged.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as err:
        raise TwitterHTTPError(err, uri, 'json', arg_data)
    json_iter = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(json_iter)
100
class TwitterStreamCallWithTimeout(TwitterCall):
    """Blocking stream call whose iterator also reports timeouts.

    The timeout used for the stream is the one stored on the call object
    (``self.timeout``); the ``_timeout`` argument is ignored.
    """
    def _handle_response(self, req, uri, arg_data, _timeout=None):
        stream_timeout = self.timeout
        return handle_stream_response(
            req=req, uri=uri, arg_data=arg_data,
            block=True, timeout=stream_timeout)
dd648a25
MV
104
class TwitterStreamCall(TwitterCall):
    """Blocking stream call: the returned iterator waits for each message."""
    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # ``_timeout`` is accepted but not used by this class.
        return handle_stream_response(
            req=req, uri=uri, arg_data=arg_data, block=True)
2300838f
MV
108
class TwitterStreamCallNonBlocking(TwitterCall):
    """Non-blocking stream call: the iterator yields None while no message is ready."""
    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # ``_timeout`` is accepted but not used by this class.
        return handle_stream_response(
            req=req, uri=uri, arg_data=arg_data, block=False)
dd648a25
MV
112
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Streaming API (stream.twitter.com).

    It is used much like the Twitter class, except that calling an API
    method returns an iterator over the objects decoded from the stream.
    For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    If the server answers the request with an HTTP error, a
    TwitterHTTPError is raised. Otherwise the iterator keeps yielding
    tweets; it may yield ``{'hangup': True}`` and stop once the server
    closes the connection.

    `block` selects blocking (True, the default) or non-blocking mode. In
    non-blocking mode the iterator occasionally yields None when no message
    is available, and `timeout` is not passed on to the stream iterator.

    `timeout` (seconds), used together with blocking mode, makes the
    iterator yield ``{'timeout': True}`` when no message arrives in time.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)