]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
fd062f098640325f8c4260afce3f4ebc3169405c
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
20 object directly
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25 rate get your current rate limit status (remaining API reqs)
26
27
28 OPTIONS:
29
30 -r --refresh run this command forever, polling every once
31 in a while (default: every 10 minutes)
32 -R --refresh-rate <rate> set the refresh rate (in seconds)
33 -f --format <format> specify the output format for status updates
34 -c --config <filename> read username and password from given config
35 file (default ~/.twitter)
36 -l --length <count> specify number of status updates shown
37 (default: 20, max: 200)
38 -t --timestamp show time before status lines
39 -d --datestamp show date before status lines
40 --no-ssl use less-secure HTTP instead of HTTPS
41 --oauth <filename> filename to read/store oauth credentials to
42
43 FORMATS for the --format option
44
45 default one line per status
46 verbose multiple lines per status, more verbose status info
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51 CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57 [twitter]
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63 """
64
65 from __future__ import print_function
66
# Python 2 compatibility: make ``input`` behave like Python 3's (return the
# raw line without evaluating it).  Referencing ``raw_input`` directly works
# whether ``__builtins__`` is a dict (imported module) or a module (script run
# as ``__main__``); subscripting ``__builtins__`` raised an uncaught TypeError
# in the latter case.
try:
    input = raw_input
except NameError:
    # Python 3: the builtin ``input`` already has the desired behaviour.
    pass
71
72
# OAuth consumer credentials identifying this command-line application.
# They are passed to oauth_dance() and OAuth() in main().
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
75
76 import sys
77 import time
78 from getopt import gnu_getopt as getopt, GetoptError
79 from getpass import getpass
80 import re
81 import os.path
82 import locale
83 import string
84
85 try:
86 from ConfigParser import SafeConfigParser
87 except ImportError:
88 from configparser import ConfigParser as SafeConfigParser
89 import datetime
90 try:
91 from urllib.parse import quote
92 except ImportError:
93 from urllib2 import quote
94 try:
95 import HTMLParser
96 except ImportError:
97 import html.parser as HTMLParser
98
99 import webbrowser
100
101 from .api import Twitter, TwitterError
102 from .oauth import OAuth, write_token_file, read_token_file
103 from .oauth_dance import oauth_dance
104 from . import ansi
105 from .util import smrt_input, printNicely
106
# Built-in defaults.  main() layers config-file values and then command-line
# values on top of a copy of this dict.
OPTIONS = {
    'action': 'friends',
    'refresh': False,
    'refresh_rate': 600,  # seconds between polls when --refresh is given
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    # Both files live in $HOME, falling back to %USERPROFILE% when unset.
    'config_filename': os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + os.sep + '.twitter',
    'oauth_filename': os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + os.sep + '.twitter_oauth',
    'length': 20,
    'timestamp': False,
    'datestamp': False,
    'extra_args': [],
    'secure': True,
    'invert_split': False,
    'force-ansi': False,
}
123
# Shared parser instance, used only for its unescape() method.
gHtmlParser = HTMLParser.HTMLParser()
# Named groups let reRepl() pick the highlight style from m.lastgroup.
hashtagRe = re.compile(r'(?P<hashtag>#\S+)')
profileRe = re.compile(r'(?P<profile>\@\S+)')
# Placeholder formatter; main() rebinds it according to --force-ansi.
ansiFormatter = ansi.AnsiCmd(False)
128
def parse_args(args, options):
    """Parse command-line style ``args`` and record the result in ``options``.

    ``options`` is updated in place.  The first positional argument becomes
    ``options['action']`` (unless help was requested) and the remaining ones
    are stored under ``options['extra_args']``.

    Raises ``GetoptError`` for unknown options and ``ValueError`` when a
    numeric option (``-R``/``-l``) is not an integer.
    """
    short_opts = "e:p:f:h?rR:c:l:td"
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl', 'force-ansi']
    opts, extra_args = getopt(args, short_opts, long_opts)

    # Python 2 hands us byte strings; decode them with the user's locale.
    if extra_args and hasattr(extra_args[0], 'decode'):
        encoding_args = []
        for raw in extra_args:
            encoding_args.append(raw.decode(locale.getpreferredencoding()))
        extra_args = encoding_args

    # (option spellings, key in ``options``, function producing the value)
    table = (
        (('-f', '--format'), 'format', lambda value: value),
        (('-r', '--refresh'), 'refresh', lambda value: True),
        (('-R', '--refresh-rate'), 'refresh_rate', int),
        (('-l', '--length'), 'length', int),
        (('-t', '--timestamp'), 'timestamp', lambda value: True),
        (('-d', '--datestamp'), 'datestamp', lambda value: True),
        (('-?', '-h', '--help'), 'action', lambda value: 'help'),
        (('-c', '--config'), 'config_filename', lambda value: value),
        (('--no-ssl',), 'secure', lambda value: False),
        (('--oauth',), 'oauth_filename', lambda value: value),
        (('--force-ansi',), 'force-ansi', lambda value: True),
    )
    for flag, value in opts:
        for spellings, key, convert in table:
            if flag in spellings:
                options[key] = convert(value)
                break

    help_requested = options.get('action') == 'help'
    if extra_args and not help_requested:
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
166
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix to print before a status line.

    ``status['created_at']`` is parsed with ``format`` (a UTC timestamp) and
    converted to the local timezone.  Depending on ``options['timestamp']``
    and ``options['datestamp']`` the result is a time, a date, both, or the
    empty string; non-empty results end with a single space.

    Raises ``ValueError`` if ``created_at`` does not match ``format``.
    """
    # Local import: keeps this block self-contained.
    import calendar

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    utc_struct = time.strptime(status['created_at'], format)
    # Let the C library decide whether DST applied at that instant.  The old
    # code tested ``time.daylight``, which only says that the zone *has* DST
    # rules, so times outside the DST period were shifted by one hour.
    local_struct = time.localtime(calendar.timegm(utc_struct))
    if timestamp and datestamp:
        return time.strftime("%Y-%m-%d %H:%M:%S ", local_struct)
    elif timestamp:
        return time.strftime("%H:%M:%S ", local_struct)
    elif datestamp:
        return time.strftime("%Y-%m-%d ", local_struct)
    return ""
184
def reRepl(m):
    """Substitution callback wrapping a matched hashtag/profile in ANSI codes.

    The style is chosen by the name of the last matched group.  Returns the
    decorated text, or ``None`` when the group is empty or cannot be read.
    """
    styles = {
        'clear': ansiFormatter.cmdReset(),
        'hashtag': ansiFormatter.cmdBold(),
        'profile': ansiFormatter.cmdUnderline(),
    }

    try:
        group_name = m.lastgroup
        if m.group(group_name):
            return '%s%s%s' % (
                styles[group_name], m.group(group_name), styles['clear'])
    except IndexError:
        # No usable named group in this match: nothing to decorate.
        pass
    return None
200
def replaceInStatus(status):
    """Unescape HTML entities in ``status`` and highlight #tags and @names."""
    text = gHtmlParser.unescape(status)
    # Hashtags first, then profiles, each decorated through reRepl().
    for pattern in (hashtagRe, profileRe):
        text = pattern.sub(reRepl, text)
    return text
206
class StatusFormatter(object):
    """Default formatter: one line per status."""

    def __call__(self, status, options):
        """Return ``"<time prefix><screen name> <unescaped text>"``."""
        stamp = get_time_string(status, options)
        author = status['user']['screen_name']
        body = gHtmlParser.unescape(status['text'])
        return "%s%s %s" % (stamp, author, body)
212
class AnsiStatusFormatter(object):
    """One line per status, with the screen name coloured per user."""

    def __init__(self):
        # Maps each screen name to a colour.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        """Return the coloured one-line rendering of ``status``."""
        name = status['user']['screen_name']
        colour = self._colourMap.colourFor(name)
        parts = (
            get_time_string(status, options),
            ansiFormatter.cmdColour(colour),
            status['user']['screen_name'],
            ansiFormatter.cmdReset(),
            replaceInStatus(status['text']),
        )
        return "%s%s%s%s %s" % parts
223
class VerboseStatusFormatter(object):
    """Multi-line formatter showing author, location and raw creation date."""

    def __call__(self, status, options):
        """Return a header line followed by the unescaped status text."""
        user = status['user']
        header = "-- %s (%s) on %s" % (
            user['screen_name'], user['location'], status['created_at'])
        return "%s\n%s\n" % (header, gHtmlParser.unescape(status['text']))
231
class URLStatusFormatter(object):
    """Formatter that prints nothing but the URLs found in a status."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        """Return the URLs in the status text, one per line ("" if none)."""
        found = self.urlmatch.findall(status['text'])
        if not found:
            return ""
        return '\n'.join(found)
237
238
class ListsFormatter(object):
    """Default formatter for a Twitter list: padded name plus description."""

    def __call__(self, list):
        """Return one newline-terminated line describing ``list``."""
        line = "%-30s" % (list['name'])
        if list['description']:
            line = "%s (%s)" % (line, list['description'])
        return line + "\n"
246
class ListsVerboseFormatter(object):
    """Verbose formatter for a Twitter list (description, members, mode)."""

    def __call__(self, list):
        """Return a multi-line description of ``list``."""
        template = "%-30s\n description: %s\n members: %s\n mode:%s\n"
        fields = tuple(
            list[key] for key in ('name', 'description', 'member_count', 'mode'))
        return template % fields
251
class AnsiListsFormatter(object):
    """Formatter for a Twitter list with the list name coloured."""

    def __init__(self):
        # Maps each list name to a colour.
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        """Return ``"<coloured padded name> <description>"``."""
        colour = self._colourMap.colourFor(list['name'])
        start = ansiFormatter.cmdColour(colour)
        name = list['name']
        stop = ansiFormatter.cmdReset()
        return "%s%-15s%s %s" % (start, name, stop, list['description'])
261
262
class AdminFormatter(object):
    """Default formatter for the result of a follow/leave action."""

    def __call__(self, action, user):
        """Return a sentence confirming that ``user`` was followed or left."""
        if action == "follow":
            template = "You are now following %s.\n"
        else:
            template = "You are no longer following %s.\n"
        who = "%s (%s)" % (user['screen_name'], user['name'])
        return template % (who)
270
class VerboseAdminFormatter(object):
    """Verbose formatter for the result of a follow/leave action."""

    def __call__(self, action, user):
        """Return ``"-- <Following|Leaving>: <screen name> (<name>): <url>"``."""
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        return "-- %s: %s (%s): %s" % (
            verb, user['screen_name'], user['name'], user['url'])
278
class SearchFormatter(object):
    """Default formatter for search results: one line per result."""

    def __call__(self, result, options):
        """Return ``"<time prefix><from_user> <text>"``."""
        # Search results carry their date in a different layout than statuses.
        stamp = get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000")
        return "%s%s %s" % (stamp, result['from_user'], result['text'])
284
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search formatter; currently identical to the default one."""
287
class URLSearchFormatter(object):
    """Formatter that prints nothing but the URLs found in a search result."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        """Return the URLs in the result text, one per line ("" if none)."""
        found = self.urlmatch.findall(result['text'])
        if not found:
            return ""
        return '\n'.join(found)
293
class AnsiSearchFormatter(object):
    """One line per search result, with the author's name coloured."""

    def __init__(self):
        # Maps each user name to a colour.
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        """Return the coloured one-line rendering of ``result``."""
        colour = self._colourMap.colourFor(result['from_user'])
        parts = (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansiFormatter.cmdColour(colour),
            result['from_user'],
            ansiFormatter.cmdReset(),
            result['text'],
        )
        return "%s%s%s%s %s" % parts
304
# Cached result of get_term_encoding(); filled in on first use.
_term_encoding = None


def get_term_encoding():
    """Return the terminal encoding named in ``$LANG`` (default ``UTF-8``).

    The encoding is whatever follows the first ``.`` in ``$LANG``.  The value
    is computed once and cached in the module-level ``_term_encoding``.
    """
    global _term_encoding
    if _term_encoding:
        return _term_encoding
    pieces = os.getenv('LANG', 'unknown.UTF-8').split('.')
    _term_encoding = pieces[1] if len(pieces) > 1 else 'UTF-8'
    return _term_encoding
315
# Registry consulted by get_formatter(): action type -> format name ->
# formatter class.
formatters = {}
status_formatters = {
    'default': StatusFormatter,
    'verbose': VerboseStatusFormatter,
    'urls': URLStatusFormatter,
    'ansi': AnsiStatusFormatter
}
formatters['status'] = status_formatters

admin_formatters = {
    'default': AdminFormatter,
    'verbose': VerboseAdminFormatter,
    'urls': AdminFormatter,
    'ansi': AdminFormatter
}
formatters['admin'] = admin_formatters

search_formatters = {
    'default': SearchFormatter,
    'verbose': VerboseSearchFormatter,
    'urls': URLSearchFormatter,
    'ansi': AnsiSearchFormatter
}
formatters['search'] = search_formatters

lists_formatters = {
    'default': ListsFormatter,
    'verbose': ListsVerboseFormatter,
    # No URL formatter for lists: get_formatter() treats None as unknown.
    'urls': None,
    'ansi': AnsiListsFormatter
}
formatters['lists'] = lists_formatters
348
def get_formatter(action_type, options):
    """Return a new formatter instance for ``action_type``.

    ``action_type`` selects a table in the module-level ``formatters``
    registry and ``options['format']`` selects the class inside that table.

    Raises ``TwitterError`` when the action type has no table, or when the
    requested format is missing (or registered as ``None``) for it.
    """
    formatters_dict = formatters.get(action_type)
    if (not formatters_dict):
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    f = formatters_dict.get(options['format'])
    if (not f):
        # Name the real action type; the message used to say "status"
        # even for admin, search and lists lookups.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return f()
360
class Action(object):
    """Base class of all actions; calling it dispatches to the real action."""

    def ask(self, subject='perform this action', careful=False):
        '''
        Asks the user, via `input`, whether `subject` should be performed.
        When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' % (subject, sample)
        try:
            answer = input(prompt).lower()
        except EOFError:
            print(file=sys.stderr)  # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful
        if careful:
            return answer in ('yes', 'y')
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        """Run the action named by ``options['action']``.

        Status actions are repeated every ``options['refresh_rate']`` seconds
        when ``options['refresh']`` is set; Ctrl-C stops them quietly.
        Unknown names resolve to ``NoSuchAction``.
        """
        action_class = actions.get(options['action'], NoSuchAction)
        action = action_class()
        try:
            if (options['refresh'] and isinstance(action, StatusAction)):
                while True:
                    action(twitter, options)
                    sys.stdout.flush()
                    time.sleep(options['refresh_rate'])
            else:
                action(twitter, options)
        except KeyboardInterrupt:
            print('\n[Keyboard Interrupt]', file=sys.stderr)
405
class NoSuchActionError(Exception):
    """Raised when the requested action name is not registered."""
408
class NoSuchAction(Action):
    """Fallback action used for unknown action names."""

    def __call__(self, twitter, options):
        """Always raise ``NoSuchActionError`` naming the requested action."""
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
412
class StatusAction(Action):
    """Base for actions that fetch statuses via ``getStatuses`` and print them."""

    def __call__(self, twitter, options):
        """Print every status whose formatted text is not blank."""
        statuses = self.getStatuses(twitter, options)
        formatter = get_formatter('status', options)
        for status in statuses:
            line = formatter(status, options)
            if not line.strip():
                continue
            printNicely(line)
421
class SearchAction(Action):
    """Search Twitter for the terms given as extra arguments."""

    def __call__(self, twitter, options):
        """Run the search and print each non-blank formatted result."""
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        encoded_terms = [quote(term) for term in options['extra_args']]
        query_string = "+".join(encoded_terms)

        response = twitter.search(q=query_string)
        formatter = get_formatter('search', options)
        for result in response['results']:
            line = formatter(result, options)
            if not line.strip():
                continue
            printNicely(line)
440
class AdminAction(Action):
    """Base for follow/leave; subclasses do the API call in ``getUser``."""

    def __call__(self, twitter, options):
        """Act on the user named by the first extra argument and report it.

        Raises ``TwitterError`` when no screen name was supplied.  A
        ``TwitterError`` from the API call itself is printed, not raised.
        """
        targets = options['extra_args']
        if not targets or not targets[0]:
            raise TwitterError("You need to specify a user (screen name)")
        af = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, options['extra_args'][0])
        except TwitterError as e:
            for line in (
                    "There was a problem following or leaving the specified user.",
                    "You may be trying to follow a user you are already following;",
                    "Leaving a user you are not currently following;",
                    "Or the user may not exist.",
                    "Sorry."):
                print(line)
            print()
            print(e)
            return
        printNicely(af(options['action'], user))
458
class ListsAction(StatusAction):
    """Show a user's lists, or the tweets of one of those lists."""

    def getStatuses(self, twitter, options):
        """Return the statuses to print.

        With only a screen name, the user's lists are printed here and an
        empty list is returned.  With a list name as well, the statuses of
        that list are returned in reversed order.

        Raises ``TwitterError`` when no screen name was supplied.
        """
        extra = options['extra_args']
        if not extra:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = extra[0]

        if options['extra_args'][1:]:
            return reversed(twitter.user.lists.list.statuses(
                user=screen_name, list=options['extra_args'][1]))

        lists = twitter.user.lists(user=screen_name)['lists']
        if not lists:
            printNicely("This user has no lists.")
        for entry in lists:
            lf = get_formatter('lists', options)
            printNicely(lf(entry))
        return []
477
478
class MyListsAction(ListsAction):
    """Like ``ListsAction``, but for the authenticated user's own lists."""

    def getStatuses(self, twitter, options):
        """Prepend our own screen name to the arguments, then delegate."""
        credentials = twitter.account.verify_credentials()
        options['extra_args'].insert(0, credentials['screen_name'])
        return ListsAction.getStatuses(self, twitter, options)
484
485
class FriendsAction(StatusAction):
    """Show the friends timeline."""

    def getStatuses(self, twitter, options):
        """Return the friends timeline in reversed order."""
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
489
class PublicAction(StatusAction):
    """Show the public timeline."""

    def getStatuses(self, twitter, options):
        """Return the public timeline in reversed order."""
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
493
class RepliesAction(StatusAction):
    """Show replies to the authenticated user."""

    def getStatuses(self, twitter, options):
        """Return the replies in reversed order."""
        timeline = twitter.statuses.replies(count=options["length"])
        return reversed(timeline)
497
class FollowAction(AdminAction):
    """Start following a user."""

    def getUser(self, twitter, user):
        """Create the friendship and return the API's answer."""
        followed = twitter.friendships.create(id=user)
        return followed
501
class LeaveAction(AdminAction):
    """Stop following a user."""

    def getUser(self, twitter, user):
        """Destroy the friendship and return the API's answer."""
        left = twitter.friendships.destroy(id=user)
        return left
505
class SetStatusAction(Action):
    """Post a status update, split over several tweets when it is too long."""

    # Maximum number of characters sent in a single status update.
    max_length = 140

    def __call__(self, twitter, options):
        """Post the text from the extra arguments (or prompt for it).

        Leading ``@mentions`` are repeated in front of every part when the
        text has to be split.  Parts are cut at a space where possible and
        hard-cut otherwise; each part, prefix included, fits ``max_length``.
        ``options['invert_split']`` posts the parts in reverse order.
        """
        status_text = (" ".join(options['extra_args'])
                       if options['extra_args']
                       else str(input("message: ")))
        full_text = status_text

        # Peel the leading @mentions off the message.
        mention_re = re.compile(r"@\w+")
        mentions = []
        while status_text:
            found = mention_re.match(status_text)
            if not found:
                break
            mentions.append(found.group(0))
            # Skip the mention and the single separator character after it.
            status_text = status_text[found.end() + 1:]

        # The prefix includes its joining space so the limit below is exact.
        prefix = (" ".join(mentions) + " ") if mentions else ""
        if len(prefix) >= self.max_length:
            # The mentions alone fill a tweet: treat the whole original text
            # as plain text rather than dropping the message body.
            status_text = full_text
            prefix = ""

        limit = self.max_length - len(prefix)
        parts = []
        status_text = status_text.strip()
        while status_text:
            if len(status_text) > limit:
                cut = status_text.rfind(' ', 0, limit + 1)
                if cut <= 0:
                    # No usable space: hard-split so the loop always advances.
                    cut = limit
            else:
                cut = len(status_text)
            parts.append(prefix + status_text[:cut].rstrip())
            status_text = status_text[cut:].lstrip()

        if options['invert_split']:
            parts.reverse()
        for status in parts:
            twitter.statuses.update(status=status)
540
class TwitterShell(Action):
    """Interactive shell that accepts the command-line actions repeatedly."""

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        prompt = prompt.strip("'").replace("\\'", "'")
        for colour in ansi.COLOURS_NAMED:
            if '[%s]' % (colour) in prompt:
                prompt = prompt.replace(
                    '[%s]' % (colour), ansiFormatter.cmdColourNamed(colour))
        prompt = prompt.replace('[R]', ansiFormatter.cmdReset())
        return prompt

    def __call__(self, twitter, options):
        """Read and run commands until the user exits.

        ``exit`` or a confirmed end-of-file (^D) raises ``SystemExit(0)``.
        Bad options, unknown actions and ``TwitterError`` are reported on
        stderr and the shell keeps prompting.
        """
        prompt = self.render_prompt(options.get('prompt', 'twitter> '))
        while True:
            options['action'] = ""
            try:
                args = input(prompt).split()
                try:
                    parse_args(args, options)
                except (GetoptError, ValueError) as e:
                    # A mistyped option must not kill the interactive shell.
                    print("I can't do that, %s." % (e), file=sys.stderr)
                    continue
                if not options['action']:
                    continue
                elif options['action'] == 'exit':
                    raise SystemExit(0)
                elif options['action'] == 'shell':
                    print('Sorry Xzibit does not work here!', file=sys.stderr)
                    continue
                elif options['action'] == 'help':
                    print('''\ntwitter> `action`\n
                          The Shell Accepts all the command line actions along with:

                          exit    Leave the twitter shell (^D may also be used)

                          Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
                Action()(twitter, options)
                options['action'] = ''
            except NoSuchActionError as e:
                print(e, file=sys.stderr)
            except TwitterError as e:
                # A failed API call is reported; the session stays alive.
                print(str(e), file=sys.stderr)
            except KeyboardInterrupt:
                print('\n[Keyboard Interrupt]', file=sys.stderr)
            except EOFError:
                print(file=sys.stderr)
                leaving = self.ask(subject='Leave')
                if not leaving:
                    print('Excellent!', file=sys.stderr)
                else:
                    raise SystemExit(0)
587
class PythonPromptAction(Action):
    """Start a Python prompt for working with the ``twitter`` object."""

    def __call__(self, twitter, options):
        """Keep evaluating input through ``smrt_input`` until end-of-file."""
        while True:
            try:
                smrt_input(globals(), locals())
            except EOFError:
                break
595
class HelpAction(Action):
    """Print the command-line help text."""

    def __call__(self, twitter, options):
        """Print this module's docstring; both arguments are unused."""
        help_text = __doc__
        print(help_text)
599
class DoNothingAction(Action):
    """Action that does nothing when called."""

    def __call__(self, twitter, options):
        """Do nothing and return ``None``."""
        return None
603
class RateLimitStatus(Action):
    """Print the remaining API quota and when it resets."""

    def __call__(self, twitter, options):
        """Fetch the rate limit status and print two summary lines."""
        rate = twitter.account.rate_limit_status()
        quota = (rate['remaining_hits'], rate['hourly_limit'])
        print("Remaining API requests: %s / %s (hourly limit)" % quota)
        seconds_left = int(rate['reset_time_in_seconds'] - time.time())
        reset_at = time.asctime(time.localtime(rate['reset_time_in_seconds']))
        print("Next reset in %ss (%s)" % (seconds_left, reset_at))
610
# Maps each action name accepted on the command line to the class that
# implements it.  Action.__call__ looks names up here and falls back to
# NoSuchAction for anything missing.
actions = {
    'authorize' : DoNothingAction,
    'follow' : FollowAction,
    'friends' : FriendsAction,
    'list' : ListsAction,
    'mylist' : MyListsAction,
    'help' : HelpAction,
    'leave' : LeaveAction,
    'public' : PublicAction,
    'pyprompt' : PythonPromptAction,
    'replies' : RepliesAction,
    'search' : SearchAction,
    'set' : SetStatusAction,
    'shell' : TwitterShell,
    'rate' : RateLimitStatus,
}
627
def loadConfig(filename):
    """Return a copy of ``OPTIONS`` overlaid with values from ``filename``.

    Only the ``[twitter]`` section is read: ``format`` and ``prompt`` as
    strings, ``invert_split`` as a boolean.  A missing file yields the
    plain defaults.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    cp = SafeConfigParser()
    cp.read([filename])
    # (option name, getter) pairs; booleans are processed last.
    readers = (
        ('format', cp.get),
        ('prompt', cp.get),
        ('invert_split', cp.getboolean),
    )
    for option, getter in readers:
        if cp.has_option('twitter', option):
            options[option] = getter('twitter', option)
    return options
641
def main(args=sys.argv[1:]):
    """Entry point of the command-line tool.

    Parses ``args``, merges them over the config file and the built-in
    defaults, runs the OAuth dance when needed, and then runs the action.

    Returns 1 when ``--refresh`` is combined with an action that cannot be
    refreshed.  Raises ``SystemExit(1)`` on bad options, unknown actions
    and ``TwitterError``.
    """
    arg_options = {}
    try:
        parse_args(args, arg_options)
    except (GetoptError, ValueError) as e:
        # ValueError: a non-numeric value was given to -l / -R.
        print("I can't do that, %s." % (e), file=sys.stderr)
        print(file=sys.stderr)
        raise SystemExit(1)

    config_path = os.path.expanduser(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))
    config_options = loadConfig(config_path)

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    for d in config_options, arg_options:
        for k, v in list(d.items()):
            if v: options[k] = v

    if options['refresh'] and options['action'] not in (
        'friends', 'public', 'replies'):
        print("You can only refresh the friends, public, or replies actions.", file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        return 1

    oauth_filename = os.path.expanduser(options['oauth_filename'])

    if (options['action'] == 'authorize'
        or not os.path.exists(oauth_filename)):
        # Write the tokens to the same expanded path that is checked above
        # and read below; the raw option value may still contain "~".
        oauth_dance(
            "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
            oauth_filename)

    global ansiFormatter
    ansiFormatter = ansi.AnsiCmd(options["force-ansi"])

    oauth_token, oauth_token_secret = read_token_file(oauth_filename)

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1',
        domain='api.twitter.com')

    try:
        Action()(twitter, options)
    except NoSuchActionError as e:
        print(e, file=sys.stderr)
        raise SystemExit(1)
    except TwitterError as e:
        print(str(e), file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        raise SystemExit(1)