]> jfr.im git - z_archive/twitter.git/blobdiff - twitter/stream_example.py
Merge pull request #208 from RouxRC/pr-userstream-keywords-example
[z_archive/twitter.git] / twitter / stream_example.py
index 8f5da3a64e6e2f38ea71bcbe8e2db0b81c244c37..8869c15f8515adec138e35f5f1b7ce01b46009d7 100644 (file)
@@ -1,60 +1,78 @@
 """
 Example program for the Stream API. This prints public status messages
-from the "sample" stream as fast as possible.
-
-USAGE
-
-  stream-example -t <token> -ts <token_secret> -ck <consumer_key> -cs <consumer_secret>
-
+from the "sample" stream as fast as possible. Use -h for help.
 """
 
 from __future__ import print_function
 
 import argparse
 
-from twitter.stream import TwitterStream
+from twitter.stream import TwitterStream, Timeout, HeartbeatTimeout, Hangup
 from twitter.oauth import OAuth
 from twitter.util import printNicely
 
 
 def parse_arguments():
 
-    parser = argparse.ArgumentParser()
-
-    parser.add_argument('-t',  '--token', help='The Twitter Access Token.')
-    parser.add_argument('-ts', '--token_secret', help='The Twitter Access Token Secret.')
-    parser.add_argument('-ck', '--consumer_key', help='The Twitter Consumer Key.')
-    parser.add_argument('-cs', '--consumer_secret', help='The Twitter Consumer Secret.')
-    parser.add_argument('-us', '--user_stream', action='store_true', help='Connect to the user stream endpoint.')
-    parser.add_argument('-ss', '--site_stream', action='store_true', help='Connect to the site stream endpoint.')
-
+    parser = argparse.ArgumentParser(description=__doc__ or "")
+
+    parser.add_argument('-t',  '--token', required=True, help='The Twitter Access Token.')
+    parser.add_argument('-ts', '--token-secret', required=True, help='The Twitter Access Token Secret.')
+    parser.add_argument('-ck', '--consumer-key', required=True, help='The Twitter Consumer Key.')
+    parser.add_argument('-cs', '--consumer-secret', required=True, help='The Twitter Consumer Secret.')
+    parser.add_argument('-us', '--user-stream', action='store_true', help='Connect to the user stream endpoint.')
+    parser.add_argument('-ss', '--site-stream', action='store_true', help='Connect to the site stream endpoint.')
+    parser.add_argument('-to', '--timeout', help='Timeout for the stream (seconds).')
+    parser.add_argument('-ht', '--heartbeat-timeout', help='Set heartbeat timeout.', default=90)
+    parser.add_argument('-nb', '--no-block', action='store_true', help='Set stream to non-blocking.')
+    parser.add_argument('-tt', '--track-keywords', help='Search the stream for specific text.')
     return parser.parse_args()
 
 def main():
     args = parse_arguments()
 
-    if not all((args.token, args.token_secret, args.consumer_key, args.consumer_secret)):
-        print(__doc__)
-        return 2
-
     # When using twitter stream you must authorize.
     auth = OAuth(args.token, args.token_secret, args.consumer_key, args.consumer_secret)
+
+    # These arguments are optional:
+    stream_args = dict(
+        timeout=args.timeout,
+        block=not args.no_block,
+        heartbeat_timeout=args.heartbeat_timeout)
+
+    query_args = dict()
+    if args.track_keywords:
+        query_args['track'] = args.track_keywords
+
     if args.user_stream:
-        stream = TwitterStream(auth=auth, domain='userstream.twitter.com')
-        tweet_iter = stream.user()
+        stream = TwitterStream(auth=auth, domain='userstream.twitter.com', **stream_args)
+        tweet_iter = stream.user(**query_args)
     elif args.site_stream:
-        stream = TwitterStream(auth=auth, domain='sitestream.twitter.com')
-        tweet_iter = stream.site()
+        stream = TwitterStream(auth=auth, domain='sitestream.twitter.com', **stream_args)
+        tweet_iter = stream.site(**query_args)
     else:
-        stream = TwitterStream(auth=auth)
-        tweet_iter = stream.statuses.sample()
+        stream = TwitterStream(auth=auth, **stream_args)
+        if args.track_keywords:
+            tweet_iter = stream.statuses.filter(**query_args)
+        else:
+            tweet_iter = stream.statuses.sample()
 
     # Iterate over the sample stream.
     for tweet in tweet_iter:
         # You must test that your tweet has text. It might be a delete
         # or data message.
-        if tweet.get('text'):
+        if tweet is None:
+            printNicely("-- None --")
+        elif tweet is Timeout:
+            printNicely("-- Timeout --")
+        elif tweet is HeartbeatTimeout:
+            printNicely("-- Heartbeat Timeout --")
+        elif tweet is Hangup:
+            printNicely("-- Hangup --")
+        elif tweet.get('text'):
             printNicely(tweet['text'])
+        else:
+            printNicely("-- Some data: " + str(tweet))
 
 if __name__ == '__main__':
     main()