]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
- Documentation updates.
[z_archive/twitter.git] / twitter / cmdline.py
1 """
2 USAGE:
3
4 twitter [action] [options]
5
6
7 ACTIONS:
8 authorize authorize the command-line tool to interact with Twitter
9 follow add the specified user to your follow list
10 friends get latest tweets from your friends (default action)
11 help print this help text that you are currently reading
12 leave remove the specified user from your following list
13 public get latest public tweets
14 replies get latest replies
15 search search twitter (Beware: octothorpe, escape it)
16 set set your twitter status
17 shell login to the twitter shell
18
19
20 OPTIONS:
21
22 -r --refresh run this command forever, polling every once
23 in a while (default: every 10 minutes)
24 -R --refresh-rate <rate> set the refresh rate (in seconds)
25 -f --format <format> specify the output format for status updates
26 -c --config <filename> read username and password from given config
27 file (default ~/.twitter)
28 -l --length <count> specify number of status updates shown
29 (default: 20, max: 200)
30 -t --timestamp show time before status lines
31 -d --datestamp show date before status lines
32 --no-ssl use HTTP instead of more secure HTTPS
33 --oauth <filename> filename to read/store oauth credentials to
34
35 FORMATS for the --format option
36
37 default one line per status
38 verbose multiple lines per status, more verbose status info
39 urls nothing but URLs
40 ansi ansi colour (rainbow mode)
41
42
43 CONFIG FILES
44
45 The config file should be placed in your home directory and be named .twitter.
46 It must contain a [twitter] header, and all the desired options you wish to
47 set, like so:
48
49 [twitter]
50 format: <desired_default_format_for_output>
51 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
52
53 OAuth authentication tokens are stored in the file .twitter_oauth in your
54 home directory.
55 """
56
# OAuth consumer credentials for this command-line application. They are
# passed to OAuth(...) together with the per-user token and token secret.
CONSUMER_KEY='uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
59
60 import sys
61 import time
62 from getopt import gnu_getopt as getopt, GetoptError
63 from getpass import getpass
64 import re
65 import os.path
66 from ConfigParser import SafeConfigParser
67 import datetime
68 from urllib import quote
69 import webbrowser
70
71 from api import Twitter, TwitterError
72 from oauth import OAuth
73 import ansi
74
# Built-in defaults for every option. loadConfig() and main() each start from
# a copy of this dict and overlay config-file and command-line values on it.
OPTIONS = {
    'action': 'friends',
    'refresh': False,
    'refresh_rate': 600,  # seconds slept between polls when refreshing
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    'config_filename': os.environ.get('HOME', '') + os.sep + '.twitter',
    'oauth_filename': os.environ.get('HOME', '') + os.sep + '.twitter_oauth',
    'length': 20,
    'timestamp': False,
    'datestamp': False,
    'extra_args': [],
    'secure': True,
}
89
def _int_option(opt, arg):
    """Convert the argument of option `opt` to an int or raise GetoptError."""
    try:
        return int(arg)
    except ValueError:
        raise GetoptError(
            "option %s requires an integer argument, not '%s'" % (opt, arg),
            opt)


def parse_args(args, options):
    """
    Parse the command-line style list `args` and store the result in
    `options`.

    `options` is modified in place; only keys for options that actually
    appear in `args` are written. The first positional argument becomes
    options['action'] and the remaining ones options['extra_args'], unless
    help was requested with -h, -? or --help.

    Raises GetoptError for unknown options, for options missing their
    argument, and for non-integer values given to -R/--refresh-rate or
    -l/--length.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    # -e and -p are accepted (with an argument) but no branch below acts
    # on them.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _int_option(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _int_option(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over any positional action.
    if extra_args and not ('action' in options and options['action'] == 'help'):
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
122
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """
    Return the local-time prefix to print before a status line.

    status  -- mapping with a 'created_at' string; the literal "+0000" in
               `format` means the string is parsed as UTC.
    options -- mapping with boolean 'timestamp' and 'datestamp' entries.
    format  -- strptime format of status['created_at'].

    Returns "YYYY-MM-DD HH:MM:SS ", "HH:MM:SS ", "YYYY-MM-DD " or "",
    depending on which of the two options are set. 'created_at' is only
    parsed when a prefix is actually requested.
    """
    import calendar  # local import: only this function needs it

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if timestamp and datestamp:
        out_format = "%Y-%m-%d %H:%M:%S "
    elif timestamp:
        out_format = "%H:%M:%S "
    elif datestamp:
        out_format = "%Y-%m-%d "
    else:
        return ""

    utc_tuple = time.strptime(status['created_at'], format)
    # time.daylight only says the local zone *defines* DST, not that DST is
    # in effect at this instant. Let localtime() pick the correct offset for
    # the moment the status was created.
    local_tuple = time.localtime(calendar.timegm(utc_tuple))
    return time.strftime(out_format, local_tuple)
140
class StatusFormatter(object):
    """One line per status: optional time prefix, screen name, then text."""

    def __call__(self, status, options):
        prefix = get_time_string(status, options)
        author = status['user']['screen_name']
        return u"%s%s %s" % (prefix, author, status['text'])
146
class AnsiStatusFormatter(object):
    """Like the default one-line format, with the screen name wrapped in
    ANSI colour codes chosen per user by an ansi.ColourMap."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        name = status['user']['screen_name']
        colour = self._colourMap.colourFor(name)
        pieces = (
            get_time_string(status, options),
            ansi.cmdColour(colour),
            name,
            ansi.cmdReset(),
            status['text'],
        )
        return u"%s%s%s%s %s" % pieces
157
class VerboseStatusFormatter(object):
    """Multi-line format: a header with screen name, location and the raw
    'created_at' string, followed by the status text."""

    def __call__(self, status, options):
        user = status['user']
        header_and_body = (
            user['screen_name'],
            user['location'],
            status['created_at'],
            status['text'],
        )
        return u"-- %s (%s) on %s\n%s\n" % header_and_body
165
class URLStatusFormatter(object):
    """Output only the http/https URLs found in a status, one per line."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        found = self.urlmatch.findall(status['text'])
        if not found:
            return ""
        return u'\n'.join(found)
171
class AdminFormatter(object):
    """One-sentence confirmation of a follow or leave action."""

    def __call__(self, action, user):
        who = u"%s (%s)" % (user['screen_name'], user['name'])
        if action == "follow":
            template = u"You are now following %s.\n"
        else:
            template = u"You are no longer following %s.\n"
        return template % (who)
179
class VerboseAdminFormatter(object):
    """Confirmation of a follow or leave action that also shows the user's
    URL."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        return u"-- %s: %s (%s): %s" % (
            verb, user['screen_name'], user['name'], user['url'])
187
class SearchFormatter(object):
    """One line per search result: optional time prefix, sender, text."""

    def __call__(self, result, options):
        # Search results carry 'created_at' in a different layout than
        # timeline statuses, hence the explicit format.
        prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        sender = result['from_user']
        return u"%s%s %s" % (prefix, sender, result['text'])
193
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search output; currently identical to SearchFormatter."""
    pass #Default to the regular one
196
class URLSearchFormatter(object):
    """Output only the http/https URLs found in a search result, one per
    line."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        found = self.urlmatch.findall(result['text'])
        if not found:
            return ""
        return u'\n'.join(found)
202
class AnsiSearchFormatter(object):
    """One line per search result, with the sender wrapped in ANSI colour
    codes chosen per user by an ansi.ColourMap."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        sender = result['from_user']
        colour = self._colourMap.colourFor(sender)
        pieces = (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansi.cmdColour(colour),
            sender,
            ansi.cmdReset(),
            result['text'],
        )
        return u"%s%s%s%s %s" % pieces
213
# Formatter classes per action type, keyed by the --format name.
status_formatters = {
    'default': StatusFormatter,
    'verbose': VerboseStatusFormatter,
    'urls': URLStatusFormatter,
    'ansi': AnsiStatusFormatter
}

admin_formatters = {
    'default': AdminFormatter,
    'verbose': VerboseAdminFormatter,
    'urls': AdminFormatter,
    'ansi': AdminFormatter
}

search_formatters = {
    'default': SearchFormatter,
    'verbose': VerboseSearchFormatter,
    'urls': URLSearchFormatter,
    'ansi': AnsiSearchFormatter
}

# Top-level registry consulted by get_formatter(): action type -> table.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
}
238
def get_formatter(action_type, options):
    """
    Return a new formatter instance for `action_type` ('status', 'admin'
    or 'search') using the format named by options['format'].

    Raises TwitterError when the action type has no formatter table or the
    format name is not registered for that action type.
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            %(action_type))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the action type actually requested; this message used to say
        # "status actions" even for admin and search lookups.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            %(options['format'], action_type))
    return formatter_class()
250
class Action(object):
    """Base class of all actions. Calling a plain Action instance looks up
    options['action'] in `actions` and runs the matching action."""

    def ask(self, subject='perform this action', careful=False):
        '''
        Asks the user, via `raw_input`, whether `subject` should be
        performed. When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        if careful:
            sample = '(y/N)'
        else:
            sample = '(Y/n)'

        prompt = 'You really want to %s %s? ' %(subject, sample)
        try:
            answer = raw_input(prompt).lower()
        except EOFError:
            print >>sys.stderr # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful

        if careful:
            # Only an explicit yes counts.
            return answer in ('yes', 'y')
        # Anything but an explicit no counts as yes.
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        action = actions.get(options['action'], NoSuchAction)()
        try:
            # Only status-listing actions are repeated in refresh mode.
            repeat = options['refresh'] and isinstance(action, StatusAction)
            action(twitter, options)
            while repeat:
                time.sleep(options['refresh_rate'])
                action(twitter, options)
        except KeyboardInterrupt:
            print >>sys.stderr, '\n[Keyboard Interrupt]'
294
class NoSuchActionError(Exception):
    """Raised when options['action'] does not name a known action."""
297
class NoSuchAction(Action):
    """Fallback used when the requested action is not in `actions`."""

    def __call__(self, twitter, options):
        message = "No such action: %s" %(options['action'])
        raise NoSuchActionError(message)
301
302 def printNicely(string):
303 if sys.stdout.encoding:
304 print string.encode(sys.stdout.encoding, 'replace')
305 else:
306 print string.encode('utf-8')
307
class StatusAction(Action):
    """Base for actions that list statuses; subclasses supply
    getStatuses(twitter, options)."""

    def __call__(self, twitter, options):
        statuses = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for status in statuses:
            line = render(status, options)
            if not line.strip():
                # Formatters may yield nothing (e.g. 'urls' with no URL).
                continue
            printNicely(line)
316
class SearchAction(Action):
    """Search twitter for the terms in options['extra_args'] and print the
    results."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is
        # less tangly to do it here than in the main().
        # NOTE(review): this changes `domain` and `encoded_args` on the
        # caller's twitter object and nothing here restores them.
        twitter.domain="search.twitter.com"
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        quoted_terms = [quote(term) for term in options['extra_args']]
        twitter.encoded_args = "q=%s" %("+".join(quoted_terms))

        results = twitter.search()['results']
        render = get_formatter('search', options)
        for result in results:
            line = render(result, options)
            if not line.strip():
                continue
            printNicely(line)
333
334 class AdminAction(Action):
335 def __call__(self, twitter, options):
336 if not (options['extra_args'] and options['extra_args'][0]):
337 raise TwitterError("You need to specify a user (screen name)")
338 af = get_formatter('admin', options)
339 try:
340 user = self.getUser(twitter, options['extra_args'][0])
341 except TwitterError, e:
342 print "There was a problem following or leaving the specified user."
343 print "You may be trying to follow a user you are already following;"
344 print "Leaving a user you are not currently following;"
345 print "Or the user may not exist."
346 print "Sorry."
347 print
348 print e
349 else:
350 printNicely(af(options['action'], user))
351
class FriendsAction(StatusAction):
    """List the friends timeline, in the reverse of the order returned."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
355
class PublicAction(StatusAction):
    """List the public timeline, in the reverse of the order returned."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
359
class RepliesAction(StatusAction):
    """List replies, in the reverse of the order returned."""

    def getStatuses(self, twitter, options):
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
363
class FollowAction(AdminAction):
    """Follow the given user via friendships.create."""

    def getUser(self, twitter, user):
        friendships = twitter.friendships
        return friendships.create(id=user)
367
class LeaveAction(AdminAction):
    """Stop following the given user via friendships.destroy."""

    def getUser(self, twitter, user):
        friendships = twitter.friendships
        return friendships.destroy(id=user)
371
class SetStatusAction(Action):
    """Post a new status built from options['extra_args'], or prompt for
    one with "message: " when no text was given."""

    def _decode(self, text):
        """Return `text` as unicode, decoding byte strings explicitly."""
        if isinstance(text, unicode):
            return text
        # Byte strings from argv / raw_input used to be decoded implicitly
        # as ASCII, which raised UnicodeDecodeError on any non-ASCII input.
        # NOTE(review): assumes arguments use the terminal's encoding;
        # falls back to UTF-8 when stdin reports none.
        encoding = sys.stdin.encoding or 'utf-8'
        return text.decode(encoding, 'replace')

    def __call__(self, twitter, options):
        if options['extra_args']:
            statusTxt = u" ".join(
                [self._decode(arg) for arg in options['extra_args']])
        else:
            statusTxt = self._decode(raw_input("message: "))
        status = statusTxt.encode('utf8', 'replace')
        twitter.statuses.update(status=status)
379
380 class TwitterShell(Action):
381
382 def render_prompt(self, prompt):
383 '''Parses the `prompt` string and returns the rendered version'''
384 prompt = prompt.strip("'").replace("\\'","'")
385 for colour in ansi.COLOURS_NAMED:
386 if '[%s]' %(colour) in prompt:
387 prompt = prompt.replace(
388 '[%s]' %(colour), ansi.cmdColourNamed(colour))
389 prompt = prompt.replace('[R]', ansi.cmdReset())
390 return prompt
391
392 def __call__(self, twitter, options):
393 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
394 while True:
395 options['action'] = ""
396 try:
397 args = raw_input(prompt).split()
398 parse_args(args, options)
399 if not options['action']:
400 continue
401 elif options['action'] == 'exit':
402 raise SystemExit(0)
403 elif options['action'] == 'shell':
404 print >>sys.stderr, 'Sorry Xzibit does not work here!'
405 continue
406 elif options['action'] == 'help':
407 print >>sys.stderr, '''\ntwitter> `action`\n
408 The Shell Accepts all the command line actions along with:
409
410 exit Leave the twitter shell (^D may also be used)
411
412 Full CMD Line help is appended below for your convinience.'''
413 Action()(twitter, options)
414 options['action'] = ''
415 except NoSuchActionError, e:
416 print >>sys.stderr, e
417 except KeyboardInterrupt:
418 print >>sys.stderr, '\n[Keyboard Interrupt]'
419 except EOFError:
420 print >>sys.stderr
421 leaving = self.ask(subject='Leave')
422 if not leaving:
423 print >>sys.stderr, 'Excellent!'
424 else:
425 raise SystemExit(0)
426
427 class HelpAction(Action):
428 def __call__(self, twitter, options):
429 print __doc__
430
class DoNothingAction(Action):
    """Action that does nothing when called."""

    def __call__(self, twitter, options):
        return None
434
def parse_oauth_tokens(result):
    """
    Extract the OAuth token pair from a form-encoded response body such as
    "oauth_token=...&oauth_token_secret=...".

    Returns the tuple (oauth_token, oauth_token_secret). Other keys are
    ignored.

    Raises ValueError when either token is missing from `result`.
    """
    oauth_token = None
    oauth_token_secret = None
    for pair in result.split('&'):
        # Split on the first '=' only, so a value that itself contains '='
        # is kept whole instead of breaking the unpacking.
        key, _, value = pair.partition('=')
        if key == 'oauth_token':
            oauth_token = value
        elif key == 'oauth_token_secret':
            oauth_token_secret = value
    if oauth_token is None or oauth_token_secret is None:
        raise ValueError(
            "OAuth response is missing oauth_token or oauth_token_secret")
    return oauth_token, oauth_token_secret
443
444 def oauth_dance(options):
445 print ("Hi there! We're gonna get you all set up to use Twitter"
446 " on the command-line.")
447 twitter = Twitter(
448 auth=OAuth('', '', CONSUMER_KEY, CONSUMER_SECRET),
449 format='')
450 oauth_token, oauth_token_secret = parse_oauth_tokens(
451 twitter.oauth.request_token())
452 print """
453 In the web browser window that opens please choose to Allow access to the
454 command-line tool. Copy the PIN number that appears on the next page and
455 paste or type it here:
456 """
457 webbrowser.open(
458 'http://api.twitter.com/oauth/authorize?oauth_token=' +
459 oauth_token)
460 time.sleep(2) # Sometimes the last command can print some
461 # crap. Wait a bit so it doesn't mess up the next
462 # prompt.
463 oauth_verifier = raw_input("Please type the PIN: ").strip()
464 twitter = Twitter(
465 auth=OAuth(
466 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
467 format='')
468 oauth_token, oauth_token_secret = parse_oauth_tokens(
469 twitter.oauth.access_token(oauth_verifier=oauth_verifier))
470 oauth_file = open(options['oauth_filename'], 'w')
471 print >> oauth_file, oauth_token
472 print >> oauth_file, oauth_token_secret
473 oauth_file.close()
474 print
475 print "That's it! Your authorization keys have been written to %s." % (
476 options['oauth_filename'])
477
478
# Maps each action name accepted on the command line (and in the shell) to
# the Action subclass that implements it; Action.__call__ looks names up
# here and falls back to NoSuchAction.
actions = {
    'authorize' : DoNothingAction,
    'follow' : FollowAction,
    'friends' : FriendsAction,
    'help' : HelpAction,
    'leave' : LeaveAction,
    'public' : PublicAction,
    'replies' : RepliesAction,
    'search' : SearchAction,
    'set' : SetStatusAction,
    'shell' : TwitterShell,
}
491
def loadConfig(filename):
    """Return a copy of OPTIONS with 'format' and 'prompt' overridden by
    the [twitter] section of `filename`, when that file exists."""
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    parser = SafeConfigParser()
    parser.read([filename])
    for name in ('format', 'prompt'):
        if parser.has_option('twitter', name):
            options[name] = parser.get('twitter', name)
    return options
501
def read_oauth_file(fn):
    """
    Read OAuth credentials from the file named `fn`.

    Returns a tuple of the first two lines of the file, each stripped of
    surrounding whitespace: (oauth_token, oauth_token_secret).
    """
    f = open(fn)
    try:
        oauth_token = f.readline().strip()
        oauth_token_secret = f.readline().strip()
    finally:
        # The handle used to be left for the garbage collector to close.
        f.close()
    return oauth_token, oauth_token_secret
505
506 def main(args=sys.argv[1:]):
507 arg_options = {}
508 try:
509 parse_args(args, arg_options)
510 except GetoptError, e:
511 print >> sys.stderr, "I can't do that, %s." %(e)
512 print >> sys.stderr
513 raise SystemExit(1)
514
515 config_options = loadConfig(
516 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
517
518 # Apply the various options in order, the most important applied last.
519 # Defaults first, then what's read from config file, then command-line
520 # arguments.
521 options = dict(OPTIONS)
522 for d in config_options, arg_options:
523 for k,v in d.items():
524 if v: options[k] = v
525
526 if options['refresh'] and options['action'] not in (
527 'friends', 'public', 'replies'):
528 print >> sys.stderr, "You can only refresh the friends, public, or replies actions."
529 print >> sys.stderr, "Use 'twitter -h' for help."
530 return 1
531
532 if (options['action'] == 'authorize'
533 or not os.path.exists(options['oauth_filename'])):
534 oauth_dance(options)
535
536 oauth_token, oauth_token_secret = read_oauth_file(options['oauth_filename'])
537
538 twitter = Twitter(
539 auth=OAuth(
540 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
541 secure=options['secure'])
542
543 try:
544 Action()(twitter, options)
545 except NoSuchActionError, e:
546 print >>sys.stderr, e
547 raise SystemExit(1)
548 except TwitterError, e:
549 print >> sys.stderr, e.args[0]
550 print >> sys.stderr, "Use 'twitter -h' for help."
551 raise SystemExit(1)