jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Handle HTTP chunks that only contain keep-alive delimiters.
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
# Upper bound on the length of a chunk-size line, in bytes; guards against a
# size line that never terminates.
_MAX_CHUNK_SIZE_LINE = 1024


def _recv_exact(sock, count):
    """Read `count` bytes from `sock`, looping over short reads.

    Returns fewer than `count` bytes only if recv() reports end of stream
    (an empty read) before enough data arrived.
    """
    data = bytearray()
    while len(data) < count:
        piece = sock.recv(count - len(data))
        if not piece:  # End of stream before the requested bytes arrived.
            break
        data += piece
    return data


def recv_chunk(sock): # -> bytearray:
    """Read one HTTP/1.1 chunk from `sock` and return its payload.

    The chunk-size line is read one byte at a time so that no byte belonging
    to the following chunk is consumed.  The payload is then read in full,
    and its trailing CRLF pair is read and discarded.  A chunk that carries
    only a keep-alive delimiter is returned like any other payload.

    Returns an empty bytearray when the stream ends before a complete size
    line, when the size line is empty, or for a zero-length chunk.

    Raises ValueError if the size line is not hexadecimal or exceeds
    _MAX_CHUNK_SIZE_LINE bytes.

    NOTE: if `sock` is non-blocking, a recv() that raises part-way through
    leaves the stream positioned mid-chunk.
    """
    header = b''
    while not header.endswith(b'\r\n'):
        byte = sock.recv(1)
        if not byte:  # Stream ended before a complete size line.
            return bytearray()
        header += byte
        if len(header) > _MAX_CHUNK_SIZE_LINE:
            raise ValueError('HTTP chunk-size line too long')

    # The size may be followed by ";name=value" chunk extensions (RFC 7230 4.1).
    size_field = header[:-2].split(b';', 1)[0].strip()
    if not size_field:  # A bare CRLF carries no chunk.
        return bytearray()
    size = int(size_field, 16)  # Decode the chunk size.

    chunk = _recv_exact(sock, size)
    _recv_exact(sock, 2)  # Read the trailing CRLF pair. Throw it away.
    return chunk

## recv_chunk()
41
42
class TwitterJSONIter(object):
    """Iterate over the JSON messages of an open streaming HTTP response.

    Attributes:
        handle: the HTTP response (e.g. from ``urlopen``); its underlying
            socket is read directly, and its ``headers`` are attached to each
            message via ``wrap_response``.
        uri, arg_data: request details; stored but not used by the iterator.
        block: when False the socket is non-blocking and ``None`` is yielded
            whenever no complete message is buffered.
        timeout: when set (seconds), the socket is polled with ``select`` and
            ``{'timeout': True}`` is yielded when no message arrives in time.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def _wait_readable(self, sock):
        """Return True if `sock` has input available within `self.timeout` seconds.

        An SSL socket can hold already-decrypted bytes that select() cannot
        see on the file descriptor, so any such pending bytes are checked first.
        """
        pending = getattr(sock, 'pending', None)
        if pending is not None and pending() > 0:
            return True
        ready_to_read, _, _ = select.select([sock], [], [], self.timeout)
        return bool(ready_to_read)

    def __iter__(self):
        """Yield stream messages until the server hangs up.

        Depending on the mode this yields:

        * each decoded JSON message, wrapped by ``wrap_response``;
        * ``None`` (non-blocking mode) when no complete message is buffered;
        * ``{'timeout': True}`` (timeout mode) when select() times out, or when
          more than ``self.timeout`` seconds have passed since the last message;
        * ``{'hangup': True}`` (blocking modes) once a read returns no data,
          after which iteration stops.

        Raises SSLError from reads, except for the would-block error (errno 2)
        in non-blocking and timeout modes, which is skipped.
        """
        import codecs  # Function-scope import keeps this block self-contained.

        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # Timeout mode polls with select(), so it uses a non-blocking socket too.
        sock.setblocking(self.block and not self.timeout)
        # HTTP chunk boundaries are byte-oriented and may split a multi-byte
        # UTF-8 sequence; the incremental decoder carries a partial sequence
        # over to the next chunk instead of failing on it.
        text_decoder = codecs.getincrementaldecoder('utf-8')()
        json_decoder = json.JSONDecoder()
        buf = u''
        timer = time.time()
        while True:
            # Drop keep-alive delimiters (whitespace) between messages.
            buf = buf.lstrip()
            try:
                res, ptr = json_decoder.raw_decode(buf)
            except ValueError:
                # No complete message is buffered yet.
                if not self.block:
                    yield None
            else:
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                timer = time.time()
                continue

            chunk = None  # Stays None when no read is attempted.
            try:
                if self.timeout and not self._wait_readable(sock):
                    yield {'timeout': True}
                else:
                    chunk = recv_chunk(sock)  # Non-blocking in timeout mode.
                    buf += text_decoder.decode(bytes(chunk))
                    if self.timeout and time.time() - timer > self.timeout:
                        yield {'timeout': True}
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer.
                if (not self.block or self.timeout) and e.errno == 2:
                    continue
                raise
            # Only a read that actually returned no bytes counts as a hangup;
            # a select() timeout performs no read and must not report one.
            if self.block and chunk is not None and not chunk:
                yield {'hangup': True}
                return
89
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open the streaming request `req` and return an iterator over its messages.

    The iterator is built from TwitterJSONIter with the given `block` and
    `timeout` settings. An HTTP error status from urlopen is re-raised as
    TwitterHTTPError.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as err:
        raise TwitterHTTPError(err, uri, 'json', arg_data)
    stream = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(stream)
96
class TwitterStreamCallWithTimeout(TwitterCall):
    """TwitterCall whose responses are blocking streams polled with a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The `_timeout` argument is ignored; the stream takes its timeout
        # from this instance's `timeout` attribute.
        stream_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=stream_timeout)
100
class TwitterStreamCall(TwitterCall):
    """TwitterCall whose responses are blocking streams without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used here.
        return handle_stream_response(
            req=req, uri=uri, arg_data=arg_data, block=True)
104
class TwitterStreamCallNonBlocking(TwitterCall):
    """TwitterCall whose responses are non-blocking streams."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used here.
        return handle_stream_response(
            req=req, uri=uri, arg_data=arg_data, block=False)
108
class TwitterStream(TwitterStreamCall):
    """
    The TwitterStream object is an interface to the Twitter Stream API
    (stream.twitter.com). This can be used pretty much the same as the
    Twitter class except the result of calling a method will be an
    iterator that yields objects decoded from the stream. For
    example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    If the server answers the streaming request with an HTTP error status,
    a TwitterHTTPError is raised. Otherwise the iterator yields messages
    as they arrive. In blocking mode, a read that returns no data (the
    server closing the connection) is reported by yielding
    ``{'hangup': True}``.

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter (seconds) applies to blocking streams: when
    set, the iterator yields ``{'timeout': True}`` if no message arrives
    within that many seconds. It is also passed to
    ``TwitterStreamCall.__init__``.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        # The call class decides how each response is turned into an iterator.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        # NOTE(review): gzip is presumably disabled because the stream iterator
        # reads the raw socket and decodes UTF-8 directly, bypassing any
        # decompression. Confirm.
        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)
145 self, auth=auth, format="json", domain=domain,
146 callable_cls=call_cls,
147 secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)