]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Move recv_chunk() into a stand alone function. Further minimize memory use by using...
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 14
26938000
AD
def _recv_more(sock, size):
    """Receive up to ``size`` bytes from ``sock``, waiting while it would block.

    Only used after part of an HTTP chunk has already been consumed.  On a
    non-blocking socket a "would block" error at that point means the rest
    of the chunk is still in flight; giving up would leave the stream
    positioned in the middle of a chunk, so wait for readability instead.
    Returns ``b''`` once the peer has closed the connection.
    """
    import errno  # Local import: errno is not among the module-level imports.
    while True:
        try:
            return sock.recv(size)
        except SSLError as e:
            # errno 2 is SSL_ERROR_WANT_READ: no decrypted data available yet.
            if e.errno != 2:
                raise
        except socket.error as e:
            if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
        # Block until the socket becomes readable, then retry the read.
        select.select([sock], [], [])


def _recv_exactly(sock, size):
    """Receive exactly ``size`` bytes, or fewer only if the peer closes first."""
    data = bytearray()
    while len(data) < size:
        # recv() may legitimately return fewer bytes than requested.
        part = _recv_more(sock, size - len(data))
        if not part:
            break
        data += part
    return data


def recv_chunk(sock):
    """Read one chunk of an HTTP/1.1 chunked-transfer-encoded body from ``sock``.

    The chunk-size line is read up to its CRLF without reading past it, so
    data belonging to the following chunk is never consumed.  Then exactly
    the announced number of payload bytes and the trailing CRLF are read.

    Returns:
        The chunk payload as a ``bytearray``. This is empty for the
        terminating zero-size chunk. The function returns ``b''`` when the
        connection is closed before a chunk header arrives, or when the size
        line is empty or too long. If the connection closes mid-chunk, the
        bytes received so far are returned.

    Raises:
        ValueError: if the chunk-size field is not hexadecimal.
        Errors from the very first ``recv()`` call (e.g. "would block" on a
        non-blocking socket with no data) propagate unchanged so the caller
        can retry later; no stream data has been consumed at that point.
    """
    max_header = 1024  # Guard against an unterminated/garbage size line.

    header = bytearray(sock.recv(1))
    if not header:
        return b''
    while not header.endswith(b'\r\n'):
        if len(header) >= max_header:
            return b''
        byte = _recv_more(sock, 1)
        if not byte:
            return b''
        header += byte

    # Drop the CRLF and any ";name=value" chunk extensions.
    size_field = bytes(header[:-2]).split(b';', 1)[0].strip()
    if not size_field:
        return b''
    size = int(size_field, 16)  # Decode the hexadecimal chunk size.

    chunk = _recv_exactly(sock, size)
    _recv_exactly(sock, 2)  # Read the trailing CRLF pair. Throw it away.
    return chunk
33
34## recv_chunk()
35
36
dd648a25
MV
class TwitterJSONIter(object):
    """Iterator over the JSON messages carried by a streaming API response.

    Raw chunk payloads are accumulated in ``self.buf`` and decoded one JSON
    document at a time; each decoded document is yielded after passing
    through ``wrap_response`` together with the response headers.

    Depending on the constructor arguments the iterator additionally yields:

    * ``None`` whenever no complete message is buffered (``block=False``);
    * ``{"timeout": True}`` when no message has been decoded for ``timeout``
      seconds (at most one such notice per ``timeout`` period).

    Iteration ends when ``recv_chunk`` returns an empty result, i.e. on the
    terminating zero-size chunk or once the connection has been closed.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.decoder = json.JSONDecoder()
        self.handle = handle  # Response object whose socket is read directly.
        self.uri = uri  # Used only when building a TwitterHTTPError.
        self.arg_data = arg_data  # Likewise.
        self.buf = b""  # Bytes received but not yet decoded.
        self.block = block
        self.timeout = timeout
        # Time of the last decoded message or the last timeout notice.
        self.timer = time.time()

    def __iter__(self):
        # Reach through the urllib response wrappers to the underlying socket
        # so raw HTTP chunks can be read from it directly.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if not self.block or self.timeout:
            # Reads must not block: either control goes back to the caller
            # (yield None) or the timeout is enforced with select().
            sock.setblocking(False)
        while True:
            try:
                utf8_buf = self.buf.decode('utf8').lstrip()
                res, ptr = self.decoder.raw_decode(utf8_buf)
            except ValueError:
                # No complete JSON document is buffered yet.  This also covers
                # UnicodeDecodeError (a ValueError subclass) raised while the
                # buffer ends inside a multi-byte UTF-8 sequence.
                if not self.block:
                    yield None
            else:
                self.buf = utf8_buf[ptr:].encode('utf8')
                yield wrap_response(res, self.handle.headers)
                self.timer = time.time()
                continue

            try:
                if self.timeout:
                    # An SSL socket may already hold decrypted bytes that
                    # select() cannot see, so consult pending() first.
                    pending = getattr(sock, 'pending', None)
                    ready = ((pending is not None and pending() > 0)
                             or select.select([sock], [], [], self.timeout)[0])
                    if ready:
                        chunk = recv_chunk(sock)
                        if not chunk:
                            return  # End of stream or connection closed.
                        self.buf += chunk
                        if time.time() - self.timer > self.timeout:
                            # Restart the clock so notices are not repeated
                            # on every keep-alive that follows.
                            self.timer = time.time()
                            yield {"timeout": True}
                    else:
                        self.timer = time.time()
                        yield {"timeout": True}
                else:
                    chunk = recv_chunk(sock)
                    if not chunk:
                        return  # End of stream or connection closed.
                    self.buf += chunk
            except SSLError as e:
                if (not self.block or self.timeout) and (e.errno == 2):
                    # errno 2 is SSL_ERROR_WANT_READ: nothing to read on the
                    # non-blocking socket yet; try again on the next pass.
                    pass
                else:
                    raise
            except urllib_error.HTTPError as e:
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
2300838f 91
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open ``req`` and return an iterator over the streamed messages.

    The opened response is wrapped in a ``TwitterJSONIter`` configured with
    ``block`` and ``timeout``; the iterator obtained from it is returned.
    """
    # NOTE(review): an HTTPError raised by urlopen() propagates unwrapped
    # from here; it is not converted into a TwitterHTTPError.
    response = urllib_request.urlopen(req)
    stream = TwitterJSONIter(response, uri, arg_data, block,
                             timeout=timeout)
    return iter(stream)
95
class TwitterStreamCallWithTimeout(TwitterCall):
    """Blocking streaming call that hands ``self.timeout`` to the stream."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The ``_timeout`` argument is ignored; ``self.timeout`` is used instead.
        configured_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, timeout=configured_timeout, block=True)
dd648a25
MV
99
class TwitterStreamCall(TwitterCall):
    """Streaming call whose response iterator blocks while waiting for data."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # ``_timeout`` is accepted but not used by this call class.
        stream = handle_stream_response(req, uri, arg_data, block=True)
        return stream
2300838f
MV
103
class TwitterStreamCallNonBlocking(TwitterCall):
    """Streaming call whose response iterator never blocks waiting for data."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # ``_timeout`` is accepted but not used by this call class.
        stream = handle_stream_response(req, uri, arg_data, block=False)
        return stream
dd648a25
MV
107
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Streaming API (stream.twitter.com).

    It is used much like the Twitter class, except that calling a method
    returns an iterator of objects decoded from the stream rather than a
    single result. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields tweets for as long as the stream delivers them.

    ``domain``, ``secure``, ``auth`` and ``timeout`` are passed on to the
    base-class constructor; ``api_version`` becomes the leading URI part.

    The `block` parameter selects whether the stream is blocking (the
    default, True). When it is False, the iterator will occasionally yield
    None when no message is available, and ``timeout`` is not forwarded to
    the stream iterator. When blocking and ``timeout`` is set, the iterator
    will occasionally yield ``{"timeout": True}`` when no message has
    arrived within ``timeout`` seconds.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        # Pick the call class that decides how responses are iterated.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            secure=secure, uriparts=uriparts,
            callable_cls=call_cls, timeout=timeout, gzip=False)