]>
jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
5 twitter [action] [options]
9 authorize authorize the command-line tool to interact with Twitter
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
16 mylist get list of your lists; give a list name to get tweets
18 pyprompt start a Python prompt for interacting with the twitter
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 5 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
42 FORMATS for the --format option
44 default one line per status
45 verbose multiple lines per status, more verbose status info
47 ansi ansi colour (rainbow mode)
52 The config file should be placed in your home directory and be named .twitter.
53 It must contain a [twitter] header, and all the desired options you wish to
57 format: <desired_default_format_for_output>
58 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
60 OAuth authentication tokens are stored in the file .twitter_oauth in your
64 from __future__
import print_function
67 input = __builtins__
['raw_input']
68 except (AttributeError, KeyError):
72 CONSUMER_KEY
= 'uS6hO2sV6tDKIOeVjhnFnQ'
73 CONSUMER_SECRET
= 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
77 from getopt
import gnu_getopt
as getopt
, GetoptError
78 from getpass
import getpass
85 from ConfigParser
import SafeConfigParser
87 from configparser
import ConfigParser
as SafeConfigParser
90 from urllib
.parse
import quote
92 from urllib2
import quote
96 import html
.parser
as HTMLParser
100 from .api
import Twitter
, TwitterError
101 from .oauth
import OAuth
, write_token_file
, read_token_file
102 from .oauth_dance
import oauth_dance
104 from .util
import smrt_input
, printNicely
111 'prompt': '[cyan]twitter[R]> ',
112 'config_filename': os
.environ
.get('HOME', os
.environ
.get('USERPROFILE', '')) + os
.sep
+ '.twitter',
113 'oauth_filename': os
.environ
.get('HOME', os
.environ
.get('USERPROFILE', '')) + os
.sep
+ '.twitter_oauth',
119 'invert_split': False,
123 gHtmlParser
= HTMLParser
.HTMLParser()
124 hashtagRe
= re
.compile(r
'(?P<hashtag>#\S+)')
125 profileRe
= re
.compile(r
'(?P<profile>\@\S+)')
126 ansiFormatter
= ansi
.AnsiCmd(False)
def _int_option(opt, arg):
    """Convert a numeric option value, reporting bad input as GetoptError."""
    try:
        return int(arg)
    except ValueError:
        # Callers already handle GetoptError for bad command lines; a bare
        # ValueError would escape them as a traceback.
        raise GetoptError(
            "option %s requires an integer argument, got %r" % (opt, arg),
            opt)


def parse_args(args, options):
    """Parse command-line arguments into the ``options`` dict, in place.

    args -- list of argument strings (program name not included).
    options -- dict receiving the parsed values.  Only the keys of options
        actually present in ``args`` are written, plus ``extra_args``.

    The first positional argument becomes ``options['action']`` unless
    ``options['action']`` is already ``'help'``.  All positionals after the
    first are stored in ``options['extra_args']``.

    Raises GetoptError for unknown options, missing option values, and
    non-integer values given to -R/--refresh-rate or -l/--length.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl', 'force-ansi']
    # -e and -p take a value but are not acted on below.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    # Byte-string arguments expose .decode(); turn them into text using the
    # preferred locale encoding.
    if extra_args and hasattr(extra_args[0], 'decode'):
        encoding = locale.getpreferredencoding()
        extra_args = [arg.decode(encoding) for arg in extra_args]

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = _int_option(opt, arg)
        elif opt in ('-l', '--length'):
            options["length"] = _int_option(opt, arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg
        elif opt == '--force-ansi':
            options['force-ansi'] = True

    # A help request wins over whatever action name was typed.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
166 def get_time_string(status
, options
, format
="%a %b %d %H:%M:%S +0000 %Y"):
167 timestamp
= options
["timestamp"]
168 datestamp
= options
["datestamp"]
169 t
= time
.strptime(status
['created_at'], format
)
170 i_hate_timezones
= time
.timezone
172 i_hate_timezones
= time
.altzone
173 dt
= datetime
.datetime(*t
[:-3]) - datetime
.timedelta(
174 seconds
=i_hate_timezones
)
176 if timestamp
and datestamp
:
177 return time
.strftime("%Y-%m-%d %H:%M:%S ", t
)
179 return time
.strftime("%H:%M:%S ", t
)
181 return time
.strftime("%Y-%m-%d ", t
)
186 'clear': ansiFormatter
.cmdReset(),
187 'hashtag': ansiFormatter
.cmdBold(),
188 'profile': ansiFormatter
.cmdUnderline(),
195 s
= '%s%s%s' % (ansiTypes
[mkey
], m
.group(mkey
), ansiTypes
['clear'])
200 def replaceInStatus(status
):
201 txt
= gHtmlParser
.unescape(status
)
202 txt
= re
.sub(hashtagRe
, reRepl
, txt
)
203 txt
= re
.sub(profileRe
, reRepl
, txt
)
class StatusFormatter(object):
    """Formatter registered as the 'default' status format."""

    def __call__(self, status, options):
        """Return ``status`` as "<time prefix><screen name> <text>".

        The time prefix is whatever get_time_string() yields for ``options``;
        the text has its HTML entities unescaped.
        """
        # NOTE(review): HTMLParser.unescape() no longer exists on
        # Python 3.9+ -- confirm which interpreters must be supported.
        when = get_time_string(status, options)
        author = status['user']['screen_name']
        body = gHtmlParser.unescape(status['text'])
        return "%s%s %s" % (when, author, body)
212 class AnsiStatusFormatter(object):
214 self
._colourMap
= ansi
.ColourMap()
216 def __call__(self
, status
, options
):
217 colour
= self
._colourMap
.colourFor(status
['user']['screen_name'])
218 return ("%s%s%s%s %s" % (
219 get_time_string(status
, options
),
220 ansiFormatter
.cmdColour(colour
), status
['user']['screen_name'],
221 ansiFormatter
.cmdReset(), replaceInStatus(status
['text'])))
class VerboseStatusFormatter(object):
    """Formatter registered as the 'verbose' status format."""

    def __call__(self, status, options):
        """Return a header line (name, location, creation time) and the text.

        ``options`` is accepted for interface parity and is not read.
        """
        # NOTE(review): HTMLParser.unescape() no longer exists on
        # Python 3.9+ -- confirm which interpreters must be supported.
        author = status['user']
        header = (author['screen_name'], author['location'],
                  status['created_at'])
        body = gHtmlParser.unescape(status['text'])
        return "-- %s (%s) on %s\n%s\n" % (header + (body,))
class URLStatusFormatter(object):
    """Formatter registered as the 'urls' status format."""

    # Matches an http:// or https:// prefix and the non-space run after it.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        """Return the URLs found in status['text'], one per line.

        Returns an empty string when the text contains no URL.  ``options``
        is accepted for interface parity and is not read.
        """
        links = [hit.group(0)
                 for hit in self.urlmatch.finditer(status['text'])]
        if not links:
            return ""
        return '\n'.join(links)
238 class ListsFormatter(object):
239 def __call__(self
, list):
240 if list['description']:
241 list_str
= "%-30s (%s)" % (list['name'], list['description'])
243 list_str
= "%-30s" % (list['name'])
244 return "%s\n" % list_str
246 class ListsVerboseFormatter(object):
247 def __call__(self
, list):
248 list_str
= "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
251 class AnsiListsFormatter(object):
253 self
._colourMap
= ansi
.ColourMap()
255 def __call__(self
, list):
256 colour
= self
._colourMap
.colourFor(list['name'])
257 return ("%s%-15s%s %s" % (
258 ansiFormatter
.cmdColour(colour
), list['name'],
259 ansiFormatter
.cmdReset(), list['description']))
262 class AdminFormatter(object):
263 def __call__(self
, action
, user
):
264 user_str
= "%s (%s)" % (user
['screen_name'], user
['name'])
265 if action
== "follow":
266 return "You are now following %s.\n" % (user_str
)
268 return "You are no longer following %s.\n" % (user_str
)
270 class VerboseAdminFormatter(object):
271 def __call__(self
, action
, user
):
272 return("-- %s: %s (%s): %s" % (
273 "Following" if action
== "follow" else "Leaving",
278 class SearchFormatter(object):
279 def __call__(self
, result
, options
):
281 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
282 result
['from_user'], result
['text']))
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search format; adds nothing to the regular SearchFormatter."""
class URLSearchFormatter(object):
    """Formatter registered as the 'urls' search format."""

    # Matches an http:// or https:// prefix and the non-space run after it.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        """Return the URLs found in result['text'], newline-separated.

        Returns an empty string when there are none.  ``options`` is
        accepted for interface parity and is not read.
        """
        found = self.urlmatch.findall(result['text'])
        if found:
            return '\n'.join(found)
        return ""
293 class AnsiSearchFormatter(object):
295 self
._colourMap
= ansi
.ColourMap()
297 def __call__(self
, result
, options
):
298 colour
= self
._colourMap
.colourFor(result
['from_user'])
299 return ("%s%s%s%s %s" % (
300 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
301 ansiFormatter
.cmdColour(colour
), result
['from_user'],
302 ansiFormatter
.cmdReset(), result
['text']))
304 _term_encoding
= None
305 def get_term_encoding():
306 global _term_encoding
307 if not _term_encoding
:
308 lang
= os
.getenv('LANG', 'unknown.UTF-8').split('.')
310 _term_encoding
= lang
[1]
312 _term_encoding
= 'UTF-8'
313 return _term_encoding
316 status_formatters
= {
317 'default': StatusFormatter
,
318 'verbose': VerboseStatusFormatter
,
319 'urls': URLStatusFormatter
,
320 'ansi': AnsiStatusFormatter
322 formatters
['status'] = status_formatters
325 'default': AdminFormatter
,
326 'verbose': VerboseAdminFormatter
,
327 'urls': AdminFormatter
,
328 'ansi': AdminFormatter
330 formatters
['admin'] = admin_formatters
332 search_formatters
= {
333 'default': SearchFormatter
,
334 'verbose': VerboseSearchFormatter
,
335 'urls': URLSearchFormatter
,
336 'ansi': AnsiSearchFormatter
338 formatters
['search'] = search_formatters
341 'default': ListsFormatter
,
342 'verbose': ListsVerboseFormatter
,
344 'ansi': AnsiListsFormatter
346 formatters
['lists'] = lists_formatters
348 def get_formatter(action_type
, options
):
349 formatters_dict
= formatters
.get(action_type
)
350 if (not formatters_dict
):
352 "There was an error finding a class of formatters for your type (%s)"
354 f
= formatters_dict
.get(options
['format'])
357 "Unknown formatter '%s' for status actions" % (options
['format']))
360 class Action(object):
362 def ask(self
, subject
='perform this action', careful
=False):
364 Requests from the user using `raw_input` if `subject` should be
365 performed. When `careful`, the default answer is NO, otherwise YES.
366 Returns the user answer in the form `True` or `False`.
372 prompt
= 'You really want to %s %s? ' % (subject
, sample
)
374 answer
= input(prompt
).lower()
376 return answer
in ('yes', 'y')
378 return answer
not in ('no', 'n')
380 print(file=sys
.stderr
) # Put Newline since Enter was never pressed
382 # Figure out why on OS X the raw_input keeps raising
383 # EOFError and is never able to reset and get more input
384 # Hint: Look at how IPython implements their console
390 def __call__(self
, twitter
, options
):
391 action
= actions
.get(options
['action'], NoSuchAction
)()
393 doAction
= lambda : action(twitter
, options
)
394 if (options
['refresh'] and isinstance(action
, StatusAction
)):
398 time
.sleep(options
['refresh_rate'])
401 except KeyboardInterrupt:
402 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
405 class NoSuchActionError(Exception):
class NoSuchAction(Action):
    """Fallback action used when the requested name has no registered action."""

    def __call__(self, twitter, options):
        """Always raise NoSuchActionError naming options['action']."""
        requested = options['action']
        message = "No such action: %s" % (requested)
        raise NoSuchActionError(message)
class StatusAction(Action):
    """Base for actions that fetch statuses and print them.

    Subclasses supply getStatuses(twitter, options).
    """

    def __call__(self, twitter, options):
        """Print each fetched status whose formatted text is not blank."""
        fetched = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for entry in fetched:
            line = render(entry, options)
            if not line.strip():
                # Formatters may yield nothing for a status; skip those.
                continue
            printNicely(line)
421 class SearchAction(Action
):
422 def __call__(self
, twitter
, options
):
423 # We need to be pointing at search.twitter.com to work, and it is less
424 # tangly to do it here than in the main()
425 twitter
.domain
= "search.twitter.com"
426 twitter
.uriparts
= ()
427 # We need to bypass the TwitterCall parameter encoding, so we
428 # don't encode the plus sign, so we have to encode it ourselves
429 query_string
= "+".join(
431 for term
in options
['extra_args']])
433 results
= twitter
.search(q
=query_string
)['results']
434 f
= get_formatter('search', options
)
435 for result
in results
:
436 resultStr
= f(result
, options
)
437 if resultStr
.strip():
438 printNicely(resultStr
)
440 class AdminAction(Action
):
441 def __call__(self
, twitter
, options
):
442 if not (options
['extra_args'] and options
['extra_args'][0]):
443 raise TwitterError("You need to specify a user (screen name)")
444 af
= get_formatter('admin', options
)
446 user
= self
.getUser(twitter
, options
['extra_args'][0])
447 except TwitterError
as e
:
448 print("There was a problem following or leaving the specified user.")
449 print("You may be trying to follow a user you are already following;")
450 print("Leaving a user you are not currently following;")
451 print("Or the user may not exist.")
456 printNicely(af(options
['action'], user
))
458 class ListsAction(StatusAction
):
459 def getStatuses(self
, twitter
, options
):
460 if not options
['extra_args']:
461 raise TwitterError("Please provide a user to query for lists")
463 screen_name
= options
['extra_args'][0]
465 if not options
['extra_args'][1:]:
466 lists
= twitter
.lists
.list(screen_name
=screen_name
)
468 printNicely("This user has no lists.")
470 lf
= get_formatter('lists', options
)
471 printNicely(lf(list))
474 return reversed(twitter
.user
.lists
.list.statuses(
475 user
=screen_name
, list=options
['extra_args'][1]))
class MyListsAction(ListsAction):
    """Lists action for the authenticated user's own lists ('mylist')."""

    def getStatuses(self, twitter, options):
        """Run ListsAction.getStatuses() with the caller's screen name first.

        The screen name comes from twitter.account.verify_credentials() and
        is prepended to the extra arguments, which is where
        ListsAction.getStatuses() looks for the user to query.
        """
        screen_name = twitter.account.verify_credentials()['screen_name']
        # Work on a copy: inserting into the shared options['extra_args']
        # would stack one more screen name on every repeated call with the
        # same options (e.g. when the action is re-run on a refresh), turning
        # a "show my lists" request into a lookup of a list named after
        # the user.
        own_options = dict(options)
        own_options['extra_args'] = (
            [screen_name] + list(options['extra_args']))
        return ListsAction.getStatuses(self, twitter, own_options)
class FriendsAction(StatusAction):
    """Status action registered under the 'friends' command."""

    def getStatuses(self, twitter, options):
        """Return the home timeline in reversed order.

        The timeline is requested with count=options["length"].
        """
        timeline = twitter.statuses.home_timeline(count=options["length"])
        return reversed(timeline)
class RepliesAction(StatusAction):
    """Status action registered under the 'replies' command."""

    def getStatuses(self, twitter, options):
        """Return the mentions timeline in reversed order.

        The timeline is requested with count=options["length"].
        """
        wanted = options["length"]
        mentions = twitter.statuses.mentions_timeline(count=wanted)
        return reversed(mentions)
class FollowAction(AdminAction):
    """Admin action registered under the 'follow' command."""

    def getUser(self, twitter, user):
        """Create a friendship with ``user`` and return the API result."""
        friendships = twitter.friendships
        return friendships.create(id=user)
class LeaveAction(AdminAction):
    """Admin action registered under the 'leave' command."""

    def getUser(self, twitter, user):
        """Destroy the friendship with ``user`` and return the API result."""
        friendships = twitter.friendships
        return friendships.destroy(id=user)
501 class SetStatusAction(Action
):
502 def __call__(self
, twitter
, options
):
503 statusTxt
= (" ".join(options
['extra_args'])
504 if options
['extra_args']
505 else str(input("message: ")))
507 ptr
= re
.compile("@[\w_]+")
509 s
= ptr
.match(statusTxt
)
510 if s
and s
.start() == 0:
511 replies
.append(statusTxt
[s
.start():s
.end()])
512 statusTxt
= statusTxt
[s
.end() + 1:]
515 replies
= " ".join(replies
)
516 if len(replies
) >= 140:
523 limit
= 140 - len(replies
)
524 if len(statusTxt
) > limit
:
525 end
= string
.rfind(statusTxt
, ' ', 0, limit
)
528 splitted
.append(" ".join((replies
, statusTxt
[:end
])))
529 statusTxt
= statusTxt
[end
:]
531 if options
['invert_split']:
533 for status
in splitted
:
534 twitter
.statuses
.update(status
=status
)
536 class TwitterShell(Action
):
538 def render_prompt(self
, prompt
):
539 '''Parses the `prompt` string and returns the rendered version'''
540 prompt
= prompt
.strip("'").replace("\\'", "'")
541 for colour
in ansi
.COLOURS_NAMED
:
542 if '[%s]' % (colour
) in prompt
:
543 prompt
= prompt
.replace(
544 '[%s]' % (colour
), ansiFormatter
.cmdColourNamed(colour
))
545 prompt
= prompt
.replace('[R]', ansiFormatter
.cmdReset())
548 def __call__(self
, twitter
, options
):
549 prompt
= self
.render_prompt(options
.get('prompt', 'twitter> '))
551 options
['action'] = ""
553 args
= input(prompt
).split()
554 parse_args(args
, options
)
555 if not options
['action']:
557 elif options
['action'] == 'exit':
559 elif options
['action'] == 'shell':
560 print('Sorry Xzibit does not work here!', file=sys
.stderr
)
562 elif options
['action'] == 'help':
563 print('''\ntwitter> `action`\n
564 The Shell Accepts all the command line actions along with:
566 exit Leave the twitter shell (^D may also be used)
568 Full CMD Line help is appended below for your convinience.''', file=sys
.stderr
)
569 Action()(twitter
, options
)
570 options
['action'] = ''
571 except NoSuchActionError
as e
:
572 print(e
, file=sys
.stderr
)
573 except KeyboardInterrupt:
574 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
576 print(file=sys
.stderr
)
577 leaving
= self
.ask(subject
='Leave')
579 print('Excellent!', file=sys
.stderr
)
583 class PythonPromptAction(Action
):
584 def __call__(self
, twitter
, options
):
587 smrt_input(globals(), locals())
591 class HelpAction(Action
):
592 def __call__(self
, twitter
, options
):
595 class DoNothingAction(Action
):
596 def __call__(self
, twitter
, options
):
class RateLimitStatus(Action):
    """Action registered under the 'rate' command."""

    def __call__(self, twitter, options):
        """Print the remaining API requests and the time of the next reset.

        ``options`` is accepted for interface parity and is not read.
        """
        limits = twitter.account.rate_limit_status()
        print("Remaining API requests: %s / %s (hourly limit)" % (
            limits['remaining_hits'], limits['hourly_limit']))

        # Read only after the first line is printed, so a missing key still
        # lets the first line through.
        reset_at = limits['reset_time_in_seconds']
        seconds_left = int(reset_at - time.time())
        reset_clock = time.asctime(time.localtime(reset_at))
        print("Next reset in %ss (%s)" % (seconds_left, reset_clock))
607 'authorize' : DoNothingAction
,
608 'follow' : FollowAction
,
609 'friends' : FriendsAction
,
610 'list' : ListsAction
,
611 'mylist' : MyListsAction
,
613 'leave' : LeaveAction
,
614 'pyprompt' : PythonPromptAction
,
615 'replies' : RepliesAction
,
616 'search' : SearchAction
,
617 'set' : SetStatusAction
,
618 'shell' : TwitterShell
,
619 'rate' : RateLimitStatus
,
622 def loadConfig(filename
):
623 options
= dict(OPTIONS
)
624 if os
.path
.exists(filename
):
625 cp
= SafeConfigParser()
627 for option
in ('format', 'prompt'):
628 if cp
.has_option('twitter', option
):
629 options
[option
] = cp
.get('twitter', option
)
631 for option
in ('invert_split',):
632 if cp
.has_option('twitter', option
):
633 options
[option
] = cp
.getboolean('twitter', option
)
636 def main(args
=sys
.argv
[1:]):
639 parse_args(args
, arg_options
)
640 except GetoptError
as e
:
641 print("I can't do that, %s." % (e
), file=sys
.stderr
)
642 print(file=sys
.stderr
)
645 config_path
= os
.path
.expanduser(
646 arg_options
.get('config_filename') or OPTIONS
.get('config_filename'))
647 config_options
= loadConfig(config_path
)
649 # Apply the various options in order, the most important applied last.
650 # Defaults first, then what's read from config file, then command-line
652 options
= dict(OPTIONS
)
653 for d
in config_options
, arg_options
:
654 for k
, v
in list(d
.items()):
657 if options
['refresh'] and options
['action'] not in (
658 'friends', 'replies'):
659 print("You can only refresh the friends or replies actions.", file=sys
.stderr
)
660 print("Use 'twitter -h' for help.", file=sys
.stderr
)
663 oauth_filename
= os
.path
.expanduser(options
['oauth_filename'])
665 if (options
['action'] == 'authorize'
666 or not os
.path
.exists(oauth_filename
)):
668 "the Command-Line Tool", CONSUMER_KEY
, CONSUMER_SECRET
,
669 options
['oauth_filename'])
672 ansiFormatter
= ansi
.AnsiCmd(options
["force-ansi"])
674 oauth_token
, oauth_token_secret
= read_token_file(oauth_filename
)
678 oauth_token
, oauth_token_secret
, CONSUMER_KEY
, CONSUMER_SECRET
),
679 secure
=options
['secure'],
681 domain
='api.twitter.com')
684 Action()(twitter
, options
)
685 except NoSuchActionError
as e
:
686 print(e
, file=sys
.stderr
)
688 except TwitterError
as e
:
689 print(str(e
), file=sys
.stderr
)
690 print("Use 'twitter -h' for help.", file=sys
.stderr
)