]> jfr.im git - z_archive/twitter.git/commitdiff
follow redirects of tweeted urls before archiving
authorblob79 <redacted>
Tue, 24 Jul 2012 19:11:50 +0000 (21:11 +0200)
committerMike Verdone <redacted>
Fri, 3 Aug 2012 20:48:53 +0000 (22:48 +0200)
tests/test_util.py [new file with mode: 0644]
twitter/archiver.py
twitter/util.py

diff --git a/tests/test_util.py b/tests/test_util.py
new file mode 100644 (file)
index 0000000..322d11b
--- /dev/null
@@ -0,0 +1,129 @@
+import BaseHTTPServer
+from collections import namedtuple
+import contextlib
+import functools
+import socket
+import SocketServer
+import threading
+from twitter.util import find_links, follow_redirects, expand_line, parse_host_list
+
+
+def test_find_links():
+    assert find_links("nix") == ("nix", [])
+    assert find_links("http://abc") == ("%s", ["http://abc"])
+    assert find_links("t http://abc") == ("t %s", ["http://abc"])
+    assert find_links("http://abc t") == ("%s t", ["http://abc"])
+    assert find_links("1 http://a 2 http://b 3") == ("1 %s 2 %s 3", 
+        ["http://a", "http://b"])
+    assert find_links("%") == ("%%", [])
+    assert find_links("(http://abc)") == ("(%s)", ["http://abc"])
+
+
+Response = namedtuple('Response', 'path code headers')
+
+@contextlib.contextmanager
+def start_server(*resp):
+    """HTTP server replying with the given responses to the expected
+    requests."""
+    def url(port, path): 
+        return 'http://%s:%s%s' % (socket.gethostname(), port, path)
+    
+    responses = list(reversed(resp))
+    
+    class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+        def do_HEAD(self):
+            response = responses.pop()
+            assert response.path == self.path
+            self.send_response(response.code)
+            for header, value in response.headers.iteritems():
+                self.send_header(header, value)
+            self.end_headers()
+            
+    httpd = SocketServer.TCPServer(("", 0), MyHandler)
+    t = threading.Thread(target=httpd.serve_forever)
+    t.setDaemon(True)
+    t.start()
+    port = httpd.server_address[1]
+    yield functools.partial(url, port)
+    httpd.shutdown()
+    
+def test_follow_redirects_direct_link():
+    link = "/resource"
+    with start_server(Response(link, 200, {})) as url:
+        assert url(link) == follow_redirects(url(link))
+
+def test_follow_redirects_redirected_link():
+    redirected = "/redirected"
+    link = "/resource"
+    with start_server(
+        Response(link, 301, {"Location": redirected}), 
+        Response(redirected, 200, {})) as url:
+        assert url(redirected) == follow_redirects(url(link))
+        
+def test_follow_redirects_unavailable():
+    link = "/resource"
+    with start_server(Response(link, 404, {})) as url:
+        assert url(link) == follow_redirects(url(link))
+
+def test_follow_redirects_link_to_last_available():
+    unavailable = "/unavailable"
+    link = "/resource"
+    with start_server(
+        Response(link, 301, {"Location": unavailable}), 
+        Response(unavailable, 404, {})) as url:
+        assert url(unavailable) == follow_redirects(url(link))
+
+
+def test_follow_redirects_no_where():
+    link = "http://links.nowhere/"
+    assert link == follow_redirects(link)
+    
+def test_follow_redirects_link_to_nowhere():
+    unavailable = "http://links.nowhere/"
+    link = "/resource"
+    with start_server(
+        Response(link, 301, {"Location": unavailable})) as url:
+        assert unavailable == follow_redirects(url(link))
+
+def test_follow_redirects_filtered_by_site():
+    link = "/resource"
+    with start_server() as url:
+        assert url(link) == follow_redirects(url(link), ["other_host"])
+
+
+def test_follow_redirects_filtered_by_site_after_redirect():
+    link = "/resource"
+    redirected = "/redirected"
+    filtered = "http://dont-follow/"
+    with start_server(
+        Response(link, 301, {"Location": redirected}), 
+        Response(redirected, 301, {"Location": filtered})) as url:
+        hosts = [socket.gethostname()]
+        assert filtered == follow_redirects(url(link), hosts)
+
+def test_follow_redirects_filtered_by_site_allowed():
+    redirected = "/redirected"
+    link = "/resource"
+    with start_server(
+        Response(link, 301, {"Location": redirected}), 
+        Response(redirected, 200, {})) as url:
+        hosts = [socket.gethostname()]
+        assert url(redirected) == follow_redirects(url(link), hosts)
+
+def test_expand_line():
+    redirected = "/redirected"
+    link = "/resource"
+    with start_server(
+        Response(link, 301, {"Location": redirected}), 
+        Response(redirected, 200, {})) as url:
+        fmt = "before %s after"
+        line = fmt % url(link)
+        expected = fmt % url(redirected)
+        assert expected == expand_line(line, None)
+
+def test_parse_host_config():
+    assert set() == parse_host_list("")
+    assert set("h") == parse_host_list("h")
+    assert set(["1", "2"]) == parse_host_list("1,2")
+    assert set(["1", "2"]) == parse_host_list(" 1 , 2 ")
+
index 4768dc42ffc358f0f0a5c87ab32a08a3d7ec5610..5ceefd5619ae19ccfbb5850615b6d496dd0a4369 100644 (file)
@@ -14,6 +14,8 @@ OPTIONS
  -a --api-rate         see current API rate limit status
  -t --timeline <file>  archive own timeline into given file name (requires
                        OAuth, max 800 statuses).
  -a --api-rate         see current API rate limit status
  -t --timeline <file>  archive own timeline into given file name (requires
                        OAuth, max 800 statuses).
+ -f --follow-redirects follow redirects of urls
+ -r --redirect-sites   follow redirects for this comma separated list of hosts
 
 AUTHENTICATION
     Authenticate to Twitter using OAuth to archive tweets of private profiles
 
 AUTHENTICATION
     Authenticate to Twitter using OAuth to archive tweets of private profiles
@@ -23,7 +25,7 @@ AUTHENTICATION
 
 from __future__ import print_function
 
 
 from __future__ import print_function
 
-import os, sys, time, calendar, urllib2, httplib
+import os, sys, time, calendar, urllib2, httplib, functools
 from getopt import gnu_getopt as getopt, GetoptError
 
 # T-Archiver (Twitter-Archiver) application registered by @stalkr_
 from getopt import gnu_getopt as getopt, GetoptError
 
 # T-Archiver (Twitter-Archiver) application registered by @stalkr_
@@ -34,13 +36,13 @@ from .api import Twitter, TwitterError
 from .oauth import OAuth, read_token_file
 from .oauth_dance import oauth_dance
 from .auth import NoAuth
 from .oauth import OAuth, read_token_file
 from .oauth_dance import oauth_dance
 from .auth import NoAuth
-from .util import Fail, err
+from .util import Fail, err, expand_line, parse_host_list
 from .follow import lookup
 
 def parse_args(args, options):
     """Parse arguments from command-line to set options."""
 from .follow import lookup
 
 def parse_args(args, options):
     """Parse arguments from command-line to set options."""
-    long_opts = ['help', 'oauth', 'save-dir=', 'api-rate', 'timeline=']
-    short_opts = "hos:at:"
+    long_opts = ['help', 'oauth', 'save-dir=', 'api-rate', 'timeline=', 'follow-redirects',"redirect-sites="]
+    short_opts = "hos:at:fr:"
     opts, extra_args = getopt(args, short_opts, long_opts)
 
     for opt, arg in opts:
     opts, extra_args = getopt(args, short_opts, long_opts)
 
     for opt, arg in opts:
@@ -55,6 +57,10 @@ def parse_args(args, options):
             options['api-rate' ] = True
         elif opt in ('-t', '--timeline'):
             options['timeline'] = arg
             options['api-rate' ] = True
         elif opt in ('-t', '--timeline'):
             options['timeline'] = arg
+        elif opt in ('-f', '--follow-redirects'):
+            options['follow-redirects'] = True
+        elif opt in ('-r', '--redirect-sites'):
+            options['redirect-sites'] = arg
 
     options['extra_args'] = extra_args
 
 
     options['extra_args'] = extra_args
 
@@ -108,7 +114,11 @@ def format_date(utc, to_localtime=True):
     else:
         return time.strftime("%Y-%m-%d %H:%M:%S UTC", u)
 
     else:
         return time.strftime("%Y-%m-%d %H:%M:%S UTC", u)
 
-def format_text(text):
+def expand_format_text(hosts, text):
+    """Following redirects in links."""
+    return direct_format_text(expand_line(text, hosts))
+
+def direct_format_text(text):
     """Transform special chars in text to have only one line."""
     return text.replace('\n','\\n').replace('\r','\\r')
 
     """Transform special chars in text to have only one line."""
     return text.replace('\n','\\n').replace('\r','\\r')
 
@@ -161,7 +171,6 @@ def timeline_portion(twitter, screen_name, max_id=None):
         tweets[t['id']] = "%s <%s> %s" % (format_date(t['created_at']),
                                           t['user']['screen_name'],
                                           format_text(text))
         tweets[t['id']] = "%s <%s> %s" % (format_date(t['created_at']),
                                           t['user']['screen_name'],
                                           format_text(text))
-
     return tweets
 
 def timeline(twitter, screen_name, tweets):
     return tweets
 
 def timeline(twitter, screen_name, tweets):
@@ -232,7 +241,9 @@ def main(args=sys.argv[1:]):
         'oauth': False,
         'save-dir': ".",
         'api-rate': False,
         'oauth': False,
         'save-dir': ".",
         'api-rate': False,
-        'timeline': ""
+        'timeline': "",
+        'follow-redirects': False,
+        'redirect-sites': None,
     }
     try:
         parse_args(args, options)
     }
     try:
         parse_args(args, options)
@@ -266,6 +277,16 @@ def main(args=sys.argv[1:]):
         rate_limit_status(twitter)
         return
 
         rate_limit_status(twitter)
         return
 
+    global format_text
+    if options['follow-redirects'] or options['redirect-sites'] :
+        if options['redirect-sites']:
+            hosts = parse_host_list(options['redirect-sites'])
+        else:
+            hosts = None
+        format_text = functools.partial(expand_format_text, hosts)
+    else:
+        format_text = direct_format_text
+    
     # save own timeline (the user used in OAuth)
     if options['timeline']:
         if isinstance(auth, NoAuth):
     # save own timeline (the user used in OAuth)
     if options['timeline']:
         if isinstance(auth, NoAuth):
index 27142af5636f22d2593994f6e2450533148a78fe..4396b07dabc3408d4f6113edb26b7687c7b2d5db 100644 (file)
@@ -7,9 +7,12 @@ Internal utility functions.
 
 from __future__ import print_function
 
 
 from __future__ import print_function
 
+import contextlib
 import re
 import sys
 import time
 import re
 import sys
 import time
+import urllib2
+import urlparse
 
 try:
     from html.entities import name2codepoint
 
 try:
     from html.entities import name2codepoint
@@ -75,3 +78,57 @@ class Fail(object):
         self.count()
         if delay > 0:
             time.sleep(delay)
         self.count()
         if delay > 0:
             time.sleep(delay)
+
+
+def find_links(line):
+    """Find all links in the given line. The function returns a sprintf style
+    format string (with %s placeholders for the links) and a list of urls."""
+    l = line.replace(u"%", u"%%")
+    regex = "(https?://[^ )]+)"
+    return (
+        re.sub(regex, "%s", l), 
+        [m.group(1) for m in re.finditer(regex, l)])
+    
+def follow_redirects(link, sites= None):
+    """Follow directs for the link as long as the redirects are on the given
+    sites and return the resolved link."""
+    def follow(url):
+        return sites == None or urlparse.urlparse(url).hostname in sites
+                
+    class RedirectHandler(urllib2.HTTPRedirectHandler):
+        def __init__(self):
+            self.last_url = None
+        def redirect_request(self, req, fp, code, msg, hdrs, newurl):
+            self.last_url = newurl
+            if not follow(newurl):
+                return None
+            r = urllib2.HTTPRedirectHandler.redirect_request(
+                self, req, fp, code, msg, hdrs, newurl)
+            r.get_method = lambda : 'HEAD'
+            return r
+            
+    if not follow(link):
+        return link
+    redirect_handler = RedirectHandler()
+    opener = urllib2.build_opener(redirect_handler)
+    req = urllib2.Request(link)
+    req.get_method = lambda : 'HEAD'
+    try:
+        with contextlib.closing(opener.open(req)) as site:
+            return site.url
+    except (urllib2.HTTPError, urllib2.URLError):
+        return redirect_handler.last_url if redirect_handler.last_url else link
+
+def expand_line(line, sites):
+    """Expand the links in the line for the given sites."""
+    l = line.strip()
+    msg_format, links = find_links(l)
+    args = tuple(follow_redirects(l, sites) for l in links)
+    return msg_format % args
+
+def parse_host_list(list_of_hosts):
+    """Parse the comma separated list of hosts."""
+    p = set(
+        m.group(1) for m in re.finditer("\s*([^,\s]+)\s*,?\s*", list_of_hosts))
+    return p
+