]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Clarify the comment about edge cases.
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
def _recv_exactly(sock, count):
    """Read ``count`` bytes from ``sock``, or fewer if the peer closes first.

    ``socket.recv`` may legally return fewer bytes than requested, so keep
    calling it until ``count`` bytes have arrived or it returns no data
    (connection closed), in which case the shorter result is returned.
    """
    data = bytearray()
    while len(data) < count:
        part = sock.recv(count - len(data))
        if not part:
            break  # Peer closed the connection; return what we have.
        data += part
    return data


def recv_chunk(sock):  # -> bytearray:
    """Read one HTTP/1.1 chunked-transfer-encoding chunk from ``sock``.

    Returns the chunk payload, without the size line or the trailing CRLF,
    as a bytearray. An empty bytearray is returned when no chunk-size line
    could be found in the first read (e.g. the connection was closed) and
    for a zero-length chunk.
    """
    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
    crlf = header.find(b'\r\n')  # Find the end of the HTTP chunk-size line.
    if crlf <= 0:
        return bytearray()

    size = int(header[:crlf], 16)  # Decode the hexadecimal chunk size.
    start = crlf + 2  # Skip the size line's CRLF pair.

    # The size line is followed by `size` payload bytes and a trailing CRLF.
    # Some of that (possibly all of it, for small chunks) already arrived with
    # the 8-byte header read; fetch only what is still missing. This handles
    # every chunk size uniformly, including sizes 3-6, whose payload straddles
    # the initial read.
    body = bytearray(header[start:])
    missing = size + 2 - len(body)
    if missing > 0:
        body += _recv_exactly(sock, missing)
    # NOTE: for chunks of at most 2 bytes with a one-digit size, the 8-byte
    # read can extend past the trailing CRLF; any such extra bytes are
    # discarded here, as in the previous implementation.
    return body[:size]

## recv_chunk()
45
46
class TwitterJSONIter(object):
    """Iterate over JSON messages decoded from a chunked streaming response.

    Reading modes, chosen by ``block`` and ``timeout``:

    * ``block=True``, no timeout: blocking socket reads. When the stream
      ends (no data and nothing buffered), ``{'hangup': True}`` is yielded
      and iteration stops.
    * ``block=True`` with ``timeout``: the socket is made non-blocking and
      ``select`` waits up to ``timeout`` seconds for data;
      ``{'timeout': True}`` is yielded when nothing arrives in time (or data
      arrives after more than ``timeout`` seconds since the last message).
    * ``block=False``: non-blocking reads; ``None`` is yielded whenever no
      complete JSON message is buffered.

    Decoded messages are passed through ``wrap_response`` together with the
    response headers before being yielded.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle  # Response from urlopen; its raw socket is read directly.
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        import codecs  # Local import keeps this block self-contained.

        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(self.block and not self.timeout)
        # Decode incrementally so a multi-byte UTF-8 sequence split across two
        # HTTP chunks is reassembled instead of raising UnicodeDecodeError.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        buf = u''
        json_decoder = json.JSONDecoder()
        timer = time.time()
        while True:
            try:
                buf = buf.lstrip()
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer = time.time()
                continue
            except ValueError:
                # No complete JSON document is buffered yet.
                if not self.block:
                    yield None
            try:
                buf = buf.lstrip()  # Remove any keep-alive delimiters to detect hangups.
                if self.timeout:
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if ready_to_read[0]:
                        buf += utf8_decoder.decode(recv_chunk(sock))  # This is a non-blocking read.
                        if time.time() - timer > self.timeout:
                            yield {'timeout': True}
                    else:
                        yield {'timeout': True}
                else:
                    buf += utf8_decoder.decode(recv_chunk(sock))
                    if not buf and self.block:
                        # The stream has ended; every further read would come
                        # back empty, so stop instead of spinning on hangups.
                        yield {'hangup': True}
                        return
            except SSLError as e:
                # errno 2 is SSL_ERROR_WANT_READ: a non-blocking read found no
                # data yet. Ignore it in the non-blocking/timeout modes.
                if (not self.block or self.timeout) and (e.errno == 2):
                    pass
                else:
                    raise
92
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open the streaming request ``req`` and return an iterator over its messages.

    An HTTPError raised while opening the connection is re-raised as
    TwitterHTTPError. On success, the returned iterator is obtained from a
    TwitterJSONIter wrapping the response, configured with ``block`` and
    ``timeout``.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as err:
        raise TwitterHTTPError(err, uri, 'json', arg_data)
    json_iter = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(json_iter)
99
class TwitterStreamCallWithTimeout(TwitterCall):
    """TwitterCall whose responses are consumed as blocking streams with a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The per-call ``_timeout`` argument is not used; the timeout stored on
        # this call object (presumably set by TwitterCall) is used instead.
        stream_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=stream_timeout)
103
class TwitterStreamCall(TwitterCall):
    """TwitterCall whose responses are consumed as blocking streams without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # ``_timeout`` is accepted for interface compatibility but not used.
        blocking = True
        return handle_stream_response(req, uri, arg_data, blocking)
107
class TwitterStreamCallNonBlocking(TwitterCall):
    """TwitterCall whose responses are consumed as non-blocking streams."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # ``_timeout`` is accepted for interface compatibility but not used.
        blocking = False
        return handle_stream_response(req, uri, arg_data, blocking)
111
class TwitterStream(TwitterStreamCall):
    """
    The TwitterStream object is an interface to the Twitter Stream API
    (stream.twitter.com). This can be used pretty much the same as the
    Twitter class except the result of calling a method will be an
    iterator that yields objects decoded from the stream. For
    example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields messages for as long as the stream stays open.
    An HTTP error returned by the server when the stream is opened is
    raised as a TwitterHTTPError.

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter (in seconds) selects the stream-timeout
    behaviour for blocking streams. When it is set, the iterator yields
    ``{'timeout': True}`` if no message arrives within that many seconds.
    For a blocking stream without a timeout, the iterator yields
    ``{'hangup': True}`` when the stream ends.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        # Pick the call class that implements the requested reading mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)
148 self, auth=auth, format="json", domain=domain,
149 callable_cls=call_cls,
150 secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)