]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/khashmir.py
Move all the khashmir database operations to a separate module.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
index d1843292904962dad43de250d313b8fc395ec030..be60243dcd23bb26a0ed8831c16019eb5cc5adc6 100644 (file)
@@ -8,21 +8,18 @@ from time import time
 from random import randrange
 from sha import sha
 import os
-import sqlite  ## find this at http://pysqlite.sourceforge.net/
 
 from twisted.internet.defer import Deferred
 from twisted.internet import protocol, reactor
 from twisted.trial import unittest
 
+from db import DB
 from ktable import KTable
 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
 from khash import newID, newIDInRange
 from actions import FindNode, GetValue, KeyExpirer, StoreValue
 import krpc
 
-class KhashmirDBExcept(Exception):
-    pass
-
 # this is the base class, has base functionality and find node, no key-value mappings
 class KhashmirBase(protocol.Factory):
     _Node = KNodeBase
@@ -33,7 +30,7 @@ class KhashmirBase(protocol.Factory):
     def setup(self, config, cache_dir):
         self.config = config
         self.port = config['PORT']
-        self._findDB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
+        self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         #self.app = service.Application("krpc")
@@ -55,80 +52,27 @@ class KhashmirBase(protocol.Factory):
         self.listenport.stopListening()
         
     def _loadSelfNode(self, host, port):
-        c = self.store.cursor()
-        c.execute('select id from self where num = 0;')
-        if c.rowcount > 0:
-            id = c.fetchone()[0]
-        else:
+        id = self.store.getSelfNode()
+        if not id:
             id = newID()
         return self._Node().init(id, host, port)
         
-    def _saveSelfNode(self):
-        c = self.store.cursor()
-        c.execute('delete from self where num = 0;')
-        c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
-        self.store.commit()
-        
     def checkpoint(self, auto=0):
-        self._saveSelfNode()
-        self._dumpRoutingTable()
+        self.store.saveSelfNode(self.node.id)
+        self.store.dumpRoutingTable(self.table.buckets)
         self.refreshTable()
         if auto:
             self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
                                         int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
                               self.checkpoint, (1,))
         
-    def _findDB(self, db):
-        self.db = db
-        try:
-            os.stat(db)
-        except OSError:
-            self._createNewDB(db)
-        else:
-            self._loadDB(db)
-        
-    def _loadDB(self, db):
-        try:
-            self.store = sqlite.connect(db=db)
-            #self.store.autocommit = 0
-        except:
-            import traceback
-            raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
-        
-    def _createNewDB(self, db):
-        self.store = sqlite.connect(db=db)
-        s = """
-            create table kv (key binary, value binary, time timestamp, primary key (key, value));
-            create index kv_key on kv(key);
-            create index kv_timestamp on kv(time);
-            
-            create table nodes (id binary primary key, host text, port number);
-            
-            create table self (num number primary key, id binary);
-            """
-        c = self.store.cursor()
-        c.execute(s)
-        self.store.commit()
-
-    def _dumpRoutingTable(self):
-        """
-            save routing table nodes to the database
-        """
-        c = self.store.cursor()
-        c.execute("delete from nodes where id not NULL;")
-        for bucket in self.table.buckets:
-            for node in bucket.l:
-                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
-        self.store.commit()
-        
     def _loadRoutingTable(self):
         """
             load routing table nodes from database
             it's usually a good idea to call refreshTable(force=1) after loading the table
         """
-        c = self.store.cursor()
-        c.execute("select * from nodes;")
-        for rec in c.fetchall():
+        nodes = self.store.getRoutingTable()
+        for rec in nodes:
             n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
             n.conn = self.udp.connectionForAddr((n.host, n.port))
             self.table.insertNode(n, contacted=0)
@@ -278,15 +222,7 @@ class KhashmirBase(protocol.Factory):
 ## you probably want to use this mixin and provide your own write methods
 class KhashmirRead(KhashmirBase):
     _Node = KNodeRead
-    def retrieveValues(self, key):
-        c = self.store.cursor()
-        c.execute("select value from kv where key = %s;", sqlite.encode(key))
-        t = c.fetchone()
-        l = []
-        while t:
-            l.append(t['value'])
-            t = c.fetchone()
-        return l
+
     ## also async
     def valueForKey(self, key, callback, searchlocal = 1):
         """ returns the values found for key in global table
@@ -297,7 +233,7 @@ class KhashmirRead(KhashmirBase):
         
         # get locals
         if searchlocal:
-            l = self.retrieveValues(key)
+            l = self.store.retrieveValues(key)
             if len(l) > 0:
                 reactor.callLater(0, callback, key, l)
         else:
@@ -315,7 +251,7 @@ class KhashmirRead(KhashmirBase):
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
     
-        l = self.retrieveValues(key)
+        l = self.store.retrieveValues(key)
         if len(l) > 0:
             return {'values' : l, "id": self.node.id}
         else:
@@ -346,14 +282,7 @@ class KhashmirWrite(KhashmirRead):
         self.findNode(key, _storeValueForKey)
                     
     def krpc_store_value(self, key, value, id, _krpc_sender):
-        t = "%0.6f" % time()
-        c = self.store.cursor()
-        try:
-            c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
-        except sqlite.IntegrityError, reason:
-            # update last insert time
-            c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
-        self.store.commit()
+        self.store.storeValue(key, value)
         sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
@@ -388,8 +317,8 @@ class SimpleTests(unittest.TestCase):
     def tearDown(self):
         self.a.shutdown()
         self.b.shutdown()
-        os.unlink(self.a.db)
-        os.unlink(self.b.db)
+        os.unlink(self.a.store.db)
+        os.unlink(self.b.store.db)
 
     def testAddContact(self):
         self.assertEqual(len(self.a.table.buckets), 1)
@@ -485,7 +414,7 @@ class MultiTest(unittest.TestCase):
     def tearDown(self):
         for i in self.l:
             i.shutdown()
-            os.unlink(i.db)
+            os.unlink(i.store.db)
             
         reactor.iterate()