Download from peers using the hash instead of a directory location.
[quix0rs-apt-p2p.git] / apt_dht / PeerManager.py
1
2 from random import choice
3 from urlparse import urlparse, urlunparse
4
5 from twisted.internet import reactor, defer
6 from twisted.python import log
7 from twisted.trial import unittest
8 from twisted.web2 import stream as stream_mod
9 from twisted.web2.http import splitHostPort
10
11 from HTTPDownloader import HTTPClientManager
12
13 class PeerManager:
14     def __init__(self):
15         self.clients = {}
16         
17     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
18         """Download from a list of peers or fallback to a mirror.
19         
20         @type peers: C{list} of C{string}
21         @param peers: a list of the peers where the file can be found
22         """
23         if peers:
24             peer = choice(peers)
25             log.msg('Downloading from peer %s' % peer)
26             host, port = splitHostPort('http', peer)
27             path = '/~/' + hash
28         else:
29             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
30             parsed = urlparse(mirror)
31             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
32             host, port = splitHostPort(parsed[0], parsed[1])
33             path = urlunparse(('', '') + parsed[2:])
34
35         return self.getPeer(host, port, path, method, modtime)
36         
37     def getPeer(self, host, port, path, method="GET", modtime=None):
38         if not port:
39             port = 80
40         site = host + ":" + str(port)
41         if site not in self.clients:
42             self.clients[site] = HTTPClientManager(host, port)
43         return self.clients[site].get(path, method, modtime)
44     
45     def close(self):
46         for site in self.clients:
47             self.clients[site].close()
48         self.clients = {}
49
50 class TestPeerManager(unittest.TestCase):
51     """Unit tests for the PeerManager."""
52     
53     manager = None
54     pending_calls = []
55     
56     def gotResp(self, resp, num, expect):
57         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
58         if expect is not None:
59             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
60         def print_(n):
61             pass
62         def printdone(n):
63             pass
64         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
65     
66     def test_download(self):
67         self.manager = PeerManager()
68         self.timeout = 10
69         
70         host = 'www.ietf.org'
71         d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt')
72         d.addCallback(self.gotResp, 1, 1070)
73         return d
74         
75     def test_head(self):
76         self.manager = PeerManager()
77         self.timeout = 10
78         
79         host = 'www.ietf.org'
80         d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD")
81         d.addCallback(self.gotResp, 1, 0)
82         return d
83         
84     def test_multiple_downloads(self):
85         self.manager = PeerManager()
86         self.timeout = 120
87         lastDefer = defer.Deferred()
88         
89         def newRequest(host, path, num, expect, last=False):
90             d = self.manager.get('', 'http://' + host + ':' + str(80) + path)
91             d.addCallback(self.gotResp, num, expect)
92             if last:
93                 d.addBoth(lastDefer.callback)
94                 
95         newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776)
96         newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833)
97         newRequest('www.google.ca', "/", 3, None)
98         self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
99         self.pending_calls.append(reactor.callLater(10, newRequest, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696))
100         self.pending_calls.append(reactor.callLater(30, newRequest, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606))
101         self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
102         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27))
103         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088))
104         self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
105         return lastDefer
106         
107     def tearDown(self):
108         for p in self.pending_calls:
109             if p.active():
110                 p.cancel()
111         self.pending_calls = []
112         if self.manager:
113             self.manager.close()
114             self.manager = None