sqlite = Sqlite::with($sqlite); } /** @var Sqlite */ protected $sqlite; function sqlite(): Sqlite { return $this->sqlite; } const KEY_DEFINITIONS = [ "id_" => "integer primary key autoincrement", "item__" => "text", "sum_" => "varchar(40)", "created_" => "datetime", "modified_" => "datetime", ]; /** sérialiser les valeurs qui doivent l'être dans $values */ protected function serialize(CapacitorChannel $channel, ?array $values): ?array { if ($values === null) return null; $columns = cl::merge(self::KEY_DEFINITIONS, $channel->getKeyDefinitions()); $index = 0; $row = []; foreach (array_keys($columns) as $column) { $key = $column; if ($key === $index) { $index++; continue; } elseif (str::del_suffix($key, "__")) { if (!array_key_exists($key, $values)) continue; $value = $values[$key]; if ($value !== null) $value = serialize($value); } else { if (!array_key_exists($key, $values)) continue; $value = $values[$key]; } $row[$column] = $value; } return $row; } /** désérialiser les valeurs qui doivent l'être dans $values */ protected function unserialize(CapacitorChannel $channel, ?array $row): ?array { if ($row === null) return null; $columns = cl::merge(self::KEY_DEFINITIONS, $channel->getKeyDefinitions()); $index = 0; $values = []; foreach (array_keys($columns) as $column) { $key = $column; if ($key === $index) { $index++; continue; } elseif (!array_key_exists($column, $row)) { continue; } elseif (str::del_suffix($key, "__")) { $value = $row[$column]; if ($value !== null) $value = unserialize($value); } else { $value = $row[$column]; } $values[$key] = $value; } return $values; } protected function _create(CapacitorChannel $channel): void { if (!$channel->isCreated()) { $columns = cl::merge(self::KEY_DEFINITIONS, $channel->getKeyDefinitions()); $this->sqlite->exec([ "create table if not exists", "table" => $channel->getTableName(), "cols" => $columns, ]); $channel->setCreated(); } } /** @var CapacitorChannel[] */ protected $channels; function addChannel(CapacitorChannel $channel): CapacitorChannel { $this->_create($channel); $this->channels[$channel->getName()] = $channel; return $channel; } protected function getChannel(?string $name): CapacitorChannel { $name = CapacitorChannel::verifix_name($name); $channel = $this->channels[$name] ?? null; if ($channel === null) { $channel = $this->addChannel(new CapacitorChannel($name)); } return $channel; } function _exists(CapacitorChannel $channel): bool { $tableName = $this->sqlite->get([ "select name from sqlite_schema", "where" => [ "name" => $channel->getTableName(), ], ]); return $tableName !== null; } function _ensureExists(CapacitorChannel $channel): void { $this->_create($channel); } function _reset(CapacitorChannel $channel): void { $this->sqlite->exec([ "drop table if exists", $channel->getTableName(), ]); $channel->setCreated(false); } function _charge(CapacitorChannel $channel, $item, ?callable $func, ?array $args): int { $this->_create($channel); $now = date("Y-m-d H:i:s"); $item__ = serialize($item); $sum_ = sha1($item__); $row = cl::merge([ "item__" => $item__, "sum_" => $sum_, ], $this->unserialize($channel, $channel->getKeyValues($item))); $prow = null; $id_ = $row["id_"] ?? null; if ($id_ !== null) { # modification $prow = $this->sqlite->one([ "select id_, item__, sum_, created_, modified_", "from" => $channel->getTableName(), "where" => ["id_" => $id_], ]); } $insert = null; if ($prow === null) { # création $row = cl::merge($row, [ "created_" => $now, "modified_" => $now, ]); $insert = true; } elseif ($sum_ !== $prow["sum_"]) { # modification $row = cl::merge($row, [ "modified_" => $now, ]); $insert = false; } if ($func === null) $func = [$channel, "onCharge"]; $onCharge = func::_prepare($func); $args ??= []; $values = $this->unserialize($channel, $row); $pvalues = $this->unserialize($channel, $prow); $updates = func::_call($onCharge, [$item, $values, $pvalues, ...$args]); if (is_array($updates)) { $updates = $this->serialize($channel, $updates); if (array_key_exists("item__", $updates)) { # si item a été mis à jour, il faut mettre à jour sum_ $updates["sum_"] = sha1($updates["item__"]); if (!array_key_exists("modified_", $updates)) { $updates["modified_"] = $now; } } $row = cl::merge($row, $updates); } if ($insert === null) { # aucune modification return 0; } elseif ($insert) { $this->sqlite->exec([ "insert", "into" => $channel->getTableName(), "values" => $row, ]); } else { $this->sqlite->exec([ "update", "table" => $channel->getTableName(), "values" => $row, "where" => ["id_" => $id_], ]); } return 1; } protected function verifixFilter(CapacitorChannel $channel, &$filter): void { if ($filter !== null && !is_array($filter)) { $id = $filter; $channel->verifixId($id); $filter = ["id_" => $id]; } $filter = $this->serialize($channel, $filter); } function _count(CapacitorChannel $channel, $filter): int { $this->verifixFilter($channel, $filter); return $this->sqlite->get([ "select count(*)", "from" => $channel->getTableName(), "where" => $filter, ]); } function _discharge(CapacitorChannel $channel, $filter, ?bool $reset): iterable { $this->verifixFilter($channel, $filter); if ($reset === null) $reset = $filter === null; $rows = $this->sqlite->all([ "select item__", "from" => $channel->getTableName(), "where" => $filter, ]); foreach ($rows as $row) { yield unserialize($row['item__']); } if ($reset) $this->_reset($channel); } function _get(CapacitorChannel $channel, $filter) { if ($filter === null) throw ValueException::null("filter"); $this->verifixFilter($channel, $filter); $row = $this->sqlite->one([ "select item__", "from" => $channel->getTableName(), "where" => $filter, ]); if ($row === null) return null; else return unserialize($row["item__"]); } function _each(CapacitorChannel $channel, $filter, ?callable $func, ?array $args): int { $this->verifixFilter($channel, $filter); if ($func === null) $func = [$channel, "onEach"]; $onEach = func::_prepare($func); $sqlite = $this->sqlite; $tableName = $channel->getTableName(); $commited = false; $count = 0; $sqlite->beginTransaction(); $commitThreshold = $channel->getEachCommitThreshold(); try { $rows = $sqlite->all([ "select", "from" => $tableName, "where" => $filter, ]); $args ??= []; foreach ($rows as $row) { $values = $this->unserialize($channel, $row); $updates = func::_call($onEach, [$values["item"], $values, ...$args]); if (is_array($updates)) { $updates = $this->serialize($channel, $updates); if (array_key_exists("item__", $updates)) { # si item a été mis à jour, il faut mettre à jour sum_ $updates["sum_"] = sha1($updates["item__"]); if (!array_key_exists("modified_", $updates)) { $updates["modified_"] = date("Y-m-d H:i:s"); } } $sqlite->exec([ "update", "table" => $tableName, "values" => $updates, "where" => ["id_" => $row["id_"]], ]); if ($commitThreshold !== null) { $commitThreshold--; if ($commitThreshold == 0) { $sqlite->commit(); $sqlite->beginTransaction(); $commitThreshold = $channel->getEachCommitThreshold(); } } } $count++; } $sqlite->commit(); $commited = true; return $count; } finally { if (!$commited) $sqlite->rollback(); } } function close(): void { $this->sqlite->close(); } }