jfr.im git - z_archive/twitter.git/blame_incremental - twitter/stream.py
Increase the size of the read buffer to be larger than the average tweet.
[z_archive/twitter.git] / twitter / stream.py
... / ...
CommitLineData
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
9from ssl import SSLError
10import socket
11import sys, select, time
12
13from .api import TwitterCall, wrap_response, TwitterHTTPError
14
class TwitterJSONIter(object):
    """
    Iterator that decodes consecutive JSON documents from a streaming
    HTTP response.

    Raw bytes are read straight from the socket underneath `handle`,
    accumulated in `self.buf`, and every complete JSON value found at the
    start of the buffer is passed through `wrap_response` and yielded.

    Besides decoded objects the iterator may also yield:

    * `None` -- when `block` is false and the buffer does not yet hold
      a complete JSON document;
    * `{"timeout": True}` -- when `timeout` is set and either nothing
      became readable within `timeout` seconds, or data arrived but more
      than `timeout` seconds have passed since the last decoded object.

    Iteration ends once the peer closes the connection (an empty read).
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        """
        `handle` is the open HTTP response to read from; `uri` and
        `arg_data` are only used when building a TwitterHTTPError.
        `block` and `timeout` select the read mode described on the class.
        """
        self.decoder = json.JSONDecoder()
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.buf = b""
        self.block = block
        self.timeout = timeout
        # Time of the last successfully decoded object (or of creation).
        self.timer = time.time()

    def __iter__(self):
        # Dig out the raw socket; its location differs between Python 2 and 3.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if not self.block or self.timeout:
            sock.setblocking(False)
        while True:
            try:
                utf8_buf = self.buf.decode('utf8').lstrip()
                if utf8_buf and utf8_buf[0] != '{':
                    # Remove the hex delimiter length and extra whitespace.
                    utf8_buf = utf8_buf.lstrip('0123456789abcdefABCDEF')
                    utf8_buf = utf8_buf.lstrip()
                res, ptr = self.decoder.raw_decode(utf8_buf)
                self.buf = utf8_buf[ptr:].encode('utf8')
                yield wrap_response(res, self.handle.headers)
                self.timer = time.time()
                continue
            except ValueError:
                # No complete JSON document buffered yet (this also covers a
                # multi-byte character split across reads, since
                # UnicodeDecodeError is a ValueError).  Non-blocking callers
                # get None; blocking callers just fall through to the read.
                if not self.block:
                    yield None
            except urllib_error.HTTPError as e:
                # Probably unnecessary, no dynamic url calls in the try block.
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
            try:
                if self.timeout:
                    # Wait up to `timeout` seconds for the socket to become
                    # readable, then do a non-blocking read.
                    ready_to_read = select.select(
                        [sock], [], [], self.timeout)
                    if ready_to_read[0]:
                        chunk = sock.recv(1024)
                        if not chunk:
                            # Empty read: the peer closed the connection.
                            # Stop instead of spinning on a dead socket.
                            return
                        self.buf += chunk
                        if time.time() - self.timer > self.timeout:
                            yield {"timeout": True}
                    else:
                        yield {"timeout": True}
                else:
                    chunk = sock.recv(2048)
                    if not chunk:
                        # Empty read: the peer closed the connection.
                        return
                    self.buf += chunk
            except SSLError as e:
                if (not self.block or self.timeout) and (e.errno == 2):
                    # errno 2 is presumably SSL_ERROR_WANT_READ: apparently
                    # this means there was nothing in the socket buf.
                    pass
                else:
                    raise
            except urllib_error.HTTPError as e:
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
74
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open `req` and return an iterator over the JSON objects of the
    response stream.

    `uri`, `arg_data`, `block` and `timeout` are handed to
    TwitterJSONIter unchanged.
    """
    response = urllib_request.urlopen(req)
    json_stream = TwitterJSONIter(
        response, uri, arg_data, block, timeout=timeout)
    return iter(json_stream)
78
class TwitterStreamCallWithTimeout(TwitterCall):
    """Stream call that reads in blocking mode with the instance's timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The `_timeout` argument is not used; `self.timeout` is passed on.
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=self.timeout)
82
class TwitterStreamCall(TwitterCall):
    """Stream call that reads in blocking mode without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The `_timeout` argument is not used.
        return handle_stream_response(
            req, uri, arg_data, block=True)
86
class TwitterStreamCallNonBlocking(TwitterCall):
    """Stream call that reads in non-blocking mode."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The `_timeout` argument is not used.
        return handle_stream_response(
            req, uri, arg_data, block=False)
90
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    A TwitterStream is used pretty much like the Twitter class, except
    that calling a method returns an iterator that yields objects decoded
    from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield tweets forever and ever (until the stream
    breaks at which point it raises a TwitterHTTPError.)

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    When `block` is True and `timeout` is set, the timeout-aware call
    class is used; `timeout` has no effect on the choice of call class
    when `block` is False.
    """

    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        # Choose the call class matching the requested read mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        # The API version is the first (and only initial) URI part.
        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls, secure=secure,
            uriparts=(str(api_version),), timeout=timeout, gzip=False)