]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
make cmd client understand any language
[z_archive/twitter.git] / twitter / cmdline.py
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 USAGE:
5
6 twitter [action] [options]
7
8
9 ACTIONS:
10 authorize authorize the command-line tool to interact with Twitter
11 follow add the specified user to your follow list
12 friends get latest tweets from your friends (default action)
13 help print this help text that you are currently reading
14 leave remove the specified user from your following list
15 public get latest public tweets
16 replies get latest replies
17 search search twitter (Beware: octothorpe, escape it)
18 set set your twitter status
19 shell login to the twitter shell
20
21
22 OPTIONS:
23
24 -r --refresh run this command forever, polling every once
25 in a while (default: every 10 minutes)
26 -R --refresh-rate <rate> set the refresh rate (in seconds)
27 -f --format <format> specify the output format for status updates
28 -c --config <filename> read username and password from given config
29 file (default ~/.twitter)
30 -l --length <count> specify number of status updates shown
31 (default: 20, max: 200)
32 -t --timestamp show time before status lines
33 -d --datestamp show date before status lines
34 --no-ssl use HTTP instead of more secure HTTPS
35 --oauth <filename> filename to read/store oauth credentials to
36
37 FORMATS for the --format option
38
39 default one line per status
40 verbose multiple lines per status, more verbose status info
41 urls nothing but URLs
42 ansi ansi colour (rainbow mode)
43
44
45 CONFIG FILES
46
47 The config file should be placed in your home directory and be named .twitter.
48 It must contain a [twitter] header, and all the desired options you wish to
49 set, like so:
50
51 [twitter]
52 format: <desired_default_format_for_output>
53 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
54
55 OAuth authentication tokens are stored in the file .twitter_oauth in your
56 home directory.
57 """
58
# OAuth consumer credentials of this command-line client; main() passes
# them to oauth_dance() and to OAuth().
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
61
62 import sys
63 import time
64 from getopt import gnu_getopt as getopt, GetoptError
65 from getpass import getpass
66 import re
67 import os.path
68 from ConfigParser import SafeConfigParser
69 import datetime
70 from urllib import quote
71 import webbrowser
72
73 from api import Twitter, TwitterError
74 from oauth import OAuth, write_token_file, read_token_file
75 from oauth_dance import oauth_dance
76 import ansi
77
# Built-in defaults for every option. main() layers the config-file values
# and then the command-line values on top of a copy of this dict.
OPTIONS = dict(
    action='friends',
    refresh=False,
    refresh_rate=600,
    format='default',
    prompt='[cyan]twitter[R]> ',
    config_filename=os.sep.join([os.environ.get('HOME', ''), '.twitter']),
    oauth_filename=os.sep.join(
        [os.environ.get('HOME', ''), '.twitter_oauth']),
    length=20,
    timestamp=False,
    datestamp=False,
    extra_args=[],
    secure=True,
)
92
def parse_args(args, options):
    """Parse command-line arguments into the ``options`` dict, in place.

    args    -- list of argument strings (without the program name).
    options -- dict that receives one entry per option actually given.

    The first positional argument, if any, becomes ``options['action']``
    and the remaining ones ``options['extra_args']`` -- unless help was
    requested, in which case the 'help' action is kept and the positional
    arguments are ignored.

    Raises GetoptError for unknown options, missing option arguments and
    non-integer values given to -R/--refresh-rate or -l/--length.
    """
    def as_int(opt, arg):
        # Report a bad number the same way getopt reports a bad option, so
        # that callers only have to handle GetoptError.
        try:
            return int(arg)
        except ValueError:
            raise GetoptError(
                "option %s requires an integer argument, not %r" % (opt, arg),
                opt)

    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    # 'e:' and 'p:' are accepted here but no branch below acts on them.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = as_int(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = as_int(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over any action named on the command line.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
125
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix to print in front of a status line.

    status  -- mapping whose 'created_at' entry is a UTC time string.
    options -- mapping with boolean 'timestamp' and 'datestamp' entries.
    format  -- strptime() pattern describing status['created_at'].

    Returns "HH:MM:SS ", "YYYY-MM-DD ", "YYYY-MM-DD HH:MM:SS " or, when
    neither stamp is requested, the empty string.
    """
    import calendar  # local import: not part of the module's import block

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if not (timestamp or datestamp):
        return ""

    utc = time.strptime(status['created_at'], format)
    # Convert through the epoch so the C library picks the UTC offset that
    # applies on that particular date. Checking time.daylight is not
    # enough: it only says that the zone defines DST at all, not that DST
    # is in effect, so using time.altzone gave times one hour off.
    local = time.localtime(calendar.timegm(utc))

    if timestamp and datestamp:
        return time.strftime("%Y-%m-%d %H:%M:%S ", local)
    if timestamp:
        return time.strftime("%H:%M:%S ", local)
    return time.strftime("%Y-%m-%d ", local)
143
class StatusFormatter(object):
    """Default status formatter: one line per status."""

    def __call__(self, status, options):
        """Return "<time prefix><screen name> <text>" for ``status``."""
        prefix = get_time_string(status, options)
        author = status['user']['screen_name']
        return u"%s%s %s" % (prefix, author, status['text'])
149
class AnsiStatusFormatter(object):
    """Status formatter that wraps the screen name in ANSI colour codes."""

    def __init__(self):
        # The map is asked for a colour per screen name in __call__.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        """Return the one-line form of ``status`` with a coloured name."""
        screen_name = status['user']['screen_name']
        colour = self._colourMap.colourFor(screen_name)
        pieces = (
            get_time_string(status, options),
            ansi.cmdColour(colour),
            screen_name,
            ansi.cmdReset(),
            status['text'],
        )
        return u"%s%s%s%s %s" % pieces
160
class VerboseStatusFormatter(object):
    """Status formatter that prints several lines per status."""

    def __call__(self, status, options):
        """Return a header line (name, location, raw date) plus the text."""
        user = status['user']
        template = u"-- %s (%s) on %s\n%s\n"
        return template % (
            user['screen_name'],
            user['location'],
            status['created_at'],
            status['text'],
        )
168
class URLStatusFormatter(object):
    """Status formatter that outputs nothing but the URLs in the text."""

    # An http:// or https:// prefix followed by non-whitespace characters.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        """Return the URLs found in status['text'], one per line, or ""."""
        found = self.urlmatch.findall(status['text'])
        if not found:
            return ""
        return u'\n'.join(found)
174
class AdminFormatter(object):
    """Default formatter for the result of a follow/leave action."""

    def __call__(self, action, user):
        """Return a sentence confirming that ``user`` was followed or left.

        Any ``action`` other than "follow" is reported as no longer
        following.
        """
        user_str = u"%s (%s)" % (user['screen_name'], user['name'])
        template = (u"You are now following %s.\n"
                    if action == "follow"
                    else u"You are no longer following %s.\n")
        return template % (user_str)
182
class VerboseAdminFormatter(object):
    """Verbose formatter for the result of a follow/leave action."""

    def __call__(self, action, user):
        """Return one line with the verb, screen name, name and URL."""
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        fields = (verb, user['screen_name'], user['name'], user['url'])
        return u"-- %s: %s (%s): %s" % fields
190
class SearchFormatter(object):
    """Default formatter for search results: one line per result."""

    def __call__(self, result, options):
        """Return "<time prefix><from_user> <text>" for ``result``."""
        # Search results carry their date in a different layout than
        # timeline statuses, hence the explicit strptime pattern.
        prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        return u"%s%s %s" % (prefix, result['from_user'], result['text'])
196
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search formatter; currently identical to SearchFormatter."""
199
class URLSearchFormatter(object):
    """Search formatter that outputs nothing but the URLs in the text."""

    # An http:// or https:// prefix followed by non-whitespace characters.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        """Return the URLs found in result['text'], one per line, or ""."""
        found = self.urlmatch.findall(result['text'])
        if not found:
            return ""
        return u'\n'.join(found)
205
class AnsiSearchFormatter(object):
    """Search formatter that wraps the author name in ANSI colour codes."""

    def __init__(self):
        # The map is asked for a colour per author name in __call__.
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        """Return the one-line form of ``result`` with a coloured name."""
        author = result['from_user']
        colour = self._colourMap.colourFor(author)
        pieces = (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansi.cmdColour(colour),
            author,
            ansi.cmdReset(),
            result['text'],
        )
        return u"%s%s%s%s %s" % pieces
216
# Cache for get_term_encoding(); filled in on the first call.
_term_encoding = None


def get_term_encoding():
    """Return the terminal's character encoding as derived from $LANG.

    $LANG is read as ``language[.codeset][@modifier]``. The codeset is
    returned when it is present and names a codec Python knows; in every
    other case (no $LANG codeset, empty codeset, unknown codec) 'UTF-8' is
    returned. The answer is computed once and cached in the module-level
    ``_term_encoding``.
    """
    global _term_encoding
    if not _term_encoding:
        import codecs  # local import: not part of the module's import block

        parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
        codeset = parts[1] if parts[1:] else ''
        # Drop a trailing "@modifier" (as in "ISO-8859-15@euro"); it is not
        # part of the encoding name and would make .decode() fail.
        codeset = codeset.split('@', 1)[0]
        if codeset:
            try:
                codecs.lookup(codeset)
            except LookupError:
                codeset = ''
        _term_encoding = codeset or 'UTF-8'
    return _term_encoding
227
# Formatter classes for timeline statuses, keyed by --format name.
status_formatters = dict(
    default=StatusFormatter,
    verbose=VerboseStatusFormatter,
    urls=URLStatusFormatter,
    ansi=AnsiStatusFormatter,
)

# Formatter classes for follow/leave results, keyed by --format name.
admin_formatters = dict(
    default=AdminFormatter,
    verbose=VerboseAdminFormatter,
    urls=AdminFormatter,
    ansi=AdminFormatter,
)

# Formatter classes for search results, keyed by --format name.
search_formatters = dict(
    default=SearchFormatter,
    verbose=VerboseSearchFormatter,
    urls=URLSearchFormatter,
    ansi=AnsiSearchFormatter,
)

# Registry used by get_formatter(): action type -> {format name -> class}.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
}
252
def get_formatter(action_type, options):
    """Return a new formatter instance for ``action_type``.

    action_type -- key into the ``formatters`` registry
                   ('status', 'admin' or 'search').
    options     -- mapping whose 'format' entry names the wanted format.

    Raises TwitterError when the action type has no formatters or when
    options['format'] is not a known format for that action type.
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the real action type; the message used to say "status" even
        # for admin and search formatters.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
264
class Action(object):
    """Base class of all actions.

    Calling a plain ``Action`` instance looks up options['action'] in the
    module-level ``actions`` table and runs the matching action.
    """

    def ask(self, subject='perform this action', careful=False):
        '''
        Asks the user, using `raw_input`, whether `subject` should be
        performed. When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' %(subject, sample)
        try:
            answer = raw_input(prompt).lower()
        except EOFError:
            print >>sys.stderr # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful

        if careful:
            # Only an explicit yes counts.
            return answer in ('yes', 'y')
        # Anything but an explicit no counts as yes.
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        """Run the action named by options['action'].

        Unknown names fall back to NoSuchAction. With options['refresh']
        set, StatusAction instances are repeated forever, sleeping
        options['refresh_rate'] seconds between runs. A KeyboardInterrupt
        is reported on stderr and ends the call.
        """
        action_class = actions.get(options['action'], NoSuchAction)
        action = action_class()
        try:
            repeat = options['refresh'] and isinstance(action, StatusAction)
            if not repeat:
                action(twitter, options)
                return
            while True:
                action(twitter, options)
                time.sleep(options['refresh_rate'])
        except KeyboardInterrupt:
            print >>sys.stderr, '\n[Keyboard Interrupt]'
308
class NoSuchActionError(Exception):
    """Raised by NoSuchAction when the requested action does not exist."""
311
class NoSuchAction(Action):
    """Fallback action used when options['action'] is not in ``actions``."""

    def __call__(self, twitter, options):
        """Always raise NoSuchActionError naming the unknown action."""
        message = "No such action: %s" %(options['action'])
        raise NoSuchActionError(message)
315
316 def printNicely(string):
317 if sys.stdout.encoding:
318 print string.encode(sys.stdout.encoding, 'replace')
319 else:
320 print string.encode('utf-8')
321
class StatusAction(Action):
    """Base class for actions that fetch and print a list of statuses.

    Subclasses provide ``getStatuses(twitter, options)``.
    """

    def __call__(self, twitter, options):
        """Fetch the statuses and print every non-blank formatted one."""
        statuses = self.getStatuses(twitter, options)
        formatter = get_formatter('status', options)
        for status in statuses:
            line = formatter(status, options)
            # Some formatters (e.g. URLs only) return "" for a status.
            if not line.strip():
                continue
            printNicely(line)
330
class SearchAction(Action):
    """Search twitter for the terms in options['extra_args']."""

    def __call__(self, twitter, options):
        """Run the search and print every non-blank formatted result."""
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        # NOTE(review): the redirection is not undone afterwards, so later
        # actions run in the same shell session appear to keep using the
        # search domain -- confirm against the Twitter class.
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()

        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves.
        # Python 2's quote() works on byte strings: a unicode string with
        # non-ASCII characters makes it raise KeyError, so each term is
        # decoded from the terminal encoding and re-encoded as UTF-8 first.
        encoding = get_term_encoding()
        query_string = "+".join(
            [quote(term.decode(encoding).encode('utf-8'))
             for term in options['extra_args']])

        results = twitter.search(q=query_string)['results']
        f = get_formatter('search', options)
        for result in results:
            resultStr = f(result, options)
            if resultStr.strip():
                printNicely(resultStr)
349
350 class AdminAction(Action):
351 def __call__(self, twitter, options):
352 if not (options['extra_args'] and options['extra_args'][0]):
353 raise TwitterError("You need to specify a user (screen name)")
354 af = get_formatter('admin', options)
355 try:
356 user = self.getUser(twitter, options['extra_args'][0])
357 except TwitterError, e:
358 print "There was a problem following or leaving the specified user."
359 print "You may be trying to follow a user you are already following;"
360 print "Leaving a user you are not currently following;"
361 print "Or the user may not exist."
362 print "Sorry."
363 print
364 print e
365 else:
366 printNicely(af(options['action'], user))
367
class FriendsAction(StatusAction):
    """Show the latest tweets of the user's friends."""

    def getStatuses(self, twitter, options):
        """Return the friends timeline in reverse of the API's order."""
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
371
class PublicAction(StatusAction):
    """Show the latest public tweets."""

    def getStatuses(self, twitter, options):
        """Return the public timeline in reverse of the API's order."""
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
375
class RepliesAction(StatusAction):
    """Show the latest replies."""

    def getStatuses(self, twitter, options):
        """Return the replies in reverse of the API's order."""
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
379
class FollowAction(AdminAction):
    """Add a user to the follow list."""

    def getUser(self, twitter, user):
        """Create the friendship with ``user`` and return the API result."""
        created = twitter.friendships.create(id=user)
        return created
383
class LeaveAction(AdminAction):
    """Remove a user from the follow list."""

    def getUser(self, twitter, user):
        """Destroy the friendship with ``user`` and return the API result."""
        destroyed = twitter.friendships.destroy(id=user)
        return destroyed
387
class SetStatusAction(Action):
    """Set the user's twitter status."""

    def __call__(self, twitter, options):
        """Post the status given in 'extra_args', or prompt for one.

        The text is decoded from the terminal encoding and sent to the API
        as UTF-8 bytes.
        """
        if options['extra_args']:
            raw = " ".join(options['extra_args'])
        else:
            raw = raw_input("message: ")
        # Both sources yield terminal-encoded byte strings. Decoding the
        # prompted text with unicode() assumed ASCII and failed on any
        # non-ASCII character, unlike the command-line path.
        statusTxt = raw.decode(get_term_encoding())
        status = statusTxt.encode('utf8', 'replace')
        twitter.statuses.update(status=status)
395
396 class TwitterShell(Action):
397
398 def render_prompt(self, prompt):
399 '''Parses the `prompt` string and returns the rendered version'''
400 prompt = prompt.strip("'").replace("\\'","'")
401 for colour in ansi.COLOURS_NAMED:
402 if '[%s]' %(colour) in prompt:
403 prompt = prompt.replace(
404 '[%s]' %(colour), ansi.cmdColourNamed(colour))
405 prompt = prompt.replace('[R]', ansi.cmdReset())
406 return prompt
407
408 def __call__(self, twitter, options):
409 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
410 while True:
411 options['action'] = ""
412 try:
413 args = raw_input(prompt).split()
414 parse_args(args, options)
415 if not options['action']:
416 continue
417 elif options['action'] == 'exit':
418 raise SystemExit(0)
419 elif options['action'] == 'shell':
420 print >>sys.stderr, 'Sorry Xzibit does not work here!'
421 continue
422 elif options['action'] == 'help':
423 print >>sys.stderr, '''\ntwitter> `action`\n
424 The Shell Accepts all the command line actions along with:
425
426 exit Leave the twitter shell (^D may also be used)
427
428 Full CMD Line help is appended below for your convinience.'''
429 Action()(twitter, options)
430 options['action'] = ''
431 except NoSuchActionError, e:
432 print >>sys.stderr, e
433 except KeyboardInterrupt:
434 print >>sys.stderr, '\n[Keyboard Interrupt]'
435 except EOFError:
436 print >>sys.stderr
437 leaving = self.ask(subject='Leave')
438 if not leaving:
439 print >>sys.stderr, 'Excellent!'
440 else:
441 raise SystemExit(0)
442
443 class HelpAction(Action):
444 def __call__(self, twitter, options):
445 print __doc__
446
class DoNothingAction(Action):
    """Action that does nothing; registered for 'authorize'."""

    def __call__(self, twitter, options):
        return None
450
# Action name (first positional argument) -> class implementing it.
actions = dict(
    authorize=DoNothingAction,
    follow=FollowAction,
    friends=FriendsAction,
    help=HelpAction,
    leave=LeaveAction,
    public=PublicAction,
    replies=RepliesAction,
    search=SearchAction,
    set=SetStatusAction,
    shell=TwitterShell,
)
463
def loadConfig(filename):
    """Return a copy of OPTIONS updated from the config file ``filename``.

    Only the 'format' and 'prompt' options of the [twitter] section are
    read. When the file does not exist the plain defaults are returned.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    parser = SafeConfigParser()
    parser.read([filename])
    for name in ('format', 'prompt'):
        if parser.has_option('twitter', name):
            options[name] = parser.get('twitter', name)
    return options
473
474 def main(args=sys.argv[1:]):
475 arg_options = {}
476 try:
477 parse_args(args, arg_options)
478 except GetoptError, e:
479 print >> sys.stderr, "I can't do that, %s." %(e)
480 print >> sys.stderr
481 raise SystemExit(1)
482
483 config_path = os.path.expanduser(
484 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
485 config_options = loadConfig(config_path)
486
487 # Apply the various options in order, the most important applied last.
488 # Defaults first, then what's read from config file, then command-line
489 # arguments.
490 options = dict(OPTIONS)
491 for d in config_options, arg_options:
492 for k,v in d.items():
493 if v: options[k] = v
494
495 if options['refresh'] and options['action'] not in (
496 'friends', 'public', 'replies'):
497 print >> sys.stderr, "You can only refresh the friends, public, or replies actions."
498 print >> sys.stderr, "Use 'twitter -h' for help."
499 return 1
500
501 oauth_filename = os.path.expanduser(options['oauth_filename'])
502
503 if (options['action'] == 'authorize'
504 or not os.path.exists(oauth_filename)):
505 oauth_dance(
506 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
507 options['oauth_filename'])
508
509 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
510
511 twitter = Twitter(
512 auth=OAuth(
513 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
514 secure=options['secure'],
515 api_version='1',
516 domain='api.twitter.com')
517
518 try:
519 Action()(twitter, options)
520 except NoSuchActionError, e:
521 print >>sys.stderr, e
522 raise SystemExit(1)
523 except TwitterError, e:
524 print >> sys.stderr, str(e)
525 print >> sys.stderr, "Use 'twitter -h' for help."
526 raise SystemExit(1)