1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
6 from ktable import KTable, K
7 from knode import KNode as Node
11 from actions import FindNode, GetValue
12 from twisted.web import xmlrpc
13 from twisted.internet.defer import Deferred
14 from twisted.python import threadable
15 from twisted.internet.app import Application
16 from twisted.web import server
19 from bsddb3 import db ## find this at http://pybsddb.sf.net/
20 from bsddb3._db import DBNotFoundError
22 # don't ping unless it's been at least this many seconds since we've heard from a peer
# Value is in seconds (60 * 15 = 900). insertNode compares it against
# time.time() - old.lastSeen for the contact returned by table.insertNode.
23 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
27 # this is the main class!
# Khashmir is an xmlrpc.XMLRPC resource.  Its xmlrpc_* methods answer
# incoming peer messages; the other public methods are the local interface.
28 class Khashmir(xmlrpc.XMLRPC):
# NOTE(review): render() also assigns self.crequest, which is not listed
# here.  Whether __slots__ is enforced depends on whether XMLRPC is a
# new-style class -- TODO confirm.
29 __slots__ = ['listener', 'node', 'table', 'store', 'app']
# Set up this peer: its own Node, a KTable built around that node, an
# "xmlrpc" Application listening on `port`, and the local value store.
30 def __init__(self, host, port):
# NOTE(review): newID is not among the visible import lines -- confirm
# where it comes from.
31 self.node = Node(newID(), host, port)
32 self.table = KTable(self.node)
33 self.app = Application("xmlrpc")
# Publish this object as a web Site on the given TCP port.
34 self.app.listenTCP(port, server.Site(self))
# NOTE(review): the statement that creates self.store is not visible in
# this excerpt; presumably a bsddb3 db.DB() -- confirm.
# No file name or database name is passed (both None); per the Berkeley DB
# documentation that should mean an in-memory B-tree -- TODO confirm.
36 self.store.open(None, None, db.DB_BTREE)
def render(self, request):
    """
    Record the incoming request, then hand off to the stock XMLRPC render.

    The request object is kept on self.crequest so that the xmlrpc_*
    handlers can reach it (they use it to read the client IP).
    Note: crequest is probably only valid during the initial call, not
    after a deferred has fired.
    """
    self.crequest = request
    parent = xmlrpc.XMLRPC
    return parent.render(self, request)
49 ####### LOCAL INTERFACE - use these methods!
# Local entry point for adding a peer that is known only by host and port.
50 def addContact(self, host, port):
52 ping this node and add the contact info to the table on pong!
# The peer id is not known yet, so a placeholder id of 20 spaces is used.
# sendPing's _pongHandler treats exactly this value (20 * ' ') as
# "no particular id expected".
# NOTE(review): the remainder of this method, and the end of the trailing
# comment on the next line, are not visible in this excerpt.
54 n =Node(" "*20, host, port) # note, we
58 ## this call is async!
# Asynchronous lookup of `id`.  `callback` and `errback` are attached to a
# Deferred `d`; the result is delivered through them rather than returned.
59 def findNode(self, id, callback, errback=None):
60 """ returns the contact info for node, or the k closest nodes, from the global table """
61 # get K nodes out of local table/cache, or the node we want
62 nodes = self.table.findNodes(id)
# NOTE(review): `d` is not assigned in the visible lines; presumably
# d = Deferred() -- confirm.
64 d.addCallbacks(callback, errback)
# Exact local hit: the table returned a single node whose id is the target.
# NOTE(review): the body of this branch is not visible in this excerpt.
65 if len(nodes) == 1 and nodes[0].id == id :
68 # create our search state
# The FindNode search reports its result by calling d.callback.
69 state = FindNode(self, id, d.callback)
# Ask the reactor to run state.goWithNodes(nodes) in the reactor thread,
# seeding the search with the locally known nodes.
70 reactor.callFromThread(state.goWithNodes, nodes)
def valueForKey(self, key, callback):
    """
    Fetch the values stored under `key` in the global table.

    Nothing is returned here: a GetValue search is created with
    `callback` and started in the reactor thread, seeded with the nodes
    the local table reports for `key`.
    """
    seed_nodes = self.table.findNodes(key)
    # the search state object drives the lookup from here on
    search = GetValue(self, key, callback)
    reactor.callFromThread(search.goWithNodes, seed_nodes)
82 ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
# Fire-and-forget store: run findNode on `key`, then ask the nodes it
# reports to store (key, value).  Nothing is returned to the caller.
83 def storeValueForKey(self, key, value):
84 """ stores the value for key in the global table, returns immediately, no status
85 in this implementation, peers respond but don't indicate status to storing values
86 values are stored in peers on a first-come first-served basis
87 this will probably change so more than one value can be stored under a key
# Callback handed to findNode.  key, value and both handlers are bound as
# default arguments when the function is defined.  `default` is used as the
# errback and simply yields the string "didn't respond".
89 def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
# NOTE(review): `node` is presumably the variable of a loop over `nodes`
# whose header is not visible in this excerpt -- confirm.
# Our own node is skipped.
91 if node.id != self.node.id:
92 df = node.storeValue(key, value, self.node.senderDict())
93 df.addCallbacks(response, default)
# Start the lookup; _storeValueForKey runs when findNode calls back.
95 self.findNode(key, _storeValueForKey)
# Insert node `n` into the local table and deal with the contact (`old`)
# that table.insertNode hands back, if any.
98 def insertNode(self, n):
100 insert a node in our local table, pinging oldest contact in bucket, if necessary
102 If all you have is a host/port, then use addContact, which calls this method after
103 receiving the PONG from the remote node. The reason for the seperation is we can't insert
104 a node into the table without it's peer-ID. That means of course the node passed into this
105 method needs to be a properly formed Node object with a valid ID.
107 old = self.table.insertNode(n)
# Proceed only if a contact was returned, it has not been seen for more
# than MAX_PING_INTERVAL seconds, and it is not this node itself.
# NOTE(review): `time` is not among the visible import lines.
108 if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
109 # the bucket is full, check to see if old node is still around and if so, replace it
111 ## these are the callbacks used when we ping the oldest node in a bucket
# NOTE(review): when used as an errback this is called with one positional
# argument, which would bind to `oldnode`; the body uses the enclosing `old`
# instead of `oldnode`.
112 def _staleNodeHandler(oldnode=old, newnode = n):
113 """ called if the pinged node never responds """
114 self.table.replaceStaleNode(old, newnode)
116 def _notStaleNodeHandler(sender, old=old):
117 """ called when we get a ping from the remote node """
# Re-insert the old contact only if the reply carries the old contact's id.
118 if sender['id'] == old.id:
119 self.table.insertNode(old)
# NOTE(review): `df` is not assigned in the visible lines; presumably the
# Deferred from pinging `old` -- confirm.
# NOTE(review): likely bug -- the errback is looked up as
# self._staleNodeHandler, but _staleNodeHandler is a local function defined
# above and no method of that name is visible on the class.
122 df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
# Ping `node`, sending our own sender dict, and attach reply handlers.
125 def sendPing(self, node):
129 df = node.ping(self.node.senderDict())
130 ## these are the callbacks we use when we issue a PING
# The pinged node's id, host and port and our table are bound as defaults
# at definition time.
131 def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
# An id of 20 spaces is the placeholder built by addContact and is accepted
# as-is; any other id must equal the id reported in the reply.
# NOTE(review): the body of this branch is not visible in this excerpt.
132 if id != 20 * ' ' and id != sender['id']:
133 # whoah, got response from different peer than we were expecting
136 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
# Build the contact from the id the peer reported and the host/port pinged.
# NOTE(review): what is then done with `n` is not visible in this excerpt.
137 n = Node(sender['id'], host, port)
# Errback for the ping.  NOTE(review): its body is not visible here.
140 def _defaultPong(err):
141 # this should probably increment a failed message counter and dump the node if it gets over a threshold
144 df.addCallbacks(_pongHandler,_defaultPong)
# Run findNode on an id that differs from our own only in its last byte.
147 def findCloseNodes(self):
149 This does a findNode on the ID one away from our own.
150 This will allow us to populate our table with nodes on our network closest to our own.
151 This is called as soon as we start up with an empty table
# Keep every byte of our id but the last, and replace the last byte with
# its value plus one, wrapping around at 256.
153 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
# NOTE(review): `callback` is not defined in the visible lines -- confirm.
156 self.findNode(id, callback)
# Walk the table's buckets and run a findNode for each bucket that has not
# been accessed recently.
158 def refreshTable(self):
165 for bucket in self.table.buckets:
# 60 * 60 seconds: only buckets untouched for at least an hour qualify.
166 if time.time() - bucket.lastAccessed >= 60 * 60:
# Target id drawn from the bucket's own [min, max] bounds.
# NOTE(review): randRange and `callback` are not defined or imported in the
# visible lines -- confirm.
167 id = randRange(bucket.min, bucket.max)
168 self.findNode(id, callback)
172 ##### INCOMING MESSAGE HANDLERS
# Incoming PING: build a Node for the sender and answer with our own
# sender dict.
174 def xmlrpc_ping(self, sender):
176 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
# The address is taken from the request saved by render(); the visible code
# does not read an 'ip' key from `sender`.
179 ip = self.crequest.getClientIP()
180 n = Node(sender['id'], ip, sender['port'])
# NOTE(review): a statement between here and the return is not visible in
# this excerpt; `n` is otherwise unused.
182 return self.node.senderDict()
# Incoming FIND_NODE: reply with the sender dicts of the nodes our table
# returns for `target`, followed by our own sender dict.
184 def xmlrpc_find_node(self, target, sender):
185 nodes = self.table.findNodes(target)
# Convert Node objects to their sender-dict form for the reply.
186 nodes = map(lambda node: node.senderDict(), nodes)
# Sender address comes from the request saved by render().
187 ip = self.crequest.getClientIP()
188 n = Node(sender['id'], ip, sender['port'])
# NOTE(review): a statement between here and the return is not visible in
# this excerpt; `n` is otherwise unused.
190 return nodes, self.node.senderDict()
# Incoming STORE_VALUE: keep the value unless the key is already present,
# then reply with our own sender dict.
192 def xmlrpc_store_value(self, key, value, sender):
# An existing entry is never overwritten: the first value stored wins.
193 if not self.store.has_key(key):
194 self.store.put(key, value)
# Sender address comes from the request saved by render().
195 ip = self.crequest.getClientIP()
196 n = Node(sender['id'], ip, sender['port'])
# NOTE(review): a statement between here and the return is not visible in
# this excerpt; `n` is otherwise unused.
198 return self.node.senderDict()
# Incoming FIND_VALUE: reply with {'values': ...} when the key is in our
# store, otherwise with {'nodes': ...}; our sender dict follows either way.
200 def xmlrpc_find_value(self, key, sender):
# Sender address comes from the request saved by render().
201 ip = self.crequest.getClientIP()
202 n = Node(sender['id'], ip, sender['port'])
# NOTE(review): a statement after this line is not visible in this excerpt;
# `n` is otherwise unused.
204 if self.store.has_key(key):
205 return {'values' : self.store[key]}, self.node.senderDict()
# Reached only when the key is not stored locally (the branch above returns).
207 nodes = self.table.findNodes(key)
208 nodes = map(lambda node: node.senderDict(), nodes)
209 return {'nodes' : nodes}, self.node.senderDict()
212 ### message response callbacks
213 # called when we get a response to store value
214 def _storedValueHandler(self, sender):
# Test helper: creates Khashmir peers on localhost, gives each peer random
# contacts, then has the peers look for nodes close to themselves.
# NOTE(review): `peers`, `port`, `l`, `peer` and the `thread` import are set
# up on lines that are not visible in this excerpt.
224 def test_build_net(quiet=0):
# NOTE(review): whrandom is a long-deprecated module; `random.randrange` is
# the usual replacement -- confirm before changing.
225 from whrandom import randrange
232 print "Building %s peer table." % peers
# One peer per consecutive port, starting at `port`.
234 for i in xrange(peers):
235 a = Khashmir('localhost', port + i)
# Run the first peer's Application in a separate thread.
239 thread.start_new_thread(l[0].app.run, ())
245 print "adding contacts...."
# Three contacts are added per peer, each picked at random from `l`
# (a pick may be the peer itself or a repeat).
248 n = l[randrange(0, len(l))].node
249 peer.addContact(n.host, n.port)
250 n = l[randrange(0, len(l))].node
251 peer.addContact(n.host, n.port)
252 n = l[randrange(0, len(l))].node
253 peer.addContact(n.host, n.port)
257 print "finding close nodes...."
260 peer.findCloseNodes()
263 # peer.refreshTable()
# Test helper: picks two random peers a and b from `l` and checks that a's
# findNode for b's id reports b first.
266 def test_find_nodes(l, quiet=0):
267 import threading, sys
268 from whrandom import randrange
269 flag = threading.Event()
# NOTE(review): `n` is not assigned in the visible lines; presumably len(l).
273 a = l[randrange(0,n)]
274 b = l[randrange(0,n)]
# Passes when the first node returned carries b's id.
# NOTE(review): the statements that use `flag` are not visible here.
276 def callback(nodes, l=l, flag=flag):
277 if (len(nodes) >0) and (nodes[0].id == b.node.id):
278 print "test_find_nodes PASSED"
280 print "test_find_nodes FAILED"
282 a.findNode(b.node.id, callback)
# Test helper: peer a stores a random key/value pair, then peers b, c and d
# each look the key up with their own callback.
285 def test_find_value(l, quiet=0):
286 from whrandom import randrange
288 import time, threading, sys
# One Event per lookup (b, c and d respectively).
290 fa = threading.Event()
291 fb = threading.Event()
292 fc = threading.Event()
# NOTE(review): `n` is not assigned in the visible lines; presumably len(l).
# The four picks are independent, so the same peer may be chosen twice.
295 a = l[randrange(0,n)]
296 b = l[randrange(0,n)]
297 c = l[randrange(0,n)]
298 d = l[randrange(0,n)]
# Key and value are SHA digests of the repr (backticks) of a random integer.
# NOTE(review): `sha` is not among the visible import lines.
300 key = sha(`randrange(0,100000)`).digest()
301 value = sha(`randrange(0,100000)`).digest()
303 print "inserting value...",
305 a.storeValueForKey(key, value)
# mc builds a callback bound to one Event and the expected value.
# NOTE(review): most of the callback body is not visible in this excerpt.
309 def mc(flag, value=value):
310 def callback(values, f=flag, val=value):
312 if(len(values) == 0):
322 b.valueForKey(key, mc(fa))
324 c.valueForKey(key, mc(fb))
326 d.valueForKey(key, mc(fc))
# Script entry point.
# NOTE(review): the calls between the prints are not visible in this
# excerpt; presumably they run the test_* helpers above -- confirm.
329 if __name__ == "__main__":
332 print "finding nodes..."
336 print "inserting and fetching values..."