X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=khashmir.py;h=0196fd228a27342f2abf501a71d0eb74f1348c95;hb=cc191e1f1a6b78e15cbf13def1d933cc817a13e3;hp=e9e53a76ebd60a2f574f38c743e5dd95ced378f0;hpb=9048402f56c24474c79920ab849748223ed339cf;p=quix0rs-apt-p2p.git diff --git a/khashmir.py b/khashmir.py index e9e53a7..0196fd2 100644 --- a/khashmir.py +++ b/khashmir.py @@ -1,37 +1,27 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information -from const import reactor -import const - -import time - -from sha import sha +from time import time +from random import randrange +import sqlite ## find this at http://pysqlite.sourceforge.net/ -from ktable import KTable, K -from knode import KNode as Node +from twisted.internet.defer import Deferred +from twisted.internet import protocol +from twisted.internet import reactor +import const +from ktable import KTable +from knode import KNodeBase, KNodeRead, KNodeWrite from khash import newID, newIDInRange - from actions import FindNode, GetValue, KeyExpirer, StoreValue import krpc -from twisted.internet.defer import Deferred -from twisted.internet import protocol -from twisted.python import threadable -from twisted.application import service, internet -from twisted.web import server -threadable.init() -import sys - -import sqlite ## find this at http://pysqlite.sourceforge.net/ - class KhashmirDBExcept(Exception): pass -# this is the main class! -class Khashmir(protocol.Factory): - __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol') +# this is the base class, has base functionality and find node, no key-value mappings +class KhashmirBase(protocol.Factory): + _Node = KNodeBase def __init__(self, host, port, db='khashmir.db'): self.setup(host, port, db) @@ -40,16 +30,21 @@ class Khashmir(protocol.Factory): self.port = port self.node = self._loadSelfNode(host, port) self.table = KTable(self.node) - self.app = service.Application("krpc") + #self.app = service.Application("krpc") self.udp = krpc.hostbroker(self) self.udp.protocol = krpc.KRPC self.listenport = reactor.listenUDP(port, self.udp) - self.last = time.time() + self.last = time() self._loadRoutingTable() KeyExpirer(store=self.store) - #self.refreshTable(force=1) + self.refreshTable(force=1) reactor.callLater(60, self.checkpoint, (1,)) - + + def Node(self): + n = self._Node() + n.table = self.table + return n + def __del__(self): self.listenport.stopListening() @@ -60,21 +55,20 @@ class Khashmir(protocol.Factory): id = c.fetchone()[0] else: id = newID() - return Node().init(id, host, port) + return self._Node().init(id, host, port) def _saveSelfNode(self): - self.store.autocommit = 0 c = self.store.cursor() c.execute('delete from self where num = 0;') c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id)) self.store.commit() - self.store.autocommit = 1 def checkpoint(self, auto=0): self._saveSelfNode() self._dumpRoutingTable() + self.refreshTable() if auto: - reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint) + reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,)) def _findDB(self, db): import os @@ -88,14 +82,13 @@ class Khashmir(protocol.Factory): def _loadDB(self, db): try: self.store = sqlite.connect(db=db) - self.store.autocommit = 1 + #self.store.autocommit = 0 except: import traceback - raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback + raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc() def _createNewDB(self, db): self.store = sqlite.connect(db=db) - self.store.autocommit = 1 s = """ create table kv (key binary, value binary, time timestamp, primary key (key, value)); create index kv_key on kv(key); @@ -107,20 +100,18 @@ class Khashmir(protocol.Factory): """ c = self.store.cursor() c.execute(s) + self.store.commit() def _dumpRoutingTable(self): """ save routing table nodes to the database """ - self.store.autocommit = 0; c = self.store.cursor() c.execute("delete from nodes where id not NULL;") for bucket in self.table.buckets: for node in bucket.l: - d = node.senderDict() - c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port'])) + c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port)) self.store.commit() - self.store.autocommit = 1; def _loadRoutingTable(self): """ @@ -130,7 +121,7 @@ class Khashmir(protocol.Factory): c = self.store.cursor() c.execute("select * from nodes;") for rec in c.fetchall(): - n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) + n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.table.insertNode(n, contacted=0) @@ -141,7 +132,7 @@ class Khashmir(protocol.Factory): """ ping this node and add the contact info to the table on pong! """ - n =Node().init(const.NULL_ID, host, port) + n =self.Node().init(const.NULL_ID, host, port) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.sendPing(n, callback=callback) @@ -160,47 +151,7 @@ class Khashmir(protocol.Factory): else: # create our search state state = FindNode(self, id, d.callback) - reactor.callFromThread(state.goWithNodes, nodes) - - - ## also async - def valueForKey(self, key, callback, searchlocal = 1): - """ returns the values found for key in global table - callback will be called with a list of values for each peer that returns unique values - final callback will be an empty list - probably should change to 'more coming' arg - """ - nodes = self.table.findNodes(key) - - # get locals - if searchlocal: - l = self.retrieveValues(key) - if len(l) > 0: - reactor.callLater(0, callback, (l)) - else: - l = [] - - # create our search state - state = GetValue(self, key, callback) - reactor.callFromThread(state.goWithNodes, nodes, l) - - ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) - def storeValueForKey(self, key, value, callback=None): - """ stores the value for key in the global table, returns immediately, no status - in this implementation, peers respond but don't indicate status to storing values - a key can have many values - """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): - if not response: - # default callback - def _storedValueHandler(sender): - pass - response=_storedValueHandler - action = StoreValue(self.table, key, value, response) - reactor.callFromThread(action.goWithNodes, nodes) - - # this call is asynch - self.findNode(key, _storeValueForKey) - + reactor.callLater(0, state.goWithNodes, nodes) def insertNode(self, n, contacted=1): """ @@ -212,7 +163,7 @@ class Khashmir(protocol.Factory): method needs to be a properly formed Node object with a valid ID. """ old = self.table.insertNode(n, contacted=contacted) - if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: + if old and (time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: # the bucket is full, check to see if old node is still around and if so, replace it ## these are the callbacks used when we ping the oldest node in a bucket @@ -222,36 +173,30 @@ class Khashmir(protocol.Factory): def _notStaleNodeHandler(dict, old=old): """ called when we get a pong from the old node """ - _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] - sender = dict['sender'] - if sender['id'] == old.id: + if dict['id'] == old.id: self.table.justSeenNode(old.id) - df = old.ping(self.node.senderDict()) + df = old.ping(self.node.id) df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) def sendPing(self, node, callback=None): """ ping a node """ - df = node.ping(self.node.senderDict()) + df = node.ping(self.node.id) ## these are the callbacks we use when we issue a PING def _pongHandler(dict, node=node, table=self.table, callback=callback): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] - sender = dict['sender'] - if node.id != const.NULL_ID and node.id != sender['id']: - # whoah, got response from different peer than we were expecting - self.table.invalidateNode(node) - else: - sender['host'] = node.host - sender['port'] = node.port - n = Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - table.insertNode(n) - if callback: - callback() + sender = {'id' : dict['id']} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = self.Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + table.insertNode(n) + if callback: + callback() def _defaultPong(err, node=node, table=self.table, callback=callback): table.nodeFailed(node) if callback: @@ -276,11 +221,45 @@ class Khashmir(protocol.Factory): pass for bucket in self.table.buckets: - if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS): + if force or (time() - bucket.lastAccessed >= const.BUCKET_STALENESS): id = newIDInRange(bucket.min, bucket.max) self.findNode(id, callback) + def stats(self): + """ + Returns (num_contacts, num_nodes) + num_contacts: number contacts in our routing table + num_nodes: number of nodes estimated in the entire dht + """ + num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0) + num_nodes = const.K * (2**(len(self.table.buckets) - 1)) + return (num_contacts, num_nodes) + def krpc_ping(self, id, _krpc_sender): + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = self.Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"id" : self.node.id} + + def krpc_find_node(self, target, id, _krpc_sender): + nodes = self.table.findNodes(target) + nodes = map(lambda node: node.senderDict(), nodes) + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = self.Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"nodes" : nodes, "id" : self.node.id} + + +## This class provides read-only access to the DHT, valueForKey +## you probably want to use this mixin and provide your own write methods +class KhashmirRead(KhashmirBase): + _Node = KNodeRead def retrieveValues(self, key): c = self.store.cursor() c.execute("select value from kv where key = %s;", sqlite.encode(key)) @@ -290,59 +269,81 @@ class Khashmir(protocol.Factory): l.append(t['value']) t = c.fetchone() return l - - ##### - ##### INCOMING MESSAGE HANDLERS - - def krpc_ping(self, sender, _krpc_sender): - """ - takes sender dict = {'id', , 'port', port} optional keys = 'ip' - returns sender dict + ## also async + def valueForKey(self, key, callback, searchlocal = 1): + """ returns the values found for key in global table + callback will be called with a list of values for each peer that returns unique values + final callback will be an empty list - probably should change to 'more coming' arg """ - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) - return {"sender" : self.node.senderDict()} + nodes = self.table.findNodes(key) - def krpc_find_node(self, target, sender, _krpc_sender): - nodes = self.table.findNodes(target) - nodes = map(lambda node: node.senderDict(), nodes) + # get locals + if searchlocal: + l = self.retrieveValues(key) + if len(l) > 0: + reactor.callLater(0, callback, (l)) + else: + l = [] + + # create our search state + state = GetValue(self, key, callback) + reactor.callLater(0, state.goWithNodes, nodes, l) + + def krpc_find_value(self, key, id, _krpc_sender): + sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] - n = Node().initWithDict(sender) + n = self.Node().initWithDict(sender) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) - return {"nodes" : nodes, "sender" : self.node.senderDict()} + + l = self.retrieveValues(key) + if len(l) > 0: + return {'values' : l, "id": self.node.id} + else: + nodes = self.table.findNodes(key) + nodes = map(lambda node: node.senderDict(), nodes) + return {'nodes' : nodes, "id": self.node.id} + +### provides a generic write method, you probably don't want to deploy something that allows +### arbitrary value storage +class KhashmirWrite(KhashmirRead): + _Node = KNodeWrite + ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) + def storeValueForKey(self, key, value, callback=None): + """ stores the value for key in the global table, returns immediately, no status + in this implementation, peers respond but don't indicate status to storing values + a key can have many values + """ + def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + if not response: + # default callback + def _storedValueHandler(sender): + pass + response=_storedValueHandler + action = StoreValue(self.table, key, value, response) + reactor.callLater(0, action.goWithNodes, nodes) - def krpc_store_value(self, key, value, sender, _krpc_sender): - t = "%0.6f" % time.time() + # this call is asynch + self.findNode(key, _storeValueForKey) + + def krpc_store_value(self, key, value, id, _krpc_sender): + t = "%0.6f" % time() c = self.store.cursor() try: c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t)) except sqlite.IntegrityError, reason: # update last insert time c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) + self.store.commit() + sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] - n = Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) - return {"sender" : self.node.senderDict()} - - def krpc_find_value(self, key, sender, _krpc_sender): - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = Node().initWithDict(sender) + n = self.Node().initWithDict(sender) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) - - l = self.retrieveValues(key) - if len(l) > 0: - return {'values' : l, "sender": self.node.senderDict()} - else: - nodes = self.table.findNodes(key) - nodes = map(lambda node: node.senderDict(), nodes) - return {'nodes' : nodes, "sender": self.node.senderDict()} + return {"id" : self.node.id} +# the whole shebang, for testing +class Khashmir(KhashmirWrite): + _Node = KNodeWrite