]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Adjusting the formatter
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 pyprompt start a Python prompt for interacting with the twitter
19 object directly
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
25
26
27 OPTIONS:
28
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 10 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
41
42 FORMATS for the --format option
43
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 json raw json data from the api on each line
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51 CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57 [twitter]
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63 """
64
65 from __future__ import print_function
66
# Python 2 compatibility: rebind ``input`` to ``raw_input`` so that prompts
# return the typed text instead of evaluating it.  The name is looked up
# directly rather than through ``__builtins__`` because ``__builtins__`` is a
# plain dict (not a module) whenever this file is imported rather than run as
# ``__main__``, which made the attribute lookup fail and left the unsafe
# Python 2 ``input`` in place.
try:
    input = raw_input
except NameError:
    # Python 3: ``raw_input`` no longer exists and ``input`` already
    # returns the raw string.
    pass
71
72
# OAuth consumer credentials identifying this command-line application; they
# are handed to oauth_dance() and OAuth() in main().
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
75
76 from getopt import gnu_getopt as getopt, GetoptError
77 from getpass import getpass
78 import json
79 import locale
80 import os.path
81 import re
82 import string
83 import sys
84 import time
85
86 try:
87 from ConfigParser import SafeConfigParser
88 except ImportError:
89 from configparser import ConfigParser as SafeConfigParser
90 import datetime
91 try:
92 from urllib.parse import quote
93 except ImportError:
94 from urllib2 import quote
95 try:
96 import HTMLParser
97 except ImportError:
98 import html.parser as HTMLParser
99
100 import webbrowser
101
102 from .api import Twitter, TwitterError
103 from .oauth import OAuth, write_token_file, read_token_file
104 from .oauth_dance import oauth_dance
105 from . import ansi
106 from .util import smrt_input, printNicely, align_text
107
# Directory holding the per-user files: $HOME, else %USERPROFILE%, else ''.
_home_dir = os.environ.get('HOME', os.environ.get('USERPROFILE', ''))

# Built-in defaults for every option.  loadConfig() and main() start from a
# copy of this mapping and layer config-file and command-line values on top.
OPTIONS = {
    'action': 'friends',
    'refresh': False,
    'refresh_rate': 600,
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    'config_filename': _home_dir + os.sep + '.twitter',
    'oauth_filename': _home_dir + os.sep + '.twitter_oauth',
    'length': 20,
    'timestamp': False,
    'datestamp': False,
    'extra_args': [],
    'secure': True,
    'invert_split': False,
    'force-ansi': False,
}
124
# Shared parser instance; the rest of this module only calls its
# ``unescape`` method to turn HTML entities in tweet text into characters.
gHtmlParser = HTMLParser.HTMLParser()
if not hasattr(gHtmlParser, 'unescape'):
    # HTMLParser.unescape() was removed in Python 3.9.  Attach the
    # equivalent html.unescape() so every existing
    # ``gHtmlParser.unescape(...)`` call keeps working.
    import html as _html
    gHtmlParser.unescape = _html.unescape
# '#' followed by a run of non-whitespace characters, captured as 'hashtag'.
hashtagRe = re.compile(r'(?P<hashtag>#\S+)')
# '@' followed by a run of non-whitespace characters, captured as 'profile'.
profileRe = re.compile(r'(?P<profile>\@\S+)')
# Module-wide ANSI command generator; main() rebinds it once the
# 'force-ansi' option is known.
ansiFormatter = ansi.AnsiCmd(False)
129
def _parse_int_option(opt, arg):
    """Return ``arg`` as an int, or raise GetoptError naming option ``opt``."""
    try:
        return int(arg)
    except ValueError:
        raise GetoptError(
            "option %s requires an integer argument, not '%s'" % (opt, arg),
            opt)


def parse_args(args, options):
    """
    Parse the command-line words in `args` and store the result in the
    `options` dict (modified in place).

    Only options that actually appear in `args` are written.  The first
    non-option word becomes ``options['action']`` (unless help was
    requested) and the remaining words become ``options['extra_args']``.

    Raises GetoptError for unknown options, missing option arguments and
    non-integer values given to -l/--length or -R/--refresh-rate.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl', 'force-ansi']
    # -e and -p are still accepted (with an argument) but are ignored below.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)
    if extra_args and hasattr(extra_args[0], 'decode'):
        # Byte strings (Python 2): decode using the user's locale encoding.
        extra_args = [arg.decode(locale.getpreferredencoding())
                      for arg in extra_args]

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _parse_int_option(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _parse_int_option(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg
        elif opt == '--force-ansi':
            options['force-ansi'] = True

    # A help request wins over any action word given on the command line.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
167
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """
    Return the local-time prefix to print before a status line.

    `status['created_at']` is parsed with `format` and treated as UTC.
    Depending on `options['timestamp']` and `options['datestamp']` the result
    is "YYYY-MM-DD HH:MM:SS ", "HH:MM:SS ", "YYYY-MM-DD " or "" (each
    non-empty form ends with a space).
    """
    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if not (timestamp or datestamp):
        # Nothing to show, so skip parsing altogether.
        return ""

    import calendar

    utc = time.strptime(status['created_at'], format)
    # Convert via the epoch so the DST rule in force *on that date* is used.
    # ``time.daylight`` only says that the zone has DST at all, so choosing
    # ``time.altzone`` based on it gave results one hour off outside DST.
    t = time.localtime(calendar.timegm(utc))
    if timestamp and datestamp:
        return time.strftime("%Y-%m-%d %H:%M:%S ", t)
    elif timestamp:
        return time.strftime("%H:%M:%S ", t)
    return time.strftime("%Y-%m-%d ", t)
185
def reRepl(m):
    """
    Replacement callback for `re.sub`: wrap the text of the last matched
    named group of `m` in the ANSI code for that group ('hashtag' is bold,
    'profile' is underlined) followed by a reset.  Returns None when the
    group cannot be read or matched nothing.
    """
    reset = ansiFormatter.cmdReset()
    codes = {
        'clear': reset,
        'hashtag': ansiFormatter.cmdBold(),
        'profile': ansiFormatter.cmdUnderline(),
    }

    name = m.lastgroup
    try:
        matched = m.group(name)
    except IndexError:
        return None
    if not matched:
        return None
    return '%s%s%s' % (codes[name], matched, reset)
201
def replaceInStatus(status):
    """
    Unescape HTML entities in the text `status`, then decorate hashtags and
    @mentions through reRepl.  Returns the resulting text.
    """
    unescaped = gHtmlParser.unescape(status)
    with_hashtags = hashtagRe.sub(reRepl, unescaped)
    return profileRe.sub(reRepl, with_hashtags)
def correctRTStatus(status):
    """
    Return the text to display for `status`.  For a retweet (a status with
    a 'retweeted_status' key) this is "RT <screen_name> <text>" built from
    the retweeted status; otherwise it is the status' own 'text'.
    """
    if 'retweeted_status' not in status:
        return status['text']
    author = status['retweeted_status']['user']['screen_name']
    text = status['retweeted_status']['text']
    return " ".join(("RT", author, text))
212
class StatusFormatter(object):
    """One-line status formatter: "<time prefix>@<screen_name> <text>"."""

    def __call__(self, status, options):
        stamp = get_time_string(status, options)
        author = status['user']['screen_name']
        text = gHtmlParser.unescape(correctRTStatus(status))
        return "%s@%s %s" % (stamp, author, text)
218
class AnsiStatusFormatter(object):
    """Status formatter that colours each screen name via an ansi.ColourMap."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        screen_name = status['user']['screen_name']
        colour = self._colourMap.colourFor(screen_name)
        stamp = get_time_string(status, options)
        colour_on = ansiFormatter.cmdColour(colour)
        colour_off = ansiFormatter.cmdReset()
        body = align_text(replaceInStatus(correctRTStatus(status)))
        # "% 16s" right-aligns the screen name in a 16-character column.
        return "%s%s% 16s%s %s " % (
            stamp, colour_on, screen_name, colour_off, body)
229
class VerboseStatusFormatter(object):
    """
    Multi-line status formatter: a header with screen name, location and
    the raw 'created_at' value, followed by the unescaped text.
    """

    def __call__(self, status, options):
        user = status['user']
        header = "-- %s (%s) on %s" % (
            user['screen_name'], user['location'], status['created_at'])
        text = gHtmlParser.unescape(correctRTStatus(status))
        return "%s\n%s\n" % (header, text)
237
class JSONStatusFormatter(object):
    """Status formatter that emits the status as a JSON document."""

    def __call__(self, status, options):
        """
        Return `status` serialised with json.dumps, with HTML entities in
        its 'text' field unescaped.  `options` is unused.
        """
        # Work on a shallow copy: rewriting the caller's dict in place meant
        # formatting the same status twice would unescape its text twice.
        unescaped = dict(status)
        unescaped['text'] = gHtmlParser.unescape(status['text'])
        return json.dumps(unescaped)
242
class URLStatusFormatter(object):
    """Status formatter that outputs only the http(s) URLs in the text."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        # Joining an empty list yields "", so no special case is needed when
        # the status contains no URL.
        found = self.urlmatch.findall(correctRTStatus(status))
        return '\n'.join(found)
248
249
class ListsFormatter(object):
    """
    One-line formatter for a Twitter list: the name padded to 30 columns,
    followed by the description in parentheses when there is one.
    """

    def __call__(self, list):
        line = "%-30s" % (list['name'])
        if list['description']:
            line = "%s (%s)" % (line, list['description'])
        return line + "\n"
257
class ListsVerboseFormatter(object):
    """Multi-line formatter for a Twitter list (name, description, members, mode)."""

    def __call__(self, list):
        template = "%-30s\n description: %s\n members: %s\n mode:%s\n"
        fields = tuple(
            list[key]
            for key in ('name', 'description', 'member_count', 'mode'))
        return template % fields
262
class AnsiListsFormatter(object):
    """List formatter that colours the list name via an ansi.ColourMap."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        name = list['name']
        colour = self._colourMap.colourFor(name)
        colour_on = ansiFormatter.cmdColour(colour)
        colour_off = ansiFormatter.cmdReset()
        return "%s%-15s%s %s" % (
            colour_on, name, colour_off, list['description'])
272
273
class AdminFormatter(object):
    """Formats the confirmation shown after a follow or leave action."""

    def __call__(self, action, user):
        # Any action other than "follow" is reported as having left the user.
        if action == "follow":
            template = "You are now following %s (%s).\n"
        else:
            template = "You are no longer following %s (%s).\n"
        return template % (user['screen_name'], user['name'])
281
class VerboseAdminFormatter(object):
    """Verbose follow/leave confirmation that also shows the user's URL."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        details = (verb, user['screen_name'], user['name'], user['url'])
        return "-- %s: %s (%s): %s" % details
289
class SearchFormatter(object):
    """One-line search result formatter: "<time prefix><from_user> <text>"."""

    def __call__(self, result, options):
        # Search results use a different 'created_at' layout than statuses.
        stamp = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        return "%s%s %s" % (stamp, result['from_user'], result['text'])
295
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search formatter; currently identical to SearchFormatter."""
298
class URLSearchFormatter(object):
    """Search result formatter that outputs only the http(s) URLs in the text."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        # An empty match list joins to "", covering the no-URL case.
        return '\n'.join(self.urlmatch.findall(result['text']))
304
class AnsiSearchFormatter(object):
    """Search result formatter that colours the author via an ansi.ColourMap."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        author = result['from_user']
        colour = self._colourMap.colourFor(author)
        stamp = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        colour_on = ansiFormatter.cmdColour(colour)
        colour_off = ansiFormatter.cmdReset()
        return "%s%s%s%s %s" % (
            stamp, colour_on, author, colour_off, result['text'])
315
# Cached result of get_term_encoding(); None until the first call.
_term_encoding = None


def get_term_encoding():
    """
    Return the terminal encoding taken from the part of $LANG after the
    first '.', falling back to 'UTF-8' when $LANG is unset or has no such
    part.  The value is computed once and cached in `_term_encoding`.
    """
    global _term_encoding
    if _term_encoding:
        return _term_encoding
    pieces = os.getenv('LANG', 'unknown.UTF-8').split('.')
    _term_encoding = pieces[1] if len(pieces) > 1 else 'UTF-8'
    return _term_encoding
326
# Formatter classes per action type, keyed by --format name.  get_formatter()
# looks them up and instantiates the match.
status_formatters = dict(
    default=StatusFormatter,
    verbose=VerboseStatusFormatter,
    json=JSONStatusFormatter,
    urls=URLStatusFormatter,
    ansi=AnsiStatusFormatter,
)

admin_formatters = dict(
    default=AdminFormatter,
    verbose=VerboseAdminFormatter,
    urls=AdminFormatter,
    ansi=AdminFormatter,
)

search_formatters = dict(
    default=SearchFormatter,
    verbose=VerboseSearchFormatter,
    urls=URLSearchFormatter,
    ansi=AnsiSearchFormatter,
)

lists_formatters = dict(
    default=ListsFormatter,
    verbose=ListsVerboseFormatter,
    urls=None,  # no URL formatter exists for lists
    ansi=AnsiListsFormatter,
)

formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
360
def get_formatter(action_type, options):
    """
    Return a new formatter instance for `action_type` (a key of the
    module-level `formatters` registry) using `options['format']`.

    Raises TwitterError when the action type is unknown or when that type
    has no formatter registered under the requested format name.
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    f = formatters_dict.get(options['format'])
    if not f:
        # Name the real action type; the message used to say "status"
        # even for search, admin and lists lookups.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return f()
372
class Action(object):
    """
    Base class of all actions.  Calling a plain Action dispatches to the
    action class registered under `options['action']`.
    """

    def ask(self, subject='perform this action', careful=False):
        '''
        Asks the user on the console whether `subject` should be performed.
        With `careful` set the default answer is NO, otherwise it is YES.
        Returns the answer as `True` or `False`.
        '''
        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' % (subject, sample)
        try:
            answer = input(prompt).lower()
        except EOFError:
            print(file=sys.stderr)  # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful
        if careful:
            return answer in ('yes', 'y')
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        """
        Instantiate and run the action named by `options['action']`
        (NoSuchAction when the name is unknown).  Status actions are
        repeated every `options['refresh_rate']` seconds when
        `options['refresh']` is set.  Ctrl-C ends the action quietly.
        """
        action = actions.get(options['action'], NoSuchAction)()
        try:
            if options['refresh'] and isinstance(action, StatusAction):
                while True:
                    action(twitter, options)
                    sys.stdout.flush()
                    time.sleep(options['refresh_rate'])
            else:
                action(twitter, options)
        except KeyboardInterrupt:
            print('\n[Keyboard Interrupt]', file=sys.stderr)
417
class NoSuchActionError(Exception):
    """Raised by NoSuchAction when the requested action name is unknown."""
420
class NoSuchAction(Action):
    """Fallback action used for unknown names; it always raises."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
424
class StatusAction(Action):
    """
    Base class for actions that print a list of statuses.  Subclasses
    supply `getStatuses(twitter, options)`.
    """

    def __call__(self, twitter, options):
        """
        Fetch the statuses and print each one with the 'status' formatter
        chosen by `options['format']`.  With the "json" format the output is
        wrapped in '[' ... ']' with a comma after every entry but the last.
        """
        statuses = self.getStatuses(twitter, options)
        sf = get_formatter('status', options)
        if options['format'] == "json":
            printNicely("[")
            last = len(statuses) - 1
            for index, status in enumerate(statuses):
                terminator = "]" if index == last else ","
                printNicely(sf(status, options) + terminator)
            if not statuses:
                # Close the array for an empty result; indexing
                # statuses[-1] here used to raise IndexError.
                printNicely("]")
        else:
            for status in statuses:
                statusStr = sf(status, options)
                # Formatters may return blank text (e.g. no URLs found).
                if statusStr.strip():
                    printNicely(statusStr)
441
class SearchAction(Action):
    """Search for the words in `options['extra_args']` and print the results."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        encoded_terms = [quote(term) for term in options['extra_args']]
        query_string = "+".join(encoded_terms)

        response = twitter.search(q=query_string)
        formatter = get_formatter('search', options)
        for result in response['results']:
            line = formatter(result, options)
            if line.strip():
                printNicely(line)
460
class AdminAction(Action):
    """
    Base class for follow/leave.  Subclasses supply `getUser(twitter, user)`,
    which performs the change and returns the affected user.
    """

    def __call__(self, twitter, options):
        targets = options['extra_args']
        if not targets or not targets[0]:
            raise TwitterError("You need to specify a user (screen name)")
        af = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, targets[0])
        except TwitterError as e:
            explanation = (
                "There was a problem following or leaving the specified user.",
                "You may be trying to follow a user you are already following;",
                "Leaving a user you are not currently following;",
                "Or the user may not exist.",
                "Sorry.",
            )
            for line in explanation:
                print(line)
            print()
            print(e)
        else:
            printNicely(af(options['action'], user))
478
class ListsAction(StatusAction):
    """
    With only a screen name: print that user's lists.  With a screen name
    and a list slug: return the statuses of that list for printing.
    """

    def getStatuses(self, twitter, options):
        """
        Return the statuses to print, oldest first.  When no list slug is
        given the user's lists are printed directly and [] is returned.

        Raises TwitterError when `options['extra_args']` is empty.
        """
        if not options['extra_args']:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = options['extra_args'][0]

        if not options['extra_args'][1:]:
            lists = twitter.lists.list(screen_name=screen_name)
            if not lists:
                printNicely("This user has no lists.")
            # The loop variable must not be called ``list``: that made
            # ``list`` a local name for the whole method, so the
            # ``list(reversed(...))`` call below raised UnboundLocalError.
            for user_list in lists:
                lf = get_formatter('lists', options)
                printNicely(lf(user_list))
            return []
        else:
            return list(reversed(twitter.lists.statuses(
                owner_screen_name=screen_name,
                slug=options['extra_args'][1])))
497
498
class MyListsAction(ListsAction):
    """ListsAction for the authenticated user's own lists."""

    def getStatuses(self, twitter, options):
        """
        Look up the authenticated user's screen name and delegate to
        ListsAction.getStatuses with that name placed before the extra
        arguments.
        """
        screen_name = twitter.account.verify_credentials()['screen_name']
        # Use a copy instead of inserting into options['extra_args']: the
        # in-place insert accumulated one more screen name on every call
        # made with the same options dict.
        own_options = dict(options)
        own_options['extra_args'] = [screen_name] + list(options['extra_args'])
        return ListsAction.getStatuses(self, twitter, own_options)
504
505
class FriendsAction(StatusAction):
    """Shows the home timeline, in reverse of the order the API returned it."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.home_timeline(count=options["length"])
        return list(reversed(timeline))
509
class RepliesAction(StatusAction):
    """Shows the mentions timeline, in reverse of the order the API returned it."""

    def getStatuses(self, twitter, options):
        mentions = twitter.statuses.mentions_timeline(count=options["length"])
        return list(reversed(mentions))
513
class FollowAction(AdminAction):
    """Follow the user with the given screen name."""

    def getUser(self, twitter, user):
        followed = twitter.friendships.create(screen_name=user)
        return followed
517
class LeaveAction(AdminAction):
    """Stop following the user with the given screen name."""

    def getUser(self, twitter, user):
        left = twitter.friendships.destroy(screen_name=user)
        return left
521
def _split_status(text, prefix, limit=140):
    """
    Split `text` into chunks such that `prefix` + chunk is at most `limit`
    characters, breaking at spaces where possible.  Returns the list of
    prefixed chunks (empty when `text` is blank).
    """
    room = limit - len(prefix)
    if room < 1:
        raise ValueError("prefix leaves no room for status text")
    chunks = []
    text = text.strip()
    while text:
        if len(text) <= room:
            end = len(text)
        else:
            # Last space that still lets the chunk fit (index `room` may be
            # that space itself).
            end = text.rfind(' ', 0, room + 1)
            if end <= 0:
                # No usable space: hard-split so the loop always advances.
                end = room
        chunks.append(prefix + text[:end].rstrip())
        text = text[end:].lstrip()
    return chunks


class SetStatusAction(Action):
    """Post a status update, split into several tweets when too long."""

    def __call__(self, twitter, options):
        """
        Post the text in `options['extra_args']` (or read from the prompt
        when there is none).  Leading @mentions are repeated at the start
        of every part; parts are posted in reverse order when
        `options['invert_split']` is set.
        """
        statusTxt = (" ".join(options['extra_args'])
                     if options['extra_args']
                     else str(input("message: ")))
        full_text = statusTxt

        # Collect leading @mentions.  A mention only counts when followed by
        # whitespace or the end of the text, so "@bob: hi" stays intact
        # instead of losing the character after the name.
        replies = []
        mention = re.compile(r"(@\w+)(?:\s+|$)")
        while statusTxt:
            found = mention.match(statusTxt)
            if not found:
                break
            replies.append(found.group(1))
            statusTxt = statusTxt[found.end():]

        prefix = " ".join(replies)
        if prefix:
            prefix += " "
        if len(prefix) >= 140:
            # The mentions alone fill a tweet: treat the whole message as
            # ordinary text rather than discarding everything after them.
            statusTxt = full_text
            prefix = ""

        # str.rfind is used inside the helper; the string-module function
        # ``string.rfind`` does not exist on Python 3.
        splitted = _split_status(statusTxt, prefix)

        if options['invert_split']:
            splitted.reverse()
        for status in splitted:
            twitter.statuses.update(status=status)
556
class TwitterShell(Action):
    """Interactive shell that reads and runs actions until exit."""

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        prompt = prompt.strip("'").replace("\\'", "'")
        for colour in ansi.COLOURS_NAMED:
            if '[%s]' % (colour) in prompt:
                prompt = prompt.replace(
                    '[%s]' % (colour), ansiFormatter.cmdColourNamed(colour))
        prompt = prompt.replace('[R]', ansiFormatter.cmdReset())
        return prompt

    def __call__(self, twitter, options):
        """
        Read command lines at the prompt, parse them into `options` and run
        the resulting action.  "exit" (or confirming at end-of-input) leaves
        the shell by raising SystemExit(0).
        """
        prompt = self.render_prompt(options.get('prompt', 'twitter> '))
        while True:
            options['action'] = ""
            try:
                args = input(prompt).split()
                parse_args(args, options)
                if not options['action']:
                    continue
                elif options['action'] == 'exit':
                    raise SystemExit(0)
                elif options['action'] == 'shell':
                    print('Sorry Xzibit does not work here!', file=sys.stderr)
                    continue
                elif options['action'] == 'help':
                    print('''\ntwitter> `action`\n
The Shell Accepts all the command line actions along with:

exit Leave the twitter shell (^D may also be used)

Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
                Action()(twitter, options)
                options['action'] = ''
            except (GetoptError, ValueError) as e:
                # A mistyped option must not terminate the whole shell;
                # report it the same way main() does and prompt again.
                print("I can't do that, %s." % (e), file=sys.stderr)
            except NoSuchActionError as e:
                print(e, file=sys.stderr)
            except KeyboardInterrupt:
                print('\n[Keyboard Interrupt]', file=sys.stderr)
            except EOFError:
                print(file=sys.stderr)
                leaving = self.ask(subject='Leave')
                if not leaving:
                    print('Excellent!', file=sys.stderr)
                else:
                    raise SystemExit(0)
603
class PythonPromptAction(Action):
    """Runs smrt_input repeatedly until end-of-input is reached."""

    def __call__(self, twitter, options):
        # No extra local names are introduced here on purpose: locals() is
        # handed to smrt_input as-is.
        try:
            while True:
                smrt_input(globals(), locals())
        except EOFError:
            pass
611
class HelpAction(Action):
    """Prints the usage text."""

    def __call__(self, twitter, options):
        # __doc__ here is the module docstring, i.e. the usage text.
        usage = __doc__
        print(usage)
615
class DoNothingAction(Action):
    """Action that performs no work; registered for 'authorize'."""

    def __call__(self, twitter, options):
        """Do nothing and return None."""
        return None
619
class RateLimitStatus(Action):
    """Prints the remaining API request quota and the time of the next reset."""

    def __call__(self, twitter, options):
        rate = twitter.application.rate_limit_status()
        print("Remaining API requests: %s / %s (hourly limit)"
              % (rate['remaining_hits'], rate['hourly_limit']))
        seconds_left = int(rate['reset_time_in_seconds'] - time.time())
        reset_at = time.asctime(time.localtime(rate['reset_time_in_seconds']))
        print("Next reset in %ss (%s)" % (seconds_left, reset_at))
626
# Action classes by command name; Action.__call__ looks up
# options['action'] here.
actions = dict(
    authorize=DoNothingAction,
    follow=FollowAction,
    friends=FriendsAction,
    list=ListsAction,
    mylist=MyListsAction,
    help=HelpAction,
    leave=LeaveAction,
    pyprompt=PythonPromptAction,
    replies=RepliesAction,
    search=SearchAction,
    set=SetStatusAction,
    shell=TwitterShell,
    rate=RateLimitStatus,
)
642
def loadConfig(filename):
    """
    Return a copy of OPTIONS overridden by the [twitter] section of the
    config file `filename`.  Only 'format', 'prompt' and the boolean
    'invert_split' are read; a missing file yields the plain defaults.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    parser = SafeConfigParser()
    parser.read([filename])
    section = 'twitter'
    for name in ('format', 'prompt'):
        if parser.has_option(section, name):
            options[name] = parser.get(section, name)
    # process booleans
    for name in ('invert_split',):
        if parser.has_option(section, name):
            options[name] = parser.getboolean(section, name)
    return options
656
def main(args=sys.argv[1:]):
    """
    Command-line entry point: parse `args`, merge them with the config file
    and the defaults, authenticate via OAuth and run the requested action.

    Returns 1 when --refresh is combined with an unsupported action; raises
    SystemExit(1) on bad arguments, unknown actions and Twitter errors.
    """
    global ansiFormatter

    arg_options = {}
    try:
        parse_args(args, arg_options)
    except (GetoptError, ValueError) as e:
        # ValueError covers non-numeric values given to -l / -R.
        print("I can't do that, %s." % (e), file=sys.stderr)
        print(file=sys.stderr)
        raise SystemExit(1)

    config_path = os.path.expanduser(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))
    config_options = loadConfig(config_path)

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    for k, v in config_options.items():
        # Empty config values fall back to the built-in default.
        if v:
            options[k] = v
    # Command-line values always win, including falsy ones: parse_args only
    # stores options that were actually given, and filtering on truthiness
    # silently dropped ``--no-ssl`` (secure=False).
    options.update(arg_options)

    if options['refresh'] and options['action'] not in (
            'friends', 'replies'):
        print("You can only refresh the friends or replies actions.",
              file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        return 1

    oauth_filename = os.path.expanduser(options['oauth_filename'])

    if (options['action'] == 'authorize'
            or not os.path.exists(oauth_filename)):
        # Write the token to the same expanded path that is checked above
        # and read below.
        oauth_dance(
            "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
            oauth_filename)

    ansiFormatter = ansi.AnsiCmd(options["force-ansi"])

    oauth_token, oauth_token_secret = read_token_file(oauth_filename)

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1.1',
        domain='api.twitter.com')

    try:
        Action()(twitter, options)
    except NoSuchActionError as e:
        print(e, file=sys.stderr)
        raise SystemExit(1)
    except TwitterError as e:
        print(str(e), file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        raise SystemExit(1)