]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
- Check rate limit using the command line tool. Patch by @stalkr_
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
20 object directly
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25 rate get your current rate limit status (remaining API reqs)
26
27
28 OPTIONS:
29
30 -r --refresh run this command forever, polling every once
31 in a while (default: every 10 minutes)
32 -R --refresh-rate <rate> set the refresh rate (in seconds)
33 -f --format <format> specify the output format for status updates
34 -c --config <filename> read username and password from given config
35 file (default ~/.twitter)
36 -l --length <count> specify number of status updates shown
37 (default: 20, max: 200)
38 -t --timestamp show time before status lines
39 -d --datestamp show date before status lines
40 --no-ssl use less-secure HTTP instead of HTTPS
41 --oauth <filename> filename to read/store oauth credentials to
42
43 FORMATS for the --format option
44
45 default one line per status
46 verbose multiple lines per status, more verbose status info
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51 CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57 [twitter]
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63 """
64
65 from __future__ import print_function
66
# OAuth consumer credentials identifying this command-line tool; main()
# hands them to oauth_dance() and OAuth().
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
69
70 import sys
71 import time
72 from getopt import gnu_getopt as getopt, GetoptError
73 from getpass import getpass
74 import re
75 import os.path
76 try:
77 from ConfigParser import SafeConfigParser
78 except ImportError:
79 from configparser import ConfigParser as SafeConfigParser
80 import datetime
81 try:
82 from urllib.parse import quote
83 except ImportError:
84 from urllib2 import quote
85 import webbrowser
86
87 from .api import Twitter, TwitterError
88 from .oauth import OAuth, write_token_file, read_token_file
89 from .oauth_dance import oauth_dance
90 from . import ansi
91 from .util import smrt_input, printNicely
92
# Built-in defaults for every option.  main() and loadConfig() copy this
# mapping and layer config-file values and command-line arguments on top.
OPTIONS = dict(
    action='friends',
    refresh=False,
    refresh_rate=600,  # seconds between polls when refreshing
    format='default',
    prompt='[cyan]twitter[R]> ',
    config_filename=os.sep.join([os.environ.get('HOME', ''), '.twitter']),
    oauth_filename=os.sep.join(
        [os.environ.get('HOME', ''), '.twitter_oauth']),
    length=20,
    timestamp=False,
    datestamp=False,
    extra_args=[],
    secure=True,
)
107
def parse_args(args, options):
    """
    Parse command-line style `args` and record the results in `options`.

    Only options that actually appear in `args` are written to `options`.
    The first non-option argument becomes options['action'] and the rest
    options['extra_args'], unless help was requested, in which case the
    action stays 'help'.  The short options -e and -p are accepted (with
    an argument) but ignored.

    Raises GetoptError for unknown options, missing option arguments and
    non-integer values given to -R/--refresh-rate or -l/--length.
    """
    def as_int(opt, arg):
        # Report bad numbers the same way getopt reports bad options, so
        # callers that already handle GetoptError print a clean message
        # instead of dying with a ValueError traceback.
        try:
            return int(arg)
        except ValueError:
            raise GetoptError(
                "option %s requires an integer argument, not %r" % (opt, arg),
                opt)

    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = as_int(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = as_int(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over any action named on the command line.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
140
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """
    Return the time prefix to print in front of a status line.

    `status['created_at']` is parsed with `format` as a UTC time and
    converted to the local timezone.  Depending on options['timestamp']
    and options['datestamp'] the result is "YYYY-MM-DD HH:MM:SS ",
    "HH:MM:SS ", "YYYY-MM-DD " or "" (each non-empty form ends with a
    space).  When neither flag is set the creation time is not parsed.

    Raises ValueError if a stamp is requested and `created_at` does not
    match `format`.
    """
    import calendar  # local import: only this function needs it

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if not (timestamp or datestamp):
        return ""

    utc = time.strptime(status['created_at'], format)
    # Let the C library pick the UTC offset valid at that instant.
    # time.daylight only tells whether the zone defines DST at all, so
    # choosing time.altzone based on it is wrong outside the DST period.
    local = time.localtime(calendar.timegm(utc))
    if timestamp and datestamp:
        return time.strftime("%Y-%m-%d %H:%M:%S ", local)
    if timestamp:
        return time.strftime("%H:%M:%S ", local)
    return time.strftime("%Y-%m-%d ", local)
158
class StatusFormatter(object):
    """Default one-line rendering: optional time prefix, screen name, text."""

    def __call__(self, status, options):
        prefix = get_time_string(status, options)
        author = status['user']['screen_name']
        fields = (prefix, author, status['text'])
        return "%s%s %s" % fields
164
class AnsiStatusFormatter(object):
    """One-line rendering with the screen name wrapped in ANSI colour codes."""

    def __init__(self):
        # Colour lookups go through one map kept for the formatter's lifetime.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        colour = self._colourMap.colourFor(status['user']['screen_name'])
        prefix = get_time_string(status, options)
        fields = (
            prefix,
            ansi.cmdColour(colour),
            status['user']['screen_name'],
            ansi.cmdReset(),
            status['text'],
        )
        return "%s%s%s%s %s" % fields
175
class VerboseStatusFormatter(object):
    """Multi-line rendering: a header with author, location and the raw
    creation time, followed by the text."""

    def __call__(self, status, options):
        author = status['user']
        fields = (
            author['screen_name'],
            author['location'],
            status['created_at'],
            status['text'],
        )
        return "-- %s (%s) on %s\n%s\n" % fields
183
class URLStatusFormatter(object):
    """Render only the http/https URLs found in a status, one per line."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        # Joining an empty match list already yields "".
        return '\n'.join(self.urlmatch.findall(status['text']))
189
190
class ListsFormatter(object):
    """One line per list: the name padded to 30 columns, then the
    description in parentheses when there is one."""

    def __call__(self, list):
        if not list['description']:
            return "%-30s" % (list['name']) + "\n"
        return "%-30s (%s)" % (list['name'], list['description']) + "\n"
198
class ListsVerboseFormatter(object):
    """Multi-line rendering of a list: name, description, member count, mode."""

    def __call__(self, list):
        details = (
            list['name'],
            list['description'],
            list['member_count'],
            list['mode'],
        )
        return "%-30s\n description: %s\n members: %s\n mode:%s\n" % details
203
class AnsiListsFormatter(object):
    """One line per list with the name wrapped in ANSI colour codes."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        colour = self._colourMap.colourFor(list['name'])
        fields = (
            ansi.cmdColour(colour),
            list['name'],
            ansi.cmdReset(),
            list['description'],
        )
        return "%s%-15s%s %s" % fields
213
214
class AdminFormatter(object):
    """Confirmation message for a follow/leave action."""

    def __call__(self, action, user):
        user_str = "%s (%s)" % (user['screen_name'], user['name'])
        # Any action other than "follow" is reported as leaving.
        template = ("You are now following %s.\n" if action == "follow"
                    else "You are no longer following %s.\n")
        return template % (user_str)
222
class VerboseAdminFormatter(object):
    """Follow/leave confirmation that also shows the user's URL."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        fields = (verb, user['screen_name'], user['name'], user['url'])
        return "-- %s: %s (%s): %s" % fields
230
class SearchFormatter(object):
    """Default one-line rendering of a search result: optional time
    prefix, sender, text."""

    def __call__(self, result, options):
        # Search results carry their creation time in a different layout
        # than statuses, hence the explicit format.
        prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        fields = (prefix, result['from_user'], result['text'])
        return "%s%s %s" % fields
236
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search output; currently identical to the default formatter."""
239
class URLSearchFormatter(object):
    """Render only the http/https URLs found in a search result, one per line."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        # Joining an empty match list already yields "".
        return '\n'.join(self.urlmatch.findall(result['text']))
245
class AnsiSearchFormatter(object):
    """One-line search result with the sender wrapped in ANSI colour codes."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        colour = self._colourMap.colourFor(result['from_user'])
        prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        fields = (
            prefix,
            ansi.cmdColour(colour),
            result['from_user'],
            ansi.cmdReset(),
            result['text'],
        )
        return "%s%s%s%s %s" % fields
256
# Cached result of get_term_encoding(); None until the first call.
_term_encoding = None
def get_term_encoding():
    """
    Return the terminal encoding named by the LANG environment variable.

    LANG is read as language[_territory][.codeset][@modifier]; the
    codeset part is returned, without any "@modifier" suffix.  When LANG
    is unset or names no codeset, 'UTF-8' is returned.  The result is
    computed once and cached for the life of the process.
    """
    global _term_encoding
    if not _term_encoding:
        parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
        codeset = parts[1] if parts[1:] else ''
        # "de_DE.ISO-8859-15@euro" must yield "ISO-8859-15", and an empty
        # codeset ("en_US.") must fall back instead of returning "".
        codeset = codeset.split('@')[0]
        _term_encoding = codeset or 'UTF-8'
    return _term_encoding
267
# One table per family of actions, each keyed by the --format name and
# holding the formatter class to instantiate.
status_formatters = dict(
    default=StatusFormatter,
    verbose=VerboseStatusFormatter,
    urls=URLStatusFormatter,
    ansi=AnsiStatusFormatter,
)

admin_formatters = dict(
    default=AdminFormatter,
    verbose=VerboseAdminFormatter,
    urls=AdminFormatter,
    ansi=AdminFormatter,
)

search_formatters = dict(
    default=SearchFormatter,
    verbose=VerboseSearchFormatter,
    urls=URLSearchFormatter,
    ansi=AnsiSearchFormatter,
)

lists_formatters = dict(
    default=ListsFormatter,
    verbose=ListsVerboseFormatter,
    urls=None,  # no URL rendering for lists; get_formatter() rejects it
    ansi=AnsiListsFormatter,
)

# Lookup used by get_formatter(): action family -> format name -> class.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
300
def get_formatter(action_type, options):
    """
    Return a new formatter instance for `action_type` and options['format'].

    `action_type` selects one of the tables in `formatters`;
    options['format'] selects the class inside that table.

    Raises TwitterError when the action type has no table, or when the
    table has no usable entry for the requested format.
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            %(action_type))
    f = formatters_dict.get(options['format'])
    if not f:
        # Name the family that was actually requested; the message used to
        # say "status actions" for every family.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            %(options['format'], action_type))
    return f()
312
class Action(object):
    """
    Base class of all actions.  Calling a bare Action instance looks up
    options['action'] in `actions` and runs the matching action.
    """

    def ask(self, subject='perform this action', careful=False):
        '''
        Ask the user on the terminal whether `subject` should be performed.
        When `careful`, the default answer is NO, otherwise YES.
        Returns the user answer in the form `True` or `False`.  If input
        ends (EOF) before an answer is given, the default is returned.
        '''
        try:
            # Python 2: the builtin input() would eval() what is typed.
            read_line = raw_input
        except NameError:
            read_line = input  # Python 3

        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' %(subject, sample)
        try:
            answer = read_line(prompt).lower()
        except EOFError:
            print(file=sys.stderr) # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful
        if careful:
            return answer in ('yes', 'y')
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        """
        Run the action named by options['action'] (NoSuchAction if it is
        unknown).  With options['refresh'] set, status actions are re-run
        forever, sleeping options['refresh_rate'] seconds between runs.
        Ctrl-C stops the action and is reported on stderr.
        """
        action = actions.get(options['action'], NoSuchAction)()
        try:
            if options['refresh'] and isinstance(action, StatusAction):
                while True:
                    action(twitter, options)
                    time.sleep(options['refresh_rate'])
            else:
                action(twitter, options)
        except KeyboardInterrupt:
            print('\n[Keyboard Interrupt]', file=sys.stderr)
356
class NoSuchActionError(Exception):
    """Raised when the requested action name is not a known action."""
359
class NoSuchAction(Action):
    """Fallback action used for unknown names; always raises NoSuchActionError."""

    def __call__(self, twitter, options):
        message = "No such action: %s" %(options['action'])
        raise NoSuchActionError(message)
363
class StatusAction(Action):
    """
    Base for actions that print statuses.  Subclasses provide
    getStatuses(twitter, options) returning the statuses to show.
    """

    def __call__(self, twitter, options):
        statuses = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for status in statuses:
            line = render(status, options)
            # Formatters may yield blank output (e.g. no URLs); skip those.
            if not line.strip():
                continue
            printNicely(line)
372
class SearchAction(Action):
    """Search twitter for the terms in options['extra_args'] and print the results."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main().  The previous values are
        # restored afterwards so that later actions run on the same twitter
        # object (as in the shell) still reach the regular API.
        # NOTE(review): assumes `domain` and `uriparts` are plain readable
        # attributes of the twitter object -- confirm against the api module.
        saved_domain, saved_uriparts = twitter.domain, twitter.uriparts
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()
        try:
            # We need to bypass the TwitterCall parameter encoding, so we
            # don't encode the plus sign, so we have to encode it ourselves
            query_string = "+".join(
                [quote(term)
                 for term in options['extra_args']])

            results = twitter.search(q=query_string)['results']
        finally:
            twitter.domain = saved_domain
            twitter.uriparts = saved_uriparts

        f = get_formatter('search', options)
        for result in results:
            resultStr = f(result, options)
            if resultStr.strip():
                printNicely(resultStr)
391
class AdminAction(Action):
    """
    Base for follow/leave.  Subclasses provide getUser(twitter, user),
    which performs the change and returns the affected user.
    """

    def __call__(self, twitter, options):
        targets = options['extra_args']
        if not targets or not targets[0]:
            raise TwitterError("You need to specify a user (screen name)")
        af = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, targets[0])
        except TwitterError as e:
            # Report the failure instead of propagating it.
            for line in (
                    "There was a problem following or leaving the specified user.",
                    "You may be trying to follow a user you are already following;",
                    "Leaving a user you are not currently following;",
                    "Or the user may not exist.",
                    "Sorry."):
                print(line)
            print()
            print(e)
            return
        printNicely(af(options['action'], user))
409
class ListsAction(StatusAction):
    """
    Show a user's lists, or the tweets of one of those lists.

    options['extra_args'][0] is the user's screen name; an optional
    second entry names the list whose tweets are shown.
    """

    def getStatuses(self, twitter, options):
        """
        With only a user given, print that user's lists here and return
        an empty list so that no statuses are printed afterwards.  With
        a list name as well, return that list's statuses in reversed
        order.  Raises TwitterError when no user is given.
        """
        if not options['extra_args']:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = options['extra_args'][0]

        if options['extra_args'][1:]:
            return reversed(twitter.user.lists.list.statuses(
                user=screen_name, list=options['extra_args'][1]))

        lists = twitter.user.lists(user=screen_name)['lists']
        if not lists:
            printNicely("This user has no lists.")
            return []
        # Build the formatter once for the whole listing rather than once
        # per list, so a formatter that keeps state is shared by all rows.
        lf = get_formatter('lists', options)
        for entry in lists:
            printNicely(lf(entry))
        return []
428
429
class MyListsAction(ListsAction):
    """Like ListsAction, but always for the authenticated user."""

    def getStatuses(self, twitter, options):
        """
        Look up the authenticated user's screen name and delegate to
        ListsAction with that name placed in front of the extra args.
        """
        screen_name = twitter.account.verify_credentials()['screen_name']
        # Work on a copy: options['extra_args'] may be the shared default
        # list, and inserting into it would leak the screen name into
        # every later run in the same process.
        own_options = dict(options)
        own_options['extra_args'] = [screen_name] + list(options['extra_args'])
        return ListsAction.getStatuses(self, twitter, own_options)
435
436
class FriendsAction(StatusAction):
    """Statuses from the friends timeline."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
440
class PublicAction(StatusAction):
    """Statuses from the public timeline."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
444
class RepliesAction(StatusAction):
    """Statuses that are replies to the authenticated user."""

    def getStatuses(self, twitter, options):
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
448
class FollowAction(AdminAction):
    """Start following a user."""

    def getUser(self, twitter, user):
        followed = twitter.friendships.create(id=user)
        return followed
452
class LeaveAction(AdminAction):
    """Stop following a user."""

    def getUser(self, twitter, user):
        left = twitter.friendships.destroy(id=user)
        return left
456
class SetStatusAction(Action):
    """Post a status update."""

    def __call__(self, twitter, options):
        """
        Post the words in options['extra_args'] joined by spaces; when
        there are none, prompt for the message on the terminal.
        """
        if options['extra_args']:
            statusTxt = " ".join(options['extra_args'])
        else:
            try:
                # Python 2: the builtin input() would eval() the message.
                read_line = raw_input
            except NameError:
                read_line = input  # Python 3
            statusTxt = str(read_line("message: "))
        twitter.statuses.update(status=statusTxt)
463
class TwitterShell(Action):
    """Interactive shell that reads command lines and runs them as actions."""

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        prompt = prompt.strip("'").replace("\\'","'")
        # Replace "[colour]" markers with the matching ANSI sequences and
        # "[R]" with the reset sequence.
        for colour in ansi.COLOURS_NAMED:
            if '[%s]' %(colour) in prompt:
                prompt = prompt.replace(
                    '[%s]' %(colour), ansi.cmdColourNamed(colour))
        prompt = prompt.replace('[R]', ansi.cmdReset())
        return prompt

    def __call__(self, twitter, options):
        """
        Read lines until the user leaves.  Each line is parsed like a
        command line into `options` and the resulting action is run.
        'exit' (or EOF followed by a confirmation) raises SystemExit(0).
        """
        try:
            # Python 2: the builtin input() would eval() the command line.
            read_line = raw_input
        except NameError:
            read_line = input  # Python 3

        prompt = self.render_prompt(options.get('prompt', 'twitter> '))
        while True:
            options['action'] = ""
            try:
                args = read_line(prompt).split()
                parse_args(args, options)
                if not options['action']:
                    continue
                elif options['action'] == 'exit':
                    raise SystemExit(0)
                elif options['action'] == 'shell':
                    print('Sorry Xzibit does not work here!', file=sys.stderr)
                    continue
                elif options['action'] == 'help':
                    print('''\ntwitter> `action`\n
The Shell Accepts all the command line actions along with:

exit Leave the twitter shell (^D may also be used)

Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
                Action()(twitter, options)
                options['action'] = ''
            except GetoptError as e:
                # A mistyped option must not kill the interactive session.
                print("I can't do that, %s." %(e), file=sys.stderr)
            except NoSuchActionError as e:
                print(e, file=sys.stderr)
            except KeyboardInterrupt:
                print('\n[Keyboard Interrupt]', file=sys.stderr)
            except EOFError:
                print(file=sys.stderr)
                leaving = self.ask(subject='Leave')
                if not leaving:
                    print('Excellent!', file=sys.stderr)
                else:
                    raise SystemExit(0)
510
class PythonPromptAction(Action):
    """Interactive Python prompt with the twitter object in scope."""

    def __call__(self, twitter, options):
        # Keep prompting until input ends (EOF).
        while True:
            try:
                smrt_input(globals(), locals())
            except EOFError:
                return
518
class HelpAction(Action):
    """Print the module's usage text."""

    def __call__(self, twitter, options):
        # __doc__ here is the module docstring, i.e. the usage text.
        usage = __doc__
        print(usage)
522
class DoNothingAction(Action):
    """Action that does nothing when run."""

    def __call__(self, twitter, options):
        return None
526
class RateLimitStatus(Action):
    """Print the remaining API requests and the time of the next reset."""

    def __call__(self, twitter, options):
        rate = twitter.account.rate_limit_status()
        quota = (rate['remaining_hits'], rate['hourly_limit'])
        print("Remaining API requests: %s / %s (hourly limit)" % quota)
        seconds_left = int(rate['reset_time_in_seconds']-time.time())
        reset_at = time.asctime(time.localtime(rate['reset_time_in_seconds']))
        print("Next reset in %ss (%s)" % (seconds_left, reset_at))
533
# Action name (as typed on the command line or in the shell) -> class
# instantiated and run by Action.__call__.
actions = dict(
    authorize=DoNothingAction,
    follow=FollowAction,
    friends=FriendsAction,
    list=ListsAction,
    mylist=MyListsAction,
    help=HelpAction,
    leave=LeaveAction,
    public=PublicAction,
    pyprompt=PythonPromptAction,
    replies=RepliesAction,
    search=SearchAction,
    set=SetStatusAction,
    shell=TwitterShell,
    rate=RateLimitStatus,
)
550
def loadConfig(filename):
    """
    Return a copy of OPTIONS, with 'format' and 'prompt' replaced by the
    values from the [twitter] section of `filename` when the file exists
    and sets them.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    cp = SafeConfigParser()
    cp.read([filename])
    for name in ('format', 'prompt'):
        if not cp.has_option('twitter', name):
            continue
        options[name] = cp.get('twitter', name)
    return options
560
def main(args=sys.argv[1:]):
    """
    Entry point of the command-line tool.

    Parses `args`, merges them with the config file and the built-in
    defaults, makes sure OAuth credentials exist (running the OAuth dance
    when needed) and runs the requested action.

    Returns 1 when --refresh is combined with an action that cannot be
    refreshed.  Raises SystemExit(1) on bad command-line options, unknown
    actions and Twitter API errors.
    """
    arg_options = {}
    try:
        parse_args(args, arg_options)
    except GetoptError as e:
        print("I can't do that, %s." %(e), file=sys.stderr)
        print(file=sys.stderr)
        raise SystemExit(1)

    config_path = os.path.expanduser(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))
    config_options = loadConfig(config_path)

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    # Private copy, so actions never touch the list stored in OPTIONS.
    options['extra_args'] = list(options['extra_args'])
    for k, v in config_options.items():
        if v:
            options[k] = v
    for k, v in arg_options.items():
        # arg_options only holds what the user explicitly gave, so falsy
        # values (--no-ssl -> False, -l 0) must win too; only an empty
        # string still counts as "not given".
        if v != '':
            options[k] = v

    if options['refresh'] and options['action'] not in (
        'friends', 'public', 'replies'):
        print("You can only refresh the friends, public, or replies actions.", file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        return 1

    oauth_filename = os.path.expanduser(options['oauth_filename'])

    if (options['action'] == 'authorize'
        or not os.path.exists(oauth_filename)):
        # Pass the same expanded path that is read back below.
        oauth_dance(
            "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
            oauth_filename)

    oauth_token, oauth_token_secret = read_token_file(oauth_filename)

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1',
        domain='api.twitter.com')

    try:
        Action()(twitter, options)
    except NoSuchActionError as e:
        print(e, file=sys.stderr)
        raise SystemExit(1)
    except TwitterError as e:
        print(str(e), file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        raise SystemExit(1)
611 print(str(e), file=sys.stderr)
612 print("Use 'twitter -h' for help.", file=sys.stderr)
613 raise SystemExit(1)