]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Separate mylist action for my own lists as opposed to anyone else's.
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow add the specified user to your follow list
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave remove the specified user from your following list
14 public get latest public tweets
15 list get list of user lists
16 replies get latest replies
17 search search twitter (Beware: octothorpe, escape it)
18 set set your twitter status
19 shell login the twitter shell
20
21
22 OPTIONS:
23
24 -r --refresh run this command forever, polling every once
25 in a while (default: every 10 minutes)
26 -R --refresh-rate <rate> set the refresh rate (in seconds)
27 -f --format <format> specify the output format for status updates
28 -c --config <filename> read username and password from given config
29 file (default ~/.twitter)
30 -l --length <count> specify number of status updates shown
31 (default: 20, max: 200)
32 -t --timestamp show time before status lines
33 -d --datestamp show date before status lines
34 --no-ssl use HTTP instead of more secure HTTPS
35 --oauth <filename> filename to read/store oauth credentials to
36
37 FORMATS for the --format option
38
39 default one line per status
40 verbose multiple lines per status, more verbose status info
41 urls nothing but URLs
42 ansi ansi colour (rainbow mode)
43
44
45 CONFIG FILES
46
47 The config file should be placed in your home directory and be named .twitter.
48 It must contain a [twitter] header, and all the desired options you wish to
49 set, like so:
50
51 [twitter]
52 format: <desired_default_format_for_output>
53 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
54
55 OAuth authentication tokens are stored in the file .twitter_oauth in your
56 home directory.
57 """
58
# OAuth consumer credentials for this command-line application; main()
# hands them to oauth_dance() and to the OAuth() authenticator.
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
61
62 import sys
63 import time
64 from getopt import gnu_getopt as getopt, GetoptError
65 from getpass import getpass
66 import re
67 import os.path
68 from ConfigParser import SafeConfigParser
69 import datetime
70 from urllib import quote
71 import webbrowser
72
73 from api import Twitter, TwitterError
74 from oauth import OAuth, write_token_file, read_token_file
75 from oauth_dance import oauth_dance
76 import ansi
77
# Built-in defaults for every option.  main() layers the config file and
# then the command-line arguments on top of a copy of this dict.
#
# The per-user files live in the home directory.  expanduser('~') is used
# instead of reading $HOME directly so that an unset HOME does not silently
# produce a path in the filesystem root.
OPTIONS = {
    'action': 'friends',
    'refresh': False,
    'refresh_rate': 600,  # seconds between polls when refreshing
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    'config_filename': os.path.join(os.path.expanduser('~'), '.twitter'),
    'oauth_filename': os.path.join(
        os.path.expanduser('~'), '.twitter_oauth'),
    'length': 20,
    'timestamp': False,
    'datestamp': False,
    'extra_args': [],
    'secure': True,
}
92
def parse_args(args, options):
    """Parse command-line style `args` and record the results in `options`.

    Only keys for options that actually appear in `args` are written, so
    the caller can tell explicit settings from defaults.  The first
    non-option argument becomes options['action'] (unless help was
    requested) and the remaining ones options['extra_args'].

    Raises GetoptError for unknown options, missing option arguments and
    for -l/-R values that are not integers.
    """
    def int_argument(opt, arg):
        # Report a bad number the same way getopt reports any other
        # malformed option, so callers need only one except clause.
        try:
            return int(arg)
        except ValueError:
            raise GetoptError(
                "option %s requires an integer argument, not '%s'"
                % (opt, arg), opt)

    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    # -e and -p are still accepted (and ignored) so old invocations that
    # pass them keep working.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = int_argument(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = int_argument(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over whatever action word was also given.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
125
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix to print in front of a status line.

    `status['created_at']` is parsed with `format` and treated as UTC.
    Depending on options['timestamp'] and options['datestamp'] the result
    is "YYYY-MM-DD HH:MM:SS ", "HH:MM:SS ", "YYYY-MM-DD " or "" (note the
    trailing space on the non-empty forms).

    Raises ValueError if a stamp is requested and `created_at` does not
    match `format`.
    """
    # Local import keeps this block self-contained; calendar is stdlib.
    import calendar

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if not (timestamp or datestamp):
        # Nothing to print, so do not bother parsing the date at all.
        return ""

    utc_time = time.strptime(status['created_at'], format)
    # Convert through the epoch so the C library decides whether DST
    # applies at that moment.  time.daylight only says that the zone has
    # a DST rule, not that DST is in effect.
    t = time.localtime(calendar.timegm(utc_time))

    if timestamp and datestamp:
        return time.strftime("%Y-%m-%d %H:%M:%S ", t)
    elif timestamp:
        return time.strftime("%H:%M:%S ", t)
    return time.strftime("%Y-%m-%d ", t)
143
class StatusFormatter(object):
    """Default status format: one line with time prefix, author and text."""

    def __call__(self, status, options):
        time_prefix = get_time_string(status, options)
        author = status['user']['screen_name']
        return u"%s%s %s" % (time_prefix, author, status['text'])
149
class AnsiStatusFormatter(object):
    """One-line status format with the author name wrapped in ANSI colour."""

    def __init__(self):
        # One colour map per formatter instance, consulted for every status.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        author = status['user']['screen_name']
        colour = self._colourMap.colourFor(author)
        pieces = (
            get_time_string(status, options),
            ansi.cmdColour(colour),
            status['user']['screen_name'],
            ansi.cmdReset(),
            status['text'],
        )
        return u"%s%s%s%s %s" % pieces
160
class VerboseStatusFormatter(object):
    """Multi-line status format: author, location and raw creation time
    on a header line, then the text on its own line."""

    def __call__(self, status, options):
        author = status['user']
        header = u"-- %s (%s) on %s" % (
            author['screen_name'], author['location'], status['created_at'])
        return u"%s\n%s\n" % (header, status['text'])
168
class URLStatusFormatter(object):
    """Status format that keeps only the http/https URLs found in the text,
    one per line; statuses without URLs render as an empty string."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        found = self.urlmatch.findall(status['text'])
        if not found:
            return ""
        return u'\n'.join(found)
174
175
class ListsFormatter(object):
    """Default list format: the name padded to 30 columns, followed by the
    description in parentheses when there is one."""

    def __call__(self, list):
        description = list['description']
        if description:
            body = u"%-30s (%s)" % (list['name'], description)
        else:
            body = u"%-30s" % (list['name'])
        return body + u"\n"
183
class ListsVerboseFormatter(object):
    """Verbose list format: name, description, member count and mode, each
    on its own line."""

    def __call__(self, list):
        fields = (
            list['name'],
            list['description'],
            list['member_count'],
            list['mode'],
        )
        return u"%-30s\n description: %s\n members: %s\n mode:%s\n" % fields
188
class AnsiListsFormatter(object):
    """List format with the name (padded to 15 columns) in ANSI colour,
    followed by the description."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        colour = self._colourMap.colourFor(list['name'])
        start_colour = ansi.cmdColour(colour)
        name = list['name']
        end_colour = ansi.cmdReset()
        return u"%s%-15s%s %s" % (
            start_colour, name, end_colour, list['description'])
198
199
class AdminFormatter(object):
    """Default confirmation message for the follow and leave actions."""

    def __call__(self, action, user):
        who = u"%s (%s)" % (user['screen_name'], user['name'])
        # Anything other than "follow" is reported as having left the user.
        if action != "follow":
            return u"You are no longer following %s.\n" % (who)
        return u"You are now following %s.\n" % (who)
207
class VerboseAdminFormatter(object):
    """Verbose confirmation for follow/leave: the verb, screen name, real
    name and URL of the user on one line."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        return u"-- %s: %s (%s): %s" % (
            verb, user['screen_name'], user['name'], user['url'])
215
class SearchFormatter(object):
    """Default search-result format: time prefix, sender and text on one
    line.  Search results are parsed with their own date layout."""

    def __call__(self, result, options):
        time_prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        sender = result['from_user']
        return u"%s%s %s" % (time_prefix, sender, result['text'])


class VerboseSearchFormatter(SearchFormatter):
    """Verbose search format; currently identical to the default one."""
224
class URLSearchFormatter(object):
    """Search-result format that keeps only the http/https URLs in the
    text, one per line; results without URLs render as an empty string."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        found = self.urlmatch.findall(result['text'])
        if not found:
            return ""
        return u'\n'.join(found)
230
class AnsiSearchFormatter(object):
    """One-line search-result format with the sender in ANSI colour."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        colour = self._colourMap.colourFor(result['from_user'])
        pieces = (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansi.cmdColour(colour),
            result['from_user'],
            ansi.cmdReset(),
            result['text'],
        )
        return u"%s%s%s%s %s" % pieces
241
_term_encoding = None
def get_term_encoding():
    """Return the name of the terminal's character encoding.

    The codeset is taken from the LANG environment variable, which has
    the shape "language_TERRITORY.codeset@modifier".  UTF-8 is returned
    when LANG is unset, names no codeset, or names one Python has no
    codec for.  The answer is computed once and cached.
    """
    global _term_encoding
    if not _term_encoding:
        # Local import keeps this block self-contained; codecs is stdlib.
        import codecs

        lang = os.getenv('LANG', 'unknown.UTF-8').split('.')
        # Drop a trailing "@modifier" (e.g. "ISO-8859-15@euro"); it is
        # not part of the codec name and would make decode() fail.
        codeset = lang[1].split('@')[0] if lang[1:] else ''
        try:
            codecs.lookup(codeset)
        except LookupError:
            codeset = 'UTF-8'
        _term_encoding = codeset
    return _term_encoding
252
# Formatter classes keyed by the --format name, one table per kind of
# output.  get_formatter() looks the class up here and instantiates it.
status_formatters = dict(
    default=StatusFormatter,
    verbose=VerboseStatusFormatter,
    urls=URLStatusFormatter,
    ansi=AnsiStatusFormatter,
)

# Follow/leave confirmations only have a default and a verbose rendering.
admin_formatters = dict(
    default=AdminFormatter,
    verbose=VerboseAdminFormatter,
    urls=AdminFormatter,
    ansi=AdminFormatter,
)

search_formatters = dict(
    default=SearchFormatter,
    verbose=VerboseSearchFormatter,
    urls=URLSearchFormatter,
    ansi=AnsiSearchFormatter,
)

# There is no URL-only rendering of a list, hence the None entry.
lists_formatters = dict(
    default=ListsFormatter,
    verbose=ListsVerboseFormatter,
    urls=None,
    ansi=AnsiListsFormatter,
)

formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
285
def get_formatter(action_type, options):
    """Return a new formatter instance for `action_type`.

    `action_type` selects one of the tables in `formatters` and
    options['format'] selects the class within it.

    Raises TwitterError when either lookup finds nothing (including
    table entries that are deliberately None).
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the kind of action that was actually requested; this
        # function serves more than the status actions.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
297
class Action(object):
    """Base class of all actions.

    Calling a plain Action instance dispatches to the action class
    registered in `actions` under options['action'].
    """

    def ask(self, subject='perform this action', careful=False):
        '''
        Asks the user via `raw_input` whether `subject` should be
        performed. When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        if careful:
            sample = '(y/N)'
        else:
            sample = '(Y/n)'

        prompt = 'You really want to %s %s? ' % (subject, sample)
        try:
            answer = raw_input(prompt).lower()
        except EOFError:
            print >>sys.stderr  # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful  # fall back to the default answer

        if careful:
            # Only an explicit yes counts.
            return answer in ('yes', 'y')
        # Anything but an explicit no counts as yes.
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        # Unknown action names resolve to NoSuchAction, which raises.
        action = actions.get(options['action'], NoSuchAction)()
        try:
            if options['refresh'] and isinstance(action, StatusAction):
                # Poll until the user interrupts.
                while True:
                    action(twitter, options)
                    time.sleep(options['refresh_rate'])
            else:
                action(twitter, options)
        except KeyboardInterrupt:
            print >>sys.stderr, '\n[Keyboard Interrupt]'
341
class NoSuchActionError(Exception):
    """Raised when the requested action name is not a known action."""
344
class NoSuchAction(Action):
    """Placeholder action used for unknown action names; always raises."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
348
349 def printNicely(string):
350 if sys.stdout.encoding:
351 print string.encode(sys.stdout.encoding, 'replace')
352 else:
353 print string.encode('utf-8')
354
class StatusAction(Action):
    """Base class of actions that print a sequence of statuses.

    Subclasses supply getStatuses(twitter, options).
    """

    def __call__(self, twitter, options):
        statuses = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for status in statuses:
            line = render(status, options)
            if not line.strip():
                # Formatters may render a status as nothing (e.g. no URLs).
                continue
            printNicely(line)
363
class SearchAction(Action):
    """Search twitter for the words in options['extra_args'] and print
    the results with the selected search formatter."""

    def __call__(self, twitter, options):
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves.
        # quote() must be given bytes: terms are decoded from the terminal
        # encoding and re-encoded as UTF-8 before being percent-quoted.
        encoding = get_term_encoding()
        query_string = "+".join(
            [quote(term.decode(encoding).encode('utf-8'))
             for term in options['extra_args']])

        # We need to be pointing at search.twitter.com to work, and it is
        # less tangly to do it here than in the main().  The same twitter
        # object is reused by later commands in the shell, so put the
        # original endpoint back afterwards.
        saved_domain = twitter.domain
        saved_uriparts = twitter.uriparts
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()
        try:
            results = twitter.search(q=query_string)['results']
        finally:
            twitter.domain = saved_domain
            twitter.uriparts = saved_uriparts

        f = get_formatter('search', options)
        for result in results:
            resultStr = f(result, options)
            if resultStr.strip():
                printNicely(resultStr)
382
383 class AdminAction(Action):
384 def __call__(self, twitter, options):
385 if not (options['extra_args'] and options['extra_args'][0]):
386 raise TwitterError("You need to specify a user (screen name)")
387 af = get_formatter('admin', options)
388 try:
389 user = self.getUser(twitter, options['extra_args'][0])
390 except TwitterError, e:
391 print "There was a problem following or leaving the specified user."
392 print "You may be trying to follow a user you are already following;"
393 print "Leaving a user you are not currently following;"
394 print "Or the user may not exist."
395 print "Sorry."
396 print
397 print e
398 else:
399 printNicely(af(options['action'], user))
400
class ListsAction(StatusAction):
    """Show a user's lists, or the statuses of one of those lists.

    extra_args[0] is the user's screen name.  Without a second argument
    the user's lists are printed here and no statuses are returned; with
    one, the statuses of that list are returned for StatusAction to print.
    """

    def getStatuses(self, twitter, options):
        if not options['extra_args']:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = options['extra_args'][0]

        if not options['extra_args'][1:]:
            lists = twitter.user.lists(user=screen_name)['lists']
            if not lists:
                printNicely("This user has no lists.")
            else:
                # One formatter for the whole listing, so a formatter that
                # keeps per-instance state (the ANSI one owns a colour
                # map) is not reset for every row.
                lf = get_formatter('lists', options)
                for user_list in lists:
                    printNicely(lf(user_list))
            return []
        else:
            return reversed(twitter.user.lists.list.statuses(
                user=screen_name, list=options['extra_args'][1]))
419
420
class MyListsAction(ListsAction):
    """ListsAction for the authenticated user's own lists."""

    def getStatuses(self, twitter, options):
        screen_name = twitter.account.verify_credentials()['screen_name']
        # Work on a copy: inserting into the caller's extra_args would add
        # the screen name again on every refresh, and that list can be the
        # shared default from OPTIONS.
        own_options = dict(options)
        own_options['extra_args'] = (
            [screen_name] + list(options['extra_args']))
        return ListsAction.getStatuses(self, twitter, own_options)
426
427
class FriendsAction(StatusAction):
    """Statuses from the friends timeline, in reverse of the API order."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
431
class PublicAction(StatusAction):
    """Statuses from the public timeline, in reverse of the API order."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
435
class RepliesAction(StatusAction):
    """Replies to the authenticated user, in reverse of the API order."""

    def getStatuses(self, twitter, options):
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
439
class FollowAction(AdminAction):
    """Start following the given user."""

    def getUser(self, twitter, user):
        followed = twitter.friendships.create(id=user)
        return followed
443
class LeaveAction(AdminAction):
    """Stop following the given user."""

    def getUser(self, twitter, user):
        left = twitter.friendships.destroy(id=user)
        return left
447
class SetStatusAction(Action):
    """Post a status update.

    The text is taken from options['extra_args'] joined with spaces, or
    read from a "message: " prompt when no arguments were given.
    """

    def __call__(self, twitter, options):
        if options['extra_args']:
            raw_text = " ".join(options['extra_args'])
        else:
            raw_text = raw_input("message: ")
        # Both sources deliver bytes in the terminal's encoding.  Decode
        # them the same way rather than relying on the ASCII default,
        # which fails on any non-ASCII character.
        statusTxt = raw_text.decode(get_term_encoding())
        status = statusTxt.encode('utf8', 'replace')
        twitter.statuses.update(status=status)
455
456 class TwitterShell(Action):
457
458 def render_prompt(self, prompt):
459 '''Parses the `prompt` string and returns the rendered version'''
460 prompt = prompt.strip("'").replace("\\'","'")
461 for colour in ansi.COLOURS_NAMED:
462 if '[%s]' %(colour) in prompt:
463 prompt = prompt.replace(
464 '[%s]' %(colour), ansi.cmdColourNamed(colour))
465 prompt = prompt.replace('[R]', ansi.cmdReset())
466 return prompt
467
468 def __call__(self, twitter, options):
469 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
470 while True:
471 options['action'] = ""
472 try:
473 args = raw_input(prompt).split()
474 parse_args(args, options)
475 if not options['action']:
476 continue
477 elif options['action'] == 'exit':
478 raise SystemExit(0)
479 elif options['action'] == 'shell':
480 print >>sys.stderr, 'Sorry Xzibit does not work here!'
481 continue
482 elif options['action'] == 'help':
483 print >>sys.stderr, '''\ntwitter> `action`\n
484 The Shell Accepts all the command line actions along with:
485
486 exit Leave the twitter shell (^D may also be used)
487
488 Full CMD Line help is appended below for your convinience.'''
489 Action()(twitter, options)
490 options['action'] = ''
491 except NoSuchActionError, e:
492 print >>sys.stderr, e
493 except KeyboardInterrupt:
494 print >>sys.stderr, '\n[Keyboard Interrupt]'
495 except EOFError:
496 print >>sys.stderr
497 leaving = self.ask(subject='Leave')
498 if not leaving:
499 print >>sys.stderr, 'Excellent!'
500 else:
501 raise SystemExit(0)
502
503 class HelpAction(Action):
504 def __call__(self, twitter, options):
505 print __doc__
506
class DoNothingAction(Action):
    """Action with no effect of its own."""

    def __call__(self, twitter, options):
        return None
510
# Action classes keyed by the action word given on the command line or in
# the shell.  Action.__call__ falls back to NoSuchAction for other words.
actions = {
    'authorize': DoNothingAction,
    'follow': FollowAction,
    'friends': FriendsAction,
    'help': HelpAction,
    'leave': LeaveAction,
    'list': ListsAction,
    'mylist': MyListsAction,
    'public': PublicAction,
    'replies': RepliesAction,
    'search': SearchAction,
    'set': SetStatusAction,
    'shell': TwitterShell,
}
525
def loadConfig(filename):
    """Return a copy of OPTIONS updated from the config file `filename`.

    Only the 'format' and 'prompt' options of the [twitter] section are
    read.  A missing file yields the unmodified defaults.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    cp = SafeConfigParser()
    cp.read([filename])
    for option in ('format', 'prompt'):
        if not cp.has_option('twitter', option):
            continue
        options[option] = cp.get('twitter', option)
    return options
535
536 def main(args=sys.argv[1:]):
537 arg_options = {}
538 try:
539 parse_args(args, arg_options)
540 except GetoptError, e:
541 print >> sys.stderr, "I can't do that, %s." %(e)
542 print >> sys.stderr
543 raise SystemExit(1)
544
545 config_path = os.path.expanduser(
546 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
547 config_options = loadConfig(config_path)
548
549 # Apply the various options in order, the most important applied last.
550 # Defaults first, then what's read from config file, then command-line
551 # arguments.
552 options = dict(OPTIONS)
553 for d in config_options, arg_options:
554 for k,v in d.items():
555 if v: options[k] = v
556
557 if options['refresh'] and options['action'] not in (
558 'friends', 'public', 'replies'):
559 print >> sys.stderr, "You can only refresh the friends, public, or replies actions."
560 print >> sys.stderr, "Use 'twitter -h' for help."
561 return 1
562
563 oauth_filename = os.path.expanduser(options['oauth_filename'])
564
565 if (options['action'] == 'authorize'
566 or not os.path.exists(oauth_filename)):
567 oauth_dance(
568 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
569 options['oauth_filename'])
570
571 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
572
573 twitter = Twitter(
574 auth=OAuth(
575 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
576 secure=options['secure'],
577 api_version='1',
578 domain='api.twitter.com')
579
580 try:
581 Action()(twitter, options)
582 except NoSuchActionError, e:
583 print >>sys.stderr, e
584 raise SystemExit(1)
585 except TwitterError, e:
586 print >> sys.stderr, str(e)
587 print >> sys.stderr, "Use 'twitter -h' for help."
588 raise SystemExit(1)