]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Update some of the cmdline to support API 1.1.
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 pyprompt start a Python prompt for interacting with the twitter
19 object directly
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
25
26
27 OPTIONS:
28
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 10 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
41
42 FORMATS for the --format option
43
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 urls nothing but URLs
47 ansi ansi colour (rainbow mode)
48
49
50 CONFIG FILES
51
52 The config file should be placed in your home directory and be named .twitter.
53 It must contain a [twitter] header, and all the desired options you wish to
54 set, like so:
55
56 [twitter]
57 format: <desired_default_format_for_output>
58 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
59
60 OAuth authentication tokens are stored in the file .twitter_oauth in your
61 home directory.
62 """
63
64 from __future__ import print_function
65
# Python 2 compatibility: rebind ``input`` to ``raw_input`` so that it returns
# the typed line without evaluating it.  Referencing the ``raw_input`` name
# directly works whether ``__builtins__`` is a dict or a module; subscripting
# ``__builtins__`` raises an uncaught TypeError in the latter case.
try:
    input = raw_input
except NameError:
    # Python 3: there is no ``raw_input`` and the builtin ``input`` is kept.
    pass
70
71
# OAuth consumer credentials of this command-line application; main() hands
# them to oauth_dance() and to the OAuth() authenticator.
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
74
75 import sys
76 import time
77 from getopt import gnu_getopt as getopt, GetoptError
78 from getpass import getpass
79 import re
80 import os.path
81 import locale
82 import string
83
84 try:
85 from ConfigParser import SafeConfigParser
86 except ImportError:
87 from configparser import ConfigParser as SafeConfigParser
88 import datetime
89 try:
90 from urllib.parse import quote
91 except ImportError:
92 from urllib2 import quote
93 try:
94 import HTMLParser
95 except ImportError:
96 import html.parser as HTMLParser
97
98 import webbrowser
99
100 from .api import Twitter, TwitterError
101 from .oauth import OAuth, write_token_file, read_token_file
102 from .oauth_dance import oauth_dance
103 from . import ansi
104 from .util import smrt_input, printNicely
105
# Directory prefix for the per-user files: $HOME, else %USERPROFILE%, else ''.
_home_prefix = os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + os.sep

# Built-in defaults.  loadConfig() and main() start from a copy of this dict
# and layer config-file and command-line values on top of it.
OPTIONS = {
    # what to do
    'action': 'friends',
    'extra_args': [],
    # polling
    'refresh': False,
    'refresh_rate': 600,
    # output
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    'length': 20,
    'timestamp': False,
    'datestamp': False,
    'force-ansi': False,
    # files
    'config_filename': _home_prefix + '.twitter',
    'oauth_filename': _home_prefix + '.twitter_oauth',
    # misc
    'secure': True,
    'invert_split': False,
}
122
# Module-level helpers shared by the formatters below.

# Parser instance whose unescape() method the formatters call on tweet text.
gHtmlParser = HTMLParser.HTMLParser()

# "#..." and "@..." runs of non-whitespace; highlighted by replaceInStatus().
hashtagRe = re.compile(r'(?P<hashtag>#\S+)')
profileRe = re.compile(r'(?P<profile>\@\S+)')

# Placeholder ANSI command generator; main() rebinds this global once the
# --force-ansi option is known.
ansiFormatter = ansi.AnsiCmd(False)
127
def parse_args(args, options):
    """Parse command-line style ``args`` and store the results in ``options``.

    ``options`` is mutated in place; only keys for options actually present
    in ``args`` are written.  The first positional argument becomes
    ``options['action']`` (unless help was requested) and the remaining
    positionals become ``options['extra_args']``.

    Raises:
        GetoptError: on an unknown option, a missing option argument, or a
            non-integer value for ``--refresh-rate`` / ``--length``.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl', 'force-ansi']
    # NOTE: -e and -p are accepted (with an argument) but no branch below
    # acts on them, so they are silently ignored.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)
    if extra_args and hasattr(extra_args[0], 'decode'):
        # Byte strings (Python 2): decode positionals to text.
        encoding = locale.getpreferredencoding()
        extra_args = [arg.decode(encoding) for arg in extra_args]

    def int_value(opt, arg):
        # Report a bad number the same way as any other bad option, so the
        # caller's GetoptError handling covers it instead of a traceback.
        try:
            return int(arg)
        except ValueError:
            raise GetoptError(
                "option %s requires an integer argument, not %r" % (opt, arg),
                opt)

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = int_value(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = int_value(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg
        elif opt == '--force-ansi':
            options['force-ansi'] = True

    # A help request wins over any positional action.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
165
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix requested by the stamp options.

    ``status['created_at']`` is parsed with ``format`` (whose literal
    ``+0000`` means the value is treated as UTC) and converted to local time.

    Returns ``"YYYY-MM-DD HH:MM:SS "``, ``"HH:MM:SS "`` or ``"YYYY-MM-DD "``
    (each with a trailing space) depending on ``options['timestamp']`` and
    ``options['datestamp']``, or ``""`` when neither is set.

    Raises:
        ValueError: if a stamp is requested and ``created_at`` does not
            match ``format``.
    """
    import calendar

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if not (timestamp or datestamp):
        # Nothing to render, so there is no need to parse the date at all.
        return ""

    utc = time.strptime(status['created_at'], format)
    # timegm() reads the struct as UTC; localtime() then applies the offset
    # in effect at that instant.  (time.daylight only says that the zone
    # defines DST, not that DST is active, so it cannot pick the offset.)
    t = time.localtime(calendar.timegm(utc))

    if timestamp and datestamp:
        return time.strftime("%Y-%m-%d %H:%M:%S ", t)
    elif timestamp:
        return time.strftime("%H:%M:%S ", t)
    return time.strftime("%Y-%m-%d ", t)
183
def reRepl(m):
    """Substitution callback: wrap the matched named group in ANSI codes.

    ``m`` is a match object whose last matched group is named ``hashtag`` or
    ``profile``.  Returns the matched text surrounded by the corresponding
    style code and a reset code, or ``None`` if the group matched nothing.
    """
    reset = ansiFormatter.cmdReset()
    styles = {
        'clear': reset,
        'hashtag': ansiFormatter.cmdBold(),
        'profile': ansiFormatter.cmdUnderline(),
    }

    try:
        group_name = m.lastgroup
        if m.group(group_name):
            return '%s%s%s' % (
                styles[group_name], m.group(group_name), styles['clear'])
    except IndexError:
        pass
    return None
199
def replaceInStatus(status):
    """Unescape HTML entities in ``status`` and highlight hashtags/profiles.

    Hashtags are processed first, then profile mentions, each through the
    ``reRepl`` callback.
    """
    text = gHtmlParser.unescape(status)
    for pattern in (hashtagRe, profileRe):
        text = pattern.sub(reRepl, text)
    return text
205
class StatusFormatter(object):
    """Default format: ``<time prefix><screen_name> <unescaped text>``."""

    def __call__(self, status, options):
        when = get_time_string(status, options)
        author = status['user']['screen_name']
        text = gHtmlParser.unescape(status['text'])
        return "%s%s %s" % (when, author, text)
211
class AnsiStatusFormatter(object):
    """One-line status format with a per-user colour for the screen name."""

    def __init__(self):
        # Hands out a colour for each screen name it is asked about.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        screen_name = status['user']['screen_name']
        colour = self._colourMap.colourFor(screen_name)
        when = get_time_string(status, options)
        colour_on = ansiFormatter.cmdColour(colour)
        author = status['user']['screen_name']
        colour_off = ansiFormatter.cmdReset()
        text = replaceInStatus(status['text'])
        return "%s%s%s%s %s" % (when, colour_on, author, colour_off, text)
222
class VerboseStatusFormatter(object):
    """Multi-line format: author, location and raw creation date, then text."""

    def __call__(self, status, options):
        user = status['user']
        header = "-- %s (%s) on %s" % (
            user['screen_name'], user['location'], status['created_at'])
        text = gHtmlParser.unescape(status['text'])
        return "%s\n%s\n" % (header, text)
230
class URLStatusFormatter(object):
    """Format that outputs only the http(s) URLs found in a status."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        # One URL per line; joining an empty list yields "".
        found = self.urlmatch.findall(status['text'])
        return '\n'.join(found)
236
237
class ListsFormatter(object):
    """Default list format: padded name, then the description if present."""

    def __call__(self, list):
        description = list['description']
        line = "%-30s" % (list['name'])
        if description:
            line = "%s (%s)" % (line, description)
        return "%s\n" % line
245
class ListsVerboseFormatter(object):
    """Verbose list format: name, description, member count and mode."""

    def __call__(self, list):
        template = "%-30s\n description: %s\n members: %s\n mode:%s\n"
        fields = (list['name'], list['description'],
                  list['member_count'], list['mode'])
        return template % fields
250
class AnsiListsFormatter(object):
    """List format with a per-list colour applied to the list name."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        colour = self._colourMap.colourFor(list['name'])
        colour_on = ansiFormatter.cmdColour(colour)
        name = list['name']
        colour_off = ansiFormatter.cmdReset()
        return "%s%-15s%s %s" % (
            colour_on, name, colour_off, list['description'])
260
261
class AdminFormatter(object):
    """One-sentence confirmation of a follow / leave action."""

    def __call__(self, action, user):
        user_str = "%s (%s)" % (user['screen_name'], user['name'])
        # Anything other than "follow" is reported as a leave.
        template = ("You are now following %s.\n" if action == "follow"
                    else "You are no longer following %s.\n")
        return template % (user_str)
269
class VerboseAdminFormatter(object):
    """Verbose follow / leave confirmation that includes the user's URL."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        return "-- %s: %s (%s): %s" % (
            verb, user['screen_name'], user['name'], user['url'])
277
class SearchFormatter(object):
    """Default search-result format: ``<time prefix><from_user> <text>``."""

    def __call__(self, result, options):
        # Search results are parsed with their own created_at layout.
        when = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        return "%s%s %s" % (when, result['from_user'], result['text'])
283
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search format; currently identical to SearchFormatter."""
286
class URLSearchFormatter(object):
    """Format that outputs only the http(s) URLs found in a search result."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        # One URL per line; joining an empty list yields "".
        found = self.urlmatch.findall(result['text'])
        return '\n'.join(found)
292
class AnsiSearchFormatter(object):
    """Search-result format with a per-user colour for the author name."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        colour = self._colourMap.colourFor(result['from_user'])
        when = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        colour_on = ansiFormatter.cmdColour(colour)
        author = result['from_user']
        colour_off = ansiFormatter.cmdReset()
        return "%s%s%s%s %s" % (
            when, colour_on, author, colour_off, result['text'])
303
# Cache for get_term_encoding(); filled on first use.
_term_encoding = None


def get_term_encoding():
    """Return the encoding named after the dot in $LANG, else 'UTF-8'.

    A non-empty result is cached in the module-level ``_term_encoding``, so
    later changes to $LANG are not picked up.
    """
    global _term_encoding
    if _term_encoding:
        return _term_encoding
    parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
    _term_encoding = parts[1] if len(parts) > 1 else 'UTF-8'
    return _term_encoding
314
# Formatter classes per output format (the value of options['format']),
# grouped by the kind of thing being rendered.
status_formatters = {
    'default': StatusFormatter,
    'verbose': VerboseStatusFormatter,
    'urls': URLStatusFormatter,
    'ansi': AnsiStatusFormatter
}

admin_formatters = {
    'default': AdminFormatter,
    'verbose': VerboseAdminFormatter,
    'urls': AdminFormatter,
    'ansi': AdminFormatter
}

search_formatters = {
    'default': SearchFormatter,
    'verbose': VerboseSearchFormatter,
    'urls': URLSearchFormatter,
    'ansi': AnsiSearchFormatter
}

# No 'urls' formatter exists for lists; get_formatter() rejects the None.
lists_formatters = {
    'default': ListsFormatter,
    'verbose': ListsVerboseFormatter,
    'urls': None,
    'ansi': AnsiListsFormatter
}

# Top-level registry consulted by get_formatter(), keyed by action type.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
347
def get_formatter(action_type, options):
    """Instantiate the formatter for ``action_type`` and ``options['format']``.

    ``action_type`` is a key of the module-level ``formatters`` registry.

    Raises:
        TwitterError: if the action type has no formatters, or the requested
            format is not available for it (including a registered ``None``).
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the real action type; the message used to say "status"
        # even for admin, search and lists lookups.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
359
class Action(object):
    """Base class of all actions; calling it dispatches on options['action']."""

    def ask(self, subject='perform this action', careful=False):
        """
        Asks the user, via `input`, whether `subject` should be performed.
        When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        """
        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' % (subject, sample)
        try:
            answer = input(prompt).lower()
        except EOFError:
            print(file=sys.stderr)  # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful
        if careful:
            # Only an explicit yes counts.
            return answer in ('yes', 'y')
        # Anything but an explicit no counts as yes.
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        """Look up and run the action named by ``options['action']``.

        Unknown names resolve to NoSuchAction.  With ``options['refresh']``
        set, a StatusAction is re-run forever, sleeping
        ``options['refresh_rate']`` seconds between runs.  Ctrl-C is reported
        on stderr and swallowed.
        """
        action_class = actions.get(options['action'], NoSuchAction)
        action = action_class()
        try:
            if options['refresh'] and isinstance(action, StatusAction):
                while True:
                    action(twitter, options)
                    sys.stdout.flush()
                    time.sleep(options['refresh_rate'])
            else:
                action(twitter, options)
        except KeyboardInterrupt:
            print('\n[Keyboard Interrupt]', file=sys.stderr)
404
class NoSuchActionError(Exception):
    """Raised by NoSuchAction when the requested action name is unknown."""
407
class NoSuchAction(Action):
    """Fallback action used for unknown action names; always raises."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
411
class StatusAction(Action):
    """Base for actions that print statuses supplied by ``getStatuses()``."""

    def __call__(self, twitter, options):
        statuses = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for status in statuses:
            rendered = render(status, options)
            if not rendered.strip():
                # Skip statuses the formatter rendered as blank.
                continue
            printNicely(rendered)
420
class SearchAction(Action):
    """Search for the terms in ``options['extra_args']`` and print results."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        encoded_terms = [quote(term) for term in options['extra_args']]
        query_string = "+".join(encoded_terms)

        response = twitter.search(q=query_string)
        results = response['results']
        render = get_formatter('search', options)
        for result in results:
            rendered = render(result, options)
            if not rendered.strip():
                continue
            printNicely(rendered)
439
class AdminAction(Action):
    """Base for follow / leave; subclasses supply ``getUser()``."""

    def __call__(self, twitter, options):
        names = options['extra_args']
        if not names or not names[0]:
            raise TwitterError("You need to specify a user (screen name)")
        render = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, options['extra_args'][0])
        except TwitterError as e:
            explanation = (
                "There was a problem following or leaving the specified user.",
                "You may be trying to follow a user you are already following;",
                "Leaving a user you are not currently following;",
                "Or the user may not exist.",
                "Sorry.",
                "",
            )
            for line in explanation:
                print(line)
            print(e)
        else:
            printNicely(render(options['action'], user))
457
class ListsAction(StatusAction):
    """Show a user's lists, or the statuses of one named list."""

    def getStatuses(self, twitter, options):
        """Return the statuses to print.

        ``options['extra_args']`` is ``[screen_name]`` or
        ``[screen_name, list_name]``.  With only a screen name the user's
        lists are printed here and an empty list is returned.

        Raises:
            TwitterError: if no screen name was given.
        """
        extra = options['extra_args']
        if not extra:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = extra[0]

        if options['extra_args'][1:]:
            # A list name was given: return that list's statuses.
            return reversed(twitter.user.lists.list.statuses(
                user=screen_name, list=options['extra_args'][1]))

        user_lists = twitter.lists.list(screen_name=screen_name)
        if not user_lists:
            printNicely("This user has no lists.")
        for entry in user_lists:
            render = get_formatter('lists', options)
            printNicely(render(entry))
        return []
476
477
class MyListsAction(ListsAction):
    """ListsAction for the authenticated user's own lists."""

    def getStatuses(self, twitter, options):
        credentials = twitter.account.verify_credentials()
        # Put our own screen name first so the parent sees the
        # [screen_name, ...] argument layout it expects.
        options['extra_args'].insert(0, credentials['screen_name'])
        return ListsAction.getStatuses(self, twitter, options)
483
484
class FriendsAction(StatusAction):
    """Print the home timeline, in reverse of the order the API returns."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.home_timeline(count=options["length"])
        return reversed(timeline)
488
class RepliesAction(StatusAction):
    """Print the mentions timeline, in reverse of the order the API returns."""

    def getStatuses(self, twitter, options):
        mentions = twitter.statuses.mentions_timeline(count=options["length"])
        return reversed(mentions)
492
class FollowAction(AdminAction):
    """Follow the given user."""

    def getUser(self, twitter, user):
        followed = twitter.friendships.create(id=user)
        return followed
496
class LeaveAction(AdminAction):
    """Stop following the given user."""

    def getUser(self, twitter, user):
        left = twitter.friendships.destroy(id=user)
        return left
500
class SetStatusAction(Action):
    """Post a status update, splitting over-long text into several updates.

    The text comes from ``options['extra_args']`` (joined with spaces) or,
    when that is empty, from an interactive ``message: `` prompt.  Leading
    ``@name`` mentions are repeated at the start of every part.
    """

    # Maximum length of a single update, including repeated mentions.
    MAX_LENGTH = 140

    # A leading "@name" followed by whitespace or the end of the text.  A
    # mention glued to punctuation ("@bob: hi") is left in the body, so that
    # no character is silently dropped.
    _leading_mention = re.compile(r"(@\w+)(?:\s+|$)")

    @staticmethod
    def _split_text(text, limit):
        """Split ``text`` into chunks of at most ``limit`` characters.

        Chunks break at a space where one exists within the limit; a word
        longer than ``limit`` is cut hard.  ``limit`` must be positive.
        """
        chunks = []
        remaining = text.strip()
        while remaining:
            if len(remaining) <= limit:
                chunks.append(remaining)
                break
            # A space at index ``limit`` still leaves a chunk of ``limit``.
            cut = remaining.rfind(' ', 0, limit + 1)
            if cut <= 0:
                # No usable space: cut mid-word so the loop always advances.
                cut = limit
            chunks.append(remaining[:cut].rstrip())
            remaining = remaining[cut:].lstrip()
        return chunks

    def __call__(self, twitter, options):
        """Send the status text through ``twitter.statuses.update``.

        Nothing is posted when the text is empty or consists only of
        mentions.  With ``options['invert_split']`` set, the parts are sent
        in reverse order.
        """
        full_text = (" ".join(options['extra_args'])
                     if options['extra_args']
                     else str(input("message: ")))

        # Peel the leading mentions off the front of the text.
        mentions = []
        body = full_text
        while body:
            match = self._leading_mention.match(body)
            if not match:
                break
            mentions.append(match.group(1))
            body = body[match.end():]

        prefix = (" ".join(mentions) + " ") if mentions else ""
        if len(prefix) >= self.MAX_LENGTH:
            # The mentions alone fill an update: treat the whole text,
            # mentions included, as ordinary text to split.
            prefix = ""
            body = full_text

        splitted = [prefix + chunk for chunk in
                    self._split_text(body, self.MAX_LENGTH - len(prefix))]

        if options['invert_split']:
            splitted.reverse()
        for status in splitted:
            twitter.statuses.update(status=status)
535
class TwitterShell(Action):
    """Interactive shell that reads and runs one action per input line."""

    # Shell-specific help, printed before the regular help text.
    _SHELL_HELP = (
        "\ntwitter> `action`\n\n"
        "The Shell Accepts all the command line actions along with:\n\n"
        "    exit    Leave the twitter shell (^D may also be used)\n\n"
        "Full CMD Line help is appended below for your convenience.")

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        # Drop surrounding single quotes and unescape embedded ones.
        prompt = prompt.strip("'").replace("\\'", "'")
        for colour in ansi.COLOURS_NAMED:
            if '[%s]' % (colour) in prompt:
                prompt = prompt.replace(
                    '[%s]' % (colour), ansiFormatter.cmdColourNamed(colour))
        prompt = prompt.replace('[R]', ansiFormatter.cmdReset())
        return prompt

    def __call__(self, twitter, options):
        """Run the read/parse/dispatch loop until the user exits.

        Each line is split on whitespace and parsed with ``parse_args`` into
        the shared ``options`` dict, so option values persist between
        commands.  ``exit``, or EOF followed by a confirmation, raises
        ``SystemExit(0)``.
        """
        prompt = self.render_prompt(options.get('prompt', 'twitter> '))
        while True:
            options['action'] = ""
            try:
                args = input(prompt).split()
                parse_args(args, options)
                if not options['action']:
                    continue
                elif options['action'] == 'exit':
                    raise SystemExit(0)
                elif options['action'] == 'shell':
                    print('Sorry Xzibit does not work here!', file=sys.stderr)
                    continue
                elif options['action'] == 'help':
                    # No continue: the regular help is printed by the
                    # dispatch below.
                    print(self._SHELL_HELP, file=sys.stderr)
                Action()(twitter, options)
                options['action'] = ''
            except GetoptError as e:
                # A mistyped option must not kill the interactive session.
                print("I can't do that, %s." % (e), file=sys.stderr)
            except NoSuchActionError as e:
                print(e, file=sys.stderr)
            except KeyboardInterrupt:
                print('\n[Keyboard Interrupt]', file=sys.stderr)
            except EOFError:
                print(file=sys.stderr)
                leaving = self.ask(subject='Leave')
                if not leaving:
                    print('Excellent!', file=sys.stderr)
                else:
                    raise SystemExit(0)
582
class PythonPromptAction(Action):
    """Interactive Python prompt with the ``twitter`` object in scope."""

    def __call__(self, twitter, options):
        # locals() is handed to smrt_input, so no extra local names are
        # introduced here; EOF ends the session quietly.
        try:
            while True:
                smrt_input(globals(), locals())
        except EOFError:
            pass
590
class HelpAction(Action):
    """Print the module docstring, which is the usage text."""

    def __call__(self, twitter, options):
        # __doc__ here resolves to the module-level docstring.
        print(__doc__)
594
class DoNothingAction(Action):
    """No-op action, registered for 'authorize' (main() does the real work)."""

    def __call__(self, twitter, options):
        return None
598
class RateLimitStatus(Action):
    """Print the remaining API requests and when the limit resets."""

    def __call__(self, twitter, options):
        rate = twitter.account.rate_limit_status()
        print("Remaining API requests: %s / %s (hourly limit)"
              % (rate['remaining_hits'], rate['hourly_limit']))
        seconds_left = int(rate['reset_time_in_seconds'] - time.time())
        reset_at = time.asctime(
            time.localtime(rate['reset_time_in_seconds']))
        print("Next reset in %ss (%s)" % (seconds_left, reset_at))
605
# Action name (first positional command-line argument) -> Action subclass.
# Action.__call__ looks names up here and falls back to NoSuchAction.
actions = {
    # timelines and lists
    'friends': FriendsAction,
    'replies': RepliesAction,
    'list': ListsAction,
    'mylist': MyListsAction,
    'search': SearchAction,
    # account changes
    'authorize': DoNothingAction,
    'follow': FollowAction,
    'leave': LeaveAction,
    'set': SetStatusAction,
    # interactive / informational
    'shell': TwitterShell,
    'pyprompt': PythonPromptAction,
    'help': HelpAction,
    'rate': RateLimitStatus,
}
621
def loadConfig(filename):
    """Return a copy of OPTIONS overlaid with values from ``filename``.

    Only the ``[twitter]`` section is consulted: ``format`` and ``prompt``
    are read as strings and ``invert_split`` as a boolean.  A missing file
    yields the plain defaults.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    cp = SafeConfigParser()
    cp.read([filename])
    # (option name, reader) pairs, in the order they are applied.
    readers = (
        ('format', cp.get),
        ('prompt', cp.get),
        # process booleans
        ('invert_split', cp.getboolean),
    )
    for option, read in readers:
        if cp.has_option('twitter', option):
            options[option] = read('twitter', option)
    return options
635
def main(args=sys.argv[1:]):
    """Command-line entry point: parse ``args``, authenticate, run the action.

    Option precedence, lowest to highest: built-in ``OPTIONS``, the config
    file, then command-line arguments.

    Returns 1 when ``--refresh`` is combined with an action that cannot be
    refreshed.

    Raises:
        SystemExit: with status 1 on a bad option, an unknown action, or a
            TwitterError raised while running the action.
    """
    arg_options = {}
    try:
        parse_args(args, arg_options)
    except GetoptError as e:
        print("I can't do that, %s." % (e), file=sys.stderr)
        print(file=sys.stderr)
        raise SystemExit(1)

    config_path = os.path.expanduser(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))
    config_options = loadConfig(config_path)

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    for k, v in list(config_options.items()):
        if v:
            options[k] = v
    # arg_options holds only what was given on the command line, so apply
    # it unconditionally: a falsy value such as secure=False (--no-ssl)
    # must still override the default.
    options.update(arg_options)

    if options['refresh'] and options['action'] not in (
            'friends', 'replies'):
        print("You can only refresh the friends or replies actions.",
              file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        return 1

    oauth_filename = os.path.expanduser(options['oauth_filename'])

    if (options['action'] == 'authorize'
            or not os.path.exists(oauth_filename)):
        # Pass the expanded path: it is the one checked above and the one
        # the token is read back from below.
        oauth_dance(
            "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
            oauth_filename)

    # Rebuild the shared ANSI formatter now that --force-ansi is known.
    global ansiFormatter
    ansiFormatter = ansi.AnsiCmd(options["force-ansi"])

    oauth_token, oauth_token_secret = read_token_file(oauth_filename)

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1.1',
        domain='api.twitter.com')

    try:
        Action()(twitter, options)
    except NoSuchActionError as e:
        print(e, file=sys.stderr)
        raise SystemExit(1)
    except TwitterError as e:
        print(str(e), file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        raise SystemExit(1)
689 print(str(e), file=sys.stderr)
690 print("Use 'twitter -h' for help.", file=sys.stderr)
691 raise SystemExit(1)