]> jfr.im git - z_archive/twitter.git/blame_incremental - twitter/cmdline.py
Okay. It works now. Omg.
[z_archive/twitter.git] / twitter / cmdline.py
... / ...
CommitLineData
1# encoding: utf-8
2"""
3USAGE:
4
5 twitter [action] [options]
6
7
8ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 pyprompt start a Python prompt for interacting with the twitter
19 object directly
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
25
26
27OPTIONS:
28
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 10 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read default options (format, prompt) from the
34 given config file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
41
42FORMATS for the --format option
43
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 json raw json data from the api on each line
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57[twitter]
58format: <desired_default_format_for_output>
59prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63"""
64
65from __future__ import print_function
66
# Python 2 compatibility: make ``input`` return the raw line, as it does on
# Python 3.  Looking the name up directly works whether ``__builtins__`` is a
# dict (imported module) or a module (``__main__``); subscripting it raised an
# uncaught TypeError in the latter case.
try:
    input = raw_input
except NameError:
    # Python 3: ``raw_input`` is gone and the builtin ``input`` already fits.
    pass
71
72
73CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
74CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
75
76from getopt import gnu_getopt as getopt, GetoptError
77from getpass import getpass
78import json
79import locale
80import os.path
81import re
82import string
83import sys
84import time
85
86try:
87 from ConfigParser import SafeConfigParser
88except ImportError:
89 from configparser import ConfigParser as SafeConfigParser
90import datetime
91try:
92 from urllib.parse import quote
93except ImportError:
94 from urllib2 import quote
95try:
96 import HTMLParser
97except ImportError:
98 import html.parser as HTMLParser
99
100import webbrowser
101
102from .api import Twitter, TwitterError
103from .oauth import OAuth, write_token_file, read_token_file
104from .oauth_dance import oauth_dance
105from . import ansi
106from .util import smrt_input, printNicely
107
# Built-in defaults for every option.  Both per-user files live directly under
# $HOME (or %USERPROFILE% when HOME is unset).
_home_dir = os.environ.get('HOME', os.environ.get('USERPROFILE', ''))

OPTIONS = {
    'action': 'friends',
    'refresh': False,
    'refresh_rate': 600,  # seconds
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    'config_filename': _home_dir + os.sep + '.twitter',
    'oauth_filename': _home_dir + os.sep + '.twitter_oauth',
    'length': 20,
    'timestamp': False,
    'datestamp': False,
    'extra_args': [],
    'secure': True,
    'invert_split': False,
    'force-ansi': False,
}
124
# Module-wide helpers shared by the status formatters.
gHtmlParser = HTMLParser.HTMLParser()
if not hasattr(gHtmlParser, 'unescape'):
    # HTMLParser.unescape() was removed in Python 3.9; html.unescape() is its
    # replacement.  Attach it to the instance so every existing
    # ``gHtmlParser.unescape(...)`` call keeps working unchanged.
    from html import unescape as _html_unescape
    gHtmlParser.unescape = _html_unescape
hashtagRe = re.compile(r'(?P<hashtag>#\S+)')
profileRe = re.compile(r'(?P<profile>\@\S+)')
# Starts with ANSI forcing off; main() rebinds it from the force-ansi option.
ansiFormatter = ansi.AnsiCmd(False)
129
def _int_option(opt, arg):
    """Return ``arg`` as an int, or raise GetoptError naming the option."""
    try:
        return int(arg)
    except ValueError:
        raise GetoptError(
            "option %s requires an integer argument, not %r" % (opt, arg), opt)


def parse_args(args, options):
    """Parse command-line style ``args`` and record the result in ``options``.

    ``options`` is updated in place; an option key is written only when the
    matching switch appears in ``args``.  The first positional argument
    becomes ``options['action']`` unless help was requested, and the
    remaining positional arguments are stored in ``options['extra_args']``.

    Raises GetoptError for unknown switches, missing switch arguments and
    (instead of a bare ValueError) non-integer values for
    -R/--refresh-rate and -l/--length.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl', 'force-ansi']
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)
    if extra_args and hasattr(extra_args[0], 'decode'):
        # Python 2 hands us byte strings; decode them with the locale encoding.
        encoding = locale.getpreferredencoding()
        extra_args = [arg.decode(encoding) for arg in extra_args]

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _int_option(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _int_option(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg
        elif opt == '--force-ansi':
            options['force-ansi'] = True

    # A help request wins over any positional action.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
167
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix to print in front of a status line.

    ``status['created_at']`` is parsed with ``format`` (which hard-codes a
    +0000 offset, i.e. UTC) and converted to local time.  Depending on
    ``options['timestamp']`` and ``options['datestamp']`` the result is
    "YYYY-MM-DD HH:MM:SS ", "HH:MM:SS ", "YYYY-MM-DD " or "".

    Raises KeyError if a key is missing and ValueError if ``created_at``
    does not match ``format`` (even when no stamp is requested).
    """
    import calendar  # only needed here; kept local to the function

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    utc_struct = time.strptime(status['created_at'], format)
    # time.daylight only says that the local zone *has* a DST rule, not that
    # DST applies at this moment, so choosing altzone from it shifted winter
    # times by an hour.  Going through the epoch lets localtime() apply the
    # offset that is correct for the tweet's own date.
    local = time.localtime(calendar.timegm(utc_struct))
    if timestamp and datestamp:
        return time.strftime("%Y-%m-%d %H:%M:%S ", local)
    if timestamp:
        return time.strftime("%H:%M:%S ", local)
    if datestamp:
        return time.strftime("%Y-%m-%d ", local)
    return ""
185
def reRepl(m):
    """Regex substitution callback: wrap the matched named group in ANSI codes.

    The name of the last matched group selects the start code ('hashtag' ->
    bold, 'profile' -> underline); the text is followed by a reset code.
    Returns None when the group is empty or the match has no named group.
    """
    codes = {
        'clear': ansiFormatter.cmdReset(),
        'hashtag': ansiFormatter.cmdBold(),
        'profile': ansiFormatter.cmdUnderline(),
    }

    try:
        group_name = m.lastgroup
        if not m.group(group_name):
            return None
        return '%s%s%s' % (codes[group_name], m.group(group_name), codes['clear'])
    except IndexError:
        # m.group() rejects an unknown/None group name.
        return None
201
def replaceInStatus(status):
    """Unescape HTML entities in ``status`` and decorate #hashtags and @profiles.

    Each hashtag/profile match is rewritten by reRepl.
    """
    unescaped = gHtmlParser.unescape(status)
    with_hashtags = hashtagRe.sub(reRepl, unescaped)
    return profileRe.sub(reRepl, with_hashtags)
207
class StatusFormatter(object):
    """Default status format: "<time prefix><screen name> <text>" on one line."""

    def __call__(self, status, options):
        prefix = get_time_string(status, options)
        author = status['user']['screen_name']
        text = gHtmlParser.unescape(status['text'])
        return "%s%s %s" % (prefix, author, text)
213
class AnsiStatusFormatter(object):
    """Status format that colours the screen name and decorates the text."""

    def __init__(self):
        # One colour map per formatter instance, consulted per screen name.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        screen_name = status['user']['screen_name']
        colour = self._colourMap.colourFor(screen_name)
        prefix = get_time_string(status, options)
        colour_on = ansiFormatter.cmdColour(colour)
        author = status['user']['screen_name']
        colour_off = ansiFormatter.cmdReset()
        text = replaceInStatus(status['text'])
        return "%s%s%s%s %s" % (prefix, colour_on, author, colour_off, text)
224
class VerboseStatusFormatter(object):
    """Multi-line status format: a header with author, location and date, then the text."""

    def __call__(self, status, options):
        user = status['user']
        fields = (
            user['screen_name'],
            user['location'],
            status['created_at'],
            gHtmlParser.unescape(status['text']),
        )
        return "-- %s (%s) on %s\n%s\n" % fields
232
class JSONStatusFormatter(object):
    """Status format that emits the status as one JSON document."""

    def __call__(self, status, options):
        # Unescape the text on a shallow copy so the caller's status object
        # is not modified as a side effect of formatting it.
        unescaped = dict(status)
        unescaped['text'] = gHtmlParser.unescape(status['text'])
        return json.dumps(unescaped)
237
class URLStatusFormatter(object):
    """Status format that keeps only the http(s) URLs, one per line."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        found = self.urlmatch.findall(status['text'])
        if not found:
            return ""
        return '\n'.join(found)
243
244
class ListsFormatter(object):
    """Default list format: left-aligned name, then the description in parentheses if any."""

    def __call__(self, list):
        if not list['description']:
            return "%s\n" % ("%-30s" % (list['name']))
        return "%s\n" % ("%-30s (%s)" % (list['name'], list['description']))
252
class ListsVerboseFormatter(object):
    """Verbose list format: name followed by description, member count and mode lines."""

    def __call__(self, list):
        fields = (list['name'], list['description'],
                  list['member_count'], list['mode'])
        return "%-30s\n description: %s\n members: %s\n mode:%s\n" % fields
257
class AnsiListsFormatter(object):
    """List format that prints the coloured list name followed by its description."""

    def __init__(self):
        # One colour map per formatter instance, consulted per list name.
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        colour = self._colourMap.colourFor(list['name'])
        colour_on = ansiFormatter.cmdColour(colour)
        name = list['name']
        colour_off = ansiFormatter.cmdReset()
        return "%s%-15s%s %s" % (colour_on, name, colour_off, list['description'])
267
268
class AdminFormatter(object):
    """One-sentence confirmation for a follow/leave action."""

    def __call__(self, action, user):
        user_str = "%s (%s)" % (user['screen_name'], user['name'])
        # Anything other than "follow" is reported as a leave.
        template = ("You are now following %s.\n" if action == "follow"
                    else "You are no longer following %s.\n")
        return template % (user_str)
276
class VerboseAdminFormatter(object):
    """Verbose confirmation for a follow/leave action, including the user's URL."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        return "-- %s: %s (%s): %s" % (
            verb, user['screen_name'], user['name'], user['url'])
284
class SearchFormatter(object):
    """Default search-result format: "<time prefix><from_user> <text>"."""

    def __call__(self, result, options):
        # Search results carry their date in a different layout than statuses.
        prefix = get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000")
        return "%s%s %s" % (prefix, result['from_user'], result['text'])
290
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search format; currently identical to the regular SearchFormatter."""
293
class URLSearchFormatter(object):
    """Search-result format that keeps only the http(s) URLs, one per line."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        found = self.urlmatch.findall(result['text'])
        if not found:
            return ""
        return '\n'.join(found)
299
class AnsiSearchFormatter(object):
    """Search-result format that colours the author's name."""

    def __init__(self):
        # One colour map per formatter instance, consulted per author.
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        colour = self._colourMap.colourFor(result['from_user'])
        prefix = get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000")
        colour_on = ansiFormatter.cmdColour(colour)
        author = result['from_user']
        colour_off = ansiFormatter.cmdReset()
        return "%s%s%s%s %s" % (prefix, colour_on, author, colour_off, result['text'])
310
_term_encoding = None


def get_term_encoding():
    """Return the terminal encoding taken from $LANG, defaulting to UTF-8.

    The part of $LANG after the first '.' is used; the answer is cached in
    the module-level ``_term_encoding`` once it is non-empty.
    """
    global _term_encoding
    if _term_encoding:
        return _term_encoding
    parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
    _term_encoding = parts[1] if len(parts) > 1 else 'UTF-8'
    return _term_encoding
321
# Formatter classes by output format name, one table per kind of action.
status_formatters = {
    'default': StatusFormatter,
    'verbose': VerboseStatusFormatter,
    'json': JSONStatusFormatter,
    'urls': URLStatusFormatter,
    'ansi': AnsiStatusFormatter,
}

admin_formatters = {
    'default': AdminFormatter,
    'verbose': VerboseAdminFormatter,
    'urls': AdminFormatter,
    'ansi': AdminFormatter,
}

search_formatters = {
    'default': SearchFormatter,
    'verbose': VerboseSearchFormatter,
    'urls': URLSearchFormatter,
    'ansi': AnsiSearchFormatter,
}

lists_formatters = {
    'default': ListsFormatter,
    'verbose': ListsVerboseFormatter,
    'urls': None,  # no URL-only rendering exists for lists
    'ansi': AnsiListsFormatter,
}

# Action type -> format table; this is what get_formatter() consults.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
355
def get_formatter(action_type, options):
    """Return a new formatter instance for ``action_type`` and ``options['format']``.

    ``action_type`` selects a table in the module-level ``formatters``
    registry and ``options['format']`` selects the class inside it.

    Raises TwitterError when either lookup yields nothing (including
    formats registered as None).
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    f = formatters_dict.get(options['format'])
    if not f:
        # Name the real action type; the message used to say "status" always.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return f()
367
class Action(object):
    """Base class of all actions; calling a plain Action dispatches on ``options['action']``."""

    def ask(self, subject='perform this action', careful=False):
        '''
        Asks the user, via `input`, whether `subject` should be performed.
        When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' % (subject, sample)
        try:
            answer = input(prompt).lower()
        except EOFError:
            print(file=sys.stderr)  # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful
        if careful:
            return answer in ('yes', 'y')
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        """Look up and run the requested action, repeating status actions when refreshing."""
        action = actions.get(options['action'], NoSuchAction)()
        try:
            if options['refresh'] and isinstance(action, StatusAction):
                # Poll until the user interrupts.
                while True:
                    action(twitter, options)
                    sys.stdout.flush()
                    time.sleep(options['refresh_rate'])
            else:
                action(twitter, options)
        except KeyboardInterrupt:
            print('\n[Keyboard Interrupt]', file=sys.stderr)
412
class NoSuchActionError(Exception):
    """Raised when the requested action name is not a known action."""
415
class NoSuchAction(Action):
    """Fallback action that reports the unknown action name by raising."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
419
class StatusAction(Action):
    """Base for actions that print statuses; subclasses supply ``getStatuses``."""

    def __call__(self, twitter, options):
        statuses = self.getStatuses(twitter, options)
        sf = get_formatter('status', options)
        for status in statuses:
            rendered = sf(status, options)
            if not rendered.strip():
                # Formats such as 'urls' may produce nothing for a status.
                continue
            printNicely(rendered)
428
class SearchAction(Action):
    """Run a search for the extra arguments and print each result."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        quoted_terms = [quote(term) for term in options['extra_args']]
        query_string = "+".join(quoted_terms)

        results = twitter.search(q=query_string)['results']
        f = get_formatter('search', options)
        for result in results:
            rendered = f(result, options)
            if not rendered.strip():
                continue
            printNicely(rendered)
447
class AdminAction(Action):
    """Base for follow/leave; subclasses supply ``getUser(twitter, user)``."""

    def __call__(self, twitter, options):
        extra = options['extra_args']
        if not extra or not extra[0]:
            raise TwitterError("You need to specify a user (screen name)")
        af = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, options['extra_args'][0])
        except TwitterError as e:
            for line in (
                    "There was a problem following or leaving the specified user.",
                    "You may be trying to follow a user you are already following;",
                    "Leaving a user you are not currently following;",
                    "Or the user may not exist.",
                    "Sorry."):
                print(line)
            print()
            print(e)
        else:
            printNicely(af(options['action'], user))
465
class ListsAction(StatusAction):
    """Print a user's lists, or fetch the statuses of one of their lists."""

    def getStatuses(self, twitter, options):
        """Return the statuses to print for ``options['extra_args']``.

        ``extra_args[0]`` is the screen name.  With no further argument the
        user's lists are printed here and an empty list is returned; with a
        list name in ``extra_args[1]`` that list's statuses are returned in
        reverse of the order the API delivered them.

        Raises TwitterError when no screen name was given.
        """
        if not options['extra_args']:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = options['extra_args'][0]

        if options['extra_args'][1:]:
            return reversed(twitter.user.lists.list.statuses(
                user=screen_name, list=options['extra_args'][1]))

        lists = twitter.lists.list(screen_name=screen_name)
        if not lists:
            printNicely("This user has no lists.")
        else:
            # Build the formatter once so that any state it keeps (e.g. the
            # colour map of the ansi formatter) is shared by all entries.
            lf = get_formatter('lists', options)
            for entry in lists:
                printNicely(lf(entry))
        return []
484
485
class MyListsAction(ListsAction):
    """ListsAction for the authenticated user's own lists."""

    def getStatuses(self, twitter, options):
        """Prepend the authenticated user's screen name and delegate to ListsAction.

        Works on a copy of ``options`` so the caller's ``extra_args`` list
        (possibly the shared default list) is not modified; inserting in
        place also added the name again on every refresh iteration.
        """
        screen_name = twitter.account.verify_credentials()['screen_name']
        own_options = dict(options)
        own_options['extra_args'] = [screen_name] + list(options['extra_args'])
        return ListsAction.getStatuses(self, twitter, own_options)
491
492
class FriendsAction(StatusAction):
    """Status action backed by the home timeline."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.home_timeline(count=options["length"])
        return reversed(timeline)
496
class RepliesAction(StatusAction):
    """Status action backed by the mentions timeline."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.mentions_timeline(count=options["length"])
        return reversed(timeline)
500
class FollowAction(AdminAction):
    """Admin action that creates a friendship with the given screen name."""

    def getUser(self, twitter, user):
        followed = twitter.friendships.create(screen_name=user)
        return followed
504
class LeaveAction(AdminAction):
    """Admin action that destroys the friendship with the given screen name."""

    def getUser(self, twitter, user):
        left = twitter.friendships.destroy(screen_name=user)
        return left
508
class SetStatusAction(Action):
    """Post a status update, splitting over-long text into several updates.

    Leading "@name" mentions are repeated at the start of every piece.
    """

    MAX_LENGTH = 140  # maximum length of one posted update

    def __call__(self, twitter, options):
        """Post ``options['extra_args']`` (or text read from the prompt).

        Pieces are posted in order, or in reverse order when
        ``options['invert_split']`` is set.  Nothing is posted when there is
        no text left after the leading mentions.
        """
        statusTxt = (" ".join(options['extra_args'])
                     if options['extra_args']
                     else str(input("message: ")))
        full_text = statusTxt

        # Peel the leading mentions off; each one is followed by a single
        # separator character that is skipped.
        mentions = []
        mention_re = re.compile(r"@[\w_]+")
        while statusTxt:
            found = mention_re.match(statusTxt)
            if not found:
                break
            mentions.append(found.group(0))
            statusTxt = statusTxt[found.end() + 1:]

        prefix = " ".join(mentions)
        if prefix:
            prefix += " "
        if len(prefix) >= self.MAX_LENGTH:
            # The mentions alone fill an update: give up on repeating them
            # and split the complete text (the message used to be dropped).
            prefix = ""
            statusTxt = full_text

        splitted = self._split_status(statusTxt, prefix)
        if options['invert_split']:
            splitted.reverse()
        for status in splitted:
            twitter.statuses.update(status=status)

    def _split_status(self, text, prefix):
        """Cut ``text`` into pieces so that ``prefix + piece`` fits MAX_LENGTH."""
        budget = self.MAX_LENGTH - len(prefix)
        pieces = []
        while text:
            if len(text) <= budget:
                chunk, text = text, ""
            else:
                # Prefer breaking at the last space that still fits.  str.rfind
                # replaces string.rfind, which does not exist on Python 3.
                cut = text.rfind(' ', 0, budget + 1)
                if cut <= 0:
                    # No usable space: hard split.  This also guarantees
                    # progress, where a cut of 0 used to loop forever.
                    cut = budget
                chunk, text = text[:cut], text[cut:].lstrip(' ')
            pieces.append(prefix + chunk)
        return pieces
543
class TwitterShell(Action):
    """Interactive shell: read an action line from the prompt, run it, repeat."""

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        # Drop surrounding single quotes and unescape embedded ones.
        prompt = prompt.strip("'").replace("\\'", "'")
        for colour in ansi.COLOURS_NAMED:
            tag = '[%s]' % (colour)
            if tag in prompt:
                prompt = prompt.replace(
                    tag, ansiFormatter.cmdColourNamed(colour))
        prompt = prompt.replace('[R]', ansiFormatter.cmdReset())
        return prompt

    def __call__(self, twitter, options):
        """Run the read/dispatch loop until 'exit' or a confirmed end-of-file.

        ``options`` is reused and updated by every command line entered.
        Leaves by raising SystemExit(0).
        """
        prompt = self.render_prompt(options.get('prompt', 'twitter> '))
        while True:
            options['action'] = ""
            try:
                args = input(prompt).split()
                try:
                    parse_args(args, options)
                except (GetoptError, ValueError) as e:
                    # A mistyped switch must not kill the shell; report it
                    # the way main() does and prompt again.
                    print("I can't do that, %s." % (e), file=sys.stderr)
                    continue
                if not options['action']:
                    continue
                elif options['action'] == 'exit':
                    raise SystemExit(0)
                elif options['action'] == 'shell':
                    print('Sorry Xzibit does not work here!', file=sys.stderr)
                    continue
                elif options['action'] == 'help':
                    print('''\ntwitter> `action`\n
                          The Shell Accepts all the command line actions along with:

                          exit    Leave the twitter shell (^D may also be used)

                          Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
                Action()(twitter, options)
                options['action'] = ''
            except NoSuchActionError as e:
                print(e, file=sys.stderr)
            except KeyboardInterrupt:
                print('\n[Keyboard Interrupt]', file=sys.stderr)
            except EOFError:
                print(file=sys.stderr)
                leaving = self.ask(subject='Leave')
                if not leaving:
                    print('Excellent!', file=sys.stderr)
                else:
                    raise SystemExit(0)
590
class PythonPromptAction(Action):
    """Start an interactive Python prompt with the ``twitter`` object in scope."""

    def __call__(self, twitter, options):
        # The leftover pdb.set_trace() breakpoint was removed: it dropped the
        # user into the debugger before the prompt could start.
        try:
            while True:
                smrt_input(globals(), locals())
        except EOFError:
            # End of input ends the prompt.
            pass
599
class HelpAction(Action):
    """Print this module's docstring, which is the usage text."""

    def __call__(self, twitter, options):
        usage = __doc__
        print(usage)
603
class DoNothingAction(Action):
    """Action that does nothing when run."""

    def __call__(self, twitter, options):
        return None
607
class RateLimitStatus(Action):
    """Print the remaining API requests and the time of the next reset."""

    def __call__(self, twitter, options):
        rate = twitter.account.rate_limit_status()
        quota = (rate['remaining_hits'], rate['hourly_limit'])
        print("Remaining API requests: %s / %s (hourly limit)" % quota)
        seconds_left = int(rate['reset_time_in_seconds'] - time.time())
        reset_at = time.asctime(time.localtime(rate['reset_time_in_seconds']))
        print("Next reset in %ss (%s)" % (seconds_left, reset_at))
614
# Action name (as typed on the command line) -> class implementing it.
actions = dict(
    authorize=DoNothingAction,
    follow=FollowAction,
    friends=FriendsAction,
    list=ListsAction,
    mylist=MyListsAction,
    help=HelpAction,
    leave=LeaveAction,
    pyprompt=PythonPromptAction,
    replies=RepliesAction,
    search=SearchAction,
    set=SetStatusAction,
    shell=TwitterShell,
    rate=RateLimitStatus,
)
630
def loadConfig(filename):
    """Return a copy of OPTIONS overlaid with settings from the config file.

    Only the [twitter] section is consulted: 'format' and 'prompt' are read
    as strings and 'invert_split' as a boolean.  A missing file yields the
    unmodified defaults.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    cp = SafeConfigParser()
    cp.read([filename])
    readers = (
        ('format', cp.get),
        ('prompt', cp.get),
        ('invert_split', cp.getboolean),  # booleans need their own reader
    )
    for option, read in readers:
        if cp.has_option('twitter', option):
            options[option] = read('twitter', option)
    return options
644
def main(args=sys.argv[1:]):
    """Command-line entry point: parse ``args``, authenticate and run the action.

    Options are layered as defaults, then the config file, then the command
    line.  Returns 1 when --refresh is combined with an action other than
    friends/replies; raises SystemExit(1) on bad switches, unknown actions
    and TwitterError.
    """
    global ansiFormatter

    arg_options = {}
    try:
        parse_args(args, arg_options)
    except GetoptError as e:
        print("I can't do that, %s." % (e), file=sys.stderr)
        print(file=sys.stderr)
        raise SystemExit(1)

    config_path = os.path.expanduser(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))
    config_options = loadConfig(config_path)

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    for d in config_options, arg_options:
        for k, v in d.items():
            # Empty values never override, as before.  An explicit False
            # must, though: otherwise --no-ssl (secure=False) is ignored.
            if v or v is False:
                options[k] = v

    if options['refresh'] and options['action'] not in (
            'friends', 'replies'):
        print("You can only refresh the friends or replies actions.", file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        return 1

    oauth_filename = os.path.expanduser(options['oauth_filename'])

    if (options['action'] == 'authorize'
            or not os.path.exists(oauth_filename)):
        # Store the token in the same expanded path that is checked above
        # and read back below.
        oauth_dance(
            "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
            oauth_filename)

    ansiFormatter = ansi.AnsiCmd(options["force-ansi"])

    oauth_token, oauth_token_secret = read_token_file(oauth_filename)

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1.1',
        domain='api.twitter.com')

    try:
        Action()(twitter, options)
    except NoSuchActionError as e:
        print(e, file=sys.stderr)
        raise SystemExit(1)
    except TwitterError as e:
        print(str(e), file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        raise SystemExit(1)