]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
d466edc2d3d67e4e838508a239f5e1e6db6828cb
[z_archive/twitter.git] / twitter / cmdline.py
1 """
2 USAGE:
3
4 twitter [action] [options]
5
6 ACTIONS:
7 follow add the specified user to your follow list
8 friends get latest tweets from your friends (default action)
9 help print this help text that you are currently reading
10 leave remove the specified user from your following list
11 public get latest public tweets
12 replies get latest replies
13 search searchtwitter (Beware: octothorpe, escape it)
14 set set your twitter status
15 shell login the twitter shell
16
17 OPTIONS:
18
19 -e --email <email> your email to login to twitter
20 -p --password <password> your twitter password
21 -r --refresh run this command forever, polling every once
22 in a while (default: every 5 minutes)
23 -R --refresh-rate <rate> set the refresh rate (in seconds)
24 -f --format <format> specify the output format for status updates
25 -c --config <filename> read username and password from given config
26 file (default ~/.twitter)
27 -l --length <count> specify number of status updates shown
28 (default: 20, max: 200)
29 -t --timestamp show time before status lines
30 -d --datestamp shoe date before status lines
31 --no-ssl use HTTP instead of more secure HTTPS
32
33 FORMATS for the --format option
34
35 default one line per status
36 verbose multiple lines per status, more verbose status info
37 urls nothing but URLs
38 ansi ansi colour (rainbow mode)
39
40 CONFIG FILES
41
42 The config file should contain a [twitter] header, and all the desired options
43 you wish to set, like so:
44
45 [twitter]
46 email: <username>
47 password: <password>
48 format: <desired_default_format_for_output>
49 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
50 """
51
52 CONSUMER_KEY='uS6hO2sV6tDKIOeVjhnFnQ'
53 CONSUMER_SECRET='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
54
55 import sys
56 import time
57 from getopt import gnu_getopt as getopt, GetoptError
58 from getpass import getpass
59 import re
60 import os.path
61 from ConfigParser import SafeConfigParser
62 import datetime
63 from urllib import quote
64 import webbrowser
65
66 from api import Twitter, TwitterError
67 from oauth import OAuth
68 import ansi
69
70 # Please don't change this, it was provided by the fine folks at Twitter.
71 # If you change it, it will not work.
72 AGENT_STR = "twittercommandlinetoolpy"
73
74 OPTIONS = {
75 'email': None,
76 'password': None,
77 'action': 'friends',
78 'refresh': False,
79 'refresh_rate': 600,
80 'format': 'default',
81 'prompt': '[cyan]twitter[R]> ',
82 'config_filename': os.environ.get('HOME', '') + os.sep + '.twitter',
83 'oauth_filename': os.environ.get('HOME', '') + os.sep + '.twitter_oauth',
84 'length': 20,
85 'timestamp': False,
86 'datestamp': False,
87 'extra_args': [],
88 'secure': True,
89 }
90
91 def parse_args(args, options):
92 long_opts = ['email', 'password', 'help', 'format', 'refresh',
93 'refresh-rate', 'config', 'length', 'timestamp',
94 'datestamp', 'no-ssl']
95 short_opts = "e:p:f:h?rR:c:l:td"
96 opts, extra_args = getopt(args, short_opts, long_opts)
97
98 for opt, arg in opts:
99 if opt in ('-e', '--email'):
100 options['email'] = arg
101 elif opt in ('-p', '--password'):
102 options['password'] = arg
103 elif opt in ('-f', '--format'):
104 options['format'] = arg
105 elif opt in ('-r', '--refresh'):
106 options['refresh'] = True
107 elif opt in ('-R', '--refresh-rate'):
108 options['refresh_rate'] = int(arg)
109 elif opt in ('-l', '--length'):
110 options["length"] = int(arg)
111 elif opt in ('-t', '--timestamp'):
112 options["timestamp"] = True
113 elif opt in ('-d', '--datestamp'):
114 options["datestamp"] = True
115 elif opt in ('-?', '-h', '--help'):
116 options['action'] = 'help'
117 elif opt in ('-c', '--config'):
118 options['config_filename'] = arg
119 elif opt == '--no-ssl':
120 options['secure'] = False
121
122 if extra_args and not ('action' in options and options['action'] == 'help'):
123 options['action'] = extra_args[0]
124 options['extra_args'] = extra_args[1:]
125
126 def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
127 timestamp = options["timestamp"]
128 datestamp = options["datestamp"]
129 t = time.strptime(status['created_at'], format)
130 i_hate_timezones = time.timezone
131 if (time.daylight):
132 i_hate_timezones = time.altzone
133 dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
134 seconds=i_hate_timezones)
135 t = dt.timetuple()
136 if timestamp and datestamp:
137 return time.strftime("%Y-%m-%d %H:%M:%S ", t)
138 elif timestamp:
139 return time.strftime("%H:%M:%S ", t)
140 elif datestamp:
141 return time.strftime("%Y-%m-%d ", t)
142 return ""
143
144 class StatusFormatter(object):
145 def __call__(self, status, options):
146 return (u"%s%s %s" %(
147 get_time_string(status, options),
148 status['user']['screen_name'], status['text']))
149
150 class AnsiStatusFormatter(object):
151 def __init__(self):
152 self._colourMap = ansi.ColourMap()
153
154 def __call__(self, status, options):
155 colour = self._colourMap.colourFor(status['user']['screen_name'])
156 return (u"%s%s%s%s %s" %(
157 get_time_string(status, options),
158 ansi.cmdColour(colour), status['user']['screen_name'],
159 ansi.cmdReset(), status['text']))
160
161 class VerboseStatusFormatter(object):
162 def __call__(self, status, options):
163 return (u"-- %s (%s) on %s\n%s\n" %(
164 status['user']['screen_name'],
165 status['user']['location'],
166 status['created_at'],
167 status['text']))
168
169 class URLStatusFormatter(object):
170 urlmatch = re.compile(r'https?://\S+')
171 def __call__(self, status, options):
172 urls = self.urlmatch.findall(status['text'])
173 return u'\n'.join(urls) if urls else ""
174
175 class AdminFormatter(object):
176 def __call__(self, action, user):
177 user_str = u"%s (%s)" %(user['screen_name'], user['name'])
178 if action == "follow":
179 return u"You are now following %s.\n" %(user_str)
180 else:
181 return u"You are no longer following %s.\n" %(user_str)
182
183 class VerboseAdminFormatter(object):
184 def __call__(self, action, user):
185 return(u"-- %s: %s (%s): %s" % (
186 "Following" if action == "follow" else "Leaving",
187 user['screen_name'],
188 user['name'],
189 user['url']))
190
191 class SearchFormatter(object):
192 def __call__(self, result, options):
193 return(u"%s%s %s" %(
194 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
195 result['from_user'], result['text']))
196
197 class VerboseSearchFormatter(SearchFormatter):
198 pass #Default to the regular one
199
200 class URLSearchFormatter(object):
201 urlmatch = re.compile(r'https?://\S+')
202 def __call__(self, result, options):
203 urls = self.urlmatch.findall(result['text'])
204 return u'\n'.join(urls) if urls else ""
205
206 class AnsiSearchFormatter(object):
207 def __init__(self):
208 self._colourMap = ansi.ColourMap()
209
210 def __call__(self, result, options):
211 colour = self._colourMap.colourFor(result['from_user'])
212 return (u"%s%s%s%s %s" %(
213 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
214 ansi.cmdColour(colour), result['from_user'],
215 ansi.cmdReset(), result['text']))
216
217 formatters = {}
218 status_formatters = {
219 'default': StatusFormatter,
220 'verbose': VerboseStatusFormatter,
221 'urls': URLStatusFormatter,
222 'ansi': AnsiStatusFormatter
223 }
224 formatters['status'] = status_formatters
225
226 admin_formatters = {
227 'default': AdminFormatter,
228 'verbose': VerboseAdminFormatter,
229 'urls': AdminFormatter,
230 'ansi': AdminFormatter
231 }
232 formatters['admin'] = admin_formatters
233
234 search_formatters = {
235 'default': SearchFormatter,
236 'verbose': VerboseSearchFormatter,
237 'urls': URLSearchFormatter,
238 'ansi': AnsiSearchFormatter
239 }
240 formatters['search'] = search_formatters
241
242 def get_formatter(action_type, options):
243 formatters_dict = formatters.get(action_type)
244 if (not formatters_dict):
245 raise TwitterError(
246 "There was an error finding a class of formatters for your type (%s)"
247 %(action_type))
248 f = formatters_dict.get(options['format'])
249 if (not f):
250 raise TwitterError(
251 "Unknown formatter '%s' for status actions" %(options['format']))
252 return f()
253
254 class Action(object):
255
256 def ask(self, subject='perform this action', careful=False):
257 '''
258 Requests fromt he user using `raw_input` if `subject` should be
259 performed. When `careful`, the default answer is NO, otherwise YES.
260 Returns the user answer in the form `True` or `False`.
261 '''
262 sample = '(y/N)'
263 if not careful:
264 sample = '(Y/n)'
265
266 prompt = 'You really want to %s %s? ' %(subject, sample)
267 try:
268 answer = raw_input(prompt).lower()
269 if careful:
270 return answer in ('yes', 'y')
271 else:
272 return answer not in ('no', 'n')
273 except EOFError:
274 print >>sys.stderr # Put Newline since Enter was never pressed
275 # TODO:
276 # Figure out why on OS X the raw_input keeps raising
277 # EOFError and is never able to reset and get more input
278 # Hint: Look at how IPython implements their console
279 default = True
280 if careful:
281 default = False
282 return default
283
284 def __call__(self, twitter, options):
285 action = actions.get(options['action'], NoSuchAction)()
286 try:
287 doAction = lambda : action(twitter, options)
288 if (options['refresh'] and isinstance(action, StatusAction)):
289 while True:
290 doAction()
291 time.sleep(options['refresh_rate'])
292 else:
293 doAction()
294 except KeyboardInterrupt:
295 print >>sys.stderr, '\n[Keyboard Interrupt]'
296 pass
297
298 class NoSuchActionError(Exception):
299 pass
300
301 class NoSuchAction(Action):
302 def __call__(self, twitter, options):
303 raise NoSuchActionError("No such action: %s" %(options['action']))
304
305 def printNicely(string):
306 if sys.stdout.encoding:
307 print string.encode(sys.stdout.encoding, 'replace')
308 else:
309 print string.encode('utf-8')
310
311 class StatusAction(Action):
312 def __call__(self, twitter, options):
313 statuses = self.getStatuses(twitter, options)
314 sf = get_formatter('status', options)
315 for status in statuses:
316 statusStr = sf(status, options)
317 if statusStr.strip():
318 printNicely(statusStr)
319
320 class SearchAction(Action):
321 def __call__(self, twitter, options):
322 # We need to be pointing at search.twitter.com to work, and it is less
323 # tangly to do it here than in the main()
324 twitter.domain="search.twitter.com"
325 # We need to bypass the TwitterCall parameter encoding, so we
326 # don't encode the plus sign, so we have to encode it ourselves
327 query_string = "+".join([quote(term) for term in options['extra_args']])
328 twitter.encoded_args = "q=%s" %(query_string)
329
330 results = twitter.search()['results']
331 f = get_formatter('search', options)
332 for result in results:
333 resultStr = f(result, options)
334 if resultStr.strip():
335 printNicely(resultStr)
336
337 class AdminAction(Action):
338 def __call__(self, twitter, options):
339 if not (options['extra_args'] and options['extra_args'][0]):
340 raise TwitterError("You need to specify a user (screen name)")
341 af = get_formatter('admin', options)
342 try:
343 user = self.getUser(twitter, options['extra_args'][0])
344 except TwitterError, e:
345 print "There was a problem following or leaving the specified user."
346 print "You may be trying to follow a user you are already following;"
347 print "Leaving a user you are not currently following;"
348 print "Or the user may not exist."
349 print "Sorry."
350 print
351 print e
352 else:
353 printNicely(af(options['action'], user))
354
355 class FriendsAction(StatusAction):
356 def getStatuses(self, twitter, options):
357 return reversed(twitter.statuses.friends_timeline(count=options["length"]))
358
359 class PublicAction(StatusAction):
360 def getStatuses(self, twitter, options):
361 return reversed(twitter.statuses.public_timeline(count=options["length"]))
362
363 class RepliesAction(StatusAction):
364 def getStatuses(self, twitter, options):
365 return reversed(twitter.statuses.replies(count=options["length"]))
366
367 class FollowAction(AdminAction):
368 def getUser(self, twitter, user):
369 return twitter.friendships.create(id=user)
370
371 class LeaveAction(AdminAction):
372 def getUser(self, twitter, user):
373 return twitter.friendships.destroy(id=user)
374
375 class SetStatusAction(Action):
376 def __call__(self, twitter, options):
377 statusTxt = (u" ".join(options['extra_args'])
378 if options['extra_args']
379 else unicode(raw_input("message: ")))
380 status = (statusTxt.encode('utf8', 'replace'))
381 twitter.statuses.update(status=status)
382
383 class TwitterShell(Action):
384
385 def render_prompt(self, prompt):
386 '''Parses the `prompt` string and returns the rendered version'''
387 prompt = prompt.strip("'").replace("\\'","'")
388 for colour in ansi.COLOURS_NAMED:
389 if '[%s]' %(colour) in prompt:
390 prompt = prompt.replace(
391 '[%s]' %(colour), ansi.cmdColourNamed(colour))
392 prompt = prompt.replace('[R]', ansi.cmdReset())
393 return prompt
394
395 def __call__(self, twitter, options):
396 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
397 while True:
398 options['action'] = ""
399 try:
400 args = raw_input(prompt).split()
401 parse_args(args, options)
402 if not options['action']:
403 continue
404 elif options['action'] == 'exit':
405 raise SystemExit(0)
406 elif options['action'] == 'shell':
407 print >>sys.stderr, 'Sorry Xzibit does not work here!'
408 continue
409 elif options['action'] == 'help':
410 print >>sys.stderr, '''\ntwitter> `action`\n
411 The Shell Accepts all the command line actions along with:
412
413 exit Leave the twitter shell (^D may also be used)
414
415 Full CMD Line help is appended below for your convinience.'''
416 Action()(twitter, options)
417 options['action'] = ''
418 except NoSuchActionError, e:
419 print >>sys.stderr, e
420 except KeyboardInterrupt:
421 print >>sys.stderr, '\n[Keyboard Interrupt]'
422 except EOFError:
423 print >>sys.stderr
424 leaving = self.ask(subject='Leave')
425 if not leaving:
426 print >>sys.stderr, 'Excellent!'
427 else:
428 raise SystemExit(0)
429
430 class HelpAction(Action):
431 def __call__(self, twitter, options):
432 print __doc__
433
434 def parse_oauth_tokens(result):
435 for r in result.split('&'):
436 k, v = r.split('=')
437 if k == 'oauth_token':
438 oauth_token = v
439 elif k == 'oauth_token_secret':
440 oauth_token_secret = v
441 return oauth_token, oauth_token_secret
442
443 def oauth_dance(options):
444 print ("Hi there! We're gonna get you all set up to use Twitter"
445 " on the command-line.")
446 twitter = Twitter(
447 auth=OAuth('', '', CONSUMER_KEY, CONSUMER_SECRET),
448 format='')
449 oauth_token, oauth_token_secret = parse_oauth_tokens(
450 twitter.oauth.request_token())
451 print """
452 In the web browser window that opens please choose to Allow access to the
453 command-line tool. Copy the PIN number that appears on the next page and
454 paste or type it here:
455 """
456 webbrowser.open(
457 'http://api.twitter.com/oauth/authorize?oauth_token=' +
458 oauth_token)
459 oauth_verifier = raw_input("Please type the PIN: ").strip()
460 twitter = Twitter(
461 auth=OAuth(
462 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
463 format='')
464 oauth_token, oauth_token_secret = parse_oauth_tokens(
465 twitter.oauth.access_token(oauth_verifier=oauth_verifier))
466 oauth_file = open(options['oauth_filename'], 'w')
467 print >> oauth_file, oauth_token
468 print >> oauth_file, oauth_token_secret
469 oauth_file.close()
470 print "That's it! Your authorization keys have been written to %s." % (
471 options['oauth_filename'])
472
473
474 actions = {
475 'follow' : FollowAction,
476 'friends' : FriendsAction,
477 'help' : HelpAction,
478 'leave' : LeaveAction,
479 'public' : PublicAction,
480 'replies' : RepliesAction,
481 'search' : SearchAction,
482 'set' : SetStatusAction,
483 'shell' : TwitterShell,
484 }
485
486 def loadConfig(filename):
487 options = dict(OPTIONS)
488 if os.path.exists(filename):
489 cp = SafeConfigParser()
490 cp.read([filename])
491 for option in ('email', 'password', 'format', 'prompt'):
492 if cp.has_option('twitter', option):
493 options[option] = cp.get('twitter', option)
494 return options
495
496 def main(args=sys.argv[1:]):
497 arg_options = {}
498 try:
499 parse_args(args, arg_options)
500 except GetoptError, e:
501 print >> sys.stderr, "I can't do that, %s." %(e)
502 print >> sys.stderr
503 raise SystemExit(1)
504
505 config_options = loadConfig(
506 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
507
508 # Apply the various options in order, the most important applied last.
509 # Defaults first, then what's read from config file, then command-line
510 # arguments.
511 options = dict(OPTIONS)
512 for d in config_options, arg_options:
513 for k,v in d.items():
514 if v: options[k] = v
515
516 if options['refresh'] and options['action'] not in (
517 'friends', 'public', 'replies'):
518 print >> sys.stderr, "You can only refresh the friends, public, or replies actions."
519 print >> sys.stderr, "Use 'twitter -h' for help."
520 raise SystemExit(1)
521
522 if options['email'] and not options['password']:
523 options['password'] = getpass("Twitter password: ")
524
525 if options['action'] == 'authorize':
526 oauth_dance(options)
527 return 0
528
529 twitter = Twitter(
530 email=options['email'], password=options['password'], agent=AGENT_STR,
531 secure=options['secure'])
532 try:
533 Action()(twitter, options)
534 except NoSuchActionError, e:
535 print >>sys.stderr, e
536 raise SystemExit(1)
537 except TwitterError, e:
538 print >> sys.stderr, e.args[0]
539 print >> sys.stderr, "Use 'twitter -h' for help."
540 raise SystemExit(1)