From: Cameron Dale Date: Thu, 24 Apr 2008 18:15:58 +0000 (-0700) Subject: Move the DHT stuff out of the main program and into the new DHTManager module. X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=53b39e4234ad1ce483aafcd96d4d711d80fc67de;hp=8c18aac4790e10c1126108ab4b71b19e148045db Move the DHT stuff out of the main program and into the new DHTManager module. --- diff --git a/apt_p2p/CacheManager.py b/apt_p2p/CacheManager.py index e6a8408..b991093 100644 --- a/apt_p2p/CacheManager.py +++ b/apt_p2p/CacheManager.py @@ -245,7 +245,7 @@ class CacheManager: self.db.removeUntrackedFiles(self.all_dirs) #{ Scanning directories - def scanDirectories(self): + def scanDirectories(self, result = None): """Scan the cache directories, hashing new and rehashing changed files.""" assert not self.scanning, "a directory scan is already under way" self.scanning = self.all_dirs[:] @@ -281,14 +281,12 @@ class CacheManager: # If it's not a file ignore it if not file.isfile(): - log.msg('entering directory: %s' % file.path) reactor.callLater(0, self._scanDirectories, None, walker) return # If it's already properly in the DB, ignore it db_status = self.db.isUnchanged(file) if db_status: - log.msg('file is unchanged: %s' % file.path) reactor.callLater(0, self._scanDirectories, None, walker) return diff --git a/apt_p2p/DHTManager.py b/apt_p2p/DHTManager.py new file mode 100644 index 0000000..42a23e7 --- /dev/null +++ b/apt_p2p/DHTManager.py @@ -0,0 +1,167 @@ + +"""Manage all delaings with the DHT. + +@var DHT_PIECES: the maximum number of pieces to store with our contact info + in the DHT +@var TORRENT_PIECES: the maximum number of pieces to store as a separate entry + in the DHT +""" + +import sha + +from twisted.internet import reactor +from twisted.python import log + +from interfaces import IDHTStats +from apt_p2p_conf import config +from Hash import HashObject +from util import findMyIPAddr, compact + +DHT_PIECES = 4 +TORRENT_PIECES = 70 + +class DHT: + """Manages all the requests to a DHT. + + @type dhtClass: L{interfaces.IDHT} + @ivar dhtClass: the DHT class to use + @type db: L{db.DB} + @ivar db: the database to use for tracking files and hashes + @type dht: L{interfaces.IDHT} + @ivar dht: the DHT instance + @type my_contact: C{string} + @ivar my_contact: the 6-byte compact peer representation of this peer's + download information (IP address and port) + """ + + def __init__(self, dhtClass, db): + """Initialize the instance. + + @type dhtClass: L{interfaces.IDHT} + @param dhtClass: the DHT class to use + """ + self.dhtClass = dhtClass + self.db = db + self.my_contact = None + + def start(self): + self.dht = self.dhtClass() + self.dht.loadConfig(config, config.get('DEFAULT', 'DHT')) + df = self.dht.join() + df.addCallbacks(self.joinComplete, self.joinError) + return df + + def joinComplete(self, result): + """Complete the DHT join process and determine our download information. + + Called by the DHT when the join has been completed with information + on the external IP address and port of this peer. + """ + my_addr = findMyIPAddr(result, + config.getint(config.get('DEFAULT', 'DHT'), 'PORT'), + config.getboolean('DEFAULT', 'LOCAL_OK')) + if not my_addr: + raise RuntimeError, "IP address for this machine could not be found" + self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT')) + self.nextRefresh = reactor.callLater(60, self.refreshFiles) + return (my_addr, config.getint('DEFAULT', 'PORT')) + + def joinError(self, failure): + """Joining the DHT has failed.""" + log.msg("joining DHT failed miserably") + log.err(failure) + return failure + + def refreshFiles(self, result = None, hashes = {}): + """Refresh any files in the DHT that are about to expire.""" + if result is not None: + log.msg('Storage resulted in: %r' % result) + + if not hashes: + expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH') + hashes = self.db.expiredHashes(expireAfter) + if len(hashes.keys()) > 0: + log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys())) + + delay = 60 + if hashes: + delay = 3 + raw_hash = hashes.keys()[0] + self.db.refreshHash(raw_hash) + hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces']) + del hashes[raw_hash] + storeDefer = self.store(hash) + storeDefer.addBoth(self.refreshFiles, hashes) + + if self.nextRefresh.active(): + self.nextRefresh.reset(delay) + else: + self.nextRefresh = reactor.callLater(delay, self.refreshFiles, None, hashes) + + def getStats(self): + """Retrieve the formatted statistics for the DHT. + + @rtype: C{string} + @return: the formatted HTML page containing the statistics + """ + if IDHTStats.implementedBy(self.dhtClass): + return self.dht.getStats() + return "

DHT doesn't support statistics\n" + + def get(self, key): + """Retrieve a hash's value from the DHT.""" + return self.dht.getValue(key) + + def store(self, hash): + """Add a hash for a file to the DHT. + + Sets the key and value from the hash information, and tries to add + it to the DHT. + """ + key = hash.digest() + value = {'c': self.my_contact} + pieces = hash.pieceDigests() + + # Determine how to store any piece data + if len(pieces) <= 1: + pass + elif len(pieces) <= DHT_PIECES: + # Short enough to be stored with our peer contact info + value['t'] = {'t': ''.join(pieces)} + elif len(pieces) <= TORRENT_PIECES: + # Short enough to be stored in a separate key in the DHT + value['h'] = sha.new(''.join(pieces)).digest() + else: + # Too long, must be served up by our peer HTTP server + value['l'] = sha.new(''.join(pieces)).digest() + + storeDefer = self.dht.storeValue(key, value) + storeDefer.addCallbacks(self._store_done, self._store_error, + callbackArgs = (hash, ), errbackArgs = (hash.digest(), )) + return storeDefer + + def _store_done(self, result, hash): + """Add a key/value pair for the pieces of the file to the DHT (if necessary).""" + log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result)) + pieces = hash.pieceDigests() + if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES: + # Add the piece data key and value to the DHT + key = sha.new(''.join(pieces)).digest() + value = {'t': ''.join(pieces)} + + storeDefer = self.dht.storeValue(key, value) + storeDefer.addCallbacks(self._store_torrent_done, self._store_error, + callbackArgs = (key, ), errbackArgs = (key, )) + return storeDefer + return result + + def _store_torrent_done(self, result, key): + """Adding the pieces to the DHT is complete.""" + log.msg('Added torrent string %r to the DHT: %r' % (key, result)) + return result + + def _store_error(self, err, key): + """Adding to the DHT failed.""" + log.msg('An error occurred adding %r to the DHT: %r' % (key, err)) + return err + \ No newline at end of file diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index deeb668..9e3f6da 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -370,7 +370,7 @@ class FileDownload: # del self.peers[site] # Start the DHT lookup - lookupDefer = self.manager.dht.getValue(key) + lookupDefer = self.manager.dht.get(key) lookupDefer.addBoth(self._getDHTPieces, key) def _getDHTPieces(self, results, key): @@ -636,7 +636,7 @@ class PeerManager: @type cache_dir: L{twisted.python.filepath.FilePath} @ivar cache_dir: the directory to use for storing all files - @type dht: L{interfaces.IDHT} + @type dht: L{DHTManager.DHT} @ivar dht: the DHT instance @type stats: L{stats.StatsLogger} @ivar stats: the statistics logger to record sent data to diff --git a/apt_p2p/__init__.py b/apt_p2p/__init__.py index 8ef0dae..e0ca498 100644 --- a/apt_p2p/__init__.py +++ b/apt_p2p/__init__.py @@ -10,15 +10,15 @@ where DHT is a class that implements interfaces.IDHT. Diagram of the interaction between the given modules:: - +---------------+ +-----------------------------------+ +------------- - | AptP2P | | DHT | | - | |--->|join DHT|----|--\ - | |--->|loadConfig | | | Another - | |--->|getValue | | | Node - | |--->|storeValue DHT|<---|--/ - | |--->|leave | | - | /-----|--->|getStats | | - | | | +-----------------------------------+ | Internet + +---------------+ +-------------+ +----------------+ +------------- + | AptP2P | | DHTManager | | IDHT | | + | |--->|start |--->|join DHT|----|--\ + | | | |--->|loadConfig | | | Another + | |--->|get |--->|getValue | | | Node + | |--->|store |--->|storeValue DHT|<---|--/ + | | | |--->|leave | | + | /-----|--->|getStats |--->|getStats | | + | | | +-------------+ +----------------+ | Internet | | | +-------------+ +----------------+ | | | | | PeerManager | | HTTPDownloader*| | | | |--->|get |--->|get HTTP|----|---> Mirror diff --git a/apt_p2p/apt_p2p.py b/apt_p2p/apt_p2p.py index 01ff480..86bf013 100644 --- a/apt_p2p/apt_p2p.py +++ b/apt_p2p/apt_p2p.py @@ -1,26 +1,19 @@ """The main program code. -@var DHT_PIECES: the maximum number of pieces to store with our contact info - in the DHT -@var TORRENT_PIECES: the maximum number of pieces to store as a separate entry - in the DHT @var download_dir: the name of the directory to use for downloaded files @var peer_dir: the name of the directory to use for peer downloads """ -from binascii import b2a_hex -from urlparse import urlunparse from urllib import unquote -import os, re, sha from twisted.internet import defer, reactor, protocol -from twisted.web2 import server, http, http_headers, static +from twisted.web2 import static from twisted.python import log, failure from twisted.python.filepath import FilePath -from interfaces import IDHT, IDHTStats from apt_p2p_conf import config +from DHTManager import DHT from PeerManager import PeerManager from HTTPServer import TopLevel from MirrorManager import MirrorManager @@ -28,10 +21,6 @@ from CacheManager import CacheManager from Hash import HashObject from db import DB from stats import StatsLogger -from util import findMyIPAddr, compact - -DHT_PIECES = 4 -TORRENT_PIECES = 70 download_dir = 'cache' peer_dir = 'peers' @@ -48,8 +37,8 @@ class AptP2P(protocol.Factory): @ivar cache_dir: the directory to use for storing all files @type db: L{db.DB} @ivar db: the database to use for tracking files and hashes - @type dht: L{interfaces.IDHT} - @ivar dht: the DHT instance + @type dht: L{DHTManager.DHT} + @ivar dht: the manager for DHT requests @type stats: L{stats.StatsLogger} @ivar stats: the statistics logger to record sent data to @type http_server: L{HTTPServer.TopLevel} @@ -62,9 +51,8 @@ class AptP2P(protocol.Factory): can be queried to get hashes from file names @type cache: L{CacheManager.CacheManager} @ivar cache: the manager of all downloaded files - @type my_contact: C{string} - @ivar my_contact: the 6-byte compact peer representation of this peer's - download information (IP address and port) + @type my_addr: C{string}, C{int} + @ivar my_addr: the IP address and port of this peer """ def __init__(self, dhtClass): @@ -75,6 +63,7 @@ class AptP2P(protocol.Factory): """ log.msg('Initializing the main apt_p2p application') self.dhtClass = dhtClass + self.my_addr = None #{ Factory interface def startFactory(self): @@ -88,16 +77,20 @@ class AptP2P(protocol.Factory): if not self.cache_dir.child(peer_dir).exists(): self.cache_dir.child(peer_dir).makedirs() self.db = DB(self.cache_dir.child('apt-p2p.db')) - self.dht = self.dhtClass() - self.dht.loadConfig(config, config.get('DEFAULT', 'DHT')) - self.dht.join().addCallbacks(self.joinComplete, self.joinError) + self.dht = DHT(self.dhtClass, self.db) + df = self.dht.start() + df.addCallback(self._dhtStarted) self.stats = StatsLogger(self.db) self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self) self.http_server.getHTTPFactory().startFactory() self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats) self.mirrors = MirrorManager(self.cache_dir) self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self) - self.my_contact = None + + def _dhtStarted(self, result): + """Save the returned address and start scanning the cache.""" + self.my_addr = result + self.cache.scanDirectories() def stopFactory(self): log.msg('Stoppping the main apt_p2p application') @@ -108,55 +101,8 @@ class AptP2P(protocol.Factory): def buildProtocol(self, addr): return self.http_server.getHTTPFactory().buildProtocol(addr) - - #{ DHT Maintenance - def joinComplete(self, result): - """Complete the DHT join process and determine our download information. - - Called by the DHT when the join has been completed with information - on the external IP address and port of this peer. - """ - my_addr = findMyIPAddr(result, - config.getint(config.get('DEFAULT', 'DHT'), 'PORT'), - config.getboolean('DEFAULT', 'LOCAL_OK')) - if not my_addr: - raise RuntimeError, "IP address for this machine could not be found" - self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT')) - self.cache.scanDirectories() - self.nextRefresh = reactor.callLater(60, self.refreshFiles) - - def joinError(self, failure): - """Joining the DHT has failed.""" - log.msg("joining DHT failed miserably") - log.err(failure) - raise RuntimeError, "IP address for this machine could not be found" - - def refreshFiles(self, result = None, hashes = {}): - """Refresh any files in the DHT that are about to expire.""" - if result is not None: - log.msg('Storage resulted in: %r' % result) - if not hashes: - expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH') - hashes = self.db.expiredHashes(expireAfter) - if len(hashes.keys()) > 0: - log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys())) - - delay = 60 - if hashes: - delay = 3 - raw_hash = hashes.keys()[0] - self.db.refreshHash(raw_hash) - hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces']) - del hashes[raw_hash] - storeDefer = self.store(hash) - storeDefer.addBoth(self.refreshFiles, hashes) - - if self.nextRefresh.active(): - self.nextRefresh.reset(delay) - else: - self.nextRefresh = reactor.callLater(delay, self.refreshFiles, None, hashes) - + #{ Other functions def getStats(self): """Retrieve and format the statistics for the program. @@ -164,10 +110,9 @@ class AptP2P(protocol.Factory): @return: the formatted HTML page containing the statistics """ out = '\n\n' - out += self.stats.formatHTML(self.my_contact) + out += self.stats.formatHTML(self.my_addr) out += '\n\n' - if IDHTStats.implementedBy(self.dhtClass): - out += self.dht.getStats() + out += self.dht.getStats() out += '\n\n' return out @@ -320,7 +265,7 @@ class AptP2P(protocol.Factory): """Lookup the hash in the DHT.""" log.msg('Looking up hash in DHT for file: %s' % url) key = hash.expected() - lookupDefer = self.dht.getValue(key) + lookupDefer = self.dht.get(key) lookupDefer.addBoth(self.startDownload, req, hash, url, d) def startDownload(self, values, req, hash, url, d): @@ -390,60 +335,7 @@ class AptP2P(protocol.Factory): if url: self.mirrors.updatedFile(url, file_path) - if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT): - return self.store(hash) + if self.my_addr and hash and new_hash and (hash.expected() is not None or forceDHT): + return self.dht.store(hash) return None - - def store(self, hash): - """Add a key/value pair for the file to the DHT. - - Sets the key and value from the hash information, and tries to add - it to the DHT. - """ - key = hash.digest() - value = {'c': self.my_contact} - pieces = hash.pieceDigests() - - # Determine how to store any piece data - if len(pieces) <= 1: - pass - elif len(pieces) <= DHT_PIECES: - # Short enough to be stored with our peer contact info - value['t'] = {'t': ''.join(pieces)} - elif len(pieces) <= TORRENT_PIECES: - # Short enough to be stored in a separate key in the DHT - value['h'] = sha.new(''.join(pieces)).digest() - else: - # Too long, must be served up by our peer HTTP server - value['l'] = sha.new(''.join(pieces)).digest() - - storeDefer = self.dht.storeValue(key, value) - storeDefer.addCallbacks(self.store_done, self.store_error, - callbackArgs = (hash, ), errbackArgs = (hash.digest(), )) - return storeDefer - - def store_done(self, result, hash): - """Add a key/value pair for the pieces of the file to the DHT (if necessary).""" - log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result)) - pieces = hash.pieceDigests() - if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES: - # Add the piece data key and value to the DHT - key = sha.new(''.join(pieces)).digest() - value = {'t': ''.join(pieces)} - - storeDefer = self.dht.storeValue(key, value) - storeDefer.addCallbacks(self.store_torrent_done, self.store_error, - callbackArgs = (key, ), errbackArgs = (key, )) - return storeDefer - return result - - def store_torrent_done(self, result, key): - """Adding the file to the DHT is complete, and so is the workflow.""" - log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result)) - return result - - def store_error(self, err, key): - """Adding to the DHT failed.""" - log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err)) - return err \ No newline at end of file diff --git a/apt_p2p/stats.py b/apt_p2p/stats.py index b76e389..bd422d9 100644 --- a/apt_p2p/stats.py +++ b/apt_p2p/stats.py @@ -77,7 +77,7 @@ class StatsLogger: # General out.write("\n") out.write("\n") - out.write("\n') + out.write("\n') out.write("

General

Value
Contact" + str(uncompact(contactAddress)) + '
Contact" + str(contactAddress) + '
\n") out.write('\n')