]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
slight changes to test scripts
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7
8 from sha import sha
9
10 from ktable import KTable, K
11 from knode import KNode as Node
12
13 from hash import newID, newIDInRange
14
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
# NOTE(review): presumably switches on Twisted's thread support, which this module
# relies on (reactor.callFromThread, thread.start_new_thread) -- confirm against Twisted docs
threadable.init()
22
23 import sqlite  ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
25
# string exception raised by Khashmir.loadDB when the database can't be opened
KhashmirDBExcept = "KhashmirDBExcept"
27
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30         __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
31         def __init__(self, host, port, db='khashmir.db'):
32                 self.setup(host, port, db)
33         def setup(self, host, port, db='khashmir.db'):
34                 self.node = Node().init(newID(), host, port)
35                 self.table = KTable(self.node)
36                 self.app = Application("xmlrpc")
37                 self.app.listenTCP(port, server.Site(self))
38                 self.findDB(db)
39                 self.last = time.time()
40                 KeyExpirer(store=self.store)
41
42         def findDB(self, db):
43                 import os
44                 try:
45                         os.stat(db)
46                 except OSError:
47                         self.createNewDB(db)
48                 else:
49                         self.loadDB(db)
50             
51         def loadDB(self, db):
52                 try:
53                         self.store = sqlite.connect(db=db)
54                         self.store.autocommit = 1
55                 except:
56                         import traceback
57                         raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
58             
59         def createNewDB(self, db):
60                 self.store = sqlite.connect(db=db)
61                 self.store.autocommit = 1
62                 s = """
63                         create table kv (key text, value text, time timestamp, primary key (key, value));
64                         create index kv_key on kv(key);
65                         create index kv_timestamp on kv(time);
66                         
67                         create table nodes (id text primary key, host text, port number);
68                         """
69                 c = self.store.cursor()
70                 c.execute(s)
71
72         def render(self, request):
73                 """
74                         Override the built in render so we can have access to the request object!
75                         note, crequest is probably only valid on the initial call (not after deferred!)
76                 """
77                 self.crequest = request
78                 return xmlrpc.XMLRPC.render(self, request)
79
80
81         #######
82         #######  LOCAL INTERFACE    - use these methods!
83         def addContact(self, host, port):
84                 """
85                         ping this node and add the contact info to the table on pong!
86                 """
87                 n =Node().init(const.NULL_ID, host, port)  # note, we 
88                 self.sendPing(n)
89
90         ## this call is async!
91         def findNode(self, id, callback, errback=None):
92                 """ returns the contact info for node, or the k closest nodes, from the global table """
93                 # get K nodes out of local table/cache, or the node we want
94                 nodes = self.table.findNodes(id)
95                 d = Deferred()
96                 if errback:
97                         d.addCallbacks(callback, errback)
98                 else:
99                         d.addCallback(callback)
100                 if len(nodes) == 1 and nodes[0].id == id :
101                         d.callback(nodes)
102                 else:
103                         # create our search state
104                         state = FindNode(self, id, d.callback)
105                         reactor.callFromThread(state.goWithNodes, nodes)
106         
107         
108         ## also async
109         def valueForKey(self, key, callback):
110                 """ returns the values found for key in global table
111                         callback will be called with a list of values for each peer that returns unique values
112                         final callback will be an empty list - probably should change to 'more coming' arg
113                 """
114                 nodes = self.table.findNodes(key)
115                 
116                 # get locals
117                 l = self.retrieveValues(key)
118                 if len(l) > 0:
119                         reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
120                 
121                 # create our search state
122                 state = GetValue(self, key, callback)
123                 reactor.callFromThread(state.goWithNodes, nodes, l)
124
125         ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
126         def storeValueForKey(self, key, value, callback=None):
127                 """ stores the value for key in the global table, returns immediately, no status 
128                         in this implementation, peers respond but don't indicate status to storing values
129                         a key can have many values
130                 """
131                 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
132                         if not response:
133                                 # default callback
134                                 def _storedValueHandler(sender):
135                                         pass
136                                 response=_storedValueHandler
137                 
138                         for node in nodes[:const.STORE_REDUNDANCY]:
139                                 def cb(t, table = table, node=node, resp=response):
140                                         self.table.insertNode(node)
141                                         response(t)
142                                 if node.id != self.node.id:
143                                         def default(err, node=node, table=table):
144                                                 table.nodeFailed(node)
145                                         df = node.storeValue(key, value, self.node.senderDict())
146                                         df.addCallbacks(cb, default)
147                 # this call is asynch
148                 self.findNode(key, _storeValueForKey)
149         
150         
151         def insertNode(self, n, contacted=1):
152                 """
153                 insert a node in our local table, pinging oldest contact in bucket, if necessary
154                 
155                 If all you have is a host/port, then use addContact, which calls this method after
156                 receiving the PONG from the remote node.  The reason for the seperation is we can't insert
157                 a node into the table without it's peer-ID.  That means of course the node passed into this
158                 method needs to be a properly formed Node object with a valid ID.
159                 """
160                 old = self.table.insertNode(n, contacted=contacted)
161                 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
162                         # the bucket is full, check to see if old node is still around and if so, replace it
163                         
164                         ## these are the callbacks used when we ping the oldest node in a bucket
165                         def _staleNodeHandler(oldnode=old, newnode = n):
166                                 """ called if the pinged node never responds """
167                                 self.table.replaceStaleNode(old, newnode)
168                         
169                         def _notStaleNodeHandler(sender, old=old):
170                                 """ called when we get a pong from the old node """
171                                 args, sender = sender
172                                 sender = Node().initWithDict(sender)
173                                 if sender.id == old.id:
174                                         self.table.justSeenNode(old)
175                         
176                         df = old.ping(self.node.senderDict())
177                         df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
178
179         def sendPing(self, node):
180                 """
181                         ping a node
182                 """
183                 df = node.ping(self.node.senderDict())
184                 ## these are the callbacks we use when we issue a PING
185                 def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
186                         l, sender = args
187                         if id != const.NULL_ID and id != sender['id'].decode('base64'):
188                                 # whoah, got response from different peer than we were expecting
189                                 pass
190                         else:
191                                 sender['host'] = host
192                                 sender['port'] = port
193                                 n = Node().initWithDict(sender)
194                                 table.insertNode(n)
195                                 return
196                 def _defaultPong(err, node=node, table=self.table):
197                         table.nodeFailed(node)
198                 
199                 df.addCallbacks(_pongHandler,_defaultPong)
200
201         def findCloseNodes(self, callback=lambda a: None):
202                 """
203                         This does a findNode on the ID one away from our own.  
204                         This will allow us to populate our table with nodes on our network closest to our own.
205                         This is called as soon as we start up with an empty table
206                 """
207                 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
208                 self.findNode(id, callback)
209
210         def refreshTable(self):
211                 """
212                         
213                 """
214                 def callback(nodes):
215                         pass
216         
217                 for bucket in self.table.buckets:
218                         if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
219                                 id = newIDInRange(bucket.min, bucket.max)
220                                 self.findNode(id, callback)
221
222
223         def retrieveValues(self, key):
224                 s = "select value from kv where key = '%s';" % key.encode('base64')
225                 c = self.store.cursor()
226                 c.execute(s)
227                 t = c.fetchone()
228                 l = []
229                 while t:
230                         l.append(t['value'])
231                         t = c.fetchone()
232                 return l
233         
234         #####
235         ##### INCOMING MESSAGE HANDLERS
236         
237         def xmlrpc_ping(self, sender):
238                 """
239                         takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
240                         returns sender dict
241                 """
242                 ip = self.crequest.getClientIP()
243                 sender['host'] = ip
244                 n = Node().initWithDict(sender)
245                 self.insertNode(n, contacted=0)
246                 return (), self.node.senderDict()
247                 
248         def xmlrpc_find_node(self, target, sender):
249                 nodes = self.table.findNodes(target.decode('base64'))
250                 nodes = map(lambda node: node.senderDict(), nodes)
251                 ip = self.crequest.getClientIP()
252                 sender['host'] = ip
253                 n = Node().initWithDict(sender)
254                 self.insertNode(n, contacted=0)
255                 return nodes, self.node.senderDict()
256             
257         def xmlrpc_store_value(self, key, value, sender):
258                 t = "%0.6f" % time.time()
259                 s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
260                 c = self.store.cursor()
261                 try:
262                         c.execute(s)
263                 except pysqlite_exceptions.IntegrityError, reason:
264                         # update last insert time
265                         s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
266                         c.execute(s)
267                 ip = self.crequest.getClientIP()
268                 sender['host'] = ip
269                 n = Node().initWithDict(sender)
270                 self.insertNode(n, contacted=0)
271                 return (), self.node.senderDict()
272         
273         def xmlrpc_find_value(self, key, sender):
274                 ip = self.crequest.getClientIP()
275                 key = key.decode('base64')
276                 sender['host'] = ip
277                 n = Node().initWithDict(sender)
278                 self.insertNode(n, contacted=0)
279         
280                 l = self.retrieveValues(key)
281                 if len(l) > 0:
282                         return {'values' : l}, self.node.senderDict()
283                 else:
284                         nodes = self.table.findNodes(key)
285                         nodes = map(lambda node: node.senderDict(), nodes)
286                         return {'nodes' : nodes}, self.node.senderDict()
287 #------ testing
288
def test_build_net(quiet=0, peers=24, host='localhost',  pause=0):
    """
        build a local test network and return the list of Khashmir peers

        quiet - if true, skip the "Building ..." message (other progress messages still print)
        peers - how many peers to create; they listen on ports 2001, 2002, ...
                and use the database files /tmp/test0, /tmp/test1, ...
        host  - host name every peer is created with and contacted on
        pause - if true, sleep a third of a second after each peer's contacts are added

        every peer is given three randomly picked peers as contacts and then
        runs findCloseNodes, one peer at a time
    """
    # random replaces whrandom, which has been deprecated since Python 2.1
    from random import randrange
    import threading
    import thread
    import sys
    port = 2001
    l = []

    if not quiet:
        print("Building %s peer table." % peers)

    for i in xrange(peers):
        a = Khashmir(host, port + i, db='/tmp/test' + repr(i))
        l.append(a)

    # only the first peer's app is run in its own thread
    thread.start_new_thread(l[0].app.run, ())
    time.sleep(1)
    for peer in l[1:]:
        peer.app.run()
    time.sleep(3)

    print("adding contacts....")

    for peer in l:
        # three random contacts per peer (may repeat, may be the peer itself)
        for attempt in range(3):
            n = l[randrange(0, len(l))].node
            peer.addContact(host, n.port)
        if pause:
            time.sleep(.33)
        sys.stdout.flush()

    time.sleep(10)
    print("finding close nodes....")

    for peer in l:
        # wait for each peer's search to call back before starting the next one
        flag = threading.Event()
        def cb(nodes, f=flag):
            f.set()
        peer.findCloseNodes(cb)
        flag.wait()
        sys.stdout.flush()
    #    for peer in l:
    #       peer.refreshTable()
    return l
337         
def test_find_nodes(l, quiet=0):
    """
        pick two random peers out of l (they may be the same one) and have the
        first do a findNode on the second's ID; prints PASSED if the first node
        in the result has that ID, FAILED otherwise, and blocks until the
        callback has run

        l     - list of Khashmir peers, e.g. as returned by test_build_net
        quiet - accepted for symmetry with the other tests, not used
    """
    import threading
    # random replaces whrandom, which has been deprecated since Python 2.1
    from random import randrange
    flag = threading.Event()

    n = len(l)

    a = l[randrange(0, n)]
    b = l[randrange(0, n)]

    def callback(nodes, flag=flag, id=b.node.id):
        if (len(nodes) > 0) and (nodes[0].id == id):
            print("test_find_nodes  PASSED")
        else:
            print("test_find_nodes  FAILED")
        flag.set()
    a.findNode(b.node.id, callback)
    flag.wait()
356     
def test_find_value(l, quiet=0):
    """
        store a random value under a random key through one randomly picked
        peer, wait three seconds, then look the key up from three more randomly
        picked peers, one after the other

        for each lookup "find FOUND" / "find NOT FOUND" is printed when the
        empty-list callback arrives, depending on whether an earlier callback
        of that lookup carried the value

        l     - list of Khashmir peers, e.g. as returned by test_build_net
        quiet - if true, skip the "inserting value..." and "finding..." messages
    """
    # random replaces whrandom, which has been deprecated since Python 2.1
    from random import randrange
    from hash import newID
    import time, threading, sys

    fa = threading.Event()
    fb = threading.Event()
    fc = threading.Event()

    n = len(l)
    a = l[randrange(0, n)]
    b = l[randrange(0, n)]
    c = l[randrange(0, n)]
    d = l[randrange(0, n)]

    key = newID()
    value = newID()
    if not quiet:
        print("inserting value...")
        sys.stdout.flush()
    a.storeValueForKey(key, value)
    time.sleep(3)
    if not quiet:
        print("finding...")
        sys.stdout.flush()

    class cb:
        """ remembers whether the expected value showed up in any callback of one lookup """
        def __init__(self, flag, value=value):
            self.flag = flag
            self.val = value
            self.found = 0
        def callback(self, values):
            try:
                if(len(values) == 0):
                    # empty list: report what we saw so far
                    if not self.found:
                        print("find                NOT FOUND")
                    else:
                        print("find                FOUND")
                    sys.stdout.flush()
                else:
                    if self.val in values:
                        self.found = 1
            finally:
                # set on every callback, so the waiting thread is released by
                # the first one, whether or not it was the empty list
                self.flag.set()

    b.valueForKey(key, cb(fa).callback)
    fa.wait()
    c.valueForKey(key, cb(fb).callback)
    fb.wait()
    d.valueForKey(key, cb(fc).callback)
    fc.wait()
409     
def test_one(host, port, db='/tmp/test'):
    """
        create a single Khashmir peer on host/port backed by the database file
        db, run its app in a new thread and return the peer
    """
    from thread import start_new_thread
    peer = Khashmir(host, port, db)
    start_new_thread(peer.app.run, ())
    return peer
415     
if __name__ == "__main__":
    # usage: khashmir.py [number of peers]   (8 peers when not given)
    import sys
    args = sys.argv[1:]
    if args:
        n = int(args[0])
    else:
        n = 8
    # build the network, let it settle, then run each test ten times
    l = test_build_net(peers=n)
    time.sleep(3)
    for banner, check in (("finding nodes...", test_find_nodes),
                          ("inserting and fetching values...", test_find_value)):
        print(banner)
        for i in range(10):
            check(l)