modifs.mineures sans commentaires

This commit is contained in:
Jephté Clain 2024-06-12 17:17:26 +04:00
parent 375351a0b4
commit ab89d08933
8 changed files with 295 additions and 358 deletions

View File

@ -19,6 +19,10 @@ class Capacitor {
return $this->storage;
}
function db(): IDatabase {
return $this->getStorage()->db();
}
/** @var CapacitorChannel */
protected $channel;
@ -26,6 +30,10 @@ class Capacitor {
return $this->channel;
}
function getTableName(): string {
return $this->getChannel()->getTableName();
}
function exists(): bool {
return $this->storage->_exists($this->channel);
}
@ -62,6 +70,10 @@ class Capacitor {
return $this->storage->_each($this->channel, $filter, $func, $args);
}
function delete($filter, $func=null, ?array $args=null): int {
return $this->storage->_delete($this->channel, $filter, $func, $args);
}
function close(): void {
$this->storage->close();
}

View File

@ -242,4 +242,16 @@ class CapacitorChannel {
function onEach($item, array $rowValues): ?array {
return null;
}
/**
* méthode appelée lors du parcours des éléments avec
* {@link Capacitor::delete()}
*
* @param mixed $item l'élément courant
* @param ?array $rowValues la ligne courante
* @return bool true s'il faut supprimer la ligne, false sinon
*/
function onDelete($item, array $rowValues): bool {
return true;
}
}

View File

@ -2,12 +2,16 @@
namespace nur\sery\db;
use nur\sery\cl;
use nur\sery\php\func;
use nur\sery\ValueException;
/**
* Class CapacitorStorage: objet permettant d'accumuler des données pour les
* réutiliser plus tard
*/
abstract class CapacitorStorage {
abstract function db(): IDatabase;
/** @var CapacitorChannel[] */
protected $channels;
@ -130,14 +134,89 @@ abstract class CapacitorStorage {
$this->_reset($this->getChannel($channel));
}
abstract function _charge(CapacitorChannel $channel, $item, $func, ?array $args): int;
function _charge(CapacitorChannel $channel, $item, $func, ?array $args): int {
$this->_create($channel);
$tableName = $channel->getTableName();
$now = date("Y-m-d H:i:s");
$row = cl::merge(
$channel->getSum("item", $item),
$this->serialize($channel, $channel->getItemValues($item)));
$prow = null;
$rowIds = $this->getRowIds($channel, $row, $primaryKeys);
if ($rowIds !== null) {
# modification
$prow = $this->db()->one([
"select",
"from" => $tableName,
"where" => $rowIds,
]);
}
$insert = null;
if ($prow === null) {
# création
$row = cl::merge($row, [
"created_" => $now,
"modified_" => $now,
]);
$insert = true;
if ($func === null) $func = "->onCreate";
func::ensure_func($func, $channel, $args);
$values = $this->unserialize($channel, $row);
$args = [$item, $values, ...$args];
} else {
# modification
if ($channel->_wasSumModified("item", $row, $prow)) {
$insert = false;
$row = cl::merge($row, [
"modified_" => $now,
]);
}
if ($func === null) $func = "->onUpdate";
func::ensure_func($func, $channel, $args);
$values = $this->unserialize($channel, $row);
$pvalues = $this->unserialize($channel, $prow);
$args = [$item, $values, $pvalues, ...$args];
}
$updates = func::call($func, ...$args);
if (is_array($updates) && $updates) {
if ($insert === null) $insert = false;
if (!array_key_exists("modified_", $updates)) {
$updates["modified_"] = $now;
}
$row = cl::merge($row, $this->serialize($channel, $updates));
}
if ($insert === null) {
# aucune modification
return 0;
} elseif ($insert) {
$this->db()->exec([
"insert",
"into" => $tableName,
"values" => $row,
]);
} else {
$this->db()->exec([
"update",
"table" => $tableName,
"values" => $row,
"where" => $rowIds,
]);
}
return 1;
}
/**
* charger une valeur dans le canal
*
* Si $func!==null, après avoir calculé les valeurs des clés supplémentaires
* avec {@link CapacitorChannel::getItemValues()}, la fonction est appelée avec
* la signature ($item, $keyValues, $row, ...$args)
* avec {@link CapacitorChannel::getItemValues()}, la fonction est appelée
* avec la signature de {@link CapacitorChannel::onCreate()} ou
* {@link CapacitorChannel::onUpdate()} en fonction du type d'opération:
* création ou mise à jour
*
* Si la fonction retourne un tableau, il est utilisé pour modifier les valeurs
* insérées/mises à jour
*
@ -148,21 +227,55 @@ abstract class CapacitorStorage {
return $this->_charge($this->getChannel($channel), $item, $func, $args);
}
abstract function _discharge(CapacitorChannel $channel, bool $reset=true): iterable;
function _discharge(CapacitorChannel $channel, bool $reset=true): iterable {
$rows = $this->db()->all([
"select item__",
"from" => $channel->getTableName(),
]);
foreach ($rows as $row) {
yield unserialize($row['item__']);
}
if ($reset) $this->_reset($channel);
}
/** décharger les données du canal spécifié */
function discharge(?string $channel, bool $reset=true): iterable {
return $this->_discharge($this->getChannel($channel), $reset);
}
abstract function _count(CapacitorChannel $channel, $filter): int;
protected function verifixFilter(CapacitorChannel $channel, &$filter): void {
if ($filter !== null && !is_array($filter)) {
$id = $filter;
$channel->verifixId($id);
$filter = ["id_" => $id];
}
$filter = $this->serialize($channel, $filter);
}
function _count(CapacitorChannel $channel, $filter): int {
$this->verifixFilter($channel, $filter);
return $this->db()->get([
"select count(*)",
"from" => $channel->getTableName(),
"where" => $filter,
]);
}
/** indiquer le nombre d'éléments du canal spécifié */
function count(?string $channel, $filter=null): int {
return $this->_count($this->getChannel($channel), $filter);
}
abstract function _one(CapacitorChannel $channel, $filter): ?array;
function _one(CapacitorChannel $channel, $filter): ?array {
if ($filter === null) throw ValueException::null("filter");
$this->verifixFilter($channel, $filter);
$row = $this->db()->one([
"select",
"from" => $channel->getTableName(),
"where" => $filter,
]);
return $this->unserialize($channel, $row);
}
/**
* obtenir la ligne correspondant au filtre sur le canal spécifié
@ -173,7 +286,17 @@ abstract class CapacitorStorage {
return $this->_one($this->getChannel($channel), $filter);
}
abstract function _all(CapacitorChannel $channel, $filter): iterable;
function _all(CapacitorChannel $channel, $filter): iterable {
$this->verifixFilter($channel, $filter);
$rows = $this->db()->all([
"select",
"from" => $channel->getTableName(),
"where" => $filter,
], null, $this->getPrimaryKeys($channel));
foreach ($rows as $key => $row) {
yield $key => $this->unserialize($channel, $row);
}
}
/**
* obtenir les lignes correspondant au filtre sur le canal spécifié
@ -184,15 +307,58 @@ abstract class CapacitorStorage {
return $this->_all($this->getChannel($channel), $filter);
}
abstract function _each(CapacitorChannel $channel, $filter, $func, ?array $args): int;
function _each(CapacitorChannel $channel, $filter, $func, ?array $args): int {
if ($func === null) $func = "->onEach";
func::ensure_func($func, $channel, $args);
$onEach = func::_prepare($func);
$db = $this->db();
$tableName = $channel->getTableName();
$commited = false;
$count = 0;
$db->beginTransaction();
$commitThreshold = $channel->getEachCommitThreshold();
try {
$args ??= [];
foreach ($this->_all($channel, $filter) as $rowValues) {
$rowIds = $this->getRowIds($channel, $rowValues);
$updates = func::_call($onEach, [$rowValues["item"], $rowValues, ...$args]);
if (is_array($updates) && $updates) {
if (!array_key_exists("modified_", $updates)) {
$updates["modified_"] = date("Y-m-d H:i:s");
}
$db->exec([
"update",
"table" => $tableName,
"values" => $this->serialize($channel, $updates),
"where" => $rowIds,
]);
if ($commitThreshold !== null) {
$commitThreshold--;
if ($commitThreshold == 0) {
$db->commit();
$db->beginTransaction();
$commitThreshold = $channel->getEachCommitThreshold();
}
}
}
$count++;
}
$db->commit();
$commited = true;
return $count;
} finally {
if (!$commited) $db->rollback();
}
}
/**
* appeler une fonction pour chaque élément du canal spécifié.
*
* $filter permet de filtrer parmi les élements chargés
*
* $func est appelé avec la signature ($item, $row, ...$args). si la fonction
* retourne un tableau, il est utilisé pour mettre à jour la ligne
* $func est appelé avec la signature de {@link CapacitorChannel::onEach()}
* si la fonction retourne un tableau, il est utilisé pour mettre à jour la
* ligne
*
* @return int le nombre de lignes parcourues
*/
@ -200,5 +366,68 @@ abstract class CapacitorStorage {
return $this->_each($this->getChannel($channel), $filter, $func, $args);
}
abstract function close(): void;
function _delete(CapacitorChannel $channel, $filter, $func, ?array $args): int {
if ($func === null) $func = "->onDelete";
func::ensure_func($func, $channel, $args);
$onEach = func::_prepare($func);
$db = $this->db();
$tableName = $channel->getTableName();
$commited = false;
$count = 0;
$db->beginTransaction();
$commitThreshold = $channel->getEachCommitThreshold();
try {
$args ??= [];
foreach ($this->_all($channel, $filter) as $rowValues) {
$rowIds = $this->getRowIds($channel, $rowValues);
$delete = boolval(func::_call($onEach, [$rowValues["item"], $rowValues, ...$args]));
if ($delete) {
$db->exec([
"delete",
"from" => $tableName,
"where" => $rowIds,
]);
if ($commitThreshold !== null) {
$commitThreshold--;
if ($commitThreshold == 0) {
$db->commit();
$db->beginTransaction();
$commitThreshold = $channel->getEachCommitThreshold();
}
}
}
$count++;
}
$db->commit();
$commited = true;
return $count;
} finally {
if (!$commited) $db->rollback();
}
}
/**
* supprimer tous les éléments correspondant au filtre et pour lesquels la
* fonction retourne une valeur vraie si elle est spécifiée
*
* $filter permet de filtrer parmi les élements chargés
*
* $func est appelé avec la signature de {@link CapacitorChannel::onDelete()}
* si la fonction retourne un tableau, il est utilisé pour mettre à jour la
* ligne
*
* @return int le nombre de lignes parcourues
*/
function delete(?string $channel, $filter, $func=null, ?array $args=null): int {
return $this->_delete($this->getChannel($channel), $filter, $func, $args);
}
abstract function close(): void;
}

19
src/db/IDatabase.php Normal file
View File

@ -0,0 +1,19 @@
<?php
namespace nur\sery\db;
interface IDatabase {
function beginTransaction(): void;
function commit(): void;
function rollback(): void;
/** @return int|bool en fonction des implémentations */
function exec($query, ?array $params=null);
function get($query, ?array $params=null, bool $entireRow=false);
function one($query, ?array $params=null): ?array;
function all($query, ?array $params=null, $primaryKeys=null): iterable;
}

View File

@ -39,11 +39,11 @@ class MysqlStorage extends CapacitorStorage {
}
function _exists(CapacitorChannel $channel): bool {
$mysql = $this->db;
$tableName = $mysql->get([
$db = $this->db;
$tableName = $db->get([
"select table_name from information_schema.tables",
"where" => [
"table_schema" => $mysql->getDbname(),
"table_schema" => $db->getDbname(),
"table_name" => $channel->getTableName(),
],
]);
@ -62,176 +62,6 @@ class MysqlStorage extends CapacitorStorage {
$channel->setCreated(false);
}
function _charge(CapacitorChannel $channel, $item, $func, ?array $args): int {
$this->_create($channel);
$tableName = $channel->getTableName();
$now = date("Y-m-d H:i:s");
$row = cl::merge(
$channel->getSum("item", $item),
$this->serialize($channel, $channel->getItemValues($item)));
$prow = null;
$rowIds = $this->getRowIds($channel, $row, $primaryKeys);
if ($rowIds !== null) {
# modification
$prow = $this->db->one([
"select",
"from" => $tableName,
"where" => $rowIds,
]);
}
$insert = null;
if ($prow === null) {
# création
$row = cl::merge($row, [
"created_" => $now,
"modified_" => $now,
]);
$insert = true;
if ($func === null) $func = "->onCreate";
func::ensure_func($func, $channel, $args);
$values = $this->unserialize($channel, $row);
$args = [$item, $values, ...$args];
} else {
# modification
if ($channel->_wasSumModified("item", $row, $prow)) {
$insert = false;
$row = cl::merge($row, [
"modified_" => $now,
]);
}
if ($func === null) $func = "->onUpdate";
func::ensure_func($func, $channel, $args);
$values = $this->unserialize($channel, $row);
$pvalues = $this->unserialize($channel, $prow);
$args = [$item, $values, $pvalues, ...$args];
}
$updates = func::call($func, ...$args);
if (is_array($updates) && $updates) {
if ($insert === null) $insert = false;
if (!array_key_exists("modified_", $updates)) {
$updates["modified_"] = $now;
}
$row = cl::merge($row, $this->serialize($channel, $updates));
}
if ($insert === null) {
# aucune modification
return 0;
} elseif ($insert) {
$this->db->exec([
"insert",
"into" => $tableName,
"values" => $row,
]);
} else {
$this->db->exec([
"update",
"table" => $tableName,
"values" => $row,
"where" => $rowIds,
]);
}
return 1;
}
function _discharge(CapacitorChannel $channel, bool $reset=true): iterable {
$rows = $this->db->all([
"select item__",
"from" => $channel->getTableName(),
]);
foreach ($rows as $row) {
yield unserialize($row['item__']);
}
if ($reset) $this->_reset($channel);
}
protected function verifixFilter(CapacitorChannel $channel, &$filter): void {
if ($filter !== null && !is_array($filter)) {
$id = $filter;
$channel->verifixId($id);
$filter = ["id_" => $id];
}
$filter = $this->serialize($channel, $filter);
}
function _count(CapacitorChannel $channel, $filter): int {
$this->verifixFilter($channel, $filter);
return $this->db->get([
"select count(*)",
"from" => $channel->getTableName(),
"where" => $filter,
]);
}
function _one(CapacitorChannel $channel, $filter): ?array {
if ($filter === null) throw ValueException::null("filter");
$this->verifixFilter($channel, $filter);
$row = $this->db->one([
"select",
"from" => $channel->getTableName(),
"where" => $filter,
]);
return $this->unserialize($channel, $row);
}
function _all(CapacitorChannel $channel, $filter): iterable {
$this->verifixFilter($channel, $filter);
$rows = $this->db->all([
"select",
"from" => $channel->getTableName(),
"where" => $filter,
], null, $this->getPrimaryKeys($channel));
foreach ($rows as $key => $row) {
yield $key => $this->unserialize($channel, $row);
}
}
function _each(CapacitorChannel $channel, $filter, $func, ?array $args): int {
if ($func === null) $func = "->onEach";
func::ensure_func($func, $channel, $args);
$onEach = func::_prepare($func);
$mysql = $this->db;
$tableName = $channel->getTableName();
$commited = false;
$count = 0;
$mysql->beginTransaction();
$commitThreshold = $channel->getEachCommitThreshold();
try {
$args ??= [];
foreach ($this->_all($channel, $filter) as $rowValues) {
$rowIds = $this->getRowIds($channel, $rowValues);
$updates = func::_call($onEach, [$rowValues["item"], $rowValues, ...$args]);
if (is_array($updates)) {
if (!array_key_exists("modified_", $updates)) {
$updates["modified_"] = date("Y-m-d H:i:s");
}
$mysql->exec([
"update",
"table" => $tableName,
"values" => $this->serialize($channel, $updates),
"where" => $rowIds,
]);
if ($commitThreshold !== null) {
$commitThreshold--;
if ($commitThreshold == 0) {
$mysql->commit();
$mysql->beginTransaction();
$commitThreshold = $channel->getEachCommitThreshold();
}
}
}
$count++;
}
$mysql->commit();
$commited = true;
return $count;
} finally {
if (!$commited) $mysql->rollback();
}
}
function close(): void {
$this->db->close();
}

View File

@ -3,11 +3,12 @@ namespace nur\sery\db\pdo;
use Generator;
use nur\sery\cl;
use nur\sery\db\IDatabase;
use nur\sery\php\func;
use nur\sery\php\time\Date;
use nur\sery\php\time\DateTime;
class Pdo {
class Pdo implements IDatabase {
static function with($pdo, ?array $params=null): self {
if ($pdo instanceof static) {
return $pdo;

View File

@ -3,6 +3,7 @@ namespace nur\sery\db\sqlite;
use Generator;
use nur\sery\cl;
use nur\sery\db\IDatabase;
use SQLite3;
use SQLite3Result;
use SQLite3Stmt;
@ -10,7 +11,7 @@ use SQLite3Stmt;
/**
* Class Sqlite: frontend vers une base de données sqlite3
*/
class Sqlite {
class Sqlite implements IDatabase {
static function with($sqlite, ?array $params=null): self {
if ($sqlite instanceof static) {
return $sqlite;
@ -145,12 +146,15 @@ class Sqlite {
return $this->db()->exec($query);
}
function exec($query, ?array $params=null): bool {
/** @return bool */
function exec($query, ?array $params=null) {
$db = $this->db();
$query = new _query_base($query, $params);
if ($query->useStmt($db, $stmt, $sql)) {
try {
return $stmt->execute()->finalize();
$result = $stmt->execute();
if ($result === false) return false;
return $result->finalize();
} finally {
$stmt->close();
}

View File

@ -60,176 +60,6 @@ class SqliteStorage extends CapacitorStorage {
$channel->setCreated(false);
}
function _charge(CapacitorChannel $channel, $item, $func, ?array $args): int {
$this->_create($channel);
$tableName = $channel->getTableName();
$now = date("Y-m-d H:i:s");
$row = cl::merge(
$channel->getSum("item", $item),
$this->serialize($channel, $channel->getItemValues($item)));
$prow = null;
$rowIds = $this->getRowIds($channel, $row, $primaryKeys);
if ($rowIds !== null) {
# modification
$prow = $this->db->one([
"select",
"from" => $tableName,
"where" => $rowIds,
]);
}
$insert = null;
if ($prow === null) {
# création
$row = cl::merge($row, [
"created_" => $now,
"modified_" => $now,
]);
$insert = true;
if ($func === null) $func = "->onCreate";
func::ensure_func($func, $channel, $args);
$values = $this->unserialize($channel, $row);
$args = [$item, $values, ...$args];
} else {
# modification
if ($channel->_wasSumModified("item", $row, $prow)) {
$insert = false;
$row = cl::merge($row, [
"modified_" => $now,
]);
}
if ($func === null) $func = "->onUpdate";
func::ensure_func($func, $channel, $args);
$values = $this->unserialize($channel, $row);
$pvalues = $this->unserialize($channel, $prow);
$args = [$item, $values, $pvalues, ...$args];
}
$updates = func::call($func, ...$args);
if (is_array($updates) && $updates) {
if ($insert === null) $insert = false;
if (!array_key_exists("modified_", $updates)) {
$updates["modified_"] = $now;
}
$row = cl::merge($row, $this->serialize($channel, $updates));
}
if ($insert === null) {
# aucune modification
return 0;
} elseif ($insert) {
$this->db->exec([
"insert",
"into" => $tableName,
"values" => $row,
]);
} else {
$this->db->exec([
"update",
"table" => $tableName,
"values" => $row,
"where" => $rowIds,
]);
}
return 1;
}
function _discharge(CapacitorChannel $channel, bool $reset=true): iterable {
$rows = $this->db->all([
"select item__",
"from" => $channel->getTableName(),
]);
foreach ($rows as $row) {
yield unserialize($row['item__']);
}
if ($reset) $this->_reset($channel);
}
protected function verifixFilter(CapacitorChannel $channel, &$filter): void {
if ($filter !== null && !is_array($filter)) {
$id = $filter;
$channel->verifixId($id);
$filter = ["id_" => $id];
}
$filter = $this->serialize($channel, $filter);
}
function _count(CapacitorChannel $channel, $filter): int {
$this->verifixFilter($channel, $filter);
return $this->db->get([
"select count(*)",
"from" => $channel->getTableName(),
"where" => $filter,
]);
}
function _one(CapacitorChannel $channel, $filter): ?array {
if ($filter === null) throw ValueException::null("filter");
$this->verifixFilter($channel, $filter);
$row = $this->db->one([
"select",
"from" => $channel->getTableName(),
"where" => $filter,
]);
return $this->unserialize($channel, $row);
}
function _all(CapacitorChannel $channel, $filter): iterable {
$this->verifixFilter($channel, $filter);
$rows = $this->db->all([
"select",
"from" => $channel->getTableName(),
"where" => $filter,
], null, $this->getPrimaryKeys($channel));
foreach ($rows as $key => $row) {
yield $key => $this->unserialize($channel, $row);
}
}
function _each(CapacitorChannel $channel, $filter, $func, ?array $args): int {
if ($func === null) $func = "->onEach";
func::ensure_func($func, $channel, $args);
$onEach = func::_prepare($func);
$sqlite = $this->db;
$tableName = $channel->getTableName();
$commited = false;
$count = 0;
$sqlite->beginTransaction();
$commitThreshold = $channel->getEachCommitThreshold();
try {
$args ??= [];
foreach ($this->_all($channel, $filter) as $rowValues) {
$rowIds = $this->getRowIds($channel, $rowValues);
$updates = func::_call($onEach, [$rowValues["item"], $rowValues, ...$args]);
if (is_array($updates) && $updates) {
if (!array_key_exists("modified_", $updates)) {
$updates["modified_"] = date("Y-m-d H:i:s");
}
$sqlite->exec([
"update",
"table" => $tableName,
"values" => $this->serialize($channel, $updates),
"where" => $rowIds,
]);
if ($commitThreshold !== null) {
$commitThreshold--;
if ($commitThreshold == 0) {
$sqlite->commit();
$sqlite->beginTransaction();
$commitThreshold = $channel->getEachCommitThreshold();
}
}
}
$count++;
}
$sqlite->commit();
$commited = true;
return $count;
} finally {
if (!$commited) $sqlite->rollback();
}
}
function close(): void {
$this->db->close();
}