1 """
2 Internal utility functions.
3
4 `htmlentitydecode` came from here:
5 http://wiki.python.org/moin/EscapingHtml
6 """
7
8 from __future__ import print_function
9
import contextlib
import re
import sys
import time

# urllib2 and urlparse were renamed in Python 3; fall back for Python 2.
try:
    import urllib.request as urllib2
    import urllib.parse as urlparse
except ImportError:
    import urllib2
    import urlparse

try:
    from html.entities import name2codepoint
    unichr = chr
except ImportError:
    from htmlentitydefs import name2codepoint

def htmlentitydecode(s):
    return re.sub(
        '&(%s);' % '|'.join(name2codepoint),
        lambda m: unichr(name2codepoint[m.group(1)]), s)

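# A doctest-style sketch of htmlentitydecode (the sample string is ours,
# not part of the original module):
#
#     >>> htmlentitydecode("Fish &amp; Chips")
#     'Fish & Chips'
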
def smrt_input(globals_, locals_, ps1=">>> ", ps2="... "):
    inputs = []
    while True:
        if inputs:
            prompt = ps2
        else:
            prompt = ps1
        inputs.append(input(prompt))
        try:
            ret = eval('\n'.join(inputs), globals_, locals_)
            if ret:
                print(str(ret))
            return
        except SyntaxError:
            pass

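# A usage sketch of smrt_input (the call below is hypothetical): input
# lines are buffered under the continuation prompt until the whole buffer
# eval()s without a SyntaxError; any truthy result is printed.
#
#     smrt_input(globals(), locals())
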
def printNicely(string):
    if hasattr(sys.stdout, 'buffer'):
        sys.stdout.buffer.write(string.encode('utf8'))
        print()
    else:
        print(string.encode('utf8'))

__all__ = ["htmlentitydecode", "smrt_input"]

def err(msg=""):
    print(msg, file=sys.stderr)

class Fail(object):
    """A class to count fails during a repetitive task.

    Args:
        maximum: The maximum number of fails to allow.
        exit: The exit code to use when the maximum is reached.

    Methods:
        count: Count a fail; exit once the maximum is reached.
        wait: Same as count, but also sleep for a given delay in seconds.
    """
    def __init__(self, maximum=10, exit=1):
        self.i = maximum
        self.exit = exit

    def count(self):
        self.i -= 1
        if self.i <= 0:
            err("Too many consecutive fails, exiting.")
            raise SystemExit(self.exit)

    def wait(self, delay=0):
        self.count()
        if delay > 0:
            time.sleep(delay)


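# A minimal usage sketch of Fail (the flaky callable is hypothetical):
# retry an operation, sleeping between attempts, and exit after ten
# consecutive failures.
#
#     fail = Fail(maximum=10)
#     while True:
#         try:
#             fetch_page()        # hypothetical flaky callable
#             break
#         except IOError:
#             fail.wait(delay=5)
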
def find_links(line):
    """Find all links in the given line. The function returns a sprintf-style
    format string (with %s placeholders for the links) and a list of URLs."""
    l = line.replace(u"%", u"%%")
    regex = "(https?://[^ )]+)"
    return (
        re.sub(regex, "%s", l),
        [m.group(1) for m in re.finditer(regex, l)])

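# A doctest-style sketch of find_links (the tweet text is ours):
#
#     >>> find_links("see http://t.co/abc and http://t.co/def")
#     ('see %s and %s', ['http://t.co/abc', 'http://t.co/def'])
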
def follow_redirects(link, sites=None):
    """Follow redirects for the link as long as the redirects are on the
    given sites, and return the resolved link."""
    def follow(url):
        return sites is None or urlparse.urlparse(url).hostname in sites

    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def __init__(self):
            self.last_url = None
        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
            self.last_url = newurl
            if not follow(newurl):
                return None
            r = urllib2.HTTPRedirectHandler.redirect_request(
                self, req, fp, code, msg, hdrs, newurl)
            r.get_method = lambda: 'HEAD'
            return r

    if not follow(link):
        return link
    redirect_handler = RedirectHandler()
    opener = urllib2.build_opener(redirect_handler)
    req = urllib2.Request(link)
    req.get_method = lambda: 'HEAD'
    try:
        with contextlib.closing(opener.open(req)) as site:
            return site.url
    except (urllib2.HTTPError, urllib2.URLError):
        return redirect_handler.last_url if redirect_handler.last_url else link

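# A usage sketch of follow_redirects (the URL and host list are
# hypothetical): resolve a shortened link via HEAD requests, but stop
# once a redirect leaves the listed sites; on errors, the last URL seen
# (or the original link) is returned.
#
#     resolved = follow_redirects("http://t.co/example",
#                                 sites=["t.co", "bit.ly"])
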
def expand_line(line, sites):
    """Expand the links in the line for the given sites."""
    l = line.strip()
    msg_format, links = find_links(l)
    args = tuple(follow_redirects(l, sites) for l in links)
    return msg_format % args

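# A sketch of the combined flow (input and output are hypothetical):
# find_links extracts the URLs, follow_redirects resolves each one, and
# the format string stitches the expanded line back together.
#
#     >>> expand_line("read http://t.co/abc", sites=["t.co"])
#     'read http://example.com/article'
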
def parse_host_list(list_of_hosts):
    """Parse the comma-separated list of hosts."""
    p = set(
        m.group(1) for m in re.finditer(r"\s*([^,\s]+)\s*,?\s*", list_of_hosts))
    return p

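# A doctest-style sketch of parse_host_list (the host string is ours):
#
#     >>> sorted(parse_host_list("t.co, bit.ly,goo.gl"))
#     ['bit.ly', 'goo.gl', 't.co']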