]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
1722ad48ad9291a7375302b0035e45019ca297ed
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5 from pickle import loads, dumps
6 from sha import sha
7
8 from ktable import KTable, K
9 from knode import KNode as Node
10
11 from hash import newID
12
13 from actions import FindNode, GetValue, KeyExpirer
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
19 threadable.init()
20
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
23
24 from xmlrpclib import Binary
25
26 # don't ping unless it's been at least this many seconds since we've heard from a peer
27 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
28
29
30
31 # this is the main class!
32 class Khashmir(xmlrpc.XMLRPC):
33     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
34     def __init__(self, host, port):
35         self.node = Node().init(newID(), host, port)
36         self.table = KTable(self.node)
37         self.app = Application("xmlrpc")
38         self.app.listenTCP(port, server.Site(self))
39         
40         ## these databases may be more suited to on-disk rather than in-memory
41         # h((key, value)) -> (key, value, time) mappings
42         self.store = db.DB()
43         self.store.open(None, None, db.DB_BTREE)
44         
45         # <insert time> -> h((key, value))
46         self.itime = db.DB()
47         self.itime.set_flags(db.DB_DUP)
48         self.itime.open(None, None, db.DB_BTREE)
49
50         # key -> h((key, value))
51         self.kw = db.DB()
52         self.kw.set_flags(db.DB_DUP)
53         self.kw.open(None, None, db.DB_BTREE)
54
55         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
56         
57     def render(self, request):
58         """
59             Override the built in render so we can have access to the request object!
60             note, crequest is probably only valid on the initial call (not after deferred!)
61         """
62         self.crequest = request
63         return xmlrpc.XMLRPC.render(self, request)
64
65         
66     #######
67     #######  LOCAL INTERFACE    - use these methods!
68     def addContact(self, host, port):
69         """
70          ping this node and add the contact info to the table on pong!
71         """
72         n =Node().init(" "*20, host, port)  # note, we 
73         self.sendPing(n)
74
75
76     ## this call is async!
77     def findNode(self, id, callback, errback=None):
78         """ returns the contact info for node, or the k closest nodes, from the global table """
79         # get K nodes out of local table/cache, or the node we want
80         nodes = self.table.findNodes(id)
81         d = Deferred()
82         d.addCallbacks(callback, errback)
83         if len(nodes) == 1 and nodes[0].id == id :
84             d.callback(nodes)
85         else:
86             # create our search state
87             state = FindNode(self, id, d.callback)
88             reactor.callFromThread(state.goWithNodes, nodes)
89     
90     
91     ## also async
92     def valueForKey(self, key, callback):
93         """ returns the values found for key in global table """
94         nodes = self.table.findNodes(key)
95
96         # get locals
97         l = self.retrieveValues(key)
98         if len(l) > 0:
99             reactor.callFromThread(callback, l)
100
101         # create our search state
102         state = GetValue(self, key, callback)
103         reactor.callFromThread(state.goWithNodes, nodes, l)
104         
105
106
107     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
108     def storeValueForKey(self, key, value, callback=None):
109         """ stores the value for key in the global table, returns immediately, no status 
110             in this implementation, peers respond but don't indicate status to storing values
111             values are stored in peers on a first-come first-served basis
112             this will probably change so more than one value can be stored under a key
113         """
114         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
115             if not callback:
116                 # default callback - this will get called for each successful store value
117                 def _storedValueHandler(sender):
118                     pass
119                 response=_storedValueHandler
120         
121             for node in nodes:
122                 def cb(t, table = table, node=node, resp=response):
123                     self.table.insertNode(node)
124                     response(t)
125                 if node.id != self.node.id:
126                     def default(err, node=node, table=table):
127                         table.nodeFailed(node)
128                     df = node.storeValue(key, value, self.node.senderDict())
129                     df.addCallback(cb)
130         # this call is asynch
131         self.findNode(key, _storeValueForKey)
132         
133         
134     def insertNode(self, n):
135         """
136         insert a node in our local table, pinging oldest contact in bucket, if necessary
137         
138         If all you have is a host/port, then use addContact, which calls this method after
139         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
140         a node into the table without it's peer-ID.  That means of course the node passed into this
141         method needs to be a properly formed Node object with a valid ID.
142         """
143         old = self.table.insertNode(n)
144         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
145             # the bucket is full, check to see if old node is still around and if so, replace it
146             
147             ## these are the callbacks used when we ping the oldest node in a bucket
148             def _staleNodeHandler(oldnode=old, newnode = n):
149                 """ called if the pinged node never responds """
150                 self.table.replaceStaleNode(old, newnode)
151         
152             def _notStaleNodeHandler(sender, old=old):
153                 """ called when we get a pong from the old node """
154                 sender = Node().initWithDict(sender)
155                 if sender.id == old.id:
156                     self.table.justSeenNode(old)
157
158             df = old.ping(self.node.senderDict())
159             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
160
161
162     def sendPing(self, node):
163         """
164             ping a node
165         """
166         df = node.ping(self.node.senderDict())
167         ## these are the callbacks we use when we issue a PING
168         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
169             if id != 20 * ' ' and id != sender['id'].data:
170                 # whoah, got response from different peer than we were expecting
171                 pass
172             else:
173                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
174                 sender['host'] = host
175                 sender['port'] = port
176                 n = Node().initWithDict(sender)
177                 table.insertNode(n)
178             return
179         def _defaultPong(err, node=node, table=self.table):
180                 table.nodeFailed(node)
181
182         df.addCallbacks(_pongHandler,_defaultPong)
183
184
185     def findCloseNodes(self):
186         """
187             This does a findNode on the ID one away from our own.  
188             This will allow us to populate our table with nodes on our network closest to our own.
189             This is called as soon as we start up with an empty table
190         """
191         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
192         def callback(nodes):
193             pass
194         self.findNode(id, callback)
195
196     def refreshTable(self):
197         """
198             
199         """
200         def callback(nodes):
201             pass
202
203         for bucket in self.table.buckets:
204             if time.time() - bucket.lastAccessed >= 60 * 60:
205                 id = randRange(bucket.min, bucket.max)
206                 self.findNode(id, callback)
207         
208  
209     def retrieveValues(self, key):
210         if self.kw.has_key(key):
211             c = self.kw.cursor()
212             tup = c.set(key)
213             l = []
214             while(tup and tup[0] == key):
215                 h1 = tup[1]
216                 v = loads(self.store[h1])[1]
217                 l.append(v)
218                 tup = c.next()
219             return l
220         return []
221         
222     #####
223     ##### INCOMING MESSAGE HANDLERS
224     
225     def xmlrpc_ping(self, sender):
226         """
227             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
228             returns sender dict
229         """
230         ip = self.crequest.getClientIP()
231         sender['host'] = ip
232         n = Node().initWithDict(sender)
233         self.insertNode(n)
234         return self.node.senderDict()
235                 
236     def xmlrpc_find_node(self, target, sender):
237         nodes = self.table.findNodes(target.data)
238         nodes = map(lambda node: node.senderDict(), nodes)
239         ip = self.crequest.getClientIP()
240         sender['host'] = ip
241         n = Node().initWithDict(sender)
242         self.insertNode(n)
243         return nodes, self.node.senderDict()
244     
245     def xmlrpc_store_value(self, key, value, sender):
246         key = key.data
247         h1 = sha(key+value.data).digest()
248         t = `time.time()`
249         if not self.store.has_key(h1):
250             v = dumps((key, value.data, t))
251             self.store.put(h1, v)
252             self.itime.put(t, h1)
253             self.kw.put(key, h1)
254         else:
255             # update last insert time
256             tup = loads(self.store[h1])
257             self.store[h1] = dumps((tup[0], tup[1], t))
258             self.itime.put(t, h1)
259
260         ip = self.crequest.getClientIP()
261         sender['host'] = ip
262         n = Node().initWithDict(sender)
263         self.insertNode(n)
264         return self.node.senderDict()
265         
266     def xmlrpc_find_value(self, key, sender):
267         ip = self.crequest.getClientIP()
268         key = key.data
269         sender['host'] = ip
270         n = Node().initWithDict(sender)
271         self.insertNode(n)
272
273         l = self.retrieveValues(key)
274         if len(l) > 0:
275             l = map(lambda v: Binary(v), l)
276             return {'values' : l}, self.node.senderDict()
277         else:
278             nodes = self.table.findNodes(key)
279             nodes = map(lambda node: node.senderDict(), nodes)
280             return {'nodes' : nodes}, self.node.senderDict()
281
282
283
284
285
286 #------ testing
287
288 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
289     from whrandom import randrange
290     import thread
291     port = 2001
292     l = []
293         
294     if not quiet:
295         print "Building %s peer table." % peers
296         
297     for i in xrange(peers):
298         a = Khashmir(host, port + i)
299         l.append(a)
300     
301
302     thread.start_new_thread(l[0].app.run, ())
303     time.sleep(1)
304     for peer in l[1:]:
305         peer.app.run()
306         #time.sleep(.25)
307
308     print "adding contacts...."
309
310     for peer in l[1:]:
311         n = l[randrange(0, len(l))].node
312         peer.addContact(host, n.port)
313         n = l[randrange(0, len(l))].node
314         peer.addContact(host, n.port)
315         n = l[randrange(0, len(l))].node
316         peer.addContact(host, n.port)
317         if pause:
318             time.sleep(.30)
319             
320     time.sleep(1)
321     print "finding close nodes...."
322
323     for peer in l:
324         peer.findCloseNodes()
325         if pause:
326             time.sleep(.5)
327     if pause:
328             time.sleep(2)
329 #    for peer in l:
330 #       peer.refreshTable()
331     return l
332         
333 def test_find_nodes(l, quiet=0):
334     import threading, sys
335     from whrandom import randrange
336     flag = threading.Event()
337     
338     n = len(l)
339     
340     a = l[randrange(0,n)]
341     b = l[randrange(0,n)]
342     
343     def callback(nodes, flag=flag, id = b.node.id):
344         if (len(nodes) >0) and (nodes[0].id == id):
345             print "test_find_nodes      PASSED"
346         else:
347             print "test_find_nodes      FAILED"
348         flag.set()
349     a.findNode(b.node.id, callback)
350     flag.wait()
351     
352 def test_find_value(l, quiet=0):
353     from whrandom import randrange
354     from sha import sha
355     from hash import newID
356     import time, threading, sys
357     
358     fa = threading.Event()
359     fb = threading.Event()
360     fc = threading.Event()
361     
362     n = len(l)
363     a = l[randrange(0,n)]
364     b = l[randrange(0,n)]
365     c = l[randrange(0,n)]
366     d = l[randrange(0,n)]
367
368     key = newID()
369     value = newID()
370     if not quiet:
371         print "inserting value..."
372         sys.stdout.flush()
373     a.storeValueForKey(key, value)
374     time.sleep(3)
375     print "finding..."
376     sys.stdout.flush()
377     
378     class cb:
379         def __init__(self, flag, value=value):
380             self.flag = flag
381             self.val = value
382             self.found = 0
383         def callback(self, values):
384             try:
385                 if(len(values) == 0):
386                     if not self.found:
387                         print "find                FAILED"
388                     else:
389                         print "find                FOUND"
390                     sys.stdout.flush()
391
392                 else:
393                     if self.val in values:
394                         self.found = 1
395             finally:
396                 self.flag.set()
397
398     b.valueForKey(key, cb(fa).callback)
399     fa.wait()
400     c.valueForKey(key, cb(fb).callback)
401     fb.wait()
402     d.valueForKey(key, cb(fc).callback)    
403     fc.wait()
404     
405 def test_one(port):
406     import thread
407     k = Khashmir('localhost', port)
408     thread.start_new_thread(k.app.run, ())
409     return k
410     
411 if __name__ == "__main__":
412     l = test_build_net()
413     time.sleep(3)
414     print "finding nodes..."
415     for i in range(10):
416         test_find_nodes(l)
417     print "inserting and fetching values..."
418     for i in range(10):
419         test_find_value(l)