+++ /dev/null
-<?php
-
-declare(strict_types=1);
-
-namespace app\commands;
-
-use app\services\StockHistoryService;
-use yii\console\Controller;
-use yii\console\ExitCode;
-use Yii;
-
-/**
- * ERP-33: Управление историческими срезами остатков.
- *
- * Crontab:
- * 0 8,20 * * * cd /www && php yii stock-history/collect >> /var/log/stock-history.log 2>&1
- * 0 0 1 * * cd /www && php yii stock-history/ensure-partitions >> /var/log/stock-history.log 2>&1
- * 0 1 1 * * cd /www && php yii stock-history/create-partition >> /var/log/stock-history.log 2>&1
- * 0 2 1 * * cd /www && php yii stock-history/drop-old-partitions >> /var/log/stock-history.log 2>&1
- */
-class StockHistoryController extends Controller
-{
- /**
- * @var string Время среза (08:00 или 20:00). Если пусто — авто-определение.
- */
- public string $time = '';
-
- public function options($actionID): array
- {
- return array_merge(parent::options($actionID), ['time']);
- }
-
- /**
- * Собрать срез остатков.
- */
- public function actionCollect(): int
- {
- $snapshotTime = $this->resolveTime();
- $this->stdout("Stock History ETL: collecting snapshot for {$snapshotTime}...\n");
-
- $service = $this->createService();
-
- try {
- $result = $service->collect($snapshotTime);
-
- $this->stdout("Done: {$result->getRowCount()} rows inserted.\n");
-
- $dq = $result->getDqResult();
- if ($dq !== null && !$dq->allPassed()) {
- $this->stderr("DQ failures detected:\n");
- foreach ($dq->getAll() as $f) {
- $this->stderr(" [{$f['level']}] {$f['code']}: {$f['message']}\n");
- }
- return ExitCode::OK; // ETL прошёл, но DQ имеет замечания
- }
-
- return ExitCode::OK;
- } catch (\RuntimeException $e) {
- $this->stderr("ERROR: {$e->getMessage()}\n");
- return ExitCode::SOFTWARE;
- } catch (\Throwable $e) {
- $this->stderr("FATAL: {$e->getMessage()}\n");
- Yii::error("Stock History ETL fatal: " . $e->getMessage(), 'stock-history');
- return ExitCode::SOFTWARE;
- }
- }
-
- /**
- * Создать партицию на указанный месяц.
- * @param string $month Формат "2026-03". По умолчанию — следующий месяц.
- */
- public function actionCreatePartition(string $month = ''): int
- {
- if (empty($month)) {
- $month = date('Y-m', strtotime('+1 month'));
- }
-
- $this->stdout("Creating partition for {$month}...\n");
-
- try {
- $this->createService()->createPartition($month);
- $this->stdout("Partition stock_history_{$month} created.\n");
- return ExitCode::OK;
- } catch (\Throwable $e) {
- $this->stderr("ERROR: {$e->getMessage()}\n");
- return ExitCode::SOFTWARE;
- }
- }
-
- /**
- * Обеспечить наличие партиций для текущего и следующего месяца.
- */
- public function actionEnsurePartitions(): int
- {
- $this->stdout("Ensuring partitions for current and next month...\n");
-
- try {
- $this->createService()->ensurePartitions();
- $this->stdout("Partitions ensured.\n");
- return ExitCode::OK;
- } catch (\Throwable $e) {
- $this->stderr("ERROR: {$e->getMessage()}\n");
- return ExitCode::SOFTWARE;
- }
- }
-
- /**
- * Удалить старые партиции.
- * @param int $months Retention в месяцах (default 24).
- */
- public function actionDropOldPartitions(int $months = 24): int
- {
- $this->stdout("Dropping partitions older than {$months} months...\n");
-
- try {
- $this->createService()->dropOldPartitions($months);
- $this->stdout("Done.\n");
- return ExitCode::OK;
- } catch (\Throwable $e) {
- $this->stderr("ERROR: {$e->getMessage()}\n");
- return ExitCode::SOFTWARE;
- }
- }
-
- private function resolveTime(): string
- {
- if (!empty($this->time)) {
- return $this->time;
- }
-
- $hour = (int)date('H');
- return $hour < 14 ? '08:00' : '20:00';
- }
-
- private function createService(): StockHistoryService
- {
- return new StockHistoryService(Yii::$app->db);
- }
-}
--- /dev/null
+<?php
+
+declare(strict_types=1);
+
+namespace app\commands;
+
+use app\services\StockStateService;
+use yii\console\Controller;
+use yii\console\ExitCode;
+use Yii;
+
+/**
+ * ERP-33: SCD-2 учёт остатков.
+ *
+ * Crontab:
+ * 0 8 * * * cd /www && php yii stock-state/collect >> /var/log/stock-state.log 2>&1
+ * 0 3 1 * * cd /www && php yii stock-state/cleanup >> /var/log/stock-state.log 2>&1
+ */
+class StockStateController extends Controller
+{
+ /**
+ * Собрать SCD-2 срез остатков (1 раз/день).
+ *
+ * Логика: close changed + insert new/changed (2 SQL-запроса).
+ */
+ public function actionCollect(): int
+ {
+ $this->stdout("Stock State SCD-2: collecting...\n");
+
+ try {
+ $service = $this->createService();
+ $result = $service->collect();
+
+ $this->stdout("Done: {$result->getRowCount()} rows inserted.\n");
+
+ $dq = $result->getDqResult();
+ if ($dq !== null && !$dq->allPassed()) {
+ $this->stderr("DQ failures detected:\n");
+ foreach ($dq->getAll() as $f) {
+ $this->stderr(" [{$f['level']}] {$f['code']}: {$f['message']}\n");
+ }
+ }
+
+ return ExitCode::OK;
+ } catch (\RuntimeException $e) {
+ $this->stderr("ERROR: {$e->getMessage()}\n");
+ return ExitCode::SOFTWARE;
+ } catch (\Throwable $e) {
+ $this->stderr("FATAL: {$e->getMessage()}\n");
+ try {
+ Yii::error("Stock State ETL fatal: " . $e->getMessage(), 'stock-state');
+ } catch (\Throwable $e2) {
+ // ignore
+ }
+ return ExitCode::SOFTWARE;
+ }
+ }
+
+ /**
+ * Удалить закрытые записи старше retention.
+ * @param int $months Retention в месяцах (default 24).
+ */
+ public function actionCleanup(int $months = 24): int
+ {
+ $this->stdout("Cleaning up closed records older than {$months} months...\n");
+
+ try {
+ $deleted = $this->createService()->cleanupOldRecords($months);
+ $this->stdout("Deleted {$deleted} old records.\n");
+ return ExitCode::OK;
+ } catch (\Throwable $e) {
+ $this->stderr("ERROR: {$e->getMessage()}\n");
+ return ExitCode::SOFTWARE;
+ }
+ }
+
+ private function createService(): StockStateService
+ {
+ return new StockStateService(Yii::$app->db);
+ }
+}
+++ /dev/null
-<?php
-
-use yii\db\Migration;
-
-/**
- * ERP-33: Создание partitioned таблицы stock_history для хранения
- * исторических срезов остатков товаров по магазинам.
- */
-class m260221_100000_create_stock_history_table extends Migration
-{
- public function safeUp()
- {
- // Partitioned table — нельзя через createTable(), используем raw SQL
- $this->execute("
- CREATE TABLE {{%stock_history}} (
- id BIGSERIAL,
- snapshot_date DATE NOT NULL,
- snapshot_time TIME NOT NULL,
- store_id VARCHAR(36) NOT NULL,
- store_name VARCHAR(255),
- product_id VARCHAR(36) NOT NULL,
- product_name VARCHAR(255),
- articule VARCHAR(36),
- father_id VARCHAR(36),
- components JSONB,
- quantity NUMERIC(12,2) NOT NULL DEFAULT 0,
- reserv NUMERIC(12,2) NOT NULL DEFAULT 0,
- created_at TIMESTAMP NOT NULL DEFAULT NOW(),
- PRIMARY KEY (id, snapshot_date)
- ) PARTITION BY RANGE (snapshot_date)
- ");
-
- // Партиция текущего месяца
- $this->execute("
- CREATE TABLE IF NOT EXISTS {{%stock_history_2026_02}}
- PARTITION OF {{%stock_history}}
- FOR VALUES FROM ('2026-02-01') TO ('2026-03-01')
- ");
-
- // Партиция следующего месяца
- $this->execute("
- CREATE TABLE IF NOT EXISTS {{%stock_history_2026_03}}
- PARTITION OF {{%stock_history}}
- FOR VALUES FROM ('2026-03-01') TO ('2026-04-01')
- ");
-
- // Уникальный индекс для idempotency (ON CONFLICT)
- $this->execute("
- CREATE UNIQUE INDEX idx_stock_history_unique
- ON {{%stock_history}} (snapshot_date, snapshot_time, store_id, product_id)
- ");
-
- $this->createIndex('idx_stock_history_store', '{{%stock_history}}', 'store_id');
- $this->createIndex('idx_stock_history_product', '{{%stock_history}}', 'product_id');
- $this->createIndex('idx_stock_history_father', '{{%stock_history}}', 'father_id');
- $this->createIndex('idx_stock_history_date_time', '{{%stock_history}}', ['snapshot_date', 'snapshot_time']);
- $this->createIndex('idx_stock_history_father_date', '{{%stock_history}}', ['father_id', 'snapshot_date']);
- }
-
- public function safeDown()
- {
- $this->execute("DROP TABLE IF EXISTS {{%stock_history}} CASCADE");
- }
-}
--- /dev/null
+<?php
+
+use yii\db\Migration;
+
+/**
+ * ERP-33: Создание таблицы stock_state (SCD-2) для хранения
+ * истории изменений остатков товаров по магазинам.
+ *
+ * valid_from / valid_to — TIMESTAMP (поддержка нескольких запусков в день).
+ * valid_to = '2100-01-01 00:00:00' для активных записей (BETWEEN).
+ * is_active — флаг быстрой фильтрации.
+ * batch_ts — timestamp пакета: один запуск = одно значение.
+ */
+class m260221_100000_create_stock_state_table extends Migration
+{
+ public function safeUp()
+ {
+ $this->createTable('{{%stock_state}}', [
+ 'id' => $this->bigPrimaryKey(),
+ 'store_id' => $this->string(36)->notNull(),
+ 'store_name' => $this->string(255),
+ 'product_id' => $this->string(36)->notNull(),
+ 'product_name' => $this->string(255),
+ 'articule' => $this->string(36),
+ 'father_id' => $this->string(36),
+ 'components' => 'JSONB',
+ 'quantity' => $this->decimal(12, 2)->notNull()->defaultValue(0),
+ 'reserv' => $this->decimal(12, 2)->notNull()->defaultValue(0),
+ 'valid_from' => "TIMESTAMP NOT NULL",
+ 'valid_to' => "TIMESTAMP NOT NULL DEFAULT '2100-01-01 00:00:00'",
+ 'is_active' => $this->boolean()->notNull()->defaultValue(true),
+ 'batch_ts' => "TIMESTAMP NOT NULL DEFAULT NOW()",
+ 'created_at' => "TIMESTAMP NOT NULL DEFAULT NOW()",
+ ]);
+
+ // Partial unique: один активный record на store×product
+ $this->execute("
+ CREATE UNIQUE INDEX uq_stock_state_active
+ ON {{%stock_state}} (store_id, product_id)
+ WHERE is_active = true
+ ");
+
+ // BETWEEN запросы: WHERE :ts BETWEEN valid_from AND valid_to
+ $this->createIndex('idx_stock_state_validity', '{{%stock_state}}', ['valid_from', 'valid_to']);
+ $this->createIndex('idx_stock_state_active', '{{%stock_state}}', 'is_active');
+ $this->createIndex('idx_stock_state_batch', '{{%stock_state}}', 'batch_ts');
+ $this->createIndex('idx_stock_state_store', '{{%stock_state}}', 'store_id');
+ $this->createIndex('idx_stock_state_product', '{{%stock_state}}', 'product_id');
+ $this->createIndex('idx_stock_state_father', '{{%stock_state}}', 'father_id');
+ }
+
+ public function safeDown()
+ {
+ $this->dropTable('{{%stock_state}}');
+ }
+}
+++ /dev/null
-<?php
-
-declare(strict_types=1);
-
-namespace app\records;
-
-use yii\db\ActiveRecord;
-
-/**
- * ERP-33: Исторические срезы остатков товаров по магазинам.
- *
- * @property int $id
- * @property string $snapshot_date
- * @property string $snapshot_time
- * @property string $store_id
- * @property string|null $store_name
- * @property string $product_id
- * @property string|null $product_name
- * @property string|null $articule
- * @property string|null $father_id
- * @property array|null $components
- * @property float $quantity
- * @property float $reserv
- * @property string $created_at
- *
- * @property Products1c $store
- * @property Products1c $product
- */
-class StockHistory extends ActiveRecord
-{
- public static function tableName(): string
- {
- return '{{%stock_history}}';
- }
-
- public function rules(): array
- {
- return [
- [['snapshot_date', 'snapshot_time', 'store_id', 'product_id'], 'required'],
- [['store_id', 'product_id', 'father_id'], 'string', 'max' => 36],
- [['store_name', 'product_name'], 'string', 'max' => 255],
- [['articule'], 'string', 'max' => 36],
- [['quantity', 'reserv'], 'number', 'min' => 0],
- [['quantity', 'reserv'], 'default', 'value' => 0],
- [['snapshot_date'], 'date', 'format' => 'php:Y-m-d'],
- [['snapshot_time'], 'string'],
- [['components'], 'safe'],
- ];
- }
-
- public function getStore(): \yii\db\ActiveQuery
- {
- return $this->hasOne(Products1c::class, ['id' => 'store_id']);
- }
-
- public function getProduct(): \yii\db\ActiveQuery
- {
- return $this->hasOne(Products1c::class, ['id' => 'product_id']);
- }
-}
--- /dev/null
+<?php
+
+declare(strict_types=1);
+
+namespace app\records;
+
+use yii\db\ActiveRecord;
+
+/**
+ * ERP-33: SCD-2 история изменений остатков товаров по магазинам.
+ *
+ * is_active = true, valid_to = '2100-01-01 00:00:00' — текущая (активная) запись.
+ * is_active = false, valid_to = timestamp закрытия — запись закрыта.
+ *
+ * batch_ts — единый timestamp для всех записей одного запуска.
+ *
+ * Запрос на момент: WHERE :ts BETWEEN valid_from AND valid_to
+ *
+ * @property int $id
+ * @property string $store_id
+ * @property string|null $store_name
+ * @property string $product_id
+ * @property string|null $product_name
+ * @property string|null $articule
+ * @property string|null $father_id
+ * @property array|null $components
+ * @property float $quantity
+ * @property float $reserv
+ * @property string $valid_from
+ * @property string $valid_to
+ * @property bool $is_active
+ * @property string $batch_ts
+ * @property string $created_at
+ *
+ * @property Products1c $store
+ * @property Products1c $product
+ */
+class StockState extends ActiveRecord
+{
+ /** Timestamp «бесконечности» для активных записей */
+ public const INFINITY_TS = '2100-01-01 00:00:00';
+
+ public static function tableName(): string
+ {
+ return '{{%stock_state}}';
+ }
+
+ public function rules(): array
+ {
+ return [
+ [['store_id', 'product_id', 'valid_from'], 'required'],
+ [['store_id', 'product_id', 'father_id'], 'string', 'max' => 36],
+ [['store_name', 'product_name'], 'string', 'max' => 255],
+ [['articule'], 'string', 'max' => 36],
+ [['quantity', 'reserv'], 'number', 'min' => 0],
+ [['quantity', 'reserv'], 'default', 'value' => 0],
+ [['valid_from', 'valid_to'], 'datetime', 'format' => 'php:Y-m-d H:i:s'],
+ [['valid_to'], 'default', 'value' => self::INFINITY_TS],
+ [['is_active'], 'boolean'],
+ [['is_active'], 'default', 'value' => true],
+ [['batch_ts'], 'datetime', 'format' => 'php:Y-m-d H:i:s'],
+ [['components'], 'safe'],
+ ];
+ }
+
+ public function getStore(): \yii\db\ActiveQuery
+ {
+ return $this->hasOne(Products1c::class, ['id' => 'store_id']);
+ }
+
+ public function getProduct(): \yii\db\ActiveQuery
+ {
+ return $this->hasOne(Products1c::class, ['id' => 'product_id']);
+ }
+}
namespace app\services;
/**
- * DTO: результат StockHistoryService::collect().
+ * DTO: результат StockStateService::collect().
*/
class CollectResult
{
namespace app\services;
/**
- * DTO: результат DQ assertions для StockHistoryService.
+ * DTO: результат DQ assertions для StockStateService.
*/
class DqResult
{
+++ /dev/null
-<?php
-
-declare(strict_types=1);
-
-namespace app\services;
-
-use yii\db\Connection;
-use Yii;
-
-/**
- * ERP-33: ETL-сервис для сбора исторических срезов остатков.
- *
- * Алгоритм collect():
- * 1. Advisory lock (pg_try_advisory_lock)
- * 2. SELECT balances LEFT JOIN products_1c
- * 3. Batch INSERT ON CONFLICT DO UPDATE (1000 строк)
- * 4. DQ assertions
- * 5. Log + unlock
- */
-class StockHistoryService
-{
- private const ADVISORY_LOCK_KEY = 123456789;
- private const BATCH_SIZE = 1000;
- private const LOCK_TIMEOUT_SEC = 30;
- private const DEVIATION_THRESHOLD = 0.2;
- private const MAX_RETRY = 3;
-
- private Connection $db;
- /** @var callable|null */
- private $telegramCallback;
-
- public function __construct(Connection $db, ?callable $telegramCallback = null)
- {
- $this->db = $db;
- $this->telegramCallback = $telegramCallback;
- }
-
- /**
- * Основной метод сбора остатков.
- */
- public function collect(string $snapshotTime): CollectResult
- {
- if (!preg_match('/^\d{2}:\d{2}$/', $snapshotTime)) {
- throw new \InvalidArgumentException("Invalid snapshotTime format: {$snapshotTime}, expected HH:MM");
- }
-
- $snapshotDate = date('Y-m-d');
-
- // 1. Advisory lock
- $this->acquireLock();
-
- try {
- // 2. Обеспечить наличие партиций
- $this->ensurePartitions();
-
- // 3. SELECT данные
- $rows = $this->fetchBalances();
-
- // 4. Batch INSERT
- $insertedCount = $this->batchInsert($rows, $snapshotDate, $snapshotTime);
-
- // 5. DQ assertions
- $dqResult = $this->runDqAssertions($snapshotDate, $snapshotTime);
-
- // 6. Log
- $this->logResult($snapshotDate, $snapshotTime, $insertedCount, $dqResult);
-
- return new CollectResult(true, $insertedCount, $dqResult);
- } finally {
- $this->releaseLock();
- }
- }
-
- /**
- * DQ assertions после сбора.
- */
- public function runDqAssertions(string $date, string $time): DqResult
- {
- $failures = [];
-
- // DQ-1: stores count >= active stores
- $activeStores = (int)$this->db->createCommand()
- ->setSql("SELECT COUNT(*) FROM products_1c WHERE tip = 'city_store'")
- ->queryScalar();
-
- $snapshotStores = (int)$this->db->createCommand()
- ->setSql("SELECT COUNT(DISTINCT store_id) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time")
- ->bindValues([':date' => $date, ':time' => $time])
- ->queryScalar();
-
- if ($snapshotStores < $activeStores) {
- $failures[] = [
- 'level' => 'CRITICAL',
- 'code' => 'DQ-1',
- 'message' => "Stores in snapshot ($snapshotStores) < active stores ($activeStores)",
- ];
- }
-
- // DQ-2: no NULL in required fields
- $nullCount = (int)$this->db->createCommand()
- ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND (store_id IS NULL OR product_id IS NULL OR quantity IS NULL)")
- ->bindValues([':date' => $date, ':time' => $time])
- ->queryScalar();
-
- if ($nullCount > 0) {
- $failures[] = [
- 'level' => 'CRITICAL',
- 'code' => 'DQ-2',
- 'message' => "Found $nullCount records with NULL in required fields",
- ];
- }
-
- // DQ-3: no negative quantity
- $negativeQty = (int)$this->db->createCommand()
- ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND quantity < 0")
- ->bindValues([':date' => $date, ':time' => $time])
- ->queryScalar();
-
- if ($negativeQty > 0) {
- $failures[] = [
- 'level' => 'MAJOR',
- 'code' => 'DQ-3',
- 'message' => "Found $negativeQty records with negative quantity",
- ];
- }
-
- // DQ-4: reserv <= quantity
- $reservOverQty = (int)$this->db->createCommand()
- ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND reserv > quantity")
- ->bindValues([':date' => $date, ':time' => $time])
- ->queryScalar();
-
- if ($reservOverQty > 0) {
- $failures[] = [
- 'level' => 'WARNING',
- 'code' => 'DQ-4',
- 'message' => "Found $reservOverQty records where reserv > quantity",
- ];
- }
-
- // DQ-5: row count deviation
- $currentCount = (int)$this->db->createCommand()
- ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time")
- ->bindValues([':date' => $date, ':time' => $time])
- ->queryScalar();
-
- $previousCount = (int)$this->db->createCommand()
- ->setSql("
- SELECT COUNT(*) FROM stock_history
- WHERE (snapshot_date, snapshot_time) = (
- SELECT snapshot_date, snapshot_time FROM stock_history
- WHERE (snapshot_date, snapshot_time) < (:date, :time)
- GROUP BY snapshot_date, snapshot_time
- ORDER BY snapshot_date DESC, snapshot_time DESC
- LIMIT 1
- )
- ")
- ->bindValues([':date' => $date, ':time' => $time])
- ->queryScalar();
-
- if ($previousCount > 0) {
- $deviation = abs($currentCount - $previousCount) / $previousCount;
- if ($deviation > self::DEVIATION_THRESHOLD) {
- $pct = round($deviation * 100, 1);
- $failures[] = [
- 'level' => 'MAJOR',
- 'code' => 'DQ-5',
- 'message' => "Row count deviation {$pct}% (current: $currentCount, previous: $previousCount)",
- ];
- }
- }
-
- // DQ-6: non-empty snapshot
- if ($currentCount === 0) {
- $failures[] = [
- 'level' => 'CRITICAL',
- 'code' => 'DQ-6',
- 'message' => "Empty snapshot: 0 records for $date $time",
- ];
- }
-
- $dqResult = new DqResult($failures);
-
- // Отправить алерты при CRITICAL/MAJOR
- $this->sendAlerts($dqResult, $date, $time);
-
- return $dqResult;
- }
-
- /**
- * Обеспечить наличие партиций для текущего и следующего месяца.
- */
- public function ensurePartitions(): void
- {
- $currentMonth = date('Y-m');
- $nextMonth = date('Y-m', strtotime('+1 month'));
-
- $this->createPartition($currentMonth);
- $this->createPartition($nextMonth);
- }
-
- /**
- * Создание партиции на указанный месяц.
- */
- public function createPartition(string $yearMonth): void
- {
- if (!preg_match('/^\d{4}-\d{2}$/', $yearMonth)) {
- throw new \InvalidArgumentException("Invalid yearMonth format: {$yearMonth}, expected YYYY-MM");
- }
-
- $start = $yearMonth . '-01';
- $end = date('Y-m-d', strtotime($start . ' +1 month'));
- $suffix = str_replace('-', '_', $yearMonth);
-
- // DDL не поддерживает bind params — используем интерполяцию (формат валидирован regex)
- $this->db->createCommand()
- ->setSql("CREATE TABLE IF NOT EXISTS stock_history_{$suffix} PARTITION OF stock_history FOR VALUES FROM ('{$start}') TO ('{$end}')")
- ->execute();
- }
-
- /**
- * Удаление партиций старше retention периода.
- */
- public function dropOldPartitions(int $retentionMonths = 24): void
- {
- $cutoffDate = date('Y-m-d', strtotime("-{$retentionMonths} months"));
- $cutoffYm = substr($cutoffDate, 0, 7); // "2024-02"
-
- $partitions = $this->db->createCommand()
- ->setSql("SELECT tablename FROM pg_tables WHERE schemaname = 'erp24' AND tablename LIKE 'stock_history_%' ORDER BY tablename")
- ->queryAll();
-
- foreach ($partitions as $row) {
- $name = $row['tablename'];
- // Извлекаем дату из имени: stock_history_2024_01 → 2024-01
- if (preg_match('/stock_history_(\d{4})_(\d{2})$/', $name, $m)) {
- $partitionYm = $m[1] . '-' . $m[2];
- if ($partitionYm < $cutoffYm) {
- $this->db->createCommand()
- ->setSql("DROP TABLE IF EXISTS {$name}")
- ->execute();
-
- Yii::info("Dropped old partition: {$name}", 'stock-history');
- }
- }
- }
- }
-
- // =========================================================
- // Private methods
- // =========================================================
-
- protected function getLockTimeoutSec(): int
- {
- return self::LOCK_TIMEOUT_SEC;
- }
-
- protected function getLockRetrySleepUs(): int
- {
- return 500000; // 0.5 sec
- }
-
- private function acquireLock(): void
- {
- $deadline = time() + $this->getLockTimeoutSec();
- $sleepUs = $this->getLockRetrySleepUs();
-
- while (time() < $deadline) {
- $locked = $this->db->createCommand()
- ->setSql("SELECT pg_try_advisory_lock(" . self::ADVISORY_LOCK_KEY . ")")
- ->queryScalar();
-
- if ($locked) {
- return;
- }
-
- usleep($sleepUs);
- }
-
- $msg = 'Failed to acquire advisory lock within ' . $this->getLockTimeoutSec() . ' seconds';
- try {
- Yii::error($msg, 'stock-history');
- } catch (\Throwable $e) {
- // Логирование не должно маскировать основную ошибку
- }
- $this->sendTelegram("CRITICAL: Stock History ETL — $msg");
-
- throw new \RuntimeException($msg);
- }
-
- private function releaseLock(): void
- {
- $this->db->createCommand()
- ->setSql("SELECT pg_advisory_unlock(" . self::ADVISORY_LOCK_KEY . ")")
- ->queryScalar();
- }
-
- private function fetchBalances(): array
- {
- return $this->db->createCommand()
- ->setSql("
- SELECT
- b.store_id,
- ps.name as store_name,
- b.product_id,
- pp.name as product_name,
- pp.articule,
- pp.parent_id as father_id,
- pp.components,
- b.quantity,
- b.reserv
- FROM balances b
- LEFT JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store'
- LEFT JOIN products_1c pp ON pp.id = b.product_id
- WHERE ps.id IS NOT NULL
- ORDER BY b.store_id, b.product_id
- ")
- ->queryAll();
- }
-
- private function batchInsert(array $rows, string $date, string $time): int
- {
- if (empty($rows)) {
- return 0;
- }
-
- $total = 0;
- $chunks = array_chunk($rows, self::BATCH_SIZE);
-
- foreach ($chunks as $chunk) {
- $total += $this->insertChunkWithRetry($chunk, $date, $time);
- }
-
- return $total;
- }
-
- private function insertChunkWithRetry(array $chunk, string $date, string $time): int
- {
- $attempt = 0;
-
- while ($attempt < self::MAX_RETRY) {
- try {
- return $this->insertChunk($chunk, $date, $time);
- } catch (\yii\db\Exception $e) {
- $attempt++;
- if ($attempt >= self::MAX_RETRY) {
- throw $e;
- }
- // Exponential backoff: 1s, 2s, 4s
- sleep((int)pow(2, $attempt - 1));
- Yii::warning("Batch INSERT retry #{$attempt}: " . $e->getMessage(), 'stock-history');
- }
- }
-
- return 0;
- }
-
- private function insertChunk(array $chunk, string $date, string $time): int
- {
- $values = [];
- $params = [];
- $i = 0;
-
- foreach ($chunk as $row) {
- $components = $row['components'] ?? null;
- if ($components !== null && !is_string($components)) {
- $components = json_encode($components);
- }
-
- $values[] = "(:date_{$i}, :time_{$i}, :sid_{$i}, :sn_{$i}, :pid_{$i}, :pn_{$i}, :art_{$i}, :fid_{$i}, :comp_{$i}::jsonb, :qty_{$i}, :res_{$i}, NOW())";
- $params[":date_{$i}"] = $date;
- $params[":time_{$i}"] = $time;
- $params[":sid_{$i}"] = $row['store_id'];
- $params[":sn_{$i}"] = $row['store_name'] ?? null;
- $params[":pid_{$i}"] = $row['product_id'];
- $params[":pn_{$i}"] = $row['product_name'] ?? null;
- $params[":art_{$i}"] = $row['articule'] ?? null;
- $params[":fid_{$i}"] = $row['father_id'] ?? null;
- $params[":comp_{$i}"] = $components;
- $params[":qty_{$i}"] = $row['quantity'] ?? 0;
- $params[":res_{$i}"] = $row['reserv'] ?? 0;
- $i++;
- }
-
- $sql = "INSERT INTO stock_history (snapshot_date, snapshot_time, store_id, store_name, product_id, product_name, articule, father_id, components, quantity, reserv, created_at)
- VALUES " . implode(', ', $values) . "
- ON CONFLICT (snapshot_date, snapshot_time, store_id, product_id)
- DO UPDATE SET
- quantity = EXCLUDED.quantity,
- reserv = EXCLUDED.reserv,
- store_name = EXCLUDED.store_name,
- product_name = EXCLUDED.product_name,
- articule = EXCLUDED.articule,
- father_id = EXCLUDED.father_id,
- components = EXCLUDED.components,
- created_at = NOW()";
-
- return $this->db->createCommand()
- ->setSql($sql)
- ->bindValues($params)
- ->execute();
- }
-
- private function logResult(string $date, string $time, int $rowCount, DqResult $dqResult): void
- {
- $status = $dqResult->allPassed() ? 'SUCCESS' : 'DQ_FAILURES';
- try {
- Yii::info("Stock History ETL [{$date} {$time}]: {$status}, rows={$rowCount}", 'stock-history');
- } catch (\Throwable $e) {
- // Логирование не должно прерывать ETL
- }
- }
-
- private function sendAlerts(DqResult $dqResult, string $date, string $time): void
- {
- $criticals = $dqResult->getCriticalFailures();
- $majors = $dqResult->getMajorFailures();
-
- if (empty($criticals) && empty($majors)) {
- return;
- }
-
- $messages = [];
- foreach ($criticals as $f) {
- $messages[] = "CRITICAL [{$f['code']}]: {$f['message']}";
- }
- foreach ($majors as $f) {
- $messages[] = "MAJOR [{$f['code']}]: {$f['message']}";
- }
-
- try {
- foreach ($criticals as $f) {
- Yii::error("DQ {$f['code']}: {$f['message']}", 'stock-history');
- }
- foreach ($majors as $f) {
- Yii::warning("DQ {$f['code']}: {$f['message']}", 'stock-history');
- }
- } catch (\Throwable $e) {
- // Логирование не должно прерывать ETL
- }
-
- $text = "Stock History DQ [{$date} {$time}]:\n" . implode("\n", $messages);
- $this->sendTelegram($text);
- }
-
- private function sendTelegram(string $message): void
- {
- if ($this->telegramCallback !== null) {
- ($this->telegramCallback)($message);
- return;
- }
-
- try {
- if (Yii::$app && Yii::$app->has('queue')) {
- Yii::$app->queue->push(new \app\jobs\SendTelegramMessageJob([
- 'message' => $message,
- ]));
- }
- } catch (\Throwable $e) {
- Yii::error("Failed to send Telegram alert: " . $e->getMessage(), 'stock-history');
- }
- }
-}
--- /dev/null
+<?php
+
+declare(strict_types=1);
+
+namespace app\services;
+
+use yii\db\Connection;
+use Yii;
+
+/**
+ * ERP-33: SCD-2 сервис учёта остатков.
+ *
+ * Алгоритм collect() — 2 SQL-запроса + batch_ts:
+ * 1. UPDATE stock_state SET valid_to = :now, is_active = false WHERE quantity/reserv изменились
+ * 2. INSERT INTO stock_state ... WHERE нет активной записи (новые + изменённые)
+ *
+ * valid_from / valid_to — TIMESTAMP (поддержка нескольких запусков в день).
+ * valid_to = '2100-01-01 00:00:00' для активных записей → BETWEEN.
+ * is_active — флаг быстрой фильтрации.
+ * batch_ts — единый timestamp на весь пакет (один запуск = одно значение).
+ */
+class StockStateService
+{
+ private const ADVISORY_LOCK_KEY = 123456789;
+ private const LOCK_TIMEOUT_SEC = 30;
+ private const DEVIATION_THRESHOLD = 0.2;
+ public const INFINITY_TS = '2100-01-01 00:00:00';
+
+ private Connection $db;
+ /** @var callable|null */
+ private $telegramCallback;
+
+ public function __construct(Connection $db, ?callable $telegramCallback = null)
+ {
+ $this->db = $db;
+ $this->telegramCallback = $telegramCallback;
+ }
+
+ /**
+ * Основной метод: SCD-2 сбор остатков.
+ * batch_ts — один timestamp на весь запуск.
+ */
+ public function collect(): CollectResult
+ {
+ $now = date('Y-m-d H:i:s');
+
+ $this->acquireLock();
+
+ try {
+ // 1. Закрыть устаревшие записи (остаток изменился)
+ $closedCount = $this->closeChangedRecords($now);
+
+ // 2. Вставить новые/изменённые
+ $insertedCount = $this->insertNewRecords($now);
+
+ // 3. DQ assertions
+ $dqResult = $this->runDqAssertions();
+
+ // 4. Log
+ $this->logResult($now, $closedCount, $insertedCount, $dqResult);
+
+ return new CollectResult(true, $insertedCount, $dqResult);
+ } finally {
+ $this->releaseLock();
+ }
+ }
+
+ /**
+ * Закрыть записи, где quantity или reserv изменились.
+ * SET valid_to = :now, is_active = false.
+ */
+ private function closeChangedRecords(string $now): int
+ {
+ return $this->db->createCommand()
+ ->setSql("
+ UPDATE stock_state ss
+ SET valid_to = :now::timestamp - INTERVAL '1 second', is_active = false
+ FROM balances b
+ JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store'
+ WHERE ss.store_id = b.store_id
+ AND ss.product_id = b.product_id
+ AND ss.is_active = true
+ AND (ss.quantity != b.quantity OR ss.reserv != b.reserv)
+ ")
+ ->bindValue(':now', $now)
+ ->execute();
+ }
+
+ /**
+ * Вставить записи для новых товаров и для тех, чей остаток изменился.
+ * valid_from = :now, valid_to = infinity, is_active = true, batch_ts = :now.
+ */
+ private function insertNewRecords(string $now): int
+ {
+ return $this->db->createCommand()
+ ->setSql("
+ INSERT INTO stock_state (store_id, store_name, product_id, product_name, articule, father_id, components, quantity, reserv, valid_from, valid_to, is_active, batch_ts, created_at)
+ SELECT
+ b.store_id,
+ ps.name,
+ b.product_id,
+ pp.name,
+ pp.articule,
+ pp.parent_id,
+ pp.components,
+ b.quantity,
+ b.reserv,
+ :now,
+ :infinity,
+ true,
+ :batch_ts,
+ NOW()
+ FROM balances b
+ JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store'
+ LEFT JOIN products_1c pp ON pp.id = b.product_id
+ LEFT JOIN stock_state ss
+ ON ss.store_id = b.store_id
+ AND ss.product_id = b.product_id
+ AND ss.is_active = true
+ WHERE ss.id IS NULL
+ ")
+ ->bindValues([':now' => $now, ':infinity' => self::INFINITY_TS, ':batch_ts' => $now])
+ ->execute();
+ }
+
+ /**
+ * DQ assertions по активным записям (is_active = true).
+ */
+ public function runDqAssertions(): DqResult
+ {
+ $failures = [];
+
+ // DQ-1: stores count >= active stores
+ $activeStores = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(*) FROM products_1c WHERE tip = 'city_store'")
+ ->queryScalar();
+
+ $snapshotStores = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(DISTINCT store_id) FROM stock_state WHERE is_active = true")
+ ->queryScalar();
+
+ if ($snapshotStores < $activeStores) {
+ $failures[] = [
+ 'level' => 'CRITICAL',
+ 'code' => 'DQ-1',
+ 'message' => "Stores in active records ($snapshotStores) < active stores ($activeStores)",
+ ];
+ }
+
+ // DQ-2: no NULL in required fields
+ $nullCount = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true AND (store_id IS NULL OR product_id IS NULL OR quantity IS NULL)")
+ ->queryScalar();
+
+ if ($nullCount > 0) {
+ $failures[] = [
+ 'level' => 'CRITICAL',
+ 'code' => 'DQ-2',
+ 'message' => "Found $nullCount active records with NULL in required fields",
+ ];
+ }
+
+ // DQ-3: no negative quantity
+ $negativeQty = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true AND quantity < 0")
+ ->queryScalar();
+
+ if ($negativeQty > 0) {
+ $failures[] = [
+ 'level' => 'MAJOR',
+ 'code' => 'DQ-3',
+ 'message' => "Found $negativeQty active records with negative quantity",
+ ];
+ }
+
+ // DQ-4: reserv <= quantity
+ $reservOverQty = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true AND reserv > quantity")
+ ->queryScalar();
+
+ if ($reservOverQty > 0) {
+ $failures[] = [
+ 'level' => 'WARNING',
+ 'code' => 'DQ-4',
+ 'message' => "Found $reservOverQty active records where reserv > quantity",
+ ];
+ }
+
+ // DQ-5: active count deviation vs previous batch
+ $activeCount = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(*) FROM stock_state WHERE is_active = true")
+ ->queryScalar();
+
+ $prevActiveCount = (int)$this->db->createCommand()
+ ->setSql("
+ SELECT COUNT(*) FROM stock_state
+ WHERE valid_from <= (NOW() - INTERVAL '12 hours')
+ AND valid_to > (NOW() - INTERVAL '12 hours')
+ ")
+ ->queryScalar();
+
+ if ($prevActiveCount > 0) {
+ $deviation = abs($activeCount - $prevActiveCount) / $prevActiveCount;
+ if ($deviation > self::DEVIATION_THRESHOLD) {
+ $pct = round($deviation * 100, 1);
+ $failures[] = [
+ 'level' => 'MAJOR',
+ 'code' => 'DQ-5',
+ 'message' => "Active records deviation {$pct}% (current: $activeCount, previous: $prevActiveCount)",
+ ];
+ }
+ }
+
+ // DQ-6: non-empty active set
+ if ($activeCount === 0) {
+ $failures[] = [
+ 'level' => 'CRITICAL',
+ 'code' => 'DQ-6',
+ 'message' => "Zero active records in stock_state",
+ ];
+ }
+
+ $dqResult = new DqResult($failures);
+ $this->sendAlerts($dqResult);
+
+ return $dqResult;
+ }
+
+ /**
+ * Удаление закрытых записей старше retention.
+ */
+ public function cleanupOldRecords(int $retentionMonths = 24): int
+ {
+ $cutoff = date('Y-m-d H:i:s', strtotime("-{$retentionMonths} months"));
+
+ return $this->db->createCommand()
+ ->setSql("DELETE FROM stock_state WHERE is_active = false AND valid_to < :cutoff")
+ ->bindValues([':cutoff' => $cutoff])
+ ->execute();
+ }
+
+ // =========================================================
+ // Lock management
+ // =========================================================
+
+ protected function getLockTimeoutSec(): int
+ {
+ return self::LOCK_TIMEOUT_SEC;
+ }
+
+ protected function getLockRetrySleepUs(): int
+ {
+ return 500000; // 0.5 sec
+ }
+
+ private function acquireLock(): void
+ {
+ $deadline = time() + $this->getLockTimeoutSec();
+ $sleepUs = $this->getLockRetrySleepUs();
+
+ while (time() < $deadline) {
+ $locked = $this->db->createCommand()
+ ->setSql("SELECT pg_try_advisory_lock(" . self::ADVISORY_LOCK_KEY . ")")
+ ->queryScalar();
+
+ if ($locked) {
+ return;
+ }
+
+ usleep($sleepUs);
+ }
+
+ $msg = 'Failed to acquire advisory lock within ' . $this->getLockTimeoutSec() . ' seconds';
+ try {
+ Yii::error($msg, 'stock-state');
+ } catch (\Throwable $e) {
+ // Логирование не должно маскировать основную ошибку
+ }
+ $this->sendTelegram("CRITICAL: Stock State ETL — $msg");
+
+ throw new \RuntimeException($msg);
+ }
+
+ private function releaseLock(): void
+ {
+ $this->db->createCommand()
+ ->setSql("SELECT pg_advisory_unlock(" . self::ADVISORY_LOCK_KEY . ")")
+ ->queryScalar();
+ }
+
+ // =========================================================
+ // Logging & alerts
+ // =========================================================
+
+ private function logResult(string $batchTs, int $closedCount, int $insertedCount, DqResult $dqResult): void
+ {
+ $status = $dqResult->allPassed() ? 'SUCCESS' : 'DQ_FAILURES';
+ try {
+ Yii::info("Stock State SCD-2 [{$batchTs}]: {$status}, closed={$closedCount}, inserted={$insertedCount}", 'stock-state');
+ } catch (\Throwable $e) {
+ // Логирование не должно прерывать ETL
+ }
+ }
+
+ private function sendAlerts(DqResult $dqResult): void
+ {
+ $criticals = $dqResult->getCriticalFailures();
+ $majors = $dqResult->getMajorFailures();
+
+ if (empty($criticals) && empty($majors)) {
+ return;
+ }
+
+ $messages = [];
+ foreach ($criticals as $f) {
+ $messages[] = "CRITICAL [{$f['code']}]: {$f['message']}";
+ }
+ foreach ($majors as $f) {
+ $messages[] = "MAJOR [{$f['code']}]: {$f['message']}";
+ }
+
+ try {
+ foreach ($criticals as $f) {
+ Yii::error("DQ {$f['code']}: {$f['message']}", 'stock-state');
+ }
+ foreach ($majors as $f) {
+ Yii::warning("DQ {$f['code']}: {$f['message']}", 'stock-state');
+ }
+ } catch (\Throwable $e) {
+ // Логирование не должно прерывать ETL
+ }
+
+ $now = date('Y-m-d H:i:s');
+ $text = "Stock State DQ [{$now}]:\n" . implode("\n", $messages);
+ $this->sendTelegram($text);
+ }
+
+ private function sendTelegram(string $message): void
+ {
+ if ($this->telegramCallback !== null) {
+ ($this->telegramCallback)($message);
+ return;
+ }
+
+ try {
+ if (Yii::$app && Yii::$app->has('queue')) {
+ Yii::$app->queue->push(new \app\jobs\SendTelegramMessageJob([
+ 'message' => $message,
+ ]));
+ }
+ } catch (\Throwable $e) {
+ try {
+ Yii::error("Failed to send Telegram alert: " . $e->getMessage(), 'stock-state');
+ } catch (\Throwable $e2) {
+ // ignore
+ }
+ }
+ }
+}
+++ /dev/null
-<?php
-
-declare(strict_types=1);
-
-namespace tests\unit\services;
-
-use Codeception\Test\Unit;
-use app\services\StockHistoryService;
-
-/**
- * ERP-33: Тесты StockHistoryService — ETL для сбора остатков.
- *
- * @group services
- * @group stock-history
- * @group etl
- */
-class StockHistoryServiceTest extends Unit
-{
- protected $tester;
-
- /** No-op callback для подавления Telegram/Yii в тестах */
- private static function noopTelegramCallback(): callable
- {
- return static function (string $message): void {};
- }
-
- /**
- * @return array{0: \yii\db\Connection, 1: \yii\db\Command}
- */
- private function createMockDbAndCommand(): array
- {
- $db = $this->createMock(\yii\db\Connection::class);
- $command = $this->createMock(\yii\db\Command::class);
-
- $db->method('createCommand')->willReturn($command);
- $command->method('setSql')->willReturnSelf();
- $command->method('bindValues')->willReturnSelf();
- $command->method('bindValue')->willReturnSelf();
-
- return [$db, $command];
- }
-
- // =========================================================
- // collect() tests
- // =========================================================
-
- public function testCollect_Success_InsertsRecordsAndReturnsResult(): void
- {
- [$db, $command] = $this->createMockDbAndCommand();
-
- $command->method('queryScalar')
- ->willReturnOnConsecutiveCalls(
- true, // pg_try_advisory_lock
- 5, // active stores (DQ-1)
- 10, // current count (DQ-6 + DQ-5)
- 5, // distinct stores in snapshot (DQ-1)
- 0, // null required (DQ-2)
- 0, // negative qty (DQ-3)
- 0, // reserv > qty (DQ-4)
- 10, // previous count (DQ-5)
- true // pg_advisory_unlock
- );
-
- $command->method('queryAll')->willReturn(
- array_fill(0, 10, [
- 'store_id' => 'store-1', 'store_name' => 'Store',
- 'product_id' => 'prod-1', 'product_name' => 'Rose',
- 'articule' => 'ART-001', 'father_id' => null,
- 'components' => null, 'quantity' => 5.00, 'reserv' => 1.00,
- ])
- );
- $command->method('execute')->willReturn(10);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $result = $service->collect('08:00');
-
- $this->assertTrue($result->isSuccess());
- $this->assertEquals(10, $result->getRowCount());
- }
-
- public function testCollect_LockTimeout_ThrowsLockException(): void
- {
- [$db, $command] = $this->createMockDbAndCommand();
-
- // Lock always fails
- $command->method('queryScalar')->willReturn(false);
-
- $this->expectException(\RuntimeException::class);
- $this->expectExceptionMessage('advisory lock');
-
- // Анонимный класс с минимальными таймаутами для быстрого теста
- $noopCallback = self::noopTelegramCallback();
- $service = new class($db, $noopCallback) extends StockHistoryService {
- protected function getLockTimeoutSec(): int { return 1; }
- protected function getLockRetrySleepUs(): int { return 100000; } // 0.1 sec
- };
- $service->collect('08:00');
- }
-
- public function testCollect_InvalidTimeFormat_ThrowsException(): void
- {
- [$db, $command] = $this->createMockDbAndCommand();
-
- $this->expectException(\InvalidArgumentException::class);
- $this->expectExceptionMessage('snapshotTime');
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $service->collect('invalid');
- }
-
- public function testCollect_EmptyBalances_ReturnsZeroRows(): void
- {
- [$db, $command] = $this->createMockDbAndCommand();
-
- $command->method('queryScalar')
- ->willReturnOnConsecutiveCalls(
- true, 0, 0, 0, 0, 0, 0, 0, true
- );
- $command->method('queryAll')->willReturn([]);
- $command->method('execute')->willReturn(0);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $result = $service->collect('08:00');
-
- $this->assertTrue($result->isSuccess());
- $this->assertEquals(0, $result->getRowCount());
- }
-
- public function testCollect_OnConflictUpdate_UsesUpsertQuery(): void
- {
- $db = $this->createMock(\yii\db\Connection::class);
- $command = $this->createMock(\yii\db\Command::class);
-
- $db->method('createCommand')->willReturn($command);
- $command->method('bindValues')->willReturnSelf();
- $command->method('bindValue')->willReturnSelf();
-
- $command->method('queryScalar')
- ->willReturnOnConsecutiveCalls(true, 1, 1, 1, 0, 0, 0, 1, true);
-
- $executedSql = [];
- $command->method('setSql')->willReturnCallback(
- function ($sql) use ($command, &$executedSql) {
- $executedSql[] = $sql;
- return $command;
- }
- );
- $command->method('queryAll')->willReturn([
- ['store_id' => 's1', 'store_name' => 'S', 'product_id' => 'p1',
- 'product_name' => 'P', 'articule' => 'A', 'father_id' => null,
- 'components' => null, 'quantity' => 1, 'reserv' => 0],
- ]);
- $command->method('execute')->willReturn(1);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $service->collect('08:00');
-
- $hasOnConflict = false;
- foreach ($executedSql as $sql) {
- if (stripos($sql, 'ON CONFLICT') !== false) {
- $hasOnConflict = true;
- }
- }
- $this->assertTrue($hasOnConflict, 'SQL должен содержать ON CONFLICT DO UPDATE');
- }
-
- // =========================================================
- // DQ Assertions tests
- // =========================================================
-
- public function testDqAssertions_AllPass_ReturnsSuccess(): void
- {
- [$db, $command] = $this->createMockDbAndCommand();
-
- $command->method('queryScalar')
- ->willReturnOnConsecutiveCalls(5, 5, 0, 0, 0, 100, 95);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $result = $service->runDqAssertions('2026-02-21', '08:00');
-
- $this->assertTrue($result->allPassed());
- $this->assertEmpty($result->getCriticalFailures());
- }
-
- public function testDqAssertions_MissingStores_ReturnsCritical(): void
- {
- [$db, $command] = $this->createMockDbAndCommand();
-
- $command->method('queryScalar')
- ->willReturnOnConsecutiveCalls(24, 20, 0, 0, 0, 100, 100);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $result = $service->runDqAssertions('2026-02-21', '08:00');
-
- $this->assertFalse($result->allPassed());
- $criticals = $result->getCriticalFailures();
- $this->assertNotEmpty($criticals);
- $this->assertStringContainsString('store', strtolower($criticals[0]['message']));
- }
-
- public function testDqAssertions_NegativeQuantity_ReturnsMajor(): void
- {
- [$db, $command] = $this->createMockDbAndCommand();
-
- $command->method('queryScalar')
- ->willReturnOnConsecutiveCalls(5, 5, 0, 3, 0, 100, 100);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $result = $service->runDqAssertions('2026-02-21', '08:00');
-
- $majors = $result->getMajorFailures();
- $this->assertNotEmpty($majors);
- }
-
- public function testDqAssertions_DeviationOver20Pct_ReturnsMajor(): void
- {
- [$db, $command] = $this->createMockDbAndCommand();
-
- $command->method('queryScalar')
- ->willReturnOnConsecutiveCalls(5, 5, 0, 0, 0, 100, 50);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $result = $service->runDqAssertions('2026-02-21', '08:00');
-
- $majors = $result->getMajorFailures();
- $this->assertNotEmpty($majors);
- }
-
- public function testDqAssertions_EmptySnapshot_ReturnsCritical(): void
- {
- [$db, $command] = $this->createMockDbAndCommand();
-
- $command->method('queryScalar')
- ->willReturnOnConsecutiveCalls(5, 0, 0, 0, 0, 0, 100);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $result = $service->runDqAssertions('2026-02-21', '08:00');
-
- $this->assertFalse($result->allPassed());
- $criticals = $result->getCriticalFailures();
- $this->assertNotEmpty($criticals);
- }
-
- // =========================================================
- // Partition management tests
- // =========================================================
-
- public function testCreatePartition_CreatesPartitionTable(): void
- {
- $db = $this->createMock(\yii\db\Connection::class);
- $command = $this->createMock(\yii\db\Command::class);
-
- $db->method('createCommand')->willReturn($command);
-
- $executedSql = [];
- $command->method('setSql')->willReturnCallback(
- function ($sql) use ($command, &$executedSql) {
- $executedSql[] = $sql;
- return $command;
- }
- );
- $command->method('execute')->willReturn(0);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $service->createPartition('2026-04');
-
- $sql = implode(' ', $executedSql);
- $this->assertStringContainsString('PARTITION OF', $sql);
- $this->assertStringContainsString('stock_history_2026_04', $sql);
- $this->assertStringContainsString('2026-04-01', $sql);
- $this->assertStringContainsString('2026-05-01', $sql);
- }
-
- public function testCreatePartition_InvalidFormat_ThrowsException(): void
- {
- [$db, $command] = $this->createMockDbAndCommand();
-
- $this->expectException(\InvalidArgumentException::class);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $service->createPartition('invalid');
- }
-
- public function testEnsurePartitions_CreatesCurrentAndNextMonth(): void
- {
- $db = $this->createMock(\yii\db\Connection::class);
- $command = $this->createMock(\yii\db\Command::class);
-
- $db->method('createCommand')->willReturn($command);
-
- $executedSql = [];
- $command->method('setSql')->willReturnCallback(
- function ($sql) use ($command, &$executedSql) {
- $executedSql[] = $sql;
- return $command;
- }
- );
- $command->method('execute')->willReturn(0);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $service->ensurePartitions();
-
- $partitionSql = array_filter($executedSql, fn($s) => stripos($s, 'PARTITION OF') !== false);
- $this->assertCount(2, $partitionSql, 'Должны быть созданы 2 партиции (текущий и следующий месяц)');
- }
-
- public function testDropOldPartitions_DropsPartitionsOlderThanRetention(): void
- {
- $db = $this->createMock(\yii\db\Connection::class);
- $command = $this->createMock(\yii\db\Command::class);
-
- $db->method('createCommand')->willReturn($command);
- $command->method('bindValues')->willReturnSelf();
- $command->method('bindValue')->willReturnSelf();
-
- $command->method('queryAll')->willReturn([
- ['tablename' => 'stock_history_2023_11'],
- ['tablename' => 'stock_history_2023_12'],
- ['tablename' => 'stock_history_2026_01'],
- ]);
-
- $droppedSql = [];
- $command->method('setSql')->willReturnCallback(
- function ($sql) use ($command, &$droppedSql) {
- if (stripos($sql, 'DROP TABLE') !== false) {
- $droppedSql[] = $sql;
- }
- return $command;
- }
- );
- $command->method('execute')->willReturn(0);
-
- $service = new StockHistoryService($db, self::noopTelegramCallback());
- $service->dropOldPartitions(24);
-
- $this->assertCount(2, $droppedSql, 'Должны быть удалены 2 старые партиции (2023_11 и 2023_12)');
- }
-}
--- /dev/null
+<?php
+
+declare(strict_types=1);
+
+namespace tests\unit\services;
+
+use app\services\StockStateService;
+use Codeception\Test\Unit;
+
+/**
+ * @group services
+ * @group stock-state
+ * @group scd2
+ */
+class StockStateServiceTest extends Unit
+{
+ protected $tester;
+
+ private static function noopTelegramCallback(): callable
+ {
+ return static function (string $message): void {};
+ }
+
+ /**
+ * @return array{0: \yii\db\Connection, 1: \yii\db\Command}
+ */
+ private function createMockDbAndCommand(): array
+ {
+ $db = $this->createMock(\yii\db\Connection::class);
+ $command = $this->createMock(\yii\db\Command::class);
+
+ $db->method('createCommand')->willReturn($command);
+ $command->method('setSql')->willReturnSelf();
+ $command->method('bindValues')->willReturnSelf();
+ $command->method('bindValue')->willReturnSelf();
+
+ return [$db, $command];
+ }
+
+ // =========================================================
+ // collect() tests
+ // =========================================================
+
+ public function testCollect_Success_ClosesAndInsertsRecords(): void
+ {
+ $db = $this->createMock(\yii\db\Connection::class);
+ $command = $this->createMock(\yii\db\Command::class);
+
+ $db->method('createCommand')->willReturn($command);
+ $command->method('setSql')->willReturnSelf();
+ $command->method('bindValues')->willReturnSelf();
+ $command->method('bindValue')->willReturnSelf();
+
+ $command->method('queryScalar')
+ ->willReturnOnConsecutiveCalls(
+ true, // lock
+ 24, 24, // DQ-1
+ 0, // DQ-2
+ 0, // DQ-3
+ 0, // DQ-4
+ 11400, // DQ-5 activeCount
+ 11300, // DQ-5 prevActiveCount
+ true // unlock
+ );
+
+ $command->method('execute')
+ ->willReturnOnConsecutiveCalls(150, 150);
+
+ $service = new StockStateService($db, self::noopTelegramCallback());
+ $result = $service->collect();
+
+ $this->assertTrue($result->isSuccess());
+ $this->assertSame(150, $result->getRowCount());
+ $this->assertTrue($result->getDqResult()->allPassed());
+ }
+
+ public function testCollect_LockTimeout_ThrowsRuntimeException(): void
+ {
+ [$db, $command] = $this->createMockDbAndCommand();
+
+ $command->method('queryScalar')->willReturn(false);
+
+ $this->expectException(\RuntimeException::class);
+ $this->expectExceptionMessage('advisory lock');
+
+ $noopCallback = self::noopTelegramCallback();
+ $service = new class($db, $noopCallback) extends StockStateService {
+ protected function getLockTimeoutSec(): int { return 1; }
+ protected function getLockRetrySleepUs(): int { return 100000; }
+ };
+ $service->collect();
+ }
+
+ public function testCollect_EmptyBalances_ReturnsZeroAndDqCritical(): void
+ {
+ $db = $this->createMock(\yii\db\Connection::class);
+ $command = $this->createMock(\yii\db\Command::class);
+
+ $db->method('createCommand')->willReturn($command);
+ $command->method('setSql')->willReturnSelf();
+ $command->method('bindValues')->willReturnSelf();
+ $command->method('bindValue')->willReturnSelf();
+
+ $command->method('queryScalar')
+ ->willReturnOnConsecutiveCalls(
+ true, // lock
+ 24, 0, // DQ-1
+ 0, // DQ-2
+ 0, // DQ-3
+ 0, // DQ-4
+ 0, // DQ-5
+ 0, // DQ-5 prev
+ true // unlock
+ );
+
+ $command->method('execute')
+ ->willReturnOnConsecutiveCalls(0, 0);
+
+ $service = new StockStateService($db, self::noopTelegramCallback());
+ $result = $service->collect();
+
+ $this->assertTrue($result->isSuccess());
+ $this->assertSame(0, $result->getRowCount());
+ $this->assertFalse($result->getDqResult()->allPassed());
+ $this->assertNotEmpty($result->getDqResult()->getCriticalFailures());
+ }
+
+ // =========================================================
+ // SCD-2 SQL + batch_ts verification
+ // =========================================================
+
+ public function testCollect_GeneratesScd2QueriesWithBatchTs(): void
+ {
+ $db = $this->createMock(\yii\db\Connection::class);
+ $command = $this->createMock(\yii\db\Command::class);
+
+ $db->method('createCommand')->willReturn($command);
+ $command->method('bindValue')->willReturnSelf();
+
+ $executedSql = [];
+ $boundValues = [];
+ $command->method('setSql')->willReturnCallback(
+ function ($sql) use ($command, &$executedSql) {
+ $executedSql[] = $sql;
+ return $command;
+ }
+ );
+ $command->method('bindValues')->willReturnCallback(
+ function ($values) use ($command, &$boundValues) {
+ $boundValues = array_merge($boundValues, $values);
+ return $command;
+ }
+ );
+
+ $command->method('queryScalar')
+ ->willReturnOnConsecutiveCalls(
+ true,
+ 24, 24,
+ 0, 0, 0,
+ 11400, 11400,
+ true
+ );
+
+ $command->method('execute')
+ ->willReturnOnConsecutiveCalls(100, 100);
+
+ $service = new StockStateService($db, self::noopTelegramCallback());
+ $service->collect();
+
+ $allSql = implode("\n", $executedSql);
+
+ // SCD-2 close
+ $this->assertStringContainsString('is_active = false', $allSql);
+ $this->assertStringContainsString('SET valid_to', $allSql);
+ $this->assertStringContainsString('ss.quantity', $allSql);
+ $this->assertStringContainsString('ss.reserv', $allSql);
+
+ // SCD-2 insert
+ $this->assertStringContainsString('ss.id IS NULL', $allSql);
+ $this->assertStringContainsString('valid_from', $allSql);
+ $this->assertStringContainsString('batch_ts', $allSql);
+
+ // batch_ts и infinity передаются
+ $this->assertSame('2100-01-01 00:00:00', $boundValues[':infinity'] ?? null);
+ $this->assertArrayHasKey(':batch_ts', $boundValues);
+ // batch_ts = now (тот же что и :now)
+ $this->assertSame($boundValues[':now'], $boundValues[':batch_ts']);
+ }
+
+ // =========================================================
+ // DQ assertions
+ // =========================================================
+
+ public function testDqAssertions_AllPass_ReturnsSuccess(): void
+ {
+ [$db, $command] = $this->createMockDbAndCommand();
+
+ $command->method('queryScalar')
+ ->willReturnOnConsecutiveCalls(24, 24, 0, 0, 0, 11400, 11300);
+
+ $service = new StockStateService($db, self::noopTelegramCallback());
+ $result = $service->runDqAssertions();
+
+ $this->assertTrue($result->allPassed());
+ }
+
+ public function testDqAssertions_MissingStores_ReturnsCritical(): void
+ {
+ [$db, $command] = $this->createMockDbAndCommand();
+
+ $command->method('queryScalar')
+ ->willReturnOnConsecutiveCalls(24, 20, 0, 0, 0, 11400, 11400);
+
+ $service = new StockStateService($db, self::noopTelegramCallback());
+ $result = $service->runDqAssertions();
+
+ $this->assertFalse($result->allPassed());
+ $this->assertSame('DQ-1', $result->getCriticalFailures()[0]['code']);
+ }
+
+ public function testDqAssertions_NegativeQuantity_ReturnsMajor(): void
+ {
+ [$db, $command] = $this->createMockDbAndCommand();
+
+ $command->method('queryScalar')
+ ->willReturnOnConsecutiveCalls(24, 24, 0, 3, 0, 11400, 11400);
+
+ $service = new StockStateService($db, self::noopTelegramCallback());
+ $result = $service->runDqAssertions();
+
+ $this->assertSame('DQ-3', $result->getMajorFailures()[0]['code']);
+ }
+
+ public function testDqAssertions_DeviationOver20Pct_ReturnsMajor(): void
+ {
+ [$db, $command] = $this->createMockDbAndCommand();
+
+ $command->method('queryScalar')
+ ->willReturnOnConsecutiveCalls(24, 24, 0, 0, 0, 11400, 5000);
+
+ $service = new StockStateService($db, self::noopTelegramCallback());
+ $result = $service->runDqAssertions();
+
+ $this->assertSame('DQ-5', $result->getMajorFailures()[0]['code']);
+ }
+
+ public function testDqAssertions_ZeroActiveRecords_ReturnsCritical(): void
+ {
+ [$db, $command] = $this->createMockDbAndCommand();
+
+ $command->method('queryScalar')
+ ->willReturnOnConsecutiveCalls(24, 0, 0, 0, 0, 0, 0);
+
+ $service = new StockStateService($db, self::noopTelegramCallback());
+ $result = $service->runDqAssertions();
+
+ $this->assertFalse($result->allPassed());
+ $this->assertContains('DQ-6', array_column($result->getCriticalFailures(), 'code'));
+ }
+
+ // =========================================================
+ // Cleanup
+ // =========================================================
+
+ public function testCleanupOldRecords_DeletesClosedOlderThanRetention(): void
+ {
+ $db = $this->createMock(\yii\db\Connection::class);
+ $command = $this->createMock(\yii\db\Command::class);
+
+ $db->method('createCommand')->willReturn($command);
+ $command->method('bindValues')->willReturnSelf();
+
+ $executedSql = [];
+ $command->method('setSql')->willReturnCallback(
+ function ($sql) use ($command, &$executedSql) {
+ $executedSql[] = $sql;
+ return $command;
+ }
+ );
+ $command->method('execute')->willReturn(500);
+
+ $service = new StockStateService($db, self::noopTelegramCallback());
+ $deleted = $service->cleanupOldRecords(24);
+
+ $this->assertSame(500, $deleted);
+ $sql = implode(' ', $executedSql);
+ $this->assertStringContainsString('DELETE FROM', $sql);
+ $this->assertStringContainsString('is_active = false', $sql);
+ }
+}