]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Deal with SSL errno 2 in timeout mode
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
effd06bb 11import sys, select, time
dd648a25 12
2983f31e 13from .api import TwitterCall, wrap_response, TwitterHTTPError
dd648a25
MV
14
class TwitterJSONIter(object):
    """
    Iterate over the JSON documents arriving on a streaming HTTP response.

    Bytes are read from the response's underlying socket into an internal
    buffer; every time a complete JSON document can be decoded from the
    front of the buffer it is passed to `wrap_response` (together with the
    response headers) and yielded.

    Besides decoded messages the iterator can yield:

    * `None` -- only when `block` is false, each time the buffer does not
      yet hold a complete message;
    * `{"timeout": True}` -- only when `timeout` is set, when no data
      arrived within `timeout` seconds or when more than `timeout` seconds
      have passed since the last decoded message.

    Iteration ends when the peer closes the connection.

    :param handle: the open HTTP response; its socket is reached through
        private attributes of the response object.
    :param uri: request URI, kept for error reporting.
    :param arg_data: request arguments, kept for error reporting.
    :param block: when false, the socket is read in non-blocking mode.
    :param timeout: seconds to wait for data before reporting a timeout,
        or None to disable timeout reporting.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.decoder = json.JSONDecoder()
        self.handle = handle
        # Stored so that an HTTP error raised during iteration can be
        # reported with the request it belongs to.
        self.uri = uri
        self.arg_data = arg_data
        self.buf = b""
        self.block = block
        self.timeout = timeout
        # Time of construction, then of the most recently yielded message.
        self.timer = time.time()

    def __iter__(self):
        # The raw socket lives at a different private path on Python 2
        # and Python 3.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if not self.block or self.timeout:
            sock.setblocking(False)
        while True:
            try:
                utf8_buf = self.buf.decode('utf8').lstrip()
                res, ptr = self.decoder.raw_decode(utf8_buf)
                # Keep whatever follows the decoded document for next time.
                self.buf = utf8_buf[ptr:].encode('utf8')
                yield wrap_response(res, self.handle.headers)
                self.timer = time.time()
                continue
            except ValueError:
                # No complete document buffered yet (this also covers a
                # multi-byte character split across reads, because
                # UnicodeDecodeError is a ValueError).
                if not self.block:
                    yield None
            except urllib_error.HTTPError as e:
                # The iterator only ever decodes JSON, hence the format.
                raise TwitterHTTPError(e, self.uri, "json", self.arg_data)
            # Fetch more bytes; returns as soon as any data is available.
            try:
                if self.timeout:
                    ready_to_read = select.select(
                        [sock], [], [], self.timeout)
                    if not ready_to_read[0]:
                        yield {"timeout": True}
                        continue
                data = sock.recv(1024)
            except SSLError as e:
                # errno 2 means the SSL layer had nothing to hand over yet;
                # that is expected whenever the socket is non-blocking.
                # Any other SSL error is a real failure.
                if (not self.block or self.timeout) and e.errno == 2:
                    continue
                raise
            if not data:
                # An empty read means the peer closed the connection;
                # stop instead of spinning on a dead socket.
                return
            self.buf += data
            if self.timeout and time.time() - self.timer > self.timeout:
                yield {"timeout": True}
67
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open `req` and return an iterator over the messages in its response.

    :param req: the request object handed to `urlopen`.
    :param uri: request URI, forwarded to `TwitterJSONIter`.
    :param arg_data: request arguments, forwarded to `TwitterJSONIter`.
    :param block: whether the resulting iterator reads in blocking mode.
    :param timeout: optional timeout, forwarded to `TwitterJSONIter`.
    :returns: the iterator obtained from a `TwitterJSONIter` wrapping the
        opened response.
    """
    response = urllib_request.urlopen(req)
    messages = TwitterJSONIter(
        response, uri, arg_data, block, timeout=timeout)
    return iter(messages)
71
class TwitterStreamCallWithTimeout(TwitterCall):
    """
    Stream call that reads in blocking mode and forwards a timeout.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used; the value forwarded is the
        # instance's own `timeout` attribute (presumably set by TwitterCall
        # from its constructor argument -- confirm in api.py).
        stream_options = {"block": True, "timeout": self.timeout}
        return handle_stream_response(req, uri, arg_data, **stream_options)
dd648a25
MV
75
class TwitterStreamCall(TwitterCall):
    """
    Stream call that reads in blocking mode without a timeout.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used by this implementation.
        blocking = True
        return handle_stream_response(req, uri, arg_data, block=blocking)
79
class TwitterStreamCallNonBlocking(TwitterCall):
    """
    Stream call that reads in non-blocking mode.
    """

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used by this implementation.
        blocking = False
        return handle_stream_response(req, uri, arg_data, block=blocking)
dd648a25
MV
83
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used much like the Twitter class, except that calling a method
    returns an iterator yielding objects decoded from the stream. For
    example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield tweets forever and ever (until the stream
    breaks at which point it raises a TwitterHTTPError.)

    `block` selects blocking reads and defaults to True. With
    `block=False` the iterator occasionally yields None when no message
    is available.

    `timeout` only affects the choice of call class when `block` is true:
    a truthy value selects the timeout-aware stream call, whose iterator
    yields ``{"timeout": True}`` when the timeout elapses.
    """

    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        # Choose the call class implementing the requested read mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        # The API version is the first (and only) initial URI part.
        version_parts = (str(api_version),)

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls, secure=secure,
            uriparts=version_parts, timeout=timeout)