]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Remove spaces at eol.
[z_archive/twitter.git] / twitter / cmdline.py
1 """
2 USAGE:
3
4 twitter [action] [options]
5
6
7 ACTIONS:
8 authorize authorize the command-line tool to interact with Twitter
9 follow add the specified user to your follow list
10 friends get latest tweets from your friends (default action)
11 help print this help text that you are currently reading
12 leave remove the specified user from your following list
13 public get latest public tweets
14 replies get latest replies
15 search search twitter (Beware: octothorpe, escape it)
16 set set your twitter status
17 shell login the twitter shell
18
19
20 OPTIONS:
21
22 -r --refresh run this command forever, polling every once
23 in a while (default: every 5 minutes)
24 -R --refresh-rate <rate> set the refresh rate (in seconds)
25 -f --format <format> specify the output format for status updates
26 -c --config <filename> read username and password from given config
27 file (default ~/.twitter)
28 -l --length <count> specify number of status updates shown
29 (default: 20, max: 200)
30 -t --timestamp show time before status lines
31 -d --datestamp shoe date before status lines
32 --no-ssl use HTTP instead of more secure HTTPS
33 --oauth <filename> filename to read/store oauth credentials to
34
35 FORMATS for the --format option
36
37 default one line per status
38 verbose multiple lines per status, more verbose status info
39 urls nothing but URLs
40 ansi ansi colour (rainbow mode)
41
42
43 CONFIG FILES
44
45 The config file should be placed in your home directory and be named .twitter.
46 It must contain a [twitter] header, and all the desired options you wish to
47 set, like so:
48
49 [twitter]
50 format: <desired_default_format_for_output>
51 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
52
53 OAuth authentication tokens are stored in the file .twitter_oauth in your
54 home directory.
55 """
56
57 CONSUMER_KEY='uS6hO2sV6tDKIOeVjhnFnQ'
58 CONSUMER_SECRET='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
59
60 import sys
61 import time
62 from getopt import gnu_getopt as getopt, GetoptError
63 from getpass import getpass
64 import re
65 import os.path
66 from ConfigParser import SafeConfigParser
67 import datetime
68 from urllib import quote
69 import webbrowser
70
71 from api import Twitter, TwitterError
72 from oauth import OAuth, write_token_file, read_token_file
73 from oauth_dance import oauth_dance
74 import ansi
75
76 OPTIONS = {
77 'action': 'friends',
78 'refresh': False,
79 'refresh_rate': 600,
80 'format': 'default',
81 'prompt': '[cyan]twitter[R]> ',
82 'config_filename': os.environ.get('HOME', '') + os.sep + '.twitter',
83 'oauth_filename': os.environ.get('HOME', '') + os.sep + '.twitter_oauth',
84 'length': 20,
85 'timestamp': False,
86 'datestamp': False,
87 'extra_args': [],
88 'secure': True,
89 }
90
91 def parse_args(args, options):
92 long_opts = ['help', 'format=', 'refresh', 'oauth=',
93 'refresh-rate=', 'config=', 'length=', 'timestamp',
94 'datestamp', 'no-ssl']
95 short_opts = "e:p:f:h?rR:c:l:td"
96 opts, extra_args = getopt(args, short_opts, long_opts)
97
98 for opt, arg in opts:
99 if opt in ('-f', '--format'):
100 options['format'] = arg
101 elif opt in ('-r', '--refresh'):
102 options['refresh'] = True
103 elif opt in ('-R', '--refresh-rate'):
104 options['refresh_rate'] = int(arg)
105 elif opt in ('-l', '--length'):
106 options["length"] = int(arg)
107 elif opt in ('-t', '--timestamp'):
108 options["timestamp"] = True
109 elif opt in ('-d', '--datestamp'):
110 options["datestamp"] = True
111 elif opt in ('-?', '-h', '--help'):
112 options['action'] = 'help'
113 elif opt in ('-c', '--config'):
114 options['config_filename'] = arg
115 elif opt == '--no-ssl':
116 options['secure'] = False
117 elif opt == '--oauth':
118 options['oauth_filename'] = arg
119
120 if extra_args and not ('action' in options and options['action'] == 'help'):
121 options['action'] = extra_args[0]
122 options['extra_args'] = extra_args[1:]
123
124 def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
125 timestamp = options["timestamp"]
126 datestamp = options["datestamp"]
127 t = time.strptime(status['created_at'], format)
128 i_hate_timezones = time.timezone
129 if (time.daylight):
130 i_hate_timezones = time.altzone
131 dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
132 seconds=i_hate_timezones)
133 t = dt.timetuple()
134 if timestamp and datestamp:
135 return time.strftime("%Y-%m-%d %H:%M:%S ", t)
136 elif timestamp:
137 return time.strftime("%H:%M:%S ", t)
138 elif datestamp:
139 return time.strftime("%Y-%m-%d ", t)
140 return ""
141
142 class StatusFormatter(object):
143 def __call__(self, status, options):
144 return (u"%s%s %s" %(
145 get_time_string(status, options),
146 status['user']['screen_name'], status['text']))
147
148 class AnsiStatusFormatter(object):
149 def __init__(self):
150 self._colourMap = ansi.ColourMap()
151
152 def __call__(self, status, options):
153 colour = self._colourMap.colourFor(status['user']['screen_name'])
154 return (u"%s%s%s%s %s" %(
155 get_time_string(status, options),
156 ansi.cmdColour(colour), status['user']['screen_name'],
157 ansi.cmdReset(), status['text']))
158
159 class VerboseStatusFormatter(object):
160 def __call__(self, status, options):
161 return (u"-- %s (%s) on %s\n%s\n" %(
162 status['user']['screen_name'],
163 status['user']['location'],
164 status['created_at'],
165 status['text']))
166
167 class URLStatusFormatter(object):
168 urlmatch = re.compile(r'https?://\S+')
169 def __call__(self, status, options):
170 urls = self.urlmatch.findall(status['text'])
171 return u'\n'.join(urls) if urls else ""
172
173 class AdminFormatter(object):
174 def __call__(self, action, user):
175 user_str = u"%s (%s)" %(user['screen_name'], user['name'])
176 if action == "follow":
177 return u"You are now following %s.\n" %(user_str)
178 else:
179 return u"You are no longer following %s.\n" %(user_str)
180
181 class VerboseAdminFormatter(object):
182 def __call__(self, action, user):
183 return(u"-- %s: %s (%s): %s" % (
184 "Following" if action == "follow" else "Leaving",
185 user['screen_name'],
186 user['name'],
187 user['url']))
188
189 class SearchFormatter(object):
190 def __call__(self, result, options):
191 return(u"%s%s %s" %(
192 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
193 result['from_user'], result['text']))
194
195 class VerboseSearchFormatter(SearchFormatter):
196 pass #Default to the regular one
197
198 class URLSearchFormatter(object):
199 urlmatch = re.compile(r'https?://\S+')
200 def __call__(self, result, options):
201 urls = self.urlmatch.findall(result['text'])
202 return u'\n'.join(urls) if urls else ""
203
204 class AnsiSearchFormatter(object):
205 def __init__(self):
206 self._colourMap = ansi.ColourMap()
207
208 def __call__(self, result, options):
209 colour = self._colourMap.colourFor(result['from_user'])
210 return (u"%s%s%s%s %s" %(
211 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
212 ansi.cmdColour(colour), result['from_user'],
213 ansi.cmdReset(), result['text']))
214
215 _term_encoding = None
216 def get_term_encoding():
217 global _term_encoding
218 if not _term_encoding:
219 lang = os.getenv('LANG', 'unknown.UTF-8').split('.')
220 if lang[1:]:
221 _term_encoding = lang[1]
222 else:
223 _term_encoding = 'UTF-8'
224 return _term_encoding
225
226 formatters = {}
227 status_formatters = {
228 'default': StatusFormatter,
229 'verbose': VerboseStatusFormatter,
230 'urls': URLStatusFormatter,
231 'ansi': AnsiStatusFormatter
232 }
233 formatters['status'] = status_formatters
234
235 admin_formatters = {
236 'default': AdminFormatter,
237 'verbose': VerboseAdminFormatter,
238 'urls': AdminFormatter,
239 'ansi': AdminFormatter
240 }
241 formatters['admin'] = admin_formatters
242
243 search_formatters = {
244 'default': SearchFormatter,
245 'verbose': VerboseSearchFormatter,
246 'urls': URLSearchFormatter,
247 'ansi': AnsiSearchFormatter
248 }
249 formatters['search'] = search_formatters
250
251 def get_formatter(action_type, options):
252 formatters_dict = formatters.get(action_type)
253 if (not formatters_dict):
254 raise TwitterError(
255 "There was an error finding a class of formatters for your type (%s)"
256 %(action_type))
257 f = formatters_dict.get(options['format'])
258 if (not f):
259 raise TwitterError(
260 "Unknown formatter '%s' for status actions" %(options['format']))
261 return f()
262
263 class Action(object):
264
265 def ask(self, subject='perform this action', careful=False):
266 '''
267 Requests fromt he user using `raw_input` if `subject` should be
268 performed. When `careful`, the default answer is NO, otherwise YES.
269 Returns the user answer in the form `True` or `False`.
270 '''
271 sample = '(y/N)'
272 if not careful:
273 sample = '(Y/n)'
274
275 prompt = 'You really want to %s %s? ' %(subject, sample)
276 try:
277 answer = raw_input(prompt).lower()
278 if careful:
279 return answer in ('yes', 'y')
280 else:
281 return answer not in ('no', 'n')
282 except EOFError:
283 print >>sys.stderr # Put Newline since Enter was never pressed
284 # TODO:
285 # Figure out why on OS X the raw_input keeps raising
286 # EOFError and is never able to reset and get more input
287 # Hint: Look at how IPython implements their console
288 default = True
289 if careful:
290 default = False
291 return default
292
293 def __call__(self, twitter, options):
294 action = actions.get(options['action'], NoSuchAction)()
295 try:
296 doAction = lambda : action(twitter, options)
297 if (options['refresh'] and isinstance(action, StatusAction)):
298 while True:
299 doAction()
300 time.sleep(options['refresh_rate'])
301 else:
302 doAction()
303 except KeyboardInterrupt:
304 print >>sys.stderr, '\n[Keyboard Interrupt]'
305 pass
306
307 class NoSuchActionError(Exception):
308 pass
309
310 class NoSuchAction(Action):
311 def __call__(self, twitter, options):
312 raise NoSuchActionError("No such action: %s" %(options['action']))
313
314 def printNicely(string):
315 if sys.stdout.encoding:
316 print string.encode(sys.stdout.encoding, 'replace')
317 else:
318 print string.encode('utf-8')
319
320 class StatusAction(Action):
321 def __call__(self, twitter, options):
322 statuses = self.getStatuses(twitter, options)
323 sf = get_formatter('status', options)
324 for status in statuses:
325 statusStr = sf(status, options)
326 if statusStr.strip():
327 printNicely(statusStr)
328
329 class SearchAction(Action):
330 def __call__(self, twitter, options):
331 # We need to be pointing at search.twitter.com to work, and it is less
332 # tangly to do it here than in the main()
333 twitter.domain="search.twitter.com"
334 twitter.uri=""
335 # We need to bypass the TwitterCall parameter encoding, so we
336 # don't encode the plus sign, so we have to encode it ourselves
337 query_string = "+".join(
338 [quote(term.decode(get_term_encoding()))
339 for term in options['extra_args']])
340 twitter.encoded_args = "q=%s" %(query_string)
341
342 results = twitter.search()['results']
343 f = get_formatter('search', options)
344 for result in results:
345 resultStr = f(result, options)
346 if resultStr.strip():
347 printNicely(resultStr)
348
349 class AdminAction(Action):
350 def __call__(self, twitter, options):
351 if not (options['extra_args'] and options['extra_args'][0]):
352 raise TwitterError("You need to specify a user (screen name)")
353 af = get_formatter('admin', options)
354 try:
355 user = self.getUser(twitter, options['extra_args'][0])
356 except TwitterError, e:
357 print "There was a problem following or leaving the specified user."
358 print "You may be trying to follow a user you are already following;"
359 print "Leaving a user you are not currently following;"
360 print "Or the user may not exist."
361 print "Sorry."
362 print
363 print e
364 else:
365 printNicely(af(options['action'], user))
366
367 class FriendsAction(StatusAction):
368 def getStatuses(self, twitter, options):
369 return reversed(twitter.statuses.friends_timeline(count=options["length"]))
370
371 class PublicAction(StatusAction):
372 def getStatuses(self, twitter, options):
373 return reversed(twitter.statuses.public_timeline(count=options["length"]))
374
375 class RepliesAction(StatusAction):
376 def getStatuses(self, twitter, options):
377 return reversed(twitter.statuses.replies(count=options["length"]))
378
379 class FollowAction(AdminAction):
380 def getUser(self, twitter, user):
381 return twitter.friendships.create(id=user)
382
383 class LeaveAction(AdminAction):
384 def getUser(self, twitter, user):
385 return twitter.friendships.destroy(id=user)
386
387 class SetStatusAction(Action):
388 def __call__(self, twitter, options):
389 statusTxt = (" ".join(options['extra_args']).decode(get_term_encoding())
390 if options['extra_args']
391 else unicode(raw_input("message: ")))
392 status = (statusTxt.encode('utf8', 'replace'))
393 twitter.statuses.update(status=status)
394
395 class TwitterShell(Action):
396
397 def render_prompt(self, prompt):
398 '''Parses the `prompt` string and returns the rendered version'''
399 prompt = prompt.strip("'").replace("\\'","'")
400 for colour in ansi.COLOURS_NAMED:
401 if '[%s]' %(colour) in prompt:
402 prompt = prompt.replace(
403 '[%s]' %(colour), ansi.cmdColourNamed(colour))
404 prompt = prompt.replace('[R]', ansi.cmdReset())
405 return prompt
406
407 def __call__(self, twitter, options):
408 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
409 while True:
410 options['action'] = ""
411 try:
412 args = raw_input(prompt).split()
413 parse_args(args, options)
414 if not options['action']:
415 continue
416 elif options['action'] == 'exit':
417 raise SystemExit(0)
418 elif options['action'] == 'shell':
419 print >>sys.stderr, 'Sorry Xzibit does not work here!'
420 continue
421 elif options['action'] == 'help':
422 print >>sys.stderr, '''\ntwitter> `action`\n
423 The Shell Accepts all the command line actions along with:
424
425 exit Leave the twitter shell (^D may also be used)
426
427 Full CMD Line help is appended below for your convinience.'''
428 Action()(twitter, options)
429 options['action'] = ''
430 except NoSuchActionError, e:
431 print >>sys.stderr, e
432 except KeyboardInterrupt:
433 print >>sys.stderr, '\n[Keyboard Interrupt]'
434 except EOFError:
435 print >>sys.stderr
436 leaving = self.ask(subject='Leave')
437 if not leaving:
438 print >>sys.stderr, 'Excellent!'
439 else:
440 raise SystemExit(0)
441
442 class HelpAction(Action):
443 def __call__(self, twitter, options):
444 print __doc__
445
446 class DoNothingAction(Action):
447 def __call__(self, twitter, options):
448 pass
449
450 actions = {
451 'authorize' : DoNothingAction,
452 'follow' : FollowAction,
453 'friends' : FriendsAction,
454 'help' : HelpAction,
455 'leave' : LeaveAction,
456 'public' : PublicAction,
457 'replies' : RepliesAction,
458 'search' : SearchAction,
459 'set' : SetStatusAction,
460 'shell' : TwitterShell,
461 }
462
463 def loadConfig(filename):
464 options = dict(OPTIONS)
465 if os.path.exists(filename):
466 cp = SafeConfigParser()
467 cp.read([filename])
468 for option in ('format', 'prompt'):
469 if cp.has_option('twitter', option):
470 options[option] = cp.get('twitter', option)
471 return options
472
473 def main(args=sys.argv[1:]):
474 arg_options = {}
475 try:
476 parse_args(args, arg_options)
477 except GetoptError, e:
478 print >> sys.stderr, "I can't do that, %s." %(e)
479 print >> sys.stderr
480 raise SystemExit(1)
481
482 config_path = os.path.expanduser(
483 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
484 config_options = loadConfig(config_path)
485
486 # Apply the various options in order, the most important applied last.
487 # Defaults first, then what's read from config file, then command-line
488 # arguments.
489 options = dict(OPTIONS)
490 for d in config_options, arg_options:
491 for k,v in d.items():
492 if v: options[k] = v
493
494 if options['refresh'] and options['action'] not in (
495 'friends', 'public', 'replies'):
496 print >> sys.stderr, "You can only refresh the friends, public, or replies actions."
497 print >> sys.stderr, "Use 'twitter -h' for help."
498 return 1
499
500 oauth_filename = os.path.expanduser(options['oauth_filename'])
501
502 if (options['action'] == 'authorize'
503 or not os.path.exists(oauth_filename)):
504 oauth_dance(
505 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
506 options['oauth_filename'])
507
508 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
509
510 twitter = Twitter(
511 auth=OAuth(
512 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
513 secure=options['secure'],
514 api_version='1',
515 domain='api.twitter.com')
516
517 try:
518 Action()(twitter, options)
519 except NoSuchActionError, e:
520 print >>sys.stderr, e
521 raise SystemExit(1)
522 except TwitterError, e:
523 print >> sys.stderr, str(e)
524 print >> sys.stderr, "Use 'twitter -h' for help."
525 raise SystemExit(1)