1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
6 from ktable import KTable, K
7 from knode import KNode as Node
11 from actions import FindNode, GetValue
12 from twisted.web import xmlrpc
13 from twisted.internet.defer import Deferred
14 from twisted.python import threadable
17 from bsddb3 import db ## find this at http://pybsddb.sf.net/
18 from bsddb3._db import DBNotFoundError
20 # don't ping unless it's been at least this many seconds since we've heard from a peer
# (used by insertNode's bucket-eviction check before pinging the oldest contact)
21 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
25 # this is the main class!
# Main DHT node class.  Serves the peer wire protocol as XML-RPC methods
# (xmlrpc_ping / xmlrpc_find_node / xmlrpc_store_value / xmlrpc_find_value)
# and exposes an asynchronous local API (addContact, findNode, valueForKey,
# storeValueForKey) for the embedding application.
#
# NOTE(review): this listing is a numbered dump and the embedded line numbers
# skip, i.e. many original source lines are MISSING here.  The comments below
# describe only what is visible; gaps are flagged where they matter.
26 class Khashmir(xmlrpc.XMLRPC):
27 __slots__ = ['listener', 'node', 'table', 'store', 'app']
# Build our own identity (random ID + host/port), the Kademlia routing
# table, and a Twisted XML-RPC server listening on `port`.
28 def __init__(self, host, port):
29 self.node = Node(newID(), host, port)
30 self.table = KTable(self.node)
31 from twisted.internet.app import Application
32 from twisted.web import server
33 self.app = Application("xmlrpc")
34 self.app.listenTCP(port, server.Site(self))
# NOTE(review): original line 35 is missing from this dump — presumably
# "self.store = db.DB()"; the store is opened as an anonymous (in-memory)
# Berkeley DB BTREE for the key/value payloads we hold for peers.
36 self.store.open(None, None, db.DB_BTREE)
# Override render so the xmlrpc_* handlers can read the client's IP via
# self.crequest.  As the comment below says, crequest is only valid
# during the initial synchronous call, not after a deferred fires.
39 def render(self, request):
41 Override the built in render so we can have access to the request object!
42 note, crequest is probably only valid on the initial call (not after deferred!)
44 self.crequest = request
45 return xmlrpc.XMLRPC.render(self, request)
49 ####### LOCAL INTERFACE - use these methods!
# Ping host:port; the pong handler (not visible in this dump) is expected
# to call insertNode once the peer's real ID is learned from the PONG.
50 def addContact(self, host, port):
52 ping this node and add the contact info to the table on pong!
# placeholder 20-byte blank ID until the peer identifies itself
54 n =Node(" "*20, host, port) # note, we
58 ## this call is async!
59 def findNode(self, id, callback, errback=None):
60 """ returns the contact info for node, or the k closest nodes, from the global table """
61 # get K nodes out of local table/cache, or the node we want
62 nodes = self.table.findNodes(id)
# NOTE(review): the creation of the Deferred `d` (original line 63) is
# missing from this dump.
64 d.addCallbacks(callback, errback)
65 if len(nodes) == 1 and nodes[0].id == id :
# NOTE(review): lines 66-67 missing — presumably fire `d` immediately
# with the exact local match instead of starting a network search.
68 # create our search state
69 state = FindNode(self, id, d.callback)
# hand the search off to the reactor thread; this method may be called
# from a different thread (hence callFromThread, not callLater)
70 reactor.callFromThread(state.goWithNodes, nodes)
# Async lookup of the values stored under `key` across the network;
# results are delivered to `callback`.
74 def valueForKey(self, key, callback):
75 """ returns the values found for key in global table """
76 nodes = self.table.findNodes(key)
77 # create our search state
78 state = GetValue(self, key, callback)
79 reactor.callFromThread(state.goWithNodes, nodes)
82 ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
83 def storeValueForKey(self, key, value):
84 """ stores the value for key in the global table, returns immediately, no status
85 in this implementation, peers respond but don't indicate status to storing values
86 values are stored in peers on a first-come first-served basis
87 this will probably change so more than one value can be stored under a key
# closure run with the k closest nodes found for `key`; default args
# bind everything because this predates nested scopes
89 def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
# NOTE(review): line 90 missing — presumably "for node in nodes:",
# which the indented body below belongs to.
91 if node.id != self.node.id:
92 df = node.storeValue(key, value, self.node.senderDict())
93 df.addCallbacks(response, default)
# find the k closest nodes, then ask each of them to store
95 self.findNode(key, _storeValueForKey)
98 def insertNode(self, n):
100 insert a node in our local table, pinging oldest contact in bucket, if necessary
102 If all you have is a host/port, then use addContact, which calls this method after
103 receiving the PONG from the remote node. The reason for the separation is we can't insert
104 a node into the table without its peer-ID. That means of course the node passed into this
105 method needs to be a properly formed Node object with a valid ID.
107 old = self.table.insertNode(n)
# only ping the evicted candidate if we haven't heard from it recently
108 if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
109 # the bucket is full, check to see if old node is still around and if so, replace it
111 ## these are the callbacks used when we ping the oldest node in a bucket
112 def _staleNodeHandler(oldnode=old, newnode = n):
113 """ called if the pinged node never responds """
# NOTE(review): references `old`, not the bound default `oldnode` —
# under pre-2.2 scoping rules `old` is not visible here; this looks
# like a bug (should presumably be `oldnode`). TODO confirm.
114 self.table.replaceStaleNode(old, newnode)
116 def _notStaleNodeHandler(sender, old=old):
117 """ called when we get a ping from the remote node """
118 if sender['id'] == old.id:
# refresh the old node's position (moves it to bucket tail)
119 self.table.insertNode(old)
# NOTE(review): lines 120-121 missing — presumably "df = self.sendPing(old)".
# Also suspicious: `self._staleNodeHandler` — _staleNodeHandler is a local
# function above, not a method; this would raise AttributeError. TODO confirm.
122 df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
# Ping `node`; on pong, verify the responder's ID matches what we expected
# (a blank 20-space ID means we didn't know it yet) and insert it.
125 def sendPing(self, node):
129 df = node.ping(self.node.senderDict())
130 ## these are the callbacks we use when we issue a PING
131 def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
# blank placeholder ID is allowed to mismatch; anything else must match
132 if id != 20 * ' ' and id != sender['id']:
133 # whoah, got response from different peer than we were expecting
# NOTE(review): lines 134-135 missing — presumably the mismatch
# handling (e.g. return / mark stale) before the insert below.
136 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
137 n = Node(sender['id'], host, port)
# NOTE(review): lines 138-139 missing — presumably "table.insertNode(n)".
140 def _defaultPong(err):
141 # this should probably increment a failed message counter and dump the node if it gets over a threshold
144 df.addCallbacks(_pongHandler,_defaultPong)
# Bootstrap helper: search for the ID adjacent to our own so the buckets
# nearest us get populated.
147 def findCloseNodes(self):
149 This does a findNode on the ID one away from our own.
150 This will allow us to populate our table with nodes on our network closest to our own.
151 This is called as soon as we start up with an empty table
# increment the last byte of our ID, wrapping at 256
153 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
# NOTE(review): lines 154-155 missing — presumably define `callback`.
156 self.findNode(id, callback)
# Kademlia bucket refresh: search a random ID in any bucket untouched for
# the last hour.
158 def refreshTable(self):
165 for bucket in self.table.buckets:
166 if time.time() - bucket.lastAccessed >= 60 * 60:
167 id = randRange(bucket.min, bucket.max)
# NOTE(review): `callback` is not defined in the visible lines
# (159-164 missing from this dump).
168 self.findNode(id, callback)
172 ##### INCOMING MESSAGE HANDLERS
# Remote PING: learn the caller's contact info (IP taken from the request,
# not trusted from the sender dict) and answer with our own sender dict.
174 def xmlrpc_ping(self, sender):
176 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
179 ip = self.crequest.getClientIP()
180 n = Node(sender['id'], ip, sender['port'])
# NOTE(review): line 181 missing — presumably "self.insertNode(n)".
182 return self.node.senderDict()
# Remote FIND_NODE: return the k closest contacts we know for `target`.
184 def xmlrpc_find_node(self, target, sender):
185 nodes = self.table.findNodes(target)
186 nodes = map(lambda node: node.senderDict(), nodes)
187 ip = self.crequest.getClientIP()
188 n = Node(sender['id'], ip, sender['port'])
# NOTE(review): line 189 missing — presumably "self.insertNode(n)".
190 return nodes, self.node.senderDict()
# Remote STORE: first-come first-served — only store if the key is absent.
192 def xmlrpc_store_value(self, key, value, sender):
193 if not self.store.has_key(key):
194 self.store.put(key, value)
195 ip = self.crequest.getClientIP()
196 n = Node(sender['id'], ip, sender['port'])
# NOTE(review): line 197 missing — presumably "self.insertNode(n)".
198 return self.node.senderDict()
# Remote FIND_VALUE: return the stored value if we have it, otherwise the
# k closest contacts (standard Kademlia behaviour).
200 def xmlrpc_find_value(self, key, sender):
201 ip = self.crequest.getClientIP()
202 n = Node(sender['id'], ip, sender['port'])
204 if self.store.has_key(key):
205 return {'values' : self.store[key]}, self.node.senderDict()
# NOTE(review): `msg` is not defined anywhere in this method — this
# looks like a leftover from an older message-dict API and should
# presumably be plain `key`. TODO confirm and fix.
207 nodes = self.table.findNodes(msg['key'])
208 nodes = map(lambda node: node.senderDict(), nodes)
209 return {'nodes' : nodes}, self.node.senderDict()
212 ### message response callbacks
213 # called when we get a response to store value
# NOTE(review): body (original lines 215+) missing from this dump.
214 def _storedValueHandler(self, sender):
# Test harness: build a local network of `peers` Khashmir nodes on
# consecutive ports, seed each with three random contacts, then bootstrap
# each table via findCloseNodes.  Returns (presumably) the list `l` of
# peers for the other tests.
# NOTE(review): this dump skips many original lines (e.g. 222-227,
# 232-237, 239-246, 253-256) — setup of `peers`, `port`, `l`, and the
# event loop is not visible here.
220 def test_build_net(quiet=0):
221 from whrandom import randrange
228 print "Building %s peer table." % peers
230 for i in xrange(peers):
231 a = Khashmir('localhost', port + i)
# pump the dispatcher (pre-reactor event loop) — context lines missing
238 events = events + peer.dispatcher.runOnce()
# run the first peer's twisted Application in a background thread
242 thread.start_new_thread(l[0].app.run, ())
# give every peer three random existing peers as initial contacts
247 n = l[randrange(0, len(l))].node
248 peer.addContact(n.host, n.port)
249 n = l[randrange(0, len(l))].node
250 peer.addContact(n.host, n.port)
251 n = l[randrange(0, len(l))].node
252 peer.addContact(n.host, n.port)
# populate each table with the neighbourhood around its own ID
257 peer.findCloseNodes()
# Test: pick two random peers a and b from the network `l`; a searches for
# b's ID and the callback passes iff b is the first (closest) node returned.
# NOTE(review): lines 267-269, 272, 276, 278 are missing from this dump
# (definition of `n`, the flag.set()/sys handling, and the wait on `flag`).
263 def test_find_nodes(l, quiet=0):
264 import threading, sys
265 from whrandom import randrange
266 flag = threading.Event()
270 a = l[randrange(0,n)]
271 b = l[randrange(0,n)]
273 def callback(nodes, l=l, flag=flag):
274 if (len(nodes) >0) and (nodes[0].id == b.node.id):
275 print "test_find_nodes PASSED"
277 print "test_find_nodes FAILED"
279 a.findNode(b.node.id, callback)
# Test: peer a stores a random (sha-digest) key/value pair, then three other
# random peers b, c, d each fetch it; the callback checks the fetched value
# matches.  Events fa/fb/fc gate the three asynchronous fetches.
# NOTE(review): many lines are missing from this dump (e.g. 290-291 `n`,
# 299/301/303-305 the store/settle steps, 308-318 most of the callback body
# and the inner return), so the pass/fail printing is not fully visible.
282 def test_find_value(l, quiet=0):
283 from whrandom import randrange
285 import time, threading, sys
287 fa = threading.Event()
288 fb = threading.Event()
289 fc = threading.Event()
292 a = l[randrange(0,n)]
293 b = l[randrange(0,n)]
294 c = l[randrange(0,n)]
295 d = l[randrange(0,n)]
# random 20-byte binary key and value (backtick repr + sha digest)
297 key = sha(`randrange(0,100000)`).digest()
298 value = sha(`randrange(0,100000)`).digest()
300 print "inserting value...",
302 a.storeValueForKey(key, value)
# mc builds a per-fetch callback bound to its completion flag
306 def mc(flag, value=value):
307 def callback(values, f=flag, val=value):
309 if(len(values) == 0):
312 if values[0]['value'] != val:
319 b.valueForKey(key, mc(fa))
320 c.valueForKey(key, mc(fb))
321 d.valueForKey(key, mc(fc))
# Script entry point: build the test network, then run the find-node and
# find-value tests against it.
# NOTE(review): lines 328-329, 331-333, 335+ are missing from this dump —
# presumably "l = test_build_net()" and the test_find_nodes / test_find_value
# invocations between/after these prints.
327 if __name__ == "__main__":
330 print "finding nodes..."
334 print "inserting and fetching values..."