- try:
- buf = buf.lstrip()
- res, ptr = json_decoder.raw_decode(buf)
- buf = buf[ptr:]
- yield wrap_response(res, self.handle.headers)
- timer = time.time()
- continue
- except ValueError as e:
- if self.block:
- pass
- else:
- yield None
- try:
- buf = buf.lstrip() # Remove any keep-alive delimiters to detect hangups.
- if self.timeout:
- ready_to_read = select.select([sock], [], [], self.timeout)
- if ready_to_read[0]:
- buf += recv_chunk(sock).decode('utf-8') # This is a non-blocking read.
- if time.time() - timer > self.timeout:
- yield {'timeout': True}
- else:
- yield {'timeout': True}
- else:
- buf += recv_chunk(sock).decode('utf-8')
- if not buf and self.block:
- yield {'hangup': True}
- except SSLError as e:
- if (not self.block or self.timeout) and (e.errno == 2): pass # Empty buffer during polling.
- else: raise
-
-def handle_stream_response(req, uri, arg_data, block, timeout=None):
+ # Decode all the things:
+ data = sock_reader.read()
+ dechunked_data, end_of_stream, decode_error = chunk_decoder.decode(data)
+ unicode_data = utf8_decoder.decode(dechunked_data)
+ json_data = json_decoder.decode(unicode_data)
+
+ # Yield data-like things:
+ for json_obj in json_data:
+ yield wrap_response(json_obj, headers)
+
+ # Reset timers:
+ if dechunked_data:
+ heartbeat_timer.reset()
+ if json_data:
+ timer.reset()
+
+ # Yield timeouts and special things:
+ if end_of_stream:
+ yield Hangup
+ break
+ if decode_error:
+ yield DecodeError
+ break
+ if heartbeat_timer.expired():
+ yield HeartbeatTimeout
+ break
+ if timer.expired():
+ yield self.timeout_token
+
+
+def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):