From: Mike Verdone Date: Mon, 24 Feb 2014 21:48:08 +0000 (+0100) Subject: Further simplification in progress. X-Git-Tag: twitter-1.14.0~2^2~2 X-Git-Url: https://jfr.im/git/z_archive/twitter.git/commitdiff_plain/dcece3a61cb23ca20f55e0f0b2572eb6b75e3941?hp=03d8511cd4f14723ffe3e78ff92cbcbbea8080df Further simplification in progress. --- diff --git a/twitter/stream.py b/twitter/stream.py index 728b49d..671a856 100644 --- a/twitter/stream.py +++ b/twitter/stream.py @@ -19,36 +19,37 @@ Timeout = {'timeout': True} Hangup = {'hangup': True} HeartbeatTimeout = {'heartbeat_timeout': True, 'hangup': True} -def recv_chunk(sock): # -> bytearray: +class ChunkDecodeError(Exception): + pass +def recv_chunk(sock): # -> bytearray: header = sock.recv(8) # Scan for an up to 16MiB chunk size (0xffffff). crlf = header.find(b'\r\n') # Find the HTTP chunk size. - if crlf > 0: # If there is a length, then process it + if not crlf: + raise ChunkDecodeError() - size = int(header[:crlf], 16) # Decode the chunk size. Rarely exceeds 8KiB. - chunk = bytearray(size) - start = crlf + 2 # Add in the length of the header's CRLF pair. + size = int(header[:crlf], 16) # Decode the chunk size. Rarely exceeds 8KiB. + chunk = bytearray(size) + start = crlf + 2 # Add in the length of the header's CRLF pair. - if size <= 3: # E.g. an HTTP chunk with just a keep-alive delimiter or end of stream (0). - chunk[:size] = header[start:start + size] - # There are several edge cases (size == [4-6]) as the chunk size exceeds the length - # of the initial read of 8 bytes. With Twitter, these do not, in practice, occur. The - # shortest JSON message starts with '{"limit":{'. Hence, it exceeds in size the edge cases - # and eliminates the need to address them. - else: # There is more to read in the chunk. - end = len(header) - start - chunk[:end] = header[start:] - if PY_27_OR_HIGHER: # When possible, use less memory by reading directly into the buffer. - buffer = memoryview(chunk)[end:] # Create a view into the bytearray to hold the rest of the chunk. - sock.recv_into(buffer) - else: # less efficient for python2.6 compatibility - chunk[end:] = sock.recv(max(0, size - end)) - sock.recv(2) # Read the trailing CRLF pair. Throw it away. + if size <= 3: # E.g. an HTTP chunk with just a keep-alive delimiter or end of stream (0). + chunk[:size] = header[start:start + size] + # There are several edge cases (size == [4-6]) as the chunk size exceeds the length + # of the initial read of 8 bytes. With Twitter, these do not, in practice, occur. The + # shortest JSON message starts with '{"limit":{'. Hence, it exceeds in size the edge cases + # and eliminates the need to address them. + else: # There is more to read in the chunk. + end = len(header) - start + chunk[:end] = header[start:] + if PY_27_OR_HIGHER: # When possible, use less memory by reading directly into the buffer. + buffer = memoryview(chunk)[end:] # Create a view into the bytearray to hold the rest of the chunk. + sock.recv_into(buffer) + else: # less efficient for python2.6 compatibility + chunk[end:] = sock.recv(max(0, size - end)) + sock.recv(2) # Read the trailing CRLF pair. Throw it away. - return chunk - - return bytearray() + return chunk class Timer(object): @@ -94,9 +95,6 @@ class TwitterJSONIter(object): timer = Timer(self.timeout) heartbeat_timer = Timer(self.heartbeat_timeout) while True: - if buf: - heartbeat_timer.reset() - buf = buf.lstrip() # Remove any keep-alive delimiters try: res, ptr = raw_decode(buf) @@ -117,15 +115,15 @@ class TwitterJSONIter(object): yield Timeout try: - if not buf: - if sock_timeout: - ready_to_read = select.select([sock], [], [], sock_timeout)[0] - if not ready_to_read: - continue - buf += recv_chunk(sock).decode('utf-8') + if not buf and sock_timeout: + ready_to_read = select.select([sock], [], [], sock_timeout)[0] + if not ready_to_read: + continue + buf += recv_chunk(sock).decode('utf-8') if not buf: yield Hangup break + heartbeat_timer.reset() except SSLError as e: # Code 2 is error from a non-blocking read of an empty buffer. if e.errno != 2: