]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
27c29281eced0bc437748a9e4942f83bbd55958c
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5 from pickle import loads, dumps
6 from sha import sha
7
8 from ktable import KTable, K
9 from knode import KNode as Node
10
11 from hash import newID
12
13 from actions import FindNode, GetValue, KeyExpirer
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
19 threadable.init()
20
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
23
24 from xmlrpclib import Binary
25
26 # don't ping unless it's been at least this many seconds since we've heard from a peer
27 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
28
29
30
31 # this is the main class!
32 class Khashmir(xmlrpc.XMLRPC):
33     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
34     def __init__(self, host, port):
35         self.node = Node().init(newID(), host, port)
36         self.table = KTable(self.node)
37         self.app = Application("xmlrpc")
38         self.app.listenTCP(port, server.Site(self))
39         
40         ## these databases may be more suited to on-disk rather than in-memory
41         # h((key, value)) -> (key, value, time) mappings
42         self.store = db.DB()
43         self.store.open(None, None, db.DB_BTREE)
44         
45         # <insert time> -> h((key, value))
46         self.itime = db.DB()
47         self.itime.set_flags(db.DB_DUP)
48         self.itime.open(None, None, db.DB_BTREE)
49
50         # key -> h((key, value))
51         self.kw = db.DB()
52         self.kw.set_flags(db.DB_DUP)
53         self.kw.open(None, None, db.DB_BTREE)
54
55         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
56         
57     def render(self, request):
58         """
59             Override the built in render so we can have access to the request object!
60             note, crequest is probably only valid on the initial call (not after deferred!)
61         """
62         self.crequest = request
63         return xmlrpc.XMLRPC.render(self, request)
64
65         
66     #######
67     #######  LOCAL INTERFACE    - use these methods!
68     def addContact(self, host, port):
69         """
70          ping this node and add the contact info to the table on pong!
71         """
72         n =Node().init(" "*20, host, port)  # note, we 
73         self.sendPing(n)
74
75
76     ## this call is async!
77     def findNode(self, id, callback, errback=None):
78         """ returns the contact info for node, or the k closest nodes, from the global table """
79         # get K nodes out of local table/cache, or the node we want
80         nodes = self.table.findNodes(id)
81         d = Deferred()
82         d.addCallbacks(callback, errback)
83         if len(nodes) == 1 and nodes[0].id == id :
84             d.callback(nodes)
85         else:
86             # create our search state
87             state = FindNode(self, id, d.callback)
88             reactor.callFromThread(state.goWithNodes, nodes)
89     
90     
91     ## also async
92     def valueForKey(self, key, callback):
93         """ returns the values found for key in global table """
94         nodes = self.table.findNodes(key)
95         # decode values, they will be base64 encoded
96         # create our search state
97         state = GetValue(self, key, callback)
98         reactor.callFromThread(state.goWithNodes, nodes)
99
100
101
102     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
103     def storeValueForKey(self, key, value, callback=None):
104         """ stores the value for key in the global table, returns immediately, no status 
105             in this implementation, peers respond but don't indicate status to storing values
106             values are stored in peers on a first-come first-served basis
107             this will probably change so more than one value can be stored under a key
108         """
109         def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
110             if not callback:
111                 # default callback - this will get called for each successful store value
112                 def _storedValueHandler(sender):
113                     pass
114                 response=_storedValueHandler
115             for node in nodes:
116                 if node.id != self.node.id:
117                     df = node.storeValue(key, value, self.node.senderDict())
118                     df.addCallbacks(response, default)
119         # this call is asynch
120         self.findNode(key, _storeValueForKey)
121         
122         
123     def insertNode(self, n):
124         """
125         insert a node in our local table, pinging oldest contact in bucket, if necessary
126         
127         If all you have is a host/port, then use addContact, which calls this method after
128         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
129         a node into the table without it's peer-ID.  That means of course the node passed into this
130         method needs to be a properly formed Node object with a valid ID.
131         """
132         old = self.table.insertNode(n)
133         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
134             # the bucket is full, check to see if old node is still around and if so, replace it
135             
136             ## these are the callbacks used when we ping the oldest node in a bucket
137             def _staleNodeHandler(oldnode=old, newnode = n):
138                 """ called if the pinged node never responds """
139                 self.table.replaceStaleNode(old, newnode)
140         
141             def _notStaleNodeHandler(sender, old=old):
142                 """ called when we get a ping from the remote node """
143                 if sender['id'] == old.id:
144                     self.table.insertNode(old)
145
146             df = old.ping(self.node.senderDict())
147             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
148
149
150     def sendPing(self, node):
151         """
152             ping a node
153         """
154         df = node.ping(self.node.senderDict())
155         ## these are the callbacks we use when we issue a PING
156         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
157             if id != 20 * ' ' and id != sender['id']:
158                 # whoah, got response from different peer than we were expecting
159                 pass
160             else:
161                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
162                 sender['host'] = host
163                 sender['port'] = port
164                 n = Node().initWithDict(sender)
165                 table.insertNode(n)
166             return
167         def _defaultPong(err):
168             # this should probably increment a failed message counter and dump the node if it gets over a threshold
169             return      
170
171         df.addCallbacks(_pongHandler,_defaultPong)
172
173
174     def findCloseNodes(self):
175         """
176             This does a findNode on the ID one away from our own.  
177             This will allow us to populate our table with nodes on our network closest to our own.
178             This is called as soon as we start up with an empty table
179         """
180         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
181         def callback(nodes):
182             pass
183         self.findNode(id, callback)
184
185     def refreshTable(self):
186         """
187             
188         """
189         def callback(nodes):
190             pass
191
192         for bucket in self.table.buckets:
193             if time.time() - bucket.lastAccessed >= 60 * 60:
194                 id = randRange(bucket.min, bucket.max)
195                 self.findNode(id, callback)
196         
197  
198     #####
199     ##### INCOMING MESSAGE HANDLERS
200     
201     def xmlrpc_ping(self, sender):
202         """
203             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
204             returns sender dict
205         """
206         ip = self.crequest.getClientIP()
207         sender['host'] = ip
208         n = Node().initWithDict(sender)
209         self.insertNode(n)
210         return self.node.senderDict()
211                 
212     def xmlrpc_find_node(self, target, sender):
213         nodes = self.table.findNodes(target)
214         nodes = map(lambda node: node.senderDict(), nodes)
215         ip = self.crequest.getClientIP()
216         sender['host'] = ip
217         n = Node().initWithDict(sender)
218         self.insertNode(n)
219         return nodes, self.node.senderDict()
220     
221     def xmlrpc_store_value(self, key, value, sender):
222         key = key.data
223         h1 = sha(key+value.data).digest()
224         t = `time.time()`
225         if not self.store.has_key(h1):
226             v = dumps((key, value.data, t))
227             self.store.put(h1, v)
228             self.itime.put(t, h1)
229             self.kw.put(key, h1)
230         else:
231             # update last insert time
232             tup = loads(self.store[h1])
233             self.store[h1] = dumps((tup[0], tup[1], t))
234             self.itime.put(t, h1)
235
236         ip = self.crequest.getClientIP()
237         sender['host'] = ip
238         n = Node().initWithDict(sender)
239         self.insertNode(n)
240         return self.node.senderDict()
241         
242     def xmlrpc_find_value(self, key, sender):
243         ip = self.crequest.getClientIP()
244         key = key.data
245         sender['host'] = ip
246         n = Node().initWithDict(sender)
247         self.insertNode(n)
248
249         if self.kw.has_key(key):
250             c = self.kw.cursor()
251             tup = c.set(key)
252             l = []
253             while(tup):
254                 h1 = tup[1]
255                 v = loads(self.store[h1])[1]
256                 l.append(v)
257                 tup = c.next()
258             l = map(lambda v: Binary(v), l)
259             return {'values' : l}, self.node.senderDict()
260         else:
261             nodes = self.table.findNodes(key)
262             nodes = map(lambda node: node.senderDict(), nodes)
263             return {'nodes' : nodes}, self.node.senderDict()
264
265
266
267
268
269 #------ testing
270
271 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
272     from whrandom import randrange
273     import thread
274     port = 2001
275     l = []
276         
277     if not quiet:
278         print "Building %s peer table." % peers
279         
280     for i in xrange(peers):
281         a = Khashmir(host, port + i)
282         l.append(a)
283     
284
285     thread.start_new_thread(l[0].app.run, ())
286     time.sleep(1)
287     for peer in l[1:]:
288         peer.app.run()
289         #time.sleep(.25)
290
291     print "adding contacts...."
292
293     for peer in l[1:]:
294         n = l[randrange(0, len(l))].node
295         peer.addContact(host, n.port)
296         n = l[randrange(0, len(l))].node
297         peer.addContact(host, n.port)
298         n = l[randrange(0, len(l))].node
299         peer.addContact(host, n.port)
300         if pause:
301             time.sleep(.30)
302             
303     time.sleep(1)
304     print "finding close nodes...."
305
306     for peer in l:
307         peer.findCloseNodes()
308         if pause:
309             time.sleep(.5)
310     if pause:
311             time.sleep(2)
312 #    for peer in l:
313 #       peer.refreshTable()
314     return l
315         
316 def test_find_nodes(l, quiet=0):
317     import threading, sys
318     from whrandom import randrange
319     flag = threading.Event()
320     
321     n = len(l)
322     
323     a = l[randrange(0,n)]
324     b = l[randrange(0,n)]
325     
326     def callback(nodes, flag=flag, id = b.node.id):
327         if (len(nodes) >0) and (nodes[0].id == id):
328             print "test_find_nodes      PASSED"
329         else:
330             print "test_find_nodes      FAILED"
331         flag.set()
332     a.findNode(b.node.id, callback)
333     flag.wait()
334     
335 def test_find_value(l, quiet=0):
336     from whrandom import randrange
337     from sha import sha
338     from hash import newID
339     import time, threading, sys
340     
341     fa = threading.Event()
342     fb = threading.Event()
343     fc = threading.Event()
344     
345     n = len(l)
346     a = l[randrange(0,n)]
347     b = l[randrange(0,n)]
348     c = l[randrange(0,n)]
349     d = l[randrange(0,n)]
350
351     key = newID()
352     value = newID()
353     if not quiet:
354         print "inserting value..."
355         sys.stdout.flush()
356     a.storeValueForKey(key, value)
357     time.sleep(3)
358     print "finding..."
359     sys.stdout.flush()
360     
361     class cb:
362         def __init__(self, flag, value=value):
363             self.flag = flag
364             self.val = value
365             self.found = 0
366         def callback(self, values):
367             try:
368                 if(len(values) == 0):
369                     if not self.found:
370                         print "find                FAILED"
371                     else:
372                         print "find                FOUND"
373                     sys.stdout.flush()
374
375                 else:
376                     if self.val in values:
377                         self.found = 1
378             finally:
379                 self.flag.set()
380
381     b.valueForKey(key, cb(fa).callback)
382     fa.wait()
383     c.valueForKey(key, cb(fb).callback)
384     fb.wait()
385     d.valueForKey(key, cb(fc).callback)    
386     fc.wait()
387     
388 def test_one(port):
389     import thread
390     k = Khashmir('localhost', port)
391     thread.start_new_thread(k.app.run, ())
392     return k
393     
394 if __name__ == "__main__":
395     l = test_build_net()
396     time.sleep(3)
397     print "finding nodes..."
398     for i in range(10):
399         test_find_nodes(l)
400     print "inserting and fetching values..."
401     for i in range(10):
402         test_find_value(l)