From d900237088b7832d2554c31b7436977bc5669348 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Thu, 28 Feb 2008 20:54:45 -0800 Subject: [PATCH] Add property tracking to downloads from peers. --- TODO | 44 ----------- apt_dht/HTTPDownloader.py | 159 ++++++++++++++++++++++++++++++++++---- apt_dht/HTTPServer.py | 2 +- apt_dht/PeerManager.py | 15 ++-- 4 files changed, 153 insertions(+), 67 deletions(-) diff --git a/TODO b/TODO index bd24953..b495193 100644 --- a/TODO +++ b/TODO @@ -47,47 +47,3 @@ originally provided the piece is probably at fault, since he is now providing a later piece. This doesn't work if the differing piece is the first piece, in which case it is downloaded from a 3rd peer, with consensus revealing the misbehaving peer. - - -Store and share torrent-like strings for large files. - -In addition to storing the file download location (which would still be -used for small files), a bencoded dictionary containing the peer's -hashes of the individual pieces could be stored for the larger files -(20% of all the files are larger than 512 KB). This dictionary would -have the normal piece size, the hash length, and a string containing the -piece hashes of length *<#pieces>. These piece hashes could -be compared ahead of time to determine which peers have the same piece -hashes (they all should), and then used during the download to verify -the downloaded pieces. - -For very large files (5 or more pieces), the torrent strings are too -long to store in the DHT and retrieve (a single UDP packet should be -less than 1472 bytes to avoid fragmentation). Instead, the peers should -store the torrent-like string for large files separately, and only -contain a reference to it in their stored value for the hash of the -file. The reference would be a hash of the bencoded dictionary. If the -torrent-like string is short enough to store in the DHT (i.e. less than -1472 bytes, or about 70 pieces for the SHA1 hash), then a -lookup of that hash in the DHT would give the torrent-like string. -Otherwise, a request to the peer for the hash (just like files are -downloaded), should return the bencoded torrent-like string. - - -PeerManager needs to track peers' properties. - -The PeerManager needs to keep track of the observed properties of seen -peers, to help determine a selection criteria for choosing peers to -download from. Each property will give a value from 0 to 1. The relevant -properties are: - - - hash errors in last day (1 = 0, 0 = 3+) - - recent download speed (1 = fastest, 0 = 0) - - lag time from request to download (1 = 0, 0 = 15s+) - - number of pending requests (1 = 0, 0 = max (10)) - - whether a connection is open (1 = yes, 0.9 = no) - -These should be combined (multiplied) to provide a sort order for peers -available to download from, which can then be used to assign new -downloads to peers. Pieces should be downloaded from the best peers -first (i.e. piece 0 from the absolute best peer). diff --git a/apt_dht/HTTPDownloader.py b/apt_dht/HTTPDownloader.py index e028a3b..2d91816 100644 --- a/apt_dht/HTTPDownloader.py +++ b/apt_dht/HTTPDownloader.py @@ -1,4 +1,7 @@ +from math import exp +from datetime import datetime, timedelta + from twisted.internet import reactor, defer, protocol from twisted.internet.protocol import ClientFactory from twisted import version as twisted_version @@ -12,12 +15,12 @@ from zope.interface import implements from apt_dht_conf import version -class HTTPClientManager(ClientFactory): - """A manager for all HTTP requests to a single site. +class Peer(ClientFactory): + """A manager for all HTTP requests to a single peer. - Controls all requests that got to a single site (host and port). + Controls all requests that go to a single peer (host and port). This includes buffering requests until they can be sent and reconnecting - in the even of the connection being closed. + in the event of the connection being closed. """ @@ -34,6 +37,11 @@ class HTTPClientManager(ClientFactory): self.response_queue = [] self.proto = None self.connector = None + self._errors = 0 + self._completed = 0 + self._downloadSpeeds = [] + self._lastResponse = None + self._responseTimes = [] def connect(self): assert self.closed and not self.connecting @@ -51,10 +59,8 @@ class HTTPClientManager(ClientFactory): if not self.closed: self.proto.transport.loseConnection() - def is_idle(self): - return not self.busy and not self.request_queue and not self.response_queue - def submitRequest(self, request): + request.submissionTime = datetime.now() request.deferRequest = defer.Deferred() self.request_queue.append(request) self.processQueue() @@ -79,19 +85,36 @@ class HTTPClientManager(ClientFactory): req.deferResponse.addCallbacks(self.requestComplete, self.requestError) def requestComplete(self, resp): + self._processLastResponse() req = self.response_queue.pop(0) log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code)) + self._completed += 1 + if resp.code >= 400: + self._errors += 1 + now = datetime.now() + self._responseTimes.append((now, now - req.submissionTime)) + self._lastResponse = (now, resp.stream.length) req.deferRequest.callback(resp) def requestError(self, error): + self._processLastResponse() req = self.response_queue.pop(0) log.msg('Download of %s generated error %r' % (req.uri, error)) + self._completed += 1 + self._errors += 1 req.deferRequest.errback(error) + + def hashError(self, error): + """Log that a hash error occurred from the peer.""" + log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error)) + self._errors += 1 + # The IHTTPClientManager interface functions def clientBusy(self, proto): self.busy = True def clientIdle(self, proto): + self._processLastResponse() self.busy = False self.processQueue() @@ -100,6 +123,7 @@ class HTTPClientManager(ClientFactory): self.processQueue() def clientGone(self, proto): + self._processLastResponse() for req in self.response_queue: req.deferRequest.errback(ProtocolError('lost connection')) self.busy = False @@ -111,6 +135,7 @@ class HTTPClientManager(ClientFactory): if self.request_queue: self.processQueue() + # The downloading request interface functions def setCommonHeaders(self): headers = http_headers.Headers() headers.setHeader('Host', self.host) @@ -129,8 +154,83 @@ class HTTPClientManager(ClientFactory): headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)])) return self.submitRequest(ClientRequest(method, path, headers, None)) + # Functions that return information about the peer + def isIdle(self): + return not self.busy and not self.request_queue and not self.response_queue + + def _processLastResponse(self): + if self._lastResponse is not None: + now = datetime.now() + self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1])) + self._lastResponse = None + + def downloadSpeed(self): + """Gets the latest average download speed for the peer. + + The average is over the last 10 responses that occurred in the last hour. + """ + total_time = 0.0 + total_download = 0 + now = datetime.now() + while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or + now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)): + self._downloadSpeeds.pop(0) + + # If there are none, then you get 0 + if not self._downloadSpeeds: + return 0.0 + + for download in self._downloadSpeeds: + total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0 + total_download += download[2] + + return total_download / total_time + + def responseTime(self): + """Gets the latest average response time for the peer. + + Response time is the time from receiving the request, to the time + the download begins. The average is over the last 10 responses that + occurred in the last hour. + """ + total_response = 0.0 + now = datetime.now() + while self._responseTimes and (len(self._responseTimes) > 10 or + now - self._responseTimes[0][0] > timedelta(seconds=3600)): + self._responseTimes.pop(0) + + # If there are none, give it the benefit of the doubt + if not self._responseTimes: + return 0.0 + + for response in self._responseTimes: + total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0 + + return total_response / len(self._responseTimes) + + def rank(self, fastest): + """Determine the ranking value for the peer. + + The ranking value is composed of 5 numbers: + - 1 if a connection to the peer is open, 0.9 otherwise + - 1 if there are no pending requests, to 0 if there are a maximum + - 1 if the peer is the fastest of all peers, to 0 if the speed is 0 + - 1 if all requests are good, 0 if all produced errors + - an exponentially decreasing number based on the response time + """ + rank = 1.0 + if self.closed: + rank *= 0.9 + rank *= (max(0.0, 10.0 - len(self.request_queue) - len(self.response_queue))) / 10.0 + if fastest > 0.0: + rank *= min(1.0, self.downloadSpeed() / fastest) + if self._completed: + rank *= max(0.0, 1.0 - float(self._errors) / self._completed) + rank *= exp(-self.responseTime() / 5.0) + return rank + class TestClientManager(unittest.TestCase): - """Unit tests for the HTTPClientManager.""" + """Unit tests for the Peer.""" client = None pending_calls = [] @@ -147,7 +247,7 @@ class TestClientManager(unittest.TestCase): def test_download(self): host = 'www.ietf.org' - self.client = HTTPClientManager(host, 80) + self.client = Peer(host, 80) self.timeout = 10 d = self.client.get('/rfc/rfc0013.txt') @@ -156,7 +256,7 @@ class TestClientManager(unittest.TestCase): def test_head(self): host = 'www.ietf.org' - self.client = HTTPClientManager(host, 80) + self.client = Peer(host, 80) self.timeout = 10 d = self.client.get('/rfc/rfc0013.txt', "HEAD") @@ -165,7 +265,7 @@ class TestClientManager(unittest.TestCase): def test_multiple_downloads(self): host = 'www.ietf.org' - self.client = HTTPClientManager(host, 80) + self.client = Peer(host, 80) self.timeout = 120 lastDefer = defer.Deferred() @@ -189,7 +289,7 @@ class TestClientManager(unittest.TestCase): def test_multiple_quick_downloads(self): host = 'www.ietf.org' - self.client = HTTPClientManager(host, 80) + self.client = Peer(host, 80) self.timeout = 30 lastDefer = defer.Deferred() @@ -211,9 +311,42 @@ class TestClientManager(unittest.TestCase): self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True)) return lastDefer + def checkInfo(self): + log.msg('Rank is: %r' % self.client.rank(250.0*1024)) + log.msg('Download speed is: %r' % self.client.downloadSpeed()) + log.msg('Response Time is: %r' % self.client.responseTime()) + + def test_peer_info(self): + host = 'www.ietf.org' + self.client = Peer(host, 80) + self.timeout = 120 + lastDefer = defer.Deferred() + + def newRequest(path, num, expect, last=False): + d = self.client.get(path) + d.addCallback(self.gotResp, num, expect) + if last: + d.addBoth(lastDefer.callback) + + newRequest("/rfc/rfc0006.txt", 1, 1776) + newRequest("/rfc/rfc2362.txt", 2, 159833) + newRequest("/rfc/rfc0801.txt", 3, 40824) + self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070)) + self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606)) + self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696)) + self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976)) + self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27)) + self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088)) + self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True)) + + for i in xrange(2, 122, 2): + self.pending_calls.append(reactor.callLater(i, self.checkInfo)) + + return lastDefer + def test_range(self): host = 'www.ietf.org' - self.client = HTTPClientManager(host, 80) + self.client = Peer(host, 80) self.timeout = 10 d = self.client.getRange('/rfc/rfc0013.txt', 100, 199) diff --git a/apt_dht/HTTPServer.py b/apt_dht/HTTPServer.py index 5375547..b81282c 100644 --- a/apt_dht/HTTPServer.py +++ b/apt_dht/HTTPServer.py @@ -140,7 +140,7 @@ class TopLevel(resource.Resource): return FileUploader(files[0]['path'].path), () else: log.msg('Sending torrent string %s to %s' % (b2a_hex(hash), request.remoteAddr)) - return static.Data(bencode({'t': files[0]['pieces']}), 'application/bencoded'), () + return static.Data(bencode({'t': files[0]['pieces']}), 'application/x-bencoded'), () else: log.msg('Hash could not be found in database: %s' % hash) diff --git a/apt_dht/PeerManager.py b/apt_dht/PeerManager.py index b561b8e..7e3d127 100644 --- a/apt_dht/PeerManager.py +++ b/apt_dht/PeerManager.py @@ -9,7 +9,7 @@ from twisted.trial import unittest from twisted.web2 import stream as stream_mod from twisted.web2.http import splitHostPort -from HTTPDownloader import HTTPClientManager +from HTTPDownloader import Peer from util import uncompact class PeerManager: @@ -26,23 +26,20 @@ class PeerManager: compact_peer = choice(peers) peer = uncompact(compact_peer['c']) log.msg('Downloading from peer %r' % (peer, )) - host, port = peer + site = peer path = '/~/' + quote_plus(hash.expected()) else: log.msg('Downloading (%s) from mirror %s' % (method, mirror)) parsed = urlparse(mirror) assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0] - host, port = splitHostPort(parsed[0], parsed[1]) + site = splitHostPort(parsed[0], parsed[1]) path = urlunparse(('', '') + parsed[2:]) - return self.getPeer(host, port, path, method, modtime) + return self.getPeer(site, path, method, modtime) - def getPeer(self, host, port, path, method="GET", modtime=None): - if not port: - port = 80 - site = host + ":" + str(port) + def getPeer(self, site, path, method="GET", modtime=None): if site not in self.clients: - self.clients[site] = HTTPClientManager(host, port) + self.clients[site] = Peer(site[0], site[1]) return self.clients[site].get(path, method, modtime) def close(self): -- 2.39.5