1 """
2 Internal utility functions.
3
4 `htmlentitydecode` came from here:
5 http://wiki.python.org/moin/EscapingHtml
6 """

from __future__ import print_function

import contextlib
import re
import sys
import time

try:
    # Python 3
    from html.entities import name2codepoint
    unichr = chr
    import urllib.request as urllib2
    import urllib.parse as urlparse
except ImportError:
    # Python 2
    from htmlentitydefs import name2codepoint
    import urllib2
    import urlparse

def htmlentitydecode(s):
    # Replace each named HTML entity (e.g. "&amp;") with its character.
    return re.sub(
        '&(%s);' % '|'.join(name2codepoint),
        lambda m: unichr(name2codepoint[m.group(1)]), s)

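# A minimal usage sketch (the example strings are illustrative, not part of
# the original module):
#
#     htmlentitydecode("fish &amp; chips")  # -> "fish & chips"
#     htmlentitydecode("&lt;b&gt;")         # -> "<b>"
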
def smrt_input(globals_, locals_, ps1=">>> ", ps2="... "):
    # Keep reading lines until the accumulated input evaluates without a
    # SyntaxError, then print the result (mimics a multi-line REPL prompt).
    inputs = []
    while True:
        if inputs:
            prompt = ps2
        else:
            prompt = ps1
        inputs.append(input(prompt))
        try:
            ret = eval('\n'.join(inputs), globals_, locals_)
            if ret:
                print(str(ret))
            return
        except SyntaxError:
            pass

def printNicely(string):
    # Write UTF-8 bytes directly when stdout exposes a binary buffer
    # (Python 3); otherwise fall back to printing the encoded string.
    if hasattr(sys.stdout, 'buffer'):
        sys.stdout.buffer.write(string.encode('utf8'))
        print()
    else:
        print(string.encode('utf8'))

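# A minimal usage sketch (illustrative only):
#
#     printNicely(u"caf\u00e9")  # writes "café" as UTF-8 bytes, bypassing
#                                # stdout's text encoding on Python 3
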
__all__ = ["htmlentitydecode", "smrt_input"]

def err(msg=""):
    print(msg, file=sys.stderr)

class Fail(object):
    """A class to count fails during a repetitive task.

    Args:
        maximum: An integer for the maximum number of fails to allow.
        exit: An integer for the exit code when the maximum is reached.

    Methods:
        count: Count a fail, exit when the maximum number of fails is reached.
        wait: Same as count but also sleep for a given time in seconds.
    """
    def __init__(self, maximum=10, exit=1):
        self.i = maximum
        self.exit = exit

    def count(self):
        self.i -= 1
        if self.i <= 0:  # <= guards against a non-positive initial maximum
            err("Too many consecutive fails, exiting.")
            raise SystemExit(self.exit)

    def wait(self, delay=0):
        self.count()
        if delay > 0:
            time.sleep(delay)

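# A minimal usage sketch (illustrative only; do_work is a hypothetical task,
# not part of this module): give up after 10 consecutive failures, sleeping
# 2 seconds between attempts.
#
#     fail = Fail(maximum=10)
#     while True:
#         try:
#             do_work()
#             fail = Fail()  # reset the counter after a success
#         except IOError:
#             fail.wait(2)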

def find_links(line):
    """Find all links in the given line. The function returns a sprintf style
    format string (with %s placeholders for the links) and a list of urls."""
    # Escape literal "%" so the returned string is a safe format string.
    l = line.replace("%", "%%")
    regex = "(https?://[^ )]+)"
    return (
        re.sub(regex, "%s", l),
        [m.group(1) for m in re.finditer(regex, l)])

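# A minimal usage sketch (illustrative only):
#
#     find_links("see http://example.com for 100% more")
#     # -> ("see %s for 100%% more", ["http://example.com"])
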
def follow_redirects(link, sites=None):
    """Follow redirects for the link as long as the redirects are on the given
    sites and return the resolved link."""
    def follow(url):
        return sites is None or urlparse.urlparse(url).hostname in sites

    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self):
            self.last_url = None
        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
            self.last_url = newurl
            if not follow(newurl):
                return None  # stop following once we leave the given sites
            r = urllib2.HTTPRedirectHandler.redirect_request(
                self, req, fp, code, msg, hdrs, newurl)
            r.get_method = lambda: 'HEAD'  # keep using HEAD across redirects
            return r

    if not follow(link):
        return link
    redirect_handler = RedirectHandler()
    opener = urllib2.build_opener(redirect_handler)
    req = urllib2.Request(link)
    req.get_method = lambda: 'HEAD'  # HEAD avoids downloading response bodies
    try:
        with contextlib.closing(opener.open(req)) as site:
            return site.url
    except (urllib2.HTTPError, urllib2.URLError):
        return redirect_handler.last_url if redirect_handler.last_url else link

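# A minimal usage sketch (illustrative only; performs network requests, and
# the short link below is hypothetical):
#
#     follow_redirects("http://bit.ly/example", sites=["bit.ly"])
#     # issues HEAD requests and returns the resolved URL, stopping as soon
#     # as a redirect leaves the bit.ly host
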
def expand_line(line, sites):
    """Expand the links in the line for the given sites."""
    l = line.strip()
    msg_format, links = find_links(l)
    args = tuple(follow_redirects(link, sites) for link in links)
    return msg_format % args

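# A minimal usage sketch (illustrative only; resolves links over the network,
# and the short link below is hypothetical):
#
#     expand_line("Read this: http://bit.ly/example", ["bit.ly"])
#     # -> the same line with the short link replaced by its resolved URL
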
def parse_host_list(list_of_hosts):
    """Parse the comma separated list of hosts."""
    p = set(
        m.group(1) for m in re.finditer(r"\s*([^,\s]+)\s*,?\s*", list_of_hosts))
    return p
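
# A minimal usage sketch (illustrative only):
#
#     parse_host_list("t.co, bit.ly,  goo.gl")
#     # -> set(['t.co', 'bit.ly', 'goo.gl'])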