]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Make the code python 2 and 3 compatible.
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
20 object directly
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25
26
27 OPTIONS:
28
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 10 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
41
42 FORMATS for the --format option
43
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 urls nothing but URLs
47 ansi ansi colour (rainbow mode)
48
49
50 CONFIG FILES
51
52 The config file should be placed in your home directory and be named .twitter.
53 It must contain a [twitter] header, and all the desired options you wish to
54 set, like so:
55
56 [twitter]
57 format: <desired_default_format_for_output>
58 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
59
60 OAuth authentication tokens are stored in the file .twitter_oauth in your
61 home directory.
62 """
63
64 from __future__ import print_function
65
# OAuth consumer credentials; main() hands them to oauth_dance() and OAuth().
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
68
69 import sys
70 import time
71 from getopt import gnu_getopt as getopt, GetoptError
72 from getpass import getpass
73 import re
74 import os.path
75 try:
76 from ConfigParser import SafeConfigParser
77 except ImportError:
78 from configparser import ConfigParser as SafeConfigParser
79 import datetime
80 try:
81 from urllib.parse import quote
82 except ImportError:
83 from urllib2 import quote
84 import webbrowser
85
86 from .api import Twitter, TwitterError
87 from .oauth import OAuth, write_token_file, read_token_file
88 from .oauth_dance import oauth_dance
89 from . import ansi
90 from .util import smrt_input
91
# Built-in defaults. loadConfig() and main() start from a copy of this
# mapping and layer config-file and command-line values on top of it.
OPTIONS = dict(
    action='friends',
    refresh=False,
    refresh_rate=600,  # seconds between polls when refreshing
    format='default',
    prompt='[cyan]twitter[R]> ',
    config_filename=os.environ.get('HOME', '') + os.sep + '.twitter',
    oauth_filename=os.environ.get('HOME', '') + os.sep + '.twitter_oauth',
    length=20,
    timestamp=False,
    datestamp=False,
    extra_args=[],
    secure=True,
)
106
def _int_option_value(opt, arg):
    """Convert the argument of option `opt` to int or raise GetoptError."""
    try:
        return int(arg)
    except ValueError:
        # Report bad numbers the same way as any other bad option so that
        # callers catching GetoptError can print a friendly message.
        raise GetoptError(
            "option %s requires an integer argument, not %r" % (opt, arg),
            opt)


def parse_args(args, options):
    """
    Parse the command-line words in `args` and record the result in `options`.

    `options` is updated in place; only keys for options that were actually
    given are written. The first non-option word becomes
    ``options['action']`` and the remaining ones ``options['extra_args']``,
    unless help was requested, in which case the action stays ``'help'``.

    Raises GetoptError for unknown options, missing option arguments and
    non-integer values given to -l/--length or -R/--refresh-rate.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    # -e and -p are accepted (each with an argument) but not acted upon here.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _int_option_value(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _int_option_value(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over whatever positional words were supplied.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
139
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """
    Return the local-time prefix to print in front of a status line.

    ``status['created_at']`` is parsed with `format` (whose literal
    ``+0000`` marks the value as UTC) and converted to local time. Depending
    on ``options['timestamp']`` and ``options['datestamp']`` the result is
    the time, the date, both, or the empty string. Each non-empty result
    ends with a single space.
    """
    # Function-scope import: calendar is only needed for this conversion.
    import calendar

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if not (timestamp or datestamp):
        # Nothing to show, so there is no need to parse the date at all.
        return ""

    utc_struct = time.strptime(status['created_at'], format)
    # timegm() reads the struct as UTC; localtime() then applies the offset
    # (including DST) valid at that instant. Checking time.daylight is not
    # enough: it only says the zone *has* DST rules, not that DST is active.
    local_struct = time.localtime(calendar.timegm(utc_struct))

    if timestamp and datestamp:
        pattern = "%Y-%m-%d %H:%M:%S "
    elif timestamp:
        pattern = "%H:%M:%S "
    else:
        pattern = "%Y-%m-%d "
    return time.strftime(pattern, local_struct)
157
class StatusFormatter(object):
    """Render a status as one line: time prefix, screen name, then text."""

    def __call__(self, status, options):
        prefix = get_time_string(status, options)
        author = status['user']['screen_name']
        return "%s%s %s" % (prefix, author, status['text'])
163
class AnsiStatusFormatter(object):
    """One-line status renderer that colours the screen name via `ansi`."""

    def __init__(self):
        # One colour map per formatter instance, consulted for every status.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        colour = self._colourMap.colourFor(status['user']['screen_name'])
        pieces = (
            get_time_string(status, options),
            ansi.cmdColour(colour),
            status['user']['screen_name'],
            ansi.cmdReset(),
            status['text'],
        )
        return "%s%s%s%s %s" % pieces
174
class VerboseStatusFormatter(object):
    """Render a status over several lines: author, location, date, text."""

    def __call__(self, status, options):
        header = "-- %s (%s) on %s" % (
            status['user']['screen_name'],
            status['user']['location'],
            status['created_at'])
        return "%s\n%s\n" % (header, status['text'])
182
class URLStatusFormatter(object):
    """Reduce a status to the http(s) URLs in its text, one per line."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        # Joining an empty match list yields "", same as the no-URL case.
        return '\n'.join(self.urlmatch.findall(status['text']))
188
189
class ListsFormatter(object):
    """Render a list as its padded name plus the description, if any."""

    def __call__(self, list):
        description = list['description']
        if description:
            line = "%-30s (%s)" % (list['name'], description)
        else:
            line = "%-30s" % (list['name'])
        return line + "\n"
197
class ListsVerboseFormatter(object):
    """Render a list with its description, member count and mode."""

    def __call__(self, list):
        template = "%-30s\n description: %s\n members: %s\n mode:%s\n"
        fields = (list['name'], list['description'],
                  list['member_count'], list['mode'])
        return template % fields
202
class AnsiListsFormatter(object):
    """Render a list as a coloured, padded name followed by its description."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        colour = self._colourMap.colourFor(list['name'])
        pieces = (ansi.cmdColour(colour), list['name'],
                  ansi.cmdReset(), list['description'])
        return "%s%-15s%s %s" % pieces
212
213
class AdminFormatter(object):
    """Confirm a follow/leave action in a single sentence."""

    def __call__(self, action, user):
        user_str = "%s (%s)" % (user['screen_name'], user['name'])
        # Any action other than "follow" is reported as leaving.
        template = ("You are now following %s.\n" if action == "follow"
                    else "You are no longer following %s.\n")
        return template % (user_str)
221
class VerboseAdminFormatter(object):
    """Confirm a follow/leave action including the user's name and URL."""

    def __call__(self, action, user):
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        return "-- %s: %s (%s): %s" % (
            verb, user['screen_name'], user['name'], user['url'])
229
class SearchFormatter(object):
    """Render a search result as one line: time prefix, sender, text."""

    def __call__(self, result, options):
        # Search results carry their date in a different layout than statuses.
        prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        return "%s%s %s" % (prefix, result['from_user'], result['text'])
235
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search output; currently identical to SearchFormatter."""
238
class URLSearchFormatter(object):
    """Reduce a search result to the http(s) URLs in its text, one per line."""

    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        # Joining an empty match list yields "", same as the no-URL case.
        return '\n'.join(self.urlmatch.findall(result['text']))
244
class AnsiSearchFormatter(object):
    """One-line search result renderer that colours the sender via `ansi`."""

    def __init__(self):
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        colour = self._colourMap.colourFor(result['from_user'])
        pieces = (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansi.cmdColour(colour),
            result['from_user'],
            ansi.cmdReset(),
            result['text'],
        )
        return "%s%s%s%s %s" % pieces
255
# Cached result of get_term_encoding(); None until the first call.
_term_encoding = None


def get_term_encoding():
    """
    Return the terminal encoding named by the LANG environment variable.

    LANG follows the POSIX layout ``language[_territory][.codeset][@modifier]``.
    Only the codeset part is returned; ``UTF-8`` is used when LANG is unset
    or names no codeset. The answer is computed once and then cached.
    """
    global _term_encoding
    if not _term_encoding:
        parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
        # Drop a trailing "@modifier" (e.g. "ISO-8859-15@euro"); it is not
        # part of the encoding name.
        codeset = parts[1].split('@', 1)[0] if parts[1:] else ''
        # An empty codeset ("en_US.") falls back to the default as well.
        _term_encoding = codeset or 'UTF-8'
    return _term_encoding
266
# Formatter classes per output format, grouped by the kind of thing rendered.
status_formatters = dict(
    default=StatusFormatter,
    verbose=VerboseStatusFormatter,
    urls=URLStatusFormatter,
    ansi=AnsiStatusFormatter,
)

admin_formatters = dict(
    default=AdminFormatter,
    verbose=VerboseAdminFormatter,
    urls=AdminFormatter,
    ansi=AdminFormatter,
)

search_formatters = dict(
    default=SearchFormatter,
    verbose=VerboseSearchFormatter,
    urls=URLSearchFormatter,
    ansi=AnsiSearchFormatter,
)

lists_formatters = dict(
    default=ListsFormatter,
    verbose=ListsVerboseFormatter,
    urls=None,  # no URL-only rendering exists for lists
    ansi=AnsiListsFormatter,
)

# Lookup table used by get_formatter(): action type -> format name -> class.
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
299
def get_formatter(action_type, options):
    """
    Return a new formatter instance for `action_type` and the chosen format.

    `action_type` selects a table in `formatters`; ``options['format']``
    selects the class inside it.

    Raises TwitterError when the action type has no formatter table or when
    the table has no usable entry for the requested format.
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the real action type; this used to always say "status".
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
311
class Action(object):
    """Base class of all actions; calling it dispatches to the chosen one."""

    def ask(self, subject='perform this action', careful=False):
        '''
        Asks the user on the console whether `subject` should be performed.
        When `careful`, the default answer is NO, otherwise YES.
        Returns the user answer in the form `True` or `False`.
        '''
        # On Python 2 the builtin input() evaluates what was typed; use
        # raw_input there so the answer is always read as plain text.
        try:
            read_line = raw_input  # noqa: F821 -- exists on Python 2 only
        except NameError:
            read_line = input

        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' % (subject, sample)
        try:
            answer = read_line(prompt).lower()
        except EOFError:
            print(file=sys.stderr)  # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful  # fall back to the default answer

        if careful:
            return answer in ('yes', 'y')
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        """
        Run the action named by ``options['action']``.

        Unknown names resolve to NoSuchAction. Status actions are repeated
        every ``options['refresh_rate']`` seconds while ``options['refresh']``
        is set; Ctrl-C ends the run with a note on stderr.
        """
        action = actions.get(options['action'], NoSuchAction)()
        try:
            if options['refresh'] and isinstance(action, StatusAction):
                while True:
                    action(twitter, options)
                    time.sleep(options['refresh_rate'])
            else:
                action(twitter, options)
        except KeyboardInterrupt:
            print('\n[Keyboard Interrupt]', file=sys.stderr)
355
class NoSuchActionError(Exception):
    """Raised by NoSuchAction when the requested action name is unknown."""
358
class NoSuchAction(Action):
    """Fallback action used for unknown names; it always fails."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
362
def printNicely(string):
    """
    Write `string`, encoded as UTF-8 and followed by a newline, to stdout.

    When ``sys.stdout`` exposes a binary ``buffer`` (Python 3) the bytes are
    written there directly; otherwise the encoded string is printed.
    """
    if hasattr(sys.stdout, 'buffer'):
        # Push out text still pending in the text layer first, then send
        # the line AND its newline through the same binary stream. Mixing
        # buffer.write() with print() reorders output whenever stdout is
        # not line-buffered (e.g. when piped).
        sys.stdout.flush()
        binary_out = sys.stdout.buffer
        binary_out.write(string.encode('utf8') + os.linesep.encode('ascii'))
        binary_out.flush()
    else:
        print(string.encode('utf8'))
369
class StatusAction(Action):
    """Base for actions that fetch statuses and print them one by one.

    Subclasses supply ``getStatuses(twitter, options)``.
    """

    def __call__(self, twitter, options):
        statuses = self.getStatuses(twitter, options)
        sf = get_formatter('status', options)
        rendered = (sf(status, options) for status in statuses)
        for line in rendered:
            # Formatters may return blank output (e.g. no URLs); skip it.
            if line.strip():
                printNicely(line)
378
class SearchAction(Action):
    """Search for the words in ``options['extra_args']`` and print matches."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        query_string = "+".join(map(quote, options['extra_args']))

        results = twitter.search(q=query_string)['results']
        f = get_formatter('search', options)
        for result in results:
            line = f(result, options)
            if line.strip():
                printNicely(line)
397
class AdminAction(Action):
    """Base for follow/leave; subclasses supply ``getUser(twitter, user)``."""

    def __call__(self, twitter, options):
        targets = options['extra_args']
        if not targets or not targets[0]:
            raise TwitterError("You need to specify a user (screen name)")
        af = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, targets[0])
        except TwitterError as e:
            for line in (
                    "There was a problem following or leaving the specified user.",
                    "You may be trying to follow a user you are already following;",
                    "Leaving a user you are not currently following;",
                    "Or the user may not exist.",
                    "Sorry."):
                print(line)
            print()
            print(e)
        else:
            printNicely(af(options['action'], user))
415
class ListsAction(StatusAction):
    """Show a user's lists, or the statuses of one of those lists."""

    def getStatuses(self, twitter, options):
        """
        Return the statuses to print for ``options['extra_args']``.

        With only a screen name, the user's lists are printed here and an
        empty list is returned, so the caller prints nothing more. With a
        screen name and a list name, that list's statuses are returned in
        reversed order.

        Raises TwitterError when no screen name was given.
        """
        if not options['extra_args']:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = options['extra_args'][0]

        if options['extra_args'][1:]:
            return reversed(twitter.user.lists.list.statuses(
                user=screen_name, list=options['extra_args'][1]))

        lists = twitter.user.lists(user=screen_name)['lists']
        if not lists:
            printNicely("This user has no lists.")
        else:
            # Build the formatter once so a stateful one (e.g. the ansi
            # formatter's colour map) is shared by all entries.
            lf = get_formatter('lists', options)
            for entry in lists:
                printNicely(lf(entry))
        return []
434
435
class MyListsAction(ListsAction):
    """Like ListsAction, but for the authenticated user's own lists."""

    def getStatuses(self, twitter, options):
        """
        Look up the authenticated user's screen name and delegate to
        ListsAction with that name placed in front of the extra arguments.
        """
        screen_name = twitter.account.verify_credentials()['screen_name']
        # Work on a copy: inserting into options['extra_args'] in place
        # would change the caller's list (possibly the shared default) and
        # would stack up screen names when the action is run repeatedly.
        own_options = dict(options)
        own_options['extra_args'] = [screen_name] + list(options['extra_args'])
        return ListsAction.getStatuses(self, twitter, own_options)
441
442
class FriendsAction(StatusAction):
    """Show the friends timeline, reversed from the order the API returns."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
446
class PublicAction(StatusAction):
    """Show the public timeline, reversed from the order the API returns."""

    def getStatuses(self, twitter, options):
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
450
class RepliesAction(StatusAction):
    """Show replies to the user, reversed from the order the API returns."""

    def getStatuses(self, twitter, options):
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
454
class FollowAction(AdminAction):
    """Start following a user."""

    def getUser(self, twitter, user):
        created = twitter.friendships.create(id=user)
        return created
458
class LeaveAction(AdminAction):
    """Stop following a user."""

    def getUser(self, twitter, user):
        destroyed = twitter.friendships.destroy(id=user)
        return destroyed
462
class SetStatusAction(Action):
    """Post a status update built from the extra arguments or a prompt."""

    def __call__(self, twitter, options):
        if options['extra_args']:
            statusTxt = " ".join(options['extra_args'])
        else:
            # On Python 2 the builtin input() evaluates what was typed; use
            # raw_input there so the message is read as plain text.
            try:
                read_line = raw_input  # noqa: F821 -- exists on Python 2 only
            except NameError:
                read_line = input
            statusTxt = str(read_line("message: "))
        twitter.statuses.update(status=statusTxt)
469
class TwitterShell(Action):
    """Interactive loop that reads command lines and runs them as actions."""

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        prompt = prompt.strip("'").replace("\\'", "'")
        for colour in ansi.COLOURS_NAMED:
            marker = '[%s]' % (colour)
            if marker in prompt:
                prompt = prompt.replace(marker, ansi.cmdColourNamed(colour))
        prompt = prompt.replace('[R]', ansi.cmdReset())
        return prompt

    def __call__(self, twitter, options):
        """
        Prompt for commands until the user exits.

        Each line is parsed with parse_args() into `options` and dispatched
        through Action. ``exit`` leaves the shell; end-of-file asks for
        confirmation first. Unknown actions and bad options are reported on
        stderr and the loop continues.
        """
        # On Python 2 the builtin input() evaluates what was typed; use
        # raw_input there so command lines are read as plain text.
        try:
            read_line = raw_input  # noqa: F821 -- exists on Python 2 only
        except NameError:
            read_line = input

        prompt = self.render_prompt(options.get('prompt', 'twitter> '))
        while True:
            options['action'] = ""
            try:
                args = read_line(prompt).split()
                try:
                    parse_args(args, options)
                except GetoptError as e:
                    # A mistyped option should not terminate the shell.
                    print("I can't do that, %s." % (e), file=sys.stderr)
                    continue
                if not options['action']:
                    continue
                elif options['action'] == 'exit':
                    raise SystemExit(0)
                elif options['action'] == 'shell':
                    print('Sorry Xzibit does not work here!', file=sys.stderr)
                    continue
                elif options['action'] == 'help':
                    # Shell-specific notes; the regular help action below
                    # then prints the full command-line help.
                    print(
                        "\ntwitter> `action`\n\n"
                        "The Shell Accepts all the command line actions along with:\n"
                        "\n"
                        "exit Leave the twitter shell (^D may also be used)\n"
                        "\n"
                        "Full CMD Line help is appended below for your convinience.",
                        file=sys.stderr)
                Action()(twitter, options)
                options['action'] = ''
            except NoSuchActionError as e:
                print(e, file=sys.stderr)
            except KeyboardInterrupt:
                print('\n[Keyboard Interrupt]', file=sys.stderr)
            except EOFError:
                print(file=sys.stderr)
                leaving = self.ask(subject='Leave')
                if not leaving:
                    print('Excellent!', file=sys.stderr)
                else:
                    raise SystemExit(0)
516
class PythonPromptAction(Action):
    """Hand control to smrt_input() repeatedly until end-of-file."""

    def __call__(self, twitter, options):
        try:
            while True:
                # globals()/locals() expose this module and the call's
                # arguments (e.g. `twitter`) to the prompt.
                smrt_input(globals(), locals())
        except EOFError:
            return
524
class HelpAction(Action):
    """Print the module docstring, which holds the usage text."""

    def __call__(self, twitter, options):
        usage = __doc__
        print(usage)
528
class DoNothingAction(Action):
    """Action that deliberately does nothing when invoked."""

    def __call__(self, twitter, options):
        return None
532
# Action name (as typed on the command line) -> class implementing it.
actions = dict(
    authorize=DoNothingAction,
    follow=FollowAction,
    friends=FriendsAction,
    list=ListsAction,
    mylist=MyListsAction,
    help=HelpAction,
    leave=LeaveAction,
    public=PublicAction,
    pyprompt=PythonPromptAction,
    replies=RepliesAction,
    search=SearchAction,
    set=SetStatusAction,
    shell=TwitterShell,
)
548
def loadConfig(filename):
    """
    Return a copy of OPTIONS overlaid with settings read from `filename`.

    Only the ``format`` and ``prompt`` options of the ``[twitter]`` section
    are read. A missing file simply yields the defaults.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    cp = SafeConfigParser()
    cp.read([filename])
    for option in ('format', 'prompt'):
        if not cp.has_option('twitter', option):
            continue
        options[option] = cp.get('twitter', option)
    return options
558
def main(args=sys.argv[1:]):
    """
    Command-line entry point: parse `args`, authenticate, run the action.

    Options are merged from the built-in defaults, the config file and the
    command line, in increasing order of priority.

    Returns 1 when --refresh is combined with an action that cannot be
    refreshed. Raises SystemExit(1) for invalid options, unknown actions
    and Twitter errors.
    """
    arg_options = {}
    try:
        parse_args(args, arg_options)
    except GetoptError as e:
        print("I can't do that, %s." % (e), file=sys.stderr)
        print(file=sys.stderr)
        raise SystemExit(1)

    config_path = os.path.expanduser(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))
    config_options = loadConfig(config_path)

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    # Give this run its own list instead of sharing the module default.
    options['extra_args'] = list(options['extra_args'])
    for source in (config_options, arg_options):
        for key, value in source.items():
            # Empty values are skipped, but an explicit False must be
            # honoured, otherwise --no-ssl (secure=False) is ignored.
            if value or value is False:
                options[key] = value

    if options['refresh'] and options['action'] not in (
            'friends', 'public', 'replies'):
        print("You can only refresh the friends, public, or replies actions.",
              file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        return 1

    oauth_filename = os.path.expanduser(options['oauth_filename'])

    if (options['action'] == 'authorize'
            or not os.path.exists(oauth_filename)):
        # Use the expanded path here too, so the token is stored in the
        # same file that is checked above and read back below.
        oauth_dance(
            "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
            oauth_filename)

    oauth_token, oauth_token_secret = read_token_file(oauth_filename)

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1',
        domain='api.twitter.com')

    try:
        Action()(twitter, options)
    except NoSuchActionError as e:
        print(e, file=sys.stderr)
        raise SystemExit(1)
    except TwitterError as e:
        print(str(e), file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        raise SystemExit(1)