+class VerboseStatusFormatter(object):
+ def __call__(self, status):
+ return (u"-- %s (%s) on %s\n%s\n" %(
+ status['user']['screen_name'],
+ status['user']['location'],
+ status['created_at'],
+ status['text']))
+
+class URLStatusFormatter(object):
+ urlmatch = re.compile(r'https?://\S+')
+ def __call__(self, status):
+ urls = self.urlmatch.findall(status['text'])
+ return u'\n'.join(urls) if urls else ""
+
+class AdminFormatter(object):
+ def __call__(self, action, user):
+ user_str = u"%s (%s)" %(user['screen_name'], user['name'])
+ if action == "follow":
+ return u"You are now following %s.\n" %(user_str)
+ else:
+ return u"You are no longer following %s.\n" %(user_str)
+
+class VerboseAdminFormatter(object):
+ def __call__(self, action, user):
+ return(u"-- %s: %s (%s): %s" % (
+ "Following" if action == "follow" else "Leaving",
+ user['screen_name'],
+ user['name'],
+ user['url']))
+
+class URLAdminFormatter(object):
+ def __call__(self, action, user):
+ return("Admin actions do not support the URL formatter")
+
+status_formatters = {
+ 'default': StatusFormatter,
+ 'verbose': VerboseStatusFormatter,
+ 'urls': URLStatusFormatter
+}
+
+admin_formatters = {
+ 'default': AdminFormatter,
+ 'verbose': VerboseAdminFormatter,
+ 'urls': URLAdminFormatter
+}
+
+def get_status_formatter(options):
+ sf = status_formatters.get(options['format'])
+ if (not sf):
+ raise TwitterError(
+ "Unknown formatter '%s'" %(options['format']))
+ return sf()
+
+def get_admin_formatter(options):
+ sf = admin_formatters.get(options['format'])
+ if (not sf):
+ raise TwitterError(
+ "Unknown formatter '%s'" %(options['format']))
+ return sf()
+
+class Action(object):
+ pass
+
+class NoSuchAction(Action):
+ def __call__(self, twitter, options):
+ print >> sys.stderr, "No such action: ", options['action']
+ sys.exit(1)
+
+class StatusAction(Action):
+ def __call__(self, twitter, options):
+ statuses = self.getStatuses(twitter)
+ sf = get_status_formatter(options)
+ for status in statuses:
+ statusStr = sf(status)
+ if statusStr.strip():
+ print statusStr.encode(sys.stdout.encoding, 'replace')
+
+class AdminAction(Action):
+ def __call__(self, twitter, options):
+ if not options['extra_args'][0]:
+ raise TwitterError("You need to specify a user (screen name)")
+ af = get_admin_formatter(options)
+ try:
+ user = self.getUser(twitter, options['extra_args'][0])
+ except TwitterError, e:
+ print "There was a problem following or leaving the specified user."
+ print " You may be trying to follow a user you are already following;"
+ print " Leaving a user you are not currently following;"
+ print " Or the user may not exist."
+ print " Sorry."
+ print
+ print e
+ else:
+ print af(options['action'], user).encode(sys.stdout.encoding, 'replace')
+
+class FriendsAction(StatusAction):
+ def getStatuses(self, twitter):
+ return reversed(twitter.statuses.friends_timeline())
+
+class PublicAction(StatusAction):
+ def getStatuses(self, twitter):
+ return reversed(twitter.statuses.public_timeline())
+
+class RepliesAction(StatusAction):
+ def getStatuses(self, twitter):
+ return reversed(twitter.statuses.replies())
+
+class FollowAction(AdminAction):
+ def getUser(self, twitter, user):
+ return twitter.notifications.follow(id=user)
+
+class LeaveAction(AdminAction):
+ def getUser(self, twitter, user):
+ return twitter.notifications.leave(id=user)
+
+class SetStatusAction(Action):
+ def __call__(self, twitter, options):
+ statusTxt = (u" ".join(options['extra_args'])
+ if options['extra_args']
+ else unicode(raw_input("message: ")))
+ status = (statusTxt.encode('utf8', 'replace'))
+ twitter.statuses.update(status=status)
+
+class HelpAction(Action):
+ def __call__(self, twitter, options):
+ print __doc__