]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
Initial revision
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from listener import Listener
4 from ktable import KTable, K
5 from node import Node
6 from dispatcher import Dispatcher
7 from hash import newID, intify
8 import messages
9 import transactions
10
11 import time
12
13 from bsddb3 import db ## find this at http://pybsddb.sf.net/
14 from bsddb3._db import DBNotFoundError
15
16 # don't ping unless it's been at least this many seconds since we've heard from a peer
17 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
18
19 # concurrent FIND_NODE/VALUE requests!
20 N = 3
21
22
23 # this is the main class!
24 class Khashmir:
25     __slots__ = ['listener', 'node', 'table', 'dispatcher', 'tf', 'store']
26     def __init__(self, host, port):
27         self.listener = Listener(host, port)
28         self.node = Node(newID(), host, port)
29         self.table = KTable(self.node)
30         self.dispatcher = Dispatcher(self.listener, messages.BASE, self.node.id)
31         self.tf = transactions.TransactionFactory(self.node.id, self.dispatcher)
32         
33         self.store = db.DB()
34         self.store.open(None, None, db.DB_BTREE)
35
36         #### register unsolicited incoming message handlers
37         self.dispatcher.registerHandler('ping', self._pingHandler, messages.PING)
38                 
39         self.dispatcher.registerHandler('find node', self._findNodeHandler, messages.FIND_NODE)
40
41         self.dispatcher.registerHandler('get value', self._findValueHandler, messages.GET_VALUE)
42         
43         self.dispatcher.registerHandler('store value', self._storeValueHandler, messages.STORE_VALUE)
44         
45         
46     #######
47     #######  LOCAL INTERFACE    - use these methods!
48     def addContact(self, host, port):
49         """
50          ping this node and add the contact info to the table on pong!
51         """
52         n =Node(" "*20, host, port)  # note, we 
53         self.sendPing(n)
54
55
56     ## this call is async!
57     def findNode(self, id, callback):
58         """ returns the contact info for node, or the k closest nodes, from the global table """
59         # get K nodes out of local table/cache, or the node we want
60         nodes = self.table.findNodes(id)
61         if len(nodes) == 1 and nodes[0].id == id :
62             # we got it in our table!
63             def tcall(t, callback=callback):
64                 callback(t.extras)
65             self.dispatcher.postEvent(tcall, 0, extras=nodes)
66         else:
67             # create our search state
68             state = FindNode(self, self.dispatcher, id, callback)
69             # handle this in our own thread
70             self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes)
71     
72     
73     ## also async
74     def valueForKey(self, key, callback):
75         """ returns the values found for key in global table """
76         nodes = self.table.findNodes(key)
77         # create our search state
78         state = GetValue(self, self.dispatcher, key, callback)
79         # handle this in our own thread
80         self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes)
81
82
83     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
84     def storeValueForKey(self, key, value):
85         """ stores the value for key in the global table, returns immediately, no status 
86             in this implementation, peers respond but don't indicate status to storing values
87             values are stored in peers on a first-come first-served basis
88             this will probably change so more than one value can be stored under a key
89         """
90         def _storeValueForKey(nodes, tf=self.tf, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
91             for node in nodes:
92                 if node.id != self.node.id:
93                     t = tf.StoreValue(node, key, value, response, default)
94                     t.dispatch()
95         # this call is asynch
96         self.findNode(key, _storeValueForKey)
97         
98         
99     def insertNode(self, n):
100         """
101         insert a node in our local table, pinging oldest contact in bucket, if necessary
102         
103         If all you have is a host/port, then use addContact, which calls this function after
104         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
105         a node into the table without it's peer-ID.  That means of course the node passed into this
106         method needs to be a properly formed Node object with a valid ID.
107         """
108         old = self.table.insertNode(n)
109         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
110             # the bucket is full, check to see if old node is still around and if so, replace it
111             t = self.tf.Ping(old, self._notStaleNodeHandler, self._staleNodeHandler)
112             t.newnode = n
113             t.dispatch()
114
115
116     def sendPing(self, node):
117         """
118             ping a node
119         """
120         t = self.tf.Ping(node, self._pongHandler, self._defaultPong)
121         t.dispatch()
122
123
124     def findCloseNodes(self):
125         """
126             This does a findNode on the ID one away from our own.  
127             This will allow us to populate our table with nodes on our network closest to our own.
128             This is called as soon as we start up with an empty table
129         """
130         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
131         def callback(nodes):
132             pass
133         self.findNode(id, callback)
134
135     def refreshTable(self):
136         """
137             
138         """
139         def callback(nodes):
140             pass
141
142         for bucket in self.table.buckets:
143             if time.time() - bucket.lastAccessed >= 60 * 60:
144                 id = randRange(bucket.min, bucket.max)
145                 self.findNode(id, callback)
146         
147  
148     #####
149     ##### UNSOLICITED INCOMING MESSAGE HANDLERS
150     
151     def _pingHandler(self, t, msg):
152         #print "Got PING from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port)
153         self.insertNode(t.target)
154         # respond, no callbacks, we don't care if they get it or not
155         nt = self.tf.Pong(t)
156         nt.dispatch()
157         
158     def _findNodeHandler(self, t, msg):
159         #print "Got FIND_NODES from %s:%s at %s:%s" % (t.target.host, t.target.port, self.node.host, self.node.port)
160         nodes = self.table.findNodes(msg['target'])
161         # respond, no callbacks, we don't care if they get it or not
162         nt = self.tf.GotNodes(t, nodes)
163         nt.dispatch()
164     
165     def _storeValueHandler(self, t, msg):
166         if not self.store.has_key(msg['key']):
167             self.store.put(msg['key'], msg['value'])
168         nt = self.tf.StoredValue(t)
169         nt.dispatch()
170     
171     def _findValueHandler(self, t, msg):
172         if self.store.has_key(msg['key']):
173             t = self.tf.GotValues(t, [(msg['key'], self.store[msg['key']])])
174         else:
175             nodes = self.table.findNodes(msg['key'])
176             t = self.tf.GotNodes(t, nodes)
177         t.dispatch()
178
179
180     ###
181     ### message response callbacks
182     # called when we get a response to store value
183     def _storedValueHandler(self, t, msg):
184         self.table.insertNode(t.target)
185
186
187     ## these are the callbacks used when we ping the oldest node in a bucket
188     def _staleNodeHandler(self, t):
189         """ called if the pinged node never responds """
190         self.table.replaceStaleNode(t.target, t.newnode)
191
192     def _notStaleNodeHandler(self, t, msg):
193         """ called when we get a ping from the remote node """
194         self.table.insertNode(t.target)
195         
196     
197     ## these are the callbacks we use when we issue a PING
198     def _pongHandler(self, t, msg):
199         #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
200         n = Node(msg['id'], t.addr[0], t.addr[1])
201         self.table.insertNode(n)
202
203     def _defaultPong(self, t):
204         # this should probably increment a failed message counter and dump the node if it gets over a threshold
205         print "Never got PONG from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port)
206         
207     
208
209 class ActionBase:
210     """ base class for some long running asynchronous proccesses like finding nodes or values """
211     def __init__(self, table, dispatcher, target, callback):
212         self.table = table
213         self.dispatcher = dispatcher
214         self.target = target
215         self.int = intify(target)
216         self.found = {}
217         self.queried = {}
218         self.answered = {}
219         self.callback = callback
220         self.outstanding = 0
221         self.finished = 0
222         
223         def sort(a, b, int=self.int):
224             """ this function is for sorting nodes relative to the ID we are looking for """
225             x, y = int ^ a.int, int ^ b.int
226             if x > y:
227                 return 1
228             elif x < y:
229                 return -1
230             return 0
231         self.sort = sort
232     
233     def goWithNodes(self, t):
234         pass
235         
236 class FindNode(ActionBase):
237     """ find node action merits it's own class as it is a long running stateful process """
238     def handleGotNodes(self, t, msg):
239         if self.finished or self.answered.has_key(t.id):
240             # a day late and a dollar short
241             return
242         self.outstanding = self.outstanding - 1
243         self.answered[t.id] = 1
244         for node in msg['nodes']:
245             if not self.found.has_key(node['id']):
246                 n = Node(node['id'], node['host'], node['port'])
247                 self.found[n.id] = n
248                 self.table.insertNode(n)
249         self.schedule()
250                 
251     def schedule(self):
252         """
253             send messages to new peers, if necessary
254         """
255         if self.finished:
256             return
257         l = self.found.values()
258         l.sort(self.sort)
259
260         for node in l[:K]:
261             if node.id == self.target:
262                 self.finished=1
263                 return self.callback([node])
264             if not self.queried.has_key(node.id) and node.id != self.table.node.id:
265                 t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes)
266                 self.outstanding = self.outstanding + 1
267                 self.queried[node.id] = 1
268                 t.timeout = time.time() + 15
269                 t.dispatch()
270             if self.outstanding >= N:
271                 break
272         assert(self.outstanding) >=0
273         if self.outstanding == 0:
274             ## all done!!
275             self.finished=1
276             self.callback(l[:K])
277         
278     def defaultGotNodes(self, t):
279         if self.finished:
280             return
281         self.outstanding = self.outstanding - 1
282         self.schedule()
283         
284         
285     def goWithNodes(self, t):
286         """
287             this starts the process, our argument is a transaction with t.extras being our list of nodes
288             it's a transaction since we got called from the dispatcher
289         """
290         nodes = t.extras
291         for node in nodes:
292             if node.id == self.table.node.id:
293                 continue
294             self.found[node.id] = node
295             t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes)
296             t.timeout = time.time() + 15
297             t.dispatch()
298             self.outstanding = self.outstanding + 1
299             self.queried[node.id] = 1
300         if self.outstanding == 0:
301             self.callback(nodes)
302
303
304
305 class GetValue(FindNode):
306     """ get value task """
307     def handleGotNodes(self, t, msg):
308         if self.finished or self.answered.has_key(t.id):
309             # a day late and a dollar short
310             return
311         self.outstanding = self.outstanding - 1
312         self.answered[t.id] = 1
313         # go through nodes
314         # if we have any closer than what we already got, query them
315         if msg['type'] == 'got nodes':
316             for node in msg['nodes']:
317                 if not self.found.has_key(node['id']):
318                     n = Node(node['id'], node['host'], node['port'])
319                     self.found[n.id] = n
320                     self.table.insertNode(n)
321         elif msg['type'] == 'got values':
322             ## done
323             self.finished = 1
324             return self.callback(msg['values'])
325         self.schedule()
326                 
327     ## get value
328     def schedule(self):
329         if self.finished:
330             return
331         l = self.found.values()
332         l.sort(self.sort)
333
334         for node in l[:K]:
335             if not self.queried.has_key(node.id) and node.id != self.table.node.id:
336                 t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes)
337                 self.outstanding = self.outstanding + 1
338                 self.queried[node.id] = 1
339                 t.timeout = time.time() + 15
340                 t.dispatch()
341             if self.outstanding >= N:
342                 break
343         assert(self.outstanding) >=0
344         if self.outstanding == 0:
345             ## all done, didn't find it!!
346             self.finished=1
347             self.callback([])
348     
349     ## get value
350     def goWithNodes(self, t):
351         nodes = t.extras
352         for node in nodes:
353             if node.id == self.table.node.id:
354                 continue
355             self.found[node.id] = node
356             t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes)
357             t.timeout = time.time() + 15
358             t.dispatch()
359             self.outstanding = self.outstanding + 1
360             self.queried[node.id] = 1
361         if self.outstanding == 0:
362             self.callback([])
363
364
365 #------
366 def test_build_net(quiet=0):
367     from whrandom import randrange
368     import thread
369     port = 2001
370     l = []
371     peers = 100
372     
373     if not quiet:
374         print "Building %s peer table." % peers
375         
376     for i in xrange(peers):
377         a = Khashmir('localhost', port + i)
378         l.append(a)
379     
380     def run(l=l):
381         while(1):
382                 events = 0
383                 for peer in l:
384                         events = events + peer.dispatcher.runOnce()
385                 if events == 0:
386                         time.sleep(.25)
387
388     for i in range(10):
389         thread.start_new_thread(run, (l[i*10:(i+1)*10],))
390         #thread.start_new_thread(l[i].dispatcher.run, ())
391     
392     for peer in l[1:]:
393         n = l[randrange(0, len(l))].node
394         peer.addContact(n.host, n.port)
395         n = l[randrange(0, len(l))].node
396         peer.addContact(n.host, n.port)
397         n = l[randrange(0, len(l))].node
398         peer.addContact(n.host, n.port)
399         
400     time.sleep(5)
401
402     for peer in l:
403         peer.findCloseNodes()
404     time.sleep(5)
405     for peer in l:
406         peer.refreshTable()
407     return l
408         
409 def test_find_nodes(l, quiet=0):
410     import threading, sys
411     from whrandom import randrange
412     flag = threading.Event()
413     
414     n = len(l)
415     
416     a = l[randrange(0,n)]
417     b = l[randrange(0,n)]
418     
419     def callback(nodes, l=l, flag=flag):
420         if (len(nodes) >0) and (nodes[0].id == b.node.id):
421             print "test_find_nodes      PASSED"
422         else:
423             print "test_find_nodes      FAILED"
424         flag.set()
425     a.findNode(b.node.id, callback)
426     flag.wait()
427     
428 def test_find_value(l, quiet=0):
429     from whrandom import randrange
430     from sha import sha
431     import time, threading, sys
432     
433     fa = threading.Event()
434     fb = threading.Event()
435     fc = threading.Event()
436     
437     n = len(l)
438     a = l[randrange(0,n)]
439     b = l[randrange(0,n)]
440     c = l[randrange(0,n)]
441     d = l[randrange(0,n)]
442
443     key = sha(`randrange(0,100000)`).digest()
444     value = sha(`randrange(0,100000)`).digest()
445     if not quiet:
446         print "inserting value...",
447         sys.stdout.flush()
448     a.storeValueForKey(key, value)
449     time.sleep(3)
450     print "finding..."
451     
452     def mc(flag, value=value):
453         def callback(values, f=flag, val=value):
454             try:
455                 if(len(values) == 0):
456                     print "find                FAILED"
457                 else:
458                     if values[0]['value'] != val:
459                         print "find                FAILED"
460                     else:
461                         print "find                FOUND"
462             finally:
463                 f.set()
464         return callback
465     b.valueForKey(key, mc(fa))
466     c.valueForKey(key, mc(fb))
467     d.valueForKey(key, mc(fc))
468     
469     fa.wait()
470     fb.wait()
471     fc.wait()
472     
473 if __name__ == "__main__":
474     l = test_build_net()
475     time.sleep(3)
476     print "finding nodes..."
477     test_find_nodes(l)
478     test_find_nodes(l)
479     test_find_nodes(l)
480     print "inserting and fetching values..."
481     test_find_value(l)
482     test_find_value(l)
483     test_find_value(l)
484     test_find_value(l)
485     test_find_value(l)
486     test_find_value(l)
487     for i in l:
488         i.dispatcher.stop()