]> jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
Added HTTPS support
[z_archive/twitter.git] / twitter / cmdline.py
1 """
2 USAGE:
3
4 twitter [action] [options]
5
6 ACTIONS:
7 follow add the specified user to your follow list
8 friends get latest tweets from your friends (default action)
9 help print this help text that you are currently reading
10 leave remove the specified user from your following list
11 public get latest public tweets
12 replies get latest replies
13 search search twitter (Beware: octothorpe, escape it)
14 set set your twitter status
15 shell login the twitter shell
16
17 OPTIONS:
18
19 -e --email <email> your email to login to twitter
20 -p --password <password> your twitter password
21 -r --refresh run this command forever, polling every once
22 in a while (default: every 5 minutes)
23 -R --refresh-rate <rate> set the refresh rate (in seconds)
24 -f --format <format> specify the output format for status updates
25 -c --config <filename> read username and password from given config
26 file (default ~/.twitter)
27 -l --length <count> specify number of status updates shown
28 (default: 20, max: 200)
29 -t --timestamp show time before status lines
30 -d --datestamp shoe date before status lines
31 --no-ssl use HTTP instead of more secure HTTPS
32
33 FORMATS for the --format option
34
35 default one line per status
36 verbose multiple lines per status, more verbose status info
37 urls nothing but URLs
38 ansi ansi colour (rainbow mode)
39
40 CONFIG FILES
41
42 The config file should contain a [twitter] header, and all the desired options
43 you wish to set, like so:
44
45 [twitter]
46 email: <username>
47 password: <password>
48 format: <desired_default_format_for_output>
49 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
50 """
51
52 import sys
53 import time
54 from getopt import gnu_getopt as getopt, GetoptError
55 from getpass import getpass
56 import re
57 import os.path
58 from ConfigParser import SafeConfigParser
59 import datetime
60 from urllib import quote
61
62 from api import Twitter, TwitterError
63 import ansi
64
65 # Please don't change this, it was provided by the fine folks at Twitter.
66 # If you change it, it will not work.
67 AGENT_STR = "twittercommandlinetoolpy"
68
69 OPTIONS = {
70 'email': None,
71 'password': None,
72 'action': 'friends',
73 'refresh': False,
74 'refresh_rate': 600,
75 'format': 'default',
76 'prompt': '[cyan]twitter[R]> ',
77 'config_filename': os.environ.get('HOME', '') + os.sep + '.twitter',
78 'length': 20,
79 'timestamp': False,
80 'datestamp': False,
81 'extra_args': [],
82 'secure': True
83 }
84
85 def parse_args(args, options):
86 long_opts = ['email', 'password', 'help', 'format', 'refresh',
87 'refresh-rate', 'config', 'length', 'timestamp',
88 'datestamp', 'no-ssl']
89 short_opts = "e:p:f:h?rR:c:l:td"
90 opts, extra_args = getopt(args, short_opts, long_opts)
91
92 for opt, arg in opts:
93 if opt in ('-e', '--email'):
94 options['email'] = arg
95 elif opt in ('-p', '--password'):
96 options['password'] = arg
97 elif opt in ('-f', '--format'):
98 options['format'] = arg
99 elif opt in ('-r', '--refresh'):
100 options['refresh'] = True
101 elif opt in ('-R', '--refresh-rate'):
102 options['refresh_rate'] = int(arg)
103 elif opt in ('-l', '--length'):
104 options["length"] = int(arg)
105 elif opt in ('-t', '--timestamp'):
106 options["timestamp"] = True
107 elif opt in ('-d', '--datestamp'):
108 options["datestamp"] = True
109 elif opt in ('-?', '-h', '--help'):
110 options['action'] = 'help'
111 elif opt in ('-c', '--config'):
112 options['config_filename'] = arg
113 elif opt == '--no-ssl':
114 options['secure'] = False
115
116 if extra_args and not ('action' in options and options['action'] == 'help'):
117 options['action'] = extra_args[0]
118 options['extra_args'] = extra_args[1:]
119
120 def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
121 timestamp = options["timestamp"]
122 datestamp = options["datestamp"]
123 t = time.strptime(status['created_at'], format)
124 i_hate_timezones = time.timezone
125 if (time.daylight):
126 i_hate_timezones = time.altzone
127 dt = datetime.datetime(*t[:-3]) - datetime.timedelta(
128 seconds=i_hate_timezones)
129 t = dt.timetuple()
130 if timestamp and datestamp:
131 return time.strftime("%Y-%m-%d %H:%M:%S ", t)
132 elif timestamp:
133 return time.strftime("%H:%M:%S ", t)
134 elif datestamp:
135 return time.strftime("%Y-%m-%d ", t)
136 return ""
137
138 class StatusFormatter(object):
139 def __call__(self, status, options):
140 return (u"%s%s %s" %(
141 get_time_string(status, options),
142 status['user']['screen_name'], status['text']))
143
144 class AnsiStatusFormatter(object):
145 def __init__(self):
146 self._colourMap = ansi.ColourMap()
147
148 def __call__(self, status, options):
149 colour = self._colourMap.colourFor(status['user']['screen_name'])
150 return (u"%s%s%s%s %s" %(
151 get_time_string(status, options),
152 ansi.cmdColour(colour), status['user']['screen_name'],
153 ansi.cmdReset(), status['text']))
154
155 class VerboseStatusFormatter(object):
156 def __call__(self, status, options):
157 return (u"-- %s (%s) on %s\n%s\n" %(
158 status['user']['screen_name'],
159 status['user']['location'],
160 status['created_at'],
161 status['text']))
162
163 class URLStatusFormatter(object):
164 urlmatch = re.compile(r'https?://\S+')
165 def __call__(self, status, options):
166 urls = self.urlmatch.findall(status['text'])
167 return u'\n'.join(urls) if urls else ""
168
169 class AdminFormatter(object):
170 def __call__(self, action, user):
171 user_str = u"%s (%s)" %(user['screen_name'], user['name'])
172 if action == "follow":
173 return u"You are now following %s.\n" %(user_str)
174 else:
175 return u"You are no longer following %s.\n" %(user_str)
176
177 class VerboseAdminFormatter(object):
178 def __call__(self, action, user):
179 return(u"-- %s: %s (%s): %s" % (
180 "Following" if action == "follow" else "Leaving",
181 user['screen_name'],
182 user['name'],
183 user['url']))
184
185 class SearchFormatter(object):
186 def __call__(self, result, options):
187 return(u"%s%s %s" %(
188 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
189 result['from_user'], result['text']))
190
191 class VerboseSearchFormatter(SearchFormatter):
192 pass #Default to the regular one
193
194 class URLSearchFormatter(object):
195 urlmatch = re.compile(r'https?://\S+')
196 def __call__(self, result, options):
197 urls = self.urlmatch.findall(result['text'])
198 return u'\n'.join(urls) if urls else ""
199
200 class AnsiSearchFormatter(object):
201 def __init__(self):
202 self._colourMap = ansi.ColourMap()
203
204 def __call__(self, result, options):
205 colour = self._colourMap.colourFor(result['from_user'])
206 return (u"%s%s%s%s %s" %(
207 get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
208 ansi.cmdColour(colour), result['from_user'],
209 ansi.cmdReset(), result['text']))
210
211 formatters = {}
212 status_formatters = {
213 'default': StatusFormatter,
214 'verbose': VerboseStatusFormatter,
215 'urls': URLStatusFormatter,
216 'ansi': AnsiStatusFormatter
217 }
218 formatters['status'] = status_formatters
219
220 admin_formatters = {
221 'default': AdminFormatter,
222 'verbose': VerboseAdminFormatter,
223 'urls': AdminFormatter,
224 'ansi': AdminFormatter
225 }
226 formatters['admin'] = admin_formatters
227
228 search_formatters = {
229 'default': SearchFormatter,
230 'verbose': VerboseSearchFormatter,
231 'urls': URLSearchFormatter,
232 'ansi': AnsiSearchFormatter
233 }
234 formatters['search'] = search_formatters
235
236 def get_formatter(action_type, options):
237 formatters_dict = formatters.get(action_type)
238 if (not formatters_dict):
239 raise TwitterError(
240 "There was an error finding a class of formatters for your type (%s)"
241 %(action_type))
242 f = formatters_dict.get(options['format'])
243 if (not f):
244 raise TwitterError(
245 "Unknown formatter '%s' for status actions" %(options['format']))
246 return f()
247
248 class Action(object):
249
250 def ask(self, subject='perform this action', careful=False):
251 '''
252 Requests fromt he user using `raw_input` if `subject` should be
253 performed. When `careful`, the default answer is NO, otherwise YES.
254 Returns the user answer in the form `True` or `False`.
255 '''
256 sample = '(y/N)'
257 if not careful:
258 sample = '(Y/n)'
259
260 prompt = 'You really want to %s %s? ' %(subject, sample)
261 try:
262 answer = raw_input(prompt).lower()
263 if careful:
264 return answer in ('yes', 'y')
265 else:
266 return answer not in ('no', 'n')
267 except EOFError:
268 print >>sys.stderr # Put Newline since Enter was never pressed
269 # TODO:
270 # Figure out why on OS X the raw_input keeps raising
271 # EOFError and is never able to reset and get more input
272 # Hint: Look at how IPython implements their console
273 default = True
274 if careful:
275 default = False
276 return default
277
278 def __call__(self, twitter, options):
279 action = actions.get(options['action'], NoSuchAction)()
280 try:
281 doAction = lambda : action(twitter, options)
282 if (options['refresh'] and isinstance(action, StatusAction)):
283 while True:
284 doAction()
285 time.sleep(options['refresh_rate'])
286 else:
287 doAction()
288 except KeyboardInterrupt:
289 print >>sys.stderr, '\n[Keyboard Interrupt]'
290 pass
291
292 class NoSuchActionError(Exception):
293 pass
294
295 class NoSuchAction(Action):
296 def __call__(self, twitter, options):
297 raise NoSuchActionError("No such action: %s" %(options['action']))
298
299 def printNicely(string):
300 if sys.stdout.encoding:
301 print string.encode(sys.stdout.encoding, 'replace')
302 else:
303 print string.encode('utf-8')
304
305 class StatusAction(Action):
306 def __call__(self, twitter, options):
307 statuses = self.getStatuses(twitter, options)
308 sf = get_formatter('status', options)
309 for status in statuses:
310 statusStr = sf(status, options)
311 if statusStr.strip():
312 printNicely(statusStr)
313
314 class SearchAction(Action):
315 def __call__(self, twitter, options):
316 # We need to be pointing at search.twitter.com to work, and it is less
317 # tangly to do it here than in the main()
318 twitter.domain="search.twitter.com"
319 # We need to bypass the TwitterCall parameter encoding, so we
320 # don't encode the plus sign, so we have to encode it ourselves
321 query_string = "+".join([quote(term) for term in options['extra_args']])
322 twitter.encoded_args = "q=%s" %(query_string)
323
324 results = twitter.search()['results']
325 f = get_formatter('search', options)
326 for result in results:
327 resultStr = f(result, options)
328 if resultStr.strip():
329 printNicely(resultStr)
330
331 class AdminAction(Action):
332 def __call__(self, twitter, options):
333 if not (options['extra_args'] and options['extra_args'][0]):
334 raise TwitterError("You need to specify a user (screen name)")
335 af = get_formatter('admin', options)
336 try:
337 user = self.getUser(twitter, options['extra_args'][0])
338 except TwitterError, e:
339 print "There was a problem following or leaving the specified user."
340 print "You may be trying to follow a user you are already following;"
341 print "Leaving a user you are not currently following;"
342 print "Or the user may not exist."
343 print "Sorry."
344 print
345 print e
346 else:
347 printNicely(af(options['action'], user))
348
349 class FriendsAction(StatusAction):
350 def getStatuses(self, twitter, options):
351 return reversed(twitter.statuses.friends_timeline(count=options["length"]))
352
353 class PublicAction(StatusAction):
354 def getStatuses(self, twitter, options):
355 return reversed(twitter.statuses.public_timeline(count=options["length"]))
356
357 class RepliesAction(StatusAction):
358 def getStatuses(self, twitter, options):
359 return reversed(twitter.statuses.replies(count=options["length"]))
360
361 class FollowAction(AdminAction):
362 def getUser(self, twitter, user):
363 return twitter.friendships.create(id=user)
364
365 class LeaveAction(AdminAction):
366 def getUser(self, twitter, user):
367 return twitter.friendships.destroy(id=user)
368
369 class SetStatusAction(Action):
370 def __call__(self, twitter, options):
371 statusTxt = (u" ".join(options['extra_args'])
372 if options['extra_args']
373 else unicode(raw_input("message: ")))
374 status = (statusTxt.encode('utf8', 'replace'))
375 twitter.statuses.update(status=status)
376
377 class TwitterShell(Action):
378
379 def render_prompt(self, prompt):
380 '''Parses the `prompt` string and returns the rendered version'''
381 prompt = prompt.strip("'").replace("\\'","'")
382 for colour in ansi.COLOURS_NAMED:
383 if '[%s]' %(colour) in prompt:
384 prompt = prompt.replace(
385 '[%s]' %(colour), ansi.cmdColourNamed(colour))
386 prompt = prompt.replace('[R]', ansi.cmdReset())
387 return prompt
388
389 def __call__(self, twitter, options):
390 prompt = self.render_prompt(options.get('prompt', 'twitter> '))
391 while True:
392 options['action'] = ""
393 try:
394 args = raw_input(prompt).split()
395 parse_args(args, options)
396 if not options['action']:
397 continue
398 elif options['action'] == 'exit':
399 raise SystemExit(0)
400 elif options['action'] == 'shell':
401 print >>sys.stderr, 'Sorry Xzibit does not work here!'
402 continue
403 elif options['action'] == 'help':
404 print >>sys.stderr, '''\ntwitter> `action`\n
405 The Shell Accepts all the command line actions along with:
406
407 exit Leave the twitter shell (^D may also be used)
408
409 Full CMD Line help is appended below for your convinience.'''
410 Action()(twitter, options)
411 options['action'] = ''
412 except NoSuchActionError, e:
413 print >>sys.stderr, e
414 except KeyboardInterrupt:
415 print >>sys.stderr, '\n[Keyboard Interrupt]'
416 except EOFError:
417 print >>sys.stderr
418 leaving = self.ask(subject='Leave')
419 if not leaving:
420 print >>sys.stderr, 'Excellent!'
421 else:
422 raise SystemExit(0)
423
424 class HelpAction(Action):
425 def __call__(self, twitter, options):
426 print __doc__
427
428 actions = {
429 'follow' : FollowAction,
430 'friends' : FriendsAction,
431 'help' : HelpAction,
432 'leave' : LeaveAction,
433 'public' : PublicAction,
434 'replies' : RepliesAction,
435 'search' : SearchAction,
436 'set' : SetStatusAction,
437 'shell' : TwitterShell,
438 }
439
440 def loadConfig(filename):
441 options = dict(OPTIONS)
442 if os.path.exists(filename):
443 cp = SafeConfigParser()
444 cp.read([filename])
445 for option in ('email', 'password', 'format', 'prompt'):
446 if cp.has_option('twitter', option):
447 options[option] = cp.get('twitter', option)
448 return options
449
450 def main(args=sys.argv[1:]):
451 arg_options = {}
452 try:
453 parse_args(args, arg_options)
454 except GetoptError, e:
455 print >> sys.stderr, "I can't do that, %s." %(e)
456 print >> sys.stderr
457 raise SystemExit(1)
458
459 config_options = loadConfig(
460 arg_options.get('config_filename') or OPTIONS.get('config_filename'))
461
462 # Apply the various options in order, the most important applied last.
463 # Defaults first, then what's read from config file, then command-line
464 # arguments.
465 options = dict(OPTIONS)
466 for d in config_options, arg_options:
467 for k,v in d.items():
468 if v: options[k] = v
469
470 if options['refresh'] and options['action'] not in (
471 'friends', 'public', 'replies'):
472 print >> sys.stderr, "You can only refresh the friends, public, or replies actions."
473 print >> sys.stderr, "Use 'twitter -h' for help."
474 raise SystemExit(1)
475
476 if options['email'] and not options['password']:
477 options['password'] = getpass("Twitter password: ")
478
479 twitter = Twitter(
480 options['email'], options['password'], agent=AGENT_STR,
481 secure=options['secure'])
482 try:
483 Action()(twitter, options)
484 except NoSuchActionError, e:
485 print >>sys.stderr, e
486 raise SystemExit(1)
487 except TwitterError, e:
488 print >> sys.stderr, e.args[0]
489 print >> sys.stderr, "Use 'twitter -h' for help."
490 raise SystemExit(1)