]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Implement true non-blocking TwitterStream.
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
dd648a25
MV
10
11from .api import TwitterCall, wrap_response
12
13class TwitterJSONIter(object):
14
2300838f 15 def __init__(self, handle, uri, arg_data, block=True):
dd648a25
MV
16 self.decoder = json.JSONDecoder()
17 self.handle = handle
b8fd1206 18 self.buf = b""
2300838f 19 self.block = block
dd648a25
MV
20
21 def __iter__(self):
2300838f
MV
22 sock = self.handle.fp._sock.fp._sock
23 if not self.block:
24 sock.setblocking(False)
dd648a25 25 while True:
dd648a25 26 try:
b8fd1206
MV
27 utf8_buf = self.buf.decode('utf8').lstrip()
28 res, ptr = self.decoder.raw_decode(utf8_buf)
29 self.buf = utf8_buf[ptr:].encode('utf8')
dd648a25 30 yield wrap_response(res, self.handle.headers)
dd648a25 31 continue
e2d171b6 32 except ValueError as e:
2300838f
MV
33 if self.block:
34 pass
35 else:
36 yield None
dd648a25
MV
37 except urllib_error.HTTPError as e:
38 raise TwitterHTTPError(e, uri, self.format, arg_data)
e2d171b6 39 # this is a non-blocking read (ie, it will return if any data is available)
2300838f
MV
40 try:
41 self.buf += sock.recv(1024)
42 except SSLError as e:
43 if (not self.block) and (e.errno == 2):
44 # Apparently this means there was nothing in the socket buf
45 pass
46 else:
47 raise
48
49def handle_stream_response(req, uri, arg_data, block):
50 handle = urllib_request.urlopen(req,)
51 return iter(TwitterJSONIter(handle, uri, arg_data, block))
dd648a25
MV
52
53class TwitterStreamCall(TwitterCall):
54 def _handle_response(self, req, uri, arg_data):
2300838f
MV
55 return handle_stream_response(req, uri, arg_data, block=True)
56
57class TwitterStreamCallNonBlocking(TwitterCall):
58 def _handle_response(self, req, uri, arg_data):
59 return handle_stream_response(req, uri, arg_data, block=False)
dd648a25
MV
60
61class TwitterStream(TwitterStreamCall):
24950891
MV
62 """
63 Interface to the Twitter Stream API (stream.twitter.com). This can
64 be used pretty much the same as the Twitter class except the
65 result of calling a method will be an iterator that yields objects
66 decoded from the stream. For example::
67
68 twitter_stream = TwitterStream(auth=UserPassAuth('joe', 'joespassword'))
69 iterator = twitter_stream.statuses.sample()
70
71 for tweet in iterator:
72 ...do something with this tweet...
73
74 The iterator will yield tweets forever and ever (until the stream
75 breaks at which point it raises a TwitterHTTPError.)
2300838f
MV
76
77 The `bloc` paramater controls if the stream is blocking.
24950891 78 """
dd648a25 79 def __init__(
56d221bd 80 self, domain="stream.twitter.com", secure=True, auth=None,
2300838f 81 api_version='1', block=True):
dd648a25
MV
82 uriparts = ()
83 uriparts += (str(api_version),)
84
2300838f
MV
85 if block:
86 call_cls = TwitterStreamCall
87 else:
88 call_cls = TwitterStreamCallNonBlocking
89
dd648a25
MV
90 TwitterStreamCall.__init__(
91 self, auth=auth, format="json", domain=domain,
2300838f 92 callable_cls=call_cls,
dd648a25 93 secure=secure, uriparts=uriparts)