adding them to the tracking done by the AptPackages module.
+Improve the downloaded and uploaded data measurements.
+
+This data is measured in two places: for statistics, and for
+limiting the upload bandwidth. They both have deficiencies as they
+sometimes miss the headers or the requests sent out. The upload
+bandwidth calculation only considers the stream in the upload and not
+the headers sent, and it also doesn't consider the upload bandwidth
+from requesting downloads from peers (though that may be a good thing).
+The statistics calculations for downloads include the headers of
+downloaded files, but not the requests received from peers for files
+to upload. The statistics for uploaded data include only the files sent
+and not the headers, and also miss the requests for downloads sent to
+other peers.
+
+
Consider storing deltas of packages.
Instead of downloading full package files when a previous version of
from apt_p2p_conf import version
+class LoggingHTTPClientProtocol(HTTPClientProtocol):
+ """A modified client protocol that logs the number of bytes received."""
+
+ def __init__(self, factory, stats = None, mirror = False):
+ HTTPClientProtocol.__init__(self, factory)
+ self.stats = stats
+ self.mirror = mirror
+
+ def lineReceived(self, line):
+ if self.stats:
+ self.stats.receivedBytes(len(line) + 2, self.mirror)
+ HTTPClientProtocol.lineReceived(self, line)
+
+ def rawDataReceived(self, data):
+ if self.stats:
+ self.stats.receivedBytes(len(data), self.mirror)
+ HTTPClientProtocol.rawDataReceived(self, data)
+
class Peer(ClientFactory):
"""A manager for all HTTP requests to a single peer.
implements(IHTTPClientManager)
- def __init__(self, host, port=80):
+ def __init__(self, host, port = 80, stats = None):
self.host = host
self.port = port
+ self.stats = stats
self.mirror = False
self.rank = 0.5
self.busy = False
"""Connect to the peer."""
assert self.closed and not self.connecting
self.connecting = True
- d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
+ d = protocol.ClientCreator(reactor, LoggingHTTPClientProtocol, self,
+ stats = self.stats, mirror = self.mirror).connectTCP(self.host, self.port)
d.addCallbacks(self.connected, self.connectionError)
def connected(self, proto):
def write(self, data):
if self.throttle:
ThrottlingProtocol.write(self, data)
- if stats:
- stats.sentBytes(len(data))
+ if self.stats:
+ self.stats.sentBytes(len(data))
else:
ProtocolWrapper.write(self, data)
def writeSequence(self, seq):
if self.throttle:
ThrottlingProtocol.writeSequence(self, seq)
- if stats:
- stats.sentBytes(reduce(operator.add, map(len, seq)))
+ if self.stats:
+ self.stats.sentBytes(reduce(operator.add, map(len, seq)))
else:
ProtocolWrapper.writeSequence(self, seq)
'betweenRequestsTimeOut': 60})
self.factory = ThrottlingFactory(self.factory, writeLimit = self.uploadLimit)
self.factory.protocol = UploadThrottlingProtocol
- self.factory.protocol.stats = self.manager.stats
+ if self.manager:
+ self.factory.protocol.stats = self.manager.stats
return self.factory
def render(self, ctx):
@ivar position: the current file position to write the next data to
@type length: C{int}
@ivar length: the position in the file to not write beyond
- @type inform: C{method}
- @ivar inform: a function to call with the length of data received
@type doneDefer: L{twisted.internet.defer.Deferred}
@ivar doneDefer: the deferred that will fire when done writing
"""
- def __init__(self, inputStream, outFile, start = 0, length = None, inform = None):
+ def __init__(self, inputStream, outFile, start = 0, length = None):
"""Initializes the file.
@type inputStream: L{twisted.web2.stream.IByteStream}
@type length: C{int}
@param length: the maximum amount of data to write to the file
(optional, defaults to not limiting the writing to the file
- @type inform: C{method}
- @param inform: a function to call with the length of data received
- (optional, defaults to calling nothing)
"""
self.stream = inputStream
self.outFile = outFile
self.length = None
if length is not None:
self.length = start + length
- self.inform = inform
self.doneDefer = None
def run(self):
self.outFile.write(data)
self.hash.update(data)
self.position += len(data)
- self.inform(len(data))
def _done(self, result):
"""Return the result."""
if parsed[0] == "http":
site = splitHostPort(parsed[0], parsed[1])
self.mirror_path = urlunparse(('', '') + parsed[2:])
- peer = self.manager.getPeer(site)
- peer.mirror = True
+ peer = self.manager.getPeer(site, mirror = True)
self.peerlist.append(peer)
# Special case if there's only one good peer left
else:
# Read the response stream to the file
log.msg('Streaming piece %d from peer %r' % (piece, peer))
- def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror):
- stats.receivedBytes(bytes, mirror)
if response.code == 206:
df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
- PIECE_SIZE, statUpdate).run()
+ PIECE_SIZE).run()
else:
- df = StreamToFile(response.stream, self.file,
- inform = statUpdate).run()
+ df = StreamToFile(response.stream, self.file).run()
reactor.callLater(0, df.addCallbacks,
*(self._gotPiece, self._gotError),
**{'callbackArgs': (piece, peer),
assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
site = splitHostPort(parsed[0], parsed[1])
path = urlunparse(('', '') + parsed[2:])
- peer = self.getPeer(site)
- peer.mirror = True
+ peer = self.getPeer(site, mirror = True)
return peer.get(path, method, modtime)
# elif len(peers) == 1:
# site = uncompact(peers[0]['c'])
tmpfile = self.cache_dir.child(hash.hexexpected())
return FileDownload(self, hash, mirror, peers, tmpfile).run()
- def getPeer(self, site):
+ def getPeer(self, site, mirror = False):
"""Create a new peer if necessary and return it.
@type site: (C{string}, C{int})
@param site: the IP address and port of the peer
+ @param mirror: whether the peer is actually a mirror
+ (optional, defaults to False)
"""
if site not in self.clients:
- self.clients[site] = Peer(site[0], site[1])
+ self.clients[site] = Peer(site[0], site[1], self.stats)
+ if mirror:
+ self.clients[site].mirror = True
return self.clients[site]
def close(self):
self.mirrors = MirrorManager(self.cache_dir)
self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
self.my_contact = None
+ reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
- #{ DHT maintenance
+ #{ Maintenance
+ def shutdown(self):
+ self.stats.save()
+ self.db.close()
+
def joinComplete(self, result):
"""Complete the DHT join process and determine our download information.
from datetime import datetime, timedelta
from StringIO import StringIO
-from util import byte_format
+from util import uncompact, byte_format
class StatsLogger:
"""Store the statistics for the Khashmir DHT.
"""
# Database
self.db = db
- self.lastDBUpdate = datetime.now()
- self.hashes, self.files = self.db.dbStats()
+ self.hashes, self.files = 0, 0
# Transport
self.mirrorDown = 0L
}
self.db.saveStats(stats)
- def dbStats(self):
- """Collect some statistics about the database.
-
- @rtype: (C{int}, C{int})
- @return: the number of keys and values in the database
- """
- if datetime.now() - self.lastDBUpdate > timedelta(minutes = 1):
- self.lastDBUpdate = datetime.now()
- self.hashes, self.files = self.db.keyStats()
- return (self.hashes, self.files)
-
def formatHTML(self, contactAddress):
"""Gather statistics for the DHT and format them for display in a browser.
@rtype: C{string}
@return: the stats, formatted for display in the body of an HTML page
"""
- self.dbStats()
+ self.hashes, self.files = self.db.dbStats()
out = StringIO()
out.write('<h2>Downloader Statistics</h2>\n')
# General
out.write("<table border='1' cellpadding='4px'>\n")
out.write("<tr><th><h3>General</h3></th><th>Value</th></tr>\n")
- out.write("<tr title='Contact address for this peer'><td>Contact</td><td>" + str(contactAdress) + '</td></tr>\n')
+ out.write("<tr title='Contact address for this peer'><td>Contact</td><td>" + str(uncompact(contactAddress)) + '</td></tr>\n')
out.write("</table>\n")
out.write('</td><td>\n')
out.write("<td title='Amount uploaded to peers'>" + byte_format(self.peerUp) + '</td></tr>')
out.write("<tr><td title='Since the program was last restarted'>Session Ratio</td>")
out.write("<td title='Percent of download from mirrors'>%0.2f%%</td>" %
- (float(self.mirrorDown) / float(self.mirrorDown + self.peerDown), ))
+ (100.0 * float(self.mirrorDown) / float(max(self.mirrorDown + self.peerDown, 1)), ))
out.write("<td title='Percent of download from peers'>%0.2f%%</td>" %
- (float(self.peerDown) / float(self.mirrorDown + self.peerDown), ))
- out.write("<td title='Percent uploaded to peers compared with downloaded from peers'>%0.2f%%</td></tr>" %
- (float(self.peerUp) / float(self.peerDown), ))
+ (100.0 * float(self.peerDown) / float(max(self.mirrorDown + self.peerDown, 1)), ))
+ out.write("<td title='Percent uploaded to peers compared with all downloaded'>%0.2f%%</td></tr>" %
+ (100.0 * float(self.peerUp) / float(max(self.mirrorDown + self.peerDown, 1)), ))
out.write("<tr><td title='Since the program was installed'>All-Time</td>")
out.write("<td title='Amount downloaded from mirrors'>" + byte_format(self.mirrorAllDown) + '</td>')
out.write("<td title='Amount downloaded from peers'>" + byte_format(self.peerAllDown) + '</td>')
out.write("<td title='Amount uploaded to peers'>" + byte_format(self.peerAllUp) + '</td></tr>')
out.write("<tr><td title='Since the program was installed'>All-Time Ratio</td>")
out.write("<td title='Percent of download from mirrors'>%0.2f%%</td>" %
- (float(self.mirrorAllDown) / float(self.mirrorAllDown + self.peerAllDown), ))
+ (100.0 * float(self.mirrorAllDown) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), ))
out.write("<td title='Percent of download from peers'>%0.2f%%</td>" %
- (float(self.peerAllDown) / float(self.mirrorAllDown + self.peerAllDown), ))
- out.write("<td title='Percent uploaded to peers compared with downloaded from peers'>%0.2f%%</td></tr>" %
- (float(self.peerAllUp) / float(self.peerAllDown), ))
+ (100.0 * float(self.peerAllDown) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), ))
+ out.write("<td title='Percent uploaded to peers compared with all downloaded'>%0.2f%%</td></tr>" %
+ (100.0 * float(self.peerAllUp) / float(max(self.mirrorAllDown + self.peerAllDown, 1)), ))
out.write("</table>\n")
out.write("</td></tr>\n")
out.write("</table>\n")
@param s: the number of bytes
@rtype: C{string}
@return: the formatted size with appropriate units
-
"""
-
- if (s < 1024):
- r = str(s) + 'B'
+ if (s < 1):
+ r = str(int(s*1000.0)/1000.0) + 'B'
+ elif (s < 10):
+ r = str(int(s*100.0)/100.0) + 'B'
+ elif (s < 102):
+ r = str(int(s*10.0)/10.0) + 'B'
+ elif (s < 1024):
+ r = str(int(s)) + 'B'
elif (s < 10485):
r = str(int((s/1024.0)*100.0)/100.0) + 'KiB'
elif (s < 104857):
@param s: the number of bytes
@rtype: C{string}
@return: the formatted size with appropriate units
-
"""
-
- if (s < 1024):
- r = str(s) + 'B'
+ if (s < 1):
+ r = str(int(s*1000.0)/1000.0) + 'B'
+ elif (s < 10):
+ r = str(int(s*100.0)/100.0) + 'B'
+ elif (s < 102):
+ r = str(int(s*10.0)/10.0) + 'B'
+ elif (s < 1024):
+ r = str(int(s)) + 'B'
elif (s < 10485):
r = str(int((s/1024.0)*100.0)/100.0) + 'KiB'
elif (s < 104857):