WIP on multiple peer downloading (completely broken).
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7
8 from twisted.internet import reactor, defer
9 from twisted.python import log
10 from twisted.trial import unittest
11 from twisted.web2 import stream as stream_mod
12 from twisted.web2.http import splitHostPort
13
14 from HTTPDownloader import Peer
15 from util import uncompact
16
17 class FileDownload(defer.Deferred):
18     """Manage a download from a list of peers or a mirror.
19     
20     
21     """
22     
23     def __init__(self, manager, hash, mirror, compact_peers):
24         """Initialize the instance.
25         
26         @type hash: L{Hash.HashObject}
27         @param hash: the hash object containing the expected hash for the file
28         @param mirror: the URI of the file on the mirror
29         @type compact_peers: C{list} of C{string}
30         @param compact_peers: a list of the peer info where the file can be found
31         """
32         defer.Deferred.__init__(self)
33         self.manager = manager
34         self.hash = hash
35         self.mirror = mirror
36
37         self.peers = {}
38         no_pieces = 0
39         pieces_string = {}
40         pieces_hash = {}
41         pieces_dl_hash = {}
42
43         for compact_peer in compact_peers:
44             # Build a list of all the peers for this download
45             site = uncompact(compact_peer['c'])
46             peer = manager.getPeer(site)
47             self.peers[site] = peer
48
49             # Extract any piece information from the peers list
50             if 't' in compact_peer:
51                 pieces_string.setdefault(compact_peer['t']['t'], 0)
52                 pieces_string[compact_peer['t']['t']] += 1
53             elif 'h' in compact_peer:
54                 pieces_hash.setdefault(compact_peer['h'], 0)
55                 pieces_hash[compact_peer['h']] += 1
56             elif 'l' in compact_peer:
57                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
58                 pieces_dl_hash[compact_peer['l']] += 1
59             else:
60                 no_pieces += 1
61         
62         max_found = max(no_pieces, max(pieces_string.values()),
63                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
64
65         if max_found == no_pieces:
66             self.sort()
67             pieces = []
68             if max_found < len(self.peers):
69                 pass
70         elif max_found == max(pieces_string.values()):
71             pass
72         
73     def sort(self):
74         def sort(a, b):
75             """Sort peers by their rank."""
76             if a.rank > b.rank:
77                 return 1
78             elif a.rank < b.rank:
79                 return -1
80             return 0
81         self.peers.sort(sort)
82
83 class PeerManager:
84     """Manage a set of peers and the requests to them.
85     
86     @type clients: C{dictionary}
87     @ivar clients: the available peers that have been previously contacted
88     """
89
90     def __init__(self):
91         """Initialize the instance."""
92         self.clients = {}
93         
94     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
95         """Download from a list of peers or fallback to a mirror.
96         
97         @type hash: L{Hash.HashObject}
98         @param hash: the hash object containing the expected hash for the file
99         @param mirror: the URI of the file on the mirror
100         @type peers: C{list} of C{string}
101         @param peers: a list of the peer info where the file can be found
102             (optional, defaults to downloading from the mirror)
103         @type method: C{string}
104         @param method: the HTTP method to use, 'GET' or 'HEAD'
105             (optional, defaults to 'GET')
106         @type modtime: C{int}
107         @param modtime: the modification time to use for an 'If-Modified-Since'
108             header, as seconds since the epoch
109             (optional, defaults to not sending that header)
110         """
111         if not peers or method != "GET" or modtime is not None:
112             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
113             parsed = urlparse(mirror)
114             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
115             site = splitHostPort(parsed[0], parsed[1])
116             path = urlunparse(('', '') + parsed[2:])
117             peer = self.getPeer(site)
118             return peer.get(path, method, modtime)
119         elif len(peers) == 1:
120             site = uncompact(peers[0]['c'])
121             log.msg('Downloading from peer %r' % (site, ))
122             path = '/~/' + quote_plus(hash.expected())
123             peer = self.getPeer(site)
124             return peer.get(path)
125         else:
126             FileDownload(self, hash, mirror, peers)
127             
128         
129     def getPeer(self, site):
130         """Create a new peer if necessary and return it.
131         
132         @type site: (C{string}, C{int})
133         @param site: the IP address and port of the peer
134         """
135         if site not in self.clients:
136             self.clients[site] = Peer(site[0], site[1])
137         return self.clients[site]
138     
139     def close(self):
140         """Close all the connections to peers."""
141         for site in self.clients:
142             self.clients[site].close()
143         self.clients = {}
144
145 class TestPeerManager(unittest.TestCase):
146     """Unit tests for the PeerManager."""
147     
148     manager = None
149     pending_calls = []
150     
151     def gotResp(self, resp, num, expect):
152         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
153         if expect is not None:
154             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
155         def print_(n):
156             pass
157         def printdone(n):
158             pass
159         stream_mod.readStream(resp.stream, print_).addCallback(printdone)
160     
161     def test_download(self):
162         """Tests a normal download."""
163         self.manager = PeerManager()
164         self.timeout = 10
165         
166         host = 'www.ietf.org'
167         d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt')
168         d.addCallback(self.gotResp, 1, 1070)
169         return d
170         
171     def test_head(self):
172         """Tests a 'HEAD' request."""
173         self.manager = PeerManager()
174         self.timeout = 10
175         
176         host = 'www.ietf.org'
177         d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD")
178         d.addCallback(self.gotResp, 1, 0)
179         return d
180         
181     def test_multiple_downloads(self):
182         """Tests multiple downloads with queueing and connection closing."""
183         self.manager = PeerManager()
184         self.timeout = 120
185         lastDefer = defer.Deferred()
186         
187         def newRequest(host, path, num, expect, last=False):
188             d = self.manager.get('', 'http://' + host + ':' + str(80) + path)
189             d.addCallback(self.gotResp, num, expect)
190             if last:
191                 d.addBoth(lastDefer.callback)
192                 
193         newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776)
194         newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833)
195         newRequest('www.google.ca', "/", 3, None)
196         self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
197         self.pending_calls.append(reactor.callLater(10, newRequest, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696))
198         self.pending_calls.append(reactor.callLater(30, newRequest, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606))
199         self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
200         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27))
201         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088))
202         self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
203         return lastDefer
204         
205     def tearDown(self):
206         for p in self.pending_calls:
207             if p.active():
208                 p.cancel()
209         self.pending_calls = []
210         if self.manager:
211             self.manager.close()
212             self.manager = None