]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Simplify further. New args to example script. Update documentation.
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
# Interpreter version flags used to select compatible code paths below.
PY_27_OR_HIGHER = sys.version_info[:2] >= (2, 7)
PY_3_OR_HIGHER = sys.version_info[:2] >= (3, 0)

# Sentinel messages yielded by the stream iterator instead of stream data.
Timeout = dict(timeout=True)
Hangup = dict(hangup=True)
20
21
def _recv_into_fully(sock, view):
    """Fill *view* from *sock*, looping over short reads; stop early only on EOF.

    ``recv_into`` may return fewer bytes than the buffer holds, so a single
    call is not guaranteed to fill it. Returns the number of bytes written.
    """
    filled = 0
    wanted = len(view)
    while filled < wanted:
        received = sock.recv_into(view[filled:])
        if not received:  # Peer closed the connection; stop instead of spinning.
            break
        filled += received
    return filled


def _recv_fully(sock, size):
    """Read *size* bytes from *sock*, looping over short reads; fewer only on EOF."""
    parts = []
    remaining = size
    while remaining > 0:
        data = sock.recv(remaining)
        if not data:  # Peer closed the connection.
            break
        parts.append(data)
        remaining -= len(data)
    return b''.join(parts)


def recv_chunk(sock):  # -> bytearray:
    """Read one HTTP chunk (chunked transfer-encoding) from *sock*.

    Reads an 8-byte header holding the hex chunk size and its CRLF, then
    the remainder of the chunk body, then discards the trailing CRLF.

    Returns the chunk body as a bytearray. An empty bytearray is returned
    for the terminating zero-size chunk, or when no CRLF is found after a
    size field in the 8-byte header (e.g. the peer closed the socket).

    Raises ValueError if the size field is not valid hexadecimal. Socket
    errors (e.g. an SSLError from a non-blocking socket with no data) are
    propagated to the caller.

    NOTE(review): because the header read is a fixed 8 bytes, a very small
    chunk (size <= 3) arriving back-to-back with the next chunk can cause a
    few bytes of that next chunk to be consumed here; confirm this never
    happens with the target server.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the end of the HTTP chunk-size field.

    if crlf > 0:  # If there is a length, then process it.
        size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
        chunk = bytearray(size)
        start = crlf + 2  # Skip past the header's CRLF pair.

        if size <= 3:  # E.g. a keep-alive delimiter chunk, or end of stream (0).
            chunk[:size] = header[start:start + size]
            # There are several edge cases (size == [4-6]) where the chunk size
            # exceeds the bytes left in the initial 8-byte read. With Twitter these
            # do not occur in practice: the shortest JSON message starts with
            # '{"limit":{', which is longer than every edge case.
        else:  # There is more to read in the chunk.
            end = len(header) - start
            chunk[:end] = header[start:]
            if PY_27_OR_HIGHER:  # Read directly into the buffer to save memory.
                view = memoryview(chunk)[end:]  # View onto the unfilled tail.
                _recv_into_fully(sock, view)
            else:  # Less efficient path for Python 2.6 compatibility.
                chunk[end:] = _recv_fully(sock, max(0, size - end))
            _recv_fully(sock, 2)  # Read the trailing CRLF pair and throw it away.

        return chunk

    return bytearray()
52
53
class Timer(object):
    """Report whether more than ``timeout`` seconds passed since the last reset.

    A ``timeout`` of ``None`` makes the timer permanently expired.
    """

    def __init__(self, timeout):
        self.timeout = timeout  # Seconds, or None for "always expired".
        self.reset()

    def reset(self):
        """Restart the timer from the current wall-clock time."""
        self.time = time.time()

    def expired(self):
        """
        Return True once more than ``timeout`` seconds have elapsed, restarting
        the timer in that case; otherwise return False. With a ``None``
        timeout, always return True without touching the timer.
        """
        if self.timeout is None:
            return True
        elapsed = time.time() - self.time
        if elapsed > self.timeout:
            self.reset()
            return True
        return False
73
74
class TwitterJSONIter(object):
    """Iterate over JSON messages read from a chunked streaming HTTP response.

    Each decoded message is yielded after being passed through
    ``wrap_response`` together with the response headers. While waiting for
    data, the iterator may instead yield ``Timeout`` (when a timeout is
    set) or ``None`` (non-blocking mode without a timeout). When the stream
    ends it yields ``Hangup`` once, closes the response, and stops.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        # handle: the object returned by urlopen; its raw socket is read
        # directly in __iter__ and it is closed when iteration finishes.
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block  # False selects non-blocking socket reads.
        self.timeout = timeout  # Seconds between Timeout yields, or None.

    def __iter__(self):
        import codecs  # Local import keeps the module import block unchanged.

        actually_blocking = self.block and not self.timeout
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_blocking)
        buf = ''
        # An incremental decoder holds back an incomplete multi-byte UTF-8
        # sequence that is split across two HTTP chunks instead of raising
        # UnicodeDecodeError on the first half.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        json_decoder = json.JSONDecoder()
        timer = Timer(self.timeout)
        timeout_token = Timeout if self.timeout else None
        try:
            while True:
                buf = buf.lstrip()  # Drop keep-alive delimiters between messages.
                try:
                    res, ptr = json_decoder.raw_decode(buf)
                    buf = buf[ptr:]
                except ValueError:
                    pass  # No complete JSON value buffered yet; read more.
                else:
                    yield wrap_response(res, self.handle.headers)
                    timer.reset()
                    continue
                try:
                    if self.timeout and not buf:  # This is a non-blocking read.
                        ready_to_read = select.select([sock], [], [], self.timeout)[0]
                        if not ready_to_read and timer.expired():
                            yield timeout_token
                            continue
                    chunk = recv_chunk(sock)
                    # An empty chunk means end of stream or a closed socket.
                    # Check the chunk itself, not `buf`: a partially buffered
                    # message must not hide the hangup (which would loop forever).
                    if not chunk:
                        yield Hangup
                        break
                    buf += utf8_decoder.decode(bytes(chunk))
                except SSLError as e:
                    # Error from a non-blocking read of an empty buffer
                    # (errno 2 is ssl.SSL_ERROR_WANT_READ).
                    if not actually_blocking and (e.errno == 2):
                        if timer.expired():
                            yield timeout_token
                    else:
                        raise
        finally:
            # Release the connection on hangup, on error, or when the consumer
            # closes / discards the generator early.
            self.handle.close()
121
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open the streaming request *req* and return an iterator over its messages.

    HTTP errors raised by ``urlopen`` are re-raised as ``TwitterHTTPError``
    carrying the request URI and arguments.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_err:
        raise TwitterHTTPError(http_err, uri, 'json', arg_data)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
128
class TwitterStream(TwitterCall):
    """
    The TwitterStream object is an interface to the Twitter Stream
    API. It can be used much like the Twitter class, except that
    calling a method returns an iterator that yields objects decoded
    from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields until the TCP connection breaks. When the
    connection breaks, the iterator yields `{'hangup': True}`, and
    raises `StopIteration` if iterated again.

    The `timeout` parameter controls the maximum time between
    yields. If it is nonzero, the iterator yields either stream data
    or `{'timeout': True}`. This is useful if you want your program
    to do other work while waiting for tweets.

    Setting the `block` parameter to False makes the stream
    non-blocking. In this mode, the iterator always yields
    immediately: it returns stream data, or `None`. Note that
    `timeout` supersedes this argument, so `timeout` must be left as
    `None` to use this mode.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)
        # Falsy timeouts (None, 0) disable the timeout entirely.
        stream_timeout = float(timeout) if timeout else None

        class TwitterStreamCall(TwitterCall):
            def _handle_response(self, req, uri, arg_data, _timeout=None):
                # A per-call timeout overrides the stream-wide default.
                effective_timeout = _timeout or stream_timeout
                return handle_stream_response(
                    req, uri, arg_data,
                    block=block, timeout=effective_timeout)

        TwitterCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=TwitterStreamCall,
            secure=secure,
            uriparts=uriparts,
            timeout=stream_timeout,
            gzip=False)