From 1d9f9492a5d139a909f6b12aedb72ba104f29d3e Mon Sep 17 00:00:00 2001 From: Jephte Clain Date: Sat, 13 Sep 2025 10:39:44 +0400 Subject: [PATCH] =?UTF-8?q?d=C3=A9but?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- php/src/db/Capacitor.php | 830 ++++++++++++++---- php/src/db/CapacitorChannel.php | 148 +++- php/src/db/CapacitorStorage.php | 770 ---------------- php/src/db/TODO.md | 6 + php/src/db/conds.php | 7 +- .../{MysqlStorage.php => MysqlCapacitor.php} | 14 +- .../{PgsqlStorage.php => PgsqlCapacitor.php} | 26 +- ...{SqliteStorage.php => SqliteCapacitor.php} | 24 +- php/tbin/test_mysql.php | 4 +- php/tbin/test_pgsql.php | 4 +- php/tbin/test_sqlite.php | 4 +- php/tests/db/sqlite/ChannelMigrationTest.php | 8 +- php/tests/db/sqlite/SqliteStorageTest.php | 18 +- 13 files changed, 835 insertions(+), 1028 deletions(-) delete mode 100644 php/src/db/CapacitorStorage.php rename php/src/db/mysql/{MysqlStorage.php => MysqlCapacitor.php} (75%) rename php/src/db/pgsql/{PgsqlStorage.php => PgsqlCapacitor.php} (60%) rename php/src/db/sqlite/{SqliteStorage.php => SqliteCapacitor.php} (63%) diff --git a/php/src/db/Capacitor.php b/php/src/db/Capacitor.php index 8fb2403..949ffee 100644 --- a/php/src/db/Capacitor.php +++ b/php/src/db/Capacitor.php @@ -3,215 +3,703 @@ namespace nulib\db; use nulib\A; use nulib\cl; +use nulib\cv; +use nulib\db\_private\_migration; use nulib\php\func; use nulib\ValueException; use Traversable; /** - * Class Capacitor: un objet permettant d'attaquer un canal spécifique d'une - * instance de {@link CapacitorStorage} + * Class Capacitor: objet permettant d'accumuler des données pour les + * réutiliser plus tard */ -class Capacitor implements ITransactor { - function __construct(CapacitorStorage $storage, CapacitorChannel $channel, bool $ensureExists=true) { - $this->storage = $storage; - $this->channel = $channel; - $this->channel->setCapacitor($this); - if ($ensureExists) $this->ensureExists(); - } - - /** @var CapacitorStorage */ - protected $storage; - - function getStorage(): CapacitorStorage { - return $this->storage; - } - - function db(): IDatabase { - return $this->getStorage()->db(); - } +abstract class Capacitor { + abstract function db(): IDatabase; function ensureLive(): self { - $this->getStorage()->ensureLive(); + $this->db()->ensure(); return $this; } - /** @var CapacitorChannel */ - protected $channel; + # les définitions sont par défaut pour MariaDB/MySQL + const CDATA_DEFINITION = "mediumtext"; + const CSUM_DEFINITION = "varchar(40)"; + const CTIMESTAMP_DEFINITION = "datetime"; + const GSERIAL_DEFINITION = "integer primary key auto_increment"; + const GLIC_DEFINITION = "varchar(80)"; + const GLIB_DEFINITION = "varchar(255)"; + const GTEXT_DEFINITION = "mediumtext"; + const GBOOL_DEFINITION = "integer(1) default 0"; + const GUUID_DEFINITION = "varchar(36)"; - function getChannel(): CapacitorChannel { - return $this->channel; - } - - function getTableName(): string { - return $this->getChannel()->getTableName(); - } - - function getCreateSql(): string { - $channel = $this->channel; - return $this->storage->_getMigration($channel)->getSql(get_class($channel), $this->db()); - } - - /** @var CapacitorChannel[] */ - protected ?array $subChannels = null; - - protected ?array $subManageTransactions = null; - - function willUpdate(...$channels): self { - if ($this->subChannels === null) { - # désactiver la gestion des transaction sur le channel local aussi - $this->subChannels[] = $this->channel; + protected static function verifix_col($def): string { + if (!is_string($def)) $def = strval($def); + $def = trim($def); + $parts = preg_split('/\s+/', $def, 2); + if (count($parts) == 2) { + $def = $parts[0]; + $rest = " $parts[1]"; + } else { + $rest = null; } - if ($channels) { - foreach ($channels as $channel) { - if ($channel instanceof Capacitor) $channel = $channel->getChannel(); - if ($channel instanceof CapacitorChannel) { - $this->subChannels[] = $channel; - } else { - throw ValueException::invalid_type($channel, CapacitorChannel::class); - } - } + switch ($def) { + case "serdata": + case "Cdata": $def = static::CDATA_DEFINITION; break; + case "sersum": + case "Csum": $def = static::CSUM_DEFINITION; break; + case "serts": + case "Ctimestamp": $def = static::CTIMESTAMP_DEFINITION; break; + case "genserial": + case "Gserial": $def = static::GSERIAL_DEFINITION; break; + case "genlic": + case "Glic": $def = static::GLIC_DEFINITION; break; + case "genlib": + case "Glib": $def = static::GLIB_DEFINITION; break; + case "gentext": + case "Gtext": $def = static::GTEXT_DEFINITION; break; + case "genbool": + case "Gbool": $def = static::GBOOL_DEFINITION; break; + case "genuuid": + case "Guuid": $def = static::GUUID_DEFINITION; break; } - return $this; + return "$def$rest"; } - function inTransaction(): bool { - return $this->db()->inTransaction(); - } + const PRIMARY_KEY_DEFINITION = [ + "id_" => "Gserial", + ]; - function beginTransaction(?callable $func=null, bool $commit=true): void { - $db = $this->db(); - if ($this->subChannels !== null) { - # on gère des subchannels: ne débuter la transaction que si ce n'est déjà fait - if ($this->subManageTransactions === null) { - foreach ($this->subChannels as $channel) { - $name = $channel->getName(); - $this->subManageTransactions ??= []; - if (!array_key_exists($name, $this->subManageTransactions)) { - $this->subManageTransactions[$name] = $channel->isManageTransactions(); + const COLUMN_DEFINITIONS = [ + "item__" => "Cdata", + "item__sum_" => "Csum", + "created_" => "Ctimestamp", + "modified_" => "Ctimestamp", + ]; + + protected function getColumnDefinitions(CapacitorChannel $channel, bool $ignoreMigrations=false): array { + $definitions = []; + if ($channel->getPrimaryKeys() === null) { + $definitions[] = static::PRIMARY_KEY_DEFINITION; + } + $definitions[] = $channel->getColumnDefinitions(); + $definitions[] = static::COLUMN_DEFINITIONS; + # forcer les définitions sans clé à la fin (sqlite requière par exemple que + # primary key (columns) soit à la fin) + $tmp = cl::merge(...$definitions); + $definitions = []; + $constraints = []; + $index = 0; + foreach ($tmp as $col => $def) { + if ($col === $index) { + $index++; + if (is_array($def)) { + if (!$ignoreMigrations) { + $mdefs = $def; + $mindex = 0; + foreach ($mdefs as $mcol => $mdef) { + if ($mcol === $mindex) { + $mindex++; + } else { + if ($mdef) { + $definitions[$mcol] = self::verifix_col($mdef); + } else { + unset($definitions[$mcol]); + } + } + } } - $channel->setManageTransactions(false); + } else { + $constraints[] = $def; } - if (!$db->inTransaction()) $db->beginTransaction(); + } else { + $definitions[$col] = self::verifix_col($def); } - } elseif (!$db->inTransaction()) { + } + return cl::merge($definitions, $constraints); + } + + /** sérialiser les valeurs qui doivent l'être dans $row */ + protected function serialize(CapacitorChannel $channel, ?array $row): ?array { + if ($row === null) return null; + $colDefs = $this->getColumnDefinitions($channel); + $index = 0; + $raw = []; + foreach (array_keys($colDefs) as $col) { + $key = $col; + if ($key === $index) { + $index++; + } elseif ($channel->isSerialCol($key)) { + [$serialCol, $sumCol] = $channel->getSumCols($key); + if (array_key_exists($key, $row)) { + $sum = $channel->getSum($key, $row[$key]); + $raw[$serialCol] = $sum[$serialCol]; + if (array_key_exists($sumCol, $colDefs)) { + $raw[$sumCol] = $sum[$sumCol]; + } + } + } elseif (array_key_exists($key, $row)) { + $raw[$col] = $row[$key]; + } + } + return $raw; + } + + /** désérialiser les valeurs qui doivent l'être dans $values */ + protected function unserialize(CapacitorChannel $channel, ?array $raw): ?array { + if ($raw === null) return null; + $colDefs = $this->getColumnDefinitions($channel); + $index = 0; + $row = []; + foreach (array_keys($colDefs) as $col) { + $key = $col; + if ($key === $index) { + $index++; + } elseif (!array_key_exists($col, $raw)) { + } elseif ($channel->isSerialCol($key)) { + $value = $raw[$col]; + if ($value !== null) $value = $channel->unserialize($value); + $row[$key] = $value; + } else { + $row[$key] = $raw[$col]; + } + } + return $row; + } + + function getPrimaryKeys(CapacitorChannel $channel): array { + $primaryKeys = $channel->getPrimaryKeys(); + if ($primaryKeys === null) $primaryKeys = ["id_"]; + return $primaryKeys; + } + + function getRowIds(CapacitorChannel $channel, ?array $row, ?array &$primaryKeys=null): ?array { + $primaryKeys = $this->getPrimaryKeys($channel); + $rowIds = cl::select($row, $primaryKeys); + if (cl::all_n($rowIds)) return null; + else return $rowIds; + } + + abstract protected function tableExists(string $tableName): bool; + + const METADATA_TABLE = "_metadata"; + const METADATA_COLS = [ + "name" => "varchar not null primary key", + "value" => "varchar", + ]; + + protected function prepareMetadata(): void { + if (!$this->tableExists(static::METADATA_TABLE)) { + $db = $this->db(); + $db->exec([ + "create table", + "table" => static::METADATA_TABLE, + "cols" => static::METADATA_COLS, + ]); + $db->exec([ + "insert", + "into" => static::METADATA_TABLE, + "values" => [ + "name" => "version", + "value" => "1", + ], + ]); + } + } + + protected function getCreateChannelSql(CapacitorChannel $channel): array { + return [ + "create table if not exists", + "table" => $channel->getTableName(), + "cols" => $this->getColumnDefinitions($channel, true), + ]; + } + + abstract function getMigration(CapacitorChannel $channel): _migration; + + const CATALOG_TABLE = "_channels"; + const CATALOG_COLS = [ + "name" => "varchar not null primary key", + "table_name" => "varchar", + "class_name" => "varchar", + ]; + + protected function getCreateCatalogSql(): array { + return [ + "create table if not exists", + "table" => static::CATALOG_TABLE, + "cols" => static::CATALOG_COLS, + ]; + } + + protected function addToCatalogSql(CapacitorChannel $channel): array { + return [ + "insert", + "into" => static::CATALOG_TABLE, + "values" => [ + "name" => $channel->getName(), + "table_name" => $channel->getTableName(), + "class_name" => get_class($channel), + ], + ]; + } + + function getCatalog(): iterable { + return $this->db()->all([ + "select", + "from" => static::CATALOG_TABLE, + ]); + } + + function isInCatalog(array $filter, ?array &$raw=null): bool { + $raw = $this->db()->one([ + "select", + "from" => static::CATALOG_TABLE, + "where" => $filter, + ]); + return $raw !== null; + } + + function create(CapacitorChannel $channel): void { + if ($channel->isCreated()) return; + $channel->ensureSetup(); + if (!$this->tableExists($channel->getTableName())) { + $this->prepareMetadata(); + $this->getMigration($channel)->migrate($this->db()); + $this->afterCreate($channel); + } + $channel->setCreated(); + } + + protected function afterCreate(CapacitorChannel $channel): void { + $db = $this->db(); + $db->exec($this->getCreateCatalogSql()); + $db->exec($this->addToCatalogSql($channel)); + } + + /** tester si le canal spécifié existe */ + function exists(CapacitorChannel $channel): bool { + return $this->tableExists($channel->getTableName()); + } + + protected function beforeReset(CapacitorChannel $channel): void { + $db = $this->db; + $name = $channel->getName(); + $db->exec([ + "delete", + "from" => _migration::MIGRATION_TABLE, + "where" => [ + "channel" => $name, + ], + ]); + $db->exec([ + "delete", + "from" => static::CATALOG_TABLE, + "where" => [ + "name" => $name, + ], + ]); + } + + /** supprimer le canal spécifié */ + function reset(CapacitorChannel $channel, bool $recreate=false): void { + $this->beforeReset($channel); + $this->db()->exec([ + "drop table if exists", + $channel->getTableName(), + ]); + $channel->setCreated(false); + if ($recreate) $this->create($channel); + } + + /** + * charger une valeur dans le canal + * + * Après avoir calculé les valeurs des clés supplémentaires + * avec {@link CapacitorChannel::getItemValues()}, l'une des deux fonctions + * {@link CapacitorChannel::onCreate()} ou {@link CapacitorChannel::onUpdate()} + * est appelée en fonction du type d'opération: création ou mise à jour + * + * Ensuite, si $func !== null, la fonction est appelée avec la signature de + * {@link CapacitorChannel::onCreate()} ou {@link CapacitorChannel::onUpdate()} + * en fonction du type d'opération: création ou mise à jour + * + * Dans les deux cas, si la fonction retourne un tableau, il est utilisé pour + * modifier les valeurs insérées/mises à jour. De plus, $row obtient la + * valeur finale des données insérées/mises à jour + * + * Si $args est renseigné, il est ajouté aux arguments utilisés pour appeler + * les méthodes {@link CapacitorChannel::getItemValues()}, + * {@link CapacitorChannel::onCreate()} et/ou + * {@link CapacitorChannel::onUpdate()} + * + * @return int 1 si l'objet a été chargé ou mis à jour, 0 s'il existait + * déjà à l'identique dans le canal + */ + function charge(CapacitorChannel $channel, $item, $func, ?array $args, ?array &$row=null): int { + $this->create($channel); + $tableName = $channel->getTableName(); + $db = $this->db(); + $args ??= []; + + $row = func::call([$channel, "getItemValues"], $item, ...$args); + if ($row === [false]) return 0; + + if ($row !== null && array_key_exists("item", $row)) { + $item = A::pop($row, "item"); + } + + $raw = cl::merge( + $channel->getSum("item", $item), + $this->serialize($channel, $row)); + $praw = null; + $rowIds = $this->getRowIds($channel, $raw, $primaryKeys); + if ($rowIds !== null) { + # modification + $praw = $db->one([ + "select", + "from" => $tableName, + "where" => $rowIds, + ]); + } + + $now = date("Y-m-d H:i:s"); + $insert = null; + if ($praw === null) { + # création + $raw = cl::merge($raw, [ + "created_" => $now, + "modified_" => $now, + ]); + $insert = true; + $initFunc = func::with([$channel, "onCreate"], $args); + $row = $this->unserialize($channel, $raw); + $prow = null; + } else { + # modification + # intégrer autant que possible les valeurs de praw dans raw, de façon que + # l'utilisateur puisse voir clairement ce qui a été modifié + if ($channel->_wasSumModified("item", $raw, $praw)) { + $insert = false; + $raw = cl::merge($praw, $raw, [ + "modified_" => $now, + ]); + } else { + $raw = cl::merge($praw, $raw); + } + $initFunc = func::with([$channel, "onUpdate"], $args); + $row = $this->unserialize($channel, $raw); + $prow = $this->unserialize($channel, $praw); + } + + $updates = $initFunc->prependArgs([$item, $row, $prow])->invoke(); + if ($updates === [false]) return 0; + if (is_array($updates) && $updates) { + if ($insert === null) $insert = false; + if (!array_key_exists("modified_", $updates)) { + $updates["modified_"] = $now; + } + $row = cl::merge($row, $updates); + $raw = cl::merge($raw, $this->serialize($channel, $updates)); + } + + if ($func !== null) { + $updates = func::with($func, $args) + ->prependArgs([$item, $row, $prow]) + ->bind($channel) + ->invoke(); + if ($updates === [false]) return 0; + if (is_array($updates) && $updates) { + if ($insert === null) $insert = false; + if (!array_key_exists("modified_", $updates)) { + $updates["modified_"] = $now; + } + $row = cl::merge($row, $updates); + $raw = cl::merge($raw, $this->serialize($channel, $updates)); + } + } + + # aucune modification + if ($insert === null) return 0; + + # si on est déjà dans une transaction, désactiver la gestion des transactions + $manageTransactions = $channel->isManageTransactions() && !$db->inTransaction(); + if ($manageTransactions) { + $commited = false; $db->beginTransaction(); } - if ($func !== null) { - $commited = false; - try { - func::call($func, $this); - if ($commit) { - $this->commit(); - $commited = true; + $nbModified = 0; + try { + if ($insert) { + $id = $db->exec([ + "insert", + "into" => $tableName, + "values" => $raw, + ]); + if (count($primaryKeys) == 1 && $rowIds === null) { + # mettre à jour avec l'id généré + $row[$primaryKeys[0]] = $id; + } + $nbModified = 1; + } else { + # calculer ce qui a changé pour ne mettre à jour que le nécessaire + $updates = []; + foreach ($raw as $col => $value) { + if (array_key_exists($col, $rowIds)) { + # ne jamais mettre à jour la clé primaire + continue; + } + if (!cv::equals($value, $praw[$col] ?? null)) { + $updates[$col] = $value; + } + } + if (count($updates) == 1 && array_key_first($updates) == "modified_") { + # si l'unique modification porte sur la date de modification, alors + # la ligne n'est pas modifiée. ce cas se présente quand on altère la + # valeur de $item + $updates = null; + } + if ($updates) { + $db->exec([ + "update", + "table" => $tableName, + "values" => $updates, + "where" => $rowIds, + ]); + $nbModified = 1; } - } finally { - if ($commit && !$commited) $this->rollback(); } + if ($manageTransactions) { + $db->commit(); + $commited = true; + } + return $nbModified; + } finally { + if ($manageTransactions && !$commited) $db->rollback(); } } - protected function beforeEndTransaction(): void { - if ($this->subManageTransactions !== null) { - foreach ($this->subChannels as $channel) { - $name = $channel->getName(); - $channel->setManageTransactions($this->subManageTransactions[$name]); + /** + * décharger les données du canal spécifié. seul la valeur de $item est + * fournie + */ + function discharge(CapacitorChannel $channel, bool $reset=true): Traversable { + $this->create($channel); + $raws = $this->db()->all([ + "select item__", + "from" => $channel->getTableName(), + ]); + foreach ($raws as $raw) { + yield unserialize($raw['item__']); + } + if ($reset) $this->reset($channel); + } + + protected function convertValue2row(CapacitorChannel $channel, array $filter, array $cols): array { + $index = 0; + $fixed = []; + foreach ($filter as $key => $value) { + if ($key === $index) { + $index++; + if (is_array($value)) { + $value = $this->convertValue2row($channel, $value, $cols); + } + $fixed[] = $value; + } else { + $col = "${key}__"; + if (array_key_exists($col, $cols)) { + # colonne sérialisée + $fixed[$col] = $channel->serialize($value); + } else { + $fixed[$key] = $value; + } } - $this->subManageTransactions = null; + } + return $fixed; + } + + protected function verifixFilter(CapacitorChannel $channel, &$filter): void { + if ($filter !== null && !is_array($filter)) { + $primaryKeys = $this->getPrimaryKeys($channel); + $id = $filter; + $channel->verifixId($id); + $filter = [$primaryKeys[0] => $id]; + } + $cols = $this->getColumnDefinitions($channel); + if ($filter !== null) { + $filter = $this->convertValue2row($channel, $filter, $cols); } } - function commit(): void { - $this->beforeEndTransaction(); + /** indiquer le nombre d'éléments du canal spécifié */ + function count(CapacitorChannel $channel, $filter): int { + $this->create($channel); + $this->verifixFilter($channel, $filter); + return $this->db()->get([ + "select count(*)", + "from" => $channel->getTableName(), + "where" => $filter, + ]); + } + + /** + * obtenir la ligne correspondant au filtre sur le canal spécifié + * + * si $filter n'est pas un tableau, il est transformé en ["id_" => $filter] + */ + function one(CapacitorChannel $channel, $filter, ?array $mergeQuery=null): ?array { + if ($filter === null) throw ValueException::null("filter"); + $this->create($channel); + $this->verifixFilter($channel, $filter); + $raw = $this->db()->one(cl::merge([ + "select", + "from" => $channel->getTableName(), + "where" => $filter, + ], $mergeQuery)); + return $this->unserialize($channel, $raw); + } + + /** + * obtenir les lignes correspondant au filtre sur le canal spécifié + * + * si $filter n'est pas un tableau, il est transformé en ["id_" => $filter] + */ + function all(CapacitorChannel $channel, $filter, ?array $mergeQuery=null): Traversable { + $this->create($channel); + $this->verifixFilter($channel, $filter); + $raws = $this->db()->all(cl::merge([ + "select", + "from" => $channel->getTableName(), + "where" => $filter, + ], $mergeQuery), null, $this->getPrimaryKeys($channel)); + foreach ($raws as $key => $raw) { + yield $key => $this->unserialize($channel, $raw); + } + } + + /** + * appeler une fonction pour chaque élément du canal spécifié. + * + * $filter permet de filtrer parmi les élements chargés + * + * $func est appelé avec la signature de {@link CapacitorChannel::onEach()} + * si la fonction retourne un tableau, il est utilisé pour mettre à jour la + * ligne + * + * @param int $nbUpdated reçoit le nombre de lignes mises à jour + * @return int le nombre de lignes parcourues + */ + function each(CapacitorChannel $channel, $filter, $func, ?array $args, ?array $mergeQuery=null, ?int &$nbUpdated=null): int { + $this->create($channel); + if ($func === null) $func = CapacitorChannel::onEach; + $onEach = func::with($func)->bind($channel); $db = $this->db(); - if ($db->inTransaction()) $db->commit(); - } - - function rollback(): void { - $this->beforeEndTransaction(); - $db = $this->db(); - if ($db->inTransaction()) $db->rollback(); - } - - function exists(): bool { - return $this->storage->_exists($this->channel); - } - - function ensureExists(): void { - $this->storage->_ensureExists($this->channel); - } - - function reset(bool $recreate=false): void { - $this->storage->_reset($this->channel, $recreate); - } - - function charge($item, $func=null, ?array $args=null, ?array &$row=null): int { - if ($this->subChannels !== null) $this->beginTransaction(); - return $this->storage->_charge($this->channel, $item, $func, $args, $row); - } - - function chargeAll(?iterable $items, $func=null, ?array $args=null): int { + # si on est déjà dans une transaction, désactiver la gestion des transactions + $manageTransactions = $channel->isManageTransactions() && !$db->inTransaction(); + if ($manageTransactions) { + $commited = false; + $db->beginTransaction(); + $commitThreshold = $channel->getEachCommitThreshold(); + } $count = 0; - if ($items !== null) { - if ($func !== null) { - $func = func::with($func, $args)->bind($this->channel); + $nbUpdated = 0; + $tableName = $channel->getTableName(); + try { + $args ??= []; + $rows = $this->all($channel, $filter, $mergeQuery); + foreach ($rows as $row) { + $rowIds = $this->getRowIds($channel, $row); + $updates = $onEach->invoke([$row, ...$args]); + if ($updates === [false]) { + break; + } elseif ($updates !== null) { + if (!array_key_exists("modified_", $updates)) { + $updates["modified_"] = date("Y-m-d H:i:s"); + } + $nbUpdated += $db->exec([ + "update", + "table" => $tableName, + "values" => $this->serialize($channel, $updates), + "where" => $rowIds, + ]); + if ($manageTransactions && $commitThreshold !== null) { + $commitThreshold--; + if ($commitThreshold <= 0) { + $db->commit(); + $db->beginTransaction(); + $commitThreshold = $channel->getEachCommitThreshold(); + } + } + } + $count++; } - foreach ($items as $item) { - $count += $this->charge($item, $func); + if ($manageTransactions) { + $db->commit(); + $commited = true; } + return $count; + } finally { + if ($manageTransactions && !$commited) $db->rollback(); } - return $count; } - function discharge(bool $reset=true): Traversable { - return $this->storage->_discharge($this->channel, $reset); + /** + * supprimer tous les éléments correspondant au filtre et pour lesquels la + * fonction retourne une valeur vraie si elle est spécifiée + * + * $filter permet de filtrer parmi les élements chargés + * + * $func est appelé avec la signature de {@link CapacitorChannel::onDelete()} + * si la fonction retourne un tableau, il est utilisé pour mettre à jour la + * ligne + * + * @return int le nombre de lignes parcourues + */ + function delete(CapacitorChannel $channel, $filter, $func, ?array $args): int { + $this->create($channel); + if ($func === null) $func = CapacitorChannel::onDelete; + $onDelete = func::with($func)->bind($channel); + $db = $this->db(); + # si on est déjà dans une transaction, désactiver la gestion des transactions + $manageTransactions = $channel->isManageTransactions() && !$db->inTransaction(); + if ($manageTransactions) { + $commited = false; + $db->beginTransaction(); + $commitThreshold = $channel->getEachCommitThreshold(); + } + $count = 0; + $tableName = $channel->getTableName(); + try { + $args ??= []; + $rows = $this->all($channel, $filter); + foreach ($rows as $row) { + $rowIds = $this->getRowIds($channel, $row); + $shouldDelete = boolval($onDelete->invoke([$row, ...$args])); + if ($shouldDelete) { + $db->exec([ + "delete", + "from" => $tableName, + "where" => $rowIds, + ]); + if ($manageTransactions && $commitThreshold !== null) { + $commitThreshold--; + if ($commitThreshold <= 0) { + $db->commit(); + $db->beginTransaction(); + $commitThreshold = $channel->getEachCommitThreshold(); + } + } + } + $count++; + } + if ($manageTransactions) { + $db->commit(); + $commited = true; + } + return $count; + } finally { + if ($manageTransactions && !$commited) $db->rollback(); + } } - function count($filter=null): int { - return $this->storage->_count($this->channel, $filter); - } - - function one($filter, ?array $mergeQuery=null): ?array { - return $this->storage->_one($this->channel, $filter, $mergeQuery); - } - - function all($filter, ?array $mergeQuery=null): Traversable { - return $this->storage->_all($this->channel, $filter, $mergeQuery); - } - - function each($filter, $func=null, ?array $args=null, ?array $mergeQuery=null, ?int &$nbUpdated=null): int { - if ($this->subChannels !== null) $this->beginTransaction(); - return $this->storage->_each($this->channel, $filter, $func, $args, $mergeQuery, $nbUpdated); - } - - function delete($filter, $func=null, ?array $args=null): int { - if ($this->subChannels !== null) $this->beginTransaction(); - return $this->storage->_delete($this->channel, $filter, $func, $args); - } - - function dbAll(array $query, ?array $params=null): iterable { - $primaryKeys = $this->channel->getPrimaryKeys(); - return $this->storage->db()->all(cl::merge([ - "select", - "from" => $this->getTableName(), - ], $query), $params, $primaryKeys); - } - - function dbOne(array $query, ?array $params=null): ?array { - return $this->storage->db()->one(cl::merge([ - "select", - "from" => $this->getTableName(), - ], $query), $params); - } - - /** @return int|false */ - function dbUpdate(array $query, ?array $params=null) { - return $this->storage->db()->exec(cl::merge([ - "update", - "table" => $this->getTableName(), - ], $query), $params); - } - - function close(): void { - $this->storage->close(); - } + abstract function close(): void; } diff --git a/php/src/db/CapacitorChannel.php b/php/src/db/CapacitorChannel.php index b438499..765bc94 100644 --- a/php/src/db/CapacitorChannel.php +++ b/php/src/db/CapacitorChannel.php @@ -2,11 +2,13 @@ namespace nulib\db; use nulib\cl; +use nulib\php\func; use nulib\str; +use nulib\ValueException; use Traversable; /** - * Class CapacitorChannel: un canal d'une instance de {@link ICapacitor} + * Class CapacitorChannel: un canal de données */ class CapacitorChannel implements ITransactor { const NAME = null; @@ -50,14 +52,23 @@ class CapacitorChannel implements ITransactor { return $eachCommitThreshold; } - function __construct(?string $name=null, ?int $eachCommitThreshold=null, ?bool $manageTransactions=null) { - $name ??= static::NAME; - $tableName ??= static::TABLE_NAME; + function __construct(?Capacitor $capacitor=null, ?array $params=null) { + $capacitor ??= $params["storage"] ?? null; + $this->capacitor = $capacitor; + + $name = $params["name"] ?? static::NAME; + $tableName = $params["tableName"] ?? static::TABLE_NAME; self::verifix_name($name, $tableName); $this->name = $name; $this->tableName = $tableName; - $this->manageTransactions = $manageTransactions ?? static::MANAGE_TRANSACTIONS; - $this->eachCommitThreshold = self::verifix_eachCommitThreshold($eachCommitThreshold); + + $manageTransactions = $params["manageTransactions"] ?? static::MANAGE_TRANSACTIONS; + $this->manageTransactions = $manageTransactions; + + $eachCommitThreshold = $params["eachCommitThreshold"] ?? null; + $eachCommitThreshold = self::verifix_eachCommitThreshold($eachCommitThreshold); + $this->eachCommitThreshold = $eachCommitThreshold; + $this->setup = false; $this->created = false; $columnDefinitions = $this->COLUMN_DEFINITIONS(); @@ -400,24 +411,20 @@ class CapacitorChannel implements ITransactor { ############################################################################# # Méthodes déléguées pour des workflows centrés sur le channel - /** - * @var Capacitor|null instance de Capacitor par laquelle cette instance est - * utilisée - */ protected ?Capacitor $capacitor; function getCapacitor(): ?Capacitor { return $this->capacitor; } - function setCapacitor(Capacitor $capacitor): self { + function setCapacitor(Capacitor $capacitor, bool $ensureExists=true): self { $this->capacitor = $capacitor; + if ($ensureExists) $this->ensureExists(); return $this; } - function initStorage(CapacitorStorage $storage): self { - new Capacitor($storage, $this); - return $this; + function db(): IDatabase { + return $this->capacitor->db(); } function ensureLive(): self { @@ -425,40 +432,100 @@ class CapacitorChannel implements ITransactor { return $this; } - function willUpdate(...$transactors): ITransactor { - return $this->capacitor->willUpdate(...$transactors); + function getCreateSql(): string { + return $this->capacitor->getMigration($this)->getSql(get_class($this), $this->db()); + } + + /** @var CapacitorChannel[] */ + protected ?array $subChannels = null; + + protected ?array $subManageTransactions = null; + + function willUpdate(...$channels): self { + if ($this->subChannels === null) { + # désactiver la gestion des transaction sur le channel local aussi + $this->subChannels[] = $this; + } + if ($channels) { + foreach ($channels as $channel) { + if ($channel instanceof CapacitorChannel) { + $this->subChannels[] = $channel; + } else { + throw ValueException::invalid_type($channel, CapacitorChannel::class); + } + } + } + return $this; } function inTransaction(): bool { - return $this->capacitor->inTransaction(); + return $this->db()->inTransaction(); } function beginTransaction(?callable $func=null, bool $commit=true): void { - $this->capacitor->beginTransaction($func, $commit); + $db = $this->db(); + if ($this->subChannels !== null) { + # on gère des subchannels: ne débuter la transaction que si ce n'est déjà fait + if ($this->subManageTransactions === null) { + foreach ($this->subChannels as $channel) { + $name = $channel->getName(); + $this->subManageTransactions ??= []; + if (!array_key_exists($name, $this->subManageTransactions)) { + $this->subManageTransactions[$name] = $channel->isManageTransactions(); + } + $channel->setManageTransactions(false); + } + if (!$db->inTransaction()) $db->beginTransaction(); + } + } elseif (!$db->inTransaction()) { + $db->beginTransaction(); + } + if ($func !== null) { + $commited = false; + try { + func::call($func, $this); + if ($commit) { + $this->commit(); + $commited = true; + } + } finally { + if ($commit && !$commited) $this->rollback(); + } + } + } + + protected function beforeEndTransaction(): void { + if ($this->subManageTransactions !== null) { + foreach ($this->subChannels as $channel) { + $name = $channel->getName(); + $channel->setManageTransactions($this->subManageTransactions[$name]); + } + $this->subManageTransactions = null; + } } function commit(): void { - $this->capacitor->commit(); + $this->beforeEndTransaction(); + $db = $this->db(); + if ($db->inTransaction()) $db->commit(); } function rollback(): void { - $this->capacitor->rollback(); - } - - function db(): IDatabase { - return $this->capacitor->getStorage()->db(); + $this->beforeEndTransaction(); + $db = $this->db(); + if ($db->inTransaction()) $db->rollback(); } function exists(): bool { - return $this->capacitor->exists(); + return $this->capacitor->_exists($this); } function ensureExists(): void { - $this->capacitor->ensureExists(); + $this->capacitor->_ensureExists($this); } function reset(bool $recreate=false): void { - $this->capacitor->reset($recreate); + $this->capacitor->_reset($this, $recreate); } function charge($item, $func=null, ?array $args=null, ?array &$row=null): int { @@ -466,7 +533,16 @@ class CapacitorChannel implements ITransactor { } function chargeAll(?iterable $items, $func=null, ?array $args=null): int { - return $this->capacitor->chargeAll($items, $func, $args); + $count = 0; + if ($items !== null) { + if ($func !== null) { + $func = func::with($func, $args)->bind($this); + } + foreach ($items as $item) { + $count += $this->charge($item, $func); + } + } + return $count; } function discharge(bool $reset=true): Traversable { @@ -520,16 +596,26 @@ class CapacitorChannel implements ITransactor { } function dbAll(array $query, ?array $params=null): iterable { - return $this->capacitor->dbAll($query, $params); + $primaryKeys = $this->getPrimaryKeys(); + return $this->capacitor->db()->all(cl::merge([ + "select", + "from" => $this->getTableName(), + ], $query), $params, $primaryKeys); } function dbOne(array $query, ?array $params=null): ?array { - return $this->capacitor->dbOne($query, $params); + return $this->capacitor->db()->one(cl::merge([ + "select", + "from" => $this->getTableName(), + ], $query), $params); } /** @return int|false */ function dbUpdate(array $query, ?array $params=null) { - return $this->capacitor->dbUpdate($query, $params); + return $this->capacitor->db()->exec(cl::merge([ + "update", + "table" => $this->getTableName(), + ], $query), $params); } function close(): void { diff --git a/php/src/db/CapacitorStorage.php b/php/src/db/CapacitorStorage.php deleted file mode 100644 index dbc7f87..0000000 --- a/php/src/db/CapacitorStorage.php +++ /dev/null @@ -1,770 +0,0 @@ -db()->ensure(); - return $this; - } - - /** @var CapacitorChannel[] */ - protected $channels; - - function addChannel(CapacitorChannel $channel): CapacitorChannel { - $this->_create($channel); - $this->channels[$channel->getName()] = $channel; - return $channel; - } - - protected function getChannel(?string $name): CapacitorChannel { - CapacitorChannel::verifix_name($name); - $channel = $this->channels[$name] ?? null; - if ($channel === null) { - $channel = $this->addChannel(new CapacitorChannel($name)); - } - return $channel; - } - - const PRIMARY_KEY_DEFINITION = [ - "id_" => "genserial", - ]; - - # les définitions sont par défaut pour MariaDB/MySQL - const SERDATA_DEFINITION = "mediumtext"; - const SERSUM_DEFINITION = "varchar(40)"; - const SERTS_DEFINITION = "datetime"; - const GENSERIAL_DEFINITION = "integer primary key auto_increment"; - const GENLIC_DEFINITION = "varchar(80)"; - const GENLIB_DEFINITION = "varchar(255)"; - const GENTEXT_DEFINITION = "mediumtext"; - const GENBOOL_DEFINITION = "integer(1) default 0"; - const GENUUID_DEFINITION = "varchar(36)"; - - protected static function gencol($def): string { - if (!is_string($def)) $def = strval($def); - $def = trim($def); - $parts = preg_split('/\s+/', $def, 2); - if (count($parts) == 2) { - $def = $parts[0]; - $rest = " $parts[1]"; - } else { - $rest = null; - } - switch ($def) { - case "serdata": $def = static::SERDATA_DEFINITION; break; - case "sersum": $def = static::SERSUM_DEFINITION; break; - case "serts": $def = static::SERTS_DEFINITION; break; - case "genserial": $def = static::GENSERIAL_DEFINITION; break; - case "genlic": $def = static::GENLIC_DEFINITION; break; - case "genlib": $def = static::GENLIB_DEFINITION; break; - case "gentext": $def = static::GENTEXT_DEFINITION; break; - case "genbool": $def = static::GENBOOL_DEFINITION; break; - case "genuuid": $def = static::GENUUID_DEFINITION; break; - } - return "$def$rest"; - } - - const COLUMN_DEFINITIONS = [ - "item__" => "serdata", - "item__sum_" => "sersum", - "created_" => "serts", - "modified_" => "serts", - ]; - - protected function ColumnDefinitions(CapacitorChannel $channel, bool $ignoreMigrations=false): array { - $definitions = []; - if ($channel->getPrimaryKeys() === null) { - $definitions[] = static::PRIMARY_KEY_DEFINITION; - } - $definitions[] = $channel->getColumnDefinitions(); - $definitions[] = static::COLUMN_DEFINITIONS; - # forcer les définitions sans clé à la fin (sqlite requière par exemple que - # primary key (columns) soit à la fin) - $tmp = cl::merge(...$definitions); - $definitions = []; - $constraints = []; - $index = 0; - foreach ($tmp as $col => $def) { - if ($col === $index) { - $index++; - if (is_array($def)) { - if (!$ignoreMigrations) { - $mdefs = $def; - $mindex = 0; - foreach ($mdefs as $mcol => $mdef) { - if ($mcol === $mindex) { - $mindex++; - } else { - if ($mdef) { - $definitions[$mcol] = self::gencol($mdef); - } else { - unset($definitions[$mcol]); - } - } - } - } - } else { - $constraints[] = $def; - } - } else { - $definitions[$col] = self::gencol($def); - } - } - return cl::merge($definitions, $constraints); - } - - protected function getMigration(CapacitorChannel $channel): ?array { - return $channel->getMigration($this->db()->getPrefix()); - } - - /** sérialiser les valeurs qui doivent l'être dans $row */ - protected function serialize(CapacitorChannel $channel, ?array $row): ?array { - if ($row === null) return null; - $cols = $this->ColumnDefinitions($channel); - $index = 0; - $raw = []; - foreach (array_keys($cols) as $col) { - $key = $col; - if ($key === $index) { - $index++; - } elseif ($channel->isSerialCol($key)) { - [$serialCol, $sumCol] = $channel->getSumCols($key); - if (array_key_exists($key, $row)) { - $sum = $channel->getSum($key, $row[$key]); - $raw[$serialCol] = $sum[$serialCol]; - if (array_key_exists($sumCol, $cols)) { - $raw[$sumCol] = $sum[$sumCol]; - } - } - } elseif (array_key_exists($key, $row)) { - $raw[$col] = $row[$key]; - } - } - return $raw; - } - - /** désérialiser les valeurs qui doivent l'être dans $values */ - protected function unserialize(CapacitorChannel $channel, ?array $raw): ?array { - if ($raw === null) return null; - $cols = $this->ColumnDefinitions($channel); - $index = 0; - $row = []; - foreach (array_keys($cols) as $col) { - $key = $col; - if ($key === $index) { - $index++; - } elseif (!array_key_exists($col, $raw)) { - } elseif ($channel->isSerialCol($key)) { - $value = $raw[$col]; - if ($value !== null) $value = $channel->unserialize($value); - $row[$key] = $value; - } else { - $row[$key] = $raw[$col]; - } - } - return $row; - } - - function getPrimaryKeys(CapacitorChannel $channel): array { - $primaryKeys = $channel->getPrimaryKeys(); - if ($primaryKeys === null) $primaryKeys = ["id_"]; - return $primaryKeys; - } - - function getRowIds(CapacitorChannel $channel, ?array $row, ?array &$primaryKeys=null): ?array { - $primaryKeys = $this->getPrimaryKeys($channel); - $rowIds = cl::select($row, $primaryKeys); - if (cl::all_n($rowIds)) return null; - else return $rowIds; - } - - protected function _createSql(CapacitorChannel $channel): array { - return [ - "create table if not exists", - "table" => $channel->getTableName(), - "cols" => $this->ColumnDefinitions($channel, true), - ]; - } - - abstract protected function tableExists(string $tableName): bool; - - const METADATA_TABLE = "_metadata"; - const METADATA_COLS = [ - "name" => "varchar not null primary key", - "value" => "varchar", - ]; - - protected function _prepareMetadata(): void { - if (!$this->tableExists(static::METADATA_TABLE)) { - $db = $this->db(); - $db->exec([ - "drop table if exists", - "table" => self::CHANNELS_TABLE, - ]); - $db->exec([ - "drop table if exists", - "table" => _migration::MIGRATION_TABLE, - ]); - $db->exec([ - "create table", - "table" => static::METADATA_TABLE, - "cols" => static::METADATA_COLS, - ]); - $db->exec([ - "insert", - "into" => static::METADATA_TABLE, - "values" => [ - "name" => "version", - "value" => "1", - ], - ]); - } - } - - abstract function _getMigration(CapacitorChannel $channel): _migration; - - const CHANNELS_TABLE = "_channels"; - const CHANNELS_COLS = [ - "name" => "varchar not null primary key", - "table_name" => "varchar", - "class_name" => "varchar", - ]; - - function channelExists(string $name, ?array &$raw=null): bool { - $raw = $this->db()->one([ - "select", - "from" => static::CHANNELS_TABLE, - "where" => ["name" => $name], - ]); - return $raw !== null; - } - - function getChannels(): iterable { - return $this->db()->all([ - "select", - "from" => static::CHANNELS_TABLE, - ]); - } - - protected function _createChannelsSql(): array { - return [ - "create table if not exists", - "table" => static::CHANNELS_TABLE, - "cols" => static::CHANNELS_COLS, - ]; - } - - protected function _addToChannelsSql(CapacitorChannel $channel): array { - return [ - "insert", - "into" => static::CHANNELS_TABLE, - "values" => [ - "name" => $channel->getName(), - "table_name" => $channel->getTableName(), - "class_name" => get_class($channel), - ], - ]; - } - - protected function _afterCreate(CapacitorChannel $channel): void { - $db = $this->db(); - $db->exec($this->_createChannelsSql()); - $db->exec($this->_addToChannelsSql($channel)); - } - - protected function _create(CapacitorChannel $channel): void { - $channel->ensureSetup(); - if (!$channel->isCreated()) { - $this->_prepareMetadata(); - $this->_getMigration($channel)->migrate($this->db()); - $this->_afterCreate($channel); - $channel->setCreated(); - } - } - - /** tester si le canal spécifié existe */ - function _exists(CapacitorChannel $channel): bool { - return $this->tableExists($channel->getTableName()); - } - - function exists(?string $channel): bool { - return $this->_exists($this->getChannel($channel)); - } - - /** s'assurer que le canal spécifié existe */ - function _ensureExists(CapacitorChannel $channel): void { - $this->_create($channel); - } - - function ensureExists(?string $channel): void { - $this->_ensureExists($this->getChannel($channel)); - } - - protected function _beforeReset(CapacitorChannel $channel): void { - $db = $this->db; - $name = $channel->getName(); - $db->exec([ - "delete", - "from" => _migration::MIGRATION_TABLE, - "where" => [ - "channel" => $name, - ], - ]); - $db->exec([ - "delete", - "from" => static::CHANNELS_TABLE, - "where" => [ - "name" => $name, - ], - ]); - } - - /** supprimer le canal spécifié */ - function _reset(CapacitorChannel $channel, bool $recreate=false): void { - $this->_beforeReset($channel); - $this->db()->exec([ - "drop table if exists", - $channel->getTableName(), - ]); - $channel->setCreated(false); - if ($recreate) $this->_ensureExists($channel); - } - - function reset(?string $channel, bool $recreate=false): void { - $this->_reset($this->getChannel($channel), $recreate); - } - - /** - * charger une valeur dans le canal - * - * Après avoir calculé les valeurs des clés supplémentaires - * avec {@link CapacitorChannel::getItemValues()}, l'une des deux fonctions - * {@link CapacitorChannel::onCreate()} ou {@link CapacitorChannel::onUpdate()} - * est appelée en fonction du type d'opération: création ou mise à jour - * - * Ensuite, si $func !== null, la fonction est appelée avec la signature de - * {@link CapacitorChannel::onCreate()} ou {@link CapacitorChannel::onUpdate()} - * en fonction du type d'opération: création ou mise à jour - * - * Dans les deux cas, si la fonction retourne un tableau, il est utilisé pour - * modifier les valeurs insérées/mises à jour. De plus, $row obtient la - * valeur finale des données insérées/mises à jour - * - * Si $args est renseigné, il est ajouté aux arguments utilisés pour appeler - * les méthodes {@link CapacitorChannel::getItemValues()}, - * {@link CapacitorChannel::onCreate()} et/ou - * {@link CapacitorChannel::onUpdate()} - * - * @return int 1 si l'objet a été chargé ou mis à jour, 0 s'il existait - * déjà à l'identique dans le canal - */ - function _charge(CapacitorChannel $channel, $item, $func, ?array $args, ?array &$row=null): int { - $this->_create($channel); - $tableName = $channel->getTableName(); - $db = $this->db(); - $args ??= []; - - $row = func::call([$channel, "getItemValues"], $item, ...$args); - if ($row === [false]) return 0; - - if ($row !== null && array_key_exists("item", $row)) { - $item = A::pop($row, "item"); - } - - $raw = cl::merge( - $channel->getSum("item", $item), - $this->serialize($channel, $row)); - $praw = null; - $rowIds = $this->getRowIds($channel, $raw, $primaryKeys); - if ($rowIds !== null) { - # modification - $praw = $db->one([ - "select", - "from" => $tableName, - "where" => $rowIds, - ]); - } - - $now = date("Y-m-d H:i:s"); - $insert = null; - if ($praw === null) { - # création - $raw = cl::merge($raw, [ - "created_" => $now, - "modified_" => $now, - ]); - $insert = true; - $initFunc = func::with([$channel, "onCreate"], $args); - $row = $this->unserialize($channel, $raw); - $prow = null; - } else { - # modification - # intégrer autant que possible les valeurs de praw dans raw, de façon que - # l'utilisateur puisse voir clairement ce qui a été modifié - if ($channel->_wasSumModified("item", $raw, $praw)) { - $insert = false; - $raw = cl::merge($praw, $raw, [ - "modified_" => $now, - ]); - } else { - $raw = cl::merge($praw, $raw); - } - $initFunc = func::with([$channel, "onUpdate"], $args); - $row = $this->unserialize($channel, $raw); - $prow = $this->unserialize($channel, $praw); - } - - $updates = $initFunc->prependArgs([$item, $row, $prow])->invoke(); - if ($updates === [false]) return 0; - if (is_array($updates) && $updates) { - if ($insert === null) $insert = false; - if (!array_key_exists("modified_", $updates)) { - $updates["modified_"] = $now; - } - $row = cl::merge($row, $updates); - $raw = cl::merge($raw, $this->serialize($channel, $updates)); - } - - if ($func !== null) { - $updates = func::with($func, $args) - ->prependArgs([$item, $row, $prow]) - ->bind($channel) - ->invoke(); - if ($updates === [false]) return 0; - if (is_array($updates) && $updates) { - if ($insert === null) $insert = false; - if (!array_key_exists("modified_", $updates)) { - $updates["modified_"] = $now; - } - $row = cl::merge($row, $updates); - $raw = cl::merge($raw, $this->serialize($channel, $updates)); - } - } - - # aucune modification - if ($insert === null) return 0; - - # si on est déjà dans une transaction, désactiver la gestion des transactions - $manageTransactions = $channel->isManageTransactions() && !$db->inTransaction(); - if ($manageTransactions) { - $commited = false; - $db->beginTransaction(); - } - $nbModified = 0; - try { - if ($insert) { - $id = $db->exec([ - "insert", - "into" => $tableName, - "values" => $raw, - ]); - if (count($primaryKeys) == 1 && $rowIds === null) { - # mettre à jour avec l'id généré - $row[$primaryKeys[0]] = $id; - } - $nbModified = 1; - } else { - # calculer ce qui a changé pour ne mettre à jour que le nécessaire - $updates = []; - foreach ($raw as $col => $value) { - if (array_key_exists($col, $rowIds)) { - # ne jamais mettre à jour la clé primaire - continue; - } - if (!cv::equals($value, $praw[$col] ?? null)) { - $updates[$col] = $value; - } - } - if (count($updates) == 1 && array_key_first($updates) == "modified_") { - # si l'unique modification porte sur la date de modification, alors - # la ligne n'est pas modifiée. ce cas se présente quand on altère la - # valeur de $item - $updates = null; - } - if ($updates) { - $db->exec([ - "update", - "table" => $tableName, - "values" => $updates, - "where" => $rowIds, - ]); - $nbModified = 1; - } - } - if ($manageTransactions) { - $db->commit(); - $commited = true; - } - return $nbModified; - } finally { - if ($manageTransactions && !$commited) $db->rollback(); - } - } - - function charge(?string $channel, $item, $func=null, ?array $args=null, ?array &$row=null): int { - return $this->_charge($this->getChannel($channel), $item, $func, $args, $row); - } - - /** - * décharger les données du canal spécifié. seul la valeur de $item est - * fournie - */ - function _discharge(CapacitorChannel $channel, bool $reset=true): Traversable { - $this->_create($channel); - $raws = $this->db()->all([ - "select item__", - "from" => $channel->getTableName(), - ]); - foreach ($raws as $raw) { - yield unserialize($raw['item__']); - } - if ($reset) $this->_reset($channel); - } - - function discharge(?string $channel, bool $reset=true): Traversable { - return $this->_discharge($this->getChannel($channel), $reset); - } - - protected function _convertValue2row(CapacitorChannel $channel, array $filter, array $cols): array { - $index = 0; - $fixed = []; - foreach ($filter as $key => $value) { - if ($key === $index) { - $index++; - if (is_array($value)) { - $value = $this->_convertValue2row($channel, $value, $cols); - } - $fixed[] = $value; - } else { - $col = "${key}__"; - if (array_key_exists($col, $cols)) { - # colonne sérialisée - $fixed[$col] = $channel->serialize($value); - } else { - $fixed[$key] = $value; - } - } - } - return $fixed; - } - - protected function verifixFilter(CapacitorChannel $channel, &$filter): void { - if ($filter !== null && !is_array($filter)) { - $primaryKeys = $this->getPrimaryKeys($channel); - $id = $filter; - $channel->verifixId($id); - $filter = [$primaryKeys[0] => $id]; - } - $cols = $this->ColumnDefinitions($channel); - if ($filter !== null) { - $filter = $this->_convertValue2row($channel, $filter, $cols); - } - } - - /** indiquer le nombre d'éléments du canal spécifié */ - function _count(CapacitorChannel $channel, $filter): int { - $this->_create($channel); - $this->verifixFilter($channel, $filter); - return $this->db()->get([ - "select count(*)", - "from" => $channel->getTableName(), - "where" => $filter, - ]); - } - - function count(?string $channel, $filter=null): int { - return $this->_count($this->getChannel($channel), $filter); - } - - /** - * obtenir la ligne correspondant au filtre sur le canal spécifié - * - * si $filter n'est pas un tableau, il est transformé en ["id_" => $filter] - */ - function _one(CapacitorChannel $channel, $filter, ?array $mergeQuery=null): ?array { - if ($filter === null) throw ValueException::null("filter"); - $this->_create($channel); - $this->verifixFilter($channel, $filter); - $raw = $this->db()->one(cl::merge([ - "select", - "from" => $channel->getTableName(), - "where" => $filter, - ], $mergeQuery)); - return $this->unserialize($channel, $raw); - } - - function one(?string $channel, $filter, ?array $mergeQuery=null): ?array { - return $this->_one($this->getChannel($channel), $filter, $mergeQuery); - } - - /** - * obtenir les lignes correspondant au filtre sur le canal spécifié - * - * si $filter n'est pas un tableau, il est transformé en ["id_" => $filter] - */ - function _all(CapacitorChannel $channel, $filter, ?array $mergeQuery=null): Traversable { - $this->_create($channel); - $this->verifixFilter($channel, $filter); - $raws = $this->db()->all(cl::merge([ - "select", - "from" => $channel->getTableName(), - "where" => $filter, - ], $mergeQuery), null, $this->getPrimaryKeys($channel)); - foreach ($raws as $key => $raw) { - yield $key => $this->unserialize($channel, $raw); - } - } - - function all(?string $channel, $filter, $mergeQuery=null): Traversable { - return $this->_all($this->getChannel($channel), $filter, $mergeQuery); - } - - /** - * appeler une fonction pour chaque élément du canal spécifié. - * - * $filter permet de filtrer parmi les élements chargés - * - * $func est appelé avec la signature de {@link CapacitorChannel::onEach()} - * si la fonction retourne un tableau, il est utilisé pour mettre à jour la - * ligne - * - * @param int $nbUpdated reçoit le nombre de lignes mises à jour - * @return int le nombre de lignes parcourues - */ - function _each(CapacitorChannel $channel, $filter, $func, ?array $args, ?array $mergeQuery=null, ?int &$nbUpdated=null): int { - $this->_create($channel); - if ($func === null) $func = CapacitorChannel::onEach; - $onEach = func::with($func)->bind($channel); - $db = $this->db(); - # si on est déjà dans une transaction, désactiver la gestion des transactions - $manageTransactions = $channel->isManageTransactions() && !$db->inTransaction(); - if ($manageTransactions) { - $commited = false; - $db->beginTransaction(); - $commitThreshold = $channel->getEachCommitThreshold(); - } - $count = 0; - $nbUpdated = 0; - $tableName = $channel->getTableName(); - try { - $args ??= []; - $rows = $this->_all($channel, $filter, $mergeQuery); - foreach ($rows as $row) { - $rowIds = $this->getRowIds($channel, $row); - $updates = $onEach->invoke([$row, ...$args]); - if ($updates === [false]) { - break; - } elseif ($updates !== null) { - if (!array_key_exists("modified_", $updates)) { - $updates["modified_"] = date("Y-m-d H:i:s"); - } - $nbUpdated += $db->exec([ - "update", - "table" => $tableName, - "values" => $this->serialize($channel, $updates), - "where" => $rowIds, - ]); - if ($manageTransactions && $commitThreshold !== null) { - $commitThreshold--; - if ($commitThreshold <= 0) { - $db->commit(); - $db->beginTransaction(); - $commitThreshold = $channel->getEachCommitThreshold(); - } - } - } - $count++; - } - if ($manageTransactions) { - $db->commit(); - $commited = true; - } - return $count; - } finally { - if ($manageTransactions && !$commited) $db->rollback(); - } - } - - function each(?string $channel, $filter, $func=null, ?array $args=null, ?array $mergeQuery=null, ?int &$nbUpdated=null): int { - return $this->_each($this->getChannel($channel), $filter, $func, $args, $mergeQuery, $nbUpdated); - } - - /** - * supprimer tous les éléments correspondant au filtre et pour lesquels la - * fonction retourne une valeur vraie si elle est spécifiée - * - * $filter permet de filtrer parmi les élements chargés - * - * $func est appelé avec la signature de {@link CapacitorChannel::onDelete()} - * si la fonction retourne un tableau, il est utilisé pour mettre à jour la - * ligne - * - * @return int le nombre de lignes parcourues - */ - function _delete(CapacitorChannel $channel, $filter, $func, ?array $args): int { - $this->_create($channel); - if ($func === null) $func = CapacitorChannel::onDelete; - $onDelete = func::with($func)->bind($channel); - $db = $this->db(); - # si on est déjà dans une transaction, désactiver la gestion des transactions - $manageTransactions = $channel->isManageTransactions() && !$db->inTransaction(); - if ($manageTransactions) { - $commited = false; - $db->beginTransaction(); - $commitThreshold = $channel->getEachCommitThreshold(); - } - $count = 0; - $tableName = $channel->getTableName(); - try { - $args ??= []; - $rows = $this->_all($channel, $filter); - foreach ($rows as $row) { - $rowIds = $this->getRowIds($channel, $row); - $shouldDelete = boolval($onDelete->invoke([$row, ...$args])); - if ($shouldDelete) { - $db->exec([ - "delete", - "from" => $tableName, - "where" => $rowIds, - ]); - if ($manageTransactions && $commitThreshold !== null) { - $commitThreshold--; - if ($commitThreshold <= 0) { - $db->commit(); - $db->beginTransaction(); - $commitThreshold = $channel->getEachCommitThreshold(); - } - } - } - $count++; - } - if ($manageTransactions) { - $db->commit(); - $commited = true; - } - return $count; - } finally { - if ($manageTransactions && !$commited) $db->rollback(); - } - } - - function delete(?string $channel, $filter, $func=null, ?array $args=null): int { - return $this->_delete($this->getChannel($channel), $filter, $func, $args); - } - - abstract function close(): void; -} diff --git a/php/src/db/TODO.md b/php/src/db/TODO.md index 1f03e6d..e68b323 100644 --- a/php/src/db/TODO.md +++ b/php/src/db/TODO.md @@ -1,5 +1,11 @@ +# db + # db/Capacitor +La source peut être un iterable + +--- + charge() permet de spécifier la clé associée avec la valeur chargée, et discharge() retourne les valeurs avec la clé primaire diff --git a/php/src/db/conds.php b/php/src/db/conds.php index f0c3dfd..1624e0b 100644 --- a/php/src/db/conds.php +++ b/php/src/db/conds.php @@ -25,14 +25,11 @@ class conds { /** * retourner une condition "like" si la valeur s'y prête - * * - si la valeur fait moins de $likeThreshold caractères, faire une recherche * exacte en retournant ["=", $value] - * - * * - les espaces sont remplacés par % - * - * si $partial + * - si $partial et que $value ne contient pas d'espaces, rajouter un % à la + * fin */ static function like($value, bool $partial=false, ?int $likeThreshold=null) { if ($value === false || $value === null) return $value; diff --git a/php/src/db/mysql/MysqlStorage.php b/php/src/db/mysql/MysqlCapacitor.php similarity index 75% rename from php/src/db/mysql/MysqlStorage.php rename to php/src/db/mysql/MysqlCapacitor.php index be1382c..40509b3 100644 --- a/php/src/db/mysql/MysqlStorage.php +++ b/php/src/db/mysql/MysqlCapacitor.php @@ -3,12 +3,12 @@ namespace nulib\db\mysql; use nulib\cl; use nulib\db\CapacitorChannel; -use nulib\db\CapacitorStorage; +use nulib\db\Capacitor; /** * Class MysqlStorage */ -class MysqlStorage extends CapacitorStorage { +class MysqlCapacitor extends Capacitor { function __construct($mysql) { $this->db = Mysql::with($mysql); } @@ -36,21 +36,21 @@ class MysqlStorage extends CapacitorStorage { "value" => "varchar(255)", ]; - function _getMigration(CapacitorChannel $channel): _mysqlMigration { + function getMigration(CapacitorChannel $channel): _mysqlMigration { $migrations = cl::merge([ - "0init" => [$this->_createSql($channel)], + "0init" => [$this->getCreateChannelSql($channel)], ], $channel->getMigration($this->db->getPrefix())); return new _mysqlMigration($migrations, $channel->getName()); } - const CHANNELS_COLS = [ + const CATALOG_COLS = [ "name" => "varchar(255) not null primary key", "table_name" => "varchar(64)", "class_name" => "varchar(255)", ]; - protected function _addToChannelsSql(CapacitorChannel $channel): array { - return cl::merge(parent::_addToChannelsSql($channel), [ + protected function addToCatalogSql(CapacitorChannel $channel): array { + return cl::merge(parent::addToCatalogSql($channel), [ "suffix" => "on duplicate key update name = name", ]); } diff --git a/php/src/db/pgsql/PgsqlStorage.php b/php/src/db/pgsql/PgsqlCapacitor.php similarity index 60% rename from php/src/db/pgsql/PgsqlStorage.php rename to php/src/db/pgsql/PgsqlCapacitor.php index 6861154..46440af 100644 --- a/php/src/db/pgsql/PgsqlStorage.php +++ b/php/src/db/pgsql/PgsqlCapacitor.php @@ -3,16 +3,16 @@ namespace nulib\db\pgsql; use nulib\cl; use nulib\db\CapacitorChannel; -use nulib\db\CapacitorStorage; +use nulib\db\Capacitor; -class PgsqlStorage extends CapacitorStorage { - const SERDATA_DEFINITION = "text"; - const SERSUM_DEFINITION = "varchar(40)"; - const SERTS_DEFINITION = "timestamp"; - const GENSERIAL_DEFINITION = "serial primary key"; - const GENTEXT_DEFINITION = "text"; - const GENBOOL_DEFINITION = "boolean default false"; - const GENUUID_DEFINITION = "uuid"; +class PgsqlCapacitor extends Capacitor { + const CDATA_DEFINITION = "text"; + const CSUM_DEFINITION = "varchar(40)"; + const CTIMESTAMP_DEFINITION = "timestamp"; + const GSERIAL_DEFINITION = "serial primary key"; + const GTEXT_DEFINITION = "text"; + const GBOOL_DEFINITION = "boolean default false"; + const GUUID_DEFINITION = "uuid"; function __construct($pgsql) { $this->db = Pgsql::with($pgsql); @@ -41,15 +41,15 @@ class PgsqlStorage extends CapacitorStorage { return $found !== null; } - function _getMigration(CapacitorChannel $channel): _pgsqlMigration { + function getMigration(CapacitorChannel $channel): _pgsqlMigration { $migrations = cl::merge([ - "0init" => [$this->_createSql($channel)], + "0init" => [$this->getCreateChannelSql($channel)], ], $channel->getMigration($this->db->getPrefix())); return new _pgsqlMigration($migrations, $channel->getName()); } - protected function _addToChannelsSql(CapacitorChannel $channel): array { - return cl::merge(parent::_addToChannelsSql($channel), [ + protected function addToCatalogSql(CapacitorChannel $channel): array { + return cl::merge(parent::addToCatalogSql($channel), [ "suffix" => "on conflict (name) do nothing", ]); } diff --git a/php/src/db/sqlite/SqliteStorage.php b/php/src/db/sqlite/SqliteCapacitor.php similarity index 63% rename from php/src/db/sqlite/SqliteStorage.php rename to php/src/db/sqlite/SqliteCapacitor.php index f67b678..6d4de53 100644 --- a/php/src/db/sqlite/SqliteStorage.php +++ b/php/src/db/sqlite/SqliteCapacitor.php @@ -3,13 +3,13 @@ namespace nulib\db\sqlite; use nulib\cl; use nulib\db\CapacitorChannel; -use nulib\db\CapacitorStorage; +use nulib\db\Capacitor; /** * Class SqliteStorage */ -class SqliteStorage extends CapacitorStorage { - const GENSERIAL_DEFINITION = "integer primary key autoincrement"; +class SqliteCapacitor extends Capacitor { + const GSERIAL_DEFINITION = "integer primary key autoincrement"; function __construct($sqlite) { $this->db = Sqlite::with($sqlite); @@ -31,30 +31,30 @@ class SqliteStorage extends CapacitorStorage { return $found !== null; } - function _getMigration(CapacitorChannel $channel): _sqliteMigration { + function getMigration(CapacitorChannel $channel): _sqliteMigration { $migrations = cl::merge([ - "0init" => [$this->_createSql($channel)], + "0init" => [$this->getCreateChannelSql($channel)], ], $channel->getMigration($this->db->getPrefix())); return new _sqliteMigration($migrations, $channel->getName()); } - protected function _addToChannelsSql(CapacitorChannel $channel): array { - $sql = parent::_addToChannelsSql($channel); + protected function addToCatalogSql(CapacitorChannel $channel): array { + $sql = parent::addToCatalogSql($channel); $sql[0] = "insert or ignore"; return $sql; } - protected function _afterCreate(CapacitorChannel $channel): void { + protected function afterCreate(CapacitorChannel $channel): void { $db = $this->db; - if (!$this->tableExists(static::CHANNELS_TABLE)) { + if (!$this->tableExists(static::CATALOG_TABLE)) { # ne pas créer si la table existe déjà, pour éviter d'avoir besoin d'un # verrou en écriture - $db->exec($this->_createChannelsSql()); + $db->exec($this->getCreateCatalogSql()); } - if (!$this->channelExists($channel->getName())) { + if (!$this->isInCatalog(["name" => $channel->getName()])) { # ne pas insérer si la ligne existe déjà, pour éviter d'avoir besoin d'un # verrou en écriture - $db->exec($this->_addToChannelsSql($channel)); + $db->exec($this->addToCatalogSql($channel)); } } diff --git a/php/tbin/test_mysql.php b/php/tbin/test_mysql.php index e2eb555..fe29b79 100644 --- a/php/tbin/test_mysql.php +++ b/php/tbin/test_mysql.php @@ -5,7 +5,7 @@ use nulib\cl; use nulib\db\Capacitor; use nulib\db\CapacitorChannel; use nulib\db\mysql\Mysql; -use nulib\db\mysql\MysqlStorage; +use nulib\db\mysql\MysqlCapacitor; use nulib\output\msg; use nulib\output\std\StdMessenger; @@ -35,7 +35,7 @@ class MyChannel extends CapacitorChannel { } } -new Capacitor(new MysqlStorage($db), $channel = new MyChannel()); +new Capacitor(new MysqlCapacitor($db), $channel = new MyChannel()); $channel->charge("hello world"); $channel->charge(["bonjour monde"]); diff --git a/php/tbin/test_pgsql.php b/php/tbin/test_pgsql.php index 1b10f35..a24f832 100644 --- a/php/tbin/test_pgsql.php +++ b/php/tbin/test_pgsql.php @@ -5,7 +5,7 @@ use nulib\cl; use nulib\db\Capacitor; use nulib\db\CapacitorChannel; use nulib\db\pgsql\Pgsql; -use nulib\db\pgsql\PgsqlStorage; +use nulib\db\pgsql\PgsqlCapacitor; $db = new Pgsql([ "host" => "pegase-dre.self", @@ -34,7 +34,7 @@ class MyChannel extends CapacitorChannel { } } -new Capacitor(new PgsqlStorage($db), $channel = new MyChannel()); +new Capacitor(new PgsqlCapacitor($db), $channel = new MyChannel()); $channel->charge("hello world"); $channel->charge(["bonjour monde"]); diff --git a/php/tbin/test_sqlite.php b/php/tbin/test_sqlite.php index e88e3be..32b5815 100644 --- a/php/tbin/test_sqlite.php +++ b/php/tbin/test_sqlite.php @@ -5,7 +5,7 @@ use nulib\cl; use nulib\db\Capacitor; use nulib\db\CapacitorChannel; use nulib\db\sqlite\Sqlite; -use nulib\db\sqlite\SqliteStorage; +use nulib\db\sqlite\SqliteCapacitor; $db = new Sqlite(__DIR__.'/test_sqlite.db'); @@ -27,7 +27,7 @@ class MyChannel extends CapacitorChannel { } } -new Capacitor(new SqliteStorage($db), $channel = new MyChannel()); +new Capacitor(new SqliteCapacitor($db), $channel = new MyChannel()); $channel->charge("hello world"); $channel->charge(["bonjour monde"]); diff --git a/php/tests/db/sqlite/ChannelMigrationTest.php b/php/tests/db/sqlite/ChannelMigrationTest.php index fa48e7c..e399d15 100644 --- a/php/tests/db/sqlite/ChannelMigrationTest.php +++ b/php/tests/db/sqlite/ChannelMigrationTest.php @@ -30,7 +30,7 @@ class ChannelMigrationTest extends TestCase { } function testMigration() { - $storage = new SqliteStorage(__DIR__.'/capacitor.db'); + $storage = new SqliteCapacitor(__DIR__.'/capacitor.db'); $data = [ ["first", "premier", new DateTime(), new DateTime(), 15], ["second", "deuxieme", new DateTime(), new DateTime(), 15], @@ -46,7 +46,7 @@ class ChannelMigrationTest extends TestCase { new Capacitor($storage, $channel = new MyChannelV3()); $this->addData($channel, $data); - $sql = $channel->getCapacitor()->getCreateSql(); + $sql = $channel->getCapacitor()->getCreateChannelSql(); $class = MyChannelV3::class; $expected = <<reset(true); $channel->chargeAll($data); - $sql = $channel->getCapacitor()->getCreateSql(); + $sql = $channel->getCapacitor()->getCreateChannelSql(); $class = MyIndexChannel::class; $expected = <<reset($channel); $storage->charge($channel, "first"); $storage->charge($channel, "second"); @@ -22,7 +22,7 @@ class SqliteStorageTest extends TestCase { self::assertSame(["first", "second", "third"], $items); } - function _testChargeArrays(SqliteStorage $storage, ?string $channel) { + function _testChargeArrays(SqliteCapacitor $storage, ?string $channel) { $storage->reset($channel); $storage->charge($channel, ["id" => 10, "name" => "first"]); $storage->charge($channel, ["name" => "second", "id" => 20]); @@ -30,13 +30,13 @@ class SqliteStorageTest extends TestCase { } function testChargeStrings() { - $storage = new SqliteStorage(__DIR__.'/capacitor.db'); + $storage = new SqliteCapacitor(__DIR__.'/capacitor.db'); $this->_testChargeStrings($storage, null); $storage->close(); } function testChargeArrays() { - $storage = new SqliteStorage(__DIR__.'/capacitor.db'); + $storage = new SqliteCapacitor(__DIR__.'/capacitor.db'); $storage->addChannel(new class extends CapacitorChannel { const NAME = "arrays"; const COLUMN_DEFINITIONS = ["id" => "integer"]; @@ -52,7 +52,7 @@ class SqliteStorageTest extends TestCase { } function testEach() { - $storage = new SqliteStorage(__DIR__.'/capacitor.db'); + $storage = new SqliteCapacitor(__DIR__.'/capacitor.db'); $capacitor = new Capacitor($storage, new class extends CapacitorChannel { const NAME = "each"; const COLUMN_DEFINITIONS = [ @@ -91,7 +91,7 @@ class SqliteStorageTest extends TestCase { } function testPrimayKey() { - $storage = new SqliteStorage(__DIR__.'/capacitor.db'); + $storage = new SqliteCapacitor(__DIR__.'/capacitor.db'); $capacitor = new Capacitor($storage, new class extends CapacitorChannel { const NAME = "pk"; const COLUMN_DEFINITIONS = [ @@ -119,7 +119,7 @@ class SqliteStorageTest extends TestCase { } function testSum() { - $storage = new SqliteStorage(__DIR__.'/capacitor.db'); + $storage = new SqliteCapacitor(__DIR__.'/capacitor.db'); $capacitor = new Capacitor($storage, new class extends CapacitorChannel { const NAME = "sum"; const COLUMN_DEFINITIONS = [ @@ -158,7 +158,7 @@ class SqliteStorageTest extends TestCase { function testEachValues() { # tester que values contient bien toutes les valeurs de la ligne - $storage = new SqliteStorage(__DIR__.'/capacitor.db'); + $storage = new SqliteCapacitor(__DIR__.'/capacitor.db'); $capacitor = new Capacitor($storage, new class extends CapacitorChannel { const NAME = "each_values"; const COLUMN_DEFINITIONS = [ @@ -276,7 +276,7 @@ class SqliteStorageTest extends TestCase { function testSetItemNull() { # tester le forçage de $îtem à null pour économiser la place - $storage = new SqliteStorage(__DIR__.'/capacitor.db'); + $storage = new SqliteCapacitor(__DIR__.'/capacitor.db'); $capacitor = new Capacitor($storage, new class extends CapacitorChannel { const NAME = "set_item_null"; const COLUMN_DEFINITIONS = [