36ff79f8abf02894802285f8c980af4464f8c373
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7
8 from sha import sha
9
10 from ktable import KTable, K
11 from knode import KNode as Node
12
13 from hash import newID, newIDInRange
14
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
21 threadable.init()
22
23 import sqlite  ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
25
26 KhashmirDBExcept = "KhashmirDBExcept"
27
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
31     def __init__(self, host, port, db='khashmir.db'):
32         self.node = Node().init(newID(), host, port)
33         self.table = KTable(self.node)
34         self.app = Application("xmlrpc")
35         self.app.listenTCP(port, server.Site(self))
36         self.findDB(db)
37         self.last = time.time()
38         KeyExpirer(store=self.store)
39
40     def findDB(self, db):
41         import os
42         try:
43             os.stat(db)
44         except OSError:
45             self.createNewDB(db)
46         else:
47             self.loadDB(db)
48             
49     def loadDB(self, db):
50         try:
51             self.store = sqlite.connect(db=db)
52             self.store.autocommit = 1
53         except:
54             import traceback
55             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
56             
57     def createNewDB(self, db):
58         self.store = sqlite.connect(db=db)
59         self.store.autocommit = 1
60         s = """
61             create table kv (key text, value text, time timestamp, primary key (key, value));
62             create index kv_key on kv(key);
63             create index kv_timestamp on kv(time);
64             
65             create table nodes (id text primary key, host text, port number);
66             """
67         c = self.store.cursor()
68         c.execute(s)
69                 
70     def render(self, request):
71         """
72             Override the built in render so we can have access to the request object!
73             note, crequest is probably only valid on the initial call (not after deferred!)
74         """
75         self.crequest = request
76         return xmlrpc.XMLRPC.render(self, request)
77
78         
79     #######
80     #######  LOCAL INTERFACE    - use these methods!
81     def addContact(self, host, port):
82         """
83          ping this node and add the contact info to the table on pong!
84         """
85         n =Node().init(const.NULL_ID, host, port)  # note, we 
86         self.sendPing(n)
87
88
89     ## this call is async!
90     def findNode(self, id, callback, errback=None):
91         """ returns the contact info for node, or the k closest nodes, from the global table """
92         # get K nodes out of local table/cache, or the node we want
93         nodes = self.table.findNodes(id)
94         d = Deferred()
95         if errback:
96             d.addCallbacks(callback, errback)
97         else:
98             d.addCallback(callback)
99         if len(nodes) == 1 and nodes[0].id == id :
100             d.callback(nodes)
101         else:
102             # create our search state
103             state = FindNode(self, id, d.callback)
104             reactor.callFromThread(state.goWithNodes, nodes)
105     
106     
107     ## also async
108     def valueForKey(self, key, callback):
109         """ returns the values found for key in global table
110             callback will be called with a list of values for each peer that returns unique values
111             final callback will be an empty list - probably should change to 'more coming' arg
112         """
113         nodes = self.table.findNodes(key)
114
115         # get locals
116         l = self.retrieveValues(key)
117         if len(l) > 0:
118             reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
119
120         # create our search state
121         state = GetValue(self, key, callback)
122         reactor.callFromThread(state.goWithNodes, nodes, l)
123         
124
125
126     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
127     def storeValueForKey(self, key, value, callback=None):
128         """ stores the value for key in the global table, returns immediately, no status 
129             in this implementation, peers respond but don't indicate status to storing values
130             a key can have many values
131         """
132         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
133             if not callback:
134                 # default callback
135                 def _storedValueHandler(sender):
136                     pass
137                 response=_storedValueHandler
138         
139             for node in nodes[:const.STORE_REDUNDANCY]:
140                 def cb(t, table = table, node=node, resp=response):
141                     self.table.insertNode(node)
142                     response(t)
143                 if node.id != self.node.id:
144                     def default(err, node=node, table=table):
145                         table.nodeFailed(node)
146                     df = node.storeValue(key, value, self.node.senderDict())
147                     df.addCallbacks(cb, lambda x: None)
148         # this call is asynch
149         self.findNode(key, _storeValueForKey)
150         
151         
152     def insertNode(self, n, contacted=1):
153         """
154         insert a node in our local table, pinging oldest contact in bucket, if necessary
155         
156         If all you have is a host/port, then use addContact, which calls this method after
157         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
158         a node into the table without it's peer-ID.  That means of course the node passed into this
159         method needs to be a properly formed Node object with a valid ID.
160         """
161         old = self.table.insertNode(n, contacted=contacted)
162         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
163             # the bucket is full, check to see if old node is still around and if so, replace it
164             
165             ## these are the callbacks used when we ping the oldest node in a bucket
166             def _staleNodeHandler(oldnode=old, newnode = n):
167                 """ called if the pinged node never responds """
168                 self.table.replaceStaleNode(old, newnode)
169         
170             def _notStaleNodeHandler(sender, old=old):
171                 """ called when we get a pong from the old node """
172                 sender = Node().initWithDict(sender)
173                 if sender.id == old.id:
174                     self.table.justSeenNode(old)
175
176             df = old.ping(self.node.senderDict())
177             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
178
179
180     def sendPing(self, node):
181         """
182             ping a node
183         """
184         df = node.ping(self.node.senderDict())
185         ## these are the callbacks we use when we issue a PING
186         def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
187             l, sender = args
188             if id != const.NULL_ID and id != sender['id'].decode('base64'):
189                 # whoah, got response from different peer than we were expecting
190                 pass
191             else:
192                 sender['host'] = host
193                 sender['port'] = port
194                 n = Node().initWithDict(sender)
195                 table.insertNode(n)
196             return
197         def _defaultPong(err, node=node, table=self.table):
198                 table.nodeFailed(node)
199
200         df.addCallbacks(_pongHandler,_defaultPong)
201
202
203     def findCloseNodes(self, callback=lambda a: None):
204         """
205             This does a findNode on the ID one away from our own.  
206             This will allow us to populate our table with nodes on our network closest to our own.
207             This is called as soon as we start up with an empty table
208         """
209         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
210         self.findNode(id, callback)
211
212     def refreshTable(self):
213         """
214             
215         """
216         def callback(nodes):
217             pass
218
219         for bucket in self.table.buckets:
220             if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
221                 id = newIDInRange(bucket.min, bucket.max)
222                 self.findNode(id, callback)
223         
224  
225     def retrieveValues(self, key):
226         s = "select value from kv where key = '%s';" % key.encode('base64')
227         c = self.store.cursor()
228         c.execute(s)
229         t = c.fetchone()
230         l = []
231         while t:
232             l.append(t['value'])
233             t = c.fetchone()
234         return l
235         
236     #####
237     ##### INCOMING MESSAGE HANDLERS
238     
239     def xmlrpc_ping(self, sender):
240         """
241             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
242             returns sender dict
243         """
244         ip = self.crequest.getClientIP()
245         sender['host'] = ip
246         n = Node().initWithDict(sender)
247         self.insertNode(n, contacted=0)
248         return (), self.node.senderDict()
249                 
250     def xmlrpc_find_node(self, target, sender):
251         nodes = self.table.findNodes(target.decode('base64'))
252         nodes = map(lambda node: node.senderDict(), nodes)
253         ip = self.crequest.getClientIP()
254         sender['host'] = ip
255         n = Node().initWithDict(sender)
256         self.insertNode(n, contacted=0)
257         return nodes, self.node.senderDict()
258             
259     def xmlrpc_store_value(self, key, value, sender):
260         t = "%0.6f" % time.time()
261         s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
262         c = self.store.cursor()
263         try:
264             c.execute(s)
265         except pysqlite_exceptions.IntegrityError, reason:
266             # update last insert time
267             s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
268             c.execute(s)
269         ip = self.crequest.getClientIP()
270         sender['host'] = ip
271         n = Node().initWithDict(sender)
272         self.insertNode(n, contacted=0)
273         return (), self.node.senderDict()
274         
275     def xmlrpc_find_value(self, key, sender):
276         ip = self.crequest.getClientIP()
277         key = key.decode('base64')
278         sender['host'] = ip
279         n = Node().initWithDict(sender)
280         self.insertNode(n, contacted=0)
281
282         l = self.retrieveValues(key)
283         if len(l) > 0:
284             return {'values' : l}, self.node.senderDict()
285         else:
286             nodes = self.table.findNodes(key)
287             nodes = map(lambda node: node.senderDict(), nodes)
288             return {'nodes' : nodes}, self.node.senderDict()
289
290
291
292
293
294 #------ testing
295
296 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
297     from whrandom import randrange
298     import threading
299     import thread
300     port = 2001
301     l = []
302         
303     if not quiet:
304         print "Building %s peer table." % peers
305         
306     for i in xrange(peers):
307         a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
308         l.append(a)
309     
310
311     thread.start_new_thread(l[0].app.run, ())
312     time.sleep(1)
313     for peer in l[1:]:
314         peer.app.run()
315     time.sleep(10)
316
317     print "adding contacts...."
318
319     for peer in l[1:]:
320         n = l[randrange(0, len(l))].node
321         peer.addContact(host, n.port)
322         n = l[randrange(0, len(l))].node
323         peer.addContact(host, n.port)
324         n = l[randrange(0, len(l))].node
325         peer.addContact(host, n.port)
326         if pause:
327             time.sleep(.33)
328         
329     time.sleep(10)
330     print "finding close nodes...."
331
332     for peer in l:
333         flag = threading.Event()
334         def cb(nodes, f=flag):
335             f.set()
336         peer.findCloseNodes(cb)
337         flag.wait()
338
339 #    for peer in l:
340 #       peer.refreshTable()
341     return l
342         
343 def test_find_nodes(l, quiet=0):
344     import threading, sys
345     from whrandom import randrange
346     flag = threading.Event()
347     
348     n = len(l)
349     
350     a = l[randrange(0,n)]
351     b = l[randrange(0,n)]
352     
353     def callback(nodes, flag=flag, id = b.node.id):
354         if (len(nodes) >0) and (nodes[0].id == id):
355             print "test_find_nodes      PASSED"
356         else:
357             print "test_find_nodes      FAILED"
358         flag.set()
359     a.findNode(b.node.id, callback)
360     flag.wait()
361     
362 def test_find_value(l, quiet=0):
363     from whrandom import randrange
364     from sha import sha
365     from hash import newID
366     import time, threading, sys
367     
368     fa = threading.Event()
369     fb = threading.Event()
370     fc = threading.Event()
371     
372     n = len(l)
373     a = l[randrange(0,n)]
374     b = l[randrange(0,n)]
375     c = l[randrange(0,n)]
376     d = l[randrange(0,n)]
377
378     key = newID()
379     value = newID()
380     if not quiet:
381         print "inserting value..."
382         sys.stdout.flush()
383     a.storeValueForKey(key, value)
384     time.sleep(3)
385     print "finding..."
386     sys.stdout.flush()
387     
388     class cb:
389         def __init__(self, flag, value=value):
390             self.flag = flag
391             self.val = value
392             self.found = 0
393         def callback(self, values):
394             try:
395                 if(len(values) == 0):
396                     if not self.found:
397                         print "find                NOT FOUND"
398                     else:
399                         print "find                FOUND"
400                     sys.stdout.flush()
401
402                 else:
403                     if self.val in values:
404                         self.found = 1
405             finally:
406                 self.flag.set()
407
408     b.valueForKey(key, cb(fa).callback)
409     fa.wait()
410     c.valueForKey(key, cb(fb).callback)
411     fb.wait()
412     d.valueForKey(key, cb(fc).callback)    
413     fc.wait()
414     
415 def test_one(host, port, db='/tmp/test'):
416     import thread
417     k = Khashmir(host, port, db)
418     thread.start_new_thread(k.app.run, ())
419     return k
420     
421 if __name__ == "__main__":
422     import sys
423     n = 8
424     if len(sys.argv) > 1:
425         n = int(sys.argv[1])
426     l = test_build_net(peers=n)
427     time.sleep(3)
428     print "finding nodes..."
429     for i in range(10):
430         test_find_nodes(l)
431     print "inserting and fetching values..."
432     for i in range(10):
433         test_find_value(l)