+CRLF = b'\r\n'  # Byte sequence HttpChunkDecoder searches for to find the end of a chunk-size header.
+MIN_SOCK_TIMEOUT = 0.0  # Apparently select with zero wait is okay!
+MAX_SOCK_TIMEOUT = 10.0  # NOTE(review): presumably seconds; the code using it is outside this view.
+HEARTBEAT_TIMEOUT = 90.0  # NOTE(review): presumably seconds; the code using it is outside this view.
+
+Timeout = {'timeout': True}  # NOTE(review): these four dicts look like shared marker payloads; consumers are outside this view.
+Hangup = {'hangup': True}
+DecodeError = {'hangup': True, 'decode_error': True}  # Also carries 'hangup'.
+HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}  # Also carries 'hangup'.
+
+
class HttpChunkDecoder(object):
    """Incremental decoder for an HTTP chunked transfer-encoded byte stream.

    Raw bytes are fed to :meth:`decode` as they arrive. Anything that cannot
    be consumed yet (a partial chunk-size header or a partial payload) stays
    in ``self.buf`` and is retried on the next call, so input may be split at
    arbitrary positions.
    """

    def __init__(self):
        # Bytes received so far that have not been consumed.
        self.buf = bytearray()
        # True once a chunk payload has been emitted but the two bytes that
        # follow it have not been discarded yet.
        self.munch_crlf = False

    def decode(self, data):  # -> (bytearray, end_of_stream, decode_error)
        """Append *data* to the buffer and extract every complete chunk.

        Args:
            data: Newly received bytes (anything ``bytearray.extend`` accepts).

        Returns:
            A 3-tuple ``(payload, end_of_stream, decode_error)``:

            * ``payload`` -- ``bytearray`` with the payloads of all chunks
              completed by this call, concatenated in order (possibly empty).
            * ``end_of_stream`` -- True when a zero-length chunk header was
              reached. The header is left in the buffer.
            * ``decode_error`` -- True when a chunk-size header is not a valid
              hexadecimal number or is negative. The offending header is left
              in the buffer.
        """
        buf = self.buf
        buf.extend(data)
        chunks = []
        munch_crlf = self.munch_crlf
        end_of_stream = False
        decode_error = False

        while True:
            if munch_crlf:
                # Dang, Twitter, you crazy. Twitter only sends a terminating
                # CRLF at the beginning of the *next* message.
                # The two bytes are dropped without checking their value.
                if len(buf) < 2:
                    break
                del buf[:2]
                munch_crlf = False

            header_end_pos = buf.find(CRLF)
            if header_end_pos == -1:
                # Chunk-size header is still incomplete.
                break

            try:
                # UnicodeDecodeError is a ValueError subclass, so non-ASCII
                # headers are reported as decode errors as well.
                chunk_len = int(buf[:header_end_pos].decode('ascii'), 16)
            except ValueError:
                decode_error = True
                break

            if chunk_len < 0:
                # int() accepts a leading '-'; a negative size would place
                # the payload end before its start and corrupt the framing.
                decode_error = True
                break

            if chunk_len == 0:
                end_of_stream = True
                break

            data_start_pos = header_end_pos + 2
            data_end_pos = data_start_pos + chunk_len
            if len(buf) < data_end_pos:
                # Payload has not fully arrived yet.
                break

            chunks.append(buf[data_start_pos:data_end_pos])
            # Consume header and payload in place rather than copying the
            # remainder of the buffer for every chunk.
            del buf[:data_end_pos]
            munch_crlf = True

        self.munch_crlf = munch_crlf
        return bytearray().join(chunks), end_of_stream, decode_error
+
+
class JsonDecoder(object):
    """Incremental decoder that extracts JSON documents from a text stream.

    Text is fed to :meth:`decode` piece by piece. Whatever cannot be parsed
    yet is kept in ``self.buf`` and retried, with the new text appended, on
    the next call.
    """

    def __init__(self):
        # Text received so far that has not been parsed into a document.
        self.buf = ""
        # Bound once so each decode() call reuses the same JSONDecoder.
        self.raw_decode = json.JSONDecoder().raw_decode

    def decode(self, data):
        """Append *data* to the buffer and return every document now parseable.

        Args:
            data: Newly received text; it is concatenated onto ``self.buf``.

        Returns:
            A list of decoded Python values, in stream order. The list is
            empty when no complete document is available yet.
        """
        chunks = []
        buf = (self.buf + data).lstrip()
        # Stop on an empty buffer instead of letting raw_decode raise on it.
        while buf:
            try:
                res, ptr = self.raw_decode(buf)
            except ValueError:
                # The head of the buffer does not parse (e.g. it is still
                # incomplete); keep it for the next call.
                break
            chunks.append(res)
            # Drop the parsed document and any whitespace separating it from
            # the next one.
            buf = buf[ptr:].lstrip()
        self.buf = buf
        return chunks
+
+
+class Timer(object):