X-Git-Url: https://jfr.im/git/z_archive/twitter.git/blobdiff_plain/098660ce6ca34597e97f074b3af48f64dcf3ea03..b5c07360ac6005787436c649de5020a19c00e914:/twitter/util.py diff --git a/twitter/util.py b/twitter/util.py index dab8c84..7831939 100644 --- a/twitter/util.py +++ b/twitter/util.py @@ -5,18 +5,29 @@ Internal utility functions. http://wiki.python.org/moin/EscapingHtml """ +from __future__ import print_function +import contextlib import re import sys +import textwrap +import time +import socket + try: from html.entities import name2codepoint + unichr = chr + import urllib.request as urllib2 + import urllib.parse as urlparse except ImportError: from htmlentitydefs import name2codepoint + import urllib2 + import urlparse def htmlentitydecode(s): return re.sub( '&(%s);' % '|'.join(name2codepoint), - lambda m: chr(name2codepoint[m.group(1)]), s) + lambda m: unichr(name2codepoint[m.group(1)]), s) def smrt_input(globals_, locals_, ps1=">>> ", ps2="... "): inputs = [] @@ -29,7 +40,7 @@ def smrt_input(globals_, locals_, ps1=">>> ", ps2="... "): try: ret = eval('\n'.join(inputs), globals_, locals_) if ret: - print(ret) + print(str(ret)) return except SyntaxError: pass @@ -42,3 +53,102 @@ def printNicely(string): print(string.encode('utf8')) __all__ = ["htmlentitydecode", "smrt_input"] + +def err(msg=""): + print(msg, file=sys.stderr) + +class Fail(object): + """A class to count fails during a repetitive task. + + Args: + maximum: An integer for the maximum of fails to allow. + exit: An integer for the exit code when maximum of fail is reached. + + Methods: + count: Count a fail, exit when maximum of fails is reached. + wait: Same as count but also sleep for a given time in seconds. + """ + def __init__(self, maximum=10, exit=1): + self.i = maximum + self.exit = exit + + def count(self): + self.i -= 1 + if self.i == 0: + err("Too many consecutive fails, exiting.") + raise SystemExit(self.exit) + + def wait(self, delay=0): + self.count() + if delay > 0: + time.sleep(delay) + + +def find_links(line): + """Find all links in the given line. The function returns a sprintf style + format string (with %s placeholders for the links) and a list of urls.""" + l = line.replace("%", "%%") + regex = "(https?://[^ )]+)" + return ( + re.sub(regex, "%s", l), + [m.group(1) for m in re.finditer(regex, l)]) + +def follow_redirects(link, sites= None): + """Follow directs for the link as long as the redirects are on the given + sites and return the resolved link.""" + def follow(url): + return sites == None or urlparse.urlparse(url).hostname in sites + + class RedirectHandler(urllib2.HTTPRedirectHandler): + def __init__(self): + self.last_url = None + def redirect_request(self, req, fp, code, msg, hdrs, newurl): + self.last_url = newurl + if not follow(newurl): + return None + r = urllib2.HTTPRedirectHandler.redirect_request( + self, req, fp, code, msg, hdrs, newurl) + r.get_method = lambda : 'HEAD' + return r + + if not follow(link): + return link + redirect_handler = RedirectHandler() + opener = urllib2.build_opener(redirect_handler) + req = urllib2.Request(link) + req.get_method = lambda : 'HEAD' + try: + with contextlib.closing(opener.open(req,timeout=1)) as site: + return site.url + except: + return redirect_handler.last_url if redirect_handler.last_url else link + +def expand_line(line, sites): + """Expand the links in the line for the given sites.""" + try: + l = line.strip() + msg_format, links = find_links(l) + args = tuple(follow_redirects(l, sites) for l in links) + line = msg_format % args + except Exception as e: + try: + err("expanding line %s failed due to %s" % (line, unicode(e))) + except: + pass + return line + +def parse_host_list(list_of_hosts): + """Parse the comma separated list of hosts.""" + p = set( + m.group(1) for m in re.finditer("\s*([^,\s]+)\s*,?\s*", list_of_hosts)) + return p + + +def align_text(text, left_margin=17, max_width=160): + lines = [] + for line in text.split('\n'): + temp_lines = textwrap.wrap(line, max_width - left_margin) + temp_lines = [(' ' * left_margin + line) for line in temp_lines] + lines.append('\n'.join(temp_lines)) + ret = '\n'.join(lines) + return ret.lstrip()