]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
mo' constants
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7 from bencode import bdecode as loads
8 from bencode import bencode as dumps
9
10 from sha import sha
11
12 from ktable import KTable, K
13 from knode import KNode as Node
14
15 from hash import newID, newIDInRange
16
17 from actions import FindNode, GetValue, KeyExpirer
18 from twisted.web import xmlrpc
19 from twisted.internet.defer import Deferred
20 from twisted.python import threadable
21 from twisted.internet.app import Application
22 from twisted.web import server
23 threadable.init()
24
25 from bsddb3 import db ## find this at http://pybsddb.sf.net/
26 from bsddb3._db import DBNotFoundError
27
28 from xmlrpclib import Binary
29
30
31
32 # this is the main class!
33 class Khashmir(xmlrpc.XMLRPC):
34     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
35     def __init__(self, host, port):
36         self.node = Node().init(newID(), host, port)
37         self.table = KTable(self.node)
38         self.app = Application("xmlrpc")
39         self.app.listenTCP(port, server.Site(self))
40         
41         ## these databases may be more suited to on-disk rather than in-memory
42         # h((key, value)) -> (key, value, time) mappings
43         self.store = db.DB()
44         self.store.open(None, None, db.DB_BTREE)
45         
46         # <insert time> -> h((key, value))
47         self.itime = db.DB()
48         self.itime.set_flags(db.DB_DUP)
49         self.itime.open(None, None, db.DB_BTREE)
50
51         # key -> h((key, value))
52         self.kw = db.DB()
53         self.kw.set_flags(db.DB_DUP)
54         self.kw.open(None, None, db.DB_BTREE)
55
56         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
57         
58     def render(self, request):
59         """
60             Override the built in render so we can have access to the request object!
61             note, crequest is probably only valid on the initial call (not after deferred!)
62         """
63         self.crequest = request
64         return xmlrpc.XMLRPC.render(self, request)
65
66         
67     #######
68     #######  LOCAL INTERFACE    - use these methods!
69     def addContact(self, host, port):
70         """
71          ping this node and add the contact info to the table on pong!
72         """
73         n =Node().init(const.NULL_ID, host, port)  # note, we 
74         self.sendPing(n)
75
76
77     ## this call is async!
78     def findNode(self, id, callback, errback=None):
79         """ returns the contact info for node, or the k closest nodes, from the global table """
80         # get K nodes out of local table/cache, or the node we want
81         nodes = self.table.findNodes(id)
82         d = Deferred()
83         if errback:
84             d.addCallbacks(callback, errback)
85         else:
86             d.addCallback(callback)
87         if len(nodes) == 1 and nodes[0].id == id :
88             d.callback(nodes)
89         else:
90             # create our search state
91             state = FindNode(self, id, d.callback)
92             reactor.callFromThread(state.goWithNodes, nodes)
93     
94     
95     ## also async
96     def valueForKey(self, key, callback):
97         """ returns the values found for key in global table """
98         nodes = self.table.findNodes(key)
99
100         # get locals
101         l = self.retrieveValues(key)
102         if len(l) > 0:
103             reactor.callFromThread(callback, l)
104
105         # create our search state
106         state = GetValue(self, key, callback)
107         reactor.callFromThread(state.goWithNodes, nodes, l)
108         
109
110
111     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
112     def storeValueForKey(self, key, value, callback=None):
113         """ stores the value for key in the global table, returns immediately, no status 
114             in this implementation, peers respond but don't indicate status to storing values
115             values are stored in peers on a first-come first-served basis
116             this will probably change so more than one value can be stored under a key
117         """
118         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
119             if not callback:
120                 # default callback - this will get called for each successful store value
121                 def _storedValueHandler(sender):
122                     pass
123                 response=_storedValueHandler
124         
125             for node in nodes[:const.STORE_REDUNDANCY]:
126                 def cb(t, table = table, node=node, resp=response):
127                     self.table.insertNode(node)
128                     response(t)
129                 if node.id != self.node.id:
130                     def default(err, node=node, table=table):
131                         table.nodeFailed(node)
132                     df = node.storeValue(key, value, self.node.senderDict())
133                     df.addCallbacks(cb, lambda x: None)
134         # this call is asynch
135         self.findNode(key, _storeValueForKey)
136         
137         
138     def insertNode(self, n, contacted=1):
139         """
140         insert a node in our local table, pinging oldest contact in bucket, if necessary
141         
142         If all you have is a host/port, then use addContact, which calls this method after
143         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
144         a node into the table without it's peer-ID.  That means of course the node passed into this
145         method needs to be a properly formed Node object with a valid ID.
146         """
147         old = self.table.insertNode(n, contacted=contacted)
148         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
149             # the bucket is full, check to see if old node is still around and if so, replace it
150             
151             ## these are the callbacks used when we ping the oldest node in a bucket
152             def _staleNodeHandler(oldnode=old, newnode = n):
153                 """ called if the pinged node never responds """
154                 self.table.replaceStaleNode(old, newnode)
155         
156             def _notStaleNodeHandler(sender, old=old):
157                 """ called when we get a pong from the old node """
158                 sender = Node().initWithDict(sender)
159                 if sender.id == old.id:
160                     self.table.justSeenNode(old)
161
162             df = old.ping(self.node.senderDict())
163             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
164
165
166     def sendPing(self, node):
167         """
168             ping a node
169         """
170         df = node.ping(self.node.senderDict())
171         ## these are the callbacks we use when we issue a PING
172         def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
173             l, sender = args
174             if id != const.NULL_ID and id != sender['id'].data:
175                 # whoah, got response from different peer than we were expecting
176                 pass
177             else:
178                 sender['host'] = host
179                 sender['port'] = port
180                 n = Node().initWithDict(sender)
181                 table.insertNode(n)
182             return
183         def _defaultPong(err, node=node, table=self.table):
184                 table.nodeFailed(node)
185
186         df.addCallbacks(_pongHandler,_defaultPong)
187
188
189     def findCloseNodes(self):
190         """
191             This does a findNode on the ID one away from our own.  
192             This will allow us to populate our table with nodes on our network closest to our own.
193             This is called as soon as we start up with an empty table
194         """
195         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
196         def callback(nodes):
197             pass
198         self.findNode(id, callback)
199
200     def refreshTable(self):
201         """
202             
203         """
204         def callback(nodes):
205             pass
206
207         for bucket in self.table.buckets:
208             if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
209                 id = newIDInRange(bucket.min, bucket.max)
210                 self.findNode(id, callback)
211         
212  
213     def retrieveValues(self, key):
214         if self.kw.has_key(key):
215             c = self.kw.cursor()
216             tup = c.set(key)
217             l = []
218             while(tup and tup[0] == key):
219                 h1 = tup[1]
220                 v = loads(self.store[h1])[1]
221                 l.append(v)
222                 tup = c.next()
223             return l
224         return []
225         
226     #####
227     ##### INCOMING MESSAGE HANDLERS
228     
229     def xmlrpc_ping(self, sender):
230         """
231             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
232             returns sender dict
233         """
234         ip = self.crequest.getClientIP()
235         sender['host'] = ip
236         n = Node().initWithDict(sender)
237         self.insertNode(n, contacted=0)
238         return (), self.node.senderDict()
239                 
240     def xmlrpc_find_node(self, target, sender):
241         nodes = self.table.findNodes(target.data)
242         nodes = map(lambda node: node.senderDict(), nodes)
243         ip = self.crequest.getClientIP()
244         sender['host'] = ip
245         n = Node().initWithDict(sender)
246         self.insertNode(n, contacted=0)
247         return nodes, self.node.senderDict()
248     
249     def xmlrpc_store_value(self, key, value, sender):
250         key = key.data
251         h1 = sha(key+value.data).digest()
252         t = `time.time()`
253         if not self.store.has_key(h1):
254             v = dumps((key, value.data, t))
255             self.store.put(h1, v)
256             self.itime.put(t, h1)
257             self.kw.put(key, h1)
258         else:
259             # update last insert time
260             tup = loads(self.store[h1])
261             self.store[h1] = dumps((tup[0], tup[1], t))
262             self.itime.put(t, h1)
263
264         ip = self.crequest.getClientIP()
265         sender['host'] = ip
266         n = Node().initWithDict(sender)
267         self.insertNode(n, contacted=0)
268         return (), self.node.senderDict()
269         
270     def xmlrpc_find_value(self, key, sender):
271         ip = self.crequest.getClientIP()
272         key = key.data
273         sender['host'] = ip
274         n = Node().initWithDict(sender)
275         self.insertNode(n, contacted=0)
276
277         l = self.retrieveValues(key)
278         if len(l) > 0:
279             l = map(lambda v: Binary(v), l)
280             return {'values' : l}, self.node.senderDict()
281         else:
282             nodes = self.table.findNodes(key)
283             nodes = map(lambda node: node.senderDict(), nodes)
284             return {'nodes' : nodes}, self.node.senderDict()
285
286
287
288
289
290 #------ testing
291
292 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
293     from whrandom import randrange
294     import thread
295     port = 2001
296     l = []
297         
298     if not quiet:
299         print "Building %s peer table." % peers
300         
301     for i in xrange(peers):
302         a = Khashmir(host, port + i)
303         l.append(a)
304     
305
306     thread.start_new_thread(l[0].app.run, ())
307     time.sleep(1)
308     for peer in l[1:]:
309         peer.app.run()
310         #time.sleep(.25)
311
312     print "adding contacts...."
313
314     for peer in l[1:]:
315         n = l[randrange(0, len(l))].node
316         peer.addContact(host, n.port)
317         n = l[randrange(0, len(l))].node
318         peer.addContact(host, n.port)
319         n = l[randrange(0, len(l))].node
320         peer.addContact(host, n.port)
321         if pause:
322             time.sleep(.5)
323             
324     time.sleep(1)
325     print "finding close nodes...."
326
327     for peer in l:
328         peer.findCloseNodes()
329         if pause:
330             time.sleep(.5)
331     if pause:
332             time.sleep(1)
333 #    for peer in l:
334 #       peer.refreshTable()
335     return l
336         
337 def test_find_nodes(l, quiet=0):
338     import threading, sys
339     from whrandom import randrange
340     flag = threading.Event()
341     
342     n = len(l)
343     
344     a = l[randrange(0,n)]
345     b = l[randrange(0,n)]
346     
347     def callback(nodes, flag=flag, id = b.node.id):
348         if (len(nodes) >0) and (nodes[0].id == id):
349             print "test_find_nodes      PASSED"
350         else:
351             print "test_find_nodes      FAILED"
352         flag.set()
353     a.findNode(b.node.id, callback)
354     flag.wait()
355     
356 def test_find_value(l, quiet=0):
357     from whrandom import randrange
358     from sha import sha
359     from hash import newID
360     import time, threading, sys
361     
362     fa = threading.Event()
363     fb = threading.Event()
364     fc = threading.Event()
365     
366     n = len(l)
367     a = l[randrange(0,n)]
368     b = l[randrange(0,n)]
369     c = l[randrange(0,n)]
370     d = l[randrange(0,n)]
371
372     key = newID()
373     value = newID()
374     if not quiet:
375         print "inserting value..."
376         sys.stdout.flush()
377     a.storeValueForKey(key, value)
378     time.sleep(3)
379     print "finding..."
380     sys.stdout.flush()
381     
382     class cb:
383         def __init__(self, flag, value=value):
384             self.flag = flag
385             self.val = value
386             self.found = 0
387         def callback(self, values):
388             try:
389                 if(len(values) == 0):
390                     if not self.found:
391                         print "find                NOT FOUND"
392                     else:
393                         print "find                FOUND"
394                     sys.stdout.flush()
395
396                 else:
397                     if self.val in values:
398                         self.found = 1
399             finally:
400                 self.flag.set()
401
402     b.valueForKey(key, cb(fa).callback)
403     fa.wait()
404     c.valueForKey(key, cb(fb).callback)
405     fb.wait()
406     d.valueForKey(key, cb(fc).callback)    
407     fc.wait()
408     
409 def test_one(host, port):
410     import thread
411     k = Khashmir(host, port)
412     thread.start_new_thread(k.app.run, ())
413     return k
414     
415 if __name__ == "__main__":
416     import sys
417     n = 8
418     if len(sys.argv) > 1:
419         n = int(sys.argv[1])
420     l = test_build_net(peers=n)
421     time.sleep(3)
422     print "finding nodes..."
423     for i in range(10):
424         test_find_nodes(l)
425     print "inserting and fetching values..."
426     for i in range(10):
427         test_find_value(l)