]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Merge pull request #203 from RouxRC/pr-fix-timeout
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
# memoryview (used by recv_chunk to read straight into its buffer) is only
# available from Python 2.7 onwards.
python27_3 = sys.version_info >= (2, 7)

# ssl.SSL_ERROR_WANT_READ: a non-blocking SSL socket has no decrypted data yet.
_SSL_WANT_READ = 2


def _retry_on_want_read(sock, read, *args):
    """Call ``read(*args)``, waiting on ``sock`` while it reports WANT_READ.

    Only used once a chunk has started arriving: the rest of it is in
    flight, so waiting is preferable to abandoning a half-read chunk.
    """
    while True:
        try:
            return read(*args)
        except SSLError as e:
            if e.errno != _SSL_WANT_READ:
                raise
            select.select([sock], [], [])


def _recv_exactly(sock, nbytes):
    """Read ``nbytes`` bytes from ``sock``; fewer only if the peer closes."""
    data = bytearray()
    while len(data) < nbytes:
        piece = _retry_on_want_read(sock, sock.recv, nbytes - len(data))
        if not piece:
            break
        data += piece
    return bytes(data)


def _recv_into_exactly(sock, view):
    """Fill the writable buffer ``view`` from ``sock``; return bytes read."""
    filled = 0
    total = len(view)
    while filled < total:
        count = _retry_on_want_read(sock, sock.recv_into, view[filled:])
        if not count:
            break
        filled += count
    return filled


def recv_chunk(sock):  # -> bytearray
    """Read one HTTP/1.1 chunk from ``sock`` and return its payload.

    Returns an empty ``bytearray`` when no chunk-size line could be read
    (the peer closed the connection or the data does not start with a size
    line) and for the terminating zero-length chunk.

    The first ``recv`` is issued directly, so on a non-blocking socket with
    nothing to read its exception propagates to the caller. Once a chunk has
    begun, the rest of it (payload and trailing CRLF) is read in full,
    waiting for data if necessary, because ``recv``/``recv_into`` may return
    fewer bytes than requested.
    """
    # Read at most 8 bytes: enough for a chunk size up to 16MiB (0xffffff)
    # plus its CRLF, without running far into the data that follows.
    header = sock.recv(8)
    # A short read can stop before the CRLF; keep going (within the 8-byte
    # budget) instead of mistaking a split size line for a hangup.
    while header and len(header) < 8 and b'\r\n' not in header:
        more = _retry_on_want_read(sock, sock.recv, 8 - len(header))
        if not more:
            break
        header += more

    crlf = header.find(b'\r\n')
    if crlf <= 0:  # No chunk size available.
        return bytearray()

    size = int(header[:crlf], 16)  # Rarely exceeds 8KiB.
    body = header[crlf + 2:]  # Bytes already read past the size line.

    chunk = bytearray(size)
    have = min(len(body), size)
    chunk[:have] = body[:have]
    if have < size:  # There is more of the payload to read.
        if python27_3:
            # Read straight into the pre-allocated buffer.
            _recv_into_exactly(sock, memoryview(chunk)[have:])
        else:  # Python 2.6 has no memoryview.
            chunk[have:] = _recv_exactly(sock, size - have)

    # Tiny chunks (e.g. keep-alives, size 4-6) may already have part or all
    # of their trailing CRLF inside the 8-byte header; only read what is
    # missing so the next chunk is left intact. Bytes beyond the CRLF (only
    # possible for sizes <= 2) belong to the next chunk and cannot be
    # returned from here.
    crlf_seen = min(2, max(0, len(body) - size))
    if crlf_seen < 2:
        _recv_exactly(sock, 2 - crlf_seen)  # Discard the trailing CRLF.
    return chunk
46
47
class TwitterJSONIter(object):
    """Iterate over the JSON messages carried by a streaming HTTP response.

    Each complete JSON document is yielded wrapped by ``wrap_response``
    together with the response headers. In addition the iterator yields:

    * ``None`` when ``block`` is false and no complete message is buffered;
    * ``{'timeout': True}`` when ``timeout`` is set and no data arrived on
      the socket within ``timeout`` seconds (iteration then continues);
    * ``{'hangup': True}`` as the last item, once no further chunk can be
      read from the connection.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle  # Response object returned by urlopen().
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout  # Seconds; None/0 disables the timeout.

    def __iter__(self):
        import codecs  # Only needed for the incremental UTF-8 decoder below.

        # Reach through the response's file object(s) to the socket.
        # NOTE(review): these are private stdlib attributes, and any bytes
        # the file object already buffered are bypassed -- confirm that
        # cannot drop the start of the stream.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # With a timeout, waiting happens in select() below, so reads on the
        # socket itself must not block.
        sock.setblocking(self.block and not self.timeout)
        # SSL sockets may hold already-decrypted bytes that select() cannot
        # see; plain sockets have no pending() method.
        pending = getattr(sock, 'pending', None)
        # A multi-byte UTF-8 character can be split across two chunks; the
        # incremental decoder carries the partial bytes over instead of
        # failing as a per-chunk .decode('utf-8') would.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        json_decoder = json.JSONDecoder()
        buf = ''
        timer = time.time()
        while True:
            buf = buf.lstrip()  # Skip keep-alive newlines between messages.
            try:
                res, ptr = json_decoder.raw_decode(buf)
            except ValueError:
                # No complete message buffered yet.
                if not self.block:
                    yield None
            else:
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                continue

            try:
                # Whenever a timeout is set, wait for data here -- including
                # while a partial message is buffered, otherwise the
                # non-blocking socket would be polled in a tight loop --
                # unless SSL already has decrypted data pending.
                if self.timeout and not (pending and pending()):
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if not ready_to_read[0] and time.time() - timer > self.timeout:
                        yield {'timeout': True}
                        continue
                timer = time.time()
                chunk = recv_chunk(sock)
                if not chunk:
                    # No further chunk (closed connection or end of stream);
                    # any partially buffered message can never complete.
                    yield {'hangup': True}
                    break
                buf += utf8_decoder.decode(chunk)
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer
                # (errno 2 is SSL_ERROR_WANT_READ).
                if (not self.block or self.timeout) and e.errno == 2:
                    pass
                else:
                    raise
91
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Open ``req`` and return an iterator over its streamed JSON messages.

    An HTTPError raised by ``urlopen`` is re-raised as TwitterHTTPError.
    ``block`` and ``timeout`` are handed unchanged to TwitterJSONIter.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    messages = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(messages)
98
class TwitterStreamCallWithTimeout(TwitterCall):
    """Blocking stream call whose reads are bounded by ``self.timeout``."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Stream the response, forwarding this call's own timeout."""
        stream_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=stream_timeout)
102
class TwitterStreamCall(TwitterCall):
    """Stream call that blocks until each message arrives (no timeout)."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Stream the response in blocking mode."""
        return handle_stream_response(
            req, uri, arg_data, block=True)
106
class TwitterStreamCallNonBlocking(TwitterCall):
    """Stream call whose iterator yields None when no message is ready."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Stream the response in non-blocking mode."""
        return handle_stream_response(
            req, uri, arg_data, block=False)
110
class TwitterStream(TwitterStreamCall):
    """
    The TwitterStream object is an interface to the Twitter Stream API
    (stream.twitter.com). This can be used pretty much the same as the
    Twitter class except the result of calling a method will be an
    iterator that yields objects decoded from the stream. For
    example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields messages until no further data can be read from
    the connection, at which point it yields ``{'hangup': True}`` and
    stops. A TwitterHTTPError is raised if the initial request fails with
    an HTTP error.

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter (in seconds) affects blocking streams only:
    when no data arrives within that time the iterator yields
    ``{'timeout': True}`` and then keeps waiting for messages.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        # Pick the call class that implements the requested read mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)
147 self, auth=auth, format="json", domain=domain,
148 callable_cls=call_cls,
149 secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)