]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Merge git://github.com/chexov/twitter into list_action_merge
[z_archive/twitter.git] / twitter / cmdline.py
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 USAGE:
5
6 twitter [action] [options]
7
8
9 ACTIONS:
10 authorize authorize the command-line tool to interact with Twitter
11 follow add the specified user to your follow list
12 friends get latest tweets from your friends (default action)
13 help print this help text that you are currently reading
14 leave remove the specified user from your following list
15 public get latest public tweets
16 list get list of user lists
17 replies get latest replies
18 search search twitter (Beware: octothorpe, escape it)
19 set set your twitter status
20 shell login to the twitter shell
21
22
23 OPTIONS:
24
25 -r --refresh run this command forever, polling every once
26 in a while (default: every 10 minutes)
27 -R --refresh-rate <rate> set the refresh rate (in seconds)
28 -f --format <format> specify the output format for status updates
29 -c --config <filename> read username and password from given config
30 file (default ~/.twitter)
31 -l --length <count> specify number of status updates shown
32 (default: 20, max: 200)
33 -t --timestamp show time before status lines
34 -d --datestamp show date before status lines
35 --no-ssl use HTTP instead of more secure HTTPS
36 --oauth <filename> filename to read/store oauth credentials to
37
38 FORMATS for the --format option
39
40 default one line per status
41 verbose multiple lines per status, more verbose status info
42 urls nothing but URLs
43 ansi ansi colour (rainbow mode)
44
45
46 CONFIG FILES
47
48 The config file should be placed in your home directory and be named .twitter.
49 It must contain a [twitter] header, and all the desired options you wish to
50 set, like so:
51
52 [twitter]
53 format: <desired_default_format_for_output>
54 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
55
56 OAuth authentication tokens are stored in the file .twitter_oauth in your
57 home directory.
58 """
59
# OAuth consumer credentials of this command-line application; main() hands
# them to oauth_dance() and to the OAuth authenticator.
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
62
63 import sys
64 import time
65 from getopt import gnu_getopt as getopt, GetoptError
66 from getpass import getpass
67 import re
68 import os.path
69 from ConfigParser import SafeConfigParser
70 import datetime
71 from urllib import quote
72 import webbrowser
73
74 from api import Twitter, TwitterError
75 from oauth import OAuth, write_token_file, read_token_file
76 from oauth_dance import oauth_dance
77 import ansi
78
# Built-in defaults for every option.  main() layers the values read from
# the config file and then the command-line arguments on top of these.
#
# The two file names live in the user's home directory.  expanduser('~') is
# used rather than $HOME so that a sensible directory is still found when
# HOME is not set (the old concatenation degraded to "/.twitter").
OPTIONS = {
    'action': 'friends',
    'refresh': False,
    'refresh_rate': 600,  # seconds between polls when --refresh is given
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    'config_filename': os.path.join(os.path.expanduser('~'), '.twitter'),
    'oauth_filename': os.path.join(
        os.path.expanduser('~'), '.twitter_oauth'),
    'length': 20,
    'timestamp': False,
    'datestamp': False,
    'extra_args': [],
    'secure': True,
}
93
def _int_option(opt, arg):
    """Return `arg` as an int, or raise GetoptError naming the option."""
    try:
        return int(arg)
    except ValueError:
        raise GetoptError(
            "option %s requires an integer argument, not %r" % (opt, arg),
            opt)


def parse_args(args, options):
    """
    Parse the command-line words in `args` and record them in `options`.

    Only the keys for options actually present are written, so `options`
    may start out empty.  The first non-option word becomes
    options['action'] and the remaining words options['extra_args'],
    unless help was requested, in which case the action stays 'help'.
    The -e and -p switches are accepted but ignored.

    Raises GetoptError for unknown switches, missing arguments, and
    non-integer values given to -R/--refresh-rate or -l/--length.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _int_option(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _int_option(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over any action word given on the same line.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
126
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """
    Return the local-time prefix to print in front of a status line.

    status['created_at'] is parsed with `format` and treated as UTC (the
    default format carries a literal "+0000").  Depending on
    options['timestamp'] and options['datestamp'] the result is a time, a
    date, both, or the empty string; non-empty results end with a space.
    """
    # Imported here because the module-level imports do not provide it.
    import calendar

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if not (timestamp or datestamp):
        # Nothing to show, so skip parsing altogether.
        return ""

    utc_time = time.strptime(status['created_at'], format)
    # timegm() reads the struct as UTC and localtime() applies the offset
    # that was in force at that instant.  Testing time.daylight instead
    # would pick the DST offset all year round, because that flag only says
    # the zone defines DST, not that DST is active.
    local_time = time.localtime(calendar.timegm(utc_time))

    if timestamp and datestamp:
        pattern = "%Y-%m-%d %H:%M:%S "
    elif timestamp:
        pattern = "%H:%M:%S "
    else:
        pattern = "%Y-%m-%d "
    return time.strftime(pattern, local_time)
144
class StatusFormatter(object):
    """Default status format: optional time prefix, screen name, text."""

    def __call__(self, status, options):
        prefix = get_time_string(status, options)
        author = status['user']['screen_name']
        return u"%s%s %s" % (prefix, author, status['text'])
150
class AnsiStatusFormatter(object):
    """Status format that colours each screen name with ANSI escapes."""

    def __init__(self):
        # One colour map per formatter so a given name keeps its colour.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        screen_name = status['user']['screen_name']
        colour = self._colourMap.colourFor(screen_name)
        prefix = get_time_string(status, options)
        coloured_name = u"%s%s%s" % (
            ansi.cmdColour(colour), screen_name, ansi.cmdReset())
        return u"%s%s %s" % (prefix, coloured_name, status['text'])
161
class VerboseStatusFormatter(object):
    """Multi-line status format: header with author details, then text."""

    def __call__(self, status, options):
        author = status['user']
        header = u"-- %s (%s) on %s" % (
            author['screen_name'], author['location'], status['created_at'])
        return u"%s\n%s\n" % (header, status['text'])
169
class URLStatusFormatter(object):
    """Status format that keeps only the URLs found in the text."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        found = self.urlmatch.findall(status['text'])
        if not found:
            return ""
        return u'\n'.join(found)
175
class ListsFormatter(object):
    """Default list format: padded name plus description, if there is one."""

    def __call__(self, list):
        description = list['description']
        if not description:
            return u"%s\n" % (u"%-30s" % (list['name']))
        return u"%s\n" % (u"%-30s (%s)" % (list['name'], description))
183
class ListsVerboseFormatter(object):
    """Verbose list format: name, description, member count and mode."""

    def __call__(self, list):
        template = u"%-30s\n description: %s\n members: %s\n mode:%s\n"
        values = (list['name'], list['description'],
                  list['member_count'], list['mode'])
        return template % values
188
class AnsiListsFormatter(object):
    """List format that colours each list name with ANSI escapes."""

    def __init__(self):
        # One colour map per formatter so a given name keeps its colour.
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        list_name = list['name']
        colour = self._colourMap.colourFor(list_name)
        start = ansi.cmdColour(colour)
        stop = ansi.cmdReset()
        return u"%s%-15s%s %s" % (start, list_name, stop, list['description'])
198
199
class AdminFormatter(object):
    """One-line confirmation for a follow or leave action."""

    def __call__(self, action, user):
        user_str = u"%s (%s)" % (user['screen_name'], user['name'])
        if action != "follow":
            return u"You are no longer following %s.\n" % (user_str)
        return u"You are now following %s.\n" % (user_str)
207
class VerboseAdminFormatter(object):
    """Verbose confirmation for a follow or leave action, with the URL."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        return u"-- %s: %s (%s): %s" % (
            verb, user['screen_name'], user['name'], user['url'])
215
class SearchFormatter(object):
    """Default search-result format: optional time prefix, sender, text."""

    def __call__(self, result, options):
        # Search results use a different date layout from statuses.
        prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        return u"%s%s %s" % (prefix, result['from_user'], result['text'])
221
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search format; currently the same as the default one."""
224
class URLSearchFormatter(object):
    """Search-result format that keeps only the URLs found in the text."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        found = self.urlmatch.findall(result['text'])
        if not found:
            return ""
        return u'\n'.join(found)
230
class AnsiSearchFormatter(object):
    """Search-result format that colours each sender with ANSI escapes."""

    def __init__(self):
        # One colour map per formatter so a given name keeps its colour.
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        sender = result['from_user']
        colour = self._colourMap.colourFor(sender)
        prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        coloured_sender = u"%s%s%s" % (
            ansi.cmdColour(colour), sender, ansi.cmdReset())
        return u"%s%s %s" % (prefix, coloured_sender, result['text'])
241
# Cached result of get_term_encoding(); None until the first call.
_term_encoding = None
def get_term_encoding():
    """
    Return the character encoding named by the LANG environment variable.

    LANG has the shape language[_territory][.codeset][@modifier].  The
    codeset is returned, with any "@modifier" suffix removed so that the
    value is usable as a codec name.  'UTF-8' is returned when LANG is
    unset or names no codeset.  The answer is computed once and cached.
    """
    global _term_encoding
    if not _term_encoding:
        parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
        codeset = ''
        if parts[1:]:
            # e.g. "UTF-8@valencia" -> "UTF-8"
            codeset = parts[1].split('@')[0]
        _term_encoding = codeset or 'UTF-8'
    return _term_encoding
252
# Formatter classes per output format, one table per kind of action.
status_formatters = dict(
    default=StatusFormatter,
    verbose=VerboseStatusFormatter,
    urls=URLStatusFormatter,
    ansi=AnsiStatusFormatter)

admin_formatters = dict(
    default=AdminFormatter,
    verbose=VerboseAdminFormatter,
    urls=AdminFormatter,
    ansi=AdminFormatter)

search_formatters = dict(
    default=SearchFormatter,
    verbose=VerboseSearchFormatter,
    urls=URLSearchFormatter,
    ansi=AnsiSearchFormatter)

# There is no URL-only rendering for lists, hence the None entry.
lists_formatters = dict(
    default=ListsFormatter,
    verbose=ListsVerboseFormatter,
    urls=None,
    ansi=AnsiListsFormatter)

# Registry consulted by get_formatter(): action type -> format name -> class.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
285
def get_formatter(action_type, options):
    """
    Return a new formatter instance for `action_type`.

    `action_type` is a key of the module-level `formatters` registry and
    options['format'] selects the class within it.

    Raises TwitterError when the action type is not registered or when the
    requested format has no formatter for it.
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the real action type; the message used to say "status" even
        # for admin, search and lists lookups.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
297
class Action(object):
    """Base class of every action; calling a bare Action dispatches."""

    def ask(self, subject='perform this action', careful=False):
        '''
        Asks the user, through `raw_input`, whether `subject` should be
        performed. When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        if careful:
            sample = '(y/N)'
        else:
            sample = '(Y/n)'

        prompt = 'You really want to %s %s? ' %(subject, sample)
        try:
            answer = raw_input(prompt).lower()
        except EOFError:
            print >>sys.stderr # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful

        # Anything unrecognised counts as the default answer.
        if careful:
            return answer in ('yes', 'y')
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        # Look up the class for the requested action; unknown names map to
        # NoSuchAction, which raises when it is run.
        action_class = actions.get(options['action'], NoSuchAction)
        action = action_class()
        try:
            # Only status listings may be polled repeatedly.
            repeat = options['refresh'] and isinstance(action, StatusAction)
            while True:
                action(twitter, options)
                if not repeat:
                    break
                time.sleep(options['refresh_rate'])
        except KeyboardInterrupt:
            print >>sys.stderr, '\n[Keyboard Interrupt]'
341
class NoSuchActionError(Exception):
    """Raised when the requested action name is not in `actions`."""
344
class NoSuchAction(Action):
    """Fallback action that reports an unknown action name."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
348
349 def printNicely(string):
350 if sys.stdout.encoding:
351 print string.encode(sys.stdout.encoding, 'replace')
352 else:
353 print string.encode('utf-8')
354
class StatusAction(Action):
    """Base for actions that print statuses supplied by getStatuses()."""

    def __call__(self, twitter, options):
        statuses = self.getStatuses(twitter, options)
        formatter = get_formatter('status', options)
        for status in statuses:
            line = formatter(status, options)
            if not line.strip():
                # Some formats (e.g. URLs only) yield nothing for a status.
                continue
            printNicely(line)
363
class SearchAction(Action):
    """Search twitter for the words in options['extra_args']."""

    def __call__(self, twitter, options):
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves.
        # Each term is re-encoded as UTF-8 bytes before quoting, because
        # quote() raises KeyError on unicode input with non-ASCII characters.
        encoding = get_term_encoding()
        query_string = "+".join(
            [quote(term.decode(encoding).encode('utf-8'))
             for term in options['extra_args']])

        # We need to be pointing at search.twitter.com to work, and it is
        # less tangly to do it here than in the main().  The previous
        # target is restored afterwards so that later commands in the
        # interactive shell are not sent to the search host.
        # NOTE(review): assumes `domain` and `uriparts` are plain
        # attributes of the client, as the assignments below imply.
        saved_domain = twitter.domain
        saved_uriparts = twitter.uriparts
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()
        try:
            results = twitter.search(q=query_string)['results']
        finally:
            twitter.domain = saved_domain
            twitter.uriparts = saved_uriparts

        f = get_formatter('search', options)
        for result in results:
            resultStr = f(result, options)
            if resultStr.strip():
                printNicely(resultStr)
382
383 class AdminAction(Action):
384 def __call__(self, twitter, options):
385 if not (options['extra_args'] and options['extra_args'][0]):
386 raise TwitterError("You need to specify a user (screen name)")
387 af = get_formatter('admin', options)
388 try:
389 user = self.getUser(twitter, options['extra_args'][0])
390 except TwitterError, e:
391 print "There was a problem following or leaving the specified user."
392 print "You may be trying to follow a user you are already following;"
393 print "Leaving a user you are not currently following;"
394 print "Or the user may not exist."
395 print "Sorry."
396 print
397 print e
398 else:
399 printNicely(af(options['action'], user))
400
class ListsAction(StatusAction):
    """Show the user's lists, or the statuses of one named list."""

    def getStatuses(self, twitter, options):
        """
        Return the statuses of the list named by the first extra argument,
        oldest first.  With no list name, print the authenticated user's
        lists instead and return no statuses.
        """
        screen_name = twitter.account.verify_credentials()['screen_name']
        extra_args = options['extra_args']
        if not (extra_args and extra_args[0]):
            user_lists = twitter.user.lists(user=screen_name)['lists']
            # Build the formatter once: a fresh one per list would reset
            # the ANSI formatter's colour map on every line.
            lf = get_formatter('lists', options)
            for user_list in user_lists:
                printNicely(lf(user_list))
            # Return nothing to print rather than raising SystemExit, which
            # would also terminate the interactive shell.
            return []
        return reversed(twitter.user.lists.list.statuses(
            user=screen_name, list=extra_args[0]))
410
class FriendsAction(StatusAction):
    """Latest statuses from the friends timeline, oldest first."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
414
class PublicAction(StatusAction):
    """Latest statuses from the public timeline, oldest first."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
418
class RepliesAction(StatusAction):
    """Latest replies, oldest first."""

    def getStatuses(self, twitter, options):
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
422
class FollowAction(AdminAction):
    """Start following the given user."""

    def getUser(self, twitter, user):
        followed = twitter.friendships.create(id=user)
        return followed
426
class LeaveAction(AdminAction):
    """Stop following the given user."""

    def getUser(self, twitter, user):
        left = twitter.friendships.destroy(id=user)
        return left
430
class SetStatusAction(Action):
    """Post a status taken from the extra arguments or typed at a prompt."""

    def __call__(self, twitter, options):
        if options['extra_args']:
            raw_text = " ".join(options['extra_args'])
        else:
            raw_text = raw_input("message: ")
        # Both sources deliver bytes in the terminal's encoding.  Calling
        # unicode() on the typed text assumed ASCII and failed on anything
        # else, so decode both the same way.
        statusTxt = raw_text.decode(get_term_encoding())
        status = statusTxt.encode('utf8', 'replace')
        twitter.statuses.update(status=status)
438
439 class TwitterShell(Action):
440
441 def render_prompt(self, prompt):
442 '''Parses the `prompt` string and returns the rendered version'''
443 prompt = prompt.strip("'").replace("\\'","'")
444 for colour in ansi.COLOURS_NAMED:
445 if '[%s]' %(colour) in prompt:
446 prompt = prompt.replace(
447 '[%s]' %(colour), ansi.cmdColourNamed(colour))
448 prompt = prompt.replace('[R]', ansi.cmdReset())
449 return prompt
450
451 def __call__(self, twitter, options):
452 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
453 while True:
454 options['action'] = ""
455 try:
456 args = raw_input(prompt).split()
457 parse_args(args, options)
458 if not options['action']:
459 continue
460 elif options['action'] == 'exit':
461 raise SystemExit(0)
462 elif options['action'] == 'shell':
463 print >>sys.stderr, 'Sorry Xzibit does not work here!'
464 continue
465 elif options['action'] == 'help':
466 print >>sys.stderr, '''\ntwitter> `action`\n
467 The Shell Accepts all the command line actions along with:
468
469 exit Leave the twitter shell (^D may also be used)
470
471 Full CMD Line help is appended below for your convinience.'''
472 Action()(twitter, options)
473 options['action'] = ''
474 except NoSuchActionError, e:
475 print >>sys.stderr, e
476 except KeyboardInterrupt:
477 print >>sys.stderr, '\n[Keyboard Interrupt]'
478 except EOFError:
479 print >>sys.stderr
480 leaving = self.ask(subject='Leave')
481 if not leaving:
482 print >>sys.stderr, 'Excellent!'
483 else:
484 raise SystemExit(0)
485
486 class HelpAction(Action):
487 def __call__(self, twitter, options):
488 print __doc__
489
class DoNothingAction(Action):
    """Action with no effect of its own; registered for 'authorize'."""

    def __call__(self, twitter, options):
        return None
493
# Action name as typed by the user -> class implementing it.
actions = dict(
    authorize=DoNothingAction,
    follow=FollowAction,
    friends=FriendsAction,
    list=ListsAction,
    help=HelpAction,
    leave=LeaveAction,
    public=PublicAction,
    replies=RepliesAction,
    search=SearchAction,
    set=SetStatusAction,
    shell=TwitterShell)
507
def loadConfig(filename):
    """
    Return a copy of OPTIONS overlaid with settings read from `filename`.

    Only the 'format' and 'prompt' entries of the [twitter] section are
    honoured.  A file that does not exist leaves the defaults untouched.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    parser = SafeConfigParser()
    parser.read([filename])
    for name in ('format', 'prompt'):
        if not parser.has_option('twitter', name):
            continue
        options[name] = parser.get('twitter', name)
    return options
517
518 def main(args=sys.argv[1:]):
519 arg_options = {}
520 try:
521 parse_args(args, arg_options)
522 except GetoptError, e:
523 print >> sys.stderr, "I can't do that, %s." %(e)
524 print >> sys.stderr
525 raise SystemExit(1)
526
527 config_path = os.path.expanduser(
528 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
529 config_options = loadConfig(config_path)
530
531 # Apply the various options in order, the most important applied last.
532 # Defaults first, then what's read from config file, then command-line
533 # arguments.
534 options = dict(OPTIONS)
535 for d in config_options, arg_options:
536 for k,v in d.items():
537 if v: options[k] = v
538
539 if options['refresh'] and options['action'] not in (
540 'friends', 'public', 'replies'):
541 print >> sys.stderr, "You can only refresh the friends, public, or replies actions."
542 print >> sys.stderr, "Use 'twitter -h' for help."
543 return 1
544
545 oauth_filename = os.path.expanduser(options['oauth_filename'])
546
547 if (options['action'] == 'authorize'
548 or not os.path.exists(oauth_filename)):
549 oauth_dance(
550 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
551 options['oauth_filename'])
552
553 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
554
555 twitter = Twitter(
556 auth=OAuth(
557 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
558 secure=options['secure'],
559 api_version='1',
560 domain='api.twitter.com')
561
562 try:
563 Action()(twitter, options)
564 except NoSuchActionError, e:
565 print >>sys.stderr, e
566 raise SystemExit(1)
567 except TwitterError, e:
568 print >> sys.stderr, str(e)
569 print >> sys.stderr, "Use 'twitter -h' for help."
570 raise SystemExit(1)