]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
b0b414d7cd4e0193be965a956b93559210d52448
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7
8 from sha import sha
9
10 from ktable import KTable, K
11 from knode import KNode as Node
12
13 from hash import newID, newIDInRange
14
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
21 threadable.init()
22
23 import sqlite  ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
25
class KhashmirDBExcept(Exception):
        """
                Raised when the Khashmir key/value database cannot be opened.

                This used to be a plain string exception; string exceptions are no longer
                supported by Python, so it is now a real exception class.  Code that
                raises or catches it by the name KhashmirDBExcept keeps working unchanged.
        """
        pass
27
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30         __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
        def __init__(self, host, port, db='khashmir.db'):
                """
                        Create a peer for host/port backed by the database file `db`.
                        All of the actual initialisation is delegated to setup().
                """
                self.setup(host, port, db)
33         def setup(self, host, port, db='khashmir.db'):
34                 self.node = Node().init(newID(), host, port)
35                 self.table = KTable(self.node)
36                 self.app = Application("xmlrpc")
37                 self.app.listenTCP(port, server.Site(self))
38                 self.findDB(db)
39                 self.last = time.time()
40                 KeyExpirer(store=self.store)
41
42         def findDB(self, db):
43                 import os
44                 try:
45                         os.stat(db)
46                 except OSError:
47                         self.createNewDB(db)
48                 else:
49                         self.loadDB(db)
50             
51         def loadDB(self, db):
52                 try:
53                         self.store = sqlite.connect(db=db)
54                         self.store.autocommit = 1
55                 except:
56                         import traceback
57                         raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
58             
59         def createNewDB(self, db):
60                 self.store = sqlite.connect(db=db)
61                 self.store.autocommit = 1
62                 s = """
63                         create table kv (key text, value text, time timestamp, primary key (key, value));
64                         create index kv_key on kv(key);
65                         create index kv_timestamp on kv(time);
66                         
67                         create table nodes (id text primary key, host text, port number);
68                         """
69                 c = self.store.cursor()
70                 c.execute(s)
71
72         def render(self, request):
73                 """
74                         Override the built in render so we can have access to the request object!
75                         note, crequest is probably only valid on the initial call (not after deferred!)
76                 """
77                 self.crequest = request
78                 return xmlrpc.XMLRPC.render(self, request)
79
80
81         #######
82         #######  LOCAL INTERFACE    - use these methods!
83         def addContact(self, host, port):
84                 """
85                         ping this node and add the contact info to the table on pong!
86                 """
87                 n =Node().init(const.NULL_ID, host, port)  # note, we 
88                 self.sendPing(n)
89
90         ## this call is async!
91         def findNode(self, id, callback, errback=None):
92                 """ returns the contact info for node, or the k closest nodes, from the global table """
93                 # get K nodes out of local table/cache, or the node we want
94                 nodes = self.table.findNodes(id)
95                 d = Deferred()
96                 if errback:
97                         d.addCallbacks(callback, errback)
98                 else:
99                         d.addCallback(callback)
100                 if len(nodes) == 1 and nodes[0].id == id :
101                         d.callback(nodes)
102                 else:
103                         # create our search state
104                         state = FindNode(self, id, d.callback)
105                         reactor.callFromThread(state.goWithNodes, nodes)
106         
107         
108         ## also async
109         def valueForKey(self, key, callback):
110                 """ returns the values found for key in global table
111                         callback will be called with a list of values for each peer that returns unique values
112                         final callback will be an empty list - probably should change to 'more coming' arg
113                 """
114                 nodes = self.table.findNodes(key)
115                 
116                 # get locals
117                 l = self.retrieveValues(key)
118                 if len(l) > 0:
119                         reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
120                 
121                 # create our search state
122                 state = GetValue(self, key, callback)
123                 reactor.callFromThread(state.goWithNodes, nodes, l)
124
125         ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
126         def storeValueForKey(self, key, value, callback=None):
127                 """ stores the value for key in the global table, returns immediately, no status 
128                         in this implementation, peers respond but don't indicate status to storing values
129                         a key can have many values
130                 """
131                 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
132                         if not response:
133                                 # default callback
134                                 def _storedValueHandler(sender):
135                                         pass
136                                 response=_storedValueHandler
137                 
138                         for node in nodes[:const.STORE_REDUNDANCY]:
139                                 def cb(t, table = table, node=node, resp=response):
140                                         self.table.insertNode(node)
141                                         response(t)
142                                 if node.id != self.node.id:
143                                         def default(err, node=node, table=table):
144                                                 table.nodeFailed(node)
145                                         df = node.storeValue(key, value, self.node.senderDict())
146                                         df.addCallbacks(cb, default)
147                 # this call is asynch
148                 self.findNode(key, _storeValueForKey)
149         
150         
151         def insertNode(self, n, contacted=1):
152                 """
153                 insert a node in our local table, pinging oldest contact in bucket, if necessary
154                 
155                 If all you have is a host/port, then use addContact, which calls this method after
156                 receiving the PONG from the remote node.  The reason for the seperation is we can't insert
157                 a node into the table without it's peer-ID.  That means of course the node passed into this
158                 method needs to be a properly formed Node object with a valid ID.
159                 """
160                 old = self.table.insertNode(n, contacted=contacted)
161                 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
162                         # the bucket is full, check to see if old node is still around and if so, replace it
163                         
164                         ## these are the callbacks used when we ping the oldest node in a bucket
165                         def _staleNodeHandler(oldnode=old, newnode = n):
166                                 """ called if the pinged node never responds """
167                                 self.table.replaceStaleNode(old, newnode)
168                         
169                         def _notStaleNodeHandler(sender, old=old):
170                                 """ called when we get a pong from the old node """
171                                 args, sender = sender
172                                 sender = Node().initWithDict(sender)
173                                 if sender.id == old.id:
174                                         self.table.justSeenNode(old)
175                         
176                         df = old.ping(self.node.senderDict())
177                         df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
178
179         def sendPing(self, node):
180                 """
181                         ping a node
182                 """
183                 df = node.ping(self.node.senderDict())
184                 ## these are the callbacks we use when we issue a PING
185                 def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
186                         l, sender = args
187                         if id != const.NULL_ID and id != sender['id'].decode('base64'):
188                                 # whoah, got response from different peer than we were expecting
189                                 pass
190                         else:
191                                 sender['host'] = host
192                                 sender['port'] = port
193                                 n = Node().initWithDict(sender)
194                                 table.insertNode(n)
195                                 return
196                 def _defaultPong(err, node=node, table=self.table):
197                         table.nodeFailed(node)
198                 
199                 df.addCallbacks(_pongHandler,_defaultPong)
200
201         def findCloseNodes(self, callback=lambda a: None):
202                 """
203                         This does a findNode on the ID one away from our own.  
204                         This will allow us to populate our table with nodes on our network closest to our own.
205                         This is called as soon as we start up with an empty table
206                 """
207                 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
208                 self.findNode(id, callback)
209
210         def refreshTable(self):
211                 """
212                         
213                 """
214                 def callback(nodes):
215                         pass
216         
217                 for bucket in self.table.buckets:
218                         if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
219                                 id = newIDInRange(bucket.min, bucket.max)
220                                 self.findNode(id, callback)
221
222
223         def retrieveValues(self, key):
224                 s = "select value from kv where key = '%s';" % key.encode('base64')
225                 c = self.store.cursor()
226                 c.execute(s)
227                 t = c.fetchone()
228                 l = []
229                 while t:
230                         l.append(t['value'])
231                         t = c.fetchone()
232                 return l
233         
234         #####
235         ##### INCOMING MESSAGE HANDLERS
236         
237         def xmlrpc_ping(self, sender):
238                 """
239                         takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
240                         returns sender dict
241                 """
242                 ip = self.crequest.getClientIP()
243                 sender['host'] = ip
244                 n = Node().initWithDict(sender)
245                 self.insertNode(n, contacted=0)
246                 return (), self.node.senderDict()
247                 
248         def xmlrpc_find_node(self, target, sender):
249                 nodes = self.table.findNodes(target.decode('base64'))
250                 nodes = map(lambda node: node.senderDict(), nodes)
251                 ip = self.crequest.getClientIP()
252                 sender['host'] = ip
253                 n = Node().initWithDict(sender)
254                 self.insertNode(n, contacted=0)
255                 return nodes, self.node.senderDict()
256             
257         def xmlrpc_store_value(self, key, value, sender):
258                 t = "%0.6f" % time.time()
259                 s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
260                 c = self.store.cursor()
261                 try:
262                         c.execute(s)
263                 except pysqlite_exceptions.IntegrityError, reason:
264                         # update last insert time
265                         s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
266                         c.execute(s)
267                 ip = self.crequest.getClientIP()
268                 sender['host'] = ip
269                 n = Node().initWithDict(sender)
270                 self.insertNode(n, contacted=0)
271                 return (), self.node.senderDict()
272         
273         def xmlrpc_find_value(self, key, sender):
274                 ip = self.crequest.getClientIP()
275                 key = key.decode('base64')
276                 sender['host'] = ip
277                 n = Node().initWithDict(sender)
278                 self.insertNode(n, contacted=0)
279         
280                 l = self.retrieveValues(key)
281                 if len(l) > 0:
282                         return {'values' : l}, self.node.senderDict()
283                 else:
284                         nodes = self.table.findNodes(key)
285                         nodes = map(lambda node: node.senderDict(), nodes)
286                         return {'nodes' : nodes}, self.node.senderDict()
287 #------ testing
288
def test_build_net(quiet=0, peers=24, host='localhost',  pause=0):
        """
                Build a local test network and return the list of Khashmir peers.

                quiet -- when true, suppress the progress messages
                peers -- number of peers to create; peer i listens on port 2001 + i
                         and keeps its database in /tmp/test<i>
                host  -- host name every peer is created with and contacted on
                pause -- when true, sleep briefly after each peer's contacts were added

                Every peer but the first is given three randomly chosen peers as
                contacts, then every peer runs findCloseNodes and waits for it to finish.
        """
        # whrandom is deprecated (and gone from later Pythons); random offers the same randrange
        from random import randrange
        import threading
        import thread
        port = 2001
        l = []

        if not quiet:
                print("Building %s peer table." % peers)

        for i in xrange(peers):
                a = Khashmir(host, port + i, db = '/tmp/test' + str(i))
                l.append(a)

        thread.start_new_thread(l[0].app.run, ())
        time.sleep(1)
        # NOTE(review): the remaining peers call run() directly in this thread;
        # presumably it returns at once because the first peer already runs the
        # reactor -- confirm
        for peer in l[1:]:
                peer.app.run()
        time.sleep(10)

        if not quiet:
                print("adding contacts....")

        for peer in l[1:]:
                for attempt in range(3):
                        n = l[randrange(0, len(l))].node
                        peer.addContact(host, n.port)
                # throttle between peers; this used to sit outside the loop and so
                # only ever ran once, after all contacts had been added
                if pause:
                        time.sleep(.33)

        time.sleep(10)
        if not quiet:
                print("finding close nodes....")

        for peer in l:
                # block until this peer's lookup has called back
                flag = threading.Event()
                def cb(nodes, f=flag):
                        f.set()
                peer.findCloseNodes(cb)
                flag.wait()

        #    for peer in l:
        #       peer.refreshTable()
        return l
335         
def test_find_nodes(l, quiet=0):
        """
                Pick two random peers from `l`, have the first look up the second by
                ID, and print PASSED when the first node returned carries that ID
                (FAILED otherwise).  Blocks until the lookup has called back.

                quiet is accepted for symmetry with the other tests; the verdict is
                always printed.
        """
        import threading
        # whrandom is deprecated (and gone from later Pythons); random offers the same randrange
        from random import randrange
        flag = threading.Event()

        n = len(l)

        a = l[randrange(0,n)]
        b = l[randrange(0,n)]

        def callback(nodes, flag=flag, id = b.node.id):
                if (len(nodes) >0) and (nodes[0].id == id):
                        print("test_find_nodes  PASSED")
                else:
                        print("test_find_nodes  FAILED")
                flag.set()
        a.findNode(b.node.id, callback)
        flag.wait()
354     
def test_find_value(l, quiet=0):
        """
                Store a random value under a random key through one random peer of
                `l`, wait three seconds, then look the key up through three other
                random peers, one after the other.  Each lookup prints FOUND or
                NOT FOUND when its final (empty) callback arrives.

                quiet suppresses the progress messages, not the verdicts.
        """
        # whrandom is deprecated (and gone from later Pythons); random offers the same randrange
        from random import randrange
        from hash import newID
        import time
        import threading
        import sys

        fa = threading.Event()
        fb = threading.Event()
        fc = threading.Event()

        n = len(l)
        a = l[randrange(0,n)]
        b = l[randrange(0,n)]
        c = l[randrange(0,n)]
        d = l[randrange(0,n)]

        key = newID()
        value = newID()
        if not quiet:
                print("inserting value...")
                sys.stdout.flush()
        a.storeValueForKey(key, value)
        time.sleep(3)
        if not quiet:
                print("finding...")
                sys.stdout.flush()

        class cb:
                """ collects the callbacks of one lookup and reports the verdict """
                def __init__(self, flag, value=value):
                        self.flag = flag
                        self.val = value
                        self.found = 0
                def callback(self, values):
                        try:
                                if not values:
                                        # the empty list is the final callback: report
                                        if not self.found:
                                                print("find                NOT FOUND")
                                        else:
                                                print("find                FOUND")
                                        sys.stdout.flush()
                                else:
                                        if self.val in values:
                                                self.found = 1
                        finally:
                                # NOTE(review): the flag is set after every callback, not
                                # only the final one, so the waiting test may move on
                                # before the verdict is printed -- confirm this is intended
                                self.flag.set()

        b.valueForKey(key, cb(fa).callback)
        fa.wait()
        c.valueForKey(key, cb(fb).callback)
        fb.wait()
        d.valueForKey(key, cb(fc).callback)
        fc.wait()
407     
def test_one(host, port, db='/tmp/test'):
        """
                Create a single Khashmir peer on host/port backed by the database
                file `db`, start its application in a new thread and return the peer.
        """
        import thread
        peer = Khashmir(host, port, db)
        thread.start_new_thread(peer.app.run, ())
        return peer
413     
if __name__ == "__main__":
        # Self-test: build a network of n peers (default 8, or the first command
        # line argument), then run ten node lookups and ten store/fetch rounds.
        import sys
        n = 8
        if len(sys.argv) > 1:
                n = int(sys.argv[1])
        l = test_build_net(peers=n)
        time.sleep(3)
        print("finding nodes...")
        for i in range(10):
                test_find_nodes(l)
        print("inserting and fetching values...")
        for i in range(10):
                test_find_value(l)