1"""
2Internal utility functions.
3
4`htmlentitydecode` came from here:
5 http://wiki.python.org/moin/EscapingHtml
6"""
7
8from __future__ import print_function
9
10import contextlib
11import re
12import sys
13import time
14import socket
15
16try:
17 from html.entities import name2codepoint
18 unichr = chr
19 import urllib.request as urllib2
20 import urllib.parse as urlparse
21except ImportError:
22 from htmlentitydefs import name2codepoint
23 import urllib2
24 import urlparse
25
def htmlentitydecode(s):
    """Decode named HTML entities (e.g. "&amp;") into unicode characters."""
    return re.sub(
        '&(%s);' % '|'.join(name2codepoint),
        lambda m: unichr(name2codepoint[m.group(1)]), s)

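# Illustrative check (example input of our own, not from the original code):
# htmlentitydecode("2 &gt; 1 &amp; true") returns "2 > 1 & true", since
# name2codepoint maps entity names such as "gt" and "amp" to codepoints.
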
def smrt_input(globals_, locals_, ps1=">>> ", ps2="... "):
    """Keep reading input lines until they eval as a complete expression,
    then print the result if it is truthy and return."""
    inputs = []
    while True:
        if inputs:
            prompt = ps2
        else:
            prompt = ps1
        inputs.append(input(prompt))
        try:
            ret = eval('\n'.join(inputs), globals_, locals_)
            if ret:
                print(str(ret))
            return
        except SyntaxError:
            pass

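# Illustrative session (hypothetical user input, not from the original code):
# entering "(1 +" at the ">>> " prompt raises SyntaxError on its own, so the
# "... " continuation prompt appears; entering "1)" then completes the
# expression "(1 +\n1)", and smrt_input prints "2" and returns.
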
def printNicely(string):
    """Print a string to stdout as UTF-8, whether stdout is text or bytes."""
    if hasattr(sys.stdout, 'buffer'):
        sys.stdout.buffer.write(string.encode('utf8'))
        print()
    else:
        print(string.encode('utf8'))

__all__ = ["htmlentitydecode", "smrt_input"]

def err(msg=""):
    print(msg, file=sys.stderr)

class Fail(object):
    """A class to count fails during a repetitive task.

    Args:
        maximum: The maximum number of fails to allow.
        exit: The exit code to use when the maximum is reached.

    Methods:
        count: Count a fail; exit when the maximum number of fails is reached.
        wait: Same as count, but also sleep for a given time in seconds.
    """
    def __init__(self, maximum=10, exit=1):
        self.i = maximum
        self.exit = exit

    def count(self):
        self.i -= 1
        if self.i == 0:
            err("Too many consecutive fails, exiting.")
            raise SystemExit(self.exit)

    def wait(self, delay=0):
        self.count()
        if delay > 0:
            time.sleep(delay)


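# A minimal usage sketch (do_work() is a hypothetical callable, not part of
# this module): retry do_work(), sleeping 2 seconds between attempts and
# exiting the process after the 10th fail.
#
#     fail = Fail(maximum=10)
#     while not do_work():
#         fail.wait(2)
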
def find_links(line):
    """Find all links in the given line. The function returns a sprintf-style
    format string (with %s placeholders for the links) and a list of urls."""
    escaped = line.replace("%", "%%")
    regex = "(https?://[^ )]+)"
    return (
        re.sub(regex, "%s", escaped),
        # Extract the urls from the original line so that percent signs
        # inside a url are not doubled by the escaping above.
        [m.group(1) for m in re.finditer(regex, line)])

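# Illustrative call (hypothetical input, not from the original code):
# find_links("50% less: http://example.com/x (neat)") returns
# ("50%% less: %s (neat)", ["http://example.com/x"]), so a caller can rebuild
# the line with the % operator once the urls have been expanded.
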
def follow_redirects(link, sites=None):
    """Follow redirects for the link as long as the redirects are on the
    given sites and return the resolved link."""
    def follow(url):
        return sites is None or urlparse.urlparse(url).hostname in sites

    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self):
            self.last_url = None
        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
            self.last_url = newurl
            if not follow(newurl):
                return None
            r = urllib2.HTTPRedirectHandler.redirect_request(
                self, req, fp, code, msg, hdrs, newurl)
            r.get_method = lambda: 'HEAD'
            return r

    if not follow(link):
        return link
    redirect_handler = RedirectHandler()
    opener = urllib2.build_opener(redirect_handler)
    req = urllib2.Request(link)
    req.get_method = lambda: 'HEAD'
    try:
        with contextlib.closing(opener.open(req, timeout=1)) as site:
            return site.url
    except (urllib2.HTTPError, urllib2.URLError, socket.timeout):
        return redirect_handler.last_url if redirect_handler.last_url else link

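# A usage sketch (the url and host list are hypothetical; this performs real
# network I/O): follow_redirects("http://t.co/abc", sites=["t.co"]) resolves
# the link via HEAD requests, stopping at the first redirect that leaves the
# listed hosts; with sites=None every redirect is followed.
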
def expand_line(line, sites):
    """Expand the links in the line for the given sites."""
    stripped = line.strip()
    msg_format, links = find_links(stripped)
    args = tuple(follow_redirects(link, sites) for link in links)
    return msg_format % args

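# Illustrative call (hypothetical input): expand_line("read: http://t.co/abc",
# ["t.co"]) returns the line with the shortened url replaced by whatever
# http://t.co/abc resolves to on the given sites.
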
def parse_host_list(list_of_hosts):
    """Parse the comma separated list of hosts."""
    p = set(
        m.group(1) for m in re.finditer(r"\s*([^,\s]+)\s*,?\s*", list_of_hosts))
    return p

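# Illustrative call (hypothetical input): parse_host_list("t.co, bit.ly,is.gd")
# returns the set {'t.co', 'bit.ly', 'is.gd'}; whitespace around the commas is
# ignored.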