]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
32dab3e6522211386cc0bcbd26dd608b205e49ee
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5
6 from ktable import KTable, K
7 from knode import KNode as Node
8
9 from hash import newID, intify
10
11 from twisted.web import xmlrpc
12 from twisted.internet.defer import Deferred
13 from twisted.python import threadable
14 threadable.init()
15
16 from bsddb3 import db ## find this at http://pybsddb.sf.net/
17 from bsddb3._db import DBNotFoundError
18
19 # don't ping unless it's been at least this many seconds since we've heard from a peer
20 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
21
22 # concurrent FIND_NODE/VALUE requests!
23 N = 3
24
25
26
27 # this is the main class!
28 class Khashmir(xmlrpc.XMLRPC):
29     __slots__ = ['listener', 'node', 'table', 'store', 'app']
30     def __init__(self, host, port):
31         self.node = Node(newID(), host, port)
32         self.table = KTable(self.node)
33         from twisted.internet.app import Application
34         from twisted.web import server
35         self.app = Application("xmlrpc")
36         self.app.listenTCP(port, server.Site(self))
37         self.store = db.DB()
38         self.store.open(None, None, db.DB_BTREE)
39         
40
41     def render(self, request):
42         """
43             Override the built in render so we can have access to the request object!
44             note, crequest is probably only valid on the initial call (not after deferred!)
45         """
46         self.crequest = request
47         return xmlrpc.XMLRPC.render(self, request)
48
49         
50     #######
51     #######  LOCAL INTERFACE    - use these methods!
52     def addContact(self, host, port):
53         """
54          ping this node and add the contact info to the table on pong!
55         """
56         n =Node(" "*20, host, port)  # note, we 
57         self.sendPing(n)
58
59
60     ## this call is async!
61     def findNode(self, id, callback, errback=None):
62         """ returns the contact info for node, or the k closest nodes, from the global table """
63         # get K nodes out of local table/cache, or the node we want
64         nodes = self.table.findNodes(id)
65         d = Deferred()
66         d.addCallbacks(callback, errback)
67         if len(nodes) == 1 and nodes[0].id == id :
68             d.callback(nodes)
69         else:
70             # create our search state
71             state = FindNode(self, id, d.callback)
72             reactor.callFromThread(state.goWithNodes, nodes)
73     
74     
75     ## also async
76     def valueForKey(self, key, callback):
77         """ returns the values found for key in global table """
78         nodes = self.table.findNodes(key)
79         # create our search state
80         state = GetValue(self, key, callback)
81         reactor.callFromThread(state.goWithNodes, nodes)
82
83
84     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
85     def storeValueForKey(self, key, value):
86         """ stores the value for key in the global table, returns immediately, no status 
87             in this implementation, peers respond but don't indicate status to storing values
88             values are stored in peers on a first-come first-served basis
89             this will probably change so more than one value can be stored under a key
90         """
91         def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
92             for node in nodes:
93                 if node.id != self.node.id:
94                     df = node.storeValue(key, value, self.node.senderDict())
95                     df.addCallbacks(response, default)
96         # this call is asynch
97         self.findNode(key, _storeValueForKey)
98         
99         
100     def insertNode(self, n):
101         """
102         insert a node in our local table, pinging oldest contact in bucket, if necessary
103         
104         If all you have is a host/port, then use addContact, which calls this method after
105         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
106         a node into the table without it's peer-ID.  That means of course the node passed into this
107         method needs to be a properly formed Node object with a valid ID.
108         """
109         old = self.table.insertNode(n)
110         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
111             # the bucket is full, check to see if old node is still around and if so, replace it
112             
113             ## these are the callbacks used when we ping the oldest node in a bucket
114             def _staleNodeHandler(oldnode=old, newnode = n):
115                 """ called if the pinged node never responds """
116                 self.table.replaceStaleNode(old, newnode)
117         
118             def _notStaleNodeHandler(sender, old=old):
119                 """ called when we get a ping from the remote node """
120                 if sender['id'] == old.id:
121                     self.table.insertNode(old)
122
123             df = old.ping()
124             df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
125
126
127     def sendPing(self, node):
128         """
129             ping a node
130         """
131         df = node.ping(self.node.senderDict())
132         ## these are the callbacks we use when we issue a PING
133         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
134             if id != 20 * ' ' and id != sender['id']:
135                 # whoah, got response from different peer than we were expecting
136                 pass
137             else:
138                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
139                 n = Node(sender['id'], host, port)
140                 table.insertNode(n)
141             return
142         def _defaultPong(err):
143             # this should probably increment a failed message counter and dump the node if it gets over a threshold
144             return      
145
146         df.addCallbacks(_pongHandler,_defaultPong)
147
148
149     def findCloseNodes(self):
150         """
151             This does a findNode on the ID one away from our own.  
152             This will allow us to populate our table with nodes on our network closest to our own.
153             This is called as soon as we start up with an empty table
154         """
155         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
156         def callback(nodes):
157             pass
158         self.findNode(id, callback)
159
160     def refreshTable(self):
161         """
162             
163         """
164         def callback(nodes):
165             pass
166
167         for bucket in self.table.buckets:
168             if time.time() - bucket.lastAccessed >= 60 * 60:
169                 id = randRange(bucket.min, bucket.max)
170                 self.findNode(id, callback)
171         
172  
173     #####
174     ##### INCOMING MESSAGE HANDLERS
175     
176     def xmlrpc_ping(self, sender):
177         """
178             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
179             returns sender dict
180         """
181         ip = self.crequest.getClientIP()
182         n = Node(sender['id'], ip, sender['port'])
183         self.insertNode(n)
184         return self.node.senderDict()
185                 
186     def xmlrpc_find_node(self, target, sender):
187         nodes = self.table.findNodes(target)
188         nodes = map(lambda node: node.senderDict(), nodes)
189         ip = self.crequest.getClientIP()
190         n = Node(sender['id'], ip, sender['port'])
191         self.insertNode(n)
192         return nodes, self.node.senderDict()
193     
194     def xmlrpc_store_value(self, key, value, sender):
195         if not self.store.has_key(key):
196             self.store.put(key, value)
197         ip = self.crequest.getClientIP()
198         n = Node(sender['id'], ip, sender['port'])
199         self.insertNode(n)
200         return self.node.senderDict()
201         
202     def xmlrpc_find_value(self, key, sender):
203         ip = self.crequest.getClientIP()
204         n = Node(sender['id'], ip, sender['port'])
205         self.insertNode(n)
206         if self.store.has_key(key):
207             return {'values' : self.store[key]}, self.node.senderDict()
208         else:
209             nodes = self.table.findNodes(msg['key'])
210             nodes = map(lambda node: node.senderDict(), nodes)
211             return {'nodes' : nodes}, self.node.senderDict()
212
213     ###
214     ### message response callbacks
215     # called when we get a response to store value
216     def _storedValueHandler(self, sender):
217         pass
218
219         
220     
221     
222
223 class ActionBase:
224     """ base class for some long running asynchronous proccesses like finding nodes or values """
225     def __init__(self, table, target, callback):
226         self.table = table
227         self.target = target
228         self.int = intify(target)
229         self.found = {}
230         self.queried = {}
231         self.answered = {}
232         self.callback = callback
233         self.outstanding = 0
234         self.finished = 0
235         
236         def sort(a, b, int=self.int):
237             """ this function is for sorting nodes relative to the ID we are looking for """
238             x, y = int ^ a.int, int ^ b.int
239             if x > y:
240                 return 1
241             elif x < y:
242                 return -1
243             return 0
244         self.sort = sort
245     
246     def goWithNodes(self, t):
247         pass
248         
249         
250
251 FIND_NODE_TIMEOUT = 15
252
253 class FindNode(ActionBase):
254     """ find node action merits it's own class as it is a long running stateful process """
255     def handleGotNodes(self, args):
256         l, sender = args
257         if self.finished or self.answered.has_key(sender['id']):
258             # a day late and a dollar short
259             return
260         self.outstanding = self.outstanding - 1
261         self.answered[sender['id']] = 1
262         for node in l:
263             if not self.found.has_key(node['id']):
264                 n = Node(node['id'], node['host'], node['port'])
265                 self.found[n.id] = n
266                 self.table.insertNode(n)
267         self.schedule()
268                 
269     def schedule(self):
270         """
271             send messages to new peers, if necessary
272         """
273         if self.finished:
274             return
275         l = self.found.values()
276         l.sort(self.sort)
277
278         for node in l[:K]:
279             if node.id == self.target:
280                 self.finished=1
281                 return self.callback([node])
282             if not self.queried.has_key(node.id) and node.id != self.table.node.id:
283                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
284                 df = node.findNode(self.target, self.table.node.senderDict())
285                 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
286                 self.outstanding = self.outstanding + 1
287                 self.queried[node.id] = 1
288             if self.outstanding >= N:
289                 break
290         assert(self.outstanding) >=0
291         if self.outstanding == 0:
292             ## all done!!
293             self.finished=1
294             reactor.callFromThread(self.callback, l[:K])
295         
296     def defaultGotNodes(self, t):
297         if self.finished:
298             return
299         self.outstanding = self.outstanding - 1
300         self.schedule()
301         
302         
303     def goWithNodes(self, nodes):
304         """
305             this starts the process, our argument is a transaction with t.extras being our list of nodes
306             it's a transaction since we got called from the dispatcher
307         """
308         for node in nodes:
309             if node.id == self.table.node.id:
310                 continue
311             self.found[node.id] = node
312             #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
313             df = node.findNode(self.target, self.table.node.senderDict())
314             df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
315             self.outstanding = self.outstanding + 1
316             self.queried[node.id] = 1
317         if self.outstanding == 0:
318             self.callback(nodes)
319
320
321 GET_VALUE_TIMEOUT = 15
322 class GetValue(FindNode):
323     """ get value task """
324     def handleGotNodes(self, args):
325         l, sender = args
326         l = l[0]
327         if self.finished or self.answered.has_key(sender['id']):
328             # a day late and a dollar short
329             return
330         self.outstanding = self.outstanding - 1
331         self.answered[sender['id']] = 1
332         # go through nodes
333         # if we have any closer than what we already got, query them
334         if l.has_key('nodes'):
335             for node in l['nodes']:
336                 if not self.found.has_key(node['id']):
337                     n = Node(node['id'], node['host'], node['port'])
338                     self.found[n.id] = n
339                     self.table.insertNode(n)
340         elif l.has_key('values'):
341             ## done
342             self.finished = 1
343             return self.callback(l['values'])
344         self.schedule()
345                 
346     ## get value
347     def schedule(self):
348         if self.finished:
349             return
350         l = self.found.values()
351         l.sort(self.sort)
352
353         for node in l[:K]:
354             if not self.queried.has_key(node.id) and node.id != self.table.node.id:
355                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
356                 df = node.getValue(node, self.target)
357                 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
358                 self.outstanding = self.outstanding + 1
359                 self.queried[node.id] = 1
360             if self.outstanding >= N:
361                 break
362         assert(self.outstanding) >=0
363         if self.outstanding == 0:
364             ## all done, didn't find it!!
365             self.finished=1
366             reactor.callFromThread(self.callback,[])
367     
368     ## get value
369     def goWithNodes(self, nodes):
370         for node in nodes:
371             if node.id == self.table.node.id:
372                 continue
373             self.found[node.id] = node
374             #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
375             df = node.findNode(self.target, self.table.node.senderDict())
376             df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
377             self.outstanding = self.outstanding + 1
378             self.queried[node.id] = 1
379         if self.outstanding == 0:
380             reactor.callFromThread(self.callback, [])
381
382
383
384 #------
385
386 def test_build_net(quiet=0):
387     from whrandom import randrange
388     import thread
389     port = 2001
390     l = []
391     peers = 16
392     
393     if not quiet:
394         print "Building %s peer table." % peers
395         
396     for i in xrange(peers):
397         a = Khashmir('localhost', port + i)
398         l.append(a)
399     
400     def run(l=l):
401         while(1):
402                 events = 0
403                 for peer in l:
404                         events = events + peer.dispatcher.runOnce()
405                 if events == 0:
406                         time.sleep(.25)
407
408     thread.start_new_thread(l[0].app.run, ())
409     for peer in l[1:]:
410         peer.app.run()
411         
412     for peer in l[1:]:
413         n = l[randrange(0, len(l))].node
414         peer.addContact(n.host, n.port)
415         n = l[randrange(0, len(l))].node
416         peer.addContact(n.host, n.port)
417         n = l[randrange(0, len(l))].node
418         peer.addContact(n.host, n.port)
419         
420     time.sleep(5)
421
422     for peer in l:
423         peer.findCloseNodes()
424     time.sleep(5)
425     for peer in l:
426         peer.refreshTable()
427     return l
428         
429 def test_find_nodes(l, quiet=0):
430     import threading, sys
431     from whrandom import randrange
432     flag = threading.Event()
433     
434     n = len(l)
435     
436     a = l[randrange(0,n)]
437     b = l[randrange(0,n)]
438     
439     def callback(nodes, l=l, flag=flag):
440         if (len(nodes) >0) and (nodes[0].id == b.node.id):
441             print "test_find_nodes      PASSED"
442         else:
443             print "test_find_nodes      FAILED"
444         flag.set()
445     a.findNode(b.node.id, callback)
446     flag.wait()
447     
448 def test_find_value(l, quiet=0):
449     from whrandom import randrange
450     from sha import sha
451     import time, threading, sys
452     
453     fa = threading.Event()
454     fb = threading.Event()
455     fc = threading.Event()
456     
457     n = len(l)
458     a = l[randrange(0,n)]
459     b = l[randrange(0,n)]
460     c = l[randrange(0,n)]
461     d = l[randrange(0,n)]
462
463     key = sha(`randrange(0,100000)`).digest()
464     value = sha(`randrange(0,100000)`).digest()
465     if not quiet:
466         print "inserting value...",
467         sys.stdout.flush()
468     a.storeValueForKey(key, value)
469     time.sleep(3)
470     print "finding..."
471     
472     def mc(flag, value=value):
473         def callback(values, f=flag, val=value):
474             try:
475                 if(len(values) == 0):
476                     print "find                FAILED"
477                 else:
478                     if values[0]['value'] != val:
479                         print "find                FAILED"
480                     else:
481                         print "find                FOUND"
482             finally:
483                 f.set()
484         return callback
485     b.valueForKey(key, mc(fa))
486     c.valueForKey(key, mc(fb))
487     d.valueForKey(key, mc(fc))
488     
489     fa.wait()
490     fb.wait()
491     fc.wait()
492     
493 if __name__ == "__main__":
494     l = test_build_net()
495     time.sleep(3)
496     print "finding nodes..."
497     test_find_nodes(l)
498     test_find_nodes(l)
499     test_find_nodes(l)
500     print "inserting and fetching values..."
501     test_find_value(l)
502     test_find_value(l)
503     test_find_value(l)
504     test_find_value(l)
505     test_find_value(l)
506     test_find_value(l)