]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
Ignore the pyc and eclipse project files.
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from const import reactor
5 import const
6
7 import time
8
9 from sha import sha
10
11 from ktable import KTable, K
12 from knode import *
13
14 from khash import newID, newIDInRange
15
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
17 import krpc
18
19 from twisted.internet.defer import Deferred
20 from twisted.internet import protocol
21 from twisted.python import threadable
22 from twisted.application import service, internet
23 from twisted.web import server
24 threadable.init()
25 import sys
26
27 from random import randrange
28
29 import sqlite  ## find this at http://pysqlite.sourceforge.net/
30
31 class KhashmirDBExcept(Exception):
32     pass
33
34 # this is the base class, has base functionality and find node, no key-value mappings
35 class KhashmirBase(protocol.Factory):
36     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
37     _Node = KNodeBase
38     def __init__(self, host, port, db='khashmir.db'):
39         self.setup(host, port, db)
40         
41     def setup(self, host, port, db='khashmir.db'):
42         self._findDB(db)
43         self.port = port
44         self.node = self._loadSelfNode(host, port)
45         self.table = KTable(self.node)
46         #self.app = service.Application("krpc")
47         self.udp = krpc.hostbroker(self)
48         self.udp.protocol = krpc.KRPC
49         self.listenport = reactor.listenUDP(port, self.udp)
50         self.last = time.time()
51         self._loadRoutingTable()
52         KeyExpirer(store=self.store)
53         self.refreshTable(force=1)
54         reactor.callLater(60, self.checkpoint, (1,))
55
56     def Node(self):
57         n = self._Node()
58         n.table = self.table
59         return n
60     
61     def __del__(self):
62         self.listenport.stopListening()
63         
64     def _loadSelfNode(self, host, port):
65         c = self.store.cursor()
66         c.execute('select id from self where num = 0;')
67         if c.rowcount > 0:
68             id = c.fetchone()[0]
69         else:
70             id = newID()
71         return self._Node().init(id, host, port)
72         
73     def _saveSelfNode(self):
74         c = self.store.cursor()
75         c.execute('delete from self where num = 0;')
76         c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
77         self.store.commit()
78         
79     def checkpoint(self, auto=0):
80         self._saveSelfNode()
81         self._dumpRoutingTable()
82         self.refreshTable()
83         if auto:
84             reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
85         
86     def _findDB(self, db):
87         import os
88         try:
89             os.stat(db)
90         except OSError:
91             self._createNewDB(db)
92         else:
93             self._loadDB(db)
94         
95     def _loadDB(self, db):
96         try:
97             self.store = sqlite.connect(db=db)
98             #self.store.autocommit = 0
99         except:
100             import traceback
101             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
102         
103     def _createNewDB(self, db):
104         self.store = sqlite.connect(db=db)
105         s = """
106             create table kv (key binary, value binary, time timestamp, primary key (key, value));
107             create index kv_key on kv(key);
108             create index kv_timestamp on kv(time);
109             
110             create table nodes (id binary primary key, host text, port number);
111             
112             create table self (num number primary key, id binary);
113             """
114         c = self.store.cursor()
115         c.execute(s)
116         self.store.commit()
117
118     def _dumpRoutingTable(self):
119         """
120             save routing table nodes to the database
121         """
122         c = self.store.cursor()
123         c.execute("delete from nodes where id not NULL;")
124         for bucket in self.table.buckets:
125             for node in bucket.l:
126                 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
127         self.store.commit()
128         
129     def _loadRoutingTable(self):
130         """
131             load routing table nodes from database
132             it's usually a good idea to call refreshTable(force=1) after loading the table
133         """
134         c = self.store.cursor()
135         c.execute("select * from nodes;")
136         for rec in c.fetchall():
137             n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
138             n.conn = self.udp.connectionForAddr((n.host, n.port))
139             self.table.insertNode(n, contacted=0)
140             
141
142     #######
143     #######  LOCAL INTERFACE    - use these methods!
144     def addContact(self, host, port, callback=None):
145         """
146             ping this node and add the contact info to the table on pong!
147         """
148         n =self.Node().init(const.NULL_ID, host, port) 
149         n.conn = self.udp.connectionForAddr((n.host, n.port))
150         self.sendPing(n, callback=callback)
151
152     ## this call is async!
153     def findNode(self, id, callback, errback=None):
154         """ returns the contact info for node, or the k closest nodes, from the global table """
155         # get K nodes out of local table/cache, or the node we want
156         nodes = self.table.findNodes(id)
157         d = Deferred()
158         if errback:
159             d.addCallbacks(callback, errback)
160         else:
161             d.addCallback(callback)
162         if len(nodes) == 1 and nodes[0].id == id :
163             d.callback(nodes)
164         else:
165             # create our search state
166             state = FindNode(self, id, d.callback)
167             reactor.callFromThread(state.goWithNodes, nodes)
168     
169     def insertNode(self, n, contacted=1):
170         """
171         insert a node in our local table, pinging oldest contact in bucket, if necessary
172         
173         If all you have is a host/port, then use addContact, which calls this method after
174         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
175         a node into the table without it's peer-ID.  That means of course the node passed into this
176         method needs to be a properly formed Node object with a valid ID.
177         """
178         old = self.table.insertNode(n, contacted=contacted)
179         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
180             # the bucket is full, check to see if old node is still around and if so, replace it
181             
182             ## these are the callbacks used when we ping the oldest node in a bucket
183             def _staleNodeHandler(oldnode=old, newnode = n):
184                 """ called if the pinged node never responds """
185                 self.table.replaceStaleNode(old, newnode)
186             
187             def _notStaleNodeHandler(dict, old=old):
188                 """ called when we get a pong from the old node """
189                 dict = dict['rsp']
190                 if dict['id'] == old.id:
191                     self.table.justSeenNode(old.id)
192             
193             df = old.ping(self.node.id)
194             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
195
196     def sendPing(self, node, callback=None):
197         """
198             ping a node
199         """
200         df = node.ping(self.node.id)
201         ## these are the callbacks we use when we issue a PING
202         def _pongHandler(dict, node=node, table=self.table, callback=callback):
203             _krpc_sender = dict['_krpc_sender']
204             dict = dict['rsp']
205             sender = {'id' : dict['id']}
206             sender['host'] = _krpc_sender[0]
207             sender['port'] = _krpc_sender[1]
208             n = self.Node().initWithDict(sender)
209             n.conn = self.udp.connectionForAddr((n.host, n.port))
210             table.insertNode(n)
211             if callback:
212                 callback()
213         def _defaultPong(err, node=node, table=self.table, callback=callback):
214             table.nodeFailed(node)
215             if callback:
216                 callback()
217         
218         df.addCallbacks(_pongHandler,_defaultPong)
219
220     def findCloseNodes(self, callback=lambda a: None):
221         """
222             This does a findNode on the ID one away from our own.  
223             This will allow us to populate our table with nodes on our network closest to our own.
224             This is called as soon as we start up with an empty table
225         """
226         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
227         self.findNode(id, callback)
228
229     def refreshTable(self, force=0):
230         """
231             force=1 will refresh table regardless of last bucket access time
232         """
233         def callback(nodes):
234             pass
235     
236         for bucket in self.table.buckets:
237             if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
238                 id = newIDInRange(bucket.min, bucket.max)
239                 self.findNode(id, callback)
240
241     def stats(self):
242         """
243         Returns (num_contacts, num_nodes)
244         num_contacts: number contacts in our routing table
245         num_nodes: number of nodes estimated in the entire dht
246         """
247         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
248         num_nodes = const.K * (2**(len(self.table.buckets) - 1))
249         return (num_contacts, num_nodes)
250
251     def krpc_ping(self, id, _krpc_sender):
252         sender = {'id' : id}
253         sender['host'] = _krpc_sender[0]
254         sender['port'] = _krpc_sender[1]        
255         n = self.Node().initWithDict(sender)
256         n.conn = self.udp.connectionForAddr((n.host, n.port))
257         self.insertNode(n, contacted=0)
258         return {"id" : self.node.id}
259         
260     def krpc_find_node(self, target, id, _krpc_sender):
261         nodes = self.table.findNodes(target)
262         nodes = map(lambda node: node.senderDict(), nodes)
263         sender = {'id' : id}
264         sender['host'] = _krpc_sender[0]
265         sender['port'] = _krpc_sender[1]        
266         n = self.Node().initWithDict(sender)
267         n.conn = self.udp.connectionForAddr((n.host, n.port))
268         self.insertNode(n, contacted=0)
269         return {"nodes" : nodes, "id" : self.node.id}
270
271
272 ## This class provides read-only access to the DHT, valueForKey
273 ## you probably want to use this mixin and provide your own write methods
274 class KhashmirRead(KhashmirBase):
275     _Node = KNodeRead
276     def retrieveValues(self, key):
277         c = self.store.cursor()
278         c.execute("select value from kv where key = %s;", sqlite.encode(key))
279         t = c.fetchone()
280         l = []
281         while t:
282             l.append(t['value'])
283             t = c.fetchone()
284         return l
285     ## also async
286     def valueForKey(self, key, callback, searchlocal = 1):
287         """ returns the values found for key in global table
288             callback will be called with a list of values for each peer that returns unique values
289             final callback will be an empty list - probably should change to 'more coming' arg
290         """
291         nodes = self.table.findNodes(key)
292         
293         # get locals
294         if searchlocal:
295             l = self.retrieveValues(key)
296             if len(l) > 0:
297                 reactor.callLater(0, callback, (l))
298         else:
299             l = []
300         
301         # create our search state
302         state = GetValue(self, key, callback)
303         reactor.callFromThread(state.goWithNodes, nodes, l)
304
305     def krpc_find_value(self, key, id, _krpc_sender):
306         sender = {'id' : id}
307         sender['host'] = _krpc_sender[0]
308         sender['port'] = _krpc_sender[1]        
309         n = self.Node().initWithDict(sender)
310         n.conn = self.udp.connectionForAddr((n.host, n.port))
311         self.insertNode(n, contacted=0)
312     
313         l = self.retrieveValues(key)
314         if len(l) > 0:
315             return {'values' : l, "id": self.node.id}
316         else:
317             nodes = self.table.findNodes(key)
318             nodes = map(lambda node: node.senderDict(), nodes)
319             return {'nodes' : nodes, "id": self.node.id}
320
321 ###  provides a generic write method, you probably don't want to deploy something that allows
322 ###  arbitrary value storage
323 class KhashmirWrite(KhashmirRead):
324     _Node = KNodeWrite
325     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
326     def storeValueForKey(self, key, value, callback=None):
327         """ stores the value for key in the global table, returns immediately, no status 
328             in this implementation, peers respond but don't indicate status to storing values
329             a key can have many values
330         """
331         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
332             if not response:
333                 # default callback
334                 def _storedValueHandler(sender):
335                     pass
336                 response=_storedValueHandler
337             action = StoreValue(self.table, key, value, response)
338             reactor.callFromThread(action.goWithNodes, nodes)
339             
340         # this call is asynch
341         self.findNode(key, _storeValueForKey)
342                     
343     def krpc_store_value(self, key, value, id, _krpc_sender):
344         t = "%0.6f" % time.time()
345         c = self.store.cursor()
346         try:
347             c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
348         except sqlite.IntegrityError, reason:
349             # update last insert time
350             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
351         self.store.commit()
352         sender = {'id' : id}
353         sender['host'] = _krpc_sender[0]
354         sender['port'] = _krpc_sender[1]        
355         n = self.Node().initWithDict(sender)
356         n.conn = self.udp.connectionForAddr((n.host, n.port))
357         self.insertNode(n, contacted=0)
358         return {"id" : self.node.id}
359
360 # the whole shebang, for testing
361 class Khashmir(KhashmirWrite):
362     _Node = KNodeWrite