providing a later piece. This doesn't work if the differing piece is the
first piece, in which case it is downloaded from a 3rd peer, with
consensus revealing the misbehaving peer.
-
-
-Store and share torrent-like strings for large files.
-
-In addition to storing the file download location (which would still be
-used for small files), a bencoded dictionary containing the peer's
-hashes of the individual pieces could be stored for the larger files
-(20% of all the files are larger than 512 KB). This dictionary would
-have the normal piece size, the hash length, and a string containing the
-piece hashes of length <hash length>*<#pieces>. These piece hashes could
-be compared ahead of time to determine which peers have the same piece
-hashes (they all should), and then used during the download to verify
-the downloaded pieces.
-
-For very large files (5 or more pieces), the torrent strings are too
-long to store in the DHT and retrieve (a single UDP packet should be
-less than 1472 bytes to avoid fragmentation). Instead, the peers should
-store the torrent-like string for large files separately, and only
-contain a reference to it in their stored value for the hash of the
-file. The reference would be a hash of the bencoded dictionary. If the
-torrent-like string is short enough to store in the DHT (i.e. less than
-1472 bytes, or about 70 pieces for the SHA1 hash), then a
-lookup of that hash in the DHT would give the torrent-like string.
-Otherwise, a request to the peer for the hash (just like files are
-downloaded), should return the bencoded torrent-like string.
-
-
-PeerManager needs to track peers' properties.
-
-The PeerManager needs to keep track of the observed properties of seen
-peers, to help determine a selection criteria for choosing peers to
-download from. Each property will give a value from 0 to 1. The relevant
-properties are:
-
- - hash errors in last day (1 = 0, 0 = 3+)
- - recent download speed (1 = fastest, 0 = 0)
- - lag time from request to download (1 = 0, 0 = 15s+)
- - number of pending requests (1 = 0, 0 = max (10))
- - whether a connection is open (1 = yes, 0.9 = no)
-
-These should be combined (multiplied) to provide a sort order for peers
-available to download from, which can then be used to assign new
-downloads to peers. Pieces should be downloaded from the best peers
-first (i.e. piece 0 from the absolute best peer).
+from math import exp
+from datetime import datetime, timedelta
+
from twisted.internet import reactor, defer, protocol
from twisted.internet.protocol import ClientFactory
from twisted import version as twisted_version
from apt_dht_conf import version
-class HTTPClientManager(ClientFactory):
- """A manager for all HTTP requests to a single site.
+class Peer(ClientFactory):
+ """A manager for all HTTP requests to a single peer.
- Controls all requests that got to a single site (host and port).
+ Controls all requests that go to a single peer (host and port).
This includes buffering requests until they can be sent and reconnecting
- in the even of the connection being closed.
+ in the event of the connection being closed.
"""
self.response_queue = []
self.proto = None
self.connector = None
+ self._errors = 0
+ self._completed = 0
+ self._downloadSpeeds = []
+ self._lastResponse = None
+ self._responseTimes = []
def connect(self):
assert self.closed and not self.connecting
if not self.closed:
self.proto.transport.loseConnection()
- def is_idle(self):
- return not self.busy and not self.request_queue and not self.response_queue
-
def submitRequest(self, request):
+ request.submissionTime = datetime.now()
request.deferRequest = defer.Deferred()
self.request_queue.append(request)
self.processQueue()
req.deferResponse.addCallbacks(self.requestComplete, self.requestError)
def requestComplete(self, resp):
+ self._processLastResponse()
req = self.response_queue.pop(0)
log.msg('%s of %s completed with code %d' % (req.method, req.uri, resp.code))
+ self._completed += 1
+ if resp.code >= 400:
+ self._errors += 1
+ now = datetime.now()
+ self._responseTimes.append((now, now - req.submissionTime))
+ self._lastResponse = (now, resp.stream.length)
req.deferRequest.callback(resp)
def requestError(self, error):
+ self._processLastResponse()
req = self.response_queue.pop(0)
log.msg('Download of %s generated error %r' % (req.uri, error))
+ self._completed += 1
+ self._errors += 1
req.deferRequest.errback(error)
+
+    def hashError(self, error):
+        """Log that a hash error occurred from the peer.
+
+        The error is counted against the peer, which lowers the value
+        returned by rank(). Only the error count is increased, not the
+        count of completed requests.
+        """
+        log.msg('Hash error from peer (%s, %d): %r' % (self.host, self.port, error))
+        self._errors += 1
+ # The IHTTPClientManager interface functions
def clientBusy(self, proto):
self.busy = True
def clientIdle(self, proto):
+ self._processLastResponse()
self.busy = False
self.processQueue()
self.processQueue()
def clientGone(self, proto):
+ self._processLastResponse()
for req in self.response_queue:
req.deferRequest.errback(ProtocolError('lost connection'))
self.busy = False
if self.request_queue:
self.processQueue()
+ # The downloading request interface functions
def setCommonHeaders(self):
headers = http_headers.Headers()
headers.setHeader('Host', self.host)
headers.setHeader('Range', ('bytes', [(rangeStart, rangeEnd)]))
return self.submitRequest(ClientRequest(method, path, headers, None))
+ # Functions that return information about the peer
+    def isIdle(self):
+        """Check whether the peer is idle.
+
+        The peer is idle when it is not busy and both the request queue
+        and the response queue are empty.
+        """
+        return not self.busy and not self.request_queue and not self.response_queue
+
+    def _processLastResponse(self):
+        """Save the download time of the last response, if one is pending.
+
+        Appends a (time now, time elapsed since the response was recorded,
+        recorded length) tuple to the download speed history, then clears
+        the pending response so it is only counted once.
+        """
+        if self._lastResponse is not None:
+            now = datetime.now()
+            self._downloadSpeeds.append((now, now - self._lastResponse[0], self._lastResponse[1]))
+            self._lastResponse = None
+
+ def downloadSpeed(self):
+ """Gets the latest average download speed for the peer.
+
+ The average is over the last 10 responses that occurred in the last hour.
+ """
+ total_time = 0.0
+ total_download = 0
+ now = datetime.now()
+ while self._downloadSpeeds and (len(self._downloadSpeeds) > 10 or
+ now - self._downloadSpeeds[0][0] > timedelta(seconds=3600)):
+ self._downloadSpeeds.pop(0)
+
+ # If there are none, then you get 0
+ if not self._downloadSpeeds:
+ return 0.0
+
+ for download in self._downloadSpeeds:
+ total_time += download[1].days*86400.0 + download[1].seconds + download[1].microseconds/1000000.0
+ total_download += download[2]
+
+ return total_download / total_time
+
+    def responseTime(self):
+        """Gets the latest average response time for the peer.
+
+        Response time is the time from the request being submitted (see
+        submitRequest) to the time its response arrives (see requestComplete),
+        which is presumably when the download begins. It therefore includes
+        any time the request spent waiting in the request queue. The average
+        is over the last 10 responses that occurred in the last hour; older
+        or excess entries are dropped from the history as a side effect.
+
+        Returns the average in seconds as a float, or 0.0 when there is no
+        recent history.
+        """
+        total_response = 0.0
+        now = datetime.now()
+        # Trim from the oldest end until at most 10 recent entries remain
+        while self._responseTimes and (len(self._responseTimes) > 10 or
+                                       now - self._responseTimes[0][0] > timedelta(seconds=3600)):
+            self._responseTimes.pop(0)
+
+        # If there are none, give it the benefit of the doubt
+        if not self._responseTimes:
+            return 0.0
+
+        # Convert each elapsed timedelta to fractional seconds and sum them
+        for response in self._responseTimes:
+            total_response += response[1].days*86400.0 + response[1].seconds + response[1].microseconds/1000000.0
+
+        return total_response / len(self._responseTimes)
+
+    def rank(self, fastest):
+        """Determine the ranking value for the peer.
+
+        The ranking value is composed of 5 numbers:
+          - 1 if a connection to the peer is open, 0.9 otherwise
+          - 1 if there are no pending requests, to 0 if there are a maximum
+          - 1 if the peer is the fastest of all peers, to 0 if the speed is 0
+          - 1 if all requests are good, 0 if all produced errors
+          - an exponentially decreasing number based on the response time
+
+        These are multiplied together, so the result is between 0 and 1.
+
+        The fastest parameter is the download speed to compare this peer's
+        downloadSpeed() against (the speed of the fastest peer). If it is
+        not positive, the speed factor is left out of the ranking.
+        """
+        rank = 1.0
+        if self.closed:
+            rank *= 0.9
+        # Queued and outstanding requests both count, with 10 as the maximum
+        rank *= (max(0.0, 10.0 - len(self.request_queue) - len(self.response_queue))) / 10.0
+        if fastest > 0.0:
+            rank *= min(1.0, self.downloadSpeed() / fastest)
+        # Only applied once something has completed. The error count can
+        # exceed the completed count (hashError adds errors only), so the
+        # factor is clamped at 0.
+        if self._completed:
+            rank *= max(0.0, 1.0 - float(self._errors) / self._completed)
+        # The average response time in seconds is scaled by 5 before decaying
+        rank *= exp(-self.responseTime() / 5.0)
+        return rank
+
class TestClientManager(unittest.TestCase):
- """Unit tests for the HTTPClientManager."""
+ """Unit tests for the Peer."""
client = None
pending_calls = []
def test_download(self):
host = 'www.ietf.org'
- self.client = HTTPClientManager(host, 80)
+ self.client = Peer(host, 80)
self.timeout = 10
d = self.client.get('/rfc/rfc0013.txt')
def test_head(self):
host = 'www.ietf.org'
- self.client = HTTPClientManager(host, 80)
+ self.client = Peer(host, 80)
self.timeout = 10
d = self.client.get('/rfc/rfc0013.txt', "HEAD")
def test_multiple_downloads(self):
host = 'www.ietf.org'
- self.client = HTTPClientManager(host, 80)
+ self.client = Peer(host, 80)
self.timeout = 120
lastDefer = defer.Deferred()
def test_multiple_quick_downloads(self):
host = 'www.ietf.org'
- self.client = HTTPClientManager(host, 80)
+ self.client = Peer(host, 80)
self.timeout = 30
lastDefer = defer.Deferred()
self.pending_calls.append(reactor.callLater(0, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
return lastDefer
+    def checkInfo(self):
+        """Log the current rank, download speed and response time of the client.
+
+        The rank is computed against a fixed fastest speed of 250.0*1024.
+        """
+        log.msg('Rank is: %r' % self.client.rank(250.0*1024))
+        log.msg('Download speed is: %r' % self.client.downloadSpeed())
+        log.msg('Response Time is: %r' % self.client.responseTime())
+
+    def test_peer_info(self):
+        """Log the peer's info repeatedly while a series of downloads runs.
+
+        Three requests are made immediately and seven more are scheduled
+        at later times, while checkInfo is scheduled every 2 seconds. The
+        returned deferred fires when the request flagged as last finishes.
+        """
+        host = 'www.ietf.org'
+        self.client = Peer(host, 80)
+        self.timeout = 120
+        lastDefer = defer.Deferred()
+
+        def newRequest(path, num, expect, last=False):
+            # Request the path and check the response with gotResp; the
+            # request flagged as last also fires the test's deferred.
+            d = self.client.get(path)
+            d.addCallback(self.gotResp, num, expect)
+            if last:
+                d.addBoth(lastDefer.callback)
+
+        # Requests made immediately
+        newRequest("/rfc/rfc0006.txt", 1, 1776)
+        newRequest("/rfc/rfc2362.txt", 2, 159833)
+        newRequest("/rfc/rfc0801.txt", 3, 40824)
+        # Requests delayed by the given number of seconds
+        self.pending_calls.append(reactor.callLater(1, newRequest, '/rfc/rfc0013.txt', 4, 1070))
+        self.pending_calls.append(reactor.callLater(10, newRequest, '/rfc/rfc0022.txt', 5, 4606))
+        self.pending_calls.append(reactor.callLater(30, newRequest, '/rfc/rfc0048.txt', 6, 41696))
+        self.pending_calls.append(reactor.callLater(31, newRequest, '/rfc/rfc3261.txt', 7, 647976))
+        self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0014.txt', 8, 27))
+        self.pending_calls.append(reactor.callLater(32, newRequest, '/rfc/rfc0001.txt', 9, 21088))
+        self.pending_calls.append(reactor.callLater(62, newRequest, '/rfc/rfc2801.txt', 0, 598794, True))
+
+        # Log the peer info every 2 seconds, from 2 to 120 seconds in
+        for i in xrange(2, 122, 2):
+            self.pending_calls.append(reactor.callLater(i, self.checkInfo))
+
+        return lastDefer
+
def test_range(self):
host = 'www.ietf.org'
- self.client = HTTPClientManager(host, 80)
+ self.client = Peer(host, 80)
self.timeout = 10
d = self.client.getRange('/rfc/rfc0013.txt', 100, 199)