1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
7 from bencode import bdecode as loads
8 from bencode import bencode as dumps
12 from ktable import KTable, K
13 from knode import KNode as Node
15 from hash import newID, newIDInRange
17 from actions import FindNode, GetValue, KeyExpirer
18 from twisted.web import xmlrpc
19 from twisted.internet.defer import Deferred
20 from twisted.python import threadable
21 from twisted.internet.app import Application
22 from twisted.web import server
25 from bsddb3 import db ## find this at http://pybsddb.sf.net/
26 from bsddb3._db import DBNotFoundError
28 from xmlrpclib import Binary
32 # this is the main class!
class Khashmir(xmlrpc.XMLRPC):
    """A single Khashmir peer, served over XML-RPC.

    The instance owns its node identity, a routing table (``KTable``) and
    three Berkeley DB tables holding stored values.  The plain methods are
    the local interface; the ``xmlrpc_*`` methods answer requests that
    arrive from remote peers.

    NOTE(review): the listing this class was reviewed from omitted a number
    of statements.  Statements restored from context are either directly
    implied by the surrounding code or carry their own NOTE(review).
    """
    # 'crequest' is listed because render() assigns it on every request.
    __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app',
                 'crequest']

    def __init__(self, host, port):
        """Create the local node, register the listener and open the stores.

        host -- address recorded in this peer's own Node
        port -- TCP port the XML-RPC site listens on (also recorded in the Node)
        """
        self.node = Node().init(newID(), host, port)
        self.table = KTable(self.node)
        self.app = Application("xmlrpc")
        self.app.listenTCP(port, server.Site(self))

        ## these databases may be more suited to on-disk rather than in-memory
        # h((key, value)) -> (key, value, time) mappings
        self.store = db.DB()
        self.store.open(None, None, db.DB_BTREE)

        # <insert time> -> h((key, value))
        self.itime = db.DB()
        self.itime.set_flags(db.DB_DUP)  # flags have to be set before open()
        self.itime.open(None, None, db.DB_BTREE)

        # key -> h((key, value))
        self.kw = db.DB()
        self.kw.set_flags(db.DB_DUP)
        self.kw.open(None, None, db.DB_BTREE)

        # hand the three tables to the key expirer (see actions.KeyExpirer)
        KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)

    def render(self, request):
        """
        Override the built in render so we can have access to the request object!
        note, crequest is probably only valid on the initial call (not after deferred!)
        """
        self.crequest = request
        return xmlrpc.XMLRPC.render(self, request)

    #######
    ####### LOCAL INTERFACE - use these methods!
    def addContact(self, host, port):
        """
        ping this node and add the contact info to the table on pong!
        """
        # We do not know the remote peer's ID yet, so use the 20-space
        # placeholder that sendPing's pong handler accepts as "unknown".
        n = Node().init(" " * 20, host, port)
        self.sendPing(n)

    ## this call is async!
    def findNode(self, id, callback, errback=None):
        """ returns the contact info for node, or the k closest nodes, from the global table

        callback -- called with the list of nodes found
        errback  -- optional; attached as the errback of the same Deferred
        """
        # get K nodes out of local table/cache, or the node we want
        nodes = self.table.findNodes(id)
        d = Deferred()
        if errback:
            d.addCallbacks(callback, errback)
        else:
            d.addCallback(callback)
        if len(nodes) == 1 and nodes[0].id == id:
            # exact match in the local table: answer without a network search
            # NOTE(review): how the result is dispatched here was not visible;
            # callFromThread mirrors the other dispatches in this class.
            reactor.callFromThread(d.callback, nodes)
        else:
            # create our search state
            state = FindNode(self, id, d.callback)
            reactor.callFromThread(state.goWithNodes, nodes)

    ## also async
    def valueForKey(self, key, callback):
        """ returns the values found for key in global table

        callback -- called with a list of values; it is invoked once for
                    locally stored values (if any) and is also handed to the
                    GetValue search
        """
        nodes = self.table.findNodes(key)

        # report what we hold locally before searching the network
        l = self.retrieveValues(key)
        if l:
            reactor.callFromThread(callback, l)

        # create our search state
        state = GetValue(self, key, callback)
        reactor.callFromThread(state.goWithNodes, nodes, l)

    ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
    def storeValueForKey(self, key, value, callback=None):
        """ stores the value for key in the global table, returns immediately, no status
            in this implementation, peers respond but don't indicate status to storing values
            values are stored in peers on a first-come first-served basis
            this will probably change so more than one value can be stored under a key

            callback -- optional; called with each peer's reply to the store request
        """
        def _storeValueForKey(nodes, key=key, value=value, response=callback,
                              table=self.table):
            if not response:
                # default callback - this will get called for each successful store value
                def _storedValueHandler(sender):
                    pass
                response = _storedValueHandler

            for node in nodes:
                if node.id == self.node.id:
                    continue  # never send a store request to ourselves

                def cb(t, table=table, node=node, resp=response):
                    # the peer answered: remember it, then pass the reply on
                    table.insertNode(node)
                    resp(t)

                def default(err, node=node, table=table):
                    table.nodeFailed(node)

                df = node.storeValue(key, value, self.node.senderDict())
                df.addCallbacks(cb, default)

        # this call is asynch
        self.findNode(key, _storeValueForKey)

    def insertNode(self, n, contacted=1):
        """
        insert a node in our local table, pinging oldest contact in bucket, if necessary

        If all you have is a host/port, then use addContact, which pings the
        remote node first and inserts it once the PONG arrives.  The reason
        for the separation is we can't insert a node into the table without
        its peer-ID.  That means of course the node passed into this method
        needs to be a properly formed Node object with a valid ID.
        """
        old = self.table.insertNode(n, contacted=contacted)
        if not old or old.id == self.node.id:
            return
        if (time.time() - old.lastSeen) <= const.MIN_PING_INTERVAL:
            return
        # the bucket is full, check to see if old node is still around and if so, replace it

        ## these are the callbacks used when we ping the oldest node in a bucket
        def _staleNodeHandler(err, oldnode=old, newnode=n):
            """ called if the pinged node never responds """
            # registered as an errback, so the first argument is the failure
            self.table.replaceStaleNode(oldnode, newnode)

        def _notStaleNodeHandler(sender, old=old):
            """ called when we get a pong from the old node """
            sender, conn = sender
            sender['host'] = conn['host']
            sender = Node().initWithDict(sender)
            if sender.id == old.id:
                self.table.justSeenNode(old)

        df = old.ping(self.node.senderDict())
        df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)

    def sendPing(self, node):
        """
        ping a node; insert it into the table on pong, report it failed on error
        """
        df = node.ping(self.node.senderDict())

        ## these are the callbacks we use when we issue a PING
        def _pongHandler(sender, id=node.id, host=node.host, port=node.port,
                         table=self.table):
            sender, conn = sender
            if id != 20 * ' ' and id != sender['id'].data:
                # whoah, got response from different peer than we were expecting
                return
            sender['host'] = host
            sender['port'] = port
            n = Node().initWithDict(sender)
            # NOTE(review): the statement consuming `n` was not visible;
            # inserting into the `table` bound above is the evident intent.
            table.insertNode(n)

        def _defaultPong(err, node=node, table=self.table):
            table.nodeFailed(node)

        df.addCallbacks(_pongHandler, _defaultPong)

    def findCloseNodes(self):
        """
        This does a findNode on the ID one away from our own.
        This will allow us to populate our table with nodes on our network closest to our own.
        This is called as soon as we start up with an empty table
        """
        own = self.node.id
        target = own[:-1] + chr((ord(own[-1]) + 1) % 256)

        def callback(nodes):
            # result is not used here
            pass

        self.findNode(target, callback)

    def refreshTable(self):
        """
        Run a findNode on a fresh ID inside every bucket that has not been
        accessed for at least const.BUCKET_STALENESS seconds.
        """
        def callback(nodes):
            # result is not used here
            pass

        for bucket in self.table.buckets:
            if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
                self.findNode(newIDInRange(bucket.min, bucket.max), callback)

    def retrieveValues(self, key):
        """Return the list of values stored locally under key ([] if none)."""
        if not self.kw.has_key(key):
            return []
        values = []
        c = self.kw.cursor()
        try:
            # kw allows duplicates: walk every (key, h1) pair for this key
            tup = c.set(key)
            while tup and tup[0] == key:
                h1 = tup[1]
                values.append(loads(self.store[h1])[1])
                tup = c.next()
        finally:
            c.close()
        return values

    #####
    ##### INCOMING MESSAGE HANDLERS

    def _noteSender(self, sender):
        """Stamp sender with the requesting client's IP and insert it as an uncontacted node."""
        sender['host'] = self.crequest.getClientIP()
        self.insertNode(Node().initWithDict(sender), contacted=0)

    def xmlrpc_ping(self, sender):
        """
        takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
        returns our own sender dict
        """
        self._noteSender(sender)
        return self.node.senderDict()

    def xmlrpc_find_node(self, target, sender):
        """Return (sender dicts of the nodes our table finds for target, our sender dict)."""
        # look up before recording the sender, as the original ordering does
        nodes = [node.senderDict() for node in self.table.findNodes(target.data)]
        self._noteSender(sender)
        return nodes, self.node.senderDict()

    def xmlrpc_store_value(self, key, value, sender):
        """Store value under key locally and return our sender dict.

        A (key, value) pair that is already stored only has its insert time
        refreshed.
        """
        # NOTE(review): the key unwrapping was not visible; accept either an
        # xmlrpclib.Binary (as `value` and `target` are) or a plain string.
        key = getattr(key, 'data', key)
        h1 = sha(key + value.data).digest()
        # NOTE(review): the timestamp format was not visible; it must sort the
        # way actions.KeyExpirer expects -- confirm.
        t = repr(time.time())
        if not self.store.has_key(h1):
            self.store.put(h1, dumps((key, value.data, t)))
            self.itime.put(t, h1)
            self.kw.put(key, h1)
        else:
            # update last insert time
            tup = loads(self.store[h1])
            self.store[h1] = dumps((tup[0], tup[1], t))
            self.itime.put(t, h1)

        self._noteSender(sender)
        return self.node.senderDict()

    def xmlrpc_find_value(self, key, sender):
        """Return ({'values': [...]}, us) if we hold the key, else ({'nodes': [...]}, us)."""
        # NOTE(review): see xmlrpc_store_value regarding the key unwrapping.
        key = getattr(key, 'data', key)
        self._noteSender(sender)

        l = self.retrieveValues(key)
        if l:
            return {'values': [Binary(v) for v in l]}, self.node.senderDict()
        nodes = [node.senderDict() for node in self.table.findNodes(key)]
        return {'nodes': nodes}, self.node.senderDict()
def test_build_net(quiet=0, peers=24, host='localhost', pause=1, port=2001):
    """Build a local network of `peers` Khashmir instances and return them.

    quiet -- when true, suppress the progress messages
    peers -- number of instances; they listen on consecutive ports
    host  -- host name given to every instance
    pause -- when true, sleep briefly between steps so async work can settle
    port  -- first port number
             NOTE(review): the base port was not visible in the reviewed
             listing; 2001 is an assumed default -- confirm.

    NOTE(review): the sleep durations, the `quiet` guards and any start-up of
    the remaining peers' apps after the first one were not visible in the
    reviewed listing; what is below is the minimum needed to make the
    visible statements runnable.
    """
    # whrandom is deprecated (and gone from later 2.x releases); random
    # provides the same randrange
    from random import randrange
    import thread
    import time

    if not quiet:
        print("Building %s peer table." % peers)

    l = []
    for i in xrange(peers):
        l.append(Khashmir(host, port + i))

    thread.start_new_thread(l[0].app.run, ())
    if pause:
        time.sleep(1)

    if not quiet:
        print("adding contacts....")
    for peer in l:
        # introduce every peer to three randomly chosen peers
        for _ in range(3):
            n = l[randrange(0, len(l))].node
            peer.addContact(host, n.port)
        if pause:
            time.sleep(.30)

    if not quiet:
        print("finding close nodes....")
    for peer in l:
        peer.findCloseNodes()
        if pause:
            time.sleep(.5)

    # the script block at the end of the file uses the return value
    return l
def test_find_nodes(l, quiet=0):
    """Pick two random peers from l and check that one can find the other.

    Prints "test_find_nodes PASSED" when the first node returned carries the
    ID that was searched for, "test_find_nodes FAILED" otherwise, and blocks
    until the callback has run.

    l     -- list of Khashmir peers (as returned by test_build_net)
    quiet -- accepted for symmetry with the other tests; unused here
    """
    import threading
    # whrandom is deprecated (and gone from later 2.x releases)
    from random import randrange

    flag = threading.Event()

    n = len(l)
    a = l[randrange(0, n)]
    b = l[randrange(0, n)]

    def callback(nodes, flag=flag, id=b.node.id):
        if (len(nodes) > 0) and (nodes[0].id == id):
            print("test_find_nodes PASSED")
        else:
            print("test_find_nodes FAILED")
        # release the waiting caller whatever the outcome
        flag.set()

    a.findNode(b.node.id, callback)
    flag.wait()
def test_find_value(l, quiet=0):
    """Store a value through one random peer and look it up through three others.

    l     -- list of Khashmir peers (as returned by test_build_net)
    quiet -- when true, suppress the progress message

    NOTE(review): the key/value construction, the settle delay and the body
    of the result collector were not visible in the reviewed listing; they
    are restored from the names the visible code uses (newID, time, the three
    events, self.val) and should be confirmed against the full file.
    """
    # whrandom is deprecated (and gone from later 2.x releases)
    from random import randrange
    from hash import newID
    import sys
    import threading
    import time

    fa = threading.Event()
    fb = threading.Event()
    fc = threading.Event()

    n = len(l)
    a = l[randrange(0, n)]
    b = l[randrange(0, n)]
    c = l[randrange(0, n)]
    d = l[randrange(0, n)]

    key = newID()
    value = newID()
    if not quiet:
        print("inserting value...")
        sys.stdout.flush()
    a.storeValueForKey(key, value)
    # storing is asynchronous and reports no status; give it time to spread
    time.sleep(3)

    class cb:
        """Collects lookup results for one peer and signals its event."""
        def __init__(self, flag, value=value):
            self.flag = flag
            self.val = value
            self.found = 0

        def callback(self, values):
            try:
                if(len(values) == 0):
                    # an empty result: report whether an earlier call saw the value
                    if not self.found:
                        print("find NOT FOUND")
                    else:
                        print("find FOUND")
                    sys.stdout.flush()
                else:
                    if self.val in values:
                        self.found = 1
            finally:
                # always release the waiter so the test cannot hang
                self.flag.set()

    b.valueForKey(key, cb(fa).callback)
    fa.wait()
    c.valueForKey(key, cb(fb).callback)
    fb.wait()
    d.valueForKey(key, cb(fc).callback)
    fc.wait()
def test_one(host, port):
    """Start a single Khashmir peer on host:port in a background thread.

    Returns the peer so the caller can drive it.
    """
    import thread
    k = Khashmir(host, port)
    thread.start_new_thread(k.app.run, ())
    return k
if __name__ == "__main__":
    # Usage: python <this file> [number_of_peers]
    import sys

    # NOTE(review): the default peer count was not visible in the reviewed
    # listing; this mirrors test_build_net's own default -- confirm.
    n = 24
    if len(sys.argv) > 1:
        n = int(sys.argv[1])
    l = test_build_net(peers=n)

    # NOTE(review): how many times each test is repeated was not visible;
    # each is run once here.
    print("finding nodes...")
    test_find_nodes(l)
    print("inserting and fetching values...")
    test_find_value(l)