1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
# NOTE(review): const is imported first, presumably so the reactor it provides
# is in place before the other twisted imports run -- keep this order unless
# confirmed otherwise.
from const import reactor
import time
from pickle import loads, dumps
from sha import sha

from ktable import KTable, K
from knode import KNode as Node

from hash import newID

from actions import FindNode, GetValue, KeyExpirer
from twisted.web import xmlrpc
from twisted.internet.defer import Deferred
from twisted.python import threadable
from twisted.internet.app import Application
from twisted.web import server

from bsddb3 import db ## find this at http://pybsddb.sf.net/
from bsddb3._db import DBNotFoundError

from xmlrpclib import Binary
# A peer is only pinged again once this many seconds have passed since we
# last heard from it.
MAX_PING_INTERVAL = 15 * 60  # fifteen minutes
31 # this is the main class!
class Khashmir(xmlrpc.XMLRPC):
    """
    A single Khashmir peer.

    The instance is the XML-RPC resource that answers remote peers (the
    xmlrpc_* methods) and also offers the local, asynchronous interface
    (addContact, findNode, valueForKey, storeValueForKey).

    Attributes set here:
      node    - our own contact (fresh ID from newID(), host, port)
      table   - KTable routing table built around node
      app     - the twisted Application the XML-RPC site is registered with
      store   - h((key, value)) -> pickled (key, value, time)
      itime   - <insert time>   -> h((key, value)), duplicates allowed
      kw      - key             -> h((key, value)), duplicates allowed
      crequest - the request currently being rendered (see render)
    """
    __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']

    def __init__(self, host, port):
        """
        Create the local node, register the XML-RPC site on port and open
        the three databases.
        """
        self.node = Node().init(newID(), host, port)
        self.table = KTable(self.node)
        self.app = Application("xmlrpc")
        self.app.listenTCP(port, server.Site(self))

        ## these databases may be more suited to on-disk rather than in-memory
        # h((key, value)) -> (key, value, time) mappings
        self.store = db.DB()
        self.store.open(None, None, db.DB_BTREE)

        # <insert time> -> h((key, value))
        self.itime = db.DB()
        self.itime.set_flags(db.DB_DUP)
        self.itime.open(None, None, db.DB_BTREE)

        # key -> h((key, value))
        self.kw = db.DB()
        self.kw.set_flags(db.DB_DUP)
        self.kw.open(None, None, db.DB_BTREE)

        KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)

    def render(self, request):
        """
        Override the built in render so we can have access to the request object!
        note, crequest is probably only valid on the initial call (not after deferred!)
        """
        self.crequest = request
        return xmlrpc.XMLRPC.render(self, request)

    #######
    ####### LOCAL INTERFACE - use these methods!
    def addContact(self, host, port):
        """
        ping this node and add the contact info to the table on pong!
        """
        # the peer's real ID is not known until it answers, so start with the
        # 20-space placeholder that sendPing's pong handler checks for
        n = Node().init(" " * 20, host, port)
        self.sendPing(n)

    ## this call is async!
    def findNode(self, id, callback, errback=None):
        """ returns the contact info for node, or the k closest nodes, from the global table

            callback and errback are attached to a Deferred; when the local
            table already holds exactly the wanted node the Deferred fires
            at once with that one-element list, otherwise a FindNode search
            is started and fires it later.
        """
        # get K nodes out of local table/cache, or the node we want
        nodes = self.table.findNodes(id)
        d = Deferred()
        d.addCallbacks(callback, errback)
        if len(nodes) == 1 and nodes[0].id == id:
            # exact hit in the local table, no network search needed
            d.callback(nodes)
        else:
            # create our search state
            state = FindNode(self, id, d.callback)
            reactor.callFromThread(state.goWithNodes, nodes)

    ## also async
    def valueForKey(self, key, callback):
        """ returns the values found for key in global table

            callback is handed to the GetValue search and is additionally
            called with the locally stored values when there are any.
        """
        nodes = self.table.findNodes(key)
        # create our search state
        state = GetValue(self, key, callback)
        reactor.callFromThread(state.goWithNodes, nodes)

        # report local values too, but never bother the caller with an
        # empty local result
        l = self.retrieveValues(key)
        if len(l) > 0:
            reactor.callFromThread(callback, l)

    ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
    def storeValueForKey(self, key, value, callback=None):
        """ stores the value for key in the global table, returns immediately, no status
            in this implementation, peers respond but don't indicate status to storing values
            values are stored in peers on a first-come first-served basis
            this will probably change so more than one value can be stored under a key

            callback, if given, is attached as the success callback of every
            individual store request sent to a peer.
        """
        def _storedValueHandler(sender):
            # default callback - this will get called for each successful store value
            # NOTE(review): assumed to be a no-op; confirm nothing was meant to happen here
            pass

        def _storeValueForKey(nodes, key=key, value=value, response=callback, default=lambda t: "didn't respond"):
            if not response:
                response = _storedValueHandler
            for node in nodes:
                # never send a store request to ourselves
                if node.id != self.node.id:
                    df = node.storeValue(key, value, self.node.senderDict())
                    df.addCallbacks(response, default)
        # this call is asynch
        self.findNode(key, _storeValueForKey)

    def insertNode(self, n):
        """
        insert a node in our local table, pinging oldest contact in bucket, if necessary

        If all you have is a host/port, then use addContact, which pings the
        remote node first.  The reason for the separation is we can't insert
        a node into the table without its peer-ID.  That means of course the node passed into this
        method needs to be a properly formed Node object with a valid ID.
        """
        old = self.table.insertNode(n)
        if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
            # the bucket is full: check whether the old node is still around
            # and replace it with the new one if it is not

            ## these are the callbacks used when we ping the oldest node in a bucket
            def _staleNodeHandler(failure=None, oldnode=old, newnode=n):
                """ called if the pinged node never responds """
                # used as an errback, so the first positional argument is the
                # failure; it must not land in oldnode
                self.table.replaceStaleNode(oldnode, newnode)

            def _notStaleNodeHandler(sender, old=old):
                """ called when the pinged node answers """
                sender = Node().initWithDict(sender)
                if sender.id == old.id:
                    self.table.insertNode(old)

            df = old.ping(self.node.senderDict())
            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)

    def sendPing(self, node):
        """
        ping a node and, when it answers, put it in the local table
        """
        df = node.ping(self.node.senderDict())

        ## these are the callbacks we use when we issue a PING
        def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
            if id != 20 * ' ' and id != sender['id'].data:
                # whoah, got response from different peer than we were expecting
                return
            # trust the address we actually pinged, not what the peer reports
            sender['host'] = host
            sender['port'] = port
            n = Node().initWithDict(sender)
            # NOTE(review): inserts straight into the table bound above;
            # confirm self.insertNode (which also pings the oldest contact)
            # was not intended here
            table.insertNode(n)

        def _defaultPong(err):
            # this should probably increment a failed message counter and dump the node if it gets over a threshold
            return

        df.addCallbacks(_pongHandler, _defaultPong)

    def findCloseNodes(self):
        """
        This does a findNode on the ID one away from our own.
        This will allow us to populate our table with nodes on our network closest to our own.
        This is called as soon as we start up with an empty table
        """
        # same ID as ours with the last byte incremented (wrapping at 256)
        target = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)

        def callback(nodes):
            # the search itself is what matters; its result is ignored
            pass

        self.findNode(target, callback)

    def refreshTable(self):
        """
        Start a findNode for every bucket that has not been accessed for an
        hour, using an ID obtained from randRange(bucket.min, bucket.max).
        """
        def callback(nodes):
            # the search itself is what matters; its result is ignored
            pass

        now = time.time()
        for bucket in self.table.buckets:
            if now - bucket.lastAccessed >= 60 * 60:
                # NOTE(review): randRange is neither defined nor imported in
                # this file, so this line raises NameError as soon as a
                # bucket is stale -- supply the intended helper
                target = randRange(bucket.min, bucket.max)
                self.findNode(target, callback)

    def retrieveValues(self, key):
        """
        Return the values stored locally under key as a list of
        xmlrpclib.Binary objects; the list is empty when there are none.
        """
        if not self.kw.has_key(key):
            return []
        values = []
        c = self.kw.cursor()
        try:
            # kw allows duplicates: walk every (key, hash) record for key
            tup = c.set(key)
            while tup and tup[0] == key:
                h1 = tup[1]
                # store records are (key, value, time) tuples pickled by
                # xmlrpc_store_value; element 1 is the value
                values.append(Binary(loads(self.store[h1])[1]))
                tup = c.next()
        finally:
            # cursors hold database resources, release them even on error
            c.close()
        return values

    def _insertSender(self, sender):
        """
        Put the peer that sent the current request into the local table.

        The host comes from the connection being rendered (self.crequest),
        the remaining contact details from the sender dict.
        """
        sender['host'] = self.crequest.getClientIP()
        self.insertNode(Node().initWithDict(sender))

    #####
    ##### INCOMING MESSAGE HANDLERS

    def xmlrpc_ping(self, sender):
        """
        takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
        returns our own sender dict
        """
        self._insertSender(sender)
        return self.node.senderDict()

    def xmlrpc_find_node(self, target, sender):
        """
        returns (list of sender dicts for the nodes our table yields for
        target, our own sender dict)
        """
        # look up before inserting the sender, as the lookup result is what
        # the table held when the request arrived
        nodes = [node.senderDict() for node in self.table.findNodes(target.data)]
        self._insertSender(sender)
        return nodes, self.node.senderDict()

    def xmlrpc_store_value(self, key, value, sender):
        """
        store value (an xmlrpclib.Binary) under key, or refresh its insert
        time when the same (key, value) pair is already stored;
        returns our own sender dict
        """
        # the time stamp doubles as the itime key, so it has to be a string
        # NOTE(review): the fixed-width "%0.6f" text form is an assumption;
        # confirm it matches what KeyExpirer expects
        t = "%0.6f" % time.time()
        h1 = sha(key + value.data).digest()
        if not self.store.has_key(h1):
            # new pair: record it in all three databases
            self.store.put(h1, dumps((key, value.data, t)))
            self.itime.put(t, h1)
            self.kw.put(key, h1)
        else:
            # update last insert time
            tup = loads(self.store[h1])
            self.store[h1] = dumps((tup[0], tup[1], t))
            self.itime.put(t, h1)

        self._insertSender(sender)
        return self.node.senderDict()

    def xmlrpc_find_value(self, key, sender):
        """
        returns ({'values': [...]}, our sender dict) when values are stored
        locally under key, otherwise ({'nodes': [...]}, our sender dict)
        with the nodes our table yields for key
        """
        self._insertSender(sender)

        l = self.retrieveValues(key)
        if len(l) > 0:
            return {'values' : l}, self.node.senderDict()
        nodes = [node.senderDict() for node in self.table.findNodes(key)]
        return {'nodes' : nodes}, self.node.senderDict()
281 def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
282 from whrandom import randrange
288 print "Building %s peer table." % peers
290 for i in xrange(peers):
291 a = Khashmir(host, port + i)
295 thread.start_new_thread(l[0].app.run, ())
301 print "adding contacts...."
304 n = l[randrange(0, len(l))].node
305 peer.addContact(host, n.port)
306 n = l[randrange(0, len(l))].node
307 peer.addContact(host, n.port)
308 n = l[randrange(0, len(l))].node
309 peer.addContact(host, n.port)
314 print "finding close nodes...."
317 peer.findCloseNodes()
323 # peer.refreshTable()
def test_find_nodes(l, quiet=0):
    """
    Pick two random peers from l, let the first look the second up by ID and
    print whether the first node reported is the one that was asked for.

    l is a list of peers offering .node.id and .findNode(id, callback).
    quiet is accepted but not consulted.  The call blocks until the findNode
    callback has run.
    """
    import threading
    # whrandom is deprecated and gone from later Python 2 releases;
    # random.randrange is the drop-in replacement
    from random import randrange
    flag = threading.Event()

    n = len(l)
    a = l[randrange(0, n)]
    b = l[randrange(0, n)]

    def callback(nodes, flag=flag, id=b.node.id):
        if (len(nodes) > 0) and (nodes[0].id == id):
            print("test_find_nodes PASSED")
        else:
            print("test_find_nodes FAILED")
        # release the waiting caller whatever the outcome
        flag.set()

    a.findNode(b.node.id, callback)
    # findNode is asynchronous: wait here until callback has reported
    flag.wait()
345 def test_find_value(l, quiet=0):
346 from whrandom import randrange
348 from hash import newID
349 import time, threading, sys
351 fa = threading.Event()
352 fb = threading.Event()
353 fc = threading.Event()
356 a = l[randrange(0,n)]
357 b = l[randrange(0,n)]
358 c = l[randrange(0,n)]
359 d = l[randrange(0,n)]
364 print "inserting value..."
366 a.storeValueForKey(key, value)
372 def __init__(self, flag, value=value):
376 def callback(self, values):
378 if(len(values) == 0):
386 if self.val in values:
391 b.valueForKey(key, cb(fa).callback)
393 c.valueForKey(key, cb(fb).callback)
395 d.valueForKey(key, cb(fc).callback)
400 k = Khashmir('localhost', port)
401 thread.start_new_thread(k.app.run, ())
404 if __name__ == "__main__":
407 print "finding nodes..."
410 print "inserting and fetching values..."