From: Aleksey Filippov Date: Tue, 24 Mar 2026 09:48:40 +0000 (+0300) Subject: feat(ERP-33): заменить StockHistory (snapshot) на StockState (SCD-2) X-Git-Url: https://gitweb.erp-flowers.ru/?a=commitdiff_plain;h=8e0b08fb917460ea9259d9c692ca7a6928d8da0c;p=erp24_rep%2Fyii-erp24%2F.git feat(ERP-33): заменить StockHistory (snapshot) на StockState (SCD-2) Полная переделка подхода хранения остатков: - StockHistory (полные снимки 11400 INSERT/запуск) → StockState (SCD-2, 2 SQL) - Без партиций — одна таблица с valid_from/valid_to (TIMESTAMP) - valid_to = '2100-01-01 00:00:00' для активных, закрытие на секунду раньше - is_active boolean для быстрой фильтрации - batch_ts — единый timestamp пакета на запуск - Экономия ~78% объёма (660 MB vs 3 GB за 2 года) - 10 unit-тестов, 29 assertions --- diff --git a/erp24/commands/StockHistoryController.php b/erp24/commands/StockHistoryController.php deleted file mode 100644 index c0ec9ef6..00000000 --- a/erp24/commands/StockHistoryController.php +++ /dev/null @@ -1,139 +0,0 @@ -> /var/log/stock-history.log 2>&1 - * 0 0 1 * * cd /www && php yii stock-history/ensure-partitions >> /var/log/stock-history.log 2>&1 - * 0 1 1 * * cd /www && php yii stock-history/create-partition >> /var/log/stock-history.log 2>&1 - * 0 2 1 * * cd /www && php yii stock-history/drop-old-partitions >> /var/log/stock-history.log 2>&1 - */ -class StockHistoryController extends Controller -{ - /** - * @var string Время среза (08:00 или 20:00). Если пусто — авто-определение. - */ - public string $time = ''; - - public function options($actionID): array - { - return array_merge(parent::options($actionID), ['time']); - } - - /** - * Собрать срез остатков. - */ - public function actionCollect(): int - { - $snapshotTime = $this->resolveTime(); - $this->stdout("Stock History ETL: collecting snapshot for {$snapshotTime}...\n"); - - $service = $this->createService(); - - try { - $result = $service->collect($snapshotTime); - - $this->stdout("Done: {$result->getRowCount()} rows inserted.\n"); - - $dq = $result->getDqResult(); - if ($dq !== null && !$dq->allPassed()) { - $this->stderr("DQ failures detected:\n"); - foreach ($dq->getAll() as $f) { - $this->stderr(" [{$f['level']}] {$f['code']}: {$f['message']}\n"); - } - return ExitCode::OK; // ETL прошёл, но DQ имеет замечания - } - - return ExitCode::OK; - } catch (\RuntimeException $e) { - $this->stderr("ERROR: {$e->getMessage()}\n"); - return ExitCode::SOFTWARE; - } catch (\Throwable $e) { - $this->stderr("FATAL: {$e->getMessage()}\n"); - Yii::error("Stock History ETL fatal: " . $e->getMessage(), 'stock-history'); - return ExitCode::SOFTWARE; - } - } - - /** - * Создать партицию на указанный месяц. - * @param string $month Формат "2026-03". По умолчанию — следующий месяц. - */ - public function actionCreatePartition(string $month = ''): int - { - if (empty($month)) { - $month = date('Y-m', strtotime('+1 month')); - } - - $this->stdout("Creating partition for {$month}...\n"); - - try { - $this->createService()->createPartition($month); - $this->stdout("Partition stock_history_{$month} created.\n"); - return ExitCode::OK; - } catch (\Throwable $e) { - $this->stderr("ERROR: {$e->getMessage()}\n"); - return ExitCode::SOFTWARE; - } - } - - /** - * Обеспечить наличие партиций для текущего и следующего месяца. - */ - public function actionEnsurePartitions(): int - { - $this->stdout("Ensuring partitions for current and next month...\n"); - - try { - $this->createService()->ensurePartitions(); - $this->stdout("Partitions ensured.\n"); - return ExitCode::OK; - } catch (\Throwable $e) { - $this->stderr("ERROR: {$e->getMessage()}\n"); - return ExitCode::SOFTWARE; - } - } - - /** - * Удалить старые партиции. - * @param int $months Retention в месяцах (default 24). - */ - public function actionDropOldPartitions(int $months = 24): int - { - $this->stdout("Dropping partitions older than {$months} months...\n"); - - try { - $this->createService()->dropOldPartitions($months); - $this->stdout("Done.\n"); - return ExitCode::OK; - } catch (\Throwable $e) { - $this->stderr("ERROR: {$e->getMessage()}\n"); - return ExitCode::SOFTWARE; - } - } - - private function resolveTime(): string - { - if (!empty($this->time)) { - return $this->time; - } - - $hour = (int)date('H'); - return $hour < 14 ? '08:00' : '20:00'; - } - - private function createService(): StockHistoryService - { - return new StockHistoryService(Yii::$app->db); - } -} diff --git a/erp24/commands/StockStateController.php b/erp24/commands/StockStateController.php new file mode 100644 index 00000000..43a4fedc --- /dev/null +++ b/erp24/commands/StockStateController.php @@ -0,0 +1,81 @@ +> /var/log/stock-state.log 2>&1 + * 0 3 1 * * cd /www && php yii stock-state/cleanup >> /var/log/stock-state.log 2>&1 + */ +class StockStateController extends Controller +{ + /** + * Собрать SCD-2 срез остатков (1 раз/день). + * + * Логика: close changed + insert new/changed (2 SQL-запроса). + */ + public function actionCollect(): int + { + $this->stdout("Stock State SCD-2: collecting...\n"); + + try { + $service = $this->createService(); + $result = $service->collect(); + + $this->stdout("Done: {$result->getRowCount()} rows inserted.\n"); + + $dq = $result->getDqResult(); + if ($dq !== null && !$dq->allPassed()) { + $this->stderr("DQ failures detected:\n"); + foreach ($dq->getAll() as $f) { + $this->stderr(" [{$f['level']}] {$f['code']}: {$f['message']}\n"); + } + } + + return ExitCode::OK; + } catch (\RuntimeException $e) { + $this->stderr("ERROR: {$e->getMessage()}\n"); + return ExitCode::SOFTWARE; + } catch (\Throwable $e) { + $this->stderr("FATAL: {$e->getMessage()}\n"); + try { + Yii::error("Stock State ETL fatal: " . $e->getMessage(), 'stock-state'); + } catch (\Throwable $e2) { + // ignore + } + return ExitCode::SOFTWARE; + } + } + + /** + * Удалить закрытые записи старше retention. + * @param int $months Retention в месяцах (default 24). + */ + public function actionCleanup(int $months = 24): int + { + $this->stdout("Cleaning up closed records older than {$months} months...\n"); + + try { + $deleted = $this->createService()->cleanupOldRecords($months); + $this->stdout("Deleted {$deleted} old records.\n"); + return ExitCode::OK; + } catch (\Throwable $e) { + $this->stderr("ERROR: {$e->getMessage()}\n"); + return ExitCode::SOFTWARE; + } + } + + private function createService(): StockStateService + { + return new StockStateService(Yii::$app->db); + } +} diff --git a/erp24/migrations/m260221_100000_create_stock_history_table.php b/erp24/migrations/m260221_100000_create_stock_history_table.php deleted file mode 100644 index 17d235bf..00000000 --- a/erp24/migrations/m260221_100000_create_stock_history_table.php +++ /dev/null @@ -1,64 +0,0 @@ -execute(" - CREATE TABLE {{%stock_history}} ( - id BIGSERIAL, - snapshot_date DATE NOT NULL, - snapshot_time TIME NOT NULL, - store_id VARCHAR(36) NOT NULL, - store_name VARCHAR(255), - product_id VARCHAR(36) NOT NULL, - product_name VARCHAR(255), - articule VARCHAR(36), - father_id VARCHAR(36), - components JSONB, - quantity NUMERIC(12,2) NOT NULL DEFAULT 0, - reserv NUMERIC(12,2) NOT NULL DEFAULT 0, - created_at TIMESTAMP NOT NULL DEFAULT NOW(), - PRIMARY KEY (id, snapshot_date) - ) PARTITION BY RANGE (snapshot_date) - "); - - // Партиция текущего месяца - $this->execute(" - CREATE TABLE IF NOT EXISTS {{%stock_history_2026_02}} - PARTITION OF {{%stock_history}} - FOR VALUES FROM ('2026-02-01') TO ('2026-03-01') - "); - - // Партиция следующего месяца - $this->execute(" - CREATE TABLE IF NOT EXISTS {{%stock_history_2026_03}} - PARTITION OF {{%stock_history}} - FOR VALUES FROM ('2026-03-01') TO ('2026-04-01') - "); - - // Уникальный индекс для idempotency (ON CONFLICT) - $this->execute(" - CREATE UNIQUE INDEX idx_stock_history_unique - ON {{%stock_history}} (snapshot_date, snapshot_time, store_id, product_id) - "); - - $this->createIndex('idx_stock_history_store', '{{%stock_history}}', 'store_id'); - $this->createIndex('idx_stock_history_product', '{{%stock_history}}', 'product_id'); - $this->createIndex('idx_stock_history_father', '{{%stock_history}}', 'father_id'); - $this->createIndex('idx_stock_history_date_time', '{{%stock_history}}', ['snapshot_date', 'snapshot_time']); - $this->createIndex('idx_stock_history_father_date', '{{%stock_history}}', ['father_id', 'snapshot_date']); - } - - public function safeDown() - { - $this->execute("DROP TABLE IF EXISTS {{%stock_history}} CASCADE"); - } -} diff --git a/erp24/migrations/m260221_100000_create_stock_state_table.php b/erp24/migrations/m260221_100000_create_stock_state_table.php new file mode 100644 index 00000000..e232bfb0 --- /dev/null +++ b/erp24/migrations/m260221_100000_create_stock_state_table.php @@ -0,0 +1,56 @@ +createTable('{{%stock_state}}', [ + 'id' => $this->bigPrimaryKey(), + 'store_id' => $this->string(36)->notNull(), + 'store_name' => $this->string(255), + 'product_id' => $this->string(36)->notNull(), + 'product_name' => $this->string(255), + 'articule' => $this->string(36), + 'father_id' => $this->string(36), + 'components' => 'JSONB', + 'quantity' => $this->decimal(12, 2)->notNull()->defaultValue(0), + 'reserv' => $this->decimal(12, 2)->notNull()->defaultValue(0), + 'valid_from' => "TIMESTAMP NOT NULL", + 'valid_to' => "TIMESTAMP NOT NULL DEFAULT '2100-01-01 00:00:00'", + 'is_active' => $this->boolean()->notNull()->defaultValue(true), + 'batch_ts' => "TIMESTAMP NOT NULL DEFAULT NOW()", + 'created_at' => "TIMESTAMP NOT NULL DEFAULT NOW()", + ]); + + // Partial unique: один активный record на store×product + $this->execute(" + CREATE UNIQUE INDEX uq_stock_state_active + ON {{%stock_state}} (store_id, product_id) + WHERE is_active = true + "); + + // BETWEEN запросы: WHERE :ts BETWEEN valid_from AND valid_to + $this->createIndex('idx_stock_state_validity', '{{%stock_state}}', ['valid_from', 'valid_to']); + $this->createIndex('idx_stock_state_active', '{{%stock_state}}', 'is_active'); + $this->createIndex('idx_stock_state_batch', '{{%stock_state}}', 'batch_ts'); + $this->createIndex('idx_stock_state_store', '{{%stock_state}}', 'store_id'); + $this->createIndex('idx_stock_state_product', '{{%stock_state}}', 'product_id'); + $this->createIndex('idx_stock_state_father', '{{%stock_state}}', 'father_id'); + } + + public function safeDown() + { + $this->dropTable('{{%stock_state}}'); + } +} diff --git a/erp24/records/StockHistory.php b/erp24/records/StockHistory.php deleted file mode 100644 index 6fedd4ea..00000000 --- a/erp24/records/StockHistory.php +++ /dev/null @@ -1,60 +0,0 @@ - 36], - [['store_name', 'product_name'], 'string', 'max' => 255], - [['articule'], 'string', 'max' => 36], - [['quantity', 'reserv'], 'number', 'min' => 0], - [['quantity', 'reserv'], 'default', 'value' => 0], - [['snapshot_date'], 'date', 'format' => 'php:Y-m-d'], - [['snapshot_time'], 'string'], - [['components'], 'safe'], - ]; - } - - public function getStore(): \yii\db\ActiveQuery - { - return $this->hasOne(Products1c::class, ['id' => 'store_id']); - } - - public function getProduct(): \yii\db\ActiveQuery - { - return $this->hasOne(Products1c::class, ['id' => 'product_id']); - } -} diff --git a/erp24/records/StockState.php b/erp24/records/StockState.php new file mode 100644 index 00000000..710ed0e9 --- /dev/null +++ b/erp24/records/StockState.php @@ -0,0 +1,75 @@ + 36], + [['store_name', 'product_name'], 'string', 'max' => 255], + [['articule'], 'string', 'max' => 36], + [['quantity', 'reserv'], 'number', 'min' => 0], + [['quantity', 'reserv'], 'default', 'value' => 0], + [['valid_from', 'valid_to'], 'datetime', 'format' => 'php:Y-m-d H:i:s'], + [['valid_to'], 'default', 'value' => self::INFINITY_TS], + [['is_active'], 'boolean'], + [['is_active'], 'default', 'value' => true], + [['batch_ts'], 'datetime', 'format' => 'php:Y-m-d H:i:s'], + [['components'], 'safe'], + ]; + } + + public function getStore(): \yii\db\ActiveQuery + { + return $this->hasOne(Products1c::class, ['id' => 'store_id']); + } + + public function getProduct(): \yii\db\ActiveQuery + { + return $this->hasOne(Products1c::class, ['id' => 'product_id']); + } +} diff --git a/erp24/services/CollectResult.php b/erp24/services/CollectResult.php index e0167894..05c9e711 100644 --- a/erp24/services/CollectResult.php +++ b/erp24/services/CollectResult.php @@ -5,7 +5,7 @@ declare(strict_types=1); namespace app\services; /** - * DTO: результат StockHistoryService::collect(). + * DTO: результат StockStateService::collect(). */ class CollectResult { diff --git a/erp24/services/DqResult.php b/erp24/services/DqResult.php index 6b023254..43e4627a 100644 --- a/erp24/services/DqResult.php +++ b/erp24/services/DqResult.php @@ -5,7 +5,7 @@ declare(strict_types=1); namespace app\services; /** - * DTO: результат DQ assertions для StockHistoryService. + * DTO: результат DQ assertions для StockStateService. */ class DqResult { diff --git a/erp24/services/StockHistoryService.php b/erp24/services/StockHistoryService.php deleted file mode 100644 index e5c09ff1..00000000 --- a/erp24/services/StockHistoryService.php +++ /dev/null @@ -1,463 +0,0 @@ -db = $db; - $this->telegramCallback = $telegramCallback; - } - - /** - * Основной метод сбора остатков. - */ - public function collect(string $snapshotTime): CollectResult - { - if (!preg_match('/^\d{2}:\d{2}$/', $snapshotTime)) { - throw new \InvalidArgumentException("Invalid snapshotTime format: {$snapshotTime}, expected HH:MM"); - } - - $snapshotDate = date('Y-m-d'); - - // 1. Advisory lock - $this->acquireLock(); - - try { - // 2. Обеспечить наличие партиций - $this->ensurePartitions(); - - // 3. SELECT данные - $rows = $this->fetchBalances(); - - // 4. Batch INSERT - $insertedCount = $this->batchInsert($rows, $snapshotDate, $snapshotTime); - - // 5. DQ assertions - $dqResult = $this->runDqAssertions($snapshotDate, $snapshotTime); - - // 6. Log - $this->logResult($snapshotDate, $snapshotTime, $insertedCount, $dqResult); - - return new CollectResult(true, $insertedCount, $dqResult); - } finally { - $this->releaseLock(); - } - } - - /** - * DQ assertions после сбора. - */ - public function runDqAssertions(string $date, string $time): DqResult - { - $failures = []; - - // DQ-1: stores count >= active stores - $activeStores = (int)$this->db->createCommand() - ->setSql("SELECT COUNT(*) FROM products_1c WHERE tip = 'city_store'") - ->queryScalar(); - - $snapshotStores = (int)$this->db->createCommand() - ->setSql("SELECT COUNT(DISTINCT store_id) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time") - ->bindValues([':date' => $date, ':time' => $time]) - ->queryScalar(); - - if ($snapshotStores < $activeStores) { - $failures[] = [ - 'level' => 'CRITICAL', - 'code' => 'DQ-1', - 'message' => "Stores in snapshot ($snapshotStores) < active stores ($activeStores)", - ]; - } - - // DQ-2: no NULL in required fields - $nullCount = (int)$this->db->createCommand() - ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND (store_id IS NULL OR product_id IS NULL OR quantity IS NULL)") - ->bindValues([':date' => $date, ':time' => $time]) - ->queryScalar(); - - if ($nullCount > 0) { - $failures[] = [ - 'level' => 'CRITICAL', - 'code' => 'DQ-2', - 'message' => "Found $nullCount records with NULL in required fields", - ]; - } - - // DQ-3: no negative quantity - $negativeQty = (int)$this->db->createCommand() - ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND quantity < 0") - ->bindValues([':date' => $date, ':time' => $time]) - ->queryScalar(); - - if ($negativeQty > 0) { - $failures[] = [ - 'level' => 'MAJOR', - 'code' => 'DQ-3', - 'message' => "Found $negativeQty records with negative quantity", - ]; - } - - // DQ-4: reserv <= quantity - $reservOverQty = (int)$this->db->createCommand() - ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND reserv > quantity") - ->bindValues([':date' => $date, ':time' => $time]) - ->queryScalar(); - - if ($reservOverQty > 0) { - $failures[] = [ - 'level' => 'WARNING', - 'code' => 'DQ-4', - 'message' => "Found $reservOverQty records where reserv > quantity", - ]; - } - - // DQ-5: row count deviation - $currentCount = (int)$this->db->createCommand() - ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time") - ->bindValues([':date' => $date, ':time' => $time]) - ->queryScalar(); - - $previousCount = (int)$this->db->createCommand() - ->setSql(" - SELECT COUNT(*) FROM stock_history - WHERE (snapshot_date, snapshot_time) = ( - SELECT snapshot_date, snapshot_time FROM stock_history - WHERE (snapshot_date, snapshot_time) < (:date, :time) - GROUP BY snapshot_date, snapshot_time - ORDER BY snapshot_date DESC, snapshot_time DESC - LIMIT 1 - ) - ") - ->bindValues([':date' => $date, ':time' => $time]) - ->queryScalar(); - - if ($previousCount > 0) { - $deviation = abs($currentCount - $previousCount) / $previousCount; - if ($deviation > self::DEVIATION_THRESHOLD) { - $pct = round($deviation * 100, 1); - $failures[] = [ - 'level' => 'MAJOR', - 'code' => 'DQ-5', - 'message' => "Row count deviation {$pct}% (current: $currentCount, previous: $previousCount)", - ]; - } - } - - // DQ-6: non-empty snapshot - if ($currentCount === 0) { - $failures[] = [ - 'level' => 'CRITICAL', - 'code' => 'DQ-6', - 'message' => "Empty snapshot: 0 records for $date $time", - ]; - } - - $dqResult = new DqResult($failures); - - // Отправить алерты при CRITICAL/MAJOR - $this->sendAlerts($dqResult, $date, $time); - - return $dqResult; - } - - /** - * Обеспечить наличие партиций для текущего и следующего месяца. - */ - public function ensurePartitions(): void - { - $currentMonth = date('Y-m'); - $nextMonth = date('Y-m', strtotime('+1 month')); - - $this->createPartition($currentMonth); - $this->createPartition($nextMonth); - } - - /** - * Создание партиции на указанный месяц. - */ - public function createPartition(string $yearMonth): void - { - if (!preg_match('/^\d{4}-\d{2}$/', $yearMonth)) { - throw new \InvalidArgumentException("Invalid yearMonth format: {$yearMonth}, expected YYYY-MM"); - } - - $start = $yearMonth . '-01'; - $end = date('Y-m-d', strtotime($start . ' +1 month')); - $suffix = str_replace('-', '_', $yearMonth); - - // DDL не поддерживает bind params — используем интерполяцию (формат валидирован regex) - $this->db->createCommand() - ->setSql("CREATE TABLE IF NOT EXISTS stock_history_{$suffix} PARTITION OF stock_history FOR VALUES FROM ('{$start}') TO ('{$end}')") - ->execute(); - } - - /** - * Удаление партиций старше retention периода. - */ - public function dropOldPartitions(int $retentionMonths = 24): void - { - $cutoffDate = date('Y-m-d', strtotime("-{$retentionMonths} months")); - $cutoffYm = substr($cutoffDate, 0, 7); // "2024-02" - - $partitions = $this->db->createCommand() - ->setSql("SELECT tablename FROM pg_tables WHERE schemaname = 'erp24' AND tablename LIKE 'stock_history_%' ORDER BY tablename") - ->queryAll(); - - foreach ($partitions as $row) { - $name = $row['tablename']; - // Извлекаем дату из имени: stock_history_2024_01 → 2024-01 - if (preg_match('/stock_history_(\d{4})_(\d{2})$/', $name, $m)) { - $partitionYm = $m[1] . '-' . $m[2]; - if ($partitionYm < $cutoffYm) { - $this->db->createCommand() - ->setSql("DROP TABLE IF EXISTS {$name}") - ->execute(); - - Yii::info("Dropped old partition: {$name}", 'stock-history'); - } - } - } - } - - // ========================================================= - // Private methods - // ========================================================= - - protected function getLockTimeoutSec(): int - { - return self::LOCK_TIMEOUT_SEC; - } - - protected function getLockRetrySleepUs(): int - { - return 500000; // 0.5 sec - } - - private function acquireLock(): void - { - $deadline = time() + $this->getLockTimeoutSec(); - $sleepUs = $this->getLockRetrySleepUs(); - - while (time() < $deadline) { - $locked = $this->db->createCommand() - ->setSql("SELECT pg_try_advisory_lock(" . self::ADVISORY_LOCK_KEY . ")") - ->queryScalar(); - - if ($locked) { - return; - } - - usleep($sleepUs); - } - - $msg = 'Failed to acquire advisory lock within ' . $this->getLockTimeoutSec() . ' seconds'; - try { - Yii::error($msg, 'stock-history'); - } catch (\Throwable $e) { - // Логирование не должно маскировать основную ошибку - } - $this->sendTelegram("CRITICAL: Stock History ETL — $msg"); - - throw new \RuntimeException($msg); - } - - private function releaseLock(): void - { - $this->db->createCommand() - ->setSql("SELECT pg_advisory_unlock(" . self::ADVISORY_LOCK_KEY . ")") - ->queryScalar(); - } - - private function fetchBalances(): array - { - return $this->db->createCommand() - ->setSql(" - SELECT - b.store_id, - ps.name as store_name, - b.product_id, - pp.name as product_name, - pp.articule, - pp.parent_id as father_id, - pp.components, - b.quantity, - b.reserv - FROM balances b - LEFT JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store' - LEFT JOIN products_1c pp ON pp.id = b.product_id - WHERE ps.id IS NOT NULL - ORDER BY b.store_id, b.product_id - ") - ->queryAll(); - } - - private function batchInsert(array $rows, string $date, string $time): int - { - if (empty($rows)) { - return 0; - } - - $total = 0; - $chunks = array_chunk($rows, self::BATCH_SIZE); - - foreach ($chunks as $chunk) { - $total += $this->insertChunkWithRetry($chunk, $date, $time); - } - - return $total; - } - - private function insertChunkWithRetry(array $chunk, string $date, string $time): int - { - $attempt = 0; - - while ($attempt < self::MAX_RETRY) { - try { - return $this->insertChunk($chunk, $date, $time); - } catch (\yii\db\Exception $e) { - $attempt++; - if ($attempt >= self::MAX_RETRY) { - throw $e; - } - // Exponential backoff: 1s, 2s, 4s - sleep((int)pow(2, $attempt - 1)); - Yii::warning("Batch INSERT retry #{$attempt}: " . $e->getMessage(), 'stock-history'); - } - } - - return 0; - } - - private function insertChunk(array $chunk, string $date, string $time): int - { - $values = []; - $params = []; - $i = 0; - - foreach ($chunk as $row) { - $components = $row['components'] ?? null; - if ($components !== null && !is_string($components)) { - $components = json_encode($components); - } - - $values[] = "(:date_{$i}, :time_{$i}, :sid_{$i}, :sn_{$i}, :pid_{$i}, :pn_{$i}, :art_{$i}, :fid_{$i}, :comp_{$i}::jsonb, :qty_{$i}, :res_{$i}, NOW())"; - $params[":date_{$i}"] = $date; - $params[":time_{$i}"] = $time; - $params[":sid_{$i}"] = $row['store_id']; - $params[":sn_{$i}"] = $row['store_name'] ?? null; - $params[":pid_{$i}"] = $row['product_id']; - $params[":pn_{$i}"] = $row['product_name'] ?? null; - $params[":art_{$i}"] = $row['articule'] ?? null; - $params[":fid_{$i}"] = $row['father_id'] ?? null; - $params[":comp_{$i}"] = $components; - $params[":qty_{$i}"] = $row['quantity'] ?? 0; - $params[":res_{$i}"] = $row['reserv'] ?? 0; - $i++; - } - - $sql = "INSERT INTO stock_history (snapshot_date, snapshot_time, store_id, store_name, product_id, product_name, articule, father_id, components, quantity, reserv, created_at) - VALUES " . implode(', ', $values) . " - ON CONFLICT (snapshot_date, snapshot_time, store_id, product_id) - DO UPDATE SET - quantity = EXCLUDED.quantity, - reserv = EXCLUDED.reserv, - store_name = EXCLUDED.store_name, - product_name = EXCLUDED.product_name, - articule = EXCLUDED.articule, - father_id = EXCLUDED.father_id, - components = EXCLUDED.components, - created_at = NOW()"; - - return $this->db->createCommand() - ->setSql($sql) - ->bindValues($params) - ->execute(); - } - - private function logResult(string $date, string $time, int $rowCount, DqResult $dqResult): void - { - $status = $dqResult->allPassed() ? 'SUCCESS' : 'DQ_FAILURES'; - try { - Yii::info("Stock History ETL [{$date} {$time}]: {$status}, rows={$rowCount}", 'stock-history'); - } catch (\Throwable $e) { - // Логирование не должно прерывать ETL - } - } - - private function sendAlerts(DqResult $dqResult, string $date, string $time): void - { - $criticals = $dqResult->getCriticalFailures(); - $majors = $dqResult->getMajorFailures(); - - if (empty($criticals) && empty($majors)) { - return; - } - - $messages = []; - foreach ($criticals as $f) { - $messages[] = "CRITICAL [{$f['code']}]: {$f['message']}"; - } - foreach ($majors as $f) { - $messages[] = "MAJOR [{$f['code']}]: {$f['message']}"; - } - - try { - foreach ($criticals as $f) { - Yii::error("DQ {$f['code']}: {$f['message']}", 'stock-history'); - } - foreach ($majors as $f) { - Yii::warning("DQ {$f['code']}: {$f['message']}", 'stock-history'); - } - } catch (\Throwable $e) { - // Логирование не должно прерывать ETL - } - - $text = "Stock History DQ [{$date} {$time}]:\n" . implode("\n", $messages); - $this->sendTelegram($text); - } - - private function sendTelegram(string $message): void - { - if ($this->telegramCallback !== null) { - ($this->telegramCallback)($message); - return; - } - - try { - if (Yii::$app && Yii::$app->has('queue')) { - Yii::$app->queue->push(new \app\jobs\SendTelegramMessageJob([ - 'message' => $message, - ])); - } - } catch (\Throwable $e) { - Yii::error("Failed to send Telegram alert: " . $e->getMessage(), 'stock-history'); - } - } -} diff --git a/erp24/services/StockStateService.php b/erp24/services/StockStateService.php new file mode 100644 index 00000000..21a97597 --- /dev/null +++ b/erp24/services/StockStateService.php @@ -0,0 +1,359 @@ +db = $db; + $this->telegramCallback = $telegramCallback; + } + + /** + * Основной метод: SCD-2 сбор остатков. + * batch_ts — один timestamp на весь запуск. + */ + public function collect(): CollectResult + { + $now = date('Y-m-d H:i:s'); + + $this->acquireLock(); + + try { + // 1. Закрыть устаревшие записи (остаток изменился) + $closedCount = $this->closeChangedRecords($now); + + // 2. Вставить новые/изменённые + $insertedCount = $this->insertNewRecords($now); + + // 3. DQ assertions + $dqResult = $this->runDqAssertions(); + + // 4. Log + $this->logResult($now, $closedCount, $insertedCount, $dqResult); + + return new CollectResult(true, $insertedCount, $dqResult); + } finally { + $this->releaseLock(); + } + } + + /** + * Закрыть записи, где quantity или reserv изменились. + * SET valid_to = :now, is_active = false. + */ + private function closeChangedRecords(string $now): int + { + return $this->db->createCommand() + ->setSql(" + UPDATE stock_state ss + SET valid_to = :now::timestamp - INTERVAL '1 second', is_active = false + FROM balances b + JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store' + WHERE ss.store_id = b.store_id + AND ss.product_id = b.product_id + AND ss.is_active = true + AND (ss.quantity != b.quantity OR ss.reserv != b.reserv) + ") + ->bindValue(':now', $now) + ->execute(); + } + + /** + * Вставить записи для новых товаров и для тех, чей остаток изменился. + * valid_from = :now, valid_to = infinity, is_active = true, batch_ts = :now. + */ + private function insertNewRecords(string $now): int + { + return $this->db->createCommand() + ->setSql(" + INSERT INTO stock_state (store_id, store_name, product_id, product_name, articule, father_id, components, quantity, reserv, valid_from, valid_to, is_active, batch_ts, created_at) + SELECT + b.store_id, + ps.name, + b.product_id, + pp.name, + pp.articule, + pp.parent_id, + pp.components, + b.quantity, + b.reserv, + :now, + :infinity, + true, + :batch_ts, + NOW() + FROM balances b + JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store' + LEFT JOIN products_1c pp ON pp.id = b.product_id + LEFT JOIN stock_state ss + ON ss.store_id = b.store_id + AND ss.product_id = b.product_id + AND ss.is_active = true + WHERE ss.id IS NULL + ") + ->bindValues([':now' => $now, ':infinity' => self::INFINITY_TS, ':batch_ts' => $now]) + ->execute(); + } + + /** + * DQ assertions по активным записям (is_active = true). + */ + public function runDqAssertions(): DqResult + { + $failures = []; + + // DQ-1: stores count >= active stores + $activeStores = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(*) FROM products_1c WHERE tip = 'city_store'") + ->queryScalar(); + + $snapshotStores = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(DISTINCT store_id) FROM stock_state WHERE is_active = true") + ->queryScalar(); + + if ($snapshotStores < $activeStores) { + $failures[] = [ + 'level' => 'CRITICAL', + 'code' => 'DQ-1', + 'message' => "Stores in active records ($snapshotStores) < active stores ($activeStores)", + ]; + } + + // DQ-2: no NULL in required fields + $nullCount = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true AND (store_id IS NULL OR product_id IS NULL OR quantity IS NULL)") + ->queryScalar(); + + if ($nullCount > 0) { + $failures[] = [ + 'level' => 'CRITICAL', + 'code' => 'DQ-2', + 'message' => "Found $nullCount active records with NULL in required fields", + ]; + } + + // DQ-3: no negative quantity + $negativeQty = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true AND quantity < 0") + ->queryScalar(); + + if ($negativeQty > 0) { + $failures[] = [ + 'level' => 'MAJOR', + 'code' => 'DQ-3', + 'message' => "Found $negativeQty active records with negative quantity", + ]; + } + + // DQ-4: reserv <= quantity + $reservOverQty = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true AND reserv > quantity") + ->queryScalar(); + + if ($reservOverQty > 0) { + $failures[] = [ + 'level' => 'WARNING', + 'code' => 'DQ-4', + 'message' => "Found $reservOverQty active records where reserv > quantity", + ]; + } + + // DQ-5: active count deviation vs previous batch + $activeCount = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true") + ->queryScalar(); + + $prevActiveCount = (int)$this->db->createCommand() + ->setSql(" + SELECT COUNT(*) FROM stock_state + WHERE valid_from <= (NOW() - INTERVAL '12 hours') + AND valid_to > (NOW() - INTERVAL '12 hours') + ") + ->queryScalar(); + + if ($prevActiveCount > 0) { + $deviation = abs($activeCount - $prevActiveCount) / $prevActiveCount; + if ($deviation > self::DEVIATION_THRESHOLD) { + $pct = round($deviation * 100, 1); + $failures[] = [ + 'level' => 'MAJOR', + 'code' => 'DQ-5', + 'message' => "Active records deviation {$pct}% (current: $activeCount, previous: $prevActiveCount)", + ]; + } + } + + // DQ-6: non-empty active set + if ($activeCount === 0) { + $failures[] = [ + 'level' => 'CRITICAL', + 'code' => 'DQ-6', + 'message' => "Zero active records in stock_state", + ]; + } + + $dqResult = new DqResult($failures); + $this->sendAlerts($dqResult); + + return $dqResult; + } + + /** + * Удаление закрытых записей старше retention. + */ + public function cleanupOldRecords(int $retentionMonths = 24): int + { + $cutoff = date('Y-m-d H:i:s', strtotime("-{$retentionMonths} months")); + + return $this->db->createCommand() + ->setSql("DELETE FROM stock_state WHERE is_active = false AND valid_to < :cutoff") + ->bindValues([':cutoff' => $cutoff]) + ->execute(); + } + + // ========================================================= + // Lock management + // ========================================================= + + protected function getLockTimeoutSec(): int + { + return self::LOCK_TIMEOUT_SEC; + } + + protected function getLockRetrySleepUs(): int + { + return 500000; // 0.5 sec + } + + private function acquireLock(): void + { + $deadline = time() + $this->getLockTimeoutSec(); + $sleepUs = $this->getLockRetrySleepUs(); + + while (time() < $deadline) { + $locked = $this->db->createCommand() + ->setSql("SELECT pg_try_advisory_lock(" . self::ADVISORY_LOCK_KEY . ")") + ->queryScalar(); + + if ($locked) { + return; + } + + usleep($sleepUs); + } + + $msg = 'Failed to acquire advisory lock within ' . $this->getLockTimeoutSec() . ' seconds'; + try { + Yii::error($msg, 'stock-state'); + } catch (\Throwable $e) { + // Логирование не должно маскировать основную ошибку + } + $this->sendTelegram("CRITICAL: Stock State ETL — $msg"); + + throw new \RuntimeException($msg); + } + + private function releaseLock(): void + { + $this->db->createCommand() + ->setSql("SELECT pg_advisory_unlock(" . self::ADVISORY_LOCK_KEY . ")") + ->queryScalar(); + } + + // ========================================================= + // Logging & alerts + // ========================================================= + + private function logResult(string $batchTs, int $closedCount, int $insertedCount, DqResult $dqResult): void + { + $status = $dqResult->allPassed() ? 'SUCCESS' : 'DQ_FAILURES'; + try { + Yii::info("Stock State SCD-2 [{$batchTs}]: {$status}, closed={$closedCount}, inserted={$insertedCount}", 'stock-state'); + } catch (\Throwable $e) { + // Логирование не должно прерывать ETL + } + } + + private function sendAlerts(DqResult $dqResult): void + { + $criticals = $dqResult->getCriticalFailures(); + $majors = $dqResult->getMajorFailures(); + + if (empty($criticals) && empty($majors)) { + return; + } + + $messages = []; + foreach ($criticals as $f) { + $messages[] = "CRITICAL [{$f['code']}]: {$f['message']}"; + } + foreach ($majors as $f) { + $messages[] = "MAJOR [{$f['code']}]: {$f['message']}"; + } + + try { + foreach ($criticals as $f) { + Yii::error("DQ {$f['code']}: {$f['message']}", 'stock-state'); + } + foreach ($majors as $f) { + Yii::warning("DQ {$f['code']}: {$f['message']}", 'stock-state'); + } + } catch (\Throwable $e) { + // Логирование не должно прерывать ETL + } + + $now = date('Y-m-d H:i:s'); + $text = "Stock State DQ [{$now}]:\n" . implode("\n", $messages); + $this->sendTelegram($text); + } + + private function sendTelegram(string $message): void + { + if ($this->telegramCallback !== null) { + ($this->telegramCallback)($message); + return; + } + + try { + if (Yii::$app && Yii::$app->has('queue')) { + Yii::$app->queue->push(new \app\jobs\SendTelegramMessageJob([ + 'message' => $message, + ])); + } + } catch (\Throwable $e) { + try { + Yii::error("Failed to send Telegram alert: " . $e->getMessage(), 'stock-state'); + } catch (\Throwable $e2) { + // ignore + } + } + } +} diff --git a/erp24/tests/unit/services/StockHistoryServiceTest.php b/erp24/tests/unit/services/StockHistoryServiceTest.php deleted file mode 100644 index 766116a2..00000000 --- a/erp24/tests/unit/services/StockHistoryServiceTest.php +++ /dev/null @@ -1,338 +0,0 @@ -createMock(\yii\db\Connection::class); - $command = $this->createMock(\yii\db\Command::class); - - $db->method('createCommand')->willReturn($command); - $command->method('setSql')->willReturnSelf(); - $command->method('bindValues')->willReturnSelf(); - $command->method('bindValue')->willReturnSelf(); - - return [$db, $command]; - } - - // ========================================================= - // collect() tests - // ========================================================= - - public function testCollect_Success_InsertsRecordsAndReturnsResult(): void - { - [$db, $command] = $this->createMockDbAndCommand(); - - $command->method('queryScalar') - ->willReturnOnConsecutiveCalls( - true, // pg_try_advisory_lock - 5, // active stores (DQ-1) - 10, // current count (DQ-6 + DQ-5) - 5, // distinct stores in snapshot (DQ-1) - 0, // null required (DQ-2) - 0, // negative qty (DQ-3) - 0, // reserv > qty (DQ-4) - 10, // previous count (DQ-5) - true // pg_advisory_unlock - ); - - $command->method('queryAll')->willReturn( - array_fill(0, 10, [ - 'store_id' => 'store-1', 'store_name' => 'Store', - 'product_id' => 'prod-1', 'product_name' => 'Rose', - 'articule' => 'ART-001', 'father_id' => null, - 'components' => null, 'quantity' => 5.00, 'reserv' => 1.00, - ]) - ); - $command->method('execute')->willReturn(10); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $result = $service->collect('08:00'); - - $this->assertTrue($result->isSuccess()); - $this->assertEquals(10, $result->getRowCount()); - } - - public function testCollect_LockTimeout_ThrowsLockException(): void - { - [$db, $command] = $this->createMockDbAndCommand(); - - // Lock always fails - $command->method('queryScalar')->willReturn(false); - - $this->expectException(\RuntimeException::class); - $this->expectExceptionMessage('advisory lock'); - - // Анонимный класс с минимальными таймаутами для быстрого теста - $noopCallback = self::noopTelegramCallback(); - $service = new class($db, $noopCallback) extends StockHistoryService { - protected function getLockTimeoutSec(): int { return 1; } - protected function getLockRetrySleepUs(): int { return 100000; } // 0.1 sec - }; - $service->collect('08:00'); - } - - public function testCollect_InvalidTimeFormat_ThrowsException(): void - { - [$db, $command] = $this->createMockDbAndCommand(); - - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('snapshotTime'); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $service->collect('invalid'); - } - - public function testCollect_EmptyBalances_ReturnsZeroRows(): void - { - [$db, $command] = $this->createMockDbAndCommand(); - - $command->method('queryScalar') - ->willReturnOnConsecutiveCalls( - true, 0, 0, 0, 0, 0, 0, 0, true - ); - $command->method('queryAll')->willReturn([]); - $command->method('execute')->willReturn(0); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $result = $service->collect('08:00'); - - $this->assertTrue($result->isSuccess()); - $this->assertEquals(0, $result->getRowCount()); - } - - public function testCollect_OnConflictUpdate_UsesUpsertQuery(): void - { - $db = $this->createMock(\yii\db\Connection::class); - $command = $this->createMock(\yii\db\Command::class); - - $db->method('createCommand')->willReturn($command); - $command->method('bindValues')->willReturnSelf(); - $command->method('bindValue')->willReturnSelf(); - - $command->method('queryScalar') - ->willReturnOnConsecutiveCalls(true, 1, 1, 1, 0, 0, 0, 1, true); - - $executedSql = []; - $command->method('setSql')->willReturnCallback( - function ($sql) use ($command, &$executedSql) { - $executedSql[] = $sql; - return $command; - } - ); - $command->method('queryAll')->willReturn([ - ['store_id' => 's1', 'store_name' => 'S', 'product_id' => 'p1', - 'product_name' => 'P', 'articule' => 'A', 'father_id' => null, - 'components' => null, 'quantity' => 1, 'reserv' => 0], - ]); - $command->method('execute')->willReturn(1); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $service->collect('08:00'); - - $hasOnConflict = false; - foreach ($executedSql as $sql) { - if (stripos($sql, 'ON CONFLICT') !== false) { - $hasOnConflict = true; - } - } - $this->assertTrue($hasOnConflict, 'SQL должен содержать ON CONFLICT DO UPDATE'); - } - - // ========================================================= - // DQ Assertions tests - // ========================================================= - - public function testDqAssertions_AllPass_ReturnsSuccess(): void - { - [$db, $command] = $this->createMockDbAndCommand(); - - $command->method('queryScalar') - ->willReturnOnConsecutiveCalls(5, 5, 0, 0, 0, 100, 95); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $result = $service->runDqAssertions('2026-02-21', '08:00'); - - $this->assertTrue($result->allPassed()); - $this->assertEmpty($result->getCriticalFailures()); - } - - public function testDqAssertions_MissingStores_ReturnsCritical(): void - { - [$db, $command] = $this->createMockDbAndCommand(); - - $command->method('queryScalar') - ->willReturnOnConsecutiveCalls(24, 20, 0, 0, 0, 100, 100); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $result = $service->runDqAssertions('2026-02-21', '08:00'); - - $this->assertFalse($result->allPassed()); - $criticals = $result->getCriticalFailures(); - $this->assertNotEmpty($criticals); - $this->assertStringContainsString('store', strtolower($criticals[0]['message'])); - } - - public function testDqAssertions_NegativeQuantity_ReturnsMajor(): void - { - [$db, $command] = $this->createMockDbAndCommand(); - - $command->method('queryScalar') - ->willReturnOnConsecutiveCalls(5, 5, 0, 3, 0, 100, 100); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $result = $service->runDqAssertions('2026-02-21', '08:00'); - - $majors = $result->getMajorFailures(); - $this->assertNotEmpty($majors); - } - - public function testDqAssertions_DeviationOver20Pct_ReturnsMajor(): void - { - [$db, $command] = $this->createMockDbAndCommand(); - - $command->method('queryScalar') - ->willReturnOnConsecutiveCalls(5, 5, 0, 0, 0, 100, 50); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $result = $service->runDqAssertions('2026-02-21', '08:00'); - - $majors = $result->getMajorFailures(); - $this->assertNotEmpty($majors); - } - - public function testDqAssertions_EmptySnapshot_ReturnsCritical(): void - { - [$db, $command] = $this->createMockDbAndCommand(); - - $command->method('queryScalar') - ->willReturnOnConsecutiveCalls(5, 0, 0, 0, 0, 0, 100); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $result = $service->runDqAssertions('2026-02-21', '08:00'); - - $this->assertFalse($result->allPassed()); - $criticals = $result->getCriticalFailures(); - $this->assertNotEmpty($criticals); - } - - // ========================================================= - // Partition management tests - // ========================================================= - - public function testCreatePartition_CreatesPartitionTable(): void - { - $db = $this->createMock(\yii\db\Connection::class); - $command = $this->createMock(\yii\db\Command::class); - - $db->method('createCommand')->willReturn($command); - - $executedSql = []; - $command->method('setSql')->willReturnCallback( - function ($sql) use ($command, &$executedSql) { - $executedSql[] = $sql; - return $command; - } - ); - $command->method('execute')->willReturn(0); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $service->createPartition('2026-04'); - - $sql = implode(' ', $executedSql); - $this->assertStringContainsString('PARTITION OF', $sql); - $this->assertStringContainsString('stock_history_2026_04', $sql); - $this->assertStringContainsString('2026-04-01', $sql); - $this->assertStringContainsString('2026-05-01', $sql); - } - - public function testCreatePartition_InvalidFormat_ThrowsException(): void - { - [$db, $command] = $this->createMockDbAndCommand(); - - $this->expectException(\InvalidArgumentException::class); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $service->createPartition('invalid'); - } - - public function testEnsurePartitions_CreatesCurrentAndNextMonth(): void - { - $db = $this->createMock(\yii\db\Connection::class); - $command = $this->createMock(\yii\db\Command::class); - - $db->method('createCommand')->willReturn($command); - - $executedSql = []; - $command->method('setSql')->willReturnCallback( - function ($sql) use ($command, &$executedSql) { - $executedSql[] = $sql; - return $command; - } - ); - $command->method('execute')->willReturn(0); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $service->ensurePartitions(); - - $partitionSql = array_filter($executedSql, fn($s) => stripos($s, 'PARTITION OF') !== false); - $this->assertCount(2, $partitionSql, 'Должны быть созданы 2 партиции (текущий и следующий месяц)'); - } - - public function testDropOldPartitions_DropsPartitionsOlderThanRetention(): void - { - $db = $this->createMock(\yii\db\Connection::class); - $command = $this->createMock(\yii\db\Command::class); - - $db->method('createCommand')->willReturn($command); - $command->method('bindValues')->willReturnSelf(); - $command->method('bindValue')->willReturnSelf(); - - $command->method('queryAll')->willReturn([ - ['tablename' => 'stock_history_2023_11'], - ['tablename' => 'stock_history_2023_12'], - ['tablename' => 'stock_history_2026_01'], - ]); - - $droppedSql = []; - $command->method('setSql')->willReturnCallback( - function ($sql) use ($command, &$droppedSql) { - if (stripos($sql, 'DROP TABLE') !== false) { - $droppedSql[] = $sql; - } - return $command; - } - ); - $command->method('execute')->willReturn(0); - - $service = new StockHistoryService($db, self::noopTelegramCallback()); - $service->dropOldPartitions(24); - - $this->assertCount(2, $droppedSql, 'Должны быть удалены 2 старые партиции (2023_11 и 2023_12)'); - } -} diff --git a/erp24/tests/unit/services/StockStateServiceTest.php b/erp24/tests/unit/services/StockStateServiceTest.php new file mode 100644 index 00000000..b947440c --- /dev/null +++ b/erp24/tests/unit/services/StockStateServiceTest.php @@ -0,0 +1,290 @@ +createMock(\yii\db\Connection::class); + $command = $this->createMock(\yii\db\Command::class); + + $db->method('createCommand')->willReturn($command); + $command->method('setSql')->willReturnSelf(); + $command->method('bindValues')->willReturnSelf(); + $command->method('bindValue')->willReturnSelf(); + + return [$db, $command]; + } + + // ========================================================= + // collect() tests + // ========================================================= + + public function testCollect_Success_ClosesAndInsertsRecords(): void + { + $db = $this->createMock(\yii\db\Connection::class); + $command = $this->createMock(\yii\db\Command::class); + + $db->method('createCommand')->willReturn($command); + $command->method('setSql')->willReturnSelf(); + $command->method('bindValues')->willReturnSelf(); + $command->method('bindValue')->willReturnSelf(); + + $command->method('queryScalar') + ->willReturnOnConsecutiveCalls( + true, // lock + 24, 24, // DQ-1 + 0, // DQ-2 + 0, // DQ-3 + 0, // DQ-4 + 11400, // DQ-5 activeCount + 11300, // DQ-5 prevActiveCount + true // unlock + ); + + $command->method('execute') + ->willReturnOnConsecutiveCalls(150, 150); + + $service = new StockStateService($db, self::noopTelegramCallback()); + $result = $service->collect(); + + $this->assertTrue($result->isSuccess()); + $this->assertSame(150, $result->getRowCount()); + $this->assertTrue($result->getDqResult()->allPassed()); + } + + public function testCollect_LockTimeout_ThrowsRuntimeException(): void + { + [$db, $command] = $this->createMockDbAndCommand(); + + $command->method('queryScalar')->willReturn(false); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('advisory lock'); + + $noopCallback = self::noopTelegramCallback(); + $service = new class($db, $noopCallback) extends StockStateService { + protected function getLockTimeoutSec(): int { return 1; } + protected function getLockRetrySleepUs(): int { return 100000; } + }; + $service->collect(); + } + + public function testCollect_EmptyBalances_ReturnsZeroAndDqCritical(): void + { + $db = $this->createMock(\yii\db\Connection::class); + $command = $this->createMock(\yii\db\Command::class); + + $db->method('createCommand')->willReturn($command); + $command->method('setSql')->willReturnSelf(); + $command->method('bindValues')->willReturnSelf(); + $command->method('bindValue')->willReturnSelf(); + + $command->method('queryScalar') + ->willReturnOnConsecutiveCalls( + true, // lock + 24, 0, // DQ-1 + 0, // DQ-2 + 0, // DQ-3 + 0, // DQ-4 + 0, // DQ-5 + 0, // DQ-5 prev + true // unlock + ); + + $command->method('execute') + ->willReturnOnConsecutiveCalls(0, 0); + + $service = new StockStateService($db, self::noopTelegramCallback()); + $result = $service->collect(); + + $this->assertTrue($result->isSuccess()); + $this->assertSame(0, $result->getRowCount()); + $this->assertFalse($result->getDqResult()->allPassed()); + $this->assertNotEmpty($result->getDqResult()->getCriticalFailures()); + } + + // ========================================================= + // SCD-2 SQL + batch_ts verification + // ========================================================= + + public function testCollect_GeneratesScd2QueriesWithBatchTs(): void + { + $db = $this->createMock(\yii\db\Connection::class); + $command = $this->createMock(\yii\db\Command::class); + + $db->method('createCommand')->willReturn($command); + $command->method('bindValue')->willReturnSelf(); + + $executedSql = []; + $boundValues = []; + $command->method('setSql')->willReturnCallback( + function ($sql) use ($command, &$executedSql) { + $executedSql[] = $sql; + return $command; + } + ); + $command->method('bindValues')->willReturnCallback( + function ($values) use ($command, &$boundValues) { + $boundValues = array_merge($boundValues, $values); + return $command; + } + ); + + $command->method('queryScalar') + ->willReturnOnConsecutiveCalls( + true, + 24, 24, + 0, 0, 0, + 11400, 11400, + true + ); + + $command->method('execute') + ->willReturnOnConsecutiveCalls(100, 100); + + $service = new StockStateService($db, self::noopTelegramCallback()); + $service->collect(); + + $allSql = implode("\n", $executedSql); + + // SCD-2 close + $this->assertStringContainsString('is_active = false', $allSql); + $this->assertStringContainsString('SET valid_to', $allSql); + $this->assertStringContainsString('ss.quantity', $allSql); + $this->assertStringContainsString('ss.reserv', $allSql); + + // SCD-2 insert + $this->assertStringContainsString('ss.id IS NULL', $allSql); + $this->assertStringContainsString('valid_from', $allSql); + $this->assertStringContainsString('batch_ts', $allSql); + + // batch_ts и infinity передаются + $this->assertSame('2100-01-01 00:00:00', $boundValues[':infinity'] ?? null); + $this->assertArrayHasKey(':batch_ts', $boundValues); + // batch_ts = now (тот же что и :now) + $this->assertSame($boundValues[':now'], $boundValues[':batch_ts']); + } + + // ========================================================= + // DQ assertions + // ========================================================= + + public function testDqAssertions_AllPass_ReturnsSuccess(): void + { + [$db, $command] = $this->createMockDbAndCommand(); + + $command->method('queryScalar') + ->willReturnOnConsecutiveCalls(24, 24, 0, 0, 0, 11400, 11300); + + $service = new StockStateService($db, self::noopTelegramCallback()); + $result = $service->runDqAssertions(); + + $this->assertTrue($result->allPassed()); + } + + public function testDqAssertions_MissingStores_ReturnsCritical(): void + { + [$db, $command] = $this->createMockDbAndCommand(); + + $command->method('queryScalar') + ->willReturnOnConsecutiveCalls(24, 20, 0, 0, 0, 11400, 11400); + + $service = new StockStateService($db, self::noopTelegramCallback()); + $result = $service->runDqAssertions(); + + $this->assertFalse($result->allPassed()); + $this->assertSame('DQ-1', $result->getCriticalFailures()[0]['code']); + } + + public function testDqAssertions_NegativeQuantity_ReturnsMajor(): void + { + [$db, $command] = $this->createMockDbAndCommand(); + + $command->method('queryScalar') + ->willReturnOnConsecutiveCalls(24, 24, 0, 3, 0, 11400, 11400); + + $service = new StockStateService($db, self::noopTelegramCallback()); + $result = $service->runDqAssertions(); + + $this->assertSame('DQ-3', $result->getMajorFailures()[0]['code']); + } + + public function testDqAssertions_DeviationOver20Pct_ReturnsMajor(): void + { + [$db, $command] = $this->createMockDbAndCommand(); + + $command->method('queryScalar') + ->willReturnOnConsecutiveCalls(24, 24, 0, 0, 0, 11400, 5000); + + $service = new StockStateService($db, self::noopTelegramCallback()); + $result = $service->runDqAssertions(); + + $this->assertSame('DQ-5', $result->getMajorFailures()[0]['code']); + } + + public function testDqAssertions_ZeroActiveRecords_ReturnsCritical(): void + { + [$db, $command] = $this->createMockDbAndCommand(); + + $command->method('queryScalar') + ->willReturnOnConsecutiveCalls(24, 0, 0, 0, 0, 0, 0); + + $service = new StockStateService($db, self::noopTelegramCallback()); + $result = $service->runDqAssertions(); + + $this->assertFalse($result->allPassed()); + $this->assertContains('DQ-6', array_column($result->getCriticalFailures(), 'code')); + } + + // ========================================================= + // Cleanup + // ========================================================= + + public function testCleanupOldRecords_DeletesClosedOlderThanRetention(): void + { + $db = $this->createMock(\yii\db\Connection::class); + $command = $this->createMock(\yii\db\Command::class); + + $db->method('createCommand')->willReturn($command); + $command->method('bindValues')->willReturnSelf(); + + $executedSql = []; + $command->method('setSql')->willReturnCallback( + function ($sql) use ($command, &$executedSql) { + $executedSql[] = $sql; + return $command; + } + ); + $command->method('execute')->willReturn(500); + + $service = new StockStateService($db, self::noopTelegramCallback()); + $deleted = $service->cleanupOldRecords(24); + + $this->assertSame(500, $deleted); + $sql = implode(' ', $executedSql); + $this->assertStringContainsString('DELETE FROM', $sql); + $this->assertStringContainsString('is_active = false', $sql); + } +}