]>
jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
8353df3473eff11d79af3e76c624cb8bb76d04a8
5 twitter [action] [options]
9 authorize authorize the command-line tool to interact with Twitter
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
16 mylist get list of your lists; give a list name to get tweets
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25 rate get your current rate limit status (remaining API reqs)
30 -r --refresh run this command forever, polling every once
31 in a while (default: every 5 minutes)
32 -R --refresh-rate <rate> set the refresh rate (in seconds)
33 -f --format <format> specify the output format for status updates
34 -c --config <filename> read username and password from given config
35 file (default ~/.twitter)
36 -l --length <count> specify number of status updates shown
37 (default: 20, max: 200)
38 -t --timestamp show time before status lines
39 -d --datestamp show date before status lines
40 --no-ssl use less-secure HTTP instead of HTTPS
41 --oauth <filename> filename to read/store oauth credentials to
43 FORMATS for the --format option
45 default one line per status
46 verbose multiple lines per status, more verbose status info
48 ansi ansi colour (rainbow mode)
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
65 from __future__
import print_function
67 CONSUMER_KEY
='uS6hO2sV6tDKIOeVjhnFnQ'
68 CONSUMER_SECRET
='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
72 from getopt
import gnu_getopt
as getopt
, GetoptError
73 from getpass
import getpass
80 from ConfigParser
import SafeConfigParser
82 from configparser
import ConfigParser
as SafeConfigParser
85 from urllib
.parse
import quote
87 from urllib2
import quote
90 from .api
import Twitter
, TwitterError
91 from .oauth
import OAuth
, write_token_file
, read_token_file
92 from .oauth_dance
import oauth_dance
94 from .util
import smrt_input
, printNicely
101 'prompt': '[cyan]twitter[R]> ',
102 'config_filename': os
.environ
.get('HOME', '') + os
.sep
+ '.twitter',
103 'oauth_filename': os
.environ
.get('HOME', '') + os
.sep
+ '.twitter_oauth',
def parse_args(args, options):
    """Parse command-line ``args`` and store the results in ``options``.

    ``args`` is a list of argument strings (without the program name) and
    ``options`` is a dict that is updated in place.  Options may be mixed
    freely with positional arguments (GNU-style parsing).

    Keys that may be written: ``format``, ``refresh``, ``refresh_rate``,
    ``length``, ``timestamp``, ``datestamp``, ``action``,
    ``config_filename``, ``secure``, ``oauth_filename`` and, always,
    ``extra_args``.

    The first positional argument becomes ``options['action']`` unless help
    was already requested; the remaining positional arguments are stored in
    ``options['extra_args']``.

    Raises ``GetoptError`` for unknown options or missing option arguments,
    and ``ValueError`` when ``--length`` / ``--refresh-rate`` is not an
    integer.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl']
    # NOTE: -e and -p are accepted (each takes a value) but no branch below
    # acts on them, so they are silently ignored.
    short_opts = "e:p:f:h?rR:c:l:td"
    opts, extra_args = getopt(args, short_opts, long_opts)

    # Positional arguments are byte strings on Python 2 and must be decoded
    # with the locale encoding.  On Python 3 they are already text and have
    # no ``decode`` method, so only byte strings are decoded.
    encoding = locale.getpreferredencoding()
    extra_args = [arg.decode(encoding) if isinstance(arg, bytes) else arg
                  for arg in extra_args]

    for opt, arg in opts:
        if opt in ('-f', '--format'):
            options['format'] = arg
        elif opt in ('-r', '--refresh'):
            options['refresh'] = True
        elif opt in ('-R', '--refresh-rate'):
            options['refresh_rate'] = int(arg)
        elif opt in ('-l', '--length'):
            options["length"] = int(arg)
        elif opt in ('-t', '--timestamp'):
            options["timestamp"] = True
        elif opt in ('-d', '--datestamp'):
            options["datestamp"] = True
        elif opt in ('-?', '-h', '--help'):
            options['action'] = 'help'
        elif opt in ('-c', '--config'):
            options['config_filename'] = arg
        elif opt == '--no-ssl':
            options['secure'] = False
        elif opt == '--oauth':
            options['oauth_filename'] = arg

    # A help request wins over whatever action name was given positionally.
    if extra_args and options.get('action') != 'help':
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
146 def get_time_string(status
, options
, format
="%a %b %d %H:%M:%S +0000 %Y"):
147 timestamp
= options
["timestamp"]
148 datestamp
= options
["datestamp"]
149 t
= time
.strptime(status
['created_at'], format
)
150 i_hate_timezones
= time
.timezone
152 i_hate_timezones
= time
.altzone
153 dt
= datetime
.datetime(*t
[:-3]) - datetime
.timedelta(
154 seconds
=i_hate_timezones
)
156 if timestamp
and datestamp
:
157 return time
.strftime("%Y-%m-%d %H:%M:%S ", t
)
159 return time
.strftime("%H:%M:%S ", t
)
161 return time
.strftime("%Y-%m-%d ", t
)
164 class StatusFormatter(object):
165 def __call__(self
, status
, options
):
167 get_time_string(status
, options
),
168 status
['user']['screen_name'], status
['text']))
170 class AnsiStatusFormatter(object):
172 self
._colourMap
= ansi
.ColourMap()
174 def __call__(self
, status
, options
):
175 colour
= self
._colourMap
.colourFor(status
['user']['screen_name'])
176 return ("%s%s%s%s %s" %(
177 get_time_string(status
, options
),
178 ansi
.cmdColour(colour
), status
['user']['screen_name'],
179 ansi
.cmdReset(), status
['text']))
181 class VerboseStatusFormatter(object):
182 def __call__(self
, status
, options
):
183 return ("-- %s (%s) on %s\n%s\n" %(
184 status
['user']['screen_name'],
185 status
['user']['location'],
186 status
['created_at'],
class URLStatusFormatter(object):
    """Status formatter that outputs only the URLs found in a status."""

    # Matches http:// or https:// followed by any run of non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, status, options):
        """Return the URLs in ``status['text']``, one per line.

        ``options`` is accepted for interface compatibility with the other
        formatters but is not used.  Returns an empty string when the text
        contains no URL.
        """
        found = self.urlmatch.findall(status['text'])
        if not found:
            return ""
        return '\n'.join(found)
196 class ListsFormatter(object):
197 def __call__(self
, list):
198 if list['description']:
199 list_str
= "%-30s (%s)" % (list['name'], list['description'])
201 list_str
= "%-30s" % (list['name'])
202 return "%s\n" % list_str
204 class ListsVerboseFormatter(object):
205 def __call__(self
, list):
206 list_str
= "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
209 class AnsiListsFormatter(object):
211 self
._colourMap
= ansi
.ColourMap()
213 def __call__(self
, list):
214 colour
= self
._colourMap
.colourFor(list['name'])
215 return ("%s%-15s%s %s" %(
216 ansi
.cmdColour(colour
), list['name'],
217 ansi
.cmdReset(), list['description']))
220 class AdminFormatter(object):
221 def __call__(self
, action
, user
):
222 user_str
= "%s (%s)" %(user
['screen_name'], user
['name'])
223 if action
== "follow":
224 return "You are now following %s.\n" %(user_str)
226 return "You are no longer following %s.\n" %(user_str)
228 class VerboseAdminFormatter(object):
229 def __call__(self
, action
, user
):
230 return("-- %s: %s (%s): %s" % (
231 "Following" if action
== "follow" else "Leaving",
236 class SearchFormatter(object):
237 def __call__(self
, result
, options
):
239 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
240 result
['from_user'], result
['text']))
class VerboseSearchFormatter(SearchFormatter):
    """Verbose search formatter; defaults to the regular ``SearchFormatter``."""
class URLSearchFormatter(object):
    """Search-result formatter that outputs only the URLs in a result."""

    # Matches http:// or https:// followed by any run of non-whitespace.
    urlmatch = re.compile(r'https?://\S+')

    def __call__(self, result, options):
        """Return the URLs in ``result['text']``, one per line.

        ``options`` is accepted for interface compatibility with the other
        formatters but is not used.  Returns an empty string when the text
        contains no URL.
        """
        found = self.urlmatch.findall(result['text'])
        if not found:
            return ""
        return '\n'.join(found)
251 class AnsiSearchFormatter(object):
253 self
._colourMap
= ansi
.ColourMap()
255 def __call__(self
, result
, options
):
256 colour
= self
._colourMap
.colourFor(result
['from_user'])
257 return ("%s%s%s%s %s" %(
258 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
259 ansi
.cmdColour(colour
), result
['from_user'],
260 ansi
.cmdReset(), result
['text']))
262 _term_encoding
= None
263 def get_term_encoding():
264 global _term_encoding
265 if not _term_encoding
:
266 lang
= os
.getenv('LANG', 'unknown.UTF-8').split('.')
268 _term_encoding
= lang
[1]
270 _term_encoding
= 'UTF-8'
271 return _term_encoding
274 status_formatters
= {
275 'default': StatusFormatter
,
276 'verbose': VerboseStatusFormatter
,
277 'urls': URLStatusFormatter
,
278 'ansi': AnsiStatusFormatter
280 formatters
['status'] = status_formatters
283 'default': AdminFormatter
,
284 'verbose': VerboseAdminFormatter
,
285 'urls': AdminFormatter
,
286 'ansi': AdminFormatter
288 formatters
['admin'] = admin_formatters
290 search_formatters
= {
291 'default': SearchFormatter
,
292 'verbose': VerboseSearchFormatter
,
293 'urls': URLSearchFormatter
,
294 'ansi': AnsiSearchFormatter
296 formatters
['search'] = search_formatters
299 'default': ListsFormatter
,
300 'verbose': ListsVerboseFormatter
,
302 'ansi': AnsiListsFormatter
304 formatters
['lists'] = lists_formatters
306 def get_formatter(action_type
, options
):
307 formatters_dict
= formatters
.get(action_type
)
308 if (not formatters_dict
):
310 "There was an error finding a class of formatters for your type (%s)"
312 f
= formatters_dict
.get(options
['format'])
315 "Unknown formatter '%s' for status actions" %(options
['format']))
318 class Action(object):
320 def ask(self
, subject
='perform this action', careful
=False):
322 Requests from the user using `raw_input` if `subject` should be
323 performed. When `careful`, the default answer is NO, otherwise YES.
324 Returns the user answer in the form `True` or `False`.
330 prompt
= 'You really want to %s %s? ' %(subject
, sample
)
332 answer
= input(prompt
).lower()
334 return answer
in ('yes', 'y')
336 return answer
not in ('no', 'n')
338 print(file=sys
.stderr
) # Put Newline since Enter was never pressed
340 # Figure out why on OS X the raw_input keeps raising
341 # EOFError and is never able to reset and get more input
342 # Hint: Look at how IPython implements their console
348 def __call__(self
, twitter
, options
):
349 action
= actions
.get(options
['action'], NoSuchAction
)()
351 doAction
= lambda : action(twitter
, options
)
352 if (options
['refresh'] and isinstance(action
, StatusAction
)):
355 time
.sleep(options
['refresh_rate'])
358 except KeyboardInterrupt:
359 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
362 class NoSuchActionError(Exception):
class NoSuchAction(Action):
    """Action that reports the requested action name as unknown."""

    def __call__(self, twitter, options):
        """Raise ``NoSuchActionError`` naming ``options['action']``.

        ``twitter`` is not used.
        """
        message = "No such action: %s" % options['action']
        raise NoSuchActionError(message)
class StatusAction(Action):
    """Base for actions that fetch a sequence of statuses and print them.

    Subclasses supply ``getStatuses(twitter, options)``.
    """

    def __call__(self, twitter, options):
        """Fetch the statuses, format each one and print the non-blank ones."""
        fetched = self.getStatuses(twitter, options)
        render = get_formatter('status', options)
        for entry in fetched:
            text = render(entry, options)
            # Formatters may yield an empty/whitespace-only string (e.g. no
            # URLs found); such statuses are skipped rather than printed.
            if not text.strip():
                continue
            printNicely(text)
378 class SearchAction(Action
):
379 def __call__(self
, twitter
, options
):
380 # We need to be pointing at search.twitter.com to work, and it is less
381 # tangly to do it here than in the main()
382 twitter
.domain
="search.twitter.com"
384 # We need to bypass the TwitterCall parameter encoding, so we
385 # don't encode the plus sign, so we have to encode it ourselves
386 query_string
= "+".join(
388 for term
in options
['extra_args']])
390 results
= twitter
.search(q
=query_string
)['results']
391 f
= get_formatter('search', options
)
392 for result
in results
:
393 resultStr
= f(result
, options
)
394 if resultStr
.strip():
395 printNicely(resultStr
)
397 class AdminAction(Action
):
398 def __call__(self
, twitter
, options
):
399 if not (options
['extra_args'] and options
['extra_args'][0]):
400 raise TwitterError("You need to specify a user (screen name)")
401 af
= get_formatter('admin', options
)
403 user
= self
.getUser(twitter
, options
['extra_args'][0])
404 except TwitterError
as e
:
405 print("There was a problem following or leaving the specified user.")
406 print("You may be trying to follow a user you are already following;")
407 print("Leaving a user you are not currently following;")
408 print("Or the user may not exist.")
413 printNicely(af(options
['action'], user
))
415 class ListsAction(StatusAction
):
416 def getStatuses(self
, twitter
, options
):
417 if not options
['extra_args']:
418 raise TwitterError("Please provide a user to query for lists")
420 screen_name
= options
['extra_args'][0]
422 if not options
['extra_args'][1:]:
423 lists
= twitter
.user
.lists(user
=screen_name
)['lists']
425 printNicely("This user has no lists.")
427 lf
= get_formatter('lists', options
)
428 printNicely(lf(list))
431 return reversed(twitter
.user
.lists
.list.statuses(
432 user
=screen_name
, list=options
['extra_args'][1]))
class MyListsAction(ListsAction):
    """Lists action applied to the authenticated account's own lists."""

    def getStatuses(self, twitter, options):
        """Run ``ListsAction.getStatuses`` with the caller's own screen name.

        The screen name returned by ``account.verify_credentials()`` is
        placed in front of the user-supplied extra arguments, so the parent
        class treats it as the user to query.

        The caller's ``options`` dict and its ``extra_args`` list are left
        untouched: inserting into them in place would make a second call
        with the same dict (e.g. a refresh loop) see the screen name twice
        and treat it as a list name.
        """
        screen_name = twitter.account.verify_credentials()['screen_name']
        own_options = dict(options)
        own_options['extra_args'] = [screen_name] + list(options['extra_args'])
        return ListsAction.getStatuses(self, twitter, own_options)
class FriendsAction(StatusAction):
    """Status action backed by the ``statuses.friends_timeline`` call."""

    def getStatuses(self, twitter, options):
        """Return the friends timeline, reversed.

        At most ``options["length"]`` statuses are requested.
        """
        timeline = twitter.statuses.friends_timeline(count=options["length"])
        return reversed(timeline)
class PublicAction(StatusAction):
    """Status action backed by the ``statuses.public_timeline`` call."""

    def getStatuses(self, twitter, options):
        """Return the public timeline, reversed.

        At most ``options["length"]`` statuses are requested.
        """
        timeline = twitter.statuses.public_timeline(count=options["length"])
        return reversed(timeline)
class RepliesAction(StatusAction):
    """Status action backed by the ``statuses.replies`` call."""

    def getStatuses(self, twitter, options):
        """Return the replies, reversed.

        At most ``options["length"]`` statuses are requested.
        """
        replies = twitter.statuses.replies(count=options["length"])
        return reversed(replies)
class FollowAction(AdminAction):
    """Admin action that creates a friendship (starts following a user)."""

    def getUser(self, twitter, user):
        """Call ``friendships.create`` for ``user`` and return its result."""
        friendships = twitter.friendships
        return friendships.create(id=user)
class LeaveAction(AdminAction):
    """Admin action that destroys a friendship (stops following a user)."""

    def getUser(self, twitter, user):
        """Call ``friendships.destroy`` for ``user`` and return its result."""
        friendships = twitter.friendships
        return friendships.destroy(id=user)
462 class SetStatusAction(Action
):
463 def __call__(self
, twitter
, options
):
464 statusTxt
= (" ".join(options
['extra_args'])
465 if options
['extra_args']
466 else str(input("message: ")))
468 ptr
= re
.compile("@[\w_]+")
470 s
= ptr
.match(statusTxt
)
471 if s
and s
.start() == 0:
472 replies
.append(statusTxt
[s
.start():s
.end()])
473 statusTxt
= statusTxt
[s
.end()+1:]
476 replies
= " ".join(replies
)
477 if len(replies
) >= 140:
484 limit
= 140 - len(replies
)
485 if len(statusTxt
) > limit
:
486 end
= string
.rfind(statusTxt
, ' ', 0, limit
)
489 splitted
.append(" ".join((replies
,statusTxt
[:end
])))
490 statusTxt
= statusTxt
[end
:]
492 for status
in splitted
:
493 twitter
.statuses
.update(status
=status
)
495 class TwitterShell(Action
):
497 def render_prompt(self
, prompt
):
498 '''Parses the `prompt` string and returns the rendered version'''
499 prompt
= prompt
.strip("'").replace("\\'","'")
500 for colour
in ansi
.COLOURS_NAMED
:
501 if '[%s]' %(colour) in prompt
:
502 prompt
= prompt
.replace(
503 '[%s]' %(colour), ansi
.cmdColourNamed(colour
))
504 prompt
= prompt
.replace('[R]', ansi
.cmdReset())
507 def __call__(self
, twitter
, options
):
508 prompt
= self
.render_prompt(options
.get('prompt', 'twitter> '))
510 options
['action'] = ""
512 args
= input(prompt
).split()
513 parse_args(args
, options
)
514 if not options
['action']:
516 elif options
['action'] == 'exit':
518 elif options
['action'] == 'shell':
519 print('Sorry Xzibit does not work here!', file=sys
.stderr
)
521 elif options
['action'] == 'help':
522 print('''\ntwitter> `action`\n
523 The Shell Accepts all the command line actions along with:
525 exit Leave the twitter shell (^D may also be used)
527 Full CMD Line help is appended below for your convinience.''', file=sys
.stderr
)
528 Action()(twitter
, options
)
529 options
['action'] = ''
530 except NoSuchActionError
as e
:
531 print(e
, file=sys
.stderr
)
532 except KeyboardInterrupt:
533 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
535 print(file=sys
.stderr
)
536 leaving
= self
.ask(subject
='Leave')
538 print('Excellent!', file=sys
.stderr
)
542 class PythonPromptAction(Action
):
543 def __call__(self
, twitter
, options
):
546 smrt_input(globals(), locals())
550 class HelpAction(Action
):
551 def __call__(self
, twitter
, options
):
554 class DoNothingAction(Action
):
555 def __call__(self
, twitter
, options
):
class RateLimitStatus(Action):
    """Action that prints the account's current API rate-limit status."""

    def __call__(self, twitter, options):
        """Print the remaining request count and the time of the next reset.

        ``options`` is not used.
        """
        rate = twitter.account.rate_limit_status()

        remaining = rate['remaining_hits']
        hourly = rate['hourly_limit']
        print("Remaining API requests: %s / %s (hourly limit)" % (remaining, hourly))

        # reset_time_in_seconds is compared against time.time(), i.e. it is
        # handled as an absolute epoch timestamp.
        reset_at = rate['reset_time_in_seconds']
        seconds_left = int(reset_at - time.time())
        reset_clock = time.asctime(time.localtime(reset_at))
        print("Next reset in %ss (%s)" % (seconds_left, reset_clock))
566 'authorize' : DoNothingAction
,
567 'follow' : FollowAction
,
568 'friends' : FriendsAction
,
569 'list' : ListsAction
,
570 'mylist' : MyListsAction
,
572 'leave' : LeaveAction
,
573 'public' : PublicAction
,
574 'pyprompt' : PythonPromptAction
,
575 'replies' : RepliesAction
,
576 'search' : SearchAction
,
577 'set' : SetStatusAction
,
578 'shell' : TwitterShell
,
579 'rate' : RateLimitStatus
,
582 def loadConfig(filename
):
583 options
= dict(OPTIONS
)
584 if os
.path
.exists(filename
):
585 cp
= SafeConfigParser()
587 for option
in ('format', 'prompt'):
588 if cp
.has_option('twitter', option
):
589 options
[option
] = cp
.get('twitter', option
)
592 def main(args
=sys
.argv
[1:]):
595 parse_args(args
, arg_options
)
596 except GetoptError
as e
:
597 print("I can't do that, %s." %(e), file=sys
.stderr
)
598 print(file=sys
.stderr
)
601 config_path
= os
.path
.expanduser(
602 arg_options
.get('config_filename') or OPTIONS
.get('config_filename'))
603 config_options
= loadConfig(config_path
)
605 # Apply the various options in order, the most important applied last.
606 # Defaults first, then what's read from config file, then command-line
608 options
= dict(OPTIONS
)
609 for d
in config_options
, arg_options
:
610 for k
,v
in list(d
.items()):
613 if options
['refresh'] and options
['action'] not in (
614 'friends', 'public', 'replies'):
615 print("You can only refresh the friends, public, or replies actions.", file=sys
.stderr
)
616 print("Use 'twitter -h' for help.", file=sys
.stderr
)
619 oauth_filename
= os
.path
.expanduser(options
['oauth_filename'])
621 if (options
['action'] == 'authorize'
622 or not os
.path
.exists(oauth_filename
)):
624 "the Command-Line Tool", CONSUMER_KEY
, CONSUMER_SECRET
,
625 options
['oauth_filename'])
627 oauth_token
, oauth_token_secret
= read_token_file(oauth_filename
)
631 oauth_token
, oauth_token_secret
, CONSUMER_KEY
, CONSUMER_SECRET
),
632 secure
=options
['secure'],
634 domain
='api.twitter.com')
637 Action()(twitter
, options
)
638 except NoSuchActionError
as e
:
639 print(e
, file=sys
.stderr
)
641 except TwitterError
as e
:
642 print(str(e
), file=sys
.stderr
)
643 print("Use 'twitter -h' for help.", file=sys
.stderr
)