]> jfr.im git - z_archive/twitter.git/blob - twitter/util.py
7831939a2c1e2a24b2e40573cefe5a92c16d72de
[z_archive/twitter.git] / twitter / util.py
1 """
2 Internal utility functions.
3
4 `htmlentitydecode` came from here:
5 http://wiki.python.org/moin/EscapingHtml
6 """
7
8 from __future__ import print_function
9
10 import contextlib
11 import re
12 import sys
13 import textwrap
14 import time
15 import socket
16
17 try:
18 from html.entities import name2codepoint
19 unichr = chr
20 import urllib.request as urllib2
21 import urllib.parse as urlparse
22 except ImportError:
23 from htmlentitydefs import name2codepoint
24 import urllib2
25 import urlparse
26
def htmlentitydecode(s):
    """Replace named HTML entities in ``s`` with the characters they denote.

    Only entities whose name is a key of ``name2codepoint`` (``&amp;``,
    ``&lt;``, ``&eacute;`` ...) are substituted.  Anything else, including
    numeric references such as ``&#39;``, is returned unchanged.
    """
    # One alternation over every known entity name, captured as group 1.
    entity_pattern = '&(%s);' % '|'.join(name2codepoint)

    def _to_char(match):
        return unichr(name2codepoint[match.group(1)])

    return re.sub(entity_pattern, _to_char, s)
31
def smrt_input(globals_, locals_, ps1=">>> ", ps2="... "):
    """Read one expression interactively, evaluate it and print its value.

    Lines are read with ``input`` until the accumulated text evaluates
    without a ``SyntaxError``; a ``SyntaxError`` is taken to mean that the
    expression is not complete yet, so another line is requested.

    Args:
        globals_: Global namespace passed to ``eval``.
        locals_: Local namespace passed to ``eval``.
        ps1: Prompt shown for the first line.
        ps2: Prompt shown for every continuation line.

    Any exception other than ``SyntaxError`` raised while reading or
    evaluating propagates to the caller.
    """
    inputs = []
    while True:
        prompt = ps2 if inputs else ps1
        inputs.append(input(prompt))
        try:
            # NOTE: eval() of typed-in text is the purpose of this helper;
            # it must only ever be fed input from a trusted, local user.
            ret = eval('\n'.join(inputs), globals_, locals_)
        except SyntaxError:
            # Incomplete (or invalid) so far: keep collecting lines.
            continue
        # Only None is suppressed, so falsy results such as 0, False or ''
        # are still shown to the user.
        if ret is not None:
            print(str(ret))
        return
47
def printNicely(string):
    """Write ``string`` to standard output as UTF-8, followed by a newline.

    When ``sys.stdout`` exposes a binary ``buffer`` the encoded bytes are
    written to it directly, so the result does not depend on the encoding
    configured for the text stream.  Otherwise the encoded string is handed
    to ``print``.
    """
    if hasattr(sys.stdout, 'buffer'):
        # Text already queued in the text layer must reach the underlying
        # buffer first, otherwise it would come out after the bytes below.
        sys.stdout.flush()
        sys.stdout.buffer.write(string.encode('utf8'))
        print()
        # Push the trailing newline (and the bytes) out right away so that
        # later writes stay in order as well.
        sys.stdout.flush()
    else:
        print(string.encode('utf8'))
54
# Names exported by a star-import of this module; every other helper defined
# in this file has to be imported explicitly by name.
__all__ = ["htmlentitydecode", "smrt_input"]
56
def err(msg=""):
    """Print ``msg`` (an empty line by default) on standard error."""
    stream = sys.stderr
    print(msg, file=stream)
59
class Fail(object):
    """A class to count fails during a repetitive task.

    Args:
        maximum: An integer for the maximum of fails to allow.
        exit: An integer for the exit code when maximum of fails is reached.

    Methods:
        count: Count a fail, exit when maximum of fails is reached.
        wait: Same as count but also sleep for a given time in seconds.
    """
    def __init__(self, maximum=10, exit=1):
        # ``i`` is the number of fails still allowed; it only counts down.
        self.i = maximum
        self.exit = exit

    def count(self):
        """Record one fail.

        Raises:
            SystemExit: with ``self.exit`` as code once no fails are left.
        """
        self.i -= 1
        # ``<=`` instead of ``==`` so that a maximum of zero (or less) still
        # exits, and so that the limit keeps applying if a caller catches
        # SystemExit and goes on counting.
        if self.i <= 0:
            err("Too many consecutive fails, exiting.")
            raise SystemExit(self.exit)

    def wait(self, delay=0):
        """Record one fail, then sleep ``delay`` seconds when it is positive."""
        self.count()
        if delay > 0:
            time.sleep(delay)
85
86
def find_links(line):
    """Find all links in the given line.

    Returns a tuple ``(fmt, urls)``: ``fmt`` is a sprintf style format string
    in which every link is replaced by a ``%s`` placeholder and every literal
    ``%`` of the surrounding text is doubled, and ``urls`` is the list of
    links in order of appearance, so ``fmt % tuple(urls)`` gives back
    ``line``.
    """
    regex = r"(https?://[^ )]+)"
    # With one capturing group re.split alternates plain text (even indexes)
    # and matched links (odd indexes).
    pieces = re.split(regex, line)
    # Escape '%' in the plain text only: the links are returned verbatim so
    # that percent-encoded URLs (e.g. ".../a%20b") are not corrupted.
    fmt = "".join(
        "%s" if index % 2 else piece.replace("%", "%%")
        for index, piece in enumerate(pieces))
    return (fmt, pieces[1::2])
95
def follow_redirects(link, sites=None, timeout=1):
    """Follow redirects for the link as long as the redirects are on the given
    sites and return the resolved link.

    Args:
        link: The URL to resolve.
        sites: Container of host names whose redirects may be followed;
            ``None`` means every host is allowed.
        timeout: Timeout in seconds passed to the opener for the request.

    Returns:
        ``link`` itself when its host is not in ``sites``; the final URL when
        the request succeeds; otherwise the last redirect target that was
        seen, or ``link`` when no redirect was seen at all.
    """
    def follow(url):
        return sites is None or urlparse.urlparse(url).hostname in sites

    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self):
            # Most recent redirect target, kept so that it can still be
            # returned when the request ends in an error.
            self.last_url = None

        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
            self.last_url = newurl
            if not follow(newurl):
                # Decline the redirect: the target is on a host we were not
                # asked to resolve.
                return None
            r = urllib2.HTTPRedirectHandler.redirect_request(
                self, req, fp, code, msg, hdrs, newurl)
            # Only the final location matters, so keep using HEAD.
            r.get_method = lambda: 'HEAD'
            return r

    if not follow(link):
        return link
    redirect_handler = RedirectHandler()
    opener = urllib2.build_opener(redirect_handler)
    req = urllib2.Request(link)
    req.get_method = lambda: 'HEAD'
    try:
        with contextlib.closing(opener.open(req, timeout=timeout)) as site:
            return site.url
    except Exception:
        # Best effort: any failure falls back to the furthest URL reached.
        # ``Exception`` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return redirect_handler.last_url or link
125
def expand_line(line, sites):
    """Expand the links in the line for the given sites.

    Args:
        line: The text whose links should be resolved.
        sites: Host names passed on to ``follow_redirects``.

    Returns:
        The stripped line with every link replaced by what
        ``follow_redirects`` returned for it.  If anything goes wrong the
        problem is reported through ``err`` and ``line`` is returned as it
        was passed in.
    """
    try:
        stripped = line.strip()
        msg_format, links = find_links(stripped)
        args = tuple(follow_redirects(link, sites) for link in links)
        line = msg_format % args
    except Exception as e:
        try:
            # Format the exception with %s rather than calling ``unicode``,
            # which does not exist on Python 3 and used to turn this report
            # into a silently swallowed NameError.
            err("expanding line %s failed due to %s" % (line, e))
        except Exception:
            # Reporting is best effort only; never fail because of it.
            pass
    return line
139
def parse_host_list(list_of_hosts):
    """Parse the comma separated list of hosts.

    Returns the set of entries found in ``list_of_hosts``; whitespace around
    the entries is dropped and empty entries are ignored.
    """
    # Raw string: ``\s`` in a plain literal is an invalid escape sequence,
    # which recent Python versions warn about at compile time.
    return set(
        m.group(1)
        for m in re.finditer(r"\s*([^,\s]+)\s*,?\s*", list_of_hosts))
145
146
def align_text(text, left_margin=17, max_width=160):
    """Wrap ``text`` and indent it by ``left_margin`` spaces.

    Every input line is wrapped separately to ``max_width - left_margin``
    columns and each resulting line is prefixed with the margin.  Leading
    whitespace of the whole result is stripped, so the first line starts at
    column zero.
    """
    indent = ' ' * left_margin
    wrap_width = max_width - left_margin
    blocks = [
        '\n'.join(indent + piece
                  for piece in textwrap.wrap(source_line, wrap_width))
        for source_line in text.split('\n')
    ]
    return '\n'.join(blocks).lstrip()