From d563aab35fc0fd1fab59e0f6d594fbb05735cf21 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Mon, 14 Apr 2008 16:35:12 -0700 Subject: [PATCH] Add statistics reporting to the main program (untested). --- TODO | 6 -- apt_p2p/CacheManager.py | 2 +- apt_p2p/HTTPServer.py | 14 ++++ apt_p2p/PeerManager.py | 28 +++++-- apt_p2p/apt_p2p.py | 8 +- apt_p2p/db.py | 54 ++++++++++++- apt_p2p/stats.py | 155 ++++++++++++++++++++++++++++++++++++++ apt_p2p/util.py | 30 ++++++++ apt_p2p_Khashmir/stats.py | 22 +++--- apt_p2p_Khashmir/util.py | 30 ++++++++ 10 files changed, 323 insertions(+), 26 deletions(-) create mode 100644 apt_p2p/stats.py diff --git a/TODO b/TODO index 2477d2f..49a33d8 100644 --- a/TODO +++ b/TODO @@ -1,9 +1,3 @@ -Add statistics gathering to the peer downloading. - -Statistics are needed of how much has been uploaded, downloaded from -peers, and downloaded from mirrors. - - Add all cache files to the database. All files in the cache should be added to the database, so that they can diff --git a/apt_p2p/CacheManager.py b/apt_p2p/CacheManager.py index 24c821e..77e42c6 100644 --- a/apt_p2p/CacheManager.py +++ b/apt_p2p/CacheManager.py @@ -347,7 +347,7 @@ class CacheManager: if destFile.exists(): log.msg('File already exists, removing: %s' % destFile.path) destFile.remove() - elif not destFile.parent().exists(): + if not destFile.parent().exists(): destFile.parent().makedirs() # Determine whether it needs to be decompressed and how diff --git a/apt_p2p/HTTPServer.py b/apt_p2p/HTTPServer.py index ee314ac..3d43fa0 100644 --- a/apt_p2p/HTTPServer.py +++ b/apt_p2p/HTTPServer.py @@ -3,6 +3,7 @@ from urllib import quote_plus, unquote_plus from binascii import b2a_hex +import operator from twisted.python import log from twisted.internet import defer @@ -148,6 +149,8 @@ class UploadThrottlingProtocol(ThrottlingProtocol): Uploads use L{FileUploaderStream} or L{twisted.web2.stream.MemorySTream}, apt uses L{CacheManager.ProxyFileStream} or L{twisted.web.stream.FileStream}. """ + + stats = None def __init__(self, factory, wrappedProtocol): ThrottlingProtocol.__init__(self, factory, wrappedProtocol) @@ -156,9 +159,19 @@ class UploadThrottlingProtocol(ThrottlingProtocol): def write(self, data): if self.throttle: ThrottlingProtocol.write(self, data) + if stats: + stats.sentBytes(len(data)) else: ProtocolWrapper.write(self, data) + def writeSequence(self, seq): + if self.throttle: + ThrottlingProtocol.writeSequence(self, seq) + if stats: + stats.sentBytes(reduce(operator.add, map(len, seq))) + else: + ProtocolWrapper.writeSequence(self, seq) + def registerProducer(self, producer, streaming): ThrottlingProtocol.registerProducer(self, producer, streaming) streamType = getattr(producer, 'stream', None) @@ -207,6 +220,7 @@ class TopLevel(resource.Resource): 'betweenRequestsTimeOut': 60}) self.factory = ThrottlingFactory(self.factory, writeLimit = self.uploadLimit) self.factory.protocol = UploadThrottlingProtocol + self.factory.protocol.stats = self.manager.stats return self.factory def render(self, ctx): diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index 5df37fe..9108fde 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -141,11 +141,13 @@ class StreamToFile: @ivar position: the current file position to write the next data to @type length: C{int} @ivar length: the position in the file to not write beyond + @type inform: C{method} + @ivar inform: a function to call with the length of data received @type doneDefer: L{twisted.internet.defer.Deferred} @ivar doneDefer: the deferred that will fire when done writing """ - def __init__(self, inputStream, outFile, start = 0, length = None): + def __init__(self, inputStream, outFile, start = 0, length = None, inform = None): """Initializes the file. @type inputStream: L{twisted.web2.stream.IByteStream} @@ -158,6 +160,9 @@ class StreamToFile: @type length: C{int} @param length: the maximum amount of data to write to the file (optional, defaults to not limiting the writing to the file + @type inform: C{method} + @param inform: a function to call with the length of data received + (optional, defaults to calling nothing) """ self.stream = inputStream self.outFile = outFile @@ -166,6 +171,7 @@ class StreamToFile: self.length = None if length is not None: self.length = start + length + self.inform = inform self.doneDefer = None def run(self): @@ -195,6 +201,7 @@ class StreamToFile: self.outFile.write(data) self.hash.update(data) self.position += len(data) + self.inform(len(data)) def _done(self, result): """Return the result.""" @@ -564,12 +571,18 @@ class FileDownload: else: # Read the response stream to the file log.msg('Streaming piece %d from peer %r' % (piece, peer)) + def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror): + stats.receivedBytes(bytes, mirror) if response.code == 206: - df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run() + df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, + PIECE_SIZE, statUpdate).run() else: - df = StreamToFile(response.stream, self.file).run() - df.addCallbacks(self._gotPiece, self._gotError, - callbackArgs=(piece, peer), errbackArgs=(piece, peer)) + df = StreamToFile(response.stream, self.file, + inform = statUpdate).run() + reactor.callLater(0, df.addCallbacks, + *(self._gotPiece, self._gotError), + **{'callbackArgs': (piece, peer), + 'errbackArgs': (piece, peer)}) self.outstanding -= 1 self.peerlist.append(peer) @@ -617,17 +630,20 @@ class PeerManager: @ivar cache_dir: the directory to use for storing all files @type dht: L{interfaces.IDHT} @ivar dht: the DHT instance + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics logger to record sent data to @type clients: C{dictionary} @ivar clients: the available peers that have been previously contacted """ - def __init__(self, cache_dir, dht): + def __init__(self, cache_dir, dht, stats): """Initialize the instance.""" self.cache_dir = cache_dir self.cache_dir.restat(False) if not self.cache_dir.exists(): self.cache_dir.makedirs() self.dht = dht + self.stats = stats self.clients = {} def get(self, hash, mirror, peers = [], method="GET", modtime=None): diff --git a/apt_p2p/apt_p2p.py b/apt_p2p/apt_p2p.py index 489da5e..889ebd9 100644 --- a/apt_p2p/apt_p2p.py +++ b/apt_p2p/apt_p2p.py @@ -27,6 +27,7 @@ from MirrorManager import MirrorManager from CacheManager import CacheManager from Hash import HashObject from db import DB +from stats import StatsLogger from util import findMyIPAddr, compact DHT_PIECES = 4 @@ -48,6 +49,8 @@ class AptP2P: @ivar db: the database to use for tracking files and hashes @type dht: L{interfaces.IDHT} @ivar dht: the DHT instance + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics logger to record sent data to @type http_server: L{HTTPServer.TopLevel} @ivar http_server: the web server that will handle all requests from apt and from other peers @@ -78,9 +81,10 @@ class AptP2P: self.dht = dhtClass() self.dht.loadConfig(config, config.get('DEFAULT', 'DHT')) self.dht.join().addCallbacks(self.joinComplete, self.joinError) + self.stats = StatsLogger(self.db) self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self) self.getHTTPFactory = self.http_server.getHTTPFactory - self.peers = PeerManager(self.cache_dir, self.dht) + self.peers = PeerManager(self.cache_dir, self.dht, self.stats) self.mirrors = MirrorManager(self.cache_dir) self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self) self.my_contact = None @@ -136,6 +140,8 @@ class AptP2P: @return: the formatted HTML page containing the statistics """ out = '\n\n' + out += self.stats.formatHTML(self.my_contact) + out += '\n\n' if IDHTStats.implementedBy(self.dhtClass): out += self.dht.getStats() out += '\n\n' diff --git a/apt_p2p/db.py b/apt_p2p/db.py index 569de9a..44e692b 100644 --- a/apt_p2p/db.py +++ b/apt_p2p/db.py @@ -49,6 +49,7 @@ class DB: self.conn.text_factory = str self.conn.row_factory = sqlite.Row + #{ DB Functions def _loadDB(self): """Open a new connection to the existing database file""" try: @@ -68,11 +69,18 @@ class DB: c.execute("CREATE TABLE hashes (hashID INTEGER PRIMARY KEY AUTOINCREMENT, " + "hash KHASH UNIQUE, pieces KHASH, " + "piecehash KHASH, refreshed TIMESTAMP)") + c.execute("CREATE TABLE stats (param TEXT PRIMARY KEY UNIQUE, value NUMERIC)") + c.execute("CREATE INDEX hashes_hash ON hashes(hash)") c.execute("CREATE INDEX hashes_refreshed ON hashes(refreshed)") c.execute("CREATE INDEX hashes_piecehash ON hashes(piecehash)") c.close() self.conn.commit() + def close(self): + """Close the database connection.""" + self.conn.close() + + #{ Files and Hashes def _removeChanged(self, file, row): """If the file has changed or is missing, remove it from the DB. @@ -299,10 +307,50 @@ class DB: return removed - def close(self): - """Close the database connection.""" - self.conn.close() + #{ Statistics + def dbStats(self): + """Count the total number of files and hashes in the database. + + @rtype: (C{int}, C{int}) + @return: the number of distinct hashes and total files in the database + """ + c = self.conn.cursor() + c.execute("SELECT COUNT(hash) as num_hashes FROM hashes") + hashes = 0 + row = c.fetchone() + if row: + hashes = row[0] + c.execute("SELECT COUNT(path) as num_files FROM files") + files = 0 + row = c.fetchone() + if row: + files = row[0] + return hashes, files + def getStats(self): + """Retrieve the saved statistics from the DB. + + @return: dictionary of statistics + """ + c = self.conn.cursor() + c.execute("SELECT param, value FROM stats") + row = c.fetchone() + stats = {} + while row: + stats[row['param']] = row['value'] + row = c.fetchone() + c.close() + return stats + + def saveStats(self, stats): + """Save the statistics to the DB.""" + c = self.conn.cursor() + for param in stats: + c.execute("INSERT OR REPLACE INTO stats (param, value) VALUES (?, ?)", + (param, stats[param])) + self.conn.commit() + c.close() + class TestDB(unittest.TestCase): """Tests for the khashmir database.""" diff --git a/apt_p2p/stats.py b/apt_p2p/stats.py new file mode 100644 index 0000000..640c57f --- /dev/null +++ b/apt_p2p/stats.py @@ -0,0 +1,155 @@ + +"""Store statistics for the Apt-P2P downloader.""" + +from datetime import datetime, timedelta +from StringIO import StringIO + +from util import byte_format + +class StatsLogger: + """Store the statistics for the Khashmir DHT. + + @ivar startTime: the time the program was started + @ivar reachable: whether we can be contacted by other nodes + @type table: L{ktable.KTable} + @ivar table: the routing table for the DHT + @ivar lastTableUpdate: the last time an update of the table stats was done + @ivar nodes: the number of nodes connected + @ivar users: the estimated number of total users in the DHT + @type store: L{db.DB} + @ivar store: the database for the DHT + @ivar lastDBUpdate: the last time an update of the database stats was done + @ivar keys: the number of distinct keys in the database + @ivar values: the number of values in the database + @ivar downPackets: the number of packets received + @ivar upPackets: the number of packets sent + @ivar downBytes: the number of bytes received + @ivar upBytes: the number of bytes sent + @ivar actions: a dictionary of the actions and their statistics, keys are + the action name, values are a list of 5 elements for the number of + times the action was sent, responded to, failed, received, and + generated an error + """ + + def __init__(self, db): + """Initialize the statistics. + + @type store: L{db.DB} + @param store: the database for the Apt-P2P downloader + """ + # Database + self.db = db + self.lastDBUpdate = datetime.now() + self.hashes, self.files = self.db.dbStats() + + # Transport + self.mirrorDown = 0L + self.peerDown = 0L + self.peerUp = 0L + + # Transport All-Time + stats = self.db.getStats() + self.mirrorAllDown = long(stats.get('mirror_down', 0L)) + self.peerAllDown = long(stats.get('peer_down', 0L)) + self.peerAllUp = long(stats.get('peer_up', 0L)) + + def save(self): + """Save the persistent statistics to the DB.""" + stats = {'mirror_down': self.mirrorAllDown, + 'peer_down': self.peerAllDown, + 'peer_up': self.peerAllUp, + } + self.db.saveStats(stats) + + def dbStats(self): + """Collect some statistics about the database. + + @rtype: (C{int}, C{int}) + @return: the number of keys and values in the database + """ + if datetime.now() - self.lastDBUpdate > timedelta(minutes = 1): + self.lastDBUpdate = datetime.now() + self.hashes, self.files = self.db.keyStats() + return (self.hashes, self.files) + + def formatHTML(self, contactAddress): + """Gather statistics for the DHT and format them for display in a browser. + + @param contactAddress: the external IP address in use + @rtype: C{string} + @return: the stats, formatted for display in the body of an HTML page + """ + self.dbStats() + + out = StringIO() + out.write('

Downloader Statistics

\n') + out.write("\n\n") + out.write('\n") + out.write("
\n') + + # General + out.write("\n") + out.write("\n") + out.write("\n') + out.write("

General

Value
Contact" + str(contactAdress) + '
\n") + out.write('
\n') + + # Database + out.write("\n") + out.write("\n") + out.write("\n') + out.write("\n') + out.write("

Database

Value
Distinct Files" + str(self.hashes) + '
Total Files" + str(self.files) + '
\n") + out.write("
\n") + + # Transport + out.write("\n") + out.write("\n") + out.write("") + out.write("') + out.write("') + out.write("') + out.write("") + out.write("" % + (float(self.mirrorDown) / float(self.mirrorDown + self.peerDown), )) + out.write("" % + (float(self.peerDown) / float(self.mirrorDown + self.peerDown), )) + out.write("" % + (float(self.peerUp) / float(self.peerDown), )) + out.write("") + out.write("') + out.write("') + out.write("') + out.write("") + out.write("" % + (float(self.mirrorAllDown) / float(self.mirrorAllDown + self.peerAllDown), )) + out.write("" % + (float(self.peerAllDown) / float(self.mirrorAllDown + self.peerAllDown), )) + out.write("" % + (float(self.peerAllUp) / float(self.peerAllDown), )) + out.write("

Transport

Mirror DownloadsPeer DownloadsPeer Uploads
This Session" + byte_format(self.mirrorDown) + '" + byte_format(self.peerDown) + '" + byte_format(self.peerUp) + '
Session Ratio%0.2f%%%0.2f%%%0.2f%%
All-Time" + byte_format(self.mirrorAllDown) + '" + byte_format(self.peerAllDown) + '" + byte_format(self.peerAllUp) + '
All-Time Ratio%0.2f%%%0.2f%%%0.2f%%
\n") + out.write("
\n") + + return out.getvalue() + + #{ Transport + def sentBytes(self, bytes): + """Record that some bytes were sent. + + @param bytes: the number of bytes sent + """ + self.peerUp += bytes + self.peerAllUp += bytes + + def receivedBytes(self, bytes, mirror = False): + """Record that some bytes were received. + + @param bytes: the number of bytes received + @param mirror: whether the bytes were sent to a mirror + """ + if mirror: + self.mirrorDown += bytes + self.mirrorAllDown += bytes + else: + self.peerDown += bytes + self.peerAllDown += bytes diff --git a/apt_p2p/util.py b/apt_p2p/util.py index c334d1d..3d1a50e 100644 --- a/apt_p2p/util.py +++ b/apt_p2p/util.py @@ -153,6 +153,36 @@ def compact(ip, port): raise ValueError return s +def byte_format(s): + """Format a byte size for reading by the user. + + @type s: C{long} + @param s: the number of bytes + @rtype: C{string} + @return: the formatted size with appropriate units + + """ + + if (s < 1024): + r = str(s) + 'B' + elif (s < 10485): + r = str(int((s/1024.0)*100.0)/100.0) + 'KiB' + elif (s < 104857): + r = str(int((s/1024.0)*10.0)/10.0) + 'KiB' + elif (s < 1048576): + r = str(int(s/1024)) + 'KiB' + elif (s < 10737418L): + r = str(int((s/1048576.0)*100.0)/100.0) + 'MiB' + elif (s < 107374182L): + r = str(int((s/1048576.0)*10.0)/10.0) + 'MiB' + elif (s < 1073741824L): + r = str(int(s/1048576)) + 'MiB' + elif (s < 1099511627776L): + r = str(int((s/1073741824.0)*100.0)/100.0) + 'GiB' + else: + r = str(int((s/1099511627776.0)*100.0)/100.0) + 'TiB' + return(r) + class TestUtil(unittest.TestCase): """Tests for the utilities.""" diff --git a/apt_p2p_Khashmir/stats.py b/apt_p2p_Khashmir/stats.py index 5afc17a..aee9bbb 100644 --- a/apt_p2p_Khashmir/stats.py +++ b/apt_p2p_Khashmir/stats.py @@ -4,6 +4,8 @@ from datetime import datetime, timedelta from StringIO import StringIO +from util import byte_format + class StatsLogger: """Store the statistics for the Khashmir DHT. @@ -102,45 +104,47 @@ class StatsLogger: out.write('

DHT Statistics

\n') out.write("\n\n") out.write('
\n') - out.write("\n") # General + out.write("
\n") out.write("\n") out.write("\n') out.write("\n') out.write("

General

Value
Up time" + str(elapsed) + '
Reachable" + str(self.reachable) + '
\n") out.write('
\n') - out.write("\n") # Routing + out.write("
\n") out.write("\n") out.write("\n') out.write("\n') out.write("

Routing Table

Value
Number of nodes" + str(self.nodes) + '
Total number of users" + str(self.users) + '
\n") out.write('
\n') - out.write("\n") # Database + out.write("
\n") out.write("\n") out.write("\n') out.write("\n') out.write("

Database

Value
Keys" + str(self.keys) + '
Values" + str(self.values) + '
\n") out.write("
\n") + + # Transport out.write("\n") - out.write("\n") + out.write("\n") out.write("") out.write('') - out.write('') - out.write('\n' % (self.downBytes / (elapsed.days*86400.0 + elapsed.seconds), )) + out.write('') + out.write('\n') out.write("") out.write('') - out.write('') - out.write('\n' % (self.upBytes / (elapsed.days*86400.0 + elapsed.seconds), )) + out.write('') + out.write('\n') out.write("

Transport

PacketsBytesBytes/second

Transport

PacketsBytesSpeed
Downloaded' + str(self.downPackets) + '' + str(self.downBytes) + '%0.2f
' + byte_format(self.downBytes) + '' + byte_format(self.downBytes / (elapsed.days*86400.0 + elapsed.seconds)) + '/sec
Uploaded' + str(self.upPackets) + '' + str(self.upBytes) + '%0.2f
' + byte_format(self.upBytes) + '' + byte_format(self.upBytes / (elapsed.days*86400.0 + elapsed.seconds)) + '/sec
\n") out.write("
\n") - out.write("\n") # Actions + out.write("
\n") out.write("\n") actions = self.actions.keys() actions.sort() diff --git a/apt_p2p_Khashmir/util.py b/apt_p2p_Khashmir/util.py index 52b6e97..39b4ce0 100644 --- a/apt_p2p_Khashmir/util.py +++ b/apt_p2p_Khashmir/util.py @@ -62,6 +62,36 @@ def compact(id, host, port): raise ValueError return s +def byte_format(s): + """Format a byte size for reading by the user. + + @type s: C{long} + @param s: the number of bytes + @rtype: C{string} + @return: the formatted size with appropriate units + + """ + + if (s < 1024): + r = str(s) + 'B' + elif (s < 10485): + r = str(int((s/1024.0)*100.0)/100.0) + 'KiB' + elif (s < 104857): + r = str(int((s/1024.0)*10.0)/10.0) + 'KiB' + elif (s < 1048576): + r = str(int(s/1024)) + 'KiB' + elif (s < 10737418L): + r = str(int((s/1048576.0)*100.0)/100.0) + 'MiB' + elif (s < 107374182L): + r = str(int((s/1048576.0)*10.0)/10.0) + 'MiB' + elif (s < 1073741824L): + r = str(int(s/1048576)) + 'MiB' + elif (s < 1099511627776L): + r = str(int((s/1073741824.0)*100.0)/100.0) + 'GiB' + else: + r = str(int((s/1099511627776.0)*100.0)/100.0) + 'TiB' + return(r) + class TestUtil(unittest.TestCase): """Tests for the utilities.""" -- 2.30.2

Actions

StartedSentOKFailedReceivedError