]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
- More documentation.
[z_archive/twitter.git] / twitter / cmdline.py
1 """
2 USAGE:
3
4 twitter [action] [options]
5
6
7 ACTIONS:
8 authorize authorize the command-line tool to interact with Twitter
9 follow add the specified user to your follow list
10 friends get latest tweets from your friends (default action)
11 help print this help text that you are currently reading
12 leave remove the specified user from your following list
13 public get latest public tweets
14 replies get latest replies
15 search search twitter (Beware: octothorpe, escape it)
16 set set your twitter status
17 shell login to the twitter shell
18
19
20 OPTIONS:
21
22 -r --refresh run this command forever, polling every once
23 in a while (default: every 10 minutes)
24 -R --refresh-rate <rate> set the refresh rate (in seconds)
25 -f --format <format> specify the output format for status updates
26 -c --config <filename> read username and password from given config
27 file (default ~/.twitter)
28 -l --length <count> specify number of status updates shown
29 (default: 20, max: 200)
30 -t --timestamp show time before status lines
31 -d --datestamp show date before status lines
32 --no-ssl use HTTP instead of more secure HTTPS
33 --oauth <filename> filename to read/store oauth credentials to
34
35 FORMATS for the --format option
36
37 default one line per status
38 verbose multiple lines per status, more verbose status info
39 urls nothing but URLs
40 ansi ansi colour (rainbow mode)
41
42
43 CONFIG FILES
44
45 The config file should be placed in your home directory and be named .twitter.
46 It must contain a [twitter] header, and all the desired options you wish to
47 set, like so:
48
49 [twitter]
50 format: <desired_default_format_for_output>
51 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
52
53 OAuth authentication tokens are stored in the file .twitter_oauth in your
54 home directory.
55 """
56
# OAuth consumer key and secret for this command-line application.  They are
# passed to OAuth() together with the per-user token and token secret, both
# during the authorization dance and when building the Twitter client.
CONSUMER_KEY='uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
59
60 import sys
61 import time
62 from getopt import gnu_getopt as getopt, GetoptError
63 from getpass import getpass
64 import re
65 import os.path
66 from ConfigParser import SafeConfigParser
67 import datetime
68 from urllib import quote
69 import webbrowser
70
71 from api import Twitter, TwitterError
72 from oauth import OAuth
73 import ansi
74
# Built-in defaults for every option the tool understands.  main() and
# loadConfig() copy this dict and layer config-file and command-line values
# on top of it.  The two file names live directly under $HOME (or under the
# filesystem root when HOME is not set).
OPTIONS = dict(
    action='friends',
    refresh=False,
    refresh_rate=600,
    format='default',
    prompt='[cyan]twitter[R]> ',
    config_filename=os.sep.join([os.environ.get('HOME', ''), '.twitter']),
    oauth_filename=os.sep.join([os.environ.get('HOME', ''), '.twitter_oauth']),
    length=20,
    timestamp=False,
    datestamp=False,
    extra_args=[],
    secure=True,
)
89
def parse_args(args, options):
    """
    Parse the command-line words in `args` and record the results in the
    `options` dict (which is modified in place; nothing is returned).

    Only options that actually appear in `args` are written.  The first
    positional word becomes options['action'] and the remaining ones
    options['extra_args'], unless help was requested.

    Raises GetoptError for unknown options, missing option arguments, and
    for --length / --refresh-rate values that are not integers.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    # -e and -p are accepted by the parser but no branch below acts on them.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    def _as_int(opt, arg):
        # Report a bad number the same way getopt reports any other bad
        # option, so callers that handle GetoptError also cover this case.
        try:
            return int(arg)
        except ValueError:
            raise GetoptError(
                "option %s requires an integer argument, not %r" % (opt, arg),
                opt)

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _as_int(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _as_int(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over any positional action word.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
122
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """
    Return the local-time prefix to print in front of a status line.

    `status['created_at']` is parsed with `format` and treated as UTC (the
    format hard-codes a +0000 offset).  Depending on options['timestamp']
    and options['datestamp'] the result is "HH:MM:SS ", "YYYY-MM-DD ",
    both combined, or "" when neither is requested.
    """
    # Local import: calendar is not among this module's top-level imports.
    import calendar

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if not (timestamp or datestamp):
        # Nothing to render, so skip the parsing altogether.
        return ""

    utc_tuple = time.strptime(status['created_at'], format)
    # Let the C library decide whether DST applies at that instant.
    # time.daylight only tells whether the zone defines DST at all, so
    # choosing time.altzone from it is wrong outside the DST period.
    local_tuple = time.localtime(calendar.timegm(utc_tuple))

    if timestamp and datestamp:
        return time.strftime("%Y-%m-%d %H:%M:%S ", local_tuple)
    elif timestamp:
        return time.strftime("%H:%M:%S ", local_tuple)
    return time.strftime("%Y-%m-%d ", local_tuple)
140
class StatusFormatter(object):
    """Default status format: time prefix, screen name, then the text."""

    def __call__(self, status, options):
        prefix = get_time_string(status, options)
        author = status['user']['screen_name']
        return u"%s%s %s" % (prefix, author, status['text'])
146
class AnsiStatusFormatter(object):
    """
    Status format that wraps the screen name in ANSI colour codes, using
    the colour this formatter's ColourMap returns for that name.
    """

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        author = status['user']['screen_name']
        colour = self._colourMap.colourFor(author)
        pieces = (
            get_time_string(status, options),
            ansi.cmdColour(colour),
            author,
            ansi.cmdReset(),
            status['text'],
        )
        return u"%s%s%s%s %s" % pieces
157
class VerboseStatusFormatter(object):
    """
    Multi-line status format: a header with screen name, location and the
    raw created_at string, followed by the text on its own line.
    """

    def __call__(self, status, options):
        author = status['user']
        fields = (
            author['screen_name'],
            author['location'],
            status['created_at'],
            status['text'],
        )
        return u"-- %s (%s) on %s\n%s\n" % fields
165
class URLStatusFormatter(object):
    """Status format that outputs only the http(s) URLs found in the text."""

    # Matches "http://" or "https://" followed by a run of non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        found = self.urlmatch.findall(status['text'])
        if not found:
            return ""
        # One URL per line.
        return u'\n'.join(found)
171
class AdminFormatter(object):
    """One-sentence confirmation of a follow/leave action."""

    def __call__(self, action, user):
        who = u"%s (%s)" % (user['screen_name'], user['name'])
        # Any action other than "follow" is reported as having left the user.
        if action != "follow":
            return u"You are no longer following %s.\n" % who
        return u"You are now following %s.\n" % who
179
class VerboseAdminFormatter(object):
    """Verbose follow/leave confirmation that also shows the user's URL."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        details = (verb, user['screen_name'], user['name'], user['url'])
        return u"-- %s: %s (%s): %s" % details
187
class SearchFormatter(object):
    """Default search-result format: time prefix, sender, then the text."""

    def __call__(self, result, options):
        # Search results use a different created_at layout than statuses.
        prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        sender = result['from_user']
        return u"%s%s %s" % (prefix, sender, result['text'])
193
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search format; adds nothing to SearchFormatter for now."""
    pass #Default to the regular one
196
class URLSearchFormatter(object):
    """Search format that outputs only the http(s) URLs found in the text."""

    # Matches "http://" or "https://" followed by a run of non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        found = self.urlmatch.findall(result['text'])
        if not found:
            return ""
        # One URL per line.
        return u'\n'.join(found)
202
class AnsiSearchFormatter(object):
    """
    Search format that wraps the sender name in ANSI colour codes, using
    the colour this formatter's ColourMap returns for that name.
    """

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        sender = result['from_user']
        colour = self._colourMap.colourFor(sender)
        pieces = (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansi.cmdColour(colour),
            sender,
            ansi.cmdReset(),
            result['text'],
        )
        return u"%s%s%s%s %s" % pieces
213
# Formatter registry: action type ('status', 'admin', 'search') -> format
# name (the value of --format) -> formatter class.  get_formatter() looks
# classes up here and instantiates them.
status_formatters = dict(
    default=StatusFormatter,
    verbose=VerboseStatusFormatter,
    urls=URLStatusFormatter,
    ansi=AnsiStatusFormatter,
)

# Admin actions have no dedicated 'urls' or 'ansi' output; those names fall
# back to the default admin formatter.
admin_formatters = dict(
    default=AdminFormatter,
    verbose=VerboseAdminFormatter,
    urls=AdminFormatter,
    ansi=AdminFormatter,
)

search_formatters = dict(
    default=SearchFormatter,
    verbose=VerboseSearchFormatter,
    urls=URLSearchFormatter,
    ansi=AnsiSearchFormatter,
)

formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
}
238
def get_formatter(action_type, options):
    """
    Return a new formatter instance for `action_type` ('status', 'admin'
    or 'search') in the format named by options['format'].

    Raises TwitterError when either the action type or the format name is
    not registered in `formatters`.
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type,))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the action type that was actually requested; the message used
        # to say "status" for admin and search lookups as well.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
250
class Action(object):
    """
    Base class of all actions.  Calling an Action instance looks up the
    class registered for options['action'] in `actions` and runs it.
    """

    def ask(self, subject='perform this action', careful=False):
        '''
        Ask the user, via `raw_input`, whether `subject` should be
        performed. When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' %(subject, sample)
        try:
            answer = raw_input(prompt).lower()
        except EOFError:
            print >>sys.stderr # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            # No answer was given: fall back to the default for this mode.
            return not careful
        if careful:
            # Only an explicit yes counts.
            return answer in ('yes', 'y')
        # Anything except an explicit no counts as yes.
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        # Unknown action names resolve to NoSuchAction, which raises.
        action = actions.get(options['action'], NoSuchAction)()
        try:
            if not (options['refresh'] and isinstance(action, StatusAction)):
                action(twitter, options)
                return
            # Refresh mode: rerun the status action until interrupted.
            while True:
                action(twitter, options)
                time.sleep(options['refresh_rate'])
        except KeyboardInterrupt:
            print >>sys.stderr, '\n[Keyboard Interrupt]'
294
class NoSuchActionError(Exception):
    """Raised by NoSuchAction when options['action'] names no known action."""
    pass
297
class NoSuchAction(Action):
    """Fallback used when the requested action name is not registered."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % options['action']
        raise NoSuchActionError(message)
301
302 def printNicely(string):
303 if sys.stdout.encoding:
304 print string.encode(sys.stdout.encoding, 'replace')
305 else:
306 print string.encode('utf-8')
307
class StatusAction(Action):
    """
    Base class for actions that print a list of statuses.  Subclasses
    supply the statuses through getStatuses(twitter, options).
    """

    def __call__(self, twitter, options):
        statuses = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for status in statuses:
            line = render(status, options)
            if not line.strip():
                # The formatter produced nothing printable (e.g. no URLs).
                continue
            printNicely(line)
316
class SearchAction(Action):
    """Search twitter for the words in options['extra_args'] and print hits."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        twitter.domain = "search.twitter.com"
        twitter.uri = ""
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        quoted_terms = [quote(term) for term in options['extra_args']]
        twitter.encoded_args = "q=%s" % "+".join(quoted_terms)

        results = twitter.search()['results']
        render = get_formatter('search', options)
        for result in results:
            line = render(result, options)
            if not line.strip():
                continue
            printNicely(line)
334
335 class AdminAction(Action):
336 def __call__(self, twitter, options):
337 if not (options['extra_args'] and options['extra_args'][0]):
338 raise TwitterError("You need to specify a user (screen name)")
339 af = get_formatter('admin', options)
340 try:
341 user = self.getUser(twitter, options['extra_args'][0])
342 except TwitterError, e:
343 print "There was a problem following or leaving the specified user."
344 print "You may be trying to follow a user you are already following;"
345 print "Leaving a user you are not currently following;"
346 print "Or the user may not exist."
347 print "Sorry."
348 print
349 print e
350 else:
351 printNicely(af(options['action'], user))
352
class FriendsAction(StatusAction):
    """Statuses from the friends timeline, in reverse of the API's order."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
356
class PublicAction(StatusAction):
    """Statuses from the public timeline, in reverse of the API's order."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
360
class RepliesAction(StatusAction):
    """Replies to the user, in reverse of the API's order."""

    def getStatuses(self, twitter, options):
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
364
class FollowAction(AdminAction):
    """Follow a user by calling friendships.create."""

    def getUser(self, twitter, user):
        followed = twitter.friendships.create(id=user)
        return followed
368
class LeaveAction(AdminAction):
    """Stop following a user by calling friendships.destroy."""

    def getUser(self, twitter, user):
        left = twitter.friendships.destroy(id=user)
        return left
372
class SetStatusAction(Action):
    """
    Set the user's status to the words in options['extra_args'], or to a
    line read from the "message: " prompt when no words were given.
    """

    def __call__(self, twitter, options):
        if options['extra_args']:
            raw = " ".join(options['extra_args'])
        else:
            raw = raw_input("message: ")

        if isinstance(raw, unicode):
            statusTxt = raw
        else:
            # Command-line words and raw_input() results are byte strings.
            # Decode them explicitly: the implicit ASCII decoding done by
            # u" ".join() / unicode() raises UnicodeDecodeError on any
            # non-ASCII character.
            statusTxt = raw.decode(sys.stdin.encoding or 'utf-8', 'replace')

        status = statusTxt.encode('utf8', 'replace')
        twitter.statuses.update(status=status)
380
381 class TwitterShell(Action):
382
383 def render_prompt(self, prompt):
384 '''Parses the `prompt` string and returns the rendered version'''
385 prompt = prompt.strip("'").replace("\\'","'")
386 for colour in ansi.COLOURS_NAMED:
387 if '[%s]' %(colour) in prompt:
388 prompt = prompt.replace(
389 '[%s]' %(colour), ansi.cmdColourNamed(colour))
390 prompt = prompt.replace('[R]', ansi.cmdReset())
391 return prompt
392
393 def __call__(self, twitter, options):
394 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
395 while True:
396 options['action'] = ""
397 try:
398 args = raw_input(prompt).split()
399 parse_args(args, options)
400 if not options['action']:
401 continue
402 elif options['action'] == 'exit':
403 raise SystemExit(0)
404 elif options['action'] == 'shell':
405 print >>sys.stderr, 'Sorry Xzibit does not work here!'
406 continue
407 elif options['action'] == 'help':
408 print >>sys.stderr, '''\ntwitter> `action`\n
409 The Shell Accepts all the command line actions along with:
410
411 exit Leave the twitter shell (^D may also be used)
412
413 Full CMD Line help is appended below for your convinience.'''
414 Action()(twitter, options)
415 options['action'] = ''
416 except NoSuchActionError, e:
417 print >>sys.stderr, e
418 except KeyboardInterrupt:
419 print >>sys.stderr, '\n[Keyboard Interrupt]'
420 except EOFError:
421 print >>sys.stderr
422 leaving = self.ask(subject='Leave')
423 if not leaving:
424 print >>sys.stderr, 'Excellent!'
425 else:
426 raise SystemExit(0)
427
class HelpAction(Action):
    """Print this module's docstring, which holds the usage text."""
    def __call__(self, twitter, options):
        print __doc__
431
class DoNothingAction(Action):
    """Action that does nothing; registered for 'authorize' in `actions`."""
    def __call__(self, twitter, options):
        pass
435
def parse_oauth_tokens(result):
    """
    Extract the OAuth token pair from a form-encoded response body.

    `result` is a string of `key=value` pairs joined by '&'.  Returns the
    tuple (oauth_token, oauth_token_secret); other keys are ignored.

    Raises ValueError when either of the two keys is absent.
    """
    pairs = {}
    for item in result.split('&'):
        # Split on the first '=' only, so values may themselves contain
        # '='; segments without '=' (e.g. after a trailing '&') are skipped.
        key, sep, value = item.partition('=')
        if sep:
            pairs[key] = value

    missing = [key for key in ('oauth_token', 'oauth_token_secret')
               if key not in pairs]
    if missing:
        raise ValueError(
            "OAuth response is missing %s" % (" and ".join(missing)))
    return pairs['oauth_token'], pairs['oauth_token_secret']
444
445 def oauth_dance(options):
446 print ("Hi there! We're gonna get you all set up to use Twitter"
447 " on the command-line.")
448 twitter = Twitter(
449 auth=OAuth('', '', CONSUMER_KEY, CONSUMER_SECRET),
450 format='')
451 oauth_token, oauth_token_secret = parse_oauth_tokens(
452 twitter.oauth.request_token())
453 print """
454 In the web browser window that opens please choose to Allow access to the
455 command-line tool. Copy the PIN number that appears on the next page and
456 paste or type it here:
457 """
458 webbrowser.open(
459 'http://api.twitter.com/oauth/authorize?oauth_token=' +
460 oauth_token)
461 time.sleep(2) # Sometimes the last command can print some
462 # crap. Wait a bit so it doesn't mess up the next
463 # prompt.
464 oauth_verifier = raw_input("Please type the PIN: ").strip()
465 twitter = Twitter(
466 auth=OAuth(
467 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
468 format='')
469 oauth_token, oauth_token_secret = parse_oauth_tokens(
470 twitter.oauth.access_token(oauth_verifier=oauth_verifier))
471 oauth_file = open(options['oauth_filename'], 'w')
472 print >> oauth_file, oauth_token
473 print >> oauth_file, oauth_token_secret
474 oauth_file.close()
475 print
476 print "That's it! Your authorization keys have been written to %s." % (
477 options['oauth_filename'])
478
479
# Action name (the first positional command-line word) -> Action class.
# Action.__call__ looks names up here; 'authorize' maps to a no-op because
# main() runs the authorization itself before dispatching.
actions = dict(
    authorize=DoNothingAction,
    follow=FollowAction,
    friends=FriendsAction,
    help=HelpAction,
    leave=LeaveAction,
    public=PublicAction,
    replies=RepliesAction,
    search=SearchAction,
    set=SetStatusAction,
    shell=TwitterShell,
)
492
def loadConfig(filename):
    """
    Return a copy of OPTIONS, with 'format' and 'prompt' replaced by the
    values from the [twitter] section of `filename` where present.  A
    missing file simply yields the defaults.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    cp = SafeConfigParser()
    cp.read([filename])
    present = [name for name in ('format', 'prompt')
               if cp.has_option('twitter', name)]
    for name in present:
        options[name] = cp.get('twitter', name)
    return options
502
def read_oauth_file(fn):
    """
    Read the OAuth credentials stored in the file `fn`.

    Returns a tuple of the first two lines with surrounding whitespace
    stripped: (oauth_token, oauth_token_secret), matching the order in
    which oauth_dance() writes them.
    """
    f = open(fn)
    try:
        return f.readline().strip(), f.readline().strip()
    finally:
        # Release the handle instead of leaving it to garbage collection.
        f.close()
506
507 def main(args=sys.argv[1:]):
508 arg_options = {}
509 try:
510 parse_args(args, arg_options)
511 except GetoptError, e:
512 print >> sys.stderr, "I can't do that, %s." %(e)
513 print >> sys.stderr
514 raise SystemExit(1)
515
516 config_options = loadConfig(
517 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
518
519 # Apply the various options in order, the most important applied last.
520 # Defaults first, then what's read from config file, then command-line
521 # arguments.
522 options = dict(OPTIONS)
523 for d in config_options, arg_options:
524 for k,v in d.items():
525 if v: options[k] = v
526
527 if options['refresh'] and options['action'] not in (
528 'friends', 'public', 'replies'):
529 print >> sys.stderr, "You can only refresh the friends, public, or replies actions."
530 print >> sys.stderr, "Use 'twitter -h' for help."
531 return 1
532
533 if (options['action'] == 'authorize'
534 or not os.path.exists(options['oauth_filename'])):
535 oauth_dance(options)
536
537 oauth_token, oauth_token_secret = read_oauth_file(options['oauth_filename'])
538
539 twitter = Twitter(
540 auth=OAuth(
541 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
542 secure=options['secure'],
543 api_version='1')
544
545 try:
546 Action()(twitter, options)
547 except NoSuchActionError, e:
548 print >>sys.stderr, e
549 raise SystemExit(1)
550 except TwitterError, e:
551 print >> sys.stderr, e.args[0]
552 print >> sys.stderr, "Use 'twitter -h' for help."
553 raise SystemExit(1)