]>
jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
5 twitter [action] [options]
9 authorize authorize the command-line tool to interact with Twitter
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
16 mylist get list of your lists; give a list name to get tweets
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25 rate get your current rate limit status (remaining API reqs)
30 -r --refresh run this command forever, polling every once
31 in a while (default: every 5 minutes)
32 -R --refresh-rate <rate> set the refresh rate (in seconds)
33 -f --format <format> specify the output format for status updates
34 -c --config <filename> read username and password from given config
35 file (default ~/.twitter)
36 -l --length <count> specify number of status updates shown
37 (default: 20, max: 200)
38 -t --timestamp show time before status lines
39 -d --datestamp show date before status lines
40 --no-ssl use less-secure HTTP instead of HTTPS
41 --oauth <filename> filename to read/store oauth credentials to
43 FORMATS for the --format option
45 default one line per status
46 verbose multiple lines per status, more verbose status info
48 ansi ansi colour (rainbow mode)
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
65 from __future__
import print_function
67 CONSUMER_KEY
='uS6hO2sV6tDKIOeVjhnFnQ'
68 CONSUMER_SECRET
='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
72 from getopt
import gnu_getopt
as getopt
, GetoptError
73 from getpass
import getpass
80 from ConfigParser
import SafeConfigParser
82 from configparser
import ConfigParser
as SafeConfigParser
85 from urllib
.parse
import quote
87 from urllib2
import quote
90 from .api
import Twitter
, TwitterError
91 from .oauth
import OAuth
, write_token_file
, read_token_file
92 from .oauth_dance
import oauth_dance
94 from .util
import smrt_input
, printNicely
101 'prompt': '[cyan]twitter[R]> ',
102 'config_filename': os
.environ
.get('HOME', '') + os
.sep
+ '.twitter',
103 'oauth_filename': os
.environ
.get('HOME', '') + os
.sep
+ '.twitter_oauth',
109 'invert_split': False,
112 def parse_args(args
, options
):
113 long_opts
= ['help', 'format=', 'refresh', 'oauth=',
114 'refresh-rate=', 'config=', 'length=', 'timestamp',
115 'datestamp', 'no-ssl']
116 short_opts
= "e:p:f:h?rR:c:l:td"
117 opts
, extra_args
= getopt(args
, short_opts
, long_opts
)
118 extra_args
= [arg
.decode(locale
.getpreferredencoding())
119 for arg
in extra_args
]
121 for opt
, arg
in opts
:
122 if opt
in ('-f', '--format'):
123 options
['format'] = arg
124 elif opt
in ('-r', '--refresh'):
125 options
['refresh'] = True
126 elif opt
in ('-R', '--refresh-rate'):
127 options
['refresh_rate'] = int(arg
)
128 elif opt
in ('-l', '--length'):
129 options
["length"] = int(arg
)
130 elif opt
in ('-t', '--timestamp'):
131 options
["timestamp"] = True
132 elif opt
in ('-d', '--datestamp'):
133 options
["datestamp"] = True
134 elif opt
in ('-?', '-h', '--help'):
135 options
['action'] = 'help'
136 elif opt
in ('-c', '--config'):
137 options
['config_filename'] = arg
138 elif opt
== '--no-ssl':
139 options
['secure'] = False
140 elif opt
== '--oauth':
141 options
['oauth_filename'] = arg
143 if extra_args
and not ('action' in options
and options
['action'] == 'help'):
144 options
['action'] = extra_args
[0]
145 options
['extra_args'] = extra_args
[1:]
147 def get_time_string(status
, options
, format
="%a %b %d %H:%M:%S +0000 %Y"):
148 timestamp
= options
["timestamp"]
149 datestamp
= options
["datestamp"]
150 t
= time
.strptime(status
['created_at'], format
)
151 i_hate_timezones
= time
.timezone
153 i_hate_timezones
= time
.altzone
154 dt
= datetime
.datetime(*t
[:-3]) - datetime
.timedelta(
155 seconds
=i_hate_timezones
)
157 if timestamp
and datestamp
:
158 return time
.strftime("%Y-%m-%d %H:%M:%S ", t
)
160 return time
.strftime("%H:%M:%S ", t
)
162 return time
.strftime("%Y-%m-%d ", t
)
165 class StatusFormatter(object):
166 def __call__(self
, status
, options
):
168 get_time_string(status
, options
),
169 status
['user']['screen_name'], status
['text']))
171 class AnsiStatusFormatter(object):
173 self
._colourMap
= ansi
.ColourMap()
175 def __call__(self
, status
, options
):
176 colour
= self
._colourMap
.colourFor(status
['user']['screen_name'])
177 return ("%s%s%s%s %s" %(
178 get_time_string(status
, options
),
179 ansi
.cmdColour(colour
), status
['user']['screen_name'],
180 ansi
.cmdReset(), status
['text']))
182 class VerboseStatusFormatter(object):
183 def __call__(self
, status
, options
):
184 return ("-- %s (%s) on %s\n%s\n" %(
185 status
['user']['screen_name'],
186 status
['user']['location'],
187 status
['created_at'],
190 class URLStatusFormatter(object):
191 urlmatch
= re
.compile(r
'https?://\S+')
192 def __call__(self
, status
, options
):
193 urls
= self
.urlmatch
.findall(status
['text'])
194 return '\n'.join(urls
) if urls
else ""
197 class ListsFormatter(object):
198 def __call__(self
, list):
199 if list['description']:
200 list_str
= "%-30s (%s)" % (list['name'], list['description'])
202 list_str
= "%-30s" % (list['name'])
203 return "%s\n" % list_str
205 class ListsVerboseFormatter(object):
206 def __call__(self
, list):
207 list_str
= "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
210 class AnsiListsFormatter(object):
212 self
._colourMap
= ansi
.ColourMap()
214 def __call__(self
, list):
215 colour
= self
._colourMap
.colourFor(list['name'])
216 return ("%s%-15s%s %s" %(
217 ansi
.cmdColour(colour
), list['name'],
218 ansi
.cmdReset(), list['description']))
221 class AdminFormatter(object):
222 def __call__(self
, action
, user
):
223 user_str
= "%s (%s)" %(user
['screen_name'], user
['name'])
224 if action
== "follow":
225 return "You are now following %s.\n" %(user_str)
227 return "You are no longer following %s.\n" %(user_str)
229 class VerboseAdminFormatter(object):
230 def __call__(self
, action
, user
):
231 return("-- %s: %s (%s): %s" % (
232 "Following" if action
== "follow" else "Leaving",
237 class SearchFormatter(object):
238 def __call__(self
, result
, options
):
240 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
241 result
['from_user'], result
['text']))
243 class VerboseSearchFormatter(SearchFormatter
):
244 pass #Default to the regular one
246 class URLSearchFormatter(object):
247 urlmatch
= re
.compile(r
'https?://\S+')
248 def __call__(self
, result
, options
):
249 urls
= self
.urlmatch
.findall(result
['text'])
250 return '\n'.join(urls
) if urls
else ""
252 class AnsiSearchFormatter(object):
254 self
._colourMap
= ansi
.ColourMap()
256 def __call__(self
, result
, options
):
257 colour
= self
._colourMap
.colourFor(result
['from_user'])
258 return ("%s%s%s%s %s" %(
259 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
260 ansi
.cmdColour(colour
), result
['from_user'],
261 ansi
.cmdReset(), result
['text']))
263 _term_encoding
= None
264 def get_term_encoding():
265 global _term_encoding
266 if not _term_encoding
:
267 lang
= os
.getenv('LANG', 'unknown.UTF-8').split('.')
269 _term_encoding
= lang
[1]
271 _term_encoding
= 'UTF-8'
272 return _term_encoding
275 status_formatters
= {
276 'default': StatusFormatter
,
277 'verbose': VerboseStatusFormatter
,
278 'urls': URLStatusFormatter
,
279 'ansi': AnsiStatusFormatter
281 formatters
['status'] = status_formatters
284 'default': AdminFormatter
,
285 'verbose': VerboseAdminFormatter
,
286 'urls': AdminFormatter
,
287 'ansi': AdminFormatter
289 formatters
['admin'] = admin_formatters
291 search_formatters
= {
292 'default': SearchFormatter
,
293 'verbose': VerboseSearchFormatter
,
294 'urls': URLSearchFormatter
,
295 'ansi': AnsiSearchFormatter
297 formatters
['search'] = search_formatters
300 'default': ListsFormatter
,
301 'verbose': ListsVerboseFormatter
,
303 'ansi': AnsiListsFormatter
305 formatters
['lists'] = lists_formatters
307 def get_formatter(action_type
, options
):
308 formatters_dict
= formatters
.get(action_type
)
309 if (not formatters_dict
):
311 "There was an error finding a class of formatters for your type (%s)"
313 f
= formatters_dict
.get(options
['format'])
316 "Unknown formatter '%s' for status actions" %(options
['format']))
319 class Action(object):
321 def ask(self
, subject
='perform this action', careful
=False):
323 Requests fromt he user using `raw_input` if `subject` should be
324 performed. When `careful`, the default answer is NO, otherwise YES.
325 Returns the user answer in the form `True` or `False`.
331 prompt
= 'You really want to %s %s? ' %(subject
, sample
)
333 answer
= input(prompt
).lower()
335 return answer
in ('yes', 'y')
337 return answer
not in ('no', 'n')
339 print(file=sys
.stderr
) # Put Newline since Enter was never pressed
341 # Figure out why on OS X the raw_input keeps raising
342 # EOFError and is never able to reset and get more input
343 # Hint: Look at how IPython implements their console
349 def __call__(self
, twitter
, options
):
350 action
= actions
.get(options
['action'], NoSuchAction
)()
352 doAction
= lambda : action(twitter
, options
)
353 if (options
['refresh'] and isinstance(action
, StatusAction
)):
357 time
.sleep(options
['refresh_rate'])
360 except KeyboardInterrupt:
361 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
364 class NoSuchActionError(Exception):
367 class NoSuchAction(Action
):
368 def __call__(self
, twitter
, options
):
369 raise NoSuchActionError("No such action: %s" %(options
['action']))
371 class StatusAction(Action
):
372 def __call__(self
, twitter
, options
):
373 statuses
= self
.getStatuses(twitter
, options
)
374 sf
= get_formatter('status', options
)
375 for status
in statuses
:
376 statusStr
= sf(status
, options
)
377 if statusStr
.strip():
378 printNicely(statusStr
)
380 class SearchAction(Action
):
381 def __call__(self
, twitter
, options
):
382 # We need to be pointing at search.twitter.com to work, and it is less
383 # tangly to do it here than in the main()
384 twitter
.domain
="search.twitter.com"
386 # We need to bypass the TwitterCall parameter encoding, so we
387 # don't encode the plus sign, so we have to encode it ourselves
388 query_string
= "+".join(
390 for term
in options
['extra_args']])
392 results
= twitter
.search(q
=query_string
)['results']
393 f
= get_formatter('search', options
)
394 for result
in results
:
395 resultStr
= f(result
, options
)
396 if resultStr
.strip():
397 printNicely(resultStr
)
399 class AdminAction(Action
):
400 def __call__(self
, twitter
, options
):
401 if not (options
['extra_args'] and options
['extra_args'][0]):
402 raise TwitterError("You need to specify a user (screen name)")
403 af
= get_formatter('admin', options
)
405 user
= self
.getUser(twitter
, options
['extra_args'][0])
406 except TwitterError
as e
:
407 print("There was a problem following or leaving the specified user.")
408 print("You may be trying to follow a user you are already following;")
409 print("Leaving a user you are not currently following;")
410 print("Or the user may not exist.")
415 printNicely(af(options
['action'], user
))
417 class ListsAction(StatusAction
):
418 def getStatuses(self
, twitter
, options
):
419 if not options
['extra_args']:
420 raise TwitterError("Please provide a user to query for lists")
422 screen_name
= options
['extra_args'][0]
424 if not options
['extra_args'][1:]:
425 lists
= twitter
.user
.lists(user
=screen_name
)['lists']
427 printNicely("This user has no lists.")
429 lf
= get_formatter('lists', options
)
430 printNicely(lf(list))
433 return reversed(twitter
.user
.lists
.list.statuses(
434 user
=screen_name
, list=options
['extra_args'][1]))
437 class MyListsAction(ListsAction
):
438 def getStatuses(self
, twitter
, options
):
439 screen_name
= twitter
.account
.verify_credentials()['screen_name']
440 options
['extra_args'].insert(0, screen_name
)
441 return ListsAction
.getStatuses(self
, twitter
, options
)
444 class FriendsAction(StatusAction
):
445 def getStatuses(self
, twitter
, options
):
446 return reversed(twitter
.statuses
.friends_timeline(count
=options
["length"]))
448 class PublicAction(StatusAction
):
449 def getStatuses(self
, twitter
, options
):
450 return reversed(twitter
.statuses
.public_timeline(count
=options
["length"]))
452 class RepliesAction(StatusAction
):
453 def getStatuses(self
, twitter
, options
):
454 return reversed(twitter
.statuses
.replies(count
=options
["length"]))
456 class FollowAction(AdminAction
):
457 def getUser(self
, twitter
, user
):
458 return twitter
.friendships
.create(id=user
)
460 class LeaveAction(AdminAction
):
461 def getUser(self
, twitter
, user
):
462 return twitter
.friendships
.destroy(id=user
)
464 class SetStatusAction(Action
):
465 def __call__(self
, twitter
, options
):
466 statusTxt
= (" ".join(options
['extra_args'])
467 if options
['extra_args']
468 else str(input("message: ")))
470 ptr
= re
.compile("@[\w_]+")
472 s
= ptr
.match(statusTxt
)
473 if s
and s
.start() == 0:
474 replies
.append(statusTxt
[s
.start():s
.end()])
475 statusTxt
= statusTxt
[s
.end()+1:]
478 replies
= " ".join(replies
)
479 if len(replies
) >= 140:
486 limit
= 140 - len(replies
)
487 if len(statusTxt
) > limit
:
488 end
= string
.rfind(statusTxt
, ' ', 0, limit
)
491 splitted
.append(" ".join((replies
,statusTxt
[:end
])))
492 statusTxt
= statusTxt
[end
:]
494 if options
['invert_split']:
496 for status
in splitted
:
497 twitter
.statuses
.update(status
=status
)
499 class TwitterShell(Action
):
501 def render_prompt(self
, prompt
):
502 '''Parses the `prompt` string and returns the rendered version'''
503 prompt
= prompt
.strip("'").replace("\\'","'")
504 for colour
in ansi
.COLOURS_NAMED
:
505 if '[%s]' %(colour) in prompt
:
506 prompt
= prompt
.replace(
507 '[%s]' %(colour), ansi
.cmdColourNamed(colour
))
508 prompt
= prompt
.replace('[R]', ansi
.cmdReset())
511 def __call__(self
, twitter
, options
):
512 prompt
= self
.render_prompt(options
.get('prompt', 'twitter> '))
514 options
['action'] = ""
516 args
= input(prompt
).split()
517 parse_args(args
, options
)
518 if not options
['action']:
520 elif options
['action'] == 'exit':
522 elif options
['action'] == 'shell':
523 print('Sorry Xzibit does not work here!', file=sys
.stderr
)
525 elif options
['action'] == 'help':
526 print('''\ntwitter> `action`\n
527 The Shell Accepts all the command line actions along with:
529 exit Leave the twitter shell (^D may also be used)
531 Full CMD Line help is appended below for your convinience.''', file=sys
.stderr
)
532 Action()(twitter
, options
)
533 options
['action'] = ''
534 except NoSuchActionError
as e
:
535 print(e
, file=sys
.stderr
)
536 except KeyboardInterrupt:
537 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
539 print(file=sys
.stderr
)
540 leaving
= self
.ask(subject
='Leave')
542 print('Excellent!', file=sys
.stderr
)
546 class PythonPromptAction(Action
):
547 def __call__(self
, twitter
, options
):
550 smrt_input(globals(), locals())
554 class HelpAction(Action
):
555 def __call__(self
, twitter
, options
):
558 class DoNothingAction(Action
):
559 def __call__(self
, twitter
, options
):
562 class RateLimitStatus(Action
):
563 def __call__(self
, twitter
, options
):
564 rate
= twitter
.account
.rate_limit_status()
565 print("Remaining API requests: %s / %s (hourly limit)" % (rate
['remaining_hits'], rate
['hourly_limit']))
566 print("Next reset in %ss (%s)" % (int(rate
['reset_time_in_seconds']-time
.time()),
567 time
.asctime(time
.localtime(rate
['reset_time_in_seconds']))))
570 'authorize' : DoNothingAction
,
571 'follow' : FollowAction
,
572 'friends' : FriendsAction
,
573 'list' : ListsAction
,
574 'mylist' : MyListsAction
,
576 'leave' : LeaveAction
,
577 'public' : PublicAction
,
578 'pyprompt' : PythonPromptAction
,
579 'replies' : RepliesAction
,
580 'search' : SearchAction
,
581 'set' : SetStatusAction
,
582 'shell' : TwitterShell
,
583 'rate' : RateLimitStatus
,
586 def loadConfig(filename
):
587 options
= dict(OPTIONS
)
588 if os
.path
.exists(filename
):
589 cp
= SafeConfigParser()
591 for option
in ('format', 'prompt'):
592 if cp
.has_option('twitter', option
):
593 options
[option
] = cp
.get('twitter', option
)
595 for option
in ('invert_split',):
596 if cp
.has_option('twitter', option
):
597 options
[option
] = cp
.getboolean('twitter', option
)
600 def main(args
=sys
.argv
[1:]):
603 parse_args(args
, arg_options
)
604 except GetoptError
as e
:
605 print("I can't do that, %s." %(e), file=sys
.stderr
)
606 print(file=sys
.stderr
)
609 config_path
= os
.path
.expanduser(
610 arg_options
.get('config_filename') or OPTIONS
.get('config_filename'))
611 config_options
= loadConfig(config_path
)
613 # Apply the various options in order, the most important applied last.
614 # Defaults first, then what's read from config file, then command-line
616 options
= dict(OPTIONS
)
617 for d
in config_options
, arg_options
:
618 for k
,v
in list(d
.items()):
621 if options
['refresh'] and options
['action'] not in (
622 'friends', 'public', 'replies'):
623 print("You can only refresh the friends, public, or replies actions.", file=sys
.stderr
)
624 print("Use 'twitter -h' for help.", file=sys
.stderr
)
627 oauth_filename
= os
.path
.expanduser(options
['oauth_filename'])
629 if (options
['action'] == 'authorize'
630 or not os
.path
.exists(oauth_filename
)):
632 "the Command-Line Tool", CONSUMER_KEY
, CONSUMER_SECRET
,
633 options
['oauth_filename'])
635 oauth_token
, oauth_token_secret
= read_token_file(oauth_filename
)
639 oauth_token
, oauth_token_secret
, CONSUMER_KEY
, CONSUMER_SECRET
),
640 secure
=options
['secure'],
642 domain
='api.twitter.com')
645 Action()(twitter
, options
)
646 except NoSuchActionError
as e
:
647 print(e
, file=sys
.stderr
)
649 except TwitterError
as e
:
650 print(str(e
), file=sys
.stderr
)
651 print("Use 'twitter -h' for help.", file=sys
.stderr
)