_create($channel); $this->channels[$channel->getName()] = $channel; return $channel; } protected function getChannel(?string $name): CapacitorChannel { CapacitorChannel::verifix_name($name); $channel = $this->channels[$name] ?? null; if ($channel === null) { $channel = $this->addChannel(new CapacitorChannel($name)); } return $channel; } /** DOIT être défini dans les classes dérivées */ const PRIMARY_KEY_DEFINITION = null; const SERDATA_DEFINITION = "mediumtext"; const SERSUM_DEFINITION = "varchar(40)"; const SERTS_DEFINITION = "datetime"; protected static function sercol($def): string { if (!is_string($def)) $def = strval($def); switch ($def) { case "serdata": $def = static::SERDATA_DEFINITION; break; case "sersum": $def = static::SERSUM_DEFINITION; break; case "serts": $def = static::SERTS_DEFINITION; break; } return $def; } const COLUMN_DEFINITIONS = [ "item__" => "serdata", "item__sum_" => "sersum", "created_" => "serts", "modified_" => "serts", ]; protected function ColumnDefinitions(CapacitorChannel $channel, bool $ignoreMigrations=false): array { $definitions = []; if ($channel->getPrimaryKeys() === null) { $definitions[] = static::PRIMARY_KEY_DEFINITION; } $definitions[] = $channel->getColumnDefinitions(); $definitions[] = static::COLUMN_DEFINITIONS; # forcer les définitions sans clé à la fin (sqlite requière par exemple que # primary key (columns) soit à la fin) $tmp = cl::merge(...$definitions); $definitions = []; $constraints = []; $index = 0; foreach ($tmp as $col => $def) { if ($col === $index) { $index++; if (is_array($def)) { if (!$ignoreMigrations) { $mdefs = $def; $mindex = 0; foreach ($mdefs as $mcol => $mdef) { if ($mcol === $mindex) { $mindex++; } else { if ($mdef) { $definitions[$mcol] = self::sercol($mdef); } else { unset($definitions[$mcol]); } } } } } else { $constraints[] = $def; } } else { $definitions[$col] = self::sercol($def); } } return cl::merge($definitions, $constraints); } protected function getMigration(CapacitorChannel $channel): ?array { return $channel->getMigration(); } /** sérialiser les valeurs qui doivent l'être dans $values */ protected function serialize(CapacitorChannel $channel, ?array $values): ?array { if ($values === null) return null; $cols = $this->ColumnDefinitions($channel); $index = 0; $row = []; foreach (array_keys($cols) as $col) { $key = $col; if ($key === $index) { $index++; } elseif ($channel->isSerialCol($key)) { [$serialCol, $sumCol] = $channel->getSumCols($key); if (array_key_exists($key, $values)) { $sum = $channel->getSum($key, $values[$key]); $row[$serialCol] = $sum[$serialCol]; if (array_key_exists($sumCol, $cols)) { $row[$sumCol] = $sum[$sumCol]; } } } elseif (array_key_exists($key, $values)) { $row[$col] = $values[$key]; } } return $row; } /** désérialiser les valeurs qui doivent l'être dans $values */ protected function unserialize(CapacitorChannel $channel, ?array $row): ?array { if ($row === null) return null; $cols = $this->ColumnDefinitions($channel); $index = 0; $values = []; foreach (array_keys($cols) as $col) { $key = $col; if ($key === $index) { $index++; } elseif (!array_key_exists($col, $row)) { } elseif ($channel->isSerialCol($key)) { $value = $row[$col]; if ($value !== null) $value = $channel->unserialize($value); $values[$key] = $value; } else { $values[$key] = $row[$col]; } } return $values; } function getPrimaryKeys(CapacitorChannel $channel): array { $primaryKeys = $channel->getPrimaryKeys(); if ($primaryKeys === null) $primaryKeys = ["id_"]; return $primaryKeys; } function getRowIds(CapacitorChannel $channel, ?array $row, ?array &$primaryKeys=null): ?array { $primaryKeys = $this->getPrimaryKeys($channel); $rowIds = cl::select($row, $primaryKeys); if (cl::all_n($rowIds)) return null; else return $rowIds; } protected function _createSql(CapacitorChannel $channel): array { return [ "create table if not exists", "table" => $channel->getTableName(), "cols" => $this->ColumnDefinitions($channel, true), ]; } abstract protected function tableExists(string $tableName): bool; const METADATA_TABLE = "_metadata"; const METADATA_COLS = [ "name" => "varchar not null primary key", "value" => "varchar", ]; protected function _prepareMetadata(): void { if (!$this->tableExists(static::METADATA_TABLE)) { $db = $this->db(); $db->exec([ "drop table if exists", "table" => self::CHANNELS_TABLE, ]); $db->exec([ "drop table if exists", "table" => _migration::MIGRATION_TABLE, ]); $db->exec([ "create table", "table" => static::METADATA_TABLE, "cols" => static::METADATA_COLS, ]); $db->exec([ "insert", "into" => static::METADATA_TABLE, "values" => [ "name" => "version", "value" => "1", ], ]); } } abstract function _getMigration(CapacitorChannel $channel): _migration; const CHANNELS_TABLE = "_channels"; const CHANNELS_COLS = [ "name" => "varchar not null primary key", "table_name" => "varchar", "class_name" => "varchar", ]; protected function _createChannelsSql(): array { return [ "create table if not exists", "table" => static::CHANNELS_TABLE, "cols" => static::CHANNELS_COLS, ]; } protected function _addToChannelsSql(CapacitorChannel $channel): array { return [ "insert", "into" => static::CHANNELS_TABLE, "values" => [ "name" => $channel->getName(), "table_name" => $channel->getTableName(), "class_name" => get_class($channel), ], ]; } protected function _afterCreate(CapacitorChannel $channel): void { $db = $this->db(); $db->exec($this->_createChannelsSql()); $db->exec($this->_addToChannelsSql($channel)); } protected function _create(CapacitorChannel $channel): void { $channel->ensureSetup(); if (!$channel->isCreated()) { $this->_prepareMetadata(); $this->_getMigration($channel)->migrate($this->db()); $this->_afterCreate($channel); $channel->setCreated(); } } /** tester si le canal spécifié existe */ function _exists(CapacitorChannel $channel): bool { return $this->tableExists($channel->getTableName()); } function exists(?string $channel): bool { return $this->_exists($this->getChannel($channel)); } /** s'assurer que le canal spécifié existe */ function _ensureExists(CapacitorChannel $channel): void { $this->_create($channel); } function ensureExists(?string $channel): void { $this->_ensureExists($this->getChannel($channel)); } protected function _beforeReset(CapacitorChannel $channel): void { $db = $this->db; $name = $channel->getName(); $db->exec([ "delete", "from" => _migration::MIGRATION_TABLE, "where" => [ "channel" => $name, ], ]); $db->exec([ "delete", "from" => static::CHANNELS_TABLE, "where" => [ "name" => $name, ], ]); } /** supprimer le canal spécifié */ function _reset(CapacitorChannel $channel, bool $recreate=false): void { $this->_beforeReset($channel); $this->db()->exec([ "drop table if exists", $channel->getTableName(), ]); $channel->setCreated(false); if ($recreate) $this->_ensureExists($channel); } function reset(?string $channel, bool $recreate=false): void { $this->_reset($this->getChannel($channel), $recreate); } /** * charger une valeur dans le canal * * Après avoir calculé les valeurs des clés supplémentaires * avec {@link CapacitorChannel::getItemValues()}, l'une des deux fonctions * {@link CapacitorChannel::onCreate()} ou {@link CapacitorChannel::onUpdate()} * est appelée en fonction du type d'opération: création ou mise à jour * * Ensuite, si $func !== null, la fonction est appelée avec la signature de * {@link CapacitorChannel::onCreate()} ou {@link CapacitorChannel::onUpdate()} * en fonction du type d'opération: création ou mise à jour * * Dans les deux cas, si la fonction retourne un tableau, il est utilisé pour * modifier les valeurs insérées/mises à jour. De plus, $values obtient la * valeur finale des données insérées/mises à jour * * Si $args est renseigné, il est ajouté aux arguments utilisés pour appeler * les méthodes {@link CapacitorChannel::getItemValues()}, * {@link CapacitorChannel::onCreate()} et/ou * {@link CapacitorChannel::onUpdate()} * * @return int 1 si l'objet a été chargé ou mis à jour, 0 s'il existait * déjà à l'identique dans le canal */ function _charge(CapacitorChannel $channel, $item, $func, ?array $args, ?array &$values=null): int { $this->_create($channel); $tableName = $channel->getTableName(); $db = $this->db(); $args ??= []; $values = func::call([$channel, "getItemValues"], $item, ...$args); if ($values === [false]) return 0; $row = cl::merge( $channel->getSum("item", $item), $this->serialize($channel, $values)); $prow = null; $rowIds = $this->getRowIds($channel, $row, $primaryKeys); if ($rowIds !== null) { # modification $prow = $db->one([ "select", "from" => $tableName, "where" => $rowIds, ]); } $now = date("Y-m-d H:i:s"); $insert = null; if ($prow === null) { # création $row = cl::merge($row, [ "created_" => $now, "modified_" => $now, ]); $insert = true; $initFunc = func::with([$channel, "onCreate"], $args); $values = $this->unserialize($channel, $row); $pvalues = null; } else { # modification # intégrer autant que possible les valeurs de prow dans row, de façon que # l'utilisateur puisse voir clairement ce qui a été modifié if ($channel->_wasSumModified("item", $row, $prow)) { $insert = false; $row = cl::merge($prow, $row, [ "modified_" => $now, ]); } else { $row = cl::merge($prow, $row); } $initFunc = func::with([$channel, "onUpdate"], $args); $values = $this->unserialize($channel, $row); $pvalues = $this->unserialize($channel, $prow); } $updates = $initFunc->prependArgs([$item, $values, $pvalues])->invoke(); if ($updates === [false]) return 0; if (is_array($updates) && $updates) { if ($insert === null) $insert = false; if (!array_key_exists("modified_", $updates)) { $updates["modified_"] = $now; } $values = cl::merge($values, $updates); $row = cl::merge($row, $this->serialize($channel, $updates)); } if ($func !== null) { $updates = func::with($func) ->prependArgs([$item, $values, $pvalues]) ->bind($channel) ->invoke(); if ($updates === [false]) return 0; if (is_array($updates) && $updates) { if ($insert === null) $insert = false; if (!array_key_exists("modified_", $updates)) { $updates["modified_"] = $now; } $values = cl::merge($values, $updates); $row = cl::merge($row, $this->serialize($channel, $updates)); } } # aucune modification if ($insert === null) return 0; # si on est déjà dans une transaction, désactiver la gestion des transactions $manageTransactions = $channel->isManageTransactions() && !$db->inTransaction(); if ($manageTransactions) { $commited = false; $db->beginTransaction(); } $nbModified = 0; try { if ($insert) { $id = $db->exec([ "insert", "into" => $tableName, "values" => $row, ]); if (count($primaryKeys) == 1 && $rowIds === null) { # mettre à jour avec l'id généré $values[$primaryKeys[0]] = $id; } $nbModified = 1; } else { # calculer ce qui a changé pour ne mettre à jour que le nécessaire $updates = []; foreach ($row as $col => $value) { if (array_key_exists($col, $rowIds)) { # ne jamais mettre à jour la clé primaire continue; } $pvalue = $prow[$col] ?? null; if ($value !== ($pvalue)) { $updates[$col] = $value; } } if (count($updates) == 1 && array_key_first($updates) == "modified_") { # si l'unique modification porte sur la date de modification, alors # la ligne n'est pas modifiée. ce cas se présente quand on altère la # valeur de $item $updates = null; } if ($updates) { $db->exec([ "update", "table" => $tableName, "values" => $updates, "where" => $rowIds, ]); $nbModified = 1; } } if ($manageTransactions) { $db->commit(); $commited = true; } return $nbModified; } finally { if ($manageTransactions && !$commited) $db->rollback(); } } function charge(?string $channel, $item, $func=null, ?array $args=null, ?array &$values=null): int { return $this->_charge($this->getChannel($channel), $item, $func, $args, $values); } /** décharger les données du canal spécifié */ function _discharge(CapacitorChannel $channel, bool $reset=true): Traversable { $this->_create($channel); $rows = $this->db()->all([ "select item__", "from" => $channel->getTableName(), ]); foreach ($rows as $row) { yield unserialize($row['item__']); } if ($reset) $this->_reset($channel); } function discharge(?string $channel, bool $reset=true): Traversable { return $this->_discharge($this->getChannel($channel), $reset); } protected function _convertValue2row(CapacitorChannel $channel, array $filter, array $cols): array { $index = 0; $fixed = []; foreach ($filter as $key => $value) { if ($key === $index) { $index++; if (is_array($value)) { $value = $this->_convertValue2row($channel, $value, $cols); } $fixed[] = $value; } else { $col = "${key}__"; if (array_key_exists($col, $cols)) { # colonne sérialisée $fixed[$col] = $channel->serialize($value); } else { $fixed[$key] = $value; } } } return $fixed; } protected function verifixFilter(CapacitorChannel $channel, &$filter): void { if ($filter !== null && !is_array($filter)) { $primaryKeys = $this->getPrimaryKeys($channel); $id = $filter; $channel->verifixId($id); $filter = [$primaryKeys[0] => $id]; } $cols = $this->ColumnDefinitions($channel); if ($filter !== null) { $filter = $this->_convertValue2row($channel, $filter, $cols); } } /** indiquer le nombre d'éléments du canal spécifié */ function _count(CapacitorChannel $channel, $filter): int { $this->_create($channel); $this->verifixFilter($channel, $filter); return $this->db()->get([ "select count(*)", "from" => $channel->getTableName(), "where" => $filter, ]); } function count(?string $channel, $filter=null): int { return $this->_count($this->getChannel($channel), $filter); } /** * obtenir la ligne correspondant au filtre sur le canal spécifié * * si $filter n'est pas un tableau, il est transformé en ["id_" => $filter] */ function _one(CapacitorChannel $channel, $filter, ?array $mergeQuery=null): ?array { if ($filter === null) throw ValueException::null("filter"); $this->_create($channel); $this->verifixFilter($channel, $filter); $row = $this->db()->one(cl::merge([ "select", "from" => $channel->getTableName(), "where" => $filter, ], $mergeQuery)); return $this->unserialize($channel, $row); } function one(?string $channel, $filter, ?array $mergeQuery=null): ?array { return $this->_one($this->getChannel($channel), $filter, $mergeQuery); } private function _allCached(string $id, CapacitorChannel $channel, $filter, ?array $mergeQuery=null): Traversable { $this->_create($channel); $this->verifixFilter($channel, $filter); $rows = $this->db()->all(cl::merge([ "select", "from" => $channel->getTableName(), "where" => $filter, ], $mergeQuery), null, $this->getPrimaryKeys($channel)); if ($channel->isUseCache()) { $cacheIds = [$id, get_class($channel)]; cache::get()->resetCached($cacheIds); $rows = cache::new(null, $cacheIds, function() use ($rows) { yield from $rows; }); } foreach ($rows as $key => $row) { yield $key => $this->unserialize($channel, $row); } } /** * obtenir les lignes correspondant au filtre sur le canal spécifié * * si $filter n'est pas un tableau, il est transformé en ["id_" => $filter] */ function _all(CapacitorChannel $channel, $filter, ?array $mergeQuery=null): Traversable { return $this->_allCached("all", $channel, $filter, $mergeQuery); } function all(?string $channel, $filter, $mergeQuery=null): Traversable { return $this->_all($this->getChannel($channel), $filter, $mergeQuery); } /** * appeler une fonction pour chaque élément du canal spécifié. * * $filter permet de filtrer parmi les élements chargés * * $func est appelé avec la signature de {@link CapacitorChannel::onEach()} * si la fonction retourne un tableau, il est utilisé pour mettre à jour la * ligne * * @param int $nbUpdated reçoit le nombre de lignes mises à jour * @return int le nombre de lignes parcourues */ function _each(CapacitorChannel $channel, $filter, $func, ?array $args, ?array $mergeQuery=null, ?int &$nbUpdated=null): int { $this->_create($channel); if ($func === null) $func = CapacitorChannel::onEach; $onEach = func::with($func)->bind($channel); $db = $this->db(); # si on est déjà dans une transaction, désactiver la gestion des transactions $manageTransactions = $channel->isManageTransactions() && !$db->inTransaction(); if ($manageTransactions) { $commited = false; $db->beginTransaction(); $commitThreshold = $channel->getEachCommitThreshold(); } $count = 0; $nbUpdated = 0; $tableName = $channel->getTableName(); try { $args ??= []; $all = $this->_allCached("each", $channel, $filter, $mergeQuery); foreach ($all as $values) { $rowIds = $this->getRowIds($channel, $values); $updates = $onEach->invoke([$values["item"], $values, ...$args]); if (is_array($updates) && $updates) { if (!array_key_exists("modified_", $updates)) { $updates["modified_"] = date("Y-m-d H:i:s"); } $nbUpdated += $db->exec([ "update", "table" => $tableName, "values" => $this->serialize($channel, $updates), "where" => $rowIds, ]); if ($manageTransactions && $commitThreshold !== null) { $commitThreshold--; if ($commitThreshold <= 0) { $db->commit(); $db->beginTransaction(); $commitThreshold = $channel->getEachCommitThreshold(); } } } $count++; } if ($manageTransactions) { $db->commit(); $commited = true; } return $count; } finally { if ($manageTransactions && !$commited) $db->rollback(); } } function each(?string $channel, $filter, $func=null, ?array $args=null, ?array $mergeQuery=null, ?int &$nbUpdated=null): int { return $this->_each($this->getChannel($channel), $filter, $func, $args, $mergeQuery, $nbUpdated); } /** * supprimer tous les éléments correspondant au filtre et pour lesquels la * fonction retourne une valeur vraie si elle est spécifiée * * $filter permet de filtrer parmi les élements chargés * * $func est appelé avec la signature de {@link CapacitorChannel::onDelete()} * si la fonction retourne un tableau, il est utilisé pour mettre à jour la * ligne * * @return int le nombre de lignes parcourues */ function _delete(CapacitorChannel $channel, $filter, $func, ?array $args): int { $this->_create($channel); if ($func === null) $func = CapacitorChannel::onDelete; $onDelete = func::with($func)->bind($channel); $db = $this->db(); # si on est déjà dans une transaction, désactiver la gestion des transactions $manageTransactions = $channel->isManageTransactions() && !$db->inTransaction(); if ($manageTransactions) { $commited = false; $db->beginTransaction(); $commitThreshold = $channel->getEachCommitThreshold(); } $count = 0; $tableName = $channel->getTableName(); try { $args ??= []; $all = $this->_allCached("delete", $channel, $filter); foreach ($all as $values) { $rowIds = $this->getRowIds($channel, $values); $shouldDelete = boolval($onDelete->invoke([$values["item"], $values, ...$args])); if ($shouldDelete) { $db->exec([ "delete", "from" => $tableName, "where" => $rowIds, ]); if ($manageTransactions && $commitThreshold !== null) { $commitThreshold--; if ($commitThreshold <= 0) { $db->commit(); $db->beginTransaction(); $commitThreshold = $channel->getEachCommitThreshold(); } } } $count++; } if ($manageTransactions) { $db->commit(); $commited = true; } return $count; } finally { if ($manageTransactions && !$commited) $db->rollback(); } } function delete(?string $channel, $filter, $func=null, ?array $args=null): int { return $this->_delete($this->getChannel($channel), $filter, $func, $args); } abstract function close(): void; }