]> jfr.im git - z_archive/twitter.git/blame_incremental - twitter/stream_example.py
Simplify streaming code.
[z_archive/twitter.git] / twitter / stream_example.py
... / ...
CommitLineData
1"""
2Example program for the Stream API. This prints public status messages
3from the "sample" stream as fast as possible.
4
5USAGE
6
7 stream-example -t <token> -ts <token_secret> -ck <consumer_key> -cs <consumer_secret>
8
9"""
10
11from __future__ import print_function
12
13import argparse
14
15from twitter.stream import TwitterStream, Timeout
16from twitter.oauth import OAuth
17from twitter.util import printNicely
18
19
def parse_arguments(argv=None):
    """Parse the command-line options of the stream example.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``.

    Returns:
        An ``argparse.Namespace`` with the attributes ``token``,
        ``token_secret``, ``consumer_key``, ``consumer_secret``,
        ``user_stream``, ``site_stream``, ``timeout`` and ``no_block``.
        The string options default to ``None``, the flags to ``False``.
        ``timeout`` is a float (seconds) or ``None`` when not given.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument('-t', '--token', help='The Twitter Access Token.')
    parser.add_argument('-ts', '--token-secret', help='The Twitter Access Token Secret.')
    parser.add_argument('-ck', '--consumer-key', help='The Twitter Consumer Key.')
    parser.add_argument('-cs', '--consumer-secret', help='The Twitter Consumer Secret.')
    parser.add_argument('-us', '--user-stream', action='store_true', help='Connect to the user stream endpoint.')
    parser.add_argument('-ss', '--site-stream', action='store_true', help='Connect to the site stream endpoint.')
    # Convert to a number here so a malformed value is rejected by argparse
    # with a usage error instead of reaching the stream as a raw string.
    parser.add_argument('-to', '--timeout', type=float, help='Timeout for the stream (seconds)')
    parser.add_argument('-nb', '--no-block', action='store_true', help='Set stream to non-blocking')
    return parser.parse_args(argv)
33
def main():
    """Connect to a Twitter stream and print every message that has text.

    The endpoint is chosen from the parsed command-line flags: the user
    stream, the site stream, or (by default) the public sample stream.

    Returns:
        2 when any of the four OAuth credentials is missing (after printing
        the module usage text); otherwise ``None`` once the stream iterator
        is exhausted.
    """
    args = parse_arguments()

    credentials = (args.token, args.token_secret, args.consumer_key, args.consumer_secret)
    if not all(credentials):
        print(__doc__)
        return 2

    # When using twitter stream you must authorize.
    auth = OAuth(*credentials)

    # The timeout / non-blocking options are meant for whichever endpoint is
    # selected, so build them once and hand them to every TwitterStream
    # instead of honouring them on the user stream only.
    stream_args = dict(timeout=args.timeout, block=not args.no_block)

    if args.user_stream:
        stream = TwitterStream(auth=auth, domain='userstream.twitter.com', **stream_args)
        tweet_iter = stream.user()
    elif args.site_stream:
        stream = TwitterStream(auth=auth, domain='sitestream.twitter.com', **stream_args)
        tweet_iter = stream.site()
    else:
        stream = TwitterStream(auth=auth, **stream_args)
        tweet_iter = stream.statuses.sample()

    # Iterate over the selected stream.
    for tweet in tweet_iter:
        # You must test that your tweet has text. It might be a delete
        # or data message.
        if tweet is None:
            # Presumably yielded by a non-blocking stream with no data
            # available yet -- confirm against twitter.stream.
            printNicely("-- None --")
        elif tweet is Timeout:
            # The stream yields the Timeout sentinel itself, hence the
            # identity comparison.
            printNicely("-- Timeout --")
        elif tweet.get('text'):
            printNicely(tweet['text'])
63
if __name__ == '__main__':
    # Propagate main()'s return value (2 when credentials are missing) as the
    # process exit status; a None return exits with status 0.
    raise SystemExit(main())