Modify khashmir's config system to not use the const module.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from time import time
5 from random import randrange
6 import os
7 import sqlite  ## find this at http://pysqlite.sourceforge.net/
8
9 from twisted.internet.defer import Deferred
10 from twisted.internet import protocol
11 from twisted.internet import reactor
12
13 from ktable import KTable
14 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
15 from khash import newID, newIDInRange
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
17 import krpc
18
19 class KhashmirDBExcept(Exception):
20     pass
21
22 # this is the base class, has base functionality and find node, no key-value mappings
23 class KhashmirBase(protocol.Factory):
24     _Node = KNodeBase
25     def __init__(self, config, cache_dir='/tmp'):
26         self.config = None
27         self.setup(config, cache_dir)
28         
29     def setup(self, config, cache_dir):
30         self.config = config
31         self._findDB(os.path.join(cache_dir, 'khashmir.db'))
32         self.port = config['PORT']
33         self.node = self._loadSelfNode('', self.port)
34         self.table = KTable(self.node, config)
35         #self.app = service.Application("krpc")
36         self.udp = krpc.hostbroker(self)
37         self.udp.protocol = krpc.KRPC
38         self.listenport = reactor.listenUDP(port, self.udp)
39         self.last = time()
40         self._loadRoutingTable()
41         KeyExpirer(self.store, config)
42         self.refreshTable(force=1)
43         reactor.callLater(60, self.checkpoint, (1,))
44
45     def Node(self):
46         n = self._Node()
47         n.table = self.table
48         return n
49     
50     def __del__(self):
51         self.listenport.stopListening()
52         
53     def _loadSelfNode(self, host, port):
54         c = self.store.cursor()
55         c.execute('select id from self where num = 0;')
56         if c.rowcount > 0:
57             id = c.fetchone()[0]
58         else:
59             id = newID()
60         return self._Node().init(id, host, port)
61         
62     def _saveSelfNode(self):
63         c = self.store.cursor()
64         c.execute('delete from self where num = 0;')
65         c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
66         self.store.commit()
67         
68     def checkpoint(self, auto=0):
69         self._saveSelfNode()
70         self._dumpRoutingTable()
71         self.refreshTable()
72         if auto:
73             reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
74                                         int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
75                               self.checkpoint, (1,))
76         
77     def _findDB(self, db):
78         try:
79             os.stat(db)
80         except OSError:
81             self._createNewDB(db)
82         else:
83             self._loadDB(db)
84         
85     def _loadDB(self, db):
86         try:
87             self.store = sqlite.connect(db=db)
88             #self.store.autocommit = 0
89         except:
90             import traceback
91             raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
92         
93     def _createNewDB(self, db):
94         self.store = sqlite.connect(db=db)
95         s = """
96             create table kv (key binary, value binary, time timestamp, primary key (key, value));
97             create index kv_key on kv(key);
98             create index kv_timestamp on kv(time);
99             
100             create table nodes (id binary primary key, host text, port number);
101             
102             create table self (num number primary key, id binary);
103             """
104         c = self.store.cursor()
105         c.execute(s)
106         self.store.commit()
107
108     def _dumpRoutingTable(self):
109         """
110             save routing table nodes to the database
111         """
112         c = self.store.cursor()
113         c.execute("delete from nodes where id not NULL;")
114         for bucket in self.table.buckets:
115             for node in bucket.l:
116                 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
117         self.store.commit()
118         
119     def _loadRoutingTable(self):
120         """
121             load routing table nodes from database
122             it's usually a good idea to call refreshTable(force=1) after loading the table
123         """
124         c = self.store.cursor()
125         c.execute("select * from nodes;")
126         for rec in c.fetchall():
127             n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
128             n.conn = self.udp.connectionForAddr((n.host, n.port))
129             self.table.insertNode(n, contacted=0)
130             
131
132     #######
133     #######  LOCAL INTERFACE    - use these methods!
134     def addContact(self, host, port, callback=None):
135         """
136             ping this node and add the contact info to the table on pong!
137         """
138         n =self.Node().init(NULL_ID, host, port) 
139         n.conn = self.udp.connectionForAddr((n.host, n.port))
140         self.sendPing(n, callback=callback)
141
142     ## this call is async!
143     def findNode(self, id, callback, errback=None):
144         """ returns the contact info for node, or the k closest nodes, from the global table """
145         # get K nodes out of local table/cache, or the node we want
146         nodes = self.table.findNodes(id)
147         d = Deferred()
148         if errback:
149             d.addCallbacks(callback, errback)
150         else:
151             d.addCallback(callback)
152         if len(nodes) == 1 and nodes[0].id == id :
153             d.callback(nodes)
154         else:
155             # create our search state
156             state = FindNode(self, id, d.callback, self.config)
157             reactor.callLater(0, state.goWithNodes, nodes)
158     
159     def insertNode(self, n, contacted=1):
160         """
161         insert a node in our local table, pinging oldest contact in bucket, if necessary
162         
163         If all you have is a host/port, then use addContact, which calls this method after
164         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
165         a node into the table without it's peer-ID.  That means of course the node passed into this
166         method needs to be a properly formed Node object with a valid ID.
167         """
168         old = self.table.insertNode(n, contacted=contacted)
169         if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id:
170             # the bucket is full, check to see if old node is still around and if so, replace it
171             
172             ## these are the callbacks used when we ping the oldest node in a bucket
173             def _staleNodeHandler(oldnode=old, newnode = n):
174                 """ called if the pinged node never responds """
175                 self.table.replaceStaleNode(old, newnode)
176             
177             def _notStaleNodeHandler(dict, old=old):
178                 """ called when we get a pong from the old node """
179                 dict = dict['rsp']
180                 if dict['id'] == old.id:
181                     self.table.justSeenNode(old.id)
182             
183             df = old.ping(self.node.id)
184             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
185
186     def sendPing(self, node, callback=None):
187         """
188             ping a node
189         """
190         df = node.ping(self.node.id)
191         ## these are the callbacks we use when we issue a PING
192         def _pongHandler(dict, node=node, table=self.table, callback=callback):
193             _krpc_sender = dict['_krpc_sender']
194             dict = dict['rsp']
195             sender = {'id' : dict['id']}
196             sender['host'] = _krpc_sender[0]
197             sender['port'] = _krpc_sender[1]
198             n = self.Node().initWithDict(sender)
199             n.conn = self.udp.connectionForAddr((n.host, n.port))
200             table.insertNode(n)
201             if callback:
202                 callback()
203         def _defaultPong(err, node=node, table=self.table, callback=callback):
204             table.nodeFailed(node)
205             if callback:
206                 callback()
207         
208         df.addCallbacks(_pongHandler,_defaultPong)
209
210     def findCloseNodes(self, callback=lambda a: None):
211         """
212             This does a findNode on the ID one away from our own.  
213             This will allow us to populate our table with nodes on our network closest to our own.
214             This is called as soon as we start up with an empty table
215         """
216         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
217         self.findNode(id, callback)
218
219     def refreshTable(self, force=0):
220         """
221             force=1 will refresh table regardless of last bucket access time
222         """
223         def callback(nodes):
224             pass
225     
226         for bucket in self.table.buckets:
227             if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']):
228                 id = newIDInRange(bucket.min, bucket.max)
229                 self.findNode(id, callback)
230
231     def stats(self):
232         """
233         Returns (num_contacts, num_nodes)
234         num_contacts: number contacts in our routing table
235         num_nodes: number of nodes estimated in the entire dht
236         """
237         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
238         num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
239         return (num_contacts, num_nodes)
240
241     def krpc_ping(self, id, _krpc_sender):
242         sender = {'id' : id}
243         sender['host'] = _krpc_sender[0]
244         sender['port'] = _krpc_sender[1]        
245         n = self.Node().initWithDict(sender)
246         n.conn = self.udp.connectionForAddr((n.host, n.port))
247         self.insertNode(n, contacted=0)
248         return {"id" : self.node.id}
249         
250     def krpc_find_node(self, target, id, _krpc_sender):
251         nodes = self.table.findNodes(target)
252         nodes = map(lambda node: node.senderDict(), nodes)
253         sender = {'id' : id}
254         sender['host'] = _krpc_sender[0]
255         sender['port'] = _krpc_sender[1]        
256         n = self.Node().initWithDict(sender)
257         n.conn = self.udp.connectionForAddr((n.host, n.port))
258         self.insertNode(n, contacted=0)
259         return {"nodes" : nodes, "id" : self.node.id}
260
261
262 ## This class provides read-only access to the DHT, valueForKey
263 ## you probably want to use this mixin and provide your own write methods
264 class KhashmirRead(KhashmirBase):
265     _Node = KNodeRead
266     def retrieveValues(self, key):
267         c = self.store.cursor()
268         c.execute("select value from kv where key = %s;", sqlite.encode(key))
269         t = c.fetchone()
270         l = []
271         while t:
272             l.append(t['value'])
273             t = c.fetchone()
274         return l
275     ## also async
276     def valueForKey(self, key, callback, searchlocal = 1):
277         """ returns the values found for key in global table
278             callback will be called with a list of values for each peer that returns unique values
279             final callback will be an empty list - probably should change to 'more coming' arg
280         """
281         nodes = self.table.findNodes(key)
282         
283         # get locals
284         if searchlocal:
285             l = self.retrieveValues(key)
286             if len(l) > 0:
287                 reactor.callLater(0, callback, (l))
288         else:
289             l = []
290         
291         # create our search state
292         state = GetValue(self, key, callback, self.config)
293         reactor.callLater(0, state.goWithNodes, nodes, l)
294
295     def krpc_find_value(self, key, id, _krpc_sender):
296         sender = {'id' : id}
297         sender['host'] = _krpc_sender[0]
298         sender['port'] = _krpc_sender[1]        
299         n = self.Node().initWithDict(sender)
300         n.conn = self.udp.connectionForAddr((n.host, n.port))
301         self.insertNode(n, contacted=0)
302     
303         l = self.retrieveValues(key)
304         if len(l) > 0:
305             return {'values' : l, "id": self.node.id}
306         else:
307             nodes = self.table.findNodes(key)
308             nodes = map(lambda node: node.senderDict(), nodes)
309             return {'nodes' : nodes, "id": self.node.id}
310
311 ###  provides a generic write method, you probably don't want to deploy something that allows
312 ###  arbitrary value storage
313 class KhashmirWrite(KhashmirRead):
314     _Node = KNodeWrite
315     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
316     def storeValueForKey(self, key, value, callback=None):
317         """ stores the value for key in the global table, returns immediately, no status 
318             in this implementation, peers respond but don't indicate status to storing values
319             a key can have many values
320         """
321         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
322             if not response:
323                 # default callback
324                 def _storedValueHandler(sender):
325                     pass
326                 response=_storedValueHandler
327             action = StoreValue(self.table, key, value, response, self.config)
328             reactor.callLater(0, action.goWithNodes, nodes)
329             
330         # this call is asynch
331         self.findNode(key, _storeValueForKey)
332                     
333     def krpc_store_value(self, key, value, id, _krpc_sender):
334         t = "%0.6f" % time()
335         c = self.store.cursor()
336         try:
337             c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
338         except sqlite.IntegrityError, reason:
339             # update last insert time
340             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
341         self.store.commit()
342         sender = {'id' : id}
343         sender['host'] = _krpc_sender[0]
344         sender['port'] = _krpc_sender[1]        
345         n = self.Node().initWithDict(sender)
346         n.conn = self.udp.connectionForAddr((n.host, n.port))
347         self.insertNode(n, contacted=0)
348         return {"id" : self.node.id}
349
350 # the whole shebang, for testing
351 class Khashmir(KhashmirWrite):
352     _Node = KNodeWrite