]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Remove references to email and password.
[z_archive/twitter.git] / twitter / cmdline.py
1 """
2 USAGE:
3
4 twitter [action] [options]
5
6
7 ACTIONS:
8 authorize authorize the command-line tool to interact with Twitter
9 follow add the specified user to your follow list
10 friends get latest tweets from your friends (default action)
11 help print this help text that you are currently reading
12 leave remove the specified user from your following list
13 public get latest public tweets
14 replies get latest replies
15 search search twitter (Beware: octothorpe, escape it)
16 set set your twitter status
17 shell login to the twitter shell
18
19
20 OPTIONS:
21
22 -r --refresh run this command forever, polling every once
23 in a while (default: every 10 minutes)
24 -R --refresh-rate <rate> set the refresh rate (in seconds)
25 -f --format <format> specify the output format for status updates
26 -c --config <filename> read default options from given config
27 file (default ~/.twitter)
28 -l --length <count> specify number of status updates shown
29 (default: 20, max: 200)
30 -t --timestamp show time before status lines
31 -d --datestamp show date before status lines
32 --no-ssl use HTTP instead of more secure HTTPS
33
34
35 FORMATS for the --format option
36
37 default one line per status
38 verbose multiple lines per status, more verbose status info
39 urls nothing but URLs
40 ansi ansi colour (rainbow mode)
41
42
43 CONFIG FILES
44
45 The config file should be placed in your home directory and be named .twitter.
46 It must contain a [twitter] header, and all the desired options you wish to
47 set, like so:
48
49 [twitter]
50 format: <desired_default_format_for_output>
51 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
52
53 OAuth authentication tokens are stored in the file .twitter_oauth in your
54 home directory.
55
56 """
57
# OAuth consumer credentials of this command-line application.  They are
# passed to OAuth() together with the per-user token and token secret.
CONSUMER_KEY = 'uS6hO2sV6tDKIOeVjhnFnQ'
CONSUMER_SECRET = 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
60
61 import sys
62 import time
63 from getopt import gnu_getopt as getopt, GetoptError
64 from getpass import getpass
65 import re
66 import os.path
67 from ConfigParser import SafeConfigParser
68 import datetime
69 from urllib import quote
70 import webbrowser
71
72 from api import Twitter, TwitterError
73 from oauth import OAuth
74 import ansi
75
# Please don't change this, it was provided by the fine folks at Twitter.
# If you change it, it will not work.
AGENT_STR = "twittercommandlinetoolpy"

# Built-in defaults for every option.  main() layers the config file values
# and then the command-line arguments on top of a copy of this dict.
#
# The two file locations are resolved with os.path.expanduser('~') rather
# than reading $HOME directly: when $HOME is set the result is the same
# directory, and when it is not set the platform's home-directory lookup is
# used instead of silently producing a path at the filesystem root.
OPTIONS = {
    'action': 'friends',
    'refresh': False,
    'refresh_rate': 600,  # seconds between polls when --refresh is given
    'format': 'default',
    'prompt': '[cyan]twitter[R]> ',
    'config_filename': os.path.join(os.path.expanduser('~'), '.twitter'),
    'oauth_filename': os.path.join(
        os.path.expanduser('~'), '.twitter_oauth'),
    'length': 20,
    'timestamp': False,
    'datestamp': False,
    'extra_args': [],
    'secure': True,
}
94
def _int_option(opt, arg):
    """Convert the value of option `opt` to int, or raise GetoptError."""
    try:
        return int(arg)
    except ValueError:
        raise GetoptError(
            "option %s requires an integer argument, not %r" % (opt, arg),
            opt)


def parse_args(args, options):
    """
    Parse the command-line words `args` and record the results in `options`.

    `options` is updated in place; only the keys for options actually present
    in `args` are written.  The first non-option word becomes
    options['action'] (unless help was requested) and the remaining words
    become options['extra_args'].

    Raises GetoptError for unknown options, missing option values, and
    non-numeric values given to --length / --refresh-rate.
    """
    # Long options that take a value must carry a trailing '=' for getopt;
    # without it "--format verbose" is parsed as a bare flag and "verbose"
    # is mistaken for the action.
    long_opts = ['help', 'format=', 'refresh',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    # NOTE(review): -e and -p are accepted but have no handler below;
    # presumably left over from removed options.  Kept so that existing
    # command lines that pass them are not rejected.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _int_option(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _int_option(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False

    # A help request wins over any action word given on the command line.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
125
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """
    Return the local-time prefix to print in front of a status line.

    `status['created_at']` is parsed with `format`; the default format and
    the one used by the search formatters both hard-code a +0000 offset, so
    the parsed value is treated as UTC and converted to local time.

    Depending on options['timestamp'] and options['datestamp'] the result is
    "YYYY-MM-DD HH:MM:SS ", "HH:MM:SS ", "YYYY-MM-DD " or "" (note the
    trailing space on the non-empty forms).
    """
    # Function-scope import: calendar is not among the module's imports.
    import calendar

    show_time = options["timestamp"]
    show_date = options["datestamp"]
    if not (show_time or show_date):
        return ""

    utc_fields = time.strptime(status['created_at'], format)
    # timegm() interprets the fields as UTC; localtime() then applies the
    # UTC offset that is in effect on that particular date.  Checking
    # time.daylight instead is wrong: it only says that the local zone has
    # DST rules at all, not that DST is active for the given instant.
    local_fields = time.localtime(calendar.timegm(utc_fields))

    if show_time and show_date:
        pattern = "%Y-%m-%d %H:%M:%S "
    elif show_time:
        pattern = "%H:%M:%S "
    else:
        pattern = "%Y-%m-%d "
    return time.strftime(pattern, local_fields)
143
class StatusFormatter(object):
    """Default status format: one line per status."""

    def __call__(self, status, options):
        """Return "<time prefix><screen name> <text>" for `status`."""
        stamp = get_time_string(status, options)
        author = status['user']['screen_name']
        return u"%s%s %s" % (stamp, author, status['text'])
149
class AnsiStatusFormatter(object):
    """Status format that wraps the screen name in ANSI colour codes."""

    def __init__(self):
        # One colour map per formatter instance; it picks the colour used
        # for each screen name.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        """Return the status as one line with a coloured screen name."""
        screen_name = status['user']['screen_name']
        colour = self._colourMap.colourFor(screen_name)
        stamp = get_time_string(status, options)
        colour_on = ansi.cmdColour(colour)
        colour_off = ansi.cmdReset()
        return u"%s%s%s%s %s" % (
            stamp, colour_on, screen_name, colour_off, status['text'])
160
class VerboseStatusFormatter(object):
    """Multi-line status format including location and raw creation date."""

    def __call__(self, status, options):
        """Return a header line, the status text, and a trailing newline."""
        author = status['user']
        header = u"-- %s (%s) on %s" % (
            author['screen_name'], author['location'], status['created_at'])
        return u"%s\n%s\n" % (header, status['text'])
168
class URLStatusFormatter(object):
    """Status format that prints nothing but the URLs found in the text."""

    # Matches http:// and https:// links up to the next whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        """Return the URLs in the status text, one per line, or ""."""
        found = self.urlmatch.findall(status['text'])
        if not found:
            return ""
        return u'\n'.join(found)
174
class AdminFormatter(object):
    """Default confirmation message for follow / leave actions."""

    def __call__(self, action, user):
        """Return the sentence confirming `action` on `user`."""
        user_str = u"%s (%s)" % (user['screen_name'], user['name'])
        if action != "follow":
            return u"You are no longer following %s.\n" % (user_str)
        return u"You are now following %s.\n" % (user_str)
182
class VerboseAdminFormatter(object):
    """Verbose confirmation for follow / leave actions, including the URL."""

    def __call__(self, action, user):
        """Return "-- <Following|Leaving>: <screen name> (<name>): <url>"."""
        if action == "follow":
            verb = "Following"
        else:
            verb = "Leaving"
        return u"-- %s: %s (%s): %s" % (
            verb, user['screen_name'], user['name'], user['url'])
190
class SearchFormatter(object):
    """Default search-result format: one line per result."""

    def __call__(self, result, options):
        """Return "<time prefix><from_user> <text>" for `result`."""
        # Search results are parsed with a different created_at layout than
        # the default one used for statuses.
        stamp = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        return u"%s%s %s" % (stamp, result['from_user'], result['text'])
196
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search format; adds nothing and defaults to the regular one."""
199
class URLSearchFormatter(object):
    """Search format that prints nothing but the URLs found in the text."""

    # Matches http:// and https:// links up to the next whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        """Return the URLs in the result text, one per line, or ""."""
        found = self.urlmatch.findall(result['text'])
        if not found:
            return ""
        return u'\n'.join(found)
205
class AnsiSearchFormatter(object):
    """Search format that wraps the user name in ANSI colour codes."""

    def __init__(self):
        # One colour map per formatter instance; it picks the colour used
        # for each user name.
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        """Return the search result as one line with a coloured user name."""
        sender = result['from_user']
        colour = self._colourMap.colourFor(sender)
        stamp = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        colour_on = ansi.cmdColour(colour)
        colour_off = ansi.cmdReset()
        return u"%s%s%s%s %s" % (
            stamp, colour_on, sender, colour_off, result['text'])
216
# Formatter classes for status timelines, keyed by --format name.
status_formatters = {
    'default': StatusFormatter,
    'verbose': VerboseStatusFormatter,
    'urls': URLStatusFormatter,
    'ansi': AnsiStatusFormatter
}

# Formatter classes for follow / leave confirmations, keyed by --format name.
admin_formatters = {
    'default': AdminFormatter,
    'verbose': VerboseAdminFormatter,
    'urls': AdminFormatter,
    'ansi': AdminFormatter
}

# Formatter classes for search results, keyed by --format name.
search_formatters = {
    'default': SearchFormatter,
    'verbose': VerboseSearchFormatter,
    'urls': URLSearchFormatter,
    'ansi': AnsiSearchFormatter
}

# Action type -> {format name -> formatter class}; read by get_formatter().
formatters = {
    'status': status_formatters,
    'admin': admin_formatters,
    'search': search_formatters,
}
241
def get_formatter(action_type, options):
    """
    Return a new formatter instance for `action_type` and options['format'].

    `action_type` selects one of the registries in `formatters` ('status',
    'admin' or 'search'); options['format'] selects the class within it.

    Raises TwitterError when the action type has no registry or when the
    requested format is not registered for that action type.
    """
    by_format = formatters.get(action_type)
    if not by_format:
        raise TwitterError(
            "There was an error finding a class of formatters for your type (%s)"
            % (action_type))
    formatter_class = by_format.get(options['format'])
    if not formatter_class:
        # Name the action type that was actually looked up; the message used
        # to say "status actions" for admin and search lookups as well.
        raise TwitterError(
            "Unknown formatter '%s' for %s actions"
            % (options['format'], action_type))
    return formatter_class()
253
class Action(object):
    """Base class of all actions; calling an instance dispatches the
    action named by options['action']."""

    def ask(self, subject='perform this action', careful=False):
        '''
        Asks the user, using `raw_input`, whether `subject` should be
        performed. When `careful`, the default answer is NO, otherwise YES.
        Returns the user's answer as `True` or `False`.
        '''
        if careful:
            sample = '(y/N)'
        else:
            sample = '(Y/n)'

        prompt = 'You really want to %s %s? ' % (subject, sample)
        try:
            answer = raw_input(prompt).lower()
        except EOFError:
            print >>sys.stderr  # Put Newline since Enter was never pressed
            # TODO:
            #   Figure out why on OS X the raw_input keeps raising
            #   EOFError and is never able to reset and get more input
            #   Hint: Look at how IPython implements their console
            # No answer was given, so fall back to the default.
            return not careful

        if careful:
            # Only an explicit yes counts.
            return answer in ('yes', 'y')
        # Anything except an explicit no counts as yes.
        return answer not in ('no', 'n')

    def __call__(self, twitter, options):
        '''
        Instantiates the action class registered for options['action']
        (`NoSuchAction` when there is none) and runs it. With
        options['refresh'] set, a `StatusAction` is re-run forever, sleeping
        options['refresh_rate'] seconds between runs. A keyboard interrupt
        ends the run with a message on stderr.
        '''
        action = actions.get(options['action'], NoSuchAction)()
        try:
            repeating = (
                options['refresh'] and isinstance(action, StatusAction))
            action(twitter, options)
            while repeating:
                time.sleep(options['refresh_rate'])
                action(twitter, options)
        except KeyboardInterrupt:
            print >>sys.stderr, '\n[Keyboard Interrupt]'
297
class NoSuchActionError(Exception):
    """Raised by NoSuchAction when the requested action is not registered."""
300
class NoSuchAction(Action):
    """Fallback action used when options['action'] is not registered."""

    def __call__(self, twitter, options):
        """Always raise NoSuchActionError naming the requested action."""
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
304
305 def printNicely(string):
306 if sys.stdout.encoding:
307 print string.encode(sys.stdout.encoding, 'replace')
308 else:
309 print string.encode('utf-8')
310
class StatusAction(Action):
    """Base class for actions that print a list of statuses.

    Subclasses supply getStatuses(twitter, options)."""

    def __call__(self, twitter, options):
        """Fetch the statuses and print each one that formats to
        something other than whitespace."""
        statuses = self.getStatuses(twitter, options)
        formatter = get_formatter('status', options)
        for status in statuses:
            line = formatter(status, options)
            if not line.strip():
                # e.g. the 'urls' format yields "" for statuses without URLs
                continue
            printNicely(line)
319
class SearchAction(Action):
    """Run a search for the words in options['extra_args'] and print the
    results."""

    def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        twitter.domain = "search.twitter.com"
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        encoded_terms = [quote(term) for term in options['extra_args']]
        twitter.encoded_args = "q=%s" % ("+".join(encoded_terms))

        results = twitter.search()['results']
        formatter = get_formatter('search', options)
        for result in results:
            line = formatter(result, options)
            if not line.strip():
                continue
            printNicely(line)
336
337 class AdminAction(Action):
338 def __call__(self, twitter, options):
339 if not (options['extra_args'] and options['extra_args'][0]):
340 raise TwitterError("You need to specify a user (screen name)")
341 af = get_formatter('admin', options)
342 try:
343 user = self.getUser(twitter, options['extra_args'][0])
344 except TwitterError, e:
345 print "There was a problem following or leaving the specified user."
346 print "You may be trying to follow a user you are already following;"
347 print "Leaving a user you are not currently following;"
348 print "Or the user may not exist."
349 print "Sorry."
350 print
351 print e
352 else:
353 printNicely(af(options['action'], user))
354
class FriendsAction(StatusAction):
    """Show the friends timeline."""

    def getStatuses(self, twitter, options):
        """Return options['length'] friends-timeline statuses, in reverse
        of the order the API returned them."""
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
358
class PublicAction(StatusAction):
    """Show the public timeline."""

    def getStatuses(self, twitter, options):
        """Return options['length'] public-timeline statuses, in reverse
        of the order the API returned them."""
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
362
class RepliesAction(StatusAction):
    """Show the latest replies."""

    def getStatuses(self, twitter, options):
        """Return options['length'] replies, in reverse of the order the
        API returned them."""
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
366
class FollowAction(AdminAction):
    """Add a user to the follow list."""

    def getUser(self, twitter, user):
        """Create the friendship with `user` and return the API result."""
        created = twitter.friendships.create(id=user)
        return created
370
class LeaveAction(AdminAction):
    """Remove a user from the following list."""

    def getUser(self, twitter, user):
        """Destroy the friendship with `user` and return the API result."""
        destroyed = twitter.friendships.destroy(id=user)
        return destroyed
374
class SetStatusAction(Action):
    """Set the user's status from the extra arguments, or prompt for it."""

    def _to_unicode(self, text):
        """Return `text` as unicode, decoding byte strings explicitly."""
        if isinstance(text, unicode):
            return text
        # Command-line words and raw_input() results are byte strings in
        # the terminal's encoding.  Decoding them explicitly avoids the
        # implicit ASCII decode done by u" ".join() / unicode(), which
        # raises UnicodeDecodeError on any non-ASCII character.
        encoding = sys.stdin.encoding or 'utf-8'
        return text.decode(encoding, 'replace')

    def __call__(self, twitter, options):
        """Post the status text, UTF-8 encoded, via statuses.update."""
        words = options['extra_args']
        if words:
            statusTxt = u" ".join([self._to_unicode(word) for word in words])
        else:
            statusTxt = self._to_unicode(raw_input("message: "))
        status = statusTxt.encode('utf8', 'replace')
        twitter.statuses.update(status=status)
382
383 class TwitterShell(Action):
384
385 def render_prompt(self, prompt):
386 '''Parses the `prompt` string and returns the rendered version'''
387 prompt = prompt.strip("'").replace("\\'","'")
388 for colour in ansi.COLOURS_NAMED:
389 if '[%s]' %(colour) in prompt:
390 prompt = prompt.replace(
391 '[%s]' %(colour), ansi.cmdColourNamed(colour))
392 prompt = prompt.replace('[R]', ansi.cmdReset())
393 return prompt
394
395 def __call__(self, twitter, options):
396 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
397 while True:
398 options['action'] = ""
399 try:
400 args = raw_input(prompt).split()
401 parse_args(args, options)
402 if not options['action']:
403 continue
404 elif options['action'] == 'exit':
405 raise SystemExit(0)
406 elif options['action'] == 'shell':
407 print >>sys.stderr, 'Sorry Xzibit does not work here!'
408 continue
409 elif options['action'] == 'help':
410 print >>sys.stderr, '''\ntwitter> `action`\n
411 The Shell Accepts all the command line actions along with:
412
413 exit Leave the twitter shell (^D may also be used)
414
415 Full CMD Line help is appended below for your convinience.'''
416 Action()(twitter, options)
417 options['action'] = ''
418 except NoSuchActionError, e:
419 print >>sys.stderr, e
420 except KeyboardInterrupt:
421 print >>sys.stderr, '\n[Keyboard Interrupt]'
422 except EOFError:
423 print >>sys.stderr
424 leaving = self.ask(subject='Leave')
425 if not leaving:
426 print >>sys.stderr, 'Excellent!'
427 else:
428 raise SystemExit(0)
429
430 class HelpAction(Action):
431 def __call__(self, twitter, options):
432 print __doc__
433
class DoNothingAction(Action):
    """Action with no effect; registered for 'authorize', whose work is
    done by main() before any action is dispatched."""

    def __call__(self, twitter, options):
        return None
437
def parse_oauth_tokens(result):
    """
    Extract the OAuth token pair from a form-encoded response body.

    `result` is a string of `key=value` pairs joined by '&'.  Returns the
    tuple (oauth_token, oauth_token_secret); other keys are ignored.

    Raises ValueError when either of the two keys is absent.
    """
    fields = {}
    for pair in result.split('&'):
        # partition() splits on the first '=' only, so values that contain
        # '=' and pairs without any '=' no longer break the unpacking.
        key, _, value = pair.partition('=')
        fields[key] = value

    missing = [name for name in ('oauth_token', 'oauth_token_secret')
               if name not in fields]
    if missing:
        raise ValueError(
            "OAuth response is missing: %s" % (', '.join(missing)))
    return fields['oauth_token'], fields['oauth_token_secret']
446
447 def oauth_dance(options):
448 print ("Hi there! We're gonna get you all set up to use Twitter"
449 " on the command-line.")
450 twitter = Twitter(
451 auth=OAuth('', '', CONSUMER_KEY, CONSUMER_SECRET),
452 format='')
453 oauth_token, oauth_token_secret = parse_oauth_tokens(
454 twitter.oauth.request_token())
455 print """
456 In the web browser window that opens please choose to Allow access to the
457 command-line tool. Copy the PIN number that appears on the next page and
458 paste or type it here:
459 """
460 webbrowser.open(
461 'http://api.twitter.com/oauth/authorize?oauth_token=' +
462 oauth_token)
463 oauth_verifier = raw_input("Please type the PIN: ").strip()
464 twitter = Twitter(
465 auth=OAuth(
466 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
467 format='')
468 oauth_token, oauth_token_secret = parse_oauth_tokens(
469 twitter.oauth.access_token(oauth_verifier=oauth_verifier))
470 oauth_file = open(options['oauth_filename'], 'w')
471 print >> oauth_file, oauth_token
472 print >> oauth_file, oauth_token_secret
473 oauth_file.close()
474 print "That's it! Your authorization keys have been written to %s." % (
475 options['oauth_filename'])
476
477
# Action name (as typed on the command line or in the shell) -> action
# class.  Action.__call__ looks names up here and falls back to
# NoSuchAction for anything that is not listed.
actions = {
    'authorize': DoNothingAction,
    'follow': FollowAction,
    'friends': FriendsAction,
    'help': HelpAction,
    'leave': LeaveAction,
    'public': PublicAction,
    'replies': RepliesAction,
    'search': SearchAction,
    'set': SetStatusAction,
    'shell': TwitterShell,
}
490
def loadConfig(filename):
    """
    Return a copy of OPTIONS updated from the config file `filename`.

    Only the 'format' and 'prompt' options of the [twitter] section are
    read.  A missing file yields the unmodified defaults.
    """
    options = dict(OPTIONS)
    if not os.path.exists(filename):
        return options

    parser = SafeConfigParser()
    parser.read([filename])
    for name in ('format', 'prompt'):
        if parser.has_option('twitter', name):
            options[name] = parser.get('twitter', name)
    return options
500
def read_oauth_file(fn):
    """
    Read the OAuth token pair stored in the file `fn`.

    Returns a tuple of the first and second line of the file, each with
    surrounding whitespace stripped (an absent line reads as "").
    """
    f = open(fn)
    try:
        oauth_token = f.readline().strip()
        oauth_token_secret = f.readline().strip()
    finally:
        # Close the handle explicitly instead of leaving it to the
        # garbage collector.
        f.close()
    return oauth_token, oauth_token_secret
504
505 def main(args=sys.argv[1:]):
506 arg_options = {}
507 try:
508 parse_args(args, arg_options)
509 except GetoptError, e:
510 print >> sys.stderr, "I can't do that, %s." %(e)
511 print >> sys.stderr
512 raise SystemExit(1)
513
514 config_options = loadConfig(
515 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
516
517 # Apply the various options in order, the most important applied last.
518 # Defaults first, then what's read from config file, then command-line
519 # arguments.
520 options = dict(OPTIONS)
521 for d in config_options, arg_options:
522 for k,v in d.items():
523 if v: options[k] = v
524
525 if options['refresh'] and options['action'] not in (
526 'friends', 'public', 'replies'):
527 print >> sys.stderr, "You can only refresh the friends, public, or replies actions."
528 print >> sys.stderr, "Use 'twitter -h' for help."
529 return 1
530
531 if (options['action'] == 'authorize'
532 or not os.path.exists(options['oauth_filename'])):
533 oauth_dance(options)
534
535 oauth_token, oauth_token_secret = read_oauth_file(options['oauth_filename'])
536
537 twitter = Twitter(
538 auth=OAuth(
539 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
540 secure=options['secure'])
541
542 try:
543 Action()(twitter, options)
544 except NoSuchActionError, e:
545 print >>sys.stderr, e
546 raise SystemExit(1)
547 except TwitterError, e:
548 print >> sys.stderr, e.args[0]
549 print >> sys.stderr, "Use 'twitter -h' for help."
550 raise SystemExit(1)