]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
094b5f95482385825ebb19c9228f17690f16d86c
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys, select, time
12
13 from .api import TwitterCall, wrap_response, TwitterHTTPError
14
class TwitterJSONIter(object):
    """
    Iterable that decodes JSON objects from a streaming HTTP response.

    Bytes are read from the socket underlying `handle`, accumulated in
    `self.buf`, and every complete JSON document found at the start of
    the buffer is yielded through `wrap_response`.

    `handle` is the response object returned by `urlopen`; `uri` and
    `arg_data` are only used to build a `TwitterHTTPError`. When `block`
    is false the iterator yields `None` whenever no complete object is
    buffered. When `timeout` is set, `{"timeout": True}` is yielded when
    the socket stays silent for `timeout` seconds, or when data arrives
    but no object has been yielded for longer than `timeout`.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.decoder = json.JSONDecoder()
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.buf = b""
        self.block = block
        self.timeout = timeout
        # Time of the last yielded object (or of construction).
        self.timer = time.time()

    def __iter__(self):
        """
        Yield decoded objects until the remote end closes the connection.

        The generator finishes (StopIteration) once `recv` reports
        end-of-stream. Raises `TwitterHTTPError` if a urllib `HTTPError`
        surfaces, and re-raises any socket/SSL error that is not a plain
        "no data available yet" condition.
        """
        # Function-scope import so the block does not depend on the
        # file-level import list.
        import errno

        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        non_blocking = not self.block or bool(self.timeout)
        if non_blocking:
            sock.setblocking(False)
        while True:
            try:
                utf8_buf = self.buf.decode('utf8').lstrip()
                if utf8_buf and utf8_buf[0] != '{':
                    # Remove the hex delimiter length and extra whitespace.
                    utf8_buf = utf8_buf.lstrip('0123456789abcdefABCDEF')
                    utf8_buf = utf8_buf.lstrip()
                res, ptr = self.decoder.raw_decode(utf8_buf)
                self.buf = utf8_buf[ptr:].encode('utf8')
                yield wrap_response(res, self.handle.headers)
                self.timer = time.time()
                continue
            except ValueError:
                # Incomplete (or not yet decodable) data: read more below.
                # UnicodeDecodeError from a split multi-byte character is a
                # ValueError too, so it also lands here.
                if not self.block:
                    yield None
            except urllib_error.HTTPError as e:
                # Probably unnecessary, no dynamic url calls in the try block.
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
            # This is a non-blocking read (ie, it will return if any data
            # is available).
            try:
                if self.timeout:
                    ready_to_read = select.select([sock], [], [], self.timeout)
                    if ready_to_read[0]:
                        data = sock.recv(1024)
                        if not data:
                            # Empty read: the peer closed the connection.
                            return
                        self.buf += data
                        if time.time() - self.timer > self.timeout:
                            yield {"timeout": True}
                    else:
                        yield {"timeout": True}
                else:
                    # As tweets are typically longer than 1KB, consider
                    # increasing this size.
                    data = sock.recv(1024)
                    if not data:
                        # Empty read: the peer closed the connection.
                        # Without this check the loop would spin forever.
                        return
                    self.buf += data
            except SSLError as e:
                if non_blocking and (e.errno == 2):
                    # errno 2 is SSL_ERROR_WANT_READ: there was nothing
                    # in the socket buffer yet.
                    pass
                else:
                    raise
            except urllib_error.HTTPError as e:
                # Must precede the socket.error clause: on Python 3
                # HTTPError is itself an OSError subclass.
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
            except socket.error as e:
                # Plain (non-SSL) non-blocking sockets signal "no data yet"
                # with EAGAIN/EWOULDBLOCK rather than an SSLError.
                if non_blocking and e.errno in (errno.EAGAIN,
                                                errno.EWOULDBLOCK):
                    pass
                else:
                    raise
74
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open the streaming request and return an iterator over its objects.

    `req` is passed to `urlopen`; `uri` and `arg_data` are forwarded for
    error reporting; `block` and `timeout` are forwarded to
    `TwitterJSONIter`.

    Raises `TwitterHTTPError` when opening the request fails with an HTTP
    error, matching how `TwitterJSONIter` reports HTTP errors.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout=timeout))
78
class TwitterStreamCallWithTimeout(TwitterCall):
    """Blocking stream call that forwards `self.timeout` to the iterator."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used here; the instance-level
        # timeout is what gets forwarded.
        return handle_stream_response(
            req, uri, arg_data, True, timeout=self.timeout)
82
class TwitterStreamCall(TwitterCall):
    """Blocking stream call without a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used here.
        return handle_stream_response(req, uri, arg_data, True)
86
class TwitterStreamCallNonBlocking(TwitterCall):
    """Non-blocking stream call; no timeout is forwarded to the iterator."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted but not used here.
        return handle_stream_response(req, uri, arg_data, False)
90
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    Use it much like the Twitter class; the difference is that calling
    a method returns an iterator yielding objects decoded from the
    stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    An HTTP error encountered while reading the stream is raised as a
    TwitterHTTPError.

    `block` selects blocking (True, the default) or non-blocking
    iteration. In non-blocking mode the iterator occasionally yields
    None when there is no available message.

    `timeout` only selects the timeout-aware call class when `block` is
    true; the non-blocking call class does not forward it to the
    iterator.
    """

    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        # The API version is the only initial URI component.
        uriparts = (str(api_version),)

        # Pick the call class that matches the requested iteration mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=call_cls,
            secure=secure,
            uriparts=uriparts,
            timeout=timeout,
            gzip=False)