]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Further refine socket management.
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 14
23dcd469
AD
def _recv_exact(sock, count):
    """Read up to `count` bytes from `sock`, looping over short reads.

    Returns a bytearray. It is shorter than `count` only when `sock.recv`
    reports end-of-stream (returns no data) before `count` bytes arrived.
    """
    data = bytearray()
    while len(data) < count:
        piece = sock.recv(count - len(data))
        if not piece:
            break
        data += piece
    return data


def recv_chunk(sock):  # -> bytearray:
    """Read one HTTP chunked-transfer chunk from `sock` and return its payload.

    The socket is switched to blocking mode for the duration of the read so
    that the whole chunk is consumed, and its previous timeout is restored
    before returning, including when an exception propagates.

    Returns an empty bytearray when the stream ends before a size line is
    complete, when the size line is empty, or for a zero-length chunk.
    Raises ValueError when the size line is not a hexadecimal number.
    """
    timeout = sock.gettimeout()
    sock.setblocking(True)  # Read the whole HTTP chunk.
    try:
        # Read the size line one byte at a time: a larger fixed-size read
        # would swallow payload (or the next chunk's header) whenever the
        # chunk is shorter than the read.
        header = bytearray()
        while not header.endswith(b'\r\n'):
            octet = sock.recv(1)
            if not octet:
                return bytearray()
            header += octet

        # Drop the CRLF and any ";extension" that follows the chunk size.
        size_field = bytes(header[:-2]).split(b';', 1)[0].strip()
        if not size_field:
            return bytearray()
        remaining = int(size_field, 16)  # Decode the chunk size.

        # recv() may return fewer bytes than requested, so keep reading
        # until the whole payload has arrived.
        chunk = _recv_exact(sock, remaining)

        _recv_exact(sock, 2)  # Read the trailing CRLF pair. Throw it away.
        return chunk
    finally:
        sock.settimeout(timeout)
26938000
AD
38
39## recv_chunk()
40
41
dd648a25
MV
class TwitterJSONIter(object):
    """Iterate over the JSON documents arriving on a streaming HTTP response.

    `handle` is the opened response; its underlying socket is read directly,
    one HTTP chunk at a time via `recv_chunk`. `uri` and `arg_data` are only
    used to build a TwitterHTTPError. `block` and `timeout` select the read
    mode (see `__iter__`).
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        """Yield decoded messages, plus status markers, until an error is raised.

        Yields:
          * `wrap_response(obj, headers)` for every complete JSON document;
          * `None` when `block` is false and no complete document is buffered;
          * `{'timeout': True}` when `timeout` is set and either nothing
            became readable within `timeout` seconds, or more than `timeout`
            seconds have passed since the last message was yielded;
          * `{'hangup': True}` when `block` is true and a read was performed
            but the text buffer is still empty afterwards.

        Raises TwitterHTTPError for a urllib HTTPError, and re-raises any
        SSLError other than errno 2 seen in non-blocking or timeout mode.
        """
        # Local import so the module-level import block stays untouched.
        import codecs

        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # Equivalent to: not (not self.block or self.timeout)
        sock.setblocking(self.block and not self.timeout)

        # Decode incrementally: a multi-byte UTF-8 character may be split
        # across two HTTP chunks, and decoding each chunk on its own would
        # raise UnicodeDecodeError on the first half.
        decoder = codecs.getincrementaldecoder('utf-8')()
        buf = u''
        json_decoder = json.JSONDecoder()
        timer = time.time()  # Time the last message was yielded.
        while True:
            try:
                buf = buf.lstrip()
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer = time.time()
                continue
            except ValueError:
                # No complete JSON document is buffered yet.
                if not self.block:
                    yield None
            try:
                did_read = True
                if self.timeout:
                    # Wait at most `timeout` seconds for data to arrive.
                    ready_to_read = select.select(
                        [sock], [], [], self.timeout)
                    if ready_to_read[0]:
                        buf += decoder.decode(bytes(recv_chunk(sock)))
                        if time.time() - timer > self.timeout:
                            yield {'timeout': True}
                    else:
                        # Nothing was read, so an empty buffer here says
                        # nothing about the connection being closed.
                        did_read = False
                        yield {'timeout': True}
                else:
                    buf += decoder.decode(bytes(recv_chunk(sock)))
                if did_read and not buf and self.block:
                    yield {'hangup': True}
            except SSLError as e:
                if (not self.block or self.timeout) and (e.errno == 2):
                    # Apparently this means there was nothing in the socket buf
                    # (errno 2 is presumably SSL_ERROR_WANT_READ -- confirm).
                    pass
                else:
                    raise
            except urllib_error.HTTPError as e:
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
2300838f 94
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open `req` and return an iterator over the messages of its response.

    `uri`, `arg_data`, `block` and `timeout` are passed through unchanged
    to TwitterJSONIter, which does the actual reading and decoding.
    """
    response = urllib_request.urlopen(req)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
98
class TwitterStreamCallWithTimeout(TwitterCall):
    """Stream call that reads in blocking mode and forwards `self.timeout`."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used; the instance's own timeout
        # is the one handed to the stream reader.
        stream_options = dict(block=True, timeout=self.timeout)
        return handle_stream_response(req, uri, arg_data, **stream_options)
dd648a25
MV
102
class TwitterStreamCall(TwitterCall):
    """Stream call that reads in blocking mode with no timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used.
        stream_options = dict(block=True)
        return handle_stream_response(req, uri, arg_data, **stream_options)
2300838f
MV
106
class TwitterStreamCallNonBlocking(TwitterCall):
    """Stream call that reads in non-blocking mode."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used.
        stream_options = dict(block=False)
        return handle_stream_response(req, uri, arg_data, **stream_options)
dd648a25
MV
110
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used much like the Twitter class, except that calling a
    method returns an iterator yielding objects decoded from the
    stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding tweets indefinitely (until the stream
    breaks, at which point it raises a TwitterHTTPError.)

    The `block` parameter chooses between blocking and non-blocking
    reads. The default, True, blocks. With False, the iterator will
    occasionally yield None when no message is available.

    The `timeout` parameter, when truthy together with `block`, selects
    the timeout-aware call class; it is also passed to the base
    initializer.
    """
    def __init__(
        self, domain="stream.twitter.com", secure=True, auth=None,
        api_version='1.1', block=True, timeout=None):
        # The API version is the only fixed URI component.
        uriparts = (str(api_version),)

        # Choose the call class that matches the requested read mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=call_cls,
            secure=secure,
            uriparts=uriparts,
            timeout=timeout,
            gzip=False)