]>
Commit | Line | Data |
---|---|---|
dd648a25 MV |
1 | try: |
2 | import urllib.request as urllib_request | |
3 | import urllib.error as urllib_error | |
b8fd1206 | 4 | import io |
dd648a25 MV |
5 | except ImportError: |
6 | import urllib2 as urllib_request | |
7 | import urllib2 as urllib_error | |
8 | import json | |
2300838f | 9 | from ssl import SSLError |
dd648a25 MV |
10 | |
11 | from .api import TwitterCall, wrap_response | |
12 | ||
13 | class TwitterJSONIter(object): | |
14 | ||
2300838f | 15 | def __init__(self, handle, uri, arg_data, block=True): |
dd648a25 MV |
16 | self.decoder = json.JSONDecoder() |
17 | self.handle = handle | |
b8fd1206 | 18 | self.buf = b"" |
2300838f | 19 | self.block = block |
dd648a25 MV |
20 | |
21 | def __iter__(self): | |
2300838f MV |
22 | sock = self.handle.fp._sock.fp._sock |
23 | if not self.block: | |
24 | sock.setblocking(False) | |
dd648a25 | 25 | while True: |
dd648a25 | 26 | try: |
b8fd1206 MV |
27 | utf8_buf = self.buf.decode('utf8').lstrip() |
28 | res, ptr = self.decoder.raw_decode(utf8_buf) | |
29 | self.buf = utf8_buf[ptr:].encode('utf8') | |
dd648a25 | 30 | yield wrap_response(res, self.handle.headers) |
dd648a25 | 31 | continue |
e2d171b6 | 32 | except ValueError as e: |
2300838f MV |
33 | if self.block: |
34 | pass | |
35 | else: | |
36 | yield None | |
dd648a25 MV |
37 | except urllib_error.HTTPError as e: |
38 | raise TwitterHTTPError(e, uri, self.format, arg_data) | |
e2d171b6 | 39 | # this is a non-blocking read (ie, it will return if any data is available) |
2300838f MV |
40 | try: |
41 | self.buf += sock.recv(1024) | |
42 | except SSLError as e: | |
43 | if (not self.block) and (e.errno == 2): | |
44 | # Apparently this means there was nothing in the socket buf | |
45 | pass | |
46 | else: | |
47 | raise | |
48 | ||
49 | def handle_stream_response(req, uri, arg_data, block): | |
50 | handle = urllib_request.urlopen(req,) | |
51 | return iter(TwitterJSONIter(handle, uri, arg_data, block)) | |
dd648a25 MV |
52 | |
53 | class TwitterStreamCall(TwitterCall): | |
54 | def _handle_response(self, req, uri, arg_data): | |
2300838f MV |
55 | return handle_stream_response(req, uri, arg_data, block=True) |
56 | ||
57 | class TwitterStreamCallNonBlocking(TwitterCall): | |
58 | def _handle_response(self, req, uri, arg_data): | |
59 | return handle_stream_response(req, uri, arg_data, block=False) | |
dd648a25 MV |
60 | |
61 | class TwitterStream(TwitterStreamCall): | |
24950891 MV |
62 | """ |
63 | Interface to the Twitter Stream API (stream.twitter.com). This can | |
64 | be used pretty much the same as the Twitter class except the | |
65 | result of calling a method will be an iterator that yields objects | |
66 | decoded from the stream. For example:: | |
67 | ||
68 | twitter_stream = TwitterStream(auth=UserPassAuth('joe', 'joespassword')) | |
69 | iterator = twitter_stream.statuses.sample() | |
70 | ||
71 | for tweet in iterator: | |
72 | ...do something with this tweet... | |
73 | ||
74 | The iterator will yield tweets forever and ever (until the stream | |
75 | breaks at which point it raises a TwitterHTTPError.) | |
2300838f MV |
76 | |
77 | The `bloc` paramater controls if the stream is blocking. | |
24950891 | 78 | """ |
dd648a25 | 79 | def __init__( |
56d221bd | 80 | self, domain="stream.twitter.com", secure=True, auth=None, |
2300838f | 81 | api_version='1', block=True): |
dd648a25 MV |
82 | uriparts = () |
83 | uriparts += (str(api_version),) | |
84 | ||
2300838f MV |
85 | if block: |
86 | call_cls = TwitterStreamCall | |
87 | else: | |
88 | call_cls = TwitterStreamCallNonBlocking | |
89 | ||
dd648a25 MV |
90 | TwitterStreamCall.__init__( |
91 | self, auth=auth, format="json", domain=domain, | |
2300838f | 92 | callable_cls=call_cls, |
dd648a25 | 93 | secure=secure, uriparts=uriparts) |