# Sentinel dicts yielded by TwitterJSONIter in place of stream data.
# Consumers compare them by identity (e.g. ``tweet is Timeout``), so each
# one must stay a single module-level object.
Timeout = dict(timeout=True)
Hangup = dict(hangup=True)
# Also carries ``hangup`` so code that only checks for a hangup still
# treats a missed-heartbeat disconnect as one.
HeartbeatTimeout = dict(heartbeat_timeout=True, hangup=True)
def recv_chunk(sock): # -> bytearray:
class Timer(object):
def __init__(self, timeout):
- # If timeout is None, we always expire.
+ # If timeout is None, we never expire.
self.timeout = timeout
self.reset()
If expired, reset the timer and return True.
"""
if self.timeout is None:
- return True
+ return False
elif time.time() - self.time > self.timeout:
self.reset()
return True
class TwitterJSONIter(object):
    """Iterate over the JSON messages of an open streaming HTTP response.

    Each fully decoded JSON value is yielded through ``wrap_response``.
    Besides data, the iterator may yield:

    * ``None`` -- in fully non-blocking mode (``block`` false and no
      ``timeout``) whenever no complete message is buffered yet;
    * ``Timeout`` -- when ``timeout`` seconds pass without a yielded message
      (iteration continues afterwards);
    * ``HeartbeatTimeout`` -- when ``heartbeat_timeout`` seconds pass without
      any bytes from the server; iteration then stops;
    * ``Hangup`` -- when a read returns nothing and nothing is buffered;
      iteration then stops.
    """

    def __init__(self, handle, uri, arg_data, block=True, timeout=None,
                 heartbeat_timeout=None):
        """
        :param handle: open response; its socket is read via ``handle.fp``
            and its ``headers`` are passed to ``wrap_response``.
        :param uri: request URI, stored on the instance.
        :param arg_data: request arguments, stored on the instance.
        :param block: block on the socket when no ``timeout`` is given.
        :param timeout: seconds between yields before ``Timeout`` is
            yielded; a falsy value disables it.
        :param heartbeat_timeout: seconds without any received bytes before
            ``HeartbeatTimeout`` is yielded; a falsy value disables it.
        """
        self.handle = handle
        self.uri = uri
        self.arg_data = arg_data
        self.block = block
        self.timeout = float(timeout) if timeout else None
        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None

    def __iter__(self):
        import codecs

        actually_block = self.block and not self.timeout
        # How long select() may wait before the timers are re-checked.
        # Only timers that are enabled take part (a bare min() would be fed
        # None). Fully non-blocking mode must never wait, and with no timer
        # enabled a plain blocking recv is enough, so both use None.
        if self.block or self.timeout:
            waits = [t for t in (self.timeout, self.heartbeat_timeout) if t]
            sock_timeout = min(waits) if waits else None
        else:
            sock_timeout = None

        sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        sock.setblocking(actually_block)
        buf = ''
        raw_decode = json.JSONDecoder().raw_decode
        # Chunk boundaries need not fall on UTF-8 character boundaries, so
        # keep any trailing partial character until its next bytes arrive.
        decode = codecs.getincrementaldecoder('utf-8')().decode
        timer = Timer(self.timeout)
        heartbeat_timer = Timer(self.heartbeat_timeout)
        while True:
            buf = buf.lstrip()  # Remove any keep-alive delimiters
            try:
                res, ptr = raw_decode(buf)
                buf = buf[ptr:]
            except ValueError:
                # No complete message buffered yet.
                if not self.block and not self.timeout:
                    yield None
            else:
                yield wrap_response(res, self.handle.headers)
                timer.reset()
                heartbeat_timer.reset()
                continue

            if heartbeat_timer.expired():
                yield HeartbeatTimeout
                break
            if timer.expired():
                yield Timeout

            try:
                if not buf and sock_timeout:
                    ready_to_read = select.select([sock], [], [], sock_timeout)[0]
                    if not ready_to_read:
                        continue
                received = recv_chunk(sock)
                if received:
                    # Any bytes (keep-alive newlines included) prove the
                    # connection is alive; a partial message already in
                    # ``buf`` does not.
                    heartbeat_timer.reset()
                elif not buf:
                    yield Hangup
                    break
                buf += decode(received)
            except SSLError as e:
                # Code 2 is error from a non-blocking read of an empty buffer.
                if e.errno != 2:
                    raise
def handle_stream_response(req, uri, arg_data, block, timeout=None,
                           heartbeat_timeout=None):
    """Open ``req`` and return an iterator over its streamed JSON messages.

    ``timeout`` and ``heartbeat_timeout`` keep ``None`` defaults so callers
    written against the older ``(req, uri, arg_data, block, timeout=None)``
    signature keep working; ``None`` disables the corresponding check.

    Raises TwitterHTTPError (wrapping urllib's HTTPError) if the request
    fails.
    """
    try:
        handle = urllib_request.urlopen(req)
    except urllib_error.HTTPError as e:
        raise TwitterHTTPError(e, uri, 'json', arg_data)
    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout,
                                heartbeat_timeout))
class TwitterStream(TwitterCall):
"""
connection breaks, the iterator yields `{'hangup': True}`, and
raises `StopIteration` if iterated again.
+ Similarly, if the stream does not produce heartbeats for more than
+ `heartbeat_timeout` seconds (90 by default), the iterator yields
+ `{'hangup': True, 'heartbeat_timeout': True}`, and raises
+ `StopIteration` if iterated again.
+
The `timeout` parameter controls the maximum time between
yields. If it is nonzero, then the iterator will yield either
- stream data or `{'timeout': True}`. This is useful if you want
- your program to do other stuff in between waiting for tweets.
-
- The `block` parameter sets the stream to be non-blocking. In this
- mode, the iterator always yields immediately. It returns stream
- data, or `None`. Note that `timeout` supercedes this argument, so
- it should also be set `None` to use this mode.
+ stream data or `{'timeout': True}` within the timeout period. This
+ is useful if you want your program to do other stuff in between
+ waiting for tweets.
+
+ The `block` parameter sets the stream to be fully non-blocking. In
+ this mode, the iterator always yields immediately. It returns
+ stream data, or `None`. Note that `timeout` supersedes this
+ argument, so it should also be set to `None` to use this mode.
"""
- def __init__(
- self, domain="stream.twitter.com", secure=True, auth=None,
- api_version='1.1', block=True, timeout=None):
+ def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
+ api_version='1.1', block=True, timeout=None,
+ heartbeat_timeout=90.0):
uriparts = (str(api_version),)
- timeout = float(timeout) if timeout else None
class TwitterStreamCall(TwitterCall):
def _handle_response(self, req, uri, arg_data, _timeout=None):
return handle_stream_response(
- req, uri, arg_data, block=block, timeout=_timeout or timeout)
+ req, uri, arg_data, block,
+ _timeout or timeout, heartbeat_timeout)
TwitterCall.__init__(
self, auth=auth, format="json", domain=domain,
import argparse
-from twitter.stream import TwitterStream, Timeout
+from twitter.stream import TwitterStream, Timeout, HeartbeatTimeout, Hangup
from twitter.oauth import OAuth
from twitter.util import printNicely
parser.add_argument('-cs', '--consumer-secret', required=True, help='The Twitter Consumer Secret.')
parser.add_argument('-us', '--user-stream', action='store_true', help='Connect to the user stream endpoint.')
parser.add_argument('-ss', '--site-stream', action='store_true', help='Connect to the site stream endpoint.')
- parser.add_argument('-to', '--timeout', help='Timeout for the stream (seconds)')
- parser.add_argument('-nb', '--no-block', action='store_true', help='Set stream to non-blocking')
- parser.add_argument('-tt', '--track-keywords', help='Search the stream for specific text')
+ parser.add_argument('-to', '--timeout', help='Timeout for the stream (seconds).')
+ parser.add_argument('-ht', '--heartbeat-timeout', help='Set heartbeat timeout.', default=90)
+ parser.add_argument('-nb', '--no-block', action='store_true', help='Set stream to non-blocking.')
+ parser.add_argument('-tt', '--track-keywords', help='Search the stream for specific text.')
return parser.parse_args()
def main():
args = parse_arguments()
- if not all((args.token, args.token_secret, args.consumer_key, args.consumer_secret)):
- print(__doc__)
- return 2
-
# When using twitter stream you must authorize.
auth = OAuth(args.token, args.token_secret, args.consumer_key, args.consumer_secret)
+
+ # These arguments are optional:
+ stream_args = dict(
+ timeout=args.timeout,
+ block=not args.no_block,
+ heartbeat_timeout=args.heartbeat_timeout)
+
if args.user_stream:
- stream = TwitterStream(auth=auth, domain='userstream.twitter.com', timeout=args.timeout, block=not args.no_block)
+ stream = TwitterStream(auth=auth, domain='userstream.twitter.com', **stream_args)
tweet_iter = stream.user()
elif args.site_stream:
- stream = TwitterStream(auth=auth, domain='sitestream.twitter.com', timeout=args.timeout, block=not args.no_block)
+ stream = TwitterStream(auth=auth, domain='sitestream.twitter.com', **stream_args)
tweet_iter = stream.site()
else:
- stream = TwitterStream(auth=auth, timeout=args.timeout, block=not args.no_block)
+ stream = TwitterStream(auth=auth, **stream_args)
if args.track_keywords:
tweet_iter = stream.statuses.filter(track=args.track_keywords)
else:
printNicely("-- None --")
elif tweet is Timeout:
printNicely("-- Timeout --")
+ elif tweet is HeartbeatTimeout:
+ printNicely("-- Heartbeat Timeout --")
+ elif tweet is Hangup:
+ printNicely("-- Hangup --")
elif tweet.get('text'):
printNicely(tweet['text'])
+ else:
+ printNicely("-- Some data: " + str(tweet))
if __name__ == '__main__':
    # Run the stream example only when executed as a script, not on import.
    main()