+import codecs
+import sys, select, time
+
+from .api import TwitterCall, wrap_response, TwitterHTTPError
+
+CRLF = b'\r\n' # Line terminator searched for when parsing chunk-size headers.
+MIN_SOCK_TIMEOUT = 0.0 # Apparently select with zero wait is okay!
+MAX_SOCK_TIMEOUT = 10.0 # NOTE(review): presumably seconds; not used in this section -- confirm.
+HEARTBEAT_TIMEOUT = 90.0 # NOTE(review): presumably seconds; not used in this section -- confirm.
+
+Timeout = {'timeout': True} # Marker dict; NOTE(review): presumably handed to stream consumers -- confirm.
+Hangup = {'hangup': True} # Marker dict flagged as a hangup.
+DecodeError = {'hangup': True, 'decode_error': True} # Hangup marker that additionally sets 'decode_error'.
+HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True} # Hangup marker that additionally sets 'heartbeat_timeout'.
+
+
class HttpChunkDecoder(object):
    """
    Incremental decoder for an HTTP chunked-transfer-encoded byte stream.

    Feed raw bytes to `decode` as they arrive. Bytes that do not yet form
    a complete chunk stay in `self.buf` and are retried on the next call.
    """

    # Same bytes as the module-level CRLF constant; held on the class so
    # the decoder does not depend on module globals.
    _LINE_END = b'\r\n'

    def __init__(self):
        # Undecoded bytes carried over between calls to decode().
        self.buf = bytearray()
        # True while the two bytes trailing the previous chunk's payload
        # still have to be skipped.
        self.munch_crlf = False

    @staticmethod
    def _parse_chunk_size(header):
        """
        Return the chunk length encoded in a chunk header line.

        Anything after the first ';' (a chunk extension) is ignored.
        Raises ValueError when the size is not ASCII hexadecimal or is
        negative.
        """
        size_field = header.split(b';', 1)[0]
        chunk_len = int(size_field.decode('ascii'), 16)
        if chunk_len < 0:
            # int() happily parses "-5"; a negative length would make the
            # payload slice run backwards through the buffer.
            raise ValueError("negative chunk size")
        return chunk_len

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """
        Append `data` to the internal buffer and decode every complete chunk.

        Returns a 3-tuple:
          - bytearray with the payloads of all chunks completed by this call,
            concatenated in order (empty if none completed);
          - end_of_stream: True once a zero-length chunk header is seen;
          - decode_error: True when a chunk header cannot be parsed.

        On end_of_stream or decode_error the offending header is left in
        the buffer, so later calls report the same condition again.
        """
        buf = self.buf
        buf.extend(data)
        chunks = []
        # Offset of the first unconsumed byte. Consumed bytes are dropped
        # once at the end instead of re-copying the buffer per chunk.
        pos = 0
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False

        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                # The two bytes are skipped without being inspected.
                if len(buf) - pos < 2:
                    break
                pos += 2
                munch_crlf = False

            header_end_pos = buf.find(self._LINE_END, pos)
            if header_end_pos == -1:
                # The chunk-size line is not complete yet.
                break

            try:
                chunk_len = self._parse_chunk_size(buf[pos:header_end_pos])
            except ValueError:
                # Also covers UnicodeDecodeError, a ValueError subclass.
                decode_error = True
                break

            if chunk_len == 0:
                end_of_stream = True
                break

            data_start_pos = header_end_pos + 2
            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                # Payload only partially received; wait for more data.
                break

            chunks.append(buf[data_start_pos:data_end_pos])
            pos = data_end_pos
            munch_crlf = True

        del buf[:pos]
        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
+
+
class JsonDecoder(object):
    """
    Incremental decoder that pulls complete JSON values out of a text
    stream delivered in arbitrary pieces.
    """

    def __init__(self):
        # Text received so far that has not yet parsed as a JSON value.
        self.buf = ""
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """
        Append `data` to the pending text and return a list of every JSON
        value that can now be parsed from its front, in order.

        Whatever is left after the last successful parse (with leading
        whitespace removed) is kept in `self.buf` for the next call.
        """
        pending = self.buf + data
        decoded = []
        try:
            while True:
                pending = pending.lstrip()
                value, consumed = self.raw_decode(pending)
                decoded.append(value)
                pending = pending[consumed:]
        except ValueError:
            # The remaining text is empty or an incomplete value; this is
            # the normal way out of the loop.
            pass
        self.buf = pending
        return decoded
+
+
class Timer(object):
    """
    Interval timer based on time.time().

    A timeout of None yields a timer that never expires.
    """

    def __init__(self, timeout):
        self.timeout = timeout
        self.reset()

    def reset(self):
        """Restart the interval from the current time."""
        self.time = time.time()

    def expired(self):
        """
        Return True, and restart the interval, when more than `timeout`
        has elapsed since the last reset; otherwise return False.
        """
        limit = self.timeout
        if limit is not None and time.time() - self.time > limit:
            self.reset()
            return True
        return False
+
+
class SockReader(object):
    """
    Reads from a socket, waiting at most `sock_timeout` (as passed to
    select.select) for it to become readable.
    """

    def __init__(self, sock, sock_timeout):
        self.sock = sock
        self.sock_timeout = sock_timeout

    def read(self):
        """
        Return whatever `sock.read()` yields if the socket becomes readable
        within the timeout, otherwise an empty bytearray.

        An SSLError with errno 2 is treated as "no data"; any other
        SSLError is re-raised.
        """
        try:
            readable = select.select(
                [self.sock], [], [], self.sock_timeout)[0]
            data = self.sock.read() if readable else bytearray()
        except SSLError as err:
            # Code 2 is error from a non-blocking read of an empty buffer.
            if err.errno != 2:
                raise
            data = bytearray()
        return data