]> jfr.im git - z_archive/twitter.git/blame_incremental - twitter/cmdline.py
Improved display of multiline tweets.
[z_archive/twitter.git] / twitter / cmdline.py
... / ...
CommitLineData
1# encoding: utf-8
2"""
3USAGE:
4
5 twitter [action] [options]
6
7
8ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 pyprompt start a Python prompt for interacting with the twitter
19 object directly
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
25
26
27OPTIONS:
28
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 5 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
41
42FORMATS for the --format option
43
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 json raw json data from the api on each line
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57[twitter]
58format: <desired_default_format_for_output>
59prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63"""
64
65from __future__ import print_function
66
67try:
68 input = __builtins__.raw_input
69except (AttributeError, KeyError):
70 pass
71
72
73CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
74CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
75
76from getopt import gnu_getopt as getopt, GetoptError
77from getpass import getpass
78import json
79import locale
80import os.path
81import re
82import string
83import sys
84import time
85
86try:
87 from ConfigParser import SafeConfigParser
88except ImportError:
89 from configparser import ConfigParser as SafeConfigParser
90import datetime
91try:
92 from urllib.parse import quote
93except ImportError:
94 from urllib2 import quote
95try:
96 import HTMLParser
97except ImportError:
98 import html.parser as HTMLParser
99
100import webbrowser
101
102from .api import Twitter, TwitterError
103from .oauth import OAuth, write_token_file, read_token_file
104from .oauth_dance import oauth_dance
105from . import ansi
106from .util import smrt_input, printNicely, align_text
107
108OPTIONS = {
109 'action': 'friends',
110 'refresh': False,
111 'refresh_rate': 600,
112 'format': 'default',
113 'prompt': '[cyan]twitter[R]> ',
114 'config_filename': os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + os.sep + '.twitter',
115 'oauth_filename': os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + os.sep + '.twitter_oauth',
116 'length': 20,
117 'timestamp': False,
118 'datestamp': False,
119 'extra_args': [],
120 'secure': True,
121 'invert_split': False,
122 'force-ansi': False,
123}
124
125gHtmlParser = HTMLParser.HTMLParser()
126hashtagRe = re.compile(r'(?P<hashtag>#\S+)')
127profileRe = re.compile(r'(?P<profile>\@\S+)')
128ansiFormatter = ansi.AnsiCmd(False)
129
130def parse_args(args, options):
131 long_opts = ['help', 'format=', 'refresh', 'oauth=',
132 'refresh-rate=', 'config=', 'length=', 'timestamp',
133 'datestamp', 'no-ssl', 'force-ansi']
134 short_opts = "e:p:f:h?rR:c:l:td"
135 opts, extra_args = getopt(args, short_opts, long_opts)
136 if extra_args and hasattr(extra_args[0], 'decode'):
137 extra_args = [arg.decode(locale.getpreferredencoding())
138 for arg in extra_args]
139
140 for opt, arg in opts:
141 if opt in ('-f', '--format'):
142 options['format'] = arg
143 elif opt in ('-r', '--refresh'):
144 options['refresh'] = True
145 elif opt in ('-R', '--refresh-rate'):
146 options['refresh_rate'] = int(arg)
147 elif opt in ('-l', '--length'):
148 options["length"] = int(arg)
149 elif opt in ('-t', '--timestamp'):
150 options["timestamp"] = True
151 elif opt in ('-d', '--datestamp'):
152 options["datestamp"] = True
153 elif opt in ('-?', '-h', '--help'):
154 options['action'] = 'help'
155 elif opt in ('-c', '--config'):
156 options['config_filename'] = arg
157 elif opt == '--no-ssl':
158 options['secure'] = False
159 elif opt == '--oauth':
160 options['oauth_filename'] = arg
161 elif opt == '--force-ansi':
162 options['force-ansi'] = True
163
164 if extra_args and not ('action' in options and options['action'] == 'help'):
165 options['action'] = extra_args[0]
166 options['extra_args'] = extra_args[1:]
167
168def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
169 timestamp = options["timestamp"]
170 datestamp = options["datestamp"]
171 t = time.strptime(status['created_at'], format)
172 i_hate_timezones = time.timezone
173 if (time.daylight):
174 i_hate_timezones = time.altzone
175 dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
176 seconds=i_hate_timezones)
177 t = dt.timetuple()
178 if timestamp and datestamp:
179 return time.strftime("%Y-%m-%d %H:%M:%S ", t)
180 elif timestamp:
181 return time.strftime("%H:%M:%S ", t)
182 elif datestamp:
183 return time.strftime("%Y-%m-%d ", t)
184 return ""
185
186def reRepl(m):
187 ansiTypes = {
188 'clear': ansiFormatter.cmdReset(),
189 'hashtag': ansiFormatter.cmdBold(),
190 'profile': ansiFormatter.cmdUnderline(),
191 }
192
193 s = None
194 try:
195 mkey = m.lastgroup
196 if m.group(mkey):
197 s = '%s%s%s' % (ansiTypes[mkey], m.group(mkey), ansiTypes['clear'])
198 except IndexError:
199 pass
200 return s
201
202def replaceInStatus(status):
203 txt = gHtmlParser.unescape(status)
204 txt = re.sub(hashtagRe, reRepl, txt)
205 txt = re.sub(profileRe, reRepl, txt)
206 return txt
207
208class StatusFormatter(object):
209 def __call__(self, status, options):
210 return ("%s%s %s" % (
211 get_time_string(status, options),
212 status['user']['screen_name'], gHtmlParser.unescape(status['text'])))
213
214class AnsiStatusFormatter(object):
215 def __init__(self):
216 self._colourMap = ansi.ColourMap()
217
218 def __call__(self, status, options):
219 colour = self._colourMap.colourFor(status['user']['screen_name'])
220 ret = "%s%s% 16s%s " %(
221 get_time_string(status, options),
222 ansi.cmdColour(colour), status['user']['screen_name'],
223 ansi.cmdReset())
224 ret += "%s" % align_text(status['text'])
225 return ret
226
227class VerboseStatusFormatter(object):
228 def __call__(self, status, options):
229 return ("-- %s (%s) on %s\n%s\n" % (
230 status['user']['screen_name'],
231 status['user']['location'],
232 status['created_at'],
233 gHtmlParser.unescape(status['text'])))
234
235class JSONStatusFormatter(object):
236 def __call__(self, status, options):
237 status['text'] = gHtmlParser.unescape(status['text'])
238 return json.dumps(status)
239
240class URLStatusFormatter(object):
241 urlmatch = re.compile(r'https?://\S+')
242 def __call__(self, status, options):
243 urls = self.urlmatch.findall(status['text'])
244 return '\n'.join(urls) if urls else ""
245
246
247class ListsFormatter(object):
248 def __call__(self, list):
249 if list['description']:
250 list_str = "%-30s (%s)" % (list['name'], list['description'])
251 else:
252 list_str = "%-30s" % (list['name'])
253 return "%s\n" % list_str
254
255class ListsVerboseFormatter(object):
256 def __call__(self, list):
257 list_str = "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
258 return list_str
259
260class AnsiListsFormatter(object):
261 def __init__(self):
262 self._colourMap = ansi.ColourMap()
263
264 def __call__(self, list):
265 colour = self._colourMap.colourFor(list['name'])
266 return ("%s%-15s%s %s" % (
267 ansiFormatter.cmdColour(colour), list['name'],
268 ansiFormatter.cmdReset(), list['description']))
269
270
271class AdminFormatter(object):
272 def __call__(self, action, user):
273 user_str = "%s (%s)" % (user['screen_name'], user['name'])
274 if action == "follow":
275 return "You are now following %s.\n" % (user_str)
276 else:
277 return "You are no longer following %s.\n" % (user_str)
278
279class VerboseAdminFormatter(object):
280 def __call__(self, action, user):
281 return("-- %s: %s (%s): %s" % (
282 "Following" if action == "follow" else "Leaving",
283 user['screen_name'],
284 user['name'],
285 user['url']))
286
287class SearchFormatter(object):
288 def __call__(self, result, options):
289 return("%s%s %s" % (
290 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
291 result['from_user'], result['text']))
292
293class VerboseSearchFormatter(SearchFormatter):
294 pass # Default to the regular one
295
296class URLSearchFormatter(object):
297 urlmatch = re.compile(r'https?://\S+')
298 def __call__(self, result, options):
299 urls = self.urlmatch.findall(result['text'])
300 return '\n'.join(urls) if urls else ""
301
302class AnsiSearchFormatter(object):
303 def __init__(self):
304 self._colourMap = ansi.ColourMap()
305
306 def __call__(self, result, options):
307 colour = self._colourMap.colourFor(result['from_user'])
308 return ("%s%s%s%s %s" % (
309 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
310 ansiFormatter.cmdColour(colour), result['from_user'],
311 ansiFormatter.cmdReset(), result['text']))
312
313_term_encoding = None
314def get_term_encoding():
315 global _term_encoding
316 if not _term_encoding:
317 lang = os.getenv('LANG', 'unknown.UTF-8').split('.')
318 if lang[1:]:
319 _term_encoding = lang[1]
320 else:
321 _term_encoding = 'UTF-8'
322 return _term_encoding
323
324formatters = {}
325status_formatters = {
326 'default': StatusFormatter,
327 'verbose': VerboseStatusFormatter,
328 'json': JSONStatusFormatter,
329 'urls': URLStatusFormatter,
330 'ansi': AnsiStatusFormatter
331}
332formatters['status'] = status_formatters
333
334admin_formatters = {
335 'default': AdminFormatter,
336 'verbose': VerboseAdminFormatter,
337 'urls': AdminFormatter,
338 'ansi': AdminFormatter
339}
340formatters['admin'] = admin_formatters
341
342search_formatters = {
343 'default': SearchFormatter,
344 'verbose': VerboseSearchFormatter,
345 'urls': URLSearchFormatter,
346 'ansi': AnsiSearchFormatter
347}
348formatters['search'] = search_formatters
349
350lists_formatters = {
351 'default': ListsFormatter,
352 'verbose': ListsVerboseFormatter,
353 'urls': None,
354 'ansi': AnsiListsFormatter
355}
356formatters['lists'] = lists_formatters
357
358def get_formatter(action_type, options):
359 formatters_dict = formatters.get(action_type)
360 if (not formatters_dict):
361 raise TwitterError(
362 "There was an error finding a class of formatters for your type (%s)"
363 % (action_type))
364 f = formatters_dict.get(options['format'])
365 if (not f):
366 raise TwitterError(
367 "Unknown formatter '%s' for status actions" % (options['format']))
368 return f()
369
370class Action(object):
371
372 def ask(self, subject='perform this action', careful=False):
373 '''
374 Requests from the user using `raw_input` if `subject` should be
375 performed. When `careful`, the default answer is NO, otherwise YES.
376 Returns the user answer in the form `True` or `False`.
377 '''
378 sample = '(y/N)'
379 if not careful:
380 sample = '(Y/n)'
381
382 prompt = 'You really want to %s %s? ' % (subject, sample)
383 try:
384 answer = input(prompt).lower()
385 if careful:
386 return answer in ('yes', 'y')
387 else:
388 return answer not in ('no', 'n')
389 except EOFError:
390 print(file=sys.stderr) # Put Newline since Enter was never pressed
391 # TODO:
392 # Figure out why on OS X the raw_input keeps raising
393 # EOFError and is never able to reset and get more input
394 # Hint: Look at how IPython implements their console
395 default = True
396 if careful:
397 default = False
398 return default
399
400 def __call__(self, twitter, options):
401 action = actions.get(options['action'], NoSuchAction)()
402 try:
403 doAction = lambda : action(twitter, options)
404 if (options['refresh'] and isinstance(action, StatusAction)):
405 while True:
406 doAction()
407 sys.stdout.flush()
408 time.sleep(options['refresh_rate'])
409 else:
410 doAction()
411 except KeyboardInterrupt:
412 print('\n[Keyboard Interrupt]', file=sys.stderr)
413 pass
414
415class NoSuchActionError(Exception):
416 pass
417
418class NoSuchAction(Action):
419 def __call__(self, twitter, options):
420 raise NoSuchActionError("No such action: %s" % (options['action']))
421
422class StatusAction(Action):
423 def __call__(self, twitter, options):
424 statuses = self.getStatuses(twitter, options)
425 sf = get_formatter('status', options)
426 for status in statuses:
427 statusStr = sf(status, options)
428 if statusStr.strip():
429 printNicely(statusStr)
430
431class SearchAction(Action):
432 def __call__(self, twitter, options):
433 # We need to be pointing at search.twitter.com to work, and it is less
434 # tangly to do it here than in the main()
435 twitter.domain = "search.twitter.com"
436 twitter.uriparts = ()
437 # We need to bypass the TwitterCall parameter encoding, so we
438 # don't encode the plus sign, so we have to encode it ourselves
439 query_string = "+".join(
440 [quote(term)
441 for term in options['extra_args']])
442
443 results = twitter.search(q=query_string)['results']
444 f = get_formatter('search', options)
445 for result in results:
446 resultStr = f(result, options)
447 if resultStr.strip():
448 printNicely(resultStr)
449
450class AdminAction(Action):
451 def __call__(self, twitter, options):
452 if not (options['extra_args'] and options['extra_args'][0]):
453 raise TwitterError("You need to specify a user (screen name)")
454 af = get_formatter('admin', options)
455 try:
456 user = self.getUser(twitter, options['extra_args'][0])
457 except TwitterError as e:
458 print("There was a problem following or leaving the specified user.")
459 print("You may be trying to follow a user you are already following;")
460 print("Leaving a user you are not currently following;")
461 print("Or the user may not exist.")
462 print("Sorry.")
463 print()
464 print(e)
465 else:
466 printNicely(af(options['action'], user))
467
468class ListsAction(StatusAction):
469 def getStatuses(self, twitter, options):
470 if not options['extra_args']:
471 raise TwitterError("Please provide a user to query for lists")
472
473 screen_name = options['extra_args'][0]
474
475 if not options['extra_args'][1:]:
476 lists = twitter.lists.list(screen_name=screen_name)
477 if not lists:
478 printNicely("This user has no lists.")
479 for list in lists:
480 lf = get_formatter('lists', options)
481 printNicely(lf(list))
482 return []
483 else:
484 return reversed(twitter.user.lists.list.statuses(
485 user=screen_name, list=options['extra_args'][1]))
486
487
488class MyListsAction(ListsAction):
489 def getStatuses(self, twitter, options):
490 screen_name = twitter.account.verify_credentials()['screen_name']
491 options['extra_args'].insert(0, screen_name)
492 return ListsAction.getStatuses(self, twitter, options)
493
494
495class FriendsAction(StatusAction):
496 def getStatuses(self, twitter, options):
497 return reversed(twitter.statuses.home_timeline(count=options["length"]))
498
499class RepliesAction(StatusAction):
500 def getStatuses(self, twitter, options):
501 return reversed(twitter.statuses.mentions_timeline(count=options["length"]))
502
503class FollowAction(AdminAction):
504 def getUser(self, twitter, user):
505 return twitter.friendships.create(screen_name=user)
506
507class LeaveAction(AdminAction):
508 def getUser(self, twitter, user):
509 return twitter.friendships.destroy(screen_name=user)
510
511class SetStatusAction(Action):
512 def __call__(self, twitter, options):
513 statusTxt = (" ".join(options['extra_args'])
514 if options['extra_args']
515 else str(input("message: ")))
516 replies = []
517 ptr = re.compile("@[\w_]+")
518 while statusTxt:
519 s = ptr.match(statusTxt)
520 if s and s.start() == 0:
521 replies.append(statusTxt[s.start():s.end()])
522 statusTxt = statusTxt[s.end() + 1:]
523 else:
524 break
525 replies = " ".join(replies)
526 if len(replies) >= 140:
527 # just go back
528 statusTxt = replies
529 replies = ""
530
531 splitted = []
532 while statusTxt:
533 limit = 140 - len(replies)
534 if len(statusTxt) > limit:
535 end = string.rfind(statusTxt, ' ', 0, limit)
536 else:
537 end = limit
538 splitted.append(" ".join((replies, statusTxt[:end])))
539 statusTxt = statusTxt[end:]
540
541 if options['invert_split']:
542 splitted.reverse()
543 for status in splitted:
544 twitter.statuses.update(status=status)
545
546class TwitterShell(Action):
547
548 def render_prompt(self, prompt):
549 '''Parses the `prompt` string and returns the rendered version'''
550 prompt = prompt.strip("'").replace("\\'", "'")
551 for colour in ansi.COLOURS_NAMED:
552 if '[%s]' % (colour) in prompt:
553 prompt = prompt.replace(
554 '[%s]' % (colour), ansiFormatter.cmdColourNamed(colour))
555 prompt = prompt.replace('[R]', ansiFormatter.cmdReset())
556 return prompt
557
558 def __call__(self, twitter, options):
559 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
560 while True:
561 options['action'] = ""
562 try:
563 args = input(prompt).split()
564 parse_args(args, options)
565 if not options['action']:
566 continue
567 elif options['action'] == 'exit':
568 raise SystemExit(0)
569 elif options['action'] == 'shell':
570 print('Sorry Xzibit does not work here!', file=sys.stderr)
571 continue
572 elif options['action'] == 'help':
573 print('''\ntwitter> `action`\n
574 The Shell Accepts all the command line actions along with:
575
576 exit Leave the twitter shell (^D may also be used)
577
578 Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
579 Action()(twitter, options)
580 options['action'] = ''
581 except NoSuchActionError as e:
582 print(e, file=sys.stderr)
583 except KeyboardInterrupt:
584 print('\n[Keyboard Interrupt]', file=sys.stderr)
585 except EOFError:
586 print(file=sys.stderr)
587 leaving = self.ask(subject='Leave')
588 if not leaving:
589 print('Excellent!', file=sys.stderr)
590 else:
591 raise SystemExit(0)
592
593class PythonPromptAction(Action):
594 def __call__(self, twitter, options):
595 try:
596 while True:
597 smrt_input(globals(), locals())
598 except EOFError:
599 pass
600
601class HelpAction(Action):
602 def __call__(self, twitter, options):
603 print(__doc__)
604
605class DoNothingAction(Action):
606 def __call__(self, twitter, options):
607 pass
608
609class RateLimitStatus(Action):
610 def __call__(self, twitter, options):
611 rate = twitter.application.rate_limit_status()
612 print("Remaining API requests: %s / %s (hourly limit)" % (rate['remaining_hits'], rate['hourly_limit']))
613 print("Next reset in %ss (%s)" % (int(rate['reset_time_in_seconds'] - time.time()),
614 time.asctime(time.localtime(rate['reset_time_in_seconds']))))
615
616actions = {
617 'authorize' : DoNothingAction,
618 'follow' : FollowAction,
619 'friends' : FriendsAction,
620 'list' : ListsAction,
621 'mylist' : MyListsAction,
622 'help' : HelpAction,
623 'leave' : LeaveAction,
624 'pyprompt' : PythonPromptAction,
625 'replies' : RepliesAction,
626 'search' : SearchAction,
627 'set' : SetStatusAction,
628 'shell' : TwitterShell,
629 'rate' : RateLimitStatus,
630}
631
632def loadConfig(filename):
633 options = dict(OPTIONS)
634 if os.path.exists(filename):
635 cp = SafeConfigParser()
636 cp.read([filename])
637 for option in ('format', 'prompt'):
638 if cp.has_option('twitter', option):
639 options[option] = cp.get('twitter', option)
640 # process booleans
641 for option in ('invert_split',):
642 if cp.has_option('twitter', option):
643 options[option] = cp.getboolean('twitter', option)
644 return options
645
646def main(args=sys.argv[1:]):
647 arg_options = {}
648 try:
649 parse_args(args, arg_options)
650 except GetoptError as e:
651 print("I can't do that, %s." % (e), file=sys.stderr)
652 print(file=sys.stderr)
653 raise SystemExit(1)
654
655 config_path = os.path.expanduser(
656 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
657 config_options = loadConfig(config_path)
658
659 # Apply the various options in order, the most important applied last.
660 # Defaults first, then what's read from config file, then command-line
661 # arguments.
662 options = dict(OPTIONS)
663 for d in config_options, arg_options:
664 for k, v in list(d.items()):
665 if v: options[k] = v
666
667 if options['refresh'] and options['action'] not in (
668 'friends', 'replies'):
669 print("You can only refresh the friends or replies actions.", file=sys.stderr)
670 print("Use 'twitter -h' for help.", file=sys.stderr)
671 return 1
672
673 oauth_filename = os.path.expanduser(options['oauth_filename'])
674
675 if (options['action'] == 'authorize'
676 or not os.path.exists(oauth_filename)):
677 oauth_dance(
678 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
679 options['oauth_filename'])
680
681 global ansiFormatter
682 ansiFormatter = ansi.AnsiCmd(options["force-ansi"])
683
684 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
685
686 twitter = Twitter(
687 auth=OAuth(
688 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
689 secure=options['secure'],
690 api_version='1.1',
691 domain='api.twitter.com')
692
693 try:
694 Action()(twitter, options)
695 except NoSuchActionError as e:
696 print(e, file=sys.stderr)
697 raise SystemExit(1)
698 except TwitterError as e:
699 print(str(e), file=sys.stderr)
700 print("Use 'twitter -h' for help.", file=sys.stderr)
701 raise SystemExit(1)