]>
jfr.im git - z_archive/twitter.git/blob - twitter/stream_example.py
"""
Example program for the Stream API. This prints public status messages
from the "sample" stream as fast as possible. Use -h for help.
"""
6 from __future__
import print_function
10 from twitter
.stream
import TwitterStream
, Timeout
, HeartbeatTimeout
, Hangup
11 from twitter
.oauth
import OAuth
12 from twitter
.util
import printNicely
def parse_arguments():
    """Parse the command-line options for the stream example.

    Reads ``sys.argv`` through ``argparse``. The four OAuth credentials are
    required; every other option is optional.

    Returns:
        argparse.Namespace with the attributes ``token``, ``token_secret``,
        ``consumer_key``, ``consumer_secret``, ``user_stream``,
        ``site_stream``, ``timeout``, ``heartbeat_timeout``, ``no_block``
        and ``track_keywords``.
    """
    parser = argparse.ArgumentParser(description=__doc__ or "")

    # OAuth credentials: all four are mandatory.
    parser.add_argument('-t', '--token', required=True,
                        help='The Twitter Access Token.')
    parser.add_argument('-ts', '--token-secret', required=True,
                        help='The Twitter Access Token Secret.')
    parser.add_argument('-ck', '--consumer-key', required=True,
                        help='The Twitter Consumer Key.')
    parser.add_argument('-cs', '--consumer-secret', required=True,
                        help='The Twitter Consumer Secret.')

    # Endpoint selection flags.
    parser.add_argument('-us', '--user-stream', action='store_true',
                        help='Connect to the user stream endpoint.')
    parser.add_argument('-ss', '--site-stream', action='store_true',
                        help='Connect to the site stream endpoint.')

    # Timeouts are numeric. Without an explicit ``type`` argparse hands back
    # the raw command-line string, so a user-supplied value would be a str
    # while the default below is a number. ``type`` is only applied to
    # values given on the command line (and to string defaults), so the
    # defaults stay None and 90 respectively.
    parser.add_argument('-to', '--timeout', type=float,
                        help='Timeout for the stream (seconds).')
    parser.add_argument('-ht', '--heartbeat-timeout', type=float, default=90,
                        help='Set heartbeat timeout.')

    parser.add_argument('-nb', '--no-block', action='store_true',
                        help='Set stream to non-blocking.')
    parser.add_argument('-tt', '--track-keywords',
                        help='Search the stream for specific text.')

    return parser.parse_args()
def main():
    """Connect to a Twitter stream and print every message received.

    The endpoint is chosen from the parsed command-line options: the user
    stream, the site stream, or the public stream (filtered when track
    keywords were given, otherwise the sample stream). Runs until the
    stream iterator is exhausted.
    """
    # NOTE(review): the extracted source dropped several lines of this
    # function (its header, the start of ``stream_args``, the ``query_args``
    # initialisation and some ``if``/``else`` heads). They are restored here
    # from how the surrounding code uses them -- confirm against upstream.
    args = parse_arguments()

    # When using twitter stream you must authorize.
    auth = OAuth(args.token, args.token_secret,
                 args.consumer_key, args.consumer_secret)

    # These arguments are optional:
    stream_args = dict(
        timeout=args.timeout,
        block=not args.no_block,
        heartbeat_timeout=args.heartbeat_timeout)

    query_args = dict()
    if args.track_keywords:
        query_args['track'] = args.track_keywords

    if args.user_stream:
        stream = TwitterStream(auth=auth, domain='userstream.twitter.com',
                               **stream_args)
        tweet_iter = stream.user(**query_args)
    elif args.site_stream:
        stream = TwitterStream(auth=auth, domain='sitestream.twitter.com',
                               **stream_args)
        tweet_iter = stream.site(**query_args)
    else:
        stream = TwitterStream(auth=auth, **stream_args)
        if args.track_keywords:
            tweet_iter = stream.statuses.filter(**query_args)
        else:
            tweet_iter = stream.statuses.sample()

    # Iterate over the sample stream.
    for tweet in tweet_iter:
        # You must test that your tweet has text. It might be a delete
        # or data message. The iterator can also yield None or one of the
        # Timeout / HeartbeatTimeout / Hangup marker objects, which are
        # compared by identity below.
        if tweet is None:
            printNicely("-- None --")
        elif tweet is Timeout:
            printNicely("-- Timeout --")
        elif tweet is HeartbeatTimeout:
            printNicely("-- Heartbeat Timeout --")
        elif tweet is Hangup:
            printNicely("-- Hangup --")
        elif tweet.get('text'):
            printNicely(tweet['text'])
        else:
            printNicely("-- Some data: " + str(tweet))


if __name__ == '__main__':
    main()