]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
make inversion optional
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
20 object directly
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25 rate get your current rate limit status (remaining API reqs)
26
27
28 OPTIONS:
29
30 -r --refresh run this command forever, polling every once
31 in a while (default: every 10 minutes)
32 -R --refresh-rate <rate> set the refresh rate (in seconds)
33 -f --format <format> specify the output format for status updates
34 -c --config <filename> read default options (format, prompt) from the
35 given config file (default ~/.twitter)
36 -l --length <count> specify number of status updates shown
37 (default: 20, max: 200)
38 -t --timestamp show time before status lines
39 -d --datestamp show date before status lines
40 --no-ssl use less-secure HTTP instead of HTTPS
41 --oauth <filename> filename to read/store oauth credentials to
42
43 FORMATS for the --format option
44
45 default one line per status
46 verbose multiple lines per status, more verbose status info
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51 CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57 [twitter]
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63 """
64
65 from __future__ import print_function
66
# OAuth consumer credentials; main() passes them to oauth_dance() and OAuth().
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
69
70 import sys
71 import time
72 from getopt import gnu_getopt as getopt, GetoptError
73 from getpass import getpass
74 import re
75 import os.path
76 import locale
77 import string
78
79 try:
80 from ConfigParser import SafeConfigParser
81 except ImportError:
82 from configparser import ConfigParser as SafeConfigParser
83 import datetime
84 try:
85 from urllib.parse import quote
86 except ImportError:
87 from urllib2 import quote
88 import webbrowser
89
90 from .api import Twitter, TwitterError
91 from .oauth import OAuth, write_token_file, read_token_file
92 from .oauth_dance import oauth_dance
93 from . import ansi
94 from .util import smrt_input, printNicely
95
# Default value of every option.  loadConfig() and main() work on copies of
# this dict and layer config-file and command-line settings on top of it.
OPTIONS = {
    'action': 'friends',
    'refresh': False,
    'refresh_rate': 600,  # seconds between polls when --refresh is given
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    # The home directory is resolved with expanduser() rather than read
    # straight from $HOME, so the defaults do not collapse to "/.twitter"
    # when that variable is unset.
    'config_filename': os.path.join(os.path.expanduser('~'), '.twitter'),
    'oauth_filename': os.path.join(
        os.path.expanduser('~'), '.twitter_oauth'),
    'length': 20,
    'timestamp': False,
    'datestamp': False,
    'extra_args': [],
    'secure': True,
    'invert_split': False,
}
111
def parse_args(args, options):
    """Parse command-line style arguments into ``options`` (updated in place).

    Only options that actually appear in ``args`` are written, so the
    resulting dict can be layered over defaults by the caller.

    args    -- list of argument strings (without the program name)
    options -- dict that receives the parsed values

    The first positional argument becomes ``options['action']`` and the
    remaining ones ``options['extra_args']``, unless help was requested.

    Raises GetoptError for unknown or malformed options and ValueError when
    --refresh-rate or --length is not an integer.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    # -e and -p are accepted (each takes an argument) but nothing below
    # reads them.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    # Only byte strings need decoding; text strings (every argument on
    # Python 3) have no decode() method and are kept untouched.
    encoding = locale.getpreferredencoding()
    extra_args = [arg.decode(encoding) if isinstance(arg, bytes) else arg
                  for arg in extra_args]

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = int(arg)
        elif opt in ('-l', '--length'):
            options["length"] = int(arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over any positional action.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
146
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix to print in front of a status line.

    status  -- mapping whose 'created_at' entry is a UTC time string
    options -- mapping with boolean 'timestamp' and 'datestamp' entries
    format  -- strptime() format describing ``status['created_at']``

    Returns "" when neither stamp is requested (the time string is then
    not parsed at all); otherwise the date and/or time followed by a
    single space.  Raises ValueError if 'created_at' does not match
    ``format``.
    """
    import calendar  # only needed here; keeps the block self-contained

    show_time = options["timestamp"]
    show_date = options["datestamp"]
    if show_time and show_date:
        out_format = "%Y-%m-%d %H:%M:%S "
    elif show_time:
        out_format = "%H:%M:%S "
    elif show_date:
        out_format = "%Y-%m-%d "
    else:
        return ""

    utc = time.strptime(status['created_at'], format)
    # time.daylight only says that the zone *has* a DST rule, so picking
    # time.altzone from it applies the summer offset all year round.
    # Going through the epoch lets localtime() choose the offset that was
    # in force at the moment the status was created.
    local = time.localtime(calendar.timegm(utc))
    return time.strftime(out_format, local)
164
class StatusFormatter(object):
    """Render a status on one line: time prefix, screen name, text."""

    def __call__(self, status, options):
        prefix = get_time_string(status, options)
        author = status['user']['screen_name']
        return "%s%s %s" % (prefix, author, status['text'])
170
class AnsiStatusFormatter(object):
    """One-line status renderer that colours the author's screen name.

    The colour is looked up per screen name in a ColourMap owned by the
    formatter instance.
    """

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        author = status['user']['screen_name']
        colour = self._colourMap.colourFor(author)
        pieces = (
            get_time_string(status, options),
            ansi.cmdColour(colour),
            author,
            ansi.cmdReset(),
            status['text'],
        )
        return "%s%s%s%s %s" % pieces
181
class VerboseStatusFormatter(object):
    """Render a status as a header line (name, location, date) plus text."""

    def __call__(self, status, options):
        author = status['user']
        header = "-- %s (%s) on %s" % (
            author['screen_name'], author['location'], status['created_at'])
        return "%s\n%s\n" % (header, status['text'])
189
class URLStatusFormatter(object):
    """Render only the http(s) URLs found in a status, one per line."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        # Joining an empty match list already yields "".
        return '\n'.join(self.urlmatch.findall(status['text']))
195
196
class ListsFormatter(object):
    """Render a list as its padded name, plus the description if any."""

    def __call__(self, list):
        description = list['description']
        line = "%-30s" % (list['name'])
        if description:
            line = "%s (%s)" % (line, description)
        return line + "\n"
204
class ListsVerboseFormatter(object):
    """Render a list with its description, member count and mode."""

    def __call__(self, list):
        keys = ('name', 'description', 'member_count', 'mode')
        values = tuple(list[key] for key in keys)
        return "%-30s\n description: %s\n members: %s\n mode:%s\n" % values
209
class AnsiListsFormatter(object):
    """Render a list as its coloured, padded name and its description.

    The colour is looked up per list name in a ColourMap owned by the
    formatter instance.
    """

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        name = list['name']
        colour = self._colourMap.colourFor(name)
        pieces = (
            ansi.cmdColour(colour),
            name,
            ansi.cmdReset(),
            list['description'],
        )
        return "%s%-15s%s %s" % pieces
219
220
class AdminFormatter(object):
    """Report the outcome of a follow/leave action in one sentence."""

    def __call__(self, action, user):
        who = "%s (%s)" % (user['screen_name'], user['name'])
        # Anything that is not "follow" is reported as a leave.
        template = ("You are now following %s.\n" if action == "follow"
                    else "You are no longer following %s.\n")
        return template % (who)
228
class VerboseAdminFormatter(object):
    """Report a follow/leave action together with the user's name and URL."""

    def __call__(self, action, user):
        verb = "Following" if action == "follow" else "Leaving"
        details = (verb, user['screen_name'], user['name'], user['url'])
        return "-- %s: %s (%s): %s" % details
236
class SearchFormatter(object):
    """Render a search result on one line: time prefix, sender, text."""

    def __call__(self, result, options):
        # Search results are parsed with a different 'created_at' layout
        # than the default one used for statuses.
        created_format = "%a, %d %b %Y %H:%M:%S +0000"
        prefix = get_time_string(result, options, created_format)
        return "%s%s %s" % (prefix, result['from_user'], result['text'])
242
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search output; currently identical to SearchFormatter."""
245
class URLSearchFormatter(object):
    """Render only the http(s) URLs found in a search result."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        # Joining an empty match list already yields "".
        return '\n'.join(self.urlmatch.findall(result['text']))
251
class AnsiSearchFormatter(object):
    """One-line search result renderer that colours the sender's name.

    The colour is looked up per sender in a ColourMap owned by the
    formatter instance.
    """

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        sender = result['from_user']
        colour = self._colourMap.colourFor(sender)
        pieces = (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansi.cmdColour(colour),
            sender,
            ansi.cmdReset(),
            result['text'],
        )
        return "%s%s%s%s %s" % pieces
262
# Cached result of get_term_encoding(); None until the first call.
_term_encoding = None


def get_term_encoding():
    """Return the encoding named in $LANG, defaulting to 'UTF-8'.

    The encoding is whatever follows the first '.' in $LANG.  A non-empty
    result is cached in the module-level ``_term_encoding`` and reused by
    later calls.
    """
    global _term_encoding
    if _term_encoding:
        return _term_encoding
    parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
    _term_encoding = parts[1] if len(parts) > 1 else 'UTF-8'
    return _term_encoding
273
# Formatter class for each --format name, one table per kind of output.
status_formatters = dict(
    default=StatusFormatter,
    verbose=VerboseStatusFormatter,
    urls=URLStatusFormatter,
    ansi=AnsiStatusFormatter,
)

admin_formatters = dict(
    default=AdminFormatter,
    verbose=VerboseAdminFormatter,
    urls=AdminFormatter,
    ansi=AdminFormatter,
)

search_formatters = dict(
    default=SearchFormatter,
    verbose=VerboseSearchFormatter,
    urls=URLSearchFormatter,
    ansi=AnsiSearchFormatter,
)

# There is no URL formatter for lists, hence the None entry.
lists_formatters = dict(
    default=ListsFormatter,
    verbose=ListsVerboseFormatter,
    urls=None,
    ansi=AnsiListsFormatter,
)

# Lookup table used by get_formatter(): output kind -> format name -> class.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
306
def get_formatter(action_type, options):
    """Return a new formatter instance for ``action_type``.

    action_type -- key into the module-level ``formatters`` table
    options     -- mapping whose 'format' entry names the wanted format

    Raises TwitterError when the action type has no formatter table, or
    when the table has no (non-None) class for the requested format.
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the actual action type: this function also serves the admin,
        # search and lists tables, not only status output.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
318
class Action(object):
    """Base class of all actions.

    Calling a plain ``Action`` instance looks up ``options['action']`` in
    the module-level ``actions`` table and runs the matching action.
    """

    def ask(self, subject='perform this action', careful=False):
        '''
        Asks the user, using `input`, whether `subject` should be
        performed. When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' % (subject, sample)
        try:
            answer = input(prompt).lower()
        except EOFError:
            print(file=sys.stderr)  # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful
        if careful:
            return answer in ('yes', 'y')
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        # Unknown action names fall back to NoSuchAction, which raises.
        action = actions.get(options['action'], NoSuchAction)()
        try:
            if options['refresh'] and isinstance(action, StatusAction):
                # Poll forever; only Ctrl-C (or an error) ends the loop.
                while True:
                    action(twitter, options)
                    sys.stdout.flush()
                    time.sleep(options['refresh_rate'])
            else:
                action(twitter, options)
        except KeyboardInterrupt:
            print('\n[Keyboard Interrupt]', file=sys.stderr)
363
class NoSuchActionError(Exception):
    """Raised when the requested action name is not a known action."""
366
class NoSuchAction(Action):
    """Fallback action that always raises NoSuchActionError."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
370
class StatusAction(Action):
    """Base for actions that print statuses.

    Subclasses supply ``getStatuses(twitter, options)``.
    """

    def __call__(self, twitter, options):
        statuses = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for status in statuses:
            line = render(status, options)
            # Formatters may return blank output (e.g. no URLs); skip it.
            if not line.strip():
                continue
            printNicely(line)
379
class SearchAction(Action):
    """Run a search for the extra arguments and print the results."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is
        # less tangly to do it here than in the main()
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        encoded_terms = [quote(term) for term in options['extra_args']]
        query_string = "+".join(encoded_terms)

        results = twitter.search(q=query_string)['results']
        render = get_formatter('search', options)
        for result in results:
            line = render(result, options)
            if not line.strip():
                continue
            printNicely(line)
398
class AdminAction(Action):
    """Base for follow/leave; subclasses supply ``getUser(twitter, user)``."""

    def __call__(self, twitter, options):
        targets = options['extra_args']
        if not targets or not targets[0]:
            raise TwitterError("You need to specify a user (screen name)")
        af = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, targets[0])
        except TwitterError as e:
            explanation = (
                "There was a problem following or leaving the specified user.",
                "You may be trying to follow a user you are already following;",
                "Leaving a user you are not currently following;",
                "Or the user may not exist.",
                "Sorry.",
            )
            for sentence in explanation:
                print(sentence)
            print()
            print(e)
            return
        printNicely(af(options['action'], user))
416
class ListsAction(StatusAction):
    """Print a user's lists, or the statuses of one of those lists."""

    def getStatuses(self, twitter, options):
        args = options['extra_args']
        if not args:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = args[0]
        list_names = args[1:]

        if list_names:
            # A list name was given: hand its statuses to StatusAction.
            return reversed(twitter.user.lists.list.statuses(
                user=screen_name, list=list_names[0]))

        # No list name: print the lists here and give StatusAction nothing.
        lists = twitter.user.lists(user=screen_name)['lists']
        if not lists:
            printNicely("This user has no lists.")
        for entry in lists:
            lf = get_formatter('lists', options)
            printNicely(lf(entry))
        return []
435
436
class MyListsAction(ListsAction):
    """ListsAction for the authenticated user's own lists."""

    def getStatuses(self, twitter, options):
        screen_name = twitter.account.verify_credentials()['screen_name']
        # Build a new list instead of insert()-ing in place: when no extra
        # arguments were given, options['extra_args'] can be the very list
        # object stored in the module-level OPTIONS defaults, and mutating
        # it would leak the screen name into every later run.
        options['extra_args'] = [screen_name] + list(options['extra_args'])
        return ListsAction.getStatuses(self, twitter, options)
442
443
class FriendsAction(StatusAction):
    """Print the friends timeline in reverse of the order returned."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
447
class PublicAction(StatusAction):
    """Print the public timeline in reverse of the order returned."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
451
class RepliesAction(StatusAction):
    """Print the replies timeline in reverse of the order returned."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.replies(count=options["length"])
        return reversed(timeline)
455
class FollowAction(AdminAction):
    """Start following the given user."""

    def getUser(self, twitter, user):
        followed = twitter.friendships.create(id=user)
        return followed
459
class LeaveAction(AdminAction):
    """Stop following the given user."""

    def getUser(self, twitter, user):
        left = twitter.friendships.destroy(id=user)
        return left
463
class SetStatusAction(Action):
    """Post a status update, splitting over-long text into several posts.

    The text comes from the extra command-line arguments or, when there
    are none, from an interactive "message: " prompt.  Leading @mentions
    are repeated at the start of every part.  Parts are posted in reading
    order unless ``options['invert_split']`` is true.
    """

    # Maximum number of characters in a single status.
    MAX_LENGTH = 140

    # A single @mention (\w already includes the underscore).
    _MENTION = re.compile(r"@\w+")

    def __call__(self, twitter, options):
        if options['extra_args']:
            status_text = " ".join(options['extra_args'])
        else:
            status_text = str(input("message: "))

        replies, body = self._split_replies(status_text)
        # Treat the mentions as ordinary text when there is nothing else to
        # post, or when the prefix plus its separating space would not
        # leave room for at least one character of the body.  Nothing the
        # user typed is dropped.
        if not body or len(replies) + 2 > self.MAX_LENGTH:
            body = " ".join(part for part in (replies, body) if part)
            replies = ""

        parts = self._chunk(body, replies, self.MAX_LENGTH)
        if options['invert_split']:
            parts.reverse()
        for status in parts:
            twitter.statuses.update(status=status)

    @classmethod
    def _split_replies(cls, text):
        """Split leading @mentions off ``text``; return (mentions, rest)."""
        mentions = []
        rest = text
        while True:
            found = cls._MENTION.match(rest)
            if not found:
                break
            tail = rest[found.end():]
            # "@name" glued to other characters (e.g. "@name:") is part of
            # the message, not a reply prefix; leave it where it is.
            if tail and not tail[0].isspace():
                break
            mentions.append(found.group())
            rest = tail.lstrip()
        return " ".join(mentions), rest

    @staticmethod
    def _chunk(text, prefix, limit):
        """Cut ``text`` into prefixed parts of at most ``limit`` characters."""
        if prefix:
            prefix += " "
        # Always allow at least one character so the loop makes progress.
        room = max(1, limit - len(prefix))
        parts = []
        text = text.strip()
        while text:
            if len(text) <= room:
                end = len(text)
            else:
                # Break at the last space that keeps the part within room;
                # hard-split a word that is too long to fit on its own.
                end = text.rfind(' ', 0, room + 1)
                if end <= 0:
                    end = room
            parts.append(prefix + text[:end].rstrip())
            text = text[end:].lstrip()
        return parts
498
class TwitterShell(Action):
    """Interactive shell that reads and runs actions until told to exit."""

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        rendered = prompt.strip("'").replace("\\'", "'")
        for colour in ansi.COLOURS_NAMED:
            token = '[%s]' % (colour)
            if token in rendered:
                rendered = rendered.replace(
                    token, ansi.cmdColourNamed(colour))
        return rendered.replace('[R]', ansi.cmdReset())

    def __call__(self, twitter, options):
        prompt = self.render_prompt(options.get('prompt', 'twitter> '))
        while True:
            options['action'] = ""
            try:
                args = input(prompt).split()
                parse_args(args, options)
                command = options['action']
                if not command:
                    continue
                if command == 'exit':
                    raise SystemExit(0)
                if command == 'shell':
                    print('Sorry Xzibit does not work here!', file=sys.stderr)
                    continue
                if command == 'help':
                    # Shell-specific preamble; the regular help action
                    # still runs below and prints the full usage text.
                    print(
                        '\ntwitter> `action`\n\n'
                        'The Shell Accepts all the command line actions along with:\n'
                        '\n'
                        'exit Leave the twitter shell (^D may also be used)\n'
                        '\n'
                        'Full CMD Line help is appended below for your convinience.',
                        file=sys.stderr)
                Action()(twitter, options)
                options['action'] = ''
            except NoSuchActionError as e:
                print(e, file=sys.stderr)
            except KeyboardInterrupt:
                print('\n[Keyboard Interrupt]', file=sys.stderr)
            except EOFError:
                print(file=sys.stderr)
                if self.ask(subject='Leave'):
                    raise SystemExit(0)
                print('Excellent!', file=sys.stderr)
545
class PythonPromptAction(Action):
    """Hand control to ``smrt_input`` repeatedly until EOFError is raised."""

    def __call__(self, twitter, options):
        try:
            while True:
                # locals() exposes ``self``, ``twitter`` and ``options`` to
                # the prompt, so no other local names are created here.
                smrt_input(globals(), locals())
        except EOFError:
            # NOTE(review): presumably raised on end of input (^D) --
            # confirm in util.smrt_input.  The prompt then ends quietly.
            pass
553
class HelpAction(Action):
    """Print this module's docstring, which holds the usage text."""

    def __call__(self, twitter, options):
        usage = __doc__
        print(usage)
557
class DoNothingAction(Action):
    """Action that does nothing; the actions table maps 'authorize' to it."""

    def __call__(self, twitter, options):
        return None
561
class RateLimitStatus(Action):
    """Print the remaining API requests and the time of the next reset."""

    def __call__(self, twitter, options):
        rate = twitter.account.rate_limit_status()
        print("Remaining API requests: %s / %s (hourly limit)"
              % (rate['remaining_hits'], rate['hourly_limit']))
        reset_at = rate['reset_time_in_seconds']
        seconds_left = int(reset_at - time.time())
        reset_clock = time.asctime(time.localtime(reset_at))
        print("Next reset in %ss (%s)" % (seconds_left, reset_clock))
568
# Action class for every action name accepted on the command line.
actions = dict(
    authorize=DoNothingAction,
    follow=FollowAction,
    friends=FriendsAction,
    list=ListsAction,
    mylist=MyListsAction,
    help=HelpAction,
    leave=LeaveAction,
    public=PublicAction,
    pyprompt=PythonPromptAction,
    replies=RepliesAction,
    search=SearchAction,
    set=SetStatusAction,
    shell=TwitterShell,
    rate=RateLimitStatus,
)
585
def loadConfig(filename):
    """Return a copy of OPTIONS overlaid with settings read from ``filename``.

    Only the [twitter] section is consulted: 'format' and 'prompt' are read
    as strings and 'invert_split' as a boolean.  A missing file yields the
    unmodified defaults.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    cp = SafeConfigParser()
    cp.read([filename])
    # (option name, reader) pairs; booleans need getboolean().
    readers = (
        ('format', cp.get),
        ('prompt', cp.get),
        ('invert_split', cp.getboolean),
    )
    for option, read in readers:
        if cp.has_option('twitter', option):
            options[option] = read('twitter', option)
    return options
599
def main(args=None):
    """Command-line entry point.

    args -- list of argument strings; defaults to ``sys.argv[1:]`` as it
            is at call time.

    Parses the arguments, merges them with the config file and the built-in
    defaults, makes sure OAuth credentials exist (running the OAuth dance
    if needed) and runs the requested action.

    Returns 1 when --refresh is combined with an action that cannot be
    refreshed; raises SystemExit(1) on bad options, unknown actions and
    TwitterError.
    """
    if args is None:
        args = sys.argv[1:]

    arg_options = {}
    try:
        parse_args(args, arg_options)
    except GetoptError as e:
        print("I can't do that, %s." % (e), file=sys.stderr)
        print(file=sys.stderr)
        raise SystemExit(1)

    config_path = os.path.expanduser(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))
    config_options = loadConfig(config_path)

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    for k, v in config_options.items():
        if v:
            options[k] = v
    # parse_args() only stores options that were actually given, so every
    # entry is an explicit request -- including falsy ones such as
    # secure=False from --no-ssl, which a plain truthiness test would
    # silently discard.  Empty strings still fall back to the value below.
    for k, v in arg_options.items():
        if v is None or v == '':
            continue
        options[k] = v

    if options['refresh'] and options['action'] not in (
            'friends', 'public', 'replies'):
        print("You can only refresh the friends, public, or replies actions.",
              file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        return 1

    oauth_filename = os.path.expanduser(options['oauth_filename'])

    if (options['action'] == 'authorize'
            or not os.path.exists(oauth_filename)):
        # Store the token in the same expanded path it is read back from.
        oauth_dance(
            "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
            oauth_filename)

    oauth_token, oauth_token_secret = read_token_file(oauth_filename)

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1',
        domain='api.twitter.com')

    try:
        Action()(twitter, options)
    except NoSuchActionError as e:
        print(e, file=sys.stderr)
        raise SystemExit(1)
    except TwitterError as e:
        print(str(e), file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        raise SystemExit(1)