]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Make twitter-log work with py 3.2
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
20 object directly
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25
26
27 OPTIONS:
28
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 5 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
41
42 FORMATS for the --format option
43
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 urls nothing but URLs
47 ansi ansi colour (rainbow mode)
48
49
50 CONFIG FILES
51
52 The config file should be placed in your home directory and be named .twitter.
53 It must contain a [twitter] header, and all the desired options you wish to
54 set, like so:
55
56 [twitter]
57 format: <desired_default_format_for_output>
58 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
59
60 OAuth authentication tokens are stored in the file .twitter_oauth in your
61 home directory.
62 """
63
64 from __future__ import print_function
65
66 CONSUMER_KEY='uS6hO2sV6tDKIOeVjhnFnQ'
67 CONSUMER_SECRET='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
68
69 import sys
70 import time
71 from getopt import gnu_getopt as getopt, GetoptError
72 from getpass import getpass
73 import re
74 import os.path
75 try:
76 from ConfigParser import SafeConfigParser
77 except ImportError:
78 from configparser import ConfigParser as SafeConfigParser
79 import datetime
80 try:
81 from urllib.parse import quote
82 except ImportError:
83 from urllib2 import quote
84 import webbrowser
85
86 from .api import Twitter, TwitterError
87 from .oauth import OAuth, write_token_file, read_token_file
88 from .oauth_dance import oauth_dance
89 from . import ansi
90 from .util import smrt_input, printNicely
91
92 OPTIONS = {
93 'action': 'friends',
94 'refresh': False,
95 'refresh_rate': 600,
96 'format': 'default',
97 'prompt': '[cyan]twitter[R]> ',
98 'config_filename': os.environ.get('HOME', '') + os.sep + '.twitter',
99 'oauth_filename': os.environ.get('HOME', '') + os.sep + '.twitter_oauth',
100 'length': 20,
101 'timestamp': False,
102 'datestamp': False,
103 'extra_args': [],
104 'secure': True,
105 }
106
107 def parse_args(args, options):
108 long_opts = ['help', 'format=', 'refresh', 'oauth=',
109 'refresh-rate=', 'config=', 'length=', 'timestamp',
110 'datestamp', 'no-ssl']
111 short_opts = "e:p:f:h?rR:c:l:td"
112 opts, extra_args = getopt(args, short_opts, long_opts)
113
114 for opt, arg in opts:
115 if opt in ('-f', '--format'):
116 options['format'] = arg
117 elif opt in ('-r', '--refresh'):
118 options['refresh'] = True
119 elif opt in ('-R', '--refresh-rate'):
120 options['refresh_rate'] = int(arg)
121 elif opt in ('-l', '--length'):
122 options["length"] = int(arg)
123 elif opt in ('-t', '--timestamp'):
124 options["timestamp"] = True
125 elif opt in ('-d', '--datestamp'):
126 options["datestamp"] = True
127 elif opt in ('-?', '-h', '--help'):
128 options['action'] = 'help'
129 elif opt in ('-c', '--config'):
130 options['config_filename'] = arg
131 elif opt == '--no-ssl':
132 options['secure'] = False
133 elif opt == '--oauth':
134 options['oauth_filename'] = arg
135
136 if extra_args and not ('action' in options and options['action'] == 'help'):
137 options['action'] = extra_args[0]
138 options['extra_args'] = extra_args[1:]
139
140 def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
141 timestamp = options["timestamp"]
142 datestamp = options["datestamp"]
143 t = time.strptime(status['created_at'], format)
144 i_hate_timezones = time.timezone
145 if (time.daylight):
146 i_hate_timezones = time.altzone
147 dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
148 seconds=i_hate_timezones)
149 t = dt.timetuple()
150 if timestamp and datestamp:
151 return time.strftime("%Y-%m-%d %H:%M:%S ", t)
152 elif timestamp:
153 return time.strftime("%H:%M:%S ", t)
154 elif datestamp:
155 return time.strftime("%Y-%m-%d ", t)
156 return ""
157
158 class StatusFormatter(object):
159 def __call__(self, status, options):
160 return ("%s%s %s" %(
161 get_time_string(status, options),
162 status['user']['screen_name'], status['text']))
163
164 class AnsiStatusFormatter(object):
165 def __init__(self):
166 self._colourMap = ansi.ColourMap()
167
168 def __call__(self, status, options):
169 colour = self._colourMap.colourFor(status['user']['screen_name'])
170 return ("%s%s%s%s %s" %(
171 get_time_string(status, options),
172 ansi.cmdColour(colour), status['user']['screen_name'],
173 ansi.cmdReset(), status['text']))
174
175 class VerboseStatusFormatter(object):
176 def __call__(self, status, options):
177 return ("-- %s (%s) on %s\n%s\n" %(
178 status['user']['screen_name'],
179 status['user']['location'],
180 status['created_at'],
181 status['text']))
182
183 class URLStatusFormatter(object):
184 urlmatch = re.compile(r'https?://\S+')
185 def __call__(self, status, options):
186 urls = self.urlmatch.findall(status['text'])
187 return '\n'.join(urls) if urls else ""
188
189
190 class ListsFormatter(object):
191 def __call__(self, list):
192 if list['description']:
193 list_str = "%-30s (%s)" % (list['name'], list['description'])
194 else:
195 list_str = "%-30s" % (list['name'])
196 return "%s\n" % list_str
197
198 class ListsVerboseFormatter(object):
199 def __call__(self, list):
200 list_str = "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
201 return list_str
202
203 class AnsiListsFormatter(object):
204 def __init__(self):
205 self._colourMap = ansi.ColourMap()
206
207 def __call__(self, list):
208 colour = self._colourMap.colourFor(list['name'])
209 return ("%s%-15s%s %s" %(
210 ansi.cmdColour(colour), list['name'],
211 ansi.cmdReset(), list['description']))
212
213
214 class AdminFormatter(object):
215 def __call__(self, action, user):
216 user_str = "%s (%s)" %(user['screen_name'], user['name'])
217 if action == "follow":
218 return "You are now following %s.\n" %(user_str)
219 else:
220 return "You are no longer following %s.\n" %(user_str)
221
222 class VerboseAdminFormatter(object):
223 def __call__(self, action, user):
224 return("-- %s: %s (%s): %s" % (
225 "Following" if action == "follow" else "Leaving",
226 user['screen_name'],
227 user['name'],
228 user['url']))
229
230 class SearchFormatter(object):
231 def __call__(self, result, options):
232 return("%s%s %s" %(
233 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
234 result['from_user'], result['text']))
235
236 class VerboseSearchFormatter(SearchFormatter):
237 pass #Default to the regular one
238
239 class URLSearchFormatter(object):
240 urlmatch = re.compile(r'https?://\S+')
241 def __call__(self, result, options):
242 urls = self.urlmatch.findall(result['text'])
243 return '\n'.join(urls) if urls else ""
244
245 class AnsiSearchFormatter(object):
246 def __init__(self):
247 self._colourMap = ansi.ColourMap()
248
249 def __call__(self, result, options):
250 colour = self._colourMap.colourFor(result['from_user'])
251 return ("%s%s%s%s %s" %(
252 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
253 ansi.cmdColour(colour), result['from_user'],
254 ansi.cmdReset(), result['text']))
255
256 _term_encoding = None
257 def get_term_encoding():
258 global _term_encoding
259 if not _term_encoding:
260 lang = os.getenv('LANG', 'unknown.UTF-8').split('.')
261 if lang[1:]:
262 _term_encoding = lang[1]
263 else:
264 _term_encoding = 'UTF-8'
265 return _term_encoding
266
267 formatters = {}
268 status_formatters = {
269 'default': StatusFormatter,
270 'verbose': VerboseStatusFormatter,
271 'urls': URLStatusFormatter,
272 'ansi': AnsiStatusFormatter
273 }
274 formatters['status'] = status_formatters
275
276 admin_formatters = {
277 'default': AdminFormatter,
278 'verbose': VerboseAdminFormatter,
279 'urls': AdminFormatter,
280 'ansi': AdminFormatter
281 }
282 formatters['admin'] = admin_formatters
283
284 search_formatters = {
285 'default': SearchFormatter,
286 'verbose': VerboseSearchFormatter,
287 'urls': URLSearchFormatter,
288 'ansi': AnsiSearchFormatter
289 }
290 formatters['search'] = search_formatters
291
292 lists_formatters = {
293 'default': ListsFormatter,
294 'verbose': ListsVerboseFormatter,
295 'urls': None,
296 'ansi': AnsiListsFormatter
297 }
298 formatters['lists'] = lists_formatters
299
300 def get_formatter(action_type, options):
301 formatters_dict = formatters.get(action_type)
302 if (not formatters_dict):
303 raise TwitterError(
304 "There was an error finding a class of formatters for your type (%s)"
305 %(action_type))
306 f = formatters_dict.get(options['format'])
307 if (not f):
308 raise TwitterError(
309 "Unknown formatter '%s' for status actions" %(options['format']))
310 return f()
311
312 class Action(object):
313
314 def ask(self, subject='perform this action', careful=False):
315 '''
316 Requests fromt he user using `raw_input` if `subject` should be
317 performed. When `careful`, the default answer is NO, otherwise YES.
318 Returns the user answer in the form `True` or `False`.
319 '''
320 sample = '(y/N)'
321 if not careful:
322 sample = '(Y/n)'
323
324 prompt = 'You really want to %s %s? ' %(subject, sample)
325 try:
326 answer = input(prompt).lower()
327 if careful:
328 return answer in ('yes', 'y')
329 else:
330 return answer not in ('no', 'n')
331 except EOFError:
332 print(file=sys.stderr) # Put Newline since Enter was never pressed
333 # TODO:
334 # Figure out why on OS X the raw_input keeps raising
335 # EOFError and is never able to reset and get more input
336 # Hint: Look at how IPython implements their console
337 default = True
338 if careful:
339 default = False
340 return default
341
342 def __call__(self, twitter, options):
343 action = actions.get(options['action'], NoSuchAction)()
344 try:
345 doAction = lambda : action(twitter, options)
346 if (options['refresh'] and isinstance(action, StatusAction)):
347 while True:
348 doAction()
349 time.sleep(options['refresh_rate'])
350 else:
351 doAction()
352 except KeyboardInterrupt:
353 print('\n[Keyboard Interrupt]', file=sys.stderr)
354 pass
355
356 class NoSuchActionError(Exception):
357 pass
358
359 class NoSuchAction(Action):
360 def __call__(self, twitter, options):
361 raise NoSuchActionError("No such action: %s" %(options['action']))
362
363 class StatusAction(Action):
364 def __call__(self, twitter, options):
365 statuses = self.getStatuses(twitter, options)
366 sf = get_formatter('status', options)
367 for status in statuses:
368 statusStr = sf(status, options)
369 if statusStr.strip():
370 printNicely(statusStr)
371
372 class SearchAction(Action):
373 def __call__(self, twitter, options):
374 # We need to be pointing at search.twitter.com to work, and it is less
375 # tangly to do it here than in the main()
376 twitter.domain="search.twitter.com"
377 twitter.uriparts=()
378 # We need to bypass the TwitterCall parameter encoding, so we
379 # don't encode the plus sign, so we have to encode it ourselves
380 query_string = "+".join(
381 [quote(term)
382 for term in options['extra_args']])
383
384 results = twitter.search(q=query_string)['results']
385 f = get_formatter('search', options)
386 for result in results:
387 resultStr = f(result, options)
388 if resultStr.strip():
389 printNicely(resultStr)
390
391 class AdminAction(Action):
392 def __call__(self, twitter, options):
393 if not (options['extra_args'] and options['extra_args'][0]):
394 raise TwitterError("You need to specify a user (screen name)")
395 af = get_formatter('admin', options)
396 try:
397 user = self.getUser(twitter, options['extra_args'][0])
398 except TwitterError as e:
399 print("There was a problem following or leaving the specified user.")
400 print("You may be trying to follow a user you are already following;")
401 print("Leaving a user you are not currently following;")
402 print("Or the user may not exist.")
403 print("Sorry.")
404 print()
405 print(e)
406 else:
407 printNicely(af(options['action'], user))
408
409 class ListsAction(StatusAction):
410 def getStatuses(self, twitter, options):
411 if not options['extra_args']:
412 raise TwitterError("Please provide a user to query for lists")
413
414 screen_name = options['extra_args'][0]
415
416 if not options['extra_args'][1:]:
417 lists = twitter.user.lists(user=screen_name)['lists']
418 if not lists:
419 printNicely("This user has no lists.")
420 for list in lists:
421 lf = get_formatter('lists', options)
422 printNicely(lf(list))
423 return []
424 else:
425 return reversed(twitter.user.lists.list.statuses(
426 user=screen_name, list=options['extra_args'][1]))
427
428
429 class MyListsAction(ListsAction):
430 def getStatuses(self, twitter, options):
431 screen_name = twitter.account.verify_credentials()['screen_name']
432 options['extra_args'].insert(0, screen_name)
433 return ListsAction.getStatuses(self, twitter, options)
434
435
436 class FriendsAction(StatusAction):
437 def getStatuses(self, twitter, options):
438 return reversed(twitter.statuses.friends_timeline(count=options["length"]))
439
440 class PublicAction(StatusAction):
441 def getStatuses(self, twitter, options):
442 return reversed(twitter.statuses.public_timeline(count=options["length"]))
443
444 class RepliesAction(StatusAction):
445 def getStatuses(self, twitter, options):
446 return reversed(twitter.statuses.replies(count=options["length"]))
447
448 class FollowAction(AdminAction):
449 def getUser(self, twitter, user):
450 return twitter.friendships.create(id=user)
451
452 class LeaveAction(AdminAction):
453 def getUser(self, twitter, user):
454 return twitter.friendships.destroy(id=user)
455
456 class SetStatusAction(Action):
457 def __call__(self, twitter, options):
458 statusTxt = (" ".join(options['extra_args'])
459 if options['extra_args']
460 else str(input("message: ")))
461 twitter.statuses.update(status=statusTxt)
462
463 class TwitterShell(Action):
464
465 def render_prompt(self, prompt):
466 '''Parses the `prompt` string and returns the rendered version'''
467 prompt = prompt.strip("'").replace("\\'","'")
468 for colour in ansi.COLOURS_NAMED:
469 if '[%s]' %(colour) in prompt:
470 prompt = prompt.replace(
471 '[%s]' %(colour), ansi.cmdColourNamed(colour))
472 prompt = prompt.replace('[R]', ansi.cmdReset())
473 return prompt
474
475 def __call__(self, twitter, options):
476 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
477 while True:
478 options['action'] = ""
479 try:
480 args = input(prompt).split()
481 parse_args(args, options)
482 if not options['action']:
483 continue
484 elif options['action'] == 'exit':
485 raise SystemExit(0)
486 elif options['action'] == 'shell':
487 print('Sorry Xzibit does not work here!', file=sys.stderr)
488 continue
489 elif options['action'] == 'help':
490 print('''\ntwitter> `action`\n
491 The Shell Accepts all the command line actions along with:
492
493 exit Leave the twitter shell (^D may also be used)
494
495 Full CMD Line help is appended below for your convinience.''', file=sys.stderr)
496 Action()(twitter, options)
497 options['action'] = ''
498 except NoSuchActionError as e:
499 print(e, file=sys.stderr)
500 except KeyboardInterrupt:
501 print('\n[Keyboard Interrupt]', file=sys.stderr)
502 except EOFError:
503 print(file=sys.stderr)
504 leaving = self.ask(subject='Leave')
505 if not leaving:
506 print('Excellent!', file=sys.stderr)
507 else:
508 raise SystemExit(0)
509
510 class PythonPromptAction(Action):
511 def __call__(self, twitter, options):
512 try:
513 while True:
514 smrt_input(globals(), locals())
515 except EOFError:
516 pass
517
518 class HelpAction(Action):
519 def __call__(self, twitter, options):
520 print(__doc__)
521
522 class DoNothingAction(Action):
523 def __call__(self, twitter, options):
524 pass
525
526 actions = {
527 'authorize' : DoNothingAction,
528 'follow' : FollowAction,
529 'friends' : FriendsAction,
530 'list' : ListsAction,
531 'mylist' : MyListsAction,
532 'help' : HelpAction,
533 'leave' : LeaveAction,
534 'public' : PublicAction,
535 'pyprompt' : PythonPromptAction,
536 'replies' : RepliesAction,
537 'search' : SearchAction,
538 'set' : SetStatusAction,
539 'shell' : TwitterShell,
540 }
541
542 def loadConfig(filename):
543 options = dict(OPTIONS)
544 if os.path.exists(filename):
545 cp = SafeConfigParser()
546 cp.read([filename])
547 for option in ('format', 'prompt'):
548 if cp.has_option('twitter', option):
549 options[option] = cp.get('twitter', option)
550 return options
551
552 def main(args=sys.argv[1:]):
553 arg_options = {}
554 try:
555 parse_args(args, arg_options)
556 except GetoptError as e:
557 print("I can't do that, %s." %(e), file=sys.stderr)
558 print(file=sys.stderr)
559 raise SystemExit(1)
560
561 config_path = os.path.expanduser(
562 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
563 config_options = loadConfig(config_path)
564
565 # Apply the various options in order, the most important applied last.
566 # Defaults first, then what's read from config file, then command-line
567 # arguments.
568 options = dict(OPTIONS)
569 for d in config_options, arg_options:
570 for k,v in list(d.items()):
571 if v: options[k] = v
572
573 if options['refresh'] and options['action'] not in (
574 'friends', 'public', 'replies'):
575 print("You can only refresh the friends, public, or replies actions.", file=sys.stderr)
576 print("Use 'twitter -h' for help.", file=sys.stderr)
577 return 1
578
579 oauth_filename = os.path.expanduser(options['oauth_filename'])
580
581 if (options['action'] == 'authorize'
582 or not os.path.exists(oauth_filename)):
583 oauth_dance(
584 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
585 options['oauth_filename'])
586
587 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
588
589 twitter = Twitter(
590 auth=OAuth(
591 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
592 secure=options['secure'],
593 api_version='1',
594 domain='api.twitter.com')
595
596 try:
597 Action()(twitter, options)
598 except NoSuchActionError as e:
599 print(e, file=sys.stderr)
600 raise SystemExit(1)
601 except TwitterError as e:
602 print(str(e), file=sys.stderr)
603 print("Use 'twitter -h' for help.", file=sys.stderr)
604 raise SystemExit(1)