]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Don't need term encoding in py3. (py3_only)
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 public get latest public tweets
19 replies get latest replies to you
20 search search twitter (Beware: octothorpe, escape it)
21 set set your twitter status
22 shell login to the twitter shell
23
24
25 OPTIONS:
26
27 -r --refresh run this command forever, polling every once
28 in a while (default: every 5 minutes)
29 -R --refresh-rate <rate> set the refresh rate (in seconds)
30 -f --format <format> specify the output format for status updates
31 -c --config <filename> read username and password from given config
32 file (default ~/.twitter)
33 -l --length <count> specify number of status updates shown
34 (default: 20, max: 200)
35 -t --timestamp show time before status lines
36 -d --datestamp show date before status lines
37 --no-ssl use less-secure HTTP instead of HTTPS
38 --oauth <filename> filename to read/store oauth credentials to
39
40 FORMATS for the --format option
41
42 default one line per status
43 verbose multiple lines per status, more verbose status info
44 urls nothing but URLs
45 ansi ansi colour (rainbow mode)
46
47
48 CONFIG FILES
49
50 The config file should be placed in your home directory and be named .twitter.
51 It must contain a [twitter] header, and all the desired options you wish to
52 set, like so:
53
54 [twitter]
55 format: <desired_default_format_for_output>
56 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
57
58 OAuth authentication tokens are stored in the file .twitter_oauth in your
59 home directory.
60 """
61
62 CONSUMER_KEY='uS6hO2sV6tDKIOeVjhnFnQ'
63 CONSUMER_SECRET='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
64
65 import sys
66 import time
67 from getopt import gnu_getopt as getopt, GetoptError
68 from getpass import getpass
69 import re
70 import os.path
71 from configparser import SafeConfigParser
72 import datetime
73 from urllib.parse import quote
74 import webbrowser
75
76 from .api import Twitter, TwitterError
77 from .oauth import OAuth, write_token_file, read_token_file
78 from .oauth_dance import oauth_dance
79 from . import ansi
80
81 OPTIONS = {
82 'action': 'friends',
83 'refresh': False,
84 'refresh_rate': 600,
85 'format': 'default',
86 'prompt': '[cyan]twitter[R]> ',
87 'config_filename': os.environ.get('HOME', '') + os.sep + '.twitter',
88 'oauth_filename': os.environ.get('HOME', '') + os.sep + '.twitter_oauth',
89 'length': 20,
90 'timestamp': False,
91 'datestamp': False,
92 'extra_args': [],
93 'secure': True,
94 }
95
96 def parse_args(args, options):
97 long_opts = ['help', 'format=', 'refresh', 'oauth=',
98 'refresh-rate=', 'config=', 'length=', 'timestamp',
99 'datestamp', 'no-ssl']
100 short_opts = "e:p:f:h?rR:c:l:td"
101 opts, extra_args = getopt(args, short_opts, long_opts)
102
103 for opt, arg in opts:
104 if opt in ('-f', '--format'):
105 options['format'] = arg
106 elif opt in ('-r', '--refresh'):
107 options['refresh'] = True
108 elif opt in ('-R', '--refresh-rate'):
109 options['refresh_rate'] = int(arg)
110 elif opt in ('-l', '--length'):
111 options["length"] = int(arg)
112 elif opt in ('-t', '--timestamp'):
113 options["timestamp"] = True
114 elif opt in ('-d', '--datestamp'):
115 options["datestamp"] = True
116 elif opt in ('-?', '-h', '--help'):
117 options['action'] = 'help'
118 elif opt in ('-c', '--config'):
119 options['config_filename'] = arg
120 elif opt == '--no-ssl':
121 options['secure'] = False
122 elif opt == '--oauth':
123 options['oauth_filename'] = arg
124
125 if extra_args and not ('action' in options and options['action'] == 'help'):
126 options['action'] = extra_args[0]
127 options['extra_args'] = extra_args[1:]
128
129 def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
130 timestamp = options["timestamp"]
131 datestamp = options["datestamp"]
132 t = time.strptime(status['created_at'], format)
133 i_hate_timezones = time.timezone
134 if (time.daylight):
135 i_hate_timezones = time.altzone
136 dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
137 seconds=i_hate_timezones)
138 t = dt.timetuple()
139 if timestamp and datestamp:
140 return time.strftime("%Y-%m-%d %H:%M:%S ", t)
141 elif timestamp:
142 return time.strftime("%H:%M:%S ", t)
143 elif datestamp:
144 return time.strftime("%Y-%m-%d ", t)
145 return ""
146
147 class StatusFormatter(object):
148 def __call__(self, status, options):
149 return ("%s%s %s" %(
150 get_time_string(status, options),
151 status['user']['screen_name'], status['text']))
152
153 class AnsiStatusFormatter(object):
154 def __init__(self):
155 self._colourMap = ansi.ColourMap()
156
157 def __call__(self, status, options):
158 colour = self._colourMap.colourFor(status['user']['screen_name'])
159 return ("%s%s%s%s %s" %(
160 get_time_string(status, options),
161 ansi.cmdColour(colour), status['user']['screen_name'],
162 ansi.cmdReset(), status['text']))
163
164 class VerboseStatusFormatter(object):
165 def __call__(self, status, options):
166 return ("-- %s (%s) on %s\n%s\n" %(
167 status['user']['screen_name'],
168 status['user']['location'],
169 status['created_at'],
170 status['text']))
171
172 class URLStatusFormatter(object):
173 urlmatch = re.compile(r'https?://\S+')
174 def __call__(self, status, options):
175 urls = self.urlmatch.findall(status['text'])
176 return '\n'.join(urls) if urls else ""
177
178
179 class ListsFormatter(object):
180 def __call__(self, list):
181 if list['description']:
182 list_str = "%-30s (%s)" % (list['name'], list['description'])
183 else:
184 list_str = "%-30s" % (list['name'])
185 return "%s\n" % list_str
186
187 class ListsVerboseFormatter(object):
188 def __call__(self, list):
189 list_str = "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
190 return list_str
191
192 class AnsiListsFormatter(object):
193 def __init__(self):
194 self._colourMap = ansi.ColourMap()
195
196 def __call__(self, list):
197 colour = self._colourMap.colourFor(list['name'])
198 return ("%s%-15s%s %s" %(
199 ansi.cmdColour(colour), list['name'],
200 ansi.cmdReset(), list['description']))
201
202
203 class AdminFormatter(object):
204 def __call__(self, action, user):
205 user_str = "%s (%s)" %(user['screen_name'], user['name'])
206 if action == "follow":
207 return "You are now following %s.\n" %(user_str)
208 else:
209 return "You are no longer following %s.\n" %(user_str)
210
211 class VerboseAdminFormatter(object):
212 def __call__(self, action, user):
213 return("-- %s: %s (%s): %s" % (
214 "Following" if action == "follow" else "Leaving",
215 user['screen_name'],
216 user['name'],
217 user['url']))
218
219 class SearchFormatter(object):
220 def __call__(self, result, options):
221 return("%s%s %s" %(
222 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
223 result['from_user'], result['text']))
224
225 class VerboseSearchFormatter(SearchFormatter):
226 pass #Default to the regular one
227
228 class URLSearchFormatter(object):
229 urlmatch = re.compile(r'https?://\S+')
230 def __call__(self, result, options):
231 urls = self.urlmatch.findall(result['text'])
232 return '\n'.join(urls) if urls else ""
233
234 class AnsiSearchFormatter(object):
235 def __init__(self):
236 self._colourMap = ansi.ColourMap()
237
238 def __call__(self, result, options):
239 colour = self._colourMap.colourFor(result['from_user'])
240 return ("%s%s%s%s %s" %(
241 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
242 ansi.cmdColour(colour), result['from_user'],
243 ansi.cmdReset(), result['text']))
244
245 _term_encoding = None
246 def get_term_encoding():
247 global _term_encoding
248 if not _term_encoding:
249 lang = os.getenv('LANG', 'unknown.UTF-8').split('.')
250 if lang[1:]:
251 _term_encoding = lang[1]
252 else:
253 _term_encoding = 'UTF-8'
254 return _term_encoding
255
256 formatters = {}
257 status_formatters = {
258 'default': StatusFormatter,
259 'verbose': VerboseStatusFormatter,
260 'urls': URLStatusFormatter,
261 'ansi': AnsiStatusFormatter
262 }
263 formatters['status'] = status_formatters
264
265 admin_formatters = {
266 'default': AdminFormatter,
267 'verbose': VerboseAdminFormatter,
268 'urls': AdminFormatter,
269 'ansi': AdminFormatter
270 }
271 formatters['admin'] = admin_formatters
272
273 search_formatters = {
274 'default': SearchFormatter,
275 'verbose': VerboseSearchFormatter,
276 'urls': URLSearchFormatter,
277 'ansi': AnsiSearchFormatter
278 }
279 formatters['search'] = search_formatters
280
281 lists_formatters = {
282 'default': ListsFormatter,
283 'verbose': ListsVerboseFormatter,
284 'urls': None,
285 'ansi': AnsiListsFormatter
286 }
287 formatters['lists'] = lists_formatters
288
289 def get_formatter(action_type, options):
290 formatters_dict = formatters.get(action_type)
291 if (not formatters_dict):
292 raise TwitterError(
293 "There was an error finding a class of formatters for your type (%s)"
294 %(action_type))
295 f = formatters_dict.get(options['format'])
296 if (not f):
297 raise TwitterError(
298 "Unknown formatter '%s' for status actions" %(options['format']))
299 return f()
300
301 class Action(object):
302
303 def ask(self, subject='perform this action', careful=False):
304 '''
305 Requests fromt he user using `raw_input` if `subject` should be
306 performed. When `careful`, the default answer is NO, otherwise YES.
307 Returns the user answer in the form `True` or `False`.
308 '''
309 sample = '(y/N)'
310 if not careful:
311 sample = '(Y/n)'
312
313 prompt = 'You really want to %s %s? ' %(subject, sample)
314 try:
315 answer = input(prompt).lower()
316 if careful:
317 return answer in ('yes', 'y')
318 else:
319 return answer not in ('no', 'n')
320 except EOFError:
321 print(file=sys.stderr) # Put Newline since Enter was never pressed
322 # TODO:
323 # Figure out why on OS X the raw_input keeps raising
324 # EOFError and is never able to reset and get more input
325 # Hint: Look at how IPython implements their console
326 default = True
327 if careful:
328 default = False
329 return default
330
331 def __call__(self, twitter, options):
332 action = actions.get(options['action'], NoSuchAction)()
333 try:
334 doAction = lambda : action(twitter, options)
335 if (options['refresh'] and isinstance(action, StatusAction)):
336 while True:
337 doAction()
338 time.sleep(options['refresh_rate'])
339 else:
340 doAction()
341 except KeyboardInterrupt:
342 print('\n[Keyboard Interrupt]', file=sys.stderr)
343 pass
344
345 class NoSuchActionError(Exception):
346 pass
347
348 class NoSuchAction(Action):
349 def __call__(self, twitter, options):
350 raise NoSuchActionError("No such action: %s" %(options['action']))
351
352 def printNicely(string):
353 sys.stdout.buffer.write(string.encode('utf8'))
354 print()
355
356 class StatusAction(Action):
357 def __call__(self, twitter, options):
358 statuses = self.getStatuses(twitter, options)
359 sf = get_formatter('status', options)
360 for status in statuses:
361 statusStr = sf(status, options)
362 if statusStr.strip():
363 printNicely(statusStr)
364
365 class SearchAction(Action):
366 def __call__(self, twitter, options):
367 # We need to be pointing at search.twitter.com to work, and it is less
368 # tangly to do it here than in the main()
369 twitter.domain="search.twitter.com"
370 twitter.uriparts=()
371 # We need to bypass the TwitterCall parameter encoding, so we
372 # don't encode the plus sign, so we have to encode it ourselves
373 query_string = "+".join(
374 [quote(term.decode(get_term_encoding()))
375 for term in options['extra_args']])
376
377 results = twitter.search(q=query_string)['results']
378 f = get_formatter('search', options)
379 for result in results:
380 resultStr = f(result, options)
381 if resultStr.strip():
382 printNicely(resultStr)
383
384 class AdminAction(Action):
385 def __call__(self, twitter, options):
386 if not (options['extra_args'] and options['extra_args'][0]):
387 raise TwitterError("You need to specify a user (screen name)")
388 af = get_formatter('admin', options)
389 try:
390 user = self.getUser(twitter, options['extra_args'][0])
391 except TwitterError as e:
392 print("There was a problem following or leaving the specified user.")
393 print("You may be trying to follow a user you are already following;")
394 print("Leaving a user you are not currently following;")
395 print("Or the user may not exist.")
396 print("Sorry.")
397 print()
398 print(e)
399 else:
400 printNicely(af(options['action'], user))
401
402 class ListsAction(StatusAction):
403 def getStatuses(self, twitter, options):
404 if not options['extra_args']:
405 raise TwitterError("Please provide a user to query for lists")
406
407 screen_name = options['extra_args'][0]
408
409 if not options['extra_args'][1:]:
410 lists = twitter.user.lists(user=screen_name)['lists']
411 if not lists:
412 printNicely("This user has no lists.")
413 for list in lists:
414 lf = get_formatter('lists', options)
415 printNicely(lf(list))
416 return []
417 else:
418 return reversed(twitter.user.lists.list.statuses(
419 user=screen_name, list=options['extra_args'][1]))
420
421
422 class MyListsAction(ListsAction):
423 def getStatuses(self, twitter, options):
424 screen_name = twitter.account.verify_credentials()['screen_name']
425 options['extra_args'].insert(0, screen_name)
426 return ListsAction.getStatuses(self, twitter, options)
427
428
429 class FriendsAction(StatusAction):
430 def getStatuses(self, twitter, options):
431 return reversed(twitter.statuses.friends_timeline(count=options["length"]))
432
433 class PublicAction(StatusAction):
434 def getStatuses(self, twitter, options):
435 return reversed(twitter.statuses.public_timeline(count=options["length"]))
436
437 class RepliesAction(StatusAction):
438 def getStatuses(self, twitter, options):
439 return reversed(twitter.statuses.replies(count=options["length"]))
440
441 class FollowAction(AdminAction):
442 def getUser(self, twitter, user):
443 return twitter.friendships.create(id=user)
444
445 class LeaveAction(AdminAction):
446 def getUser(self, twitter, user):
447 return twitter.friendships.destroy(id=user)
448
449 class SetStatusAction(Action):
450 def __call__(self, twitter, options):
451 statusTxt = (" ".join(options['extra_args'])
452 if options['extra_args']
453 else str(input("message: ")))
454 status = (statusTxt.encode('utf8', 'replace'))
455 twitter.statuses.update(status=status)
456
457 class TwitterShell(Action):
458
459 def render_prompt(self, prompt):
460 '''Parses the `prompt` string and returns the rendered version'''
461 prompt = prompt.strip("'").replace("\\'","'")
462 for colour in ansi.COLOURS_NAMED:
463 if '[%s]' %(colour) in prompt:
464 prompt = prompt.replace(
465 '[%s]' %(colour), ansi.cmdColourNamed(colour))
466 prompt = prompt.replace('[R]', ansi.cmdReset())
467 return prompt
468
469 def __call__(self, twitter, options):
470 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
471 while True:
472 options['action'] = ""
473 try:
474 args = input(prompt).split()
475 parse_args(args, options)
476 if not options['action']:
477 continue
478 elif options['action'] == 'exit':
479 raise SystemExit(0)
480 elif options['action'] == 'shell':
481 print('Sorry Xzibit does not work here!', file=sys.stderr)
482 continue
483 elif options['action'] == 'help':
484 print('''\ntwitter> `action`\n
485 The Shell Accepts all the command line actions along with:
486
487 exit Leave the twitter shell (^D may also be used)
488
489 Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
490 Action()(twitter, options)
491 options['action'] = ''
492 except NoSuchActionError as e:
493 print(e, file=sys.stderr)
494 except KeyboardInterrupt:
495 print('\n[Keyboard Interrupt]', file=sys.stderr)
496 except EOFError:
497 print(file=sys.stderr)
498 leaving = self.ask(subject='Leave')
499 if not leaving:
500 print('Excellent!', file=sys.stderr)
501 else:
502 raise SystemExit(0)
503
504 class HelpAction(Action):
505 def __call__(self, twitter, options):
506 print(__doc__)
507
508 class DoNothingAction(Action):
509 def __call__(self, twitter, options):
510 pass
511
512 actions = {
513 'authorize' : DoNothingAction,
514 'follow' : FollowAction,
515 'friends' : FriendsAction,
516 'list' : ListsAction,
517 'mylist' : MyListsAction,
518 'help' : HelpAction,
519 'leave' : LeaveAction,
520 'public' : PublicAction,
521 'replies' : RepliesAction,
522 'search' : SearchAction,
523 'set' : SetStatusAction,
524 'shell' : TwitterShell,
525 }
526
527 def loadConfig(filename):
528 options = dict(OPTIONS)
529 if os.path.exists(filename):
530 cp = SafeConfigParser()
531 cp.read([filename])
532 for option in ('format', 'prompt'):
533 if cp.has_option('twitter', option):
534 options[option] = cp.get('twitter', option)
535 return options
536
537 def main(args=sys.argv[1:]):
538 arg_options = {}
539 try:
540 parse_args(args, arg_options)
541 except GetoptError as e:
542 print("I can't do that, %s." %(e), file=sys.stderr)
543 print(file=sys.stderr)
544 raise SystemExit(1)
545
546 config_path = os.path.expanduser(
547 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
548 config_options = loadConfig(config_path)
549
550 # Apply the various options in order, the most important applied last.
551 # Defaults first, then what's read from config file, then command-line
552 # arguments.
553 options = dict(OPTIONS)
554 for d in config_options, arg_options:
555 for k,v in list(d.items()):
556 if v: options[k] = v
557
558 if options['refresh'] and options['action'] not in (
559 'friends', 'public', 'replies'):
560 print("You can only refresh the friends, public, or replies actions.", file=sys.stderr)
561 print("Use 'twitter -h' for help.", file=sys.stderr)
562 return 1
563
564 oauth_filename = os.path.expanduser(options['oauth_filename'])
565
566 if (options['action'] == 'authorize'
567 or not os.path.exists(oauth_filename)):
568 oauth_dance(
569 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
570 options['oauth_filename'])
571
572 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
573
574 twitter = Twitter(
575 auth=OAuth(
576 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
577 secure=options['secure'],
578 api_version='1',
579 domain='api.twitter.com')
580
581 try:
582 Action()(twitter, options)
583 except NoSuchActionError as e:
584 print(e, file=sys.stderr)
585 raise SystemExit(1)
586 except TwitterError as e:
587 print(str(e), file=sys.stderr)
588 print("Use 'twitter -h' for help.", file=sys.stderr)
589 raise SystemExit(1)