]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Merge pull request #206 from polm/patch-1
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25 14
python27_3 = sys.version_info >= (2, 7)


def recv_chunk(sock):  # -> bytearray:
    """
    Read a single HTTP chunk (chunked transfer encoding) from `sock`.

    The chunk-size line is located in an initial read of up to 8 bytes,
    which is enough for a size of up to 0xffffff (16MiB) followed by CRLF.
    The payload is then read until it is complete and the CRLF pair that
    terminates the chunk is consumed.

    Returns a bytearray holding the chunk payload. An empty bytearray is
    returned when no chunk size could be found (nothing was received, or
    the data does not start with a size line) and for a zero-length chunk.
    If the peer closes the connection in the middle of a chunk, only the
    bytes actually received are returned.

    Known limitation: because the first read asks for 8 bytes, a very
    short chunk can cause bytes that follow its trailing CRLF to be read
    as well; those bytes are discarded, as there is no way to push them
    back onto the socket.
    """
    header = sock.recv(8)
    # recv() may return fewer bytes than requested, so the size line can be
    # split across reads. Keep reading (never past 8 bytes in total) until
    # the CRLF shows up or the peer stops sending.
    while header and header.find(b'\r\n') < 0 and len(header) < 8:
        more = sock.recv(8 - len(header))
        if not more:
            break
        header += more

    crlf = header.find(b'\r\n')  # Find the end of the HTTP chunk size.
    if crlf <= 0:  # No length available: nothing to process.
        return bytearray()

    size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
    start = crlf + 2  # Skip the size line's CRLF pair.
    chunk = bytearray(size)

    # Part (or all) of the payload may already have arrived with the header.
    prefetched = header[start:start + size]
    filled = len(prefetched)
    chunk[:filled] = prefetched

    if filled < size:
        if python27_3:
            # When possible, use less memory by reading directly into the buffer.
            view = memoryview(chunk)
            while filled < size:
                received = sock.recv_into(view[filled:])
                if not received:  # Connection closed mid-chunk.
                    break
                filled += received
        else:
            # Less efficient, kept for python2.6 compatibility.
            while filled < size:
                data = sock.recv(size - filled)
                if not data:  # Connection closed mid-chunk.
                    break
                chunk[filled:filled + len(data)] = data
                filled += len(data)
        if filled < size:
            # Slicing copies, so no zero padding is handed to the caller.
            return chunk[:filled]

    # Throw away the trailing CRLF pair, minus whatever part of it was
    # already consumed by the initial header read.
    already_seen = min(2, max(0, len(header) - start - size))
    pending = 2 - already_seen
    while pending > 0:
        data = sock.recv(pending)
        if not data:
            break
        pending -= len(data)

    return chunk
26938000 46
26938000 47
dd648a25
MV
class TwitterJSONIter(object):
    """
    Iterable over the JSON messages of an open streaming HTTP response.

    `handle` is the response object whose underlying socket is read
    directly, one HTTP chunk at a time. `uri` and `arg_data` are stored on
    the instance. `block` selects blocking reads; `timeout`, when set,
    makes the iterator report periods without data.

    Iterating yields:
      - each decoded JSON message, passed through `wrap_response` together
        with the response headers;
      - None, in non-blocking mode, whenever no complete message is
        buffered yet;
      - {'timeout': True} when `timeout` is set and no data became
        readable within that many seconds;
      - {'hangup': True} once, when nothing more can be read, after which
        iteration stops.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = timeout

    def __iter__(self):
        # Imported here so this class does not rely on a module-level import.
        import codecs

        # Reach through the response wrappers to the raw socket; the
        # attribute path differs between Python 3 and Python 2.
        sock = self.handle.fp.raw._sock if sys.version_info >= (3, 0) else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(self.block and not self.timeout)
        buf = ''
        json_decoder = json.JSONDecoder()
        # An incremental decoder keeps the leading bytes of a multi-byte
        # UTF-8 character that is split across two HTTP chunks, instead of
        # failing with UnicodeDecodeError on the first half.
        utf8_decoder = codecs.getincrementaldecoder('utf-8')()
        timer = time.time()
        while True:
            try:
                buf = buf.lstrip()
                res, ptr = json_decoder.raw_decode(buf)
                buf = buf[ptr:]
                yield wrap_response(res, self.handle.headers)
                continue
            except ValueError:
                # No complete JSON document buffered yet. A non-blocking
                # consumer is told so; a blocking one just waits for more.
                if not self.block:
                    yield None
            try:
                buf = buf.lstrip()  # Remove any keep-alive delimiters to detect hangups.
                if self.timeout and not buf:  # This is a non-blocking read.
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if not ready_to_read[0] and time.time() - timer > self.timeout:
                        yield {'timeout': True}
                        continue
                    timer = time.time()
                chunk = recv_chunk(sock)
                buf += utf8_decoder.decode(bytes(chunk))
                # Test the raw chunk, not only the decoded text: a chunk
                # holding just the start of a multi-byte character decodes
                # to '' for now but is not a hangup.
                if not chunk and not buf:
                    yield {'hangup': True}
                    break
            except SSLError as e:
                # Error from a non-blocking read of an empty buffer.
                if (not self.block or self.timeout) and (e.errno == 2):
                    pass
                else:
                    raise
2300838f 91
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open `req` and return an iterator over the messages of the response.

    An HTTPError raised while opening the request is re-raised as a
    TwitterHTTPError built from the error, `uri`, the 'json' format and
    `arg_data`. `block` and `timeout` are handed to TwitterJSONIter.
    """
    try:
        response = urllib_request.urlopen(req)
    except urllib_error.HTTPError as http_error:
        raise TwitterHTTPError(http_error, uri, 'json', arg_data)
    messages = TwitterJSONIter(response, uri, arg_data, block, timeout=timeout)
    return iter(messages)
98
class TwitterStreamCallWithTimeout(TwitterCall):
    """Stream call that reads in blocking mode with this call's timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The timeout stored on the call object is used; the `_timeout`
        # argument is accepted but not read.
        stream = handle_stream_response(
            req, uri, arg_data, block=True, timeout=self.timeout)
        return stream
dd648a25
MV
102
class TwitterStreamCall(TwitterCall):
    """Stream call that reads in blocking mode without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not read.
        stream = handle_stream_response(req, uri, arg_data, block=True)
        return stream
2300838f
MV
106
class TwitterStreamCallNonBlocking(TwitterCall):
    """Stream call that reads in non-blocking mode."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not read.
        stream = handle_stream_response(req, uri, arg_data, block=False)
        return stream
dd648a25
MV
110
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used pretty much like the Twitter class, except that calling a
    method returns an iterator that yields objects decoded from the
    stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding messages for as long as the stream
    delivers them. When nothing more can be read it yields
    `{'hangup': True}` and stops. A TwitterHTTPError is raised if opening
    the stream fails with an HTTP error.

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter only selects a different call class when
    `block` is true. In that case the iterator yields `{'timeout': True}`
    whenever no data became readable within `timeout` seconds.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        # The API version is the first (and only initial) URI component.
        uriparts = (str(api_version),)

        # Pick the call class matching the requested read mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)