Rename all apt-dht files to apt-p2p.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7
8 from twisted.internet import reactor, defer
9 from twisted.python import log
10 from twisted.trial import unittest
11 from twisted.web2 import stream as stream_mod
12 from twisted.web2.http import splitHostPort
13
14 from HTTPDownloader import Peer
15 from util import uncompact
16
17 class PeerManager:
18     """Manage a set of peers and the requests to them.
19     
20     @type clients: C{dictionary}
21     @ivar clients: the available peers that have been previously contacted
22     """
23
24     def __init__(self):
25         """Initialize the instance."""
26         self.clients = {}
27         
28     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
29         """Download from a list of peers or fallback to a mirror.
30         
31         @type hash: L{Hash.HashObject}
32         @param hash: the hash object containing the expected hash for the file
33         @param mirror: the URI of the file on the mirror
34         @type peers: C{list} of C{string}
35         @param peers: a list of the peer info where the file can be found
36             (optional, defaults to downloading from the mirror)
37         @type method: C{string}
38         @param method: the HTTP method to use, 'GET' or 'HEAD'
39             (optional, defaults to 'GET')
40         @type modtime: C{int}
41         @param modtime: the modification time to use for an 'If-Modified-Since'
42             header, as seconds since the epoch
43             (optional, defaults to not sending that header)
44         """
45         if peers:
46             # Choose one of the peers at random
47             compact_peer = choice(peers)
48             peer = uncompact(compact_peer['c'])
49             log.msg('Downloading from peer %r' % (peer, ))
50             site = peer
51             path = '/~/' + quote_plus(hash.expected())
52         else:
53             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
54             parsed = urlparse(mirror)
55             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
56             site = splitHostPort(parsed[0], parsed[1])
57             path = urlunparse(('', '') + parsed[2:])
58
59         return self.getPeer(site, path, method, modtime)
60         
61     def getPeer(self, site, path, method="GET", modtime=None):
62         """Create a new peer if necessary and forward the request to it.
63         
64         @type site: (C{string}, C{int})
65         @param site: the IP address and port of the peer
66         @type path: C{string}
67         @param path: the path to the file on the peer
68         @type method: C{string}
69         @param method: the HTTP method to use, 'GET' or 'HEAD'
70             (optional, defaults to 'GET')
71         @type modtime: C{int}
72         @param modtime: the modification time to use for an 'If-Modified-Since'
73             header, as seconds since the epoch
74             (optional, defaults to not sending that header)
75         """
76         if site not in self.clients:
77             self.clients[site] = Peer(site[0], site[1])
78         return self.clients[site].get(path, method, modtime)
79     
80     def close(self):
81         """Close all the connections to peers."""
82         for site in self.clients:
83             self.clients[site].close()
84         self.clients = {}
85
86 class TestPeerManager(unittest.TestCase):
87     """Unit tests for the PeerManager."""
88     
89     manager = None
90     pending_calls = []
91     
92     def gotResp(self, resp, num, expect):
93         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
94         if expect is not None:
95             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
96         def print_(n):
97             pass
98         def printdone(n):
99             pass
100         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
101     
102     def test_download(self):
103         """Tests a normal download."""
104         self.manager = PeerManager()
105         self.timeout = 10
106         
107         host = 'www.ietf.org'
108         d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt')
109         d.addCallback(self.gotResp, 1, 1070)
110         return d
111         
112     def test_head(self):
113         """Tests a 'HEAD' request."""
114         self.manager = PeerManager()
115         self.timeout = 10
116         
117         host = 'www.ietf.org'
118         d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD")
119         d.addCallback(self.gotResp, 1, 0)
120         return d
121         
122     def test_multiple_downloads(self):
123         """Tests multiple downloads with queueing and connection closing."""
124         self.manager = PeerManager()
125         self.timeout = 120
126         lastDefer = defer.Deferred()
127         
128         def newRequest(host, path, num, expect, last=False):
129             d = self.manager.get('', 'http://' + host + ':' + str(80) + path)
130             d.addCallback(self.gotResp, num, expect)
131             if last:
132                 d.addBoth(lastDefer.callback)
133                 
134         newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776)
135         newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833)
136         newRequest('www.google.ca', "/", 3, None)
137         self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
138         self.pending_calls.append(reactor.callLater(10, newRequest, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696))
139         self.pending_calls.append(reactor.callLater(30, newRequest, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606))
140         self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
141         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27))
142         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088))
143         self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
144         return lastDefer
145         
146     def tearDown(self):
147         for p in self.pending_calls:
148             if p.active():
149                 p.cancel()
150         self.pending_calls = []
151         if self.manager:
152             self.manager.close()
153             self.manager = None