]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Increase the size of the read buffer to be larger than the average tweet.
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25
MV
14
class TwitterJSONIter(object):
    """Iterate over the JSON messages arriving on a streaming HTTP response.

    Bytes are read straight from the response's underlying socket and
    accumulated in ``self.buf``.  Each complete JSON object found at the
    front of that buffer is decoded and yielded, wrapped by
    ``wrap_response`` together with the response headers.

    Besides decoded messages, the iterator may yield:

    * ``None`` -- only when ``block`` is false, each time no complete
      message is available yet;
    * ``{"timeout": True}`` -- only when ``timeout`` is set: either the
      socket stayed silent for ``timeout`` seconds, or data arrived but more
      than ``timeout`` seconds have passed since the last decoded message.

    Iteration stops when the server closes the connection.  Previously an
    empty read was silently ignored, which made the loop spin forever.
    """

    # Maximum number of bytes requested per socket read.  It is meant to be
    # larger than an average tweet, so most messages arrive in a single read.
    # Used by both the timeout and the plain read paths.
    recv_size = 2048

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        """
        :param handle: open response object (as returned by ``urlopen``);
            its socket is read directly and its ``headers`` are attached to
            every yielded message.
        :param uri: request URI, only used when reporting HTTP errors.
        :param arg_data: request arguments, only used when reporting HTTP
            errors.
        :param block: when false, the socket is made non-blocking and
            ``None`` is yielded while no message is ready.
        :param timeout: seconds to wait for data before yielding
            ``{"timeout": True}``; a falsy value disables the timeout.
        """
        self.decoder = json.JSONDecoder()
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.buf = b""
        self.block = block
        self.timeout = timeout
        # Time of the last decoded message (initially: construction time).
        self.timer = time.time()

    @staticmethod
    def _parse_next(decoder, buf):
        """Decode the first JSON message at the front of *buf* (bytes).

        Leading whitespace is skipped.  If the text does not start with
        ``{``, a leading run of hex digits (a length prefix in the stream
        framing) and the whitespace after it are skipped as well.

        :returns: ``(message, remaining_bytes)``, or ``None`` when *buf* does
            not yet hold a complete message.  This includes a buffer that
            ends part-way through a multi-byte UTF-8 character.
        """
        # NOTE(review): this assumes a length line never falls inside a JSON
        # object (i.e. each message is sent in its own chunk); otherwise the
        # buffer would never decode.  TODO confirm against the server.
        try:
            # UnicodeDecodeError is a subclass of ValueError, so a truncated
            # UTF-8 sequence is treated like an incomplete JSON document.
            text = buf.decode('utf8').lstrip()
            if text and text[0] != '{':
                text = text.lstrip('0123456789abcdefABCDEF').lstrip()
            message, end = decoder.raw_decode(text)
        except ValueError:
            return None
        return message, text[end:].encode('utf8')

    def __iter__(self):
        """Yield messages (and ``None`` / timeout markers) until EOF.

        :raises TwitterHTTPError: if an ``HTTPError`` escapes a socket read.
        :raises SSLError: for SSL errors other than "no data available yet"
            while the socket is non-blocking.
        """
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # Both non-blocking mode and timeout mode poll the socket.
        nonblocking = bool(not self.block or self.timeout)
        if nonblocking:
            sock.setblocking(False)
        while True:
            parsed = self._parse_next(self.decoder, self.buf)
            if parsed is not None:
                res, self.buf = parsed
                yield wrap_response(res, self.handle.headers)
                self.timer = time.time()
                continue
            if not self.block:
                yield None
            # Need more data from the socket.
            try:
                if self.timeout:
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if ready_to_read[0]:
                        data = sock.recv(self.recv_size)
                        if not data:
                            # Empty read: the peer closed the connection.
                            return
                        self.buf += data
                        if time.time() - self.timer > self.timeout:
                            yield {"timeout": True}
                    else:
                        yield {"timeout": True}
                else:
                    data = sock.recv(self.recv_size)
                    if not data:
                        # Empty read: the peer closed the connection.
                        return
                    self.buf += data
            except SSLError as e:
                # errno 2 is SSL_ERROR_WANT_READ: a non-blocking SSL socket
                # has no complete record available yet, so try again later.
                if nonblocking and e.errno == 2:
                    pass
                else:
                    raise
            except urllib_error.HTTPError as e:
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
2300838f 74
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """Send *req* and return an iterator over the JSON messages it streams.

    :param req: request object handed to ``urlopen``.
    :param uri: request URI, passed on to ``TwitterJSONIter``.
    :param arg_data: request arguments, passed on to ``TwitterJSONIter``.
    :param block: passed on as the iterator's ``block`` flag.
    :param timeout: passed on as the iterator's ``timeout``.
    :returns: the result of ``iter()`` on a new ``TwitterJSONIter``.
    """
    response = urllib_request.urlopen(req)
    json_stream = TwitterJSONIter(response, uri, arg_data, block,
                                  timeout=timeout)
    return iter(json_stream)
78
class TwitterStreamCallWithTimeout(TwitterCall):
    """TwitterCall whose responses are blocking streams with a read timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The per-call ``_timeout`` argument is ignored; the stream uses the
        # ``timeout`` attribute of this call object instead.
        configured_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, block=True, timeout=configured_timeout)
dd648a25
MV
82
class TwitterStreamCall(TwitterCall):
    """TwitterCall whose responses are blocking streams without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # ``_timeout`` is accepted but not used by the streaming response.
        stream = handle_stream_response(req, uri, arg_data, block=True)
        return stream
2300838f
MV
86
class TwitterStreamCallNonBlocking(TwitterCall):
    """TwitterCall whose responses are non-blocking streams.

    The resulting iterator yields ``None`` whenever no message is ready.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # ``_timeout`` is accepted but not used by the streaming response.
        stream = handle_stream_response(req, uri, arg_data, block=False)
        return stream
dd648a25
MV
90
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used much like the Twitter class, except that calling a method
    returns an iterator that yields objects decoded from the stream. For
    example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding tweets for as long as the stream stays
    open. HTTP errors surfacing while the stream is read are raised as
    TwitterHTTPError.

    The `block` parameter selects a blocking (default, True) or
    non-blocking stream. When it is False, the iterator occasionally
    yields None when no message is available.

    The `timeout` parameter only affects the stream iterator when `block`
    is True. The iterator then also yields ``{"timeout": True}`` when no
    complete message has arrived for roughly `timeout` seconds.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        # Choose the call class that wraps responses in the right iterator.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)