]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Simplify further. New args to example script. Update documentation.
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 14
d3915c61
MV
# Interpreter feature gates used by the streaming code below.
PY_27_OR_HIGHER = sys.version_info[:2] >= (2, 7)
PY_3_OR_HIGHER = sys.version_info[0] >= 3

# Sentinel objects yielded by the stream iterator in place of decoded data.
Timeout = {'timeout': True}
Hangup = {'hangup': True}
20
21
def recv_chunk(sock):  # -> bytearray:
    """
    Read one HTTP chunk (chunked transfer encoding) from `sock`.

    The first read fetches up to 8 bytes, which is enough for a hexadecimal
    chunk size of up to 0xffffff followed by its CRLF. Whatever part of the
    chunk body (and trailing CRLF) that first read already contains is reused;
    the remainder is read from the socket until the chunk is complete.

    Returns a bytearray holding the chunk body. An empty bytearray is returned
    when no chunk size could be found in the first read (for example when the
    peer closed the connection) or when the chunk size is zero. If the peer
    closes the connection in the middle of a chunk, only the bytes actually
    received are returned.

    Errors raised by the socket (including "would block" errors on a
    non-blocking socket) are not handled here and propagate to the caller.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # End of the hexadecimal chunk size.
    if crlf <= 0:  # No chunk size found: nothing to return.
        return bytearray()

    size = int(header[:crlf], 16)  # Decode the chunk size.
    start = crlf + 2  # Skip the CRLF pair that terminates the size line.
    preread = header[start:]  # Bytes after the size line that we already hold.

    chunk = bytearray(size)
    # Never copy more than `size` bytes: for small chunks the first read may
    # also contain part (or all) of the trailing CRLF, which is not payload.
    filled = min(size, len(preread))
    chunk[:filled] = preread[:filled]

    if filled < size:  # There is more to read in the chunk.
        # A single recv()/recv_into() may return fewer bytes than requested,
        # so keep reading until the chunk is complete or the peer hangs up.
        if sys.version_info >= (2, 7):
            # When possible, use less memory by reading directly into the buffer.
            view = memoryview(chunk)
            while filled < size:
                count = sock.recv_into(view[filled:])
                if not count:
                    break
                filled += count
        else:  # Less efficient, for Python 2.6 compatibility.
            while filled < size:
                data = sock.recv(size - filled)
                if not data:
                    break
                chunk[filled:filled + len(data)] = data
                filled += len(data)
        if filled < size:
            # Connection closed mid-chunk: hand back what was received rather
            # than a buffer padded with NUL bytes.
            return chunk[:filled]

    # Discard whatever part of the trailing CRLF pair has not been consumed by
    # the first read (0, 1 or 2 bytes).
    missing = (start + size + 2) - max(len(header), start + size)
    if missing > 0:
        sock.recv(missing)

    return chunk
26938000 52
26938000 53
d3915c61
MV
class Timer(object):
    """
    Simple interval timer based on wall-clock time.

    `timeout` is the interval in seconds. A `timeout` of None means the
    timer is always considered expired.
    """

    def __init__(self, timeout):
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the interval from the current time."""
        self.time = time.time()

    def expired(self):
        """
        Return True if the interval has elapsed, restarting the timer in
        that case. Always returns True when `timeout` is None.
        """
        if self.timeout is None:
            return True
        elapsed = time.time() - self.time
        if elapsed <= self.timeout:
            return False
        self.reset()
        return True
73
74
dd648a25
MV
class TwitterJSONIter(object):
    """
    Iterable over the JSON objects arriving on an open streaming response.

    `handle` is the response object of the streaming request; its underlying
    socket is read directly. `uri` and `arg_data` are stored on the instance.
    `block` and `timeout` select the read mode:

    - `timeout` truthy: the socket is non-blocking and `select` is used to
      wait for data; `Timeout` is yielded when the wait expires.
    - `block` false and no `timeout`: the socket is non-blocking and `None`
      is yielded when no data is available.
    - otherwise: reads block until data arrives.

    `Hangup` is yielded once, as the last item, when the stream ends.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        actually_blocking = self.block and not self.timeout
        # Dig the raw socket out of the response object; the attribute path
        # differs between Python 2 and Python 3.
        if PY_3_OR_HIGHER:
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_blocking)
        buf = ''
        json_decoder = json.JSONDecoder()
        timer = Timer(self.timeout)
        timeout_token = Timeout if self.timeout else None
        while True:
            # Drop keep-alive delimiters; raw_decode needs the text to start
            # at the JSON value itself.
            buf = buf.lstrip()
            try:
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                # No complete JSON object buffered yet: read more below.
                pass
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                continue
            try:
                if self.timeout and not buf:  # This is a non-blocking read.
                    ready_to_read = select.select(
                        [sock], [], [], self.timeout)[0]
                    if not ready_to_read and timer.expired():
                        yield timeout_token
                        continue
                chunk = recv_chunk(sock)
                # Test the chunk just received, not the accumulated buffer:
                # if the peer closes while a partial message is still
                # buffered, checking `buf` would never detect the hangup and
                # the loop would spin forever.
                if not chunk:
                    yield Hangup
                    break
                buf += chunk.decode('utf-8')
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer.
                if not actually_blocking and (e.errno == 2):
                    if timer.expired():
                        yield timeout_token
                else:
                    raise
2300838f 121
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open the streaming request `req` and return an iterator over its
    decoded JSON objects.

    An HTTP error raised while opening the request is re-raised as
    `TwitterHTTPError`. `block` and `timeout` are passed through to
    `TwitterJSONIter`.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as err:
        raise TwitterHTTPError(err, uri, 'json', arg_data)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
128
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. It is used in much the same way as the Twitter class, except
    that the result of calling a method is an iterator that yields
    objects decoded from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, then the iterator will yield either
    stream data or `{'timeout': True}`. This is useful if you want
    your program to do other stuff in between waiting for tweets.

    Setting the `block` parameter to `False` makes the stream
    non-blocking. In this mode, the iterator always yields
    immediately: either stream data, or `None`. Note that `timeout`
    supersedes this argument, so `timeout` must be left as `None` to
    use this mode.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        # Normalize the timeout: any falsy value means "no timeout".
        default_timeout = None
        if timeout:
            default_timeout = float(timeout)

        class TwitterStreamCall(TwitterCall):
            # Closes over `block` and `default_timeout` so that every call
            # made through this stream uses the streaming response handler.
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                return handle_stream_response(
                    req, uri, arg_data, block=block,
                    timeout=_timeout or default_timeout)

        TwitterCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=TwitterStreamCall, secure=secure,
            uriparts=(str(api_version),), timeout=default_timeout,
            gzip=False)