]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Fix Issue #60 can't write config file on Windows
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
20 object directly
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25 rate get your current rate limit status (remaining API reqs)
26
27
28 OPTIONS:
29
30 -r --refresh run this command forever, polling every once
31 in a while (default: every 10 minutes)
32 -R --refresh-rate <rate> set the refresh rate (in seconds)
33 -f --format <format> specify the output format for status updates
34 -c --config <filename> read username and password from given config
35 file (default ~/.twitter)
36 -l --length <count> specify number of status updates shown
37 (default: 20, max: 200)
38 -t --timestamp show time before status lines
39 -d --datestamp show date before status lines
40 --no-ssl use less-secure HTTP instead of HTTPS
41 --oauth <filename> filename to read/store oauth credentials to
42
43 FORMATS for the --format option
44
45 default one line per status
46 verbose multiple lines per status, more verbose status info
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51 CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57 [twitter]
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63 """
64
65 from __future__ import print_function
66
# Python 2 compatibility: rebind ``input`` to ``raw_input`` so that prompts
# return the typed line instead of evaluating it.  The name is looked up
# directly (rather than by subscripting ``__builtins__``) because
# ``__builtins__`` may be either a dict or the builtins module depending on
# how this module is loaded, and a module cannot be subscripted.
try:
    input = raw_input
except NameError:
    # Python 3: the builtin ``input`` already returns the raw line.
    pass
71
72
# OAuth consumer credentials for this command-line application; main() hands
# them to oauth_dance() and to the OAuth() authenticator.
CONSUMER_KEY='uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
75
76 import sys
77 import time
78 from getopt import gnu_getopt as getopt, GetoptError
79 from getpass import getpass
80 import re
81 import os.path
82 import locale
83 import string
84
85 try:
86 from ConfigParser import SafeConfigParser
87 except ImportError:
88 from configparser import ConfigParser as SafeConfigParser
89 import datetime
90 try:
91 from urllib.parse import quote
92 except ImportError:
93 from urllib2 import quote
94 import webbrowser
95
96 from .api import Twitter, TwitterError
97 from .oauth import OAuth, write_token_file, read_token_file
98 from .oauth_dance import oauth_dance
99 from . import ansi
100 from .util import smrt_input, printNicely
101
# Directory holding the default config and OAuth token files.  HOME is
# preferred, then USERPROFILE (Windows); if neither is set (or is empty) fall
# back to os.path.expanduser('~') instead of silently pointing at the
# filesystem root.
_HOME_DIR = (os.environ.get('HOME')
             or os.environ.get('USERPROFILE')
             or os.path.expanduser('~'))

# Built-in defaults.  main() layers config-file values and then command-line
# values on top of a copy of this dict.
OPTIONS = {
    'action': 'friends',
    'refresh': False,
    'refresh_rate': 600,  # seconds between polls when --refresh is given
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    'config_filename': os.path.join(_HOME_DIR, '.twitter'),
    'oauth_filename': os.path.join(_HOME_DIR, '.twitter_oauth'),
    'length': 20,
    'timestamp': False,
    'datestamp': False,
    'extra_args': [],
    'secure': True,
    'invert_split': False,
}
117
def parse_args(args, options):
    """Parse command-line style ``args`` and record the results in ``options``.

    ``options`` is updated in place; only keys for options actually present
    in ``args`` are written.  The first positional argument becomes
    ``options['action']`` and the remaining ones ``options['extra_args']``,
    unless help was requested, in which case the action stays ``'help'``.

    Raises ``GetoptError`` for unknown options or missing option arguments,
    and ``ValueError`` if a numeric option is not an integer.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    # NOTE(review): '-e' and '-p' are accepted (each with an argument) but
    # nothing below acts on them -- presumably kept so old invocations still
    # parse; confirm before removing.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    # Python 2 hands us byte strings that need decoding; Python 3 arguments
    # are already text and have no ``decode`` method.
    encoding = locale.getpreferredencoding()
    extra_args = [arg.decode(encoding) if isinstance(arg, bytes) else arg
                  for arg in extra_args]

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = int(arg)
        elif opt in ('-l', '--length'):
            options["length"] = int(arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over any positional action.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
152
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix to print before a status line.

    ``status['created_at']`` is parsed with ``format`` and treated as UTC
    (the format's literal ``+0000``).  Depending on ``options['timestamp']``
    and ``options['datestamp']`` the result is a date, a time, both, or the
    empty string; non-empty results end with a single space.

    Raises ``ValueError`` if a stamp is requested and ``created_at`` does not
    match ``format``.
    """
    import calendar

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if not (timestamp or datestamp):
        # Nothing to render, so don't bother parsing the date at all.
        return ""

    utc = time.strptime(status['created_at'], format)
    # Let the C library pick the UTC offset that applies at that instant.
    # ``time.daylight`` only says that the zone *has* a DST rule, not that
    # DST is in effect, so choosing ``altzone`` from it is off by the DST
    # shift for part of the year.
    t = time.localtime(calendar.timegm(utc))

    if timestamp and datestamp:
        return time.strftime("%Y-%m-%d %H:%M:%S ", t)
    elif timestamp:
        return time.strftime("%H:%M:%S ", t)
    return time.strftime("%Y-%m-%d ", t)
170
class StatusFormatter(object):
    """Default status formatter: one line per status."""

    def __call__(self, status, options):
        """Return ``<time prefix><screen name> <text>`` for ``status``."""
        when = get_time_string(status, options)
        author = status['user']['screen_name']
        body = status['text']
        return "%s%s %s" % (when, author, body)
176
class AnsiStatusFormatter(object):
    """One-line status formatter that colours the author's screen name."""

    def __init__(self):
        # Colour assignments live on the instance, via ansi.ColourMap.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        """Return the status line with the screen name wrapped in ANSI codes."""
        author = status['user']['screen_name']
        colour = self._colourMap.colourFor(author)
        pieces = (
            get_time_string(status, options),
            ansi.cmdColour(colour),
            status['user']['screen_name'],
            ansi.cmdReset(),
            status['text'],
        )
        return "%s%s%s%s %s" % pieces
187
class VerboseStatusFormatter(object):
    """Multi-line status formatter showing author, location and raw date."""

    def __call__(self, status, options):
        """Return a header line followed by the status text; ``options`` is unused."""
        user = status['user']
        fields = (
            user['screen_name'],
            user['location'],
            status['created_at'],
            status['text'],
        )
        return "-- %s (%s) on %s\n%s\n" % fields
195
class URLStatusFormatter(object):
    """Status formatter that outputs only the URLs found in the text."""

    # Matches http:// or https:// followed by any run of non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        """Return the URLs in ``status['text']``, one per line, or ``""``."""
        found = self.urlmatch.findall(status['text'])
        if not found:
            return ""
        return '\n'.join(found)
201
202
class ListsFormatter(object):
    """Default formatter for a Twitter list: padded name plus description."""

    def __call__(self, list):
        """Return one newline-terminated line describing ``list``."""
        description = list['description']
        if description:
            line = "%-30s (%s)" % (list['name'], description)
        else:
            line = "%-30s" % (list['name'])
        return line + "\n"
210
class ListsVerboseFormatter(object):
    """Verbose formatter for a Twitter list: name, description, size, mode."""

    def __call__(self, list):
        """Return a multi-line description of ``list``."""
        template = "%-30s\n description: %s\n members: %s\n mode:%s\n"
        values = (
            list['name'],
            list['description'],
            list['member_count'],
            list['mode'],
        )
        return template % values
215
class AnsiListsFormatter(object):
    """List formatter that colours the list name with ANSI escape codes."""

    def __init__(self):
        # Colour assignments live on the instance, via ansi.ColourMap.
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        """Return the coloured, padded list name followed by its description."""
        name = list['name']
        colour = self._colourMap.colourFor(name)
        pieces = (
            ansi.cmdColour(colour),
            list['name'],
            ansi.cmdReset(),
            list['description'],
        )
        return "%s%-15s%s %s" % pieces
225
226
class AdminFormatter(object):
    """Default formatter for the outcome of a follow/leave action."""

    def __call__(self, action, user):
        """Return a confirmation sentence; any action but "follow" reads as leaving."""
        who = "%s (%s)" % (user['screen_name'], user['name'])
        if action != "follow":
            return "You are no longer following %s.\n" % (who)
        return "You are now following %s.\n" % (who)
234
class VerboseAdminFormatter(object):
    """Verbose formatter for the outcome of a follow/leave action."""

    def __call__(self, action, user):
        """Return ``-- <verb>: <screen name> (<name>): <url>``."""
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        details = (verb, user['screen_name'], user['name'], user['url'])
        return "-- %s: %s (%s): %s" % details
242
class SearchFormatter(object):
    """Default search-result formatter: one line per result."""

    def __call__(self, result, options):
        """Return ``<time prefix><from_user> <text>`` for ``result``."""
        # Search results carry their date in a different layout than statuses.
        when = get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000")
        sender = result['from_user']
        return "%s%s %s" % (when, sender, result['text'])
248
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search formatter; currently identical to SearchFormatter."""
    pass #Default to the regular one
251
class URLSearchFormatter(object):
    """Search-result formatter that outputs only the URLs found in the text."""

    # Matches http:// or https:// followed by any run of non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        """Return the URLs in ``result['text']``, one per line, or ``""``."""
        found = self.urlmatch.findall(result['text'])
        if not found:
            return ""
        return '\n'.join(found)
257
class AnsiSearchFormatter(object):
    """One-line search-result formatter that colours the sender's name."""

    def __init__(self):
        # Colour assignments live on the instance, via ansi.ColourMap.
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        """Return the result line with ``from_user`` wrapped in ANSI codes."""
        sender = result['from_user']
        colour = self._colourMap.colourFor(sender)
        pieces = (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansi.cmdColour(colour),
            result['from_user'],
            ansi.cmdReset(),
            result['text'],
        )
        return "%s%s%s%s %s" % pieces
268
# Cached result of get_term_encoding(); None until the first call.
_term_encoding = None
def get_term_encoding():
    """Return the encoding named after the first '.' in $LANG.

    If LANG is unset, or has no '.', the result is 'UTF-8'.  A non-empty
    result is cached in the module-level ``_term_encoding``.
    """
    global _term_encoding
    if _term_encoding:
        return _term_encoding
    parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
    _term_encoding = parts[1] if len(parts) > 1 else 'UTF-8'
    return _term_encoding
279
# Formatter classes per action family, keyed by the --format name.
status_formatters = dict(
    default=StatusFormatter,
    verbose=VerboseStatusFormatter,
    urls=URLStatusFormatter,
    ansi=AnsiStatusFormatter,
)

admin_formatters = dict(
    default=AdminFormatter,
    verbose=VerboseAdminFormatter,
    urls=AdminFormatter,
    ansi=AdminFormatter,
)

search_formatters = dict(
    default=SearchFormatter,
    verbose=VerboseSearchFormatter,
    urls=URLSearchFormatter,
    ansi=AnsiSearchFormatter,
)

lists_formatters = dict(
    default=ListsFormatter,
    verbose=ListsVerboseFormatter,
    urls=None,  # no URL-only rendering exists for lists
    ansi=AnsiListsFormatter,
)

# Registry consulted by get_formatter(): family name -> format name -> class.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
312
def get_formatter(action_type, options):
    """Return a new formatter instance for ``action_type``.

    ``action_type`` selects a family in the module-level ``formatters``
    registry and ``options['format']`` selects the class within it.

    Raises ``TwitterError`` if the family is unknown, or if the family has
    no formatter for the requested format (including entries set to None).
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the family actually requested rather than always "status".
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
324
class Action(object):
    """Base class for command-line actions; calling it dispatches by name."""

    def ask(self, subject='perform this action', careful=False):
        '''
        Ask the user, via `input`, whether `subject` should be performed.
        When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' % (subject, sample)
        try:
            answer = input(prompt).lower()
        except EOFError:
            print(file=sys.stderr) # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful
        if careful:
            return answer in ('yes', 'y')
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        """Run the action named by ``options['action']``.

        Unknown names resolve to NoSuchAction.  Status actions are repeated
        every ``options['refresh_rate']`` seconds when ``options['refresh']``
        is set; a KeyboardInterrupt ends the run quietly.
        """
        action = actions.get(options['action'], NoSuchAction)()
        try:
            repeat = options['refresh'] and isinstance(action, StatusAction)
            if not repeat:
                action(twitter, options)
                return
            while True:
                action(twitter, options)
                sys.stdout.flush()
                time.sleep(options['refresh_rate'])
        except KeyboardInterrupt:
            print('\n[Keyboard Interrupt]', file=sys.stderr)
369
class NoSuchActionError(Exception):
    """Raised by NoSuchAction when the requested action name is unknown."""
    pass
372
class NoSuchAction(Action):
    """Fallback action used when the requested action name is not registered."""

    def __call__(self, twitter, options):
        """Always raise NoSuchActionError naming the requested action."""
        message = "No such action: %s" % options['action']
        raise NoSuchActionError(message)
376
class StatusAction(Action):
    """Base for actions that fetch statuses (via ``getStatuses``) and print them."""

    def __call__(self, twitter, options):
        """Fetch statuses, format each one, and print the non-blank results."""
        statuses = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for entry in statuses:
            line = render(entry, options)
            if not line.strip():
                # Formatters may yield nothing (e.g. a status without URLs).
                continue
            printNicely(line)
385
class SearchAction(Action):
    """Search Twitter for the terms in ``options['extra_args']``."""

    def __call__(self, twitter, options):
        """Run the search and print each non-blank formatted result."""
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        twitter.domain="search.twitter.com"
        twitter.uriparts=()
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        encoded_terms = [quote(term) for term in options['extra_args']]
        query_string = "+".join(encoded_terms)

        response = twitter.search(q=query_string)
        render = get_formatter('search', options)
        for hit in response['results']:
            line = render(hit, options)
            if not line.strip():
                continue
            printNicely(line)
404
class AdminAction(Action):
    """Base for follow/leave actions; subclasses implement ``getUser``."""

    def __call__(self, twitter, options):
        """Apply the action to the screen name in ``extra_args[0]`` and report.

        Raises TwitterError if no screen name was given.  A TwitterError from
        ``getUser`` is reported on stdout instead of being propagated.
        """
        targets = options['extra_args']
        if not targets or not targets[0]:
            raise TwitterError("You need to specify a user (screen name)")
        af = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, options['extra_args'][0])
        except TwitterError as e:
            for line in (
                    "There was a problem following or leaving the specified user.",
                    "You may be trying to follow a user you are already following;",
                    "Leaving a user you are not currently following;",
                    "Or the user may not exist.",
                    "Sorry."):
                print(line)
            print()
            print(e)
            return
        printNicely(af(options['action'], user))
422
class ListsAction(StatusAction):
    """Show a user's lists, or the statuses of one of that user's lists."""

    def getStatuses(self, twitter, options):
        """Return the statuses that StatusAction should print.

        ``extra_args[0]`` is the screen name.  With no further argument the
        user's lists are printed here and an empty list is returned; with a
        list name in ``extra_args[1]`` that list's statuses are returned in
        reversed order.

        Raises TwitterError if no screen name was given.
        """
        if not options['extra_args']:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = options['extra_args'][0]

        if options['extra_args'][1:]:
            return reversed(twitter.user.lists.list.statuses(
                user=screen_name, list=options['extra_args'][1]))

        lists = twitter.user.lists(user=screen_name)['lists']
        if not lists:
            printNicely("This user has no lists.")
            return []

        # Build the formatter once for the whole listing instead of once per
        # entry, so one formatter instance (and whatever state it keeps)
        # serves every list.  It is still only built when there is something
        # to print, as before.
        lf = get_formatter('lists', options)
        for user_list in lists:
            printNicely(lf(user_list))
        return []
441
442
class MyListsAction(ListsAction):
    """ListsAction for the authenticated user."""

    def getStatuses(self, twitter, options):
        """Prepend the caller's own screen name to ``extra_args`` and delegate."""
        credentials = twitter.account.verify_credentials()
        me = credentials['screen_name']
        # ListsAction expects the screen name as the first extra argument.
        options['extra_args'].insert(0, me)
        return ListsAction.getStatuses(self, twitter, options)
448
449
class FriendsAction(StatusAction):
    """Show the friends timeline."""

    def getStatuses(self, twitter, options):
        """Return up to ``options['length']`` friends-timeline statuses, reversed."""
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
453
class PublicAction(StatusAction):
    """Show the public timeline."""

    def getStatuses(self, twitter, options):
        """Return up to ``options['length']`` public-timeline statuses, reversed."""
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
457
class RepliesAction(StatusAction):
    """Show replies to the authenticated user."""

    def getStatuses(self, twitter, options):
        """Return up to ``options['length']`` replies, reversed."""
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
461
class FollowAction(AdminAction):
    """Start following a user."""

    def getUser(self, twitter, user):
        """Create the friendship with ``user`` and return the API response."""
        created = twitter.friendships.create(id=user)
        return created
465
class LeaveAction(AdminAction):
    """Stop following a user."""

    def getUser(self, twitter, user):
        """Destroy the friendship with ``user`` and return the API response."""
        removed = twitter.friendships.destroy(id=user)
        return removed
469
class SetStatusAction(Action):
    """Post a status update, splitting over-long text into several updates."""

    # Maximum number of characters sent in a single update.
    MAX_LENGTH = 140

    def __call__(self, twitter, options):
        """Post ``extra_args`` (or a prompted message) as one or more updates.

        Leading ``@name`` mentions are stripped from the text and repeated at
        the start of every part.  Parts are posted in order, or in reverse
        order when ``options['invert_split']`` is set.  A message that
        consists solely of mentions leaves no text and nothing is sent.
        """
        statusTxt = (" ".join(options['extra_args'])
                     if options['extra_args']
                     else str(input("message: ")))

        # Collect the leading mentions.  The character right after each
        # mention (normally the separating space) is skipped.
        replies = []
        ptr = re.compile(r"@[\w_]+")
        while statusTxt:
            s = ptr.match(statusTxt)
            if not s:
                break
            replies.append(s.group(0))
            statusTxt = statusTxt[s.end()+1:]
        replies = " ".join(replies)

        if len(replies) + 1 >= self.MAX_LENGTH:
            # The mentions alone fill an update: treat them as ordinary
            # text, keeping the rest of the message after them.
            statusTxt = " ".join(part for part in (replies, statusTxt) if part)
            replies = ""

        splitted = self._split(statusTxt, replies + " " if replies else "")

        if options['invert_split']:
            splitted.reverse()
        for status in splitted:
            twitter.statuses.update(status=status)

    def _split(self, text, prefix):
        """Split ``text`` into parts of at most MAX_LENGTH chars incl. ``prefix``."""
        limit = self.MAX_LENGTH - len(prefix)
        parts = []
        while text:
            if len(text) > limit:
                # Break at the last space that keeps the part within the
                # limit; with no usable space, cut hard at the limit so the
                # loop always makes progress.
                end = text.rfind(' ', 0, limit + 1)
                if end <= 0:
                    end = limit
            else:
                end = len(text)
            parts.append(prefix + text[:end])
            # Drop the whitespace the text was broken on.
            text = text[end:].lstrip()
        return parts
504
class TwitterShell(Action):
    """Interactive shell that reads and runs actions until the user exits."""

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        # Strip surrounding single quotes and unescape embedded ones.
        prompt = prompt.strip("'").replace("\\'","'")
        for colour in ansi.COLOURS_NAMED:
            if '[%s]' %(colour) in prompt:
                prompt = prompt.replace(
                    '[%s]' %(colour), ansi.cmdColourNamed(colour))
        prompt = prompt.replace('[R]', ansi.cmdReset())
        return prompt

    def __call__(self, twitter, options):
        """Run the read/parse/dispatch loop.

        Each input line is parsed with parse_args into the shared ``options``
        dict and dispatched through Action.  ``exit`` or a confirmed EOF
        raises SystemExit(0).  Unknown actions, bad options and keyboard
        interrupts are reported on stderr and the loop continues.
        """
        prompt = self.render_prompt(options.get('prompt', 'twitter> '))
        while True:
            options['action'] = ""
            try:
                args = input(prompt).split()
                parse_args(args, options)
                if not options['action']:
                    continue
                elif options['action'] == 'exit':
                    raise SystemExit(0)
                elif options['action'] == 'shell':
                    print('Sorry Xzibit does not work here!', file=sys.stderr)
                    continue
                elif options['action'] == 'help':
                    # NOTE(review): the listing this was recovered from had
                    # its whitespace flattened, so the indentation inside
                    # this help text should be confirmed against the original.
                    print('''\ntwitter> `action`\n
The Shell Accepts all the command line actions along with:

exit Leave the twitter shell (^D may also be used)

Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
                Action()(twitter, options)
                options['action'] = ''
            except GetoptError as e:
                # A mistyped option should not end the interactive session;
                # report it the same way main() does and prompt again.
                print("I can't do that, %s." %(e), file=sys.stderr)
            except NoSuchActionError as e:
                print(e, file=sys.stderr)
            except KeyboardInterrupt:
                print('\n[Keyboard Interrupt]', file=sys.stderr)
            except EOFError:
                print(file=sys.stderr)
                leaving = self.ask(subject='Leave')
                if not leaving:
                    print('Excellent!', file=sys.stderr)
                else:
                    raise SystemExit(0)
551
class PythonPromptAction(Action):
    """Start an interactive Python prompt (the ``pyprompt`` action)."""

    def __call__(self, twitter, options):
        """Call smrt_input repeatedly until it raises EOFError."""
        try:
            while True:
                # locals() exposes this method's ``self``, ``twitter`` and
                # ``options`` to smrt_input.
                # NOTE(review): presumably smrt_input reads and evaluates one
                # line per call -- confirm in .util.
                smrt_input(globals(), locals())
        except EOFError:
            pass
559
class HelpAction(Action):
    """Print this module's docstring, which is the usage text."""

    def __call__(self, twitter, options):
        print(__doc__)
563
class DoNothingAction(Action):
    """Action that does nothing; registered for ``authorize`` in ``actions``."""

    def __call__(self, twitter, options):
        pass
567
class RateLimitStatus(Action):
    """Print the account's current API rate-limit status."""

    def __call__(self, twitter, options):
        """Print the remaining/hourly request counts and the next reset time."""
        rate = twitter.account.rate_limit_status()
        usage = (rate['remaining_hits'], rate['hourly_limit'])
        print("Remaining API requests: %s / %s (hourly limit)" % usage)
        reset_at = rate['reset_time_in_seconds']
        seconds_left = int(reset_at - time.time())
        reset_text = time.asctime(time.localtime(reset_at))
        print("Next reset in %ss (%s)" % (seconds_left, reset_text))
574
# Maps the action name given on the command line (or in the shell) to the
# Action subclass implementing it; Action.__call__ looks names up here and
# falls back to NoSuchAction.  'authorize' maps to DoNothingAction because
# main() runs the OAuth dance itself before dispatching.
actions = {
    'authorize' : DoNothingAction,
    'follow' : FollowAction,
    'friends' : FriendsAction,
    'list' : ListsAction,
    'mylist' : MyListsAction,
    'help' : HelpAction,
    'leave' : LeaveAction,
    'public' : PublicAction,
    'pyprompt' : PythonPromptAction,
    'replies' : RepliesAction,
    'search' : SearchAction,
    'set' : SetStatusAction,
    'shell' : TwitterShell,
    'rate' : RateLimitStatus,
}
591
def loadConfig(filename):
    """Return a copy of OPTIONS overlaid with values from the config file.

    Only the ``format``, ``prompt`` and (boolean) ``invert_split`` options of
    the ``[twitter]`` section are read.  If ``filename`` does not exist the
    defaults are returned unchanged.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    parser = SafeConfigParser()
    parser.read([filename])
    # (option name, reader) pairs; invert_split is parsed as a boolean.
    readers = (
        ('format', parser.get),
        ('prompt', parser.get),
        ('invert_split', parser.getboolean),
    )
    for name, read in readers:
        if parser.has_option('twitter', name):
            options[name] = read('twitter', name)
    return options
605
def main(args=None):
    """Entry point of the command-line tool.

    ``args`` is the argument list without the program name; when omitted,
    ``sys.argv[1:]`` is read at call time.

    Returns 1 when --refresh is combined with an action that cannot be
    refreshed, otherwise None.  Raises SystemExit(1) on bad options, unknown
    actions and Twitter API errors.
    """
    if args is None:
        # Resolved here rather than in the signature, where it would be
        # frozen at import time.
        args = sys.argv[1:]

    arg_options = {}
    try:
        parse_args(args, arg_options)
    except GetoptError as e:
        print("I can't do that, %s." %(e), file=sys.stderr)
        print(file=sys.stderr)
        raise SystemExit(1)

    config_path = os.path.expanduser(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))
    config_options = loadConfig(config_path)

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    for d in config_options, arg_options:
        for k, v in d.items():
            # Empty values are skipped, but an explicit False must win:
            # otherwise --no-ssl (secure=False) would never take effect.
            if v or v is False:
                options[k] = v

    if options['refresh'] and options['action'] not in (
            'friends', 'public', 'replies'):
        print("You can only refresh the friends, public, or replies actions.", file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        return 1

    oauth_filename = os.path.expanduser(options['oauth_filename'])

    if (options['action'] == 'authorize'
            or not os.path.exists(oauth_filename)):
        # Hand over the expanded path, so the tokens are stored in the same
        # file that is checked above and read back below.
        oauth_dance(
            "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
            oauth_filename)

    oauth_token, oauth_token_secret = read_token_file(oauth_filename)

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1',
        domain='api.twitter.com')

    try:
        Action()(twitter, options)
    except NoSuchActionError as e:
        print(e, file=sys.stderr)
        raise SystemExit(1)
    except TwitterError as e:
        print(str(e), file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        raise SystemExit(1)
658 raise SystemExit(1)