]>
jfr.im git - z_archive/twitter.git/blob - twitter/util.py
5a5f50710f4d8ed8d7d5a0c65f4bc97671f5f072
2 Internal utility functions.
4 `htmlentitydecode` came from here:
5 http://wiki.python.org/moin/EscapingHtml
8 from __future__
import print_function
# True when running under Python 3 or newer; used to select between the
# version-specific str/bytes code paths elsewhere in this module.
PY_3_OR_HIGHER = sys.version_info >= (3, 0)
20 from html
.entities
import name2codepoint
22 import urllib
.request
as urllib2
23 import urllib
.parse
as urlparse
25 from htmlentitydefs
import name2codepoint
def htmlentitydecode(s):
    """Replace HTML named entities (e.g. "&amp;", "&eacute;") in *s* with
    the characters they stand for and return the decoded string."""
    # Build one alternation of every known entity name; name2codepoint maps
    # entity name -> Unicode code point.
    entity_pattern = '&(%s);' % '|'.join(name2codepoint)
    decode_match = lambda match: unichr(name2codepoint[match.group(1)])
    return re.sub(entity_pattern, decode_match, s)
def smrt_input(globals_, locals_, ps1=">>> ", ps2="... "):
    # NOTE(review): only part of this function is visible in this chunk —
    # the accumulation loop, the switch between the ps1/ps2 prompts, and
    # the error handling around eval() fall outside this view; confirm
    # against the full file.
    # Collect one more line of user input into the pending statement.
    inputs.append(input(prompt))
    # SECURITY: eval() of raw user input — acceptable only because this is
    # an interactive REPL-style helper; never feed it untrusted data.
    ret = eval('\n'.join(inputs), globals_, locals_)
def printNicely(string):
    # When stdout exposes a binary buffer (Python 3 text streams), write raw
    # UTF-8 bytes to avoid UnicodeEncodeError on terminals whose locale
    # encoding cannot represent the text; flush so output appears at once.
    if hasattr(sys.stdout, 'buffer'):
        sys.stdout.buffer.write(string.encode('utf8'))
        sys.stdout.buffer.flush()
    # NOTE(review): the lines between the flush above and this print are not
    # visible in this view — presumably an `else:` making the line below the
    # fallback (Python 2) path; confirm against the full file.
    print(string.encode('utf8'))
def actually_bytes(stringy):
    """Coerce an arbitrary value to a UTF-8 byte string.

    Byte strings pass through untouched; text is UTF-8 encoded; any other
    value is stringified first and then encoded.

    Args:
        stringy: The value to convert.

    Returns:
        A ``bytes`` object (a ``str`` on Python 2).
    """
    if PY_3_OR_HIGHER:
        # isinstance() instead of the original `type(x) == T` comparisons:
        # subclasses of bytes/str are now accepted as-is rather than being
        # needlessly round-tripped through str().
        if isinstance(stringy, bytes):
            return stringy
        if not isinstance(stringy, str):
            stringy = str(stringy)
        return stringy.encode("utf-8")
    else:
        # Python 2: str is already a byte string; only unicode needs
        # encoding. Note bytes is an alias of str on Python 2.
        if isinstance(stringy, bytes):
            return stringy
        if not isinstance(stringy, unicode):
            # Non-text value: its str() form is returned unencoded,
            # matching the original control flow.
            return str(stringy)
        return stringy.encode("utf-8")
# Body of the module's error-reporting helper (its `def err(msg=""):` header
# falls outside this view): route diagnostics to stderr so they never mix
# with the normal stdout output that printNicely() produces.
print(msg, file=sys.stderr)
81 """A class to count fails during a repetitive task.
84 maximum: An integer for the maximum of fails to allow.
85 exit: An integer for the exit code when maximum of fail is reached.
88 count: Count a fail, exit when maximum of fails is reached.
89 wait: Same as count but also sleep for a given time in seconds.
91 def __init__(self
, maximum
=10, exit
=1):
98 err("Too many consecutive fails, exiting.")
99 raise SystemExit(self
.exit
)
101 def wait(self
, delay
=0):
def find_links(line):
    """Find all links in the given line. The function returns a sprintf style
    format string (with %s placeholders for the links) and a list of urls."""
    url_pattern = re.compile("(https?://[^ )]+)")
    # Escape literal percent signs first so the returned string is safe to
    # use with the % operator; URLs are extracted from this escaped text.
    escaped = line.replace("%", "%%")
    urls = [match.group(1) for match in url_pattern.finditer(escaped)]
    return url_pattern.sub("%s", escaped), urls
def follow_redirects(link, sites= None):
    """Follow redirects for the link as long as the redirects are on the given
    sites and return the resolved link."""
    # NOTE(review): this function is only partially visible in this chunk —
    # the inner `def follow(url):` header, the early-return branches, and the
    # try/except around the request fall outside this view; the comments
    # below hedge accordingly.
    # Allow-list predicate: with sites=None every host passes; otherwise the
    # URL's hostname must appear in `sites`.
        return sites == None or urlparse.urlparse(url).hostname in sites

    # Redirect handler that records every redirect target and (apparently)
    # aborts the chain for hosts the predicate rejects.
    class RedirectHandler(urllib2.HTTPRedirectHandler):
        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
            # Remember the last target even when we refuse to follow it, so
            # the caller can fall back to it below.
            self.last_url = newurl
            if not follow(newurl):
            # (refusal-branch body not visible — presumably returns None to
            # stop urllib from following; confirm against the full file)
            r = urllib2.HTTPRedirectHandler.redirect_request(
                self, req, fp, code, msg, hdrs, newurl)
            # HEAD instead of GET for the follow-up request: only the final
            # URL is wanted, never the response body.
            r.get_method = lambda : 'HEAD'

    redirect_handler = RedirectHandler()
    opener = urllib2.build_opener(redirect_handler)
    req = urllib2.Request(link)
    req.get_method = lambda : 'HEAD'
    # timeout=1: link resolution is best-effort and must not stall callers.
    with contextlib.closing(opener.open(req,timeout=1)) as site:
    # Fallback (apparently the failure path): the deepest redirect target
    # seen, or the original link when no redirect was ever issued.
    return redirect_handler.last_url if redirect_handler.last_url else link
def expand_line(line, sites):
    """Expand the links in the line for the given sites.

    Every URL found in the line is resolved with follow_redirects(); on any
    failure the line is returned unmodified (best-effort expansion).

    Args:
        line: The text line possibly containing URLs.
        sites: Optional collection of hostnames redirects may pass through
            (None means any host is allowed).

    Returns:
        The line with each link replaced by its resolved target, or the
        original line when expansion failed.
    """
    try:
        l = line.strip()
        msg_format, links = find_links(l)
        args = tuple(follow_redirects(l, sites) for l in links)
        line = msg_format % args
    except Exception as e:
        # BUG FIX: this message used unicode(e), which raises NameError on
        # Python 3 inside the handler and masked the real failure; str(e)
        # works on both supported versions.
        err("expanding line %s failed due to %s" % (line, str(e)))
    return line
def parse_host_list(list_of_hosts):
    """Parse the comma separated list of hosts.

    Args:
        list_of_hosts: A string such as "a.com, b.org,c.net"; whitespace
            around entries is ignored.

    Returns:
        A set of the non-empty host tokens.
    """
    # FIX: raw string for the pattern — "\s" inside a plain literal is an
    # invalid escape sequence (SyntaxWarning on modern Python, slated to
    # become an error).
    return set(
        m.group(1) for m in re.finditer(r"\s*([^,\s]+)\s*,?\s*", list_of_hosts))
def align_text(text, left_margin=17, max_width=160):
    """Re-wrap *text* so every line fits within max_width columns, indenting
    each wrapped line by left_margin spaces; the leading indent of the very
    first line is stripped."""
    indent = ' ' * left_margin
    wrapped_blocks = []
    for raw_line in text.split('\n'):
        chunks = textwrap.wrap(raw_line, max_width - left_margin)
        wrapped_blocks.append('\n'.join(indent + chunk for chunk in chunks))
    return '\n'.join(wrapped_blocks).lstrip()
# Explicit public API of this module; the remaining helpers are internal
# even though their names carry no leading underscore.
__all__ = ["htmlentitydecode", "smrt_input"]