]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
d8b9df4ff1132c2ee5849652ca2535f635c07510
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5 from pickle import loads, dumps
6 from sha import sha
7
8 from ktable import KTable, K
9 from knode import KNode as Node
10
11 from hash import newID
12
13 from actions import FindNode, GetValue, KeyExpirer
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
19 threadable.init()
20
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
23
24 from xmlrpclib import Binary
25
26 # don't ping unless it's been at least this many seconds since we've heard from a peer
27 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
28
29
30
31 # this is the main class!
32 class Khashmir(xmlrpc.XMLRPC):
33     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
34     def __init__(self, host, port):
35         self.node = Node().init(newID(), host, port)
36         self.table = KTable(self.node)
37         self.app = Application("xmlrpc")
38         self.app.listenTCP(port, server.Site(self))
39         
40         ## these databases may be more suited to on-disk rather than in-memory
41         # h((key, value)) -> (key, value, time) mappings
42         self.store = db.DB()
43         self.store.open(None, None, db.DB_BTREE)
44         
45         # <insert time> -> h((key, value))
46         self.itime = db.DB()
47         self.itime.set_flags(db.DB_DUP)
48         self.itime.open(None, None, db.DB_BTREE)
49
50         # key -> h((key, value))
51         self.kw = db.DB()
52         self.kw.set_flags(db.DB_DUP)
53         self.kw.open(None, None, db.DB_BTREE)
54
55         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
56         
57     def render(self, request):
58         """
59             Override the built in render so we can have access to the request object!
60             note, crequest is probably only valid on the initial call (not after deferred!)
61         """
62         self.crequest = request
63         return xmlrpc.XMLRPC.render(self, request)
64
65         
66     #######
67     #######  LOCAL INTERFACE    - use these methods!
68     def addContact(self, host, port):
69         """
70          ping this node and add the contact info to the table on pong!
71         """
72         n =Node().init(" "*20, host, port)  # note, we 
73         self.sendPing(n)
74
75
76     ## this call is async!
77     def findNode(self, id, callback, errback=None):
78         """ returns the contact info for node, or the k closest nodes, from the global table """
79         # get K nodes out of local table/cache, or the node we want
80         nodes = self.table.findNodes(id)
81         d = Deferred()
82         d.addCallbacks(callback, errback)
83         if len(nodes) == 1 and nodes[0].id == id :
84             d.callback(nodes)
85         else:
86             # create our search state
87             state = FindNode(self, id, d.callback)
88             reactor.callFromThread(state.goWithNodes, nodes)
89     
90     
91     ## also async
92     def valueForKey(self, key, callback):
93         """ returns the values found for key in global table """
94         nodes = self.table.findNodes(key)
95         # decode values, they will be base64 encoded
96         # create our search state
97         state = GetValue(self, key, callback)
98         reactor.callFromThread(state.goWithNodes, nodes)
99
100
101
102     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
103     def storeValueForKey(self, key, value, callback=None):
104         """ stores the value for key in the global table, returns immediately, no status 
105             in this implementation, peers respond but don't indicate status to storing values
106             values are stored in peers on a first-come first-served basis
107             this will probably change so more than one value can be stored under a key
108         """
109         def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
110             if not callback:
111                 # default callback - this will get called for each successful store value
112                 def _storedValueHandler(sender):
113                     pass
114                 response=_storedValueHandler
115             for node in nodes:
116                 if node.id != self.node.id:
117                     df = node.storeValue(key, value, self.node.senderDict())
118                     df.addCallbacks(response, default)
119         # this call is asynch
120         self.findNode(key, _storeValueForKey)
121         
122         
123     def insertNode(self, n):
124         """
125         insert a node in our local table, pinging oldest contact in bucket, if necessary
126         
127         If all you have is a host/port, then use addContact, which calls this method after
128         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
129         a node into the table without it's peer-ID.  That means of course the node passed into this
130         method needs to be a properly formed Node object with a valid ID.
131         """
132         old = self.table.insertNode(n)
133         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
134             # the bucket is full, check to see if old node is still around and if so, replace it
135             
136             ## these are the callbacks used when we ping the oldest node in a bucket
137             def _staleNodeHandler(oldnode=old, newnode = n):
138                 """ called if the pinged node never responds """
139                 self.table.replaceStaleNode(old, newnode)
140         
141             def _notStaleNodeHandler(sender, old=old):
142                 """ called when we get a ping from the remote node """
143                 sender = Node().initWithDict(sender)
144                 if sender.id == old.id:
145                     self.table.insertNode(old)
146
147             df = old.ping(self.node.senderDict())
148             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
149
150
151     def sendPing(self, node):
152         """
153             ping a node
154         """
155         df = node.ping(self.node.senderDict())
156         ## these are the callbacks we use when we issue a PING
157         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
158             if id != 20 * ' ' and id != sender['id'].data:
159                 # whoah, got response from different peer than we were expecting
160                 pass
161             else:
162                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
163                 sender['host'] = host
164                 sender['port'] = port
165                 n = Node().initWithDict(sender)
166                 table.insertNode(n)
167             return
168         def _defaultPong(err):
169             # this should probably increment a failed message counter and dump the node if it gets over a threshold
170             return      
171
172         df.addCallbacks(_pongHandler,_defaultPong)
173
174
175     def findCloseNodes(self):
176         """
177             This does a findNode on the ID one away from our own.  
178             This will allow us to populate our table with nodes on our network closest to our own.
179             This is called as soon as we start up with an empty table
180         """
181         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
182         def callback(nodes):
183             pass
184         self.findNode(id, callback)
185
186     def refreshTable(self):
187         """
188             
189         """
190         def callback(nodes):
191             pass
192
193         for bucket in self.table.buckets:
194             if time.time() - bucket.lastAccessed >= 60 * 60:
195                 id = randRange(bucket.min, bucket.max)
196                 self.findNode(id, callback)
197         
198  
199     #####
200     ##### INCOMING MESSAGE HANDLERS
201     
202     def xmlrpc_ping(self, sender):
203         """
204             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
205             returns sender dict
206         """
207         ip = self.crequest.getClientIP()
208         sender['host'] = ip
209         n = Node().initWithDict(sender)
210         self.insertNode(n)
211         return self.node.senderDict()
212                 
213     def xmlrpc_find_node(self, target, sender):
214         nodes = self.table.findNodes(target)
215         nodes = map(lambda node: node.senderDict(), nodes)
216         ip = self.crequest.getClientIP()
217         sender['host'] = ip
218         n = Node().initWithDict(sender)
219         self.insertNode(n)
220         return nodes, self.node.senderDict()
221     
222     def xmlrpc_store_value(self, key, value, sender):
223         key = key.data
224         h1 = sha(key+value.data).digest()
225         t = `time.time()`
226         if not self.store.has_key(h1):
227             v = dumps((key, value.data, t))
228             self.store.put(h1, v)
229             self.itime.put(t, h1)
230             self.kw.put(key, h1)
231         else:
232             # update last insert time
233             tup = loads(self.store[h1])
234             self.store[h1] = dumps((tup[0], tup[1], t))
235             self.itime.put(t, h1)
236
237         ip = self.crequest.getClientIP()
238         sender['host'] = ip
239         n = Node().initWithDict(sender)
240         self.insertNode(n)
241         return self.node.senderDict()
242         
243     def xmlrpc_find_value(self, key, sender):
244         ip = self.crequest.getClientIP()
245         key = key.data
246         sender['host'] = ip
247         n = Node().initWithDict(sender)
248         self.insertNode(n)
249
250         if self.kw.has_key(key):
251             c = self.kw.cursor()
252             tup = c.set(key)
253             l = []
254             while(tup):
255                 h1 = tup[1]
256                 v = loads(self.store[h1])[1]
257                 l.append(v)
258                 tup = c.next()
259             l = map(lambda v: Binary(v), l)
260             return {'values' : l}, self.node.senderDict()
261         else:
262             nodes = self.table.findNodes(key)
263             nodes = map(lambda node: node.senderDict(), nodes)
264             return {'nodes' : nodes}, self.node.senderDict()
265
266
267
268
269
270 #------ testing
271
272 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
273     from whrandom import randrange
274     import thread
275     port = 2001
276     l = []
277         
278     if not quiet:
279         print "Building %s peer table." % peers
280         
281     for i in xrange(peers):
282         a = Khashmir(host, port + i)
283         l.append(a)
284     
285
286     thread.start_new_thread(l[0].app.run, ())
287     time.sleep(1)
288     for peer in l[1:]:
289         peer.app.run()
290         #time.sleep(.25)
291
292     print "adding contacts...."
293
294     for peer in l[1:]:
295         n = l[randrange(0, len(l))].node
296         peer.addContact(host, n.port)
297         n = l[randrange(0, len(l))].node
298         peer.addContact(host, n.port)
299         n = l[randrange(0, len(l))].node
300         peer.addContact(host, n.port)
301         if pause:
302             time.sleep(.30)
303             
304     time.sleep(1)
305     print "finding close nodes...."
306
307     for peer in l:
308         peer.findCloseNodes()
309         if pause:
310             time.sleep(.5)
311     if pause:
312             time.sleep(2)
313 #    for peer in l:
314 #       peer.refreshTable()
315     return l
316         
317 def test_find_nodes(l, quiet=0):
318     import threading, sys
319     from whrandom import randrange
320     flag = threading.Event()
321     
322     n = len(l)
323     
324     a = l[randrange(0,n)]
325     b = l[randrange(0,n)]
326     
327     def callback(nodes, flag=flag, id = b.node.id):
328         if (len(nodes) >0) and (nodes[0].id == id):
329             print "test_find_nodes      PASSED"
330         else:
331             print "test_find_nodes      FAILED"
332         flag.set()
333     a.findNode(b.node.id, callback)
334     flag.wait()
335     
336 def test_find_value(l, quiet=0):
337     from whrandom import randrange
338     from sha import sha
339     from hash import newID
340     import time, threading, sys
341     
342     fa = threading.Event()
343     fb = threading.Event()
344     fc = threading.Event()
345     
346     n = len(l)
347     a = l[randrange(0,n)]
348     b = l[randrange(0,n)]
349     c = l[randrange(0,n)]
350     d = l[randrange(0,n)]
351
352     key = newID()
353     value = newID()
354     if not quiet:
355         print "inserting value..."
356         sys.stdout.flush()
357     a.storeValueForKey(key, value)
358     time.sleep(3)
359     print "finding..."
360     sys.stdout.flush()
361     
362     class cb:
363         def __init__(self, flag, value=value):
364             self.flag = flag
365             self.val = value
366             self.found = 0
367         def callback(self, values):
368             try:
369                 if(len(values) == 0):
370                     if not self.found:
371                         print "find                FAILED"
372                     else:
373                         print "find                FOUND"
374                     sys.stdout.flush()
375
376                 else:
377                     if self.val in values:
378                         self.found = 1
379             finally:
380                 self.flag.set()
381
382     b.valueForKey(key, cb(fa).callback)
383     fa.wait()
384     c.valueForKey(key, cb(fb).callback)
385     fb.wait()
386     d.valueForKey(key, cb(fc).callback)    
387     fc.wait()
388     
389 def test_one(port):
390     import thread
391     k = Khashmir('localhost', port)
392     thread.start_new_thread(k.app.run, ())
393     return k
394     
395 if __name__ == "__main__":
396     l = test_build_net()
397     time.sleep(3)
398     print "finding nodes..."
399     for i in range(10):
400         test_find_nodes(l)
401     print "inserting and fetching values..."
402     for i in range(10):
403         test_find_value(l)