]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Fix the shell. Broken since the Python 3 move. I guess nobody uses it. :(
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
20 object directly
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25 rate get your current rate limit status (remaining API reqs)
26
27
28 OPTIONS:
29
30 -r --refresh run this command forever, polling every once
31 in a while (default: every 10 minutes)
32 -R --refresh-rate <rate> set the refresh rate (in seconds)
33 -f --format <format> specify the output format for status updates
34 -c --config <filename> read username and password from given config
35 file (default ~/.twitter)
36 -l --length <count> specify number of status updates shown
37 (default: 20, max: 200)
38 -t --timestamp show time before status lines
39 -d --datestamp show date before status lines
40 --no-ssl use less-secure HTTP instead of HTTPS
41 --oauth <filename> filename to read/store oauth credentials to
42
43 FORMATS for the --format option
44
45 default one line per status
46 verbose multiple lines per status, more verbose status info
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51 CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57 [twitter]
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63 """
64
65 from __future__ import print_function
66
# Python 2 compatibility: alias ``input`` to ``raw_input`` when that builtin
# exists, so the rest of the module can call ``input()`` with Python 3
# semantics.  ``__builtins__`` is a dict inside an imported module but the
# builtins module itself when the file runs as ``__main__``, so both shapes
# are handled.  On Python 3 there is no ``raw_input`` and the lookup fails
# with KeyError (dict) or leaves ``input`` untouched (module).
try:
    input = __builtins__['raw_input']
except TypeError:
    # ``__builtins__`` is the module object, which is not subscriptable.
    input = getattr(__builtins__, 'raw_input', input)
except (AttributeError, KeyError):
    pass
71
72
# OAuth consumer credentials identifying this command-line application;
# main() hands them to oauth_dance() and to the OAuth() authenticator.
CONSUMER_KEY='uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
75
76 import sys
77 import time
78 from getopt import gnu_getopt as getopt, GetoptError
79 from getpass import getpass
80 import re
81 import os.path
82 import locale
83 import string
84
85 try:
86 from ConfigParser import SafeConfigParser
87 except ImportError:
88 from configparser import ConfigParser as SafeConfigParser
89 import datetime
90 try:
91 from urllib.parse import quote
92 except ImportError:
93 from urllib2 import quote
94 import webbrowser
95
96 from .api import Twitter, TwitterError
97 from .oauth import OAuth, write_token_file, read_token_file
98 from .oauth_dance import oauth_dance
99 from . import ansi
100 from .util import smrt_input, printNicely
101
# Built-in defaults for every option.  loadConfig() and main() start from a
# copy of this dict and overlay config-file and command-line values on it.
OPTIONS = {
    'action': 'friends',        # action run when none is named
    'refresh': False,
    'refresh_rate': 600,        # passed to time.sleep(), i.e. seconds
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    'config_filename': os.environ.get('HOME', '') + os.sep + '.twitter',
    'oauth_filename': os.environ.get('HOME', '') + os.sep + '.twitter_oauth',
    'length': 20,               # sent as ``count`` to the timeline calls
    'timestamp': False,
    'datestamp': False,
    'extra_args': [],           # positional arguments after the action name
    'secure': True,             # forwarded to Twitter(secure=...)
}
116
def parse_args(args, options):
    """Parse command-line style ``args`` and store the results in ``options``.

    ``args`` is a list of argument strings (without the program name) and
    ``options`` is a dict that is updated in place; only keys for options
    actually present in ``args`` are written.  The first positional argument
    becomes ``options['action']`` and the remaining ones
    ``options['extra_args']``, unless help was requested.

    Raises ``GetoptError`` for unknown options and ``ValueError`` when a
    numeric option (refresh rate, length) is not an integer.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    # Python 2 hands us byte strings that must be decoded; Python 3 already
    # provides text, and ``str`` has no ``decode`` method there.
    encoding = locale.getpreferredencoding()
    extra_args = [arg.decode(encoding) if isinstance(arg, bytes) else arg
                  for arg in extra_args]

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = int(arg)
        elif opt in ('-l', '--length'):
            options["length"] = int(arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over whatever action was named positionally.
    if extra_args and not ('action' in options and options['action'] == 'help'):
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
151
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix to print before a status line.

    ``status['created_at']`` is parsed with ``format`` and interpreted as
    UTC, then converted to the local timezone.  Depending on
    ``options['timestamp']`` and ``options['datestamp']`` the result is
    the time, the date, both, or an empty string; non-empty results end with
    a single space.

    Raises ``ValueError`` when ``created_at`` does not match ``format``.
    """
    # Imported locally so this function does not depend on a file-level
    # import that the module does not have.
    import calendar

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    utc = time.strptime(status['created_at'], format)
    # Let the C library choose the UTC offset valid at that instant.
    # ``time.daylight`` only says the zone *defines* DST, not that DST is in
    # effect, so using it to pick ``time.altzone`` is an hour off in winter.
    t = time.localtime(calendar.timegm(utc))
    if timestamp and datestamp:
        return time.strftime("%Y-%m-%d %H:%M:%S ", t)
    elif timestamp:
        return time.strftime("%H:%M:%S ", t)
    elif datestamp:
        return time.strftime("%Y-%m-%d ", t)
    return ""
169
class StatusFormatter(object):
    """Default status layout: optional time prefix, screen name, text."""

    def __call__(self, status, options):
        when = get_time_string(status, options)
        author = status['user']['screen_name']
        return "%s%s %s" % (when, author, status['text'])
175
class AnsiStatusFormatter(object):
    """Status layout that colours each screen name via the ansi module."""

    def __init__(self):
        # One colour map per formatter instance; it picks the colour for
        # each screen name.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        author = status['user']['screen_name']
        colour = self._colourMap.colourFor(author)
        pieces = (
            get_time_string(status, options),
            ansi.cmdColour(colour),
            status['user']['screen_name'],
            ansi.cmdReset(),
            status['text'],
        )
        return "%s%s%s%s %s" % pieces
186
class VerboseStatusFormatter(object):
    """Multi-line status layout: author, location and raw creation date
    on a header line, followed by the text."""

    def __call__(self, status, options):
        user = status['user']
        header = (user['screen_name'], user['location'], status['created_at'])
        return "-- %s (%s) on %s\n%s\n" % (header + (status['text'],))
194
class URLStatusFormatter(object):
    """Status layout that outputs only the http(s) URLs found in the text."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        # Joining an empty match list already yields "", so statuses without
        # URLs render as an empty string.
        return '\n'.join(self.urlmatch.findall(status['text']))
200
201
class ListsFormatter(object):
    """Default list layout: padded name plus the description, if any."""

    def __call__(self, list):
        # The name is left-justified to 30 columns; the description is only
        # appended when it is non-empty.
        if not list['description']:
            return "%-30s\n" % list['name']
        return "%-30s (%s)\n" % (list['name'], list['description'])
209
class ListsVerboseFormatter(object):
    """Verbose list layout: name, description, member count and mode."""

    def __call__(self, list):
        fields = (list['name'], list['description'],
                  list['member_count'], list['mode'])
        return "%-30s\n description: %s\n members: %s\n mode:%s\n" % fields
214
class AnsiListsFormatter(object):
    """List layout that colours each list name via the ansi module."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        colour = self._colourMap.colourFor(list['name'])
        start = ansi.cmdColour(colour)
        name = list['name']
        stop = ansi.cmdReset()
        return "%s%-15s%s %s" % (start, name, stop, list['description'])
224
225
class AdminFormatter(object):
    """Default confirmation message for follow/leave actions."""

    def __call__(self, action, user):
        who = "%s (%s)" % (user['screen_name'], user['name'])
        # Anything other than "follow" is reported as an unfollow.
        if action != "follow":
            return "You are no longer following %s.\n" % (who)
        return "You are now following %s.\n" % (who)
233
class VerboseAdminFormatter(object):
    """Verbose confirmation for follow/leave actions, including the URL."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        details = (verb, user['screen_name'], user['name'], user['url'])
        return "-- %s: %s (%s): %s" % details
241
class SearchFormatter(object):
    """Default search-result layout: optional time prefix, sender, text."""

    def __call__(self, result, options):
        # Search results are parsed with a different date format than the
        # get_time_string() default.
        when = get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000")
        sender = result['from_user']
        return "%s%s %s" % (when, sender, result['text'])
247
class VerboseSearchFormatter(SearchFormatter):
    # Adds nothing of its own: the 'verbose' format for search results
    # inherits the default one-line rendering unchanged.
    pass #Default to the regular one
250
class URLSearchFormatter(object):
    """Search-result layout that outputs only the http(s) URLs in the text."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        # An empty match list joins to "", matching the no-URL case.
        found = self.urlmatch.findall(result['text'])
        return '\n'.join(found)
256
class AnsiSearchFormatter(object):
    """Search-result layout that colours each sender via the ansi module."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        colour = self._colourMap.colourFor(result['from_user'])
        pieces = (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansi.cmdColour(colour),
            result['from_user'],
            ansi.cmdReset(),
            result['text'],
        )
        return "%s%s%s%s %s" % pieces
267
# Cached result of get_term_encoding(); filled in on first use.
_term_encoding = None
def get_term_encoding():
    """Return the terminal encoding taken from the ``LANG`` variable.

    The encoding is the part after the first dot in ``LANG`` (for example
    ``UTF-8`` in ``en_US.UTF-8``); ``UTF-8`` is used when ``LANG`` is unset
    or has no dot.  A non-empty result is cached for later calls.
    """
    global _term_encoding
    if _term_encoding:
        return _term_encoding
    parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
    _term_encoding = parts[1] if len(parts) > 1 else 'UTF-8'
    return _term_encoding
278
# Formatter classes for each kind of output, keyed by the --format name.
status_formatters = {
    'default': StatusFormatter,
    'verbose': VerboseStatusFormatter,
    'urls': URLStatusFormatter,
    'ansi': AnsiStatusFormatter
}

admin_formatters = {
    'default': AdminFormatter,
    'verbose': VerboseAdminFormatter,
    'urls': AdminFormatter,
    'ansi': AdminFormatter
}

search_formatters = {
    'default': SearchFormatter,
    'verbose': VerboseSearchFormatter,
    'urls': URLSearchFormatter,
    'ansi': AnsiSearchFormatter
}

# There is no URL-only layout for lists, hence the None entry.
lists_formatters = {
    'default': ListsFormatter,
    'verbose': ListsVerboseFormatter,
    'urls': None,
    'ansi': AnsiListsFormatter
}

# Top-level registry consulted by get_formatter(): kind -> format -> class.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
311
def get_formatter(action_type, options):
    """Return a new formatter instance for ``action_type``.

    ``action_type`` selects a registry in ``formatters`` ('status', 'admin',
    'search' or 'lists') and ``options['format']`` selects the class within
    it.

    Raises ``TwitterError`` when the action type is unknown or when the
    requested format has no formatter for that action type.
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    f = formatters_dict.get(options['format'])
    if not f:
        # Name the real action type; this message used to say "status"
        # regardless of which registry was being searched.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return f()
323
class Action(object):
    """Base class of all actions and dispatcher for the selected one."""

    def ask(self, subject='perform this action', careful=False):
        '''
        Ask the user, via `input`, whether `subject` should be performed.
        When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        hint = '(y/N)' if careful else '(Y/n)'
        question = 'You really want to %s %s? ' % (subject, hint)
        try:
            reply = input(question).lower()
        except EOFError:
            print(file=sys.stderr) # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return False if careful else True
        if careful:
            # Only an explicit yes counts.
            return reply in ('yes', 'y')
        # Anything but an explicit no counts as yes.
        return reply not in ('no', 'n')

    def __call__(self, twitter, options):
        """Instantiate the action named by ``options['action']`` and run it.

        Unknown names fall back to ``NoSuchAction``.  With
        ``options['refresh']`` set, status actions are repeated forever,
        sleeping ``options['refresh_rate']`` seconds between runs.  A
        keyboard interrupt ends the run quietly.
        """
        handler = actions.get(options['action'], NoSuchAction)()
        try:
            if options['refresh'] and isinstance(handler, StatusAction):
                while True:
                    handler(twitter, options)
                    sys.stdout.flush()
                    time.sleep(options['refresh_rate'])
            else:
                handler(twitter, options)
        except KeyboardInterrupt:
            print('\n[Keyboard Interrupt]', file=sys.stderr)
368
class NoSuchActionError(Exception):
    """Raised when the requested action name is not a known action."""
371
class NoSuchAction(Action):
    """Fallback action used for unknown names; it always fails."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
375
class StatusAction(Action):
    """Base for actions that fetch statuses via ``getStatuses`` and print
    them with the selected status formatter."""

    def __call__(self, twitter, options):
        fetched = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for item in fetched:
            line = render(item, options)
            # Formatters may return blank output (e.g. no URLs); skip it.
            if not line.strip():
                continue
            printNicely(line)
384
class SearchAction(Action):
    """Search for the terms in ``options['extra_args']`` and print hits."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        twitter.domain="search.twitter.com"
        twitter.uriparts=()
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        terms = options['extra_args']
        query_string = "+".join(quote(term) for term in terms)

        hits = twitter.search(q=query_string)['results']
        render = get_formatter('search', options)
        for hit in hits:
            line = render(hit, options)
            if not line.strip():
                continue
            printNicely(line)
403
class AdminAction(Action):
    """Base for follow/leave: calls ``getUser`` on the named user and
    prints a confirmation, or an explanation if the API call fails."""

    def __call__(self, twitter, options):
        names = options['extra_args']
        if not names or not names[0]:
            raise TwitterError("You need to specify a user (screen name)")
        af = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, options['extra_args'][0])
        except TwitterError as e:
            print("There was a problem following or leaving the specified user.")
            print("You may be trying to follow a user you are already following;")
            print("Leaving a user you are not currently following;")
            print("Or the user may not exist.")
            print("Sorry.")
            print()
            print(e)
            return
        printNicely(af(options['action'], user))
421
class ListsAction(StatusAction):
    """Show a user's lists, or the statuses of one of those lists."""

    def getStatuses(self, twitter, options):
        """Return the statuses of the named list, oldest first.

        ``options['extra_args']`` holds the screen name and, optionally, a
        list name.  Without a list name the user's lists are printed
        directly and an empty list is returned.
        """
        extra = options['extra_args']
        if not extra:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = extra[0]

        if extra[1:]:
            return reversed(twitter.user.lists.list.statuses(
                user=screen_name, list=extra[1]))

        found = twitter.user.lists(user=screen_name)['lists']
        if not found:
            printNicely("This user has no lists.")
        for entry in found:
            lf = get_formatter('lists', options)
            printNicely(lf(entry))
        return []
440
441
class MyListsAction(ListsAction):
    """Like ``ListsAction`` but for the authenticated user's own lists."""

    def getStatuses(self, twitter, options):
        """Look up the current user's screen name and delegate to
        ``ListsAction.getStatuses`` with that name put first in the
        arguments."""
        screen_name = twitter.account.verify_credentials()['screen_name']
        # Work on a copy: inserting into options['extra_args'] in place would
        # modify the caller's list (possibly the shared OPTIONS default) and
        # would stack up screen names if this action ran more than once.
        own_options = dict(options)
        own_options['extra_args'] = [screen_name] + list(options['extra_args'])
        return ListsAction.getStatuses(self, twitter, own_options)
447
448
class FriendsAction(StatusAction):
    """Statuses from the friends timeline."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        # Reverse the API's order before printing.
        return reversed(timeline)
452
class PublicAction(StatusAction):
    """Statuses from the public timeline."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.public_timeline(count=options["length"])
        # Reverse the API's order before printing.
        return reversed(timeline)
456
class RepliesAction(StatusAction):
    """Statuses returned by the replies call."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.replies(count=options["length"])
        # Reverse the API's order before printing.
        return reversed(timeline)
460
class FollowAction(AdminAction):
    """Start following a user."""

    def getUser(self, twitter, user):
        created = twitter.friendships.create(id=user)
        return created
464
class LeaveAction(AdminAction):
    """Stop following a user."""

    def getUser(self, twitter, user):
        destroyed = twitter.friendships.destroy(id=user)
        return destroyed
468
class SetStatusAction(Action):
    """Post a status update, splitting long text into several updates.

    Leading ``@name`` mentions are stripped from the text and repeated at
    the start of every resulting update.
    """

    def __call__(self, twitter, options):
        """Post ``options['extra_args']`` joined by spaces, or prompt for
        the message when no arguments were given."""
        statusTxt = (" ".join(options['extra_args'])
                     if options['extra_args']
                     else str(input("message: ")))
        original = statusTxt

        # Collect the run of @mentions at the very start of the message.
        replies = []
        ptr = re.compile(r"@[\w_]+")
        while statusTxt:
            s = ptr.match(statusTxt)
            if not s:
                break
            replies.append(s.group())
            # Also skip the single character that follows the mention.
            statusTxt = statusTxt[s.end()+1:]

        prefix = " ".join(replies)
        if prefix:
            prefix += " "
        if len(prefix) >= 140:
            # The mentions alone fill an update: give up on repeating them
            # and post the message as originally typed.
            statusTxt = original
            prefix = ""

        for status in self._split(statusTxt, prefix, 140):
            twitter.statuses.update(status=status)

    @staticmethod
    def _split(text, prefix, max_len):
        """Cut ``text`` into chunks so that ``prefix + chunk`` never exceeds
        ``max_len`` characters, breaking at spaces where possible."""
        limit = max_len - len(prefix)
        chunks = []
        text = text.strip()
        while text:
            if len(text) <= limit:
                end = len(text)
            else:
                # A space exactly at index ``limit`` still leaves a chunk of
                # ``limit`` characters, hence the +1.
                end = text.rfind(' ', 0, limit + 1)
                if end <= 0:
                    # No usable space: hard-split so the loop always advances.
                    end = limit
            chunks.append(prefix + text[:end].rstrip())
            text = text[end:].lstrip()
        return chunks
501
class TwitterShell(Action):
    """Interactive shell that reads and runs actions until the user exits."""

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        prompt = prompt.strip("'").replace("\\'","'")
        # Replace each named colour token, e.g. "[cyan]", with the escape
        # sequence returned by ansi.cmdColourNamed().
        for colour in ansi.COLOURS_NAMED:
            token = '[%s]' %(colour)
            if token in prompt:
                prompt = prompt.replace(token, ansi.cmdColourNamed(colour))
        prompt = prompt.replace('[R]', ansi.cmdReset())
        return prompt

    def __call__(self, twitter, options):
        """Run the read/parse/execute loop.

        Each input line is parsed like a command line into ``options`` and
        dispatched through ``Action``.  Errors from parsing or from the
        action are reported on stderr and the loop continues.  ``exit``, or
        end-of-file followed by a confirmation, raises ``SystemExit(0)``.
        """
        prompt = self.render_prompt(options.get('prompt', 'twitter> '))
        while True:
            options['action'] = ""
            try:
                args = input(prompt).split()
                parse_args(args, options)
                if not options['action']:
                    continue
                elif options['action'] == 'exit':
                    raise SystemExit(0)
                elif options['action'] == 'shell':
                    print('Sorry Xzibit does not work here!', file=sys.stderr)
                    continue
                elif options['action'] == 'help':
                    print('''\ntwitter> `action`\n
The Shell Accepts all the command line actions along with:

exit Leave the twitter shell (^D may also be used)

Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
                Action()(twitter, options)
                options['action'] = ''
            except NoSuchActionError as e:
                print(e, file=sys.stderr)
            except GetoptError as e:
                # A mistyped flag must not take the whole shell down.
                print("I can't do that, %s." %(e), file=sys.stderr)
            except TwitterError as e:
                # Report the failure and keep the session alive.
                print(str(e), file=sys.stderr)
            except KeyboardInterrupt:
                print('\n[Keyboard Interrupt]', file=sys.stderr)
            except EOFError:
                print(file=sys.stderr)
                leaving = self.ask(subject='Leave')
                if not leaving:
                    print('Excellent!', file=sys.stderr)
                else:
                    raise SystemExit(0)
548
class PythonPromptAction(Action):
    # Hands control to smrt_input() repeatedly until end-of-file.
    def __call__(self, twitter, options):
        try:
            while True:
                # locals() exposes self, twitter and options to smrt_input;
                # adding or renaming local variables here would change what
                # it receives.
                smrt_input(globals(), locals())
        except EOFError:
            # End-of-file ends the prompt loop quietly.
            pass
556
class HelpAction(Action):
    """Print the module's usage text."""

    def __call__(self, twitter, options):
        # The module docstring doubles as the command-line help.
        usage = __doc__
        print(usage)
560
class DoNothingAction(Action):
    """Action with no effect of its own."""

    def __call__(self, twitter, options):
        return None
564
class RateLimitStatus(Action):
    """Print the remaining API request quota and the time of its reset."""

    def __call__(self, twitter, options):
        rate = twitter.account.rate_limit_status()
        quota = (rate['remaining_hits'], rate['hourly_limit'])
        print("Remaining API requests: %s / %s (hourly limit)" % quota)
        seconds_left = int(rate['reset_time_in_seconds']-time.time())
        reset_at = time.asctime(time.localtime(rate['reset_time_in_seconds']))
        print("Next reset in %ss (%s)" % (seconds_left, reset_at))
571
# Maps each action name accepted on the command line (and in the shell) to
# the Action subclass that implements it.  Action.__call__ looks names up
# here and falls back to NoSuchAction for anything missing.
actions = {
    'authorize' : DoNothingAction,
    'follow'    : FollowAction,
    'friends'   : FriendsAction,
    'list'      : ListsAction,
    'mylist'    : MyListsAction,
    'help'      : HelpAction,
    'leave'     : LeaveAction,
    'public'    : PublicAction,
    'pyprompt'  : PythonPromptAction,
    'replies'   : RepliesAction,
    'search'    : SearchAction,
    'set'       : SetStatusAction,
    'shell'     : TwitterShell,
    'rate'      : RateLimitStatus,
}
588
def loadConfig(filename):
    """Return a copy of ``OPTIONS`` overlaid with values from ``filename``.

    Only the ``format`` and ``prompt`` options of the ``[twitter]`` section
    are read.  A missing file yields the unmodified defaults.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options
    parser = SafeConfigParser()
    parser.read([filename])
    for key in ('format', 'prompt'):
        if parser.has_option('twitter', key):
            options[key] = parser.get('twitter', key)
    return options
598
def main(args=sys.argv[1:]):
    """Command-line entry point.

    Parses ``args``, merges them with the config file and the built-in
    defaults, makes sure OAuth credentials exist (running the OAuth dance if
    needed) and runs the selected action.

    Returns 1 when ``--refresh`` is combined with an action that cannot be
    refreshed; raises ``SystemExit(1)`` on bad options, unknown actions and
    Twitter errors.
    """
    arg_options = {}
    try:
        parse_args(args, arg_options)
    except GetoptError as e:
        print("I can't do that, %s." %(e), file=sys.stderr)
        print(file=sys.stderr)
        raise SystemExit(1)

    config_path = os.path.expanduser(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))
    config_options = loadConfig(config_path)

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    for d in config_options, arg_options:
        for k, v in d.items():
            # Empty values never override, but an explicit False must:
            # otherwise --no-ssl (secure=False) is silently dropped.
            if v or v is False:
                options[k] = v

    if options['refresh'] and options['action'] not in (
        'friends', 'public', 'replies'):
        print("You can only refresh the friends, public, or replies actions.", file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        return 1

    oauth_filename = os.path.expanduser(options['oauth_filename'])

    if (options['action'] == 'authorize'
        or not os.path.exists(oauth_filename)):
        # Use the expanded path here too, so the token file is written to
        # the same location that is checked above and read below.
        oauth_dance(
            "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
            oauth_filename)

    oauth_token, oauth_token_secret = read_token_file(oauth_filename)

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1',
        domain='api.twitter.com')

    try:
        Action()(twitter, options)
    except NoSuchActionError as e:
        print(e, file=sys.stderr)
        raise SystemExit(1)
    except TwitterError as e:
        print(str(e), file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        raise SystemExit(1)