]>
jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
5 twitter [action] [options]
9 authorize authorize the command-line tool to interact with Twitter
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
16 mylist get list of your lists; give a list name to get tweets
18 pyprompt start a Python prompt for interacting with the twitter
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 5 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
42 FORMATS for the --format option
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 json raw json data from the api on each line
48 ansi ansi colour (rainbow mode)
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
65 from __future__
import print_function
68 input = __builtins__
.raw_input
69 except (AttributeError, KeyError):
73 CONSUMER_KEY
= 'uS6hO2sV6tDKIOeVjhnFnQ'
74 CONSUMER_SECRET
= 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
76 from getopt
import gnu_getopt
as getopt
, GetoptError
77 from getpass
import getpass
87 from ConfigParser
import SafeConfigParser
89 from configparser
import ConfigParser
as SafeConfigParser
92 from urllib
.parse
import quote
94 from urllib2
import quote
98 import html
.parser
as HTMLParser
102 from .api
import Twitter
, TwitterError
103 from .oauth
import OAuth
, write_token_file
, read_token_file
104 from .oauth_dance
import oauth_dance
106 from .util
import smrt_input
, printNicely
, align_text
113 'prompt': '[cyan]twitter[R]> ',
114 'config_filename': os
.environ
.get('HOME', os
.environ
.get('USERPROFILE', '')) + os
.sep
+ '.twitter',
115 'oauth_filename': os
.environ
.get('HOME', os
.environ
.get('USERPROFILE', '')) + os
.sep
+ '.twitter_oauth',
121 'invert_split': False,
125 gHtmlParser
= HTMLParser
.HTMLParser()
126 hashtagRe
= re
.compile(r
'(?P<hashtag>#\S+)')
127 profileRe
= re
.compile(r
'(?P<profile>\@\S+)')
128 ansiFormatter
= ansi
.AnsiCmd(False)
130 def parse_args(args
, options
):
131 long_opts
= ['help', 'format=', 'refresh', 'oauth=',
132 'refresh-rate=', 'config=', 'length=', 'timestamp',
133 'datestamp', 'no-ssl', 'force-ansi']
134 short_opts
= "e:p:f:h?rR:c:l:td"
135 opts
, extra_args
= getopt(args
, short_opts
, long_opts
)
136 if extra_args
and hasattr(extra_args
[0], 'decode'):
137 extra_args
= [arg
.decode(locale
.getpreferredencoding())
138 for arg
in extra_args
]
140 for opt
, arg
in opts
:
141 if opt
in ('-f', '--format'):
142 options
['format'] = arg
143 elif opt
in ('-r', '--refresh'):
144 options
['refresh'] = True
145 elif opt
in ('-R', '--refresh-rate'):
146 options
['refresh_rate'] = int(arg
)
147 elif opt
in ('-l', '--length'):
148 options
["length"] = int(arg
)
149 elif opt
in ('-t', '--timestamp'):
150 options
["timestamp"] = True
151 elif opt
in ('-d', '--datestamp'):
152 options
["datestamp"] = True
153 elif opt
in ('-?', '-h', '--help'):
154 options
['action'] = 'help'
155 elif opt
in ('-c', '--config'):
156 options
['config_filename'] = arg
157 elif opt
== '--no-ssl':
158 options
['secure'] = False
159 elif opt
== '--oauth':
160 options
['oauth_filename'] = arg
161 elif opt
== '--force-ansi':
162 options
['force-ansi'] = True
164 if extra_args
and not ('action' in options
and options
['action'] == 'help'):
165 options
['action'] = extra_args
[0]
166 options
['extra_args'] = extra_args
[1:]
168 def get_time_string(status
, options
, format
="%a %b %d %H:%M:%S +0000 %Y"):
169 timestamp
= options
["timestamp"]
170 datestamp
= options
["datestamp"]
171 t
= time
.strptime(status
['created_at'], format
)
172 i_hate_timezones
= time
.timezone
174 i_hate_timezones
= time
.altzone
175 dt
= datetime
.datetime(*t
[:-3]) - datetime
.timedelta(
176 seconds
=i_hate_timezones
)
178 if timestamp
and datestamp
:
179 return time
.strftime("%Y-%m-%d %H:%M:%S ", t
)
181 return time
.strftime("%H:%M:%S ", t
)
183 return time
.strftime("%Y-%m-%d ", t
)
188 'clear': ansiFormatter
.cmdReset(),
189 'hashtag': ansiFormatter
.cmdBold(),
190 'profile': ansiFormatter
.cmdUnderline(),
197 s
= '%s%s%s' % (ansiTypes
[mkey
], m
.group(mkey
), ansiTypes
['clear'])
202 def replaceInStatus(status
):
203 txt
= gHtmlParser
.unescape(status
)
204 txt
= re
.sub(hashtagRe
, reRepl
, txt
)
205 txt
= re
.sub(profileRe
, reRepl
, txt
)
207 def correctRTStatus(status
):
208 if('retweeted_status' in status
):
209 return "RT " + status
['retweeted_status']['user']['screen_name'] + " " + status
['retweeted_status']['text']
211 return status
['text']
213 class StatusFormatter(object):
214 def __call__(self
, status
, options
):
215 return ("%s@%s %s" % (
216 get_time_string(status
, options
),
217 status
['user']['screen_name'], gHtmlParser
.unescape(correctRTStatus(status
))))
219 class AnsiStatusFormatter(object):
221 self
._colourMap
= ansi
.ColourMap()
223 def __call__(self
, status
, options
):
224 colour
= self
._colourMap
.colourFor(status
['user']['screen_name'])
225 return ("%s%s% 16s%s %s " % (
226 get_time_string(status
, options
),
227 ansiFormatter
.cmdColour(colour
), status
['user']['screen_name'],
228 ansiFormatter
.cmdReset(), align_text(replaceInStatus(correctRTStatus(status
)))))
230 class VerboseStatusFormatter(object):
231 def __call__(self
, status
, options
):
232 return ("-- %s (%s) on %s\n%s\n" % (
233 status
['user']['screen_name'],
234 status
['user']['location'],
235 status
['created_at'],
236 gHtmlParser
.unescape(correctRTStatus(status
))))
238 class JSONStatusFormatter(object):
239 def __call__(self
, status
, options
):
240 status
['text'] = gHtmlParser
.unescape(status
['text'])
241 return json
.dumps(status
)
243 class URLStatusFormatter(object):
244 urlmatch
= re
.compile(r
'https?://\S+')
245 def __call__(self
, status
, options
):
246 urls
= self
.urlmatch
.findall(correctRTStatus(status
))
247 return '\n'.join(urls
) if urls
else ""
250 class ListsFormatter(object):
251 def __call__(self
, list):
252 if list['description']:
253 list_str
= "%-30s (%s)" % (list['name'], list['description'])
255 list_str
= "%-30s" % (list['name'])
256 return "%s\n" % list_str
258 class ListsVerboseFormatter(object):
259 def __call__(self
, list):
260 list_str
= "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
263 class AnsiListsFormatter(object):
265 self
._colourMap
= ansi
.ColourMap()
267 def __call__(self
, list):
268 colour
= self
._colourMap
.colourFor(list['name'])
269 return ("%s%-15s%s %s" % (
270 ansiFormatter
.cmdColour(colour
), list['name'],
271 ansiFormatter
.cmdReset(), list['description']))
274 class AdminFormatter(object):
275 def __call__(self
, action
, user
):
276 user_str
= "%s (%s)" % (user
['screen_name'], user
['name'])
277 if action
== "follow":
278 return "You are now following %s.\n" % (user_str
)
280 return "You are no longer following %s.\n" % (user_str
)
282 class VerboseAdminFormatter(object):
283 def __call__(self
, action
, user
):
284 return("-- %s: %s (%s): %s" % (
285 "Following" if action
== "follow" else "Leaving",
290 class SearchFormatter(object):
291 def __call__(self
, result
, options
):
293 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
294 result
['from_user'], result
['text']))
296 class VerboseSearchFormatter(SearchFormatter
):
297 pass # Default to the regular one
299 class URLSearchFormatter(object):
300 urlmatch
= re
.compile(r
'https?://\S+')
301 def __call__(self
, result
, options
):
302 urls
= self
.urlmatch
.findall(result
['text'])
303 return '\n'.join(urls
) if urls
else ""
305 class AnsiSearchFormatter(object):
307 self
._colourMap
= ansi
.ColourMap()
309 def __call__(self
, result
, options
):
310 colour
= self
._colourMap
.colourFor(result
['from_user'])
311 return ("%s%s%s%s %s" % (
312 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
313 ansiFormatter
.cmdColour(colour
), result
['from_user'],
314 ansiFormatter
.cmdReset(), result
['text']))
316 _term_encoding
= None
317 def get_term_encoding():
318 global _term_encoding
319 if not _term_encoding
:
320 lang
= os
.getenv('LANG', 'unknown.UTF-8').split('.')
322 _term_encoding
= lang
[1]
324 _term_encoding
= 'UTF-8'
325 return _term_encoding
328 status_formatters
= {
329 'default': StatusFormatter
,
330 'verbose': VerboseStatusFormatter
,
331 'json': JSONStatusFormatter
,
332 'urls': URLStatusFormatter
,
333 'ansi': AnsiStatusFormatter
335 formatters
['status'] = status_formatters
338 'default': AdminFormatter
,
339 'verbose': VerboseAdminFormatter
,
340 'urls': AdminFormatter
,
341 'ansi': AdminFormatter
343 formatters
['admin'] = admin_formatters
345 search_formatters
= {
346 'default': SearchFormatter
,
347 'verbose': VerboseSearchFormatter
,
348 'urls': URLSearchFormatter
,
349 'ansi': AnsiSearchFormatter
351 formatters
['search'] = search_formatters
354 'default': ListsFormatter
,
355 'verbose': ListsVerboseFormatter
,
357 'ansi': AnsiListsFormatter
359 formatters
['lists'] = lists_formatters
361 def get_formatter(action_type
, options
):
362 formatters_dict
= formatters
.get(action_type
)
363 if (not formatters_dict
):
365 "There was an error finding a class of formatters for your type (%s)"
367 f
= formatters_dict
.get(options
['format'])
370 "Unknown formatter '%s' for status actions" % (options
['format']))
373 class Action(object):
375 def ask(self
, subject
='perform this action', careful
=False):
377 Requests from the user using `raw_input` if `subject` should be
378 performed. When `careful`, the default answer is NO, otherwise YES.
379 Returns the user answer in the form `True` or `False`.
385 prompt
= 'You really want to %s %s? ' % (subject
, sample
)
387 answer
= input(prompt
).lower()
389 return answer
in ('yes', 'y')
391 return answer
not in ('no', 'n')
393 print(file=sys
.stderr
) # Put Newline since Enter was never pressed
395 # Figure out why on OS X the raw_input keeps raising
396 # EOFError and is never able to reset and get more input
397 # Hint: Look at how IPython implements their console
403 def __call__(self
, twitter
, options
):
404 action
= actions
.get(options
['action'], NoSuchAction
)()
406 doAction
= lambda : action(twitter
, options
)
407 if (options
['refresh'] and isinstance(action
, StatusAction
)):
411 time
.sleep(options
['refresh_rate'])
414 except KeyboardInterrupt:
415 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
418 class NoSuchActionError(Exception):
421 class NoSuchAction(Action
):
422 def __call__(self
, twitter
, options
):
423 raise NoSuchActionError("No such action: %s" % (options
['action']))
425 class StatusAction(Action
):
426 def __call__(self
, twitter
, options
):
427 statuses
= self
.getStatuses(twitter
, options
)
428 sf
= get_formatter('status', options
)
429 if(options
['format'] == "json"):
431 for status
in statuses
[:-1]:
432 statusStr
= sf(status
, options
)
433 if statusStr
.strip():
434 printNicely(statusStr
+",")
435 printNicely(sf(statuses
[-1], options
)+"]")
437 for status
in statuses
:
438 statusStr
= sf(status
, options
)
439 if statusStr
.strip():
440 printNicely(statusStr
)
442 class SearchAction(Action
):
443 def __call__(self
, twitter
, options
):
444 # We need to be pointing at search.twitter.com to work, and it is less
445 # tangly to do it here than in the main()
446 twitter
.domain
= "search.twitter.com"
447 twitter
.uriparts
= ()
448 # We need to bypass the TwitterCall parameter encoding, so we
449 # don't encode the plus sign, so we have to encode it ourselves
450 query_string
= "+".join(
452 for term
in options
['extra_args']])
454 results
= twitter
.search(q
=query_string
)['results']
455 f
= get_formatter('search', options
)
456 for result
in results
:
457 resultStr
= f(result
, options
)
458 if resultStr
.strip():
459 printNicely(resultStr
)
461 class AdminAction(Action
):
462 def __call__(self
, twitter
, options
):
463 if not (options
['extra_args'] and options
['extra_args'][0]):
464 raise TwitterError("You need to specify a user (screen name)")
465 af
= get_formatter('admin', options
)
467 user
= self
.getUser(twitter
, options
['extra_args'][0])
468 except TwitterError
as e
:
469 print("There was a problem following or leaving the specified user.")
470 print("You may be trying to follow a user you are already following;")
471 print("Leaving a user you are not currently following;")
472 print("Or the user may not exist.")
477 printNicely(af(options
['action'], user
))
479 class ListsAction(StatusAction
):
480 def getStatuses(self
, twitter
, options
):
481 if not options
['extra_args']:
482 raise TwitterError("Please provide a user to query for lists")
484 screen_name
= options
['extra_args'][0]
486 if not options
['extra_args'][1:]:
487 lists
= twitter
.lists
.list(screen_name
=screen_name
)
489 printNicely("This user has no lists.")
491 lf
= get_formatter('lists', options
)
492 printNicely(lf(list))
495 return list(reversed(twitter
.lists
.statuses(
496 owner_screen_name
=screen_name
, slug
=options
['extra_args'][1])))
499 class MyListsAction(ListsAction
):
500 def getStatuses(self
, twitter
, options
):
501 screen_name
= twitter
.account
.verify_credentials()['screen_name']
502 options
['extra_args'].insert(0, screen_name
)
503 return ListsAction
.getStatuses(self
, twitter
, options
)
506 class FriendsAction(StatusAction
):
507 def getStatuses(self
, twitter
, options
):
508 return list(reversed(twitter
.statuses
.home_timeline(count
=options
["length"])))
510 class RepliesAction(StatusAction
):
511 def getStatuses(self
, twitter
, options
):
512 return list(reversed(twitter
.statuses
.mentions_timeline(count
=options
["length"])))
514 class FollowAction(AdminAction
):
515 def getUser(self
, twitter
, user
):
516 return twitter
.friendships
.create(screen_name
=user
)
518 class LeaveAction(AdminAction
):
519 def getUser(self
, twitter
, user
):
520 return twitter
.friendships
.destroy(screen_name
=user
)
522 class SetStatusAction(Action
):
523 def __call__(self
, twitter
, options
):
524 statusTxt
= (" ".join(options
['extra_args'])
525 if options
['extra_args']
526 else str(input("message: ")))
528 ptr
= re
.compile("@[\w_]+")
530 s
= ptr
.match(statusTxt
)
531 if s
and s
.start() == 0:
532 replies
.append(statusTxt
[s
.start():s
.end()])
533 statusTxt
= statusTxt
[s
.end() + 1:]
536 replies
= " ".join(replies
)
537 if len(replies
) >= 140:
544 limit
= 140 - len(replies
)
545 if len(statusTxt
) > limit
:
546 end
= string
.rfind(statusTxt
, ' ', 0, limit
)
549 splitted
.append(" ".join((replies
, statusTxt
[:end
])))
550 statusTxt
= statusTxt
[end
:]
552 if options
['invert_split']:
554 for status
in splitted
:
555 twitter
.statuses
.update(status
=status
)
557 class TwitterShell(Action
):
559 def render_prompt(self
, prompt
):
560 '''Parses the `prompt` string and returns the rendered version'''
561 prompt
= prompt
.strip("'").replace("\\'", "'")
562 for colour
in ansi
.COLOURS_NAMED
:
563 if '[%s]' % (colour
) in prompt
:
564 prompt
= prompt
.replace(
565 '[%s]' % (colour
), ansiFormatter
.cmdColourNamed(colour
))
566 prompt
= prompt
.replace('[R]', ansiFormatter
.cmdReset())
569 def __call__(self
, twitter
, options
):
570 prompt
= self
.render_prompt(options
.get('prompt', 'twitter> '))
572 options
['action'] = ""
574 args
= input(prompt
).split()
575 parse_args(args
, options
)
576 if not options
['action']:
578 elif options
['action'] == 'exit':
580 elif options
['action'] == 'shell':
581 print('Sorry Xzibit does not work here!', file=sys
.stderr
)
583 elif options
['action'] == 'help':
584 print('''\ntwitter> `action`\n
585 The Shell Accepts all the command line actions along with:
587 exit Leave the twitter shell (^D may also be used)
589 Full CMD Line help is appended below for your convinience.''', file=sys
.stderr
)
590 Action()(twitter
, options
)
591 options
['action'] = ''
592 except NoSuchActionError
as e
:
593 print(e
, file=sys
.stderr
)
594 except KeyboardInterrupt:
595 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
597 print(file=sys
.stderr
)
598 leaving
= self
.ask(subject
='Leave')
600 print('Excellent!', file=sys
.stderr
)
604 class PythonPromptAction(Action
):
605 def __call__(self
, twitter
, options
):
608 smrt_input(globals(), locals())
612 class HelpAction(Action
):
613 def __call__(self
, twitter
, options
):
616 class DoNothingAction(Action
):
617 def __call__(self
, twitter
, options
):
620 class RateLimitStatus(Action
):
621 def __call__(self
, twitter
, options
):
622 rate
= twitter
.application
.rate_limit_status()
623 print("Remaining API requests: %s / %s (hourly limit)" % (rate
['remaining_hits'], rate
['hourly_limit']))
624 print("Next reset in %ss (%s)" % (int(rate
['reset_time_in_seconds'] - time
.time()),
625 time
.asctime(time
.localtime(rate
['reset_time_in_seconds']))))
628 'authorize' : DoNothingAction
,
629 'follow' : FollowAction
,
630 'friends' : FriendsAction
,
631 'list' : ListsAction
,
632 'mylist' : MyListsAction
,
634 'leave' : LeaveAction
,
635 'pyprompt' : PythonPromptAction
,
636 'replies' : RepliesAction
,
637 'search' : SearchAction
,
638 'set' : SetStatusAction
,
639 'shell' : TwitterShell
,
640 'rate' : RateLimitStatus
,
643 def loadConfig(filename
):
644 options
= dict(OPTIONS
)
645 if os
.path
.exists(filename
):
646 cp
= SafeConfigParser()
648 for option
in ('format', 'prompt'):
649 if cp
.has_option('twitter', option
):
650 options
[option
] = cp
.get('twitter', option
)
652 for option
in ('invert_split',):
653 if cp
.has_option('twitter', option
):
654 options
[option
] = cp
.getboolean('twitter', option
)
657 def main(args
=sys
.argv
[1:]):
660 parse_args(args
, arg_options
)
661 except GetoptError
as e
:
662 print("I can't do that, %s." % (e
), file=sys
.stderr
)
663 print(file=sys
.stderr
)
666 config_path
= os
.path
.expanduser(
667 arg_options
.get('config_filename') or OPTIONS
.get('config_filename'))
668 config_options
= loadConfig(config_path
)
670 # Apply the various options in order, the most important applied last.
671 # Defaults first, then what's read from config file, then command-line
673 options
= dict(OPTIONS
)
674 for d
in config_options
, arg_options
:
675 for k
, v
in list(d
.items()):
678 if options
['refresh'] and options
['action'] not in (
679 'friends', 'replies'):
680 print("You can only refresh the friends or replies actions.", file=sys
.stderr
)
681 print("Use 'twitter -h' for help.", file=sys
.stderr
)
684 oauth_filename
= os
.path
.expanduser(options
['oauth_filename'])
686 if (options
['action'] == 'authorize'
687 or not os
.path
.exists(oauth_filename
)):
689 "the Command-Line Tool", CONSUMER_KEY
, CONSUMER_SECRET
,
690 options
['oauth_filename'])
693 ansiFormatter
= ansi
.AnsiCmd(options
["force-ansi"])
695 oauth_token
, oauth_token_secret
= read_token_file(oauth_filename
)
699 oauth_token
, oauth_token_secret
, CONSUMER_KEY
, CONSUMER_SECRET
),
700 secure
=options
['secure'],
702 domain
='api.twitter.com')
705 Action()(twitter
, options
)
706 except NoSuchActionError
as e
:
707 print(e
, file=sys
.stderr
)
709 except TwitterError
as e
:
710 print(str(e
), file=sys
.stderr
)
711 print("Use 'twitter -h' for help.", file=sys
.stderr
)