]>
jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
775df731b977f22eaaa7cfa4a5beb1ff3e044443
5 twitter [action] [options]
9 authorize authorize the command-line tool to interact with Twitter
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
16 mylist get list of your lists; give a list name to get tweets
18 public get latest public tweets
19 pyprompt start a Python prompt for interacting with the twitter
21 replies get latest replies to you
22 search search twitter (Beware: octothorpe, escape it)
23 set set your twitter status
24 shell login to the twitter shell
25 rate get your current rate limit status (remaining API reqs)
30 -r --refresh run this command forever, polling every once
31 in a while (default: every 5 minutes)
32 -R --refresh-rate <rate> set the refresh rate (in seconds)
33 -f --format <format> specify the output format for status updates
34 -c --config <filename> read username and password from given config
35 file (default ~/.twitter)
36 -l --length <count> specify number of status updates shown
37 (default: 20, max: 200)
38 -t --timestamp show time before status lines
39 -d --datestamp show date before status lines
40 --no-ssl use less-secure HTTP instead of HTTPS
41 --oauth <filename> filename to read/store oauth credentials to
43 FORMATS for the --format option
45 default one line per status
46 verbose multiple lines per status, more verbose status info
48 ansi ansi colour (rainbow mode)
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
65 from __future__
import print_function
67 CONSUMER_KEY
='uS6hO2sV6tDKIOeVjhnFnQ'
68 CONSUMER_SECRET
='MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
72 from getopt
import gnu_getopt
as getopt
, GetoptError
73 from getpass
import getpass
80 from ConfigParser
import SafeConfigParser
82 from configparser
import ConfigParser
as SafeConfigParser
85 from urllib
.parse
import quote
87 from urllib2
import quote
90 from .api
import Twitter
, TwitterError
91 from .oauth
import OAuth
, write_token_file
, read_token_file
92 from .oauth_dance
import oauth_dance
94 from .util
import smrt_input
, printNicely
101 'prompt': '[cyan]twitter[R]> ',
102 'config_filename': os
.environ
.get('HOME', '') + os
.sep
+ '.twitter',
103 'oauth_filename': os
.environ
.get('HOME', '') + os
.sep
+ '.twitter_oauth',
111 def parse_args(args
, options
):
112 long_opts
= ['help', 'format=', 'refresh', 'oauth=',
113 'refresh-rate=', 'config=', 'length=', 'timestamp',
114 'datestamp', 'no-ssl']
115 short_opts
= "e:p:f:h?rR:c:l:td"
116 opts
, extra_args
= getopt(args
, short_opts
, long_opts
)
117 extra_args
= [arg
.decode(locale
.getpreferredencoding())
118 for arg
in extra_args
]
120 for opt
, arg
in opts
:
121 if opt
in ('-f', '--format'):
122 options
['format'] = arg
123 elif opt
in ('-r', '--refresh'):
124 options
['refresh'] = True
125 elif opt
in ('-R', '--refresh-rate'):
126 options
['refresh_rate'] = int(arg
)
127 elif opt
in ('-l', '--length'):
128 options
["length"] = int(arg
)
129 elif opt
in ('-t', '--timestamp'):
130 options
["timestamp"] = True
131 elif opt
in ('-d', '--datestamp'):
132 options
["datestamp"] = True
133 elif opt
in ('-?', '-h', '--help'):
134 options
['action'] = 'help'
135 elif opt
in ('-c', '--config'):
136 options
['config_filename'] = arg
137 elif opt
== '--no-ssl':
138 options
['secure'] = False
139 elif opt
== '--oauth':
140 options
['oauth_filename'] = arg
142 if extra_args
and not ('action' in options
and options
['action'] == 'help'):
143 options
['action'] = extra_args
[0]
144 options
['extra_args'] = extra_args
[1:]
146 def get_time_string(status
, options
, format
="%a %b %d %H:%M:%S +0000 %Y"):
147 timestamp
= options
["timestamp"]
148 datestamp
= options
["datestamp"]
149 t
= time
.strptime(status
['created_at'], format
)
150 i_hate_timezones
= time
.timezone
152 i_hate_timezones
= time
.altzone
153 dt
= datetime
.datetime(*t
[:-3]) - datetime
.timedelta(
154 seconds
=i_hate_timezones
)
156 if timestamp
and datestamp
:
157 return time
.strftime("%Y-%m-%d %H:%M:%S ", t
)
159 return time
.strftime("%H:%M:%S ", t
)
161 return time
.strftime("%Y-%m-%d ", t
)
164 class StatusFormatter(object):
165 def __call__(self
, status
, options
):
167 get_time_string(status
, options
),
168 status
['user']['screen_name'], status
['text']))
170 class AnsiStatusFormatter(object):
172 self
._colourMap
= ansi
.ColourMap()
174 def __call__(self
, status
, options
):
175 colour
= self
._colourMap
.colourFor(status
['user']['screen_name'])
176 return ("%s%s%s%s %s" %(
177 get_time_string(status
, options
),
178 ansi
.cmdColour(colour
), status
['user']['screen_name'],
179 ansi
.cmdReset(), status
['text']))
181 class VerboseStatusFormatter(object):
182 def __call__(self
, status
, options
):
183 return ("-- %s (%s) on %s\n%s\n" %(
184 status
['user']['screen_name'],
185 status
['user']['location'],
186 status
['created_at'],
189 class URLStatusFormatter(object):
190 urlmatch
= re
.compile(r
'https?://\S+')
191 def __call__(self
, status
, options
):
192 urls
= self
.urlmatch
.findall(status
['text'])
193 return '\n'.join(urls
) if urls
else ""
196 class ListsFormatter(object):
197 def __call__(self
, list):
198 if list['description']:
199 list_str
= "%-30s (%s)" % (list['name'], list['description'])
201 list_str
= "%-30s" % (list['name'])
202 return "%s\n" % list_str
204 class ListsVerboseFormatter(object):
205 def __call__(self
, list):
206 list_str
= "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
209 class AnsiListsFormatter(object):
211 self
._colourMap
= ansi
.ColourMap()
213 def __call__(self
, list):
214 colour
= self
._colourMap
.colourFor(list['name'])
215 return ("%s%-15s%s %s" %(
216 ansi
.cmdColour(colour
), list['name'],
217 ansi
.cmdReset(), list['description']))
220 class AdminFormatter(object):
221 def __call__(self
, action
, user
):
222 user_str
= "%s (%s)" %(user
['screen_name'], user
['name'])
223 if action
== "follow":
224 return "You are now following %s.\n" %(user_str)
226 return "You are no longer following %s.\n" %(user_str)
228 class VerboseAdminFormatter(object):
229 def __call__(self
, action
, user
):
230 return("-- %s: %s (%s): %s" % (
231 "Following" if action
== "follow" else "Leaving",
236 class SearchFormatter(object):
237 def __call__(self
, result
, options
):
239 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
240 result
['from_user'], result
['text']))
242 class VerboseSearchFormatter(SearchFormatter
):
243 pass #Default to the regular one
245 class URLSearchFormatter(object):
246 urlmatch
= re
.compile(r
'https?://\S+')
247 def __call__(self
, result
, options
):
248 urls
= self
.urlmatch
.findall(result
['text'])
249 return '\n'.join(urls
) if urls
else ""
251 class AnsiSearchFormatter(object):
253 self
._colourMap
= ansi
.ColourMap()
255 def __call__(self
, result
, options
):
256 colour
= self
._colourMap
.colourFor(result
['from_user'])
257 return ("%s%s%s%s %s" %(
258 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
259 ansi
.cmdColour(colour
), result
['from_user'],
260 ansi
.cmdReset(), result
['text']))
262 _term_encoding
= None
263 def get_term_encoding():
264 global _term_encoding
265 if not _term_encoding
:
266 lang
= os
.getenv('LANG', 'unknown.UTF-8').split('.')
268 _term_encoding
= lang
[1]
270 _term_encoding
= 'UTF-8'
271 return _term_encoding
274 status_formatters
= {
275 'default': StatusFormatter
,
276 'verbose': VerboseStatusFormatter
,
277 'urls': URLStatusFormatter
,
278 'ansi': AnsiStatusFormatter
280 formatters
['status'] = status_formatters
283 'default': AdminFormatter
,
284 'verbose': VerboseAdminFormatter
,
285 'urls': AdminFormatter
,
286 'ansi': AdminFormatter
288 formatters
['admin'] = admin_formatters
290 search_formatters
= {
291 'default': SearchFormatter
,
292 'verbose': VerboseSearchFormatter
,
293 'urls': URLSearchFormatter
,
294 'ansi': AnsiSearchFormatter
296 formatters
['search'] = search_formatters
299 'default': ListsFormatter
,
300 'verbose': ListsVerboseFormatter
,
302 'ansi': AnsiListsFormatter
304 formatters
['lists'] = lists_formatters
306 def get_formatter(action_type
, options
):
307 formatters_dict
= formatters
.get(action_type
)
308 if (not formatters_dict
):
310 "There was an error finding a class of formatters for your type (%s)"
312 f
= formatters_dict
.get(options
['format'])
315 "Unknown formatter '%s' for status actions" %(options
['format']))
318 class Action(object):
320 def ask(self
, subject
='perform this action', careful
=False):
322 Requests fromt he user using `raw_input` if `subject` should be
323 performed. When `careful`, the default answer is NO, otherwise YES.
324 Returns the user answer in the form `True` or `False`.
330 prompt
= 'You really want to %s %s? ' %(subject
, sample
)
332 answer
= input(prompt
).lower()
334 return answer
in ('yes', 'y')
336 return answer
not in ('no', 'n')
338 print(file=sys
.stderr
) # Put Newline since Enter was never pressed
340 # Figure out why on OS X the raw_input keeps raising
341 # EOFError and is never able to reset and get more input
342 # Hint: Look at how IPython implements their console
348 def __call__(self
, twitter
, options
):
349 action
= actions
.get(options
['action'], NoSuchAction
)()
351 doAction
= lambda : action(twitter
, options
)
352 if (options
['refresh'] and isinstance(action
, StatusAction
)):
356 time
.sleep(options
['refresh_rate'])
359 except KeyboardInterrupt:
360 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
363 class NoSuchActionError(Exception):
366 class NoSuchAction(Action
):
367 def __call__(self
, twitter
, options
):
368 raise NoSuchActionError("No such action: %s" %(options
['action']))
370 class StatusAction(Action
):
371 def __call__(self
, twitter
, options
):
372 statuses
= self
.getStatuses(twitter
, options
)
373 sf
= get_formatter('status', options
)
374 for status
in statuses
:
375 statusStr
= sf(status
, options
)
376 if statusStr
.strip():
377 printNicely(statusStr
)
379 class SearchAction(Action
):
380 def __call__(self
, twitter
, options
):
381 # We need to be pointing at search.twitter.com to work, and it is less
382 # tangly to do it here than in the main()
383 twitter
.domain
="search.twitter.com"
385 # We need to bypass the TwitterCall parameter encoding, so we
386 # don't encode the plus sign, so we have to encode it ourselves
387 query_string
= "+".join(
389 for term
in options
['extra_args']])
391 results
= twitter
.search(q
=query_string
)['results']
392 f
= get_formatter('search', options
)
393 for result
in results
:
394 resultStr
= f(result
, options
)
395 if resultStr
.strip():
396 printNicely(resultStr
)
398 class AdminAction(Action
):
399 def __call__(self
, twitter
, options
):
400 if not (options
['extra_args'] and options
['extra_args'][0]):
401 raise TwitterError("You need to specify a user (screen name)")
402 af
= get_formatter('admin', options
)
404 user
= self
.getUser(twitter
, options
['extra_args'][0])
405 except TwitterError
as e
:
406 print("There was a problem following or leaving the specified user.")
407 print("You may be trying to follow a user you are already following;")
408 print("Leaving a user you are not currently following;")
409 print("Or the user may not exist.")
414 printNicely(af(options
['action'], user
))
416 class ListsAction(StatusAction
):
417 def getStatuses(self
, twitter
, options
):
418 if not options
['extra_args']:
419 raise TwitterError("Please provide a user to query for lists")
421 screen_name
= options
['extra_args'][0]
423 if not options
['extra_args'][1:]:
424 lists
= twitter
.user
.lists(user
=screen_name
)['lists']
426 printNicely("This user has no lists.")
428 lf
= get_formatter('lists', options
)
429 printNicely(lf(list))
432 return reversed(twitter
.user
.lists
.list.statuses(
433 user
=screen_name
, list=options
['extra_args'][1]))
436 class MyListsAction(ListsAction
):
437 def getStatuses(self
, twitter
, options
):
438 screen_name
= twitter
.account
.verify_credentials()['screen_name']
439 options
['extra_args'].insert(0, screen_name
)
440 return ListsAction
.getStatuses(self
, twitter
, options
)
443 class FriendsAction(StatusAction
):
444 def getStatuses(self
, twitter
, options
):
445 return reversed(twitter
.statuses
.friends_timeline(count
=options
["length"]))
447 class PublicAction(StatusAction
):
448 def getStatuses(self
, twitter
, options
):
449 return reversed(twitter
.statuses
.public_timeline(count
=options
["length"]))
451 class RepliesAction(StatusAction
):
452 def getStatuses(self
, twitter
, options
):
453 return reversed(twitter
.statuses
.replies(count
=options
["length"]))
455 class FollowAction(AdminAction
):
456 def getUser(self
, twitter
, user
):
457 return twitter
.friendships
.create(id=user
)
459 class LeaveAction(AdminAction
):
460 def getUser(self
, twitter
, user
):
461 return twitter
.friendships
.destroy(id=user
)
463 class SetStatusAction(Action
):
464 def __call__(self
, twitter
, options
):
465 statusTxt
= (" ".join(options
['extra_args'])
466 if options
['extra_args']
467 else str(input("message: ")))
469 ptr
= re
.compile("@[\w_]+")
471 s
= ptr
.match(statusTxt
)
472 if s
and s
.start() == 0:
473 replies
.append(statusTxt
[s
.start():s
.end()])
474 statusTxt
= statusTxt
[s
.end()+1:]
477 replies
= " ".join(replies
)
478 if len(replies
) >= 140:
485 limit
= 140 - len(replies
)
486 if len(statusTxt
) > limit
:
487 end
= string
.rfind(statusTxt
, ' ', 0, limit
)
490 splitted
.append(" ".join((replies
,statusTxt
[:end
])))
491 statusTxt
= statusTxt
[end
:]
493 for status
in splitted
:
494 twitter
.statuses
.update(status
=status
)
496 class TwitterShell(Action
):
498 def render_prompt(self
, prompt
):
499 '''Parses the `prompt` string and returns the rendered version'''
500 prompt
= prompt
.strip("'").replace("\\'","'")
501 for colour
in ansi
.COLOURS_NAMED
:
502 if '[%s]' %(colour) in prompt
:
503 prompt
= prompt
.replace(
504 '[%s]' %(colour), ansi
.cmdColourNamed(colour
))
505 prompt
= prompt
.replace('[R]', ansi
.cmdReset())
508 def __call__(self
, twitter
, options
):
509 prompt
= self
.render_prompt(options
.get('prompt', 'twitter> '))
511 options
['action'] = ""
513 args
= input(prompt
).split()
514 parse_args(args
, options
)
515 if not options
['action']:
517 elif options
['action'] == 'exit':
519 elif options
['action'] == 'shell':
520 print('Sorry Xzibit does not work here!', file=sys
.stderr
)
522 elif options
['action'] == 'help':
523 print('''\ntwitter> `action`\n
524 The Shell Accepts all the command line actions along with:
526 exit Leave the twitter shell (^D may also be used)
528 Full CMD Line help is appended below for your convinience.''', file=sys
.stderr
)
529 Action()(twitter
, options
)
530 options
['action'] = ''
531 except NoSuchActionError
as e
:
532 print(e
, file=sys
.stderr
)
533 except KeyboardInterrupt:
534 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
536 print(file=sys
.stderr
)
537 leaving
= self
.ask(subject
='Leave')
539 print('Excellent!', file=sys
.stderr
)
543 class PythonPromptAction(Action
):
544 def __call__(self
, twitter
, options
):
547 smrt_input(globals(), locals())
551 class HelpAction(Action
):
552 def __call__(self
, twitter
, options
):
555 class DoNothingAction(Action
):
556 def __call__(self
, twitter
, options
):
559 class RateLimitStatus(Action
):
560 def __call__(self
, twitter
, options
):
561 rate
= twitter
.account
.rate_limit_status()
562 print("Remaining API requests: %s / %s (hourly limit)" % (rate
['remaining_hits'], rate
['hourly_limit']))
563 print("Next reset in %ss (%s)" % (int(rate
['reset_time_in_seconds']-time
.time()),
564 time
.asctime(time
.localtime(rate
['reset_time_in_seconds']))))
567 'authorize' : DoNothingAction
,
568 'follow' : FollowAction
,
569 'friends' : FriendsAction
,
570 'list' : ListsAction
,
571 'mylist' : MyListsAction
,
573 'leave' : LeaveAction
,
574 'public' : PublicAction
,
575 'pyprompt' : PythonPromptAction
,
576 'replies' : RepliesAction
,
577 'search' : SearchAction
,
578 'set' : SetStatusAction
,
579 'shell' : TwitterShell
,
580 'rate' : RateLimitStatus
,
583 def loadConfig(filename
):
584 options
= dict(OPTIONS
)
585 if os
.path
.exists(filename
):
586 cp
= SafeConfigParser()
588 for option
in ('format', 'prompt'):
589 if cp
.has_option('twitter', option
):
590 options
[option
] = cp
.get('twitter', option
)
593 def main(args
=sys
.argv
[1:]):
596 parse_args(args
, arg_options
)
597 except GetoptError
as e
:
598 print("I can't do that, %s." %(e), file=sys
.stderr
)
599 print(file=sys
.stderr
)
602 config_path
= os
.path
.expanduser(
603 arg_options
.get('config_filename') or OPTIONS
.get('config_filename'))
604 config_options
= loadConfig(config_path
)
606 # Apply the various options in order, the most important applied last.
607 # Defaults first, then what's read from config file, then command-line
609 options
= dict(OPTIONS
)
610 for d
in config_options
, arg_options
:
611 for k
,v
in list(d
.items()):
614 if options
['refresh'] and options
['action'] not in (
615 'friends', 'public', 'replies'):
616 print("You can only refresh the friends, public, or replies actions.", file=sys
.stderr
)
617 print("Use 'twitter -h' for help.", file=sys
.stderr
)
620 oauth_filename
= os
.path
.expanduser(options
['oauth_filename'])
622 if (options
['action'] == 'authorize'
623 or not os
.path
.exists(oauth_filename
)):
625 "the Command-Line Tool", CONSUMER_KEY
, CONSUMER_SECRET
,
626 options
['oauth_filename'])
628 oauth_token
, oauth_token_secret
= read_token_file(oauth_filename
)
632 oauth_token
, oauth_token_secret
, CONSUMER_KEY
, CONSUMER_SECRET
),
633 secure
=options
['secure'],
635 domain
='api.twitter.com')
638 Action()(twitter
, options
)
639 except NoSuchActionError
as e
:
640 print(e
, file=sys
.stderr
)
642 except TwitterError
as e
:
643 print(str(e
), file=sys
.stderr
)
644 print("Use 'twitter -h' for help.", file=sys
.stderr
)