]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
bb78e6ba068969655b69ce78960f70bbb4f256b3
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7 from bencode import bdecode as loads
8 from bencode import bencode as dumps
9
10 from sha import sha
11
12 from ktable import KTable, K
13 from knode import KNode as Node
14
15 from hash import newID, newIDInRange
16
17 from actions import FindNode, GetValue, KeyExpirer
18 from twisted.web import xmlrpc
19 from twisted.internet.defer import Deferred
20 from twisted.python import threadable
21 from twisted.internet.app import Application
22 from twisted.web import server
23 threadable.init()
24
25 from bsddb3 import db ## find this at http://pybsddb.sf.net/
26 from bsddb3._db import DBNotFoundError
27
28 from xmlrpclib import Binary
29
30
31
32 # this is the main class!
33 class Khashmir(xmlrpc.XMLRPC):
34     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
35     def __init__(self, host, port):
36         self.node = Node().init(newID(), host, port)
37         self.table = KTable(self.node)
38         self.app = Application("xmlrpc")
39         self.app.listenTCP(port, server.Site(self))
40         
41         ## these databases may be more suited to on-disk rather than in-memory
42         # h((key, value)) -> (key, value, time) mappings
43         self.store = db.DB()
44         self.store.open(None, None, db.DB_BTREE)
45         
46         # <insert time> -> h((key, value))
47         self.itime = db.DB()
48         self.itime.set_flags(db.DB_DUP)
49         self.itime.open(None, None, db.DB_BTREE)
50
51         # key -> h((key, value))
52         self.kw = db.DB()
53         self.kw.set_flags(db.DB_DUP)
54         self.kw.open(None, None, db.DB_BTREE)
55
56         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
57         
58     def render(self, request):
59         """
60             Override the built in render so we can have access to the request object!
61             note, crequest is probably only valid on the initial call (not after deferred!)
62         """
63         self.crequest = request
64         return xmlrpc.XMLRPC.render(self, request)
65
66         
67     #######
68     #######  LOCAL INTERFACE    - use these methods!
69     def addContact(self, host, port):
70         """
71          ping this node and add the contact info to the table on pong!
72         """
73         n =Node().init(" "*20, host, port)  # note, we 
74         self.sendPing(n)
75
76
77     ## this call is async!
78     def findNode(self, id, callback, errback=None):
79         """ returns the contact info for node, or the k closest nodes, from the global table """
80         # get K nodes out of local table/cache, or the node we want
81         nodes = self.table.findNodes(id)
82         d = Deferred()
83         if errback:
84             d.addCallbacks(callback, errback)
85         else:
86             d.addCallback(callback)
87         if len(nodes) == 1 and nodes[0].id == id :
88             d.callback(nodes)
89         else:
90             # create our search state
91             state = FindNode(self, id, d.callback)
92             reactor.callFromThread(state.goWithNodes, nodes)
93     
94     
95     ## also async
96     def valueForKey(self, key, callback):
97         """ returns the values found for key in global table """
98         nodes = self.table.findNodes(key)
99
100         # get locals
101         l = self.retrieveValues(key)
102         if len(l) > 0:
103             reactor.callFromThread(callback, l)
104
105         # create our search state
106         state = GetValue(self, key, callback)
107         reactor.callFromThread(state.goWithNodes, nodes, l)
108         
109
110
111     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
112     def storeValueForKey(self, key, value, callback=None):
113         """ stores the value for key in the global table, returns immediately, no status 
114             in this implementation, peers respond but don't indicate status to storing values
115             values are stored in peers on a first-come first-served basis
116             this will probably change so more than one value can be stored under a key
117         """
118         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
119             if not callback:
120                 # default callback - this will get called for each successful store value
121                 def _storedValueHandler(sender):
122                     pass
123                 response=_storedValueHandler
124         
125             for node in nodes:
126                 def cb(t, table = table, node=node, resp=response):
127                     self.table.insertNode(node)
128                     response(t)
129                 if node.id != self.node.id:
130                     def default(err, node=node, table=table):
131                         table.nodeFailed(node)
132                     df = node.storeValue(key, value, self.node.senderDict())
133                     df.addCallback(cb)
134         # this call is asynch
135         self.findNode(key, _storeValueForKey)
136         
137         
138     def insertNode(self, n, contacted=1):
139         """
140         insert a node in our local table, pinging oldest contact in bucket, if necessary
141         
142         If all you have is a host/port, then use addContact, which calls this method after
143         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
144         a node into the table without it's peer-ID.  That means of course the node passed into this
145         method needs to be a properly formed Node object with a valid ID.
146         """
147         old = self.table.insertNode(n, contacted=contacted)
148         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
149             # the bucket is full, check to see if old node is still around and if so, replace it
150             
151             ## these are the callbacks used when we ping the oldest node in a bucket
152             def _staleNodeHandler(oldnode=old, newnode = n):
153                 """ called if the pinged node never responds """
154                 self.table.replaceStaleNode(old, newnode)
155         
156             def _notStaleNodeHandler(sender, old=old):
157                 """ called when we get a pong from the old node """
158                 sender, conn = sender
159                 if conn['host']:
160                     sender['host'] = conn['host']
161                 sender = Node().initWithDict(sender)
162                 if sender.id == old.id:
163                     self.table.justSeenNode(old)
164
165             df = old.ping(self.node.senderDict())
166             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
167
168
169     def sendPing(self, node):
170         """
171             ping a node
172         """
173         df = node.ping(self.node.senderDict())
174         ## these are the callbacks we use when we issue a PING
175         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
176             sender, conn = sender
177             if id != 20 * ' ' and id != sender['id'].data:
178                 # whoah, got response from different peer than we were expecting
179                 pass
180             else:
181                 sender['host'] = host
182                 sender['port'] = port
183                 n = Node().initWithDict(sender)
184                 table.insertNode(n)
185             return
186         def _defaultPong(err, node=node, table=self.table):
187                 table.nodeFailed(node)
188
189         df.addCallbacks(_pongHandler,_defaultPong)
190
191
192     def findCloseNodes(self):
193         """
194             This does a findNode on the ID one away from our own.  
195             This will allow us to populate our table with nodes on our network closest to our own.
196             This is called as soon as we start up with an empty table
197         """
198         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
199         def callback(nodes):
200             pass
201         self.findNode(id, callback)
202
203     def refreshTable(self):
204         """
205             
206         """
207         def callback(nodes):
208             pass
209
210         for bucket in self.table.buckets:
211             if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
212                 id = newIDInRange(bucket.min, bucket.max)
213                 self.findNode(id, callback)
214         
215  
216     def retrieveValues(self, key):
217         if self.kw.has_key(key):
218             c = self.kw.cursor()
219             tup = c.set(key)
220             l = []
221             while(tup and tup[0] == key):
222                 h1 = tup[1]
223                 v = loads(self.store[h1])[1]
224                 l.append(v)
225                 tup = c.next()
226             return l
227         return []
228         
229     #####
230     ##### INCOMING MESSAGE HANDLERS
231     
232     def xmlrpc_ping(self, sender):
233         """
234             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
235             returns sender dict
236         """
237         ip = self.crequest.getClientIP()
238         sender['host'] = ip
239         n = Node().initWithDict(sender)
240         self.insertNode(n, contacted=0)
241         return self.node.senderDict()
242                 
243     def xmlrpc_find_node(self, target, sender):
244         nodes = self.table.findNodes(target.data)
245         nodes = map(lambda node: node.senderDict(), nodes)
246         ip = self.crequest.getClientIP()
247         sender['host'] = ip
248         n = Node().initWithDict(sender)
249         self.insertNode(n, contacted=0)
250         return nodes, self.node.senderDict()
251     
252     def xmlrpc_store_value(self, key, value, sender):
253         key = key.data
254         h1 = sha(key+value.data).digest()
255         t = `time.time()`
256         if not self.store.has_key(h1):
257             v = dumps((key, value.data, t))
258             self.store.put(h1, v)
259             self.itime.put(t, h1)
260             self.kw.put(key, h1)
261         else:
262             # update last insert time
263             tup = loads(self.store[h1])
264             self.store[h1] = dumps((tup[0], tup[1], t))
265             self.itime.put(t, h1)
266
267         ip = self.crequest.getClientIP()
268         sender['host'] = ip
269         n = Node().initWithDict(sender)
270         self.insertNode(n, contacted=0)
271         return self.node.senderDict()
272         
273     def xmlrpc_find_value(self, key, sender):
274         ip = self.crequest.getClientIP()
275         key = key.data
276         sender['host'] = ip
277         n = Node().initWithDict(sender)
278         self.insertNode(n, contacted=0)
279
280         l = self.retrieveValues(key)
281         if len(l) > 0:
282             l = map(lambda v: Binary(v), l)
283             return {'values' : l}, self.node.senderDict()
284         else:
285             nodes = self.table.findNodes(key)
286             nodes = map(lambda node: node.senderDict(), nodes)
287             return {'nodes' : nodes}, self.node.senderDict()
288
289
290
291
292
293 #------ testing
294
295 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
296     from whrandom import randrange
297     import thread
298     port = 2001
299     l = []
300         
301     if not quiet:
302         print "Building %s peer table." % peers
303         
304     for i in xrange(peers):
305         a = Khashmir(host, port + i)
306         l.append(a)
307     
308
309     thread.start_new_thread(l[0].app.run, ())
310     time.sleep(1)
311     for peer in l[1:]:
312         peer.app.run()
313         #time.sleep(.25)
314
315     print "adding contacts...."
316
317     for peer in l[1:]:
318         n = l[randrange(0, len(l))].node
319         peer.addContact(host, n.port)
320         n = l[randrange(0, len(l))].node
321         peer.addContact(host, n.port)
322         n = l[randrange(0, len(l))].node
323         peer.addContact(host, n.port)
324         if pause:
325             time.sleep(.5)
326             
327     time.sleep(10)
328     print "finding close nodes...."
329
330     for peer in l:
331         peer.findCloseNodes()
332         if pause:
333             time.sleep(2)
334     if pause:
335             time.sleep(10)
336 #    for peer in l:
337 #       peer.refreshTable()
338     return l
339         
340 def test_find_nodes(l, quiet=0):
341     import threading, sys
342     from whrandom import randrange
343     flag = threading.Event()
344     
345     n = len(l)
346     
347     a = l[randrange(0,n)]
348     b = l[randrange(0,n)]
349     
350     def callback(nodes, flag=flag, id = b.node.id):
351         if (len(nodes) >0) and (nodes[0].id == id):
352             print "test_find_nodes      PASSED"
353         else:
354             print "test_find_nodes      FAILED"
355         flag.set()
356     a.findNode(b.node.id, callback)
357     flag.wait()
358     
359 def test_find_value(l, quiet=0):
360     from whrandom import randrange
361     from sha import sha
362     from hash import newID
363     import time, threading, sys
364     
365     fa = threading.Event()
366     fb = threading.Event()
367     fc = threading.Event()
368     
369     n = len(l)
370     a = l[randrange(0,n)]
371     b = l[randrange(0,n)]
372     c = l[randrange(0,n)]
373     d = l[randrange(0,n)]
374
375     key = newID()
376     value = newID()
377     if not quiet:
378         print "inserting value..."
379         sys.stdout.flush()
380     a.storeValueForKey(key, value)
381     time.sleep(3)
382     print "finding..."
383     sys.stdout.flush()
384     
385     class cb:
386         def __init__(self, flag, value=value):
387             self.flag = flag
388             self.val = value
389             self.found = 0
390         def callback(self, values):
391             try:
392                 if(len(values) == 0):
393                     if not self.found:
394                         print "find                NOT FOUND"
395                     else:
396                         print "find                FOUND"
397                     sys.stdout.flush()
398
399                 else:
400                     if self.val in values:
401                         self.found = 1
402             finally:
403                 self.flag.set()
404
405     b.valueForKey(key, cb(fa).callback)
406     fa.wait()
407     c.valueForKey(key, cb(fb).callback)
408     fb.wait()
409     d.valueForKey(key, cb(fc).callback)    
410     fc.wait()
411     
412 def test_one(host, port):
413     import thread
414     k = Khashmir(host, port)
415     thread.start_new_thread(k.app.run, ())
416     return k
417     
418 if __name__ == "__main__":
419     import sys
420     n = 8
421     if len(sys.argv) > 1:
422         n = int(sys.argv[1])
423     l = test_build_net(peers=n)
424     time.sleep(3)
425     print "finding nodes..."
426     for i in range(10):
427         test_find_nodes(l)
428     print "inserting and fetching values..."
429     for i in range(10):
430         test_find_value(l)