]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Merge pull request #178 from dkanygin/master
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
class TwitterJSONIter(object):
    """
    Iterator over the JSON objects arriving on a streaming HTTP response.

    Raw bytes are read straight from the response's underlying socket,
    accumulated in ``self.buf`` and decoded one JSON document at a time.

    Parameters:
      handle   -- the open HTTP response object (as returned by ``urlopen``).
      uri      -- request URI; kept only for error reporting.
      arg_data -- request arguments; kept only for error reporting.
      block    -- when false, ``None`` is yielded every time no complete
                  JSON document is available yet.
      timeout  -- when set, ``{"timeout": True}`` is yielded if the socket
                  stays silent (or no document completes) for that many
                  seconds.

    Iteration ends when the remote end closes the connection.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.decoder = json.JSONDecoder()
        self.handle = handle
        # Kept so that the HTTPError handler in __iter__ can build a
        # TwitterHTTPError; the stream API always speaks JSON.
        self.uri = uri
        self.arg_data = arg_data
        self.format = "json"
        self.buf = b""
        self.block = block
        self.timeout = timeout
        # Time at which the last complete object was delivered.
        self.timer = time.time()

    def __iter__(self):
        # Reach through the response wrappers to the raw socket; the
        # attribute chain differs between Python 2 and Python 3.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if not self.block or self.timeout:
            sock.setblocking(False)
        while True:
            try:
                utf8_buf = self.buf.decode('utf8').lstrip()
                res, ptr = self.decoder.raw_decode(utf8_buf)
                self.buf = utf8_buf[ptr:].encode('utf8')
                yield wrap_response(res, self.handle.headers)
                self.timer = time.time()
                continue
            except ValueError:
                # No complete JSON document in the buffer yet (this also
                # covers a multi-byte character split across two reads).
                if not self.block:
                    yield None
            except urllib_error.HTTPError as e:
                raise TwitterHTTPError(
                    e, self.uri, self.format, self.arg_data)
            # this is a non-blocking read (ie, it will return if any data
            # is available)
            try:
                if self.timeout:
                    ready_to_read = select.select(
                        [sock], [], [], self.timeout)
                    if ready_to_read[0]:
                        chunk = sock.recv(1024)
                        if not chunk:
                            # Empty read on a readable socket: the peer
                            # closed the connection, so stop iterating
                            # instead of spinning forever.
                            return
                        self.buf += chunk
                        if time.time() - self.timer > self.timeout:
                            yield {"timeout": True}
                    else:
                        yield {"timeout": True}
                else:
                    chunk = sock.recv(1024)
                    if not chunk:
                        # Connection closed by the remote end.
                        return
                    self.buf += chunk
            except SSLError as e:
                if (not self.block or self.timeout) and (e.errno == 2):
                    # errno 2 is SSL_ERROR_WANT_READ: nothing was
                    # available on the non-blocking socket.
                    pass
                else:
                    raise
67
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open `req` and return an iterator over the objects decoded from the
    resulting streaming response.

    `uri` and `arg_data` are handed on to TwitterJSONIter unchanged, as
    are the `block` and `timeout` settings.
    """
    response = urllib_request.urlopen(req)
    json_stream = TwitterJSONIter(
        response, uri, arg_data, block, timeout=timeout)
    return iter(json_stream)
71
class TwitterStreamCallWithTimeout(TwitterCall):
    """Blocking stream call that passes the instance's timeout along."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The per-call `_timeout` argument is not used here; the value
        # stored on the instance (`self.timeout`) is what gets forwarded.
        stream = handle_stream_response(
            req, uri, arg_data, block=True, timeout=self.timeout)
        return stream
75
class TwitterStreamCall(TwitterCall):
    """Blocking stream call without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted for signature compatibility but unused.
        stream = handle_stream_response(req, uri, arg_data, block=True)
        return stream
79
class TwitterStreamCallNonBlocking(TwitterCall):
    """Non-blocking stream call."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted for signature compatibility but unused.
        stream = handle_stream_response(req, uri, arg_data, block=False)
        return stream
83
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used in much the same way as the Twitter class, except that
    calling a method returns an iterator yielding the objects decoded
    from the stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator keeps yielding tweets for as long as the stream stays
    open.

    The `block` parameter selects blocking behaviour and defaults to
    True. With `block=False` the iterator occasionally yields None when
    no message is available.

    The `timeout` parameter is only honoured for blocking streams. When
    it is set, the iterator yields ``{"timeout": True}`` if nothing has
    arrived within that many seconds.
    """
    def __init__(
        self, domain="stream.twitter.com", secure=True, auth=None,
        api_version='1.1', block=True, timeout=None):
        # Every request path starts with the API version.
        uriparts = (str(api_version),)

        # Choose the call class that matches the requested stream mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls,
            secure=secure, uriparts=uriparts, timeout=timeout)