From: Cameron Dale Date: Wed, 16 Apr 2008 03:36:39 +0000 (-0700) Subject: Make the downloader statistics work. X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=a9f0deccc4673d5332622ce40407ff009af6c8a3 Make the downloader statistics work. The previous (untested) commit was not working at all, this one now does, though the TODO mentions some future work. --- diff --git a/TODO b/TODO index dd9524a..7a586fe 100644 --- a/TODO +++ b/TODO @@ -15,6 +15,21 @@ distributions. They need to be dealt with properly by adding them to the tracking done by the AptPackages module. +Improve the downloaded and uploaded data measurements. + +There are 2 places that this data is measured: for statistics, and for +limiting the upload bandwidth. They both have deficiencies as they +sometimes miss the headers or the requests sent out. The upload +bandwidth calculation only considers the stream in the upload and not +the headers sent, and it also doesn't consider the upload bandwidth +from requesting downloads from peers (though that may be a good thing). +The statistics calculations for downloads include the headers of +downloaded files, but not the requests received from peers for upload +files. The statistics for uploaded data only includes the files sent +and not the headers, and also misses the requests for downloads sent to +other peers. + + Consider storing deltas of packages. Instead of downloading full package files when a previous version of diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index 13cb237..f1488e5 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -17,6 +17,24 @@ from zope.interface import implements from apt_p2p_conf import version +class LoggingHTTPClientProtocol(HTTPClientProtocol): + """A modified client protocol that logs the number of bytes received.""" + + def __init__(self, factory, stats = None, mirror = False): + HTTPClientProtocol.__init__(self, factory) + self.stats = stats + self.mirror = mirror + + def lineReceived(self, line): + if self.stats: + self.stats.receivedBytes(len(line) + 2, self.mirror) + HTTPClientProtocol.lineReceived(self, line) + + def rawDataReceived(self, data): + if self.stats: + self.stats.receivedBytes(len(data), self.mirror) + HTTPClientProtocol.rawDataReceived(self, data) + class Peer(ClientFactory): """A manager for all HTTP requests to a single peer. @@ -28,9 +46,10 @@ class Peer(ClientFactory): implements(IHTTPClientManager) - def __init__(self, host, port=80): + def __init__(self, host, port = 80, stats = None): self.host = host self.port = port + self.stats = stats self.mirror = False self.rank = 0.5 self.busy = False @@ -55,7 +74,8 @@ class Peer(ClientFactory): """Connect to the peer.""" assert self.closed and not self.connecting self.connecting = True - d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) + d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self, + stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port) d.addCallbacks(self.connected, self.connectionError) def connected(self, proto): diff --git a/apt_p2p/HTTPServer.py b/apt_p2p/HTTPServer.py index 3d43fa0..87e235f 100644 --- a/apt_p2p/HTTPServer.py +++ b/apt_p2p/HTTPServer.py @@ -159,16 +159,16 @@ class UploadThrottlingProtocol(ThrottlingProtocol): def write(self, data): if self.throttle: ThrottlingProtocol.write(self, data) - if stats: - stats.sentBytes(len(data)) + if self.stats: + self.stats.sentBytes(len(data)) else: ProtocolWrapper.write(self, data) def writeSequence(self, seq): if self.throttle: ThrottlingProtocol.writeSequence(self, seq) - if stats: - stats.sentBytes(reduce(operator.add, map(len, seq))) + if self.stats: + self.stats.sentBytes(reduce(operator.add, map(len, seq))) else: ProtocolWrapper.writeSequence(self, seq) @@ -220,7 +220,8 @@ class TopLevel(resource.Resource): 'betweenRequestsTimeOut': 60}) self.factory = ThrottlingFactory(self.factory, writeLimit = self.uploadLimit) self.factory.protocol = UploadThrottlingProtocol - self.factory.protocol.stats = self.manager.stats + if self.manager: + self.factory.protocol.stats = self.manager.stats return self.factory def render(self, ctx): diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index 9108fde..7376697 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -141,13 +141,11 @@ class StreamToFile: @ivar position: the current file position to write the next data to @type length: C{int} @ivar length: the position in the file to not write beyond - @type inform: C{method} - @ivar inform: a function to call with the length of data received @type doneDefer: L{twisted.internet.defer.Deferred} @ivar doneDefer: the deferred that will fire when done writing """ - def __init__(self, inputStream, outFile, start = 0, length = None, inform = None): + def __init__(self, inputStream, outFile, start = 0, length = None): """Initializes the file. @type inputStream: L{twisted.web2.stream.IByteStream} @@ -160,9 +158,6 @@ class StreamToFile: @type length: C{int} @param length: the maximum amount of data to write to the file (optional, defaults to not limiting the writing to the file - @type inform: C{method} - @param inform: a function to call with the length of data received - (optional, defaults to calling nothing) """ self.stream = inputStream self.outFile = outFile @@ -171,7 +166,6 @@ class StreamToFile: self.length = None if length is not None: self.length = start + length - self.inform = inform self.doneDefer = None def run(self): @@ -201,7 +195,6 @@ class StreamToFile: self.outFile.write(data) self.hash.update(data) self.position += len(data) - self.inform(len(data)) def _done(self, result): """Return the result.""" @@ -505,8 +498,7 @@ class FileDownload: if parsed[0] == "http": site = splitHostPort(parsed[0], parsed[1]) self.mirror_path = urlunparse(('', '') + parsed[2:]) - peer = self.manager.getPeer(site) - peer.mirror = True + peer = self.manager.getPeer(site, mirror = True) self.peerlist.append(peer) # Special case if there's only one good peer left @@ -571,14 +563,11 @@ class FileDownload: else: # Read the response stream to the file log.msg('Streaming piece %d from peer %r' % (piece, peer)) - def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror): - stats.receivedBytes(bytes, mirror) if response.code == 206: df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, - PIECE_SIZE, statUpdate).run() + PIECE_SIZE).run() else: - df = StreamToFile(response.stream, self.file, - inform = statUpdate).run() + df = StreamToFile(response.stream, self.file).run() reactor.callLater(0, df.addCallbacks, *(self._gotPiece, self._gotError), **{'callbackArgs': (piece, peer), @@ -669,8 +658,7 @@ class PeerManager: assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0] site = splitHostPort(parsed[0], parsed[1]) path = urlunparse(('', '') + parsed[2:]) - peer = self.getPeer(site) - peer.mirror = True + peer = self.getPeer(site, mirror = True) return peer.get(path, method, modtime) # elif len(peers) == 1: # site = uncompact(peers[0]['c']) @@ -682,14 +670,18 @@ class PeerManager: tmpfile = self.cache_dir.child(hash.hexexpected()) return FileDownload(self, hash, mirror, peers, tmpfile).run() - def getPeer(self, site): + def getPeer(self, site, mirror = False): """Create a new peer if necessary and return it. @type site: (C{string}, C{int}) @param site: the IP address and port of the peer + @param mirror: whether the peer is actually a mirror + (optional, defaults to False) """ if site not in self.clients: - self.clients[site] = Peer(site[0], site[1]) + self.clients[site] = Peer(site[0], site[1], self.stats) + if mirror: + self.clients[site].mirror = True return self.clients[site] def close(self): diff --git a/apt_p2p/apt_p2p.py b/apt_p2p/apt_p2p.py index 889ebd9..f88fd79 100644 --- a/apt_p2p/apt_p2p.py +++ b/apt_p2p/apt_p2p.py @@ -88,8 +88,13 @@ class AptP2P: self.mirrors = MirrorManager(self.cache_dir) self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self) self.my_contact = None + reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown) - #{ DHT maintenance + #{ Maintenance + def shutdown(self): + self.stats.save() + self.db.close() + def joinComplete(self, result): """Complete the DHT join process and determine our download information. diff --git a/apt_p2p/stats.py b/apt_p2p/stats.py index 640c57f..06e291b 100644 --- a/apt_p2p/stats.py +++ b/apt_p2p/stats.py @@ -4,7 +4,7 @@ from datetime import datetime, timedelta from StringIO import StringIO -from util import byte_format +from util import uncompact, byte_format class StatsLogger: """Store the statistics for the Khashmir DHT. @@ -39,8 +39,7 @@ class StatsLogger: """ # Database self.db = db - self.lastDBUpdate = datetime.now() - self.hashes, self.files = self.db.dbStats() + self.hashes, self.files = 0, 0 # Transport self.mirrorDown = 0L @@ -61,17 +60,6 @@ class StatsLogger: } self.db.saveStats(stats) - def dbStats(self): - """Collect some statistics about the database. - - @rtype: (C{int}, C{int}) - @return: the number of keys and values in the database - """ - if datetime.now() - self.lastDBUpdate > timedelta(minutes = 1): - self.lastDBUpdate = datetime.now() - self.hashes, self.files = self.db.keyStats() - return (self.hashes, self.files) - def formatHTML(self, contactAddress): """Gather statistics for the DHT and format them for display in a browser. @@ -79,7 +67,7 @@ class StatsLogger: @rtype: C{string} @return: the stats, formatted for display in the body of an HTML page """ - self.dbStats() + self.hashes, self.files = self.db.dbStats() out = StringIO() out.write('

Downloader Statistics

\n') @@ -89,7 +77,7 @@ class StatsLogger: # General out.write("\n") out.write("\n") - out.write("\n') + out.write("\n') out.write("

General

Value
Contact" + str(contactAdress) + '
Contact" + str(uncompact(contactAddress)) + '
\n") out.write('\n') @@ -110,22 +98,22 @@ class StatsLogger: out.write("" + byte_format(self.peerUp) + '') out.write("Session Ratio") out.write("%0.2f%%" % - (float(self.mirrorDown) / float(self.mirrorDown + self.peerDown), )) + (100.0 * float(self.mirrorDown) / float(max(self.mirrorDown + self.peerDown, 1)), )) out.write("%0.2f%%" % - (float(self.peerDown) / float(self.mirrorDown + self.peerDown), )) - out.write("%0.2f%%" % - (float(self.peerUp) / float(self.peerDown), )) + (100.0 * float(self.peerDown) / float(max(self.mirrorDown + self.peerDown, 1)), )) + out.write("%0.2f%%" % + (100.0 * float(self.peerUp) / float(max(self.mirrorDown + self.peerDown, 1)), )) out.write("All-Time") out.write("" + byte_format(self.mirrorAllDown) + '') out.write("" + byte_format(self.peerAllDown) + '') out.write("" + byte_format(self.peerAllUp) + '') out.write("All-Time Ratio") out.write("%0.2f%%" % - (float(self.mirrorAllDown) / float(self.mirrorAllDown + self.peerAllDown), )) + (100.0 * float(self.mirrorAllDown) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), )) out.write("%0.2f%%" % - (float(self.peerAllDown) / float(self.mirrorAllDown + self.peerAllDown), )) - out.write("%0.2f%%" % - (float(self.peerAllUp) / float(self.peerAllDown), )) + (100.0 * float(self.peerAllDown) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), )) + out.write("%0.2f%%" % + (100.0 * float(self.peerAllUp) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), )) out.write("\n") out.write("\n") out.write("\n") diff --git a/apt_p2p/util.py b/apt_p2p/util.py index 3d1a50e..ec0287c 100644 --- a/apt_p2p/util.py +++ b/apt_p2p/util.py @@ -160,11 +160,15 @@ def byte_format(s): @param s: the number of bytes @rtype: C{string} @return: the formatted size with appropriate units - """ - - if (s < 1024): - r = str(s) + 'B' + if (s < 1): + r = str(int(s*1000.0)/1000.0) + 'B' + elif (s < 10): + r = str(int(s*100.0)/100.0) + 'B' + elif (s < 102): + r = str(int(s*10.0)/10.0) + 'B' + elif (s < 1024): + r = str(int(s)) + 'B' elif (s < 10485): r = str(int((s/1024.0)*100.0)/100.0) + 'KiB' elif (s < 104857): diff --git a/apt_p2p_Khashmir/util.py b/apt_p2p_Khashmir/util.py index 39b4ce0..55bd45a 100644 --- a/apt_p2p_Khashmir/util.py +++ b/apt_p2p_Khashmir/util.py @@ -69,11 +69,15 @@ def byte_format(s): @param s: the number of bytes @rtype: C{string} @return: the formatted size with appropriate units - """ - - if (s < 1024): - r = str(s) + 'B' + if (s < 1): + r = str(int(s*1000.0)/1000.0) + 'B' + elif (s < 10): + r = str(int(s*100.0)/100.0) + 'B' + elif (s < 102): + r = str(int(s*10.0)/10.0) + 'B' + elif (s < 1024): + r = str(int(s)) + 'B' elif (s < 10485): r = str(int((s/1024.0)*100.0)/100.0) + 'KiB' elif (s < 104857):