1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
5 from pickle import loads, dumps
8 from ktable import KTable, K
9 from knode import KNode as Node
11 from hash import newID
13 from actions import FindNode, GetValue, KeyExpirer
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
24 from xmlrpclib import Binary
# Only ping a peer when we have not heard from it for at least this many seconds.
MAX_PING_INTERVAL = 15 * 60  # fifteen minutes
# Khashmir: a DHT peer, exposed to other peers as a twisted.web XML-RPC resource.
# NOTE(review): this listing has the original line numbers fused onto each line, no
# indentation, and gaps in the numbering (e.g. 42, 46, 51); the comments here describe
# only the lines that are visible.
31 # this is the main class!
32 class Khashmir(xmlrpc.XMLRPC):
# NOTE(review): render() also assigns self.crequest, which is not listed here; __slots__
# only restricts attributes on new-style classes whose bases have no __dict__ - confirm
# whether this declaration has any effect with this Twisted version.
33 __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
# Build the local peer: a node with a new ID from newID() at (host, port), its routing
# table, and a Twisted Application that serves this object over XML-RPC on `port`.
34 def __init__(self, host, port):
35 self.node = Node().init(newID(), host, port)
36 self.table = KTable(self.node)
37 self.app = Application("xmlrpc")
38 self.app.listenTCP(port, server.Site(self))
40 ## these databases may be more suited to on-disk rather than in-memory
41 # h((key, value)) -> (key, value, time) mappings
# NOTE(review): line 42, which creates self.store, is missing from this listing
# (presumably db.DB()). open() is given no file name here, consistent with the
# "in-memory" remark above.
43 self.store.open(None, None, db.DB_BTREE)
45 # <insert time> -> h((key, value))
# NOTE(review): line 46 (creating self.itime) is missing. DB_DUP allows several hashes
# to be recorded under the same insert time.
47 self.itime.set_flags(db.DB_DUP)
48 self.itime.open(None, None, db.DB_BTREE)
50 # key -> h((key, value))
# NOTE(review): line 51 (creating self.kw) is missing. DB_DUP lets one key index
# several distinct values.
52 self.kw.set_flags(db.DB_DUP)
53 self.kw.open(None, None, db.DB_BTREE)
# Hand all three tables to KeyExpirer (from `actions`); presumably it schedules removal
# of old entries - confirm there.
55 KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
def render(self, request):
    """
    Remember the current request, then let the standard XML-RPC resource render it.

    Keeping the request on self.crequest lets the xmlrpc_* handlers reach it (they call
    self.crequest.getClientIP()).  It can only be relied on during the initial,
    synchronous call -- not after a Deferred has fired.
    """
    parent_render = xmlrpc.XMLRPC.render
    self.crequest = request
    return parent_render(self, request)
67 ####### LOCAL INTERFACE - use these methods!
# Ask the peer at (host, port) to identify itself. It can only be added to the routing
# table once it answers, because the table needs the peer's real ID.
# NOTE(review): the docstring delimiters (lines 69 and 71) and everything after line 72
# are missing from this listing; presumably line 73 hands `n` to self.sendPing - confirm.
68 def addContact(self, host, port):
70 ping this node and add the contact info to the table on pong!
72 n =Node().init(" "*20, host, port) # note, we
# The ID above is a placeholder of 20 spaces; sendPing's pong handler skips its ID check
# for exactly this value. The trailing comment on line 72 is truncated in this listing.
76 ## this call is async!
# Look up `id` in the DHT and deliver the result through a Deferred to `callback`
# (or `errback` on failure).
# NOTE(review): `id` shadows the builtin. Lines 81, 84 and 85 are missing from this
# listing: `d` is presumably created on 81 (Deferred is imported), and 84-85 presumably
# fire `d` directly with the exact local match and open the else-branch that starts a
# network search - confirm.
77 def findNode(self, id, callback, errback=None):
78 """ returns the contact info for node, or the k closest nodes, from the global table """
79 # get K nodes out of local table/cache, or the node we want
80 nodes = self.table.findNodes(id)
82 d.addCallbacks(callback, errback)
# Exactly one local result and it has the requested ID: the node is already known.
83 if len(nodes) == 1 and nodes[0].id == id :
86 # create our search state
# FindNode reports through d.callback, so `callback` presumably runs when the search
# completes; callFromThread schedules the search start on the reactor thread.
87 state = FindNode(self, id, d.callback)
88 reactor.callFromThread(state.goWithNodes, nodes)
# Look up every value stored under `key`. Values held locally are read first. Then a
# GetValue search over the closest known nodes is started on the reactor thread, seeded
# with the local results as {'found': l}.
# NOTE(review): lines 95-96, 98 and 100 are missing from this listing. As listed,
# `callback` is invoked with the local list before the network search; line 98 is
# presumably a guard (e.g. only when local values exist) - confirm. Since GetValue is
# also given `callback`, it may be called more than once.
92 def valueForKey(self, key, callback):
93 """ returns the values found for key in global table """
94 nodes = self.table.findNodes(key)
97 l = self.retrieveValues(key)
99 reactor.callFromThread(callback, l)
101 # create our search state
102 state = GetValue(self, key, callback)
103 reactor.callFromThread(state.goWithNodes, nodes, {'found' : l})
107 ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
# NOTE(review): the comment above says there is no callback, but the signature accepts
# `callback`, which is used below as the handler for each node's store_value reply.
# Find the nodes closest to `key`, then send store_value(key, value) to each of them
# except ourselves. Returns immediately; no overall status is reported.
108 def storeValueForKey(self, key, value, callback=None):
109 """ stores the value for key in the global table, returns immediately, no status
110 in this implementation, peers respond but don't indicate status to storing values
111 values are stored in peers on a first-come first-served basis
112 this will probably change so more than one value can be stored under a key
114 def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
# `default` is the errback for each store request. It turns a failure into the string
# "didn't respond" instead of propagating it.
# NOTE(review): lines 113 (closing the docstring), 115, 118 and 120 are missing from
# this listing. Presumably 115 checks for a missing `callback` so the default handler
# below is used, and 120 loops `for node in nodes:` - confirm.
116 # default callback - this will get called for each successful store value
117 def _storedValueHandler(sender):
119 response=_storedValueHandler
121 if node.id != self.node.id:
122 df = node.storeValue(key, value, self.node.senderDict())
123 df.addCallbacks(response, default)
124 # this call is asynch
# _storeValueForKey runs as findNode's callback and receives the nodes found for `key`.
125 self.findNode(key, _storeValueForKey)
def insertNode(self, n):
    """
    insert a node in our local table, pinging oldest contact in bucket, if necessary

    If all you have is a host/port, then use addContact, which calls this method after
    receiving the PONG from the remote node.  The reason for the separation is we can't insert
    a node into the table without its peer-ID.  That means of course the node passed into this
    method needs to be a properly formed Node object with a valid ID.

    When the table hands back the oldest contact of a full bucket (`old`) and we have not
    heard from it for more than MAX_PING_INTERVAL seconds, that contact is pinged.  If it
    answers, it is re-inserted; otherwise it is replaced by `n`.
    """
    # `time` is used below, but no import of it is visible in this listing.
    # A function-scope import keeps this method self-contained either way.
    import time

    old = self.table.insertNode(n)
    if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
        # the bucket is full; ping the old node and replace it only if it doesn't answer

        ## these are the callbacks used when we ping the oldest node in a bucket
        def _staleNodeHandler(err, oldnode=old, newnode=n):
            """ errback: called if the pinged node never responds

            Twisted calls an errback with the Failure as its first positional argument.
            It needs its own parameter (`err`); otherwise the Failure would bind to
            `oldnode`.
            """
            self.table.replaceStaleNode(oldnode, newnode)

        def _notStaleNodeHandler(sender, old=old):
            """ callback: called with the sender dict when the pinged node answers (the PONG) """
            sender = Node().initWithDict(sender)
            if sender.id == old.id:
                # the old contact is still alive: re-insert it rather than replace it
                self.table.insertNode(old)

        df = old.ping(self.node.senderDict())
        df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
# Send a PING to `node`. On a PONG, build a Node from the reply using the host/port we
# actually contacted; what happens to that Node is on lines missing from this listing.
# NOTE(review): the docstring (lines 157-159) and lines 165-166, 171-172 and 175-176
# are missing. Presumably 171 inserts `n` into `table`, which is bound as a default
# argument of the handler - confirm.
156 def sendPing(self, node):
160 df = node.ping(self.node.senderDict())
161 ## these are the callbacks we use when we issue a PING
# The expected id/host/port are bound as default arguments, so the handler uses the
# values from the time the ping was sent.
162 def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
# A 20-space ID is the placeholder used when the peer's ID is not yet known (as in
# addContact), so the mismatch check is skipped for it. sender['id'] is read via .data,
# presumably an xmlrpclib.Binary.
163 if id != 20 * ' ' and id != sender['id'].data:
164 # whoah, got response from different peer than we were expecting
# NOTE(review): the branch structure on lines 165-166 is missing; the lines below
# presumably run only when the IDs are consistent.
167 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
# Use the address we pinged rather than whatever the peer reported about itself.
168 sender['host'] = host
169 sender['port'] = port
170 n = Node().initWithDict(sender)
# Errback for a failed ping; its body (lines 175-176) is missing from this listing.
173 def _defaultPong(err):
174 # this should probably increment a failed message counter and dump the node if it gets over a threshold
177 df.addCallbacks(_pongHandler,_defaultPong)
# Look up the ID that is one greater than our own in its last byte, to fill the routing
# table with the peers closest to us. Per the docstring, this runs at startup with an
# empty table.
# NOTE(review): the docstring delimiters (181, 185) and lines 187-188 are missing from
# this listing; `callback` is presumably defined there - confirm.
180 def findCloseNodes(self):
182 This does a findNode on the ID one away from our own.
183 This will allow us to populate our table with nodes on our network closest to our own.
184 This is called as soon as we start up with an empty table
# Only the last byte is incremented, modulo 256: 0xff wraps to 0x00 with no carry into
# the preceding byte. Presumably IDs are byte strings (ord/chr are used on them).
186 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
189 self.findNode(id, callback)
# For every bucket not accessed within the last hour, look up an ID produced by
# randRange from that bucket's min/max range.
# NOTE(review): lines 192-197 (docstring and, presumably, the `callback` passed to
# findNode) are missing from this listing. `time` and `randRange` are not imported by
# any visible line - confirm where they come from.
191 def refreshTable(self):
198 for bucket in self.table.buckets:
# 60 * 60 seconds = one hour since the bucket was last accessed.
199 if time.time() - bucket.lastAccessed >= 60 * 60:
200 id = randRange(bucket.min, bucket.max)
201 self.findNode(id, callback)
# Return the values stored locally under `key`. The kw index (key -> h((key, value)))
# is used to find each entry in store.
# NOTE(review): lines 206-208, 210 and 212-215 are missing from this listing. Presumably
# they open a cursor on self.kw positioned at `key`, take h1 from each (key, h1)
# record, collect the values, and return them (an empty list when the key is absent)
# - confirm.
204 def retrieveValues(self, key):
205 if self.kw.has_key(key):
# DB_DUP on kw means several records can share `key`; keep going while the current
# record's key still matches.
209 while(tup and tup[0] == key):
# store entries are pickled (key, value, time) tuples; index 1 is the value.
211 v = loads(self.store[h1])[1]
218 ##### INCOMING MESSAGE HANDLERS
# XML-RPC "ping": answer with our own sender dict (the PONG).
# NOTE(review): lines 221, 223-224, 226 and 228 are missing from this listing. `ip` and
# `n` are unused in the visible lines; presumably 226 records `ip` in `sender` and 228
# inserts `n` into the routing table - confirm. The docstring's dict uses commas where
# colons are meant: {'id': <id>, 'port': port}.
220 def xmlrpc_ping(self, sender):
222 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
# crequest is the request stashed by render(); it is only valid during this synchronous call.
225 ip = self.crequest.getClientIP()
227 n = Node().initWithDict(sender)
229 return self.node.senderDict()
# XML-RPC "find_node": return the contact dicts of the nodes in our table closest to
# `target`, together with our own sender dict.
# NOTE(review): lines 235 and 237 are missing from this listing. `ip` and `n` are unused
# in the visible lines; presumably the caller's address is recorded and the caller is
# inserted into the table - confirm.
231 def xmlrpc_find_node(self, target, sender):
# target is read via .data, presumably an xmlrpclib.Binary.
232 nodes = self.table.findNodes(target.data)
233 nodes = map(lambda node: node.senderDict(), nodes)
234 ip = self.crequest.getClientIP()
236 n = Node().initWithDict(sender)
238 return nodes, self.node.senderDict()
# XML-RPC "store_value": store (key, value, t) under h1 = sha(key + value.data).digest()
# and record (t, h1) in itime. If h1 is already stored, its time is refreshed instead
# (presumably the else-branch; the line opening it is missing).
# NOTE(review): lines 241, 243, 248-249, 254, 256 and 258 are missing from this listing.
# `t` is presumably the insert time set on 241, and `sha` is not imported by any visible
# line. No visible line adds key -> h1 to self.kw, which retrieveValues reads, so that
# update is presumably on 248 - confirm.
240 def xmlrpc_store_value(self, key, value, sender):
242 h1 = sha(key+value.data).digest()
244 if not self.store.has_key(h1):
245 v = dumps((key, value.data, t))
246 self.store.put(h1, v)
247 self.itime.put(t, h1)
250 # update last insert time
# NOTE(review): no visible line removes the earlier itime record for h1, so with DB_DUP
# both the old and the new (time, h1) entries remain - confirm this is handled elsewhere.
251 tup = loads(self.store[h1])
252 self.store[h1] = dumps((tup[0], tup[1], t))
253 self.itime.put(t, h1)
255 ip = self.crequest.getClientIP()
257 n = Node().initWithDict(sender)
259 return self.node.senderDict()
# XML-RPC "find_value". It replies either with {'values': [...]} (locally stored values,
# each wrapped in xmlrpclib Binary) or with {'nodes': [...]} (contact dicts of the
# closest known nodes). Both replies also carry our own sender dict.
# NOTE(review): lines 263-264, 266-267, 269 and 272 are missing from this listing.
# Presumably 269 tests whether any values were found and 272 is the else - confirm.
# `ip` and `n` are unused in the visible lines.
261 def xmlrpc_find_value(self, key, sender):
262 ip = self.crequest.getClientIP()
265 n = Node().initWithDict(sender)
268 l = self.retrieveValues(key)
270 l = map(lambda v: Binary(v), l)
271 return {'values' : l}, self.node.senderDict()
273 nodes = self.table.findNodes(key)
274 nodes = map(lambda node: node.senderDict(), nodes)
275 return {'nodes' : nodes}, self.node.senderDict()
# Manual test helper: create `peers` Khashmir instances on consecutive ports starting at
# `port`, add random contacts, then call findCloseNodes.
# NOTE(review): Python 2 only (print statements, xrange, whrandom; whrandom is long
# deprecated in favour of random.randrange). Many lines are missing from this listing:
# the definitions of `port`, `l` and `peer`, the loops around lines 306-311 and 319, and
# the import of `thread` are not visible - confirm.
283 def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
284 from whrandom import randrange
290 print "Building %s peer table." % peers
292 for i in xrange(peers):
293 a = Khashmir(host, port + i)
# Only l[0]'s app is started in a thread in the visible lines; presumably the instances
# share one reactor - confirm.
297 thread.start_new_thread(l[0].app.run, ())
303 print "adding contacts...."
# Three contacts are picked at random from the whole list, which may include `peer` itself.
306 n = l[randrange(0, len(l))].node
307 peer.addContact(host, n.port)
308 n = l[randrange(0, len(l))].node
309 peer.addContact(host, n.port)
310 n = l[randrange(0, len(l))].node
311 peer.addContact(host, n.port)
316 print "finding close nodes...."
319 peer.findCloseNodes()
325 # peer.refreshTable()
# Manual test: pick two random peers a and b from `l`, run a.findNode(b's ID), and print
# PASSED or FAILED.
# NOTE(review): lines 332-334, 337, 341, 343 and 345+ are missing from this listing.
# `n` is presumably len(l) and 341 the else; `flag` is presumably set in the callback
# and waited on afterwards - confirm.
328 def test_find_nodes(l, quiet=0):
329 import threading, sys
330 from whrandom import randrange
331 flag = threading.Event()
335 a = l[randrange(0,n)]
336 b = l[randrange(0,n)]
# Passes when the first node returned has b's ID.
338 def callback(nodes, flag=flag, id = b.node.id):
339 if (len(nodes) >0) and (nodes[0].id == id):
340 print "test_find_nodes PASSED"
342 print "test_find_nodes FAILED"
344 a.findNode(b.node.id, callback)
# Manual test: peer a stores `value` under `key`, then peers b, c and d each look the
# key up. Each lookup reports through a `cb` instance bound to its own Event.
# NOTE(review): many lines are missing from this listing, including the definitions of
# `n`, `key` and `value` and the `class cb` header (presumably just before line 374).
# newID, time and sys are imported but unused in the visible lines.
347 def test_find_value(l, quiet=0):
348 from whrandom import randrange
350 from hash import newID
351 import time, threading, sys
353 fa = threading.Event()
354 fb = threading.Event()
355 fc = threading.Event()
358 a = l[randrange(0,n)]
359 b = l[randrange(0,n)]
360 c = l[randrange(0,n)]
361 d = l[randrange(0,n)]
366 print "inserting value..."
368 a.storeValueForKey(key, value)
374 def __init__(self, flag, value=value):
# Lookup result handler; its empty-result and found branches are only partly visible.
378 def callback(self, values):
380 if(len(values) == 0):
388 if self.val in values:
393 b.valueForKey(key, cb(fa).callback)
395 c.valueForKey(key, cb(fb).callback)
397 d.valueForKey(key, cb(fc).callback)
# NOTE(review): the enclosing `def` line (around line 400-401) is missing from this
# listing. These lines create a single Khashmir on localhost:`port` and run its app in
# a background thread; `port` and `thread` come from lines not visible here.
402 k = Khashmir('localhost', port)
403 thread.start_new_thread(k.app.run, ())
# Script entry point. Per the visible messages, it finds nodes and then inserts and
# fetches values.
# NOTE(review): lines 407-408, 410-411 and 413+ (the actual calls) are missing from
# this listing.
406 if __name__ == "__main__":
409 print "finding nodes..."
412 print "inserting and fetching values..."