]> gitweb.erp-flowers.ru Git - erp24_rep/yii-erp24/.git/commitdiff
feat(ERP-33): заменить StockHistory (snapshot) на StockState (SCD-2)
authorAleksey Filippov <Aleksey.Filippov@erp-flowers.ru>
Tue, 24 Mar 2026 09:48:40 +0000 (12:48 +0300)
committerAleksey Filippov <Aleksey.Filippov@erp-flowers.ru>
Tue, 24 Mar 2026 09:48:40 +0000 (12:48 +0300)
Полная переделка подхода хранения остатков:
- StockHistory (полные снимки 11400 INSERT/запуск) → StockState (SCD-2, 2 SQL)
- Без партиций — одна таблица с valid_from/valid_to (TIMESTAMP)
- valid_to = '2100-01-01 00:00:00' для активных, закрытие на секунду раньше
- is_active boolean для быстрой фильтрации
- batch_ts — единый timestamp пакета на запуск
- Экономия ~78% объёма (660 MB vs 3 GB за 2 года)
- 10 unit-тестов, 29 assertions

12 files changed:
erp24/commands/StockHistoryController.php [deleted file]
erp24/commands/StockStateController.php [new file with mode: 0644]
erp24/migrations/m260221_100000_create_stock_history_table.php [deleted file]
erp24/migrations/m260221_100000_create_stock_state_table.php [new file with mode: 0644]
erp24/records/StockHistory.php [deleted file]
erp24/records/StockState.php [new file with mode: 0644]
erp24/services/CollectResult.php
erp24/services/DqResult.php
erp24/services/StockHistoryService.php [deleted file]
erp24/services/StockStateService.php [new file with mode: 0644]
erp24/tests/unit/services/StockHistoryServiceTest.php [deleted file]
erp24/tests/unit/services/StockStateServiceTest.php [new file with mode: 0644]

diff --git a/erp24/commands/StockHistoryController.php b/erp24/commands/StockHistoryController.php
deleted file mode 100644 (file)
index c0ec9ef..0000000
+++ /dev/null
@@ -1,139 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace app\commands;
-
-use app\services\StockHistoryService;
-use yii\console\Controller;
-use yii\console\ExitCode;
-use Yii;
-
-/**
- * ERP-33: Управление историческими срезами остатков.
- *
- * Crontab:
- *   0 8,20 * * * cd /www && php yii stock-history/collect >> /var/log/stock-history.log 2>&1
- *   0 0 1 * * cd /www && php yii stock-history/ensure-partitions >> /var/log/stock-history.log 2>&1
- *   0 1 1 * * cd /www && php yii stock-history/create-partition >> /var/log/stock-history.log 2>&1
- *   0 2 1 * * cd /www && php yii stock-history/drop-old-partitions >> /var/log/stock-history.log 2>&1
- */
-class StockHistoryController extends Controller
-{
-    /**
-     * @var string Время среза (08:00 или 20:00). Если пусто — авто-определение.
-     */
-    public string $time = '';
-
-    public function options($actionID): array
-    {
-        return array_merge(parent::options($actionID), ['time']);
-    }
-
-    /**
-     * Собрать срез остатков.
-     */
-    public function actionCollect(): int
-    {
-        $snapshotTime = $this->resolveTime();
-        $this->stdout("Stock History ETL: collecting snapshot for {$snapshotTime}...\n");
-
-        $service = $this->createService();
-
-        try {
-            $result = $service->collect($snapshotTime);
-
-            $this->stdout("Done: {$result->getRowCount()} rows inserted.\n");
-
-            $dq = $result->getDqResult();
-            if ($dq !== null && !$dq->allPassed()) {
-                $this->stderr("DQ failures detected:\n");
-                foreach ($dq->getAll() as $f) {
-                    $this->stderr("  [{$f['level']}] {$f['code']}: {$f['message']}\n");
-                }
-                return ExitCode::OK; // ETL прошёл, но DQ имеет замечания
-            }
-
-            return ExitCode::OK;
-        } catch (\RuntimeException $e) {
-            $this->stderr("ERROR: {$e->getMessage()}\n");
-            return ExitCode::SOFTWARE;
-        } catch (\Throwable $e) {
-            $this->stderr("FATAL: {$e->getMessage()}\n");
-            Yii::error("Stock History ETL fatal: " . $e->getMessage(), 'stock-history');
-            return ExitCode::SOFTWARE;
-        }
-    }
-
-    /**
-     * Создать партицию на указанный месяц.
-     * @param string $month Формат "2026-03". По умолчанию — следующий месяц.
-     */
-    public function actionCreatePartition(string $month = ''): int
-    {
-        if (empty($month)) {
-            $month = date('Y-m', strtotime('+1 month'));
-        }
-
-        $this->stdout("Creating partition for {$month}...\n");
-
-        try {
-            $this->createService()->createPartition($month);
-            $this->stdout("Partition stock_history_{$month} created.\n");
-            return ExitCode::OK;
-        } catch (\Throwable $e) {
-            $this->stderr("ERROR: {$e->getMessage()}\n");
-            return ExitCode::SOFTWARE;
-        }
-    }
-
-    /**
-     * Обеспечить наличие партиций для текущего и следующего месяца.
-     */
-    public function actionEnsurePartitions(): int
-    {
-        $this->stdout("Ensuring partitions for current and next month...\n");
-
-        try {
-            $this->createService()->ensurePartitions();
-            $this->stdout("Partitions ensured.\n");
-            return ExitCode::OK;
-        } catch (\Throwable $e) {
-            $this->stderr("ERROR: {$e->getMessage()}\n");
-            return ExitCode::SOFTWARE;
-        }
-    }
-
-    /**
-     * Удалить старые партиции.
-     * @param int $months Retention в месяцах (default 24).
-     */
-    public function actionDropOldPartitions(int $months = 24): int
-    {
-        $this->stdout("Dropping partitions older than {$months} months...\n");
-
-        try {
-            $this->createService()->dropOldPartitions($months);
-            $this->stdout("Done.\n");
-            return ExitCode::OK;
-        } catch (\Throwable $e) {
-            $this->stderr("ERROR: {$e->getMessage()}\n");
-            return ExitCode::SOFTWARE;
-        }
-    }
-
-    private function resolveTime(): string
-    {
-        if (!empty($this->time)) {
-            return $this->time;
-        }
-
-        $hour = (int)date('H');
-        return $hour < 14 ? '08:00' : '20:00';
-    }
-
-    private function createService(): StockHistoryService
-    {
-        return new StockHistoryService(Yii::$app->db);
-    }
-}
diff --git a/erp24/commands/StockStateController.php b/erp24/commands/StockStateController.php
new file mode 100644 (file)
index 0000000..43a4fed
--- /dev/null
@@ -0,0 +1,81 @@
+<?php
+
+declare(strict_types=1);
+
+namespace app\commands;
+
+use app\services\StockStateService;
+use yii\console\Controller;
+use yii\console\ExitCode;
+use Yii;
+
+/**
+ * ERP-33: SCD-2 учёт остатков.
+ *
+ * Crontab:
+ *   0 8 * * * cd /www && php yii stock-state/collect >> /var/log/stock-state.log 2>&1
+ *   0 3 1 * * cd /www && php yii stock-state/cleanup >> /var/log/stock-state.log 2>&1
+ */
+class StockStateController extends Controller
+{
+    /**
+     * Собрать SCD-2 срез остатков (1 раз/день).
+     *
+     * Логика: close changed + insert new/changed (2 SQL-запроса).
+     */
+    public function actionCollect(): int
+    {
+        $this->stdout("Stock State SCD-2: collecting...\n");
+
+        try {
+            $service = $this->createService();
+            $result = $service->collect();
+
+            $this->stdout("Done: {$result->getRowCount()} rows inserted.\n");
+
+            $dq = $result->getDqResult();
+            if ($dq !== null && !$dq->allPassed()) {
+                $this->stderr("DQ failures detected:\n");
+                foreach ($dq->getAll() as $f) {
+                    $this->stderr("  [{$f['level']}] {$f['code']}: {$f['message']}\n");
+                }
+            }
+
+            return ExitCode::OK;
+        } catch (\RuntimeException $e) {
+            $this->stderr("ERROR: {$e->getMessage()}\n");
+            return ExitCode::SOFTWARE;
+        } catch (\Throwable $e) {
+            $this->stderr("FATAL: {$e->getMessage()}\n");
+            try {
+                Yii::error("Stock State ETL fatal: " . $e->getMessage(), 'stock-state');
+            } catch (\Throwable $e2) {
+                // ignore
+            }
+            return ExitCode::SOFTWARE;
+        }
+    }
+
+    /**
+     * Удалить закрытые записи старше retention.
+     * @param int $months Retention в месяцах (default 24).
+     */
+    public function actionCleanup(int $months = 24): int
+    {
+        $this->stdout("Cleaning up closed records older than {$months} months...\n");
+
+        try {
+            $deleted = $this->createService()->cleanupOldRecords($months);
+            $this->stdout("Deleted {$deleted} old records.\n");
+            return ExitCode::OK;
+        } catch (\Throwable $e) {
+            $this->stderr("ERROR: {$e->getMessage()}\n");
+            return ExitCode::SOFTWARE;
+        }
+    }
+
+    private function createService(): StockStateService
+    {
+        return new StockStateService(Yii::$app->db);
+    }
+}
diff --git a/erp24/migrations/m260221_100000_create_stock_history_table.php b/erp24/migrations/m260221_100000_create_stock_history_table.php
deleted file mode 100644 (file)
index 17d235b..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-<?php
-
-use yii\db\Migration;
-
-/**
- * ERP-33: Создание partitioned таблицы stock_history для хранения
- * исторических срезов остатков товаров по магазинам.
- */
-class m260221_100000_create_stock_history_table extends Migration
-{
-    public function safeUp()
-    {
-        // Partitioned table — нельзя через createTable(), используем raw SQL
-        $this->execute("
-            CREATE TABLE {{%stock_history}} (
-                id BIGSERIAL,
-                snapshot_date DATE NOT NULL,
-                snapshot_time TIME NOT NULL,
-                store_id VARCHAR(36) NOT NULL,
-                store_name VARCHAR(255),
-                product_id VARCHAR(36) NOT NULL,
-                product_name VARCHAR(255),
-                articule VARCHAR(36),
-                father_id VARCHAR(36),
-                components JSONB,
-                quantity NUMERIC(12,2) NOT NULL DEFAULT 0,
-                reserv NUMERIC(12,2) NOT NULL DEFAULT 0,
-                created_at TIMESTAMP NOT NULL DEFAULT NOW(),
-                PRIMARY KEY (id, snapshot_date)
-            ) PARTITION BY RANGE (snapshot_date)
-        ");
-
-        // Партиция текущего месяца
-        $this->execute("
-            CREATE TABLE IF NOT EXISTS {{%stock_history_2026_02}}
-                PARTITION OF {{%stock_history}}
-                FOR VALUES FROM ('2026-02-01') TO ('2026-03-01')
-        ");
-
-        // Партиция следующего месяца
-        $this->execute("
-            CREATE TABLE IF NOT EXISTS {{%stock_history_2026_03}}
-                PARTITION OF {{%stock_history}}
-                FOR VALUES FROM ('2026-03-01') TO ('2026-04-01')
-        ");
-
-        // Уникальный индекс для idempotency (ON CONFLICT)
-        $this->execute("
-            CREATE UNIQUE INDEX idx_stock_history_unique
-                ON {{%stock_history}} (snapshot_date, snapshot_time, store_id, product_id)
-        ");
-
-        $this->createIndex('idx_stock_history_store', '{{%stock_history}}', 'store_id');
-        $this->createIndex('idx_stock_history_product', '{{%stock_history}}', 'product_id');
-        $this->createIndex('idx_stock_history_father', '{{%stock_history}}', 'father_id');
-        $this->createIndex('idx_stock_history_date_time', '{{%stock_history}}', ['snapshot_date', 'snapshot_time']);
-        $this->createIndex('idx_stock_history_father_date', '{{%stock_history}}', ['father_id', 'snapshot_date']);
-    }
-
-    public function safeDown()
-    {
-        $this->execute("DROP TABLE IF EXISTS {{%stock_history}} CASCADE");
-    }
-}
diff --git a/erp24/migrations/m260221_100000_create_stock_state_table.php b/erp24/migrations/m260221_100000_create_stock_state_table.php
new file mode 100644 (file)
index 0000000..e232bfb
--- /dev/null
@@ -0,0 +1,56 @@
+<?php
+
+use yii\db\Migration;
+
+/**
+ * ERP-33: Создание таблицы stock_state (SCD-2) для хранения
+ * истории изменений остатков товаров по магазинам.
+ *
+ * valid_from / valid_to — TIMESTAMP (поддержка нескольких запусков в день).
+ * valid_to = '2100-01-01 00:00:00' для активных записей (BETWEEN).
+ * is_active — флаг быстрой фильтрации.
+ * batch_ts — timestamp пакета: один запуск = одно значение.
+ */
+class m260221_100000_create_stock_state_table extends Migration
+{
+    public function safeUp()
+    {
+        $this->createTable('{{%stock_state}}', [
+            'id' => $this->bigPrimaryKey(),
+            'store_id' => $this->string(36)->notNull(),
+            'store_name' => $this->string(255),
+            'product_id' => $this->string(36)->notNull(),
+            'product_name' => $this->string(255),
+            'articule' => $this->string(36),
+            'father_id' => $this->string(36),
+            'components' => 'JSONB',
+            'quantity' => $this->decimal(12, 2)->notNull()->defaultValue(0),
+            'reserv' => $this->decimal(12, 2)->notNull()->defaultValue(0),
+            'valid_from' => "TIMESTAMP NOT NULL",
+            'valid_to' => "TIMESTAMP NOT NULL DEFAULT '2100-01-01 00:00:00'",
+            'is_active' => $this->boolean()->notNull()->defaultValue(true),
+            'batch_ts' => "TIMESTAMP NOT NULL DEFAULT NOW()",
+            'created_at' => "TIMESTAMP NOT NULL DEFAULT NOW()",
+        ]);
+
+        // Partial unique: один активный record на store×product
+        $this->execute("
+            CREATE UNIQUE INDEX uq_stock_state_active
+                ON {{%stock_state}} (store_id, product_id)
+                WHERE is_active = true
+        ");
+
+        // BETWEEN запросы: WHERE :ts BETWEEN valid_from AND valid_to
+        $this->createIndex('idx_stock_state_validity', '{{%stock_state}}', ['valid_from', 'valid_to']);
+        $this->createIndex('idx_stock_state_active', '{{%stock_state}}', 'is_active');
+        $this->createIndex('idx_stock_state_batch', '{{%stock_state}}', 'batch_ts');
+        $this->createIndex('idx_stock_state_store', '{{%stock_state}}', 'store_id');
+        $this->createIndex('idx_stock_state_product', '{{%stock_state}}', 'product_id');
+        $this->createIndex('idx_stock_state_father', '{{%stock_state}}', 'father_id');
+    }
+
+    public function safeDown()
+    {
+        $this->dropTable('{{%stock_state}}');
+    }
+}
diff --git a/erp24/records/StockHistory.php b/erp24/records/StockHistory.php
deleted file mode 100644 (file)
index 6fedd4e..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace app\records;
-
-use yii\db\ActiveRecord;
-
-/**
- * ERP-33: Исторические срезы остатков товаров по магазинам.
- *
- * @property int $id
- * @property string $snapshot_date
- * @property string $snapshot_time
- * @property string $store_id
- * @property string|null $store_name
- * @property string $product_id
- * @property string|null $product_name
- * @property string|null $articule
- * @property string|null $father_id
- * @property array|null $components
- * @property float $quantity
- * @property float $reserv
- * @property string $created_at
- *
- * @property Products1c $store
- * @property Products1c $product
- */
-class StockHistory extends ActiveRecord
-{
-    public static function tableName(): string
-    {
-        return '{{%stock_history}}';
-    }
-
-    public function rules(): array
-    {
-        return [
-            [['snapshot_date', 'snapshot_time', 'store_id', 'product_id'], 'required'],
-            [['store_id', 'product_id', 'father_id'], 'string', 'max' => 36],
-            [['store_name', 'product_name'], 'string', 'max' => 255],
-            [['articule'], 'string', 'max' => 36],
-            [['quantity', 'reserv'], 'number', 'min' => 0],
-            [['quantity', 'reserv'], 'default', 'value' => 0],
-            [['snapshot_date'], 'date', 'format' => 'php:Y-m-d'],
-            [['snapshot_time'], 'string'],
-            [['components'], 'safe'],
-        ];
-    }
-
-    public function getStore(): \yii\db\ActiveQuery
-    {
-        return $this->hasOne(Products1c::class, ['id' => 'store_id']);
-    }
-
-    public function getProduct(): \yii\db\ActiveQuery
-    {
-        return $this->hasOne(Products1c::class, ['id' => 'product_id']);
-    }
-}
diff --git a/erp24/records/StockState.php b/erp24/records/StockState.php
new file mode 100644 (file)
index 0000000..710ed0e
--- /dev/null
@@ -0,0 +1,75 @@
+<?php
+
+declare(strict_types=1);
+
+namespace app\records;
+
+use yii\db\ActiveRecord;
+
+/**
+ * ERP-33: SCD-2 история изменений остатков товаров по магазинам.
+ *
+ * is_active = true, valid_to = '2100-01-01 00:00:00' — текущая (активная) запись.
+ * is_active = false, valid_to = timestamp закрытия — запись закрыта.
+ *
+ * batch_ts — единый timestamp для всех записей одного запуска.
+ *
+ * Запрос на момент: WHERE :ts BETWEEN valid_from AND valid_to
+ *
+ * @property int $id
+ * @property string $store_id
+ * @property string|null $store_name
+ * @property string $product_id
+ * @property string|null $product_name
+ * @property string|null $articule
+ * @property string|null $father_id
+ * @property array|null $components
+ * @property float $quantity
+ * @property float $reserv
+ * @property string $valid_from
+ * @property string $valid_to
+ * @property bool $is_active
+ * @property string $batch_ts
+ * @property string $created_at
+ *
+ * @property Products1c $store
+ * @property Products1c $product
+ */
+class StockState extends ActiveRecord
+{
+    /** Timestamp «бесконечности» для активных записей */
+    public const INFINITY_TS = '2100-01-01 00:00:00';
+
+    public static function tableName(): string
+    {
+        return '{{%stock_state}}';
+    }
+
+    public function rules(): array
+    {
+        return [
+            [['store_id', 'product_id', 'valid_from'], 'required'],
+            [['store_id', 'product_id', 'father_id'], 'string', 'max' => 36],
+            [['store_name', 'product_name'], 'string', 'max' => 255],
+            [['articule'], 'string', 'max' => 36],
+            [['quantity', 'reserv'], 'number', 'min' => 0],
+            [['quantity', 'reserv'], 'default', 'value' => 0],
+            [['valid_from', 'valid_to'], 'datetime', 'format' => 'php:Y-m-d H:i:s'],
+            [['valid_to'], 'default', 'value' => self::INFINITY_TS],
+            [['is_active'], 'boolean'],
+            [['is_active'], 'default', 'value' => true],
+            [['batch_ts'], 'datetime', 'format' => 'php:Y-m-d H:i:s'],
+            [['components'], 'safe'],
+        ];
+    }
+
+    public function getStore(): \yii\db\ActiveQuery
+    {
+        return $this->hasOne(Products1c::class, ['id' => 'store_id']);
+    }
+
+    public function getProduct(): \yii\db\ActiveQuery
+    {
+        return $this->hasOne(Products1c::class, ['id' => 'product_id']);
+    }
+}
index e01678949b6f7409365c79a2a47d8d0c3a691062..05c9e711f1b11d09a8268f3e01ed6de6a6e44d04 100644 (file)
@@ -5,7 +5,7 @@ declare(strict_types=1);
 namespace app\services;
 
 /**
- * DTO: результат StockHistoryService::collect().
+ * DTO: результат StockStateService::collect().
  */
 class CollectResult
 {
index 6b023254c2190d969f9934d8e4761d76972117bd..43e4627a6a92d592625a5fa201ee70f89e9b1824 100644 (file)
@@ -5,7 +5,7 @@ declare(strict_types=1);
 namespace app\services;
 
 /**
- * DTO: результат DQ assertions для StockHistoryService.
+ * DTO: результат DQ assertions для StockStateService.
  */
 class DqResult
 {
diff --git a/erp24/services/StockHistoryService.php b/erp24/services/StockHistoryService.php
deleted file mode 100644 (file)
index e5c09ff..0000000
+++ /dev/null
@@ -1,463 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace app\services;
-
-use yii\db\Connection;
-use Yii;
-
-/**
- * ERP-33: ETL-сервис для сбора исторических срезов остатков.
- *
- * Алгоритм collect():
- * 1. Advisory lock (pg_try_advisory_lock)
- * 2. SELECT balances LEFT JOIN products_1c
- * 3. Batch INSERT ON CONFLICT DO UPDATE (1000 строк)
- * 4. DQ assertions
- * 5. Log + unlock
- */
-class StockHistoryService
-{
-    private const ADVISORY_LOCK_KEY = 123456789;
-    private const BATCH_SIZE = 1000;
-    private const LOCK_TIMEOUT_SEC = 30;
-    private const DEVIATION_THRESHOLD = 0.2;
-    private const MAX_RETRY = 3;
-
-    private Connection $db;
-    /** @var callable|null */
-    private $telegramCallback;
-
-    public function __construct(Connection $db, ?callable $telegramCallback = null)
-    {
-        $this->db = $db;
-        $this->telegramCallback = $telegramCallback;
-    }
-
-    /**
-     * Основной метод сбора остатков.
-     */
-    public function collect(string $snapshotTime): CollectResult
-    {
-        if (!preg_match('/^\d{2}:\d{2}$/', $snapshotTime)) {
-            throw new \InvalidArgumentException("Invalid snapshotTime format: {$snapshotTime}, expected HH:MM");
-        }
-
-        $snapshotDate = date('Y-m-d');
-
-        // 1. Advisory lock
-        $this->acquireLock();
-
-        try {
-            // 2. Обеспечить наличие партиций
-            $this->ensurePartitions();
-
-            // 3. SELECT данные
-            $rows = $this->fetchBalances();
-
-            // 4. Batch INSERT
-            $insertedCount = $this->batchInsert($rows, $snapshotDate, $snapshotTime);
-
-            // 5. DQ assertions
-            $dqResult = $this->runDqAssertions($snapshotDate, $snapshotTime);
-
-            // 6. Log
-            $this->logResult($snapshotDate, $snapshotTime, $insertedCount, $dqResult);
-
-            return new CollectResult(true, $insertedCount, $dqResult);
-        } finally {
-            $this->releaseLock();
-        }
-    }
-
-    /**
-     * DQ assertions после сбора.
-     */
-    public function runDqAssertions(string $date, string $time): DqResult
-    {
-        $failures = [];
-
-        // DQ-1: stores count >= active stores
-        $activeStores = (int)$this->db->createCommand()
-            ->setSql("SELECT COUNT(*) FROM products_1c WHERE tip = 'city_store'")
-            ->queryScalar();
-
-        $snapshotStores = (int)$this->db->createCommand()
-            ->setSql("SELECT COUNT(DISTINCT store_id) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time")
-            ->bindValues([':date' => $date, ':time' => $time])
-            ->queryScalar();
-
-        if ($snapshotStores < $activeStores) {
-            $failures[] = [
-                'level' => 'CRITICAL',
-                'code' => 'DQ-1',
-                'message' => "Stores in snapshot ($snapshotStores) < active stores ($activeStores)",
-            ];
-        }
-
-        // DQ-2: no NULL in required fields
-        $nullCount = (int)$this->db->createCommand()
-            ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND (store_id IS NULL OR product_id IS NULL OR quantity IS NULL)")
-            ->bindValues([':date' => $date, ':time' => $time])
-            ->queryScalar();
-
-        if ($nullCount > 0) {
-            $failures[] = [
-                'level' => 'CRITICAL',
-                'code' => 'DQ-2',
-                'message' => "Found $nullCount records with NULL in required fields",
-            ];
-        }
-
-        // DQ-3: no negative quantity
-        $negativeQty = (int)$this->db->createCommand()
-            ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND quantity < 0")
-            ->bindValues([':date' => $date, ':time' => $time])
-            ->queryScalar();
-
-        if ($negativeQty > 0) {
-            $failures[] = [
-                'level' => 'MAJOR',
-                'code' => 'DQ-3',
-                'message' => "Found $negativeQty records with negative quantity",
-            ];
-        }
-
-        // DQ-4: reserv <= quantity
-        $reservOverQty = (int)$this->db->createCommand()
-            ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND reserv > quantity")
-            ->bindValues([':date' => $date, ':time' => $time])
-            ->queryScalar();
-
-        if ($reservOverQty > 0) {
-            $failures[] = [
-                'level' => 'WARNING',
-                'code' => 'DQ-4',
-                'message' => "Found $reservOverQty records where reserv > quantity",
-            ];
-        }
-
-        // DQ-5: row count deviation
-        $currentCount = (int)$this->db->createCommand()
-            ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time")
-            ->bindValues([':date' => $date, ':time' => $time])
-            ->queryScalar();
-
-        $previousCount = (int)$this->db->createCommand()
-            ->setSql("
-                SELECT COUNT(*) FROM stock_history
-                WHERE (snapshot_date, snapshot_time) = (
-                    SELECT snapshot_date, snapshot_time FROM stock_history
-                    WHERE (snapshot_date, snapshot_time) < (:date, :time)
-                    GROUP BY snapshot_date, snapshot_time
-                    ORDER BY snapshot_date DESC, snapshot_time DESC
-                    LIMIT 1
-                )
-            ")
-            ->bindValues([':date' => $date, ':time' => $time])
-            ->queryScalar();
-
-        if ($previousCount > 0) {
-            $deviation = abs($currentCount - $previousCount) / $previousCount;
-            if ($deviation > self::DEVIATION_THRESHOLD) {
-                $pct = round($deviation * 100, 1);
-                $failures[] = [
-                    'level' => 'MAJOR',
-                    'code' => 'DQ-5',
-                    'message' => "Row count deviation {$pct}% (current: $currentCount, previous: $previousCount)",
-                ];
-            }
-        }
-
-        // DQ-6: non-empty snapshot
-        if ($currentCount === 0) {
-            $failures[] = [
-                'level' => 'CRITICAL',
-                'code' => 'DQ-6',
-                'message' => "Empty snapshot: 0 records for $date $time",
-            ];
-        }
-
-        $dqResult = new DqResult($failures);
-
-        // Отправить алерты при CRITICAL/MAJOR
-        $this->sendAlerts($dqResult, $date, $time);
-
-        return $dqResult;
-    }
-
-    /**
-     * Обеспечить наличие партиций для текущего и следующего месяца.
-     */
-    public function ensurePartitions(): void
-    {
-        $currentMonth = date('Y-m');
-        $nextMonth = date('Y-m', strtotime('+1 month'));
-
-        $this->createPartition($currentMonth);
-        $this->createPartition($nextMonth);
-    }
-
-    /**
-     * Создание партиции на указанный месяц.
-     */
-    public function createPartition(string $yearMonth): void
-    {
-        if (!preg_match('/^\d{4}-\d{2}$/', $yearMonth)) {
-            throw new \InvalidArgumentException("Invalid yearMonth format: {$yearMonth}, expected YYYY-MM");
-        }
-
-        $start = $yearMonth . '-01';
-        $end = date('Y-m-d', strtotime($start . ' +1 month'));
-        $suffix = str_replace('-', '_', $yearMonth);
-
-        // DDL не поддерживает bind params — используем интерполяцию (формат валидирован regex)
-        $this->db->createCommand()
-            ->setSql("CREATE TABLE IF NOT EXISTS stock_history_{$suffix} PARTITION OF stock_history FOR VALUES FROM ('{$start}') TO ('{$end}')")
-            ->execute();
-    }
-
-    /**
-     * Удаление партиций старше retention периода.
-     */
-    public function dropOldPartitions(int $retentionMonths = 24): void
-    {
-        $cutoffDate = date('Y-m-d', strtotime("-{$retentionMonths} months"));
-        $cutoffYm = substr($cutoffDate, 0, 7); // "2024-02"
-
-        $partitions = $this->db->createCommand()
-            ->setSql("SELECT tablename FROM pg_tables WHERE schemaname = 'erp24' AND tablename LIKE 'stock_history_%' ORDER BY tablename")
-            ->queryAll();
-
-        foreach ($partitions as $row) {
-            $name = $row['tablename'];
-            // Извлекаем дату из имени: stock_history_2024_01 → 2024-01
-            if (preg_match('/stock_history_(\d{4})_(\d{2})$/', $name, $m)) {
-                $partitionYm = $m[1] . '-' . $m[2];
-                if ($partitionYm < $cutoffYm) {
-                    $this->db->createCommand()
-                        ->setSql("DROP TABLE IF EXISTS {$name}")
-                        ->execute();
-
-                    Yii::info("Dropped old partition: {$name}", 'stock-history');
-                }
-            }
-        }
-    }
-
-    // =========================================================
-    // Private methods
-    // =========================================================
-
-    protected function getLockTimeoutSec(): int
-    {
-        return self::LOCK_TIMEOUT_SEC;
-    }
-
-    protected function getLockRetrySleepUs(): int
-    {
-        return 500000; // 0.5 sec
-    }
-
-    private function acquireLock(): void
-    {
-        $deadline = time() + $this->getLockTimeoutSec();
-        $sleepUs = $this->getLockRetrySleepUs();
-
-        while (time() < $deadline) {
-            $locked = $this->db->createCommand()
-                ->setSql("SELECT pg_try_advisory_lock(" . self::ADVISORY_LOCK_KEY . ")")
-                ->queryScalar();
-
-            if ($locked) {
-                return;
-            }
-
-            usleep($sleepUs);
-        }
-
-        $msg = 'Failed to acquire advisory lock within ' . $this->getLockTimeoutSec() . ' seconds';
-        try {
-            Yii::error($msg, 'stock-history');
-        } catch (\Throwable $e) {
-            // Логирование не должно маскировать основную ошибку
-        }
-        $this->sendTelegram("CRITICAL: Stock History ETL — $msg");
-
-        throw new \RuntimeException($msg);
-    }
-
-    private function releaseLock(): void
-    {
-        $this->db->createCommand()
-            ->setSql("SELECT pg_advisory_unlock(" . self::ADVISORY_LOCK_KEY . ")")
-            ->queryScalar();
-    }
-
-    private function fetchBalances(): array
-    {
-        return $this->db->createCommand()
-            ->setSql("
-                SELECT
-                    b.store_id,
-                    ps.name as store_name,
-                    b.product_id,
-                    pp.name as product_name,
-                    pp.articule,
-                    pp.parent_id as father_id,
-                    pp.components,
-                    b.quantity,
-                    b.reserv
-                FROM balances b
-                LEFT JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store'
-                LEFT JOIN products_1c pp ON pp.id = b.product_id
-                WHERE ps.id IS NOT NULL
-                ORDER BY b.store_id, b.product_id
-            ")
-            ->queryAll();
-    }
-
-    private function batchInsert(array $rows, string $date, string $time): int
-    {
-        if (empty($rows)) {
-            return 0;
-        }
-
-        $total = 0;
-        $chunks = array_chunk($rows, self::BATCH_SIZE);
-
-        foreach ($chunks as $chunk) {
-            $total += $this->insertChunkWithRetry($chunk, $date, $time);
-        }
-
-        return $total;
-    }
-
-    private function insertChunkWithRetry(array $chunk, string $date, string $time): int
-    {
-        $attempt = 0;
-
-        while ($attempt < self::MAX_RETRY) {
-            try {
-                return $this->insertChunk($chunk, $date, $time);
-            } catch (\yii\db\Exception $e) {
-                $attempt++;
-                if ($attempt >= self::MAX_RETRY) {
-                    throw $e;
-                }
-                // Exponential backoff: 1s, 2s, 4s
-                sleep((int)pow(2, $attempt - 1));
-                Yii::warning("Batch INSERT retry #{$attempt}: " . $e->getMessage(), 'stock-history');
-            }
-        }
-
-        return 0;
-    }
-
-    private function insertChunk(array $chunk, string $date, string $time): int
-    {
-        $values = [];
-        $params = [];
-        $i = 0;
-
-        foreach ($chunk as $row) {
-            $components = $row['components'] ?? null;
-            if ($components !== null && !is_string($components)) {
-                $components = json_encode($components);
-            }
-
-            $values[] = "(:date_{$i}, :time_{$i}, :sid_{$i}, :sn_{$i}, :pid_{$i}, :pn_{$i}, :art_{$i}, :fid_{$i}, :comp_{$i}::jsonb, :qty_{$i}, :res_{$i}, NOW())";
-            $params[":date_{$i}"] = $date;
-            $params[":time_{$i}"] = $time;
-            $params[":sid_{$i}"] = $row['store_id'];
-            $params[":sn_{$i}"] = $row['store_name'] ?? null;
-            $params[":pid_{$i}"] = $row['product_id'];
-            $params[":pn_{$i}"] = $row['product_name'] ?? null;
-            $params[":art_{$i}"] = $row['articule'] ?? null;
-            $params[":fid_{$i}"] = $row['father_id'] ?? null;
-            $params[":comp_{$i}"] = $components;
-            $params[":qty_{$i}"] = $row['quantity'] ?? 0;
-            $params[":res_{$i}"] = $row['reserv'] ?? 0;
-            $i++;
-        }
-
-        $sql = "INSERT INTO stock_history (snapshot_date, snapshot_time, store_id, store_name, product_id, product_name, articule, father_id, components, quantity, reserv, created_at)
-                VALUES " . implode(', ', $values) . "
-                ON CONFLICT (snapshot_date, snapshot_time, store_id, product_id)
-                DO UPDATE SET
-                    quantity = EXCLUDED.quantity,
-                    reserv = EXCLUDED.reserv,
-                    store_name = EXCLUDED.store_name,
-                    product_name = EXCLUDED.product_name,
-                    articule = EXCLUDED.articule,
-                    father_id = EXCLUDED.father_id,
-                    components = EXCLUDED.components,
-                    created_at = NOW()";
-
-        return $this->db->createCommand()
-            ->setSql($sql)
-            ->bindValues($params)
-            ->execute();
-    }
-
-    private function logResult(string $date, string $time, int $rowCount, DqResult $dqResult): void
-    {
-        $status = $dqResult->allPassed() ? 'SUCCESS' : 'DQ_FAILURES';
-        try {
-            Yii::info("Stock History ETL [{$date} {$time}]: {$status}, rows={$rowCount}", 'stock-history');
-        } catch (\Throwable $e) {
-            // Логирование не должно прерывать ETL
-        }
-    }
-
-    private function sendAlerts(DqResult $dqResult, string $date, string $time): void
-    {
-        $criticals = $dqResult->getCriticalFailures();
-        $majors = $dqResult->getMajorFailures();
-
-        if (empty($criticals) && empty($majors)) {
-            return;
-        }
-
-        $messages = [];
-        foreach ($criticals as $f) {
-            $messages[] = "CRITICAL [{$f['code']}]: {$f['message']}";
-        }
-        foreach ($majors as $f) {
-            $messages[] = "MAJOR [{$f['code']}]: {$f['message']}";
-        }
-
-        try {
-            foreach ($criticals as $f) {
-                Yii::error("DQ {$f['code']}: {$f['message']}", 'stock-history');
-            }
-            foreach ($majors as $f) {
-                Yii::warning("DQ {$f['code']}: {$f['message']}", 'stock-history');
-            }
-        } catch (\Throwable $e) {
-            // Логирование не должно прерывать ETL
-        }
-
-        $text = "Stock History DQ [{$date} {$time}]:\n" . implode("\n", $messages);
-        $this->sendTelegram($text);
-    }
-
-    private function sendTelegram(string $message): void
-    {
-        if ($this->telegramCallback !== null) {
-            ($this->telegramCallback)($message);
-            return;
-        }
-
-        try {
-            if (Yii::$app && Yii::$app->has('queue')) {
-                Yii::$app->queue->push(new \app\jobs\SendTelegramMessageJob([
-                    'message' => $message,
-                ]));
-            }
-        } catch (\Throwable $e) {
-            Yii::error("Failed to send Telegram alert: " . $e->getMessage(), 'stock-history');
-        }
-    }
-}
diff --git a/erp24/services/StockStateService.php b/erp24/services/StockStateService.php
new file mode 100644 (file)
index 0000000..21a9759
--- /dev/null
@@ -0,0 +1,359 @@
+<?php
+
+declare(strict_types=1);
+
+namespace app\services;
+
+use yii\db\Connection;
+use Yii;
+
+/**
+ * ERP-33: SCD-2 сервис учёта остатков.
+ *
+ * Алгоритм collect() — 2 SQL-запроса + batch_ts:
+ * 1. UPDATE stock_state SET valid_to = :now, is_active = false WHERE quantity/reserv изменились
+ * 2. INSERT INTO stock_state ... WHERE нет активной записи (новые + изменённые)
+ *
+ * valid_from / valid_to — TIMESTAMP (поддержка нескольких запусков в день).
+ * valid_to = '2100-01-01 00:00:00' для активных записей → BETWEEN.
+ * is_active — флаг быстрой фильтрации.
+ * batch_ts — единый timestamp на весь пакет (один запуск = одно значение).
+ */
+class StockStateService
+{
+    private const ADVISORY_LOCK_KEY = 123456789;
+    private const LOCK_TIMEOUT_SEC = 30;
+    private const DEVIATION_THRESHOLD = 0.2;
+    public const INFINITY_TS = '2100-01-01 00:00:00';
+
+    private Connection $db;
+    /** @var callable|null */
+    private $telegramCallback;
+
+    public function __construct(Connection $db, ?callable $telegramCallback = null)
+    {
+        $this->db = $db;
+        $this->telegramCallback = $telegramCallback;
+    }
+
+    /**
+     * Основной метод: SCD-2 сбор остатков.
+     * batch_ts — один timestamp на весь запуск.
+     */
+    public function collect(): CollectResult
+    {
+        $now = date('Y-m-d H:i:s');
+
+        $this->acquireLock();
+
+        try {
+            // 1. Закрыть устаревшие записи (остаток изменился)
+            $closedCount = $this->closeChangedRecords($now);
+
+            // 2. Вставить новые/изменённые
+            $insertedCount = $this->insertNewRecords($now);
+
+            // 3. DQ assertions
+            $dqResult = $this->runDqAssertions();
+
+            // 4. Log
+            $this->logResult($now, $closedCount, $insertedCount, $dqResult);
+
+            return new CollectResult(true, $insertedCount, $dqResult);
+        } finally {
+            $this->releaseLock();
+        }
+    }
+
+    /**
+     * Закрыть записи, где quantity или reserv изменились.
+     * SET valid_to = :now, is_active = false.
+     */
+    private function closeChangedRecords(string $now): int
+    {
+        return $this->db->createCommand()
+            ->setSql("
+                UPDATE stock_state ss
+                SET valid_to = :now::timestamp - INTERVAL '1 second', is_active = false
+                FROM balances b
+                JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store'
+                WHERE ss.store_id = b.store_id
+                  AND ss.product_id = b.product_id
+                  AND ss.is_active = true
+                  AND (ss.quantity != b.quantity OR ss.reserv != b.reserv)
+            ")
+            ->bindValue(':now', $now)
+            ->execute();
+    }
+
+    /**
+     * Вставить записи для новых товаров и для тех, чей остаток изменился.
+     * valid_from = :now, valid_to = infinity, is_active = true, batch_ts = :now.
+     */
+    private function insertNewRecords(string $now): int
+    {
+        return $this->db->createCommand()
+            ->setSql("
+                INSERT INTO stock_state (store_id, store_name, product_id, product_name, articule, father_id, components, quantity, reserv, valid_from, valid_to, is_active, batch_ts, created_at)
+                SELECT
+                    b.store_id,
+                    ps.name,
+                    b.product_id,
+                    pp.name,
+                    pp.articule,
+                    pp.parent_id,
+                    pp.components,
+                    b.quantity,
+                    b.reserv,
+                    :now,
+                    :infinity,
+                    true,
+                    :batch_ts,
+                    NOW()
+                FROM balances b
+                JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store'
+                LEFT JOIN products_1c pp ON pp.id = b.product_id
+                LEFT JOIN stock_state ss
+                    ON ss.store_id = b.store_id
+                    AND ss.product_id = b.product_id
+                    AND ss.is_active = true
+                WHERE ss.id IS NULL
+            ")
+            ->bindValues([':now' => $now, ':infinity' => self::INFINITY_TS, ':batch_ts' => $now])
+            ->execute();
+    }
+
+    /**
+     * DQ assertions по активным записям (is_active = true).
+     */
+    public function runDqAssertions(): DqResult
+    {
+        $failures = [];
+
+        // DQ-1: stores count >= active stores
+        $activeStores = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(*) FROM products_1c WHERE tip = 'city_store'")
+            ->queryScalar();
+
+        $snapshotStores = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(DISTINCT store_id) FROM stock_state WHERE is_active = true")
+            ->queryScalar();
+
+        if ($snapshotStores < $activeStores) {
+            $failures[] = [
+                'level' => 'CRITICAL',
+                'code' => 'DQ-1',
+                'message' => "Stores in active records ($snapshotStores) < active stores ($activeStores)",
+            ];
+        }
+
+        // DQ-2: no NULL in required fields
+        $nullCount = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true AND (store_id IS NULL OR product_id IS NULL OR quantity IS NULL)")
+            ->queryScalar();
+
+        if ($nullCount > 0) {
+            $failures[] = [
+                'level' => 'CRITICAL',
+                'code' => 'DQ-2',
+                'message' => "Found $nullCount active records with NULL in required fields",
+            ];
+        }
+
+        // DQ-3: no negative quantity
+        $negativeQty = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true AND quantity < 0")
+            ->queryScalar();
+
+        if ($negativeQty > 0) {
+            $failures[] = [
+                'level' => 'MAJOR',
+                'code' => 'DQ-3',
+                'message' => "Found $negativeQty active records with negative quantity",
+            ];
+        }
+
+        // DQ-4: reserv <= quantity
+        $reservOverQty = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true AND reserv > quantity")
+            ->queryScalar();
+
+        if ($reservOverQty > 0) {
+            $failures[] = [
+                'level' => 'WARNING',
+                'code' => 'DQ-4',
+                'message' => "Found $reservOverQty active records where reserv > quantity",
+            ];
+        }
+
+        // DQ-5: active count deviation vs previous batch
+        $activeCount = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true")
+            ->queryScalar();
+
+        $prevActiveCount = (int)$this->db->createCommand()
+            ->setSql("
+                SELECT COUNT(*) FROM stock_state
+                WHERE valid_from <= (NOW() - INTERVAL '12 hours')
+                  AND valid_to > (NOW() - INTERVAL '12 hours')
+            ")
+            ->queryScalar();
+
+        if ($prevActiveCount > 0) {
+            $deviation = abs($activeCount - $prevActiveCount) / $prevActiveCount;
+            if ($deviation > self::DEVIATION_THRESHOLD) {
+                $pct = round($deviation * 100, 1);
+                $failures[] = [
+                    'level' => 'MAJOR',
+                    'code' => 'DQ-5',
+                    'message' => "Active records deviation {$pct}% (current: $activeCount, previous: $prevActiveCount)",
+                ];
+            }
+        }
+
+        // DQ-6: non-empty active set
+        if ($activeCount === 0) {
+            $failures[] = [
+                'level' => 'CRITICAL',
+                'code' => 'DQ-6',
+                'message' => "Zero active records in stock_state",
+            ];
+        }
+
+        $dqResult = new DqResult($failures);
+        $this->sendAlerts($dqResult);
+
+        return $dqResult;
+    }
+
+    /**
+     * Удаление закрытых записей старше retention.
+     */
+    public function cleanupOldRecords(int $retentionMonths = 24): int
+    {
+        $cutoff = date('Y-m-d H:i:s', strtotime("-{$retentionMonths} months"));
+
+        return $this->db->createCommand()
+            ->setSql("DELETE FROM stock_state WHERE is_active = false AND valid_to < :cutoff")
+            ->bindValues([':cutoff' => $cutoff])
+            ->execute();
+    }
+
+    // =========================================================
+    // Lock management
+    // =========================================================
+
+    protected function getLockTimeoutSec(): int
+    {
+        return self::LOCK_TIMEOUT_SEC;
+    }
+
+    protected function getLockRetrySleepUs(): int
+    {
+        return 500000; // 0.5 sec
+    }
+
+    private function acquireLock(): void
+    {
+        $deadline = time() + $this->getLockTimeoutSec();
+        $sleepUs = $this->getLockRetrySleepUs();
+
+        while (time() < $deadline) {
+            $locked = $this->db->createCommand()
+                ->setSql("SELECT pg_try_advisory_lock(" . self::ADVISORY_LOCK_KEY . ")")
+                ->queryScalar();
+
+            if ($locked) {
+                return;
+            }
+
+            usleep($sleepUs);
+        }
+
+        $msg = 'Failed to acquire advisory lock within ' . $this->getLockTimeoutSec() . ' seconds';
+        try {
+            Yii::error($msg, 'stock-state');
+        } catch (\Throwable $e) {
+            // Логирование не должно маскировать основную ошибку
+        }
+        $this->sendTelegram("CRITICAL: Stock State ETL — $msg");
+
+        throw new \RuntimeException($msg);
+    }
+
+    private function releaseLock(): void
+    {
+        $this->db->createCommand()
+            ->setSql("SELECT pg_advisory_unlock(" . self::ADVISORY_LOCK_KEY . ")")
+            ->queryScalar();
+    }
+
+    // =========================================================
+    // Logging & alerts
+    // =========================================================
+
+    private function logResult(string $batchTs, int $closedCount, int $insertedCount, DqResult $dqResult): void
+    {
+        $status = $dqResult->allPassed() ? 'SUCCESS' : 'DQ_FAILURES';
+        try {
+            Yii::info("Stock State SCD-2 [{$batchTs}]: {$status}, closed={$closedCount}, inserted={$insertedCount}", 'stock-state');
+        } catch (\Throwable $e) {
+            // Логирование не должно прерывать ETL
+        }
+    }
+
+    private function sendAlerts(DqResult $dqResult): void
+    {
+        $criticals = $dqResult->getCriticalFailures();
+        $majors = $dqResult->getMajorFailures();
+
+        if (empty($criticals) && empty($majors)) {
+            return;
+        }
+
+        $messages = [];
+        foreach ($criticals as $f) {
+            $messages[] = "CRITICAL [{$f['code']}]: {$f['message']}";
+        }
+        foreach ($majors as $f) {
+            $messages[] = "MAJOR [{$f['code']}]: {$f['message']}";
+        }
+
+        try {
+            foreach ($criticals as $f) {
+                Yii::error("DQ {$f['code']}: {$f['message']}", 'stock-state');
+            }
+            foreach ($majors as $f) {
+                Yii::warning("DQ {$f['code']}: {$f['message']}", 'stock-state');
+            }
+        } catch (\Throwable $e) {
+            // Логирование не должно прерывать ETL
+        }
+
+        $now = date('Y-m-d H:i:s');
+        $text = "Stock State DQ [{$now}]:\n" . implode("\n", $messages);
+        $this->sendTelegram($text);
+    }
+
+    private function sendTelegram(string $message): void
+    {
+        if ($this->telegramCallback !== null) {
+            ($this->telegramCallback)($message);
+            return;
+        }
+
+        try {
+            if (Yii::$app && Yii::$app->has('queue')) {
+                Yii::$app->queue->push(new \app\jobs\SendTelegramMessageJob([
+                    'message' => $message,
+                ]));
+            }
+        } catch (\Throwable $e) {
+            try {
+                Yii::error("Failed to send Telegram alert: " . $e->getMessage(), 'stock-state');
+            } catch (\Throwable $e2) {
+                // ignore
+            }
+        }
+    }
+}
diff --git a/erp24/tests/unit/services/StockHistoryServiceTest.php b/erp24/tests/unit/services/StockHistoryServiceTest.php
deleted file mode 100644 (file)
index 766116a..0000000
+++ /dev/null
@@ -1,338 +0,0 @@
-<?php
-
-declare(strict_types=1);
-
-namespace tests\unit\services;
-
-use Codeception\Test\Unit;
-use app\services\StockHistoryService;
-
-/**
- * ERP-33: Тесты StockHistoryService — ETL для сбора остатков.
- *
- * @group services
- * @group stock-history
- * @group etl
- */
-class StockHistoryServiceTest extends Unit
-{
-    protected $tester;
-
-    /** No-op callback для подавления Telegram/Yii в тестах */
-    private static function noopTelegramCallback(): callable
-    {
-        return static function (string $message): void {};
-    }
-
-    /**
-     * @return array{0: \yii\db\Connection, 1: \yii\db\Command}
-     */
-    private function createMockDbAndCommand(): array
-    {
-        $db = $this->createMock(\yii\db\Connection::class);
-        $command = $this->createMock(\yii\db\Command::class);
-
-        $db->method('createCommand')->willReturn($command);
-        $command->method('setSql')->willReturnSelf();
-        $command->method('bindValues')->willReturnSelf();
-        $command->method('bindValue')->willReturnSelf();
-
-        return [$db, $command];
-    }
-
-    // =========================================================
-    // collect() tests
-    // =========================================================
-
-    public function testCollect_Success_InsertsRecordsAndReturnsResult(): void
-    {
-        [$db, $command] = $this->createMockDbAndCommand();
-
-        $command->method('queryScalar')
-            ->willReturnOnConsecutiveCalls(
-                true,   // pg_try_advisory_lock
-                5,      // active stores (DQ-1)
-                10,     // current count (DQ-6 + DQ-5)
-                5,      // distinct stores in snapshot (DQ-1)
-                0,      // null required (DQ-2)
-                0,      // negative qty (DQ-3)
-                0,      // reserv > qty (DQ-4)
-                10,     // previous count (DQ-5)
-                true    // pg_advisory_unlock
-            );
-
-        $command->method('queryAll')->willReturn(
-            array_fill(0, 10, [
-                'store_id' => 'store-1', 'store_name' => 'Store',
-                'product_id' => 'prod-1', 'product_name' => 'Rose',
-                'articule' => 'ART-001', 'father_id' => null,
-                'components' => null, 'quantity' => 5.00, 'reserv' => 1.00,
-            ])
-        );
-        $command->method('execute')->willReturn(10);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $result = $service->collect('08:00');
-
-        $this->assertTrue($result->isSuccess());
-        $this->assertEquals(10, $result->getRowCount());
-    }
-
-    public function testCollect_LockTimeout_ThrowsLockException(): void
-    {
-        [$db, $command] = $this->createMockDbAndCommand();
-
-        // Lock always fails
-        $command->method('queryScalar')->willReturn(false);
-
-        $this->expectException(\RuntimeException::class);
-        $this->expectExceptionMessage('advisory lock');
-
-        // Анонимный класс с минимальными таймаутами для быстрого теста
-        $noopCallback = self::noopTelegramCallback();
-        $service = new class($db, $noopCallback) extends StockHistoryService {
-            protected function getLockTimeoutSec(): int { return 1; }
-            protected function getLockRetrySleepUs(): int { return 100000; } // 0.1 sec
-        };
-        $service->collect('08:00');
-    }
-
-    public function testCollect_InvalidTimeFormat_ThrowsException(): void
-    {
-        [$db, $command] = $this->createMockDbAndCommand();
-
-        $this->expectException(\InvalidArgumentException::class);
-        $this->expectExceptionMessage('snapshotTime');
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $service->collect('invalid');
-    }
-
-    public function testCollect_EmptyBalances_ReturnsZeroRows(): void
-    {
-        [$db, $command] = $this->createMockDbAndCommand();
-
-        $command->method('queryScalar')
-            ->willReturnOnConsecutiveCalls(
-                true, 0, 0, 0, 0, 0, 0, 0, true
-            );
-        $command->method('queryAll')->willReturn([]);
-        $command->method('execute')->willReturn(0);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $result = $service->collect('08:00');
-
-        $this->assertTrue($result->isSuccess());
-        $this->assertEquals(0, $result->getRowCount());
-    }
-
-    public function testCollect_OnConflictUpdate_UsesUpsertQuery(): void
-    {
-        $db = $this->createMock(\yii\db\Connection::class);
-        $command = $this->createMock(\yii\db\Command::class);
-
-        $db->method('createCommand')->willReturn($command);
-        $command->method('bindValues')->willReturnSelf();
-        $command->method('bindValue')->willReturnSelf();
-
-        $command->method('queryScalar')
-            ->willReturnOnConsecutiveCalls(true, 1, 1, 1, 0, 0, 0, 1, true);
-
-        $executedSql = [];
-        $command->method('setSql')->willReturnCallback(
-            function ($sql) use ($command, &$executedSql) {
-                $executedSql[] = $sql;
-                return $command;
-            }
-        );
-        $command->method('queryAll')->willReturn([
-            ['store_id' => 's1', 'store_name' => 'S', 'product_id' => 'p1',
-             'product_name' => 'P', 'articule' => 'A', 'father_id' => null,
-             'components' => null, 'quantity' => 1, 'reserv' => 0],
-        ]);
-        $command->method('execute')->willReturn(1);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $service->collect('08:00');
-
-        $hasOnConflict = false;
-        foreach ($executedSql as $sql) {
-            if (stripos($sql, 'ON CONFLICT') !== false) {
-                $hasOnConflict = true;
-            }
-        }
-        $this->assertTrue($hasOnConflict, 'SQL должен содержать ON CONFLICT DO UPDATE');
-    }
-
-    // =========================================================
-    // DQ Assertions tests
-    // =========================================================
-
-    public function testDqAssertions_AllPass_ReturnsSuccess(): void
-    {
-        [$db, $command] = $this->createMockDbAndCommand();
-
-        $command->method('queryScalar')
-            ->willReturnOnConsecutiveCalls(5, 5, 0, 0, 0, 100, 95);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $result = $service->runDqAssertions('2026-02-21', '08:00');
-
-        $this->assertTrue($result->allPassed());
-        $this->assertEmpty($result->getCriticalFailures());
-    }
-
-    public function testDqAssertions_MissingStores_ReturnsCritical(): void
-    {
-        [$db, $command] = $this->createMockDbAndCommand();
-
-        $command->method('queryScalar')
-            ->willReturnOnConsecutiveCalls(24, 20, 0, 0, 0, 100, 100);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $result = $service->runDqAssertions('2026-02-21', '08:00');
-
-        $this->assertFalse($result->allPassed());
-        $criticals = $result->getCriticalFailures();
-        $this->assertNotEmpty($criticals);
-        $this->assertStringContainsString('store', strtolower($criticals[0]['message']));
-    }
-
-    public function testDqAssertions_NegativeQuantity_ReturnsMajor(): void
-    {
-        [$db, $command] = $this->createMockDbAndCommand();
-
-        $command->method('queryScalar')
-            ->willReturnOnConsecutiveCalls(5, 5, 0, 3, 0, 100, 100);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $result = $service->runDqAssertions('2026-02-21', '08:00');
-
-        $majors = $result->getMajorFailures();
-        $this->assertNotEmpty($majors);
-    }
-
-    public function testDqAssertions_DeviationOver20Pct_ReturnsMajor(): void
-    {
-        [$db, $command] = $this->createMockDbAndCommand();
-
-        $command->method('queryScalar')
-            ->willReturnOnConsecutiveCalls(5, 5, 0, 0, 0, 100, 50);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $result = $service->runDqAssertions('2026-02-21', '08:00');
-
-        $majors = $result->getMajorFailures();
-        $this->assertNotEmpty($majors);
-    }
-
-    public function testDqAssertions_EmptySnapshot_ReturnsCritical(): void
-    {
-        [$db, $command] = $this->createMockDbAndCommand();
-
-        $command->method('queryScalar')
-            ->willReturnOnConsecutiveCalls(5, 0, 0, 0, 0, 0, 100);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $result = $service->runDqAssertions('2026-02-21', '08:00');
-
-        $this->assertFalse($result->allPassed());
-        $criticals = $result->getCriticalFailures();
-        $this->assertNotEmpty($criticals);
-    }
-
-    // =========================================================
-    // Partition management tests
-    // =========================================================
-
-    public function testCreatePartition_CreatesPartitionTable(): void
-    {
-        $db = $this->createMock(\yii\db\Connection::class);
-        $command = $this->createMock(\yii\db\Command::class);
-
-        $db->method('createCommand')->willReturn($command);
-
-        $executedSql = [];
-        $command->method('setSql')->willReturnCallback(
-            function ($sql) use ($command, &$executedSql) {
-                $executedSql[] = $sql;
-                return $command;
-            }
-        );
-        $command->method('execute')->willReturn(0);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $service->createPartition('2026-04');
-
-        $sql = implode(' ', $executedSql);
-        $this->assertStringContainsString('PARTITION OF', $sql);
-        $this->assertStringContainsString('stock_history_2026_04', $sql);
-        $this->assertStringContainsString('2026-04-01', $sql);
-        $this->assertStringContainsString('2026-05-01', $sql);
-    }
-
-    public function testCreatePartition_InvalidFormat_ThrowsException(): void
-    {
-        [$db, $command] = $this->createMockDbAndCommand();
-
-        $this->expectException(\InvalidArgumentException::class);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $service->createPartition('invalid');
-    }
-
-    public function testEnsurePartitions_CreatesCurrentAndNextMonth(): void
-    {
-        $db = $this->createMock(\yii\db\Connection::class);
-        $command = $this->createMock(\yii\db\Command::class);
-
-        $db->method('createCommand')->willReturn($command);
-
-        $executedSql = [];
-        $command->method('setSql')->willReturnCallback(
-            function ($sql) use ($command, &$executedSql) {
-                $executedSql[] = $sql;
-                return $command;
-            }
-        );
-        $command->method('execute')->willReturn(0);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $service->ensurePartitions();
-
-        $partitionSql = array_filter($executedSql, fn($s) => stripos($s, 'PARTITION OF') !== false);
-        $this->assertCount(2, $partitionSql, 'Должны быть созданы 2 партиции (текущий и следующий месяц)');
-    }
-
-    public function testDropOldPartitions_DropsPartitionsOlderThanRetention(): void
-    {
-        $db = $this->createMock(\yii\db\Connection::class);
-        $command = $this->createMock(\yii\db\Command::class);
-
-        $db->method('createCommand')->willReturn($command);
-        $command->method('bindValues')->willReturnSelf();
-        $command->method('bindValue')->willReturnSelf();
-
-        $command->method('queryAll')->willReturn([
-            ['tablename' => 'stock_history_2023_11'],
-            ['tablename' => 'stock_history_2023_12'],
-            ['tablename' => 'stock_history_2026_01'],
-        ]);
-
-        $droppedSql = [];
-        $command->method('setSql')->willReturnCallback(
-            function ($sql) use ($command, &$droppedSql) {
-                if (stripos($sql, 'DROP TABLE') !== false) {
-                    $droppedSql[] = $sql;
-                }
-                return $command;
-            }
-        );
-        $command->method('execute')->willReturn(0);
-
-        $service = new StockHistoryService($db, self::noopTelegramCallback());
-        $service->dropOldPartitions(24);
-
-        $this->assertCount(2, $droppedSql, 'Должны быть удалены 2 старые партиции (2023_11 и 2023_12)');
-    }
-}
diff --git a/erp24/tests/unit/services/StockStateServiceTest.php b/erp24/tests/unit/services/StockStateServiceTest.php
new file mode 100644 (file)
index 0000000..b947440
--- /dev/null
@@ -0,0 +1,290 @@
+<?php
+
+declare(strict_types=1);
+
+namespace tests\unit\services;
+
+use app\services\StockStateService;
+use Codeception\Test\Unit;
+
+/**
+ * @group services
+ * @group stock-state
+ * @group scd2
+ */
+class StockStateServiceTest extends Unit
+{
+    protected $tester;
+
+    private static function noopTelegramCallback(): callable
+    {
+        return static function (string $message): void {};
+    }
+
+    /**
+     * @return array{0: \yii\db\Connection, 1: \yii\db\Command}
+     */
+    private function createMockDbAndCommand(): array
+    {
+        $db = $this->createMock(\yii\db\Connection::class);
+        $command = $this->createMock(\yii\db\Command::class);
+
+        $db->method('createCommand')->willReturn($command);
+        $command->method('setSql')->willReturnSelf();
+        $command->method('bindValues')->willReturnSelf();
+        $command->method('bindValue')->willReturnSelf();
+
+        return [$db, $command];
+    }
+
+    // =========================================================
+    // collect() tests
+    // =========================================================
+
+    public function testCollect_Success_ClosesAndInsertsRecords(): void
+    {
+        $db = $this->createMock(\yii\db\Connection::class);
+        $command = $this->createMock(\yii\db\Command::class);
+
+        $db->method('createCommand')->willReturn($command);
+        $command->method('setSql')->willReturnSelf();
+        $command->method('bindValues')->willReturnSelf();
+        $command->method('bindValue')->willReturnSelf();
+
+        $command->method('queryScalar')
+            ->willReturnOnConsecutiveCalls(
+                true,       // lock
+                24, 24,     // DQ-1
+                0,          // DQ-2
+                0,          // DQ-3
+                0,          // DQ-4
+                11400,      // DQ-5 activeCount
+                11300,      // DQ-5 prevActiveCount
+                true        // unlock
+            );
+
+        $command->method('execute')
+            ->willReturnOnConsecutiveCalls(150, 150);
+
+        $service = new StockStateService($db, self::noopTelegramCallback());
+        $result = $service->collect();
+
+        $this->assertTrue($result->isSuccess());
+        $this->assertSame(150, $result->getRowCount());
+        $this->assertTrue($result->getDqResult()->allPassed());
+    }
+
+    public function testCollect_LockTimeout_ThrowsRuntimeException(): void
+    {
+        [$db, $command] = $this->createMockDbAndCommand();
+
+        $command->method('queryScalar')->willReturn(false);
+
+        $this->expectException(\RuntimeException::class);
+        $this->expectExceptionMessage('advisory lock');
+
+        $noopCallback = self::noopTelegramCallback();
+        $service = new class($db, $noopCallback) extends StockStateService {
+            protected function getLockTimeoutSec(): int { return 1; }
+            protected function getLockRetrySleepUs(): int { return 100000; }
+        };
+        $service->collect();
+    }
+
+    public function testCollect_EmptyBalances_ReturnsZeroAndDqCritical(): void
+    {
+        $db = $this->createMock(\yii\db\Connection::class);
+        $command = $this->createMock(\yii\db\Command::class);
+
+        $db->method('createCommand')->willReturn($command);
+        $command->method('setSql')->willReturnSelf();
+        $command->method('bindValues')->willReturnSelf();
+        $command->method('bindValue')->willReturnSelf();
+
+        $command->method('queryScalar')
+            ->willReturnOnConsecutiveCalls(
+                true,       // lock
+                24, 0,      // DQ-1
+                0,          // DQ-2
+                0,          // DQ-3
+                0,          // DQ-4
+                0,          // DQ-5
+                0,          // DQ-5 prev
+                true        // unlock
+            );
+
+        $command->method('execute')
+            ->willReturnOnConsecutiveCalls(0, 0);
+
+        $service = new StockStateService($db, self::noopTelegramCallback());
+        $result = $service->collect();
+
+        $this->assertTrue($result->isSuccess());
+        $this->assertSame(0, $result->getRowCount());
+        $this->assertFalse($result->getDqResult()->allPassed());
+        $this->assertNotEmpty($result->getDqResult()->getCriticalFailures());
+    }
+
+    // =========================================================
+    // SCD-2 SQL + batch_ts verification
+    // =========================================================
+
+    public function testCollect_GeneratesScd2QueriesWithBatchTs(): void
+    {
+        $db = $this->createMock(\yii\db\Connection::class);
+        $command = $this->createMock(\yii\db\Command::class);
+
+        $db->method('createCommand')->willReturn($command);
+        $command->method('bindValue')->willReturnSelf();
+
+        $executedSql = [];
+        $boundValues = [];
+        $command->method('setSql')->willReturnCallback(
+            function ($sql) use ($command, &$executedSql) {
+                $executedSql[] = $sql;
+                return $command;
+            }
+        );
+        $command->method('bindValues')->willReturnCallback(
+            function ($values) use ($command, &$boundValues) {
+                $boundValues = array_merge($boundValues, $values);
+                return $command;
+            }
+        );
+
+        $command->method('queryScalar')
+            ->willReturnOnConsecutiveCalls(
+                true,
+                24, 24,
+                0, 0, 0,
+                11400, 11400,
+                true
+            );
+
+        $command->method('execute')
+            ->willReturnOnConsecutiveCalls(100, 100);
+
+        $service = new StockStateService($db, self::noopTelegramCallback());
+        $service->collect();
+
+        $allSql = implode("\n", $executedSql);
+
+        // SCD-2 close
+        $this->assertStringContainsString('is_active = false', $allSql);
+        $this->assertStringContainsString('SET valid_to', $allSql);
+        $this->assertStringContainsString('ss.quantity', $allSql);
+        $this->assertStringContainsString('ss.reserv', $allSql);
+
+        // SCD-2 insert
+        $this->assertStringContainsString('ss.id IS NULL', $allSql);
+        $this->assertStringContainsString('valid_from', $allSql);
+        $this->assertStringContainsString('batch_ts', $allSql);
+
+        // batch_ts и infinity передаются
+        $this->assertSame('2100-01-01 00:00:00', $boundValues[':infinity'] ?? null);
+        $this->assertArrayHasKey(':batch_ts', $boundValues);
+        // batch_ts = now (тот же что и :now)
+        $this->assertSame($boundValues[':now'], $boundValues[':batch_ts']);
+    }
+
+    // =========================================================
+    // DQ assertions
+    // =========================================================
+
+    public function testDqAssertions_AllPass_ReturnsSuccess(): void
+    {
+        [$db, $command] = $this->createMockDbAndCommand();
+
+        $command->method('queryScalar')
+            ->willReturnOnConsecutiveCalls(24, 24, 0, 0, 0, 11400, 11300);
+
+        $service = new StockStateService($db, self::noopTelegramCallback());
+        $result = $service->runDqAssertions();
+
+        $this->assertTrue($result->allPassed());
+    }
+
+    public function testDqAssertions_MissingStores_ReturnsCritical(): void
+    {
+        [$db, $command] = $this->createMockDbAndCommand();
+
+        $command->method('queryScalar')
+            ->willReturnOnConsecutiveCalls(24, 20, 0, 0, 0, 11400, 11400);
+
+        $service = new StockStateService($db, self::noopTelegramCallback());
+        $result = $service->runDqAssertions();
+
+        $this->assertFalse($result->allPassed());
+        $this->assertSame('DQ-1', $result->getCriticalFailures()[0]['code']);
+    }
+
+    public function testDqAssertions_NegativeQuantity_ReturnsMajor(): void
+    {
+        [$db, $command] = $this->createMockDbAndCommand();
+
+        $command->method('queryScalar')
+            ->willReturnOnConsecutiveCalls(24, 24, 0, 3, 0, 11400, 11400);
+
+        $service = new StockStateService($db, self::noopTelegramCallback());
+        $result = $service->runDqAssertions();
+
+        $this->assertSame('DQ-3', $result->getMajorFailures()[0]['code']);
+    }
+
+    public function testDqAssertions_DeviationOver20Pct_ReturnsMajor(): void
+    {
+        [$db, $command] = $this->createMockDbAndCommand();
+
+        $command->method('queryScalar')
+            ->willReturnOnConsecutiveCalls(24, 24, 0, 0, 0, 11400, 5000);
+
+        $service = new StockStateService($db, self::noopTelegramCallback());
+        $result = $service->runDqAssertions();
+
+        $this->assertSame('DQ-5', $result->getMajorFailures()[0]['code']);
+    }
+
+    public function testDqAssertions_ZeroActiveRecords_ReturnsCritical(): void
+    {
+        [$db, $command] = $this->createMockDbAndCommand();
+
+        $command->method('queryScalar')
+            ->willReturnOnConsecutiveCalls(24, 0, 0, 0, 0, 0, 0);
+
+        $service = new StockStateService($db, self::noopTelegramCallback());
+        $result = $service->runDqAssertions();
+
+        $this->assertFalse($result->allPassed());
+        $this->assertContains('DQ-6', array_column($result->getCriticalFailures(), 'code'));
+    }
+
+    // =========================================================
+    // Cleanup
+    // =========================================================
+
+    public function testCleanupOldRecords_DeletesClosedOlderThanRetention(): void
+    {
+        $db = $this->createMock(\yii\db\Connection::class);
+        $command = $this->createMock(\yii\db\Command::class);
+
+        $db->method('createCommand')->willReturn($command);
+        $command->method('bindValues')->willReturnSelf();
+
+        $executedSql = [];
+        $command->method('setSql')->willReturnCallback(
+            function ($sql) use ($command, &$executedSql) {
+                $executedSql[] = $sql;
+                return $command;
+            }
+        );
+        $command->method('execute')->willReturn(500);
+
+        $service = new StockStateService($db, self::noopTelegramCallback());
+        $deleted = $service->cleanupOldRecords(24);
+
+        $this->assertSame(500, $deleted);
+        $sql = implode(' ', $executedSql);
+        $this->assertStringContainsString('DELETE FROM', $sql);
+        $this->assertStringContainsString('is_active = false', $sql);
+    }
+}