]>
jfr.im git - z_archive/twitter.git/blob - twitter/cmdline.py
5 twitter [action] [options]
9 authorize authorize the command-line tool to interact with Twitter
11 friends get latest tweets from your friends (default action)
12 help print this help text that you are currently reading
13 leave stop following a user
14 list get list of a user's lists; give a list name to get
16 mylist get list of your lists; give a list name to get tweets
18 pyprompt start a Python prompt for interacting with the twitter
20 replies get latest replies to you
21 search search twitter (Beware: octothorpe, escape it)
22 set set your twitter status
23 shell login to the twitter shell
24 rate get your current rate limit status (remaining API reqs)
29 -r --refresh run this command forever, polling every once
30 in a while (default: every 5 minutes)
31 -R --refresh-rate <rate> set the refresh rate (in seconds)
32 -f --format <format> specify the output format for status updates
33 -c --config <filename> read username and password from given config
34 file (default ~/.twitter)
35 -l --length <count> specify number of status updates shown
36 (default: 20, max: 200)
37 -t --timestamp show time before status lines
38 -d --datestamp show date before status lines
39 --no-ssl use less-secure HTTP instead of HTTPS
40 --oauth <filename> filename to read/store oauth credentials to
42 FORMATS for the --format option
44 default one line per status
45 verbose multiple lines per status, more verbose status info
46 json raw json data from the api on each line
48 ansi ansi colour (rainbow mode)
53 The config file should be placed in your home directory and be named .twitter.
54 It must contain a [twitter] header, and all the desired options you wish to
58 format: <desired_default_format_for_output>
59 prompt: <twitter_shell_prompt e.g. '[cyan]twitter[R]> '>
61 OAuth authentication tokens are stored in the file .twitter_oauth in your
65 from __future__
import print_function
68 input = __builtins__
['raw_input']
69 except (AttributeError, KeyError):
73 CONSUMER_KEY
= 'uS6hO2sV6tDKIOeVjhnFnQ'
74 CONSUMER_SECRET
= 'MEYTOS97VvlHX7K1rwHPEqVpTSqZ71HtvoK4sVuYk'
76 from getopt
import gnu_getopt
as getopt
, GetoptError
77 from getpass
import getpass
87 from ConfigParser
import SafeConfigParser
89 from configparser
import ConfigParser
as SafeConfigParser
92 from urllib
.parse
import quote
94 from urllib2
import quote
98 import html
.parser
as HTMLParser
102 from .api
import Twitter
, TwitterError
103 from .oauth
import OAuth
, write_token_file
, read_token_file
104 from .oauth_dance
import oauth_dance
106 from .util
import smrt_input
, printNicely
113 'prompt': '[cyan]twitter[R]> ',
114 'config_filename': os
.environ
.get('HOME', os
.environ
.get('USERPROFILE', '')) + os
.sep
+ '.twitter',
115 'oauth_filename': os
.environ
.get('HOME', os
.environ
.get('USERPROFILE', '')) + os
.sep
+ '.twitter_oauth',
121 'invert_split': False,
125 gHtmlParser
= HTMLParser
.HTMLParser()
126 hashtagRe
= re
.compile(r
'(?P<hashtag>#\S+)')
127 profileRe
= re
.compile(r
'(?P<profile>\@\S+)')
128 ansiFormatter
= ansi
.AnsiCmd(False)
130 def parse_args(args
, options
):
131 long_opts
= ['help', 'format=', 'refresh', 'oauth=',
132 'refresh-rate=', 'config=', 'length=', 'timestamp',
133 'datestamp', 'no-ssl', 'force-ansi']
134 short_opts
= "e:p:f:h?rR:c:l:td"
135 opts
, extra_args
= getopt(args
, short_opts
, long_opts
)
136 if extra_args
and hasattr(extra_args
[0], 'decode'):
137 extra_args
= [arg
.decode(locale
.getpreferredencoding())
138 for arg
in extra_args
]
140 for opt
, arg
in opts
:
141 if opt
in ('-f', '--format'):
142 options
['format'] = arg
143 elif opt
in ('-r', '--refresh'):
144 options
['refresh'] = True
145 elif opt
in ('-R', '--refresh-rate'):
146 options
['refresh_rate'] = int(arg
)
147 elif opt
in ('-l', '--length'):
148 options
["length"] = int(arg
)
149 elif opt
in ('-t', '--timestamp'):
150 options
["timestamp"] = True
151 elif opt
in ('-d', '--datestamp'):
152 options
["datestamp"] = True
153 elif opt
in ('-?', '-h', '--help'):
154 options
['action'] = 'help'
155 elif opt
in ('-c', '--config'):
156 options
['config_filename'] = arg
157 elif opt
== '--no-ssl':
158 options
['secure'] = False
159 elif opt
== '--oauth':
160 options
['oauth_filename'] = arg
161 elif opt
== '--force-ansi':
162 options
['force-ansi'] = True
164 if extra_args
and not ('action' in options
and options
['action'] == 'help'):
165 options
['action'] = extra_args
[0]
166 options
['extra_args'] = extra_args
[1:]
168 def get_time_string(status
, options
, format
="%a %b %d %H:%M:%S +0000 %Y"):
169 timestamp
= options
["timestamp"]
170 datestamp
= options
["datestamp"]
171 t
= time
.strptime(status
['created_at'], format
)
172 i_hate_timezones
= time
.timezone
174 i_hate_timezones
= time
.altzone
175 dt
= datetime
.datetime(*t
[:-3]) - datetime
.timedelta(
176 seconds
=i_hate_timezones
)
178 if timestamp
and datestamp
:
179 return time
.strftime("%Y-%m-%d %H:%M:%S ", t
)
181 return time
.strftime("%H:%M:%S ", t
)
183 return time
.strftime("%Y-%m-%d ", t
)
188 'clear': ansiFormatter
.cmdReset(),
189 'hashtag': ansiFormatter
.cmdBold(),
190 'profile': ansiFormatter
.cmdUnderline(),
197 s
= '%s%s%s' % (ansiTypes
[mkey
], m
.group(mkey
), ansiTypes
['clear'])
202 def replaceInStatus(status
):
203 txt
= gHtmlParser
.unescape(status
)
204 txt
= re
.sub(hashtagRe
, reRepl
, txt
)
205 txt
= re
.sub(profileRe
, reRepl
, txt
)
208 class StatusFormatter(object):
209 def __call__(self
, status
, options
):
210 return ("%s%s %s" % (
211 get_time_string(status
, options
),
212 status
['user']['screen_name'], gHtmlParser
.unescape(status
['text'])))
214 class AnsiStatusFormatter(object):
216 self
._colourMap
= ansi
.ColourMap()
218 def __call__(self
, status
, options
):
219 colour
= self
._colourMap
.colourFor(status
['user']['screen_name'])
220 return ("%s%s%s%s %s" % (
221 get_time_string(status
, options
),
222 ansiFormatter
.cmdColour(colour
), status
['user']['screen_name'],
223 ansiFormatter
.cmdReset(), replaceInStatus(status
['text'])))
225 class VerboseStatusFormatter(object):
226 def __call__(self
, status
, options
):
227 return ("-- %s (%s) on %s\n%s\n" % (
228 status
['user']['screen_name'],
229 status
['user']['location'],
230 status
['created_at'],
231 gHtmlParser
.unescape(status
['text'])))
233 class JSONStatusFormatter(object):
234 def __call__(self
, status
, options
):
235 status
['text'] = gHtmlParser
.unescape(status
['text'])
236 return json
.dumps(status
)
238 class URLStatusFormatter(object):
239 urlmatch
= re
.compile(r
'https?://\S+')
240 def __call__(self
, status
, options
):
241 urls
= self
.urlmatch
.findall(status
['text'])
242 return '\n'.join(urls
) if urls
else ""
245 class ListsFormatter(object):
246 def __call__(self
, list):
247 if list['description']:
248 list_str
= "%-30s (%s)" % (list['name'], list['description'])
250 list_str
= "%-30s" % (list['name'])
251 return "%s\n" % list_str
253 class ListsVerboseFormatter(object):
254 def __call__(self
, list):
255 list_str
= "%-30s\n description: %s\n members: %s\n mode:%s\n" % (list['name'], list['description'], list['member_count'], list['mode'])
258 class AnsiListsFormatter(object):
260 self
._colourMap
= ansi
.ColourMap()
262 def __call__(self
, list):
263 colour
= self
._colourMap
.colourFor(list['name'])
264 return ("%s%-15s%s %s" % (
265 ansiFormatter
.cmdColour(colour
), list['name'],
266 ansiFormatter
.cmdReset(), list['description']))
269 class AdminFormatter(object):
270 def __call__(self
, action
, user
):
271 user_str
= "%s (%s)" % (user
['screen_name'], user
['name'])
272 if action
== "follow":
273 return "You are now following %s.\n" % (user_str
)
275 return "You are no longer following %s.\n" % (user_str
)
277 class VerboseAdminFormatter(object):
278 def __call__(self
, action
, user
):
279 return("-- %s: %s (%s): %s" % (
280 "Following" if action
== "follow" else "Leaving",
285 class SearchFormatter(object):
286 def __call__(self
, result
, options
):
288 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
289 result
['from_user'], result
['text']))
291 class VerboseSearchFormatter(SearchFormatter
):
292 pass # Default to the regular one
294 class URLSearchFormatter(object):
295 urlmatch
= re
.compile(r
'https?://\S+')
296 def __call__(self
, result
, options
):
297 urls
= self
.urlmatch
.findall(result
['text'])
298 return '\n'.join(urls
) if urls
else ""
300 class AnsiSearchFormatter(object):
302 self
._colourMap
= ansi
.ColourMap()
304 def __call__(self
, result
, options
):
305 colour
= self
._colourMap
.colourFor(result
['from_user'])
306 return ("%s%s%s%s %s" % (
307 get_time_string(result
, options
, "%a, %d %b %Y %H:%M:%S +0000"),
308 ansiFormatter
.cmdColour(colour
), result
['from_user'],
309 ansiFormatter
.cmdReset(), result
['text']))
311 _term_encoding
= None
312 def get_term_encoding():
313 global _term_encoding
314 if not _term_encoding
:
315 lang
= os
.getenv('LANG', 'unknown.UTF-8').split('.')
317 _term_encoding
= lang
[1]
319 _term_encoding
= 'UTF-8'
320 return _term_encoding
323 status_formatters
= {
324 'default': StatusFormatter
,
325 'verbose': VerboseStatusFormatter
,
326 'json': JSONStatusFormatter
,
327 'urls': URLStatusFormatter
,
328 'ansi': AnsiStatusFormatter
330 formatters
['status'] = status_formatters
333 'default': AdminFormatter
,
334 'verbose': VerboseAdminFormatter
,
335 'urls': AdminFormatter
,
336 'ansi': AdminFormatter
338 formatters
['admin'] = admin_formatters
340 search_formatters
= {
341 'default': SearchFormatter
,
342 'verbose': VerboseSearchFormatter
,
343 'urls': URLSearchFormatter
,
344 'ansi': AnsiSearchFormatter
346 formatters
['search'] = search_formatters
349 'default': ListsFormatter
,
350 'verbose': ListsVerboseFormatter
,
352 'ansi': AnsiListsFormatter
354 formatters
['lists'] = lists_formatters
356 def get_formatter(action_type
, options
):
357 formatters_dict
= formatters
.get(action_type
)
358 if (not formatters_dict
):
360 "There was an error finding a class of formatters for your type (%s)"
362 f
= formatters_dict
.get(options
['format'])
365 "Unknown formatter '%s' for status actions" % (options
['format']))
368 class Action(object):
370 def ask(self
, subject
='perform this action', careful
=False):
372 Requests from the user using `raw_input` if `subject` should be
373 performed. When `careful`, the default answer is NO, otherwise YES.
374 Returns the user answer in the form `True` or `False`.
380 prompt
= 'You really want to %s %s? ' % (subject
, sample
)
382 answer
= input(prompt
).lower()
384 return answer
in ('yes', 'y')
386 return answer
not in ('no', 'n')
388 print(file=sys
.stderr
) # Put Newline since Enter was never pressed
390 # Figure out why on OS X the raw_input keeps raising
391 # EOFError and is never able to reset and get more input
392 # Hint: Look at how IPython implements their console
398 def __call__(self
, twitter
, options
):
399 action
= actions
.get(options
['action'], NoSuchAction
)()
401 doAction
= lambda : action(twitter
, options
)
402 if (options
['refresh'] and isinstance(action
, StatusAction
)):
406 time
.sleep(options
['refresh_rate'])
409 except KeyboardInterrupt:
410 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
413 class NoSuchActionError(Exception):
416 class NoSuchAction(Action
):
417 def __call__(self
, twitter
, options
):
418 raise NoSuchActionError("No such action: %s" % (options
['action']))
420 class StatusAction(Action
):
421 def __call__(self
, twitter
, options
):
422 statuses
= self
.getStatuses(twitter
, options
)
423 sf
= get_formatter('status', options
)
424 for status
in statuses
:
425 statusStr
= sf(status
, options
)
426 if statusStr
.strip():
427 printNicely(statusStr
)
429 class SearchAction(Action
):
430 def __call__(self
, twitter
, options
):
431 # We need to be pointing at search.twitter.com to work, and it is less
432 # tangly to do it here than in the main()
433 twitter
.domain
= "search.twitter.com"
434 twitter
.uriparts
= ()
435 # We need to bypass the TwitterCall parameter encoding, so we
436 # don't encode the plus sign, so we have to encode it ourselves
437 query_string
= "+".join(
439 for term
in options
['extra_args']])
441 results
= twitter
.search(q
=query_string
)['results']
442 f
= get_formatter('search', options
)
443 for result
in results
:
444 resultStr
= f(result
, options
)
445 if resultStr
.strip():
446 printNicely(resultStr
)
448 class AdminAction(Action
):
449 def __call__(self
, twitter
, options
):
450 if not (options
['extra_args'] and options
['extra_args'][0]):
451 raise TwitterError("You need to specify a user (screen name)")
452 af
= get_formatter('admin', options
)
454 user
= self
.getUser(twitter
, options
['extra_args'][0])
455 except TwitterError
as e
:
456 print("There was a problem following or leaving the specified user.")
457 print("You may be trying to follow a user you are already following;")
458 print("Leaving a user you are not currently following;")
459 print("Or the user may not exist.")
464 printNicely(af(options
['action'], user
))
466 class ListsAction(StatusAction
):
467 def getStatuses(self
, twitter
, options
):
468 if not options
['extra_args']:
469 raise TwitterError("Please provide a user to query for lists")
471 screen_name
= options
['extra_args'][0]
473 if not options
['extra_args'][1:]:
474 lists
= twitter
.lists
.list(screen_name
=screen_name
)
476 printNicely("This user has no lists.")
478 lf
= get_formatter('lists', options
)
479 printNicely(lf(list))
482 return reversed(twitter
.user
.lists
.list.statuses(
483 user
=screen_name
, list=options
['extra_args'][1]))
486 class MyListsAction(ListsAction
):
487 def getStatuses(self
, twitter
, options
):
488 screen_name
= twitter
.account
.verify_credentials()['screen_name']
489 options
['extra_args'].insert(0, screen_name
)
490 return ListsAction
.getStatuses(self
, twitter
, options
)
493 class FriendsAction(StatusAction
):
494 def getStatuses(self
, twitter
, options
):
495 return reversed(twitter
.statuses
.home_timeline(count
=options
["length"]))
497 class RepliesAction(StatusAction
):
498 def getStatuses(self
, twitter
, options
):
499 return reversed(twitter
.statuses
.mentions_timeline(count
=options
["length"]))
501 class FollowAction(AdminAction
):
502 def getUser(self
, twitter
, user
):
503 return twitter
.friendships
.create(screen_name
=user
)
505 class LeaveAction(AdminAction
):
506 def getUser(self
, twitter
, user
):
507 return twitter
.friendships
.destroy(screen_name
=user
)
509 class SetStatusAction(Action
):
510 def __call__(self
, twitter
, options
):
511 statusTxt
= (" ".join(options
['extra_args'])
512 if options
['extra_args']
513 else str(input("message: ")))
515 ptr
= re
.compile("@[\w_]+")
517 s
= ptr
.match(statusTxt
)
518 if s
and s
.start() == 0:
519 replies
.append(statusTxt
[s
.start():s
.end()])
520 statusTxt
= statusTxt
[s
.end() + 1:]
523 replies
= " ".join(replies
)
524 if len(replies
) >= 140:
531 limit
= 140 - len(replies
)
532 if len(statusTxt
) > limit
:
533 end
= string
.rfind(statusTxt
, ' ', 0, limit
)
536 splitted
.append(" ".join((replies
, statusTxt
[:end
])))
537 statusTxt
= statusTxt
[end
:]
539 if options
['invert_split']:
541 for status
in splitted
:
542 twitter
.statuses
.update(status
=status
)
544 class TwitterShell(Action
):
546 def render_prompt(self
, prompt
):
547 '''Parses the `prompt` string and returns the rendered version'''
548 prompt
= prompt
.strip("'").replace("\\'", "'")
549 for colour
in ansi
.COLOURS_NAMED
:
550 if '[%s]' % (colour
) in prompt
:
551 prompt
= prompt
.replace(
552 '[%s]' % (colour
), ansiFormatter
.cmdColourNamed(colour
))
553 prompt
= prompt
.replace('[R]', ansiFormatter
.cmdReset())
556 def __call__(self
, twitter
, options
):
557 prompt
= self
.render_prompt(options
.get('prompt', 'twitter> '))
559 options
['action'] = ""
561 args
= input(prompt
).split()
562 parse_args(args
, options
)
563 if not options
['action']:
565 elif options
['action'] == 'exit':
567 elif options
['action'] == 'shell':
568 print('Sorry Xzibit does not work here!', file=sys
.stderr
)
570 elif options
['action'] == 'help':
571 print('''\ntwitter> `action`\n
572 The Shell Accepts all the command line actions along with:
574 exit Leave the twitter shell (^D may also be used)
576 Full CMD Line help is appended below for your convinience.''', file=sys
.stderr
)
577 Action()(twitter
, options
)
578 options
['action'] = ''
579 except NoSuchActionError
as e
:
580 print(e
, file=sys
.stderr
)
581 except KeyboardInterrupt:
582 print('\n[Keyboard Interrupt]', file=sys
.stderr
)
584 print(file=sys
.stderr
)
585 leaving
= self
.ask(subject
='Leave')
587 print('Excellent!', file=sys
.stderr
)
591 class PythonPromptAction(Action
):
592 def __call__(self
, twitter
, options
):
595 smrt_input(globals(), locals())
599 class HelpAction(Action
):
600 def __call__(self
, twitter
, options
):
603 class DoNothingAction(Action
):
604 def __call__(self
, twitter
, options
):
607 class RateLimitStatus(Action
):
608 def __call__(self
, twitter
, options
):
609 rate
= twitter
.application
.rate_limit_status()
610 print("Remaining API requests: %s / %s (hourly limit)" % (rate
['remaining_hits'], rate
['hourly_limit']))
611 print("Next reset in %ss (%s)" % (int(rate
['reset_time_in_seconds'] - time
.time()),
612 time
.asctime(time
.localtime(rate
['reset_time_in_seconds']))))
615 'authorize' : DoNothingAction
,
616 'follow' : FollowAction
,
617 'friends' : FriendsAction
,
618 'list' : ListsAction
,
619 'mylist' : MyListsAction
,
621 'leave' : LeaveAction
,
622 'pyprompt' : PythonPromptAction
,
623 'replies' : RepliesAction
,
624 'search' : SearchAction
,
625 'set' : SetStatusAction
,
626 'shell' : TwitterShell
,
627 'rate' : RateLimitStatus
,
630 def loadConfig(filename
):
631 options
= dict(OPTIONS
)
632 if os
.path
.exists(filename
):
633 cp
= SafeConfigParser()
635 for option
in ('format', 'prompt'):
636 if cp
.has_option('twitter', option
):
637 options
[option
] = cp
.get('twitter', option
)
639 for option
in ('invert_split',):
640 if cp
.has_option('twitter', option
):
641 options
[option
] = cp
.getboolean('twitter', option
)
644 def main(args
=sys
.argv
[1:]):
647 parse_args(args
, arg_options
)
648 except GetoptError
as e
:
649 print("I can't do that, %s." % (e
), file=sys
.stderr
)
650 print(file=sys
.stderr
)
653 config_path
= os
.path
.expanduser(
654 arg_options
.get('config_filename') or OPTIONS
.get('config_filename'))
655 config_options
= loadConfig(config_path
)
657 # Apply the various options in order, the most important applied last.
658 # Defaults first, then what's read from config file, then command-line
660 options
= dict(OPTIONS
)
661 for d
in config_options
, arg_options
:
662 for k
, v
in list(d
.items()):
665 if options
['refresh'] and options
['action'] not in (
666 'friends', 'replies'):
667 print("You can only refresh the friends or replies actions.", file=sys
.stderr
)
668 print("Use 'twitter -h' for help.", file=sys
.stderr
)
671 oauth_filename
= os
.path
.expanduser(options
['oauth_filename'])
673 if (options
['action'] == 'authorize'
674 or not os
.path
.exists(oauth_filename
)):
676 "the Command-Line Tool", CONSUMER_KEY
, CONSUMER_SECRET
,
677 options
['oauth_filename'])
680 ansiFormatter
= ansi
.AnsiCmd(options
["force-ansi"])
682 oauth_token
, oauth_token_secret
= read_token_file(oauth_filename
)
686 oauth_token
, oauth_token_secret
, CONSUMER_KEY
, CONSUMER_SECRET
),
687 secure
=options
['secure'],
689 domain
='api.twitter.com')
692 Action()(twitter
, options
)
693 except NoSuchActionError
as e
:
694 print(e
, file=sys
.stderr
)
696 except TwitterError
as e
:
697 print(str(e
), file=sys
.stderr
)
698 print("Use 'twitter -h' for help.", file=sys
.stderr
)