-Add statistics gathering to the peer downloading.
-
-Statistics are needed of how much has been uploaded, downloaded from
-peers, and downloaded from mirrors.
-
-
Add all cache files to the database.
All files in the cache should be added to the database, so that they can
if destFile.exists():
log.msg('File already exists, removing: %s' % destFile.path)
destFile.remove()
- elif not destFile.parent().exists():
+ if not destFile.parent().exists():
destFile.parent().makedirs()
# Determine whether it needs to be decompressed and how
from urllib import quote_plus, unquote_plus
from binascii import b2a_hex
+import operator
from twisted.python import log
from twisted.internet import defer
Uploads use L{FileUploaderStream} or L{twisted.web2.stream.MemorySTream},
apt uses L{CacheManager.ProxyFileStream} or L{twisted.web.stream.FileStream}.
"""
+
+ stats = None
def __init__(self, factory, wrappedProtocol):
ThrottlingProtocol.__init__(self, factory, wrappedProtocol)
def write(self, data):
if self.throttle:
ThrottlingProtocol.write(self, data)
+ if stats:
+ stats.sentBytes(len(data))
else:
ProtocolWrapper.write(self, data)
+ def writeSequence(self, seq):
+ if self.throttle:
+ ThrottlingProtocol.writeSequence(self, seq)
+ if stats:
+ stats.sentBytes(reduce(operator.add, map(len, seq)))
+ else:
+ ProtocolWrapper.writeSequence(self, seq)
+
def registerProducer(self, producer, streaming):
ThrottlingProtocol.registerProducer(self, producer, streaming)
streamType = getattr(producer, 'stream', None)
'betweenRequestsTimeOut': 60})
self.factory = ThrottlingFactory(self.factory, writeLimit = self.uploadLimit)
self.factory.protocol = UploadThrottlingProtocol
+ self.factory.protocol.stats = self.manager.stats
return self.factory
def render(self, ctx):
@ivar position: the current file position to write the next data to
@type length: C{int}
@ivar length: the position in the file to not write beyond
+ @type inform: C{method}
+ @ivar inform: a function to call with the length of data received
@type doneDefer: L{twisted.internet.defer.Deferred}
@ivar doneDefer: the deferred that will fire when done writing
"""
- def __init__(self, inputStream, outFile, start = 0, length = None):
+ def __init__(self, inputStream, outFile, start = 0, length = None, inform = None):
"""Initializes the file.
@type inputStream: L{twisted.web2.stream.IByteStream}
@type length: C{int}
@param length: the maximum amount of data to write to the file
(optional, defaults to not limiting the writing to the file
+ @type inform: C{method}
+ @param inform: a function to call with the length of data received
+ (optional, defaults to calling nothing)
"""
self.stream = inputStream
self.outFile = outFile
self.length = None
if length is not None:
self.length = start + length
+ self.inform = inform
self.doneDefer = None
def run(self):
self.outFile.write(data)
self.hash.update(data)
self.position += len(data)
+ self.inform(len(data))
def _done(self, result):
"""Return the result."""
else:
# Read the response stream to the file
log.msg('Streaming piece %d from peer %r' % (piece, peer))
+ def statUpdate(bytes, stats = self.manager.stats, mirror = peer.mirror):
+ stats.receivedBytes(bytes, mirror)
if response.code == 206:
- df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+ df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
+ PIECE_SIZE, statUpdate).run()
else:
- df = StreamToFile(response.stream, self.file).run()
- df.addCallbacks(self._gotPiece, self._gotError,
- callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+ df = StreamToFile(response.stream, self.file,
+ inform = statUpdate).run()
+ reactor.callLater(0, df.addCallbacks,
+ *(self._gotPiece, self._gotError),
+ **{'callbackArgs': (piece, peer),
+ 'errbackArgs': (piece, peer)})
self.outstanding -= 1
self.peerlist.append(peer)
@ivar cache_dir: the directory to use for storing all files
@type dht: L{interfaces.IDHT}
@ivar dht: the DHT instance
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics logger to record sent data to
@type clients: C{dictionary}
@ivar clients: the available peers that have been previously contacted
"""
- def __init__(self, cache_dir, dht):
+ def __init__(self, cache_dir, dht, stats):
"""Initialize the instance."""
self.cache_dir = cache_dir
self.cache_dir.restat(False)
if not self.cache_dir.exists():
self.cache_dir.makedirs()
self.dht = dht
+ self.stats = stats
self.clients = {}
def get(self, hash, mirror, peers = [], method="GET", modtime=None):
from CacheManager import CacheManager
from Hash import HashObject
from db import DB
+from stats import StatsLogger
from util import findMyIPAddr, compact
DHT_PIECES = 4
@ivar db: the database to use for tracking files and hashes
@type dht: L{interfaces.IDHT}
@ivar dht: the DHT instance
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics logger to record sent data to
@type http_server: L{HTTPServer.TopLevel}
@ivar http_server: the web server that will handle all requests from apt
and from other peers
self.dht = dhtClass()
self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
self.dht.join().addCallbacks(self.joinComplete, self.joinError)
+ self.stats = StatsLogger(self.db)
self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
self.getHTTPFactory = self.http_server.getHTTPFactory
- self.peers = PeerManager(self.cache_dir, self.dht)
+ self.peers = PeerManager(self.cache_dir, self.dht, self.stats)
self.mirrors = MirrorManager(self.cache_dir)
self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
self.my_contact = None
@return: the formatted HTML page containing the statistics
"""
out = '<html><body>\n\n'
+ out += self.stats.formatHTML(self.my_contact)
+ out += '\n\n'
if IDHTStats.implementedBy(self.dhtClass):
out += self.dht.getStats()
out += '\n</body></html>\n'
self.conn.text_factory = str
self.conn.row_factory = sqlite.Row
+ #{ DB Functions
def _loadDB(self):
"""Open a new connection to the existing database file"""
try:
c.execute("CREATE TABLE hashes (hashID INTEGER PRIMARY KEY AUTOINCREMENT, " +
"hash KHASH UNIQUE, pieces KHASH, " +
"piecehash KHASH, refreshed TIMESTAMP)")
+ c.execute("CREATE TABLE stats (param TEXT PRIMARY KEY UNIQUE, value NUMERIC)")
+ c.execute("CREATE INDEX hashes_hash ON hashes(hash)")
c.execute("CREATE INDEX hashes_refreshed ON hashes(refreshed)")
c.execute("CREATE INDEX hashes_piecehash ON hashes(piecehash)")
c.close()
self.conn.commit()
+ def close(self):
+ """Close the database connection."""
+ self.conn.close()
+
+ #{ Files and Hashes
def _removeChanged(self, file, row):
"""If the file has changed or is missing, remove it from the DB.
return removed
- def close(self):
- """Close the database connection."""
- self.conn.close()
+ #{ Statistics
+ def dbStats(self):
+ """Count the total number of files and hashes in the database.
+
+ @rtype: (C{int}, C{int})
+ @return: the number of distinct hashes and total files in the database
+ """
+ c = self.conn.cursor()
+ c.execute("SELECT COUNT(hash) as num_hashes FROM hashes")
+ hashes = 0
+ row = c.fetchone()
+ if row:
+ hashes = row[0]
+ c.execute("SELECT COUNT(path) as num_files FROM files")
+ files = 0
+ row = c.fetchone()
+ if row:
+ files = row[0]
+ return hashes, files
+ def getStats(self):
+ """Retrieve the saved statistics from the DB.
+
+ @return: dictionary of statistics
+ """
+ c = self.conn.cursor()
+ c.execute("SELECT param, value FROM stats")
+ row = c.fetchone()
+ stats = {}
+ while row:
+ stats[row['param']] = row['value']
+ row = c.fetchone()
+ c.close()
+ return stats
+
+ def saveStats(self, stats):
+ """Save the statistics to the DB."""
+ c = self.conn.cursor()
+ for param in stats:
+ c.execute("INSERT OR REPLACE INTO stats (param, value) VALUES (?, ?)",
+ (param, stats[param]))
+ self.conn.commit()
+ c.close()
+
class TestDB(unittest.TestCase):
"""Tests for the khashmir database."""
--- /dev/null
+
+"""Store statistics for the Apt-P2P downloader."""
+
+from datetime import datetime, timedelta
+from StringIO import StringIO
+
+from util import byte_format
+
+class StatsLogger:
+ """Store the statistics for the Khashmir DHT.
+
+ @ivar startTime: the time the program was started
+ @ivar reachable: whether we can be contacted by other nodes
+ @type table: L{ktable.KTable}
+ @ivar table: the routing table for the DHT
+ @ivar lastTableUpdate: the last time an update of the table stats was done
+ @ivar nodes: the number of nodes connected
+ @ivar users: the estimated number of total users in the DHT
+ @type store: L{db.DB}
+ @ivar store: the database for the DHT
+ @ivar lastDBUpdate: the last time an update of the database stats was done
+ @ivar keys: the number of distinct keys in the database
+ @ivar values: the number of values in the database
+ @ivar downPackets: the number of packets received
+ @ivar upPackets: the number of packets sent
+ @ivar downBytes: the number of bytes received
+ @ivar upBytes: the number of bytes sent
+ @ivar actions: a dictionary of the actions and their statistics, keys are
+ the action name, values are a list of 5 elements for the number of
+ times the action was sent, responded to, failed, received, and
+ generated an error
+ """
+
+ def __init__(self, db):
+ """Initialize the statistics.
+
+ @type store: L{db.DB}
+ @param store: the database for the Apt-P2P downloader
+ """
+ # Database
+ self.db = db
+ self.lastDBUpdate = datetime.now()
+ self.hashes, self.files = self.db.dbStats()
+
+ # Transport
+ self.mirrorDown = 0L
+ self.peerDown = 0L
+ self.peerUp = 0L
+
+ # Transport All-Time
+ stats = self.db.getStats()
+ self.mirrorAllDown = long(stats.get('mirror_down', 0L))
+ self.peerAllDown = long(stats.get('peer_down', 0L))
+ self.peerAllUp = long(stats.get('peer_up', 0L))
+
+ def save(self):
+ """Save the persistent statistics to the DB."""
+ stats = {'mirror_down': self.mirrorAllDown,
+ 'peer_down': self.peerAllDown,
+ 'peer_up': self.peerAllUp,
+ }
+ self.db.saveStats(stats)
+
+ def dbStats(self):
+ """Collect some statistics about the database.
+
+ @rtype: (C{int}, C{int})
+ @return: the number of keys and values in the database
+ """
+ if datetime.now() - self.lastDBUpdate > timedelta(minutes = 1):
+ self.lastDBUpdate = datetime.now()
+ self.hashes, self.files = self.db.keyStats()
+ return (self.hashes, self.files)
+
+ def formatHTML(self, contactAddress):
+ """Gather statistics for the DHT and format them for display in a browser.
+
+ @param contactAddress: the external IP address in use
+ @rtype: C{string}
+ @return: the stats, formatted for display in the body of an HTML page
+ """
+ self.dbStats()
+
+ out = StringIO()
+ out.write('<h2>Downloader Statistics</h2>\n')
+ out.write("<table border='0' cellspacing='20px'>\n<tr>\n")
+ out.write('<td>\n')
+
+ # General
+ out.write("<table border='1' cellpadding='4px'>\n")
+ out.write("<tr><th><h3>General</h3></th><th>Value</th></tr>\n")
+ out.write("<tr title='Contact address for this peer'><td>Contact</td><td>" + str(contactAdress) + '</td></tr>\n')
+ out.write("</table>\n")
+ out.write('</td><td>\n')
+
+ # Database
+ out.write("<table border='1' cellpadding='4px'>\n")
+ out.write("<tr><th><h3>Database</h3></th><th>Value</th></tr>\n")
+ out.write("<tr title='Number of distinct files in the database'><td>Distinct Files</td><td>" + str(self.hashes) + '</td></tr>\n')
+ out.write("<tr title='Total number of files being shared'><td>Total Files</td><td>" + str(self.files) + '</td></tr>\n')
+ out.write("</table>\n")
+ out.write("</td></tr><tr><td colspan='3'>\n")
+
+ # Transport
+ out.write("<table border='1' cellpadding='4px'>\n")
+ out.write("<tr><th><h3>Transport</h3></th><th>Mirror Downloads</th><th>Peer Downloads</th><th>Peer Uploads</th></tr>\n")
+ out.write("<tr><td title='Since the program was last restarted'>This Session</td>")
+ out.write("<td title='Amount downloaded from mirrors'>" + byte_format(self.mirrorDown) + '</td>')
+ out.write("<td title='Amount downloaded from peers'>" + byte_format(self.peerDown) + '</td>')
+ out.write("<td title='Amount uploaded to peers'>" + byte_format(self.peerUp) + '</td></tr>')
+ out.write("<tr><td title='Since the program was last restarted'>Session Ratio</td>")
+ out.write("<td title='Percent of download from mirrors'>%0.2f%%</td>" %
+ (float(self.mirrorDown) / float(self.mirrorDown + self.peerDown), ))
+ out.write("<td title='Percent of download from peers'>%0.2f%%</td>" %
+ (float(self.peerDown) / float(self.mirrorDown + self.peerDown), ))
+ out.write("<td title='Percent uploaded to peers compared with downloaded from peers'>%0.2f%%</td></tr>" %
+ (float(self.peerUp) / float(self.peerDown), ))
+ out.write("<tr><td title='Since the program was installed'>All-Time</td>")
+ out.write("<td title='Amount downloaded from mirrors'>" + byte_format(self.mirrorAllDown) + '</td>')
+ out.write("<td title='Amount downloaded from peers'>" + byte_format(self.peerAllDown) + '</td>')
+ out.write("<td title='Amount uploaded to peers'>" + byte_format(self.peerAllUp) + '</td></tr>')
+ out.write("<tr><td title='Since the program was installed'>All-Time Ratio</td>")
+ out.write("<td title='Percent of download from mirrors'>%0.2f%%</td>" %
+ (float(self.mirrorAllDown) / float(self.mirrorAllDown + self.peerAllDown), ))
+ out.write("<td title='Percent of download from peers'>%0.2f%%</td>" %
+ (float(self.peerAllDown) / float(self.mirrorAllDown + self.peerAllDown), ))
+ out.write("<td title='Percent uploaded to peers compared with downloaded from peers'>%0.2f%%</td></tr>" %
+ (float(self.peerAllUp) / float(self.peerAllDown), ))
+ out.write("</table>\n")
+ out.write("</td></tr>\n")
+ out.write("</table>\n")
+
+ return out.getvalue()
+
+ #{ Transport
+ def sentBytes(self, bytes):
+ """Record that some bytes were sent.
+
+ @param bytes: the number of bytes sent
+ """
+ self.peerUp += bytes
+ self.peerAllUp += bytes
+
+ def receivedBytes(self, bytes, mirror = False):
+ """Record that some bytes were received.
+
+ @param bytes: the number of bytes received
+ @param mirror: whether the bytes were sent to a mirror
+ """
+ if mirror:
+ self.mirrorDown += bytes
+ self.mirrorAllDown += bytes
+ else:
+ self.peerDown += bytes
+ self.peerAllDown += bytes
raise ValueError
return s
+def byte_format(s):
+ """Format a byte size for reading by the user.
+
+ @type s: C{long}
+ @param s: the number of bytes
+ @rtype: C{string}
+ @return: the formatted size with appropriate units
+
+ """
+
+ if (s < 1024):
+ r = str(s) + 'B'
+ elif (s < 10485):
+ r = str(int((s/1024.0)*100.0)/100.0) + 'KiB'
+ elif (s < 104857):
+ r = str(int((s/1024.0)*10.0)/10.0) + 'KiB'
+ elif (s < 1048576):
+ r = str(int(s/1024)) + 'KiB'
+ elif (s < 10737418L):
+ r = str(int((s/1048576.0)*100.0)/100.0) + 'MiB'
+ elif (s < 107374182L):
+ r = str(int((s/1048576.0)*10.0)/10.0) + 'MiB'
+ elif (s < 1073741824L):
+ r = str(int(s/1048576)) + 'MiB'
+ elif (s < 1099511627776L):
+ r = str(int((s/1073741824.0)*100.0)/100.0) + 'GiB'
+ else:
+ r = str(int((s/1099511627776.0)*100.0)/100.0) + 'TiB'
+ return(r)
+
class TestUtil(unittest.TestCase):
"""Tests for the utilities."""
from datetime import datetime, timedelta
from StringIO import StringIO
+from util import byte_format
+
class StatsLogger:
"""Store the statistics for the Khashmir DHT.
out.write('<h2>DHT Statistics</h2>\n')
out.write("<table border='0' cellspacing='20px'>\n<tr>\n")
out.write('<td>\n')
- out.write("<table border='1' cellpadding='4px'>\n")
# General
+ out.write("<table border='1' cellpadding='4px'>\n")
out.write("<tr><th><h3>General</h3></th><th>Value</th></tr>\n")
out.write("<tr title='Elapsed time since the DHT was started'><td>Up time</td><td>" + str(elapsed) + '</td></tr>\n')
out.write("<tr title='Whether this node is reachable by other nodes'><td>Reachable</td><td>" + str(self.reachable) + '</td></tr>\n')
out.write("</table>\n")
out.write('</td><td>\n')
- out.write("<table border='1' cellpadding='4px'>\n")
# Routing
+ out.write("<table border='1' cellpadding='4px'>\n")
out.write("<tr><th><h3>Routing Table</h3></th><th>Value</th></tr>\n")
out.write("<tr title='The number of connected nodes'><td>Number of nodes</td><td>" + str(self.nodes) + '</td></tr>\n')
out.write("<tr title='The estimated number of connected users in the entire DHT'><td>Total number of users</td><td>" + str(self.users) + '</td></tr>\n')
out.write("</table>\n")
out.write('</td><td>\n')
- out.write("<table border='1' cellpadding='4px'>\n")
# Database
+ out.write("<table border='1' cellpadding='4px'>\n")
out.write("<tr><th><h3>Database</h3></th><th>Value</th></tr>\n")
out.write("<tr title='Number of distinct keys in the database'><td>Keys</td><td>" + str(self.keys) + '</td></tr>\n')
out.write("<tr title='Total number of values stored locally'><td>Values</td><td>" + str(self.values) + '</td></tr>\n')
out.write("</table>\n")
out.write("</td></tr><tr><td colspan='3'>\n")
+
+ # Transport
out.write("<table border='1' cellpadding='4px'>\n")
- out.write("<tr><th><h3>Transport</h3></th><th>Packets</th><th>Bytes</th><th>Bytes/second</th></tr>\n")
+ out.write("<tr><th><h3>Transport</h3></th><th>Packets</th><th>Bytes</th><th>Speed</th></tr>\n")
out.write("<tr title='Stats for packets received from the DHT'><td>Downloaded</td>")
out.write('<td>' + str(self.downPackets) + '</td>')
- out.write('<td>' + str(self.downBytes) + '</td>')
- out.write('<td>%0.2f</td></tr>\n' % (self.downBytes / (elapsed.days*86400.0 + elapsed.seconds), ))
+ out.write('<td>' + byte_format(self.downBytes) + '</td>')
+ out.write('<td>' + byte_format(self.downBytes / (elapsed.days*86400.0 + elapsed.seconds)) + '/sec</td></tr>\n')
out.write("<tr title='Stats for packets sent to the DHT'><td>Uploaded</td>")
out.write('<td>' + str(self.upPackets) + '</td>')
- out.write('<td>' + str(self.upBytes) + '</td>')
- out.write('<td>%0.2f</td></tr>\n' % (self.upBytes / (elapsed.days*86400.0 + elapsed.seconds), ))
+ out.write('<td>' + byte_format(self.upBytes) + '</td>')
+ out.write('<td>' + byte_format(self.upBytes / (elapsed.days*86400.0 + elapsed.seconds)) + '/sec</td></tr>\n')
out.write("</table>\n")
out.write("</td></tr><tr><td colspan='3'>\n")
- out.write("<table border='1' cellpadding='4px'>\n")
# Actions
+ out.write("<table border='1' cellpadding='4px'>\n")
out.write("<tr><th><h3>Actions</h3></th><th>Started</th><th>Sent</th><th>OK</th><th>Failed</th><th>Received</th><th>Error</th></tr>\n")
actions = self.actions.keys()
actions.sort()
raise ValueError
return s
+def byte_format(s):
+ """Format a byte size for reading by the user.
+
+ @type s: C{long}
+ @param s: the number of bytes
+ @rtype: C{string}
+ @return: the formatted size with appropriate units
+
+ """
+
+ if (s < 1024):
+ r = str(s) + 'B'
+ elif (s < 10485):
+ r = str(int((s/1024.0)*100.0)/100.0) + 'KiB'
+ elif (s < 104857):
+ r = str(int((s/1024.0)*10.0)/10.0) + 'KiB'
+ elif (s < 1048576):
+ r = str(int(s/1024)) + 'KiB'
+ elif (s < 10737418L):
+ r = str(int((s/1048576.0)*100.0)/100.0) + 'MiB'
+ elif (s < 107374182L):
+ r = str(int((s/1048576.0)*10.0)/10.0) + 'MiB'
+ elif (s < 1073741824L):
+ r = str(int(s/1048576)) + 'MiB'
+ elif (s < 1099511627776L):
+ r = str(int((s/1073741824.0)*100.0)/100.0) + 'GiB'
+ else:
+ r = str(int((s/1099511627776.0)*100.0)/100.0) + 'TiB'
+ return(r)
+
class TestUtil(unittest.TestCase):
"""Tests for the utilities."""