1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
5 from pickle import loads, dumps
8 from ktable import KTable, K
9 from knode import KNode as Node
11 from hash import newID
13 from actions import FindNode, GetValue
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
24 # don't ping unless it's been at least this many seconds since we've heard from a peer
# Used by Khashmir.insertNode: when the table hands back an existing contact, it is only
# re-pinged if time.time() - lastSeen exceeds this value (in seconds).
25 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
# Khashmir: one DHT peer. It subclasses xmlrpc.XMLRPC, so the xmlrpc_* methods below are
# the remote procedures other peers call (ping, find_node, store_value, find_value); the
# "LOCAL INTERFACE" methods are what the embedding program calls.
# NOTE(review): this excerpt is lossy -- the leading line numbers skip values, and several
# names used below (the creation of self.store/self.itime/self.kw, and d, node, t, c, l,
# h1, callback) are never assigned in the visible lines. Confirm against the full file.
29 # this is the main class!
30 class Khashmir(xmlrpc.XMLRPC):
# NOTE(review): 'crequest' (assigned in render) is not listed in __slots__; that only works
# if instances still get a __dict__ (e.g. from a base class without __slots__) -- confirm.
31 __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
# Create this peer: a Node with a fresh ID from newID() at host/port, a KTable routing table
# for it, and a Twisted Application that serves this object via server.Site on `port`.
32 def __init__(self, host, port):
33 self.node = Node(newID(), host, port)
34 self.table = KTable(self.node)
35 self.app = Application("xmlrpc")
36 self.app.listenTCP(port, server.Site(self))
38 ## these databases may be more suited to on-disk rather than in-memory
39 # h((key, value)) -> (key, value, time) mappings
# NOTE(review): the statements that create self.store, self.itime and self.kw are not
# visible in this excerpt; each is opened with no file name as a BTREE database.
41 self.store.open(None, None, db.DB_BTREE)
43 # <insert time> -> h((key, value))
# DB_DUP lets several hashes share the same insert-time key.
45 self.itime.set_flags(db.DB_DUP)
46 self.itime.open(None, None, db.DB_BTREE)
48 # key -> h((key, value))
# DB_DUP lets several value-hashes be stored under the same key.
50 self.kw.set_flags(db.DB_DUP)
51 self.kw.open(None, None, db.DB_BTREE)
# Stash the current request on self so the xmlrpc_* handlers can read the caller's IP via
# self.crequest.getClientIP(), then defer to the stock XMLRPC.render.
# NOTE(review): this is a single attribute shared by every request, so it is only
# trustworthy while the handler for the current request runs synchronously.
54 def render(self, request):
56 Override the built in render so we can have access to the request object!
57 note, crequest is probably only valid on the initial call (not after deferred!)
59 self.crequest = request
60 return xmlrpc.XMLRPC.render(self, request)
64 ####### LOCAL INTERFACE - use these methods!
# Learn about a peer from only its host/port. The Node is built with a placeholder ID of
# 20 spaces; sendPing's _pongHandler treats that value as "ID not known yet".
# NOTE(review): the trailing comment on the Node line is truncated, and the call that
# actually pings `n` is not visible in this excerpt.
65 def addContact(self, host, port):
67 ping this node and add the contact info to the table on pong!
69 n =Node(" "*20, host, port) # note, we
73 ## this call is async!
# Look up `id`: take the closest nodes from the local table; `callback`/`errback` are
# attached to the Deferred `d`, and a FindNode search seeded with those nodes is handed to
# the reactor via reactor.callFromThread, reporting through d.callback.
# NOTE(review): the creation of `d`, and what happens when the table already returned
# exactly the target node, are not visible in this excerpt.
74 def findNode(self, id, callback, errback=None):
75 """ returns the contact info for node, or the k closest nodes, from the global table """
76 # get K nodes out of local table/cache, or the node we want
77 nodes = self.table.findNodes(id)
79 d.addCallbacks(callback, errback)
80 if len(nodes) == 1 and nodes[0].id == id :
83 # create our search state
84 state = FindNode(self, id, d.callback)
85 reactor.callFromThread(state.goWithNodes, nodes)
# Look up the values stored under `key`: seed a GetValue search with the closest nodes from
# the local table and hand it to the reactor. How and when `callback` is invoked is decided
# by actions.GetValue, which is not visible here.
89 def valueForKey(self, key, callback):
90 """ returns the values found for key in global table """
91 nodes = self.table.findNodes(key)
92 # create our search state
93 state = GetValue(self, key, callback)
94 reactor.callFromThread(state.goWithNodes, nodes)
97 ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
# Store (key, value) on the nodes closest to `key`: run findNode(key) and, for each result
# node that is not ourselves, send a storeValue RPC. Replies go to self._storedValueHandler;
# failures are swallowed by the `default` errback, which just returns "didn't respond".
# NOTE(review): the loop that binds `node` inside _storeValueForKey is not visible here.
98 def storeValueForKey(self, key, value):
99 """ stores the value for key in the global table, returns immediately, no status
100 in this implementation, peers respond but don't indicate status to storing values
101 values are stored in peers on a first-come first-served basis
102 this will probably change so more than one value can be stored under a key
104 def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
106 if node.id != self.node.id:
107 df = node.storeValue(key, value, self.node.senderDict())
108 df.addCallbacks(response, default)
109 # this call is asynch
110 self.findNode(key, _storeValueForKey)
# Insert `n` into the routing table. If the table hands back an `old` contact that has been
# silent for more than MAX_PING_INTERVAL seconds and is not ourselves, ping it: a reply from
# that same ID re-inserts it; the intended errback replaces it with `n`.
# NOTE(review): `time` is used here but `import time` is not among the visible imports.
113 def insertNode(self, n):
115 insert a node in our local table, pinging oldest contact in bucket, if necessary
117 If all you have is a host/port, then use addContact, which calls this method after
118 receiving the PONG from the remote node. The reason for the seperation is we can't insert
119 a node into the table without it's peer-ID. That means of course the node passed into this
120 method needs to be a properly formed Node object with a valid ID.
122 old = self.table.insertNode(n)
123 if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
124 # the bucket is full, check to see if old node is still around and if so, replace it
126 ## these are the callbacks used when we ping the oldest node in a bucket
# NOTE(review): when used as an errback this receives the Failure as its first positional
# argument, which binds to `oldnode`; the body therefore relies on the closed-over `old`,
# so the `oldnode` parameter name is misleading.
127 def _staleNodeHandler(oldnode=old, newnode = n):
128 """ called if the pinged node never responds """
129 self.table.replaceStaleNode(old, newnode)
131 def _notStaleNodeHandler(sender, old=old):
132 """ called when we get a ping from the remote node """
# Only re-insert when the reply really came from the node we pinged.
133 if sender['id'] == old.id:
134 self.table.insertNode(old)
136 df = old.ping(self.node.senderDict())
# NOTE(review): `self._staleNodeHandler` is not defined on this class in the visible source.
# The local `_staleNodeHandler` defined above looks like the intended errback; as written,
# this attribute lookup would likely raise AttributeError. Confirm and fix.
137 df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
# Send a PING to `node`. On a reply, _pongHandler first checks for an ID mismatch (skipped
# when we used the 20-space placeholder from addContact), then builds a Node from the ID the
# peer reported plus the host/port we pinged. _defaultPong receives failures.
# _pongHandler's default arguments capture node.id/host/port and self.table at call time.
# NOTE(review): the mismatch-branch body, what is done with the new Node, and the body of
# _defaultPong are not visible in this excerpt.
140 def sendPing(self, node):
144 df = node.ping(self.node.senderDict())
145 ## these are the callbacks we use when we issue a PING
146 def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
147 if id != 20 * ' ' and id != sender['id']:
148 # whoah, got response from different peer than we were expecting
151 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
152 n = Node(sender['id'], host, port)
155 def _defaultPong(err):
156 # this should probably increment a failed message counter and dump the node if it gets over a threshold
159 df.addCallbacks(_pongHandler,_defaultPong)
# Search for the ID that equals ours except that the last byte is incremented (wrapping
# at 256), so that the table fills with the peers nearest to us.
# NOTE(review): `callback` is not assigned in the visible lines.
162 def findCloseNodes(self):
164 This does a findNode on the ID one away from our own.
165 This will allow us to populate our table with nodes on our network closest to our own.
166 This is called as soon as we start up with an empty table
168 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
171 self.findNode(id, callback)
# For every bucket not accessed for an hour or more (60 * 60 seconds), look up an ID drawn
# from randRange(bucket.min, bucket.max) -- presumably a random ID inside that bucket.
# NOTE(review): `randRange` is not defined or imported in the visible source (the test
# helpers import `randrange` from whrandom locally), `time` is not in the visible imports,
# and `callback` is not assigned in the visible lines.
173 def refreshTable(self):
180 for bucket in self.table.buckets:
181 if time.time() - bucket.lastAccessed >= 60 * 60:
182 id = randRange(bucket.min, bucket.max)
183 self.findNode(id, callback)
187 ##### INCOMING MESSAGE HANDLERS
# RPC: a peer pings us. A Node is built from the sender's id/port and the request's client
# IP, and we answer with our own senderDict().
# NOTE(review): what is done with `n` is not visible in this excerpt.
189 def xmlrpc_ping(self, sender):
191 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
194 ip = self.crequest.getClientIP()
195 n = Node(sender['id'], ip, sender['port'])
197 return self.node.senderDict()
# RPC: return the senderDict() of each node our table returns for `target`, together with
# our own senderDict(). A Node for the caller is also built from its id/port and client IP.
199 def xmlrpc_find_node(self, target, sender):
200 nodes = self.table.findNodes(target)
201 nodes = map(lambda node: node.senderDict(), nodes)
202 ip = self.crequest.getClientIP()
203 n = Node(sender['id'], ip, sender['port'])
205 return nodes, self.node.senderDict()
# RPC: store (key, value). Records are keyed by h1 = sha(key+value).digest(). A new record is
# pickled as (key, value, t) into self.store and indexed in self.itime under `t`. An
# existing record only has its stored time rewritten to `t`, plus a new itime entry.
# NOTE(review): `sha` is not among the visible imports. The assignment of `t`, the branch
# header before the update path, and any write to self.kw are not visible. In the visible
# code the old itime entry of a refreshed record is never removed.
207 def xmlrpc_store_value(self, key, value, sender):
208 h1 = sha(key+value).digest()
210 if not self.store.has_key(h1):
211 v = dumps((key, value, t))
212 self.store.put(h1, v)
213 self.itime.put(t, h1)
216 # update last insert time
217 tup = loads(self.store[h1])
218 self.store[h1] = dumps((tup[0], tup[1], t))
219 self.itime.put(t, h1)
221 ip = self.crequest.getClientIP()
222 n = Node(sender['id'], ip, sender['port'])
224 return self.node.senderDict()
# RPC: when self.kw has `key`, the visible code returns ({'values': l}, our senderDict()).
# Otherwise it returns ({'nodes': [...]}, our senderDict()), with the senderDict() of the
# closest nodes from our table.
# NOTE(review): the cursor `c`, the loop that fills `l`, and the assignment of `h1` are not
# visible. Per the bsddb3 docs, set_range positions at the first key >= `key`, so confirm
# that the collection loop stops once the cursor's key differs from `key`.
226 def xmlrpc_find_value(self, key, sender):
227 ip = self.crequest.getClientIP()
228 n = Node(sender['id'], ip, sender['port'])
230 if self.kw.has_key(key):
232 tup = c.set_range(key)
236 v = loads(self.store[h1])[1]
239 return {'values' : l}, self.node.senderDict()
241 nodes = self.table.findNodes(key)
242 nodes = map(lambda node: node.senderDict(), nodes)
243 return {'nodes' : nodes}, self.node.senderDict()
246 ### message response callbacks
247 # called when we get a response to store value
248 def _storedValueHandler(self, sender):
# NOTE(review): the body of _storedValueHandler is not visible in this excerpt.
# Test helper: create `peers` Khashmir instances on localhost at port + i. It starts the
# first instance's app.run in a background thread and gives `peer` three contacts picked at
# random from `l`. It then calls findCloseNodes() on `peer`; refreshTable() is commented out.
# NOTE(review): the assignments of `port` and `l`, the loops that bind `peer`, the `thread`
# import and any pauses are not visible in this excerpt. `quiet` and `pause` are not used
# in the visible lines.
258 def test_build_net(quiet=0, peers=64, pause=1):
259 from whrandom import randrange
265 print "Building %s peer table." % peers
267 for i in xrange(peers):
268 a = Khashmir('localhost', port + i)
272 thread.start_new_thread(l[0].app.run, ())
278 print "adding contacts...."
281 n = l[randrange(0, len(l))].node
282 peer.addContact(n.host, n.port)
283 n = l[randrange(0, len(l))].node
284 peer.addContact(n.host, n.port)
285 n = l[randrange(0, len(l))].node
286 peer.addContact(n.host, n.port)
291 print "finding close nodes...."
294 peer.findCloseNodes()
300 # peer.refreshTable()
# Test helper: pick two random peers a and b from `l`, have a look up b's ID, and print
# PASSED if the first node returned has b's ID, otherwise FAILED.
# NOTE(review): `n`, and whatever sets/waits on the threading.Event `flag`, are not visible
# in this excerpt; a and b may be the same peer.
303 def test_find_nodes(l, quiet=0):
304 import threading, sys
305 from whrandom import randrange
306 flag = threading.Event()
310 a = l[randrange(0,n)]
311 b = l[randrange(0,n)]
313 def callback(nodes, flag=flag):
314 if (len(nodes) >0) and (nodes[0].id == b.node.id):
315 print "test_find_nodes PASSED"
317 print "test_find_nodes FAILED"
319 a.findNode(b.node.id, callback)
# Test helper: store a random key/value through peer a, then ask three randomly chosen peers
# (b, c, d) for the key. Each answer goes to the callback of a `cb` object built around one
# of the threading.Events fa/fb/fc; the visible callback checks for an empty result and for
# self.val being among the returned values.
# NOTE(review): the header of the `cb` class, the bodies of its methods, `n`, and any
# waits/pauses are not visible in this excerpt. `sha` is not imported here or in the
# visible module imports.
322 def test_find_value(l, quiet=0):
323 from whrandom import randrange
325 import time, threading, sys
327 fa = threading.Event()
328 fb = threading.Event()
329 fc = threading.Event()
332 a = l[randrange(0,n)]
333 b = l[randrange(0,n)]
334 c = l[randrange(0,n)]
335 d = l[randrange(0,n)]
337 key = sha(`randrange(0,100000)`).digest()
338 value = sha(`randrange(0,100000)`).digest()
340 print "inserting value..."
342 a.storeValueForKey(key, value)
348 def __init__(self, flag, value=value):
352 def callback(self, values):
354 if(len(values) == 0):
362 if self.val in values:
367 b.valueForKey(key, cb(fa).callback)
369 c.valueForKey(key, cb(fb).callback)
371 d.valueForKey(key, cb(fc).callback)
376 k = Khashmir('localhost', port)
377 thread.start_new_thread(k.app.run, ())
# Script entry point. Only two progress messages are visible here; the calls that build the
# network and run the node-finding and value tests are not visible in this excerpt.
380 if __name__ == "__main__":
383 print "finding nodes..."
386 print "inserting and fetching values..."