]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
e6f04b0fde4fd1a57331ddedfa82479d38759cd0
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 public get latest public tweets
19 replies get latest replies to you
20 search search twitter (Beware: octothorpe, escape it)
21 set set your twitter status
22 shell login to the twitter shell
23
24
25 OPTIONS:
26
27 -r --refresh run this command forever, polling every once
28 in a while (default: every 5 minutes)
29 -R --refresh-rate <rate> set the refresh rate (in seconds)
30 -f --format <format> specify the output format for status updates
31 -c --config <filename> read username and password from given config
32 file (default ~/.twitter)
33 -l --length <count> specify number of status updates shown
34 (default: 20, max: 200)
35 -t --timestamp show time before status lines
36 -d --datestamp show date before status lines
37 --no-ssl use less-secure HTTP instead of HTTPS
38 --oauth <filename> filename to read/store oauth credentials to
39
40 FORMATS for the --format option
41
42 default one line per status
43 verbose multiple lines per status, more verbose status info
44 urls nothing but URLs
45 ansi ansi colour (rainbow mode)
46
47
48 CONFIG FILES
49
50 The config file should be placed in your home directory and be named .twitter.
51 It must contain a [twitter] header, and all the desired options you wish to
52 set, like so:
53
54 [twitter]
55 format: <desired_default_format_for_output>
56 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
57
58 OAuth authentication tokens are stored in the file .twitter_oauth in your
59 home directory.
60 """
61
62 CONSUMER_KEY='uS6hO2sV6tDKIOeVjhnFnQ'
63 CONSUMER_SECRET='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
64
65 import sys
66 import time
67 from getopt import gnu_getopt as getopt, GetoptError
68 from getpass import getpass
69 import re
70 import os.path
71 from configparser import SafeConfigParser
72 import datetime
73 from urllib.parse import quote
74 import webbrowser
75
76 from .api import Twitter, TwitterError
77 from .oauth import OAuth, write_token_file, read_token_file
78 from .oauth_dance import oauth_dance
79 from . import ansi
80
81 OPTIONS = {
82 'action': 'friends',
83 'refresh': False,
84 'refresh_rate': 600,
85 'format': 'default',
86 'prompt': '[cyan]twitter[R]> ',
87 'config_filename': os.environ.get('HOME', '') + os.sep + '.twitter',
88 'oauth_filename': os.environ.get('HOME', '') + os.sep + '.twitter_oauth',
89 'length': 20,
90 'timestamp': False,
91 'datestamp': False,
92 'extra_args': [],
93 'secure': True,
94 }
95
96 def parse_args(args, options):
97 long_opts = ['help', 'format=', 'refresh', 'oauth=',
98 'refresh-rate=', 'config=', 'length=', 'timestamp',
99 'datestamp', 'no-ssl']
100 short_opts = "e:p:f:h?rR:c:l:td"
101 opts, extra_args = getopt(args, short_opts, long_opts)
102
103 for opt, arg in opts:
104 if opt in ('-f', '--format'):
105 options['format'] = arg
106 elif opt in ('-r', '--refresh'):
107 options['refresh'] = True
108 elif opt in ('-R', '--refresh-rate'):
109 options['refresh_rate'] = int(arg)
110 elif opt in ('-l', '--length'):
111 options["length"] = int(arg)
112 elif opt in ('-t', '--timestamp'):
113 options["timestamp"] = True
114 elif opt in ('-d', '--datestamp'):
115 options["datestamp"] = True
116 elif opt in ('-?', '-h', '--help'):
117 options['action'] = 'help'
118 elif opt in ('-c', '--config'):
119 options['config_filename'] = arg
120 elif opt == '--no-ssl':
121 options['secure'] = False
122 elif opt == '--oauth':
123 options['oauth_filename'] = arg
124
125 if extra_args and not ('action' in options and options['action'] == 'help'):
126 options['action'] = extra_args[0]
127 options['extra_args'] = extra_args[1:]
128
129 def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
130 timestamp = options["timestamp"]
131 datestamp = options["datestamp"]
132 t = time.strptime(status['created_at'], format)
133 i_hate_timezones = time.timezone
134 if (time.daylight):
135 i_hate_timezones = time.altzone
136 dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
137 seconds=i_hate_timezones)
138 t = dt.timetuple()
139 if timestamp and datestamp:
140 return time.strftime("%Y-%m-%d %H:%M:%S ", t)
141 elif timestamp:
142 return time.strftime("%H:%M:%S ", t)
143 elif datestamp:
144 return time.strftime("%Y-%m-%d ", t)
145 return ""
146
147 class StatusFormatter(object):
148 def __call__(self, status, options):
149 return ("%s%s %s" %(
150 get_time_string(status, options),
151 status['user']['screen_name'], status['text']))
152
153 class AnsiStatusFormatter(object):
154 def __init__(self):
155 self._colourMap = ansi.ColourMap()
156
157 def __call__(self, status, options):
158 colour = self._colourMap.colourFor(status['user']['screen_name'])
159 return ("%s%s%s%s %s" %(
160 get_time_string(status, options),
161 ansi.cmdColour(colour), status['user']['screen_name'],
162 ansi.cmdReset(), status['text']))
163
164 class VerboseStatusFormatter(object):
165 def __call__(self, status, options):
166 return ("-- %s (%s) on %s\n%s\n" %(
167 status['user']['screen_name'],
168 status['user']['location'],
169 status['created_at'],
170 status['text']))
171
172 class URLStatusFormatter(object):
173 urlmatch = re.compile(r'https?://\S+')
174 def __call__(self, status, options):
175 urls = self.urlmatch.findall(status['text'])
176 return '\n'.join(urls) if urls else ""
177
178
179 class ListsFormatter(object):
180 def __call__(self, list):
181 if list['description']:
182 list_str = "%-30s (%s)" % (list['name'], list['description'])
183 else:
184 list_str = "%-30s" % (list['name'])
185 return "%s\n" % list_str
186
187 class ListsVerboseFormatter(object):
188 def __call__(self, list):
189 list_str = "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
190 return list_str
191
192 class AnsiListsFormatter(object):
193 def __init__(self):
194 self._colourMap = ansi.ColourMap()
195
196 def __call__(self, list):
197 colour = self._colourMap.colourFor(list['name'])
198 return ("%s%-15s%s %s" %(
199 ansi.cmdColour(colour), list['name'],
200 ansi.cmdReset(), list['description']))
201
202
203 class AdminFormatter(object):
204 def __call__(self, action, user):
205 user_str = "%s (%s)" %(user['screen_name'], user['name'])
206 if action == "follow":
207 return "You are now following %s.\n" %(user_str)
208 else:
209 return "You are no longer following %s.\n" %(user_str)
210
211 class VerboseAdminFormatter(object):
212 def __call__(self, action, user):
213 return("-- %s: %s (%s): %s" % (
214 "Following" if action == "follow" else "Leaving",
215 user['screen_name'],
216 user['name'],
217 user['url']))
218
219 class SearchFormatter(object):
220 def __call__(self, result, options):
221 return("%s%s %s" %(
222 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
223 result['from_user'], result['text']))
224
225 class VerboseSearchFormatter(SearchFormatter):
226 pass #Default to the regular one
227
228 class URLSearchFormatter(object):
229 urlmatch = re.compile(r'https?://\S+')
230 def __call__(self, result, options):
231 urls = self.urlmatch.findall(result['text'])
232 return '\n'.join(urls) if urls else ""
233
234 class AnsiSearchFormatter(object):
235 def __init__(self):
236 self._colourMap = ansi.ColourMap()
237
238 def __call__(self, result, options):
239 colour = self._colourMap.colourFor(result['from_user'])
240 return ("%s%s%s%s %s" %(
241 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
242 ansi.cmdColour(colour), result['from_user'],
243 ansi.cmdReset(), result['text']))
244
245 _term_encoding = None
246 def get_term_encoding():
247 global _term_encoding
248 if not _term_encoding:
249 lang = os.getenv('LANG', 'unknown.UTF-8').split('.')
250 if lang[1:]:
251 _term_encoding = lang[1]
252 else:
253 _term_encoding = 'UTF-8'
254 return _term_encoding
255
256 formatters = {}
257 status_formatters = {
258 'default': StatusFormatter,
259 'verbose': VerboseStatusFormatter,
260 'urls': URLStatusFormatter,
261 'ansi': AnsiStatusFormatter
262 }
263 formatters['status'] = status_formatters
264
265 admin_formatters = {
266 'default': AdminFormatter,
267 'verbose': VerboseAdminFormatter,
268 'urls': AdminFormatter,
269 'ansi': AdminFormatter
270 }
271 formatters['admin'] = admin_formatters
272
273 search_formatters = {
274 'default': SearchFormatter,
275 'verbose': VerboseSearchFormatter,
276 'urls': URLSearchFormatter,
277 'ansi': AnsiSearchFormatter
278 }
279 formatters['search'] = search_formatters
280
281 lists_formatters = {
282 'default': ListsFormatter,
283 'verbose': ListsVerboseFormatter,
284 'urls': None,
285 'ansi': AnsiListsFormatter
286 }
287 formatters['lists'] = lists_formatters
288
289 def get_formatter(action_type, options):
290 formatters_dict = formatters.get(action_type)
291 if (not formatters_dict):
292 raise TwitterError(
293 "There was an error finding a class of formatters for your type (%s)"
294 %(action_type))
295 f = formatters_dict.get(options['format'])
296 if (not f):
297 raise TwitterError(
298 "Unknown formatter '%s' for status actions" %(options['format']))
299 return f()
300
301 class Action(object):
302
303 def ask(self, subject='perform this action', careful=False):
304 '''
305 Requests fromt he user using `raw_input` if `subject` should be
306 performed. When `careful`, the default answer is NO, otherwise YES.
307 Returns the user answer in the form `True` or `False`.
308 '''
309 sample = '(y/N)'
310 if not careful:
311 sample = '(Y/n)'
312
313 prompt = 'You really want to %s %s? ' %(subject, sample)
314 try:
315 answer = input(prompt).lower()
316 if careful:
317 return answer in ('yes', 'y')
318 else:
319 return answer not in ('no', 'n')
320 except EOFError:
321 print(file=sys.stderr) # Put Newline since Enter was never pressed
322 # TODO:
323 # Figure out why on OS X the raw_input keeps raising
324 # EOFError and is never able to reset and get more input
325 # Hint: Look at how IPython implements their console
326 default = True
327 if careful:
328 default = False
329 return default
330
331 def __call__(self, twitter, options):
332 action = actions.get(options['action'], NoSuchAction)()
333 try:
334 doAction = lambda : action(twitter, options)
335 if (options['refresh'] and isinstance(action, StatusAction)):
336 while True:
337 doAction()
338 time.sleep(options['refresh_rate'])
339 else:
340 doAction()
341 except KeyboardInterrupt:
342 print('\n[Keyboard Interrupt]', file=sys.stderr)
343 pass
344
345 class NoSuchActionError(Exception):
346 pass
347
348 class NoSuchAction(Action):
349 def __call__(self, twitter, options):
350 raise NoSuchActionError("No such action: %s" %(options['action']))
351
352 def printNicely(string):
353 if sys.stdout.encoding:
354 print(string.encode(sys.stdout.encoding, 'replace'))
355 else:
356 print(string.encode('utf-8'))
357
358 class StatusAction(Action):
359 def __call__(self, twitter, options):
360 statuses = self.getStatuses(twitter, options)
361 sf = get_formatter('status', options)
362 for status in statuses:
363 statusStr = sf(status, options)
364 if statusStr.strip():
365 printNicely(statusStr)
366
367 class SearchAction(Action):
368 def __call__(self, twitter, options):
369 # We need to be pointing at search.twitter.com to work, and it is less
370 # tangly to do it here than in the main()
371 twitter.domain="search.twitter.com"
372 twitter.uriparts=()
373 # We need to bypass the TwitterCall parameter encoding, so we
374 # don't encode the plus sign, so we have to encode it ourselves
375 query_string = "+".join(
376 [quote(term.decode(get_term_encoding()))
377 for term in options['extra_args']])
378
379 results = twitter.search(q=query_string)['results']
380 f = get_formatter('search', options)
381 for result in results:
382 resultStr = f(result, options)
383 if resultStr.strip():
384 printNicely(resultStr)
385
386 class AdminAction(Action):
387 def __call__(self, twitter, options):
388 if not (options['extra_args'] and options['extra_args'][0]):
389 raise TwitterError("You need to specify a user (screen name)")
390 af = get_formatter('admin', options)
391 try:
392 user = self.getUser(twitter, options['extra_args'][0])
393 except TwitterError as e:
394 print("There was a problem following or leaving the specified user.")
395 print("You may be trying to follow a user you are already following;")
396 print("Leaving a user you are not currently following;")
397 print("Or the user may not exist.")
398 print("Sorry.")
399 print()
400 print(e)
401 else:
402 printNicely(af(options['action'], user))
403
404 class ListsAction(StatusAction):
405 def getStatuses(self, twitter, options):
406 if not options['extra_args']:
407 raise TwitterError("Please provide a user to query for lists")
408
409 screen_name = options['extra_args'][0]
410
411 if not options['extra_args'][1:]:
412 lists = twitter.user.lists(user=screen_name)['lists']
413 if not lists:
414 printNicely("This user has no lists.")
415 for list in lists:
416 lf = get_formatter('lists', options)
417 printNicely(lf(list))
418 return []
419 else:
420 return reversed(twitter.user.lists.list.statuses(
421 user=screen_name, list=options['extra_args'][1]))
422
423
424 class MyListsAction(ListsAction):
425 def getStatuses(self, twitter, options):
426 screen_name = twitter.account.verify_credentials()['screen_name']
427 options['extra_args'].insert(0, screen_name)
428 return ListsAction.getStatuses(self, twitter, options)
429
430
431 class FriendsAction(StatusAction):
432 def getStatuses(self, twitter, options):
433 return reversed(twitter.statuses.friends_timeline(count=options["length"]))
434
435 class PublicAction(StatusAction):
436 def getStatuses(self, twitter, options):
437 return reversed(twitter.statuses.public_timeline(count=options["length"]))
438
439 class RepliesAction(StatusAction):
440 def getStatuses(self, twitter, options):
441 return reversed(twitter.statuses.replies(count=options["length"]))
442
443 class FollowAction(AdminAction):
444 def getUser(self, twitter, user):
445 return twitter.friendships.create(id=user)
446
447 class LeaveAction(AdminAction):
448 def getUser(self, twitter, user):
449 return twitter.friendships.destroy(id=user)
450
451 class SetStatusAction(Action):
452 def __call__(self, twitter, options):
453 statusTxt = (" ".join(options['extra_args']).decode(get_term_encoding())
454 if options['extra_args']
455 else str(input("message: ")))
456 status = (statusTxt.encode('utf8', 'replace'))
457 twitter.statuses.update(status=status)
458
459 class TwitterShell(Action):
460
461 def render_prompt(self, prompt):
462 '''Parses the `prompt` string and returns the rendered version'''
463 prompt = prompt.strip("'").replace("\\'","'")
464 for colour in ansi.COLOURS_NAMED:
465 if '[%s]' %(colour) in prompt:
466 prompt = prompt.replace(
467 '[%s]' %(colour), ansi.cmdColourNamed(colour))
468 prompt = prompt.replace('[R]', ansi.cmdReset())
469 return prompt
470
471 def __call__(self, twitter, options):
472 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
473 while True:
474 options['action'] = ""
475 try:
476 args = input(prompt).split()
477 parse_args(args, options)
478 if not options['action']:
479 continue
480 elif options['action'] == 'exit':
481 raise SystemExit(0)
482 elif options['action'] == 'shell':
483 print('Sorry Xzibit does not work here!', file=sys.stderr)
484 continue
485 elif options['action'] == 'help':
486 print('''\ntwitter> `action`\n
487 The Shell Accepts all the command line actions along with:
488
489 exit Leave the twitter shell (^D may also be used)
490
491 Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
492 Action()(twitter, options)
493 options['action'] = ''
494 except NoSuchActionError as e:
495 print(e, file=sys.stderr)
496 except KeyboardInterrupt:
497 print('\n[Keyboard Interrupt]', file=sys.stderr)
498 except EOFError:
499 print(file=sys.stderr)
500 leaving = self.ask(subject='Leave')
501 if not leaving:
502 print('Excellent!', file=sys.stderr)
503 else:
504 raise SystemExit(0)
505
506 class HelpAction(Action):
507 def __call__(self, twitter, options):
508 print(__doc__)
509
510 class DoNothingAction(Action):
511 def __call__(self, twitter, options):
512 pass
513
514 actions = {
515 'authorize' : DoNothingAction,
516 'follow' : FollowAction,
517 'friends' : FriendsAction,
518 'list' : ListsAction,
519 'mylist' : MyListsAction,
520 'help' : HelpAction,
521 'leave' : LeaveAction,
522 'public' : PublicAction,
523 'replies' : RepliesAction,
524 'search' : SearchAction,
525 'set' : SetStatusAction,
526 'shell' : TwitterShell,
527 }
528
529 def loadConfig(filename):
530 options = dict(OPTIONS)
531 if os.path.exists(filename):
532 cp = SafeConfigParser()
533 cp.read([filename])
534 for option in ('format', 'prompt'):
535 if cp.has_option('twitter', option):
536 options[option] = cp.get('twitter', option)
537 return options
538
539 def main(args=sys.argv[1:]):
540 arg_options = {}
541 try:
542 parse_args(args, arg_options)
543 except GetoptError as e:
544 print("I can't do that, %s." %(e), file=sys.stderr)
545 print(file=sys.stderr)
546 raise SystemExit(1)
547
548 config_path = os.path.expanduser(
549 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
550 config_options = loadConfig(config_path)
551
552 # Apply the various options in order, the most important applied last.
553 # Defaults first, then what's read from config file, then command-line
554 # arguments.
555 options = dict(OPTIONS)
556 for d in config_options, arg_options:
557 for k,v in list(d.items()):
558 if v: options[k] = v
559
560 if options['refresh'] and options['action'] not in (
561 'friends', 'public', 'replies'):
562 print("You can only refresh the friends, public, or replies actions.", file=sys.stderr)
563 print("Use 'twitter -h' for help.", file=sys.stderr)
564 return 1
565
566 oauth_filename = os.path.expanduser(options['oauth_filename'])
567
568 if (options['action'] == 'authorize'
569 or not os.path.exists(oauth_filename)):
570 oauth_dance(
571 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
572 options['oauth_filename'])
573
574 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
575
576 twitter = Twitter(
577 auth=OAuth(
578 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
579 secure=options['secure'],
580 api_version='1',
581 domain='api.twitter.com')
582
583 try:
584 Action()(twitter, options)
585 except NoSuchActionError as e:
586 print(e, file=sys.stderr)
587 raise SystemExit(1)
588 except TwitterError as e:
589 print(str(e), file=sys.stderr)
590 print("Use 'twitter -h' for help.", file=sys.stderr)
591 raise SystemExit(1)