]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Cosmetic edits.
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
class TwitterJSONIter(object):
    """
    Iterator over the JSON objects carried by a chunked HTTP stream.

    The socket underneath `handle` is read directly, one HTTP chunk at a
    time.  Chunk payloads are appended to an internal byte buffer and
    every complete JSON document found at the front of that buffer is
    decoded and yielded.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        """
        Remember the response handle and the iteration settings.

        `handle` is the open HTTP response; its socket is read and its
        `headers` are handed to `wrap_response`.  `uri` and `arg_data`
        are only used to build a TwitterHTTPError.  When `block` is
        false the iterator yields None whenever no complete message is
        buffered.  When `timeout` is truthy it is the number of seconds
        passed to `select()` while waiting for data.
        """
        self.decoder = json.JSONDecoder()
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.buf = b""
        self.block = block
        self.timeout = timeout
        self.timer = time.time()
        # Set once a read returns no data, i.e. the peer closed the socket.
        self._closed = False

    def recv_chunk(self, sock):
        """
        Read one HTTP chunk from `sock` and return its payload.

        Returns the payload as a bytearray, or b'' when nothing was read
        or when no chunk-size line was found in the first 32 bytes.  If
        the peer closes the connection part-way through a chunk, the
        bytes received so far are returned instead of waiting forever.
        """
        buf = sock.recv(32)
        if not buf:
            # An empty read means the connection has been closed.
            self._closed = True
            return b''

        crlf = buf.find(b'\r\n')  # End of the chunk-size line.
        if crlf <= 0:
            return b''

        remaining = int(buf[:crlf].decode(), 16)  # Chunk size is hexadecimal.
        chunk = bytearray(buf[crlf + 2:])  # Payload bytes that came with the header.
        remaining -= len(chunk)

        while remaining > 0:
            # Ask for the rest of the payload plus the chunk's CRLF pair.
            balance = sock.recv(remaining + 2)
            if not balance:
                # Connection closed mid-chunk: stop instead of spinning
                # on a socket that will never deliver more data.
                self._closed = True
                break
            chunk.extend(balance)
            remaining -= len(balance)

        # If possible, remove the trailing CRLF pair.  (This precludes an
        # extra trip through the JSON parser.)
        if remaining == -2 and chunk[-2] == 0x0d and chunk[-1] == 0x0a:
            del chunk[-2:]
        return chunk

    def __iter__(self):
        """
        Yield decoded messages from the stream.

        Each decoded JSON object is yielded as
        `wrap_response(obj, handle.headers)`.  In non-blocking mode None
        is yielded whenever the buffer holds no complete object.  With a
        timeout, {"timeout": True} is yielded when `select()` reports no
        data in time, or when data arrived but more than `timeout`
        seconds have passed since the last message was yielded.  The
        iteration ends once the connection has been closed and the
        buffer holds no further decodable object.
        """
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if not self.block or self.timeout:
            sock.setblocking(False)
        while True:
            try:
                utf8_buf = self.buf.decode('utf8').lstrip()
                res, ptr = self.decoder.raw_decode(utf8_buf)
                self.buf = utf8_buf[ptr:].encode('utf8')
                yield wrap_response(res, self.handle.headers)
                self.timer = time.time()
                continue
            except ValueError:
                # Nothing complete to decode yet (this also covers a
                # buffer that ends in the middle of a UTF-8 sequence).
                if self._closed:
                    # No more data will ever arrive; stop instead of
                    # polling a closed socket forever.
                    return
                if not self.block:
                    yield None
            # this is a non-blocking read (ie, it will return if any data is available)
            try:
                if self.timeout:
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if ready_to_read[0]:
                        self.buf += self.recv_chunk(sock)
                        if time.time() - self.timer > self.timeout:
                            yield {"timeout":True}
                    else:
                        yield {"timeout":True}
                else:
                    self.buf += self.recv_chunk(sock)
            except SSLError as e:
                if (not self.block or self.timeout) and (e.errno == 2):
                    # Apparently this means there was nothing in the socket buf
                    pass
                else:
                    raise
            except urllib_error.HTTPError as e:
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
90
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open the streaming request and return an iterator over its messages.

    `req` is passed to `urlopen`; `uri` and `arg_data` are only used to
    describe the request in error reports.  `block` and `timeout` are
    forwarded to TwitterJSONIter.

    Raises TwitterHTTPError when the server answers the request with an
    HTTP error status.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        # The HTTP status is checked while opening the connection, so
        # this is where an error response has to be translated.
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout=timeout))
94
class TwitterStreamCallWithTimeout(TwitterCall):
    """Call class for blocking streams that also carry a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """
        Open the stream in blocking mode using `self.timeout`.

        The `_timeout` argument is accepted but not used.
        """
        stream_options = {"block": True, "timeout": self.timeout}
        return handle_stream_response(req, uri, arg_data, **stream_options)
98
class TwitterStreamCall(TwitterCall):
    """Call class for plain blocking streams (no timeout)."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """
        Open the stream in blocking mode.

        The `_timeout` argument is accepted but not used.
        """
        blocking = True
        return handle_stream_response(req, uri, arg_data, blocking)
102
class TwitterStreamCallNonBlocking(TwitterCall):
    """Call class for non-blocking streams."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """
        Open the stream in non-blocking mode.

        The `_timeout` argument is accepted but not used.
        """
        blocking = False
        return handle_stream_response(req, uri, arg_data, blocking)
106
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used much like the Twitter class, except that calling a
    method returns an iterator yielding the objects decoded from the
    stream::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding tweets for as long as the stream
    delivers them; HTTP errors are meant to surface as
    TwitterHTTPError.

    `block` selects blocking (the default, True) or non-blocking
    operation.  A non-blocking iterator occasionally yields None when
    no message is available.  When `block` is true and `timeout` is
    set, the timeout-aware call class is used instead of the plain
    blocking one.
    """
    def __init__(
        self, domain="stream.twitter.com", secure=True, auth=None,
        api_version='1.1', block=True, timeout=None):
        # The API version is the only fixed leading URI component.
        version_parts = (str(api_version),)

        # Pick the call class matching the requested blocking behaviour.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=version_parts, timeout=timeout,
            gzip=False)