]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
- Attempt to determine terminal encoding when tweeting.
[z_archive/twitter.git] / twitter / cmdline.py
1 """
2 USAGE:
3
4 twitter [action] [options]
5
6
7 ACTIONS:
8 authorize authorize the command-line tool to interact with Twitter
9 follow add the specified user to your follow list
10 friends get latest tweets from your friends (default action)
11 help print this help text that you are currently reading
12 leave remove the specified user from your following list
13 public get latest public tweets
14 replies get latest replies
15 search search twitter (Beware: octothorpe, escape it)
16 set set your twitter status
17 shell login to the twitter shell
18
19
20 OPTIONS:
21
22 -r --refresh run this command forever, polling every once
23 in a while (default: every 10 minutes)
24 -R --refresh-rate <rate> set the refresh rate (in seconds)
25 -f --format <format> specify the output format for status updates
26 -c --config <filename> read username and password from given config
27 file (default ~/.twitter)
28 -l --length <count> specify number of status updates shown
29 (default: 20, max: 200)
30 -t --timestamp show time before status lines
31 -d --datestamp show date before status lines
32 --no-ssl use HTTP instead of more secure HTTPS
33 --oauth <filename> filename to read/store oauth credentials to
34
35 FORMATS for the --format option
36
37 default one line per status
38 verbose multiple lines per status, more verbose status info
39 urls nothing but URLs
40 ansi ansi colour (rainbow mode)
41
42
43 CONFIG FILES
44
45 The config file should be placed in your home directory and be named .twitter.
46 It must contain a [twitter] header, and all the desired options you wish to
47 set, like so:
48
49 [twitter]
50 format: <desired_default_format_for_output>
51 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
52
53 OAuth authentication tokens are stored in the file .twitter_oauth in your
54 home directory.
55 """
56
# OAuth consumer credentials for this command-line application; they are
# passed to OAuth() together with the per-user token and token secret.
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
59
60 import sys
61 import time
62 from getopt import gnu_getopt as getopt, GetoptError
63 from getpass import getpass
64 import re
65 import os.path
66 from ConfigParser import SafeConfigParser
67 import datetime
68 from urllib import quote
69 import webbrowser
70
71 from api import Twitter, TwitterError
72 from oauth import OAuth
73 import ansi
74
# Built-in defaults. main() layers config-file values and then command-line
# values on top of a copy of this dict.
OPTIONS = {
    # Action to run when none is named on the command line.
    'action': 'friends',
    'extra_args': [],

    # Polling behaviour (refresh_rate is in seconds).
    'refresh': False,
    'refresh_rate': 600,

    # Output.
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    'length': 20,
    'timestamp': False,
    'datestamp': False,

    # Files; both live directly under $HOME ('' when HOME is unset).
    'config_filename': os.sep.join([os.environ.get('HOME', ''), '.twitter']),
    'oauth_filename': os.sep.join(
        [os.environ.get('HOME', ''), '.twitter_oauth']),

    # Use HTTPS unless --no-ssl is given.
    'secure': True,
}
89
def parse_args(args, options):
    """Parse the argument list `args` and store the results in `options`.

    Only options that actually appear in `args` are written, so `options`
    can be layered over defaults by the caller. The first positional
    argument becomes options['action'] (unless help was requested) and the
    remaining ones become options['extra_args'].

    Raises GetoptError for unknown options, missing option arguments, and
    non-integer values given to -R/--refresh-rate or -l/--length.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    # 'e:' and 'p:' are accepted by getopt but have no handler below, so
    # their values are ignored.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    def _int_arg(opt, arg):
        # Report bad numbers as GetoptError so callers that already handle
        # bad command lines print a message instead of a traceback.
        try:
            return int(arg)
        except ValueError:
            raise GetoptError(
                "option %s requires an integer argument, not %r"
                % (opt, arg), opt)

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _int_arg(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _int_arg(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over any positional action.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
122
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix to print before a status line.

    status  -- mapping whose 'created_at' value is a UTC time string
               matching `format` (the default format ends in a literal
               "+0000").
    options -- mapping with boolean 'timestamp' and 'datestamp' entries.
    format  -- strptime format used to parse status['created_at'].

    Returns "YYYY-MM-DD HH:MM:SS ", "HH:MM:SS ", "YYYY-MM-DD " or "" (each
    non-empty form ends with a space) depending on which stamps are
    enabled. 'created_at' is only parsed when a stamp is requested.
    """
    # Local import so this function does not depend on the module's import
    # block.
    import calendar

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if timestamp and datestamp:
        out_format = "%Y-%m-%d %H:%M:%S "
    elif timestamp:
        out_format = "%H:%M:%S "
    elif datestamp:
        out_format = "%Y-%m-%d "
    else:
        return ""

    utc = time.strptime(status['created_at'], format)
    # time.daylight only says that the local zone *defines* a DST rule, not
    # that DST is in effect at the given instant, so picking altzone from it
    # shifts times by an hour outside DST. Going through the epoch lets
    # localtime() apply the offset that is correct for that instant.
    local = time.localtime(calendar.timegm(utc))
    return time.strftime(out_format, local)
140
class StatusFormatter(object):
    """Default status format: optional time prefix, screen name, text."""

    def __call__(self, status, options):
        prefix = get_time_string(status, options)
        screen_name = status['user']['screen_name']
        return u"%s%s %s" % (prefix, screen_name, status['text'])
146
class AnsiStatusFormatter(object):
    """Like the default status format, with the screen name wrapped in the
    ANSI colour that the colour map returns for that name."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        screen_name = status['user']['screen_name']
        colour = self._colourMap.colourFor(screen_name)
        pieces = (
            get_time_string(status, options),
            ansi.cmdColour(colour),
            screen_name,
            ansi.cmdReset(),
            status['text'],
        )
        return u"%s%s%s%s %s" % pieces
157
class VerboseStatusFormatter(object):
    """Multi-line status format: a header with screen name, location and
    the raw 'created_at' string, followed by the text on its own line."""

    def __call__(self, status, options):
        author = status['user']
        header = u"-- %s (%s) on %s" % (
            author['screen_name'], author['location'], status['created_at'])
        return u"%s\n%s\n" % (header, status['text'])
165
class URLStatusFormatter(object):
    """Format a status as just the http(s) URLs found in its text."""

    # An URL is "http://" or "https://" followed by non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        found = self.urlmatch.findall(status['text'])
        if not found:
            return ""
        return u'\n'.join(found)
171
class AdminFormatter(object):
    """One-sentence confirmation for follow/leave actions."""

    def __call__(self, action, user):
        who = u"%s (%s)" % (user['screen_name'], user['name'])
        # Any action other than "follow" is reported as a leave.
        if action != "follow":
            return u"You are no longer following %s.\n" % (who)
        return u"You are now following %s.\n" % (who)
179
class VerboseAdminFormatter(object):
    """Verbose follow/leave confirmation that also shows the user's URL."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        details = (verb, user['screen_name'], user['name'], user['url'])
        return u"-- %s: %s (%s): %s" % details
187
class SearchFormatter(object):
    """Default search-result format: optional time prefix, sender, text."""

    def __call__(self, result, options):
        # Search results use a different 'created_at' layout than statuses.
        prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        return u"%s%s %s" % (prefix, result['from_user'], result['text'])
193
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search format; currently identical to SearchFormatter."""
196
class URLSearchFormatter(object):
    """Format a search result as just the http(s) URLs found in its text."""

    # An URL is "http://" or "https://" followed by non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        found = self.urlmatch.findall(result['text'])
        if not found:
            return ""
        return u'\n'.join(found)
202
class AnsiSearchFormatter(object):
    """Like the default search format, with the sender wrapped in the ANSI
    colour that the colour map returns for that name."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        sender = result['from_user']
        colour = self._colourMap.colourFor(sender)
        pieces = (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansi.cmdColour(colour),
            sender,
            ansi.cmdReset(),
            result['text'],
        )
        return u"%s%s%s%s %s" % pieces
213
# Cached result of get_term_encoding(); None until first computed.
_term_encoding = None


def get_term_encoding():
    """Return the terminal encoding name derived from $LANG.

    LANG has the shape "language_TERRITORY.codeset@modifier". The codeset
    part is used when it is present and names a codec Python knows;
    otherwise 'UTF-8' is returned. The answer is computed once and cached.
    """
    global _term_encoding
    if not _term_encoding:
        # Local import so this function does not depend on the module's
        # import block.
        import codecs

        parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
        codeset = ''
        if parts[1:]:
            # Drop an "@modifier" suffix such as "@euro"; it is not part of
            # the codec name and would make str.decode() fail.
            codeset = parts[1].split('@')[0]
        if codeset:
            try:
                codecs.lookup(codeset)
            except LookupError:
                # Unknown codeset: fall back rather than fail later.
                codeset = ''
        _term_encoding = codeset or 'UTF-8'
    return _term_encoding
224
# Formatter classes, keyed by the value of the --format option. There is
# one table per kind of action.
status_formatters = {
    'default': StatusFormatter,
    'verbose': VerboseStatusFormatter,
    'urls': URLStatusFormatter,
    'ansi': AnsiStatusFormatter,
}

admin_formatters = {
    'default': AdminFormatter,
    'verbose': VerboseAdminFormatter,
    'urls': AdminFormatter,
    'ansi': AdminFormatter,
}

search_formatters = {
    'default': SearchFormatter,
    'verbose': VerboseSearchFormatter,
    'urls': URLSearchFormatter,
    'ansi': AnsiSearchFormatter,
}

# Top-level registry consulted by get_formatter(): action type -> table.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
}
249
def get_formatter(action_type, options):
    """Return a new formatter instance for `action_type`.

    action_type -- key into the module-level `formatters` registry.
    options     -- mapping; options['format'] selects the formatter class.

    Raises TwitterError when either the action type or the format name is
    not registered.
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the action type that was actually requested; the message
        # used to say "status" for admin and search lookups as well.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
261
class Action(object):
    """Base class for actions.

    Calling a plain Action() looks up options['action'] in the module-level
    `actions` table and runs the matching action.
    """

    def ask(self, subject='perform this action', careful=False):
        '''
        Ask the user, via `raw_input`, whether `subject` should be
        performed. When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`; on end-of-input the
        default answer is returned.
        '''
        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' %(subject, sample)
        try:
            answer = raw_input(prompt).lower()
        except EOFError:
            print >>sys.stderr # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful
        if careful:
            # Only an explicit yes counts.
            return answer in ('yes', 'y')
        # Anything but an explicit no counts as yes.
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        # Unknown action names resolve to NoSuchAction, which raises.
        action = actions.get(options['action'], NoSuchAction)()
        try:
            repeat = options['refresh'] and isinstance(action, StatusAction)
            action(twitter, options)
            # Only status actions are polled; everything else runs once.
            while repeat:
                time.sleep(options['refresh_rate'])
                action(twitter, options)
        except KeyboardInterrupt:
            print >>sys.stderr, '\n[Keyboard Interrupt]'
305
class NoSuchActionError(Exception):
    """Raised when the requested action name is not a known action."""
308
class NoSuchAction(Action):
    """Placeholder action used for unknown names; it always raises."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
312
def printNicely(string):
    """Print the unicode `string` encoded for standard output.

    Uses sys.stdout.encoding (replacing unencodable characters) when it is
    set, and UTF-8 otherwise.
    """
    encoding = sys.stdout.encoding
    if encoding:
        encoded = string.encode(encoding, 'replace')
    else:
        encoded = string.encode('utf-8')
    print(encoded)
318
class StatusAction(Action):
    """Base for actions that print a list of statuses.

    Subclasses provide getStatuses(twitter, options).
    """

    def __call__(self, twitter, options):
        statuses = self.getStatuses(twitter, options)
        formatter = get_formatter('status', options)
        for status in statuses:
            line = formatter(status, options)
            # Some formatters (e.g. 'urls') yield nothing for a status.
            if not line.strip():
                continue
            printNicely(line)
327
class SearchAction(Action):
    """Run a search for the terms in options['extra_args'] and print the
    formatted results."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        # NOTE(review): these attributes stay modified on the shared
        # `twitter` object; when run from the shell this presumably affects
        # later non-search actions -- confirm against the api module.
        twitter.domain="search.twitter.com"
        twitter.uri=""
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        encoding = get_term_encoding()
        # Python 2's urllib.quote raises KeyError when handed a unicode
        # object containing non-ASCII characters, so each term is decoded
        # from the terminal encoding and re-encoded as UTF-8 bytes before
        # it is quoted.
        query_string = "+".join(
            [quote(term.decode(encoding).encode('utf-8'))
             for term in options['extra_args']])
        twitter.encoded_args = "q=%s" %(query_string)

        results = twitter.search()['results']
        f = get_formatter('search', options)
        for result in results:
            resultStr = f(result, options)
            if resultStr.strip():
                printNicely(resultStr)
347
class AdminAction(Action):
    """Base for follow/leave actions.

    Subclasses provide getUser(twitter, user), which performs the change
    and returns the affected user.
    """

    def __call__(self, twitter, options):
        targets = options['extra_args']
        if not targets or not targets[0]:
            raise TwitterError("You need to specify a user (screen name)")
        af = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, targets[0])
        except TwitterError:
            error = sys.exc_info()[1]
            explanation = (
                "There was a problem following or leaving the specified user.",
                "You may be trying to follow a user you are already following;",
                "Leaving a user you are not currently following;",
                "Or the user may not exist.",
                "Sorry.",
                "",
            )
            for line in explanation:
                print(line)
            print(error)
            return
        printNicely(af(options['action'], user))
365
class FriendsAction(StatusAction):
    """Statuses from the friends timeline, in reversed order."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
369
class PublicAction(StatusAction):
    """Statuses from the public timeline, in reversed order."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
373
class RepliesAction(StatusAction):
    """Statuses from the replies call, in reversed order."""

    def getStatuses(self, twitter, options):
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
377
class FollowAction(AdminAction):
    """Admin action that calls friendships.create for the given user."""

    def getUser(self, twitter, user):
        friendships = twitter.friendships
        return friendships.create(id=user)
381
class LeaveAction(AdminAction):
    """Admin action that calls friendships.destroy for the given user."""

    def getUser(self, twitter, user):
        friendships = twitter.friendships
        return friendships.destroy(id=user)
385
class SetStatusAction(Action):
    """Post a status update.

    The text is the space-joined options['extra_args'] when any are given;
    otherwise the user is prompted for it.
    """

    def __call__(self, twitter, options):
        if options['extra_args']:
            raw = " ".join(options['extra_args'])
        else:
            raw = raw_input("message: ")
        # Both sources yield bytes in the terminal's encoding. unicode(raw)
        # would decode as ASCII and fail on any non-ASCII character, so the
        # prompted text is decoded the same way as the arguments.
        statusTxt = raw.decode(get_term_encoding())
        status = statusTxt.encode('utf8', 'replace')
        twitter.statuses.update(status=status)
393
class TwitterShell(Action):
    """Interactive prompt that reads command lines and runs them as
    actions until the user exits."""

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        # Strip surrounding single quotes and unescape \' inside.
        prompt = prompt.strip("'").replace("\\'","'")
        # Replace "[name]" markers with the matching ANSI colour code.
        for colour in ansi.COLOURS_NAMED:
            if '[%s]' %(colour) in prompt:
                prompt = prompt.replace(
                    '[%s]' %(colour), ansi.cmdColourNamed(colour))
        # "[R]" resets the colour.
        prompt = prompt.replace('[R]', ansi.cmdReset())
        return prompt

    def __call__(self, twitter, options):
        prompt = self.render_prompt(options.get('prompt', 'twitter> '))
        while True:
            options['action'] = ""
            try:
                args = raw_input(prompt).split()
                try:
                    parse_args(args, options)
                except (GetoptError, ValueError):
                    # A mistyped option or a bad number must not kill the
                    # shell with a traceback; report it and prompt again.
                    print >>sys.stderr, "I can't do that, %s." % (
                        sys.exc_info()[1],)
                    continue
                if not options['action']:
                    continue
                elif options['action'] == 'exit':
                    raise SystemExit(0)
                elif options['action'] == 'shell':
                    print >>sys.stderr, 'Sorry Xzibit does not work here!'
                    continue
                elif options['action'] == 'help':
                    # Shell-specific help; the regular help text follows
                    # because the 'help' action is still dispatched below.
                    print >>sys.stderr, '''\ntwitter> `action`\n
The Shell Accepts all the command line actions along with:

exit Leave the twitter shell (^D may also be used)

Full CMD Line help is appended below for your convinience.'''
                Action()(twitter, options)
                options['action'] = ''
            except NoSuchActionError:
                print >>sys.stderr, sys.exc_info()[1]
            except KeyboardInterrupt:
                print >>sys.stderr, '\n[Keyboard Interrupt]'
            except EOFError:
                # ^D at the prompt: confirm before leaving.
                print >>sys.stderr
                leaving = self.ask(subject='Leave')
                if not leaving:
                    print >>sys.stderr, 'Excellent!'
                else:
                    raise SystemExit(0)
440
class HelpAction(Action):
    """Print the module docstring, which holds the usage text."""

    def __call__(self, twitter, options):
        print(__doc__)
444
class DoNothingAction(Action):
    """Action that does nothing when called."""

    def __call__(self, twitter, options):
        return None
448
def parse_oauth_tokens(result):
    """Extract the OAuth token pair from a form-encoded response body.

    result -- string of '&'-separated key=value pairs.

    Returns (oauth_token, oauth_token_secret). Other keys are ignored and,
    if a key repeats, the last value wins. Raises ValueError when either
    key is absent.
    """
    wanted = ('oauth_token', 'oauth_token_secret')
    found = {}
    for pair in result.split('&'):
        # Split on the first '=' only so values that contain '=' survive;
        # segments without any '=' are skipped.
        key, sep, value = pair.partition('=')
        if sep and key in wanted:
            found[key] = value
    missing = [key for key in wanted if key not in found]
    if missing:
        # The response body is deliberately not echoed: it may hold secrets.
        raise ValueError(
            "OAuth response is missing %s" % (" and ".join(missing)))
    return found['oauth_token'], found['oauth_token_secret']
457
def oauth_dance(options):
    """Interactively authorize this tool and store the resulting tokens.

    Requests a request token, opens the authorization page in a web
    browser, asks the user for the PIN shown there, exchanges it for an
    access token and writes the token and token secret (one per line) to
    options['oauth_filename'].
    """
    print ("Hi there! We're gonna get you all set up to use Twitter"
           " on the command-line.")
    twitter = Twitter(
        auth=OAuth('', '', CONSUMER_KEY, CONSUMER_SECRET),
        format='')
    oauth_token, oauth_token_secret = parse_oauth_tokens(
        twitter.oauth.request_token())
    print("""
In the web browser window that opens please choose to Allow access to the
command-line tool. Copy the PIN number that appears on the next page and
paste or type it here:
""")
    # NOTE(review): the authorization page is opened over plain HTTP --
    # confirm whether this should be HTTPS.
    webbrowser.open(
        'http://api.twitter.com/oauth/authorize?oauth_token=' +
        oauth_token)
    time.sleep(2) # Sometimes the last command can print some
                  # crap. Wait a bit so it doesn't mess up the next
                  # prompt.
    oauth_verifier = raw_input("Please type the PIN: ").strip()
    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        format='')
    oauth_token, oauth_token_secret = parse_oauth_tokens(
        twitter.oauth.access_token(oauth_verifier=oauth_verifier))
    # NOTE(review): the file is created with default permissions although
    # it holds credentials -- consider restricting it to the owner.
    oauth_file = open(options['oauth_filename'], 'w')
    try:
        print >> oauth_file, oauth_token
        print >> oauth_file, oauth_token_secret
    finally:
        # Close the file even when a write fails.
        oauth_file.close()
    print
    print("That's it! Your authorization keys have been written to %s." % (
        options['oauth_filename']))
491
492
# Action name (first positional command-line argument) -> Action class.
# Action.__call__ looks names up here and uses NoSuchAction for misses.
actions = {
    'authorize': DoNothingAction,
    'follow': FollowAction,
    'friends': FriendsAction,
    'help': HelpAction,
    'leave': LeaveAction,
    'public': PublicAction,
    'replies': RepliesAction,
    'search': SearchAction,
    'set': SetStatusAction,
    'shell': TwitterShell,
}
505
def loadConfig(filename):
    """Return a copy of OPTIONS updated from the config file `filename`.

    Only the 'format' and 'prompt' options of the [twitter] section are
    read. A missing file yields the unmodified defaults.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options
    cp = SafeConfigParser()
    cp.read([filename])
    overrides = [
        (name, cp.get('twitter', name))
        for name in ('format', 'prompt')
        if cp.has_option('twitter', name)]
    options.update(overrides)
    return options
515
def read_oauth_file(fn):
    """Read the OAuth credentials stored in the file `fn`.

    Returns (oauth_token, oauth_token_secret): the first two lines of the
    file with surrounding whitespace stripped. Missing lines come back as
    empty strings.
    """
    f = open(fn)
    try:
        token = f.readline().strip()
        token_secret = f.readline().strip()
    finally:
        # Close the handle instead of leaving it to garbage collection.
        f.close()
    return token, token_secret
519
def main(args=sys.argv[1:]):
    """Command-line entry point.

    Parses `args`, merges defaults, config-file values and command-line
    values, runs the OAuth setup when needed, and then runs the requested
    action. Returns 1 when refresh is combined with an action that does not
    support it; raises SystemExit(1) for bad options, unknown actions and
    Twitter errors.
    """
    arg_options = {}
    try:
        parse_args(args, arg_options)
    except GetoptError:
        e = sys.exc_info()[1]
        print >> sys.stderr, "I can't do that, %s." %(e)
        print >> sys.stderr
        raise SystemExit(1)

    config_options = loadConfig(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    for k, v in config_options.items():
        if v:
            options[k] = v
    for k, v in arg_options.items():
        # Falsy values still mean "not given", except an explicit False:
        # --no-ssl sets 'secure' to False, and a plain truthiness test
        # silently dropped it, so the flag never took effect.
        if v or v is False:
            options[k] = v

    if options['refresh'] and options['action'] not in (
        'friends', 'public', 'replies'):
        print >> sys.stderr, "You can only refresh the friends, public, or replies actions."
        print >> sys.stderr, "Use 'twitter -h' for help."
        return 1

    # Authorize on request, or whenever no credentials are stored yet.
    if (options['action'] == 'authorize'
        or not os.path.exists(options['oauth_filename'])):
        oauth_dance(options)

    oauth_token, oauth_token_secret = read_oauth_file(options['oauth_filename'])

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1')

    try:
        Action()(twitter, options)
    except NoSuchActionError:
        print >>sys.stderr, sys.exc_info()[1]
        raise SystemExit(1)
    except TwitterError:
        e = sys.exc_info()[1]
        print >> sys.stderr, e.args[0]
        print >> sys.stderr, "Use 'twitter -h' for help."
        raise SystemExit(1)