nur-sery/src/db/mysql/MysqlStorage.php

245 lines
6.7 KiB
PHP

<?php
namespace nur\sery\db\mysql;
use nur\sery\cl;
use nur\sery\db\CapacitorChannel;
use nur\sery\db\CapacitorStorage;
use nur\sery\php\func;
use nur\sery\str;
use nur\sery\ValueException;
/**
 * Class MysqlStorage: a CapacitorStorage whose channels are kept in MySQL
 * tables, accessed through a {@link Mysql} instance.
 *
 * Rows written by this class hold the PHP-serialized item in `item__`, the
 * SHA-1 of that serialized form in `sum_`, the `created_` / `modified_`
 * timestamps, and the values returned by the channel's getKeyValues().
 */
class MysqlStorage extends CapacitorStorage {
  /** @var Mysql connection wrapper every query of this storage goes through */
  protected $mysql;

  /**
   * @param mixed $mysql value handed to Mysql::with() to obtain the connection
   */
  function __construct($mysql) {
    $this->mysql = Mysql::with($mysql);
  }

  /** Return the Mysql instance used by this storage. */
  function mysql(): Mysql {
    return $this->mysql;
  }

  /**
   * Create the channel table unless the channel is already flagged as
   * created, then flag it.
   */
  protected function _create(CapacitorChannel $channel): void {
    if ($channel->isCreated()) return;

    $definitions = $this->ColumnDefinitions($channel);
    $this->mysql->exec([
      "create table if not exists",
      "table" => $channel->getTableName(),
      "cols" => $definitions,
    ]);
    $channel->setCreated();
  }

  /**
   * Tell whether the channel table is listed in information_schema.tables for
   * the database name reported by the connection.
   */
  function _exists(CapacitorChannel $channel): bool {
    $db = $this->mysql;
    $criteria = [
      "table_schema" => $db->getDbname(),
      "table_name" => $channel->getTableName(),
    ];
    $found = $db->get([
      "select table_name from information_schema.tables",
      "where" => $criteria,
    ]);
    return $found !== null;
  }

  /** Make sure the channel table exists; delegates to _create(). */
  function _ensureExists(CapacitorChannel $channel): void {
    $this->_create($channel);
  }

  /** Drop the channel table if it exists and clear the channel's created flag. */
  function _reset(CapacitorChannel $channel): void {
    $this->mysql->exec([
      "drop table if exists",
      $channel->getTableName(),
    ]);
    $channel->setCreated(false);
  }

  /**
   * Store $item in the channel table.
   *
   * The item is serialized and checksummed, then the previously stored row
   * (if getRowIds() yields identifiers for it) is fetched. The callback,
   * $channel->onCharge by default, is then called with
   * ($item, $values, $pvalues, ...$args), where $values / $pvalues are the
   * new / previous rows passed through unserialize(). If it returns an array,
   * that array is serialized and merged into the row about to be written.
   *
   * NOTE: the callback is invoked even when the stored checksum is identical,
   * but in that case nothing is written, whatever the callback returned.
   *
   * @param mixed $item item to store
   * @param callable|null $func callback, defaults to [$channel, "onCharge"]
   * @param array|null $args extra arguments appended to the callback call
   * @return int 1 if a row was inserted or updated, 0 if nothing was written
   */
  function _charge(CapacitorChannel $channel, $item, ?callable $func, ?array $args): int {
    $this->_create($channel);

    $now = date("Y-m-d H:i:s");
    $serialized = serialize($item);
    $checksum = sha1($serialized);
    $keyValues = $this->unserialize($channel, $channel->getKeyValues($item));
    $row = cl::merge([
      "item__" => $serialized,
      "sum_" => $checksum,
    ], $keyValues);

    # fetch the row already stored under the same identifiers, if any
    $rowIds = $this->getRowIds($channel, $row, $primaryKeys);
    $previous = null;
    if ($rowIds !== null) {
      $cols = array_merge($primaryKeys, [
        "item__",
        "sum_",
        "created_",
        "modified_",
      ]);
      $previous = $this->mysql->one([
        "select",
        "cols" => $cols,
        "from" => $channel->getTableName(),
        "where" => $rowIds,
      ]);
    }

    # $insert is tri-state: true = new row, false = changed row, null = no change
    if ($previous === null) {
      # creation
      $insert = true;
      $stamps = [
        "created_" => $now,
        "modified_" => $now,
      ];
    } elseif ($previous["sum_"] !== $checksum) {
      # modification
      $insert = false;
      $stamps = [
        "modified_" => $now,
      ];
    } else {
      $insert = null;
      $stamps = null;
    }
    if ($stamps !== null) $row = cl::merge($row, $stamps);

    $func ??= [$channel, "onCharge"];
    $onCharge = func::_prepare($func);
    $args ??= [];
    $values = $this->unserialize($channel, $row);
    $pvalues = $this->unserialize($channel, $previous);
    $updates = func::_call($onCharge, [$item, $values, $pvalues, ...$args]);
    if (is_array($updates)) {
      $updates = $this->serialize($channel, $updates);
      if (array_key_exists("item__", $updates)) {
        # the item itself was replaced: its checksum has to follow
        $updates["sum_"] = sha1($updates["item__"]);
        if (!array_key_exists("modified_", $updates)) {
          $updates["modified_"] = $now;
        }
      }
      $row = cl::merge($row, $updates);
    }

    # no change: nothing to write
    if ($insert === null) return 0;

    $query = $insert
      ? [
        "insert",
        "into" => $channel->getTableName(),
        "values" => $row,
      ]
      : [
        "update",
        "table" => $channel->getTableName(),
        "values" => $row,
        "where" => $rowIds,
      ];
    $this->mysql->exec($query);
    return 1;
  }

  /**
   * Yield every stored item, rebuilt with unserialize() from the `item__`
   * column, then reset the channel when $reset is true.
   *
   * This is a generator: the query runs when iteration starts and the reset
   * only happens once the rows have been fully iterated.
   *
   * NOTE(review): unserialize() is applied to whatever the table contains;
   * this presumes the table is only ever written by this storage -- confirm.
   *
   * @param bool $reset whether to call _reset() after the last item
   */
  function _discharge(CapacitorChannel $channel, bool $reset=true): iterable {
    $stored = $this->mysql->all([
      "select item__",
      "from" => $channel->getTableName(),
    ]);
    foreach ($stored as $record) {
      yield unserialize($record['item__']);
    }
    if ($reset) {
      $this->_reset($channel);
    }
  }

  /**
   * Normalize $filter in place: a non-null, non-array value is treated as an
   * id, passed to $channel->verifixId() and turned into ["id_" => $id]; the
   * result (or the original array / null) then goes through serialize().
   *
   * @param mixed $filter filter to normalize, modified by reference
   */
  protected function verifixFilter(CapacitorChannel $channel, &$filter): void {
    $isBareId = !($filter === null || is_array($filter));
    if ($isBareId) {
      $id = $filter;
      $channel->verifixId($id);
      $filter = ["id_" => $id];
    }
    $filter = $this->serialize($channel, $filter);
  }

  /**
   * Return the result of `select count(*)` on the channel table, restricted
   * by the normalized $filter.
   *
   * @param mixed $filter see verifixFilter()
   */
  function _count(CapacitorChannel $channel, $filter): int {
    $this->verifixFilter($channel, $filter);
    $query = [
      "select count(*)",
      "from" => $channel->getTableName(),
      "where" => $filter,
    ];
    return $this->mysql->get($query);
  }

  /**
   * Fetch a single row matching $filter and return it passed through
   * unserialize().
   *
   * @param mixed $filter see verifixFilter(); must not be null
   * @throws ValueException if $filter is null
   */
  function _one(CapacitorChannel $channel, $filter): ?array {
    if ($filter === null) {
      throw ValueException::null("filter");
    }
    $this->verifixFilter($channel, $filter);
    $query = [
      "select",
      "from" => $channel->getTableName(),
      "where" => $filter,
    ];
    $found = $this->mysql->one($query);
    return $this->unserialize($channel, $found);
  }

  /**
   * Yield the rows matching $filter, each passed through unserialize(), under
   * the keys produced by Mysql::all().
   *
   * The channel's primary keys are given to Mysql::all() as third argument --
   * presumably so that results are keyed by them; confirm against Mysql::all().
   * This is a generator: nothing is queried before iteration starts.
   *
   * @param mixed $filter see verifixFilter()
   */
  function _all(CapacitorChannel $channel, $filter): iterable {
    $this->verifixFilter($channel, $filter);
    $query = [
      "select",
      "from" => $channel->getTableName(),
      "where" => $filter,
    ];
    $primaryKeys = $this->getPrimaryKeys($channel);
    foreach ($this->mysql->all($query, null, $primaryKeys) as $key => $record) {
      yield $key => $this->unserialize($channel, $record);
    }
  }

  /**
   * Call a callback for every row matching $filter and write back the
   * updates it returns.
   *
   * The callback, $channel->onEach by default, is called with
   * ($row["item"], $row, ...$args). When it returns an array, that array is
   * serialized and used to update the row. Everything runs inside a
   * transaction; when the channel defines a commit threshold, the transaction
   * is committed and reopened each time that many updates have been written.
   * If the loop does not reach its final commit, the open transaction is
   * rolled back.
   *
   * @param mixed $filter see verifixFilter()
   * @param callable|null $func callback, defaults to [$channel, "onEach"]
   * @param array|null $args extra arguments appended to the callback call
   * @return int number of rows visited, whether or not they were updated
   */
  function _each(CapacitorChannel $channel, $filter, ?callable $func, ?array $args): int {
    $func ??= [$channel, "onEach"];
    $onEach = func::_prepare($func);
    $args ??= [];

    $db = $this->mysql;
    $table = $channel->getTableName();
    $visited = 0;
    $committed = false;

    $db->beginTransaction();
    $remaining = $channel->getEachCommitThreshold();
    try {
      foreach ($this->_all($channel, $filter) as $row) {
        $visited++;
        $rowIds = $this->getRowIds($channel, $row);
        $updates = func::_call($onEach, [$row["item"], $row, ...$args]);
        # anything but an array means "leave this row alone"
        if (!is_array($updates)) continue;

        $updates = $this->serialize($channel, $updates);
        if (array_key_exists("item__", $updates)) {
          # the item itself was replaced: its checksum has to follow
          $updates["sum_"] = sha1($updates["item__"]);
          if (!array_key_exists("modified_", $updates)) {
            $updates["modified_"] = date("Y-m-d H:i:s");
          }
        }
        $db->exec([
          "update",
          "table" => $table,
          "values" => $updates,
          "where" => $rowIds,
        ]);

        # intermediate commit once the threshold worth of updates is reached
        if ($remaining !== null && --$remaining == 0) {
          $db->commit();
          $db->beginTransaction();
          $remaining = $channel->getEachCommitThreshold();
        }
      }
      $db->commit();
      $committed = true;
      return $visited;
    } finally {
      if (!$committed) {
        $db->rollback();
      }
    }
  }

  /** Close the underlying Mysql connection. */
  function close(): void {
    $this->mysql->close();
  }
}