]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - khashmir.py
finding values is now fixed
[quix0rs-apt-p2p.git] / khashmir.py
index 363d0a043bb6551153223287d7c87e1f98c8003a..aa1902f2bbe96ec1dde250a4a5e7a029e353bc8a 100644 (file)
@@ -1,14 +1,20 @@
 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
 
-from listener import Listener
+from const import reactor
+import time
+
 from ktable import KTable, K
-from node import Node
-from dispatcher import Dispatcher
-from hash import newID, intify
-import messages
-import transactions
+from knode import KNode as Node
 
-import time
+from hash import newID
+
+from actions import FindNode, GetValue
+from twisted.web import xmlrpc
+from twisted.internet.defer import Deferred
+from twisted.python import threadable
+from twisted.internet.app import Application
+from twisted.web import server
+threadable.init()
 
 from bsddb3 import db ## find this at http://pybsddb.sf.net/
 from bsddb3._db import DBNotFoundError
@@ -16,32 +22,28 @@ from bsddb3._db import DBNotFoundError
 # don't ping unless it's been at least this many seconds since we've heard from a peer
 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
 
-# concurrent FIND_NODE/VALUE requests!
-N = 3
 
 
 # this is the main class!
-class Khashmir:
-    __slots__ = ['listener', 'node', 'table', 'dispatcher', 'tf', 'store']
+class Khashmir(xmlrpc.XMLRPC):
+    __slots__ = ['listener', 'node', 'table', 'store', 'app']
     def __init__(self, host, port):
-       self.listener = Listener(host, port)
        self.node = Node(newID(), host, port)
        self.table = KTable(self.node)
-       self.dispatcher = Dispatcher(self.listener, messages.BASE, self.node.id)
-       self.tf = transactions.TransactionFactory(self.node.id, self.dispatcher)
-       
+       self.app = Application("xmlrpc")
+       self.app.listenTCP(port, server.Site(self))
        self.store = db.DB()
        self.store.open(None, None, db.DB_BTREE)
+       
 
-       #### register unsolicited incoming message handlers
-       self.dispatcher.registerHandler('ping', self._pingHandler, messages.PING)
-               
-       self.dispatcher.registerHandler('find node', self._findNodeHandler, messages.FIND_NODE)
+    def render(self, request):
+       """
+           Override the built in render so we can have access to the request object!
+           note, crequest is probably only valid on the initial call (not after deferred!)
+       """
+       self.crequest = request
+       return xmlrpc.XMLRPC.render(self, request)
 
-       self.dispatcher.registerHandler('get value', self._findValueHandler, messages.GET_VALUE)
-       
-       self.dispatcher.registerHandler('store value', self._storeValueHandler, messages.STORE_VALUE)
-       
        
     #######
     #######  LOCAL INTERFACE    - use these methods!
@@ -54,20 +56,18 @@ class Khashmir:
 
 
     ## this call is async!
-    def findNode(self, id, callback):
+    def findNode(self, id, callback, errback=None):
        """ returns the contact info for node, or the k closest nodes, from the global table """
        # get K nodes out of local table/cache, or the node we want
        nodes = self.table.findNodes(id)
+       d = Deferred()
+       d.addCallbacks(callback, errback)
        if len(nodes) == 1 and nodes[0].id == id :
-           # we got it in our table!
-           def tcall(t, callback=callback):
-               callback(t.extras)
-           self.dispatcher.postEvent(tcall, 0, extras=nodes)
+           d.callback(nodes)
        else:
            # create our search state
-           state = FindNode(self, self.dispatcher, id, callback)
-           # handle this in our own thread
-           self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes)
+           state = FindNode(self, id, d.callback)
+           reactor.callFromThread(state.goWithNodes, nodes)
     
     
     ## also async
@@ -75,9 +75,8 @@ class Khashmir:
        """ returns the values found for key in global table """
        nodes = self.table.findNodes(key)
        # create our search state
-       state = GetValue(self, self.dispatcher, key, callback)
-       # handle this in our own thread
-       self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes)
+       state = GetValue(self, key, callback)
+       reactor.callFromThread(state.goWithNodes, nodes)
 
 
     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
@@ -87,11 +86,11 @@ class Khashmir:
            values are stored in peers on a first-come first-served basis
            this will probably change so more than one value can be stored under a key
        """
-       def _storeValueForKey(nodes, tf=self.tf, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
+       def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
            for node in nodes:
                if node.id != self.node.id:
-                   t = tf.StoreValue(node, key, value, response, default)
-                   t.dispatch()
+                   df = node.storeValue(key, value, self.node.senderDict())
+                   df.addCallbacks(response, default)
        # this call is asynch
        self.findNode(key, _storeValueForKey)
        
@@ -100,7 +99,7 @@ class Khashmir:
        """
        insert a node in our local table, pinging oldest contact in bucket, if necessary
        
-       If all you have is a host/port, then use addContact, which calls this function after
+       If all you have is a host/port, then use addContact, which calls this method after
        receiving the PONG from the remote node.  The reason for the seperation is we can't insert
        a node into the table without it's peer-ID.  That means of course the node passed into this
        method needs to be a properly formed Node object with a valid ID.
@@ -108,17 +107,41 @@ class Khashmir:
        old = self.table.insertNode(n)
        if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
            # the bucket is full, check to see if old node is still around and if so, replace it
-           t = self.tf.Ping(old, self._notStaleNodeHandler, self._staleNodeHandler)
-           t.newnode = n
-           t.dispatch()
+           
+           ## these are the callbacks used when we ping the oldest node in a bucket
+           def _staleNodeHandler(oldnode=old, newnode = n):
+               """ called if the pinged node never responds """
+               self.table.replaceStaleNode(old, newnode)
+       
+           def _notStaleNodeHandler(sender, old=old):
+               """ called when we get a ping from the remote node """
+               if sender['id'] == old.id:
+                   self.table.insertNode(old)
+
+           df = old.ping()
+           df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
 
 
     def sendPing(self, node):
        """
            ping a node
        """
-       t = self.tf.Ping(node, self._pongHandler, self._defaultPong)
-       t.dispatch()
+       df = node.ping(self.node.senderDict())
+       ## these are the callbacks we use when we issue a PING
+       def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
+           if id != 20 * ' ' and id != sender['id']:
+               # whoah, got response from different peer than we were expecting
+               pass
+           else:
+               #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
+               n = Node(sender['id'], host, port)
+               table.insertNode(n)
+           return
+       def _defaultPong(err):
+           # this should probably increment a failed message counter and dump the node if it gets over a threshold
+           return      
+
+       df.addCallbacks(_pongHandler,_defaultPong)
 
 
     def findCloseNodes(self):
@@ -146,229 +169,64 @@ class Khashmir:
        
  
     #####
-    ##### UNSOLICITED INCOMING MESSAGE HANDLERS
-    
-    def _pingHandler(self, t, msg):
-       #print "Got PING from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port)
-       self.insertNode(t.target)
-       # respond, no callbacks, we don't care if they get it or not
-       nt = self.tf.Pong(t)
-       nt.dispatch()
-       
-    def _findNodeHandler(self, t, msg):
-       #print "Got FIND_NODES from %s:%s at %s:%s" % (t.target.host, t.target.port, self.node.host, self.node.port)
-       nodes = self.table.findNodes(msg['target'])
-       # respond, no callbacks, we don't care if they get it or not
-       nt = self.tf.GotNodes(t, nodes)
-       nt.dispatch()
+    ##### INCOMING MESSAGE HANDLERS
     
-    def _storeValueHandler(self, t, msg):
-       if not self.store.has_key(msg['key']):
-           self.store.put(msg['key'], msg['value'])
-       nt = self.tf.StoredValue(t)
-       nt.dispatch()
+    def xmlrpc_ping(self, sender):
+       """
+           takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
+           returns sender dict
+       """
+       ip = self.crequest.getClientIP()
+       n = Node(sender['id'], ip, sender['port'])
+       self.insertNode(n)
+       return self.node.senderDict()
+               
+    def xmlrpc_find_node(self, target, sender):
+       nodes = self.table.findNodes(target)
+       nodes = map(lambda node: node.senderDict(), nodes)
+       ip = self.crequest.getClientIP()
+       n = Node(sender['id'], ip, sender['port'])
+       self.insertNode(n)
+       return nodes, self.node.senderDict()
     
-    def _findValueHandler(self, t, msg):
-       if self.store.has_key(msg['key']):
-           t = self.tf.GotValues(t, [(msg['key'], self.store[msg['key']])])
+    def xmlrpc_store_value(self, key, value, sender):
+       if not self.store.has_key(key):
+           self.store.put(key, value)
+       ip = self.crequest.getClientIP()
+       n = Node(sender['id'], ip, sender['port'])
+       self.insertNode(n)
+       return self.node.senderDict()
+       
+    def xmlrpc_find_value(self, key, sender):
+       ip = self.crequest.getClientIP()
+       n = Node(sender['id'], ip, sender['port'])
+       self.insertNode(n)
+       if self.store.has_key(key):
+           return {'values' : self.store[key]}, self.node.senderDict()
        else:
-           nodes = self.table.findNodes(msg['key'])
-           t = self.tf.GotNodes(t, nodes)
-       t.dispatch()
-
+           nodes = self.table.findNodes(key)
+           nodes = map(lambda node: node.senderDict(), nodes)
+           return {'nodes' : nodes}, self.node.senderDict()
 
     ###
     ### message response callbacks
     # called when we get a response to store value
-    def _storedValueHandler(self, t, msg):
-       self.table.insertNode(t.target)
+    def _storedValueHandler(self, sender):
+       pass
 
 
-    ## these are the callbacks used when we ping the oldest node in a bucket
-    def _staleNodeHandler(self, t):
-       """ called if the pinged node never responds """
-       self.table.replaceStaleNode(t.target, t.newnode)
 
-    def _notStaleNodeHandler(self, t, msg):
-       """ called when we get a ping from the remote node """
-       self.table.insertNode(t.target)
-       
-    
-    ## these are the callbacks we use when we issue a PING
-    def _pongHandler(self, t, msg):
-       #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
-       n = Node(msg['id'], t.addr[0], t.addr[1])
-       self.table.insertNode(n)
-
-    def _defaultPong(self, t):
-       # this should probably increment a failed message counter and dump the node if it gets over a threshold
-       print "Never got PONG from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port)
-       
-    
 
-class ActionBase:
-    """ base class for some long running asynchronous proccesses like finding nodes or values """
-    def __init__(self, table, dispatcher, target, callback):
-       self.table = table
-       self.dispatcher = dispatcher
-       self.target = target
-       self.int = intify(target)
-       self.found = {}
-       self.queried = {}
-       self.answered = {}
-       self.callback = callback
-       self.outstanding = 0
-       self.finished = 0
-       
-       def sort(a, b, int=self.int):
-           """ this function is for sorting nodes relative to the ID we are looking for """
-           x, y = int ^ a.int, int ^ b.int
-           if x > y:
-               return 1
-           elif x < y:
-               return -1
-           return 0
-       self.sort = sort
-    
-    def goWithNodes(self, t):
-       pass
-       
-class FindNode(ActionBase):
-    """ find node action merits it's own class as it is a long running stateful process """
-    def handleGotNodes(self, t, msg):
-       if self.finished or self.answered.has_key(t.id):
-           # a day late and a dollar short
-           return
-       self.outstanding = self.outstanding - 1
-       self.answered[t.id] = 1
-       for node in msg['nodes']:
-           if not self.found.has_key(node['id']):
-               n = Node(node['id'], node['host'], node['port'])
-               self.found[n.id] = n
-               self.table.insertNode(n)
-       self.schedule()
-               
-    def schedule(self):
-       """
-           send messages to new peers, if necessary
-       """
-       if self.finished:
-           return
-       l = self.found.values()
-       l.sort(self.sort)
-
-       for node in l[:K]:
-           if node.id == self.target:
-               self.finished=1
-               return self.callback([node])
-           if not self.queried.has_key(node.id) and node.id != self.table.node.id:
-               t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes)
-               self.outstanding = self.outstanding + 1
-               self.queried[node.id] = 1
-               t.timeout = time.time() + 15
-               t.dispatch()
-           if self.outstanding >= N:
-               break
-       assert(self.outstanding) >=0
-       if self.outstanding == 0:
-           ## all done!!
-           self.finished=1
-           self.callback(l[:K])
-       
-    def defaultGotNodes(self, t):
-       if self.finished:
-           return
-       self.outstanding = self.outstanding - 1
-       self.schedule()
-       
-       
-    def goWithNodes(self, t):
-       """
-           this starts the process, our argument is a transaction with t.extras being our list of nodes
-           it's a transaction since we got called from the dispatcher
-       """
-       nodes = t.extras
-       for node in nodes:
-           if node.id == self.table.node.id:
-               continue
-           self.found[node.id] = node
-           t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes)
-           t.timeout = time.time() + 15
-           t.dispatch()
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           self.callback(nodes)
-
-
-
-class GetValue(FindNode):
-    """ get value task """
-    def handleGotNodes(self, t, msg):
-       if self.finished or self.answered.has_key(t.id):
-           # a day late and a dollar short
-           return
-       self.outstanding = self.outstanding - 1
-       self.answered[t.id] = 1
-       # go through nodes
-       # if we have any closer than what we already got, query them
-       if msg['type'] == 'got nodes':
-           for node in msg['nodes']:
-               if not self.found.has_key(node['id']):
-                   n = Node(node['id'], node['host'], node['port'])
-                   self.found[n.id] = n
-                   self.table.insertNode(n)
-       elif msg['type'] == 'got values':
-           ## done
-           self.finished = 1
-           return self.callback(msg['values'])
-       self.schedule()
-               
-    ## get value
-    def schedule(self):
-       if self.finished:
-           return
-       l = self.found.values()
-       l.sort(self.sort)
-
-       for node in l[:K]:
-           if not self.queried.has_key(node.id) and node.id != self.table.node.id:
-               t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes)
-               self.outstanding = self.outstanding + 1
-               self.queried[node.id] = 1
-               t.timeout = time.time() + 15
-               t.dispatch()
-           if self.outstanding >= N:
-               break
-       assert(self.outstanding) >=0
-       if self.outstanding == 0:
-           ## all done, didn't find it!!
-           self.finished=1
-           self.callback([])
-    
-    ## get value
-    def goWithNodes(self, t):
-       nodes = t.extras
-       for node in nodes:
-           if node.id == self.table.node.id:
-               continue
-           self.found[node.id] = node
-           t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes)
-           t.timeout = time.time() + 15
-           t.dispatch()
-           self.outstanding = self.outstanding + 1
-           self.queried[node.id] = 1
-       if self.outstanding == 0:
-           self.callback([])
-
-
-#------
+
+
+#------ testing
+
 def test_build_net(quiet=0):
     from whrandom import randrange
     import thread
     port = 2001
     l = []
-    peers = 100
+    peers = 16
     
     if not quiet:
        print "Building %s peer table." % peers
@@ -385,10 +243,10 @@ def test_build_net(quiet=0):
                if events == 0:
                        time.sleep(.25)
 
-    for i in range(10):
-       thread.start_new_thread(run, (l[i*10:(i+1)*10],))
-       #thread.start_new_thread(l[i].dispatcher.run, ())
-    
+    thread.start_new_thread(l[0].app.run, ())
+    for peer in l[1:]:
+       peer.app.run()
+       
     for peer in l[1:]:
        n = l[randrange(0, len(l))].node
        peer.addContact(n.host, n.port)
@@ -455,7 +313,7 @@ def test_find_value(l, quiet=0):
                if(len(values) == 0):
                    print "find                FAILED"
                else:
-                   if values[0]['value'] != val:
+                   if values != val:
                        print "find                FAILED"
                    else:
                        print "find                FOUND"
@@ -463,11 +321,10 @@ def test_find_value(l, quiet=0):
                f.set()
        return callback
     b.valueForKey(key, mc(fa))
-    c.valueForKey(key, mc(fb))
-    d.valueForKey(key, mc(fc))
-    
     fa.wait()
+    c.valueForKey(key, mc(fb))
     fb.wait()
+    d.valueForKey(key, mc(fc))    
     fc.wait()
     
 if __name__ == "__main__":
@@ -484,5 +341,3 @@ if __name__ == "__main__":
     test_find_value(l)
     test_find_value(l)
     test_find_value(l)
-    for i in l:
-       i.dispatcher.stop()