]>
jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
5 twitter [action] [options]
9 authorize authorize the command-line tool to interact with Twitter
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
16 mylist get list of your lists; give a list name to get tweets
18 pyprompt start a Python prompt for interacting with the twitter
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 5 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
42 FORMATS for the --format option
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 json raw json data from the api on each line
48 ansi ansi colour (rainbow mode)
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
65 from __future__
import print_function
68 input = __builtins__
.raw_input
69 except (AttributeError, KeyError):
73 CONSUMER_KEY
= 'uS6hO2sV6tDKIOeVjhnFnQ'
74 CONSUMER_SECRET
= 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
76 from getopt
import gnu_getopt
as getopt
, GetoptError
77 from getpass
import getpass
87 from ConfigParser
import SafeConfigParser
89 from configparser
import ConfigParser
as SafeConfigParser
92 from urllib
.parse
import quote
94 from urllib2
import quote
98 import html
.parser
as HTMLParser
102 from .api
import Twitter
, TwitterError
103 from .oauth
import OAuth
, write_token_file
, read_token_file
104 from .oauth_dance
import oauth_dance
106 from .util
import smrt_input
, printNicely
, align_text
113 'prompt': '[cyan]twitter[R]> ',
114 'config_filename': os
.environ
.get('HOME',
115 os
.environ
.get('USERPROFILE', ''))
116 + os
.sep
+ '.twitter',
117 'oauth_filename': os
.environ
.get('HOME',
118 os
.environ
.get('USERPROFILE', ''))
119 + os
.sep
+ '.twitter_oauth',
125 'invert_split': False,
# Shared HTML-entity decoder; the formatters call gHtmlParser.unescape(text).
gHtmlParser = HTMLParser.HTMLParser()
if not hasattr(gHtmlParser, 'unescape'):
    # HTMLParser.unescape() was deprecated in Python 3.4 and removed in 3.9.
    # html.unescape() is the documented replacement, so attach it to the
    # instance to keep every existing gHtmlParser.unescape(...) call working.
    import html
    gHtmlParser.unescape = html.unescape

# '#' followed by a run of non-whitespace, captured as group "hashtag".
hashtagRe = re.compile(r'(?P<hashtag>#\S+)')
# '@' followed by a run of non-whitespace, captured as group "profile".
profileRe = re.compile(r'(?P<profile>\@\S+)')
# Module-wide ANSI escape helper used by the ansi formatters and the shell
# prompt. NOTE(review): the argument is presumably the "force ansi" flag
# (main() builds another AnsiCmd from options["force-ansi"]) -- confirm
# against ansi.AnsiCmd.
ansiFormatter = ansi.AnsiCmd(False)
def _int_option(opt, arg):
    """Return ``arg`` converted to int, or raise GetoptError naming ``opt``."""
    try:
        return int(arg)
    except ValueError:
        # Report bad numbers through the same exception as any other
        # command-line error instead of leaking a bare ValueError.
        raise GetoptError(
            "option %s requires an integer argument, not %r" % (opt, arg),
            opt)


def parse_args(args, options):
    """Parse command-line style ``args`` and record the result in ``options``.

    ``options`` is modified in place. Recognised switches set the keys
    'format', 'refresh', 'refresh_rate', 'length', 'timestamp', 'datestamp',
    'config_filename', 'secure', 'oauth_filename' and 'force-ansi'; any
    help switch sets 'action' to 'help'. The short options ``-e`` and ``-p``
    are accepted (each takes an argument) but otherwise ignored.

    The first positional argument becomes ``options['action']`` unless help
    was already requested; the remaining positionals are stored in
    ``options['extra_args']`` (always set, possibly to an empty list).

    Raises:
        GetoptError: for an unknown option, a missing option argument, or a
            non-integer value given to ``-l/--length`` or
            ``-R/--refresh-rate``.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl', 'force-ansi']
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)
    if extra_args and hasattr(extra_args[0], 'decode'):
        # Arguments that are byte strings are decoded with the locale's
        # preferred encoding; text strings have no .decode and pass through.
        encoding = locale.getpreferredencoding()
        extra_args = [arg.decode(encoding) for arg in extra_args]

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _int_option(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _int_option(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg
        elif opt == '--force-ansi':
            options['force-ansi'] = True

    # A help request wins over whatever action word was typed.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
172 def get_time_string(status
, options
, format
="%a %b %d %H:%M:%S +0000 %Y"):
173 timestamp
= options
["timestamp"]
174 datestamp
= options
["datestamp"]
175 t
= time
.strptime(status
['created_at'], format
)
176 i_hate_timezones
= time
.timezone
178 i_hate_timezones
= time
.altzone
179 dt
= datetime
.datetime(*t
[:-3]) - datetime
.timedelta(
180 seconds
=i_hate_timezones
)
182 if timestamp
and datestamp
:
183 return time
.strftime("%Y-%m-%d %H:%M:%S ", t
)
185 return time
.strftime("%H:%M:%S ", t
)
187 return time
.strftime("%Y-%m-%d ", t
)
192 'clear': ansiFormatter
.cmdReset(),
193 'hashtag': ansiFormatter
.cmdBold(),
194 'profile': ansiFormatter
.cmdUnderline(),
201 s
= '%s%s%s' % (ansiTypes
[mkey
], m
.group(mkey
), ansiTypes
['clear'])
206 def replaceInStatus(status
):
207 txt
= gHtmlParser
.unescape(status
)
208 txt
= re
.sub(hashtagRe
, reRepl
, txt
)
209 txt
= re
.sub(profileRe
, reRepl
, txt
)
def correctRTStatus(status):
    """Return the text to display for ``status``.

    When the status carries a 'retweeted_status' entry, the text is rebuilt
    as "RT @<screen_name> <text>" from that nested status; otherwise the
    status's own 'text' is returned unchanged.
    """
    if 'retweeted_status' not in status:
        return status['text']
    retweeted = status['retweeted_status']
    return ("RT @" + retweeted['user']['screen_name']
            + " " + retweeted['text'])
class StatusFormatter(object):
    """Format a status as one line: time prefix, '@screen_name', text."""

    def __call__(self, status, options):
        """Return the one-line rendering of ``status``.

        The time prefix comes from get_time_string(status, options); the
        text is the retweet-corrected status text with HTML entities decoded.
        """
        return ("%s@%s %s" % (
            get_time_string(status, options),
            status['user']['screen_name'],
            gHtmlParser.unescape(correctRTStatus(status))))
class AnsiStatusFormatter(object):
    """Format a status as one line with the screen name in ANSI colour."""

    def __init__(self):
        # One colour map per formatter; colourFor() is asked for a colour by
        # screen name on every call.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        """Return the coloured one-line rendering of ``status``.

        The screen name is right-aligned in a 16-character field and wrapped
        in colour/reset commands; the text is retweet-corrected, passed
        through replaceInStatus() and then align_text().
        """
        colour = self._colourMap.colourFor(status['user']['screen_name'])
        return ("%s%s% 16s%s %s " % (
            get_time_string(status, options),
            ansiFormatter.cmdColour(colour), status['user']['screen_name'],
            ansiFormatter.cmdReset(),
            align_text(replaceInStatus(correctRTStatus(status)))))
class VerboseStatusFormatter(object):
    """Format a status over several lines with user and date details."""

    def __call__(self, status, options):
        """Return "-- <screen_name> (<location>) on <created_at>" plus text.

        ``options`` is accepted for interface parity with the other status
        formatters but is not used here.
        """
        return ("-- %s (%s) on %s\n%s\n" % (
            status['user']['screen_name'],
            status['user']['location'],
            status['created_at'],
            gHtmlParser.unescape(correctRTStatus(status))))
class JSONStatusFormatter(object):
    """Format a status as a JSON document."""

    def __call__(self, status, options):
        """Return ``status`` serialised as JSON with 'text' entity-decoded.

        A shallow copy is serialised so the caller's status mapping is left
        untouched; the 'text' key keeps its original position in the output.
        ``options`` is not used.
        """
        decoded = dict(status, text=gHtmlParser.unescape(status['text']))
        return json.dumps(decoded)
class URLStatusFormatter(object):
    """Format a status as just the http/https URLs found in its text."""

    # 'http://' or 'https://' followed by a run of non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        """Return the URLs in the status text, one per line.

        Returns an empty string when the text contains no URL (joining an
        empty list already yields ""). ``options`` is not used.
        """
        return '\n'.join(self.urlmatch.findall(correctRTStatus(status)))
class ListsFormatter(object):
    """Format a list entry as a single line."""

    # The parameter is named ``list`` (shadowing the builtin) in the original
    # interface; it is kept so existing callers are unaffected.
    def __call__(self, list):
        """Return the list name padded to 30 columns, plus its description.

        The "(description)" suffix is added only when list['description'] is
        truthy. The result always ends with a newline.
        """
        if list['description']:
            list_str = "%-30s (%s)" % (list['name'], list['description'])
        else:
            list_str = "%-30s" % (list['name'])
        return "%s\n" % list_str
265 class ListsVerboseFormatter(object):
266 def __call__(self
, list):
267 list_str
= "%-30s\n description: %s\n members: %s\n mode:%s\n" % (
268 list['name'], list['description'],
269 list['member_count'], list['mode'])
class AnsiListsFormatter(object):
    """Format a list entry as one line with the list name in ANSI colour."""

    def __init__(self):
        # One colour map per formatter; colourFor() is asked for a colour by
        # list name on every call.
        self._colourMap = ansi.ColourMap()

    # The parameter is named ``list`` (shadowing the builtin) in the original
    # interface; it is kept so existing callers are unaffected.
    def __call__(self, list):
        """Return the coloured name (padded to 15 columns) and description."""
        colour = self._colourMap.colourFor(list['name'])
        return ("%s%-15s%s %s" % (
            ansiFormatter.cmdColour(colour), list['name'],
            ansiFormatter.cmdReset(), list['description']))
class AdminFormatter(object):
    """Format the outcome of a follow/leave request as one sentence."""

    def __call__(self, action, user):
        """Return the confirmation message for ``action`` on ``user``.

        ``action`` equal to "follow" yields the "now following" message; any
        other value yields the "no longer following" message. ``user`` must
        provide 'screen_name' and 'name'.
        """
        user_str = "%s (%s)" % (user['screen_name'], user['name'])
        if action == "follow":
            return "You are now following %s.\n" % (user_str)
        return "You are no longer following %s.\n" % (user_str)
291 class VerboseAdminFormatter(object):
292 def __call__(self
, action
, user
):
293 return("-- %s: %s (%s): %s" % (
294 "Following" if action
== "follow" else "Leaving",
299 class SearchFormatter(object):
300 def __call__(self
, result
, options
):
302 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
303 result
['from_user'], result
['text']))
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search output; currently identical to SearchFormatter."""
    pass  # Default to the regular one
class URLSearchFormatter(object):
    """Format a search result as just the http/https URLs in its text."""

    # 'http://' or 'https://' followed by a run of non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        """Return the URLs found in result['text'], one per line.

        Returns an empty string when there are none (joining an empty list
        already yields ""). ``options`` is not used.
        """
        return '\n'.join(self.urlmatch.findall(result['text']))
class AnsiSearchFormatter(object):
    """Format a search result as one line with the author in ANSI colour."""

    def __init__(self):
        # One colour map per formatter; colourFor() is asked for a colour by
        # author name on every call.
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        """Return the time prefix, coloured 'from_user' and the result text.

        The creation time is parsed with the format
        "%a, %d %b %Y %H:%M:%S +0000" rather than get_time_string's default.
        """
        colour = self._colourMap.colourFor(result['from_user'])
        return ("%s%s%s%s %s" % (
            get_time_string(result, options, "%a, %d %b %Y %H:%M:%S +0000"),
            ansiFormatter.cmdColour(colour), result['from_user'],
            ansiFormatter.cmdReset(), result['text']))
# Cached result of get_term_encoding(); None until the first call.
_term_encoding = None


def get_term_encoding():
    """Return the terminal encoding derived from the LANG environment variable.

    The value is whatever follows the first '.' in LANG (up to the next '.',
    if any); when LANG is unset or contains no '.', 'UTF-8' is used. The
    result is computed once and cached in the module-level ``_term_encoding``.
    """
    global _term_encoding
    if not _term_encoding:
        lang = os.getenv('LANG', 'unknown.UTF-8').split('.')
        # lang[1:] is empty when LANG has no '.' separator.
        _term_encoding = lang[1] if lang[1:] else 'UTF-8'
    return _term_encoding
337 status_formatters
= {
338 'default': StatusFormatter
,
339 'verbose': VerboseStatusFormatter
,
340 'json': JSONStatusFormatter
,
341 'urls': URLStatusFormatter
,
342 'ansi': AnsiStatusFormatter
344 formatters
['status'] = status_formatters
347 'default': AdminFormatter
,
348 'verbose': VerboseAdminFormatter
,
349 'urls': AdminFormatter
,
350 'ansi': AdminFormatter
352 formatters
['admin'] = admin_formatters
354 search_formatters
= {
355 'default': SearchFormatter
,
356 'verbose': VerboseSearchFormatter
,
357 'urls': URLSearchFormatter
,
358 'ansi': AnsiSearchFormatter
360 formatters
['search'] = search_formatters
363 'default': ListsFormatter
,
364 'verbose': ListsVerboseFormatter
,
366 'ansi': AnsiListsFormatter
368 formatters
['lists'] = lists_formatters
370 def get_formatter(action_type
, options
):
371 formatters_dict
= formatters
.get(action_type
)
372 if not formatters_dict
:
374 "There was an error finding a class of formatters for your type (%s)"
376 f
= formatters_dict
.get(options
['format'])
379 "Unknown formatter '%s' for status actions" % (options
['format']))
382 class Action(object):
384 def ask(self
, subject
='perform this action', careful
=False):
386 Requests from the user using `raw_input` if `subject` should be
387 performed. When `careful`, the default answer is NO, otherwise YES.
388 Returns the user answer in the form `True` or `False`.
394 prompt
= 'You really want to %s %s? ' % (subject
, sample
)
396 answer
= input(prompt
).lower()
398 return answer
in ('yes', 'y')
400 return answer
not in ('no', 'n')
402 print(file=sys
.stderr
) # Put Newline since Enter was never pressed
404 # Figure out why on OS X the raw_input keeps raising
405 # EOFError and is never able to reset and get more input
406 # Hint: Look at how IPython implements their console
412 def __call__(self
, twitter
, options
):
413 action
= actions
.get(options
['action'], NoSuchAction
)()
415 doAction
= lambda: action(twitter
, options
)
416 if options
['refresh'] and isinstance(action
, StatusAction
):
420 time
.sleep(options
['refresh_rate'])
423 except KeyboardInterrupt:
424 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
class NoSuchActionError(Exception):
    """Raised by NoSuchAction when the requested action name is unknown."""
class NoSuchAction(Action):
    """Fallback action used when options['action'] names no known action."""

    def __call__(self, twitter, options):
        """Always raise NoSuchActionError naming the requested action."""
        raise NoSuchActionError("No such action: %s" % (options['action']))
class StatusAction(Action):
    """Base for actions that fetch a sequence of statuses and print them.

    Subclasses supply ``getStatuses(twitter, options)``.
    """

    def __call__(self, twitter, options):
        """Fetch statuses, format each one and print the non-blank results."""
        statuses = self.getStatuses(twitter, options)
        sf = get_formatter('status', options)
        for status in statuses:
            statusStr = sf(status, options)
            # Formatters may return an empty or whitespace-only string
            # (e.g. a status with no URLs in 'urls' format); skip those.
            if statusStr.strip():
                printNicely(statusStr)
443 class SearchAction(Action
):
444 def __call__(self
, twitter
, options
):
445 # We need to be pointing at search.twitter.com to work, and it is less
446 # tangly to do it here than in the main()
447 twitter
.domain
= "search.twitter.com"
448 twitter
.uriparts
= ()
449 # We need to bypass the TwitterCall parameter encoding, so we
450 # don't encode the plus sign, so we have to encode it ourselves
451 query_string
= "+".join(
453 for term
in options
['extra_args']])
455 results
= twitter
.search(q
=query_string
)['results']
456 f
= get_formatter('search', options
)
457 for result
in results
:
458 resultStr
= f(result
, options
)
459 if resultStr
.strip():
460 printNicely(resultStr
)
462 class AdminAction(Action
):
463 def __call__(self
, twitter
, options
):
464 if not (options
['extra_args'] and options
['extra_args'][0]):
465 raise TwitterError("You need to specify a user (screen name)")
466 af
= get_formatter('admin', options
)
468 user
= self
.getUser(twitter
, options
['extra_args'][0])
469 except TwitterError
as e
:
470 print("There was a problem following or leaving the specified user.")
471 print("You may be trying to follow a user you are already following;")
472 print("Leaving a user you are not currently following;")
473 print("Or the user may not exist.")
478 printNicely(af(options
['action'], user
))
480 class ListsAction(StatusAction
):
481 def getStatuses(self
, twitter
, options
):
482 if not options
['extra_args']:
483 raise TwitterError("Please provide a user to query for lists")
485 screen_name
= options
['extra_args'][0]
487 if not options
['extra_args'][1:]:
488 lists
= twitter
.lists
.list(screen_name
=screen_name
)
490 printNicely("This user has no lists.")
492 lf
= get_formatter('lists', options
)
493 printNicely(lf(list))
496 return list(reversed(twitter
.lists
.statuses(
497 count
=options
['length'],
498 owner_screen_name
=screen_name
,
499 slug
=options
['extra_args'][1])))
class MyListsAction(ListsAction):
    """ListsAction for the authenticated user's own lists."""

    def getStatuses(self, twitter, options):
        """Look up the caller's screen name and delegate to ListsAction.

        The screen name returned by account.verify_credentials() is inserted
        at the front of options['extra_args'] (the list is modified in
        place), which is where ListsAction reads the user to query.
        """
        screen_name = twitter.account.verify_credentials()['screen_name']
        options['extra_args'].insert(0, screen_name)
        return ListsAction.getStatuses(self, twitter, options)
class FriendsAction(StatusAction):
    """Show the authenticated user's home timeline."""

    def getStatuses(self, twitter, options):
        """Return up to options['length'] home-timeline statuses.

        The API result is reversed before being returned -- presumably so the
        newest status is printed last; confirm against the API's ordering.
        """
        return list(reversed(
            twitter.statuses.home_timeline(count=options["length"])))
class RepliesAction(StatusAction):
    """Show the authenticated user's mentions timeline."""

    def getStatuses(self, twitter, options):
        """Return up to options['length'] mentions.

        The API result is reversed before being returned -- presumably so the
        newest status is printed last; confirm against the API's ordering.
        """
        return list(reversed(
            twitter.statuses.mentions_timeline(count=options["length"])))
class FollowAction(AdminAction):
    """Admin action that starts following a user."""

    def getUser(self, twitter, user):
        """Call friendships.create for screen name ``user``; return its result."""
        return twitter.friendships.create(screen_name=user)
class LeaveAction(AdminAction):
    """Admin action that stops following a user."""

    def getUser(self, twitter, user):
        """Call friendships.destroy for screen name ``user``; return its result."""
        return twitter.friendships.destroy(screen_name=user)
527 class SetStatusAction(Action
):
528 def __call__(self
, twitter
, options
):
529 statusTxt
= (" ".join(options
['extra_args'])
530 if options
['extra_args']
531 else str(input("message: ")))
533 ptr
= re
.compile("@[\w_]+")
535 s
= ptr
.match(statusTxt
)
536 if s
and s
.start() == 0:
537 replies
.append(statusTxt
[s
.start():s
.end()])
538 statusTxt
= statusTxt
[s
.end() + 1:]
541 replies
= " ".join(replies
)
542 if len(replies
) >= 140:
549 limit
= 140 - len(replies
)
550 if len(statusTxt
) > limit
:
551 end
= str.rfind(statusTxt
, ' ', 0, limit
)
554 splitted
.append(" ".join((replies
, statusTxt
[:end
])))
555 statusTxt
= statusTxt
[end
:]
557 if options
['invert_split']:
559 for status
in splitted
:
560 twitter
.statuses
.update(status
=status
)
562 class TwitterShell(Action
):
564 def render_prompt(self
, prompt
):
565 '''Parses the `prompt` string and returns the rendered version'''
566 prompt
= prompt
.strip("'").replace("\\'", "'")
567 for colour
in ansi
.COLOURS_NAMED
:
568 if '[%s]' % (colour
) in prompt
:
569 prompt
= prompt
.replace(
570 '[%s]' % (colour
), ansiFormatter
.cmdColourNamed(colour
))
571 prompt
= prompt
.replace('[R]', ansiFormatter
.cmdReset())
574 def __call__(self
, twitter
, options
):
575 prompt
= self
.render_prompt(options
.get('prompt', 'twitter> '))
577 options
['action'] = ""
579 args
= input(prompt
).split()
580 parse_args(args
, options
)
581 if not options
['action']:
583 elif options
['action'] == 'exit':
585 elif options
['action'] == 'shell':
586 print('Sorry Xzibit does not work here!', file=sys
.stderr
)
588 elif options
['action'] == 'help':
589 print('''\ntwitter> `action`\n
590 The Shell Accepts all the command line actions along with:
592 exit Leave the twitter shell (^D may also be used)
594 Full CMD Line help is appended below for your convinience.''', file=sys
.stderr
)
595 Action()(twitter
, options
)
596 options
['action'] = ''
597 except NoSuchActionError
as e
:
598 print(e
, file=sys
.stderr
)
599 except KeyboardInterrupt:
600 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
602 print(file=sys
.stderr
)
603 leaving
= self
.ask(subject
='Leave')
605 print('Excellent!', file=sys
.stderr
)
609 class PythonPromptAction(Action
):
610 def __call__(self
, twitter
, options
):
613 smrt_input(globals(), locals())
617 class HelpAction(Action
):
618 def __call__(self
, twitter
, options
):
621 class DoNothingAction(Action
):
622 def __call__(self
, twitter
, options
):
class RateLimitStatus(Action):
    """Print the current API rate-limit status."""

    def __call__(self, twitter, options):
        """Query application.rate_limit_status() and print a two-line summary.

        Reads 'remaining_hits', 'hourly_limit' and 'reset_time_in_seconds'
        from the response; the last is compared with time.time() and passed
        to time.localtime(), i.e. it is treated as an epoch timestamp.
        ``options`` is not used.
        """
        rate = twitter.application.rate_limit_status()
        reset_at = rate['reset_time_in_seconds']
        print("Remaining API requests: %s / %s (hourly limit)" % (
            rate['remaining_hits'], rate['hourly_limit']))
        print("Next reset in %ss (%s)" % (
            int(reset_at - time.time()),
            time.asctime(time.localtime(reset_at))))
635 'authorize' : DoNothingAction
,
636 'follow' : FollowAction
,
637 'friends' : FriendsAction
,
638 'list' : ListsAction
,
639 'mylist' : MyListsAction
,
641 'leave' : LeaveAction
,
642 'pyprompt' : PythonPromptAction
,
643 'replies' : RepliesAction
,
644 'search' : SearchAction
,
645 'set' : SetStatusAction
,
646 'shell' : TwitterShell
,
647 'rate' : RateLimitStatus
,
650 def loadConfig(filename
):
651 options
= dict(OPTIONS
)
652 if os
.path
.exists(filename
):
653 cp
= SafeConfigParser()
655 for option
in ('format', 'prompt'):
656 if cp
.has_option('twitter', option
):
657 options
[option
] = cp
.get('twitter', option
)
659 for option
in ('invert_split',):
660 if cp
.has_option('twitter', option
):
661 options
[option
] = cp
.getboolean('twitter', option
)
664 def main(args
=sys
.argv
[1:]):
667 parse_args(args
, arg_options
)
668 except GetoptError
as e
:
669 print("I can't do that, %s." % (e
), file=sys
.stderr
)
670 print(file=sys
.stderr
)
673 config_path
= os
.path
.expanduser(
674 arg_options
.get('config_filename') or OPTIONS
.get('config_filename'))
675 config_options
= loadConfig(config_path
)
677 # Apply the various options in order, the most important applied last.
678 # Defaults first, then what's read from config file, then command-line
680 options
= dict(OPTIONS
)
681 for d
in config_options
, arg_options
:
682 for k
, v
in list(d
.items()):
685 if options
['refresh'] and options
['action'] not in ('friends', 'replies'):
686 print("You can only refresh the friends or replies actions.",
688 print("Use 'twitter -h' for help.", file=sys
.stderr
)
691 oauth_filename
= os
.path
.expanduser(options
['oauth_filename'])
693 if options
['action'] == 'authorize' or not os
.path
.exists(oauth_filename
):
695 "the Command-Line Tool", CONSUMER_KEY
, CONSUMER_SECRET
,
696 options
['oauth_filename'])
699 ansiFormatter
= ansi
.AnsiCmd(options
["force-ansi"])
701 oauth_token
, oauth_token_secret
= read_token_file(oauth_filename
)
705 oauth_token
, oauth_token_secret
, CONSUMER_KEY
, CONSUMER_SECRET
),
706 secure
=options
['secure'],
708 domain
='api.twitter.com')
711 Action()(twitter
, options
)
712 except NoSuchActionError
as e
:
713 print(e
, file=sys
.stderr
)
715 except TwitterError
as e
:
716 print(str(e
), file=sys
.stderr
)
717 print("Use 'twitter -h' for help.", file=sys
.stderr
)