]> gitweb.erp-flowers.ru Git - erp24_rep/yii-erp24/.git/commitdiff
feat(ERP-33): add stock history ETL — migration, model, service, command
authorAleksey Filippov <Aleksey.Filippov@erp-flowers.ru>
Wed, 25 Feb 2026 14:45:52 +0000 (17:45 +0300)
committerAleksey Filippov <Aleksey.Filippov@erp-flowers.ru>
Wed, 25 Feb 2026 14:45:52 +0000 (17:45 +0300)
- migration: partitioned table stock_history, уникальный индекс ON CONFLICT
- records/StockHistory: ActiveRecord модель
- services/StockHistoryService: ETL с advisory lock, batch INSERT 1000, DQ assertions (DQ-1..6), Telegram alerts
- services/CollectResult, DqResult: DTO
- commands/StockHistoryController: actionCollect, actionCreatePartition, actionDropOldPartitions

Crontab (в комментарии контроллера):
  0 8,20 * * * php yii stock-history/collect
  0 1 1 * * php yii stock-history/create-partition
  0 2 1 * * php yii stock-history/drop-old-partitions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
erp24/commands/StockHistoryController.php [new file with mode: 0644]
erp24/migrations/m260221_100000_create_stock_history_table.php [new file with mode: 0644]
erp24/records/StockHistory.php [new file with mode: 0644]
erp24/services/CollectResult.php [new file with mode: 0644]
erp24/services/DqResult.php [new file with mode: 0644]
erp24/services/StockHistoryService.php [new file with mode: 0644]

diff --git a/erp24/commands/StockHistoryController.php b/erp24/commands/StockHistoryController.php
new file mode 100644 (file)
index 0000000..5f39864
--- /dev/null
@@ -0,0 +1,121 @@
+<?php
+
+declare(strict_types=1);
+
+namespace app\commands;
+
+use app\services\StockHistoryService;
+use yii\console\Controller;
+use yii\console\ExitCode;
+use Yii;
+
+/**
+ * ERP-33: Управление историческими срезами остатков.
+ *
+ * Crontab:
+ *   0 8,20 * * * cd /www && php yii stock-history/collect >> /var/log/stock-history.log 2>&1
+ *   0 1 1 * * cd /www && php yii stock-history/create-partition >> /var/log/stock-history.log 2>&1
+ *   0 2 1 * * cd /www && php yii stock-history/drop-old-partitions >> /var/log/stock-history.log 2>&1
+ */
+class StockHistoryController extends Controller
+{
+    /**
+     * @var string Время среза (08:00 или 20:00). Если пусто — авто-определение.
+     */
+    public string $time = '';
+
+    public function options($actionID): array
+    {
+        return array_merge(parent::options($actionID), ['time']);
+    }
+
+    /**
+     * Собрать срез остатков.
+     */
+    public function actionCollect(): int
+    {
+        $snapshotTime = $this->resolveTime();
+        $this->stdout("Stock History ETL: collecting snapshot for {$snapshotTime}...\n");
+
+        $service = $this->createService();
+
+        try {
+            $result = $service->collect($snapshotTime);
+
+            $this->stdout("Done: {$result->getRowCount()} rows inserted.\n");
+
+            $dq = $result->getDqResult();
+            if ($dq !== null && !$dq->allPassed()) {
+                $this->stderr("DQ failures detected:\n");
+                foreach ($dq->getAll() as $f) {
+                    $this->stderr("  [{$f['level']}] {$f['code']}: {$f['message']}\n");
+                }
+                return ExitCode::OK; // ETL прошёл, но DQ имеет замечания
+            }
+
+            return ExitCode::OK;
+        } catch (\RuntimeException $e) {
+            $this->stderr("ERROR: {$e->getMessage()}\n");
+            return ExitCode::SOFTWARE;
+        } catch (\Throwable $e) {
+            $this->stderr("FATAL: {$e->getMessage()}\n");
+            Yii::error("Stock History ETL fatal: " . $e->getMessage(), 'stock-history');
+            return ExitCode::SOFTWARE;
+        }
+    }
+
+    /**
+     * Создать партицию на указанный месяц.
+     * @param string $month Формат "2026-03". По умолчанию — следующий месяц.
+     */
+    public function actionCreatePartition(string $month = ''): int
+    {
+        if (empty($month)) {
+            $month = date('Y-m', strtotime('+1 month'));
+        }
+
+        $this->stdout("Creating partition for {$month}...\n");
+
+        try {
+            $this->createService()->createPartition($month);
+            $this->stdout("Partition stock_history_{$month} created.\n");
+            return ExitCode::OK;
+        } catch (\Throwable $e) {
+            $this->stderr("ERROR: {$e->getMessage()}\n");
+            return ExitCode::SOFTWARE;
+        }
+    }
+
+    /**
+     * Удалить старые партиции.
+     * @param int $months Retention в месяцах (default 24).
+     */
+    public function actionDropOldPartitions(int $months = 24): int
+    {
+        $this->stdout("Dropping partitions older than {$months} months...\n");
+
+        try {
+            $this->createService()->dropOldPartitions($months);
+            $this->stdout("Done.\n");
+            return ExitCode::OK;
+        } catch (\Throwable $e) {
+            $this->stderr("ERROR: {$e->getMessage()}\n");
+            return ExitCode::SOFTWARE;
+        }
+    }
+
+    private function resolveTime(): string
+    {
+        if (!empty($this->time)) {
+            return $this->time;
+        }
+
+        $hour = (int)date('H');
+        return $hour < 14 ? '08:00' : '20:00';
+    }
+
+    private function createService(): StockHistoryService
+    {
+        return new StockHistoryService(Yii::$app->db);
+    }
+}
diff --git a/erp24/migrations/m260221_100000_create_stock_history_table.php b/erp24/migrations/m260221_100000_create_stock_history_table.php
new file mode 100644 (file)
index 0000000..17d235b
--- /dev/null
@@ -0,0 +1,64 @@
+<?php
+
+use yii\db\Migration;
+
+/**
+ * ERP-33: Создание partitioned таблицы stock_history для хранения
+ * исторических срезов остатков товаров по магазинам.
+ */
+class m260221_100000_create_stock_history_table extends Migration
+{
+    public function safeUp()
+    {
+        // Partitioned table — нельзя через createTable(), используем raw SQL
+        $this->execute("
+            CREATE TABLE {{%stock_history}} (
+                id BIGSERIAL,
+                snapshot_date DATE NOT NULL,
+                snapshot_time TIME NOT NULL,
+                store_id VARCHAR(36) NOT NULL,
+                store_name VARCHAR(255),
+                product_id VARCHAR(36) NOT NULL,
+                product_name VARCHAR(255),
+                articule VARCHAR(36),
+                father_id VARCHAR(36),
+                components JSONB,
+                quantity NUMERIC(12,2) NOT NULL DEFAULT 0,
+                reserv NUMERIC(12,2) NOT NULL DEFAULT 0,
+                created_at TIMESTAMP NOT NULL DEFAULT NOW(),
+                PRIMARY KEY (id, snapshot_date)
+            ) PARTITION BY RANGE (snapshot_date)
+        ");
+
+        // Партиция текущего месяца
+        $this->execute("
+            CREATE TABLE IF NOT EXISTS {{%stock_history_2026_02}}
+                PARTITION OF {{%stock_history}}
+                FOR VALUES FROM ('2026-02-01') TO ('2026-03-01')
+        ");
+
+        // Партиция следующего месяца
+        $this->execute("
+            CREATE TABLE IF NOT EXISTS {{%stock_history_2026_03}}
+                PARTITION OF {{%stock_history}}
+                FOR VALUES FROM ('2026-03-01') TO ('2026-04-01')
+        ");
+
+        // Уникальный индекс для idempotency (ON CONFLICT)
+        $this->execute("
+            CREATE UNIQUE INDEX idx_stock_history_unique
+                ON {{%stock_history}} (snapshot_date, snapshot_time, store_id, product_id)
+        ");
+
+        $this->createIndex('idx_stock_history_store', '{{%stock_history}}', 'store_id');
+        $this->createIndex('idx_stock_history_product', '{{%stock_history}}', 'product_id');
+        $this->createIndex('idx_stock_history_father', '{{%stock_history}}', 'father_id');
+        $this->createIndex('idx_stock_history_date_time', '{{%stock_history}}', ['snapshot_date', 'snapshot_time']);
+        $this->createIndex('idx_stock_history_father_date', '{{%stock_history}}', ['father_id', 'snapshot_date']);
+    }
+
+    public function safeDown()
+    {
+        $this->execute("DROP TABLE IF EXISTS {{%stock_history}} CASCADE");
+    }
+}
diff --git a/erp24/records/StockHistory.php b/erp24/records/StockHistory.php
new file mode 100644 (file)
index 0000000..6fedd4e
--- /dev/null
@@ -0,0 +1,60 @@
+<?php
+
+declare(strict_types=1);
+
+namespace app\records;
+
+use yii\db\ActiveRecord;
+
+/**
+ * ERP-33: Исторические срезы остатков товаров по магазинам.
+ *
+ * @property int $id
+ * @property string $snapshot_date
+ * @property string $snapshot_time
+ * @property string $store_id
+ * @property string|null $store_name
+ * @property string $product_id
+ * @property string|null $product_name
+ * @property string|null $articule
+ * @property string|null $father_id
+ * @property array|null $components
+ * @property float $quantity
+ * @property float $reserv
+ * @property string $created_at
+ *
+ * @property Products1c $store
+ * @property Products1c $product
+ */
+class StockHistory extends ActiveRecord
+{
+    public static function tableName(): string
+    {
+        return '{{%stock_history}}';
+    }
+
+    public function rules(): array
+    {
+        return [
+            [['snapshot_date', 'snapshot_time', 'store_id', 'product_id'], 'required'],
+            [['store_id', 'product_id', 'father_id'], 'string', 'max' => 36],
+            [['store_name', 'product_name'], 'string', 'max' => 255],
+            [['articule'], 'string', 'max' => 36],
+            [['quantity', 'reserv'], 'number', 'min' => 0],
+            [['quantity', 'reserv'], 'default', 'value' => 0],
+            [['snapshot_date'], 'date', 'format' => 'php:Y-m-d'],
+            [['snapshot_time'], 'string'],
+            [['components'], 'safe'],
+        ];
+    }
+
+    public function getStore(): \yii\db\ActiveQuery
+    {
+        return $this->hasOne(Products1c::class, ['id' => 'store_id']);
+    }
+
+    public function getProduct(): \yii\db\ActiveQuery
+    {
+        return $this->hasOne(Products1c::class, ['id' => 'product_id']);
+    }
+}
diff --git a/erp24/services/CollectResult.php b/erp24/services/CollectResult.php
new file mode 100644 (file)
index 0000000..e016789
--- /dev/null
@@ -0,0 +1,37 @@
+<?php
+
+declare(strict_types=1);
+
+namespace app\services;
+
+/**
+ * DTO: результат StockHistoryService::collect().
+ */
+class CollectResult
+{
+    private bool $success;
+    private int $rowCount;
+    private ?DqResult $dqResult;
+
+    public function __construct(bool $success, int $rowCount, ?DqResult $dqResult = null)
+    {
+        $this->success = $success;
+        $this->rowCount = $rowCount;
+        $this->dqResult = $dqResult;
+    }
+
+    public function isSuccess(): bool
+    {
+        return $this->success;
+    }
+
+    public function getRowCount(): int
+    {
+        return $this->rowCount;
+    }
+
+    public function getDqResult(): ?DqResult
+    {
+        return $this->dqResult;
+    }
+}
diff --git a/erp24/services/DqResult.php b/erp24/services/DqResult.php
new file mode 100644 (file)
index 0000000..6b02325
--- /dev/null
@@ -0,0 +1,43 @@
+<?php
+
+declare(strict_types=1);
+
+namespace app\services;
+
+/**
+ * DTO: результат DQ assertions для StockHistoryService.
+ */
+class DqResult
+{
+    private array $failures;
+
+    public function __construct(array $failures = [])
+    {
+        $this->failures = $failures;
+    }
+
+    public function allPassed(): bool
+    {
+        return empty($this->failures);
+    }
+
+    public function getCriticalFailures(): array
+    {
+        return array_values(array_filter($this->failures, fn($f) => $f['level'] === 'CRITICAL'));
+    }
+
+    public function getMajorFailures(): array
+    {
+        return array_values(array_filter($this->failures, fn($f) => $f['level'] === 'MAJOR'));
+    }
+
+    public function getWarnings(): array
+    {
+        return array_values(array_filter($this->failures, fn($f) => $f['level'] === 'WARNING'));
+    }
+
+    public function getAll(): array
+    {
+        return $this->failures;
+    }
+}
diff --git a/erp24/services/StockHistoryService.php b/erp24/services/StockHistoryService.php
new file mode 100644 (file)
index 0000000..6e5b9dd
--- /dev/null
@@ -0,0 +1,407 @@
+<?php
+
+declare(strict_types=1);
+
+namespace app\services;
+
+use yii\db\Connection;
+use Yii;
+
+/**
+ * ERP-33: ETL-сервис для сбора исторических срезов остатков.
+ *
+ * Алгоритм collect():
+ * 1. Advisory lock (pg_try_advisory_lock)
+ * 2. SELECT balances LEFT JOIN products_1c
+ * 3. Batch INSERT ON CONFLICT DO UPDATE (1000 строк)
+ * 4. DQ assertions
+ * 5. Log + unlock
+ */
+class StockHistoryService
+{
+    private const ADVISORY_LOCK_KEY = 123456789;
+    private const BATCH_SIZE = 1000;
+    private const LOCK_TIMEOUT_SEC = 30;
+    private const DEVIATION_THRESHOLD = 0.2;
+    private const MAX_RETRY = 3;
+
+    private Connection $db;
+    /** @var callable|null */
+    private $telegramCallback;
+
+    public function __construct(Connection $db, ?callable $telegramCallback = null)
+    {
+        $this->db = $db;
+        $this->telegramCallback = $telegramCallback;
+    }
+
+    /**
+     * Основной метод сбора остатков.
+     */
+    public function collect(string $snapshotTime): CollectResult
+    {
+        $snapshotDate = date('Y-m-d');
+
+        // 1. Advisory lock
+        $this->acquireLock();
+
+        try {
+            // 2. SELECT данные
+            $rows = $this->fetchBalances();
+
+            // 3. Batch INSERT
+            $insertedCount = $this->batchInsert($rows, $snapshotDate, $snapshotTime);
+
+            // 4. DQ assertions
+            $dqResult = $this->runDqAssertions($snapshotDate, $snapshotTime);
+
+            // 5. Log
+            $this->logResult($snapshotDate, $snapshotTime, $insertedCount, $dqResult);
+
+            return new CollectResult(true, $insertedCount, $dqResult);
+        } finally {
+            $this->releaseLock();
+        }
+    }
+
+    /**
+     * DQ assertions после сбора.
+     */
+    public function runDqAssertions(string $date, string $time): DqResult
+    {
+        $failures = [];
+
+        // DQ-1: stores count >= active stores
+        $activeStores = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(*) FROM products_1c WHERE tip = 'city_store'")
+            ->queryScalar();
+
+        $snapshotStores = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(DISTINCT store_id) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time")
+            ->bindValues([':date' => $date, ':time' => $time])
+            ->queryScalar();
+
+        if ($snapshotStores < $activeStores) {
+            $failures[] = [
+                'level' => 'CRITICAL',
+                'code' => 'DQ-1',
+                'message' => "Stores in snapshot ($snapshotStores) < active stores ($activeStores)",
+            ];
+        }
+
+        // DQ-2: no NULL in required fields
+        $nullCount = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND (store_id IS NULL OR product_id IS NULL OR quantity IS NULL)")
+            ->bindValues([':date' => $date, ':time' => $time])
+            ->queryScalar();
+
+        if ($nullCount > 0) {
+            $failures[] = [
+                'level' => 'CRITICAL',
+                'code' => 'DQ-2',
+                'message' => "Found $nullCount records with NULL in required fields",
+            ];
+        }
+
+        // DQ-3: no negative quantity
+        $negativeQty = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND quantity < 0")
+            ->bindValues([':date' => $date, ':time' => $time])
+            ->queryScalar();
+
+        if ($negativeQty > 0) {
+            $failures[] = [
+                'level' => 'MAJOR',
+                'code' => 'DQ-3',
+                'message' => "Found $negativeQty records with negative quantity",
+            ];
+        }
+
+        // DQ-4: reserv <= quantity
+        $reservOverQty = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND reserv > quantity")
+            ->bindValues([':date' => $date, ':time' => $time])
+            ->queryScalar();
+
+        if ($reservOverQty > 0) {
+            $failures[] = [
+                'level' => 'WARNING',
+                'code' => 'DQ-4',
+                'message' => "Found $reservOverQty records where reserv > quantity",
+            ];
+        }
+
+        // DQ-5: row count deviation
+        $currentCount = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time")
+            ->bindValues([':date' => $date, ':time' => $time])
+            ->queryScalar();
+
+        $previousCount = (int)$this->db->createCommand()
+            ->setSql("SELECT COUNT(*) FROM stock_history WHERE (snapshot_date, snapshot_time) < (:date, :time) ORDER BY snapshot_date DESC, snapshot_time DESC LIMIT 1")
+            ->bindValues([':date' => $date, ':time' => $time])
+            ->queryScalar();
+
+        if ($previousCount > 0) {
+            $deviation = abs($currentCount - $previousCount) / $previousCount;
+            if ($deviation > self::DEVIATION_THRESHOLD) {
+                $pct = round($deviation * 100, 1);
+                $failures[] = [
+                    'level' => 'MAJOR',
+                    'code' => 'DQ-5',
+                    'message' => "Row count deviation {$pct}% (current: $currentCount, previous: $previousCount)",
+                ];
+            }
+        }
+
+        // DQ-6: non-empty snapshot
+        if ($currentCount === 0) {
+            $failures[] = [
+                'level' => 'CRITICAL',
+                'code' => 'DQ-6',
+                'message' => "Empty snapshot: 0 records for $date $time",
+            ];
+        }
+
+        $dqResult = new DqResult($failures);
+
+        // Отправить алерты при CRITICAL/MAJOR
+        $this->sendAlerts($dqResult, $date, $time);
+
+        return $dqResult;
+    }
+
+    /**
+     * Создание партиции на указанный месяц.
+     */
+    public function createPartition(string $yearMonth): void
+    {
+        $start = $yearMonth . '-01';
+        $end = date('Y-m-d', strtotime($start . ' +1 month'));
+        $suffix = str_replace('-', '_', $yearMonth);
+
+        $this->db->createCommand()
+            ->setSql("CREATE TABLE IF NOT EXISTS stock_history_{$suffix} PARTITION OF stock_history FOR VALUES FROM ('{$start}') TO ('{$end}')")
+            ->execute();
+    }
+
+    /**
+     * Удаление партиций старше retention периода.
+     */
+    public function dropOldPartitions(int $retentionMonths = 24): void
+    {
+        $cutoffDate = date('Y-m-d', strtotime("-{$retentionMonths} months"));
+        $cutoffYm = substr($cutoffDate, 0, 7); // "2024-02"
+
+        $partitions = $this->db->createCommand()
+            ->setSql("SELECT tablename FROM pg_tables WHERE schemaname = 'erp24' AND tablename LIKE 'stock_history_%' ORDER BY tablename")
+            ->queryAll();
+
+        foreach ($partitions as $row) {
+            $name = $row['tablename'];
+            // Извлекаем дату из имени: stock_history_2024_01 → 2024-01
+            if (preg_match('/stock_history_(\d{4})_(\d{2})$/', $name, $m)) {
+                $partitionYm = $m[1] . '-' . $m[2];
+                if ($partitionYm < $cutoffYm) {
+                    $this->db->createCommand()
+                        ->setSql("DROP TABLE IF EXISTS {$name}")
+                        ->execute();
+
+                    Yii::info("Dropped old partition: {$name}", 'stock-history');
+                }
+            }
+        }
+    }
+
+    // =========================================================
+    // Private methods
+    // =========================================================
+
+    protected function getLockTimeoutSec(): int
+    {
+        return self::LOCK_TIMEOUT_SEC;
+    }
+
+    private function acquireLock(): void
+    {
+        $deadline = time() + $this->getLockTimeoutSec();
+
+        while (time() < $deadline) {
+            $locked = $this->db->createCommand()
+                ->setSql("SELECT pg_try_advisory_lock(" . self::ADVISORY_LOCK_KEY . ")")
+                ->queryScalar();
+
+            if ($locked) {
+                return;
+            }
+
+            usleep(500000); // 0.5 sec
+        }
+
+        $msg = 'Failed to acquire advisory lock within ' . $this->getLockTimeoutSec() . ' seconds';
+        Yii::error($msg, 'stock-history');
+        $this->sendTelegram("CRITICAL: Stock History ETL — $msg");
+
+        throw new \RuntimeException($msg);
+    }
+
+    private function releaseLock(): void
+    {
+        $this->db->createCommand()
+            ->setSql("SELECT pg_advisory_unlock(" . self::ADVISORY_LOCK_KEY . ")")
+            ->queryScalar();
+    }
+
+    private function fetchBalances(): array
+    {
+        return $this->db->createCommand()
+            ->setSql("
+                SELECT
+                    b.store_id,
+                    ps.name as store_name,
+                    b.product_id,
+                    pp.name as product_name,
+                    pp.articule,
+                    pp.parent_id as father_id,
+                    pp.components,
+                    b.quantity,
+                    b.reserv
+                FROM balances b
+                LEFT JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store'
+                LEFT JOIN products_1c pp ON pp.id = b.product_id
+                WHERE ps.id IS NOT NULL
+                ORDER BY b.store_id, b.product_id
+            ")
+            ->queryAll();
+    }
+
+    private function batchInsert(array $rows, string $date, string $time): int
+    {
+        if (empty($rows)) {
+            return 0;
+        }
+
+        $total = 0;
+        $chunks = array_chunk($rows, self::BATCH_SIZE);
+
+        foreach ($chunks as $chunk) {
+            $total += $this->insertChunkWithRetry($chunk, $date, $time);
+        }
+
+        return $total;
+    }
+
+    private function insertChunkWithRetry(array $chunk, string $date, string $time): int
+    {
+        $attempt = 0;
+
+        while ($attempt < self::MAX_RETRY) {
+            try {
+                return $this->insertChunk($chunk, $date, $time);
+            } catch (\yii\db\Exception $e) {
+                $attempt++;
+                if ($attempt >= self::MAX_RETRY) {
+                    throw $e;
+                }
+                // Exponential backoff: 1s, 2s, 4s
+                sleep((int)pow(2, $attempt - 1));
+                Yii::warning("Batch INSERT retry #{$attempt}: " . $e->getMessage(), 'stock-history');
+            }
+        }
+
+        return 0;
+    }
+
+    private function insertChunk(array $chunk, string $date, string $time): int
+    {
+        $values = [];
+        $params = [];
+        $i = 0;
+
+        foreach ($chunk as $row) {
+            $components = $row['components'] ?? null;
+            if ($components !== null && !is_string($components)) {
+                $components = json_encode($components);
+            }
+
+            $values[] = "(:date_{$i}, :time_{$i}, :sid_{$i}, :sn_{$i}, :pid_{$i}, :pn_{$i}, :art_{$i}, :fid_{$i}, :comp_{$i}::jsonb, :qty_{$i}, :res_{$i}, NOW())";
+            $params[":date_{$i}"] = $date;
+            $params[":time_{$i}"] = $time;
+            $params[":sid_{$i}"] = $row['store_id'];
+            $params[":sn_{$i}"] = $row['store_name'] ?? null;
+            $params[":pid_{$i}"] = $row['product_id'];
+            $params[":pn_{$i}"] = $row['product_name'] ?? null;
+            $params[":art_{$i}"] = $row['articule'] ?? null;
+            $params[":fid_{$i}"] = $row['father_id'] ?? null;
+            $params[":comp_{$i}"] = $components;
+            $params[":qty_{$i}"] = $row['quantity'] ?? 0;
+            $params[":res_{$i}"] = $row['reserv'] ?? 0;
+            $i++;
+        }
+
+        $sql = "INSERT INTO stock_history (snapshot_date, snapshot_time, store_id, store_name, product_id, product_name, articule, father_id, components, quantity, reserv, created_at)
+                VALUES " . implode(', ', $values) . "
+                ON CONFLICT (snapshot_date, snapshot_time, store_id, product_id)
+                DO UPDATE SET
+                    quantity = EXCLUDED.quantity,
+                    reserv = EXCLUDED.reserv,
+                    store_name = EXCLUDED.store_name,
+                    product_name = EXCLUDED.product_name,
+                    articule = EXCLUDED.articule,
+                    father_id = EXCLUDED.father_id,
+                    components = EXCLUDED.components,
+                    created_at = NOW()";
+
+        return $this->db->createCommand()
+            ->setSql($sql)
+            ->bindValues($params)
+            ->execute();
+    }
+
+    private function logResult(string $date, string $time, int $rowCount, DqResult $dqResult): void
+    {
+        $status = $dqResult->allPassed() ? 'SUCCESS' : 'DQ_FAILURES';
+        Yii::info("Stock History ETL [{$date} {$time}]: {$status}, rows={$rowCount}", 'stock-history');
+    }
+
+    private function sendAlerts(DqResult $dqResult, string $date, string $time): void
+    {
+        $criticals = $dqResult->getCriticalFailures();
+        $majors = $dqResult->getMajorFailures();
+
+        if (empty($criticals) && empty($majors)) {
+            return;
+        }
+
+        $messages = [];
+        foreach ($criticals as $f) {
+            $messages[] = "CRITICAL [{$f['code']}]: {$f['message']}";
+            Yii::error("DQ {$f['code']}: {$f['message']}", 'stock-history');
+        }
+        foreach ($majors as $f) {
+            $messages[] = "MAJOR [{$f['code']}]: {$f['message']}";
+            Yii::warning("DQ {$f['code']}: {$f['message']}", 'stock-history');
+        }
+
+        $text = "Stock History DQ [{$date} {$time}]:\n" . implode("\n", $messages);
+        $this->sendTelegram($text);
+    }
+
+    private function sendTelegram(string $message): void
+    {
+        if ($this->telegramCallback !== null) {
+            ($this->telegramCallback)($message);
+            return;
+        }
+
+        try {
+            if (Yii::$app && Yii::$app->has('queue')) {
+                Yii::$app->queue->push(new \app\jobs\SendTelegramMessageJob([
+                    'message' => $message,
+                ]));
+            }
+        } catch (\Throwable $e) {
+            Yii::error("Failed to send Telegram alert: " . $e->getMessage(), 'stock-history');
+        }
+    }
+}