]> jfr.im git - z_archive/twitter.git/commitdiff
Let's use the stdlib io system for great sanity.
authorMike Verdone <redacted>
Mon, 24 Feb 2014 23:06:38 +0000 (00:06 +0100)
committerMike Verdone <redacted>
Mon, 24 Feb 2014 23:11:00 +0000 (00:11 +0100)
twitter/stream.py

index 671a856f56be5f6a100e07b949bb4c153273a11f..19ee3851f430227ea7132eaadd37ed9b948eb78b 100644 (file)
@@ -1,10 +1,10 @@
 try:
     import urllib.request as urllib_request
     import urllib.error as urllib_error
-    import io
 except ImportError:
     import urllib2 as urllib_request
     import urllib2 as urllib_error
+import io
 import json
 from ssl import SSLError
 import socket
@@ -12,7 +12,8 @@ import sys, select, time
 
 from .api import TwitterCall, wrap_response, TwitterHTTPError
 
-PY_27_OR_HIGHER = sys.version_info >= (2, 7)
+CRLF = b'\r\n'
+
 PY_3_OR_HIGHER = sys.version_info >= (3, 0)
 
 Timeout = {'timeout': True}
@@ -22,32 +23,42 @@ HeartbeatTimeout = {'heartbeat_timeout': True, 'hangup': True}
 class ChunkDecodeError(Exception):
     pass
 
-def recv_chunk(sock): # -> bytearray:
-    header = sock.recv(8)  # Scan for an up to 16MiB chunk size (0xffffff).
-    crlf = header.find(b'\r\n')  # Find the HTTP chunk size.
+class EndOfStream(Exception):
+    pass
 
-    if not crlf:
+class SocketShim(io.IOBase):
+    """
+    Adapts a raw socket to fit the IO protocol.
+    """
+    def __init__(self, sock):
+        self.sock = sock
+    def readable(self):
+        return True
+    def read(self, size):
+        return self.sock.read(size)
+    def readinto(self, buf):
+        return self.sock.recv_into(buf)
+
+def recv_chunk(reader): # -> bytearray:
+    for headerlen in xrange(12):
+        header = reader.peek(headerlen)[:headerlen]
+        if header.endswith(CRLF):
+            break
+    else:
         raise ChunkDecodeError()
 
-    size = int(header[:crlf], 16)  # Decode the chunk size. Rarely exceeds 8KiB.
-    chunk = bytearray(size)
-    start = crlf + 2  # Add in the length of the header's CRLF pair.
-
-    if size <= 3:  # E.g. an HTTP chunk with just a keep-alive delimiter or end of stream (0).
-        chunk[:size] = header[start:start + size]
-    # There are several edge cases (size == [4-6]) as the chunk size exceeds the length
-    # of the initial read of 8 bytes. With Twitter, these do not, in practice, occur. The
-    # shortest JSON message starts with '{"limit":{'. Hence, it exceeds in size the edge cases
-    # and eliminates the need to address them.
-    else:  # There is more to read in the chunk.
-        end = len(header) - start
-        chunk[:end] = header[start:]
-        if PY_27_OR_HIGHER:  # When possible, use less memory by reading directly into the buffer.
-            buffer = memoryview(chunk)[end:]  # Create a view into the bytearray to hold the rest of the chunk.
-            sock.recv_into(buffer)
-        else:  # less efficient for python2.6 compatibility
-            chunk[end:] = sock.recv(max(0, size - end))
-        sock.recv(2)  # Read the trailing CRLF pair. Throw it away.
+    size = int(header, 16) # Decode the chunk size
+    reader.read(headerlen) # Ditch the header
+
+    if size == 0:
+        raise EndOfStream()
+
+    chunk = bytearray()
+    while len(chunk) < size:
+        remainder = size - len(chunk)
+        chunk.extend(reader.read(remainder))
+
+    reader.read(2) # Ditch remaining CRLF
 
     return chunk
 
@@ -90,6 +101,7 @@ class TwitterJSONIter(object):
         sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
         sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
         sock.setblocking(actually_block)
+        reader = io.BufferedReader(SocketShim(sock))
         buf = ''
         raw_decode = json.JSONDecoder().raw_decode
         timer = Timer(self.timeout)
@@ -115,15 +127,17 @@ class TwitterJSONIter(object):
                 yield Timeout
 
             try:
-                if not buf and sock_timeout:
+                if sock_timeout:
                     ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                     if not ready_to_read:
                         continue
-                buf += recv_chunk(sock).decode('utf-8')
-                if not buf:
-                    yield Hangup
-                    break
-                heartbeat_timer.reset()
+                received = recv_chunk(reader)
+                buf += received.decode('utf-8')
+                if received:
+                    heartbeat_timer.reset()
+            except (ChunkDecodeError, EndOfStream):
+                yield Hangup
+                break
             except SSLError as e:
                 # Code 2 is error from a non-blocking read of an empty buffer.
                 if e.errno != 2: