X-Git-Url: https://jfr.im/git/z_archive/twitter.git/blobdiff_plain/95152c20dcaac81e455ce85926246d9b7c984880..b5c07360ac6005787436c649de5020a19c00e914:/twitter/util.py diff --git a/twitter/util.py b/twitter/util.py index 27142af..7831939 100644 --- a/twitter/util.py +++ b/twitter/util.py @@ -7,15 +7,22 @@ Internal utility functions. from __future__ import print_function +import contextlib import re import sys +import textwrap import time +import socket try: from html.entities import name2codepoint unichr = chr + import urllib.request as urllib2 + import urllib.parse as urlparse except ImportError: from htmlentitydefs import name2codepoint + import urllib2 + import urlparse def htmlentitydecode(s): return re.sub( @@ -75,3 +82,73 @@ class Fail(object): self.count() if delay > 0: time.sleep(delay) + + +def find_links(line): + """Find all links in the given line. The function returns a sprintf style + format string (with %s placeholders for the links) and a list of urls.""" + l = line.replace("%", "%%") + regex = "(https?://[^ )]+)" + return ( + re.sub(regex, "%s", l), + [m.group(1) for m in re.finditer(regex, l)]) + +def follow_redirects(link, sites= None): + """Follow directs for the link as long as the redirects are on the given + sites and return the resolved link.""" + def follow(url): + return sites == None or urlparse.urlparse(url).hostname in sites + + class RedirectHandler(urllib2.HTTPRedirectHandler): + def __init__(self): + self.last_url = None + def redirect_request(self, req, fp, code, msg, hdrs, newurl): + self.last_url = newurl + if not follow(newurl): + return None + r = urllib2.HTTPRedirectHandler.redirect_request( + self, req, fp, code, msg, hdrs, newurl) + r.get_method = lambda : 'HEAD' + return r + + if not follow(link): + return link + redirect_handler = RedirectHandler() + opener = urllib2.build_opener(redirect_handler) + req = urllib2.Request(link) + req.get_method = lambda : 'HEAD' + try: + with contextlib.closing(opener.open(req,timeout=1)) as site: + return site.url + except: + return redirect_handler.last_url if redirect_handler.last_url else link + +def expand_line(line, sites): + """Expand the links in the line for the given sites.""" + try: + l = line.strip() + msg_format, links = find_links(l) + args = tuple(follow_redirects(l, sites) for l in links) + line = msg_format % args + except Exception as e: + try: + err("expanding line %s failed due to %s" % (line, unicode(e))) + except: + pass + return line + +def parse_host_list(list_of_hosts): + """Parse the comma separated list of hosts.""" + p = set( + m.group(1) for m in re.finditer("\s*([^,\s]+)\s*,?\s*", list_of_hosts)) + return p + + +def align_text(text, left_margin=17, max_width=160): + lines = [] + for line in text.split('\n'): + temp_lines = textwrap.wrap(line, max_width - left_margin) + temp_lines = [(' ' * left_margin + line) for line in temp_lines] + lines.append('\n'.join(temp_lines)) + ret = '\n'.join(lines) + return ret.lstrip()