]>
Commit | Line | Data |
---|---|---|
f0603331 MV |
1 | # encoding: utf-8 |
2 | from __future__ import unicode_literals | |
3 | ||
907402f6 | 4 | from collections import namedtuple |
5 | import contextlib | |
6 | import functools | |
7 | import socket | |
907402f6 | 8 | import threading |
9 | from twitter.util import find_links, follow_redirects, expand_line, parse_host_list | |
10 | ||
62ec1b07 | 11 | try: |
12 | import http.server as BaseHTTPServer | |
13 | import socketserver as SocketServer | |
14 | except ImportError: | |
15 | import BaseHTTPServer | |
16 | import SocketServer | |
17 | ||
907402f6 | 18 | |
def test_find_links():
    """Check the (format string, links) pairs produced by find_links.

    Each URL in the input is expected to be swapped for a ``%s``
    placeholder and reported in the list, while a literal ``%`` comes
    back doubled.
    """
    cases = [
        ("nix", ("nix", [])),
        ("http://abc", ("%s", ["http://abc"])),
        ("t http://abc", ("t %s", ["http://abc"])),
        ("http://abc t", ("%s t", ["http://abc"])),
        ("1 http://a 2 http://b 3", ("1 %s 2 %s 3", ["http://a", "http://b"])),
        ("%", ("%%", [])),
        ("(http://abc)", ("(%s)", ["http://abc"])),
    ]
    for text, expected in cases:
        assert find_links(text) == expected
28 | ||
29 | ||
# One canned reply: the request path it belongs to, plus the status code and
# the header mapping to send back.
Response = namedtuple("Response", "path code headers")
31 | ||
@contextlib.contextmanager
def start_server(*resp):
    """Run a local HTTP server replying with the given responses to the
    expected HEAD requests.

    Each positional argument must expose ``path``, ``code`` and ``headers``
    attributes. The responses are consumed in the order given, one per
    incoming HEAD request; the handler asserts that the requested path
    matches the ``path`` of the response being served.

    Yields a callable that turns a path into an absolute URL pointing at
    the server (``http://<hostname>:<port><path>``).

    The server is shut down and its listening socket closed when the
    ``with`` block exits, including when the block raises.
    """
    def url(port, path):
        return 'http://%s:%s%s' % (socket.gethostname(), port, path)

    # Reversed so that list.pop() hands the responses out in call order.
    responses = list(reversed(resp))

    class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
        def do_HEAD(self):
            response = responses.pop()
            assert response.path == self.path
            self.send_response(response.code)
            for header, value in response.headers.items():
                self.send_header(header, value)
            self.end_headers()

    # Port 0 lets the OS pick a free port; it is read back below.
    httpd = SocketServer.TCPServer(("", 0), MyHandler)
    t = threading.Thread(target=httpd.serve_forever)
    t.daemon = True
    t.start()
    try:
        port = httpd.server_address[1]
        yield functools.partial(url, port)
    finally:
        # Always tear down, even when the with-body fails: stop the serve
        # loop, release the listening socket, and wait for the thread.
        httpd.shutdown()
        httpd.server_close()
        t.join()
be5f32da | 57 | |
def test_follow_redirects_direct_link():
    """A link answered with 200 comes back unchanged."""
    path = "/resource"
    with start_server(Response(path, 200, {})) as url:
        expected = url(path)
        assert expected == follow_redirects(url(path))
62 | ||
def test_follow_redirects_redirected_link():
    """A 301 pointing at another path resolves to that path's URL."""
    source, target = "/resource", "/redirected"
    replies = (
        Response(source, 301, {"Location": target}),
        Response(target, 200, {}),
    )
    with start_server(*replies) as url:
        expected = url(target)
        assert expected == follow_redirects(url(source))
be5f32da | 70 | |
def test_follow_redirects_unavailable():
    """A link answered with 404 comes back unchanged."""
    path = "/resource"
    with start_server(Response(path, 404, {})) as url:
        expected = url(path)
        assert expected == follow_redirects(url(path))
75 | ||
def test_follow_redirects_link_to_last_available():
    """A redirect to a path that answers 404 still resolves to that path."""
    source, missing = "/resource", "/unavailable"
    replies = (
        Response(source, 301, {"Location": missing}),
        Response(missing, 404, {}),
    )
    with start_server(*replies) as url:
        expected = url(missing)
        assert expected == follow_redirects(url(source))
83 | ||
84 | ||
def test_follow_redirects_no_where():
    """The link "http://links.nowhere/" is expected back unchanged."""
    target = "http://links.nowhere/"
    resolved = follow_redirects(target)
    assert target == resolved
be5f32da | 88 | |
def test_follow_redirects_link_to_nowhere():
    """A redirect to "http://links.nowhere/" resolves to that location."""
    source = "/resource"
    nowhere = "http://links.nowhere/"
    reply = Response(source, 301, {"Location": nowhere})
    with start_server(reply) as url:
        assert nowhere == follow_redirects(url(source))
95 | ||
def test_follow_redirects_filtered_by_site():
    """With only "other_host" in the site list, the link comes back as-is.

    The server is started without any canned responses.
    """
    path = "/resource"
    allowed_sites = ["other_host"]
    with start_server() as url:
        expected = url(path)
        assert expected == follow_redirects(url(path), allowed_sites)
100 | ||
101 | ||
def test_follow_redirects_filtered_by_site_after_redirect():
    """Following ends at the first location whose host is not in the list.

    Only the local host name is passed as a site; the second redirect
    points at "http://dont-follow/", which is the expected result.
    """
    source, hop = "/resource", "/redirected"
    off_limits = "http://dont-follow/"
    replies = (
        Response(source, 301, {"Location": hop}),
        Response(hop, 301, {"Location": off_limits}),
    )
    with start_server(*replies) as url:
        allowed_sites = [socket.gethostname()]
        assert off_limits == follow_redirects(url(source), allowed_sites)
111 | ||
def test_follow_redirects_filtered_by_site_allowed():
    """A redirect is followed when the local host name is in the site list."""
    source, target = "/resource", "/redirected"
    replies = (
        Response(source, 301, {"Location": target}),
        Response(target, 200, {}),
    )
    with start_server(*replies) as url:
        allowed_sites = [socket.gethostname()]
        expected = url(target)
        assert expected == follow_redirects(url(source), allowed_sites)
120 | ||
def test_expand_line():
    """expand_line swaps a link inside a line for its redirect target."""
    source, target = "/resource", "/redirected"
    template = "before %s after"
    replies = (
        Response(source, 301, {"Location": target}),
        Response(target, 200, {}),
    )
    with start_server(*replies) as url:
        line = template % url(source)
        expected = template % url(target)
        assert expected == expand_line(line, None)
131 | ||
def test_parse_host_config():
    """Check parse_host_list on empty, single, and comma-separated input.

    Whitespace around the comma-separated entries is expected to be
    ignored.
    """
    cases = (
        ("", set()),
        ("h", set(["h"])),
        ("1,2", set(["1", "2"])),
        (" 1 , 2 ", set(["1", "2"])),
    )
    for raw, expected in cases:
        assert expected == parse_host_list(raw)
137 |