]> jfr.im git - z_archive/twitter.git/commitdiff
follow redirects of tweeted urls before archiving
authorblob79 <redacted>
Tue, 24 Jul 2012 19:11:50 +0000 (21:11 +0200)
committerMike Verdone <redacted>
Fri, 3 Aug 2012 20:48:53 +0000 (22:48 +0200)
tests/test_util.py [new file with mode: 0644]
twitter/archiver.py
twitter/util.py

diff --git a/tests/test_util.py b/tests/test_util.py
new file mode 100644 (file)
index 0000000..322d11b
--- /dev/null
@@ -0,0 +1,129 @@
+import BaseHTTPServer
+from collections import namedtuple
+import contextlib
+import functools
+import socket
+import SocketServer
+import threading
+from twitter.util import find_links, follow_redirects, expand_line, parse_host_list
+
+
+def test_find_links():
+    assert find_links("nix") == ("nix", [])
+    assert find_links("http://abc") == ("%s", ["http://abc"])
+    assert find_links("t http://abc") == ("t %s", ["http://abc"])
+    assert find_links("http://abc t") == ("%s t", ["http://abc"])
+    assert find_links("1 http://a 2 http://b 3") == ("1 %s 2 %s 3", 
+        ["http://a", "http://b"])
+    assert find_links("%") == ("%%", [])
+    assert find_links("(http://abc)") == ("(%s)", ["http://abc"])
+
+
+Response = namedtuple('Response', 'path code headers')
+
+@contextlib.contextmanager
+def start_server(*resp):
+    """HTTP server replying with the given responses to the expected
+    requests."""
+    def url(port, path): 
+        return 'http://%s:%s%s' % (socket.gethostname(), port, path)
+    
+    responses = list(reversed(resp))
+    
+    class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+        def do_HEAD(self):
+            response = responses.pop()
+            assert response.path == self.path
+            self.send_response(response.code)
+            for header, value in response.headers.iteritems():
+                self.send_header(header, value)
+            self.end_headers()
+            
+    httpd = SocketServer.TCPServer(("", 0), MyHandler)
+    t = threading.Thread(target=httpd.serve_forever)
+    t.setDaemon(True)
+    t.start()
+    port = httpd.server_address[1]
+    yield functools.partial(url, port)
+    httpd.shutdown()
+    
+def test_follow_redirects_direct_link():
+    link = "/resource"
+    with start_server(Response(link, 200, {})) as url:
+        assert url(link) == follow_redirects(url(link))
+
+def test_follow_redirects_redirected_link():
+    redirected = "/redirected"
+    link = "/resource"
+    with start_server(
+        Response(link, 301, {"Location": redirected}), 
+        Response(redirected, 200, {})) as url:
+        assert url(redirected) == follow_redirects(url(link))
+        
+def test_follow_redirects_unavailable():
+    link = "/resource"
+    with start_server(Response(link, 404, {})) as url:
+        assert url(link) == follow_redirects(url(link))
+
+def test_follow_redirects_link_to_last_available():
+    unavailable = "/unavailable"
+    link = "/resource"
+    with start_server(
+        Response(link, 301, {"Location": unavailable}), 
+        Response(unavailable, 404, {})) as url:
+        assert url(unavailable) == follow_redirects(url(link))
+
+
+def test_follow_redirects_no_where():
+    link = "http://links.nowhere/"
+    assert link == follow_redirects(link)
+    
+def test_follow_redirects_link_to_nowhere():
+    unavailable = "http://links.nowhere/"
+    link = "/resource"
+    with start_server(
+        Response(link, 301, {"Location": unavailable})) as url:
+        assert unavailable == follow_redirects(url(link))
+
+def test_follow_redirects_filtered_by_site():
+    link = "/resource"
+    with start_server() as url:
+        assert url(link) == follow_redirects(url(link), ["other_host"])
+
+
+def test_follow_redirects_filtered_by_site_after_redirect():
+    link = "/resource"
+    redirected = "/redirected"
+    filtered = "http://dont-follow/"
+    with start_server(
+        Response(link, 301, {"Location": redirected}), 
+        Response(redirected, 301, {"Location": filtered})) as url:
+        hosts = [socket.gethostname()]
+        assert filtered == follow_redirects(url(link), hosts)
+
+def test_follow_redirects_filtered_by_site_allowed():
+    redirected = "/redirected"
+    link = "/resource"
+    with start_server(
+        Response(link, 301, {"Location": redirected}), 
+        Response(redirected, 200, {})) as url:
+        hosts = [socket.gethostname()]
+        assert url(redirected) == follow_redirects(url(link), hosts)
+
+def test_expand_line():
+    redirected = "/redirected"
+    link = "/resource"
+    with start_server(
+        Response(link, 301, {"Location": redirected}), 
+        Response(redirected, 200, {})) as url:
+        fmt = "before %s after"
+        line = fmt % url(link)
+        expected = fmt % url(redirected)
+        assert expected == expand_line(line, None)
+
+def test_parse_host_config():
+    assert set() == parse_host_list("")
+    assert set("h") == parse_host_list("h")
+    assert set(["1", "2"]) == parse_host_list("1,2")
+    assert set(["1", "2"]) == parse_host_list(" 1 , 2 ")
+
index 4768dc42ffc358f0f0a5c87ab32a08a3d7ec5610..5ceefd5619ae19ccfbb5850615b6d496dd0a4369 100644 (file)
@@ -14,6 +14,8 @@ OPTIONS
  -a --api-rate         see current API rate limit status
  -t --timeline <file>  archive own timeline into given file name (requires
                        OAuth, max 800 statuses).
+ -f --follow-redirects follow redirects of urls
+ -r --redirect-sites   follow redirects for this comma separated list of hosts
 
 AUTHENTICATION
     Authenticate to Twitter using OAuth to archive tweets of private profiles
@@ -23,7 +25,7 @@ AUTHENTICATION
 
 from __future__ import print_function
 
-import os, sys, time, calendar, urllib2, httplib
+import os, sys, time, calendar, urllib2, httplib, functools
 from getopt import gnu_getopt as getopt, GetoptError
 
 # T-Archiver (Twitter-Archiver) application registered by @stalkr_
@@ -34,13 +36,13 @@ from .api import Twitter, TwitterError
 from .oauth import OAuth, read_token_file
 from .oauth_dance import oauth_dance
 from .auth import NoAuth
-from .util import Fail, err
+from .util import Fail, err, expand_line, parse_host_list
 from .follow import lookup
 
 def parse_args(args, options):
     """Parse arguments from command-line to set options."""
-    long_opts = ['help', 'oauth', 'save-dir=', 'api-rate', 'timeline=']
-    short_opts = "hos:at:"
+    long_opts = ['help', 'oauth', 'save-dir=', 'api-rate', 'timeline=', 'follow-redirects',"redirect-sites="]
+    short_opts = "hos:at:fr:"
     opts, extra_args = getopt(args, short_opts, long_opts)
 
     for opt, arg in opts:
@@ -55,6 +57,10 @@ def parse_args(args, options):
             options['api-rate' ] = True
         elif opt in ('-t', '--timeline'):
             options['timeline'] = arg
+        elif opt in ('-f', '--follow-redirects'):
+            options['follow-redirects'] = True
+        elif opt in ('-r', '--redirect-sites'):
+            options['redirect-sites'] = arg
 
     options['extra_args'] = extra_args
 
@@ -108,7 +114,11 @@ def format_date(utc, to_localtime=True):
     else:
         return time.strftime("%Y-%m-%d %H:%M:%S UTC", u)
 
-def format_text(text):
+def expand_format_text(hosts, text):
+    """Following redirects in links."""
+    return direct_format_text(expand_line(text, hosts))
+
+def direct_format_text(text):
     """Transform special chars in text to have only one line."""
     return text.replace('\n','\\n').replace('\r','\\r')
 
@@ -161,7 +171,6 @@ def timeline_portion(twitter, screen_name, max_id=None):
         tweets[t['id']] = "%s <%s> %s" % (format_date(t['created_at']),
                                           t['user']['screen_name'],
                                           format_text(text))
-
     return tweets
 
 def timeline(twitter, screen_name, tweets):
@@ -232,7 +241,9 @@ def main(args=sys.argv[1:]):
         'oauth': False,
         'save-dir': ".",
         'api-rate': False,
-        'timeline': ""
+        'timeline': "",
+        'follow-redirects': False,
+        'redirect-sites': None,
     }
     try:
         parse_args(args, options)
@@ -266,6 +277,16 @@ def main(args=sys.argv[1:]):
         rate_limit_status(twitter)
         return
 
+    global format_text
+    if options['follow-redirects'] or options['redirect-sites'] :
+        if options['redirect-sites']:
+            hosts = parse_host_list(options['redirect-sites'])
+        else:
+            hosts = None
+        format_text = functools.partial(expand_format_text, hosts)
+    else:
+        format_text = direct_format_text
+    
     # save own timeline (the user used in OAuth)
     if options['timeline']:
         if isinstance(auth, NoAuth):
index 27142af5636f22d2593994f6e2450533148a78fe..4396b07dabc3408d4f6113edb26b7687c7b2d5db 100644 (file)
@@ -7,9 +7,12 @@ Internal utility functions.
 
 from __future__ import print_function
 
+import contextlib
 import re
 import sys
 import time
+import urllib2
+import urlparse
 
 try:
     from html.entities import name2codepoint
@@ -75,3 +78,57 @@ class Fail(object):
         self.count()
         if delay > 0:
             time.sleep(delay)
+
+
+def find_links(line):
+    """Find all links in the given line. The function returns a sprintf style
+    format string (with %s placeholders for the links) and a list of urls."""
+    l = line.replace(u"%", u"%%")
+    regex = "(https?://[^ )]+)"
+    return (
+        re.sub(regex, "%s", l), 
+        [m.group(1) for m in re.finditer(regex, l)])
+    
+def follow_redirects(link, sites= None):
+    """Follow directs for the link as long as the redirects are on the given
+    sites and return the resolved link."""
+    def follow(url):
+        return sites == None or urlparse.urlparse(url).hostname in sites
+                
+    class RedirectHandler(urllib2.HTTPRedirectHandler):
+        def __init__(self):
+            self.last_url = None
+        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
+            self.last_url = newurl
+            if not follow(newurl):
+                return None
+            r = urllib2.HTTPRedirectHandler.redirect_request(
+                self, req, fp, code, msg, hdrs, newurl)
+            r.get_method = lambda : 'HEAD'
+            return r
+            
+    if not follow(link):
+        return link
+    redirect_handler = RedirectHandler()
+    opener = urllib2.build_opener(redirect_handler)
+    req = urllib2.Request(link)
+    req.get_method = lambda : 'HEAD'
+    try:
+        with contextlib.closing(opener.open(req)) as site:
+            return site.url
+    except (urllib2.HTTPError, urllib2.URLError):
+        return redirect_handler.last_url if redirect_handler.last_url else link
+
+def expand_line(line, sites):
+    """Expand the links in the line for the given sites."""
+    l = line.strip()
+    msg_format, links = find_links(l)
+    args = tuple(follow_redirects(l, sites) for l in links)
+    return msg_format % args
+
+def parse_host_list(list_of_hosts):
+    """Parse the comma separated list of hosts."""
+    p = set(
+        m.group(1) for m in re.finditer("\s*([^,\s]+)\s*,?\s*", list_of_hosts))
+    return p
+