]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
Move all the khashmir database operations to a separate module.
authorCameron Dale <camrdale@gmail.com>
Tue, 8 Jan 2008 20:33:18 +0000 (12:33 -0800)
committerCameron Dale <camrdale@gmail.com>
Tue, 8 Jan 2008 20:33:18 +0000 (12:33 -0800)
apt_dht_Khashmir/DHT.py
apt_dht_Khashmir/actions.py
apt_dht_Khashmir/db.py [new file with mode: 0644]
apt_dht_Khashmir/khashmir.py

index 42e2e9afd4489f33a1ddc385f1938b18301f5ec4..3d6c9f931f5aa2582b7d880b50bfdc0123cb89c4 100644 (file)
@@ -253,12 +253,12 @@ class TestSimpleDHT(unittest.TestCase):
     def tearDown(self):
         self.a.leave()
         try:
-            os.unlink(self.a.khashmir.db)
+            os.unlink(self.a.khashmir.store.db)
         except:
             pass
         self.b.leave()
         try:
-            os.unlink(self.b.khashmir.db)
+            os.unlink(self.b.khashmir.store.db)
         except:
             pass
 
@@ -322,7 +322,7 @@ class TestMultiDHT(unittest.TestCase):
     def get_values(self):
         self.checked = 0
         for i in range(len(self.l)):
-            for j in random.sample(xrange(len(self.l)), 4):
+            for j in random.sample(xrange(len(self.l)), max(len(self.l), 4)):
                 self.checked += 1
                 d = self.l[i].getValue(sha.new(str(self.startport+j)).digest())
                 check = []
@@ -349,6 +349,6 @@ class TestMultiDHT(unittest.TestCase):
         for i in self.l:
             try:
                 i.leave()
-                os.unlink(i.khashmir.db)
+                os.unlink(i.khashmir.store.db)
             except:
                 pass
index 7f8f91128b3e7360ce0cb6811a5a7f06f3b675c2..4214e4eb932580230aa0552a5270d86eba10b124 100644 (file)
@@ -263,13 +263,7 @@ class KeyExpirer:
         self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
     
     def doExpire(self):
-        self.cut = "%0.6f" % (time() - self.config['KE_AGE'])
-        self._expire()
-    
-    def _expire(self):
-        c = self.store.cursor()
-        s = "delete from kv where time < '%s';" % self.cut
-        c.execute(s)
+        self.store.expireValues(time() - self.config['KE_AGE'])
         self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
         
     def shutdown(self):
diff --git a/apt_dht_Khashmir/db.py b/apt_dht_Khashmir/db.py
new file mode 100644 (file)
index 0000000..c29abd8
--- /dev/null
@@ -0,0 +1,107 @@
+
+from time import time
+import sqlite  ## find this at http://pysqlite.sourceforge.net/
+import os
+
+class DBExcept(Exception):
+    pass
+
+class DB:
+    """Database access for storing persistent data."""
+    
+    def __init__(self, db):
+        self.db = db
+        try:
+            os.stat(db)
+        except OSError:
+            self._createNewDB(db)
+        else:
+            self._loadDB(db)
+        
+    def _loadDB(self, db):
+        try:
+            self.store = sqlite.connect(db=db)
+            #self.store.autocommit = 0
+        except:
+            import traceback
+            raise DBExcept, "Couldn't open DB", traceback.format_exc()
+        
+    def _createNewDB(self, db):
+        self.store = sqlite.connect(db=db)
+        s = """
+            create table kv (key binary, value binary, time timestamp, primary key (key, value));
+            create index kv_key on kv(key);
+            create index kv_timestamp on kv(time);
+            
+            create table nodes (id binary primary key, host text, port number);
+            
+            create table self (num number primary key, id binary);
+            """
+        c = self.store.cursor()
+        c.execute(s)
+        self.store.commit()
+
+    def getSelfNode(self):
+        c = self.store.cursor()
+        c.execute('select id from self where num = 0;')
+        if c.rowcount > 0:
+            return c.fetchone()[0]
+        else:
+            return None
+        
+    def saveSelfNode(self, id):
+        c = self.store.cursor()
+        c.execute('delete from self where num = 0;')
+        c.execute("insert into self values (0, %s);", sqlite.encode(id))
+        self.store.commit()
+        
+    def dumpRoutingTable(self, buckets):
+        """
+            save routing table nodes to the database
+        """
+        c = self.store.cursor()
+        c.execute("delete from nodes where id not NULL;")
+        for bucket in buckets:
+            for node in bucket.l:
+                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
+        self.store.commit()
+        
+    def getRoutingTable(self):
+        """
+            load routing table nodes from database
+            it's usually a good idea to call refreshTable(force=1) after loading the table
+        """
+        c = self.store.cursor()
+        c.execute("select * from nodes;")
+        return c.fetchall()
+            
+    def retrieveValues(self, key):
+        c = self.store.cursor()
+        c.execute("select value from kv where key = %s;", sqlite.encode(key))
+        t = c.fetchone()
+        l = []
+        while t:
+            l.append(t['value'])
+            t = c.fetchone()
+        return l
+
+    def storeValue(self, key, value):
+        """Store or update a key and value."""
+        t = "%0.6f" % time()
+        c = self.store.cursor()
+        try:
+            c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
+        except sqlite.IntegrityError, reason:
+            # update last insert time
+            c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
+        self.store.commit()
+
+    def expireValues(self, expireTime):
+        """Expire older values than expireTime."""
+        t = "%0.6f" % expireTime
+        c = self.store.cursor()
+        s = "delete from kv where time < '%s';" % t
+        c.execute(s)
+        
+    def close(self):
+        self.store.close()
index d1843292904962dad43de250d313b8fc395ec030..be60243dcd23bb26a0ed8831c16019eb5cc5adc6 100644 (file)
@@ -8,21 +8,18 @@ from time import time
 from random import randrange
 from sha import sha
 import os
-import sqlite  ## find this at http://pysqlite.sourceforge.net/
 
 from twisted.internet.defer import Deferred
 from twisted.internet import protocol, reactor
 from twisted.trial import unittest
 
+from db import DB
 from ktable import KTable
 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
 from khash import newID, newIDInRange
 from actions import FindNode, GetValue, KeyExpirer, StoreValue
 import krpc
 
-class KhashmirDBExcept(Exception):
-    pass
-
 # this is the base class, has base functionality and find node, no key-value mappings
 class KhashmirBase(protocol.Factory):
     _Node = KNodeBase
@@ -33,7 +30,7 @@ class KhashmirBase(protocol.Factory):
     def setup(self, config, cache_dir):
         self.config = config
         self.port = config['PORT']
-        self._findDB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
+        self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         #self.app = service.Application("krpc")
@@ -55,80 +52,27 @@ class KhashmirBase(protocol.Factory):
         self.listenport.stopListening()
         
     def _loadSelfNode(self, host, port):
-        c = self.store.cursor()
-        c.execute('select id from self where num = 0;')
-        if c.rowcount > 0:
-            id = c.fetchone()[0]
-        else:
+        id = self.store.getSelfNode()
+        if not id:
             id = newID()
         return self._Node().init(id, host, port)
         
-    def _saveSelfNode(self):
-        c = self.store.cursor()
-        c.execute('delete from self where num = 0;')
-        c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
-        self.store.commit()
-        
     def checkpoint(self, auto=0):
-        self._saveSelfNode()
-        self._dumpRoutingTable()
+        self.store.saveSelfNode(self.node.id)
+        self.store.dumpRoutingTable(self.table.buckets)
         self.refreshTable()
         if auto:
             self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
                                         int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
                               self.checkpoint, (1,))
         
-    def _findDB(self, db):
-        self.db = db
-        try:
-            os.stat(db)
-        except OSError:
-            self._createNewDB(db)
-        else:
-            self._loadDB(db)
-        
-    def _loadDB(self, db):
-        try:
-            self.store = sqlite.connect(db=db)
-            #self.store.autocommit = 0
-        except:
-            import traceback
-            raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
-        
-    def _createNewDB(self, db):
-        self.store = sqlite.connect(db=db)
-        s = """
-            create table kv (key binary, value binary, time timestamp, primary key (key, value));
-            create index kv_key on kv(key);
-            create index kv_timestamp on kv(time);
-            
-            create table nodes (id binary primary key, host text, port number);
-            
-            create table self (num number primary key, id binary);
-            """
-        c = self.store.cursor()
-        c.execute(s)
-        self.store.commit()
-
-    def _dumpRoutingTable(self):
-        """
-            save routing table nodes to the database
-        """
-        c = self.store.cursor()
-        c.execute("delete from nodes where id not NULL;")
-        for bucket in self.table.buckets:
-            for node in bucket.l:
-                c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
-        self.store.commit()
-        
     def _loadRoutingTable(self):
         """
             load routing table nodes from database
             it's usually a good idea to call refreshTable(force=1) after loading the table
         """
-        c = self.store.cursor()
-        c.execute("select * from nodes;")
-        for rec in c.fetchall():
+        nodes = self.store.getRoutingTable()
+        for rec in nodes:
             n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
             n.conn = self.udp.connectionForAddr((n.host, n.port))
             self.table.insertNode(n, contacted=0)
@@ -278,15 +222,7 @@ class KhashmirBase(protocol.Factory):
 ## you probably want to use this mixin and provide your own write methods
 class KhashmirRead(KhashmirBase):
     _Node = KNodeRead
-    def retrieveValues(self, key):
-        c = self.store.cursor()
-        c.execute("select value from kv where key = %s;", sqlite.encode(key))
-        t = c.fetchone()
-        l = []
-        while t:
-            l.append(t['value'])
-            t = c.fetchone()
-        return l
+
     ## also async
     def valueForKey(self, key, callback, searchlocal = 1):
         """ returns the values found for key in global table
@@ -297,7 +233,7 @@ class KhashmirRead(KhashmirBase):
         
         # get locals
         if searchlocal:
-            l = self.retrieveValues(key)
+            l = self.store.retrieveValues(key)
             if len(l) > 0:
                 reactor.callLater(0, callback, key, l)
         else:
@@ -315,7 +251,7 @@ class KhashmirRead(KhashmirBase):
         n.conn = self.udp.connectionForAddr((n.host, n.port))
         self.insertNode(n, contacted=0)
     
-        l = self.retrieveValues(key)
+        l = self.store.retrieveValues(key)
         if len(l) > 0:
             return {'values' : l, "id": self.node.id}
         else:
@@ -346,14 +282,7 @@ class KhashmirWrite(KhashmirRead):
         self.findNode(key, _storeValueForKey)
                     
     def krpc_store_value(self, key, value, id, _krpc_sender):
-        t = "%0.6f" % time()
-        c = self.store.cursor()
-        try:
-            c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
-        except sqlite.IntegrityError, reason:
-            # update last insert time
-            c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
-        self.store.commit()
+        self.store.storeValue(key, value)
         sender = {'id' : id}
         sender['host'] = _krpc_sender[0]
         sender['port'] = _krpc_sender[1]        
@@ -388,8 +317,8 @@ class SimpleTests(unittest.TestCase):
     def tearDown(self):
         self.a.shutdown()
         self.b.shutdown()
-        os.unlink(self.a.db)
-        os.unlink(self.b.db)
+        os.unlink(self.a.store.db)
+        os.unlink(self.b.store.db)
 
     def testAddContact(self):
         self.assertEqual(len(self.a.table.buckets), 1)
@@ -485,7 +414,7 @@ class MultiTest(unittest.TestCase):
     def tearDown(self):
         for i in self.l:
             i.shutdown()
-            os.unlink(i.db)
+            os.unlink(i.store.db)
             
         reactor.iterate()