X-Git-Url: https://jfr.im/git/z_archive/twitter.git/blobdiff_plain/43778459d292186d3055fef60a21e8ad562b6d13..c0c66ba0ebe25c1ddc446111dc29a1e6fc07b367:/twitter/stream.py diff --git a/twitter/stream.py b/twitter/stream.py index 3ea9c69..5384d5b 100644 --- a/twitter/stream.py +++ b/twitter/stream.py @@ -1,3 +1,6 @@ +# encoding: utf-8 +from __future__ import unicode_literals + import sys PY_3_OR_HIGHER = sys.version_info >= (3, 0) @@ -16,7 +19,8 @@ import sys, select, time from .api import TwitterCall, wrap_response, TwitterHTTPError CRLF = b'\r\n' -MIN_TIMEOUT = 0.0 # Apparenty select with zero wait is okay! +MIN_SOCK_TIMEOUT = 0.0 # Apparenty select with zero wait is okay! +MAX_SOCK_TIMEOUT = 10.0 HEARTBEAT_TIMEOUT = 90.0 Timeout = {'timeout': True} @@ -24,8 +28,6 @@ Hangup = {'hangup': True} DecodeError = {'hangup': True, 'decode_error': True} HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True} -range = range if PY_3_OR_HIGHER else xrange - class HttpChunkDecoder(object): @@ -82,7 +84,7 @@ class HttpChunkDecoder(object): class JsonDecoder(object): def __init__(self): - self.buf = u"" + self.buf = "" self.raw_decode = json.JSONDecoder().raw_decode def decode(self, data): @@ -146,18 +148,20 @@ class TwitterJSONIter(object): self.uri = uri self.arg_data = arg_data self.timeout_token = Timeout - self.timeout = HEARTBEAT_TIMEOUT + self.timeout = None self.heartbeat_timeout = HEARTBEAT_TIMEOUT if timeout and timeout > 0: self.timeout = float(timeout) elif not (block or timeout): self.timeout_token = None - self.timeout = MIN_TIMEOUT + self.timeout = MIN_SOCK_TIMEOUT if heartbeat_timeout and heartbeat_timeout > 0: self.heartbeat_timeout = float(heartbeat_timeout) def __iter__(self): - sock_timeout = min(self.timeout, self.heartbeat_timeout) + timeouts = [t for t in (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT) + if t is not None] + sock_timeout = min(*timeouts) sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) headers = self.handle.headers @@ -172,8 +176,8 @@ class TwitterJSONIter(object): # Decode all the things: data = sock_reader.read() dechunked_data, end_of_stream, decode_error = chunk_decoder.decode(data) - utf8_data = utf8_decoder.decode(dechunked_data) - json_data = json_decoder.decode(utf8_data) + unicode_data = utf8_decoder.decode(dechunked_data) + json_data = json_decoder.decode(unicode_data) # Yield data-like things: for json_obj in json_data: @@ -217,7 +221,37 @@ class TwitterStream(TwitterCall): iterator = twitter_stream.statuses.sample() for tweet in iterator: - ...do something with this tweet... + # ...do something with this tweet... + + Per default the ``TwitterStream`` object uses + [public streams](https://dev.twitter.com/docs/streaming-apis/streams/public). + If you want to use one of the other + [streaming APIs](https://dev.twitter.com/docs/streaming-apis), specify the URL + manually: + + - [Public streams](https://dev.twitter.com/docs/streaming-apis/streams/public): stream.twitter.com + - [User streams](https://dev.twitter.com/docs/streaming-apis/streams/user): userstream.twitter.com + - [Site streams](https://dev.twitter.com/docs/streaming-apis/streams/site): sitestream.twitter.com + + Note that you require the proper + [permissions](https://dev.twitter.com/docs/application-permission-model) to + access these streams. E.g. for direct messages your + [application](https://dev.twitter.com/apps) needs the "Read, Write & Direct + Messages" permission. + + The following example demonstrates how to retrieve all new direct messages + from the user stream:: + + auth = OAuth( + consumer_key='[your consumer key]', + consumer_secret='[your consumer secret]', + token='[your token]', + token_secret='[your token secret]' + ) + twitter_userstream = TwitterStream(auth=auth, domain='userstream.twitter.com') + for msg in twitter_userstream.user(): + if 'direct_message' in msg: + print msg['direct_message']['text'] The iterator will yield until the TCP connection breaks. When the connection breaks, the iterator yields `{'hangup': True}`, and