]> jfr.im git - z_archive/twitter.git/blame_incremental - twitter/stream.py
__builtins__["raw_input"] throws a KeyError on Python 3.2.3, so adding it to the...
[z_archive/twitter.git] / twitter / stream.py
... / ...
CommitLineData
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
9from ssl import SSLError
10
11from .api import TwitterCall, wrap_response
12
13class TwitterJSONIter(object):
14
15 def __init__(self, handle, uri, arg_data, block=True):
16 self.decoder = json.JSONDecoder()
17 self.handle = handle
18 self.buf = b""
19 self.block = block
20
21 def __iter__(self):
22 sock = self.handle.fp._sock.fp._sock
23 if not self.block:
24 sock.setblocking(False)
25 while True:
26 try:
27 utf8_buf = self.buf.decode('utf8').lstrip()
28 res, ptr = self.decoder.raw_decode(utf8_buf)
29 self.buf = utf8_buf[ptr:].encode('utf8')
30 yield wrap_response(res, self.handle.headers)
31 continue
32 except ValueError as e:
33 if self.block:
34 pass
35 else:
36 yield None
37 except urllib_error.HTTPError as e:
38 raise TwitterHTTPError(e, uri, self.format, arg_data)
39 # this is a non-blocking read (ie, it will return if any data is available)
40 try:
41 self.buf += sock.recv(1024)
42 except SSLError as e:
43 if (not self.block) and (e.errno == 2):
44 # Apparently this means there was nothing in the socket buf
45 pass
46 else:
47 raise
48
49def handle_stream_response(req, uri, arg_data, block):
50 handle = urllib_request.urlopen(req,)
51 return iter(TwitterJSONIter(handle, uri, arg_data, block))
52
53class TwitterStreamCall(TwitterCall):
54 def _handle_response(self, req, uri, arg_data):
55 return handle_stream_response(req, uri, arg_data, block=True)
56
57class TwitterStreamCallNonBlocking(TwitterCall):
58 def _handle_response(self, req, uri, arg_data):
59 return handle_stream_response(req, uri, arg_data, block=False)
60
61class TwitterStream(TwitterStreamCall):
62 """
63 The TwitterStream object is an interface to the Twitter Stream API
64 (stream.twitter.com). This can be used pretty much the same as the
65 Twitter class except the result of calling a method will be an
66 iterator that yields objects decoded from the stream. For
67 example::
68
69 twitter_stream = TwitterStream(auth=UserPassAuth('joe', 'joespassword'))
70 iterator = twitter_stream.statuses.sample()
71
72 for tweet in iterator:
73 ...do something with this tweet...
74
75 The iterator will yield tweets forever and ever (until the stream
76 breaks at which point it raises a TwitterHTTPError.)
77
78 The `block` parameter controls if the stream is blocking. Default
79 is blocking (True). When set to False, the iterator will
80 occasionally yield None when there is no available message.
81 """
82 def __init__(
83 self, domain="stream.twitter.com", secure=True, auth=None,
84 api_version='1', block=True):
85 uriparts = ()
86 uriparts += (str(api_version),)
87
88 if block:
89 call_cls = TwitterStreamCall
90 else:
91 call_cls = TwitterStreamCallNonBlocking
92
93 TwitterStreamCall.__init__(
94 self, auth=auth, format="json", domain=domain,
95 callable_cls=call_cls,
96 secure=secure, uriparts=uriparts)