]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
Added the application control files.
[quix0rs-apt-p2p.git] / khashmir.py
index d624c9751663f87277ad05ccdff49b88035d4f73..0196fd228a27342f2abf501a71d0eb74f1348c95 100644 (file)
-## Copyright 2002 Andrew Loewenstern, All Rights Reserved
-
-from listener import Listener
-from ktable import KTable, K
-from node import Node
-from dispatcher import Dispatcher
-from hash import newID, intify
-import messages
-import transactions
-
-import time
-
-from bsddb3 import db ## find this at http://pybsddb.sf.net/
-from bsddb3._db import DBNotFoundError
-
-# don't ping unless it's been at least this many seconds since we've heard from a peer
-MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
-
-# concurrent FIND_NODE/VALUE requests!
-N = 3
-
-
-# this is the main class!
-class Khashmir:
-    __slots__ = ['listener', 'node', 'table', 'dispatcher', 'tf', 'store']
-    def __init__(self, host, port):
-       self.listener = Listener(host, port)
-       self.node = Node(newID(), host, port)
-       self.table = KTable(self.node)
-       self.dispatcher = Dispatcher(self.listener, messages.BASE, self.node.id)
-       self.tf = transactions.TransactionFactory(self.node.id, self.dispatcher)
-       
-       self.store = db.DB()
-       self.store.open(None, None, db.DB_BTREE)
-
-       #### register unsolicited incoming message handlers
-       self.dispatcher.registerHandler('ping', self._pingHandler, messages.PING)
-               
-       self.dispatcher.registerHandler('find node', self._findNodeHandler, messages.FIND_NODE)
+## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
+
+from time import time
+from random import randrange
+import sqlite  ## find this at http://pysqlite.sourceforge.net/
+
+from twisted.internet.defer import Deferred
+from twisted.internet import protocol
+from twisted.internet import reactor
+
+import const
+from ktable import KTable
+from knode import KNodeBase, KNodeRead, KNodeWrite
+from khash import newID, newIDInRange
+from actions import FindNode, GetValue, KeyExpirer, StoreValue
+import krpc
+
+class KhashmirDBExcept(Exception):
+    pass
+
+# this is the base class, has base functionality and find node, no key-value mappings
+class KhashmirBase(protocol.Factory):
+    _Node = KNodeBase
+    def __init__(self, host, port, db='khashmir.db'):
+        self.setup(host, port, db)
+        
+    def setup(self, host, port, db='khashmir.db'):
+        self._findDB(db)
+        self.port = port
+        self.node = self._loadSelfNode(host, port)
+        self.table = KTable(self.node)
+        #self.app = service.Application("krpc")
+        self.udp = krpc.hostbroker(self)
+        self.udp.protocol = krpc.KRPC
+        self.listenport = reactor.listenUDP(port, self.udp)
+        self.last = time()
+        self._loadRoutingTable()
+        KeyExpirer(store=self.store)
+        self.refreshTable(force=1)
+        reactor.callLater(60, self.checkpoint, (1,))
+
+    def Node(self):
+        n = self._Node()
+        n.table = self.table
+        return n
+    
+    def __del__(self):
+        self.listenport.stopListening()
+        
+    def _loadSelfNode(self, host, port):
+        c = self.store.cursor()
+        c.execute('select id from self where num = 0;')
+        if c.rowcount > 0:
+            id = c.fetchone()[0]
+        else:
+            id = newID()
+        return self._Node().init(id, host, port)
+        
+    def _saveSelfNode(self):
+        c = self.store.cursor()
+        c.execute('delete from self where num = 0;')
+        c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
+        self.store.commit()
+        
+    def checkpoint(self, auto=0):
+        self._saveSelfNode()
+        self._dumpRoutingTable()
+        self.refreshTable()
+        if auto:
+            reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
+        
+    def _findDB(self, db):
+        import os
+        try:
+            os.stat(db)
+        except OSError:
+            self._createNewDB(db)
+        else:
+            self._loadDB(db)
+        
+    def _loadDB(self, db):
+        try:
+            self.store = sqlite.connect(db=db)
+            #self.store.autocommit = 0
+        except:
+            import traceback
+            raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
+        
+    def _createNewDB(self, db):
+        self.store = sqlite.connect(db=db)
+        s = """
+            create table kv (key binary, value binary, time timestamp, primary key (key, value));
+            create index kv_key on kv(key);
+            create index kv_timestamp on kv(time);
+            
+            create table nodes (id binary primary key, host text, port number);
+            
+            create table self (num number primary key, id binary);
+            """
+        c = self.store.cursor()
+        c.execute(s)
+        self.store.commit()
+
+    def _dumpRoutingTable(self):
+        """
+            save routing table nodes to the database
+        """
+        c = self.store.cursor()
+        c.execute("delete from nodes where id not NULL;")
+        for bucket in self.table.buckets:
+            for node in bucket.l:
+                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
+        self.store.commit()
+        
+    def _loadRoutingTable(self):
+        """
+            load routing table nodes from database
+            it's usually a good idea to call refreshTable(force=1) after loading the table
+        """
+        c = self.store.cursor()
+        c.execute("select * from nodes;")
+        for rec in c.fetchall():
+            n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+            n.conn = self.udp.connectionForAddr((n.host, n.port))
+            self.table.insertNode(n, contacted=0)
+            
 
-       self.dispatcher.registerHandler('get value', self._findValueHandler, messages.GET_VALUE)
-       
-       self.dispatcher.registerHandler('store value', self._storeValueHandler, messages.STORE_VALUE)
-       
-       
     #######
     #######  LOCAL INTERFACE    - use these methods!
-    def addContact(self, host, port):
-       """
-        ping this node and add the contact info to the table on pong!
-       """
-       n =Node(" "*20, host, port)  # note, we 
-       self.sendPing(n)
-
+    def addContact(self, host, port, callback=None):
+        """
+            ping this node and add the contact info to the table on pong!
+        """
+        n =self.Node().init(const.NULL_ID, host, port) 
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
+        self.sendPing(n, callback=callback)
 
     ## this call is async!
-    def findNode(self, id, callback):
-       """ returns the contact info for node, or the k closest nodes, from the global table """
-       # get K nodes out of local table/cache, or the node we want
-       nodes = self.table.findNodes(id)
-       if len(nodes) == 1 and nodes[0].id == id :
-           # we got it in our table!
-           def tcall(t, callback=callback):
-               callback(t.extras)
-           self.dispatcher.postEvent(tcall, 0, extras=nodes)
-       else:
-           # create our search state
-           state = FindNode(self, self.dispatcher, id, callback)
-           # handle this in our own thread
-           self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes)
+    def findNode(self, id, callback, errback=None):
+        """ returns the contact info for node, or the k closest nodes, from the global table """
+        # get K nodes out of local table/cache, or the node we want
+        nodes = self.table.findNodes(id)
+        d = Deferred()
+        if errback:
+            d.addCallbacks(callback, errback)
+        else:
+            d.addCallback(callback)
+        if len(nodes) == 1 and nodes[0].id == id :
+            d.callback(nodes)
+        else:
+            # create our search state
+            state = FindNode(self, id, d.callback)
+            reactor.callLater(0, state.goWithNodes, nodes)
     
+    def insertNode(self, n, contacted=1):
+        """
+        insert a node in our local table, pinging oldest contact in bucket, if necessary
+        
+        If all you have is a host/port, then use addContact, which calls this method after
+        receiving the PONG from the remote node.  The reason for the seperation is we can't insert
+        a node into the table without it's peer-ID.  That means of course the node passed into this
+        method needs to be a properly formed Node object with a valid ID.
+        """
+        old = self.table.insertNode(n, contacted=contacted)
+        if old and (time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
+            # the bucket is full, check to see if old node is still around and if so, replace it
+            
+            ## these are the callbacks used when we ping the oldest node in a bucket
+            def _staleNodeHandler(oldnode=old, newnode = n):
+                """ called if the pinged node never responds """
+                self.table.replaceStaleNode(old, newnode)
+            
+            def _notStaleNodeHandler(dict, old=old):
+                """ called when we get a pong from the old node """
+                dict = dict['rsp']
+                if dict['id'] == old.id:
+                    self.table.justSeenNode(old.id)
+            
+            df = old.ping(self.node.id)
+            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
+
+    def sendPing(self, node, callback=None):
+        """
+            ping a node
+        """
+        df = node.ping(self.node.id)
+        ## these are the callbacks we use when we issue a PING
+        def _pongHandler(dict, node=node, table=self.table, callback=callback):
+            _krpc_sender = dict['_krpc_sender']
+            dict = dict['rsp']
+            sender = {'id' : dict['id']}
+            sender['host'] = _krpc_sender[0]
+            sender['port'] = _krpc_sender[1]
+            n = self.Node().initWithDict(sender)
+            n.conn = self.udp.connectionForAddr((n.host, n.port))
+            table.insertNode(n)
+            if callback:
+                callback()
+        def _defaultPong(err, node=node, table=self.table, callback=callback):
+            table.nodeFailed(node)
+            if callback:
+                callback()
+        
+        df.addCallbacks(_pongHandler,_defaultPong)
+
+    def findCloseNodes(self, callback=lambda a: None):
+        """
+            This does a findNode on the ID one away from our own.  
+            This will allow us to populate our table with nodes on our network closest to our own.
+            This is called as soon as we start up with an empty table
+        """
+        id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
+        self.findNode(id, callback)
+
+    def refreshTable(self, force=0):
+        """
+            force=1 will refresh table regardless of last bucket access time
+        """
+        def callback(nodes):
+            pass
     
+        for bucket in self.table.buckets:
+            if force or (time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
+                id = newIDInRange(bucket.min, bucket.max)
+                self.findNode(id, callback)
+
+    def stats(self):
+        """
+        Returns (num_contacts, num_nodes)
+        num_contacts: number contacts in our routing table
+        num_nodes: number of nodes estimated in the entire dht
+        """
+        num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
+        num_nodes = const.K * (2**(len(self.table.buckets) - 1))
+        return (num_contacts, num_nodes)
+
+    def krpc_ping(self, id, _krpc_sender):
+        sender = {'id' : id}
+        sender['host'] = _krpc_sender[0]
+        sender['port'] = _krpc_sender[1]        
+        n = self.Node().initWithDict(sender)
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+        return {"id" : self.node.id}
+        
+    def krpc_find_node(self, target, id, _krpc_sender):
+        nodes = self.table.findNodes(target)
+        nodes = map(lambda node: node.senderDict(), nodes)
+        sender = {'id' : id}
+        sender['host'] = _krpc_sender[0]
+        sender['port'] = _krpc_sender[1]        
+        n = self.Node().initWithDict(sender)
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+        return {"nodes" : nodes, "id" : self.node.id}
+
+
+## This class provides read-only access to the DHT, valueForKey
+## you probably want to use this mixin and provide your own write methods
+class KhashmirRead(KhashmirBase):
+    _Node = KNodeRead
+    def retrieveValues(self, key):
+        c = self.store.cursor()
+        c.execute("select value from kv where key = %s;", sqlite.encode(key))
+        t = c.fetchone()
+        l = []
+        while t:
+            l.append(t['value'])
+            t = c.fetchone()
+        return l
     ## also async
-    def valueForKey(self, key, callback):
-       """ returns the values found for key in global table """
-       nodes = self.table.findNodes(key)
-       # create our search state
-       state = GetValue(self, self.dispatcher, key, callback)
-       # handle this in our own thread
-       self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes)
-
-
-    ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
-    def storeValueForKey(self, key, value):
-       """ stores the value for key in the global table, returns immediately, no status 
-           in this implementation, peers respond but don't indicate status to storing values
-           values are stored in peers on a first-come first-served basis
-           this will probably change so more than one value can be stored under a key
-       """
-       def _storeValueForKey(nodes, tf=self.tf, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
-           for node in nodes:
-               if node.id != self.node.id:
-                   t = tf.StoreValue(node, key, value, response, default)
-                   t.dispatch()
-       # this call is asynch
-       self.findNode(key, _storeValueForKey)
-       
-       
-    def insertNode(self, n):
-       """
-       insert a node in our local table, pinging oldest contact in bucket, if necessary
-       
-       If all you have is a host/port, then use addContact, which calls this method after
-       receiving the PONG from the remote node.  The reason for the seperation is we can't insert
-       a node into the table without it's peer-ID.  That means of course the node passed into this
-       method needs to be a properly formed Node object with a valid ID.
-       """
-       old = self.table.insertNode(n)
-       if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
-           # the bucket is full, check to see if old node is still around and if so, replace it
-           t = self.tf.Ping(old, self._notStaleNodeHandler, self._staleNodeHandler)
-           t.newnode = n
-           t.dispatch()
-
-
-    def sendPing(self, node):
-       """
-           ping a node
-       """
-       t = self.tf.Ping(node, self._pongHandler, self._defaultPong)
-       t.dispatch()
-
-
-    def findCloseNodes(self):
-       """
-           This does a findNode on the ID one away from our own.  
-           This will allow us to populate our table with nodes on our network closest to our own.
-           This is called as soon as we start up with an empty table
-       """
-       id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
-       def callback(nodes):
-           pass
-       self.findNode(id, callback)
-
-    def refreshTable(self):
-       """
-           
-       """
-       def callback(nodes):
-           pass
-
-       for bucket in self.table.buckets:
-           if time.time() - bucket.lastAccessed >= 60 * 60:
-               id = randRange(bucket.min, bucket.max)
-               self.findNode(id, callback)
-       
-    #####
-    ##### UNSOLICITED INCOMING MESSAGE HANDLERS
-    
-    def _pingHandler(self, t, msg):
-       #print "Got PING from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port)
-       self.insertNode(t.target)
-       # respond, no callbacks, we don't care if they get it or not
-       nt = self.tf.Pong(t)
-       nt.dispatch()
-       
-    def _findNodeHandler(self, t, msg):
-       #print "Got FIND_NODES from %s:%s at %s:%s" % (t.target.host, t.target.port, self.node.host, self.node.port)
-       nodes = self.table.findNodes(msg['target'])
-       # respond, no callbacks, we don't care if they get it or not
-       nt = self.tf.GotNodes(t, nodes)
-       nt.dispatch()
-    
-    def _storeValueHandler(self, t, msg):
-       if not self.store.has_key(msg['key']):
-           self.store.put(msg['key'], msg['value'])
-       nt = self.tf.StoredValue(t)
-       nt.dispatch()
-    
-    def _findValueHandler(self, t, msg):
-       if self.store.has_key(msg['key']):
-           t = self.tf.GotValues(t, [(msg['key'], self.store[msg['key']])])
-       else:
-           nodes = self.table.findNodes(msg['key'])
-           t = self.tf.GotNodes(t, nodes)
-       t.dispatch()
-
-
-    ###
-    ### message response callbacks
-    # called when we get a response to store value
-    def _storedValueHandler(self, t, msg):
-       self.table.insertNode(t.target)
-
-
-    ## these are the callbacks used when we ping the oldest node in a bucket
-    def _staleNodeHandler(self, t):
-       """ called if the pinged node never responds """
-       self.table.replaceStaleNode(t.target, t.newnode)
-
-    def _notStaleNodeHandler(self, t, msg):
-       """ called when we get a ping from the remote node """
-       self.table.insertNode(t.target)
-       
-    
-    ## these are the callbacks we use when we issue a PING
-    def _pongHandler(self, t, msg):
-       #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
-       n = Node(msg['id'], t.addr[0], t.addr[1])
-       self.table.insertNode(n)
-
-    def _defaultPong(self, t):
-       # this should probably increment a failed message counter and dump the node if it gets over a threshold
-       print "Never got PONG from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port)
-       
-    
-
-class ActionBase:
-    """ base class for some long running asynchronous proccesses like finding nodes or values """
-    def __init__(self, table, dispatcher, target, callback):
-       self.table = table
-       self.dispatcher = dispatcher
-       self.target = target
-       self.int = intify(target)
-       self.found = {}
-       self.queried = {}
-       self.answered = {}
-       self.callback = callback
-       self.outstanding = 0
-       self.finished = 0
-       
-       def sort(a, b, int=self.int):
-           """ this function is for sorting nodes relative to the ID we are looking for """
-           x, y = int ^ a.int, int ^ b.int
-           if x > y:
-               return 1
-           elif x < y:
-               return -1
-           return 0
-       self.sort = sort
-    
-    def goWithNodes(self, t):
-       pass
-       
-class FindNode(ActionBase):
-    """ find node action merits it's own class as it is a long running stateful process """
-    def handleGotNodes(self, t, msg):
-       if self.finished or self.answered.has_key(t.id):
-           # a day late and a dollar short
-           return
-       self.outstanding = self.outstanding - 1
-       self.answered[t.id] = 1
-       for node in msg['nodes']:
-           if not self.found.has_key(node['id']):
-               n = Node(node['id'], node['host'], node['port'])
-               self.found[n.id] = n
-               self.table.insertNode(n)
-       self.schedule()
-               
-    def schedule(self):
-       """
-           send messages to new peers, if necessary
-       """
-       if self.finished:
-           return
-       l = self.found.values()
-       l.sort(self.sort)
-
-       for node in l[:K]:
-           if node.id == self.target:
-               self.finished=1
-               return self.callback([node])
-           if not self.queried.has_key(node.id) and node.id != self.table.node.id:
-               t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes)
-               self.outstanding = self.outstanding + 1
-               self.queried[node.id] = 1
-               t.timeout = time.time() + 15
-               t.dispatch()
-           if self.outstanding >= N:
-               break
-       assert(self.outstanding) >=0
-       if self.outstanding == 0:
-           ## all done!!
-           self.finished=1
-           self.callback(l[:K])
-       
-    def defaultGotNodes(self, t):
-       if self.finished:
-           return
-       self.outstanding = self.outstanding - 1
-       self.schedule()
-       
-       
-    def goWithNodes(self, t):
-       """
-           this starts the process, our argument is a transaction with t.extras being our list of nodes
-           it's a transaction since we got called from the dispatcher
-       """
-       nodes = t.extras
-       for node in nodes:
-           if node.id == self.table.node.id:
-               continue
-           self.found[node.id] = node
-           t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes)
-           t.timeout = time.time() + 15
-           t.dispatch()
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           self.callback(nodes)
-
-
-
-class GetValue(FindNode):
-    """ get value task """
-    def handleGotNodes(self, t, msg):
-       if self.finished or self.answered.has_key(t.id):
-           # a day late and a dollar short
-           return
-       self.outstanding = self.outstanding - 1
-       self.answered[t.id] = 1
-       # go through nodes
-       # if we have any closer than what we already got, query them
-       if msg['type'] == 'got nodes':
-           for node in msg['nodes']:
-               if not self.found.has_key(node['id']):
-                   n = Node(node['id'], node['host'], node['port'])
-                   self.found[n.id] = n
-                   self.table.insertNode(n)
-       elif msg['type'] == 'got values':
-           ## done
-           self.finished = 1
-           return self.callback(msg['values'])
-       self.schedule()
-               
-    ## get value
-    def schedule(self):
-       if self.finished:
-           return
-       l = self.found.values()
-       l.sort(self.sort)
-
-       for node in l[:K]:
-           if not self.queried.has_key(node.id) and node.id != self.table.node.id:
-               t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes)
-               self.outstanding = self.outstanding + 1
-               self.queried[node.id] = 1
-               t.timeout = time.time() + 15
-               t.dispatch()
-           if self.outstanding >= N:
-               break
-       assert(self.outstanding) >=0
-       if self.outstanding == 0:
-           ## all done, didn't find it!!
-           self.finished=1
-           self.callback([])
-    
-    ## get value
-    def goWithNodes(self, t):
-       nodes = t.extras
-       for node in nodes:
-           if node.id == self.table.node.id:
-               continue
-           self.found[node.id] = node
-           t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes)
-           t.timeout = time.time() + 15
-           t.dispatch()
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           self.callback([])
-
-
-#------
-def test_build_net(quiet=0):
-    from whrandom import randrange
-    import thread
-    port = 2001
-    l = []
-    peers = 100
-    
-    if not quiet:
-       print "Building %s peer table." % peers
-       
-    for i in xrange(peers):
-       a = Khashmir('localhost', port + i)
-       l.append(a)
-    
-    def run(l=l):
-       while(1):
-               events = 0
-               for peer in l:
-                       events = events + peer.dispatcher.runOnce()
-               if events == 0:
-                       time.sleep(.25)
-
-    for i in range(10):
-       thread.start_new_thread(run, (l[i*10:(i+1)*10],))
-       #thread.start_new_thread(l[i].dispatcher.run, ())
-    
-    for peer in l[1:]:
-       n = l[randrange(0, len(l))].node
-       peer.addContact(n.host, n.port)
-       n = l[randrange(0, len(l))].node
-       peer.addContact(n.host, n.port)
-       n = l[randrange(0, len(l))].node
-       peer.addContact(n.host, n.port)
-       
-    time.sleep(5)
-
-    for peer in l:
-       peer.findCloseNodes()
-    time.sleep(5)
-    for peer in l:
-       peer.refreshTable()
-    return l
+    def valueForKey(self, key, callback, searchlocal = 1):
+        """ returns the values found for key in global table
+            callback will be called with a list of values for each peer that returns unique values
+            final callback will be an empty list - probably should change to 'more coming' arg
+        """
+        nodes = self.table.findNodes(key)
         
-def test_find_nodes(l, quiet=0):
-    import threading, sys
-    from whrandom import randrange
-    flag = threading.Event()
-    
-    n = len(l)
-    
-    a = l[randrange(0,n)]
-    b = l[randrange(0,n)]
-    
-    def callback(nodes, l=l, flag=flag):
-       if (len(nodes) >0) and (nodes[0].id == b.node.id):
-           print "test_find_nodes      PASSED"
-       else:
-           print "test_find_nodes      FAILED"
-       flag.set()
-    a.findNode(b.node.id, callback)
-    flag.wait()
-    
-def test_find_value(l, quiet=0):
-    from whrandom import randrange
-    from sha import sha
-    import time, threading, sys
-    
-    fa = threading.Event()
-    fb = threading.Event()
-    fc = threading.Event()
-    
-    n = len(l)
-    a = l[randrange(0,n)]
-    b = l[randrange(0,n)]
-    c = l[randrange(0,n)]
-    d = l[randrange(0,n)]
-
-    key = sha(`randrange(0,100000)`).digest()
-    value = sha(`randrange(0,100000)`).digest()
-    if not quiet:
-       print "inserting value...",
-       sys.stdout.flush()
-    a.storeValueForKey(key, value)
-    time.sleep(3)
-    print "finding..."
-    
-    def mc(flag, value=value):
-       def callback(values, f=flag, val=value):
-           try:
-               if(len(values) == 0):
-                   print "find                FAILED"
-               else:
-                   if values[0]['value'] != val:
-                       print "find                FAILED"
-                   else:
-                       print "find                FOUND"
-           finally:
-               f.set()
-       return callback
-    b.valueForKey(key, mc(fa))
-    c.valueForKey(key, mc(fb))
-    d.valueForKey(key, mc(fc))
-    
-    fa.wait()
-    fb.wait()
-    fc.wait()
+        # get locals
+        if searchlocal:
+            l = self.retrieveValues(key)
+            if len(l) > 0:
+                reactor.callLater(0, callback, (l))
+        else:
+            l = []
+        
+        # create our search state
+        state = GetValue(self, key, callback)
+        reactor.callLater(0, state.goWithNodes, nodes, l)
+
+    def krpc_find_value(self, key, id, _krpc_sender):
+        sender = {'id' : id}
+        sender['host'] = _krpc_sender[0]
+        sender['port'] = _krpc_sender[1]        
+        n = self.Node().initWithDict(sender)
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
     
-if __name__ == "__main__":
-    l = test_build_net()
-    time.sleep(3)
-    print "finding nodes..."
-    test_find_nodes(l)
-    test_find_nodes(l)
-    test_find_nodes(l)
-    print "inserting and fetching values..."
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    test_find_value(l)
-    for i in l:
-       i.dispatcher.stop()
+        l = self.retrieveValues(key)
+        if len(l) > 0:
+            return {'values' : l, "id": self.node.id}
+        else:
+            nodes = self.table.findNodes(key)
+            nodes = map(lambda node: node.senderDict(), nodes)
+            return {'nodes' : nodes, "id": self.node.id}
+
+###  provides a generic write method, you probably don't want to deploy something that allows
+###  arbitrary value storage
+class KhashmirWrite(KhashmirRead):
+    _Node = KNodeWrite
+    ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+    def storeValueForKey(self, key, value, callback=None):
+        """ stores the value for key in the global table, returns immediately, no status 
+            in this implementation, peers respond but don't indicate status to storing values
+            a key can have many values
+        """
+        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+            if not response:
+                # default callback
+                def _storedValueHandler(sender):
+                    pass
+                response=_storedValueHandler
+            action = StoreValue(self.table, key, value, response)
+            reactor.callLater(0, action.goWithNodes, nodes)
+            
+        # this call is asynch
+        self.findNode(key, _storeValueForKey)
+                    
+    def krpc_store_value(self, key, value, id, _krpc_sender):
+        t = "%0.6f" % time()
+        c = self.store.cursor()
+        try:
+            c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
+        except sqlite.IntegrityError, reason:
+            # update last insert time
+            c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
+        self.store.commit()
+        sender = {'id' : id}
+        sender['host'] = _krpc_sender[0]
+        sender['port'] = _krpc_sender[1]        
+        n = self.Node().initWithDict(sender)
+        n.conn = self.udp.connectionForAddr((n.host, n.port))
+        self.insertNode(n, contacted=0)
+        return {"id" : self.node.id}
+
+# the whole shebang, for testing
+class Khashmir(KhashmirWrite):
+    _Node = KNodeWrite