]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
automatically check responses to make sure it's from the node
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7 from bencode import bdecode as loads
8 from bencode import bencode as dumps
9
10 from sha import sha
11
12 from ktable import KTable, K
13 from knode import KNode as Node
14
15 from hash import newID, newIDInRange
16
17 from actions import FindNode, GetValue, KeyExpirer
18 from twisted.web import xmlrpc
19 from twisted.internet.defer import Deferred
20 from twisted.python import threadable
21 from twisted.internet.app import Application
22 from twisted.web import server
23 threadable.init()
24
25 from bsddb3 import db ## find this at http://pybsddb.sf.net/
26 from bsddb3._db import DBNotFoundError
27
28 from xmlrpclib import Binary
29
30
31
32 # this is the main class!
33 class Khashmir(xmlrpc.XMLRPC):
34     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
35     def __init__(self, host, port):
36         self.node = Node().init(newID(), host, port)
37         self.table = KTable(self.node)
38         self.app = Application("xmlrpc")
39         self.app.listenTCP(port, server.Site(self))
40         
41         ## these databases may be more suited to on-disk rather than in-memory
42         # h((key, value)) -> (key, value, time) mappings
43         self.store = db.DB()
44         self.store.open(None, None, db.DB_BTREE)
45         
46         # <insert time> -> h((key, value))
47         self.itime = db.DB()
48         self.itime.set_flags(db.DB_DUP)
49         self.itime.open(None, None, db.DB_BTREE)
50
51         # key -> h((key, value))
52         self.kw = db.DB()
53         self.kw.set_flags(db.DB_DUP)
54         self.kw.open(None, None, db.DB_BTREE)
55
56         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
57         
58     def render(self, request):
59         """
60             Override the built in render so we can have access to the request object!
61             note, crequest is probably only valid on the initial call (not after deferred!)
62         """
63         self.crequest = request
64         return xmlrpc.XMLRPC.render(self, request)
65
66         
67     #######
68     #######  LOCAL INTERFACE    - use these methods!
69     def addContact(self, host, port):
70         """
71          ping this node and add the contact info to the table on pong!
72         """
73         n =Node().init(" "*20, host, port)  # note, we 
74         self.sendPing(n)
75
76
77     ## this call is async!
78     def findNode(self, id, callback, errback=None):
79         """ returns the contact info for node, or the k closest nodes, from the global table """
80         # get K nodes out of local table/cache, or the node we want
81         nodes = self.table.findNodes(id)
82         d = Deferred()
83         if errback:
84             d.addCallbacks(callback, errback)
85         else:
86             d.addCallback(callback)
87         if len(nodes) == 1 and nodes[0].id == id :
88             d.callback(nodes)
89         else:
90             # create our search state
91             state = FindNode(self, id, d.callback)
92             reactor.callFromThread(state.goWithNodes, nodes)
93     
94     
95     ## also async
96     def valueForKey(self, key, callback):
97         """ returns the values found for key in global table """
98         nodes = self.table.findNodes(key)
99
100         # get locals
101         l = self.retrieveValues(key)
102         if len(l) > 0:
103             reactor.callFromThread(callback, l)
104
105         # create our search state
106         state = GetValue(self, key, callback)
107         reactor.callFromThread(state.goWithNodes, nodes, l)
108         
109
110
111     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
112     def storeValueForKey(self, key, value, callback=None):
113         """ stores the value for key in the global table, returns immediately, no status 
114             in this implementation, peers respond but don't indicate status to storing values
115             values are stored in peers on a first-come first-served basis
116             this will probably change so more than one value can be stored under a key
117         """
118         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
119             if not callback:
120                 # default callback - this will get called for each successful store value
121                 def _storedValueHandler(sender):
122                     pass
123                 response=_storedValueHandler
124         
125             for node in nodes:
126                 def cb(t, table = table, node=node, resp=response):
127                     self.table.insertNode(node)
128                     response(t)
129                 if node.id != self.node.id:
130                     def default(err, node=node, table=table):
131                         table.nodeFailed(node)
132                     df = node.storeValue(key, value, self.node.senderDict())
133                     df.addCallbacks(cb, lambda x: None)
134         # this call is asynch
135         self.findNode(key, _storeValueForKey)
136         
137         
138     def insertNode(self, n, contacted=1):
139         """
140         insert a node in our local table, pinging oldest contact in bucket, if necessary
141         
142         If all you have is a host/port, then use addContact, which calls this method after
143         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
144         a node into the table without it's peer-ID.  That means of course the node passed into this
145         method needs to be a properly formed Node object with a valid ID.
146         """
147         old = self.table.insertNode(n, contacted=contacted)
148         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
149             # the bucket is full, check to see if old node is still around and if so, replace it
150             
151             ## these are the callbacks used when we ping the oldest node in a bucket
152             def _staleNodeHandler(oldnode=old, newnode = n):
153                 """ called if the pinged node never responds """
154                 self.table.replaceStaleNode(old, newnode)
155         
156             def _notStaleNodeHandler(sender, old=old):
157                 """ called when we get a pong from the old node """
158                 sender = Node().initWithDict(sender)
159                 if sender.id == old.id:
160                     self.table.justSeenNode(old)
161
162             df = old.ping(self.node.senderDict())
163             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
164
165
166     def sendPing(self, node):
167         """
168             ping a node
169         """
170         df = node.ping(self.node.senderDict())
171         ## these are the callbacks we use when we issue a PING
172         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
173             if id != 20 * ' ' and id != sender['id'].data:
174                 # whoah, got response from different peer than we were expecting
175                 pass
176             else:
177                 sender['host'] = host
178                 sender['port'] = port
179                 n = Node().initWithDict(sender)
180                 table.insertNode(n)
181             return
182         def _defaultPong(err, node=node, table=self.table):
183                 table.nodeFailed(node)
184
185         df.addCallbacks(_pongHandler,_defaultPong)
186
187
188     def findCloseNodes(self):
189         """
190             This does a findNode on the ID one away from our own.  
191             This will allow us to populate our table with nodes on our network closest to our own.
192             This is called as soon as we start up with an empty table
193         """
194         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
195         def callback(nodes):
196             pass
197         self.findNode(id, callback)
198
199     def refreshTable(self):
200         """
201             
202         """
203         def callback(nodes):
204             pass
205
206         for bucket in self.table.buckets:
207             if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
208                 id = newIDInRange(bucket.min, bucket.max)
209                 self.findNode(id, callback)
210         
211  
212     def retrieveValues(self, key):
213         if self.kw.has_key(key):
214             c = self.kw.cursor()
215             tup = c.set(key)
216             l = []
217             while(tup and tup[0] == key):
218                 h1 = tup[1]
219                 v = loads(self.store[h1])[1]
220                 l.append(v)
221                 tup = c.next()
222             return l
223         return []
224         
225     #####
226     ##### INCOMING MESSAGE HANDLERS
227     
228     def xmlrpc_ping(self, sender):
229         """
230             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
231             returns sender dict
232         """
233         ip = self.crequest.getClientIP()
234         sender['host'] = ip
235         n = Node().initWithDict(sender)
236         self.insertNode(n, contacted=0)
237         return self.node.senderDict()
238                 
239     def xmlrpc_find_node(self, target, sender):
240         nodes = self.table.findNodes(target.data)
241         nodes = map(lambda node: node.senderDict(), nodes)
242         ip = self.crequest.getClientIP()
243         sender['host'] = ip
244         n = Node().initWithDict(sender)
245         self.insertNode(n, contacted=0)
246         return nodes, self.node.senderDict()
247     
248     def xmlrpc_store_value(self, key, value, sender):
249         key = key.data
250         h1 = sha(key+value.data).digest()
251         t = `time.time()`
252         if not self.store.has_key(h1):
253             v = dumps((key, value.data, t))
254             self.store.put(h1, v)
255             self.itime.put(t, h1)
256             self.kw.put(key, h1)
257         else:
258             # update last insert time
259             tup = loads(self.store[h1])
260             self.store[h1] = dumps((tup[0], tup[1], t))
261             self.itime.put(t, h1)
262
263         ip = self.crequest.getClientIP()
264         sender['host'] = ip
265         n = Node().initWithDict(sender)
266         self.insertNode(n, contacted=0)
267         return self.node.senderDict()
268         
269     def xmlrpc_find_value(self, key, sender):
270         ip = self.crequest.getClientIP()
271         key = key.data
272         sender['host'] = ip
273         n = Node().initWithDict(sender)
274         self.insertNode(n, contacted=0)
275
276         l = self.retrieveValues(key)
277         if len(l) > 0:
278             l = map(lambda v: Binary(v), l)
279             return {'values' : l}, self.node.senderDict()
280         else:
281             nodes = self.table.findNodes(key)
282             nodes = map(lambda node: node.senderDict(), nodes)
283             return {'nodes' : nodes}, self.node.senderDict()
284
285
286
287
288
289 #------ testing
290
291 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
292     from whrandom import randrange
293     import thread
294     port = 2001
295     l = []
296         
297     if not quiet:
298         print "Building %s peer table." % peers
299         
300     for i in xrange(peers):
301         a = Khashmir(host, port + i)
302         l.append(a)
303     
304
305     thread.start_new_thread(l[0].app.run, ())
306     time.sleep(1)
307     for peer in l[1:]:
308         peer.app.run()
309         #time.sleep(.25)
310
311     print "adding contacts...."
312
313     for peer in l[1:]:
314         n = l[randrange(0, len(l))].node
315         peer.addContact(host, n.port)
316         n = l[randrange(0, len(l))].node
317         peer.addContact(host, n.port)
318         n = l[randrange(0, len(l))].node
319         peer.addContact(host, n.port)
320         if pause:
321             time.sleep(.5)
322             
323     time.sleep(10)
324     print "finding close nodes...."
325
326     for peer in l:
327         peer.findCloseNodes()
328         if pause:
329             time.sleep(2)
330     if pause:
331             time.sleep(10)
332 #    for peer in l:
333 #       peer.refreshTable()
334     return l
335         
336 def test_find_nodes(l, quiet=0):
337     import threading, sys
338     from whrandom import randrange
339     flag = threading.Event()
340     
341     n = len(l)
342     
343     a = l[randrange(0,n)]
344     b = l[randrange(0,n)]
345     
346     def callback(nodes, flag=flag, id = b.node.id):
347         if (len(nodes) >0) and (nodes[0].id == id):
348             print "test_find_nodes      PASSED"
349         else:
350             print "test_find_nodes      FAILED"
351         flag.set()
352     a.findNode(b.node.id, callback)
353     flag.wait()
354     
355 def test_find_value(l, quiet=0):
356     from whrandom import randrange
357     from sha import sha
358     from hash import newID
359     import time, threading, sys
360     
361     fa = threading.Event()
362     fb = threading.Event()
363     fc = threading.Event()
364     
365     n = len(l)
366     a = l[randrange(0,n)]
367     b = l[randrange(0,n)]
368     c = l[randrange(0,n)]
369     d = l[randrange(0,n)]
370
371     key = newID()
372     value = newID()
373     if not quiet:
374         print "inserting value..."
375         sys.stdout.flush()
376     a.storeValueForKey(key, value)
377     time.sleep(3)
378     print "finding..."
379     sys.stdout.flush()
380     
381     class cb:
382         def __init__(self, flag, value=value):
383             self.flag = flag
384             self.val = value
385             self.found = 0
386         def callback(self, values):
387             try:
388                 if(len(values) == 0):
389                     if not self.found:
390                         print "find                NOT FOUND"
391                     else:
392                         print "find                FOUND"
393                     sys.stdout.flush()
394
395                 else:
396                     if self.val in values:
397                         self.found = 1
398             finally:
399                 self.flag.set()
400
401     b.valueForKey(key, cb(fa).callback)
402     fa.wait()
403     c.valueForKey(key, cb(fb).callback)
404     fb.wait()
405     d.valueForKey(key, cb(fc).callback)    
406     fc.wait()
407     
408 def test_one(host, port):
409     import thread
410     k = Khashmir(host, port)
411     thread.start_new_thread(k.app.run, ())
412     return k
413     
414 if __name__ == "__main__":
415     import sys
416     n = 8
417     if len(sys.argv) > 1:
418         n = int(sys.argv[1])
419     l = test_build_net(peers=n)
420     time.sleep(3)
421     print "finding nodes..."
422     for i in range(10):
423         test_find_nodes(l)
424     print "inserting and fetching values..."
425     for i in range(10):
426         test_find_value(l)