set set your twitter status
shell login to the twitter shell
rate get your current rate limit status (remaining API reqs)
-
+ repl begin a Read-Eval-Print-Loop with a configured twitter
+ object
OPTIONS:
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
+import code
from getopt import gnu_getopt as getopt, GetoptError
-from getpass import getpass
import json
import locale
import os.path
import re
-import string
import sys
import time
except ImportError:
import html.parser as HTMLParser
-import webbrowser
-
from .api import Twitter, TwitterError
-from .oauth import OAuth, write_token_file, read_token_file
+from .oauth import OAuth, read_token_file
from .oauth_dance import oauth_dance
from . import ansi
from .util import smrt_input, printNicely, align_text
'refresh_rate': 600,
'format': 'default',
'prompt': '[cyan]twitter[R]> ',
- 'config_filename': os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + os.sep + '.twitter',
- 'oauth_filename': os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + os.sep + '.twitter_oauth',
+ 'config_filename': os.environ.get('HOME',
+ os.environ.get('USERPROFILE', ''))
+ + os.sep + '.twitter',
+ 'oauth_filename': os.environ.get('HOME',
+ os.environ.get('USERPROFILE', ''))
+ + os.sep + '.twitter_oauth',
'length': 20,
'timestamp': False,
'datestamp': False,
profileRe = re.compile(r'(?P<profile>\@\S+)')
ansiFormatter = ansi.AnsiCmd(False)
+
def parse_args(args, options):
long_opts = ['help', 'format=', 'refresh', 'oauth=',
'refresh-rate=', 'config=', 'length=', 'timestamp',
options['action'] = extra_args[0]
options['extra_args'] = extra_args[1:]
+
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
timestamp = options["timestamp"]
datestamp = options["datestamp"]
t = time.strptime(status['created_at'], format)
i_hate_timezones = time.timezone
- if (time.daylight):
+ if time.daylight:
i_hate_timezones = time.altzone
dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
seconds=i_hate_timezones)
return time.strftime("%Y-%m-%d ", t)
return ""
+
def reRepl(m):
ansiTypes = {
- 'clear': ansiFormatter.cmdReset(),
- 'hashtag': ansiFormatter.cmdBold(),
- 'profile': ansiFormatter.cmdUnderline(),
- }
+ 'clear': ansiFormatter.cmdReset(),
+ 'hashtag': ansiFormatter.cmdBold(),
+ 'profile': ansiFormatter.cmdUnderline(),
+ }
s = None
try:
pass
return s
+
def replaceInStatus(status):
txt = gHtmlParser.unescape(status)
txt = re.sub(hashtagRe, reRepl, txt)
txt = re.sub(profileRe, reRepl, txt)
return txt
+
+
def correctRTStatus(status):
- if('retweeted_status' in status):
- return "RT " + status['retweeted_status']['user']['screen_name'] + " " + status['retweeted_status']['text']
+ if 'retweeted_status' in status:
+ return ("RT @" + status['retweeted_status']['user']['screen_name']
+ + " " + status['retweeted_status']['text'])
else:
return status['text']
+
class StatusFormatter(object):
def __call__(self, status, options):
- return ("%s%s %s" % (
+ return ("%s@%s %s" % (
get_time_string(status, options),
- status['user']['screen_name'], gHtmlParser.unescape(correctRTStatus(status))))
+ status['user']['screen_name'],
+ gHtmlParser.unescape(correctRTStatus(status))))
+
class AnsiStatusFormatter(object):
def __init__(self):
return ("%s%s% 16s%s %s " % (
get_time_string(status, options),
ansiFormatter.cmdColour(colour), status['user']['screen_name'],
- ansiFormatter.cmdReset(), align_text(replaceInStatus(correctRTStatus(status)))))
+ ansiFormatter.cmdReset(),
+ align_text(replaceInStatus(correctRTStatus(status)))))
+
class VerboseStatusFormatter(object):
def __call__(self, status, options):
status['created_at'],
gHtmlParser.unescape(correctRTStatus(status))))
+
class JSONStatusFormatter(object):
def __call__(self, status, options):
- status['text'] = gHtmlParser.unescape(correctRTStatus(status))
- return json.dumps(status)
+ status['text'] = gHtmlParser.unescape(status['text'])
+ return json.dumps(status)
+
class URLStatusFormatter(object):
urlmatch = re.compile(r'https?://\S+')
+
def __call__(self, status, options):
urls = self.urlmatch.findall(correctRTStatus(status))
return '\n'.join(urls) if urls else ""
list_str = "%-30s" % (list['name'])
return "%s\n" % list_str
+
class ListsVerboseFormatter(object):
def __call__(self, list):
- list_str = "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
+ list_str = "%-30s\n description: %s\n members: %s\n mode:%s\n" % (
+ list['name'], list['description'],
+ list['member_count'], list['mode'])
return list_str
+
class AnsiListsFormatter(object):
def __init__(self):
self._colourMap = ansi.ColourMap()
else:
return "You are no longer following %s.\n" % (user_str)
+
class VerboseAdminFormatter(object):
def __call__(self, action, user):
return("-- %s: %s (%s): %s" % (
user['name'],
user['url']))
+
class SearchFormatter(object):
def __call__(self, result, options):
return("%s%s %s" % (
get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
result['from_user'], result['text']))
+
class VerboseSearchFormatter(SearchFormatter):
pass # Default to the regular one
+
class URLSearchFormatter(object):
urlmatch = re.compile(r'https?://\S+')
+
def __call__(self, result, options):
urls = self.urlmatch.findall(result['text'])
return '\n'.join(urls) if urls else ""
+
class AnsiSearchFormatter(object):
def __init__(self):
self._colourMap = ansi.ColourMap()
ansiFormatter.cmdReset(), result['text']))
_term_encoding = None
+
+
def get_term_encoding():
global _term_encoding
if not _term_encoding:
}
formatters['lists'] = lists_formatters
+
def get_formatter(action_type, options):
formatters_dict = formatters.get(action_type)
- if (not formatters_dict):
+ if not formatters_dict:
raise TwitterError(
"There was an error finding a class of formatters for your type (%s)"
% (action_type))
f = formatters_dict.get(options['format'])
- if (not f):
+ if not f:
raise TwitterError(
"Unknown formatter '%s' for status actions" % (options['format']))
return f()
+
class Action(object):
def ask(self, subject='perform this action', careful=False):
except EOFError:
print(file=sys.stderr) # Put Newline since Enter was never pressed
# TODO:
- # Figure out why on OS X the raw_input keeps raising
- # EOFError and is never able to reset and get more input
- # Hint: Look at how IPython implements their console
+ # Figure out why on OS X the raw_input keeps raising
+ # EOFError and is never able to reset and get more input
+ # Hint: Look at how IPython implements their console
default = True
if careful:
default = False
def __call__(self, twitter, options):
action = actions.get(options['action'], NoSuchAction)()
try:
- doAction = lambda : action(twitter, options)
- if (options['refresh'] and isinstance(action, StatusAction)):
+ doAction = lambda: action(twitter, options)
+ if options['refresh'] and isinstance(action, StatusAction):
while True:
doAction()
sys.stdout.flush()
print('\n[Keyboard Interrupt]', file=sys.stderr)
pass
+
class NoSuchActionError(Exception):
pass
+
class NoSuchAction(Action):
def __call__(self, twitter, options):
raise NoSuchActionError("No such action: %s" % (options['action']))
+
class StatusAction(Action):
def __call__(self, twitter, options):
statuses = self.getStatuses(twitter, options)
sf = get_formatter('status', options)
- if(options['format'] == "json"):
- printNicely("[")
- for status in statuses[:-1]:
- statusStr = sf(status, options)
- if statusStr.strip():
- printNicely(statusStr+",")
- printNicely(sf(statuses[-1], options)+"]")
- else:
- for status in statuses:
- statusStr = sf(status, options)
- if statusStr.strip():
- printNicely(statusStr)
+ for status in statuses:
+ statusStr = sf(status, options)
+ if statusStr.strip():
+ printNicely(statusStr)
+
class SearchAction(Action):
def __call__(self, twitter, options):
if resultStr.strip():
printNicely(resultStr)
+
class AdminAction(Action):
def __call__(self, twitter, options):
if not (options['extra_args'] and options['extra_args'][0]):
else:
printNicely(af(options['action'], user))
+
class ListsAction(StatusAction):
def getStatuses(self, twitter, options):
if not options['extra_args']:
return []
else:
return list(reversed(twitter.lists.statuses(
- owner_screen_name=screen_name, slug=options['extra_args'][1])))
+ count=options['length'],
+ owner_screen_name=screen_name,
+ slug=options['extra_args'][1])))
class MyListsAction(ListsAction):
class FriendsAction(StatusAction):
def getStatuses(self, twitter, options):
- return list(reversed(twitter.statuses.home_timeline(count=options["length"])))
+ return list(reversed(
+ twitter.statuses.home_timeline(count=options["length"])))
+
class RepliesAction(StatusAction):
def getStatuses(self, twitter, options):
- return list(reversed(twitter.statuses.mentions_timeline(count=options["length"])))
+ return list(reversed(
+ twitter.statuses.mentions_timeline(count=options["length"])))
+
class FollowAction(AdminAction):
def getUser(self, twitter, user):
return twitter.friendships.create(screen_name=user)
+
class LeaveAction(AdminAction):
def getUser(self, twitter, user):
return twitter.friendships.destroy(screen_name=user)
+
class SetStatusAction(Action):
def __call__(self, twitter, options):
statusTxt = (" ".join(options['extra_args'])
while statusTxt:
limit = 140 - len(replies)
if len(statusTxt) > limit:
- end = string.rfind(statusTxt, ' ', 0, limit)
+ end = str.rfind(statusTxt, ' ', 0, limit)
else:
end = limit
splitted.append(" ".join((replies, statusTxt[:end])))
for status in splitted:
twitter.statuses.update(status=status)
+
class TwitterShell(Action):
def render_prompt(self, prompt):
continue
elif options['action'] == 'help':
print('''\ntwitter> `action`\n
- The Shell Accepts all the command line actions along with:
+ The Shell accepts all the command line actions along with:
exit Leave the twitter shell (^D may also be used)
- Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
+ Full CMD Line help is appended below for your convenience.''',
+ file=sys.stderr)
Action()(twitter, options)
options['action'] = ''
except NoSuchActionError as e:
else:
raise SystemExit(0)
+
class PythonPromptAction(Action):
def __call__(self, twitter, options):
try:
except EOFError:
pass
+
class HelpAction(Action):
def __call__(self, twitter, options):
print(__doc__)
+
class DoNothingAction(Action):
def __call__(self, twitter, options):
pass
+
class RateLimitStatus(Action):
def __call__(self, twitter, options):
rate = twitter.application.rate_limit_status()
- print("Remaining API requests: %s / %s (hourly limit)" % (rate['remaining_hits'], rate['hourly_limit']))
- print("Next reset in %ss (%s)" % (int(rate['reset_time_in_seconds'] - time.time()),
- time.asctime(time.localtime(rate['reset_time_in_seconds']))))
+ resources = rate['resources']
+ for resource in resources:
+ for method in resources[resource]:
+ limit = resources[resource][method]['limit']
+ remaining = resources[resource][method]['remaining']
+ reset = resources[resource][method]['reset']
+
+ print("Remaining API requests for %s: %s / %s" %
+ (method, remaining, limit))
+ print("Next reset in %ss (%s)\n" % (int(reset - time.time()),
+ time.asctime(time.localtime(reset))))
+
+
+class ReplAction(Action):
+ def __call__(self, twitter, options):
+ upload = Twitter(
+ auth=twitter.auth,
+ domain="upload.twitter.com")
+ printNicely(
+ "\nUse the 'twitter' object to interact with"
+ " the Twitter REST API.\n"
+ "Use twitter_upload to interact with "
+ "upload.twitter.com\n\n")
+ code.interact(local={
+ "twitter": twitter,
+ "t": twitter,
+ "twitter_upload": upload,
+ "u": upload
+ })
+
actions = {
'authorize' : DoNothingAction,
'set' : SetStatusAction,
'shell' : TwitterShell,
'rate' : RateLimitStatus,
+ 'repl' : ReplAction,
}
+
def loadConfig(filename):
options = dict(OPTIONS)
if os.path.exists(filename):
options[option] = cp.getboolean('twitter', option)
return options
+
def main(args=sys.argv[1:]):
arg_options = {}
try:
options = dict(OPTIONS)
for d in config_options, arg_options:
for k, v in list(d.items()):
- if v: options[k] = v
+ if v:
+ options[k] = v
- if options['refresh'] and options['action'] not in (
- 'friends', 'replies'):
- print("You can only refresh the friends or replies actions.", file=sys.stderr)
+ if options['refresh'] and options['action'] not in ('friends', 'replies'):
+ print("You can only refresh the friends or replies actions.",
+ file=sys.stderr)
print("Use 'twitter -h' for help.", file=sys.stderr)
return 1
oauth_filename = os.path.expanduser(options['oauth_filename'])
- if (options['action'] == 'authorize'
- or not os.path.exists(oauth_filename)):
+ if options['action'] == 'authorize' or not os.path.exists(oauth_filename):
oauth_dance(
"the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
options['oauth_filename'])