+ def __call__(self, status, options):
+ return (u"%s%s %s" %(
+ get_time_string(status, options),
+ status['user']['screen_name'], status['text']))
+
+class AnsiStatusFormatter(object):
+ def __init__(self):
+ self._colourMap = ansi.ColourMap()
+
+ def __call__(self, status, options):
+ colour = self._colourMap.colourFor(status['user']['screen_name'])
+ return (u"%s%s%s%s %s" %(
+ get_time_string(status, options),
+ ansi.cmdColour(colour), status['user']['screen_name'],
+ ansi.cmdReset(), status['text']))
+
+class VerboseStatusFormatter(object):
+ def __call__(self, status, options):
+ return (u"-- %s (%s) on %s\n%s\n" %(
+ status['user']['screen_name'],
+ status['user']['location'],
+ status['created_at'],
+ status['text']))
+
+class URLStatusFormatter(object):
+ urlmatch = re.compile(r'https?://\S+')
+ def __call__(self, status, options):
+ urls = self.urlmatch.findall(status['text'])
+ return u'\n'.join(urls) if urls else ""
+
+class AdminFormatter(object):
+ def __call__(self, action, user):
+ user_str = u"%s (%s)" %(user['screen_name'], user['name'])
+ if action == "follow":
+ return u"You are now following %s.\n" %(user_str)
+ else:
+ return u"You are no longer following %s.\n" %(user_str)
+
+class VerboseAdminFormatter(object):
+ def __call__(self, action, user):
+ return(u"-- %s: %s (%s): %s" % (
+ "Following" if action == "follow" else "Leaving",
+ user['screen_name'],
+ user['name'],
+ user['url']))
+
+status_formatters = {
+ 'default': StatusFormatter,
+ 'verbose': VerboseStatusFormatter,
+ 'urls': URLStatusFormatter,
+ 'ansi': AnsiStatusFormatter
+}
+
+admin_formatters = {
+ 'default': AdminFormatter,
+ 'verbose': VerboseAdminFormatter,
+ 'urls': AdminFormatter,
+ 'ansi': AdminFormatter
+}
+
+def get_status_formatter(options):
+ sf = status_formatters.get(options['format'])
+ if (not sf):
+ raise TwitterError(
+ "Unknown formatter '%s'" %(options['format']))
+ return sf()
+
+def get_admin_formatter(options):
+ sf = admin_formatters.get(options['format'])
+ if (not sf):
+ raise TwitterError(
+ "Unknown formatter '%s'" %(options['format']))
+ return sf()
+
+class Action(object):
+
+ def ask(self, subject='perform this action', careful=False):
+ '''
+ Requests fromt he user using `raw_input` if `subject` should be
+ performed. When `careful`, the default answer is NO, otherwise YES.
+ Returns the user answer in the form `True` or `False`.
+ '''
+ sample = '(y/N)'
+ if not careful:
+ sample = '(Y/n)'
+
+ prompt = 'You really want to %s %s? ' %(subject, sample)
+ try:
+ answer = raw_input(prompt).lower()
+ if careful:
+ return answer in ('yes', 'y')
+ else:
+ return answer not in ('no', 'n')
+ except EOFError:
+ print >>sys.stderr # Put Newline since Enter was never pressed
+ # TODO:
+ # Figure out why on OS X the raw_input keeps raising
+ # EOFError and is never able to reset and get more input
+ # Hint: Look at how IPython implements their console
+ default = True
+ if careful:
+ default = False
+ return default
+
+ def __call__(self, twitter, options):
+ action = actions.get(options['action'], NoSuchAction)()
+ try:
+ doAction = lambda : action(twitter, options)
+ if (options['refresh'] and isinstance(action, StatusAction)):
+ while True:
+ doAction()
+ time.sleep(options['refresh_rate'])
+ else:
+ doAction()
+ except KeyboardInterrupt:
+ print >>sys.stderr, '\n[Keyboard Interrupt]'
+ pass
+
+class NoSuchActionError(Exception):
+ pass
+
+class NoSuchAction(Action):
+ def __call__(self, twitter, options):
+ raise NoSuchActionError("No such action: %s" %(options['action']))
+
+def printNicely(string):
+ if sys.stdout.encoding:
+ print string.encode(sys.stdout.encoding, 'replace')
+ else:
+ print string.encode('utf-8')
+
+class StatusAction(Action):
+ def __call__(self, twitter, options):
+ statuses = self.getStatuses(twitter, options)
+ sf = get_status_formatter(options)
+ for status in statuses:
+ statusStr = sf(status, options)
+ if statusStr.strip():
+ printNicely(statusStr)
+
+class AdminAction(Action):
+ def __call__(self, twitter, options):
+ if not (options['extra_args'] and options['extra_args'][0]):
+ raise TwitterError("You need to specify a user (screen name)")
+ af = get_admin_formatter(options)
+ try:
+ user = self.getUser(twitter, options['extra_args'][0])
+ except TwitterError, e:
+ print "There was a problem following or leaving the specified user."
+ print "You may be trying to follow a user you are already following;"
+ print "Leaving a user you are not currently following;"
+ print "Or the user may not exist."
+ print "Sorry."
+ print
+ print e
+ else:
+ printNicely(af(options['action'], user))
+
+class FriendsAction(StatusAction):
+ def getStatuses(self, twitter, options):
+ return reversed(twitter.statuses.friends_timeline(count=options["length"]))
+
+class PublicAction(StatusAction):
+ def getStatuses(self, twitter, options):
+ return reversed(twitter.statuses.public_timeline(count=options["length"]))
+
+class RepliesAction(StatusAction):
+ def getStatuses(self, twitter, options):
+ return reversed(twitter.statuses.replies(count=options["length"]))
+
+class FollowAction(AdminAction):
+ def getUser(self, twitter, user):
+ return twitter.friendships.create(id=user)
+
+class LeaveAction(AdminAction):
+ def getUser(self, twitter, user):
+ return twitter.friendships.destroy(id=user)
+
+class SetStatusAction(Action):
+ def __call__(self, twitter, options):
+ statusTxt = (u" ".join(options['extra_args'])
+ if options['extra_args']
+ else unicode(raw_input("message: ")))
+ status = (statusTxt.encode('utf8', 'replace'))
+ twitter.statuses.update(status=status)
+
+class TwitterShell(Action):
+
+ def render_prompt(self, prompt):
+ '''Parses the `prompt` string and returns the rendered version'''
+ prompt = prompt.strip("'").replace("\\'","'")
+ for colour in ansi.COLOURS_NAMED:
+ if '[%s]' %(colour) in prompt:
+ prompt = prompt.replace(
+ '[%s]' %(colour), ansi.cmdColourNamed(colour))
+ prompt = prompt.replace('[R]', ansi.cmdReset())
+ return prompt