]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
eb0774295f813b742c050297e3bbf48a1e049e34
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
def recv_chunk(sock):
    """
    Read a single HTTP chunked-transfer chunk from `sock`.

    The chunk-size line (hexadecimal length followed by CRLF) is read
    first, then exactly that many payload bytes, then the CRLF pair that
    terminates the chunk.

    Returns the payload as a bytearray.  Returns b'' when the socket
    yields no data, or when no valid size line is found within the first
    10 bytes (room for sizes up to 0xffffffff plus the CRLF).

    Any exception raised by `sock.recv` (for example on a non-blocking
    socket that has no data ready) propagates to the caller; bytes read
    before the exception are not preserved.
    """
    # Read the size line one byte at a time.  Reading a fixed-size block
    # here would also swallow the payload, the trailing CRLF and part of
    # the next chunk's header whenever the chunk is short (for example a
    # two-byte keep-alive chunk), and those bytes cannot be pushed back.
    header = bytearray()
    while not header.endswith(b'\r\n'):
        if len(header) >= 10:
            return b''  # No CRLF where a chunk size was expected.
        byte = sock.recv(1)
        if not byte:
            return b''  # Nothing (more) to read from the socket.
        header += byte

    size_text = bytes(header[:-2])
    if not size_text:
        return b''  # A bare CRLF is not a chunk header.
    remaining = int(size_text, 16)  # Decode the chunk size.

    # recv() may legitimately return fewer bytes than requested, so keep
    # reading until the whole payload has arrived.
    chunk = bytearray()
    while len(chunk) < remaining:
        data = sock.recv(remaining - len(chunk))
        if not data:
            return chunk  # Connection ended mid-chunk; return what we got.
        chunk += data

    # Read the trailing CRLF pair and throw it away, again allowing for
    # short reads so the next call starts exactly at a chunk header.
    pending = 2
    while pending:
        data = sock.recv(pending)
        if not data:
            break
        pending -= len(data)

    return chunk
33
34 ## recv_chunk()
35
36
class TwitterJSONIter(object):
    """
    Iterable that decodes a stream of JSON documents from an open HTTP
    response.

    Iterating pulls chunks from the response's underlying socket with
    `recv_chunk`, accumulates them in `self.buf`, and yields every
    complete JSON document (passed through `wrap_response`) as soon as it
    can be decoded.  Besides decoded documents the iterator may yield:

    - None, when `block` is false and no complete document is buffered;
    - {"timeout": True}, when `timeout` is set and either the socket did
      not become readable within `timeout` seconds or more than `timeout`
      seconds have passed since the last document was yielded.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        # Request context, kept for building TwitterHTTPError on failure.
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        # Reading mode.
        self.block = block
        self.timeout = timeout
        # Decoding state: raw bytes not yet turned into a document.
        self.decoder = json.JSONDecoder()
        self.buf = b""
        # Time of construction / of the most recently yielded document.
        self.timer = time.time()

    def __iter__(self):
        # Reach through the response object to the raw socket; the
        # attribute path differs between Python 2 and Python 3.
        if sys.version_info[0] >= 3:
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if self.timeout or not self.block:
            sock.setblocking(False)

        while True:
            # Step 1: try to decode one document from what is buffered.
            try:
                text = self.buf.decode('utf8').lstrip()
                document, consumed = self.decoder.raw_decode(text)
                self.buf = text[consumed:].encode('utf8')
                yield wrap_response(document, self.handle.headers)
                self.timer = time.time()
                continue
            except ValueError:
                # Nothing complete to decode yet.  Non-blocking callers
                # are told so with a None before more data is fetched.
                if not self.block:
                    yield None

            # Step 2: fetch more data from the socket.
            try:
                if not self.timeout:
                    self.buf += recv_chunk(sock)
                else:
                    readable = select.select(
                        [sock], [], [], self.timeout)[0]
                    if not readable:
                        yield {"timeout":True}
                    else:
                        self.buf += recv_chunk(sock)
                        if time.time() - self.timer > self.timeout:
                            yield {"timeout":True}
            except SSLError as e:
                # On a non-blocking socket errno 2 apparently means there
                # was nothing in the socket buffer; anything else is a
                # real error.
                nothing_to_read = (
                    (not self.block or self.timeout) and (e.errno == 2))
                if not nothing_to_read:
                    raise
            except urllib_error.HTTPError as e:
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
91
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open `req` and return an iterator over the JSON documents it streams.

    `uri` and `arg_data` are handed to TwitterJSONIter, which uses them
    when reporting errors; `block` and `timeout` select its reading mode.
    """
    response = urllib_request.urlopen(req)
    json_stream = TwitterJSONIter(
        response, uri, arg_data, block, timeout=timeout)
    return iter(json_stream)
95
class TwitterStreamCallWithTimeout(TwitterCall):
    """
    TwitterCall variant whose responses are read as a blocking stream
    with a timeout.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The per-call `_timeout` argument is not consulted here; the
        # instance attribute `self.timeout` (presumably set by
        # TwitterCall -- confirm in api.py) is what reaches the reader.
        stream_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=stream_timeout)
99
class TwitterStreamCall(TwitterCall):
    """
    TwitterCall variant whose responses are read as a blocking stream
    without a timeout.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted for signature compatibility but unused.
        blocking = True
        return handle_stream_response(req, uri, arg_data, block=blocking)
103
class TwitterStreamCallNonBlocking(TwitterCall):
    """
    TwitterCall variant whose responses are read as a non-blocking
    stream.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted for signature compatibility but unused.
        blocking = False
        return handle_stream_response(req, uri, arg_data, block=blocking)
107
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used in much the same way as the Twitter class, except that
    calling a method returns an iterator yielding objects decoded from
    the stream.  For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield tweets forever and ever (until the stream
    breaks at which point it raises a TwitterHTTPError.)

    The `block` parameter controls whether the stream is blocking; the
    default is blocking (True).  When it is False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter only selects a different call class when
    `block` is true: the iterator then yields ``{"timeout": True}``
    whenever no data arrives, or no message has been yielded, within
    `timeout` seconds.
    """

    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        # Every request path starts with the API version.
        version_parts = (str(api_version),)

        # Pick the call class that matches the requested reading mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=version_parts, timeout=timeout,
            gzip=False)