]>
jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
e7cef99f3212b6e7b2dce12d01d5073cb64e52ec
5 twitter [action] [options]
9 authorize authorize the command-line tool to interact with Twitter
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
16 mylist get list of your lists; give a list name to get tweets
18 pyprompt start a Python prompt for interacting with the twitter
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 5 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
42 FORMATS for the --format option
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 json raw json data from the api on each line
48 ansi ansi colour (rainbow mode)
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
65 from __future__
import print_function
68 input = __builtins__
.raw_input
69 except (AttributeError, KeyError):
73 CONSUMER_KEY
= 'uS6hO2sV6tDKIOeVjhnFnQ'
74 CONSUMER_SECRET
= 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
76 from getopt
import gnu_getopt
as getopt
, GetoptError
85 from ConfigParser
import SafeConfigParser
87 from configparser
import ConfigParser
as SafeConfigParser
90 from urllib
.parse
import quote
92 from urllib2
import quote
96 import html
.parser
as HTMLParser
98 from .api
import Twitter
, TwitterError
99 from .oauth
import OAuth
, read_token_file
100 from .oauth_dance
import oauth_dance
102 from .util
import smrt_input
, printNicely
, align_text
109 'prompt': '[cyan]twitter[R]> ',
110 'config_filename': os
.environ
.get('HOME',
111 os
.environ
.get('USERPROFILE', ''))
112 + os
.sep
+ '.twitter',
113 'oauth_filename': os
.environ
.get('HOME',
114 os
.environ
.get('USERPROFILE', ''))
115 + os
.sep
+ '.twitter_oauth',
121 'invert_split': False,
125 gHtmlParser
= HTMLParser
.HTMLParser()
126 hashtagRe
= re
.compile(r
'(?P<hashtag>#\S+)')
127 profileRe
= re
.compile(r
'(?P<profile>\@\S+)')
128 ansiFormatter
= ansi
.AnsiCmd(False)
class _OptionValueError(GetoptError, ValueError):
    """A numeric command-line option was given a non-integer value.

    It is a GetoptError so that handlers written for malformed command
    lines also cover it, and still a ValueError so that anything relying
    on the exception int() used to raise keeps working.
    """


def _int_option(opt, arg):
    """Return ``int(arg)``; raise _OptionValueError naming *opt* on failure."""
    try:
        return int(arg)
    except ValueError:
        raise _OptionValueError(
            "option %s requires an integer argument, not %r" % (opt, arg),
            opt)


def parse_args(args, options):
    """Parse command-line style *args* and record the result in *options*.

    *args* is a list of argument strings; *options* is a dict that is
    updated in place. Recognised switches set the matching keys
    ('format', 'refresh', 'refresh_rate', 'length', 'timestamp',
    'datestamp', 'action', 'config_filename', 'secure', 'oauth_filename',
    'force-ansi'). The first positional argument becomes
    options['action'] unless help was requested; the remaining positional
    arguments are stored in options['extra_args'].

    Raises GetoptError for unknown switches or missing values, and for a
    non-integer value given to -l/--length or -R/--refresh-rate (that
    error is also a ValueError).
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl', 'force-ansi']
    # -e and -p are accepted by getopt but deliberately not acted upon below.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    # Byte strings (objects with .decode) are decoded with the locale's
    # preferred encoding; text strings have no .decode and pass through.
    if extra_args and hasattr(extra_args[0], 'decode'):
        encoding = locale.getpreferredencoding()
        extra_args = [arg.decode(encoding) for arg in extra_args]

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _int_option(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _int_option(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg
        elif opt == '--force-ansi':
            options['force-ansi'] = True

    # A help request wins over whatever positional action was supplied.
    help_requested = options.get('action') == 'help'
    if extra_args and not help_requested:
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
def get_time_string(status, options, format="%a %b %d %H:%M:%S +0000 %Y"):
    """Return the local-time prefix to print in front of a status line.

    status['created_at'] is parsed with *format* as a UTC time and
    converted to the machine's local time. Depending on
    options["timestamp"] and options["datestamp"] the result is the time,
    the date, both, or the empty string when neither is requested. A
    non-empty result ends with a single space.

    Raises ValueError (from time.strptime) when 'created_at' does not
    match *format*, even if no stamp was requested.
    """
    # Function-scope import keeps this block independent of the file header.
    import calendar

    want_time = options["timestamp"]
    want_date = options["datestamp"]
    utc = time.strptime(status['created_at'], format)

    # timegm() reads the struct as UTC and localtime() then applies the
    # offset in force at that instant. Choosing altzone whenever
    # time.daylight is set would shift times by the DST amount all year.
    local = time.localtime(calendar.timegm(utc))

    if want_time and want_date:
        return time.strftime("%Y-%m-%d %H:%M:%S ", local)
    if want_time:
        return time.strftime("%H:%M:%S ", local)
    if want_date:
        return time.strftime("%Y-%m-%d ", local)
    return ""
191 'clear': ansiFormatter
.cmdReset(),
192 'hashtag': ansiFormatter
.cmdBold(),
193 'profile': ansiFormatter
.cmdUnderline(),
200 s
= '%s%s%s' % (ansiTypes
[mkey
], m
.group(mkey
), ansiTypes
['clear'])
206 def replaceInStatus(status
):
207 txt
= gHtmlParser
.unescape(status
)
208 txt
= re
.sub(hashtagRe
, reRepl
, txt
)
209 txt
= re
.sub(profileRe
, reRepl
, txt
)
def correctRTStatus(status):
    """Return the text to display for *status*.

    When the status carries a 'retweeted_status' entry, the text is
    rebuilt as "RT @<screen_name> <text>" from that nested status;
    otherwise status['text'] is returned unchanged.
    """
    if 'retweeted_status' not in status:
        return status['text']

    retweeted = status['retweeted_status']
    author = retweeted['user']['screen_name']
    return "RT @" + author + " " + retweeted['text']
class StatusFormatter(object):
    """Render a status as a single line: time prefix, @screen_name, text."""

    def __call__(self, status, options):
        """Return the one-line representation of *status*.

        The prefix comes from get_time_string(); the text comes from
        correctRTStatus() with HTML entities unescaped.
        """
        prefix = get_time_string(status, options)
        author = status['user']['screen_name']
        body = gHtmlParser.unescape(correctRTStatus(status))
        return "%s@%s %s" % (prefix, author, body)
class AnsiStatusFormatter(object):
    """Render a status with the author's screen name in an ANSI colour."""

    def __init__(self):
        # One colour map per formatter instance; colours are looked up by
        # screen name through it.
        self._colourMap = ansi.ColourMap()

    def __call__(self, status, options):
        """Return the coloured line for *status*.

        Layout: time prefix, colour-on code, screen name right-aligned in
        16 columns, reset code, then the text passed through
        correctRTStatus(), replaceInStatus() and align_text().
        """
        author = status['user']['screen_name']
        colour = self._colourMap.colourFor(author)
        prefix = get_time_string(status, options)
        colour_on = ansiFormatter.cmdColour(colour)
        colour_off = ansiFormatter.cmdReset()
        body = align_text(replaceInStatus(correctRTStatus(status)))
        return "%s%s% 16s%s %s " % (prefix, colour_on, author, colour_off, body)
class VerboseStatusFormatter(object):
    """Render a status over several lines with author, location and date."""

    def __call__(self, status, options):
        """Return "-- <screen_name> (<location>) on <created_at>" plus the text.

        The text comes from correctRTStatus() with HTML entities
        unescaped. *options* is accepted for interface parity and not used.
        """
        author = status['user']
        fields = (
            author['screen_name'],
            author['location'],
            status['created_at'],
            gHtmlParser.unescape(correctRTStatus(status)),
        )
        return "-- %s (%s) on %s\n%s\n" % fields
class JSONStatusFormatter(object):
    """Render a status as a JSON document with its text HTML-unescaped."""

    def __call__(self, status, options):
        """Return *status* serialised with json.dumps().

        The 'text' entry is unescaped in a shallow copy, so the caller's
        mapping is left untouched; formatting the same status twice can
        no longer unescape its text twice. *options* is not used.
        """
        unescaped = dict(status)
        unescaped['text'] = gHtmlParser.unescape(status['text'])
        return json.dumps(unescaped)
class URLStatusFormatter(object):
    """Render a status as just the http(s) URLs found in its text."""

    # A URL is "http://" or "https://" followed by non-whitespace characters.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        """Return the URLs in the status text, one per line.

        The text is taken from correctRTStatus(). With no URL present the
        result is the empty string (joining an empty list). *options* is
        not used.
        """
        text = correctRTStatus(status)
        return '\n'.join(self.urlmatch.findall(text))
class ListsFormatter(object):
    """Render a list record as one line: padded name and optional description."""

    def __call__(self, list):
        """Return the line for *list*, terminated by a newline.

        The name is left-aligned in 30 columns; a truthy description is
        appended in parentheses.
        """
        description = list['description']
        line = "%-30s" % (list['name'])
        if description:
            line = "%s (%s)" % (line, description)
        return "%s\n" % line
274 class ListsVerboseFormatter(object):
275 def __call__(self
, list):
276 list_str
= "%-30s\n description: %s\n members: %s\n mode:%s\n" % (
277 list['name'], list['description'],
278 list['member_count'], list['mode'])
class AnsiListsFormatter(object):
    """Render a list record with its name in an ANSI colour."""

    def __init__(self):
        # One colour map per formatter instance; colours are looked up by
        # list name through it.
        self._colourMap = ansi.ColourMap()

    def __call__(self, list):
        """Return colour code, name padded to 15 columns, reset code, description."""
        name = list['name']
        colour = self._colourMap.colourFor(name)
        colour_on = ansiFormatter.cmdColour(colour)
        colour_off = ansiFormatter.cmdReset()
        return "%s%-15s%s %s" % (colour_on, name, colour_off, list['description'])
class AdminFormatter(object):
    """Render the confirmation message for a follow/leave action."""

    def __call__(self, action, user):
        """Return the message for *action* applied to *user*.

        *action* equal to "follow" yields the "now following" message; any
        other value yields the "no longer following" message. *user* must
        provide 'screen_name' and 'name'.
        """
        who = "%s (%s)" % (user['screen_name'], user['name'])
        if action != "follow":
            return "You are no longer following %s.\n" % (who)
        return "You are now following %s.\n" % (who)
302 class VerboseAdminFormatter(object):
303 def __call__(self
, action
, user
):
304 return("-- %s: %s (%s): %s" % (
305 "Following" if action
== "follow" else "Leaving",
311 class SearchFormatter(object):
312 def __call__(self
, result
, options
):
314 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
315 result
['from_user'], result
['text']))
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search output; defaults to the regular SearchFormatter."""
class URLSearchFormatter(object):
    """Render a search result as just the http(s) URLs found in its text."""

    # A URL is "http://" or "https://" followed by non-whitespace characters.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        """Return the URLs in result['text'], one per line.

        With no URL present the result is the empty string (joining an
        empty list). *options* is not used.
        """
        found = self.urlmatch.findall(result['text'])
        return '\n'.join(found)
class AnsiSearchFormatter(object):
    """Render a search result with the author's name in an ANSI colour."""

    def __init__(self):
        # One colour map per formatter instance; colours are looked up by
        # the result's 'from_user' value through it.
        self._colourMap = ansi.ColourMap()

    def __call__(self, result, options):
        """Return time prefix, coloured 'from_user', reset code and the text.

        The creation time is parsed with the search-specific date format
        passed to get_time_string().
        """
        author = result['from_user']
        colour = self._colourMap.colourFor(author)
        prefix = get_time_string(
            result, options, "%a, %d %b %Y %H:%M:%S +0000")
        colour_on = ansiFormatter.cmdColour(colour)
        colour_off = ansiFormatter.cmdReset()
        return "%s%s%s%s %s" % (
            prefix, colour_on, author, colour_off, result['text'])
341 _term_encoding
= None
344 def get_term_encoding():
345 global _term_encoding
346 if not _term_encoding
:
347 lang
= os
.getenv('LANG', 'unknown.UTF-8').split('.')
349 _term_encoding
= lang
[1]
351 _term_encoding
= 'UTF-8'
352 return _term_encoding
355 status_formatters
= {
356 'default': StatusFormatter
,
357 'verbose': VerboseStatusFormatter
,
358 'json': JSONStatusFormatter
,
359 'urls': URLStatusFormatter
,
360 'ansi': AnsiStatusFormatter
362 formatters
['status'] = status_formatters
365 'default': AdminFormatter
,
366 'verbose': VerboseAdminFormatter
,
367 'urls': AdminFormatter
,
368 'ansi': AdminFormatter
370 formatters
['admin'] = admin_formatters
372 search_formatters
= {
373 'default': SearchFormatter
,
374 'verbose': VerboseSearchFormatter
,
375 'urls': URLSearchFormatter
,
376 'ansi': AnsiSearchFormatter
378 formatters
['search'] = search_formatters
381 'default': ListsFormatter
,
382 'verbose': ListsVerboseFormatter
,
384 'ansi': AnsiListsFormatter
386 formatters
['lists'] = lists_formatters
389 def get_formatter(action_type
, options
):
390 formatters_dict
= formatters
.get(action_type
)
391 if not formatters_dict
:
393 "There was an error finding a class of formatters for your type (%s)"
395 f
= formatters_dict
.get(options
['format'])
398 "Unknown formatter '%s' for status actions" % (options
['format']))
402 class Action(object):
404 def ask(self
, subject
='perform this action', careful
=False):
406 Requests from the user using `raw_input` if `subject` should be
407 performed. When `careful`, the default answer is NO, otherwise YES.
408 Returns the user answer in the form `True` or `False`.
414 prompt
= 'You really want to %s %s? ' % (subject
, sample
)
416 answer
= input(prompt
).lower()
418 return answer
in ('yes', 'y')
420 return answer
not in ('no', 'n')
422 print(file=sys
.stderr
) # Put Newline since Enter was never pressed
424 # Figure out why on OS X the raw_input keeps raising
425 # EOFError and is never able to reset and get more input
426 # Hint: Look at how IPython implements their console
432 def __call__(self
, twitter
, options
):
433 action
= actions
.get(options
['action'], NoSuchAction
)()
435 doAction
= lambda: action(twitter
, options
)
436 if options
['refresh'] and isinstance(action
, StatusAction
):
440 time
.sleep(options
['refresh_rate'])
443 except KeyboardInterrupt:
444 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
448 class NoSuchActionError(Exception):
class NoSuchAction(Action):
    """Action that always fails with NoSuchActionError."""

    def __call__(self, twitter, options):
        """Raise NoSuchActionError naming options['action']."""
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
class StatusAction(Action):
    """Base for actions that fetch a sequence of statuses and print them.

    Subclasses supply getStatuses(twitter, options).
    """

    def __call__(self, twitter, options):
        """Fetch the statuses, format each one and print the non-blank results."""
        fetched = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        # The generator keeps the format-then-print interleaving per status.
        for line in (render(item, options) for item in fetched):
            if not line.strip():
                continue  # formatter produced nothing printable
            printNicely(line)
467 class SearchAction(Action
):
468 def __call__(self
, twitter
, options
):
469 # We need to be pointing at search.twitter.com to work, and it is less
470 # tangly to do it here than in the main()
471 twitter
.domain
= "search.twitter.com"
472 twitter
.uriparts
= ()
473 # We need to bypass the TwitterCall parameter encoding, so we
474 # don't encode the plus sign, so we have to encode it ourselves
475 query_string
= "+".join(
477 for term
in options
['extra_args']])
479 results
= twitter
.search(q
=query_string
)['results']
480 f
= get_formatter('search', options
)
481 for result
in results
:
482 resultStr
= f(result
, options
)
483 if resultStr
.strip():
484 printNicely(resultStr
)
487 class AdminAction(Action
):
488 def __call__(self
, twitter
, options
):
489 if not (options
['extra_args'] and options
['extra_args'][0]):
490 raise TwitterError("You need to specify a user (screen name)")
491 af
= get_formatter('admin', options
)
493 user
= self
.getUser(twitter
, options
['extra_args'][0])
494 except TwitterError
as e
:
495 print("There was a problem following or leaving the specified user.")
496 print("You may be trying to follow a user you are already following;")
497 print("Leaving a user you are not currently following;")
498 print("Or the user may not exist.")
503 printNicely(af(options
['action'], user
))
506 class ListsAction(StatusAction
):
507 def getStatuses(self
, twitter
, options
):
508 if not options
['extra_args']:
509 raise TwitterError("Please provide a user to query for lists")
511 screen_name
= options
['extra_args'][0]
513 if not options
['extra_args'][1:]:
514 lists
= twitter
.lists
.list(screen_name
=screen_name
)
516 printNicely("This user has no lists.")
518 lf
= get_formatter('lists', options
)
519 printNicely(lf(list))
522 return list(reversed(twitter
.lists
.statuses(
523 count
=options
['length'],
524 owner_screen_name
=screen_name
,
525 slug
=options
['extra_args'][1])))
class MyListsAction(ListsAction):
    """ListsAction for the authenticated account."""

    def getStatuses(self, twitter, options):
        """Prepend the account's own screen name and delegate to ListsAction.

        The screen name comes from account.verify_credentials() and is
        inserted at the front of options['extra_args'] (modified in place).
        """
        credentials = twitter.account.verify_credentials()
        options['extra_args'].insert(0, credentials['screen_name'])
        return super(MyListsAction, self).getStatuses(twitter, options)
class FriendsAction(StatusAction):
    """Status action backed by statuses.home_timeline."""

    def getStatuses(self, twitter, options):
        """Return up to options["length"] home-timeline statuses, reversed.

        The list is in the opposite order to the one the API call returned.
        """
        timeline = list(
            twitter.statuses.home_timeline(count=options["length"]))
        timeline.reverse()
        return timeline
class RepliesAction(StatusAction):
    """Status action backed by statuses.mentions_timeline."""

    def getStatuses(self, twitter, options):
        """Return up to options["length"] mention statuses, reversed.

        The list is in the opposite order to the one the API call returned.
        """
        mentions = list(
            twitter.statuses.mentions_timeline(count=options["length"]))
        mentions.reverse()
        return mentions
class FollowAction(AdminAction):
    """Admin action that starts following a user."""

    def getUser(self, twitter, user):
        """Call friendships.create for screen name *user* and return its result."""
        friendships = twitter.friendships
        return friendships.create(screen_name=user)
class LeaveAction(AdminAction):
    """Admin action that stops following a user."""

    def getUser(self, twitter, user):
        """Call friendships.destroy for screen name *user* and return its result."""
        friendships = twitter.friendships
        return friendships.destroy(screen_name=user)
557 class SetStatusAction(Action
):
558 def __call__(self
, twitter
, options
):
559 statusTxt
= (" ".join(options
['extra_args'])
560 if options
['extra_args']
561 else str(input("message: ")))
563 ptr
= re
.compile("@[\w_]+")
565 s
= ptr
.match(statusTxt
)
566 if s
and s
.start() == 0:
567 replies
.append(statusTxt
[s
.start():s
.end()])
568 statusTxt
= statusTxt
[s
.end() + 1:]
571 replies
= " ".join(replies
)
572 if len(replies
) >= 140:
579 limit
= 140 - len(replies
)
580 if len(statusTxt
) > limit
:
581 end
= str.rfind(statusTxt
, ' ', 0, limit
)
584 splitted
.append(" ".join((replies
, statusTxt
[:end
])))
585 statusTxt
= statusTxt
[end
:]
587 if options
['invert_split']:
589 for status
in splitted
:
590 twitter
.statuses
.update(status
=status
)
593 class TwitterShell(Action
):
595 def render_prompt(self
, prompt
):
596 '''Parses the `prompt` string and returns the rendered version'''
597 prompt
= prompt
.strip("'").replace("\\'", "'")
598 for colour
in ansi
.COLOURS_NAMED
:
599 if '[%s]' % (colour
) in prompt
:
600 prompt
= prompt
.replace(
601 '[%s]' % (colour
), ansiFormatter
.cmdColourNamed(colour
))
602 prompt
= prompt
.replace('[R]', ansiFormatter
.cmdReset())
605 def __call__(self
, twitter
, options
):
606 prompt
= self
.render_prompt(options
.get('prompt', 'twitter> '))
608 options
['action'] = ""
610 args
= input(prompt
).split()
611 parse_args(args
, options
)
612 if not options
['action']:
614 elif options
['action'] == 'exit':
616 elif options
['action'] == 'shell':
617 print('Sorry Xzibit does not work here!', file=sys
.stderr
)
619 elif options
['action'] == 'help':
620 print('''\ntwitter> `action`\n
621 The Shell accepts all the command line actions along with:
623 exit Leave the twitter shell (^D may also be used)
625 Full CMD Line help is appended below for your convenience.''',
627 Action()(twitter
, options
)
628 options
['action'] = ''
629 except NoSuchActionError
as e
:
630 print(e
, file=sys
.stderr
)
631 except KeyboardInterrupt:
632 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
634 print(file=sys
.stderr
)
635 leaving
= self
.ask(subject
='Leave')
637 print('Excellent!', file=sys
.stderr
)
642 class PythonPromptAction(Action
):
643 def __call__(self
, twitter
, options
):
646 smrt_input(globals(), locals())
651 class HelpAction(Action
):
652 def __call__(self
, twitter
, options
):
656 class DoNothingAction(Action
):
657 def __call__(self
, twitter
, options
):
class RateLimitStatus(Action):
    """Print the rate-limit figures reported by application.rate_limit_status."""

    def __call__(self, twitter, options):
        """Print remaining/limit and the next reset time for every method.

        The response's 'resources' mapping is walked group by group; each
        method entry must provide 'limit', 'remaining' and 'reset'
        ('reset' is used as a seconds-since-epoch timestamp). *options*
        is not used.
        """
        status = twitter.application.rate_limit_status()
        for methods in status['resources'].values():
            for method, quota in methods.items():
                limit = quota['limit']
                remaining = quota['remaining']
                reset = quota['reset']

                print("Remaining API requests for %s: %s / %s" %
                      (method, remaining, limit))
                seconds_left = int(reset - time.time())
                reset_at = time.asctime(time.localtime(reset))
                print("Next reset in %ss (%s)\n" % (seconds_left, reset_at))
677 'authorize' : DoNothingAction
,
678 'follow' : FollowAction
,
679 'friends' : FriendsAction
,
680 'list' : ListsAction
,
681 'mylist' : MyListsAction
,
683 'leave' : LeaveAction
,
684 'pyprompt' : PythonPromptAction
,
685 'replies' : RepliesAction
,
686 'search' : SearchAction
,
687 'set' : SetStatusAction
,
688 'shell' : TwitterShell
,
689 'rate' : RateLimitStatus
,
693 def loadConfig(filename
):
694 options
= dict(OPTIONS
)
695 if os
.path
.exists(filename
):
696 cp
= SafeConfigParser()
698 for option
in ('format', 'prompt'):
699 if cp
.has_option('twitter', option
):
700 options
[option
] = cp
.get('twitter', option
)
702 for option
in ('invert_split',):
703 if cp
.has_option('twitter', option
):
704 options
[option
] = cp
.getboolean('twitter', option
)
708 def main(args
=sys
.argv
[1:]):
711 parse_args(args
, arg_options
)
712 except GetoptError
as e
:
713 print("I can't do that, %s." % (e
), file=sys
.stderr
)
714 print(file=sys
.stderr
)
717 config_path
= os
.path
.expanduser(
718 arg_options
.get('config_filename') or OPTIONS
.get('config_filename'))
719 config_options
= loadConfig(config_path
)
721 # Apply the various options in order, the most important applied last.
722 # Defaults first, then what's read from config file, then command-line
724 options
= dict(OPTIONS
)
725 for d
in config_options
, arg_options
:
726 for k
, v
in list(d
.items()):
730 if options
['refresh'] and options
['action'] not in ('friends', 'replies'):
731 print("You can only refresh the friends or replies actions.",
733 print("Use 'twitter -h' for help.", file=sys
.stderr
)
736 oauth_filename
= os
.path
.expanduser(options
['oauth_filename'])
738 if options
['action'] == 'authorize' or not os
.path
.exists(oauth_filename
):
740 "the Command-Line Tool", CONSUMER_KEY
, CONSUMER_SECRET
,
741 options
['oauth_filename'])
744 ansiFormatter
= ansi
.AnsiCmd(options
["force-ansi"])
746 oauth_token
, oauth_token_secret
= read_token_file(oauth_filename
)
750 oauth_token
, oauth_token_secret
, CONSUMER_KEY
, CONSUMER_SECRET
),
751 secure
=options
['secure'],
753 domain
='api.twitter.com')
756 Action()(twitter
, options
)
757 except NoSuchActionError
as e
:
758 print(e
, file=sys
.stderr
)
760 except TwitterError
as e
:
761 print(str(e
), file=sys
.stderr
)
762 print("Use 'twitter -h' for help.", file=sys
.stderr
)