nur-sery/src/db/sqlite/SqliteCapacitor.php

242 lines
6.5 KiB
PHP

<?php
namespace nur\sery\db\sqlite;
use nur\sery\cl;
use nur\sery\db\CapacitorChannel;
use nur\sery\db\CapacitorStorage;
use nur\sery\php\func;
use nur\sery\ValueException;
/**
* Class SqliteCapacitor
*/
class SqliteCapacitor extends CapacitorStorage {
function __construct($sqlite) {
$this->sqlite = Sqlite::with($sqlite);
}
/** @var Sqlite */
protected $sqlite;
function sqlite(): Sqlite {
return $this->sqlite;
}
protected function _create(CapacitorChannel $channel): void {
if (!$channel->isCreated()) {
$columns = cl::merge([
"_id" => "integer primary key autoincrement",
"_item" => "text",
"_sum" => "varchar(40)",
"_created" => "datetime",
"_modified" => "datetime",
], $channel->getKeyDefinitions());
$this->sqlite->exec([
"create table if not exists",
"table" => $channel->getTableName(),
"cols" => $columns,
]);
$channel->setCreated();
}
}
/** @var CapacitorChannel[] */
protected $channels;
function addChannel(CapacitorChannel $channel): CapacitorChannel {
$this->_create($channel);
$this->channels[$channel->getName()] = $channel;
return $channel;
}
protected function getChannel(?string $name): CapacitorChannel {
$name = CapacitorChannel::verifix_name($name);
$channel = $this->channels[$name] ?? null;
if ($channel === null) {
$channel = $this->addChannel(new CapacitorChannel($name));
}
return $channel;
}
function _exists(CapacitorChannel $channel): bool {
$tableName = $this->sqlite->get([
"select name from sqlite_schema",
"where" => [
"name" => $channel->getTableName(),
],
]);
return $tableName !== null;
}
function _ensureExists(CapacitorChannel $channel): void {
$this->_create($channel);
}
function _reset(CapacitorChannel $channel): void {
$this->sqlite->exec([
"drop table if exists",
$channel->getTableName(),
]);
$channel->setCreated(false);
}
function _charge(CapacitorChannel $channel, $item, ?callable $func, ?array $args): int {
$this->_create($channel);
$now = date("Y-m-d H:i:s");
$_item = serialize($item);
$_sum = sha1($_item);
$values = cl::merge([
"_item" => $_item,
"_sum" => $_sum,
], $channel->getKeyValues($item));
$row = null;
$id = $values["_id"] ?? null;
if ($id !== null) {
# modification
$row = $this->sqlite->one([
"select _item, _sum, _created, _modified",
"from" => $channel->getTableName(),
"where" => ["_id" => $id],
]);
}
$insert = null;
if ($row === null) {
# création
$values = cl::merge($values, [
"_created" => $now,
"_modified" => $now,
]);
$insert = true;
} elseif ($_sum !== $row["_sum"]) {
# modification
$values = cl::merge($values, [
"_modified" => $now,
]);
$insert = false;
}
if ($func === null) $func = [$channel, "onCharge"];
$onCharge = func::_prepare($func);
$args ??= [];
$updates = func::_call($onCharge, [$item, $values, $row, ...$args]);
if (is_array($updates)) {
if (array_key_exists("_item", $updates)) {
$_item = serialize($updates["_item"]);
$updates["_item"] = $_item;
$updates["_sum"] = sha1($_item);
if (!array_key_exists("_modified", $updates)) {
$updates["_modified"] = $now;
}
}
$values = cl::merge($values, $updates);
}
if ($insert === null) {
# aucune modification
return 0;
} elseif ($insert) {
$this->sqlite->exec([
"insert",
"into" => $channel->getTableName(),
"values" => $values,
]);
} else {
$this->sqlite->exec([
"update",
"table" => $channel->getTableName(),
"values" => $values,
"where" => ["_id" => $id],
]);
}
return 1;
}
function _count(CapacitorChannel $channel, $filter): int {
if ($filter !== null && !is_array($filter)) $filter = ["_id" => $filter];
return $this->sqlite->get([
"select count(*)",
"from" => $channel->getTableName(),
"where" => $filter,
]);
}
function _discharge(CapacitorChannel $channel, $filter, ?bool $reset): iterable {
if ($filter !== null && !is_array($filter)) $filter = ["_id" => $filter];
if ($reset === null) $reset = $filter === null;
$rows = $this->sqlite->all([
"select _item",
"from" => $channel->getTableName(),
"where" => $filter,
]);
foreach ($rows as $row) {
$item = unserialize($row['_item']);
yield $item;
}
if ($reset) $this->_reset($channel);
}
function _get(CapacitorChannel $channel, $filter) {
if ($filter === null) throw ValueException::null("keys");
if (!is_array($filter)) $filter = ["_id" => $filter];
$row = $this->sqlite->one([
"select _item",
"from" => $channel->getTableName(),
"where" => $filter,
]);
if ($row === null) return null;
else return unserialize($row["_item"]);
}
function _each(CapacitorChannel $channel, $filter, ?callable $func, ?array $args): int {
if ($func === null) $func = [$channel, "onEach"];
$onEach = func::_prepare($func);
if ($filter !== null && !is_array($filter)) $filter = ["_id" => $filter];
$sqlite = $this->sqlite;
$tableName = $channel->getTableName();
$commited = false;
$count = 0;
$sqlite->beginTransaction();
$commitThreshold = $channel->getEachCommitThreshold();
try {
$rows = $sqlite->all([
"select",
"from" => $tableName,
"where" => $filter,
]);
$args ??= [];
foreach ($rows as $row) {
$item = unserialize($row['_item']);
$updates = func::_call($onEach, [$item, $row, ...$args]);
if (is_array($updates)) {
if (array_key_exists("_item", $updates)) {
$updates["_item"] = serialize($updates["_item"]);
}
$sqlite->exec([
"update",
"table" => $tableName,
"values" => $updates,
"where" => ["_id" => $row["_id"]],
]);
if ($commitThreshold !== null) {
$commitThreshold--;
if ($commitThreshold == 0) {
$sqlite->commit();
$commitThreshold = $channel->getEachCommitThreshold();
}
}
}
$count++;
}
$sqlite->commit();
$commited = true;
return $count;
} finally {
if (!$commited) $sqlite->rollback();
}
}
function close(): void {
$this->sqlite->close();
}
}