]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
19bd3d8537af3cb2d884a487ad6f1de82a6558c0
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7 from bencode import bdecode as loads
8 from bencode import bencode as dumps
9
10 from sha import sha
11
12 from ktable import KTable, K
13 from knode import KNode as Node
14
15 from hash import newID, newIDInRange
16
17 from actions import FindNode, GetValue, KeyExpirer
18 from twisted.web import xmlrpc
19 from twisted.internet.defer import Deferred
20 from twisted.python import threadable
21 from twisted.internet.app import Application
22 from twisted.web import server
23 threadable.init()
24
25 import sqlite  ## find this at http://pysqlite.sourceforge.net/
26
27
28 KhashmirDBExcept = "KhashmirDBExcept"
29
30 # this is the main class!
31 class Khashmir(xmlrpc.XMLRPC):
32     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
33     def __init__(self, host, port, db='khashmir.db'):
34         self.node = Node().init(newID(), host, port)
35         self.table = KTable(self.node)
36         self.app = Application("xmlrpc")
37         self.app.listenTCP(port, server.Site(self))
38         self.findDB(db)
39         self.last = time.time()
40         KeyExpirer(store=self.store)
41
42     def findDB(self, db):
43         import os
44         try:
45             os.stat(db)
46         except OSError:
47             self.createNewDB(db)
48         else:
49             self.loadDB(db)
50             
51     def loadDB(self, db):
52         try:
53             self.store = sqlite.connect(db=db)
54         except:
55             import traceback
56             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
57             
58     def createNewDB(self, db):
59         self.store = sqlite.connect(db=db)
60         s = """
61             create table kv (hkv text primary key, key text, value text, time timestamp);
62             create index kv_key on kv(key);
63             
64             create table nodes (id text primary key, host text, port number);
65             """
66         c = self.store.cursor()
67         c.execute(s)
68         self.store.commit()
69                 
70     def render(self, request):
71         """
72             Override the built in render so we can have access to the request object!
73             note, crequest is probably only valid on the initial call (not after deferred!)
74         """
75         self.crequest = request
76         return xmlrpc.XMLRPC.render(self, request)
77
78         
79     #######
80     #######  LOCAL INTERFACE    - use these methods!
81     def addContact(self, host, port):
82         """
83          ping this node and add the contact info to the table on pong!
84         """
85         n =Node().init(const.NULL_ID, host, port)  # note, we 
86         self.sendPing(n)
87
88
89     ## this call is async!
90     def findNode(self, id, callback, errback=None):
91         """ returns the contact info for node, or the k closest nodes, from the global table """
92         # get K nodes out of local table/cache, or the node we want
93         nodes = self.table.findNodes(id)
94         d = Deferred()
95         if errback:
96             d.addCallbacks(callback, errback)
97         else:
98             d.addCallback(callback)
99         if len(nodes) == 1 and nodes[0].id == id :
100             d.callback(nodes)
101         else:
102             # create our search state
103             state = FindNode(self, id, d.callback)
104             reactor.callFromThread(state.goWithNodes, nodes)
105     
106     
107     ## also async
108     def valueForKey(self, key, callback):
109         """ returns the values found for key in global table
110             callback will be called with a list of values for each peer that returns unique values
111             final callback will be an empty list - probably should change to 'more coming' arg
112         """
113         nodes = self.table.findNodes(key)
114
115         # get locals
116         l = self.retrieveValues(key)
117         if len(l) > 0:
118             reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
119
120         # create our search state
121         state = GetValue(self, key, callback)
122         reactor.callFromThread(state.goWithNodes, nodes, l)
123         
124
125
126     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
127     def storeValueForKey(self, key, value, callback=None):
128         """ stores the value for key in the global table, returns immediately, no status 
129             in this implementation, peers respond but don't indicate status to storing values
130             a key can have many values
131         """
132         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
133             if not callback:
134                 # default callback
135                 def _storedValueHandler(sender):
136                     pass
137                 response=_storedValueHandler
138         
139             for node in nodes[:const.STORE_REDUNDANCY]:
140                 def cb(t, table = table, node=node, resp=response):
141                     self.table.insertNode(node)
142                     response(t)
143                 if node.id != self.node.id:
144                     def default(err, node=node, table=table):
145                         table.nodeFailed(node)
146                     df = node.storeValue(key, value, self.node.senderDict())
147                     df.addCallbacks(cb, lambda x: None)
148         # this call is asynch
149         self.findNode(key, _storeValueForKey)
150         
151         
152     def insertNode(self, n, contacted=1):
153         """
154         insert a node in our local table, pinging oldest contact in bucket, if necessary
155         
156         If all you have is a host/port, then use addContact, which calls this method after
157         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
158         a node into the table without it's peer-ID.  That means of course the node passed into this
159         method needs to be a properly formed Node object with a valid ID.
160         """
161         old = self.table.insertNode(n, contacted=contacted)
162         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
163             # the bucket is full, check to see if old node is still around and if so, replace it
164             
165             ## these are the callbacks used when we ping the oldest node in a bucket
166             def _staleNodeHandler(oldnode=old, newnode = n):
167                 """ called if the pinged node never responds """
168                 self.table.replaceStaleNode(old, newnode)
169         
170             def _notStaleNodeHandler(sender, old=old):
171                 """ called when we get a pong from the old node """
172                 sender = Node().initWithDict(sender)
173                 if sender.id == old.id:
174                     self.table.justSeenNode(old)
175
176             df = old.ping(self.node.senderDict())
177             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
178
179
180     def sendPing(self, node):
181         """
182             ping a node
183         """
184         df = node.ping(self.node.senderDict())
185         ## these are the callbacks we use when we issue a PING
186         def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
187             l, sender = args
188             if id != const.NULL_ID and id != sender['id'].decode('base64'):
189                 # whoah, got response from different peer than we were expecting
190                 pass
191             else:
192                 sender['host'] = host
193                 sender['port'] = port
194                 n = Node().initWithDict(sender)
195                 table.insertNode(n)
196             return
197         def _defaultPong(err, node=node, table=self.table):
198                 table.nodeFailed(node)
199
200         df.addCallbacks(_pongHandler,_defaultPong)
201
202
203     def findCloseNodes(self):
204         """
205             This does a findNode on the ID one away from our own.  
206             This will allow us to populate our table with nodes on our network closest to our own.
207             This is called as soon as we start up with an empty table
208         """
209         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
210         def callback(nodes):
211             pass
212         self.findNode(id, callback)
213
214     def refreshTable(self):
215         """
216             
217         """
218         def callback(nodes):
219             pass
220
221         for bucket in self.table.buckets:
222             if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
223                 id = newIDInRange(bucket.min, bucket.max)
224                 self.findNode(id, callback)
225         
226  
227     def retrieveValues(self, key):
228         s = "select value from kv where key = '%s';" % key.encode('base64')
229         c = self.store.cursor()
230         c.execute(s)
231         t = c.fetchone()
232         l = []
233         while t:
234             l.append(t['value'])
235             t = c.fetchone()
236         return l
237         
238     #####
239     ##### INCOMING MESSAGE HANDLERS
240     
241     def xmlrpc_ping(self, sender):
242         """
243             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
244             returns sender dict
245         """
246         ip = self.crequest.getClientIP()
247         sender['host'] = ip
248         n = Node().initWithDict(sender)
249         self.insertNode(n, contacted=0)
250         return (), self.node.senderDict()
251                 
252     def xmlrpc_find_node(self, target, sender):
253         nodes = self.table.findNodes(target.decode('base64'))
254         nodes = map(lambda node: node.senderDict(), nodes)
255         ip = self.crequest.getClientIP()
256         sender['host'] = ip
257         n = Node().initWithDict(sender)
258         self.insertNode(n, contacted=0)
259         return nodes, self.node.senderDict()
260             
261     def xmlrpc_store_value(self, key, value, sender):
262         h1 = sha(key+value).digest().encode('base64')
263         t = `time.time()`
264         s = "insert into kv values ('%s', '%s', '%s', '%s')" % (h1, key, value, t)
265         c = self.store.cursor()
266         try:
267             c.execute(s)
268         except:
269             # update last insert time
270             s = "update kv set time = '%s' where hkv = '%s'" % (t, h1)
271             c.execute(s)
272         self.store.commit()
273         ip = self.crequest.getClientIP()
274         sender['host'] = ip
275         n = Node().initWithDict(sender)
276         self.insertNode(n, contacted=0)
277         return (), self.node.senderDict()
278         
279     def xmlrpc_find_value(self, key, sender):
280         ip = self.crequest.getClientIP()
281         key = key.decode('base64')
282         sender['host'] = ip
283         n = Node().initWithDict(sender)
284         self.insertNode(n, contacted=0)
285
286         l = self.retrieveValues(key)
287         if len(l) > 0:
288             return {'values' : l}, self.node.senderDict()
289         else:
290             nodes = self.table.findNodes(key)
291             nodes = map(lambda node: node.senderDict(), nodes)
292             return {'nodes' : nodes}, self.node.senderDict()
293
294
295
296
297
298 #------ testing
299
300 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
301     from whrandom import randrange
302     import thread
303     port = 2001
304     l = []
305         
306     if not quiet:
307         print "Building %s peer table." % peers
308         
309     for i in xrange(peers):
310         a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
311         l.append(a)
312     
313
314     thread.start_new_thread(l[0].app.run, ())
315     time.sleep(1)
316     for peer in l[1:]:
317         peer.app.run()
318         #time.sleep(.25)
319
320     print "adding contacts...."
321
322     for peer in l[1:]:
323         n = l[randrange(0, len(l))].node
324         peer.addContact(host, n.port)
325         n = l[randrange(0, len(l))].node
326         peer.addContact(host, n.port)
327         n = l[randrange(0, len(l))].node
328         peer.addContact(host, n.port)
329         if pause:
330             time.sleep(.5)
331             
332     time.sleep(1)
333     print "finding close nodes...."
334
335     for peer in l:
336         peer.findCloseNodes()
337         if pause:
338             time.sleep(.5)
339     if pause:
340             time.sleep(1)
341 #    for peer in l:
342 #       peer.refreshTable()
343     return l
344         
345 def test_find_nodes(l, quiet=0):
346     import threading, sys
347     from whrandom import randrange
348     flag = threading.Event()
349     
350     n = len(l)
351     
352     a = l[randrange(0,n)]
353     b = l[randrange(0,n)]
354     
355     def callback(nodes, flag=flag, id = b.node.id):
356         if (len(nodes) >0) and (nodes[0].id == id):
357             print "test_find_nodes      PASSED"
358         else:
359             print "test_find_nodes      FAILED"
360         flag.set()
361     a.findNode(b.node.id, callback)
362     flag.wait()
363     
364 def test_find_value(l, quiet=0):
365     from whrandom import randrange
366     from sha import sha
367     from hash import newID
368     import time, threading, sys
369     
370     fa = threading.Event()
371     fb = threading.Event()
372     fc = threading.Event()
373     
374     n = len(l)
375     a = l[randrange(0,n)]
376     b = l[randrange(0,n)]
377     c = l[randrange(0,n)]
378     d = l[randrange(0,n)]
379
380     key = newID()
381     value = newID()
382     if not quiet:
383         print "inserting value..."
384         sys.stdout.flush()
385     a.storeValueForKey(key, value)
386     time.sleep(3)
387     print "finding..."
388     sys.stdout.flush()
389     
390     class cb:
391         def __init__(self, flag, value=value):
392             self.flag = flag
393             self.val = value
394             self.found = 0
395         def callback(self, values):
396             try:
397                 if(len(values) == 0):
398                     if not self.found:
399                         print "find                NOT FOUND"
400                     else:
401                         print "find                FOUND"
402                     sys.stdout.flush()
403
404                 else:
405                     if self.val in values:
406                         self.found = 1
407             finally:
408                 self.flag.set()
409
410     b.valueForKey(key, cb(fa).callback)
411     fa.wait()
412     c.valueForKey(key, cb(fb).callback)
413     fb.wait()
414     d.valueForKey(key, cb(fc).callback)    
415     fc.wait()
416     
417 def test_one(host, port, db='/tmp/test'):
418     import thread
419     k = Khashmir(host, port, db)
420     thread.start_new_thread(k.app.run, ())
421     return k
422     
423 if __name__ == "__main__":
424     import sys
425     n = 8
426     if len(sys.argv) > 1:
427         n = int(sys.argv[1])
428     l = test_build_net(peers=n)
429     time.sleep(3)
430     print "finding nodes..."
431     for i in range(10):
432         test_find_nodes(l)
433     print "inserting and fetching values..."
434     for i in range(10):
435         test_find_value(l)