]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
reversed multiple twitts so they appear rightside up on the timeline
[z_archive/twitter.git] / twitter / cmdline.py
1 # encoding: utf-8
2 """
3 USAGE:
4
5 twitter [action] [options]
6
7
8 ACTIONS:
9 authorize authorize the command-line tool to interact with Twitter
10 follow follow a user
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
15 tweets from that list
16 mylist get list of your lists; give a list name to get tweets
17 from that list
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
20 object directly
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25 rate get your current rate limit status (remaining API reqs)
26
27
28 OPTIONS:
29
 -r --refresh               run this command forever, polling every once
                            in a while (default: every 10 minutes)
32 -R --refresh-rate <rate> set the refresh rate (in seconds)
33 -f --format <format> specify the output format for status updates
34 -c --config <filename> read username and password from given config
35 file (default ~/.twitter)
36 -l --length <count> specify number of status updates shown
37 (default: 20, max: 200)
38 -t --timestamp show time before status lines
39 -d --datestamp show date before status lines
40 --no-ssl use less-secure HTTP instead of HTTPS
41 --oauth <filename> filename to read/store oauth credentials to
42
43 FORMATS for the --format option
44
45 default one line per status
46 verbose multiple lines per status, more verbose status info
47 urls nothing but URLs
48 ansi ansi colour (rainbow mode)
49
50
51 CONFIG FILES
52
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
55 set, like so:
56
57 [twitter]
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
62 home directory.
63 """
64
65 from __future__ import print_function
66
# OAuth consumer credentials for this command-line application; main()
# hands them to oauth_dance() and to the OAuth authenticator.
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
69
70 import sys
71 import time
72 from getopt import gnu_getopt as getopt, GetoptError
73 from getpass import getpass
74 import re
75 import os.path
76 import locale
77 import string
78
79 try:
80 from ConfigParser import SafeConfigParser
81 except ImportError:
82 from configparser import ConfigParser as SafeConfigParser
83 import datetime
84 try:
85 from urllib.parse import quote
86 except ImportError:
87 from urllib2 import quote
88 import webbrowser
89
90 from .api import Twitter, TwitterError
91 from .oauth import OAuth, write_token_file, read_token_file
92 from .oauth_dance import oauth_dance
93 from . import ansi
94 from .util import smrt_input, printNicely
95
# Built-in defaults for every option the tool understands.  main() layers
# the config file and then the command-line arguments on top of a copy.
OPTIONS = dict(
    action='friends',
    refresh=False,
    refresh_rate=600,  # seconds between polls when --refresh is given
    format='default',
    prompt='[cyan]twitter[R]> ',
    config_filename=os.environ.get('HOME', '') + os.sep + '.twitter',
    oauth_filename=os.environ.get('HOME', '') + os.sep + '.twitter_oauth',
    length=20,
    timestamp=False,
    datestamp=False,
    extra_args=[],
    secure=True,
)
110
def parse_args(args, options):
    """Parse command-line ``args`` and store the results in ``options``.

    ``options`` is updated in place: each recognised switch sets its
    matching key, the first positional argument becomes ``'action'``
    (unless help was requested) and the remaining positional arguments
    are stored under ``'extra_args'``.

    Raises ``GetoptError`` for unknown switches or missing values, and
    ``ValueError`` when ``--refresh-rate`` / ``--length`` is not an integer.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    # Only byte strings (Python 2 argv) need decoding; Python 3 already
    # hands us text, and calling .decode() on it would raise.
    encoding = locale.getpreferredencoding()
    extra_args = [
        arg.decode(encoding) if isinstance(arg, bytes) else arg
        for arg in extra_args]

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = int(arg)
        elif opt in ('-l', '--length'):
            options["length"] = int(arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over whatever positional action was given.
    if extra_args and not ('action' in options and options['action'] == 'help'):
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
145
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix to print before a status line.

    ``status['created_at']`` is parsed with ``format`` as a UTC time and
    converted to local time.  Depending on ``options['timestamp']`` and
    ``options['datestamp']`` the result is a time, a date, both (each
    followed by one space), or the empty string when neither is set.
    """
    import calendar

    timestamp = options["timestamp"]
    datestamp = options["datestamp"]
    if not (timestamp or datestamp):
        return ""

    utc_time = time.strptime(status['created_at'], format)
    # Go through the epoch so the UTC offset (including DST) that applies
    # at the status' own date is used; time.daylight only says whether the
    # zone defines DST at all, not whether it is in effect.
    local_time = time.localtime(calendar.timegm(utc_time))

    if timestamp and datestamp:
        layout = "%Y-%m-%d %H:%M:%S "
    elif timestamp:
        layout = "%H:%M:%S "
    else:
        layout = "%Y-%m-%d "
    return time.strftime(layout, local_time)
163
class StatusFormatter(object):
    """Default status formatter: one line per status."""

    def __call__(self, status, options):
        """Return ``<time prefix><screen name> <text>`` for ``status``."""
        stamp = get_time_string(status, options)
        author = status['user']['screen_name']
        return "%s%s %s" % (stamp, author, status['text'])
169
class AnsiStatusFormatter(object):
    """One-line status formatter that colours the screen name."""

    def __init__(self):
        # One colour map per formatter instance; it picks the colour used
        # for each screen name.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        """Return the status line with the screen name wrapped in colour codes."""
        screen_name_colour = self._colourMap.colourFor(
            status['user']['screen_name'])
        stamp = get_time_string(status, options)
        colour_on = ansi.cmdColour(screen_name_colour)
        author = status['user']['screen_name']
        colour_off = ansi.cmdReset()
        return "%s%s%s%s %s" % (
            stamp, colour_on, author, colour_off, status['text'])
180
class VerboseStatusFormatter(object):
    """Multi-line status formatter: header line, then the text."""

    def __call__(self, status, options):
        """Return a header (screen name, location, raw date) and the text."""
        user = status['user']
        header = "-- %s (%s) on %s" % (
            user['screen_name'], user['location'], status['created_at'])
        return "%s\n%s\n" % (header, status['text'])
188
class URLStatusFormatter(object):
    """Status formatter that outputs nothing but the URLs in the text."""

    # Anything starting with http:// or https:// up to the next whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        """Return the URLs found in the status text, one per line."""
        # Joining an empty list already yields "", covering the no-URL case.
        return '\n'.join(self.urlmatch.findall(status['text']))
194
195
class ListsFormatter(object):
    """Default formatter for a Twitter list entry."""

    def __call__(self, list):
        """Return the padded list name, plus its description when present."""
        description = list['description']
        if not description:
            return "%-30s\n" % (list['name'])
        return "%-30s (%s)\n" % (list['name'], description)
203
class ListsVerboseFormatter(object):
    """Verbose formatter for a Twitter list entry."""

    def __call__(self, list):
        """Return name, description, member count and mode on separate lines."""
        fields = tuple(
            list[key]
            for key in ('name', 'description', 'member_count', 'mode'))
        return "%-30s\n description: %s\n members: %s\n mode:%s\n" % fields
208
class AnsiListsFormatter(object):
    """List formatter that colours the list name."""

    def __init__(self):
        # One colour map per formatter instance; it picks the colour used
        # for each list name.
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        """Return the coloured, padded list name followed by the description."""
        name_colour = self._colourMap.colourFor(list['name'])
        colour_on = ansi.cmdColour(name_colour)
        name = list['name']
        colour_off = ansi.cmdReset()
        return "%s%-15s%s %s" % (
            colour_on, name, colour_off, list['description'])
218
219
class AdminFormatter(object):
    """Default formatter for the result of a follow/leave action."""

    def __call__(self, action, user):
        """Return a sentence confirming that ``user`` was followed or left."""
        user_str = "%s (%s)" % (user['screen_name'], user['name'])
        # Any action other than "follow" is reported as leaving.
        if action != "follow":
            return "You are no longer following %s.\n" % (user_str)
        return "You are now following %s.\n" % (user_str)
227
class VerboseAdminFormatter(object):
    """Verbose formatter for the result of a follow/leave action."""

    def __call__(self, action, user):
        """Return one line with the verb, screen name, real name and URL."""
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        return "-- %s: %s (%s): %s" % (
            verb, user['screen_name'], user['name'], user['url'])
235
class SearchFormatter(object):
    """Default formatter for a search result: one line per result."""

    def __call__(self, result, options):
        """Return ``<time prefix><from_user> <text>`` for ``result``."""
        # Search results are parsed with a different date layout than
        # the default one used for statuses.
        stamp = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        sender = result['from_user']
        return "%s%s %s" % (stamp, sender, result['text'])
241
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search formatter; currently identical to the default one."""
244
class URLSearchFormatter(object):
    """Search formatter that outputs nothing but the URLs in the text."""

    # Anything starting with http:// or https:// up to the next whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        """Return the URLs found in the result text, one per line."""
        # Joining an empty list already yields "", covering the no-URL case.
        return '\n'.join(self.urlmatch.findall(result['text']))
250
class AnsiSearchFormatter(object):
    """One-line search formatter that colours the sender's name."""

    def __init__(self):
        # One colour map per formatter instance; it picks the colour used
        # for each sender.
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        """Return the result line with the sender wrapped in colour codes."""
        sender_colour = self._colourMap.colourFor(result['from_user'])
        stamp = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        colour_on = ansi.cmdColour(sender_colour)
        sender = result['from_user']
        colour_off = ansi.cmdReset()
        return "%s%s%s%s %s" % (
            stamp, colour_on, sender, colour_off, result['text'])
261
# Cached result of get_term_encoding(); None until first computed.
_term_encoding = None


def get_term_encoding():
    """Return the terminal encoding derived from the ``LANG`` variable.

    ``LANG`` is read as ``language[.codeset][@modifier]``; the codeset is
    returned without any ``@modifier`` suffix.  'UTF-8' is used when
    ``LANG`` is unset or carries no codeset.  The value is computed once
    and cached for the life of the process.
    """
    global _term_encoding
    if not _term_encoding:
        parts = os.getenv('LANG', 'unknown.UTF-8').split('.')
        # Drop a trailing "@modifier" (e.g. "@euro"): it is not part of
        # the codeset name.
        codeset = parts[1].split('@')[0] if parts[1:] else ''
        _term_encoding = codeset or 'UTF-8'
    return _term_encoding
272
# Formatter classes keyed by the --format name, one table per kind of
# output.  get_formatter() looks a class up here and instantiates it.
status_formatters = dict(
    default=StatusFormatter,
    verbose=VerboseStatusFormatter,
    urls=URLStatusFormatter,
    ansi=AnsiStatusFormatter,
)

admin_formatters = dict(
    default=AdminFormatter,
    verbose=VerboseAdminFormatter,
    urls=AdminFormatter,
    ansi=AdminFormatter,
)

search_formatters = dict(
    default=SearchFormatter,
    verbose=VerboseSearchFormatter,
    urls=URLSearchFormatter,
    ansi=AnsiSearchFormatter,
)

lists_formatters = dict(
    default=ListsFormatter,
    verbose=ListsVerboseFormatter,
    urls=None,  # there is no URL-only formatter for lists
    ansi=AnsiListsFormatter,
)

formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
    'lists': lists_formatters,
}
305
def get_formatter(action_type, options):
    """Return a new formatter instance for ``action_type``.

    ``action_type`` selects a table in ``formatters`` and
    ``options['format']`` selects the class inside that table.

    Raises ``TwitterError`` when the action type has no formatter table,
    or when the table has no usable class for the requested format.
    """
    formatters_dict = formatters.get(action_type)
    if not formatters_dict:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    formatter_class = formatters_dict.get(options['format'])
    if not formatter_class:
        # Name the real action type; the message used to say "status"
        # even for admin, search and lists lookups.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
317
class Action(object):
    """Base class of all actions, and the dispatcher that runs them."""

    def ask(self, subject='perform this action', careful=False):
        '''
        Asks the user on the console whether `subject` should be
        performed. When `careful`, the default answer is NO, otherwise YES.
        Returns the user answer in the form `True` or `False`.
        '''
        sample = '(y/N)' if careful else '(Y/n)'
        prompt = 'You really want to %s %s? ' % (subject, sample)

        # On Python 2 the builtin input() evaluates what the user types;
        # raw_input() is the plain line reader there.
        try:
            read_line = raw_input
        except NameError:
            read_line = input

        try:
            answer = read_line(prompt).lower()
        except EOFError:
            print(file=sys.stderr)  # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            return not careful

        if careful:
            return answer in ('yes', 'y')
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        """Look up ``options['action']`` in ``actions`` and run it.

        Unknown names map to ``NoSuchAction``.  When ``options['refresh']``
        is set and the action is a ``StatusAction``, it is repeated forever
        with ``options['refresh_rate']`` seconds between runs.  Ctrl-C is
        reported on stderr instead of propagating.
        """
        action = actions.get(options['action'], NoSuchAction)()
        try:
            if options['refresh'] and isinstance(action, StatusAction):
                while True:
                    action(twitter, options)
                    time.sleep(options['refresh_rate'])
            else:
                action(twitter, options)
        except KeyboardInterrupt:
            print('\n[Keyboard Interrupt]', file=sys.stderr)
361
class NoSuchActionError(Exception):
    """Raised when the requested action name is not a known action."""
364
class NoSuchAction(Action):
    """Fallback action used when the requested name is unknown."""

    def __call__(self, twitter, options):
        """Always raise ``NoSuchActionError`` naming the requested action."""
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
368
class StatusAction(Action):
    """Base for actions that fetch statuses and print them.

    Subclasses provide ``getStatuses(twitter, options)``.
    """

    def __call__(self, twitter, options):
        """Fetch the statuses and print each one that formats to non-blank."""
        statuses = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for status in statuses:
            line = render(status, options)
            # Some formatters (e.g. URL-only) return "" for a status.
            if not line.strip():
                continue
            printNicely(line)
377
class SearchAction(Action):
    """Search Twitter for the terms given as extra arguments."""

    def __call__(self, twitter, options):
        """Run the search and print every result that formats to non-blank."""
        # We need to be pointing at search.twitter.com to work, and it is
        # less tangly to do it here than in the main()
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()

        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        encoded_terms = []
        for term in options['extra_args']:
            encoded_terms.append(quote(term))
        query_string = "+".join(encoded_terms)

        results = twitter.search(q=query_string)['results']
        render = get_formatter('search', options)
        for result in results:
            line = render(result, options)
            if not line.strip():
                continue
            printNicely(line)
396
class AdminAction(Action):
    """Base for follow/leave actions.

    Subclasses provide ``getUser(twitter, user)``.
    """

    def __call__(self, twitter, options):
        """Apply the action to the user named by the first extra argument.

        Raises ``TwitterError`` when no screen name was given.  A
        ``TwitterError`` from the API call itself is reported on stdout
        rather than propagated.
        """
        extra_args = options['extra_args']
        if not extra_args or not extra_args[0]:
            raise TwitterError("You need to specify a user (screen name)")
        af = get_formatter('admin', options)
        try:
            user = self.getUser(twitter, extra_args[0])
        except TwitterError as e:
            explanation = (
                "There was a problem following or leaving the specified user.",
                "You may be trying to follow a user you are already following;",
                "Leaving a user you are not currently following;",
                "Or the user may not exist.",
                "Sorry.",
            )
            for line in explanation:
                print(line)
            print()
            print(e)
            return
        printNicely(af(options['action'], user))
414
class ListsAction(StatusAction):
    """Show a user's lists, or the tweets of one of those lists."""

    def getStatuses(self, twitter, options):
        """Return the statuses of the named list, oldest first.

        With only a screen name in ``options['extra_args']`` the user's
        lists are printed directly and an empty list is returned.  With
        a list name as well, that list's statuses are returned reversed.

        Raises ``TwitterError`` when no screen name was given.
        """
        extra_args = options['extra_args']
        if not extra_args:
            raise TwitterError("Please provide a user to query for lists")

        screen_name = extra_args[0]
        list_names = extra_args[1:]

        if list_names:
            return reversed(twitter.user.lists.list.statuses(
                user=screen_name, list=list_names[0]))

        lists = twitter.user.lists(user=screen_name)['lists']
        if not lists:
            printNicely("This user has no lists.")
        for entry in lists:
            # Looked up per entry so a missing formatter is only an
            # error when there is something to print.
            lf = get_formatter('lists', options)
            printNicely(lf(entry))
        return []
433
434
class MyListsAction(ListsAction):
    """Like ``ListsAction`` but for the authenticated user's own lists."""

    def getStatuses(self, twitter, options):
        """Prepend the own screen name to the extra args and delegate."""
        credentials = twitter.account.verify_credentials()
        # ListsAction expects the screen name as the first extra argument.
        options['extra_args'].insert(0, credentials['screen_name'])
        return ListsAction.getStatuses(self, twitter, options)
440
441
class FriendsAction(StatusAction):
    """Show the latest tweets from the friends timeline."""

    def getStatuses(self, twitter, options):
        """Return up to ``options['length']`` statuses, in reversed order."""
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
445
class PublicAction(StatusAction):
    """Show the latest tweets from the public timeline."""

    def getStatuses(self, twitter, options):
        """Return up to ``options['length']`` statuses, in reversed order."""
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
449
class RepliesAction(StatusAction):
    """Show the latest replies to the authenticated user."""

    def getStatuses(self, twitter, options):
        """Return up to ``options['length']`` replies, in reversed order."""
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
453
class FollowAction(AdminAction):
    """Start following a user."""

    def getUser(self, twitter, user):
        """Create the friendship with ``user`` and return the API result."""
        followed = twitter.friendships.create(id=user)
        return followed
457
class LeaveAction(AdminAction):
    """Stop following a user."""

    def getUser(self, twitter, user):
        """Destroy the friendship with ``user`` and return the API result."""
        left = twitter.friendships.destroy(id=user)
        return left
461
class SetStatusAction(Action):
    """Post a status update, splitting long text over several tweets.

    Leading ``@name`` mentions are repeated at the start of every chunk.
    """

    # Maximum number of characters in one status.
    MAX_LENGTH = 140
    # A single leading @mention.
    _reply_pattern = re.compile(r"@\w+")

    @staticmethod
    def _split_status(text, prefix, limit):
        """Split ``text`` so that ``prefix + chunk`` never exceeds ``limit``.

        Chunks break at the last space that fits; a word longer than the
        available room is cut hard.  ``len(prefix)`` must be below ``limit``.
        """
        room = limit - len(prefix)
        chunks = []
        remaining = text.strip()
        while remaining:
            if len(remaining) <= room:
                cut = len(remaining)
            else:
                # A space at index ``room`` still leaves a chunk that fits.
                cut = remaining.rfind(' ', 0, room + 1)
                if cut <= 0:
                    # No usable space: cut mid-word so we always advance.
                    cut = room
            chunks.append(prefix + remaining[:cut].rstrip())
            remaining = remaining[cut:].lstrip()
        return chunks

    def __call__(self, twitter, options):
        """Send the status given in the extra args, or prompt for one."""
        if options['extra_args']:
            full_text = " ".join(options['extra_args'])
        else:
            # On Python 2 the builtin input() evaluates what is typed.
            try:
                read_line = raw_input
            except NameError:
                read_line = input
            full_text = str(read_line("message: "))

        # Peel the leading @mentions off the text.
        body = full_text
        replies = []
        while body:
            match = self._reply_pattern.match(body)
            if not match:
                break
            replies.append(match.group(0))
            # Also skip the single character separating the mention
            # from what follows.
            body = body[match.end() + 1:]

        prefix = (" ".join(replies) + " ") if replies else ""
        if not body.strip() or len(prefix) >= self.MAX_LENGTH:
            # Nothing but mentions, or too many of them to repeat on
            # every chunk: just go back to posting the text as typed.
            body, prefix = full_text, ""

        chunks = self._split_status(body, prefix, self.MAX_LENGTH)
        # Post the last chunk first so the parts read top to bottom on
        # the timeline.
        chunks.reverse()
        for status in chunks:
            twitter.statuses.update(status=status)
495
class TwitterShell(Action):
    """Interactive shell that runs the command-line actions in a loop."""

    def render_prompt(self, prompt):
        '''Parses the `prompt` string and returns the rendered version'''
        prompt = prompt.strip("'").replace("\\'", "'")
        for colour in ansi.COLOURS_NAMED:
            marker = '[%s]' % (colour)
            if marker in prompt:
                prompt = prompt.replace(marker, ansi.cmdColourNamed(colour))
        prompt = prompt.replace('[R]', ansi.cmdReset())
        return prompt

    def __call__(self, twitter, options):
        """Read commands from the user until they exit.

        Each line is parsed with ``parse_args`` into ``options`` and then
        dispatched through ``Action``.  Bad options, unknown actions and
        ``TwitterError`` failures are reported on stderr and the shell
        keeps running.  ``exit`` or a confirmed end-of-file raises
        ``SystemExit(0)``.
        """
        prompt = self.render_prompt(options.get('prompt', 'twitter> '))

        # On Python 2 the builtin input() evaluates what the user types;
        # raw_input() is the plain line reader there.
        try:
            read_line = raw_input
        except NameError:
            read_line = input

        while True:
            options['action'] = ""
            try:
                args = read_line(prompt).split()
                parse_args(args, options)
                if not options['action']:
                    continue
                elif options['action'] == 'exit':
                    raise SystemExit(0)
                elif options['action'] == 'shell':
                    print('Sorry Xzibit does not work here!', file=sys.stderr)
                    continue
                elif options['action'] == 'help':
                    print(
                        '\ntwitter> `action`\n\n'
                        'The Shell Accepts all the command line actions along with:\n'
                        '\n'
                        'exit Leave the twitter shell (^D may also be used)\n'
                        '\n'
                        'Full CMD Line help is appended below for your convinience.',
                        file=sys.stderr)
                Action()(twitter, options)
                options['action'] = ''
            except GetoptError as e:
                # A mistyped option must not kill the whole shell.
                print("I can't do that, %s." % (e), file=sys.stderr)
            except NoSuchActionError as e:
                print(e, file=sys.stderr)
            except TwitterError as e:
                # A failed command should leave the shell running.
                print(str(e), file=sys.stderr)
            except KeyboardInterrupt:
                print('\n[Keyboard Interrupt]', file=sys.stderr)
            except EOFError:
                print(file=sys.stderr)
                leaving = self.ask(subject='Leave')
                if not leaving:
                    print('Excellent!', file=sys.stderr)
                else:
                    raise SystemExit(0)
542
class PythonPromptAction(Action):
    """Start a Python prompt for working with the twitter object directly."""

    def __call__(self, twitter, options):
        """Keep calling ``smrt_input`` until end-of-file is reached."""
        # No extra local names are introduced here: locals() is handed
        # to smrt_input on every call.
        while True:
            try:
                smrt_input(globals(), locals())
            except EOFError:
                break
550
class HelpAction(Action):
    """Print the command-line help text."""

    def __call__(self, twitter, options):
        """Print this module's docstring, which holds the usage text."""
        usage_text = __doc__
        print(usage_text)
554
class DoNothingAction(Action):
    """Action that deliberately does nothing when called."""

    def __call__(self, twitter, options):
        """Do nothing and return ``None``."""
        return None
558
class RateLimitStatus(Action):
    """Print the current API rate-limit status."""

    def __call__(self, twitter, options):
        """Print the remaining requests and the time of the next reset."""
        rate = twitter.account.rate_limit_status()
        quota = (rate['remaining_hits'], rate['hourly_limit'])
        print("Remaining API requests: %s / %s (hourly limit)" % quota)

        reset_at = rate['reset_time_in_seconds']
        seconds_left = int(reset_at - time.time())
        reset_label = time.asctime(time.localtime(reset_at))
        print("Next reset in %ss (%s)" % (seconds_left, reset_label))
565
# Action classes keyed by the name the user types; Action.__call__ looks
# the requested name up here and instantiates the class.
actions = dict(
    authorize=DoNothingAction,
    follow=FollowAction,
    friends=FriendsAction,
    list=ListsAction,
    mylist=MyListsAction,
    help=HelpAction,
    leave=LeaveAction,
    public=PublicAction,
    pyprompt=PythonPromptAction,
    replies=RepliesAction,
    search=SearchAction,
    set=SetStatusAction,
    shell=TwitterShell,
    rate=RateLimitStatus,
)
582
def loadConfig(filename):
    """Return a copy of ``OPTIONS`` updated from the config file.

    Only the ``format`` and ``prompt`` options of the ``[twitter]``
    section are read.  A missing file leaves the defaults untouched.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    parser = SafeConfigParser()
    parser.read([filename])
    for name in ('format', 'prompt'):
        if not parser.has_option('twitter', name):
            continue
        options[name] = parser.get('twitter', name)
    return options
592
def main(args=None):
    """Entry point of the command-line tool.

    ``args`` is the argument list without the program name and defaults
    to ``sys.argv[1:]`` at call time.  The arguments are parsed, merged
    with the config file and the built-in defaults, OAuth authorization
    is run when needed, and the requested action is executed.

    Returns 1 when ``--refresh`` is combined with an action that cannot
    be refreshed.  Raises ``SystemExit(1)`` for bad options, unknown
    actions and ``TwitterError`` failures.
    """
    if args is None:
        # Read argv at call time, not when the module is imported.
        args = sys.argv[1:]

    arg_options = {}
    try:
        parse_args(args, arg_options)
    except GetoptError as e:
        print("I can't do that, %s." % (e), file=sys.stderr)
        print(file=sys.stderr)
        raise SystemExit(1)

    config_path = os.path.expanduser(
        arg_options.get('config_filename') or OPTIONS.get('config_filename'))
    config_options = loadConfig(config_path)

    # Apply the various options in order, the most important applied last.
    # Defaults first, then what's read from config file, then command-line
    # arguments.
    options = dict(OPTIONS)
    for key, value in config_options.items():
        # Empty config values fall back to the built-in default.
        if value:
            options[key] = value
    # arg_options holds only what the command line set explicitly, so it
    # is applied as-is; filtering on truthiness would drop --no-ssl
    # (secure=False).
    options.update(arg_options)

    if options['refresh'] and options['action'] not in (
            'friends', 'public', 'replies'):
        print("You can only refresh the friends, public, or replies actions.",
              file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        return 1

    oauth_filename = os.path.expanduser(options['oauth_filename'])

    if (options['action'] == 'authorize'
            or not os.path.exists(oauth_filename)):
        # Pass the same expanded path that is checked above and read
        # back below.
        oauth_dance(
            "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
            oauth_filename)

    oauth_token, oauth_token_secret = read_token_file(oauth_filename)

    twitter = Twitter(
        auth=OAuth(
            oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
        secure=options['secure'],
        api_version='1',
        domain='api.twitter.com')

    try:
        Action()(twitter, options)
    except NoSuchActionError as e:
        print(e, file=sys.stderr)
        raise SystemExit(1)
    except TwitterError as e:
        print(str(e), file=sys.stderr)
        print("Use 'twitter -h' for help.", file=sys.stderr)
        raise SystemExit(1)
643 print(str(e), file=sys.stderr)
644 print("Use 'twitter -h' for help.", file=sys.stderr)
645 raise SystemExit(1)