1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
## stdlib
import thread
import time
from pickle import loads, dumps
from sha import sha
from xmlrpclib import Binary

## third-party
from twisted.web import xmlrpc
from twisted.web import server
from twisted.internet.defer import Deferred
from twisted.internet.app import Application
from twisted.python import threadable
from bsddb3 import db ## find this at http://pybsddb.sf.net/
from bsddb3._db import DBNotFoundError

## project-local
from const import reactor
from ktable import KTable, K
from knode import KNode as Node
from hash import newID
from hash import randRange
from actions import FindNode, GetValue, KeyExpirer
# don't ping unless it's been at least this many seconds since we've heard from a peer
# (used by insertNode to decide whether the oldest bucket entry is worth re-checking)
MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
# this is the main class!
class Khashmir(xmlrpc.XMLRPC):
    """A Kademlia-style DHT peer that speaks XML-RPC over Twisted."""
    __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']

    def __init__(self, host, port):
        """Create our own node and routing table, open the backing
        databases, and start listening for XML-RPC calls on (host, port)."""
        self.node = Node().init(newID(), host, port)
        self.table = KTable(self.node)
        self.app = Application("xmlrpc")
        self.app.listenTCP(port, server.Site(self))

        ## these databases may be more suited to on-disk rather than in-memory
        # h((key, value)) -> (key, value, time) mappings
        self.store = db.DB()
        self.store.open(None, None, db.DB_BTREE)

        # <insert time> -> h((key, value))
        self.itime = db.DB()
        self.itime.set_flags(db.DB_DUP)  # duplicate keys: many inserts can share a timestamp
        self.itime.open(None, None, db.DB_BTREE)

        # key -> h((key, value))
        self.kw = db.DB()
        self.kw.set_flags(db.DB_DUP)  # duplicate keys: many values per key
        self.kw.open(None, None, db.DB_BTREE)

        # periodically expires old (key, value) pairs using the itime index
        KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
    def render(self, request):
        """
        Override the built in render so we can have access to the request object!
        note, crequest is probably only valid on the initial call (not after deferred!)
        """
        # stash the request so the xmlrpc_* handlers can read the caller's IP
        self.crequest = request
        return xmlrpc.XMLRPC.render(self, request)
67 ####### LOCAL INTERFACE - use these methods!
68 def addContact(self, host, port):
70 ping this node and add the contact info to the table on pong!
72 n =Node().init(" "*20, host, port) # note, we
76 ## this call is async!
77 def findNode(self, id, callback, errback=None):
78 """ returns the contact info for node, or the k closest nodes, from the global table """
79 # get K nodes out of local table/cache, or the node we want
80 nodes = self.table.findNodes(id)
82 d.addCallbacks(callback, errback)
83 if len(nodes) == 1 and nodes[0].id == id :
86 # create our search state
87 state = FindNode(self, id, d.callback)
88 reactor.callFromThread(state.goWithNodes, nodes)
92 def valueForKey(self, key, callback):
93 """ returns the values found for key in global table """
94 nodes = self.table.findNodes(key)
97 l = self.retrieveValues(key)
99 reactor.callFromThread(callback, l)
101 # create our search state
102 state = GetValue(self, key, callback)
103 reactor.callFromThread(state.goWithNodes, nodes, l)
107 ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
108 def storeValueForKey(self, key, value, callback=None):
109 """ stores the value for key in the global table, returns immediately, no status
110 in this implementation, peers respond but don't indicate status to storing values
111 values are stored in peers on a first-come first-served basis
112 this will probably change so more than one value can be stored under a key
114 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
116 # default callback - this will get called for each successful store value
117 def _storedValueHandler(sender):
119 response=_storedValueHandler
122 def cb(t, table = table, node=node, resp=response):
123 self.table.insertNode(node)
125 if node.id != self.node.id:
126 def default(err, node=node, table=table):
127 table.nodeFailed(node)
128 df = node.storeValue(key, value, self.node.senderDict())
130 # this call is asynch
131 self.findNode(key, _storeValueForKey)
    def insertNode(self, n):
        """
        insert a node in our local table, pinging oldest contact in bucket, if necessary

        If all you have is a host/port, then use addContact, which calls this method after
        receiving the PONG from the remote node. The reason for the separation is we can't insert
        a node into the table without it's peer-ID. That means of course the node passed into this
        method needs to be a properly formed Node object with a valid ID.
        """
        old = self.table.insertNode(n)
        # old is the bucket's oldest entry when the bucket was full; only
        # bother re-checking it if we haven't heard from it recently
        if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
            # the bucket is full, check to see if old node is still around and if so, replace it

            ## these are the callbacks used when we ping the oldest node in a bucket
            def _staleNodeHandler(oldnode=old, newnode = n):
                """ called if the pinged node never responds """
                self.table.replaceStaleNode(old, newnode)

            def _notStaleNodeHandler(sender, old=old):
                """ called when we get a pong from the old node """
                sender = Node().initWithDict(sender)
                # only refresh if the pong really came from the node we pinged
                if sender.id == old.id:
                    self.table.justSeenNode(old)

            df = old.ping(self.node.senderDict())
            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
162 def sendPing(self, node):
166 df = node.ping(self.node.senderDict())
167 ## these are the callbacks we use when we issue a PING
168 def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
169 if id != 20 * ' ' and id != sender['id'].data:
170 # whoah, got response from different peer than we were expecting
173 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
174 sender['host'] = host
175 sender['port'] = port
176 n = Node().initWithDict(sender)
179 def _defaultPong(err, node=node, table=self.table):
180 table.nodeFailed(node)
182 df.addCallbacks(_pongHandler,_defaultPong)
185 def findCloseNodes(self):
187 This does a findNode on the ID one away from our own.
188 This will allow us to populate our table with nodes on our network closest to our own.
189 This is called as soon as we start up with an empty table
191 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
194 self.findNode(id, callback)
196 def refreshTable(self):
203 for bucket in self.table.buckets:
204 if time.time() - bucket.lastAccessed >= 60 * 60:
205 id = randRange(bucket.min, bucket.max)
206 self.findNode(id, callback)
209 def retrieveValues(self, key):
210 if self.kw.has_key(key):
214 while(tup and tup[0] == key):
216 v = loads(self.store[h1])[1]
223 ##### INCOMING MESSAGE HANDLERS
225 def xmlrpc_ping(self, sender):
227 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
230 ip = self.crequest.getClientIP()
232 n = Node().initWithDict(sender)
234 return self.node.senderDict()
236 def xmlrpc_find_node(self, target, sender):
237 nodes = self.table.findNodes(target.data)
238 nodes = map(lambda node: node.senderDict(), nodes)
239 ip = self.crequest.getClientIP()
241 n = Node().initWithDict(sender)
243 return nodes, self.node.senderDict()
245 def xmlrpc_store_value(self, key, value, sender):
247 h1 = sha(key+value.data).digest()
249 if not self.store.has_key(h1):
250 v = dumps((key, value.data, t))
251 self.store.put(h1, v)
252 self.itime.put(t, h1)
255 # update last insert time
256 tup = loads(self.store[h1])
257 self.store[h1] = dumps((tup[0], tup[1], t))
258 self.itime.put(t, h1)
260 ip = self.crequest.getClientIP()
262 n = Node().initWithDict(sender)
264 return self.node.senderDict()
266 def xmlrpc_find_value(self, key, sender):
267 ip = self.crequest.getClientIP()
270 n = Node().initWithDict(sender)
273 l = self.retrieveValues(key)
275 l = map(lambda v: Binary(v), l)
276 return {'values' : l}, self.node.senderDict()
278 nodes = self.table.findNodes(key)
279 nodes = map(lambda node: node.senderDict(), nodes)
280 return {'nodes' : nodes}, self.node.senderDict()
288 def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
289 from whrandom import randrange
295 print "Building %s peer table." % peers
297 for i in xrange(peers):
298 a = Khashmir(host, port + i)
302 thread.start_new_thread(l[0].app.run, ())
308 print "adding contacts...."
311 n = l[randrange(0, len(l))].node
312 peer.addContact(host, n.port)
313 n = l[randrange(0, len(l))].node
314 peer.addContact(host, n.port)
315 n = l[randrange(0, len(l))].node
316 peer.addContact(host, n.port)
321 print "finding close nodes...."
324 peer.findCloseNodes()
330 # peer.refreshTable()
333 def test_find_nodes(l, quiet=0):
334 import threading, sys
335 from whrandom import randrange
336 flag = threading.Event()
340 a = l[randrange(0,n)]
341 b = l[randrange(0,n)]
343 def callback(nodes, flag=flag, id = b.node.id):
344 if (len(nodes) >0) and (nodes[0].id == id):
345 print "test_find_nodes PASSED"
347 print "test_find_nodes FAILED"
349 a.findNode(b.node.id, callback)
352 def test_find_value(l, quiet=0):
353 from whrandom import randrange
355 from hash import newID
356 import time, threading, sys
358 fa = threading.Event()
359 fb = threading.Event()
360 fc = threading.Event()
363 a = l[randrange(0,n)]
364 b = l[randrange(0,n)]
365 c = l[randrange(0,n)]
366 d = l[randrange(0,n)]
371 print "inserting value..."
373 a.storeValueForKey(key, value)
379 def __init__(self, flag, value=value):
383 def callback(self, values):
385 if(len(values) == 0):
393 if self.val in values:
398 b.valueForKey(key, cb(fa).callback)
400 c.valueForKey(key, cb(fb).callback)
402 d.valueForKey(key, cb(fc).callback)
    # NOTE(review): these two lines are the interior of a helper whose `def`
    # line is not visible in this chunk (presumably something like
    # `def test_one(port): ...` that builds a single peer and runs its
    # reactor in a background thread) -- confirm against the full file.
    k = Khashmir('localhost', port)
    thread.start_new_thread(k.app.run, ())
411 if __name__ == "__main__":
414 print "finding nodes..."
417 print "inserting and fetching values..."