]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
Remove the obsolete whrandom module and use random instead.
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from const import reactor
5 import const
6
7 import time
8
9 from sha import sha
10
11 from ktable import KTable, K
12 from knode import *
13
14 from khash import newID, newIDInRange
15
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
17 import krpc
18
19 from twisted.internet.defer import Deferred
20 from twisted.internet import protocol
21 from twisted.application import service, internet
22 from twisted.web import server
23 import sys
24
25 from random import randrange
26
27 import sqlite  ## find this at http://pysqlite.sourceforge.net/
28
29 class KhashmirDBExcept(Exception):
30     pass
31
32 # this is the base class, has base functionality and find node, no key-value mappings
33 class KhashmirBase(protocol.Factory):
34     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
35     _Node = KNodeBase
36     def __init__(self, host, port, db='khashmir.db'):
37         self.setup(host, port, db)
38         
39     def setup(self, host, port, db='khashmir.db'):
40         self._findDB(db)
41         self.port = port
42         self.node = self._loadSelfNode(host, port)
43         self.table = KTable(self.node)
44         #self.app = service.Application("krpc")
45         self.udp = krpc.hostbroker(self)
46         self.udp.protocol = krpc.KRPC
47         self.listenport = reactor.listenUDP(port, self.udp)
48         self.last = time.time()
49         self._loadRoutingTable()
50         KeyExpirer(store=self.store)
51         self.refreshTable(force=1)
52         reactor.callLater(60, self.checkpoint, (1,))
53
54     def Node(self):
55         n = self._Node()
56         n.table = self.table
57         return n
58     
59     def __del__(self):
60         self.listenport.stopListening()
61         
62     def _loadSelfNode(self, host, port):
63         c = self.store.cursor()
64         c.execute('select id from self where num = 0;')
65         if c.rowcount > 0:
66             id = c.fetchone()[0]
67         else:
68             id = newID()
69         return self._Node().init(id, host, port)
70         
71     def _saveSelfNode(self):
72         c = self.store.cursor()
73         c.execute('delete from self where num = 0;')
74         c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
75         self.store.commit()
76         
77     def checkpoint(self, auto=0):
78         self._saveSelfNode()
79         self._dumpRoutingTable()
80         self.refreshTable()
81         if auto:
82             reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
83         
84     def _findDB(self, db):
85         import os
86         try:
87             os.stat(db)
88         except OSError:
89             self._createNewDB(db)
90         else:
91             self._loadDB(db)
92         
93     def _loadDB(self, db):
94         try:
95             self.store = sqlite.connect(db=db)
96             #self.store.autocommit = 0
97         except:
98             import traceback
99             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
100         
101     def _createNewDB(self, db):
102         self.store = sqlite.connect(db=db)
103         s = """
104             create table kv (key binary, value binary, time timestamp, primary key (key, value));
105             create index kv_key on kv(key);
106             create index kv_timestamp on kv(time);
107             
108             create table nodes (id binary primary key, host text, port number);
109             
110             create table self (num number primary key, id binary);
111             """
112         c = self.store.cursor()
113         c.execute(s)
114         self.store.commit()
115
116     def _dumpRoutingTable(self):
117         """
118             save routing table nodes to the database
119         """
120         c = self.store.cursor()
121         c.execute("delete from nodes where id not NULL;")
122         for bucket in self.table.buckets:
123             for node in bucket.l:
124                 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
125         self.store.commit()
126         
127     def _loadRoutingTable(self):
128         """
129             load routing table nodes from database
130             it's usually a good idea to call refreshTable(force=1) after loading the table
131         """
132         c = self.store.cursor()
133         c.execute("select * from nodes;")
134         for rec in c.fetchall():
135             n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
136             n.conn = self.udp.connectionForAddr((n.host, n.port))
137             self.table.insertNode(n, contacted=0)
138             
139
140     #######
141     #######  LOCAL INTERFACE    - use these methods!
142     def addContact(self, host, port, callback=None):
143         """
144             ping this node and add the contact info to the table on pong!
145         """
146         n =self.Node().init(const.NULL_ID, host, port) 
147         n.conn = self.udp.connectionForAddr((n.host, n.port))
148         self.sendPing(n, callback=callback)
149
150     ## this call is async!
151     def findNode(self, id, callback, errback=None):
152         """ returns the contact info for node, or the k closest nodes, from the global table """
153         # get K nodes out of local table/cache, or the node we want
154         nodes = self.table.findNodes(id)
155         d = Deferred()
156         if errback:
157             d.addCallbacks(callback, errback)
158         else:
159             d.addCallback(callback)
160         if len(nodes) == 1 and nodes[0].id == id :
161             d.callback(nodes)
162         else:
163             # create our search state
164             state = FindNode(self, id, d.callback)
165             reactor.callLater(0, state.goWithNodes, nodes)
166     
167     def insertNode(self, n, contacted=1):
168         """
169         insert a node in our local table, pinging oldest contact in bucket, if necessary
170         
171         If all you have is a host/port, then use addContact, which calls this method after
172         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
173         a node into the table without it's peer-ID.  That means of course the node passed into this
174         method needs to be a properly formed Node object with a valid ID.
175         """
176         old = self.table.insertNode(n, contacted=contacted)
177         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
178             # the bucket is full, check to see if old node is still around and if so, replace it
179             
180             ## these are the callbacks used when we ping the oldest node in a bucket
181             def _staleNodeHandler(oldnode=old, newnode = n):
182                 """ called if the pinged node never responds """
183                 self.table.replaceStaleNode(old, newnode)
184             
185             def _notStaleNodeHandler(dict, old=old):
186                 """ called when we get a pong from the old node """
187                 dict = dict['rsp']
188                 if dict['id'] == old.id:
189                     self.table.justSeenNode(old.id)
190             
191             df = old.ping(self.node.id)
192             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
193
194     def sendPing(self, node, callback=None):
195         """
196             ping a node
197         """
198         df = node.ping(self.node.id)
199         ## these are the callbacks we use when we issue a PING
200         def _pongHandler(dict, node=node, table=self.table, callback=callback):
201             _krpc_sender = dict['_krpc_sender']
202             dict = dict['rsp']
203             sender = {'id' : dict['id']}
204             sender['host'] = _krpc_sender[0]
205             sender['port'] = _krpc_sender[1]
206             n = self.Node().initWithDict(sender)
207             n.conn = self.udp.connectionForAddr((n.host, n.port))
208             table.insertNode(n)
209             if callback:
210                 callback()
211         def _defaultPong(err, node=node, table=self.table, callback=callback):
212             table.nodeFailed(node)
213             if callback:
214                 callback()
215         
216         df.addCallbacks(_pongHandler,_defaultPong)
217
218     def findCloseNodes(self, callback=lambda a: None):
219         """
220             This does a findNode on the ID one away from our own.  
221             This will allow us to populate our table with nodes on our network closest to our own.
222             This is called as soon as we start up with an empty table
223         """
224         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
225         self.findNode(id, callback)
226
227     def refreshTable(self, force=0):
228         """
229             force=1 will refresh table regardless of last bucket access time
230         """
231         def callback(nodes):
232             pass
233     
234         for bucket in self.table.buckets:
235             if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
236                 id = newIDInRange(bucket.min, bucket.max)
237                 self.findNode(id, callback)
238
239     def stats(self):
240         """
241         Returns (num_contacts, num_nodes)
242         num_contacts: number contacts in our routing table
243         num_nodes: number of nodes estimated in the entire dht
244         """
245         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
246         num_nodes = const.K * (2**(len(self.table.buckets) - 1))
247         return (num_contacts, num_nodes)
248
249     def krpc_ping(self, id, _krpc_sender):
250         sender = {'id' : id}
251         sender['host'] = _krpc_sender[0]
252         sender['port'] = _krpc_sender[1]        
253         n = self.Node().initWithDict(sender)
254         n.conn = self.udp.connectionForAddr((n.host, n.port))
255         self.insertNode(n, contacted=0)
256         return {"id" : self.node.id}
257         
258     def krpc_find_node(self, target, id, _krpc_sender):
259         nodes = self.table.findNodes(target)
260         nodes = map(lambda node: node.senderDict(), nodes)
261         sender = {'id' : id}
262         sender['host'] = _krpc_sender[0]
263         sender['port'] = _krpc_sender[1]        
264         n = self.Node().initWithDict(sender)
265         n.conn = self.udp.connectionForAddr((n.host, n.port))
266         self.insertNode(n, contacted=0)
267         return {"nodes" : nodes, "id" : self.node.id}
268
269
270 ## This class provides read-only access to the DHT, valueForKey
271 ## you probably want to use this mixin and provide your own write methods
272 class KhashmirRead(KhashmirBase):
273     _Node = KNodeRead
274     def retrieveValues(self, key):
275         c = self.store.cursor()
276         c.execute("select value from kv where key = %s;", sqlite.encode(key))
277         t = c.fetchone()
278         l = []
279         while t:
280             l.append(t['value'])
281             t = c.fetchone()
282         return l
283     ## also async
284     def valueForKey(self, key, callback, searchlocal = 1):
285         """ returns the values found for key in global table
286             callback will be called with a list of values for each peer that returns unique values
287             final callback will be an empty list - probably should change to 'more coming' arg
288         """
289         nodes = self.table.findNodes(key)
290         
291         # get locals
292         if searchlocal:
293             l = self.retrieveValues(key)
294             if len(l) > 0:
295                 reactor.callLater(0, callback, (l))
296         else:
297             l = []
298         
299         # create our search state
300         state = GetValue(self, key, callback)
301         reactor.callLater(0, state.goWithNodes, nodes, l)
302
303     def krpc_find_value(self, key, id, _krpc_sender):
304         sender = {'id' : id}
305         sender['host'] = _krpc_sender[0]
306         sender['port'] = _krpc_sender[1]        
307         n = self.Node().initWithDict(sender)
308         n.conn = self.udp.connectionForAddr((n.host, n.port))
309         self.insertNode(n, contacted=0)
310     
311         l = self.retrieveValues(key)
312         if len(l) > 0:
313             return {'values' : l, "id": self.node.id}
314         else:
315             nodes = self.table.findNodes(key)
316             nodes = map(lambda node: node.senderDict(), nodes)
317             return {'nodes' : nodes, "id": self.node.id}
318
319 ###  provides a generic write method, you probably don't want to deploy something that allows
320 ###  arbitrary value storage
321 class KhashmirWrite(KhashmirRead):
322     _Node = KNodeWrite
323     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
324     def storeValueForKey(self, key, value, callback=None):
325         """ stores the value for key in the global table, returns immediately, no status 
326             in this implementation, peers respond but don't indicate status to storing values
327             a key can have many values
328         """
329         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
330             if not response:
331                 # default callback
332                 def _storedValueHandler(sender):
333                     pass
334                 response=_storedValueHandler
335             action = StoreValue(self.table, key, value, response)
336             reactor.callLater(action.goWithNodes, nodes)
337             
338         # this call is asynch
339         self.findNode(key, _storeValueForKey)
340                     
341     def krpc_store_value(self, key, value, id, _krpc_sender):
342         t = "%0.6f" % time.time()
343         c = self.store.cursor()
344         try:
345             c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
346         except sqlite.IntegrityError, reason:
347             # update last insert time
348             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
349         self.store.commit()
350         sender = {'id' : id}
351         sender['host'] = _krpc_sender[0]
352         sender['port'] = _krpc_sender[1]        
353         n = self.Node().initWithDict(sender)
354         n.conn = self.udp.connectionForAddr((n.host, n.port))
355         self.insertNode(n, contacted=0)
356         return {"id" : self.node.id}
357
358 # the whole shebang, for testing
359 class Khashmir(KhashmirWrite):
360     _Node = KNodeWrite