X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fpgsqlschema.php;h=d50e35f660f65587c89809be2f85198d0c80b04f;hb=b53e1439969bfa2c0b551d8cc2fc8fe15652c62a;hp=715065d774b977f5018a92f90955c00744e050ea;hpb=e89908f26140c217e01b2f8f755712f38f3935f3;p=quix0rs-gnu-social.git diff --git a/lib/pgsqlschema.php b/lib/pgsqlschema.php index 715065d774..d50e35f660 100644 --- a/lib/pgsqlschema.php +++ b/lib/pgsqlschema.php @@ -41,6 +41,8 @@ if (!defined('STATUSNET')) { * @category Database * @package StatusNet * @author Evan Prodromou + * @author Brenda Wallace + * @author Brion Vibber * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 * @link http://status.net/ */ @@ -49,170 +51,209 @@ class PgsqlSchema extends Schema { /** - * Returns a TableDef object for the table + * Returns a table definition array for the table * in the schema with the given name. * * Throws an exception if the table is not found. * - * @param string $name Name of the table to get + * @param string $table Name of the table to get * - * @return TableDef tabledef for that table. + * @return array tabledef for that table. */ - public function getTableDef($name) + public function getTableDef($table) { - $res = $this->conn->query("SELECT *, column_default as default, is_nullable as Null, - udt_name as Type, column_name AS Field from INFORMATION_SCHEMA.COLUMNS where table_name = '$name'"); + $def = array(); + $hasKeys = false; - if (PEAR::isError($res)) { - throw new Exception($res->getMessage()); + // Pull column data from INFORMATION_SCHEMA + $columns = $this->fetchMetaInfo($table, 'columns', 'ordinal_position'); + if (count($columns) == 0) { + throw new SchemaTableMissingException("No such table: $table"); } - $td = new TableDef(); + // We'll need to match up fields by ordinal reference + $orderedFields = array(); - $td->name = $name; - $td->columns = array(); + foreach ($columns as $row) { - if ($res->numRows() == 0 ) { - throw new Exception('no such table'); //pretend to be the msyql error. yeah, this sucks. - } - $row = array(); + $name = $row['column_name']; + $orderedFields[$row['ordinal_position']] = $name; - while ($res->fetchInto($row, DB_FETCHMODE_ASSOC)) { -// var_dump($row); - $cd = new ColumnDef(); + $field = array(); + $field['type'] = $row['udt_name']; - $cd->name = $row['field']; + if ($type == 'char' || $type == 'varchar') { + if ($row['character_maximum_length'] !== null) { + $field['length'] = intval($row['character_maximum_length']); + } + } + if ($type == 'numeric') { + // Other int types may report these values, but they're irrelevant. + // Just ignore them! + if ($row['numeric_precision'] !== null) { + $field['precision'] = intval($row['numeric_precision']); + } + if ($row['numeric_scale'] !== null) { + $field['scale'] = intval($row['numeric_scale']); + } + } + if ($row['is_nullable'] == 'NO') { + $field['not null'] = true; + } + if ($row['column_default'] !== null) { + $field['default'] = $row['column_default']; + if ($this->isNumericType($type)) { + $field['default'] = intval($field['default']); + } + } - $packed = $row['type']; + $def['fields'][$name] = $field; + } - if (preg_match('/^(\w+)\((\d+)\)$/', $packed, $match)) { - $cd->type = $match[1]; - $cd->size = $match[2]; - } else { - $cd->type = $packed; + // Pulling index info from pg_class & pg_index + // This can give us primary & unique key info, but not foreign key constraints + // so we exclude them and pick them up later. + $indexInfo = $this->getIndexInfo($table); + foreach ($indexInfo as $row) { + $keyName = $row['key_name']; + + // Dig the column references out! + // + // These are inconvenient arrays with partial references to the + // pg_att table, but since we've already fetched up the column + // info on the current table, we can look those up locally. + $cols = array(); + $colPositions = explode(' ', $row['indkey']); + foreach ($colPositions as $ord) { + if ($ord == 0) { + $cols[] = 'FUNCTION'; // @fixme + } else { + $cols[] = $orderedFields[$ord]; + } } - $cd->nullable = ($row['null'] == 'YES') ? true : false; - $cd->key = $row['Key']; - $cd->default = $row['default']; - $cd->extra = $row['Extra']; - - $td->columns[] = $cd; + $def['indexes'][$keyName] = $cols; } - return $td; - } - /** - * Gets a ColumnDef object for a single column. - * - * Throws an exception if the table is not found. - * - * @param string $table name of the table - * @param string $column name of the column - * - * @return ColumnDef definition of the column or null - * if not found. - */ + // Pull constraint data from INFORMATION_SCHEMA: + // Primary key, unique keys, foreign keys + $keyColumns = $this->fetchMetaInfo($table, 'key_column_usage', 'constraint_name,ordinal_position'); + $keys = array(); - public function getColumnDef($table, $column) - { - $td = $this->getTableDef($table); - - foreach ($td->columns as $cd) { - if ($cd->name == $column) { - return $cd; + foreach ($keyColumns as $row) { + $keyName = $row['constraint_name']; + $keyCol = $row['column_name']; + if (!isset($keys[$keyName])) { + $keys[$keyName] = array(); } + $keys[$keyName][] = $keyCol; } - return null; + foreach ($keys as $keyName => $cols) { + // name hack -- is this reliable? + if ($keyName == "{$table}_pkey") { + $def['primary key'] = $cols; + } else if (preg_match("/^{$table}_(.*)_fkey$/", $keyName, $matches)) { + $fkey = $this->getForeignKeyInfo($table, $keyName); + $colMap = array_combine($cols, $fkey['col_names']); + $def['foreign keys'][$keyName] = array($fkey['table_name'], $colMap); + } else { + $def['unique keys'][$keyName] = $cols; + } + } + return $def; } /** - * Creates a table with the given names and columns. - * - * @param string $name Name of the table - * @param array $columns Array of ColumnDef objects - * for new table. + * Pull some INFORMATION.SCHEMA data for the given table. * - * @return boolean success flag + * @param string $table + * @return array of arrays */ - - public function createTable($name, $columns) + function fetchMetaInfo($table, $infoTable, $orderBy=null) { - $uniques = array(); - $primary = array(); - $indices = array(); - - $sql = "CREATE TABLE $name (\n"; - - for ($i = 0; $i < count($columns); $i++) { - - $cd =& $columns[$i]; - - if ($i > 0) { - $sql .= ",\n"; - } - - $sql .= $this->_columnSql($cd); - - switch ($cd->key) { - case 'UNI': - $uniques[] = $cd->name; - break; - case 'PRI': - $primary[] = $cd->name; - break; - case 'MUL': - $indices[] = $cd->name; - break; - } + $query = "SELECT * FROM information_schema.%s " . + "WHERE table_name='%s'"; + $sql = sprintf($query, $infoTable, $table); + if ($orderBy) { + $sql .= ' ORDER BY ' . $orderBy; } + return $this->fetchQueryData($sql); + } - if (count($primary) > 0) { // it really should be... - $sql .= ",\n primary key (" . implode(',', $primary) . ")"; - } - - - - foreach ($indices as $i) { - $sql .= ",\nindex {$name}_{$i}_idx ($i)"; - } - - $sql .= "); "; - - - foreach ($uniques as $u) { - $sql .= "\n CREATE index {$name}_{$u}_idx ON {$name} ($u); "; - } - $res = $this->conn->query($sql); + /** + * Pull some PG-specific index info + * @param string $table + * @return array of arrays + */ + function getIndexInfo($table) + { + $query = 'SELECT ' . + '(SELECT relname FROM pg_class WHERE oid=indexrelid) AS key_name, ' . + '* FROM pg_index ' . + 'WHERE indrelid=(SELECT oid FROM pg_class WHERE relname=\'%s\') ' . + 'AND indisprimary=\'f\' AND indisunique=\'f\' ' . + 'ORDER BY indrelid, indexrelid'; + $sql = sprintf($query, $table); + return $this->fetchQueryData($sql); + } - if (PEAR::isError($res)) { - throw new Exception($res->getMessage()); + /** + * Column names from the foreign table can be resolved with a call to getTableColumnNames() + * @param $table + * @return array array of rows with keys: fkey_name, table_name, table_id, col_names (array of strings) + */ + function getForeignKeyInfo($table, $constraint_name) + { + // In a sane world, it'd be easier to query the column names directly. + // But it's pretty hard to work with arrays such as col_indexes in direct SQL here. + $query = 'SELECT ' . + '(SELECT relname FROM pg_class WHERE oid=confrelid) AS table_name, ' . + 'confrelid AS table_id, ' . + '(SELECT indkey FROM pg_index WHERE indexrelid=conindid) AS col_indexes ' . + 'FROM pg_constraint ' . + 'WHERE conrelid=(SELECT oid FROM pg_class WHERE relname=\'%s\') ' . + 'AND conname=\'%s\' ' . + 'AND contype=\'f\''; + $sql = sprintf($query, $table, $constraint_name); + $data = $this->fetchQueryData($sql); + if (count($data) < 1) { + throw new Exception("Could not find foreign key " . $constraint_name . " on table " . $table); } - return true; + $row = $data[0]; + return array( + 'table_name' => $row['table_name'], + 'col_names' => $this->getTableColumnNames($row['table_id'], $row['col_indexes']) + ); } /** - * Drops a table from the schema - * - * Throws an exception if the table is not found. * - * @param string $name Name of the table to drop - * - * @return boolean success flag + * @param int $table_id + * @param array $col_indexes + * @return array of strings */ - - public function dropTable($name) + function getTableColumnNames($table_id, $col_indexes) { - $res = $this->conn->query("DROP TABLE $name"); - - if (PEAR::isError($res)) { - throw new Exception($res->getMessage()); + $indexes = array_map('intval', explode(' ', $col_indexes)); + $query = 'SELECT attnum AS col_index, attname AS col_name ' . + 'FROM pg_attribute where attrelid=%d ' . + 'AND attnum IN (%s)'; + $sql = sprintf($query, $table_id, implode(',', $indexes)); + $data = $this->fetchQueryData($sql); + + $byId = array(); + foreach ($data as $row) { + $byId[$row['col_index']] = $row['col_name']; } - return true; + $out = array(); + foreach ($indexes as $id) { + $out[] = $byId[$id]; + } + return $out; } /** @@ -223,7 +264,7 @@ class PgsqlSchema extends Schema */ private function _columnTypeTranslation($type) { $map = array( - 'datetime' => 'timestamp' + 'datetime' => 'timestamp', ); if(!empty($map[$type])) { return $map[$type]; @@ -232,296 +273,183 @@ class PgsqlSchema extends Schema } /** - * Adds an index to a table. - * - * If no name is provided, a name will be made up based - * on the table name and column names. + * Return the proper SQL for creating or + * altering a column. * - * Throws an exception on database error, esp. if the table - * does not exist. + * Appropriate for use in CREATE TABLE or + * ALTER TABLE statements. * - * @param string $table Name of the table - * @param array $columnNames Name of columns to index - * @param string $name (Optional) name of the index + * @param array $cd column to create * - * @return boolean success flag + * @return string correct SQL for that column */ - public function createIndex($table, $columnNames, $name=null) + function columnSql(array $cd) { - if (!is_array($columnNames)) { - $columnNames = array($columnNames); - } - - if (empty($name)) { - $name = "$table_".implode("_", $columnNames)."_idx"; - } - - $res = $this->conn->query("ALTER TABLE $table ". - "ADD INDEX $name (". - implode(",", $columnNames).")"); - - if (PEAR::isError($res)) { - throw new Exception($res->getMessage()); + $line = array(); + $line[] = parent::columnSql($cd); + + /* + if ($table['foreign keys'][$name]) { + foreach ($table['foreign keys'][$name] as $foreignTable => $foreignColumn) { + $line[] = 'references'; + $line[] = $this->quoteIdentifier($foreignTable); + $line[] = '(' . $this->quoteIdentifier($foreignColumn) . ')'; + } } + */ - return true; + return implode(' ', $line); } /** - * Drops a named index from a table. + * Append phrase(s) to an array of partial ALTER TABLE chunks in order + * to alter the given column from its old state to a new one. * - * @param string $table name of the table the index is on. - * @param string $name name of the index - * - * @return boolean success flag + * @param array $phrase + * @param string $columnName + * @param array $old previous column definition as found in DB + * @param array $cd current column definition */ - - public function dropIndex($table, $name) + function appendAlterModifyColumn(array &$phrase, $columnName, array $old, array $cd) { - $res = $this->conn->query("ALTER TABLE $table DROP INDEX $name"); + $prefix = 'ALTER COLUMN ' . $this->quoteIdentifier($columnName) . ' '; - if (PEAR::isError($res)) { - throw new Exception($res->getMessage()); + $oldType = $this->mapType($old); + $newType = $this->mapType($cd); + if ($oldType != $newType) { + $phrase[] = $prefix . 'TYPE ' . $newType; } - return true; - } - - /** - * Adds a column to a table - * - * @param string $table name of the table - * @param ColumnDef $columndef Definition of the new - * column. - * - * @return boolean success flag - */ - - public function addColumn($table, $columndef) - { - $sql = "ALTER TABLE $table ADD COLUMN " . $this->_columnSql($columndef); - - $res = $this->conn->query($sql); - - if (PEAR::isError($res)) { - throw new Exception($res->getMessage()); + if (!empty($old['not null']) && empty($cd['not null'])) { + $phrase[] = $prefix . 'DROP NOT NULL'; + } else if (empty($old['not null']) && !empty($cd['not null'])) { + $phrase[] = $prefix . 'SET NOT NULL'; } - return true; + if (isset($old['default']) && !isset($cd['default'])) { + $phrase[] = $prefix . 'DROP DEFAULT'; + } else if (!isset($old['default']) && isset($cd['default'])) { + $phrase[] = $prefix . 'SET DEFAULT ' . $this->quoteDefaultValue($cd); + } } /** - * Modifies a column in the schema. - * - * The name must match an existing column and table. + * Append an SQL statement to drop an index from a table. + * Note that in PostgreSQL, index names are DB-unique. * - * @param string $table name of the table - * @param ColumnDef $columndef new definition of the column. - * - * @return boolean success flag + * @param array $statements + * @param string $table + * @param string $name + * @param array $def */ - - public function modifyColumn($table, $columndef) + function appendDropIndex(array &$statements, $table, $name) { - $sql = "ALTER TABLE $table MODIFY COLUMN " . - $this->_columnSql($columndef); - - $res = $this->conn->query($sql); - - if (PEAR::isError($res)) { - throw new Exception($res->getMessage()); - } - - return true; + $statements[] = "DROP INDEX $name"; } /** - * Drops a column from a table - * - * The name must match an existing column. + * Quote a db/table/column identifier if necessary. * - * @param string $table name of the table - * @param string $columnName name of the column to drop - * - * @return boolean success flag + * @param string $name + * @return string */ - - public function dropColumn($table, $columnName) + function quoteIdentifier($name) { - $sql = "ALTER TABLE $table DROP COLUMN $columnName"; - - $res = $this->conn->query($sql); - - if (PEAR::isError($res)) { - throw new Exception($res->getMessage()); - } - - return true; + return $this->conn->quoteIdentifier($name); } - /** - * Ensures that a table exists with the given - * name and the given column definitions. - * - * If the table does not yet exist, it will - * create the table. If it does exist, it will - * alter the table to match the column definitions. - * - * @param string $tableName name of the table - * @param array $columns array of ColumnDef - * objects for the table - * - * @return boolean success flag - */ - - public function ensureTable($tableName, $columns) + function mapType($column) { - // XXX: DB engine portability -> toilet - - try { - $td = $this->getTableDef($tableName); - - } catch (Exception $e) { - if (preg_match('/no such table/', $e->getMessage())) { - return $this->createTable($tableName, $columns); - } else { - throw $e; - } + $map = array('serial' => 'bigserial', // FIXME: creates the wrong name for the sequence for some internal sequence-lookup function, so better fix this to do the real 'create sequence' dance. + 'numeric' => 'decimal', + 'datetime' => 'timestamp', + 'blob' => 'bytea'); + + $type = $column['type']; + if (isset($map[$type])) { + $type = $map[$type]; } - $cur = $this->_names($td->columns); - $new = $this->_names($columns); - - $toadd = array_diff($new, $cur); - $todrop = array_diff($cur, $new); - $same = array_intersect($new, $cur); - $tomod = array(); - - foreach ($same as $m) { - $curCol = $this->_byName($td->columns, $m); - $newCol = $this->_byName($columns, $m); - - if (!$newCol->equals($curCol)) { - $tomod[] = $newCol->name; + if ($type == 'int') { + if (!empty($column['size'])) { + $size = $column['size']; + if ($size == 'small') { + return 'int2'; + } else if ($size == 'big') { + return 'int8'; + } } + return 'int4'; } - if (count($toadd) + count($todrop) + count($tomod) == 0) { - // nothing to do - return true; - } - - // For efficiency, we want this all in one - // query, instead of using our methods. - - $phrase = array(); - - foreach ($toadd as $columnName) { - $cd = $this->_byName($columns, $columnName); - - $phrase[] = 'ADD COLUMN ' . $this->_columnSql($cd); - } - - foreach ($todrop as $columnName) { - $phrase[] = 'DROP COLUMN ' . $columnName; - } - - foreach ($tomod as $columnName) { - $cd = $this->_byName($columns, $columnName); - - $phrase[] = 'MODIFY COLUMN ' . $this->_columnSql($cd); - } - - $sql = 'ALTER TABLE ' . $tableName . ' ' . implode(', ', $phrase); - - $res = $this->conn->query($sql); - - if (PEAR::isError($res)) { - throw new Exception($res->getMessage()); - } - - return true; + return $type; } - /** - * Returns the array of names from an array of - * ColumnDef objects. - * - * @param array $cds array of ColumnDef objects - * - * @return array strings for name values - */ - - private function _names($cds) + // @fixme need name... :P + function typeAndSize($column) { - $names = array(); - - foreach ($cds as $cd) { - $names[] = $cd->name; + if ($column['type'] == 'enum') { + $vals = array_map(array($this, 'quote'), $column['enum']); + return "text check ($name in " . implode(',', $vals) . ')'; + } else { + return parent::typeAndSize($column); } - - return $names; } /** - * Get a ColumnDef from an array matching - * name. + * Filter the given table definition array to match features available + * in this database. * - * @param array $cds Array of ColumnDef objects - * @param string $name Name of the column + * This lets us strip out unsupported things like comments, foreign keys, + * or type variants that we wouldn't get back from getTableDef(). * - * @return ColumnDef matching item or null if no match. + * @param array $tableDef */ - - private function _byName($cds, $name) + function filterDef(array $tableDef) { - foreach ($cds as $cd) { - if ($cd->name == $name) { - return $cd; + foreach ($tableDef['fields'] as $name => &$col) { + // No convenient support for field descriptions + unset($col['description']); + + /* + if (isset($col['size'])) { + // Don't distinguish between tinyint and int. + if ($col['size'] == 'tiny' && $col['type'] == 'int') { + unset($col['size']); + } } + */ + $col['type'] = $this->mapType($col); + unset($col['size']); } - - return null; + if (!empty($tableDef['primary key'])) { + $tableDef['primary key'] = $this->filterKeyDef($tableDef['primary key']); + } + if (!empty($tableDef['unique keys'])) { + foreach ($tableDef['unique keys'] as $i => $def) { + $tableDef['unique keys'][$i] = $this->filterKeyDef($def); + } + } + return $tableDef; } /** - * Return the proper SQL for creating or - * altering a column. - * - * Appropriate for use in CREATE TABLE or - * ALTER TABLE statements. + * Filter the given key/index definition to match features available + * in this database. * - * @param ColumnDef $cd column to create - * - * @return string correct SQL for that column + * @param array $def + * @return array */ - - private function _columnSql($cd) + function filterKeyDef(array $def) { - $sql = "{$cd->name} "; - $type = $this->_columnTypeTranslation($cd->type); - - if (!empty($cd->size)) { - $sql .= "{$type}({$cd->size}) "; - } else { - $sql .= "{$type} "; - } - - if (!empty($cd->default)) { - $sql .= "default {$cd->default} "; - } else { - $sql .= ($cd->nullable) ? "null " : "not null "; - } - - if (!empty($cd->auto_increment)) { - $sql .= " auto_increment "; - } - - if (!empty($cd->extra)) { - $sql .= "{$cd->extra} "; + // PostgreSQL doesn't like prefix lengths specified on keys...? + foreach ($def as $i => $item) + { + if (is_array($item)) { + $def[$i] = $item[0]; + } } - - return $sql; + return $def; } }