db = Sqlite::with($sqlite); }

  /** @var Sqlite connection used for every statement issued by this storage */
  protected $db;

  /** Return the Sqlite connection held by this storage. */
  function db(): Sqlite {
    return $this->db;
  }

  /** Definition of the auto-incremented "id_" primary key column. */
  const PRIMARY_KEY_DEFINITION = [
    "id_" => "integer primary key autoincrement",
  ];

  /**
   * Create the table of $channel unless the channel is already flagged as
   * created, then flag it as created.
   *
   * The statement is "create table if not exists", so it does not fail when
   * the table is already there.
   */
  protected function _create(CapacitorChannel $channel): void {
    if (!$channel->isCreated()) {
      $cols = $this->ColumnDefinitions($channel);
      $this->db->exec([
        "create table if not exists",
        "table" => $channel->getTableName(),
        "cols" => $cols,
      ]);
      $channel->setCreated();
    }
  }

  /**
   * Tell whether the table of $channel is listed in sqlite_schema.
   *
   * @return bool true when the lookup returns a non-null name
   */
  function _exists(CapacitorChannel $channel): bool {
    $tableName = $this->db->get([
      "select name from sqlite_schema",
      "where" => [
        "name" => $channel->getTableName(),
      ],
    ]);
    return $tableName !== null;
  }

  /** Make sure the table of $channel exists; delegates to _create(). */
  function _ensureExists(CapacitorChannel $channel): void {
    $this->_create($channel);
  }

  /**
   * Drop the table of $channel (if it exists) and clear the channel's
   * "created" flag so that the next _create() builds it again.
   */
  function _reset(CapacitorChannel $channel): void {
    $this->db->exec([
      "drop table if exists",
      $channel->getTableName(),
    ]);
    $channel->setCreated(false);
  }

  /**
   * Store $item in the table of $channel, inserting a new row or updating
   * the existing one.
   *
   * The callback $func is called with ($item, $values, ...$args) when no
   * previous row is found (default "->onCreate"), and with
   * ($item, $values, $pvalues, ...$args) when one is found (default
   * "->onUpdate"). When it returns a non-empty array, those values are
   * serialized and merged into the row that gets written.
   *
   * @param mixed $item item to store
   * @param mixed $func callback specification handed to func::ensure_func(),
   * or null to use the default for the create/update case
   * @param array|null $args extra arguments appended to the callback arguments
   * @return int 1 when a row was inserted or updated, 0 when nothing had to
   * be written
   */
  function _charge(CapacitorChannel $channel, $item, $func, ?array $args): int {
    $this->_create($channel);
    $tableName = $channel->getTableName();
    $now = date("Y-m-d H:i:s");
    # $row is what ends up in the table: item values must be converted to
    # their storage form, exactly like the callback updates further down
    $row = cl::merge(
      $channel->getSum("item", $item),
      $this->serialize($channel, $channel->getItemValues($item)));
    $prow = null;
    $rowIds = $this->getRowIds($channel, $row, $primaryKeys);
    if ($rowIds !== null) {
      # possible modification: look for the row currently stored
      $prow = $this->db->one([
        "select",
        "from" => $tableName,
        "where" => $rowIds,
      ]);
    }

    # null: nothing to write, true: insert, false: update
    $insert = null;
    if ($prow === null) {
      # creation
      $row = cl::merge($row, [
        "created_" => $now,
        "modified_" => $now,
      ]);
      $insert = true;
      if ($func === null) $func = "->onCreate";
      func::ensure_func($func, $channel, $args);
      # $args is nullable and null cannot be unpacked with "..."
      $args ??= [];
      $values = $this->unserialize($channel, $row);
      $args = [$item, $values, ...$args];
    } else {
      # modification
      if ($channel->_wasSumModified("item", $row, $prow)) {
        $insert = false;
        $row = cl::merge($row, [
          "modified_" => $now,
        ]);
      }
      if ($func === null) $func = "->onUpdate";
      func::ensure_func($func, $channel, $args);
      # $args is nullable and null cannot be unpacked with "..."
      $args ??= [];
      $values = $this->unserialize($channel, $row);
      $pvalues = $this->unserialize($channel, $prow);
      $args = [$item, $values, $pvalues, ...$args];
    }

    $updates = func::call($func, ...$args);
    if (is_array($updates) && $updates) {
      # the callback asked for changes: an unchanged existing row must now
      # be updated as well
      if ($insert === null) $insert = false;
      if (!array_key_exists("modified_", $updates)) {
        $updates["modified_"] = $now;
      }
      $row = cl::merge($row, $this->serialize($channel, $updates));
    }

    if ($insert === null) {
      # no modification
      return 0;
    } elseif ($insert) {
      $this->db->exec([
        "insert",
        "into" => $tableName,
        "values" => $row,
      ]);
    } else {
      $this->db->exec([
        "update",
        "table" => $tableName,
        "values" => $row,
        "where" => $rowIds,
      ]);
    }
    return 1;
  }

  /**
   * Yield every item stored in the table of $channel, rebuilt from its
   * "item__" column with PHP's unserialize().
   *
   * This is a generator: the optional reset only happens once the caller
   * has iterated up to the last row.
   *
   * NOTE(review): unserialize() is only safe as long as the database content
   * is trusted.
   *
   * @param bool $reset whether to call _reset() after the last row
   */
  function _discharge(CapacitorChannel $channel, bool $reset=true): iterable {
    $rows = $this->db->all([
      "select item__",
      "from" => $channel->getTableName(),
    ]);
    foreach ($rows as $row) {
      yield unserialize($row['item__']);
    }
    if ($reset) $this->_reset($channel);
  }

  /**
   * Normalize $filter in place.
   *
   * A non-null, non-array filter is taken as an id: it is passed to
   * $channel->verifixId() and replaced by ["id_" => $id]. The filter (null
   * included) is then passed through serialize().
   *
   * @param mixed $filter null, an id or an array of conditions
   */
  protected function verifixFilter(CapacitorChannel $channel, &$filter): void {
    if ($filter !== null && !is_array($filter)) {
      $id = $filter;
      $channel->verifixId($id);
      $filter = ["id_" => $id];
    }
    $filter = $this->serialize($channel, $filter);
  }

  /**
   * Return the result of "select count(*)" on the table of $channel,
   * restricted by $filter.
   *
   * @param mixed $filter null, an id or an array of conditions
   */
  function _count(CapacitorChannel $channel, $filter): int {
    $this->verifixFilter($channel, $filter);
    return $this->db->get([
      "select count(*)",
      "from" => $channel->getTableName(),
      "where" => $filter,
    ]);
  }

  /**
   * Fetch one row matching $filter and return it unserialized.
   *
   * @param mixed $filter an id or an array of conditions; must not be null
   * @throws ValueException when $filter is null
   */
  function _one(CapacitorChannel $channel, $filter): ?array {
    if ($filter === null) throw ValueException::null("filter");
    $this->verifixFilter($channel, $filter);
    $row = $this->db->one([
      "select",
      "from" => $channel->getTableName(),
      "where" => $filter,
    ]);
    return $this->unserialize($channel, $row);
  }

  /**
   * Yield every row matching $filter, unserialized, under the key provided
   * by the query (the channel's primary keys are passed to the query).
   *
   * @param mixed $filter null, an id or an array of conditions
   */
  function _all(CapacitorChannel $channel, $filter): iterable {
    $this->verifixFilter($channel, $filter);
    $rows = $this->db->all([
      "select",
      "from" => $channel->getTableName(),
      "where" => $filter,
    ], null, $this->getPrimaryKeys($channel));
    foreach ($rows as $key => $row) {
      yield $key => $this->unserialize($channel, $row);
    }
  }

  /**
   * Call $func on every row matching $filter, inside a transaction.
   *
   * The callback receives ($rowValues["item"], $rowValues, ...$args). When it
   * returns a non-empty array, those values are serialized and written back
   * to the row, with "modified_" set to the current time unless the callback
   * provided it. When the channel defines a commit threshold, the
   * transaction is committed and reopened every time that many rows have
   * been updated. If an exception escapes, the current transaction is rolled
   * back.
   *
   * @param mixed $filter null, an id or an array of conditions
   * @param mixed $func callback specification handed to func::ensure_func(),
   * or null to use "->onEach"
   * @param array|null $args extra arguments appended to the callback arguments
   * @return int number of rows visited, updated or not
   */
  function _each(CapacitorChannel $channel, $filter, $func, ?array $args): int {
    if ($func === null) $func = "->onEach";
    func::ensure_func($func, $channel, $args);
    $onEach = func::_prepare($func);
    $args ??= [];
    $sqlite = $this->db;
    $tableName = $channel->getTableName();
    # read the threshold before opening the transaction: a failure here must
    # not leave a transaction open with nothing to roll it back
    $commitThreshold = $channel->getEachCommitThreshold();
    $commited = false;
    $count = 0;
    $sqlite->beginTransaction();
    try {
      foreach ($this->_all($channel, $filter) as $rowValues) {
        $rowIds = $this->getRowIds($channel, $rowValues);
        $updates = func::_call($onEach, [$rowValues["item"], $rowValues, ...$args]);
        if (is_array($updates) && $updates) {
          if (!array_key_exists("modified_", $updates)) {
            $updates["modified_"] = date("Y-m-d H:i:s");
          }
          $sqlite->exec([
            "update",
            "table" => $tableName,
            "values" => $this->serialize($channel, $updates),
            "where" => $rowIds,
          ]);
          if ($commitThreshold !== null) {
            # only updated rows count towards the threshold
            $commitThreshold--;
            if ($commitThreshold == 0) {
              $sqlite->commit();
              $sqlite->beginTransaction();
              $commitThreshold = $channel->getEachCommitThreshold();
            }
          }
        }
        $count++;
      }
      $sqlite->commit();
      $commited = true;
      return $count;
    } finally {
      if (!$commited) $sqlite->rollback();
    }
  }

  /** Close the underlying Sqlite connection. */
  function close(): void {
    $this->db->close();
  }
}