From: Andrew W. Donoho Date: Tue, 28 Jan 2014 14:13:06 +0000 (-0600) Subject: Further refine socket management. X-Git-Tag: twitter-1.11.0~3^2^2~7 X-Git-Url: https://jfr.im/git/z_archive/twitter.git/commitdiff_plain/28a8ef602a9a4f91a400f4d23917c4d35aaa8aa3?hp=23dcd4694c8bab8597a95b9e8dc51294de6d393a Further refine socket management. All HTTP chunks are read in their entirety. Cosmetic code improvements. (The socket's blocking state is set in a more compact form after a De Morgan boolean transformation.) Hangups by Twitter, as with timeouts, are signaled via a message to allow graceful recovery. --- diff --git a/twitter/stream.py b/twitter/stream.py index 8ef4173..b9035f0 100644 --- a/twitter/stream.py +++ b/twitter/stream.py @@ -14,6 +14,7 @@ from .api import TwitterCall, wrap_response, TwitterHTTPError def recv_chunk(sock): # -> bytearray: + timeout = sock.gettimeout(); sock.setblocking(True) # Read the whole HTTP chunk. buf = sock.recv(10) # Scan for an up to a 4GiB chunk size (0xffffffff). if buf: crlf = buf.find(b'\r\n') # Find the HTTP chunk size. @@ -28,8 +29,11 @@ def recv_chunk(sock): # -> bytearray: chunk[end:] = sock.recv(remaining - end) sock.recv(2) # Read the trailing CRLF pair. Throw it away. 
+ sock.settimeout(timeout) return chunk + + sock.settimeout(timeout) return bytearray() ## recv_chunk() @@ -48,8 +52,7 @@ class TwitterJSONIter(object): def __iter__(self): sock = self.handle.fp.raw._sock if sys.version_info >= (3, 0) else self.handle.fp._sock.fp._sock sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - if not self.block or self.timeout: - sock.setblocking(False) + sock.setblocking(self.block and not self.timeout) # not (not self.block or self.timeout) buf = u'' json_decoder = json.JSONDecoder() timer = time.time() @@ -66,18 +69,20 @@ class TwitterJSONIter(object): pass else: yield None - # this is a non-blocking read (ie, it will return if any data is available) try: - if self.timeout: + if self.timeout: # this is a non-blocking read (ie, it will return if any data is available) + ready_to_read = select.select([sock], [], [], self.timeout) if ready_to_read[0]: buf += recv_chunk(sock).decode('utf-8') if time.time() - timer > self.timeout: - yield {"timeout": True} + yield {'timeout': True} else: - yield {"timeout": True} + yield {'timeout': True} else: buf += recv_chunk(sock).decode('utf-8') + if not buf and self.block: + yield {'hangup': True} except SSLError as e: if (not self.block or self.timeout) and (e.errno == 2): # Apparently this means there was nothing in the socket buf