]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
472f70cd4a072641a1949e0ce4415e3520213a42
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 pyprompt start a Python prompt for interacting with the twitter
19 object directly
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
25
26
27 OPTIONS:
28
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 5 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
41
42 FORMATS for the --format option
43
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 json raw json data from the api on each line
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51 CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57 [twitter]
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63 """
64
65 from __future__ import print_function
66
67 try:
68 input = __builtins__.raw_input
69 except (AttributeError, KeyError):
70 pass
71
72
73 CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
74 CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
75
76 from getopt import gnu_getopt as getopt, GetoptError
77 from getpass import getpass
78 import json
79 import locale
80 import os.path
81 import re
82 import string
83 import sys
84 import time
85
86 try:
87 from ConfigParser import SafeConfigParser
88 except ImportError:
89 from configparser import ConfigParser as SafeConfigParser
90 import datetime
91 try:
92 from urllib.parse import quote
93 except ImportError:
94 from urllib2 import quote
95 try:
96 import HTMLParser
97 except ImportError:
98 import html.parser as HTMLParser
99
100 import webbrowser
101
102 from .api import Twitter, TwitterError
103 from .oauth import OAuth, write_token_file, read_token_file
104 from .oauth_dance import oauth_dance
105 from . import ansi
106 from .util import smrt_input, printNicely, align_text
107
108 OPTIONS = {
109 'action': 'friends',
110 'refresh': False,
111 'refresh_rate': 600,
112 'format': 'default',
113 'prompt': '[cyan]twitter[R]> ',
114 'config_filename': os.environ.get('HOME',
115 os.environ.get('USERPROFILE', ''))
116 + os.sep + '.twitter',
117 'oauth_filename': os.environ.get('HOME',
118 os.environ.get('USERPROFILE', ''))
119 + os.sep + '.twitter_oauth',
120 'length': 20,
121 'timestamp': False,
122 'datestamp': False,
123 'extra_args': [],
124 'secure': True,
125 'invert_split': False,
126 'force-ansi': False,
127 }
128
129 gHtmlParser = HTMLParser.HTMLParser()
130 hashtagRe = re.compile(r'(?P<hashtag>#\S+)')
131 profileRe = re.compile(r'(?P<profile>\@\S+)')
132 ansiFormatter = ansi.AnsiCmd(False)
133
134 def parse_args(args, options):
135 long_opts = ['help', 'format=', 'refresh', 'oauth=',
136 'refresh-rate=', 'config=', 'length=', 'timestamp',
137 'datestamp', 'no-ssl', 'force-ansi']
138 short_opts = "e:p:f:h?rR:c:l:td"
139 opts, extra_args = getopt(args, short_opts, long_opts)
140 if extra_args and hasattr(extra_args[0], 'decode'):
141 extra_args = [arg.decode(locale.getpreferredencoding())
142 for arg in extra_args]
143
144 for opt, arg in opts:
145 if opt in ('-f', '--format'):
146 options['format'] = arg
147 elif opt in ('-r', '--refresh'):
148 options['refresh'] = True
149 elif opt in ('-R', '--refresh-rate'):
150 options['refresh_rate'] = int(arg)
151 elif opt in ('-l', '--length'):
152 options["length"] = int(arg)
153 elif opt in ('-t', '--timestamp'):
154 options["timestamp"] = True
155 elif opt in ('-d', '--datestamp'):
156 options["datestamp"] = True
157 elif opt in ('-?', '-h', '--help'):
158 options['action'] = 'help'
159 elif opt in ('-c', '--config'):
160 options['config_filename'] = arg
161 elif opt == '--no-ssl':
162 options['secure'] = False
163 elif opt == '--oauth':
164 options['oauth_filename'] = arg
165 elif opt == '--force-ansi':
166 options['force-ansi'] = True
167
168 if extra_args and not ('action' in options and options['action'] == 'help'):
169 options['action'] = extra_args[0]
170 options['extra_args'] = extra_args[1:]
171
172 def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
173 timestamp = options["timestamp"]
174 datestamp = options["datestamp"]
175 t = time.strptime(status['created_at'], format)
176 i_hate_timezones = time.timezone
177 if time.daylight:
178 i_hate_timezones = time.altzone
179 dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
180 seconds=i_hate_timezones)
181 t = dt.timetuple()
182 if timestamp and datestamp:
183 return time.strftime("%Y-%m-%d %H:%M:%S ", t)
184 elif timestamp:
185 return time.strftime("%H:%M:%S ", t)
186 elif datestamp:
187 return time.strftime("%Y-%m-%d ", t)
188 return ""
189
190 def reRepl(m):
191 ansiTypes = {
192 'clear': ansiFormatter.cmdReset(),
193 'hashtag': ansiFormatter.cmdBold(),
194 'profile': ansiFormatter.cmdUnderline(),
195 }
196
197 s = None
198 try:
199 mkey = m.lastgroup
200 if m.group(mkey):
201 s = '%s%s%s' % (ansiTypes[mkey], m.group(mkey), ansiTypes['clear'])
202 except IndexError:
203 pass
204 return s
205
206 def replaceInStatus(status):
207 txt = gHtmlParser.unescape(status)
208 txt = re.sub(hashtagRe, reRepl, txt)
209 txt = re.sub(profileRe, reRepl, txt)
210 return txt
211 def correctRTStatus(status):
212 if 'retweeted_status' in status:
213 return ("RT @" + status['retweeted_status']['user']['screen_name']
214 + " " + status['retweeted_status']['text'])
215 else:
216 return status['text']
217
218 class StatusFormatter(object):
219 def __call__(self, status, options):
220 return ("%s@%s %s" % (
221 get_time_string(status, options),
222 status['user']['screen_name'],
223 gHtmlParser.unescape(correctRTStatus(status))))
224
225 class AnsiStatusFormatter(object):
226 def __init__(self):
227 self._colourMap = ansi.ColourMap()
228
229 def __call__(self, status, options):
230 colour = self._colourMap.colourFor(status['user']['screen_name'])
231 return ("%s%s% 16s%s %s " % (
232 get_time_string(status, options),
233 ansiFormatter.cmdColour(colour), status['user']['screen_name'],
234 ansiFormatter.cmdReset(),
235 align_text(replaceInStatus(correctRTStatus(status)))))
236
237 class VerboseStatusFormatter(object):
238 def __call__(self, status, options):
239 return ("-- %s (%s) on %s\n%s\n" % (
240 status['user']['screen_name'],
241 status['user']['location'],
242 status['created_at'],
243 gHtmlParser.unescape(correctRTStatus(status))))
244
245 class JSONStatusFormatter(object):
246 def __call__(self, status, options):
247 status['text'] = gHtmlParser.unescape(status['text'])
248 return json.dumps(status)
249
250 class URLStatusFormatter(object):
251 urlmatch = re.compile(r'https?://\S+')
252 def __call__(self, status, options):
253 urls = self.urlmatch.findall(correctRTStatus(status))
254 return '\n'.join(urls) if urls else ""
255
256
257 class ListsFormatter(object):
258 def __call__(self, list):
259 if list['description']:
260 list_str = "%-30s (%s)" % (list['name'], list['description'])
261 else:
262 list_str = "%-30s" % (list['name'])
263 return "%s\n" % list_str
264
265 class ListsVerboseFormatter(object):
266 def __call__(self, list):
267 list_str = "%-30s\n description: %s\n members: %s\n mode:%s\n" % (
268 list['name'], list['description'],
269 list['member_count'], list['mode'])
270 return list_str
271
272 class AnsiListsFormatter(object):
273 def __init__(self):
274 self._colourMap = ansi.ColourMap()
275
276 def __call__(self, list):
277 colour = self._colourMap.colourFor(list['name'])
278 return ("%s%-15s%s %s" % (
279 ansiFormatter.cmdColour(colour), list['name'],
280 ansiFormatter.cmdReset(), list['description']))
281
282
283 class AdminFormatter(object):
284 def __call__(self, action, user):
285 user_str = "%s (%s)" % (user['screen_name'], user['name'])
286 if action == "follow":
287 return "You are now following %s.\n" % (user_str)
288 else:
289 return "You are no longer following %s.\n" % (user_str)
290
291 class VerboseAdminFormatter(object):
292 def __call__(self, action, user):
293 return("-- %s: %s (%s): %s" % (
294 "Following" if action == "follow" else "Leaving",
295 user['screen_name'],
296 user['name'],
297 user['url']))
298
299 class SearchFormatter(object):
300 def __call__(self, result, options):
301 return("%s%s %s" % (
302 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
303 result['from_user'], result['text']))
304
305 class VerboseSearchFormatter(SearchFormatter):
306 pass # Default to the regular one
307
308 class URLSearchFormatter(object):
309 urlmatch = re.compile(r'https?://\S+')
310 def __call__(self, result, options):
311 urls = self.urlmatch.findall(result['text'])
312 return '\n'.join(urls) if urls else ""
313
314 class AnsiSearchFormatter(object):
315 def __init__(self):
316 self._colourMap = ansi.ColourMap()
317
318 def __call__(self, result, options):
319 colour = self._colourMap.colourFor(result['from_user'])
320 return ("%s%s%s%s %s" % (
321 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
322 ansiFormatter.cmdColour(colour), result['from_user'],
323 ansiFormatter.cmdReset(), result['text']))
324
325 _term_encoding = None
326 def get_term_encoding():
327 global _term_encoding
328 if not _term_encoding:
329 lang = os.getenv('LANG', 'unknown.UTF-8').split('.')
330 if lang[1:]:
331 _term_encoding = lang[1]
332 else:
333 _term_encoding = 'UTF-8'
334 return _term_encoding
335
336 formatters = {}
337 status_formatters = {
338 'default': StatusFormatter,
339 'verbose': VerboseStatusFormatter,
340 'json': JSONStatusFormatter,
341 'urls': URLStatusFormatter,
342 'ansi': AnsiStatusFormatter
343 }
344 formatters['status'] = status_formatters
345
346 admin_formatters = {
347 'default': AdminFormatter,
348 'verbose': VerboseAdminFormatter,
349 'urls': AdminFormatter,
350 'ansi': AdminFormatter
351 }
352 formatters['admin'] = admin_formatters
353
354 search_formatters = {
355 'default': SearchFormatter,
356 'verbose': VerboseSearchFormatter,
357 'urls': URLSearchFormatter,
358 'ansi': AnsiSearchFormatter
359 }
360 formatters['search'] = search_formatters
361
362 lists_formatters = {
363 'default': ListsFormatter,
364 'verbose': ListsVerboseFormatter,
365 'urls': None,
366 'ansi': AnsiListsFormatter
367 }
368 formatters['lists'] = lists_formatters
369
370 def get_formatter(action_type, options):
371 formatters_dict = formatters.get(action_type)
372 if not formatters_dict:
373 raise TwitterError(
374 "There was an error finding a class of formatters for your type (%s)"
375 % (action_type))
376 f = formatters_dict.get(options['format'])
377 if not f:
378 raise TwitterError(
379 "Unknown formatter '%s' for status actions" % (options['format']))
380 return f()
381
382 class Action(object):
383
384 def ask(self, subject='perform this action', careful=False):
385 '''
386 Requests from the user using `raw_input` if `subject` should be
387 performed. When `careful`, the default answer is NO, otherwise YES.
388 Returns the user answer in the form `True` or `False`.
389 '''
390 sample = '(y/N)'
391 if not careful:
392 sample = '(Y/n)'
393
394 prompt = 'You really want to %s %s? ' % (subject, sample)
395 try:
396 answer = input(prompt).lower()
397 if careful:
398 return answer in ('yes', 'y')
399 else:
400 return answer not in ('no', 'n')
401 except EOFError:
402 print(file=sys.stderr) # Put Newline since Enter was never pressed
403 # TODO:
404 # Figure out why on OS X the raw_input keeps raising
405 # EOFError and is never able to reset and get more input
406 # Hint: Look at how IPython implements their console
407 default = True
408 if careful:
409 default = False
410 return default
411
412 def __call__(self, twitter, options):
413 action = actions.get(options['action'], NoSuchAction)()
414 try:
415 doAction = lambda: action(twitter, options)
416 if options['refresh'] and isinstance(action, StatusAction):
417 while True:
418 doAction()
419 sys.stdout.flush()
420 time.sleep(options['refresh_rate'])
421 else:
422 doAction()
423 except KeyboardInterrupt:
424 print('\n[Keyboard Interrupt]', file=sys.stderr)
425 pass
426
427 class NoSuchActionError(Exception):
428 pass
429
430 class NoSuchAction(Action):
431 def __call__(self, twitter, options):
432 raise NoSuchActionError("No such action: %s" % (options['action']))
433
434 class StatusAction(Action):
435 def __call__(self, twitter, options):
436 statuses = self.getStatuses(twitter, options)
437 sf = get_formatter('status', options)
438 if options['format'] == "json":
439 printNicely("[")
440 for status in statuses[:-1]:
441 statusStr = sf(status, options)
442 if statusStr.strip():
443 printNicely(statusStr+",")
444 printNicely(sf(statuses[-1], options)+"]")
445 else:
446 for status in statuses:
447 statusStr = sf(status, options)
448 if statusStr.strip():
449 printNicely(statusStr)
450
451 class SearchAction(Action):
452 def __call__(self, twitter, options):
453 # We need to be pointing at search.twitter.com to work, and it is less
454 # tangly to do it here than in the main()
455 twitter.domain = "search.twitter.com"
456 twitter.uriparts = ()
457 # We need to bypass the TwitterCall parameter encoding, so we
458 # don't encode the plus sign, so we have to encode it ourselves
459 query_string = "+".join(
460 [quote(term)
461 for term in options['extra_args']])
462
463 results = twitter.search(q=query_string)['results']
464 f = get_formatter('search', options)
465 for result in results:
466 resultStr = f(result, options)
467 if resultStr.strip():
468 printNicely(resultStr)
469
470 class AdminAction(Action):
471 def __call__(self, twitter, options):
472 if not (options['extra_args'] and options['extra_args'][0]):
473 raise TwitterError("You need to specify a user (screen name)")
474 af = get_formatter('admin', options)
475 try:
476 user = self.getUser(twitter, options['extra_args'][0])
477 except TwitterError as e:
478 print("There was a problem following or leaving the specified user.")
479 print("You may be trying to follow a user you are already following;")
480 print("Leaving a user you are not currently following;")
481 print("Or the user may not exist.")
482 print("Sorry.")
483 print()
484 print(e)
485 else:
486 printNicely(af(options['action'], user))
487
488 class ListsAction(StatusAction):
489 def getStatuses(self, twitter, options):
490 if not options['extra_args']:
491 raise TwitterError("Please provide a user to query for lists")
492
493 screen_name = options['extra_args'][0]
494
495 if not options['extra_args'][1:]:
496 lists = twitter.lists.list(screen_name=screen_name)
497 if not lists:
498 printNicely("This user has no lists.")
499 for list in lists:
500 lf = get_formatter('lists', options)
501 printNicely(lf(list))
502 return []
503 else:
504 return list(reversed(twitter.lists.statuses(
505 owner_screen_name=screen_name,
506 slug=options['extra_args'][1])))
507
508
509 class MyListsAction(ListsAction):
510 def getStatuses(self, twitter, options):
511 screen_name = twitter.account.verify_credentials()['screen_name']
512 options['extra_args'].insert(0, screen_name)
513 return ListsAction.getStatuses(self, twitter, options)
514
515
516 class FriendsAction(StatusAction):
517 def getStatuses(self, twitter, options):
518 return list(reversed(
519 twitter.statuses.home_timeline(count=options["length"])))
520
521 class RepliesAction(StatusAction):
522 def getStatuses(self, twitter, options):
523 return list(reversed(
524 twitter.statuses.mentions_timeline(count=options["length"])))
525
526 class FollowAction(AdminAction):
527 def getUser(self, twitter, user):
528 return twitter.friendships.create(screen_name=user)
529
530 class LeaveAction(AdminAction):
531 def getUser(self, twitter, user):
532 return twitter.friendships.destroy(screen_name=user)
533
534 class SetStatusAction(Action):
535 def __call__(self, twitter, options):
536 statusTxt = (" ".join(options['extra_args'])
537 if options['extra_args']
538 else str(input("message: ")))
539 replies = []
540 ptr = re.compile("@[\w_]+")
541 while statusTxt:
542 s = ptr.match(statusTxt)
543 if s and s.start() == 0:
544 replies.append(statusTxt[s.start():s.end()])
545 statusTxt = statusTxt[s.end() + 1:]
546 else:
547 break
548 replies = " ".join(replies)
549 if len(replies) >= 140:
550 # just go back
551 statusTxt = replies
552 replies = ""
553
554 splitted = []
555 while statusTxt:
556 limit = 140 - len(replies)
557 if len(statusTxt) > limit:
558 end = str.rfind(statusTxt, ' ', 0, limit)
559 else:
560 end = limit
561 splitted.append(" ".join((replies, statusTxt[:end])))
562 statusTxt = statusTxt[end:]
563
564 if options['invert_split']:
565 splitted.reverse()
566 for status in splitted:
567 twitter.statuses.update(status=status)
568
569 class TwitterShell(Action):
570
571 def render_prompt(self, prompt):
572 '''Parses the `prompt` string and returns the rendered version'''
573 prompt = prompt.strip("'").replace("\\'", "'")
574 for colour in ansi.COLOURS_NAMED:
575 if '[%s]' % (colour) in prompt:
576 prompt = prompt.replace(
577 '[%s]' % (colour), ansiFormatter.cmdColourNamed(colour))
578 prompt = prompt.replace('[R]', ansiFormatter.cmdReset())
579 return prompt
580
581 def __call__(self, twitter, options):
582 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
583 while True:
584 options['action'] = ""
585 try:
586 args = input(prompt).split()
587 parse_args(args, options)
588 if not options['action']:
589 continue
590 elif options['action'] == 'exit':
591 raise SystemExit(0)
592 elif options['action'] == 'shell':
593 print('Sorry Xzibit does not work here!', file=sys.stderr)
594 continue
595 elif options['action'] == 'help':
596 print('''\ntwitter> `action`\n
597 The Shell Accepts all the command line actions along with:
598
599 exit Leave the twitter shell (^D may also be used)
600
601 Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
602 Action()(twitter, options)
603 options['action'] = ''
604 except NoSuchActionError as e:
605 print(e, file=sys.stderr)
606 except KeyboardInterrupt:
607 print('\n[Keyboard Interrupt]', file=sys.stderr)
608 except EOFError:
609 print(file=sys.stderr)
610 leaving = self.ask(subject='Leave')
611 if not leaving:
612 print('Excellent!', file=sys.stderr)
613 else:
614 raise SystemExit(0)
615
616 class PythonPromptAction(Action):
617 def __call__(self, twitter, options):
618 try:
619 while True:
620 smrt_input(globals(), locals())
621 except EOFError:
622 pass
623
624 class HelpAction(Action):
625 def __call__(self, twitter, options):
626 print(__doc__)
627
628 class DoNothingAction(Action):
629 def __call__(self, twitter, options):
630 pass
631
632 class RateLimitStatus(Action):
633 def __call__(self, twitter, options):
634 rate = twitter.application.rate_limit_status()
635 print("Remaining API requests: %s / %s (hourly limit)" % (
636 rate['remaining_hits'], rate['hourly_limit']))
637 print("Next reset in %ss (%s)" % (
638 int(rate['reset_time_in_seconds'] - time.time()),
639 time.asctime(time.localtime(rate['reset_time_in_seconds']))))
640
641 actions = {
642 'authorize' : DoNothingAction,
643 'follow' : FollowAction,
644 'friends' : FriendsAction,
645 'list' : ListsAction,
646 'mylist' : MyListsAction,
647 'help' : HelpAction,
648 'leave' : LeaveAction,
649 'pyprompt' : PythonPromptAction,
650 'replies' : RepliesAction,
651 'search' : SearchAction,
652 'set' : SetStatusAction,
653 'shell' : TwitterShell,
654 'rate' : RateLimitStatus,
655 }
656
657 def loadConfig(filename):
658 options = dict(OPTIONS)
659 if os.path.exists(filename):
660 cp = SafeConfigParser()
661 cp.read([filename])
662 for option in ('format', 'prompt'):
663 if cp.has_option('twitter', option):
664 options[option] = cp.get('twitter', option)
665 # process booleans
666 for option in ('invert_split',):
667 if cp.has_option('twitter', option):
668 options[option] = cp.getboolean('twitter', option)
669 return options
670
671 def main(args=sys.argv[1:]):
672 arg_options = {}
673 try:
674 parse_args(args, arg_options)
675 except GetoptError as e:
676 print("I can't do that, %s." % (e), file=sys.stderr)
677 print(file=sys.stderr)
678 raise SystemExit(1)
679
680 config_path = os.path.expanduser(
681 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
682 config_options = loadConfig(config_path)
683
684 # Apply the various options in order, the most important applied last.
685 # Defaults first, then what's read from config file, then command-line
686 # arguments.
687 options = dict(OPTIONS)
688 for d in config_options, arg_options:
689 for k, v in list(d.items()):
690 if v: options[k] = v
691
692 if options['refresh'] and options['action'] not in ('friends', 'replies'):
693 print("You can only refresh the friends or replies actions.",
694 file=sys.stderr)
695 print("Use 'twitter -h' for help.", file=sys.stderr)
696 return 1
697
698 oauth_filename = os.path.expanduser(options['oauth_filename'])
699
700 if options['action'] == 'authorize' or not os.path.exists(oauth_filename):
701 oauth_dance(
702 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
703 options['oauth_filename'])
704
705 global ansiFormatter
706 ansiFormatter = ansi.AnsiCmd(options["force-ansi"])
707
708 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
709
710 twitter = Twitter(
711 auth=OAuth(
712 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
713 secure=options['secure'],
714 api_version='1.1',
715 domain='api.twitter.com')
716
717 try:
718 Action()(twitter, options)
719 except NoSuchActionError as e:
720 print(e, file=sys.stderr)
721 raise SystemExit(1)
722 except TwitterError as e:
723 print(str(e), file=sys.stderr)
724 print("Use 'twitter -h' for help.", file=sys.stderr)
725 raise SystemExit(1)