]> jfr.im git - z_archive/twitter.git/commitdiff
Explicitly control heartbeat timeouts.
authorMike Verdone <redacted>
Tue, 18 Feb 2014 00:59:54 +0000 (01:59 +0100)
committerMike Verdone <redacted>
Tue, 18 Feb 2014 00:59:54 +0000 (01:59 +0100)
twitter/stream.py
twitter/stream_example.py

index bf116d56790e8574afcb3e2d2d00e5ea79f52d7a..728b49d9e8efa90f58fc1ba0a2f0ba07028a0988 100644 (file)
@@ -17,7 +17,7 @@ PY_3_OR_HIGHER = sys.version_info >= (3, 0)
 
 Timeout = {'timeout': True}
 Hangup = {'hangup': True}
-
+HeartbeatTimeout = {'heartbeat_timeout': True, 'hangup': True}
 
 def recv_chunk(sock): # -> bytearray:
 
@@ -53,7 +53,7 @@ def recv_chunk(sock): # -> bytearray:
 
 class Timer(object):
     def __init__(self, timeout):
-        # If timeout is None, we always expire.
+        # If timeout is None, we never expire.
         self.timeout = timeout
         self.reset()
 
@@ -65,7 +65,7 @@ class Timer(object):
         If expired, reset the timer and return True.
         """
         if self.timeout is None:
-            return True
+            return False
         elif time.time() - self.time > self.timeout:
             self.reset()
             return True
@@ -74,57 +74,69 @@ class Timer(object):
 
 class TwitterJSONIter(object):
 
-    def __init__(self, handle, uri, arg_data, block=True, timeout=None):
+    def __init__(self, handle, uri, arg_data, block, timeout, heartbeat_timeout):
         self.handle = handle
         self.uri = uri
         self.arg_data = arg_data
         self.block = block
-        self.timeout = timeout
+        self.timeout = float(timeout) if timeout else None
+        self.heartbeat_timeout = float(heartbeat_timeout) if heartbeat_timeout else None
 
 
     def __iter__(self):
-        actually_blocking = self.block and not self.timeout
+        actually_block = self.block and not self.timeout
+        sock_timeout = min(self.timeout, self.heartbeat_timeout) if actually_block else None
         sock = self.handle.fp.raw._sock if PY_3_OR_HIGHER else self.handle.fp._sock.fp._sock
         sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-        sock.setblocking(actually_blocking)
+        sock.setblocking(actually_block)
         buf = ''
-        json_decoder = json.JSONDecoder()
+        raw_decode = json.JSONDecoder().raw_decode
         timer = Timer(self.timeout)
-        timeout_token = Timeout if self.timeout else None
+        heartbeat_timer = Timer(self.heartbeat_timeout)
         while True:
-            buf = buf.lstrip() # Remove any keep-alive delimiters to detect hangups.
+            if buf:
+                heartbeat_timer.reset()
+
+            buf = buf.lstrip() # Remove any keep-alive delimiters
             try:
-                res, ptr = json_decoder.raw_decode(buf)
+                res, ptr = raw_decode(buf)
                 buf = buf[ptr:]
             except ValueError:
-                pass
+                if not self.block and not self.timeout:
+                    yield None
             else:
                 yield wrap_response(res, self.handle.headers)
                 timer.reset()
+                heartbeat_timer.reset()
                 continue
+
+            if heartbeat_timer.expired():
+                yield HeartbeatTimeout
+                break
+            if timer.expired():
+                yield Timeout
+
             try:
-                if self.timeout and not buf:  # This is a non-blocking read.
-                    ready_to_read = select.select([sock], [], [], self.timeout)[0]
-                    if not ready_to_read and timer.expired():
-                        yield timeout_token
-                        continue
-                buf += recv_chunk(sock).decode('utf-8')
+                if not buf:
+                    if sock_timeout:
+                        ready_to_read = select.select([sock], [], [], sock_timeout)[0]
+                        if not ready_to_read:
+                            continue
+                    buf += recv_chunk(sock).decode('utf-8')
                 if not buf:
                     yield Hangup
                     break
             except SSLError as e:
-                # Error from a non-blocking read of an empty buffer.
-                if not actually_blocking and (e.errno == 2):
-                    if timer.expired():
-                        yield timeout_token
-                else: raise
+                # Code 2 is error from a non-blocking read of an empty buffer.
+                if e.errno != 2:
+                    raise
 
-def handle_stream_response(req, uri, arg_data, block, timeout=None):
+def handle_stream_response(req, uri, arg_data, block, timeout, heartbeat_timeout):
     try:
         handle = urllib_request.urlopen(req,)
     except urllib_error.HTTPError as e:
         raise TwitterHTTPError(e, uri, 'json', arg_data)
-    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout=timeout))
+    return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout, heartbeat_timeout))
 
 class TwitterStream(TwitterCall):
     """
@@ -143,26 +155,32 @@ class TwitterStream(TwitterCall):
     connection breaks, the iterator yields `{'hangup': True}`, and
     raises `StopIteration` if iterated again.
 
+    Similarly, if the stream does not produce heartbeats for more than
+    90 seconds, the iterator yields `{'hangup': True,
+    'heartbeat_timeout': True}`, and raises `StopIteration` if
+    iterated again.
+
     The `timeout` parameter controls the maximum time between
     yields. If it is nonzero, then the iterator will yield either
-    stream data or `{'timeout': True}`. This is useful if you want
-    your program to do other stuff in between waiting for tweets.
-
-    The `block` parameter sets the stream to be non-blocking. In this
-    mode, the iterator always yields immediately. It returns stream
-    data, or `None`. Note that `timeout` supercedes this argument, so
-    it should also be set `None` to use this mode.
+    stream data or `{'timeout': True}` within the timeout period. This
+    is useful if you want your program to do other stuff in between
+    waiting for tweets.
+
+    The `block` parameter sets the stream to be fully non-blocking. In
+    this mode, the iterator always yields immediately. It returns
+    stream data, or `None`. Note that `timeout` supercedes this
+    argument, so it should also be set `None` to use this mode.
     """
-    def __init__(
-        self, domain="stream.twitter.com", secure=True, auth=None,
-        api_version='1.1', block=True, timeout=None):
+    def __init__(self, domain="stream.twitter.com", secure=True, auth=None,
+                 api_version='1.1', block=True, timeout=None,
+                 heartbeat_timeout=90.0):
         uriparts = (str(api_version),)
-        timeout = float(timeout) if timeout else None
 
         class TwitterStreamCall(TwitterCall):
             def _handle_response(self, req, uri, arg_data, _timeout=None):
                 return handle_stream_response(
-                    req, uri, arg_data, block=block, timeout=_timeout or timeout)
+                    req, uri, arg_data, block,
+                    _timeout or timeout, heartbeat_timeout)
 
         TwitterCall.__init__(
             self, auth=auth, format="json", domain=domain,
index ac0c8b7350255159c563062b06aad80aba9ebe67..0996ea1c92212ebbf98d1b07e4f68223f69ac259 100644 (file)
@@ -7,7 +7,7 @@ from __future__ import print_function
 
 import argparse
 
-from twitter.stream import TwitterStream, Timeout
+from twitter.stream import TwitterStream, Timeout, HeartbeatTimeout, Hangup
 from twitter.oauth import OAuth
 from twitter.util import printNicely
 
@@ -22,28 +22,32 @@ def parse_arguments():
     parser.add_argument('-cs', '--consumer-secret', required=True, help='The Twitter Consumer Secret.')
     parser.add_argument('-us', '--user-stream', action='store_true', help='Connect to the user stream endpoint.')
     parser.add_argument('-ss', '--site-stream', action='store_true', help='Connect to the site stream endpoint.')
-    parser.add_argument('-to', '--timeout', help='Timeout for the stream (seconds)')
-    parser.add_argument('-nb', '--no-block', action='store_true', help='Set stream to non-blocking')
-    parser.add_argument('-tt', '--track-keywords', help='Search the stream for specific text')
+    parser.add_argument('-to', '--timeout', help='Timeout for the stream (seconds).')
+    parser.add_argument('-ht', '--heartbeat-timeout', help='Set heartbeat timeout.', default=90)
+    parser.add_argument('-nb', '--no-block', action='store_true', help='Set stream to non-blocking.')
+    parser.add_argument('-tt', '--track-keywords', help='Search the stream for specific text.')
     return parser.parse_args()
 
 def main():
     args = parse_arguments()
 
-    if not all((args.token, args.token_secret, args.consumer_key, args.consumer_secret)):
-        print(__doc__)
-        return 2
-
     # When using twitter stream you must authorize.
     auth = OAuth(args.token, args.token_secret, args.consumer_key, args.consumer_secret)
+
+    # These arguments are optional:
+    stream_args = dict(
+        timeout=args.timeout,
+        block=not args.no_block,
+        heartbeat_timeout=args.heartbeat_timeout)
+
     if args.user_stream:
-        stream = TwitterStream(auth=auth, domain='userstream.twitter.com', timeout=args.timeout, block=not args.no_block)
+        stream = TwitterStream(auth=auth, domain='userstream.twitter.com', **stream_args)
         tweet_iter = stream.user()
     elif args.site_stream:
-        stream = TwitterStream(auth=auth, domain='sitestream.twitter.com', timeout=args.timeout, block=not args.no_block)
+        stream = TwitterStream(auth=auth, domain='sitestream.twitter.com', **stream_args)
         tweet_iter = stream.site()
     else:
-        stream = TwitterStream(auth=auth, timeout=args.timeout, block=not args.no_block)
+        stream = TwitterStream(auth=auth, **stream_args)
         if args.track_keywords:
             tweet_iter = stream.statuses.filter(track=args.track_keywords)
         else:
@@ -57,8 +61,14 @@ def main():
             printNicely("-- None --")
         elif tweet is Timeout:
             printNicely("-- Timeout --")
+        elif tweet is HeartbeatTimeout:
+            printNicely("-- Heartbeat Timeout --")
+        elif tweet is Hangup:
+            printNicely("-- Hangup --")
         elif tweet.get('text'):
             printNicely(tweet['text'])
+        else:
+            printNicely("-- Some data: " + str(tweet))
 
 if __name__ == '__main__':
     main()