git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
5e7c7171219af99385588b21fbb9d45348ecb20e
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7
8 from sha import sha
9
10 from ktable import KTable, K
11 from knode import KNode as Node
12
13 from hash import newID, newIDInRange
14
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
21 threadable.init()
22
23 import sqlite  ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
25
class KhashmirDBExcept(Exception):
    """Raised when the Khashmir database file cannot be opened."""
27
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30         __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
31         def __init__(self, host, port, db='khashmir.db'):
32                 self.setup(host, port, db)
33                 
34         def setup(self, host, port, db='khashmir.db'):
35                 self.findDB(db)
36                 self.node = self.loadSelfNode(host, port)
37                 self.table = KTable(self.node)
38                 self.loadRoutingTable()
39                 self.app = Application("xmlrpc")
40                 self.app.listenTCP(port, server.Site(self))
41                 self.last = time.time()
42                 KeyExpirer(store=self.store)
43                 reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
44                 self.refreshTable(force=1)
45                 
46         def loadSelfNode(self, host, port):
47                 c = self.store.cursor()
48                 c.execute('select id from self where num = 0;')
49                 if c.rowcount > 0:
50                         id = c.fetchone()[0].decode('base64')
51                 else:
52                         id = newID()
53                 return Node().init(id, host, port)
54                 
55         def saveSelfNode(self):
56                 self.store.autocommit = 0
57                 c = self.store.cursor()
58                 c.execute('delete from self where num = 0;')
59                 c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64'))
60                 self.store.commit()
61                 self.store.autocommit = 1
62                 
63         def checkpoint(self):
64                 self.saveSelfNode()
65                 self.dumpRoutingTable()
66                 reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
67                 
68         def findDB(self, db):
69                 import os
70                 try:
71                         os.stat(db)
72                 except OSError:
73                         self.createNewDB(db)
74                 else:
75                         self.loadDB(db)
76             
77         def loadDB(self, db):
78                 try:
79                         self.store = sqlite.connect(db=db)
80                         self.store.autocommit = 1
81                 except:
82                         import traceback
83                         raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
84             
85         def createNewDB(self, db):
86                 self.store = sqlite.connect(db=db)
87                 self.store.autocommit = 1
88                 s = """
89                         create table kv (key text, value text, time timestamp, primary key (key, value));
90                         create index kv_key on kv(key);
91                         create index kv_timestamp on kv(time);
92                         
93                         create table nodes (id text primary key, host text, port number);
94                         
95                         create table self (num number primary key, id text);
96                         """
97                 c = self.store.cursor()
98                 c.execute(s)
99
100         def dumpRoutingTable(self):
101                 """
102                         save routing table nodes to the database
103                 """
104                 self.store.autocommit = 0;
105                 c = self.store.cursor()
106                 c.execute("delete from nodes where id not NULL;")
107                 for bucket in self.table.buckets:
108                         for node in bucket.l:
109                                 d = node.senderDict()
110                                 c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port']))
111                 self.store.commit()
112                 self.store.autocommit = 1;
113                 
114         def loadRoutingTable(self):
115                 """
116                         load routing table nodes from database
117                         it's usually a good idea to call refreshTable(force=1) after loading the table
118                 """
119                 c = self.store.cursor()
120                 c.execute("select * from nodes;")
121                 for rec in c.fetchall():
122                         n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
123                         self.table.insertNode(n, contacted=0)
124                         
125         def render(self, request):
126                 """
127                         Override the built in render so we can have access to the request object!
128                         note, crequest is probably only valid on the initial call (not after deferred!)
129                 """
130                 self.crequest = request
131                 return xmlrpc.XMLRPC.render(self, request)
132
133
134         #######
135         #######  LOCAL INTERFACE    - use these methods!
136         def addContact(self, host, port):
137                 """
138                         ping this node and add the contact info to the table on pong!
139                 """
140                 n =Node().init(const.NULL_ID, host, port)  # note, we 
141                 self.sendPing(n)
142
143         ## this call is async!
144         def findNode(self, id, callback, errback=None):
145                 """ returns the contact info for node, or the k closest nodes, from the global table """
146                 # get K nodes out of local table/cache, or the node we want
147                 nodes = self.table.findNodes(id)
148                 d = Deferred()
149                 if errback:
150                         d.addCallbacks(callback, errback)
151                 else:
152                         d.addCallback(callback)
153                 if len(nodes) == 1 and nodes[0].id == id :
154                         d.callback(nodes)
155                 else:
156                         # create our search state
157                         state = FindNode(self, id, d.callback)
158                         reactor.callFromThread(state.goWithNodes, nodes)
159         
160         
161         ## also async
162         def valueForKey(self, key, callback):
163                 """ returns the values found for key in global table
164                         callback will be called with a list of values for each peer that returns unique values
165                         final callback will be an empty list - probably should change to 'more coming' arg
166                 """
167                 nodes = self.table.findNodes(key)
168                 
169                 # get locals
170                 l = self.retrieveValues(key)
171                 if len(l) > 0:
172                         reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
173                 
174                 # create our search state
175                 state = GetValue(self, key, callback)
176                 reactor.callFromThread(state.goWithNodes, nodes, l)
177
178         ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
179         def storeValueForKey(self, key, value, callback=None):
180                 """ stores the value for key in the global table, returns immediately, no status 
181                         in this implementation, peers respond but don't indicate status to storing values
182                         a key can have many values
183                 """
184                 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
185                         if not response:
186                                 # default callback
187                                 def _storedValueHandler(sender):
188                                         pass
189                                 response=_storedValueHandler
190                 
191                         for node in nodes[:const.STORE_REDUNDANCY]:
192                                 def cb(t, table = table, node=node, resp=response):
193                                         self.table.insertNode(node)
194                                         response(t)
195                                 if node.id != self.node.id:
196                                         def default(err, node=node, table=table):
197                                                 table.nodeFailed(node)
198                                         df = node.storeValue(key, value, self.node.senderDict())
199                                         df.addCallbacks(cb, default)
200                 # this call is asynch
201                 self.findNode(key, _storeValueForKey)
202         
203         
204         def insertNode(self, n, contacted=1):
205                 """
206                 insert a node in our local table, pinging oldest contact in bucket, if necessary
207                 
208                 If all you have is a host/port, then use addContact, which calls this method after
209                 receiving the PONG from the remote node.  The reason for the seperation is we can't insert
210                 a node into the table without it's peer-ID.  That means of course the node passed into this
211                 method needs to be a properly formed Node object with a valid ID.
212                 """
213                 old = self.table.insertNode(n, contacted=contacted)
214                 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
215                         # the bucket is full, check to see if old node is still around and if so, replace it
216                         
217                         ## these are the callbacks used when we ping the oldest node in a bucket
218                         def _staleNodeHandler(oldnode=old, newnode = n):
219                                 """ called if the pinged node never responds """
220                                 self.table.replaceStaleNode(old, newnode)
221                         
222                         def _notStaleNodeHandler(sender, old=old):
223                                 """ called when we get a pong from the old node """
224                                 args, sender = sender
225                                 sender = Node().initWithDict(sender)
226                                 if sender.id == old.id:
227                                         self.table.justSeenNode(old)
228                         
229                         df = old.ping(self.node.senderDict())
230                         df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
231
232         def sendPing(self, node):
233                 """
234                         ping a node
235                 """
236                 df = node.ping(self.node.senderDict())
237                 ## these are the callbacks we use when we issue a PING
238                 def _pongHandler(args, node=node, table=self.table):
239                         l, sender = args
240                         if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'):
241                                 # whoah, got response from different peer than we were expecting
242                                 self.table.invalidateNode(node)
243                         else:
244                                 sender['host'] = node.host
245                                 sender['port'] = node.port
246                                 n = Node().initWithDict(sender)
247                                 table.insertNode(n)
248                                 return
249                 def _defaultPong(err, node=node, table=self.table):
250                         table.nodeFailed(node)
251                 
252                 df.addCallbacks(_pongHandler,_defaultPong)
253
254         def findCloseNodes(self, callback=lambda a: None):
255                 """
256                         This does a findNode on the ID one away from our own.  
257                         This will allow us to populate our table with nodes on our network closest to our own.
258                         This is called as soon as we start up with an empty table
259                 """
260                 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
261                 self.findNode(id, callback)
262
263         def refreshTable(self, force=0):
264                 """
265                         force=1 will refresh table regardless of last bucket access time
266                 """
267                 def callback(nodes):
268                         pass
269         
270                 for bucket in self.table.buckets:
271                         if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
272                                 id = newIDInRange(bucket.min, bucket.max)
273                                 self.findNode(id, callback)
274
275
276         def retrieveValues(self, key):
277                 s = "select value from kv where key = '%s';" % key.encode('base64')
278                 c = self.store.cursor()
279                 c.execute(s)
280                 t = c.fetchone()
281                 l = []
282                 while t:
283                         l.append(t['value'])
284                         t = c.fetchone()
285                 return l
286         
287         #####
288         ##### INCOMING MESSAGE HANDLERS
289         
290         def xmlrpc_ping(self, sender):
291                 """
292                         takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
293                         returns sender dict
294                 """
295                 ip = self.crequest.getClientIP()
296                 sender['host'] = ip
297                 n = Node().initWithDict(sender)
298                 self.insertNode(n, contacted=0)
299                 return (), self.node.senderDict()
300                 
301         def xmlrpc_find_node(self, target, sender):
302                 nodes = self.table.findNodes(target.decode('base64'))
303                 nodes = map(lambda node: node.senderDict(), nodes)
304                 ip = self.crequest.getClientIP()
305                 sender['host'] = ip
306                 n = Node().initWithDict(sender)
307                 self.insertNode(n, contacted=0)
308                 return nodes, self.node.senderDict()
309             
310         def xmlrpc_store_value(self, key, value, sender):
311                 t = "%0.6f" % time.time()
312                 s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
313                 c = self.store.cursor()
314                 try:
315                         c.execute(s)
316                 except pysqlite_exceptions.IntegrityError, reason:
317                         # update last insert time
318                         s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
319                         c.execute(s)
320                 ip = self.crequest.getClientIP()
321                 sender['host'] = ip
322                 n = Node().initWithDict(sender)
323                 self.insertNode(n, contacted=0)
324                 return (), self.node.senderDict()
325         
326         def xmlrpc_find_value(self, key, sender):
327                 ip = self.crequest.getClientIP()
328                 key = key.decode('base64')
329                 sender['host'] = ip
330                 n = Node().initWithDict(sender)
331                 self.insertNode(n, contacted=0)
332         
333                 l = self.retrieveValues(key)
334                 if len(l) > 0:
335                         return {'values' : l}, self.node.senderDict()
336                 else:
337                         nodes = self.table.findNodes(key)
338                         nodes = map(lambda node: node.senderDict(), nodes)
339                         return {'nodes' : nodes}, self.node.senderDict()
340 #------ testing
341
def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001):
    """Start `peers` Khashmir nodes on consecutive ports and wire them together.

    Node i listens on startport + i with its database at /tmp/test<i>.  Each
    node is introduced to three randomly chosen nodes and then runs
    findCloseNodes().  Progress messages are suppressed when quiet is true.
    Returns the list of Khashmir instances.
    """
    from random import randrange  # whrandom was removed in Python 2.5
    import threading
    import thread
    import sys

    port = startport
    l = []

    if not quiet:
        print("Building %s peer table." % peers)

    for i in xrange(peers):
        a = Khashmir(host, port + i, db='/tmp/test' + str(i))
        l.append(a)

    thread.start_new_thread(l[0].app.run, ())
    time.sleep(1)
    for peer in l[1:]:
        peer.app.run()
    time.sleep(3)

    if not quiet:
        print("adding contacts....")

    for peer in l:
        # introduce each peer to three randomly chosen peers (possibly itself)
        for _ in range(3):
            n = l[randrange(0, len(l))].node
            peer.addContact(host, n.port)
        if pause:
            time.sleep(.33)
        sys.stdout.flush()

    time.sleep(10)
    if not quiet:
        print("finding close nodes....")

    for peer in l:
        flag = threading.Event()
        def cb(nodes, f=flag):
            f.set()
        peer.findCloseNodes(cb)
        flag.wait()  # one peer at a time
        sys.stdout.flush()
    return l
390         
def test_find_nodes(l, quiet=0):
    """Have a random peer in l look up another random peer's ID.

    Prints PASSED if the first node returned is the target, FAILED otherwise,
    and blocks until the lookup's callback has run.
    """
    import threading
    from random import randrange  # whrandom was removed in Python 2.5

    flag = threading.Event()

    n = len(l)
    a = l[randrange(0, n)]
    b = l[randrange(0, n)]

    def callback(nodes, flag=flag, id=b.node.id):
        if len(nodes) > 0 and nodes[0].id == id:
            print("test_find_nodes  PASSED")
        else:
            print("test_find_nodes  FAILED")
        flag.set()

    a.findNode(b.node.id, callback)
    flag.wait()
409     
def test_find_value(l, quiet=0):
    """Store a random value from one peer, then look it up from three peers.

    Each lookup prints FOUND / NOT FOUND once its search has finished, and the
    lookups run one after another.
    """
    from random import randrange  # whrandom was removed in Python 2.5
    from hash import newID
    import time
    import threading
    import sys

    fa = threading.Event()
    fb = threading.Event()
    fc = threading.Event()

    n = len(l)
    a = l[randrange(0, n)]
    b = l[randrange(0, n)]
    c = l[randrange(0, n)]
    d = l[randrange(0, n)]

    key = newID()
    value = newID()
    if not quiet:
        print("inserting value...")
        sys.stdout.flush()
    a.storeValueForKey(key, value)
    time.sleep(3)
    if not quiet:
        print("finding...")
        sys.stdout.flush()

    class cb:
        def __init__(self, flag, value=value):
            self.flag = flag
            self.val = value
            self.found = 0
        def callback(self, values):
            # valueForKey() reports results incrementally and ends the search
            # with an empty list; only then is the verdict known, so release
            # the waiting thread only on that final call.
            if len(values) == 0:
                try:
                    if not self.found:
                        print("find                NOT FOUND")
                    else:
                        print("find                FOUND")
                    sys.stdout.flush()
                finally:
                    self.flag.set()
            elif self.val in values:
                self.found = 1

    b.valueForKey(key, cb(fa).callback)
    fa.wait()
    c.valueForKey(key, cb(fb).callback)
    fb.wait()
    d.valueForKey(key, cb(fc).callback)
    fc.wait()
462     
def test_one(host, port, db='/tmp/test'):
    """Start a single Khashmir node, run its app on a background thread, and return it."""
    import thread
    node = Khashmir(host, port, db)
    thread.start_new_thread(node.app.run, ())
    return node
468     
if __name__ == "__main__":
    import sys

    # Optional first argument: how many peers to start (8 by default).
    peer_count = 8
    if len(sys.argv) > 1:
        peer_count = int(sys.argv[1])

    peers = test_build_net(peers=peer_count)
    time.sleep(3)

    print("finding nodes...")
    for _ in range(10):
        test_find_nodes(peers)

    print("inserting and fetching values...")
    for _ in range(10):
        test_find_value(peers)