1 """
2 Internal utility functions.
3
4 `htmlentitydecode` came from here:
5 http://wiki.python.org/moin/EscapingHtml
6 """
7
from __future__ import print_function

import contextlib
import re
import sys
import textwrap
import time
import socket

PY_3_OR_HIGHER = sys.version_info >= (3, 0)

try:
    from html.entities import name2codepoint
    unichr = chr
    import urllib.request as urllib2
    import urllib.parse as urlparse
except ImportError:
    from htmlentitydefs import name2codepoint
    import urllib2
    import urlparse

def htmlentitydecode(s):
    return re.sub(
        '&(%s);' % '|'.join(name2codepoint),
        lambda m: unichr(name2codepoint[m.group(1)]), s)

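# Illustrative example for htmlentitydecode() (not part of the original
# module): named HTML entities are replaced by their Unicode characters.
#
#     >>> htmlentitydecode("fish &amp; chips &gt; salad")
#     'fish & chips > salad'
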
def smrt_input(globals_, locals_, ps1=">>> ", ps2="... "):
    inputs = []
    while True:
        if inputs:
            prompt = ps2
        else:
            prompt = ps1
        inputs.append(input(prompt))
        try:
            ret = eval('\n'.join(inputs), globals_, locals_)
            if ret:
                print(str(ret))
            return
        except SyntaxError:
            pass

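# Illustrative note on smrt_input() (added, not from the original source): it
# prompts with ps1 for the first line and ps2 for continuation lines, and
# keeps accumulating input until the whole buffer evaluates without a
# SyntaxError, printing any truthy result, e.g.:
#
#     smrt_input(globals(), locals())
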
def printNicely(string):
    if hasattr(sys.stdout, 'buffer'):
        sys.stdout.buffer.write(string.encode('utf8'))
        print()
        sys.stdout.buffer.flush()
        sys.stdout.flush()
    else:
        print(string.encode('utf8'))

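# Note on printNicely() (comment added for clarity, not in the original): it
# sidesteps UnicodeEncodeError on terminals whose stdout encoding cannot
# represent the string by writing UTF-8 bytes straight to the underlying
# buffer when one exists (Python 3), and falls back to printing the encoded
# byte string otherwise (Python 2).
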
def actually_bytes(stringy):
    if PY_3_OR_HIGHER:
        if type(stringy) == bytes:
            pass
        elif type(stringy) != str:
            stringy = str(stringy)
        if type(stringy) == str:
            stringy = stringy.encode("utf-8")
    else:
        if type(stringy) == str:
            pass
        elif type(stringy) != unicode:
            stringy = str(stringy)
        if type(stringy) == unicode:
            stringy = stringy.encode("utf-8")
    return stringy

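# Illustrative behaviour of actually_bytes() (examples added, not original):
# any input is coerced to UTF-8 encoded bytes on both Python 2 and 3
# (Python 3 reprs shown).
#
#     >>> actually_bytes(u"caf\u00e9")
#     b'caf\xc3\xa9'
#     >>> actually_bytes(42)
#     b'42'
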
def err(msg=""):
    print(msg, file=sys.stderr)


class Fail(object):
    """A class to count fails during a repetitive task.

    Args:
        maximum: The maximum number of fails to allow.
        exit: The exit code to use when the maximum number of fails is reached.

    Methods:
        count: Count a fail; exit when the maximum number of fails is reached.
        wait: Same as count, but also sleep for a given time in seconds.
    """
    def __init__(self, maximum=10, exit=1):
        self.i = maximum
        self.exit = exit

    def count(self):
        self.i -= 1
        if self.i == 0:
            err("Too many consecutive fails, exiting.")
            raise SystemExit(self.exit)

    def wait(self, delay=0):
        self.count()
        if delay > 0:
            time.sleep(delay)


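# Illustrative usage of Fail (example added, not part of the original module):
#
#     fail = Fail(maximum=5, exit=1)
#     while True:
#         try:
#             do_work()           # hypothetical task that may raise
#             break
#         except IOError:
#             fail.wait(delay=2)  # count the fail, then sleep two seconds
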
def find_links(line):
    """Find all links in the given line. The function returns a sprintf style
    format string (with %s placeholders for the links) and a list of urls."""
    l = line.replace("%", "%%")
    regex = "(https?://[^ )]+)"
    return (
        re.sub(regex, "%s", l),
        [m.group(1) for m in re.finditer(regex, l)])

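# Illustrative example for find_links() (not part of the original module):
#
#     >>> find_links("nice read: http://example.com/a (thanks!)")
#     ('nice read: %s (thanks!)', ['http://example.com/a'])
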
def follow_redirects(link, sites=None):
    """Follow redirects for the link as long as the redirects are on the given
    sites and return the resolved link."""
    def follow(url):
        return sites is None or urlparse.urlparse(url).hostname in sites

    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self):
            self.last_url = None
        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
            self.last_url = newurl
            if not follow(newurl):
                return None
            r = urllib2.HTTPRedirectHandler.redirect_request(
                self, req, fp, code, msg, hdrs, newurl)
            r.get_method = lambda: 'HEAD'
            return r

    if not follow(link):
        return link
    redirect_handler = RedirectHandler()
    opener = urllib2.build_opener(redirect_handler)
    req = urllib2.Request(link)
    req.get_method = lambda: 'HEAD'
    try:
        with contextlib.closing(opener.open(req, timeout=1)) as site:
            return site.url
    except:
        return redirect_handler.last_url if redirect_handler.last_url else link

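# Illustrative example for follow_redirects() (added; the short URL is
# hypothetical): the link is resolved with HEAD requests, but only while
# every hop stays on one of the listed hosts.
#
#     resolved = follow_redirects("http://t.co/abc123", sites=["t.co"])
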
def expand_line(line, sites):
    """Expand the links in the line for the given sites."""
    try:
        l = line.strip()
        msg_format, links = find_links(l)
        args = tuple(follow_redirects(l, sites) for l in links)
        line = msg_format % args
    except Exception as e:
        try:
            err("expanding line %s failed due to %s" % (line, e))
        except:
            pass
    return line

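# Illustrative example for expand_line() (added; the short URL is
# hypothetical): every link found by find_links() is resolved with
# follow_redirects() and substituted back into the line.
#
#     expand_line("reading http://t.co/abc123 now", ["t.co"])
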
def parse_host_list(list_of_hosts):
    """Parse the comma separated list of hosts."""
    p = set(
        m.group(1) for m in re.finditer(r"\s*([^,\s]+)\s*,?\s*", list_of_hosts))
    return p


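# Illustrative example for parse_host_list() (not part of the original
# module):
#
#     >>> sorted(parse_host_list("t.co, bit.ly,goo.gl"))
#     ['bit.ly', 'goo.gl', 't.co']
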
def align_text(text, left_margin=17, max_width=160):
    lines = []
    for line in text.split('\n'):
        temp_lines = textwrap.wrap(line, max_width - left_margin)
        temp_lines = [(' ' * left_margin + line) for line in temp_lines]
        lines.append('\n'.join(temp_lines))
    ret = '\n'.join(lines)
    return ret.lstrip()


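# Illustrative example for align_text() (not part of the original module):
# each wrapped line is indented by left_margin spaces, and the indent of the
# first line is stripped so the text can be placed after a left-hand column.
#
#     >>> align_text("a long tweet text that wraps", left_margin=4, max_width=20)
#     'a long tweet\n    text that wraps'
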
__all__ = ["htmlentitydecode", "smrt_input"]