1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
5 from pickle import loads, dumps
8 from ktable import KTable, K
9 from knode import KNode as Node
11 from hash import newID
13 from actions import FindNode, GetValue, KeyExpirer
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
24 from xmlrpclib import Binary
26 # don't ping unless it's been at least this many seconds since we've heard from a peer
27 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
31 # this is the main class!
32 class Khashmir(xmlrpc.XMLRPC):
33 __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
34 def __init__(self, host, port):
35 self.node = Node().init(newID(), host, port)
36 self.table = KTable(self.node)
37 self.app = Application("xmlrpc")
38 self.app.listenTCP(port, server.Site(self))
40 ## these databases may be more suited to on-disk rather than in-memory
41 # h((key, value)) -> (key, value, time) mappings
43 self.store.open(None, None, db.DB_BTREE)
45 # <insert time> -> h((key, value))
47 self.itime.set_flags(db.DB_DUP)
48 self.itime.open(None, None, db.DB_BTREE)
50 # key -> h((key, value))
52 self.kw.set_flags(db.DB_DUP)
53 self.kw.open(None, None, db.DB_BTREE)
55 KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
57 def render(self, request):
59 Override the built in render so we can have access to the request object!
60 note, crequest is probably only valid on the initial call (not after deferred!)
62 self.crequest = request
63 return xmlrpc.XMLRPC.render(self, request)
67 ####### LOCAL INTERFACE - use these methods!
68 def addContact(self, host, port):
70 ping this node and add the contact info to the table on pong!
72 n =Node().init(" "*20, host, port) # note, we
76 ## this call is async!
77 def findNode(self, id, callback, errback=None):
78 """ returns the contact info for node, or the k closest nodes, from the global table """
79 # get K nodes out of local table/cache, or the node we want
80 nodes = self.table.findNodes(id)
82 d.addCallbacks(callback, errback)
83 if len(nodes) == 1 and nodes[0].id == id :
86 # create our search state
87 state = FindNode(self, id, d.callback)
88 reactor.callFromThread(state.goWithNodes, nodes)
92 def valueForKey(self, key, callback):
93 """ returns the values found for key in global table """
94 nodes = self.table.findNodes(key)
95 # decode values, they will be base64 encoded
96 # create our search state
97 state = GetValue(self, key, callback)
98 reactor.callFromThread(state.goWithNodes, nodes)
102 ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
103 def storeValueForKey(self, key, value, callback=None):
104 """ stores the value for key in the global table, returns immediately, no status
105 in this implementation, peers respond but don't indicate status to storing values
106 values are stored in peers on a first-come first-served basis
107 this will probably change so more than one value can be stored under a key
109 def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
111 # default callback - this will get called for each successful store value
112 def _storedValueHandler(sender):
114 response=_storedValueHandler
116 if node.id != self.node.id:
117 df = node.storeValue(key, value, self.node.senderDict())
118 df.addCallbacks(response, default)
119 # this call is asynch
120 self.findNode(key, _storeValueForKey)
123 def insertNode(self, n):
125 insert a node in our local table, pinging oldest contact in bucket, if necessary
127 If all you have is a host/port, then use addContact, which calls this method after
128 receiving the PONG from the remote node. The reason for the seperation is we can't insert
129 a node into the table without it's peer-ID. That means of course the node passed into this
130 method needs to be a properly formed Node object with a valid ID.
132 old = self.table.insertNode(n)
133 if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
134 # the bucket is full, check to see if old node is still around and if so, replace it
136 ## these are the callbacks used when we ping the oldest node in a bucket
137 def _staleNodeHandler(oldnode=old, newnode = n):
138 """ called if the pinged node never responds """
139 self.table.replaceStaleNode(old, newnode)
141 def _notStaleNodeHandler(sender, old=old):
142 """ called when we get a ping from the remote node """
143 sender = Node().initWithDict(sender)
144 if sender.id == old.id:
145 self.table.insertNode(old)
147 df = old.ping(self.node.senderDict())
148 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
151 def sendPing(self, node):
155 df = node.ping(self.node.senderDict())
156 ## these are the callbacks we use when we issue a PING
157 def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
158 if id != 20 * ' ' and id != sender['id'].data:
159 # whoah, got response from different peer than we were expecting
162 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
163 sender['host'] = host
164 sender['port'] = port
165 n = Node().initWithDict(sender)
168 def _defaultPong(err):
169 # this should probably increment a failed message counter and dump the node if it gets over a threshold
172 df.addCallbacks(_pongHandler,_defaultPong)
175 def findCloseNodes(self):
177 This does a findNode on the ID one away from our own.
178 This will allow us to populate our table with nodes on our network closest to our own.
179 This is called as soon as we start up with an empty table
181 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
184 self.findNode(id, callback)
186 def refreshTable(self):
193 for bucket in self.table.buckets:
194 if time.time() - bucket.lastAccessed >= 60 * 60:
195 id = randRange(bucket.min, bucket.max)
196 self.findNode(id, callback)
200 ##### INCOMING MESSAGE HANDLERS
202 def xmlrpc_ping(self, sender):
204 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
207 ip = self.crequest.getClientIP()
209 n = Node().initWithDict(sender)
211 return self.node.senderDict()
213 def xmlrpc_find_node(self, target, sender):
214 nodes = self.table.findNodes(target.data)
215 nodes = map(lambda node: node.senderDict(), nodes)
216 ip = self.crequest.getClientIP()
218 n = Node().initWithDict(sender)
220 return nodes, self.node.senderDict()
222 def xmlrpc_store_value(self, key, value, sender):
224 h1 = sha(key+value.data).digest()
226 if not self.store.has_key(h1):
227 v = dumps((key, value.data, t))
228 self.store.put(h1, v)
229 self.itime.put(t, h1)
232 # update last insert time
233 tup = loads(self.store[h1])
234 self.store[h1] = dumps((tup[0], tup[1], t))
235 self.itime.put(t, h1)
237 ip = self.crequest.getClientIP()
239 n = Node().initWithDict(sender)
241 return self.node.senderDict()
243 def xmlrpc_find_value(self, key, sender):
244 ip = self.crequest.getClientIP()
247 n = Node().initWithDict(sender)
250 if self.kw.has_key(key):
256 v = loads(self.store[h1])[1]
259 l = map(lambda v: Binary(v), l)
260 return {'values' : l}, self.node.senderDict()
262 nodes = self.table.findNodes(key)
263 nodes = map(lambda node: node.senderDict(), nodes)
264 return {'nodes' : nodes}, self.node.senderDict()
272 def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
273 from whrandom import randrange
279 print "Building %s peer table." % peers
281 for i in xrange(peers):
282 a = Khashmir(host, port + i)
286 thread.start_new_thread(l[0].app.run, ())
292 print "adding contacts...."
295 n = l[randrange(0, len(l))].node
296 peer.addContact(host, n.port)
297 n = l[randrange(0, len(l))].node
298 peer.addContact(host, n.port)
299 n = l[randrange(0, len(l))].node
300 peer.addContact(host, n.port)
305 print "finding close nodes...."
308 peer.findCloseNodes()
314 # peer.refreshTable()
317 def test_find_nodes(l, quiet=0):
318 import threading, sys
319 from whrandom import randrange
320 flag = threading.Event()
324 a = l[randrange(0,n)]
325 b = l[randrange(0,n)]
327 def callback(nodes, flag=flag, id = b.node.id):
328 if (len(nodes) >0) and (nodes[0].id == id):
329 print "test_find_nodes PASSED"
331 print "test_find_nodes FAILED"
333 a.findNode(b.node.id, callback)
336 def test_find_value(l, quiet=0):
337 from whrandom import randrange
339 from hash import newID
340 import time, threading, sys
342 fa = threading.Event()
343 fb = threading.Event()
344 fc = threading.Event()
347 a = l[randrange(0,n)]
348 b = l[randrange(0,n)]
349 c = l[randrange(0,n)]
350 d = l[randrange(0,n)]
355 print "inserting value..."
357 a.storeValueForKey(key, value)
363 def __init__(self, flag, value=value):
367 def callback(self, values):
369 if(len(values) == 0):
377 if self.val in values:
382 b.valueForKey(key, cb(fa).callback)
384 c.valueForKey(key, cb(fb).callback)
386 d.valueForKey(key, cb(fc).callback)
391 k = Khashmir('localhost', port)
392 thread.start_new_thread(k.app.run, ())
395 if __name__ == "__main__":
398 print "finding nodes..."
401 print "inserting and fetching values..."