]> jfr.im git - z_archive/twitter.git/blame - twitter/stream.py
Default to 1.1 for all domains, not just api.twitter.com.
[z_archive/twitter.git] / twitter / stream.py
CommitLineData
dd648a25
MV
1try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
b8fd1206 4 import io
dd648a25
MV
5except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8import json
2300838f 9from ssl import SSLError
67ddbde4 10import socket
dd648a25
MV
11
12from .api import TwitterCall, wrap_response
13
class TwitterJSONIter(object):
    """
    Iterate over the JSON documents arriving on an open streaming response.

    Bytes are read from the response's underlying socket and accumulated in
    an internal buffer; every time a complete JSON document can be decoded
    from the front of that buffer it is yielded (wrapped by
    ``wrap_response`` together with the response headers).

    :param handle: the open response object returned by ``urlopen``.
    :param uri: the request URI, kept for error reporting.
    :param arg_data: the request arguments, kept for error reporting.
    :param block: when True (default) the socket is left in blocking mode;
        when False the socket is made non-blocking and ``None`` is yielded
        whenever no complete document is available yet.
    """

    def __init__(self, handle, uri, arg_data, block=True):
        self.decoder = json.JSONDecoder()
        self.handle = handle
        # Kept on the instance so the error path in __iter__ can report them
        # (they were previously dropped, which made that path a NameError).
        self.uri = uri
        self.arg_data = arg_data
        self.buf = b""
        self.block = block

    def __iter__(self):
        """
        Yield decoded documents until the remote end closes the connection.

        In non-blocking mode ``None`` is yielded each time the buffer does
        not yet hold a complete document.
        """
        # Reach through the response wrappers to the raw socket object.
        sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if not self.block:
            sock.setblocking(False)
        while True:
            try:
                utf8_buf = self.buf.decode('utf8').lstrip()
                res, ptr = self.decoder.raw_decode(utf8_buf)
                # Keep whatever follows the decoded document for next time.
                self.buf = utf8_buf[ptr:].encode('utf8')
                yield wrap_response(res, self.handle.headers)
                continue
            except ValueError:
                # Incomplete document (or a multi-byte character split across
                # reads -- UnicodeDecodeError is a ValueError): need more data.
                if not self.block:
                    yield None
            except urllib_error.HTTPError as e:
                # NOTE(review): assumes TwitterHTTPError is exported by .api
                # next to TwitterCall -- confirm. The format is given as
                # "json" because this iterator only ever decodes JSON.
                from .api import TwitterHTTPError
                raise TwitterHTTPError(e, self.uri, "json", self.arg_data)
            # recv returns as soon as any data is available.
            try:
                chunk = sock.recv(1024)
            except SSLError as e:
                if self.block or e.errno != 2:
                    raise
                # Apparently errno 2 means there was nothing in the socket
                # buffer; go round again (which yields None to the caller).
                continue
            if not chunk:
                # An empty read means the peer closed the connection. Without
                # this check the loop would spin forever on a dead socket.
                return
            self.buf += chunk
50
def handle_stream_response(req, uri, arg_data, block):
    """
    Open ``req`` and return an iterator over the JSON documents it streams.

    :param req: the request object handed to ``urlopen``.
    :param uri: request URI, forwarded to ``TwitterJSONIter``.
    :param arg_data: request arguments, forwarded to ``TwitterJSONIter``.
    :param block: whether the resulting iterator reads in blocking mode.
    """
    response = urllib_request.urlopen(req)
    json_stream = TwitterJSONIter(response, uri, arg_data, block)
    return iter(json_stream)
dd648a25
MV
54
class TwitterStreamCall(TwitterCall):
    """A ``TwitterCall`` whose response is consumed as a blocking stream."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return a blocking stream iterator for ``req``; ``_timeout`` is ignored."""
        blocking = True
        return handle_stream_response(req, uri, arg_data, block=blocking)
58
class TwitterStreamCallNonBlocking(TwitterCall):
    """A ``TwitterCall`` whose response is consumed as a non-blocking stream."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return a non-blocking stream iterator for ``req``; ``_timeout`` is ignored."""
        blocking = False
        return handle_stream_response(req, uri, arg_data, block=blocking)
dd648a25
MV
62
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    It is used in much the same way as the Twitter class, except that
    calling a method returns an iterator yielding the objects decoded
    from the stream. For example::

        twitter_stream = TwitterStream(auth=UserPassAuth('joe', 'joespassword'))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator yields tweets for as long as the stream stays open.

    The `block` parameter selects blocking behaviour. The default,
    True, gives a blocking stream. With False the iterator will
    occasionally yield None when no message is available.
    """
    def __init__(
        self, domain="stream.twitter.com", secure=True, auth=None,
        api_version='1', block=True):
        # The API version is the only leading URI component.
        version_parts = (str(api_version),)

        # Choose the call class matching the requested blocking mode.
        call_cls = TwitterStreamCall if block else TwitterStreamCallNonBlocking

        TwitterStreamCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=call_cls,
            secure=secure,
            uriparts=version_parts)