tweets from that list
mylist get list of your lists; give a list name to get tweets
from that list
- public get latest public tweets
pyprompt start a Python prompt for interacting with the twitter
object directly
replies get latest replies to you
search search twitter (Beware: octothorpe, escape it)
set set your twitter status
shell login to the twitter shell
-
+ rate get your current rate limit status (remaining API reqs)
+ repl begin a Read-Eval-Print-Loop with a configured twitter
+ object
OPTIONS:
default one line per status
verbose multiple lines per status, more verbose status info
+ json raw json data from the api on each line
urls nothing but URLs
ansi ansi colour (rainbow mode)
from __future__ import print_function
-CONSUMER_KEY='uS6hO2sV6tDKIOeVjhnFnQ'
-CONSUMER_SECRET='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
+try:
+ input = __builtins__.raw_input
+except (AttributeError, KeyError):
+ pass
-import sys
-import time
+
+CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
+CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
+
+import code
from getopt import gnu_getopt as getopt, GetoptError
-from getpass import getpass
-import re
+import json
+import locale
import os.path
+import re
+import sys
+import time
+
try:
from ConfigParser import SafeConfigParser
except ImportError:
from urllib.parse import quote
except ImportError:
from urllib2 import quote
-import webbrowser
+try:
+ import HTMLParser
+except ImportError:
+ import html.parser as HTMLParser
from .api import Twitter, TwitterError
-from .oauth import OAuth, write_token_file, read_token_file
+from .oauth import OAuth, read_token_file
from .oauth_dance import oauth_dance
from . import ansi
-from .util import smrt_input, printNicely
+from .util import smrt_input, printNicely, align_text
OPTIONS = {
'action': 'friends',
'refresh_rate': 600,
'format': 'default',
'prompt': '[cyan]twitter[R]> ',
- 'config_filename': os.environ.get('HOME', '') + os.sep + '.twitter',
- 'oauth_filename': os.environ.get('HOME', '') + os.sep + '.twitter_oauth',
+ 'config_filename': os.environ.get('HOME',
+ os.environ.get('USERPROFILE', ''))
+ + os.sep + '.twitter',
+ 'oauth_filename': os.environ.get('HOME',
+ os.environ.get('USERPROFILE', ''))
+ + os.sep + '.twitter_oauth',
'length': 20,
'timestamp': False,
'datestamp': False,
'extra_args': [],
'secure': True,
+ 'invert_split': False,
+ 'force-ansi': False,
}
+gHtmlParser = HTMLParser.HTMLParser()
+hashtagRe = re.compile(r'(?P<hashtag>#\S+)')
+profileRe = re.compile(r'(?P<profile>\@\S+)')
+ansiFormatter = ansi.AnsiCmd(False)
+
+
def parse_args(args, options):
long_opts = ['help', 'format=', 'refresh', 'oauth=',
'refresh-rate=', 'config=', 'length=', 'timestamp',
- 'datestamp', 'no-ssl']
+ 'datestamp', 'no-ssl', 'force-ansi']
short_opts = "e:p:f:h?rR:c:l:td"
opts, extra_args = getopt(args, short_opts, long_opts)
+ if extra_args and hasattr(extra_args[0], 'decode'):
+ extra_args = [arg.decode(locale.getpreferredencoding())
+ for arg in extra_args]
for opt, arg in opts:
if opt in ('-f', '--format'):
options['secure'] = False
elif opt == '--oauth':
options['oauth_filename'] = arg
+ elif opt == '--force-ansi':
+ options['force-ansi'] = True
if extra_args and not ('action' in options and options['action'] == 'help'):
options['action'] = extra_args[0]
options['extra_args'] = extra_args[1:]
+
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
timestamp = options["timestamp"]
datestamp = options["datestamp"]
t = time.strptime(status['created_at'], format)
i_hate_timezones = time.timezone
- if (time.daylight):
+ if time.daylight:
i_hate_timezones = time.altzone
dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
seconds=i_hate_timezones)
return time.strftime("%Y-%m-%d ", t)
return ""
+
+def reRepl(m):
+ ansiTypes = {
+ 'clear': ansiFormatter.cmdReset(),
+ 'hashtag': ansiFormatter.cmdBold(),
+ 'profile': ansiFormatter.cmdUnderline(),
+ }
+
+ s = None
+ try:
+ mkey = m.lastgroup
+ if m.group(mkey):
+ s = '%s%s%s' % (ansiTypes[mkey], m.group(mkey), ansiTypes['clear'])
+ except IndexError:
+ pass
+ return s
+
+
+def replaceInStatus(status):
+ txt = gHtmlParser.unescape(status)
+ txt = re.sub(hashtagRe, reRepl, txt)
+ txt = re.sub(profileRe, reRepl, txt)
+ return txt
+
+
+def correctRTStatus(status):
+ if 'retweeted_status' in status:
+ return ("RT @" + status['retweeted_status']['user']['screen_name']
+ + " " + status['retweeted_status']['text'])
+ else:
+ return status['text']
+
+
class StatusFormatter(object):
def __call__(self, status, options):
- return ("%s%s %s" %(
+ return ("%s@%s %s" % (
get_time_string(status, options),
- status['user']['screen_name'], status['text']))
+ status['user']['screen_name'],
+ gHtmlParser.unescape(correctRTStatus(status))))
+
class AnsiStatusFormatter(object):
def __init__(self):
def __call__(self, status, options):
colour = self._colourMap.colourFor(status['user']['screen_name'])
- return ("%s%s%s%s %s" %(
+ return ("%s%s% 16s%s %s " % (
get_time_string(status, options),
- ansi.cmdColour(colour), status['user']['screen_name'],
- ansi.cmdReset(), status['text']))
+ ansiFormatter.cmdColour(colour), status['user']['screen_name'],
+ ansiFormatter.cmdReset(),
+ align_text(replaceInStatus(correctRTStatus(status)))))
+
class VerboseStatusFormatter(object):
def __call__(self, status, options):
- return ("-- %s (%s) on %s\n%s\n" %(
+ return ("-- %s (%s) on %s\n%s\n" % (
status['user']['screen_name'],
status['user']['location'],
status['created_at'],
- status['text']))
+ gHtmlParser.unescape(correctRTStatus(status))))
+
+
+class JSONStatusFormatter(object):
+ def __call__(self, status, options):
+ status['text'] = gHtmlParser.unescape(status['text'])
+ return json.dumps(status)
+
class URLStatusFormatter(object):
urlmatch = re.compile(r'https?://\S+')
+
def __call__(self, status, options):
- urls = self.urlmatch.findall(status['text'])
+ urls = self.urlmatch.findall(correctRTStatus(status))
return '\n'.join(urls) if urls else ""
list_str = "%-30s" % (list['name'])
return "%s\n" % list_str
+
class ListsVerboseFormatter(object):
def __call__(self, list):
- list_str = "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
+ list_str = "%-30s\n description: %s\n members: %s\n mode:%s\n" % (
+ list['name'], list['description'],
+ list['member_count'], list['mode'])
return list_str
+
class AnsiListsFormatter(object):
def __init__(self):
self._colourMap = ansi.ColourMap()
def __call__(self, list):
colour = self._colourMap.colourFor(list['name'])
- return ("%s%-15s%s %s" %(
- ansi.cmdColour(colour), list['name'],
- ansi.cmdReset(), list['description']))
+ return ("%s%-15s%s %s" % (
+ ansiFormatter.cmdColour(colour), list['name'],
+ ansiFormatter.cmdReset(), list['description']))
class AdminFormatter(object):
def __call__(self, action, user):
- user_str = "%s (%s)" %(user['screen_name'], user['name'])
+ user_str = "%s (%s)" % (user['screen_name'], user['name'])
if action == "follow":
- return "You are now following %s.\n" %(user_str)
+ return "You are now following %s.\n" % (user_str)
else:
- return "You are no longer following %s.\n" %(user_str)
+ return "You are no longer following %s.\n" % (user_str)
+
class VerboseAdminFormatter(object):
def __call__(self, action, user):
user['name'],
user['url']))
+
class SearchFormatter(object):
def __call__(self, result, options):
- return("%s%s %s" %(
+ return("%s%s %s" % (
get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
result['from_user'], result['text']))
+
class VerboseSearchFormatter(SearchFormatter):
- pass #Default to the regular one
+ pass # Default to the regular one
+
class URLSearchFormatter(object):
urlmatch = re.compile(r'https?://\S+')
+
def __call__(self, result, options):
urls = self.urlmatch.findall(result['text'])
return '\n'.join(urls) if urls else ""
+
class AnsiSearchFormatter(object):
def __init__(self):
self._colourMap = ansi.ColourMap()
def __call__(self, result, options):
colour = self._colourMap.colourFor(result['from_user'])
- return ("%s%s%s%s %s" %(
+ return ("%s%s%s%s %s" % (
get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
- ansi.cmdColour(colour), result['from_user'],
- ansi.cmdReset(), result['text']))
+ ansiFormatter.cmdColour(colour), result['from_user'],
+ ansiFormatter.cmdReset(), result['text']))
_term_encoding = None
+
+
def get_term_encoding():
global _term_encoding
if not _term_encoding:
status_formatters = {
'default': StatusFormatter,
'verbose': VerboseStatusFormatter,
+ 'json': JSONStatusFormatter,
'urls': URLStatusFormatter,
'ansi': AnsiStatusFormatter
}
}
formatters['lists'] = lists_formatters
+
def get_formatter(action_type, options):
formatters_dict = formatters.get(action_type)
- if (not formatters_dict):
+ if not formatters_dict:
raise TwitterError(
"There was an error finding a class of formatters for your type (%s)"
- %(action_type))
+ % (action_type))
f = formatters_dict.get(options['format'])
- if (not f):
+ if not f:
raise TwitterError(
- "Unknown formatter '%s' for status actions" %(options['format']))
+ "Unknown formatter '%s' for status actions" % (options['format']))
return f()
+
class Action(object):
def ask(self, subject='perform this action', careful=False):
'''
- Requests fromt he user using `raw_input` if `subject` should be
+ Requests from the user using `raw_input` if `subject` should be
performed. When `careful`, the default answer is NO, otherwise YES.
Returns the user answer in the form `True` or `False`.
'''
if not careful:
sample = '(Y/n)'
- prompt = 'You really want to %s %s? ' %(subject, sample)
+ prompt = 'You really want to %s %s? ' % (subject, sample)
try:
answer = input(prompt).lower()
if careful:
else:
return answer not in ('no', 'n')
except EOFError:
- print(file=sys.stderr) # Put Newline since Enter was never pressed
+ print(file=sys.stderr) # Put Newline since Enter was never pressed
# TODO:
- # Figure out why on OS X the raw_input keeps raising
- # EOFError and is never able to reset and get more input
- # Hint: Look at how IPython implements their console
+ # Figure out why on OS X the raw_input keeps raising
+ # EOFError and is never able to reset and get more input
+ # Hint: Look at how IPython implements their console
default = True
if careful:
default = False
def __call__(self, twitter, options):
action = actions.get(options['action'], NoSuchAction)()
try:
- doAction = lambda : action(twitter, options)
- if (options['refresh'] and isinstance(action, StatusAction)):
+ doAction = lambda: action(twitter, options)
+ if options['refresh'] and isinstance(action, StatusAction):
while True:
doAction()
+ sys.stdout.flush()
time.sleep(options['refresh_rate'])
else:
doAction()
print('\n[Keyboard Interrupt]', file=sys.stderr)
pass
+
class NoSuchActionError(Exception):
pass
+
class NoSuchAction(Action):
def __call__(self, twitter, options):
- raise NoSuchActionError("No such action: %s" %(options['action']))
+ raise NoSuchActionError("No such action: %s" % (options['action']))
+
class StatusAction(Action):
def __call__(self, twitter, options):
if statusStr.strip():
printNicely(statusStr)
+
class SearchAction(Action):
def __call__(self, twitter, options):
# We need to be pointing at search.twitter.com to work, and it is less
# tangly to do it here than in the main()
- twitter.domain="search.twitter.com"
- twitter.uriparts=()
+ twitter.domain = "search.twitter.com"
+ twitter.uriparts = ()
# We need to bypass the TwitterCall parameter encoding, so we
# don't encode the plus sign, so we have to encode it ourselves
query_string = "+".join(
if resultStr.strip():
printNicely(resultStr)
+
class AdminAction(Action):
def __call__(self, twitter, options):
if not (options['extra_args'] and options['extra_args'][0]):
else:
printNicely(af(options['action'], user))
+
class ListsAction(StatusAction):
def getStatuses(self, twitter, options):
if not options['extra_args']:
screen_name = options['extra_args'][0]
if not options['extra_args'][1:]:
- lists = twitter.user.lists(user=screen_name)['lists']
+ lists = twitter.lists.list(screen_name=screen_name)
if not lists:
printNicely("This user has no lists.")
for list in lists:
printNicely(lf(list))
return []
else:
- return reversed(twitter.user.lists.list.statuses(
- user=screen_name, list=options['extra_args'][1]))
+ return list(reversed(twitter.lists.statuses(
+ count=options['length'],
+ owner_screen_name=screen_name,
+ slug=options['extra_args'][1])))
class MyListsAction(ListsAction):
class FriendsAction(StatusAction):
def getStatuses(self, twitter, options):
- return reversed(twitter.statuses.friends_timeline(count=options["length"]))
+ return list(reversed(
+ twitter.statuses.home_timeline(count=options["length"])))
-class PublicAction(StatusAction):
- def getStatuses(self, twitter, options):
- return reversed(twitter.statuses.public_timeline(count=options["length"]))
class RepliesAction(StatusAction):
def getStatuses(self, twitter, options):
- return reversed(twitter.statuses.replies(count=options["length"]))
+ return list(reversed(
+ twitter.statuses.mentions_timeline(count=options["length"])))
+
class FollowAction(AdminAction):
def getUser(self, twitter, user):
- return twitter.friendships.create(id=user)
+ return twitter.friendships.create(screen_name=user)
+
class LeaveAction(AdminAction):
def getUser(self, twitter, user):
- return twitter.friendships.destroy(id=user)
+ return twitter.friendships.destroy(screen_name=user)
+
class SetStatusAction(Action):
def __call__(self, twitter, options):
statusTxt = (" ".join(options['extra_args'])
if options['extra_args']
else str(input("message: ")))
- twitter.statuses.update(status=statusTxt)
+ replies = []
+ ptr = re.compile("@[\w_]+")
+ while statusTxt:
+ s = ptr.match(statusTxt)
+ if s and s.start() == 0:
+ replies.append(statusTxt[s.start():s.end()])
+ statusTxt = statusTxt[s.end() + 1:]
+ else:
+ break
+ replies = " ".join(replies)
+ if len(replies) >= 140:
+ # just go back
+ statusTxt = replies
+ replies = ""
+
+ splitted = []
+ while statusTxt:
+ limit = 140 - len(replies)
+ if len(statusTxt) > limit:
+ end = str.rfind(statusTxt, ' ', 0, limit)
+ else:
+ end = limit
+ splitted.append(" ".join((replies, statusTxt[:end])))
+ statusTxt = statusTxt[end:]
+
+ if options['invert_split']:
+ splitted.reverse()
+ for status in splitted:
+ twitter.statuses.update(status=status)
+
class TwitterShell(Action):
def render_prompt(self, prompt):
'''Parses the `prompt` string and returns the rendered version'''
- prompt = prompt.strip("'").replace("\\'","'")
+ prompt = prompt.strip("'").replace("\\'", "'")
for colour in ansi.COLOURS_NAMED:
- if '[%s]' %(colour) in prompt:
+ if '[%s]' % (colour) in prompt:
prompt = prompt.replace(
- '[%s]' %(colour), ansi.cmdColourNamed(colour))
- prompt = prompt.replace('[R]', ansi.cmdReset())
+ '[%s]' % (colour), ansiFormatter.cmdColourNamed(colour))
+ prompt = prompt.replace('[R]', ansiFormatter.cmdReset())
return prompt
def __call__(self, twitter, options):
continue
elif options['action'] == 'help':
print('''\ntwitter> `action`\n
- The Shell Accepts all the command line actions along with:
+ The Shell accepts all the command line actions along with:
exit Leave the twitter shell (^D may also be used)
- Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
+ Full CMD Line help is appended below for your convenience.''',
+ file=sys.stderr)
Action()(twitter, options)
options['action'] = ''
except NoSuchActionError as e:
else:
raise SystemExit(0)
+
class PythonPromptAction(Action):
def __call__(self, twitter, options):
try:
except EOFError:
pass
+
class HelpAction(Action):
def __call__(self, twitter, options):
print(__doc__)
+
class DoNothingAction(Action):
def __call__(self, twitter, options):
pass
+
+class RateLimitStatus(Action):
+ def __call__(self, twitter, options):
+ rate = twitter.application.rate_limit_status()
+ resources = rate['resources']
+ for resource in resources:
+ for method in resources[resource]:
+ limit = resources[resource][method]['limit']
+ remaining = resources[resource][method]['remaining']
+ reset = resources[resource][method]['reset']
+
+ print("Remaining API requests for %s: %s / %s" %
+ (method, remaining, limit))
+ print("Next reset in %ss (%s)\n" % (int(reset - time.time()),
+ time.asctime(time.localtime(reset))))
+
+
+class ReplAction(Action):
+ def __call__(self, twitter, options):
+ upload = Twitter(
+ auth=twitter.auth,
+ domain="upload.twitter.com")
+ printNicely(
+ "\nUse the 'twitter' object to interact with"
+ " the Twitter REST API.\n"
+ "Use twitter_upload to interact with "
+ "upload.twitter.com\n\n")
+ code.interact(local={
+ "twitter": twitter,
+ "t": twitter,
+ "twitter_upload": upload,
+ "u": upload
+ })
+
+
actions = {
'authorize' : DoNothingAction,
'follow' : FollowAction,
'mylist' : MyListsAction,
'help' : HelpAction,
'leave' : LeaveAction,
- 'public' : PublicAction,
'pyprompt' : PythonPromptAction,
'replies' : RepliesAction,
'search' : SearchAction,
'set' : SetStatusAction,
'shell' : TwitterShell,
+ 'rate' : RateLimitStatus,
+ 'repl' : ReplAction,
}
+
def loadConfig(filename):
options = dict(OPTIONS)
if os.path.exists(filename):
for option in ('format', 'prompt'):
if cp.has_option('twitter', option):
options[option] = cp.get('twitter', option)
+ # process booleans
+ for option in ('invert_split',):
+ if cp.has_option('twitter', option):
+ options[option] = cp.getboolean('twitter', option)
return options
+
def main(args=sys.argv[1:]):
arg_options = {}
try:
parse_args(args, arg_options)
except GetoptError as e:
- print("I can't do that, %s." %(e), file=sys.stderr)
+ print("I can't do that, %s." % (e), file=sys.stderr)
print(file=sys.stderr)
raise SystemExit(1)
# arguments.
options = dict(OPTIONS)
for d in config_options, arg_options:
- for k,v in list(d.items()):
- if v: options[k] = v
+ for k, v in list(d.items()):
+ if v:
+ options[k] = v
- if options['refresh'] and options['action'] not in (
- 'friends', 'public', 'replies'):
- print("You can only refresh the friends, public, or replies actions.", file=sys.stderr)
+ if options['refresh'] and options['action'] not in ('friends', 'replies'):
+ print("You can only refresh the friends or replies actions.",
+ file=sys.stderr)
print("Use 'twitter -h' for help.", file=sys.stderr)
return 1
oauth_filename = os.path.expanduser(options['oauth_filename'])
- if (options['action'] == 'authorize'
- or not os.path.exists(oauth_filename)):
+ if options['action'] == 'authorize' or not os.path.exists(oauth_filename):
oauth_dance(
"the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
options['oauth_filename'])
+ global ansiFormatter
+ ansiFormatter = ansi.AnsiCmd(options["force-ansi"])
+
oauth_token, oauth_token_secret = read_token_file(oauth_filename)
twitter = Twitter(
auth=OAuth(
oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
secure=options['secure'],
- api_version='1',
+ api_version='1.1',
domain='api.twitter.com')
try: