]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Merge branch 'master' into python3
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
20 object directly
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25
26
27 OPTIONS:
28
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 10 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
41
42 FORMATS for the --format option
43
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 urls nothing but URLs
47 ansi ansi colour (rainbow mode)
48
49
50 CONFIG FILES
51
52 The config file should be placed in your home directory and be named .twitter.
53 It must contain a [twitter] header, and all the desired options you wish to
54 set, like so:
55
56 [twitter]
57 format: <desired_default_format_for_output>
58 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
59
60 OAuth authentication tokens are stored in the file .twitter_oauth in your
61 home directory.
62 """
63
# OAuth consumer credentials for this command-line application; main()
# hands them to oauth_dance() and OAuth().
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
66
67 import sys
68 import time
69 from getopt import gnu_getopt as getopt, GetoptError
70 from getpass import getpass
71 import re
72 import os.path
73 from configparser import SafeConfigParser
74 import datetime
75 from urllib.parse import quote
76 import webbrowser
77
78 from .api import Twitter, TwitterError
79 from .oauth import OAuth, write_token_file, read_token_file
80 from .oauth_dance import oauth_dance
81 from . import ansi
82 from .util import smrt_input
83
# Built-in defaults for every option.  main() copies this dict and then
# layers config-file values and command-line arguments on top of it.
OPTIONS = dict(
    action='friends',
    refresh=False,
    refresh_rate=600,  # seconds between polls when --refresh is given
    format='default',
    prompt='[cyan]twitter[R]> ',
    config_filename=os.sep.join((os.environ.get('HOME', ''), '.twitter')),
    oauth_filename=os.sep.join(
        (os.environ.get('HOME', ''), '.twitter_oauth')),
    length=20,
    timestamp=False,
    datestamp=False,
    extra_args=[],
    secure=True,
)
98
def _int_option(opt, arg):
    """Convert the argument of option ``opt`` to int, or raise GetoptError."""
    try:
        return int(arg)
    except ValueError:
        # Report bad numbers the same way getopt reports bad options, so
        # callers that already handle GetoptError cover this case too.
        raise GetoptError(
            "option %s requires an integer argument, not %r" % (opt, arg),
            opt)


def parse_args(args, options):
    """Parse command-line style ``args`` and store the results in ``options``.

    Option keys are written only for options that actually appear in
    ``args``.  The first positional argument becomes ``options['action']``
    unless help was requested; the remaining positional arguments are
    always stored in ``options['extra_args']``.

    Raises:
        GetoptError: for an unknown option, a missing option argument, or
            a non-integer value given to -R/--refresh-rate or -l/--length.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    # 'e:' and 'p:' are accepted by the parser but nothing below uses them.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _int_option(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _int_option(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over whatever action name was also typed.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
131
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix to print before a status line.

    ``status['created_at']`` is parsed with ``format`` (whose literal
    ``+0000`` marks it as UTC) and converted to the system's local time.

    Returns "YYYY-MM-DD HH:MM:SS ", "HH:MM:SS " or "YYYY-MM-DD " according
    to ``options['timestamp']`` and ``options['datestamp']``, or "" when
    neither is set.  Every non-empty result ends with a space.
    """
    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if not (timestamp or datestamp):
        # Nothing to show, so skip parsing altogether.
        return ""

    parsed = time.strptime(status['created_at'], format)
    # Convert through an aware UTC datetime so the UTC offset in force on
    # the tweet's own date is used.  time.daylight only says that the local
    # zone defines DST at all, not that DST applies to this moment.
    local = datetime.datetime(
        *parsed[:6], tzinfo=datetime.timezone.utc).astimezone()

    if timestamp and datestamp:
        return local.strftime("%Y-%m-%d %H:%M:%S ")
    if timestamp:
        return local.strftime("%H:%M:%S ")
    return local.strftime("%Y-%m-%d ")
149
class StatusFormatter(object):
    """One-line status format: time prefix, screen name, then the text."""

    def __call__(self, status, options):
        prefix = get_time_string(status, options)
        author = status['user']['screen_name']
        return "%s%s %s" % (prefix, author, status['text'])
155
class AnsiStatusFormatter(object):
    """One-line status format with the screen name wrapped in ANSI colour."""

    def __init__(self):
        # The colour for each screen name is looked up in this map.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        colour = self._colourMap.colourFor(status['user']['screen_name'])
        pieces = (
            get_time_string(status, options),
            ansi.cmdColour(colour),
            status['user']['screen_name'],
            ansi.cmdReset(),
            status['text'],
        )
        return "%s%s%s%s %s" % pieces
166
class VerboseStatusFormatter(object):
    """Multi-line status format: a header line, the text, a blank line."""

    def __call__(self, status, options):
        user = status['user']
        header = "-- %s (%s) on %s" % (
            user['screen_name'], user['location'], status['created_at'])
        return "%s\n%s\n" % (header, status['text'])
174
class URLStatusFormatter(object):
    """Status format that yields only the URLs in the text, one per line."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        # Joining an empty match list gives "", the "no URLs" result.
        return '\n'.join(self.urlmatch.findall(status['text']))
180
181
class ListsFormatter(object):
    """Default list format: padded name, then the description if present."""

    def __call__(self, list):
        description = list['description']
        line = "%-30s" % (list['name'])
        if description:
            line = "%s (%s)" % (line, description)
        return line + "\n"
189
class ListsVerboseFormatter(object):
    """Verbose list format: name, description, member count and mode."""

    def __call__(self, list):
        rows = (
            "%-30s" % list['name'],
            " description: %s" % list['description'],
            " members: %s" % list['member_count'],
            " mode:%s" % list['mode'],
        )
        return "\n".join(rows) + "\n"
194
class AnsiListsFormatter(object):
    """List format with the padded list name wrapped in ANSI colour."""

    def __init__(self):
        # The colour for each list name is looked up in this map.
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        colour = self._colourMap.colourFor(list['name'])
        pieces = (
            ansi.cmdColour(colour),
            list['name'],
            ansi.cmdReset(),
            list['description'],
        )
        return "%s%-15s%s %s" % pieces
204
205
class AdminFormatter(object):
    """Default confirmation message for a follow or leave action."""

    def __call__(self, action, user):
        who = "%s (%s)" % (user['screen_name'], user['name'])
        # Any action other than "follow" is reported as a leave.
        template = ("You are now following %s.\n" if action == "follow"
                    else "You are no longer following %s.\n")
        return template % (who)
213
class VerboseAdminFormatter(object):
    """Verbose confirmation for a follow or leave, including the user URL."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        return "-- %s: %s (%s): %s" % (
            verb, user['screen_name'], user['name'], user['url'])
221
class SearchFormatter(object):
    """One-line search result format: time prefix, sender, then the text."""

    def __call__(self, result, options):
        # Search results use a different created_at layout from statuses.
        prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        return "%s%s %s" % (prefix, result['from_user'], result['text'])
227
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search format; currently identical to SearchFormatter."""
230
class URLSearchFormatter(object):
    """Search format that yields only the URLs in the text, one per line."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        # Joining an empty match list gives "", the "no URLs" result.
        return '\n'.join(self.urlmatch.findall(result['text']))
236
class AnsiSearchFormatter(object):
    """One-line search result format with the sender in ANSI colour."""

    def __init__(self):
        # The colour for each sender name is looked up in this map.
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        colour = self._colourMap.colourFor(result['from_user'])
        pieces = (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansi.cmdColour(colour),
            result['from_user'],
            ansi.cmdReset(),
            result['text'],
        )
        return "%s%s%s%s %s" % pieces
247
# Cached result of get_term_encoding(); filled in on first use.
_term_encoding = None


def get_term_encoding():
    """Return the encoding named in $LANG (the part after the first dot).

    Falls back to 'UTF-8' when $LANG contains no dot; an unset $LANG is
    treated as 'unknown.UTF-8'.  The result is cached in _term_encoding.
    """
    global _term_encoding
    if _term_encoding:
        return _term_encoding
    parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
    _term_encoding = parts[1] if len(parts) > 1 else 'UTF-8'
    return _term_encoding
258
# Formatter classes by --format name, one table per kind of output.
status_formatters = dict(
    default=StatusFormatter,
    verbose=VerboseStatusFormatter,
    urls=URLStatusFormatter,
    ansi=AnsiStatusFormatter,
)

admin_formatters = dict(
    default=AdminFormatter,
    verbose=VerboseAdminFormatter,
    urls=AdminFormatter,
    ansi=AdminFormatter,
)

search_formatters = dict(
    default=SearchFormatter,
    verbose=VerboseSearchFormatter,
    urls=URLSearchFormatter,
    ansi=AnsiSearchFormatter,
)

lists_formatters = dict(
    default=ListsFormatter,
    verbose=ListsVerboseFormatter,
    urls=None,  # no URL-only formatter exists for lists
    ansi=AnsiListsFormatter,
)

# Top-level registry consulted by get_formatter(): action type -> table.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
291
def get_formatter(action_type, options):
    """Return a new formatter instance for ``action_type``.

    The formatter class is looked up in ``formatters[action_type]`` under
    the key ``options['format']``.

    Raises:
        TwitterError: if ``action_type`` has no formatter table, or the
            table has no usable entry for the requested format (missing
            or registered as None).
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the real action type; this used to always say "status".
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
303
class Action(object):
    """Base class for actions; calling it runs the action named in options."""

    def ask(self, subject='perform this action', careful=False):
        '''
        Ask the user, using `input`, whether `subject` should be
        performed. When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' % (subject, sample)
        try:
            answer = input(prompt).lower()
        except EOFError:
            print(file=sys.stderr)  # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful
        if careful:
            # Only an explicit yes counts.
            return answer in ('yes', 'y')
        # Anything except an explicit no counts as yes.
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        """Instantiate the action named by options['action'] and run it.

        Unknown names fall back to NoSuchAction.  With options['refresh']
        set, a StatusAction is re-run forever, sleeping
        options['refresh_rate'] seconds between runs.  Ctrl-C is reported
        on stderr and ends the call.
        """
        action = actions.get(options['action'], NoSuchAction)()
        try:
            if not (options['refresh'] and isinstance(action, StatusAction)):
                action(twitter, options)
                return
            while True:
                action(twitter, options)
                time.sleep(options['refresh_rate'])
        except KeyboardInterrupt:
            print('\n[Keyboard Interrupt]', file=sys.stderr)
347
class NoSuchActionError(Exception):
    """Raised when the requested action name is not a known action."""
350
class NoSuchAction(Action):
    """Fallback action for unknown names; always raises NoSuchActionError."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
354
def printNicely(string):
    """Write ``string`` to standard output as UTF-8, followed by a newline.

    The bytes go straight to ``sys.stdout.buffer`` so the UTF-8 encoding is
    used whatever encoding the text layer has.  If ``sys.stdout`` has no
    ``buffer`` attribute (for example an in-memory text stream), the string
    is printed through the text layer instead.
    """
    out = sys.stdout
    raw = getattr(out, 'buffer', None)
    if raw is None:
        print(string)
        return
    # Push out anything the text layer still holds; otherwise bytes written
    # directly below could overtake text that was printed earlier.
    out.flush()
    raw.write(string.encode('utf8'))
    print()
358
class StatusAction(Action):
    """Base for actions that fetch statuses and print each one formatted.

    Subclasses supply getStatuses(twitter, options).
    """

    def __call__(self, twitter, options):
        statuses = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        # Lazy, so each status is formatted just before it is printed.
        rendered = (render(status, options) for status in statuses)
        for line in rendered:
            # Skip statuses that format to nothing but whitespace.
            if line.strip():
                printNicely(line)
367
class SearchAction(Action):
    """Search twitter for the terms in extra_args and print the results."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is
        # less tangly to do it here than in the main()
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        query_string = "+".join(
            quote(term) for term in options['extra_args'])

        response = twitter.search(q=query_string)
        render = get_formatter('search', options)
        for result in response['results']:
            line = render(result, options)
            # Skip results that format to nothing but whitespace.
            if line.strip():
                printNicely(line)
386
class AdminAction(Action):
    """Base for follow/leave actions on the user named in extra_args[0].

    Subclasses supply getUser(twitter, user).
    """

    def __call__(self, twitter, options):
        targets = options['extra_args']
        if not targets or not targets[0]:
            raise TwitterError("You need to specify a user (screen name)")
        af = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, targets[0])
        except TwitterError as e:
            explanation = (
                "There was a problem following or leaving the specified user.",
                "You may be trying to follow a user you are already following;",
                "Leaving a user you are not currently following;",
                "Or the user may not exist.",
                "Sorry.",
            )
            for line in explanation:
                print(line)
            print()
            print(e)
            return
        printNicely(af(options['action'], user))
404
class ListsAction(StatusAction):
    """Show a user's lists, or the statuses from one of those lists."""

    def getStatuses(self, twitter, options):
        """Return the statuses to print for the user in extra_args[0].

        With a list name in extra_args[1], return that list's statuses in
        reversed order.  Otherwise print the user's lists here and return
        an empty list.
        """
        args = options['extra_args']
        if not args:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = args[0]
        list_names = args[1:]

        if list_names:
            return reversed(twitter.user.lists.list.statuses(
                user=screen_name, list=list_names[0]))

        lists = twitter.user.lists(user=screen_name)['lists']
        if not lists:
            printNicely("This user has no lists.")
        for entry in lists:
            lf = get_formatter('lists', options)
            printNicely(lf(entry))
        return []
423
424
class MyListsAction(ListsAction):
    """ListsAction for the authenticated user."""

    def getStatuses(self, twitter, options):
        credentials = twitter.account.verify_credentials()
        # Put our own screen name first so ListsAction treats it as the user.
        options['extra_args'].insert(0, credentials['screen_name'])
        return super().getStatuses(twitter, options)
430
431
class FriendsAction(StatusAction):
    """Print the friends timeline in reversed order."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
435
class PublicAction(StatusAction):
    """Print the public timeline in reversed order."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
439
class RepliesAction(StatusAction):
    """Print the replies timeline in reversed order."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.replies(count=options["length"])
        return reversed(timeline)
443
class FollowAction(AdminAction):
    """Follow the given user."""

    def getUser(self, twitter, user):
        created = twitter.friendships.create(id=user)
        return created
447
class LeaveAction(AdminAction):
    """Stop following the given user."""

    def getUser(self, twitter, user):
        destroyed = twitter.friendships.destroy(id=user)
        return destroyed
451
class SetStatusAction(Action):
    """Post a status taken from extra_args, or prompt for one if empty."""

    def __call__(self, twitter, options):
        words = options['extra_args']
        if words:
            statusTxt = " ".join(words)
        else:
            statusTxt = str(input("message: "))
        twitter.statuses.update(status=statusTxt)
458
class TwitterShell(Action):
    """Interactive shell that reads action lines and runs them in a loop."""

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        # Drop surrounding single quotes and unescape \' inside the prompt.
        prompt = prompt.strip("'").replace("\\'", "'")
        for colour in ansi.COLOURS_NAMED:
            if '[%s]' % (colour) in prompt:
                prompt = prompt.replace(
                    '[%s]' % (colour), ansi.cmdColourNamed(colour))
        prompt = prompt.replace('[R]', ansi.cmdReset())
        return prompt

    def __call__(self, twitter, options):
        """Run the read/parse/execute loop until the user exits.

        Each input line is parsed with parse_args() into ``options`` and
        dispatched through Action().  'exit' ends the shell, as does ^D
        once the user confirms.  Lines that fail to parse are reported on
        stderr and the loop continues.

        Raises:
            SystemExit: when the user leaves the shell.
        """
        prompt = self.render_prompt(options.get('prompt', 'twitter> '))
        while True:
            options['action'] = ""
            try:
                args = input(prompt).split()
                try:
                    parse_args(args, options)
                except (GetoptError, ValueError) as e:
                    # A mistyped option should not end the whole session.
                    print("I can't do that, %s." % (e), file=sys.stderr)
                    continue
                if not options['action']:
                    continue
                elif options['action'] == 'exit':
                    raise SystemExit(0)
                elif options['action'] == 'shell':
                    print('Sorry Xzibit does not work here!', file=sys.stderr)
                    continue
                elif options['action'] == 'help':
                    # Shell-specific help; the Action() call below then
                    # runs the regular help action as well.
                    print('''\ntwitter> `action`\n
                          The Shell Accepts all the command line actions along with:

                          exit Leave the twitter shell (^D may also be used)

                          Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
                Action()(twitter, options)
                options['action'] = ''
            except NoSuchActionError as e:
                print(e, file=sys.stderr)
            except KeyboardInterrupt:
                print('\n[Keyboard Interrupt]', file=sys.stderr)
            except EOFError:
                print(file=sys.stderr)
                leaving = self.ask(subject='Leave')
                if not leaving:
                    print('Excellent!', file=sys.stderr)
                else:
                    raise SystemExit(0)
505
class PythonPromptAction(Action):
    """Run smrt_input() repeatedly until it raises EOFError."""

    def __call__(self, twitter, options):
        while True:
            try:
                # locals() is taken in this frame, so it holds `twitter`
                # and `options`.
                smrt_input(globals(), locals())
            except EOFError:
                return
513
class HelpAction(Action):
    """Print the module docstring, which holds the usage text."""

    def __call__(self, twitter, options):
        # __doc__ here resolves to the module-level docstring.
        usage = __doc__
        print(usage)
517
class DoNothingAction(Action):
    """Action that does nothing; registered for 'authorize'."""

    def __call__(self, twitter, options):
        return None
521
# Action classes by command-line action name; Action.__call__ looks the
# requested name up here.
actions = dict(
    authorize=DoNothingAction,
    follow=FollowAction,
    friends=FriendsAction,
    list=ListsAction,
    mylist=MyListsAction,
    help=HelpAction,
    leave=LeaveAction,
    public=PublicAction,
    pyprompt=PythonPromptAction,
    replies=RepliesAction,
    search=SearchAction,
    set=SetStatusAction,
    shell=TwitterShell,
)
537
def loadConfig(filename):
    """Return a copy of OPTIONS overlaid with settings read from ``filename``.

    Only the 'format' and 'prompt' options of the [twitter] section are
    read.  A file that does not exist is ignored and the defaults are
    returned unchanged.
    """
    # SafeConfigParser is a deprecated alias of ConfigParser that was
    # removed in Python 3.12, so use ConfigParser directly.
    from configparser import ConfigParser

    options = dict(OPTIONS)
    if os.path.exists(filename):
        cp = ConfigParser()
        cp.read([filename])
        for option in ('format', 'prompt'):
            if cp.has_option('twitter', option):
                options[option] = cp.get('twitter', option)
    return options
547
def main(args=None):
    """Entry point of the command-line tool.

    Parses ``args`` (default: ``sys.argv[1:]``, read at call time), merges
    defaults, config-file values and command-line options, performs the
    OAuth dance when needed, then runs the requested action.

    Returns 1 when --refresh is combined with an action that cannot be
    refreshed; otherwise returns None.

    Raises:
        SystemExit: with status 1 on bad options, an unknown action, or a
            TwitterError raised by the action.
    """
    if args is None:
        # Read sys.argv now instead of freezing it at import time.
        args = sys.argv[1:]

    arg_options = {}
    try:
        parse_args(args, arg_options)
    except GetoptError as e:
        print("I can't do that, %s." % (e), file=sys.stderr)
        print(file=sys.stderr)
        raise SystemExit(1)

    config_path = os.path.expanduser(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))
    config_options = loadConfig(config_path)

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    for k, v in config_options.items():
        if v:
            options[k] = v
    for k, v in arg_options.items():
        # An explicit False from the command line (--no-ssl) must still
        # override the default; other falsy values keep the default.
        if v or v is False:
            options[k] = v

    if options['refresh'] and options['action'] not in (
            'friends', 'public', 'replies'):
        print("You can only refresh the friends, public, or replies actions.",
              file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        return 1

    oauth_filename = os.path.expanduser(options['oauth_filename'])

    if (options['action'] == 'authorize'
            or not os.path.exists(oauth_filename)):
        # Use the same expanded path that is checked above and read below.
        # NOTE(review): assumes oauth_dance does not expand '~' itself.
        oauth_dance(
            "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
            oauth_filename)

    oauth_token, oauth_token_secret = read_token_file(oauth_filename)

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1',
        domain='api.twitter.com')

    try:
        Action()(twitter, options)
    except NoSuchActionError as e:
        print(e, file=sys.stderr)
        raise SystemExit(1)
    except TwitterError as e:
        print(str(e), file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        raise SystemExit(1)