]>
jfr.im git - z_archive/twitter.git/blob - twitter/stream_example.py
2 Example program for the Stream API. This prints public status messages
3 from the "sample" stream as fast as possible.
7 stream-example -t <token> -ts <token_secret> -ck <consumer_key> -cs <consumer_secret>
11 from __future__
import print_function
15 from twitter
.stream
import TwitterStream
16 from twitter
.oauth
import OAuth
17 from twitter
.util
import printNicely
def parse_arguments(argv=None):
    """Parse the command-line options of the stream example.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, which
            is what the function always did before this parameter existed.

    Returns:
        An ``argparse.Namespace`` with the string options ``token``,
        ``token_secret``, ``consumer_key`` and ``consumer_secret`` (each
        ``None`` when not supplied) and the boolean flags ``user_stream``
        and ``site_stream`` (``False`` unless given).
    """
    parser = argparse.ArgumentParser()

    # OAuth credentials. They are deliberately not marked ``required`` here;
    # the caller checks for missing values itself.
    parser.add_argument('-t', '--token', help='The Twitter Access Token.')
    parser.add_argument('-ts', '--token_secret', help='The Twitter Access Token Secret.')
    parser.add_argument('-ck', '--consumer_key', help='The Twitter Consumer Key.')
    parser.add_argument('-cs', '--consumer_secret', help='The Twitter Consumer Secret.')

    # Endpoint selection flags.
    parser.add_argument('-us', '--user_stream', action='store_true', help='Connect to the user stream endpoint.')
    parser.add_argument('-ss', '--site_stream', action='store_true', help='Connect to the site stream endpoint.')

    # Passing ``None`` makes argparse read sys.argv[1:], so existing callers
    # that invoke parse_arguments() with no arguments are unaffected.
    return parser.parse_args(argv)
34 args
= parse_arguments()
36 if not all((args
.token
, args
.token_secret
, args
.consumer_key
, args
.consumer_secret
)):
40 # When using twitter stream you must authorize.
41 auth
= OAuth(args
.token
, args
.token_secret
, args
.consumer_key
, args
.consumer_secret
)
43 stream
= TwitterStream(auth
=auth
, domain
='userstream.twitter.com')
44 tweet_iter
= stream
.user()
45 elif args
.site_stream
:
46 stream
= TwitterStream(auth
=auth
, domain
='sitestream.twitter.com')
47 tweet_iter
= stream
.site()
49 stream
= TwitterStream(auth
=auth
, timeout
=60.0)
50 tweet_iter
= stream
.statuses
.sample()
52 # Iterate over the sample stream.
53 for tweet
in tweet_iter
:
54 # You must test that your tweet has text. It might be a delete or data message.
57 printNicely(tweet
['text'])
59 if __name__
== '__main__':