X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=khashmir.py;h=0196fd228a27342f2abf501a71d0eb74f1348c95;hb=c599adb68f5e7afefb25375b063e5e7fe02f949c;hp=d624c9751663f87277ad05ccdff49b88035d4f73;hpb=57f0d64780ae8d29c20c5adb3068d430d0baa4d4;p=quix0rs-apt-p2p.git diff --git a/khashmir.py b/khashmir.py index d624c97..0196fd2 100644 --- a/khashmir.py +++ b/khashmir.py @@ -1,488 +1,349 @@ -## Copyright 2002 Andrew Loewenstern, All Rights Reserved - -from listener import Listener -from ktable import KTable, K -from node import Node -from dispatcher import Dispatcher -from hash import newID, intify -import messages -import transactions - -import time - -from bsddb3 import db ## find this at http://pybsddb.sf.net/ -from bsddb3._db import DBNotFoundError - -# don't ping unless it's been at least this many seconds since we've heard from a peer -MAX_PING_INTERVAL = 60 * 15 # fifteen minutes - -# concurrent FIND_NODE/VALUE requests! -N = 3 - - -# this is the main class! -class Khashmir: - __slots__ = ['listener', 'node', 'table', 'dispatcher', 'tf', 'store'] - def __init__(self, host, port): - self.listener = Listener(host, port) - self.node = Node(newID(), host, port) - self.table = KTable(self.node) - self.dispatcher = Dispatcher(self.listener, messages.BASE, self.node.id) - self.tf = transactions.TransactionFactory(self.node.id, self.dispatcher) - - self.store = db.DB() - self.store.open(None, None, db.DB_BTREE) - - #### register unsolicited incoming message handlers - self.dispatcher.registerHandler('ping', self._pingHandler, messages.PING) - - self.dispatcher.registerHandler('find node', self._findNodeHandler, messages.FIND_NODE) +## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + +from time import time +from random import randrange +import sqlite ## find this at http://pysqlite.sourceforge.net/ + +from twisted.internet.defer import Deferred +from twisted.internet import protocol +from twisted.internet import reactor + +import const +from ktable import KTable +from knode import KNodeBase, KNodeRead, KNodeWrite +from khash import newID, newIDInRange +from actions import FindNode, GetValue, KeyExpirer, StoreValue +import krpc + +class KhashmirDBExcept(Exception): + pass + +# this is the base class, has base functionality and find node, no key-value mappings +class KhashmirBase(protocol.Factory): + _Node = KNodeBase + def __init__(self, host, port, db='khashmir.db'): + self.setup(host, port, db) + + def setup(self, host, port, db='khashmir.db'): + self._findDB(db) + self.port = port + self.node = self._loadSelfNode(host, port) + self.table = KTable(self.node) + #self.app = service.Application("krpc") + self.udp = krpc.hostbroker(self) + self.udp.protocol = krpc.KRPC + self.listenport = reactor.listenUDP(port, self.udp) + self.last = time() + self._loadRoutingTable() + KeyExpirer(store=self.store) + self.refreshTable(force=1) + reactor.callLater(60, self.checkpoint, (1,)) + + def Node(self): + n = self._Node() + n.table = self.table + return n + + def __del__(self): + self.listenport.stopListening() + + def _loadSelfNode(self, host, port): + c = self.store.cursor() + c.execute('select id from self where num = 0;') + if c.rowcount > 0: + id = c.fetchone()[0] + else: + id = newID() + return self._Node().init(id, host, port) + + def _saveSelfNode(self): + c = self.store.cursor() + c.execute('delete from self where num = 0;') + c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id)) + self.store.commit() + + def checkpoint(self, auto=0): + self._saveSelfNode() + self._dumpRoutingTable() + self.refreshTable() + if auto: + reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,)) + + def _findDB(self, db): + import os + try: + os.stat(db) + except OSError: + self._createNewDB(db) + else: + self._loadDB(db) + + def _loadDB(self, db): + try: + self.store = sqlite.connect(db=db) + #self.store.autocommit = 0 + except: + import traceback + raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc() + + def _createNewDB(self, db): + self.store = sqlite.connect(db=db) + s = """ + create table kv (key binary, value binary, time timestamp, primary key (key, value)); + create index kv_key on kv(key); + create index kv_timestamp on kv(time); + + create table nodes (id binary primary key, host text, port number); + + create table self (num number primary key, id binary); + """ + c = self.store.cursor() + c.execute(s) + self.store.commit() + + def _dumpRoutingTable(self): + """ + save routing table nodes to the database + """ + c = self.store.cursor() + c.execute("delete from nodes where id not NULL;") + for bucket in self.table.buckets: + for node in bucket.l: + c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port)) + self.store.commit() + + def _loadRoutingTable(self): + """ + load routing table nodes from database + it's usually a good idea to call refreshTable(force=1) after loading the table + """ + c = self.store.cursor() + c.execute("select * from nodes;") + for rec in c.fetchall(): + n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.table.insertNode(n, contacted=0) + - self.dispatcher.registerHandler('get value', self._findValueHandler, messages.GET_VALUE) - - self.dispatcher.registerHandler('store value', self._storeValueHandler, messages.STORE_VALUE) - - ####### ####### LOCAL INTERFACE - use these methods! - def addContact(self, host, port): - """ - ping this node and add the contact info to the table on pong! - """ - n =Node(" "*20, host, port) # note, we - self.sendPing(n) - + def addContact(self, host, port, callback=None): + """ + ping this node and add the contact info to the table on pong! + """ + n =self.Node().init(const.NULL_ID, host, port) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.sendPing(n, callback=callback) ## this call is async! - def findNode(self, id, callback): - """ returns the contact info for node, or the k closest nodes, from the global table """ - # get K nodes out of local table/cache, or the node we want - nodes = self.table.findNodes(id) - if len(nodes) == 1 and nodes[0].id == id : - # we got it in our table! - def tcall(t, callback=callback): - callback(t.extras) - self.dispatcher.postEvent(tcall, 0, extras=nodes) - else: - # create our search state - state = FindNode(self, self.dispatcher, id, callback) - # handle this in our own thread - self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes) + def findNode(self, id, callback, errback=None): + """ returns the contact info for node, or the k closest nodes, from the global table """ + # get K nodes out of local table/cache, or the node we want + nodes = self.table.findNodes(id) + d = Deferred() + if errback: + d.addCallbacks(callback, errback) + else: + d.addCallback(callback) + if len(nodes) == 1 and nodes[0].id == id : + d.callback(nodes) + else: + # create our search state + state = FindNode(self, id, d.callback) + reactor.callLater(0, state.goWithNodes, nodes) + def insertNode(self, n, contacted=1): + """ + insert a node in our local table, pinging oldest contact in bucket, if necessary + + If all you have is a host/port, then use addContact, which calls this method after + receiving the PONG from the remote node. The reason for the seperation is we can't insert + a node into the table without it's peer-ID. That means of course the node passed into this + method needs to be a properly formed Node object with a valid ID. + """ + old = self.table.insertNode(n, contacted=contacted) + if old and (time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: + # the bucket is full, check to see if old node is still around and if so, replace it + + ## these are the callbacks used when we ping the oldest node in a bucket + def _staleNodeHandler(oldnode=old, newnode = n): + """ called if the pinged node never responds """ + self.table.replaceStaleNode(old, newnode) + + def _notStaleNodeHandler(dict, old=old): + """ called when we get a pong from the old node """ + dict = dict['rsp'] + if dict['id'] == old.id: + self.table.justSeenNode(old.id) + + df = old.ping(self.node.id) + df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) + + def sendPing(self, node, callback=None): + """ + ping a node + """ + df = node.ping(self.node.id) + ## these are the callbacks we use when we issue a PING + def _pongHandler(dict, node=node, table=self.table, callback=callback): + _krpc_sender = dict['_krpc_sender'] + dict = dict['rsp'] + sender = {'id' : dict['id']} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = self.Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + table.insertNode(n) + if callback: + callback() + def _defaultPong(err, node=node, table=self.table, callback=callback): + table.nodeFailed(node) + if callback: + callback() + + df.addCallbacks(_pongHandler,_defaultPong) + + def findCloseNodes(self, callback=lambda a: None): + """ + This does a findNode on the ID one away from our own. + This will allow us to populate our table with nodes on our network closest to our own. + This is called as soon as we start up with an empty table + """ + id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) + self.findNode(id, callback) + + def refreshTable(self, force=0): + """ + force=1 will refresh table regardless of last bucket access time + """ + def callback(nodes): + pass + for bucket in self.table.buckets: + if force or (time() - bucket.lastAccessed >= const.BUCKET_STALENESS): + id = newIDInRange(bucket.min, bucket.max) + self.findNode(id, callback) + + def stats(self): + """ + Returns (num_contacts, num_nodes) + num_contacts: number contacts in our routing table + num_nodes: number of nodes estimated in the entire dht + """ + num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0) + num_nodes = const.K * (2**(len(self.table.buckets) - 1)) + return (num_contacts, num_nodes) + + def krpc_ping(self, id, _krpc_sender): + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = self.Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"id" : self.node.id} + + def krpc_find_node(self, target, id, _krpc_sender): + nodes = self.table.findNodes(target) + nodes = map(lambda node: node.senderDict(), nodes) + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = self.Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"nodes" : nodes, "id" : self.node.id} + + +## This class provides read-only access to the DHT, valueForKey +## you probably want to use this mixin and provide your own write methods +class KhashmirRead(KhashmirBase): + _Node = KNodeRead + def retrieveValues(self, key): + c = self.store.cursor() + c.execute("select value from kv where key = %s;", sqlite.encode(key)) + t = c.fetchone() + l = [] + while t: + l.append(t['value']) + t = c.fetchone() + return l ## also async - def valueForKey(self, key, callback): - """ returns the values found for key in global table """ - nodes = self.table.findNodes(key) - # create our search state - state = GetValue(self, self.dispatcher, key, callback) - # handle this in our own thread - self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes) - - - ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now - def storeValueForKey(self, key, value): - """ stores the value for key in the global table, returns immediately, no status - in this implementation, peers respond but don't indicate status to storing values - values are stored in peers on a first-come first-served basis - this will probably change so more than one value can be stored under a key - """ - def _storeValueForKey(nodes, tf=self.tf, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"): - for node in nodes: - if node.id != self.node.id: - t = tf.StoreValue(node, key, value, response, default) - t.dispatch() - # this call is asynch - self.findNode(key, _storeValueForKey) - - - def insertNode(self, n): - """ - insert a node in our local table, pinging oldest contact in bucket, if necessary - - If all you have is a host/port, then use addContact, which calls this method after - receiving the PONG from the remote node. The reason for the seperation is we can't insert - a node into the table without it's peer-ID. That means of course the node passed into this - method needs to be a properly formed Node object with a valid ID. - """ - old = self.table.insertNode(n) - if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id: - # the bucket is full, check to see if old node is still around and if so, replace it - t = self.tf.Ping(old, self._notStaleNodeHandler, self._staleNodeHandler) - t.newnode = n - t.dispatch() - - - def sendPing(self, node): - """ - ping a node - """ - t = self.tf.Ping(node, self._pongHandler, self._defaultPong) - t.dispatch() - - - def findCloseNodes(self): - """ - This does a findNode on the ID one away from our own. - This will allow us to populate our table with nodes on our network closest to our own. - This is called as soon as we start up with an empty table - """ - id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) - def callback(nodes): - pass - self.findNode(id, callback) - - def refreshTable(self): - """ - - """ - def callback(nodes): - pass - - for bucket in self.table.buckets: - if time.time() - bucket.lastAccessed >= 60 * 60: - id = randRange(bucket.min, bucket.max) - self.findNode(id, callback) - - - ##### - ##### UNSOLICITED INCOMING MESSAGE HANDLERS - - def _pingHandler(self, t, msg): - #print "Got PING from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port) - self.insertNode(t.target) - # respond, no callbacks, we don't care if they get it or not - nt = self.tf.Pong(t) - nt.dispatch() - - def _findNodeHandler(self, t, msg): - #print "Got FIND_NODES from %s:%s at %s:%s" % (t.target.host, t.target.port, self.node.host, self.node.port) - nodes = self.table.findNodes(msg['target']) - # respond, no callbacks, we don't care if they get it or not - nt = self.tf.GotNodes(t, nodes) - nt.dispatch() - - def _storeValueHandler(self, t, msg): - if not self.store.has_key(msg['key']): - self.store.put(msg['key'], msg['value']) - nt = self.tf.StoredValue(t) - nt.dispatch() - - def _findValueHandler(self, t, msg): - if self.store.has_key(msg['key']): - t = self.tf.GotValues(t, [(msg['key'], self.store[msg['key']])]) - else: - nodes = self.table.findNodes(msg['key']) - t = self.tf.GotNodes(t, nodes) - t.dispatch() - - - ### - ### message response callbacks - # called when we get a response to store value - def _storedValueHandler(self, t, msg): - self.table.insertNode(t.target) - - - ## these are the callbacks used when we ping the oldest node in a bucket - def _staleNodeHandler(self, t): - """ called if the pinged node never responds """ - self.table.replaceStaleNode(t.target, t.newnode) - - def _notStaleNodeHandler(self, t, msg): - """ called when we get a ping from the remote node """ - self.table.insertNode(t.target) - - - ## these are the callbacks we use when we issue a PING - def _pongHandler(self, t, msg): - #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port) - n = Node(msg['id'], t.addr[0], t.addr[1]) - self.table.insertNode(n) - - def _defaultPong(self, t): - # this should probably increment a failed message counter and dump the node if it gets over a threshold - print "Never got PONG from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port) - - - -class ActionBase: - """ base class for some long running asynchronous proccesses like finding nodes or values """ - def __init__(self, table, dispatcher, target, callback): - self.table = table - self.dispatcher = dispatcher - self.target = target - self.int = intify(target) - self.found = {} - self.queried = {} - self.answered = {} - self.callback = callback - self.outstanding = 0 - self.finished = 0 - - def sort(a, b, int=self.int): - """ this function is for sorting nodes relative to the ID we are looking for """ - x, y = int ^ a.int, int ^ b.int - if x > y: - return 1 - elif x < y: - return -1 - return 0 - self.sort = sort - - def goWithNodes(self, t): - pass - -class FindNode(ActionBase): - """ find node action merits it's own class as it is a long running stateful process """ - def handleGotNodes(self, t, msg): - if self.finished or self.answered.has_key(t.id): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[t.id] = 1 - for node in msg['nodes']: - if not self.found.has_key(node['id']): - n = Node(node['id'], node['host'], node['port']) - self.found[n.id] = n - self.table.insertNode(n) - self.schedule() - - def schedule(self): - """ - send messages to new peers, if necessary - """ - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - - for node in l[:K]: - if node.id == self.target: - self.finished=1 - return self.callback([node]) - if not self.queried.has_key(node.id) and node.id != self.table.node.id: - t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - t.timeout = time.time() + 15 - t.dispatch() - if self.outstanding >= N: - break - assert(self.outstanding) >=0 - if self.outstanding == 0: - ## all done!! - self.finished=1 - self.callback(l[:K]) - - def defaultGotNodes(self, t): - if self.finished: - return - self.outstanding = self.outstanding - 1 - self.schedule() - - - def goWithNodes(self, t): - """ - this starts the process, our argument is a transaction with t.extras being our list of nodes - it's a transaction since we got called from the dispatcher - """ - nodes = t.extras - for node in nodes: - if node.id == self.table.node.id: - continue - self.found[node.id] = node - t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes) - t.timeout = time.time() + 15 - t.dispatch() - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding == 0: - self.callback(nodes) - - - -class GetValue(FindNode): - """ get value task """ - def handleGotNodes(self, t, msg): - if self.finished or self.answered.has_key(t.id): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[t.id] = 1 - # go through nodes - # if we have any closer than what we already got, query them - if msg['type'] == 'got nodes': - for node in msg['nodes']: - if not self.found.has_key(node['id']): - n = Node(node['id'], node['host'], node['port']) - self.found[n.id] = n - self.table.insertNode(n) - elif msg['type'] == 'got values': - ## done - self.finished = 1 - return self.callback(msg['values']) - self.schedule() - - ## get value - def schedule(self): - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - - for node in l[:K]: - if not self.queried.has_key(node.id) and node.id != self.table.node.id: - t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - t.timeout = time.time() + 15 - t.dispatch() - if self.outstanding >= N: - break - assert(self.outstanding) >=0 - if self.outstanding == 0: - ## all done, didn't find it!! - self.finished=1 - self.callback([]) - - ## get value - def goWithNodes(self, t): - nodes = t.extras - for node in nodes: - if node.id == self.table.node.id: - continue - self.found[node.id] = node - t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes) - t.timeout = time.time() + 15 - t.dispatch() - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding == 0: - self.callback([]) - - -#------ -def test_build_net(quiet=0): - from whrandom import randrange - import thread - port = 2001 - l = [] - peers = 100 - - if not quiet: - print "Building %s peer table." % peers - - for i in xrange(peers): - a = Khashmir('localhost', port + i) - l.append(a) - - def run(l=l): - while(1): - events = 0 - for peer in l: - events = events + peer.dispatcher.runOnce() - if events == 0: - time.sleep(.25) - - for i in range(10): - thread.start_new_thread(run, (l[i*10:(i+1)*10],)) - #thread.start_new_thread(l[i].dispatcher.run, ()) - - for peer in l[1:]: - n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) - n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) - n = l[randrange(0, len(l))].node - peer.addContact(n.host, n.port) - - time.sleep(5) - - for peer in l: - peer.findCloseNodes() - time.sleep(5) - for peer in l: - peer.refreshTable() - return l + def valueForKey(self, key, callback, searchlocal = 1): + """ returns the values found for key in global table + callback will be called with a list of values for each peer that returns unique values + final callback will be an empty list - probably should change to 'more coming' arg + """ + nodes = self.table.findNodes(key) -def test_find_nodes(l, quiet=0): - import threading, sys - from whrandom import randrange - flag = threading.Event() - - n = len(l) - - a = l[randrange(0,n)] - b = l[randrange(0,n)] - - def callback(nodes, l=l, flag=flag): - if (len(nodes) >0) and (nodes[0].id == b.node.id): - print "test_find_nodes PASSED" - else: - print "test_find_nodes FAILED" - flag.set() - a.findNode(b.node.id, callback) - flag.wait() - -def test_find_value(l, quiet=0): - from whrandom import randrange - from sha import sha - import time, threading, sys - - fa = threading.Event() - fb = threading.Event() - fc = threading.Event() - - n = len(l) - a = l[randrange(0,n)] - b = l[randrange(0,n)] - c = l[randrange(0,n)] - d = l[randrange(0,n)] - - key = sha(`randrange(0,100000)`).digest() - value = sha(`randrange(0,100000)`).digest() - if not quiet: - print "inserting value...", - sys.stdout.flush() - a.storeValueForKey(key, value) - time.sleep(3) - print "finding..." - - def mc(flag, value=value): - def callback(values, f=flag, val=value): - try: - if(len(values) == 0): - print "find FAILED" - else: - if values[0]['value'] != val: - print "find FAILED" - else: - print "find FOUND" - finally: - f.set() - return callback - b.valueForKey(key, mc(fa)) - c.valueForKey(key, mc(fb)) - d.valueForKey(key, mc(fc)) - - fa.wait() - fb.wait() - fc.wait() + # get locals + if searchlocal: + l = self.retrieveValues(key) + if len(l) > 0: + reactor.callLater(0, callback, (l)) + else: + l = [] + + # create our search state + state = GetValue(self, key, callback) + reactor.callLater(0, state.goWithNodes, nodes, l) + + def krpc_find_value(self, key, id, _krpc_sender): + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = self.Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) -if __name__ == "__main__": - l = test_build_net() - time.sleep(3) - print "finding nodes..." - test_find_nodes(l) - test_find_nodes(l) - test_find_nodes(l) - print "inserting and fetching values..." - test_find_value(l) - test_find_value(l) - test_find_value(l) - test_find_value(l) - test_find_value(l) - test_find_value(l) - for i in l: - i.dispatcher.stop() + l = self.retrieveValues(key) + if len(l) > 0: + return {'values' : l, "id": self.node.id} + else: + nodes = self.table.findNodes(key) + nodes = map(lambda node: node.senderDict(), nodes) + return {'nodes' : nodes, "id": self.node.id} + +### provides a generic write method, you probably don't want to deploy something that allows +### arbitrary value storage +class KhashmirWrite(KhashmirRead): + _Node = KNodeWrite + ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) + def storeValueForKey(self, key, value, callback=None): + """ stores the value for key in the global table, returns immediately, no status + in this implementation, peers respond but don't indicate status to storing values + a key can have many values + """ + def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + if not response: + # default callback + def _storedValueHandler(sender): + pass + response=_storedValueHandler + action = StoreValue(self.table, key, value, response) + reactor.callLater(0, action.goWithNodes, nodes) + + # this call is asynch + self.findNode(key, _storeValueForKey) + + def krpc_store_value(self, key, value, id, _krpc_sender): + t = "%0.6f" % time() + c = self.store.cursor() + try: + c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t)) + except sqlite.IntegrityError, reason: + # update last insert time + c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) + self.store.commit() + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = self.Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"id" : self.node.id} + +# the whole shebang, for testing +class Khashmir(KhashmirWrite): + _Node = KNodeWrite