2 """Manage a set of peers and the requests to them."""
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
8 from twisted.internet import reactor, defer
9 from twisted.python import log
10 from twisted.trial import unittest
11 from twisted.web2 import stream as stream_mod
12 from twisted.web2.http import splitHostPort
14 from HTTPDownloader import Peer
15 from util import uncompact
18 """Manage a set of peers and the requests to them.
20 @type clients: C{dictionary}
21 @ivar clients: the available peers that have been previously contacted
25 """Initialize the instance."""
def get(self, hash, mirror, peers=None, method="GET", modtime=None):
    """Download from a list of peers or fallback to a mirror.

    When C{peers} is non-empty, one entry is picked at random and the file
    is requested from that peer by its hash. Otherwise the request goes to
    the C{mirror} URI, which must use the C{http} scheme.

    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
        (only consulted when downloading from a peer)
    @type mirror: C{string}
    @param mirror: the URI of the file on the mirror
        (only consulted when no peers are given)
    @type peers: C{list} of C{dictionary}
    @param peers: a list of the peer info where the file can be found,
        each entry holding the compact peer info under the key C{'c'}
        (optional, defaults to downloading from the mirror)
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @param modtime: the modification time to use for an 'If-Modified-Since'
        header, as seconds since the epoch
        (optional, defaults to not sending that header)
    @return: whatever L{getPeer} returns for the chosen site and path
    @raise AssertionError: if the mirror URI does not use the http scheme
    """
    if peers:
        # Choose one of the peers at random
        compact_peer = choice(peers)
        peer = uncompact(compact_peer['c'])
        log.msg('Downloading from peer %r' % (peer, ))
        # NOTE(review): assumes uncompact() yields the (address, port) pair
        # that getPeer() expects as a site -- confirm against util.uncompact.
        site = peer
        path = '/~/' + quote_plus(hash.expected())
    else:
        log.msg('Downloading (%s) from mirror %s' % (method, mirror))
        parsed = urlparse(mirror)
        assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
        site = splitHostPort(parsed[0], parsed[1])
        # Blank out the scheme and host so only the request path remains
        path = urlunparse(('', '') + parsed[2:])

    return self.getPeer(site, path, method, modtime)
def getPeer(self, site, path, method="GET", modtime=None):
    """Forward a request to the peer for a site, creating it on first use.

    @type site: (C{string}, C{int})
    @param site: the IP address and port of the peer
    @type path: C{string}
    @param path: the path to the file on the peer
    @type method: C{string}
    @param method: the HTTP method to use, 'GET' or 'HEAD'
        (optional, defaults to 'GET')
    @param modtime: the modification time to use for an 'If-Modified-Since'
        header, as seconds since the epoch
        (optional, defaults to not sending that header)
    @return: the result of the peer's C{get} call
    """
    known = self.clients
    if site in known:
        client = known[site]
    else:
        # First request for this site: build its Peer and remember it
        client = known[site] = Peer(site[0], site[1])
    return client.get(path, method, modtime)
81 """Close all the connections to peers."""
82 for site in self.clients:
83 self.clients[site].close()
86 class TestPeerManager(unittest.TestCase):
87 """Unit tests for the PeerManager."""
def gotResp(self, resp, num, expect):
    """Check a response's status code and length, then read its stream.

    @param resp: the response to check; its C{code} and C{stream}
        attributes are read
    @param num: the number of the request (not used by the checks)
    @type expect: C{int}
    @param expect: the expected length of the response stream,
        or None to skip the length check
    """
    self.failUnless(200 <= resp.code < 300, "Got a non-200 response: %r" % resp.code)
    if expect is not None:
        self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))

    # NOTE(review): no definition of these two callbacks is visible in this
    # scope, so they are defined here as no-ops: the stream is read through
    # to the end but its content is not inspected.
    def print_(n):
        pass

    def printdone(n):
        pass

    stream_mod.readStream(resp.stream, print_).addCallback(printdone)
def test_download(self):
    """Tests a normal download.

    No peers are passed, so the file is requested from the mirror URL.
    """
    self.manager = PeerManager()

    host = 'www.ietf.org'
    d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt')
    d.addCallback(self.gotResp, 1, 1070)
    # Hand the Deferred to trial so the test waits for the response and
    # the checks in gotResp actually count towards the result.
    return d
113 """Tests a 'HEAD' request."""
114 self.manager = PeerManager()
117 host = 'www.ietf.org'
118 d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD")
119 d.addCallback(self.gotResp, 1, 0)
def test_multiple_downloads(self):
    """Tests multiple downloads with queueing and connection closing.

    Three requests are issued immediately and seven more are scheduled
    over the following 62 seconds. The test finishes when the request
    flagged as the last one completes.
    """
    self.manager = PeerManager()

    lastDefer = defer.Deferred()

    def newRequest(host, path, num, expect, last=False):
        """Issue one request and check its response with gotResp."""
        d = self.manager.get('', 'http://' + host + ':' + str(80) + path)
        d.addCallback(self.gotResp, num, expect)
        if last:
            # Only the final request may fire lastDefer; a Deferred can be
            # called back just once.
            d.addBoth(lastDefer.callback)

    newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776)
    newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833)
    newRequest('www.google.ca', "/", 3, None)

    # (delay in seconds, host, path, request number, expected length, last)
    schedule = [
        (1, 'www.sfu.ca', '/', 4, None, False),
        (10, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696, False),
        (30, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606, False),
        (31, 'www.sfu.ca', '/studentcentral/index.html', 7, None, False),
        (32, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27, False),
        (32, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088, False),
        (62, 'www.google.ca', '/intl/en/options/', 0, None, True),
    ]
    for delay, host, path, num, expect, last in schedule:
        # Keep the delayed calls so they can be cancelled during cleanup
        self.pending_calls.append(
            reactor.callLater(delay, newRequest, host, path, num, expect, last))

    # trial waits on this Deferred, i.e. until the last request is done
    return lastDefer
147 for p in self.pending_calls:
150 self.pending_calls = []