]>
Commit | Line | Data |
---|---|---|
1 | try: | |
2 | import urllib.request as urllib_request | |
3 | import urllib.error as urllib_error | |
4 | import io | |
5 | except ImportError: | |
6 | import urllib2 as urllib_request | |
7 | import urllib2 as urllib_error | |
8 | import json | |
9 | from ssl import SSLError | |
10 | import socket | |
11 | import sys, select, time | |
12 | ||
13 | from .api import TwitterCall, wrap_response, TwitterHTTPError | |
14 | ||
class TwitterJSONIter(object):
    """
    Iterable that decodes JSON objects from a streaming HTTP response.

    Raw bytes are read straight from the socket underneath `handle`,
    accumulated in `self.buf`, and decoded one JSON object at a time.

    Iterating yields:

    * each decoded object, passed through `wrap_response` together
      with the response headers;
    * `None`, when `block` is false and no complete object is
      buffered yet;
    * `{"timeout": True}`, when `timeout` is set and either no data
      became readable within `timeout` seconds or data arrived but no
      object has been decoded for more than `timeout` seconds.

    Iteration ends when the peer closes the connection.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
        self.decoder = json.JSONDecoder()
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        # Bytes received from the socket that have not been decoded yet.
        self.buf = b""
        self.block = block
        self.timeout = timeout
        # Time of construction, then of the most recently decoded object.
        self.timer = time.time()

    def __iter__(self):
        # Reach through the response object to the underlying socket; the
        # attribute path differs between Python 2 and Python 3.
        if sys.version_info >= (3, 0):
            sock = self.handle.fp.raw._sock
        else:
            sock = self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if not self.block or self.timeout:
            sock.setblocking(False)
        while True:
            try:
                utf8_buf = self.buf.decode('utf8').lstrip()
                if utf8_buf and utf8_buf[0] != '{':
                    # Remove the hex delimiter length and extra whitespace.
                    utf8_buf = utf8_buf.lstrip('0123456789abcdefABCDEF')
                    utf8_buf = utf8_buf.lstrip()
                res, ptr = self.decoder.raw_decode(utf8_buf)
                self.buf = utf8_buf[ptr:].encode('utf8')
                yield wrap_response(res, self.handle.headers)
                self.timer = time.time()
                continue
            except ValueError:
                # No complete JSON object is buffered yet (this also covers
                # a multi-byte character split across reads, because
                # UnicodeDecodeError is a ValueError). Blocking callers just
                # wait for more data; non-blocking callers are told nothing
                # is available.
                if not self.block:
                    yield None
            except urllib_error.HTTPError as e:
                # Probably unnecessary, no dynamic url calls in the try block.
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
            # This is a non-blocking read (ie, it will return if any data is
            # available).
            try:
                if self.timeout:
                    ready_to_read = select.select(
                        [sock], [], [], self.timeout)
                    if ready_to_read[0]:
                        data = sock.recv(1024)
                        if not data:
                            # Empty read: the peer closed the connection.
                            return
                        self.buf += data
                        if time.time() - self.timer > self.timeout:
                            yield {"timeout": True}
                    else:
                        yield {"timeout": True}
                else:
                    data = sock.recv(2048)
                    if not data:
                        # Empty read: the peer closed the connection. Without
                        # this check the loop would spin forever on a dead
                        # socket.
                        return
                    self.buf += data
            except SSLError as e:
                if (not self.block or self.timeout) and (e.errno == 2):
                    # Apparently this means there was nothing in the socket
                    # buf.
                    pass
                else:
                    raise
            except urllib_error.HTTPError as e:
                raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
74 | ||
def handle_stream_response(req, uri, arg_data, block, timeout=None):
    """
    Open `req` and return an iterator over the JSON objects it streams.

    `uri` and `arg_data` are only used to build the error that is
    raised on failure. `block` and `timeout` are passed through to
    `TwitterJSONIter`.

    Raises `TwitterHTTPError` if opening the request fails with an
    HTTP error.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        # Report connection-time HTTP failures the same way the iterator
        # reports them, rather than leaking the raw urllib exception.
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout=timeout))
78 | ||
class TwitterStreamCallWithTimeout(TwitterCall):
    """Call type whose response is read as a blocking stream with a timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # The per-call `_timeout` is not used; the instance-level `timeout`
        # (presumably set by TwitterCall's constructor) is forwarded instead.
        stream_timeout = self.timeout
        return handle_stream_response(
            req, uri, arg_data, True, timeout=stream_timeout)
82 | ||
class TwitterStreamCall(TwitterCall):
    """Call type whose response is read as a blocking stream, no timeout."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted for signature compatibility but not used.
        blocking = True
        return handle_stream_response(req, uri, arg_data, blocking)
86 | ||
class TwitterStreamCallNonBlocking(TwitterCall):
    """Call type whose response is read as a non-blocking stream."""

    def _handle_response(self, req, uri, arg_data, _timeout=None):
        # `_timeout` is accepted for signature compatibility but not used.
        blocking = False
        return handle_stream_response(req, uri, arg_data, blocking)
90 | ||
class TwitterStream(TwitterStreamCall):
    """
    Interface to the Twitter Stream API (stream.twitter.com).

    Use it much like the Twitter class; the difference is that calling
    a method returns an iterator that yields objects decoded from the
    stream. For example::

        twitter_stream = TwitterStream(auth=OAuth(...))
        iterator = twitter_stream.statuses.sample()

        for tweet in iterator:
            ...do something with this tweet...

    The iterator will yield tweets forever and ever (until the stream
    breaks at which point it raises a TwitterHTTPError.)

    The `block` parameter controls if the stream is blocking. Default
    is blocking (True). When set to False, the iterator will
    occasionally yield None when there is no available message.

    The `timeout` parameter only affects blocking streams. When it is
    set, the iterator will yield `{"timeout": True}` whenever no
    message arrives within `timeout` seconds.
    """
    def __init__(
            self, domain="stream.twitter.com", secure=True, auth=None,
            api_version='1.1', block=True, timeout=None):
        uriparts = (str(api_version),)

        # Choose the call class that implements the requested read mode.
        if not block:
            call_cls = TwitterStreamCallNonBlocking
        elif timeout:
            call_cls = TwitterStreamCallWithTimeout
        else:
            call_cls = TwitterStreamCall

        TwitterStreamCall.__init__(
            self, auth=auth, format="json", domain=domain,
            callable_cls=call_cls, secure=secure,
            uriparts=uriparts, timeout=timeout, gzip=False)