X-Git-Url: https://jfr.im/git/z_archive/twitter.git/blobdiff_plain/aa19e2bebdbf61fdc6b250eb9842bac13b7312be..537752d1e5b17f4867ca6a17e1a715b303fa62a4:/twitter/stream.py diff --git a/twitter/stream.py b/twitter/stream.py index 3afbb19..bc18f8e 100644 --- a/twitter/stream.py +++ b/twitter/stream.py @@ -10,21 +10,21 @@ else: import json from ssl import SSLError import socket -import io import codecs import sys, select, time from .api import TwitterCall, wrap_response, TwitterHTTPError CRLF = b'\r\n' +MIN_SOCK_TIMEOUT = 0.0 # Apparently select with zero wait is okay! +MAX_SOCK_TIMEOUT = 10.0 +HEARTBEAT_TIMEOUT = 90.0 Timeout = {'timeout': True} Hangup = {'hangup': True} DecodeError = {'hangup': True, 'decode_error': True} HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True} -range = range if PY_3_OR_HIGHER else xrange - class HttpChunkDecoder(object): @@ -144,16 +144,23 @@ class TwitterJSONIter(object): self.handle = handle self.uri = uri self.arg_data = arg_data - self.block = block - self.timeout = float(timeout) if timeout else None - self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None + self.timeout_token = Timeout + self.timeout = None + self.heartbeat_timeout = HEARTBEAT_TIMEOUT + if timeout and timeout > 0: + self.timeout = float(timeout) + elif not (block or timeout): + self.timeout_token = None + self.timeout = MIN_SOCK_TIMEOUT + if heartbeat_timeout and heartbeat_timeout > 0: + self.heartbeat_timeout = float(heartbeat_timeout) def __iter__(self): - actually_block = self.block and not self.timeout - sock_timeout = min(self.timeout or 1000000, self.heartbeat_timeout) + timeouts = [t for t in (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT) + if t is not None] + sock_timeout = min(*timeouts) sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - sock.setblocking(actually_block) headers = self.handle.headers sock_reader = 
SockReader(sock, sock_timeout) chunk_decoder = HttpChunkDecoder() @@ -166,8 +173,8 @@ class TwitterJSONIter(object): # Decode all the things: data = sock_reader.read() dechunked_data, end_of_stream, decode_error = chunk_decoder.decode(data) - utf8_data = utf8_decoder.decode(dechunked_data) - json_data = json_decoder.decode(utf8_data) + unicode_data = utf8_decoder.decode(dechunked_data) + json_data = json_decoder.decode(unicode_data) # Yield data-like things: for json_obj in json_data: @@ -190,9 +197,7 @@ class TwitterJSONIter(object): yield HeartbeatTimeout break if timer.expired(): - yield Timeout - if not self.block and not self.timeout: - yield None + yield self.timeout_token def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):