]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Merge pull request #185 from cegme/json_status_dump
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 pyprompt start a Python prompt for interacting with the twitter
19 object directly
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
25
26
27 OPTIONS:
28
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 5 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
41
42 FORMATS for the --format option
43
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 json raw json data from the api on each line
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51 CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57 [twitter]
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63 """
64
65 from __future__ import print_function
66
67 try:
68 input = __builtins__['raw_input']
69 except (AttributeError, KeyError):
70 pass
71
72
73 CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
74 CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
75
76 import sys
77 import time
78 from getopt import gnu_getopt as getopt, GetoptError
79 from getpass import getpass
80 import re
81 import os.path
82 import locale
83 import string
84
85 try:
86 from ConfigParser import SafeConfigParser
87 except ImportError:
88 from configparser import ConfigParser as SafeConfigParser
89 import datetime
90 try:
91 from urllib.parse import quote
92 except ImportError:
93 from urllib2 import quote
94 try:
95 import HTMLParser
96 except ImportError:
97 import html.parser as HTMLParser
98
99 import webbrowser
100
101 from .api import Twitter, TwitterError
102 from .oauth import OAuth, write_token_file, read_token_file
103 from .oauth_dance import oauth_dance
104 from . import ansi
105 from .util import smrt_input, printNicely
106
107 OPTIONS = {
108 'action': 'friends',
109 'refresh': False,
110 'refresh_rate': 600,
111 'format': 'default',
112 'prompt': '[cyan]twitter[R]> ',
113 'config_filename': os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + os.sep + '.twitter',
114 'oauth_filename': os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + os.sep + '.twitter_oauth',
115 'length': 20,
116 'timestamp': False,
117 'datestamp': False,
118 'extra_args': [],
119 'secure': True,
120 'invert_split': False,
121 'force-ansi': False,
122 }
123
124 gHtmlParser = HTMLParser.HTMLParser()
125 hashtagRe = re.compile(r'(?P<hashtag>#\S+)')
126 profileRe = re.compile(r'(?P<profile>\@\S+)')
127 ansiFormatter = ansi.AnsiCmd(False)
128
129 def parse_args(args, options):
130 long_opts = ['help', 'format=', 'refresh', 'oauth=',
131 'refresh-rate=', 'config=', 'length=', 'timestamp',
132 'datestamp', 'no-ssl', 'force-ansi']
133 short_opts = "e:p:f:h?rR:c:l:td"
134 opts, extra_args = getopt(args, short_opts, long_opts)
135 if extra_args and hasattr(extra_args[0], 'decode'):
136 extra_args = [arg.decode(locale.getpreferredencoding())
137 for arg in extra_args]
138
139 for opt, arg in opts:
140 if opt in ('-f', '--format'):
141 options['format'] = arg
142 elif opt in ('-r', '--refresh'):
143 options['refresh'] = True
144 elif opt in ('-R', '--refresh-rate'):
145 options['refresh_rate'] = int(arg)
146 elif opt in ('-l', '--length'):
147 options["length"] = int(arg)
148 elif opt in ('-t', '--timestamp'):
149 options["timestamp"] = True
150 elif opt in ('-d', '--datestamp'):
151 options["datestamp"] = True
152 elif opt in ('-?', '-h', '--help'):
153 options['action'] = 'help'
154 elif opt in ('-c', '--config'):
155 options['config_filename'] = arg
156 elif opt == '--no-ssl':
157 options['secure'] = False
158 elif opt == '--oauth':
159 options['oauth_filename'] = arg
160 elif opt == '--force-ansi':
161 options['force-ansi'] = True
162
163 if extra_args and not ('action' in options and options['action'] == 'help'):
164 options['action'] = extra_args[0]
165 options['extra_args'] = extra_args[1:]
166
167 def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
168 timestamp = options["timestamp"]
169 datestamp = options["datestamp"]
170 t = time.strptime(status['created_at'], format)
171 i_hate_timezones = time.timezone
172 if (time.daylight):
173 i_hate_timezones = time.altzone
174 dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
175 seconds=i_hate_timezones)
176 t = dt.timetuple()
177 if timestamp and datestamp:
178 return time.strftime("%Y-%m-%d %H:%M:%S ", t)
179 elif timestamp:
180 return time.strftime("%H:%M:%S ", t)
181 elif datestamp:
182 return time.strftime("%Y-%m-%d ", t)
183 return ""
184
185 def reRepl(m):
186 ansiTypes = {
187 'clear': ansiFormatter.cmdReset(),
188 'hashtag': ansiFormatter.cmdBold(),
189 'profile': ansiFormatter.cmdUnderline(),
190 }
191
192 s = None
193 try:
194 mkey = m.lastgroup
195 if m.group(mkey):
196 s = '%s%s%s' % (ansiTypes[mkey], m.group(mkey), ansiTypes['clear'])
197 except IndexError:
198 pass
199 return s
200
201 def replaceInStatus(status):
202 txt = gHtmlParser.unescape(status)
203 txt = re.sub(hashtagRe, reRepl, txt)
204 txt = re.sub(profileRe, reRepl, txt)
205 return txt
206
207 class StatusFormatter(object):
208 def __call__(self, status, options):
209 return ("%s%s %s" % (
210 get_time_string(status, options),
211 status['user']['screen_name'], gHtmlParser.unescape(status['text'])))
212
213 class AnsiStatusFormatter(object):
214 def __init__(self):
215 self._colourMap = ansi.ColourMap()
216
217 def __call__(self, status, options):
218 colour = self._colourMap.colourFor(status['user']['screen_name'])
219 return ("%s%s%s%s %s" % (
220 get_time_string(status, options),
221 ansiFormatter.cmdColour(colour), status['user']['screen_name'],
222 ansiFormatter.cmdReset(), replaceInStatus(status['text'])))
223
224 class VerboseStatusFormatter(object):
225 def __call__(self, status, options):
226 return ("-- %s (%s) on %s\n%s\n" % (
227 status['user']['screen_name'],
228 status['user']['location'],
229 status['created_at'],
230 gHtmlParser.unescape(status['text'])))
231
232 class JSONStatusFormatter(object):
233 def __call__(self, status, options):
234 status['text'] = gHtmlParser.unescape(status['text'])
235 return json.dumps(status)
236
237 class URLStatusFormatter(object):
238 urlmatch = re.compile(r'https?://\S+')
239 def __call__(self, status, options):
240 urls = self.urlmatch.findall(status['text'])
241 return '\n'.join(urls) if urls else ""
242
243
244 class ListsFormatter(object):
245 def __call__(self, list):
246 if list['description']:
247 list_str = "%-30s (%s)" % (list['name'], list['description'])
248 else:
249 list_str = "%-30s" % (list['name'])
250 return "%s\n" % list_str
251
252 class ListsVerboseFormatter(object):
253 def __call__(self, list):
254 list_str = "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
255 return list_str
256
257 class AnsiListsFormatter(object):
258 def __init__(self):
259 self._colourMap = ansi.ColourMap()
260
261 def __call__(self, list):
262 colour = self._colourMap.colourFor(list['name'])
263 return ("%s%-15s%s %s" % (
264 ansiFormatter.cmdColour(colour), list['name'],
265 ansiFormatter.cmdReset(), list['description']))
266
267
268 class AdminFormatter(object):
269 def __call__(self, action, user):
270 user_str = "%s (%s)" % (user['screen_name'], user['name'])
271 if action == "follow":
272 return "You are now following %s.\n" % (user_str)
273 else:
274 return "You are no longer following %s.\n" % (user_str)
275
276 class VerboseAdminFormatter(object):
277 def __call__(self, action, user):
278 return("-- %s: %s (%s): %s" % (
279 "Following" if action == "follow" else "Leaving",
280 user['screen_name'],
281 user['name'],
282 user['url']))
283
284 class SearchFormatter(object):
285 def __call__(self, result, options):
286 return("%s%s %s" % (
287 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
288 result['from_user'], result['text']))
289
290 class VerboseSearchFormatter(SearchFormatter):
291 pass # Default to the regular one
292
293 class URLSearchFormatter(object):
294 urlmatch = re.compile(r'https?://\S+')
295 def __call__(self, result, options):
296 urls = self.urlmatch.findall(result['text'])
297 return '\n'.join(urls) if urls else ""
298
299 class AnsiSearchFormatter(object):
300 def __init__(self):
301 self._colourMap = ansi.ColourMap()
302
303 def __call__(self, result, options):
304 colour = self._colourMap.colourFor(result['from_user'])
305 return ("%s%s%s%s %s" % (
306 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
307 ansiFormatter.cmdColour(colour), result['from_user'],
308 ansiFormatter.cmdReset(), result['text']))
309
310 _term_encoding = None
311 def get_term_encoding():
312 global _term_encoding
313 if not _term_encoding:
314 lang = os.getenv('LANG', 'unknown.UTF-8').split('.')
315 if lang[1:]:
316 _term_encoding = lang[1]
317 else:
318 _term_encoding = 'UTF-8'
319 return _term_encoding
320
321 formatters = {}
322 status_formatters = {
323 'default': StatusFormatter,
324 'verbose': VerboseStatusFormatter,
325 'json': JSONStatusFormatter,
326 'urls': URLStatusFormatter,
327 'ansi': AnsiStatusFormatter
328 }
329 formatters['status'] = status_formatters
330
331 admin_formatters = {
332 'default': AdminFormatter,
333 'verbose': VerboseAdminFormatter,
334 'urls': AdminFormatter,
335 'ansi': AdminFormatter
336 }
337 formatters['admin'] = admin_formatters
338
339 search_formatters = {
340 'default': SearchFormatter,
341 'verbose': VerboseSearchFormatter,
342 'urls': URLSearchFormatter,
343 'ansi': AnsiSearchFormatter
344 }
345 formatters['search'] = search_formatters
346
347 lists_formatters = {
348 'default': ListsFormatter,
349 'verbose': ListsVerboseFormatter,
350 'urls': None,
351 'ansi': AnsiListsFormatter
352 }
353 formatters['lists'] = lists_formatters
354
355 def get_formatter(action_type, options):
356 formatters_dict = formatters.get(action_type)
357 if (not formatters_dict):
358 raise TwitterError(
359 "There was an error finding a class of formatters for your type (%s)"
360 % (action_type))
361 f = formatters_dict.get(options['format'])
362 if (not f):
363 raise TwitterError(
364 "Unknown formatter '%s' for status actions" % (options['format']))
365 return f()
366
367 class Action(object):
368
369 def ask(self, subject='perform this action', careful=False):
370 '''
371 Requests fromt he user using `raw_input` if `subject` should be
372 performed. When `careful`, the default answer is NO, otherwise YES.
373 Returns the user answer in the form `True` or `False`.
374 '''
375 sample = '(y/N)'
376 if not careful:
377 sample = '(Y/n)'
378
379 prompt = 'You really want to %s %s? ' % (subject, sample)
380 try:
381 answer = input(prompt).lower()
382 if careful:
383 return answer in ('yes', 'y')
384 else:
385 return answer not in ('no', 'n')
386 except EOFError:
387 print(file=sys.stderr) # Put Newline since Enter was never pressed
388 # TODO:
389 # Figure out why on OS X the raw_input keeps raising
390 # EOFError and is never able to reset and get more input
391 # Hint: Look at how IPython implements their console
392 default = True
393 if careful:
394 default = False
395 return default
396
397 def __call__(self, twitter, options):
398 action = actions.get(options['action'], NoSuchAction)()
399 try:
400 doAction = lambda : action(twitter, options)
401 if (options['refresh'] and isinstance(action, StatusAction)):
402 while True:
403 doAction()
404 sys.stdout.flush()
405 time.sleep(options['refresh_rate'])
406 else:
407 doAction()
408 except KeyboardInterrupt:
409 print('\n[Keyboard Interrupt]', file=sys.stderr)
410 pass
411
412 class NoSuchActionError(Exception):
413 pass
414
415 class NoSuchAction(Action):
416 def __call__(self, twitter, options):
417 raise NoSuchActionError("No such action: %s" % (options['action']))
418
419 class StatusAction(Action):
420 def __call__(self, twitter, options):
421 statuses = self.getStatuses(twitter, options)
422 sf = get_formatter('status', options)
423 for status in statuses:
424 statusStr = sf(status, options)
425 if statusStr.strip():
426 printNicely(statusStr)
427
428 class SearchAction(Action):
429 def __call__(self, twitter, options):
430 # We need to be pointing at search.twitter.com to work, and it is less
431 # tangly to do it here than in the main()
432 twitter.domain = "search.twitter.com"
433 twitter.uriparts = ()
434 # We need to bypass the TwitterCall parameter encoding, so we
435 # don't encode the plus sign, so we have to encode it ourselves
436 query_string = "+".join(
437 [quote(term)
438 for term in options['extra_args']])
439
440 results = twitter.search(q=query_string)['results']
441 f = get_formatter('search', options)
442 for result in results:
443 resultStr = f(result, options)
444 if resultStr.strip():
445 printNicely(resultStr)
446
447 class AdminAction(Action):
448 def __call__(self, twitter, options):
449 if not (options['extra_args'] and options['extra_args'][0]):
450 raise TwitterError("You need to specify a user (screen name)")
451 af = get_formatter('admin', options)
452 try:
453 user = self.getUser(twitter, options['extra_args'][0])
454 except TwitterError as e:
455 print("There was a problem following or leaving the specified user.")
456 print("You may be trying to follow a user you are already following;")
457 print("Leaving a user you are not currently following;")
458 print("Or the user may not exist.")
459 print("Sorry.")
460 print()
461 print(e)
462 else:
463 printNicely(af(options['action'], user))
464
465 class ListsAction(StatusAction):
466 def getStatuses(self, twitter, options):
467 if not options['extra_args']:
468 raise TwitterError("Please provide a user to query for lists")
469
470 screen_name = options['extra_args'][0]
471
472 if not options['extra_args'][1:]:
473 lists = twitter.lists.list(screen_name=screen_name)
474 if not lists:
475 printNicely("This user has no lists.")
476 for list in lists:
477 lf = get_formatter('lists', options)
478 printNicely(lf(list))
479 return []
480 else:
481 return reversed(twitter.user.lists.list.statuses(
482 user=screen_name, list=options['extra_args'][1]))
483
484
485 class MyListsAction(ListsAction):
486 def getStatuses(self, twitter, options):
487 screen_name = twitter.account.verify_credentials()['screen_name']
488 options['extra_args'].insert(0, screen_name)
489 return ListsAction.getStatuses(self, twitter, options)
490
491
492 class FriendsAction(StatusAction):
493 def getStatuses(self, twitter, options):
494 return reversed(twitter.statuses.home_timeline(count=options["length"]))
495
496 class RepliesAction(StatusAction):
497 def getStatuses(self, twitter, options):
498 return reversed(twitter.statuses.mentions_timeline(count=options["length"]))
499
500 class FollowAction(AdminAction):
501 def getUser(self, twitter, user):
502 return twitter.friendships.create(id=user)
503
504 class LeaveAction(AdminAction):
505 def getUser(self, twitter, user):
506 return twitter.friendships.destroy(id=user)
507
508 class SetStatusAction(Action):
509 def __call__(self, twitter, options):
510 statusTxt = (" ".join(options['extra_args'])
511 if options['extra_args']
512 else str(input("message: ")))
513 replies = []
514 ptr = re.compile("@[\w_]+")
515 while statusTxt:
516 s = ptr.match(statusTxt)
517 if s and s.start() == 0:
518 replies.append(statusTxt[s.start():s.end()])
519 statusTxt = statusTxt[s.end() + 1:]
520 else:
521 break
522 replies = " ".join(replies)
523 if len(replies) >= 140:
524 # just go back
525 statusTxt = replies
526 replies = ""
527
528 splitted = []
529 while statusTxt:
530 limit = 140 - len(replies)
531 if len(statusTxt) > limit:
532 end = string.rfind(statusTxt, ' ', 0, limit)
533 else:
534 end = limit
535 splitted.append(" ".join((replies, statusTxt[:end])))
536 statusTxt = statusTxt[end:]
537
538 if options['invert_split']:
539 splitted.reverse()
540 for status in splitted:
541 twitter.statuses.update(status=status)
542
543 class TwitterShell(Action):
544
545 def render_prompt(self, prompt):
546 '''Parses the `prompt` string and returns the rendered version'''
547 prompt = prompt.strip("'").replace("\\'", "'")
548 for colour in ansi.COLOURS_NAMED:
549 if '[%s]' % (colour) in prompt:
550 prompt = prompt.replace(
551 '[%s]' % (colour), ansiFormatter.cmdColourNamed(colour))
552 prompt = prompt.replace('[R]', ansiFormatter.cmdReset())
553 return prompt
554
555 def __call__(self, twitter, options):
556 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
557 while True:
558 options['action'] = ""
559 try:
560 args = input(prompt).split()
561 parse_args(args, options)
562 if not options['action']:
563 continue
564 elif options['action'] == 'exit':
565 raise SystemExit(0)
566 elif options['action'] == 'shell':
567 print('Sorry Xzibit does not work here!', file=sys.stderr)
568 continue
569 elif options['action'] == 'help':
570 print('''\ntwitter> `action`\n
571 The Shell Accepts all the command line actions along with:
572
573 exit Leave the twitter shell (^D may also be used)
574
575 Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
576 Action()(twitter, options)
577 options['action'] = ''
578 except NoSuchActionError as e:
579 print(e, file=sys.stderr)
580 except KeyboardInterrupt:
581 print('\n[Keyboard Interrupt]', file=sys.stderr)
582 except EOFError:
583 print(file=sys.stderr)
584 leaving = self.ask(subject='Leave')
585 if not leaving:
586 print('Excellent!', file=sys.stderr)
587 else:
588 raise SystemExit(0)
589
590 class PythonPromptAction(Action):
591 def __call__(self, twitter, options):
592 try:
593 while True:
594 smrt_input(globals(), locals())
595 except EOFError:
596 pass
597
598 class HelpAction(Action):
599 def __call__(self, twitter, options):
600 print(__doc__)
601
602 class DoNothingAction(Action):
603 def __call__(self, twitter, options):
604 pass
605
606 class RateLimitStatus(Action):
607 def __call__(self, twitter, options):
608 rate = twitter.account.rate_limit_status()
609 print("Remaining API requests: %s / %s (hourly limit)" % (rate['remaining_hits'], rate['hourly_limit']))
610 print("Next reset in %ss (%s)" % (int(rate['reset_time_in_seconds'] - time.time()),
611 time.asctime(time.localtime(rate['reset_time_in_seconds']))))
612
613 actions = {
614 'authorize' : DoNothingAction,
615 'follow' : FollowAction,
616 'friends' : FriendsAction,
617 'list' : ListsAction,
618 'mylist' : MyListsAction,
619 'help' : HelpAction,
620 'leave' : LeaveAction,
621 'pyprompt' : PythonPromptAction,
622 'replies' : RepliesAction,
623 'search' : SearchAction,
624 'set' : SetStatusAction,
625 'shell' : TwitterShell,
626 'rate' : RateLimitStatus,
627 }
628
629 def loadConfig(filename):
630 options = dict(OPTIONS)
631 if os.path.exists(filename):
632 cp = SafeConfigParser()
633 cp.read([filename])
634 for option in ('format', 'prompt'):
635 if cp.has_option('twitter', option):
636 options[option] = cp.get('twitter', option)
637 # process booleans
638 for option in ('invert_split',):
639 if cp.has_option('twitter', option):
640 options[option] = cp.getboolean('twitter', option)
641 return options
642
643 def main(args=sys.argv[1:]):
644 arg_options = {}
645 try:
646 parse_args(args, arg_options)
647 except GetoptError as e:
648 print("I can't do that, %s." % (e), file=sys.stderr)
649 print(file=sys.stderr)
650 raise SystemExit(1)
651
652 config_path = os.path.expanduser(
653 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
654 config_options = loadConfig(config_path)
655
656 # Apply the various options in order, the most important applied last.
657 # Defaults first, then what's read from config file, then command-line
658 # arguments.
659 options = dict(OPTIONS)
660 for d in config_options, arg_options:
661 for k, v in list(d.items()):
662 if v: options[k] = v
663
664 if options['refresh'] and options['action'] not in (
665 'friends', 'replies'):
666 print("You can only refresh the friends or replies actions.", file=sys.stderr)
667 print("Use 'twitter -h' for help.", file=sys.stderr)
668 return 1
669
670 oauth_filename = os.path.expanduser(options['oauth_filename'])
671
672 if (options['action'] == 'authorize'
673 or not os.path.exists(oauth_filename)):
674 oauth_dance(
675 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
676 options['oauth_filename'])
677
678 global ansiFormatter
679 ansiFormatter = ansi.AnsiCmd(options["force-ansi"])
680
681 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
682
683 twitter = Twitter(
684 auth=OAuth(
685 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
686 secure=options['secure'],
687 api_version='1.1',
688 domain='api.twitter.com')
689
690 try:
691 Action()(twitter, options)
692 except NoSuchActionError as e:
693 print(e, file=sys.stderr)
694 raise SystemExit(1)
695 except TwitterError as e:
696 print(str(e), file=sys.stderr)
697 print("Use 'twitter -h' for help.", file=sys.stderr)
698 raise SystemExit(1)