]>
jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
5 twitter [action] [options]
9 authorize authorize the command-line tool to interact with Twitter
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
16 mylist get list of your lists; give a list name to get tweets
18 pyprompt start a Python prompt for interacting with the twitter
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
25 repl begin a Read-Eval-Print-Loop with a configured twitter
30 -r --refresh run this command forever, polling every once
31 in a while (default: every 5 minutes)
32 -R --refresh-rate <rate> set the refresh rate (in seconds)
33 -f --format <format> specify the output format for status updates
34 -c --config <filename> read username and password from given config
35 file (default ~/.twitter)
36 -l --length <count> specify number of status updates shown
37 (default: 20, max: 200)
38 -t --timestamp show time before status lines
39 -d --datestamp show date before status lines
40 --no-ssl use less-secure HTTP instead of HTTPS
41 --oauth <filename> filename to read/store oauth credentials to
43 FORMATS for the --format option
45 default one line per status
46 verbose multiple lines per status, more verbose status info
47 json raw json data from the api on each line
49 ansi ansi colour (rainbow mode)
54 The config file should be placed in your home directory and be named .twitter.
55 It must contain a [twitter] header, and all the desired options you wish to
59 format: <desired_default_format_for_output>
60 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
62 OAuth authentication tokens are stored in the file .twitter_oauth in your
66 from __future__
import print_function
69 input = __builtins__
.raw_input
70 except (AttributeError, KeyError):
74 CONSUMER_KEY
= 'uS6hO2sV6tDKIOeVjhnFnQ'
75 CONSUMER_SECRET
= 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
78 from getopt
import gnu_getopt
as getopt
, GetoptError
87 from ConfigParser
import SafeConfigParser
89 from configparser
import ConfigParser
as SafeConfigParser
92 from urllib
.parse
import quote
94 from urllib2
import quote
98 import html
.parser
as HTMLParser
100 from .api
import Twitter
, TwitterError
101 from .oauth
import OAuth
, read_token_file
102 from .oauth_dance
import oauth_dance
104 from .util
import smrt_input
, printNicely
, align_text
111 'prompt': '[cyan]twitter[R]> ',
112 'config_filename': os
.environ
.get('HOME',
113 os
.environ
.get('USERPROFILE', ''))
114 + os
.sep
+ '.twitter',
115 'oauth_filename': os
.environ
.get('HOME',
116 os
.environ
.get('USERPROFILE', ''))
117 + os
.sep
+ '.twitter_oauth',
123 'invert_split': False,
# Shared HTML entity decoder used by the formatters in this module.
gHtmlParser = HTMLParser.HTMLParser()
if not hasattr(gHtmlParser, 'unescape'):
    # Python 3.9 removed HTMLParser.unescape(); html.unescape() is the
    # documented replacement.  Attach it to the instance so every existing
    # gHtmlParser.unescape(...) call keeps working unchanged.
    import html
    gHtmlParser.unescape = html.unescape
# Patterns for "#hashtag" and "@profile" tokens (named groups "hashtag" and
# "profile").
hashtagRe = re.compile(r'(?P<hashtag>#\S+)')
profileRe = re.compile(r'(?P<profile>\@\S+)')
# Module-wide ANSI command helper, constructed with False here.
ansiFormatter = ansi.AnsiCmd(False)
def _int_option_value(opt, arg):
    """Return ``arg`` converted to int, or raise GetoptError naming ``opt``."""
    try:
        return int(arg)
    except ValueError:
        # Report bad numbers the same way getopt reports bad options, so
        # callers that already handle GetoptError show a message instead of
        # a ValueError traceback.
        raise GetoptError(
            "option %s requires an integer argument, not %r" % (opt, arg),
            opt.lstrip('-'))


def parse_args(args, options):
    """Parse ``args`` (GNU getopt style) and record the results in ``options``.

    ``options`` is updated in place; nothing is returned.  Recognised flags
    set the keys 'format', 'refresh', 'refresh_rate', 'length', 'timestamp',
    'datestamp', 'action' (to 'help'), 'config_filename', 'secure',
    'oauth_filename' and 'force-ansi'.  Unless help was requested, the first
    positional argument becomes ``options['action']`` and the remaining ones
    ``options['extra_args']``.

    Raises GetoptError for unknown options, missing option arguments, and
    non-integer values given to -l/--length or -R/--refresh-rate.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl', 'force-ansi']
    # -e and -p are accepted (each takes an argument) but not acted upon below.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)
    if extra_args and hasattr(extra_args[0], 'decode'):
        # Byte-string arguments are decoded with the locale's preferred
        # encoding; text strings without .decode are left as they are.
        encoding = locale.getpreferredencoding()
        extra_args = [arg.decode(encoding) for arg in extra_args]

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _int_option_value(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _int_option_value(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg
        elif opt == '--force-ansi':
            options['force-ansi'] = True

    # A help request wins over any positional action.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
        options['extra_args'] = extra_args[1:]
172 def get_time_string(status
, options
, format
="%a %b %d %H:%M:%S +0000 %Y"):
173 timestamp
= options
["timestamp"]
174 datestamp
= options
["datestamp"]
175 t
= time
.strptime(status
['created_at'], format
)
176 i_hate_timezones
= time
.timezone
178 i_hate_timezones
= time
.altzone
179 dt
= datetime
.datetime(*t
[:-3]) - datetime
.timedelta(
180 seconds
=i_hate_timezones
)
182 if timestamp
and datestamp
:
183 return time
.strftime("%Y-%m-%d %H:%M:%S ", t
)
185 return time
.strftime("%H:%M:%S ", t
)
187 return time
.strftime("%Y-%m-%d ", t
)
193 'clear': ansiFormatter
.cmdReset(),
194 'hashtag': ansiFormatter
.cmdBold(),
195 'profile': ansiFormatter
.cmdUnderline(),
202 s
= '%s%s%s' % (ansiTypes
[mkey
], m
.group(mkey
), ansiTypes
['clear'])
208 def replaceInStatus(status
):
209 txt
= gHtmlParser
.unescape(status
)
210 txt
= re
.sub(hashtagRe
, reRepl
, txt
)
211 txt
= re
.sub(profileRe
, reRepl
, txt
)
def correctRTStatus(status):
    """Return the text to display for ``status``.

    When the status carries a 'retweeted_status' entry, the text is rebuilt
    as "RT @<screen_name> <text>" from that nested status; otherwise the
    status' own 'text' is returned.
    """
    if 'retweeted_status' not in status:
        return status['text']
    original = status['retweeted_status']
    return "".join(
        ("RT @", original['user']['screen_name'], " ", original['text']))
class StatusFormatter(object):
    """Format a status as "<time string>@<screen_name> <text>"."""

    def __call__(self, status, options):
        when = get_time_string(status, options)
        author = status['user']['screen_name']
        # Retweets are expanded first, then HTML entities are decoded.
        text = gHtmlParser.unescape(correctRTStatus(status))
        return "%s@%s %s" % (when, author, text)
231 class AnsiStatusFormatter(object):
233 self
._colourMap
= ansi
.ColourMap()
235 def __call__(self
, status
, options
):
236 colour
= self
._colourMap
.colourFor(status
['user']['screen_name'])
237 return ("%s%s% 16s%s %s " % (
238 get_time_string(status
, options
),
239 ansiFormatter
.cmdColour(colour
), status
['user']['screen_name'],
240 ansiFormatter
.cmdReset(),
241 align_text(replaceInStatus(correctRTStatus(status
)))))
class VerboseStatusFormatter(object):
    """Format a status over two lines: an author header, then the text."""

    def __call__(self, status, options):
        author = status['user']
        header_fields = (
            author['screen_name'],
            author['location'],
            status['created_at'],
        )
        # Retweets are expanded first, then HTML entities are decoded.
        text = gHtmlParser.unescape(correctRTStatus(status))
        return "-- %s (%s) on %s\n%s\n" % (header_fields + (text,))
class JSONStatusFormatter(object):
    """Format a status as a JSON document with its 'text' HTML-unescaped."""

    def __call__(self, status, options):
        # Work on a shallow copy so the caller's status dict is not modified;
        # replacing an existing key keeps its position, so the serialised
        # output is the same as when the text was rewritten in place.
        unescaped = dict(status)
        unescaped['text'] = gHtmlParser.unescape(status['text'])
        return json.dumps(unescaped)
class URLStatusFormatter(object):
    """Format a status as just the URLs found in its text, one per line."""

    # Matches http:// or https:// followed by a run of non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        found = self.urlmatch.findall(correctRTStatus(status))
        # Joining an empty list already yields "", the no-URL result.
        return '\n'.join(found)
class ListsFormatter(object):
    """Format a list as one line: padded name plus optional description."""

    def __call__(self, list):
        description = list['description']
        if not description:
            # No description: only the name, left-justified to 30 columns.
            return "%-30s\n" % list['name']
        return "%-30s (%s)\n" % (list['name'], description)
276 class ListsVerboseFormatter(object):
277 def __call__(self
, list):
278 list_str
= "%-30s\n description: %s\n members: %s\n mode:%s\n" % (
279 list['name'], list['description'],
280 list['member_count'], list['mode'])
284 class AnsiListsFormatter(object):
286 self
._colourMap
= ansi
.ColourMap()
288 def __call__(self
, list):
289 colour
= self
._colourMap
.colourFor(list['name'])
290 return ("%s%-15s%s %s" % (
291 ansiFormatter
.cmdColour(colour
), list['name'],
292 ansiFormatter
.cmdReset(), list['description']))
class AdminFormatter(object):
    """Format the confirmation message for a follow or leave action."""

    def __call__(self, action, user):
        who = "%s (%s)" % (user['screen_name'], user['name'])
        # Any action other than "follow" is reported as no longer following.
        template = ("You are now following %s.\n" if action == "follow"
                    else "You are no longer following %s.\n")
        return template % (who)
304 class VerboseAdminFormatter(object):
305 def __call__(self
, action
, user
):
306 return("-- %s: %s (%s): %s" % (
307 "Following" if action
== "follow" else "Leaving",
313 class SearchFormatter(object):
314 def __call__(self
, result
, options
):
316 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
317 result
['from_user'], result
['text']))
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search output; defaults to the regular SearchFormatter."""
class URLSearchFormatter(object):
    """Format a search result as just the URLs in its text, one per line."""

    # Matches http:// or https:// followed by a run of non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        found = self.urlmatch.findall(result['text'])
        # Joining an empty list already yields "", the no-URL result.
        return '\n'.join(found)
332 class AnsiSearchFormatter(object):
334 self
._colourMap
= ansi
.ColourMap()
336 def __call__(self
, result
, options
):
337 colour
= self
._colourMap
.colourFor(result
['from_user'])
338 return ("%s%s%s%s %s" % (
339 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
340 ansiFormatter
.cmdColour(colour
), result
['from_user'],
341 ansiFormatter
.cmdReset(), result
['text']))
343 _term_encoding
= None
346 def get_term_encoding():
347 global _term_encoding
348 if not _term_encoding
:
349 lang
= os
.getenv('LANG', 'unknown.UTF-8').split('.')
351 _term_encoding
= lang
[1]
353 _term_encoding
= 'UTF-8'
354 return _term_encoding
357 status_formatters
= {
358 'default': StatusFormatter
,
359 'verbose': VerboseStatusFormatter
,
360 'json': JSONStatusFormatter
,
361 'urls': URLStatusFormatter
,
362 'ansi': AnsiStatusFormatter
364 formatters
['status'] = status_formatters
367 'default': AdminFormatter
,
368 'verbose': VerboseAdminFormatter
,
369 'urls': AdminFormatter
,
370 'ansi': AdminFormatter
372 formatters
['admin'] = admin_formatters
374 search_formatters
= {
375 'default': SearchFormatter
,
376 'verbose': VerboseSearchFormatter
,
377 'urls': URLSearchFormatter
,
378 'ansi': AnsiSearchFormatter
380 formatters
['search'] = search_formatters
383 'default': ListsFormatter
,
384 'verbose': ListsVerboseFormatter
,
386 'ansi': AnsiListsFormatter
388 formatters
['lists'] = lists_formatters
391 def get_formatter(action_type
, options
):
392 formatters_dict
= formatters
.get(action_type
)
393 if not formatters_dict
:
395 "There was an error finding a class of formatters for your type (%s)"
397 f
= formatters_dict
.get(options
['format'])
400 "Unknown formatter '%s' for status actions" % (options
['format']))
404 class Action(object):
406 def ask(self
, subject
='perform this action', careful
=False):
408 Requests from the user using `raw_input` if `subject` should be
409 performed. When `careful`, the default answer is NO, otherwise YES.
410 Returns the user answer in the form `True` or `False`.
416 prompt
= 'You really want to %s %s? ' % (subject
, sample
)
418 answer
= input(prompt
).lower()
420 return answer
in ('yes', 'y')
422 return answer
not in ('no', 'n')
424 print(file=sys
.stderr
) # Put Newline since Enter was never pressed
426 # Figure out why on OS X the raw_input keeps raising
427 # EOFError and is never able to reset and get more input
428 # Hint: Look at how IPython implements their console
434 def __call__(self
, twitter
, options
):
435 action
= actions
.get(options
['action'], NoSuchAction
)()
437 doAction
= lambda: action(twitter
, options
)
438 if options
['refresh'] and isinstance(action
, StatusAction
):
442 time
.sleep(options
['refresh_rate'])
445 except KeyboardInterrupt:
446 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
450 class NoSuchActionError(Exception):
class NoSuchAction(Action):
    """Action that always fails, reporting the requested action by name."""

    def __call__(self, twitter, options):
        message = "No such action: %s" % (options['action'])
        raise NoSuchActionError(message)
class StatusAction(Action):
    """Base for actions that fetch statuses and print them.

    Subclasses supply ``getStatuses(twitter, options)``.
    """

    def __call__(self, twitter, options):
        fetched = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for item in fetched:
            line = render(item, options)
            if not line.strip():
                # Formatter produced nothing printable for this status.
                continue
            printNicely(line)
469 class SearchAction(Action
):
470 def __call__(self
, twitter
, options
):
471 # We need to be pointing at search.twitter.com to work, and it is less
472 # tangly to do it here than in the main()
473 twitter
.domain
= "search.twitter.com"
474 twitter
.uriparts
= ()
475 # We need to bypass the TwitterCall parameter encoding, so we
476 # don't encode the plus sign, so we have to encode it ourselves
477 query_string
= "+".join(
479 for term
in options
['extra_args']])
481 results
= twitter
.search(q
=query_string
)['results']
482 f
= get_formatter('search', options
)
483 for result
in results
:
484 resultStr
= f(result
, options
)
485 if resultStr
.strip():
486 printNicely(resultStr
)
489 class AdminAction(Action
):
490 def __call__(self
, twitter
, options
):
491 if not (options
['extra_args'] and options
['extra_args'][0]):
492 raise TwitterError("You need to specify a user (screen name)")
493 af
= get_formatter('admin', options
)
495 user
= self
.getUser(twitter
, options
['extra_args'][0])
496 except TwitterError
as e
:
497 print("There was a problem following or leaving the specified user.")
498 print("You may be trying to follow a user you are already following;")
499 print("Leaving a user you are not currently following;")
500 print("Or the user may not exist.")
505 printNicely(af(options
['action'], user
))
508 class ListsAction(StatusAction
):
509 def getStatuses(self
, twitter
, options
):
510 if not options
['extra_args']:
511 raise TwitterError("Please provide a user to query for lists")
513 screen_name
= options
['extra_args'][0]
515 if not options
['extra_args'][1:]:
516 lists
= twitter
.lists
.list(screen_name
=screen_name
)
518 printNicely("This user has no lists.")
520 lf
= get_formatter('lists', options
)
521 printNicely(lf(list))
524 return list(reversed(twitter
.lists
.statuses(
525 count
=options
['length'],
526 owner_screen_name
=screen_name
,
527 slug
=options
['extra_args'][1])))
class MyListsAction(ListsAction):
    """ListsAction for the authenticated user's own lists."""

    def getStatuses(self, twitter, options):
        """Delegate to ListsAction with the caller's own screen name first.

        The screen name comes from ``account.verify_credentials()`` and is
        prepended to the extra arguments handed to ListsAction.getStatuses.
        """
        screen_name = twitter.account.verify_credentials()['screen_name']
        # Build a private copy instead of inserting into the shared
        # options['extra_args']: mutating it would stack another copy of the
        # screen name on every repeated call with the same options dict.
        own_options = dict(options)
        own_options['extra_args'] = (
            [screen_name] + list(options['extra_args']))
        return ListsAction.getStatuses(self, twitter, own_options)
class FriendsAction(StatusAction):
    """Status action backed by the home timeline."""

    def getStatuses(self, twitter, options):
        # Ask for options["length"] statuses, then hand them back in the
        # reverse of the order the API returned them.
        timeline = twitter.statuses.home_timeline(count=options["length"])
        return list(reversed(timeline))
class RepliesAction(StatusAction):
    """Status action backed by the mentions timeline."""

    def getStatuses(self, twitter, options):
        # Ask for options["length"] statuses, then hand them back in the
        # reverse of the order the API returned them.
        mentions = twitter.statuses.mentions_timeline(count=options["length"])
        return list(reversed(mentions))
class FollowAction(AdminAction):
    """Admin action that creates a friendship with the given screen name."""

    def getUser(self, twitter, user):
        created = twitter.friendships.create(screen_name=user)
        return created
class LeaveAction(AdminAction):
    """Admin action that destroys the friendship with the given screen name."""

    def getUser(self, twitter, user):
        destroyed = twitter.friendships.destroy(screen_name=user)
        return destroyed
559 class SetStatusAction(Action
):
560 def __call__(self
, twitter
, options
):
561 statusTxt
= (" ".join(options
['extra_args'])
562 if options
['extra_args']
563 else str(input("message: ")))
565 ptr
= re
.compile("@[\w_]+")
567 s
= ptr
.match(statusTxt
)
568 if s
and s
.start() == 0:
569 replies
.append(statusTxt
[s
.start():s
.end()])
570 statusTxt
= statusTxt
[s
.end() + 1:]
573 replies
= " ".join(replies
)
574 if len(replies
) >= 140:
581 limit
= 140 - len(replies
)
582 if len(statusTxt
) > limit
:
583 end
= str.rfind(statusTxt
, ' ', 0, limit
)
586 splitted
.append(" ".join((replies
, statusTxt
[:end
])))
587 statusTxt
= statusTxt
[end
:]
589 if options
['invert_split']:
591 for status
in splitted
:
592 twitter
.statuses
.update(status
=status
)
595 class TwitterShell(Action
):
597 def render_prompt(self
, prompt
):
598 '''Parses the `prompt` string and returns the rendered version'''
599 prompt
= prompt
.strip("'").replace("\\'", "'")
600 for colour
in ansi
.COLOURS_NAMED
:
601 if '[%s]' % (colour
) in prompt
:
602 prompt
= prompt
.replace(
603 '[%s]' % (colour
), ansiFormatter
.cmdColourNamed(colour
))
604 prompt
= prompt
.replace('[R]', ansiFormatter
.cmdReset())
607 def __call__(self
, twitter
, options
):
608 prompt
= self
.render_prompt(options
.get('prompt', 'twitter> '))
610 options
['action'] = ""
612 args
= input(prompt
).split()
613 parse_args(args
, options
)
614 if not options
['action']:
616 elif options
['action'] == 'exit':
618 elif options
['action'] == 'shell':
619 print('Sorry Xzibit does not work here!', file=sys
.stderr
)
621 elif options
['action'] == 'help':
622 print('''\ntwitter> `action`\n
623 The Shell accepts all the command line actions along with:
625 exit Leave the twitter shell (^D may also be used)
627 Full CMD Line help is appended below for your convenience.''',
629 Action()(twitter
, options
)
630 options
['action'] = ''
631 except NoSuchActionError
as e
:
632 print(e
, file=sys
.stderr
)
633 except KeyboardInterrupt:
634 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
636 print(file=sys
.stderr
)
637 leaving
= self
.ask(subject
='Leave')
639 print('Excellent!', file=sys
.stderr
)
644 class PythonPromptAction(Action
):
645 def __call__(self
, twitter
, options
):
648 smrt_input(globals(), locals())
653 class HelpAction(Action
):
654 def __call__(self
, twitter
, options
):
658 class DoNothingAction(Action
):
659 def __call__(self
, twitter
, options
):
class RateLimitStatus(Action):
    """Print the rate-limit status reported by the API, one entry per method."""

    def __call__(self, twitter, options):
        rate = twitter.application.rate_limit_status()
        resources = rate['resources']
        for resource in resources:
            for method in resources[resource]:
                entry = resources[resource][method]
                limit = entry['limit']
                remaining = entry['remaining']
                reset = entry['reset']

                print("Remaining API requests for %s: %s / %s" %
                      (method, remaining, limit))
                # 'reset' is used as an epoch timestamp: it is compared with
                # time.time() and passed to time.localtime().
                seconds_left = int(reset - time.time())
                reset_at = time.asctime(time.localtime(reset))
                print("Next reset in %ss (%s)\n" % (seconds_left, reset_at))
679 class ReplAction(Action
):
680 def __call__(self
, twitter
, options
):
683 domain
="upload.twitter.com")
685 "\nUse the 'twitter' object to interact with"
686 " the Twitter REST API.\n"
687 "Use twitter_upload to interact with "
688 "upload.twitter.com\n\n")
689 code
.interact(local
={
692 "twitter_upload": upload
,
698 'authorize' : DoNothingAction
,
699 'follow' : FollowAction
,
700 'friends' : FriendsAction
,
701 'list' : ListsAction
,
702 'mylist' : MyListsAction
,
704 'leave' : LeaveAction
,
705 'pyprompt' : PythonPromptAction
,
706 'replies' : RepliesAction
,
707 'search' : SearchAction
,
708 'set' : SetStatusAction
,
709 'shell' : TwitterShell
,
710 'rate' : RateLimitStatus
,
715 def loadConfig(filename
):
716 options
= dict(OPTIONS
)
717 if os
.path
.exists(filename
):
718 cp
= SafeConfigParser()
720 for option
in ('format', 'prompt'):
721 if cp
.has_option('twitter', option
):
722 options
[option
] = cp
.get('twitter', option
)
724 for option
in ('invert_split',):
725 if cp
.has_option('twitter', option
):
726 options
[option
] = cp
.getboolean('twitter', option
)
730 def main(args
=sys
.argv
[1:]):
733 parse_args(args
, arg_options
)
734 except GetoptError
as e
:
735 print("I can't do that, %s." % (e
), file=sys
.stderr
)
736 print(file=sys
.stderr
)
739 config_path
= os
.path
.expanduser(
740 arg_options
.get('config_filename') or OPTIONS
.get('config_filename'))
741 config_options
= loadConfig(config_path
)
743 # Apply the various options in order, the most important applied last.
744 # Defaults first, then what's read from config file, then command-line
746 options
= dict(OPTIONS
)
747 for d
in config_options
, arg_options
:
748 for k
, v
in list(d
.items()):
752 if options
['refresh'] and options
['action'] not in ('friends', 'replies'):
753 print("You can only refresh the friends or replies actions.",
755 print("Use 'twitter -h' for help.", file=sys
.stderr
)
758 oauth_filename
= os
.path
.expanduser(options
['oauth_filename'])
760 if options
['action'] == 'authorize' or not os
.path
.exists(oauth_filename
):
762 "the Command-Line Tool", CONSUMER_KEY
, CONSUMER_SECRET
,
763 options
['oauth_filename'])
766 ansiFormatter
= ansi
.AnsiCmd(options
["force-ansi"])
768 oauth_token
, oauth_token_secret
= read_token_file(oauth_filename
)
772 oauth_token
, oauth_token_secret
, CONSUMER_KEY
, CONSUMER_SECRET
),
773 secure
=options
['secure'],
775 domain
='api.twitter.com')
778 Action()(twitter
, options
)
779 except NoSuchActionError
as e
:
780 print(e
, file=sys
.stderr
)
782 except TwitterError
as e
:
783 print(str(e
), file=sys
.stderr
)
784 print("Use 'twitter -h' for help.", file=sys
.stderr
)