-try:
+# encoding: utf-8
+from __future__ import unicode_literals
+
+import sys
+PY_3_OR_HIGHER = sys.version_info >= (3, 0)
+
+if PY_3_OR_HIGHER:
import urllib.request as urllib_request
import urllib.error as urllib_error
-except ImportError:
+else:
import urllib2 as urllib_request
import urllib2 as urllib_error
-import io
import json
from ssl import SSLError
import socket
+import codecs
import sys, select, time
from .api import TwitterCall, wrap_response, TwitterHTTPError
# Line terminator used by HTTP chunk framing.
CRLF = b'\r\n'

# Bounds, in seconds, for a single select() wait on the stream socket.
# A zero timeout makes select() poll without blocking; it is used as the
# message timeout when the stream is iterated in non-blocking mode.
MIN_SOCK_TIMEOUT = 0.0  # Apparently select() with a zero wait is fine.
# Cap on one select() wait, so the read loop re-checks its timers at least
# this often.
MAX_SOCK_TIMEOUT = 10.0
# Default heartbeat timeout: seconds without incoming data before the
# stream is considered dead.
HEARTBEAT_TIMEOUT = 90.0

# Sentinel dicts yielded by the stream iterator in place of messages.
# The stream iterator stops right after yielding one carrying 'hangup'.
Timeout = {'timeout': True}
Hangup = {'hangup': True}
DecodeError = {'hangup': True, 'decode_error': True}
HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}
class HttpChunkDecoder(object):
    """Incrementally decode an HTTP/1.1 ``Transfer-Encoding: chunked`` body.

    Raw bytes are fed in arbitrary pieces through :meth:`decode`; bytes that
    do not yet form a complete chunk are kept in ``self.buf`` until more
    data arrives.
    """

    # Characters allowed in a chunk-size field (hexadecimal digits only).
    _HEX_DIGITS = frozenset('0123456789abcdefABCDEF')

    def __init__(self):
        self.buf = bytearray()
        # True while the CRLF that terminates the previous chunk's payload
        # has not been consumed yet.
        self.munch_crlf = False

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """Feed *data* into the decoder.

        :param data: newly received raw bytes (may be empty).
        :returns: ``(payload, end_of_stream, decode_error)`` where *payload*
            is a bytearray of all complete chunk payloads decoded by this
            call, *end_of_stream* is True once the zero-length last chunk
            has been seen, and *decode_error* is True when the framing is
            malformed (bad chunk size, or a payload not followed by CRLF).
            Trailer fields after the last chunk are not consumed.
        """
        chunks = []
        buf = self.buf
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False
        buf.extend(data)
        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                if len(buf) < 2:
                    break
                if buf[:2] != CRLF:
                    # Every chunk payload must be followed by CRLF; anything
                    # else means the framing is out of sync.
                    decode_error = True
                    break
                del buf[:2]
                munch_crlf = False

            header_end_pos = buf.find(CRLF)
            if header_end_pos == -1:
                break

            chunk_len = self._parse_chunk_size(buf[:header_end_pos])
            if chunk_len is None:
                decode_error = True
                break
            if chunk_len == 0:
                end_of_stream = True
                break

            data_start_pos = header_end_pos + 2
            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                # Payload not fully received yet; wait for more data.
                break
            chunks.append(buf[data_start_pos:data_end_pos])
            # Drop the consumed header and payload in place instead of
            # copying the remainder into a new bytearray.
            del buf[:data_end_pos]
            munch_crlf = True
        self.buf = buf
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error

    @classmethod
    def _parse_chunk_size(cls, header):
        """Return the non-negative chunk size in *header*, or None if malformed.

        Chunk extensions (``;name=value``) are ignored, as is whitespace
        around the size. Only plain hex digits are accepted, so inputs that
        ``int(x, 16)`` would tolerate (``-5``, ``0x1a``, ``+3``, ``1_0``)
        are rejected instead of corrupting the buffer position.
        """
        size_field = header.split(b';', 1)[0].strip()
        try:
            text = size_field.decode('ascii')
        except UnicodeDecodeError:
            return None
        if not text or not all(c in cls._HEX_DIGITS for c in text):
            return None
        return int(text, 16)
class JsonDecoder(object):
    """Incrementally parse a text stream of concatenated JSON values.

    Text arrives in arbitrary pieces through :meth:`decode`; whatever cannot
    be parsed yet is kept (left-stripped) in ``self.buf`` for the next call.
    """

    def __init__(self):
        self.buf = ""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """Append *data* to the buffer and return every complete JSON value.

        Whitespace between values (such as keep-alive newlines) is skipped.
        NOTE(review): a malformed value is indistinguishable from an
        incomplete one here, so it stays in the buffer indefinitely.
        """
        pending = self.buf + data
        parsed = []
        while True:
            pending = pending.lstrip()
            try:
                value, consumed = self.raw_decode(pending)
            except ValueError:
                # Empty or not (yet) parseable: keep the rest for later.
                break
            parsed.append(value)
            pending = pending[consumed:]
        self.buf = pending
        return parsed
class Timer(object):
+
def __init__(self, timeout):
# If timeout is None, we never expire.
self.timeout = timeout
return False
class SockReader(object):
    """Read whatever bytes a socket has available, waiting at most
    ``sock_timeout`` seconds (via ``select.select``) for them to arrive.

    ``self.eof`` becomes True once the socket was reported readable but a
    read returned no bytes, i.e. the peer closed the connection.
    """

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout
        self.eof = False

    def read(self):
        """Return bytes read from the socket, or an empty bytearray when
        nothing became readable within ``sock_timeout`` seconds.

        The socket object must provide ``read()`` (as SSL sockets do); an
        optional ``pending()`` method is honoured when present.
        """
        try:
            if self._has_buffered_data() or self._wait_readable():
                data = self.sock.read()
                if not data:
                    # Readable, yet nothing to read: the peer hung up.
                    # Without this flag the caller would spin on an
                    # always-readable socket until its heartbeat expired.
                    self.eof = True
                return data
        except SSLError as e:
            # Code 2 (ssl.SSL_ERROR_WANT_READ) comes from a non-blocking
            # read that found no complete TLS record yet.
            if e.errno != 2:
                raise
        return bytearray()

    def _has_buffered_data(self):
        """Return True if the socket already holds decrypted, unread bytes."""
        # select() only watches the OS-level descriptor. An SSL socket can
        # keep the rest of a decrypted TLS record in its own buffer (e.g.
        # when the record is larger than one read()), which select() cannot
        # see, so consult pending() first.
        pending = getattr(self.sock, 'pending', None)
        return pending is not None and pending() > 0

    def _wait_readable(self):
        """Wait up to ``sock_timeout`` seconds; return True if readable."""
        return bool(select.select([self.sock], [], [], self.sock_timeout)[0])


class TwitterJSONIter(object):
    """Iterate over a chunked streaming response, yielding decoded messages.

    Each complete JSON value on the stream is yielded through
    ``wrap_response``. In addition the iterator yields:

    * ``self.timeout_token`` whenever the message timer has expired
      (``Timeout``, or ``None`` in non-blocking mode);
    * ``Hangup``, ``DecodeError`` or ``HeartbeatTimeout`` immediately
      before it stops.
    """

    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
        """
        :param handle: HTTP response whose underlying socket is read directly.
        :param uri: stored on the instance.
        :param arg_data: stored on the instance.
        :param block: if false and ``timeout`` is falsy, poll without
            blocking: the message timer is set to ``MIN_SOCK_TIMEOUT`` and
            ``None`` is yielded whenever it expires.
        :param timeout: if positive, seconds without a JSON message after
            which ``Timeout`` is yielded; otherwise (unless polling per
            ``block``) there is no message timeout.
        :param heartbeat_timeout: if positive, seconds without any incoming
            bytes before ``HeartbeatTimeout`` ends the stream; otherwise
            ``HEARTBEAT_TIMEOUT`` is used.
        """
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.timeout_token = Timeout
        self.timeout = None
        self.heartbeat_timeout = HEARTBEAT_TIMEOUT
        if timeout and timeout > 0:
            self.timeout = float(timeout)
        elif not (block or timeout):
            self.timeout_token = None
            self.timeout = MIN_SOCK_TIMEOUT
        if heartbeat_timeout and heartbeat_timeout > 0:
            self.heartbeat_timeout = float(heartbeat_timeout)

    def __iter__(self):
        """Read the socket and yield messages and sentinels until hang-up.

        Pipeline per pass: raw bytes -> HTTP de-chunking -> incremental
        UTF-8 decoding -> JSON values.
        """
        # Never let select() wait longer than the shortest relevant timeout,
        # so the timers below are checked often enough.
        sock_timeout = min(
            t for t in (self.timeout, self.heartbeat_timeout, MAX_SOCK_TIMEOUT)
            if t is not None)
        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        headers = self.handle.headers
        sock_reader = SockReader(sock, sock_timeout)
        chunk_decoder = HttpChunkDecoder()
        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        json_decoder = JsonDecoder()
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)

        while True:
            # Decode all the things:
            data = sock_reader.read()
            dechunked_data, end_of_stream, decode_error = chunk_decoder.decode(data)
            try:
                unicode_data = utf8_decoder.decode(dechunked_data)
            except UnicodeDecodeError:
                # Invalid UTF-8 cannot be recovered from; end the stream the
                # same way as a chunk-framing error instead of raising.
                yield DecodeError
                break
            json_data = json_decoder.decode(unicode_data)

            # Yield data-like things:
            for json_obj in json_data:
                yield wrap_response(json_obj, headers)

            # Reset timers. Any received bytes (keep-alive newlines, partial
            # chunks) prove the connection is alive.
            if data:
                heartbeat_timer.reset()
            if json_data:
                timer.reset()

            # Yield timeouts and special things:
            if end_of_stream or sock_reader.eof:
                yield Hangup
                break
            if decode_error:
                yield DecodeError
                break
            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield self.timeout_token
def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
try: