]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
54e0531fbcb63548a58501a099a38154c1dc8ed1
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 pyprompt start a Python prompt for interacting with the twitter
19 object directly
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
25
26
27 OPTIONS:
28
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 5 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
41
42 FORMATS for the --format option
43
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 json raw json data from the api on each line
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51 CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57 [twitter]
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63 """
64
65 from __future__ import print_function
66
67 try:
68 input = __builtins__.raw_input
69 except (AttributeError, KeyError):
70 pass
71
72
73 CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
74 CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
75
76 from getopt import gnu_getopt as getopt, GetoptError
77 from getpass import getpass
78 import json
79 import locale
80 import os.path
81 import re
82 import string
83 import sys
84 import time
85
86 try:
87 from ConfigParser import SafeConfigParser
88 except ImportError:
89 from configparser import ConfigParser as SafeConfigParser
90 import datetime
91 try:
92 from urllib.parse import quote
93 except ImportError:
94 from urllib2 import quote
95 try:
96 import HTMLParser
97 except ImportError:
98 import html.parser as HTMLParser
99
100 import webbrowser
101
102 from .api import Twitter, TwitterError
103 from .oauth import OAuth, write_token_file, read_token_file
104 from .oauth_dance import oauth_dance
105 from . import ansi
106 from .util import smrt_input, printNicely, align_text
107
108 OPTIONS = {
109 'action': 'friends',
110 'refresh': False,
111 'refresh_rate': 600,
112 'format': 'default',
113 'prompt': '[cyan]twitter[R]> ',
114 'config_filename': os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + os.sep + '.twitter',
115 'oauth_filename': os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + os.sep + '.twitter_oauth',
116 'length': 20,
117 'timestamp': False,
118 'datestamp': False,
119 'extra_args': [],
120 'secure': True,
121 'invert_split': False,
122 'force-ansi': False,
123 }
124
125 gHtmlParser = HTMLParser.HTMLParser()
126 hashtagRe = re.compile(r'(?P<hashtag>#\S+)')
127 profileRe = re.compile(r'(?P<profile>\@\S+)')
128 ansiFormatter = ansi.AnsiCmd(False)
129
130 def parse_args(args, options):
131 long_opts = ['help', 'format=', 'refresh', 'oauth=',
132 'refresh-rate=', 'config=', 'length=', 'timestamp',
133 'datestamp', 'no-ssl', 'force-ansi']
134 short_opts = "e:p:f:h?rR:c:l:td"
135 opts, extra_args = getopt(args, short_opts, long_opts)
136 if extra_args and hasattr(extra_args[0], 'decode'):
137 extra_args = [arg.decode(locale.getpreferredencoding())
138 for arg in extra_args]
139
140 for opt, arg in opts:
141 if opt in ('-f', '--format'):
142 options['format'] = arg
143 elif opt in ('-r', '--refresh'):
144 options['refresh'] = True
145 elif opt in ('-R', '--refresh-rate'):
146 options['refresh_rate'] = int(arg)
147 elif opt in ('-l', '--length'):
148 options["length"] = int(arg)
149 elif opt in ('-t', '--timestamp'):
150 options["timestamp"] = True
151 elif opt in ('-d', '--datestamp'):
152 options["datestamp"] = True
153 elif opt in ('-?', '-h', '--help'):
154 options['action'] = 'help'
155 elif opt in ('-c', '--config'):
156 options['config_filename'] = arg
157 elif opt == '--no-ssl':
158 options['secure'] = False
159 elif opt == '--oauth':
160 options['oauth_filename'] = arg
161 elif opt == '--force-ansi':
162 options['force-ansi'] = True
163
164 if extra_args and not ('action' in options and options['action'] == 'help'):
165 options['action'] = extra_args[0]
166 options['extra_args'] = extra_args[1:]
167
168 def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
169 timestamp = options["timestamp"]
170 datestamp = options["datestamp"]
171 t = time.strptime(status['created_at'], format)
172 i_hate_timezones = time.timezone
173 if (time.daylight):
174 i_hate_timezones = time.altzone
175 dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
176 seconds=i_hate_timezones)
177 t = dt.timetuple()
178 if timestamp and datestamp:
179 return time.strftime("%Y-%m-%d %H:%M:%S ", t)
180 elif timestamp:
181 return time.strftime("%H:%M:%S ", t)
182 elif datestamp:
183 return time.strftime("%Y-%m-%d ", t)
184 return ""
185
186 def reRepl(m):
187 ansiTypes = {
188 'clear': ansiFormatter.cmdReset(),
189 'hashtag': ansiFormatter.cmdBold(),
190 'profile': ansiFormatter.cmdUnderline(),
191 }
192
193 s = None
194 try:
195 mkey = m.lastgroup
196 if m.group(mkey):
197 s = '%s%s%s' % (ansiTypes[mkey], m.group(mkey), ansiTypes['clear'])
198 except IndexError:
199 pass
200 return s
201
202 def replaceInStatus(status):
203 txt = gHtmlParser.unescape(status)
204 txt = re.sub(hashtagRe, reRepl, txt)
205 txt = re.sub(profileRe, reRepl, txt)
206 return txt
207
208 class StatusFormatter(object):
209 def __call__(self, status, options):
210 return ("%s%s %s" % (
211 get_time_string(status, options),
212 status['user']['screen_name'], gHtmlParser.unescape(status['text'])))
213
214 class AnsiStatusFormatter(object):
215 def __init__(self):
216 self._colourMap = ansi.ColourMap()
217
218 def __call__(self, status, options):
219 colour = self._colourMap.colourFor(status['user']['screen_name'])
220 return ("%s%s% 16s%s %s " % (
221 get_time_string(status, options),
222 ansiFormatter.cmdColour(colour), status['user']['screen_name'],
223 ansiFormatter.cmdReset(), align_text(replaceInStatus(status['text']))))
224
225 class VerboseStatusFormatter(object):
226 def __call__(self, status, options):
227 return ("-- %s (%s) on %s\n%s\n" % (
228 status['user']['screen_name'],
229 status['user']['location'],
230 status['created_at'],
231 gHtmlParser.unescape(status['text'])))
232
233 class JSONStatusFormatter(object):
234 def __call__(self, status, options):
235 status['text'] = gHtmlParser.unescape(status['text'])
236 return json.dumps(status)
237
238 class URLStatusFormatter(object):
239 urlmatch = re.compile(r'https?://\S+')
240 def __call__(self, status, options):
241 urls = self.urlmatch.findall(status['text'])
242 return '\n'.join(urls) if urls else ""
243
244
245 class ListsFormatter(object):
246 def __call__(self, list):
247 if list['description']:
248 list_str = "%-30s (%s)" % (list['name'], list['description'])
249 else:
250 list_str = "%-30s" % (list['name'])
251 return "%s\n" % list_str
252
253 class ListsVerboseFormatter(object):
254 def __call__(self, list):
255 list_str = "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
256 return list_str
257
258 class AnsiListsFormatter(object):
259 def __init__(self):
260 self._colourMap = ansi.ColourMap()
261
262 def __call__(self, list):
263 colour = self._colourMap.colourFor(list['name'])
264 return ("%s%-15s%s %s" % (
265 ansiFormatter.cmdColour(colour), list['name'],
266 ansiFormatter.cmdReset(), list['description']))
267
268
269 class AdminFormatter(object):
270 def __call__(self, action, user):
271 user_str = "%s (%s)" % (user['screen_name'], user['name'])
272 if action == "follow":
273 return "You are now following %s.\n" % (user_str)
274 else:
275 return "You are no longer following %s.\n" % (user_str)
276
277 class VerboseAdminFormatter(object):
278 def __call__(self, action, user):
279 return("-- %s: %s (%s): %s" % (
280 "Following" if action == "follow" else "Leaving",
281 user['screen_name'],
282 user['name'],
283 user['url']))
284
285 class SearchFormatter(object):
286 def __call__(self, result, options):
287 return("%s%s %s" % (
288 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
289 result['from_user'], result['text']))
290
291 class VerboseSearchFormatter(SearchFormatter):
292 pass # Default to the regular one
293
294 class URLSearchFormatter(object):
295 urlmatch = re.compile(r'https?://\S+')
296 def __call__(self, result, options):
297 urls = self.urlmatch.findall(result['text'])
298 return '\n'.join(urls) if urls else ""
299
300 class AnsiSearchFormatter(object):
301 def __init__(self):
302 self._colourMap = ansi.ColourMap()
303
304 def __call__(self, result, options):
305 colour = self._colourMap.colourFor(result['from_user'])
306 return ("%s%s%s%s %s" % (
307 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
308 ansiFormatter.cmdColour(colour), result['from_user'],
309 ansiFormatter.cmdReset(), result['text']))
310
311 _term_encoding = None
312 def get_term_encoding():
313 global _term_encoding
314 if not _term_encoding:
315 lang = os.getenv('LANG', 'unknown.UTF-8').split('.')
316 if lang[1:]:
317 _term_encoding = lang[1]
318 else:
319 _term_encoding = 'UTF-8'
320 return _term_encoding
321
322 formatters = {}
323 status_formatters = {
324 'default': StatusFormatter,
325 'verbose': VerboseStatusFormatter,
326 'json': JSONStatusFormatter,
327 'urls': URLStatusFormatter,
328 'ansi': AnsiStatusFormatter
329 }
330 formatters['status'] = status_formatters
331
332 admin_formatters = {
333 'default': AdminFormatter,
334 'verbose': VerboseAdminFormatter,
335 'urls': AdminFormatter,
336 'ansi': AdminFormatter
337 }
338 formatters['admin'] = admin_formatters
339
340 search_formatters = {
341 'default': SearchFormatter,
342 'verbose': VerboseSearchFormatter,
343 'urls': URLSearchFormatter,
344 'ansi': AnsiSearchFormatter
345 }
346 formatters['search'] = search_formatters
347
348 lists_formatters = {
349 'default': ListsFormatter,
350 'verbose': ListsVerboseFormatter,
351 'urls': None,
352 'ansi': AnsiListsFormatter
353 }
354 formatters['lists'] = lists_formatters
355
356 def get_formatter(action_type, options):
357 formatters_dict = formatters.get(action_type)
358 if (not formatters_dict):
359 raise TwitterError(
360 "There was an error finding a class of formatters for your type (%s)"
361 % (action_type))
362 f = formatters_dict.get(options['format'])
363 if (not f):
364 raise TwitterError(
365 "Unknown formatter '%s' for status actions" % (options['format']))
366 return f()
367
368 class Action(object):
369
370 def ask(self, subject='perform this action', careful=False):
371 '''
372 Requests from the user using `raw_input` if `subject` should be
373 performed. When `careful`, the default answer is NO, otherwise YES.
374 Returns the user answer in the form `True` or `False`.
375 '''
376 sample = '(y/N)'
377 if not careful:
378 sample = '(Y/n)'
379
380 prompt = 'You really want to %s %s? ' % (subject, sample)
381 try:
382 answer = input(prompt).lower()
383 if careful:
384 return answer in ('yes', 'y')
385 else:
386 return answer not in ('no', 'n')
387 except EOFError:
388 print(file=sys.stderr) # Put Newline since Enter was never pressed
389 # TODO:
390 # Figure out why on OS X the raw_input keeps raising
391 # EOFError and is never able to reset and get more input
392 # Hint: Look at how IPython implements their console
393 default = True
394 if careful:
395 default = False
396 return default
397
398 def __call__(self, twitter, options):
399 action = actions.get(options['action'], NoSuchAction)()
400 try:
401 doAction = lambda : action(twitter, options)
402 if (options['refresh'] and isinstance(action, StatusAction)):
403 while True:
404 doAction()
405 sys.stdout.flush()
406 time.sleep(options['refresh_rate'])
407 else:
408 doAction()
409 except KeyboardInterrupt:
410 print('\n[Keyboard Interrupt]', file=sys.stderr)
411 pass
412
413 class NoSuchActionError(Exception):
414 pass
415
416 class NoSuchAction(Action):
417 def __call__(self, twitter, options):
418 raise NoSuchActionError("No such action: %s" % (options['action']))
419
420 class StatusAction(Action):
421 def __call__(self, twitter, options):
422 statuses = self.getStatuses(twitter, options)
423 sf = get_formatter('status', options)
424 for status in statuses:
425 statusStr = sf(status, options)
426 if statusStr.strip():
427 printNicely(statusStr)
428
429 class SearchAction(Action):
430 def __call__(self, twitter, options):
431 # We need to be pointing at search.twitter.com to work, and it is less
432 # tangly to do it here than in the main()
433 twitter.domain = "search.twitter.com"
434 twitter.uriparts = ()
435 # We need to bypass the TwitterCall parameter encoding, so we
436 # don't encode the plus sign, so we have to encode it ourselves
437 query_string = "+".join(
438 [quote(term)
439 for term in options['extra_args']])
440
441 results = twitter.search(q=query_string)['results']
442 f = get_formatter('search', options)
443 for result in results:
444 resultStr = f(result, options)
445 if resultStr.strip():
446 printNicely(resultStr)
447
448 class AdminAction(Action):
449 def __call__(self, twitter, options):
450 if not (options['extra_args'] and options['extra_args'][0]):
451 raise TwitterError("You need to specify a user (screen name)")
452 af = get_formatter('admin', options)
453 try:
454 user = self.getUser(twitter, options['extra_args'][0])
455 except TwitterError as e:
456 print("There was a problem following or leaving the specified user.")
457 print("You may be trying to follow a user you are already following;")
458 print("Leaving a user you are not currently following;")
459 print("Or the user may not exist.")
460 print("Sorry.")
461 print()
462 print(e)
463 else:
464 printNicely(af(options['action'], user))
465
466 class ListsAction(StatusAction):
467 def getStatuses(self, twitter, options):
468 if not options['extra_args']:
469 raise TwitterError("Please provide a user to query for lists")
470
471 screen_name = options['extra_args'][0]
472
473 if not options['extra_args'][1:]:
474 lists = twitter.lists.list(screen_name=screen_name)
475 if not lists:
476 printNicely("This user has no lists.")
477 for list in lists:
478 lf = get_formatter('lists', options)
479 printNicely(lf(list))
480 return []
481 else:
482 return reversed(twitter.lists.statuses(
483 owner_screen_name=screen_name, slug=options['extra_args'][1]))
484
485
486 class MyListsAction(ListsAction):
487 def getStatuses(self, twitter, options):
488 screen_name = twitter.account.verify_credentials()['screen_name']
489 options['extra_args'].insert(0, screen_name)
490 return ListsAction.getStatuses(self, twitter, options)
491
492
493 class FriendsAction(StatusAction):
494 def getStatuses(self, twitter, options):
495 return reversed(twitter.statuses.home_timeline(count=options["length"]))
496
497 class RepliesAction(StatusAction):
498 def getStatuses(self, twitter, options):
499 return reversed(twitter.statuses.mentions_timeline(count=options["length"]))
500
501 class FollowAction(AdminAction):
502 def getUser(self, twitter, user):
503 return twitter.friendships.create(screen_name=user)
504
505 class LeaveAction(AdminAction):
506 def getUser(self, twitter, user):
507 return twitter.friendships.destroy(screen_name=user)
508
509 class SetStatusAction(Action):
510 def __call__(self, twitter, options):
511 statusTxt = (" ".join(options['extra_args'])
512 if options['extra_args']
513 else str(input("message: ")))
514 replies = []
515 ptr = re.compile("@[\w_]+")
516 while statusTxt:
517 s = ptr.match(statusTxt)
518 if s and s.start() == 0:
519 replies.append(statusTxt[s.start():s.end()])
520 statusTxt = statusTxt[s.end() + 1:]
521 else:
522 break
523 replies = " ".join(replies)
524 if len(replies) >= 140:
525 # just go back
526 statusTxt = replies
527 replies = ""
528
529 splitted = []
530 while statusTxt:
531 limit = 140 - len(replies)
532 if len(statusTxt) > limit:
533 end = string.rfind(statusTxt, ' ', 0, limit)
534 else:
535 end = limit
536 splitted.append(" ".join((replies, statusTxt[:end])))
537 statusTxt = statusTxt[end:]
538
539 if options['invert_split']:
540 splitted.reverse()
541 for status in splitted:
542 twitter.statuses.update(status=status)
543
544 class TwitterShell(Action):
545
546 def render_prompt(self, prompt):
547 '''Parses the `prompt` string and returns the rendered version'''
548 prompt = prompt.strip("'").replace("\\'", "'")
549 for colour in ansi.COLOURS_NAMED:
550 if '[%s]' % (colour) in prompt:
551 prompt = prompt.replace(
552 '[%s]' % (colour), ansiFormatter.cmdColourNamed(colour))
553 prompt = prompt.replace('[R]', ansiFormatter.cmdReset())
554 return prompt
555
556 def __call__(self, twitter, options):
557 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
558 while True:
559 options['action'] = ""
560 try:
561 args = input(prompt).split()
562 parse_args(args, options)
563 if not options['action']:
564 continue
565 elif options['action'] == 'exit':
566 raise SystemExit(0)
567 elif options['action'] == 'shell':
568 print('Sorry Xzibit does not work here!', file=sys.stderr)
569 continue
570 elif options['action'] == 'help':
571 print('''\ntwitter> `action`\n
572 The Shell Accepts all the command line actions along with:
573
574 exit Leave the twitter shell (^D may also be used)
575
576 Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
577 Action()(twitter, options)
578 options['action'] = ''
579 except NoSuchActionError as e:
580 print(e, file=sys.stderr)
581 except KeyboardInterrupt:
582 print('\n[Keyboard Interrupt]', file=sys.stderr)
583 except EOFError:
584 print(file=sys.stderr)
585 leaving = self.ask(subject='Leave')
586 if not leaving:
587 print('Excellent!', file=sys.stderr)
588 else:
589 raise SystemExit(0)
590
591 class PythonPromptAction(Action):
592 def __call__(self, twitter, options):
593 try:
594 while True:
595 smrt_input(globals(), locals())
596 except EOFError:
597 pass
598
599 class HelpAction(Action):
600 def __call__(self, twitter, options):
601 print(__doc__)
602
603 class DoNothingAction(Action):
604 def __call__(self, twitter, options):
605 pass
606
607 class RateLimitStatus(Action):
608 def __call__(self, twitter, options):
609 rate = twitter.application.rate_limit_status()
610 print("Remaining API requests: %s / %s (hourly limit)" % (rate['remaining_hits'], rate['hourly_limit']))
611 print("Next reset in %ss (%s)" % (int(rate['reset_time_in_seconds'] - time.time()),
612 time.asctime(time.localtime(rate['reset_time_in_seconds']))))
613
614 actions = {
615 'authorize' : DoNothingAction,
616 'follow' : FollowAction,
617 'friends' : FriendsAction,
618 'list' : ListsAction,
619 'mylist' : MyListsAction,
620 'help' : HelpAction,
621 'leave' : LeaveAction,
622 'pyprompt' : PythonPromptAction,
623 'replies' : RepliesAction,
624 'search' : SearchAction,
625 'set' : SetStatusAction,
626 'shell' : TwitterShell,
627 'rate' : RateLimitStatus,
628 }
629
630 def loadConfig(filename):
631 options = dict(OPTIONS)
632 if os.path.exists(filename):
633 cp = SafeConfigParser()
634 cp.read([filename])
635 for option in ('format', 'prompt'):
636 if cp.has_option('twitter', option):
637 options[option] = cp.get('twitter', option)
638 # process booleans
639 for option in ('invert_split',):
640 if cp.has_option('twitter', option):
641 options[option] = cp.getboolean('twitter', option)
642 return options
643
644 def main(args=sys.argv[1:]):
645 arg_options = {}
646 try:
647 parse_args(args, arg_options)
648 except GetoptError as e:
649 print("I can't do that, %s." % (e), file=sys.stderr)
650 print(file=sys.stderr)
651 raise SystemExit(1)
652
653 config_path = os.path.expanduser(
654 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
655 config_options = loadConfig(config_path)
656
657 # Apply the various options in order, the most important applied last.
658 # Defaults first, then what's read from config file, then command-line
659 # arguments.
660 options = dict(OPTIONS)
661 for d in config_options, arg_options:
662 for k, v in list(d.items()):
663 if v: options[k] = v
664
665 if options['refresh'] and options['action'] not in (
666 'friends', 'replies'):
667 print("You can only refresh the friends or replies actions.", file=sys.stderr)
668 print("Use 'twitter -h' for help.", file=sys.stderr)
669 return 1
670
671 oauth_filename = os.path.expanduser(options['oauth_filename'])
672
673 if (options['action'] == 'authorize'
674 or not os.path.exists(oauth_filename)):
675 oauth_dance(
676 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
677 options['oauth_filename'])
678
679 global ansiFormatter
680 ansiFormatter = ansi.AnsiCmd(options["force-ansi"])
681
682 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
683
684 twitter = Twitter(
685 auth=OAuth(
686 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
687 secure=options['secure'],
688 api_version='1.1',
689 domain='api.twitter.com')
690
691 try:
692 Action()(twitter, options)
693 except NoSuchActionError as e:
694 print(e, file=sys.stderr)
695 raise SystemExit(1)
696 except TwitterError as e:
697 print(str(e), file=sys.stderr)
698 print("Use 'twitter -h' for help.", file=sys.stderr)
699 raise SystemExit(1)