From e82e704e27d7bf1b6441d5b251bab96604420185 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Wed, 23 Apr 2008 15:30:39 -0700 Subject: [PATCH] Refresh expired DHT hashes concurrently instead of sequentially. --- TODO | 1 - apt_p2p/apt_p2p.py | 26 +++++++++++++++----------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/TODO b/TODO index d48ea93..08125f3 100644 --- a/TODO +++ b/TODO @@ -4,7 +4,6 @@ Some last few things to do before release. - remove files from the peer's download cache - update the modtime of files downloaded from peers - also set the Last-Modified header for the return to Apt -- refresh expired DHT hashes concurrently instead of sequentially Consider what happens when multiple requests for a file are received. diff --git a/apt_p2p/apt_p2p.py b/apt_p2p/apt_p2p.py index a771227..8bfe928 100644 --- a/apt_p2p/apt_p2p.py +++ b/apt_p2p/apt_p2p.py @@ -123,7 +123,7 @@ class AptP2P(protocol.Factory): raise RuntimeError, "IP address for this machine could not be found" self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT')) self.cache.scanDirectories() - reactor.callLater(60, self.refreshFiles) + self.nextRefresh = reactor.callLater(60, self.refreshFiles) def joinError(self, failure): """Joining the DHT has failed.""" @@ -131,27 +131,31 @@ class AptP2P(protocol.Factory): log.err(failure) raise RuntimeError, "IP address for this machine could not be found" - def refreshFiles(self): + def refreshFiles(self, result = None, hashes = {}): """Refresh any files in the DHT that are about to expire.""" - expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH') - hashes = self.db.expiredHashes(expireAfter) - if len(hashes.keys()) > 0: - log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys())) - self._refreshFiles(None, hashes) - - def _refreshFiles(self, result, hashes): if result is not None: log.msg('Storage resulted in: %r' % result) + if not hashes: + expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH') + hashes = self.db.expiredHashes(expireAfter) + if len(hashes.keys()) > 0: + log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys())) + + delay = 60 if hashes: + delay = 3 raw_hash = hashes.keys()[0] self.db.refreshHash(raw_hash) hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces']) del hashes[raw_hash] storeDefer = self.store(hash) - storeDefer.addBoth(self._refreshFiles, hashes) + storeDefer.addBoth(self.refreshFiles, hashes) + + if self.nextRefresh.active(): + self.nextRefresh.reset(delay) else: - reactor.callLater(60, self.refreshFiles) + self.nextRefresh = reactor.callLater(delay, self.plRefresh, None, hashes) def getStats(self): """Retrieve and format the statistics for the program. -- 2.30.2