]> jfr.im git - z_archive/twitter.git/blob - twitter/stream.py
Adapt code to archive DMs
[z_archive/twitter.git] / twitter / stream.py
1 try:
2 import urllib.request as urllib_request
3 import urllib.error as urllib_error
4 import io
5 except ImportError:
6 import urllib2 as urllib_request
7 import urllib2 as urllib_error
8 import json
9 from ssl import SSLError
10 import socket
11 import sys
12
13 from .api import TwitterCall, wrap_response
14
class TwitterJSONIter(object):
    """
    Iterate over the JSON documents arriving on a streaming HTTP response.

    Bytes are read directly from the socket underneath ``handle`` and
    accumulated in ``self.buf``.  Each time the buffer starts with a
    complete JSON document, that document is decoded, passed through
    ``wrap_response`` together with the response headers, and yielded.

    handle   -- the open response object returned by ``urlopen``.
    uri      -- the requested URI; kept for error reporting.
    arg_data -- the request arguments; kept for error reporting.
    block    -- when True (the default) the iterator waits for data.
                When False the socket is made non-blocking and ``None``
                is yielded whenever no complete document is buffered.

    Iteration ends when the peer closes the connection.
    """

    def __init__(self, handle, uri, arg_data, block=True):
        self.decoder = json.JSONDecoder()
        self.handle = handle
        # Remember the request details so the HTTPError handler in
        # __iter__ can report them (they were previously discarded, which
        # left that handler referring to undefined names).
        self.uri = uri
        self.arg_data = arg_data
        self.buf = b""
        self.block = block

    def __iter__(self):
        # Reach through the response wrappers to the raw socket; the
        # attribute path differs between Python 3 and Python 2.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if not self.block:
            sock.setblocking(False)
        while True:
            try:
                utf8_buf = self.buf.decode('utf8').lstrip()
                res, ptr = self.decoder.raw_decode(utf8_buf)
                # Keep whatever follows the decoded document for next time.
                self.buf = utf8_buf[ptr:].encode('utf8')
                yield wrap_response(res, self.handle.headers)
                continue
            except ValueError:
                # No complete document buffered yet.  This also covers a
                # multi-byte character split across two reads, because
                # UnicodeDecodeError is a ValueError.
                if not self.block:
                    yield None
            except urllib_error.HTTPError as e:
                # NOTE(review): assumes TwitterHTTPError is exported by
                # .api (it is not imported at the top of this file) --
                # confirm.  "json" is the format TwitterStream always
                # passes for streaming calls.
                from .api import TwitterHTTPError
                raise TwitterHTTPError(e, self.uri, "json", self.arg_data)
            # Returns as soon as any data is available (up to 1024 bytes).
            try:
                chunk = sock.recv(1024)
            except SSLError as e:
                if (not self.block) and (e.errno == 2):
                    # errno 2 is SSL_ERROR_WANT_READ: nothing to read yet
                    # on a non-blocking SSL socket, so just try again.
                    continue
                raise
            if not chunk:
                # An empty read means the peer closed the connection.
                # Stop here instead of spinning forever on a dead socket.
                return
            self.buf += chunk
54
def handle_stream_response(req, uri, arg_data, block):
    """
    Open ``req`` and return an iterator over the streamed JSON objects.

    The request is opened with ``urlopen``; any exception it raises
    propagates to the caller unchanged.  The resulting response is wrapped
    in a TwitterJSONIter configured with ``uri``, ``arg_data`` and
    ``block``, and an iterator over that wrapper is returned.
    """
    response = urllib_request.urlopen(req)
    json_stream = TwitterJSONIter(response, uri, arg_data, block=block)
    return iter(json_stream)
58
class TwitterStreamCall(TwitterCall):
    """TwitterCall whose response is consumed as a blocking stream."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return a blocking stream iterator for ``req``."""
        # _timeout is not used by this implementation.
        stream = handle_stream_response(req, uri, arg_data, block=True)
        return stream
62
class TwitterStreamCallNonBlocking(TwitterCall):
    """TwitterCall whose response is consumed as a non-blocking stream."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        """Return a non-blocking stream iterator for ``req``."""
        # _timeout is not used by this implementation.
        stream = handle_stream_response(req, uri, arg_data, block=False)
        return stream
66
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    Use it much like the Twitter class; the difference is that calling a
    method returns an iterator that yields objects decoded from the
    stream.  For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The `block` parameter selects blocking behaviour.  It defaults to
    True (blocking).  When it is False, the iterator will occasionally
    yield None when no message is available.
    """
    # NOTE(review): earlier documentation stated that a TwitterHTTPError
    # is raised when the stream breaks; the iterator code in this file
    # does not obviously do that -- confirm before relying on it.

    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True):
        # Choose the call class that matches the requested blocking mode.
        call_cls = TwitterStreamCall if block else TwitterStreamCallNonBlocking
        # The API version is the only initial URI component.
        version_parts = (str(api_version),)

        TwitterStreamCall.__init__(
            self,
            auth=auth,
            format="json",
            domain=domain,
            callable_cls=call_cls,
            secure=secure,
            uriparts=version_parts,
        )