]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
more constants
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7 from pickle import loads, dumps
8 from sha import sha
9
10 from ktable import KTable, K
11 from knode import KNode as Node
12
13 from hash import newID
14
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
21 threadable.init()
22
23 from bsddb3 import db ## find this at http://pybsddb.sf.net/
24 from bsddb3._db import DBNotFoundError
25
26 from xmlrpclib import Binary
27
28
29
30 # this is the main class!
31 class Khashmir(xmlrpc.XMLRPC):
32     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
33     def __init__(self, host, port):
34         self.node = Node().init(newID(), host, port)
35         self.table = KTable(self.node)
36         self.app = Application("xmlrpc")
37         self.app.listenTCP(port, server.Site(self))
38         
39         ## these databases may be more suited to on-disk rather than in-memory
40         # h((key, value)) -> (key, value, time) mappings
41         self.store = db.DB()
42         self.store.open(None, None, db.DB_BTREE)
43         
44         # <insert time> -> h((key, value))
45         self.itime = db.DB()
46         self.itime.set_flags(db.DB_DUP)
47         self.itime.open(None, None, db.DB_BTREE)
48
49         # key -> h((key, value))
50         self.kw = db.DB()
51         self.kw.set_flags(db.DB_DUP)
52         self.kw.open(None, None, db.DB_BTREE)
53
54         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
55         
56     def render(self, request):
57         """
58             Override the built in render so we can have access to the request object!
59             note, crequest is probably only valid on the initial call (not after deferred!)
60         """
61         self.crequest = request
62         return xmlrpc.XMLRPC.render(self, request)
63
64         
65     #######
66     #######  LOCAL INTERFACE    - use these methods!
67     def addContact(self, host, port):
68         """
69          ping this node and add the contact info to the table on pong!
70         """
71         n =Node().init(" "*20, host, port)  # note, we 
72         self.sendPing(n)
73
74
75     ## this call is async!
76     def findNode(self, id, callback, errback=None):
77         """ returns the contact info for node, or the k closest nodes, from the global table """
78         # get K nodes out of local table/cache, or the node we want
79         nodes = self.table.findNodes(id)
80         d = Deferred()
81         d.addCallbacks(callback, errback)
82         if len(nodes) == 1 and nodes[0].id == id :
83             d.callback(nodes)
84         else:
85             # create our search state
86             state = FindNode(self, id, d.callback)
87             reactor.callFromThread(state.goWithNodes, nodes)
88     
89     
90     ## also async
91     def valueForKey(self, key, callback):
92         """ returns the values found for key in global table """
93         nodes = self.table.findNodes(key)
94
95         # get locals
96         l = self.retrieveValues(key)
97         if len(l) > 0:
98             reactor.callFromThread(callback, l)
99
100         # create our search state
101         state = GetValue(self, key, callback)
102         reactor.callFromThread(state.goWithNodes, nodes, l)
103         
104
105
106     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
107     def storeValueForKey(self, key, value, callback=None):
108         """ stores the value for key in the global table, returns immediately, no status 
109             in this implementation, peers respond but don't indicate status to storing values
110             values are stored in peers on a first-come first-served basis
111             this will probably change so more than one value can be stored under a key
112         """
113         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
114             if not callback:
115                 # default callback - this will get called for each successful store value
116                 def _storedValueHandler(sender):
117                     pass
118                 response=_storedValueHandler
119         
120             for node in nodes:
121                 def cb(t, table = table, node=node, resp=response):
122                     self.table.insertNode(node)
123                     response(t)
124                 if node.id != self.node.id:
125                     def default(err, node=node, table=table):
126                         table.nodeFailed(node)
127                     df = node.storeValue(key, value, self.node.senderDict())
128                     df.addCallback(cb)
129         # this call is asynch
130         self.findNode(key, _storeValueForKey)
131         
132         
133     def insertNode(self, n, contacted=1):
134         """
135         insert a node in our local table, pinging oldest contact in bucket, if necessary
136         
137         If all you have is a host/port, then use addContact, which calls this method after
138         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
139         a node into the table without it's peer-ID.  That means of course the node passed into this
140         method needs to be a properly formed Node object with a valid ID.
141         """
142         old = self.table.insertNode(n, contacted=contacted)
143         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
144             # the bucket is full, check to see if old node is still around and if so, replace it
145             
146             ## these are the callbacks used when we ping the oldest node in a bucket
147             def _staleNodeHandler(oldnode=old, newnode = n):
148                 """ called if the pinged node never responds """
149                 self.table.replaceStaleNode(old, newnode)
150         
151             def _notStaleNodeHandler(sender, old=old):
152                 """ called when we get a pong from the old node """
153                 sender, conn = sender
154                 sender['host'] = conn['host']
155                 sender = Node().initWithDict(sender)
156                 if sender.id == old.id:
157                     self.table.justSeenNode(old)
158
159             df = old.ping(self.node.senderDict())
160             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
161
162
163     def sendPing(self, node):
164         """
165             ping a node
166         """
167         df = node.ping(self.node.senderDict())
168         ## these are the callbacks we use when we issue a PING
169         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
170             sender = sender[0]
171             if id != 20 * ' ' and id != sender['id'].data:
172                 # whoah, got response from different peer than we were expecting
173                 pass
174             else:
175                 sender['host'] = host
176                 sender['port'] = port
177                 n = Node().initWithDict(sender)
178                 table.insertNode(n)
179             return
180         def _defaultPong(err, node=node, table=self.table):
181                 table.nodeFailed(node)
182
183         df.addCallbacks(_pongHandler,_defaultPong)
184
185
186     def findCloseNodes(self):
187         """
188             This does a findNode on the ID one away from our own.  
189             This will allow us to populate our table with nodes on our network closest to our own.
190             This is called as soon as we start up with an empty table
191         """
192         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
193         def callback(nodes):
194             pass
195         self.findNode(id, callback)
196
197     def refreshTable(self):
198         """
199             
200         """
201         def callback(nodes):
202             pass
203
204         for bucket in self.table.buckets:
205             if time.time() - bucket.lastAccessed >= 60 * 60:
206                 id = randRange(bucket.min, bucket.max)
207                 self.findNode(id, callback)
208         
209  
210     def retrieveValues(self, key):
211         if self.kw.has_key(key):
212             c = self.kw.cursor()
213             tup = c.set(key)
214             l = []
215             while(tup and tup[0] == key):
216                 h1 = tup[1]
217                 v = loads(self.store[h1])[1]
218                 l.append(v)
219                 tup = c.next()
220             return l
221         return []
222         
223     #####
224     ##### INCOMING MESSAGE HANDLERS
225     
226     def xmlrpc_ping(self, sender):
227         """
228             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
229             returns sender dict
230         """
231         ip = self.crequest.getClientIP()
232         sender['host'] = ip
233         n = Node().initWithDict(sender)
234         self.insertNode(n, contacted=0)
235         return self.node.senderDict()
236                 
237     def xmlrpc_find_node(self, target, sender):
238         nodes = self.table.findNodes(target.data)
239         nodes = map(lambda node: node.senderDict(), nodes)
240         ip = self.crequest.getClientIP()
241         sender['host'] = ip
242         n = Node().initWithDict(sender)
243         self.insertNode(n, contacted=0)
244         return nodes, self.node.senderDict()
245     
246     def xmlrpc_store_value(self, key, value, sender):
247         key = key.data
248         h1 = sha(key+value.data).digest()
249         t = `time.time()`
250         if not self.store.has_key(h1):
251             v = dumps((key, value.data, t))
252             self.store.put(h1, v)
253             self.itime.put(t, h1)
254             self.kw.put(key, h1)
255         else:
256             # update last insert time
257             tup = loads(self.store[h1])
258             self.store[h1] = dumps((tup[0], tup[1], t))
259             self.itime.put(t, h1)
260
261         ip = self.crequest.getClientIP()
262         sender['host'] = ip
263         n = Node().initWithDict(sender)
264         self.insertNode(n, contacted=0)
265         return self.node.senderDict()
266         
267     def xmlrpc_find_value(self, key, sender):
268         ip = self.crequest.getClientIP()
269         key = key.data
270         sender['host'] = ip
271         n = Node().initWithDict(sender)
272         self.insertNode(n, contacted=0)
273
274         l = self.retrieveValues(key)
275         if len(l) > 0:
276             l = map(lambda v: Binary(v), l)
277             return {'values' : l}, self.node.senderDict()
278         else:
279             nodes = self.table.findNodes(key)
280             nodes = map(lambda node: node.senderDict(), nodes)
281             return {'nodes' : nodes}, self.node.senderDict()
282
283
284
285
286
287 #------ testing
288
289 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
290     from whrandom import randrange
291     import thread
292     port = 2001
293     l = []
294         
295     if not quiet:
296         print "Building %s peer table." % peers
297         
298     for i in xrange(peers):
299         a = Khashmir(host, port + i)
300         l.append(a)
301     
302
303     thread.start_new_thread(l[0].app.run, ())
304     time.sleep(1)
305     for peer in l[1:]:
306         peer.app.run()
307         #time.sleep(.25)
308
309     print "adding contacts...."
310
311     for peer in l[1:]:
312         n = l[randrange(0, len(l))].node
313         peer.addContact(host, n.port)
314         n = l[randrange(0, len(l))].node
315         peer.addContact(host, n.port)
316         n = l[randrange(0, len(l))].node
317         peer.addContact(host, n.port)
318         if pause:
319             time.sleep(.30)
320             
321     time.sleep(1)
322     print "finding close nodes...."
323
324     for peer in l:
325         peer.findCloseNodes()
326         if pause:
327             time.sleep(.5)
328     if pause:
329             time.sleep(2)
330 #    for peer in l:
331 #       peer.refreshTable()
332     return l
333         
334 def test_find_nodes(l, quiet=0):
335     import threading, sys
336     from whrandom import randrange
337     flag = threading.Event()
338     
339     n = len(l)
340     
341     a = l[randrange(0,n)]
342     b = l[randrange(0,n)]
343     
344     def callback(nodes, flag=flag, id = b.node.id):
345         if (len(nodes) >0) and (nodes[0].id == id):
346             print "test_find_nodes      PASSED"
347         else:
348             print "test_find_nodes      FAILED"
349         flag.set()
350     a.findNode(b.node.id, callback)
351     flag.wait()
352     
353 def test_find_value(l, quiet=0):
354     from whrandom import randrange
355     from sha import sha
356     from hash import newID
357     import time, threading, sys
358     
359     fa = threading.Event()
360     fb = threading.Event()
361     fc = threading.Event()
362     
363     n = len(l)
364     a = l[randrange(0,n)]
365     b = l[randrange(0,n)]
366     c = l[randrange(0,n)]
367     d = l[randrange(0,n)]
368
369     key = newID()
370     value = newID()
371     if not quiet:
372         print "inserting value..."
373         sys.stdout.flush()
374     a.storeValueForKey(key, value)
375     time.sleep(3)
376     print "finding..."
377     sys.stdout.flush()
378     
379     class cb:
380         def __init__(self, flag, value=value):
381             self.flag = flag
382             self.val = value
383             self.found = 0
384         def callback(self, values):
385             try:
386                 if(len(values) == 0):
387                     if not self.found:
388                         print "find                NOT FOUND"
389                     else:
390                         print "find                FOUND"
391                     sys.stdout.flush()
392
393                 else:
394                     if self.val in values:
395                         self.found = 1
396             finally:
397                 self.flag.set()
398
399     b.valueForKey(key, cb(fa).callback)
400     fa.wait()
401     c.valueForKey(key, cb(fb).callback)
402     fb.wait()
403     d.valueForKey(key, cb(fc).callback)    
404     fc.wait()
405     
406 def test_one(host, port):
407     import thread
408     k = Khashmir(host, port)
409     thread.start_new_thread(k.app.run, ())
410     return k
411     
412 if __name__ == "__main__":
413     import sys
414     n = 8
415     if len(sys.argv) > 1:
416         n = int(sys.argv[1])
417     l = test_build_net(peers=n)
418     time.sleep(3)
419     print "finding nodes..."
420     for i in range(10):
421         test_find_nodes(l)
422     print "inserting and fetching values..."
423     for i in range(10):
424         test_find_value(l)