1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from listener import Listener
4 from ktable import KTable, K
6 from dispatcher import Dispatcher
7 from hash import newID, intify
13 from bsddb3 import db ## find this at http://pybsddb.sf.net/
14 from bsddb3._db import DBNotFoundError
# Only ping a peer if at least this many seconds have gone by since we last
# heard from it.
MAX_PING_INTERVAL = 15 * 60  # fifteen minutes, in seconds
19 # maximum number of concurrent FIND_NODE/GET_VALUE requests a single search keeps in flight
23 # this is the main class! One instance is one DHT peer: it owns a network
# Listener, its own Node identity, a KTable routing table, a Dispatcher, a
# TransactionFactory and a Berkeley DB (bsddb3) key/value store.
25 __slots__ = ['listener', 'node', 'table', 'dispatcher', 'tf', 'store']
# __init__(host, port): bind a Listener to host:port, give this peer a new ID
# from newID(), build the routing table and transaction machinery around that
# ID, open the local value store, and register handlers for the unsolicited
# requests other peers send us: ping, find node, get value and store value.
# NOTE(review): the line that creates self.store is not visible in this
# listing; only its open() call is.
26 def __init__(self, host, port):
27 self.listener = Listener(host, port)
28 self.node = Node(newID(), host, port)
29 self.table = KTable(self.node)
30 self.dispatcher = Dispatcher(self.listener, messages.BASE, self.node.id)
31 self.tf = transactions.TransactionFactory(self.node.id, self.dispatcher)
34 self.store.open(None, None, db.DB_BTREE) # no file name: presumably an in-memory B-tree, lost on exit -- confirm
36 #### register unsolicited incoming message handlers
37 self.dispatcher.registerHandler('ping', self._pingHandler, messages.PING)
39 self.dispatcher.registerHandler('find node', self._findNodeHandler, messages.FIND_NODE)
41 self.dispatcher.registerHandler('get value', self._findValueHandler, messages.GET_VALUE)
43 self.dispatcher.registerHandler('store value', self._storeValueHandler, messages.STORE_VALUE)
47 ####### LOCAL INTERFACE - use these methods!
# addContact(host, port): we only know an address, not the peer's ID, so build
# a Node with a 20-character placeholder ID and ping it. Per the docstring
# below, the contact enters the routing table only once its PONG (which
# carries the real ID) comes back.
# NOTE(review): the docstring quotes and the line(s) that actually send the
# ping are not visible in this listing.
48 def addContact(self, host, port):
50 ping this node and add the contact info to the table on pong!
52 n =Node(" "*20, host, port) # note, we don't know the peer's ID until it answers, so use a placeholder
56 ## this call is async!
# findNode(id, callback): find the node whose ID is `id`.
# If the local table already returns exactly that node, post a dispatcher
# event (tcall) whose extras are that one-node list. Otherwise start a
# FindNode search seeded with the closest nodes from the local table and post
# its goWithNodes. Either way, the result is delivered from a dispatcher event
# rather than returned.
# NOTE(review): tcall's body (presumably callback(t.extras)) and the `else:`
# between the two branches are not visible in this listing. The parameter
# `id` shadows the builtin, but renaming it would break keyword callers.
57 def findNode(self, id, callback):
58 """ returns the contact info for node, or the k closest nodes, from the global table """
59 # get K nodes out of local table/cache, or the node we want
60 nodes = self.table.findNodes(id)
61 if len(nodes) == 1 and nodes[0].id == id :
62 # we got it in our table!
63 def tcall(t, callback=callback):
65 self.dispatcher.postEvent(tcall, 0, extras=nodes)
67 # create our search state
68 state = FindNode(self, self.dispatcher, id, callback)
69 # handle this in our own thread
70 self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes)
def valueForKey(self, key, callback):
    """Look up the values stored under ``key`` in the global table.

    Asynchronous: a GetValue search is seeded with the closest nodes from the
    local routing table and handed to the dispatcher; the search later calls
    ``callback`` with the values it finds.
    """
    closest = self.table.findNodes(key)
    search = GetValue(self, self.dispatcher, key, callback)
    # run the search from the dispatcher ("our own thread"), not inline
    post = self.dispatcher.postEvent
    post(search.goWithNodes, 0, extras=closest)
83 ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
# storeValueForKey(key, value): locate the nodes closest to `key` with
# findNode, then (inside _storeValueForKey) create a StoreValue transaction for
# each of them except ourselves. Replies go to _storedValueHandler. The
# `default` handler (the counterpart of _defaultPong for Ping) just returns
# "didn't respond". Nothing is reported back to the caller.
# NOTE(review): the docstring's closing quotes, the loop that binds `node`,
# and whatever dispatches each transaction `t` are not visible in this listing.
84 def storeValueForKey(self, key, value):
85 """ stores the value for key in the global table, returns immediately, no status
86 in this implementation, peers respond but don't indicate status to storing values
87 values are stored in peers on a first-come first-served basis
88 this will probably change so more than one value can be stored under a key
90 def _storeValueForKey(nodes, tf=self.tf, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
92 if node.id != self.node.id:
93 t = tf.StoreValue(node, key, value, response, default)
96 self.findNode(key, _storeValueForKey)
# insertNode(n): add a fully formed Node (with a valid peer ID) to the routing
# table. The table may hand back an older contact `old`. If it does, and we
# have not heard from `old` for more than MAX_PING_INTERVAL seconds, and `old`
# is not ourselves, ping it:
#   - _notStaleNodeHandler runs if it answers;
#   - _staleNodeHandler runs if it doesn't, and asks the table to replace it
#     with t.newnode.
# NOTE(review): the lines after the Ping call (presumably t.newnode = n and
# dispatching t) are not visible in this listing. Also, _pongHandler as
# written calls self.table.insertNode directly, not this method, so contacts
# added via addContact skip this check despite the docstring below -- confirm.
99 def insertNode(self, n):
101 insert a node in our local table, pinging oldest contact in bucket, if necessary
103 If all you have is a host/port, then use addContact, which calls this method after
104 receiving the PONG from the remote node. The reason for the separation is we can't insert
105 a node into the table without its peer-ID. That means of course the node passed into this
106 method needs to be a properly formed Node object with a valid ID.
108 old = self.table.insertNode(n)
109 if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
110 # the bucket is full, check to see if old node is still around; if it doesn't answer, replace it with n
111 t = self.tf.Ping(old, self._notStaleNodeHandler, self._staleNodeHandler)
# sendPing(node): create a Ping transaction to `node`. A PONG is handled by
# _pongHandler; if none arrives, _defaultPong reports it.
# NOTE(review): this method's docstring and the line that dispatches `t` are
# not visible in this listing.
116 def sendPing(self, node):
120 t = self.tf.Ping(node, self._pongHandler, self._defaultPong)
# findCloseNodes(): bootstrap helper. Search for the ID one past our own (last
# byte incremented, wrapping 255 -> 0); the search inserts every node it hears
# about into our table, filling it with our nearest neighbours.
# NOTE(review): the docstring quotes and the definition of `callback` are not
# visible in this listing.
124 def findCloseNodes(self):
126 This does a findNode on the ID one away from our own.
127 This will allow us to populate our table with nodes on our network closest to our own.
128 This is called as soon as we start up with an empty table
130 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
133 self.findNode(id, callback)
# refreshTable(): for every bucket not accessed for an hour or more
# (lastAccessed is compared against time.time(), so this is 60 * 60 seconds),
# look up a random ID inside that bucket's range so the search repopulates it.
# NOTE(review): the docstring and the definition of `callback` are not visible
# here; whether randRange includes bucket.max is defined elsewhere -- confirm.
135 def refreshTable(self):
142 for bucket in self.table.buckets:
143 if time.time() - bucket.lastAccessed >= 60 * 60:
144 id = randRange(bucket.min, bucket.max)
145 self.findNode(id, callback)
149 ##### UNSOLICITED INCOMING MESSAGE HANDLERS
# _pingHandler(t, msg): a peer pinged us. Add it to our table via insertNode,
# which may trigger a staleness ping of an older contact, then answer it.
# NOTE(review): the lines that build and send the reply are not visible here.
151 def _pingHandler(self, t, msg):
152 #print "Got PING from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port)
153 self.insertNode(t.target)
154 # respond, no callbacks, we don't care if they get it or not
# _findNodeHandler(t, msg): a peer asked for the nodes closest to
# msg['target']; answer with a GotNodes transaction built from our table.
# NOTE(review): the line that sends `nt` is not visible in this listing.
158 def _findNodeHandler(self, t, msg):
159 #print "Got FIND_NODES from %s:%s at %s:%s" % (t.target.host, t.target.port, self.node.host, self.node.port)
160 nodes = self.table.findNodes(msg['target'])
161 # respond, no callbacks, we don't care if they get it or not
162 nt = self.tf.GotNodes(t, nodes)
# _storeValueHandler(t, msg): a peer asked us to store msg['value'] under
# msg['key']. The policy is first-come first-served: a key that is already
# present is never overwritten. A StoredValue reply transaction is created.
# NOTE(review): with the indentation lost, it is not visible whether the reply
# is inside the `if`; the line that sends `nt` is also missing.
165 def _storeValueHandler(self, t, msg):
166 if not self.store.has_key(msg['key']):
167 self.store.put(msg['key'], msg['value'])
168 nt = self.tf.StoredValue(t)
# _findValueHandler(t, msg): a peer asked for the value stored under
# msg['key']. If we hold it, reply with GotValues carrying one (key, value)
# pair. Otherwise reply with GotNodes listing the closest nodes we know.
# NOTE(review): the `else:` between the two replies and the line that sends
# the reply are not visible in this listing.
171 def _findValueHandler(self, t, msg):
172 if self.store.has_key(msg['key']):
173 t = self.tf.GotValues(t, [(msg['key'], self.store[msg['key']])])
175 nodes = self.table.findNodes(msg['key'])
176 t = self.tf.GotNodes(t, nodes)
181 ### message response callbacks
182 # called when we get a response to store value
183 def _storedValueHandler(self, t, msg):
184 self.table.insertNode(t.target)
187 ## these are the callbacks used when we ping the oldest node in a bucket
188 def _staleNodeHandler(self, t):
189 """ called if the pinged node never responds """
190 self.table.replaceStaleNode(t.target, t.newnode)
192 def _notStaleNodeHandler(self, t, msg):
193 """ called when we get a ping from the remote node """
194 self.table.insertNode(t.target)
## these are the callbacks we use when we issue a PING
def _pongHandler(self, t, msg):
    """Response callback for sendPing: add the peer that answered.

    The PONG carries the peer's real ID in msg['id']. The host and port are
    taken from the address the reply came from (t.addr).

    This goes through self.insertNode rather than self.table.insertNode, so
    contacts learned via addContact get the treatment insertNode's docstring
    promises: if the table returns an older contact that has been silent for
    too long, that contact is pinged and replaced only if it never answers.
    """
    peer = Node(msg['id'], t.addr[0], t.addr[1])
    self.insertNode(peer)
203 def _defaultPong(self, t):
204 # this should probably increment a failed message counter and dump the node if it gets over a threshold
205 print "Never got PONG from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port)
# ActionBase: shared state for the iterative searches (FindNode, GetValue).
# NOTE(review): the `class` line and several attribute initialisations are not
# visible in this listing. Callers pass the Khashmir peer itself as `table`
# (see findNode/valueForKey), which is why subclasses use self.table.tf,
# self.table.node and self.table.insertNode.
210 """ base class for some long running asynchronous processes like finding nodes or values """
211 def __init__(self, table, dispatcher, target, callback):
213 self.dispatcher = dispatcher
215 self.int = intify(target)
219 self.callback = callback
# sort: compares two nodes by XOR distance from the target ID; presumably a
# cmp-style comparator (its return line is not visible). The target's integer
# form is bound through the default argument `int`, which shadows the builtin.
223 def sort(a, b, int=self.int):
224 """ this function is for sorting nodes relative to the ID we are looking for """
225 x, y = int ^ a.int, int ^ b.int
233 def goWithNodes(self, t):
# FindNode: iterative lookup of a node ID. It tracks:
#   found       - candidate nodes by ID
#   queried     - node IDs we have already sent FIND_NODE to
#   answered    - transaction IDs whose replies were already processed
#   outstanding - requests still in flight
# It calls callback([node]) as soon as the target ID itself shows up.
# NOTE(review): many lines of this class (including the scheduling method's
# `def` line and the end-of-search handling) are not visible in this listing.
236 class FindNode(ActionBase):
237 """ find node action merits its own class as it is a long running stateful process """
# handleGotNodes: reply handler for FIND_NODE. Ignore replies that arrive after
# the search finished or that were already handled. Otherwise free one
# outstanding slot, mark the transaction answered, and insert every
# previously unseen node into the peer's routing table (the line recording it
# in self.found is presumably among the missing ones).
238 def handleGotNodes(self, t, msg):
239 if self.finished or self.answered.has_key(t.id):
240 # a day late and a dollar short
242 self.outstanding = self.outstanding - 1
243 self.answered[t.id] = 1
244 for node in msg['nodes']:
245 if not self.found.has_key(node['id']):
246 n = Node(node['id'], node['host'], node['port'])
248 self.table.insertNode(n)
# scheduling step: walk the found nodes (presumably closest-first via
# ActionBase's sort). Stop with callback([node]) if the target is among them.
# Otherwise send FIND_NODE to nodes not yet queried (never ourselves), each
# with a 15-second timeout, until N requests are in flight.
253 send messages to new peers, if necessary
257 l = self.found.values()
261 if node.id == self.target:
263 return self.callback([node])
264 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
265 t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes)
266 self.outstanding = self.outstanding + 1
267 self.queried[node.id] = 1
268 t.timeout = time.time() + 15
270 if self.outstanding >= N:
# note: this parses as assert ((self.outstanding) >= 0); the parentheses are misleading
272 assert(self.outstanding) >=0
273 if self.outstanding == 0:
# defaultGotNodes: no-reply handler for a FIND_NODE request; frees its
# outstanding slot.
278 def defaultGotNodes(self, t):
281 self.outstanding = self.outstanding - 1
# goWithNodes: entry point posted through the dispatcher. t.extras holds the
# initial nodes. Each one except ourselves is recorded in found and queried
# and sent a FIND_NODE with a 15-second timeout.
285 def goWithNodes(self, t):
287 this starts the process, our argument is a transaction with t.extras being our list of nodes
288 it's a transaction since we got called from the dispatcher
292 if node.id == self.table.node.id:
294 self.found[node.id] = node
295 t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes)
296 t.timeout = time.time() + 15
298 self.outstanding = self.outstanding + 1
299 self.queried[node.id] = 1
300 if self.outstanding == 0:
# GetValue: iterative value lookup. It reuses FindNode's bookkeeping and its
# inherited defaultGotNodes, but sends GET_VALUE requests. A 'got values'
# reply hands msg['values'] to the callback; a 'got nodes' reply feeds new
# candidates into the search.
# NOTE(review): several lines of this class are not visible in this listing.
305 class GetValue(FindNode):
306 """ get value task """
# handleGotNodes: reply handler for GET_VALUE. It handles both reply types:
# 'got nodes' (add unseen nodes to the routing table) and 'got values'
# (deliver them to the callback).
307 def handleGotNodes(self, t, msg):
308 if self.finished or self.answered.has_key(t.id):
309 # a day late and a dollar short
311 self.outstanding = self.outstanding - 1
312 self.answered[t.id] = 1
314 # if we have any closer than what we already got, query them
315 if msg['type'] == 'got nodes':
316 for node in msg['nodes']:
317 if not self.found.has_key(node['id']):
318 n = Node(node['id'], node['host'], node['port'])
320 self.table.insertNode(n)
321 elif msg['type'] == 'got values':
324 return self.callback(msg['values'])
# scheduling step: same as FindNode's, but issues GetValue transactions
# (15-second timeout each, at most N in flight). When nothing is outstanding,
# the value was not found; test_find_value expects an empty list in that case.
331 l = self.found.values()
335 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
336 t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes)
337 self.outstanding = self.outstanding + 1
338 self.queried[node.id] = 1
339 t.timeout = time.time() + 15
341 if self.outstanding >= N:
# note: this parses as assert ((self.outstanding) >= 0)
343 assert(self.outstanding) >=0
344 if self.outstanding == 0:
345 ## all done, didn't find it!!
# goWithNodes: like FindNode.goWithNodes, but sends GET_VALUE to the initial nodes.
350 def goWithNodes(self, t):
353 if node.id == self.table.node.id:
355 self.found[node.id] = node
356 t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes)
357 t.timeout = time.time() + 15
359 self.outstanding = self.outstanding + 1
360 self.queried[node.id] = 1
361 if self.outstanding == 0:
# test_build_net(quiet=0): manual integration test that builds a local network.
#   1. Create `peers` Khashmir instances on consecutive ports starting at
#      `port` (both set in lines not visible here).
#   2. Drive their dispatchers from background threads, each thread running
#      runOnce() over a slice of 10 peers.
#   3. Give each peer three random contacts (possibly itself) and have it call
#      findCloseNodes().
# NOTE(review): whrandom is long-deprecated; random.randrange is the drop-in
# equivalent. The peer list is presumably returned as `l` for the other tests.
366 def test_build_net(quiet=0):
367 from whrandom import randrange
374 print "Building %s peer table." % peers
376 for i in xrange(peers):
377 a = Khashmir('localhost', port + i)
384 events = events + peer.dispatcher.runOnce()
389 thread.start_new_thread(run, (l[i*10:(i+1)*10],))
390 #thread.start_new_thread(l[i].dispatcher.run, ())
393 n = l[randrange(0, len(l))].node
394 peer.addContact(n.host, n.port)
395 n = l[randrange(0, len(l))].node
396 peer.addContact(n.host, n.port)
397 n = l[randrange(0, len(l))].node
398 peer.addContact(n.host, n.port)
403 peer.findCloseNodes()
# test_find_nodes(l, quiet=0): pick two random peers a and b from `l` (possibly
# the same peer), ask a to find b's ID, and print PASSED if the first node
# handed to the callback is b, FAILED otherwise.
# NOTE(review): `n`, the setting of/waiting on `flag`, and the else-branch
# line are not visible in this listing.
409 def test_find_nodes(l, quiet=0):
410 import threading, sys
411 from whrandom import randrange
412 flag = threading.Event()
416 a = l[randrange(0,n)]
417 b = l[randrange(0,n)]
419 def callback(nodes, l=l, flag=flag):
420 if (len(nodes) >0) and (nodes[0].id == b.node.id):
421 print "test_find_nodes PASSED"
423 print "test_find_nodes FAILED"
425 a.findNode(b.node.id, callback)
# test_find_value(l, quiet=0): store a random SHA-1 key/value pair through peer
# a, then look the key up from peers b, c and d. mc(flag) builds a separate
# callback per lookup, each bound to its own threading.Event.
# NOTE(review): _findValueHandler builds values as (key, value) tuples, but
# this test reads values[0]['value']. Confirm that the wire encoding turns
# each pair into a dict with a 'value' key.
428 def test_find_value(l, quiet=0):
429 from whrandom import randrange
431 import time, threading, sys
433 fa = threading.Event()
434 fb = threading.Event()
435 fc = threading.Event()
438 a = l[randrange(0,n)]
439 b = l[randrange(0,n)]
440 c = l[randrange(0,n)]
441 d = l[randrange(0,n)]
443 key = sha(`randrange(0,100000)`).digest()
444 value = sha(`randrange(0,100000)`).digest()
446 print "inserting value...",
448 a.storeValueForKey(key, value)
452 def mc(flag, value=value):
453 def callback(values, f=flag, val=value):
455 if(len(values) == 0):
458 if values[0]['value'] != val:
465 b.valueForKey(key, mc(fa))
466 c.valueForKey(key, mc(fb))
467 d.valueForKey(key, mc(fc))
# Script entry point: presumably builds the test network and then runs
# test_find_nodes and test_find_value; those calls are not visible here.
473 if __name__ == "__main__":
476 print "finding nodes..."
480 print "inserting and fetching values..."