2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
8 from twisted.internet import reactor, defer
9 from twisted.python import log
10 from twisted.trial import unittest
11 from twisted.web2 import stream as stream_mod
12 from twisted.web2.http import splitHostPort
14 from HTTPDownloader import Peer
15 from util import uncompact
17 class FileDownload(defer.Deferred):
18 """Manage a download from a list of peers or a mirror.
23 def __init__(self, manager, hash, mirror, compact_peers):
24 """Initialize the instance.
26 @type hash: L{Hash.HashObject}
27 @param hash: the hash object containing the expected hash for the file
28 @param mirror: the URI of the file on the mirror
29 @type compact_peers: C{list} of C{string}
30 @param compact_peers: a list of the peer info where the file can be found
32 defer.Deferred.__init__(self)
33 self.manager = manager
43 for compact_peer in compact_peers:
44 # Build a list of all the peers for this download
45 site = uncompact(compact_peer['c'])
46 peer = manager.getPeer(site)
47 self.peers[site] = peer
49 # Extract any piece information from the peers list
50 if 't' in compact_peer:
51 pieces_string.setdefault(compact_peer['t']['t'], 0)
52 pieces_string[compact_peer['t']['t']] += 1
53 elif 'h' in compact_peer:
54 pieces_hash.setdefault(compact_peer['h'], 0)
55 pieces_hash[compact_peer['h']] += 1
56 elif 'l' in compact_peer:
57 pieces_dl_hash.setdefault(compact_peer['l'], 0)
58 pieces_dl_hash[compact_peer['l']] += 1
62 max_found = max(no_pieces, max(pieces_string.values()),
63 max(pieces_hash.values()), max(pieces_dl_hash.values()))
65 if max_found == no_pieces:
68 if max_found < len(self.peers):
70 elif max_found == max(pieces_string.values()):
75 """Sort peers by their rank."""
84 """Manage a set of peers and the requests to them.
86 @type clients: C{dictionary}
87 @ivar clients: the available peers that have been previously contacted
91 """Initialize the instance."""
def get(self, hash, mirror, peers=None, method="GET", modtime=None):
    """Download from a list of peers or fallback to a mirror.

    The mirror is used whenever no peers are supplied, the method is not
    'GET', or a modification time is given. A single peer is asked
    directly for the file by its expected hash. With several peers the
    work is handed to a L{FileDownload}.

    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @param mirror: the URI of the file on the mirror
    @type peers: C{list} of C{dictionary}
    @param peers: a list of the peer info where the file can be found;
        each entry is indexed by C{'c'} to obtain the compact contact
        info passed to C{uncompact}
        (optional, defaults to downloading from the mirror)
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @type modtime: C{int}
    @param modtime: the modification time to use for an 'If-Modified-Since'
        header, as seconds since the epoch
        (optional, defaults to not sending that header)
    @return: the result of the selected peer's C{get} call, or the
        L{FileDownload} deferred when more than one peer is given
    @raise AssertionError: if the mirror URI does not use the http scheme
    """
    # Anything other than a plain, unconditional GET with known peers is
    # sent to the mirror.
    if not peers or method != "GET" or modtime is not None:
        log.msg('Downloading (%s) from mirror %s' % (method, mirror))
        parsed = urlparse(mirror)
        assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
        site = splitHostPort(parsed[0], parsed[1])
        # Drop the scheme and host so only the path/query part is requested.
        path = urlunparse(('', '') + parsed[2:])
        peer = self.getPeer(site)
        return peer.get(path, method, modtime)

    if len(peers) == 1:
        site = uncompact(peers[0]['c'])
        log.msg('Downloading from peer %r' % (site, ))
        # The single peer is asked for the file by its expected hash.
        path = '/~/' + quote_plus(hash.expected())
        peer = self.getPeer(site)
        return peer.get(path)

    # Several peers: FileDownload is itself a Deferred, so hand it back to
    # the caller instead of discarding it (previously None was returned).
    # NOTE(review): confirm nothing else relied on the None return here.
    return FileDownload(self, hash, mirror, peers)
def getPeer(self, site):
    """Return the peer for a site, creating and caching it on first use.

    @type site: (C{string}, C{int})
    @param site: the IP address and port of the peer
    @return: the peer object stored in C{self.clients} for that site
    """
    try:
        # Fast path: a peer for this site is already cached.
        return self.clients[site]
    except KeyError:
        # First request for this site: build the peer and remember it.
        new_peer = Peer(site[0], site[1])
        self.clients[site] = new_peer
        return new_peer
140 """Close all the connections to peers."""
141 for site in self.clients:
142 self.clients[site].close()
145 class TestPeerManager(unittest.TestCase):
146 """Unit tests for the PeerManager."""
151 def gotResp(self, resp, num, expect):
152 self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
153 if expect is not None:
154 self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
159 stream_mod.readStream(resp.stream, print_).addCallback(printdone)
161 def test_download(self):
162 """Tests a normal download."""
163 self.manager = PeerManager()
166 host = 'www.ietf.org'
167 d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt')
168 d.addCallback(self.gotResp, 1, 1070)
172 """Tests a 'HEAD' request."""
173 self.manager = PeerManager()
176 host = 'www.ietf.org'
177 d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD")
178 d.addCallback(self.gotResp, 1, 0)
181 def test_multiple_downloads(self):
182 """Tests multiple downloads with queueing and connection closing."""
183 self.manager = PeerManager()
185 lastDefer = defer.Deferred()
187 def newRequest(host, path, num, expect, last=False):
188 d = self.manager.get('', 'http://' + host + ':' + str(80) + path)
189 d.addCallback(self.gotResp, num, expect)
191 d.addBoth(lastDefer.callback)
193 newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776)
194 newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833)
195 newRequest('www.google.ca', "/", 3, None)
196 self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
197 self.pending_calls.append(reactor.callLater(10, newRequest, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696))
198 self.pending_calls.append(reactor.callLater(30, newRequest, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606))
199 self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
200 self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27))
201 self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088))
202 self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
206 for p in self.pending_calls:
209 self.pending_calls = []