]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Make search work again
[z_archive/twitter.git] / twitter / cmdline.py
1 """
2 USAGE:
3
4 twitter [action] [options]
5
6
7 ACTIONS:
8 authorize authorize the command-line tool to interact with Twitter
9 follow add the specified user to your follow list
10 friends get latest tweets from your friends (default action)
11 help print this help text that you are currently reading
12 leave remove the specified user from your following list
13 public get latest public tweets
14 replies get latest replies
15 search search twitter (Beware: octothorpe, escape it)
16 set set your twitter status
17 shell login the twitter shell
18
19
20 OPTIONS:
21
22 -r --refresh run this command forever, polling every once
23 in a while (default: every 5 minutes)
24 -R --refresh-rate <rate> set the refresh rate (in seconds)
25 -f --format <format> specify the output format for status updates
26 -c --config <filename> read username and password from given config
27 file (default ~/.twitter)
28 -l --length <count> specify number of status updates shown
29 (default: 20, max: 200)
30 -t --timestamp show time before status lines
31 -d --datestamp shoe date before status lines
32 --no-ssl use HTTP instead of more secure HTTPS
33 --oauth <filename> filename to read/store oauth credentials to
34
35 FORMATS for the --format option
36
37 default one line per status
38 verbose multiple lines per status, more verbose status info
39 urls nothing but URLs
40 ansi ansi colour (rainbow mode)
41
42
43 CONFIG FILES
44
45 The config file should be placed in your home directory and be named .twitter.
46 It must contain a [twitter] header, and all the desired options you wish to
47 set, like so:
48
49 [twitter]
50 format: <desired_default_format_for_output>
51 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
52
53 OAuth authentication tokens are stored in the file .twitter_oauth in your
54 home directory.
55 """
56
57 CONSUMER_KEY='uS6hO2sV6tDKIOeVjhnFnQ'
58 CONSUMER_SECRET='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
59
60 import sys
61 import time
62 from getopt import gnu_getopt as getopt, GetoptError
63 from getpass import getpass
64 import re
65 import os.path
66 from ConfigParser import SafeConfigParser
67 import datetime
68 from urllib import quote
69 import webbrowser
70
71 from api import Twitter, TwitterError
72 from oauth import OAuth, write_token_file, read_token_file
73 from oauth_dance import oauth_dance
74 import ansi
75
76 OPTIONS = {
77 'action': 'friends',
78 'refresh': False,
79 'refresh_rate': 600,
80 'format': 'default',
81 'prompt': '[cyan]twitter[R]> ',
82 'config_filename': os.environ.get('HOME', '') + os.sep + '.twitter',
83 'oauth_filename': os.environ.get('HOME', '') + os.sep + '.twitter_oauth',
84 'length': 20,
85 'timestamp': False,
86 'datestamp': False,
87 'extra_args': [],
88 'secure': True,
89 }
90
91 def parse_args(args, options):
92 long_opts = ['help', 'format=', 'refresh', 'oauth=',
93 'refresh-rate=', 'config=', 'length=', 'timestamp',
94 'datestamp', 'no-ssl']
95 short_opts = "e:p:f:h?rR:c:l:td"
96 opts, extra_args = getopt(args, short_opts, long_opts)
97
98 for opt, arg in opts:
99 if opt in ('-f', '--format'):
100 options['format'] = arg
101 elif opt in ('-r', '--refresh'):
102 options['refresh'] = True
103 elif opt in ('-R', '--refresh-rate'):
104 options['refresh_rate'] = int(arg)
105 elif opt in ('-l', '--length'):
106 options["length"] = int(arg)
107 elif opt in ('-t', '--timestamp'):
108 options["timestamp"] = True
109 elif opt in ('-d', '--datestamp'):
110 options["datestamp"] = True
111 elif opt in ('-?', '-h', '--help'):
112 options['action'] = 'help'
113 elif opt in ('-c', '--config'):
114 options['config_filename'] = arg
115 elif opt == '--no-ssl':
116 options['secure'] = False
117 elif opt == '--oauth':
118 options['oauth_filename'] = arg
119
120 if extra_args and not ('action' in options and options['action'] == 'help'):
121 options['action'] = extra_args[0]
122 options['extra_args'] = extra_args[1:]
123
124 def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
125 timestamp = options["timestamp"]
126 datestamp = options["datestamp"]
127 t = time.strptime(status['created_at'], format)
128 i_hate_timezones = time.timezone
129 if (time.daylight):
130 i_hate_timezones = time.altzone
131 dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
132 seconds=i_hate_timezones)
133 t = dt.timetuple()
134 if timestamp and datestamp:
135 return time.strftime("%Y-%m-%d %H:%M:%S ", t)
136 elif timestamp:
137 return time.strftime("%H:%M:%S ", t)
138 elif datestamp:
139 return time.strftime("%Y-%m-%d ", t)
140 return ""
141
142 class StatusFormatter(object):
143 def __call__(self, status, options):
144 return (u"%s%s %s" %(
145 get_time_string(status, options),
146 status['user']['screen_name'], status['text']))
147
148 class AnsiStatusFormatter(object):
149 def __init__(self):
150 self._colourMap = ansi.ColourMap()
151
152 def __call__(self, status, options):
153 colour = self._colourMap.colourFor(status['user']['screen_name'])
154 return (u"%s%s%s%s %s" %(
155 get_time_string(status, options),
156 ansi.cmdColour(colour), status['user']['screen_name'],
157 ansi.cmdReset(), status['text']))
158
159 class VerboseStatusFormatter(object):
160 def __call__(self, status, options):
161 return (u"-- %s (%s) on %s\n%s\n" %(
162 status['user']['screen_name'],
163 status['user']['location'],
164 status['created_at'],
165 status['text']))
166
167 class URLStatusFormatter(object):
168 urlmatch = re.compile(r'https?://\S+')
169 def __call__(self, status, options):
170 urls = self.urlmatch.findall(status['text'])
171 return u'\n'.join(urls) if urls else ""
172
173 class AdminFormatter(object):
174 def __call__(self, action, user):
175 user_str = u"%s (%s)" %(user['screen_name'], user['name'])
176 if action == "follow":
177 return u"You are now following %s.\n" %(user_str)
178 else:
179 return u"You are no longer following %s.\n" %(user_str)
180
181 class VerboseAdminFormatter(object):
182 def __call__(self, action, user):
183 return(u"-- %s: %s (%s): %s" % (
184 "Following" if action == "follow" else "Leaving",
185 user['screen_name'],
186 user['name'],
187 user['url']))
188
189 class SearchFormatter(object):
190 def __call__(self, result, options):
191 return(u"%s%s %s" %(
192 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
193 result['from_user'], result['text']))
194
195 class VerboseSearchFormatter(SearchFormatter):
196 pass #Default to the regular one
197
198 class URLSearchFormatter(object):
199 urlmatch = re.compile(r'https?://\S+')
200 def __call__(self, result, options):
201 urls = self.urlmatch.findall(result['text'])
202 return u'\n'.join(urls) if urls else ""
203
204 class AnsiSearchFormatter(object):
205 def __init__(self):
206 self._colourMap = ansi.ColourMap()
207
208 def __call__(self, result, options):
209 colour = self._colourMap.colourFor(result['from_user'])
210 return (u"%s%s%s%s %s" %(
211 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
212 ansi.cmdColour(colour), result['from_user'],
213 ansi.cmdReset(), result['text']))
214
215 _term_encoding = None
216 def get_term_encoding():
217 global _term_encoding
218 if not _term_encoding:
219 lang = os.getenv('LANG', 'unknown.UTF-8').split('.')
220 if lang[1:]:
221 _term_encoding = lang[1]
222 else:
223 _term_encoding = 'UTF-8'
224 return _term_encoding
225
226 formatters = {}
227 status_formatters = {
228 'default': StatusFormatter,
229 'verbose': VerboseStatusFormatter,
230 'urls': URLStatusFormatter,
231 'ansi': AnsiStatusFormatter
232 }
233 formatters['status'] = status_formatters
234
235 admin_formatters = {
236 'default': AdminFormatter,
237 'verbose': VerboseAdminFormatter,
238 'urls': AdminFormatter,
239 'ansi': AdminFormatter
240 }
241 formatters['admin'] = admin_formatters
242
243 search_formatters = {
244 'default': SearchFormatter,
245 'verbose': VerboseSearchFormatter,
246 'urls': URLSearchFormatter,
247 'ansi': AnsiSearchFormatter
248 }
249 formatters['search'] = search_formatters
250
251 def get_formatter(action_type, options):
252 formatters_dict = formatters.get(action_type)
253 if (not formatters_dict):
254 raise TwitterError(
255 "There was an error finding a class of formatters for your type (%s)"
256 %(action_type))
257 f = formatters_dict.get(options['format'])
258 if (not f):
259 raise TwitterError(
260 "Unknown formatter '%s' for status actions" %(options['format']))
261 return f()
262
263 class Action(object):
264
265 def ask(self, subject='perform this action', careful=False):
266 '''
267 Requests fromt he user using `raw_input` if `subject` should be
268 performed. When `careful`, the default answer is NO, otherwise YES.
269 Returns the user answer in the form `True` or `False`.
270 '''
271 sample = '(y/N)'
272 if not careful:
273 sample = '(Y/n)'
274
275 prompt = 'You really want to %s %s? ' %(subject, sample)
276 try:
277 answer = raw_input(prompt).lower()
278 if careful:
279 return answer in ('yes', 'y')
280 else:
281 return answer not in ('no', 'n')
282 except EOFError:
283 print >>sys.stderr # Put Newline since Enter was never pressed
284 # TODO:
285 # Figure out why on OS X the raw_input keeps raising
286 # EOFError and is never able to reset and get more input
287 # Hint: Look at how IPython implements their console
288 default = True
289 if careful:
290 default = False
291 return default
292
293 def __call__(self, twitter, options):
294 action = actions.get(options['action'], NoSuchAction)()
295 try:
296 doAction = lambda : action(twitter, options)
297 if (options['refresh'] and isinstance(action, StatusAction)):
298 while True:
299 doAction()
300 time.sleep(options['refresh_rate'])
301 else:
302 doAction()
303 except KeyboardInterrupt:
304 print >>sys.stderr, '\n[Keyboard Interrupt]'
305 pass
306
307 class NoSuchActionError(Exception):
308 pass
309
310 class NoSuchAction(Action):
311 def __call__(self, twitter, options):
312 raise NoSuchActionError("No such action: %s" %(options['action']))
313
314 def printNicely(string):
315 if sys.stdout.encoding:
316 print string.encode(sys.stdout.encoding, 'replace')
317 else:
318 print string.encode('utf-8')
319
320 class StatusAction(Action):
321 def __call__(self, twitter, options):
322 statuses = self.getStatuses(twitter, options)
323 sf = get_formatter('status', options)
324 for status in statuses:
325 statusStr = sf(status, options)
326 if statusStr.strip():
327 printNicely(statusStr)
328
329 class SearchAction(Action):
330 def __call__(self, twitter, options):
331 # We need to be pointing at search.twitter.com to work, and it is less
332 # tangly to do it here than in the main()
333 twitter.domain="search.twitter.com"
334 twitter.uriparts=()
335 # We need to bypass the TwitterCall parameter encoding, so we
336 # don't encode the plus sign, so we have to encode it ourselves
337 query_string = "+".join(
338 [quote(term.decode(get_term_encoding()))
339 for term in options['extra_args']])
340
341 results = twitter.search(q=query_string)['results']
342 f = get_formatter('search', options)
343 for result in results:
344 resultStr = f(result, options)
345 if resultStr.strip():
346 printNicely(resultStr)
347
348 class AdminAction(Action):
349 def __call__(self, twitter, options):
350 if not (options['extra_args'] and options['extra_args'][0]):
351 raise TwitterError("You need to specify a user (screen name)")
352 af = get_formatter('admin', options)
353 try:
354 user = self.getUser(twitter, options['extra_args'][0])
355 except TwitterError, e:
356 print "There was a problem following or leaving the specified user."
357 print "You may be trying to follow a user you are already following;"
358 print "Leaving a user you are not currently following;"
359 print "Or the user may not exist."
360 print "Sorry."
361 print
362 print e
363 else:
364 printNicely(af(options['action'], user))
365
366 class FriendsAction(StatusAction):
367 def getStatuses(self, twitter, options):
368 return reversed(twitter.statuses.friends_timeline(count=options["length"]))
369
370 class PublicAction(StatusAction):
371 def getStatuses(self, twitter, options):
372 return reversed(twitter.statuses.public_timeline(count=options["length"]))
373
374 class RepliesAction(StatusAction):
375 def getStatuses(self, twitter, options):
376 return reversed(twitter.statuses.replies(count=options["length"]))
377
378 class FollowAction(AdminAction):
379 def getUser(self, twitter, user):
380 return twitter.friendships.create(id=user)
381
382 class LeaveAction(AdminAction):
383 def getUser(self, twitter, user):
384 return twitter.friendships.destroy(id=user)
385
386 class SetStatusAction(Action):
387 def __call__(self, twitter, options):
388 statusTxt = (" ".join(options['extra_args']).decode(get_term_encoding())
389 if options['extra_args']
390 else unicode(raw_input("message: ")))
391 status = (statusTxt.encode('utf8', 'replace'))
392 twitter.statuses.update(status=status)
393
394 class TwitterShell(Action):
395
396 def render_prompt(self, prompt):
397 '''Parses the `prompt` string and returns the rendered version'''
398 prompt = prompt.strip("'").replace("\\'","'")
399 for colour in ansi.COLOURS_NAMED:
400 if '[%s]' %(colour) in prompt:
401 prompt = prompt.replace(
402 '[%s]' %(colour), ansi.cmdColourNamed(colour))
403 prompt = prompt.replace('[R]', ansi.cmdReset())
404 return prompt
405
406 def __call__(self, twitter, options):
407 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
408 while True:
409 options['action'] = ""
410 try:
411 args = raw_input(prompt).split()
412 parse_args(args, options)
413 if not options['action']:
414 continue
415 elif options['action'] == 'exit':
416 raise SystemExit(0)
417 elif options['action'] == 'shell':
418 print >>sys.stderr, 'Sorry Xzibit does not work here!'
419 continue
420 elif options['action'] == 'help':
421 print >>sys.stderr, '''\ntwitter> `action`\n
422 The Shell Accepts all the command line actions along with:
423
424 exit Leave the twitter shell (^D may also be used)
425
426 Full CMD Line help is appended below for your convinience.'''
427 Action()(twitter, options)
428 options['action'] = ''
429 except NoSuchActionError, e:
430 print >>sys.stderr, e
431 except KeyboardInterrupt:
432 print >>sys.stderr, '\n[Keyboard Interrupt]'
433 except EOFError:
434 print >>sys.stderr
435 leaving = self.ask(subject='Leave')
436 if not leaving:
437 print >>sys.stderr, 'Excellent!'
438 else:
439 raise SystemExit(0)
440
441 class HelpAction(Action):
442 def __call__(self, twitter, options):
443 print __doc__
444
445 class DoNothingAction(Action):
446 def __call__(self, twitter, options):
447 pass
448
449 actions = {
450 'authorize' : DoNothingAction,
451 'follow' : FollowAction,
452 'friends' : FriendsAction,
453 'help' : HelpAction,
454 'leave' : LeaveAction,
455 'public' : PublicAction,
456 'replies' : RepliesAction,
457 'search' : SearchAction,
458 'set' : SetStatusAction,
459 'shell' : TwitterShell,
460 }
461
462 def loadConfig(filename):
463 options = dict(OPTIONS)
464 if os.path.exists(filename):
465 cp = SafeConfigParser()
466 cp.read([filename])
467 for option in ('format', 'prompt'):
468 if cp.has_option('twitter', option):
469 options[option] = cp.get('twitter', option)
470 return options
471
472 def main(args=sys.argv[1:]):
473 arg_options = {}
474 try:
475 parse_args(args, arg_options)
476 except GetoptError, e:
477 print >> sys.stderr, "I can't do that, %s." %(e)
478 print >> sys.stderr
479 raise SystemExit(1)
480
481 config_path = os.path.expanduser(
482 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
483 config_options = loadConfig(config_path)
484
485 # Apply the various options in order, the most important applied last.
486 # Defaults first, then what's read from config file, then command-line
487 # arguments.
488 options = dict(OPTIONS)
489 for d in config_options, arg_options:
490 for k,v in d.items():
491 if v: options[k] = v
492
493 if options['refresh'] and options['action'] not in (
494 'friends', 'public', 'replies'):
495 print >> sys.stderr, "You can only refresh the friends, public, or replies actions."
496 print >> sys.stderr, "Use 'twitter -h' for help."
497 return 1
498
499 oauth_filename = os.path.expanduser(options['oauth_filename'])
500
501 if (options['action'] == 'authorize'
502 or not os.path.exists(oauth_filename)):
503 oauth_dance(
504 "the Command-Line Tool", CONSUMER_KEY, CONSUMER_SECRET,
505 options['oauth_filename'])
506
507 oauth_token, oauth_token_secret = read_token_file(oauth_filename)
508
509 twitter = Twitter(
510 auth=OAuth(
511 oauth_token, oauth_token_secret, CONSUMER_KEY, CONSUMER_SECRET),
512 secure=options['secure'],
513 api_version='1',
514 domain='api.twitter.com')
515
516 try:
517 Action()(twitter, options)
518 except NoSuchActionError, e:
519 print >>sys.stderr, e
520 raise SystemExit(1)
521 except TwitterError, e:
522 print >> sys.stderr, str(e)
523 print >> sys.stderr, "Use 'twitter -h' for help."
524 raise SystemExit(1)