]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
only send the ID along in khashmir messages, don't send the host and
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from const import reactor
5 import const
6
7 import time
8
9 from sha import sha
10
11 from ktable import KTable, K
12 from knode import KNode as Node
13
14 from khash import newID, newIDInRange
15
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
17 import krpc
18
19 from twisted.internet.defer import Deferred
20 from twisted.internet import protocol
21 from twisted.python import threadable
22 from twisted.application import service, internet
23 from twisted.web import server
24 threadable.init()
25 import sys
26
27 import sqlite  ## find this at http://pysqlite.sourceforge.net/
28
29 class KhashmirDBExcept(Exception):
30     pass
31
32 # this is the main class!
33 class Khashmir(protocol.Factory):
34     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
35     def __init__(self, host, port, db='khashmir.db'):
36         self.setup(host, port, db)
37         
38     def setup(self, host, port, db='khashmir.db'):
39         self._findDB(db)
40         self.port = port
41         self.node = self._loadSelfNode(host, port)
42         self.table = KTable(self.node)
43         self.app = service.Application("krpc")
44         self.udp = krpc.hostbroker(self)
45         self.udp.protocol = krpc.KRPC
46         self.listenport = reactor.listenUDP(port, self.udp)
47         self.last = time.time()
48         self._loadRoutingTable()
49         KeyExpirer(store=self.store)
50         #self.refreshTable(force=1)
51         reactor.callLater(60, self.checkpoint, (1,))
52         
53     def __del__(self):
54         self.listenport.stopListening()
55         
56     def _loadSelfNode(self, host, port):
57         c = self.store.cursor()
58         c.execute('select id from self where num = 0;')
59         if c.rowcount > 0:
60             id = c.fetchone()[0]
61         else:
62             id = newID()
63         return Node().init(id, host, port)
64         
65     def _saveSelfNode(self):
66         self.store.autocommit = 0
67         c = self.store.cursor()
68         c.execute('delete from self where num = 0;')
69         c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
70         self.store.commit()
71         self.store.autocommit = 1
72         
73     def checkpoint(self, auto=0):
74         self._saveSelfNode()
75         self._dumpRoutingTable()
76         if auto:
77             reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
78         
79     def _findDB(self, db):
80         import os
81         try:
82             os.stat(db)
83         except OSError:
84             self._createNewDB(db)
85         else:
86             self._loadDB(db)
87         
88     def _loadDB(self, db):
89         try:
90             self.store = sqlite.connect(db=db)
91             self.store.autocommit = 1
92         except:
93             import traceback
94             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
95         
96     def _createNewDB(self, db):
97         self.store = sqlite.connect(db=db)
98         self.store.autocommit = 1
99         s = """
100             create table kv (key binary, value binary, time timestamp, primary key (key, value));
101             create index kv_key on kv(key);
102             create index kv_timestamp on kv(time);
103             
104             create table nodes (id binary primary key, host text, port number);
105             
106             create table self (num number primary key, id binary);
107             """
108         c = self.store.cursor()
109         c.execute(s)
110
111     def _dumpRoutingTable(self):
112         """
113             save routing table nodes to the database
114         """
115         self.store.autocommit = 0;
116         c = self.store.cursor()
117         c.execute("delete from nodes where id not NULL;")
118         for bucket in self.table.buckets:
119             for node in bucket.l:
120                 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
121         self.store.commit()
122         self.store.autocommit = 1;
123         
124     def _loadRoutingTable(self):
125         """
126             load routing table nodes from database
127             it's usually a good idea to call refreshTable(force=1) after loading the table
128         """
129         c = self.store.cursor()
130         c.execute("select * from nodes;")
131         for rec in c.fetchall():
132             n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
133             n.conn = self.udp.connectionForAddr((n.host, n.port))
134             self.table.insertNode(n, contacted=0)
135             
136
137     #######
138     #######  LOCAL INTERFACE    - use these methods!
139     def addContact(self, host, port, callback=None):
140         """
141             ping this node and add the contact info to the table on pong!
142         """
143         n =Node().init(const.NULL_ID, host, port) 
144         n.conn = self.udp.connectionForAddr((n.host, n.port))
145         self.sendPing(n, callback=callback)
146
147     ## this call is async!
148     def findNode(self, id, callback, errback=None):
149         """ returns the contact info for node, or the k closest nodes, from the global table """
150         # get K nodes out of local table/cache, or the node we want
151         nodes = self.table.findNodes(id)
152         d = Deferred()
153         if errback:
154             d.addCallbacks(callback, errback)
155         else:
156             d.addCallback(callback)
157         if len(nodes) == 1 and nodes[0].id == id :
158             d.callback(nodes)
159         else:
160             # create our search state
161             state = FindNode(self, id, d.callback)
162             reactor.callFromThread(state.goWithNodes, nodes)
163     
164     
165     ## also async
166     def valueForKey(self, key, callback, searchlocal = 1):
167         """ returns the values found for key in global table
168             callback will be called with a list of values for each peer that returns unique values
169             final callback will be an empty list - probably should change to 'more coming' arg
170         """
171         nodes = self.table.findNodes(key)
172         
173         # get locals
174         if searchlocal:
175             l = self.retrieveValues(key)
176             if len(l) > 0:
177                 reactor.callLater(0, callback, (l))
178         else:
179             l = []
180         
181         # create our search state
182         state = GetValue(self, key, callback)
183         reactor.callFromThread(state.goWithNodes, nodes, l)
184
185     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
186     def storeValueForKey(self, key, value, callback=None):
187         """ stores the value for key in the global table, returns immediately, no status 
188             in this implementation, peers respond but don't indicate status to storing values
189             a key can have many values
190         """
191         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
192             if not response:
193                 # default callback
194                 def _storedValueHandler(sender):
195                     pass
196                 response=_storedValueHandler
197             action = StoreValue(self.table, key, value, response)
198             reactor.callFromThread(action.goWithNodes, nodes)
199             
200         # this call is asynch
201         self.findNode(key, _storeValueForKey)
202         
203     
204     def insertNode(self, n, contacted=1):
205         """
206         insert a node in our local table, pinging oldest contact in bucket, if necessary
207         
208         If all you have is a host/port, then use addContact, which calls this method after
209         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
210         a node into the table without it's peer-ID.  That means of course the node passed into this
211         method needs to be a properly formed Node object with a valid ID.
212         """
213         old = self.table.insertNode(n, contacted=contacted)
214         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
215             # the bucket is full, check to see if old node is still around and if so, replace it
216             
217             ## these are the callbacks used when we ping the oldest node in a bucket
218             def _staleNodeHandler(oldnode=old, newnode = n):
219                 """ called if the pinged node never responds """
220                 self.table.replaceStaleNode(old, newnode)
221             
222             def _notStaleNodeHandler(dict, old=old):
223                 """ called when we get a pong from the old node """
224                 dict = dict['rsp']
225                 if dict['id'] == old.id:
226                     self.table.justSeenNode(old.id)
227             
228             df = old.ping(self.node.id)
229             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
230
231     def sendPing(self, node, callback=None):
232         """
233             ping a node
234         """
235         df = node.ping(self.node.id)
236         ## these are the callbacks we use when we issue a PING
237         def _pongHandler(dict, node=node, table=self.table, callback=callback):
238             _krpc_sender = dict['_krpc_sender']
239             dict = dict['rsp']
240             sender = {'id' : dict['id']}
241             if node.id != const.NULL_ID and node.id != sender['id']:
242                 # whoah, got response from different peer than we were expecting
243                 self.table.invalidateNode(node)
244             else:
245                 sender['host'] = _krpc_sender[0]
246                 sender['port'] = _krpc_sender[1]
247                 n = Node().initWithDict(sender)
248                 n.conn = self.udp.connectionForAddr((n.host, n.port))
249                 table.insertNode(n)
250                 if callback:
251                     callback()
252         def _defaultPong(err, node=node, table=self.table, callback=callback):
253             table.nodeFailed(node)
254             if callback:
255                 callback()
256         
257         df.addCallbacks(_pongHandler,_defaultPong)
258
259     def findCloseNodes(self, callback=lambda a: None):
260         """
261             This does a findNode on the ID one away from our own.  
262             This will allow us to populate our table with nodes on our network closest to our own.
263             This is called as soon as we start up with an empty table
264         """
265         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
266         self.findNode(id, callback)
267
268     def refreshTable(self, force=0):
269         """
270             force=1 will refresh table regardless of last bucket access time
271         """
272         def callback(nodes):
273             pass
274     
275         for bucket in self.table.buckets:
276             if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
277                 id = newIDInRange(bucket.min, bucket.max)
278                 self.findNode(id, callback)
279
280
281     def retrieveValues(self, key):
282         c = self.store.cursor()
283         c.execute("select value from kv where key = %s;", sqlite.encode(key))
284         t = c.fetchone()
285         l = []
286         while t:
287             l.append(t['value'])
288             t = c.fetchone()
289         return l
290     
291     #####
292     ##### INCOMING MESSAGE HANDLERS
293     
294     def krpc_ping(self, id, _krpc_sender):
295         sender = {'id' : id}
296         sender['host'] = _krpc_sender[0]
297         sender['port'] = _krpc_sender[1]        
298         n = Node().initWithDict(sender)
299         n.conn = self.udp.connectionForAddr((n.host, n.port))
300         self.insertNode(n, contacted=0)
301         return {"id" : self.node.id}
302         
303     def krpc_find_node(self, target, id, _krpc_sender):
304         nodes = self.table.findNodes(target)
305         nodes = map(lambda node: node.senderDict(), nodes)
306         sender = {'id' : id}
307         sender['host'] = _krpc_sender[0]
308         sender['port'] = _krpc_sender[1]        
309         n = Node().initWithDict(sender)
310         n.conn = self.udp.connectionForAddr((n.host, n.port))
311         self.insertNode(n, contacted=0)
312         return {"nodes" : nodes, "id" : self.node.id}
313             
314     def krpc_store_value(self, key, value, id, _krpc_sender):
315         t = "%0.6f" % time.time()
316         c = self.store.cursor()
317         try:
318             c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
319         except sqlite.IntegrityError, reason:
320             # update last insert time
321             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
322         sender = {'id' : id}
323         sender['host'] = _krpc_sender[0]
324         sender['port'] = _krpc_sender[1]        
325         n = Node().initWithDict(sender)
326         n.conn = self.udp.connectionForAddr((n.host, n.port))
327         self.insertNode(n, contacted=0)
328         return {"id" : self.node.id}
329     
330     def krpc_find_value(self, key, id, _krpc_sender):
331         sender = {'id' : id}
332         sender['host'] = _krpc_sender[0]
333         sender['port'] = _krpc_sender[1]        
334         n = Node().initWithDict(sender)
335         n.conn = self.udp.connectionForAddr((n.host, n.port))
336         self.insertNode(n, contacted=0)
337     
338         l = self.retrieveValues(key)
339         if len(l) > 0:
340             return {'values' : l, "id": self.node.id}
341         else:
342             nodes = self.table.findNodes(key)
343             nodes = map(lambda node: node.senderDict(), nodes)
344             return {'nodes' : nodes, "id": self.node.id}
345