--- /dev/null
+<?php
+
+declare(strict_types=1);
+
+namespace app\commands;
+
+use app\services\StockHistoryService;
+use yii\console\Controller;
+use yii\console\ExitCode;
+use Yii;
+
+/**
+ * ERP-33: Управление историческими срезами остатков.
+ *
+ * Crontab:
+ * 0 8,20 * * * cd /www && php yii stock-history/collect >> /var/log/stock-history.log 2>&1
+ * 0 1 1 * * cd /www && php yii stock-history/create-partition >> /var/log/stock-history.log 2>&1
+ * 0 2 1 * * cd /www && php yii stock-history/drop-old-partitions >> /var/log/stock-history.log 2>&1
+ */
+class StockHistoryController extends Controller
+{
+ /**
+ * @var string Время среза (08:00 или 20:00). Если пусто — авто-определение.
+ */
+ public string $time = '';
+
+ public function options($actionID): array
+ {
+ return array_merge(parent::options($actionID), ['time']);
+ }
+
+ /**
+ * Собрать срез остатков.
+ */
+ public function actionCollect(): int
+ {
+ $snapshotTime = $this->resolveTime();
+ $this->stdout("Stock History ETL: collecting snapshot for {$snapshotTime}...\n");
+
+ $service = $this->createService();
+
+ try {
+ $result = $service->collect($snapshotTime);
+
+ $this->stdout("Done: {$result->getRowCount()} rows inserted.\n");
+
+ $dq = $result->getDqResult();
+ if ($dq !== null && !$dq->allPassed()) {
+ $this->stderr("DQ failures detected:\n");
+ foreach ($dq->getAll() as $f) {
+ $this->stderr(" [{$f['level']}] {$f['code']}: {$f['message']}\n");
+ }
+ return ExitCode::OK; // ETL прошёл, но DQ имеет замечания
+ }
+
+ return ExitCode::OK;
+ } catch (\RuntimeException $e) {
+ $this->stderr("ERROR: {$e->getMessage()}\n");
+ return ExitCode::SOFTWARE;
+ } catch (\Throwable $e) {
+ $this->stderr("FATAL: {$e->getMessage()}\n");
+ Yii::error("Stock History ETL fatal: " . $e->getMessage(), 'stock-history');
+ return ExitCode::SOFTWARE;
+ }
+ }
+
+ /**
+ * Создать партицию на указанный месяц.
+ * @param string $month Формат "2026-03". По умолчанию — следующий месяц.
+ */
+ public function actionCreatePartition(string $month = ''): int
+ {
+ if (empty($month)) {
+ $month = date('Y-m', strtotime('+1 month'));
+ }
+
+ $this->stdout("Creating partition for {$month}...\n");
+
+ try {
+ $this->createService()->createPartition($month);
+ $this->stdout("Partition stock_history_{$month} created.\n");
+ return ExitCode::OK;
+ } catch (\Throwable $e) {
+ $this->stderr("ERROR: {$e->getMessage()}\n");
+ return ExitCode::SOFTWARE;
+ }
+ }
+
+ /**
+ * Удалить старые партиции.
+ * @param int $months Retention в месяцах (default 24).
+ */
+ public function actionDropOldPartitions(int $months = 24): int
+ {
+ $this->stdout("Dropping partitions older than {$months} months...\n");
+
+ try {
+ $this->createService()->dropOldPartitions($months);
+ $this->stdout("Done.\n");
+ return ExitCode::OK;
+ } catch (\Throwable $e) {
+ $this->stderr("ERROR: {$e->getMessage()}\n");
+ return ExitCode::SOFTWARE;
+ }
+ }
+
+ private function resolveTime(): string
+ {
+ if (!empty($this->time)) {
+ return $this->time;
+ }
+
+ $hour = (int)date('H');
+ return $hour < 14 ? '08:00' : '20:00';
+ }
+
+ private function createService(): StockHistoryService
+ {
+ return new StockHistoryService(Yii::$app->db);
+ }
+}
--- /dev/null
+<?php
+
+use yii\db\Migration;
+
+/**
+ * ERP-33: Создание partitioned таблицы stock_history для хранения
+ * исторических срезов остатков товаров по магазинам.
+ */
+class m260221_100000_create_stock_history_table extends Migration
+{
+ public function safeUp()
+ {
+ // Partitioned table — нельзя через createTable(), используем raw SQL
+ $this->execute("
+ CREATE TABLE {{%stock_history}} (
+ id BIGSERIAL,
+ snapshot_date DATE NOT NULL,
+ snapshot_time TIME NOT NULL,
+ store_id VARCHAR(36) NOT NULL,
+ store_name VARCHAR(255),
+ product_id VARCHAR(36) NOT NULL,
+ product_name VARCHAR(255),
+ articule VARCHAR(36),
+ father_id VARCHAR(36),
+ components JSONB,
+ quantity NUMERIC(12,2) NOT NULL DEFAULT 0,
+ reserv NUMERIC(12,2) NOT NULL DEFAULT 0,
+ created_at TIMESTAMP NOT NULL DEFAULT NOW(),
+ PRIMARY KEY (id, snapshot_date)
+ ) PARTITION BY RANGE (snapshot_date)
+ ");
+
+ // Партиция текущего месяца
+ $this->execute("
+ CREATE TABLE IF NOT EXISTS {{%stock_history_2026_02}}
+ PARTITION OF {{%stock_history}}
+ FOR VALUES FROM ('2026-02-01') TO ('2026-03-01')
+ ");
+
+ // Партиция следующего месяца
+ $this->execute("
+ CREATE TABLE IF NOT EXISTS {{%stock_history_2026_03}}
+ PARTITION OF {{%stock_history}}
+ FOR VALUES FROM ('2026-03-01') TO ('2026-04-01')
+ ");
+
+ // Уникальный индекс для idempotency (ON CONFLICT)
+ $this->execute("
+ CREATE UNIQUE INDEX idx_stock_history_unique
+ ON {{%stock_history}} (snapshot_date, snapshot_time, store_id, product_id)
+ ");
+
+ $this->createIndex('idx_stock_history_store', '{{%stock_history}}', 'store_id');
+ $this->createIndex('idx_stock_history_product', '{{%stock_history}}', 'product_id');
+ $this->createIndex('idx_stock_history_father', '{{%stock_history}}', 'father_id');
+ $this->createIndex('idx_stock_history_date_time', '{{%stock_history}}', ['snapshot_date', 'snapshot_time']);
+ $this->createIndex('idx_stock_history_father_date', '{{%stock_history}}', ['father_id', 'snapshot_date']);
+ }
+
+ public function safeDown()
+ {
+ $this->execute("DROP TABLE IF EXISTS {{%stock_history}} CASCADE");
+ }
+}
--- /dev/null
+<?php
+
+declare(strict_types=1);
+
+namespace app\records;
+
+use yii\db\ActiveRecord;
+
+/**
+ * ERP-33: Исторические срезы остатков товаров по магазинам.
+ *
+ * @property int $id
+ * @property string $snapshot_date
+ * @property string $snapshot_time
+ * @property string $store_id
+ * @property string|null $store_name
+ * @property string $product_id
+ * @property string|null $product_name
+ * @property string|null $articule
+ * @property string|null $father_id
+ * @property array|null $components
+ * @property float $quantity
+ * @property float $reserv
+ * @property string $created_at
+ *
+ * @property Products1c $store
+ * @property Products1c $product
+ */
+class StockHistory extends ActiveRecord
+{
+ public static function tableName(): string
+ {
+ return '{{%stock_history}}';
+ }
+
+ public function rules(): array
+ {
+ return [
+ [['snapshot_date', 'snapshot_time', 'store_id', 'product_id'], 'required'],
+ [['store_id', 'product_id', 'father_id'], 'string', 'max' => 36],
+ [['store_name', 'product_name'], 'string', 'max' => 255],
+ [['articule'], 'string', 'max' => 36],
+ [['quantity', 'reserv'], 'number', 'min' => 0],
+ [['quantity', 'reserv'], 'default', 'value' => 0],
+ [['snapshot_date'], 'date', 'format' => 'php:Y-m-d'],
+ [['snapshot_time'], 'string'],
+ [['components'], 'safe'],
+ ];
+ }
+
+ public function getStore(): \yii\db\ActiveQuery
+ {
+ return $this->hasOne(Products1c::class, ['id' => 'store_id']);
+ }
+
+ public function getProduct(): \yii\db\ActiveQuery
+ {
+ return $this->hasOne(Products1c::class, ['id' => 'product_id']);
+ }
+}
--- /dev/null
+<?php
+
+declare(strict_types=1);
+
+namespace app\services;
+
+use yii\db\Connection;
+use Yii;
+
+/**
+ * ERP-33: ETL-сервис для сбора исторических срезов остатков.
+ *
+ * Алгоритм collect():
+ * 1. Advisory lock (pg_try_advisory_lock)
+ * 2. SELECT balances LEFT JOIN products_1c
+ * 3. Batch INSERT ON CONFLICT DO UPDATE (1000 строк)
+ * 4. DQ assertions
+ * 5. Log + unlock
+ */
+class StockHistoryService
+{
+ private const ADVISORY_LOCK_KEY = 123456789;
+ private const BATCH_SIZE = 1000;
+ private const LOCK_TIMEOUT_SEC = 30;
+ private const DEVIATION_THRESHOLD = 0.2;
+ private const MAX_RETRY = 3;
+
+ private Connection $db;
+ /** @var callable|null */
+ private $telegramCallback;
+
+ public function __construct(Connection $db, ?callable $telegramCallback = null)
+ {
+ $this->db = $db;
+ $this->telegramCallback = $telegramCallback;
+ }
+
+ /**
+ * Основной метод сбора остатков.
+ */
+ public function collect(string $snapshotTime): CollectResult
+ {
+ $snapshotDate = date('Y-m-d');
+
+ // 1. Advisory lock
+ $this->acquireLock();
+
+ try {
+ // 2. SELECT данные
+ $rows = $this->fetchBalances();
+
+ // 3. Batch INSERT
+ $insertedCount = $this->batchInsert($rows, $snapshotDate, $snapshotTime);
+
+ // 4. DQ assertions
+ $dqResult = $this->runDqAssertions($snapshotDate, $snapshotTime);
+
+ // 5. Log
+ $this->logResult($snapshotDate, $snapshotTime, $insertedCount, $dqResult);
+
+ return new CollectResult(true, $insertedCount, $dqResult);
+ } finally {
+ $this->releaseLock();
+ }
+ }
+
+ /**
+ * DQ assertions после сбора.
+ */
+ public function runDqAssertions(string $date, string $time): DqResult
+ {
+ $failures = [];
+
+ // DQ-1: stores count >= active stores
+ $activeStores = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(*) FROM products_1c WHERE tip = 'city_store'")
+ ->queryScalar();
+
+ $snapshotStores = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(DISTINCT store_id) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time")
+ ->bindValues([':date' => $date, ':time' => $time])
+ ->queryScalar();
+
+ if ($snapshotStores < $activeStores) {
+ $failures[] = [
+ 'level' => 'CRITICAL',
+ 'code' => 'DQ-1',
+ 'message' => "Stores in snapshot ($snapshotStores) < active stores ($activeStores)",
+ ];
+ }
+
+ // DQ-2: no NULL in required fields
+ $nullCount = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND (store_id IS NULL OR product_id IS NULL OR quantity IS NULL)")
+ ->bindValues([':date' => $date, ':time' => $time])
+ ->queryScalar();
+
+ if ($nullCount > 0) {
+ $failures[] = [
+ 'level' => 'CRITICAL',
+ 'code' => 'DQ-2',
+ 'message' => "Found $nullCount records with NULL in required fields",
+ ];
+ }
+
+ // DQ-3: no negative quantity
+ $negativeQty = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND quantity < 0")
+ ->bindValues([':date' => $date, ':time' => $time])
+ ->queryScalar();
+
+ if ($negativeQty > 0) {
+ $failures[] = [
+ 'level' => 'MAJOR',
+ 'code' => 'DQ-3',
+ 'message' => "Found $negativeQty records with negative quantity",
+ ];
+ }
+
+ // DQ-4: reserv <= quantity
+ $reservOverQty = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND reserv > quantity")
+ ->bindValues([':date' => $date, ':time' => $time])
+ ->queryScalar();
+
+ if ($reservOverQty > 0) {
+ $failures[] = [
+ 'level' => 'WARNING',
+ 'code' => 'DQ-4',
+ 'message' => "Found $reservOverQty records where reserv > quantity",
+ ];
+ }
+
+ // DQ-5: row count deviation
+ $currentCount = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time")
+ ->bindValues([':date' => $date, ':time' => $time])
+ ->queryScalar();
+
+ $previousCount = (int)$this->db->createCommand()
+ ->setSql("SELECT COUNT(*) FROM stock_history WHERE (snapshot_date, snapshot_time) < (:date, :time) ORDER BY snapshot_date DESC, snapshot_time DESC LIMIT 1")
+ ->bindValues([':date' => $date, ':time' => $time])
+ ->queryScalar();
+
+ if ($previousCount > 0) {
+ $deviation = abs($currentCount - $previousCount) / $previousCount;
+ if ($deviation > self::DEVIATION_THRESHOLD) {
+ $pct = round($deviation * 100, 1);
+ $failures[] = [
+ 'level' => 'MAJOR',
+ 'code' => 'DQ-5',
+ 'message' => "Row count deviation {$pct}% (current: $currentCount, previous: $previousCount)",
+ ];
+ }
+ }
+
+ // DQ-6: non-empty snapshot
+ if ($currentCount === 0) {
+ $failures[] = [
+ 'level' => 'CRITICAL',
+ 'code' => 'DQ-6',
+ 'message' => "Empty snapshot: 0 records for $date $time",
+ ];
+ }
+
+ $dqResult = new DqResult($failures);
+
+ // Отправить алерты при CRITICAL/MAJOR
+ $this->sendAlerts($dqResult, $date, $time);
+
+ return $dqResult;
+ }
+
+ /**
+ * Создание партиции на указанный месяц.
+ */
+ public function createPartition(string $yearMonth): void
+ {
+ $start = $yearMonth . '-01';
+ $end = date('Y-m-d', strtotime($start . ' +1 month'));
+ $suffix = str_replace('-', '_', $yearMonth);
+
+ $this->db->createCommand()
+ ->setSql("CREATE TABLE IF NOT EXISTS stock_history_{$suffix} PARTITION OF stock_history FOR VALUES FROM ('{$start}') TO ('{$end}')")
+ ->execute();
+ }
+
+ /**
+ * Удаление партиций старше retention периода.
+ */
+ public function dropOldPartitions(int $retentionMonths = 24): void
+ {
+ $cutoffDate = date('Y-m-d', strtotime("-{$retentionMonths} months"));
+ $cutoffYm = substr($cutoffDate, 0, 7); // "2024-02"
+
+ $partitions = $this->db->createCommand()
+ ->setSql("SELECT tablename FROM pg_tables WHERE schemaname = 'erp24' AND tablename LIKE 'stock_history_%' ORDER BY tablename")
+ ->queryAll();
+
+ foreach ($partitions as $row) {
+ $name = $row['tablename'];
+ // Извлекаем дату из имени: stock_history_2024_01 → 2024-01
+ if (preg_match('/stock_history_(\d{4})_(\d{2})$/', $name, $m)) {
+ $partitionYm = $m[1] . '-' . $m[2];
+ if ($partitionYm < $cutoffYm) {
+ $this->db->createCommand()
+ ->setSql("DROP TABLE IF EXISTS {$name}")
+ ->execute();
+
+ Yii::info("Dropped old partition: {$name}", 'stock-history');
+ }
+ }
+ }
+ }
+
+ // =========================================================
+ // Private methods
+ // =========================================================
+
+ protected function getLockTimeoutSec(): int
+ {
+ return self::LOCK_TIMEOUT_SEC;
+ }
+
+ private function acquireLock(): void
+ {
+ $deadline = time() + $this->getLockTimeoutSec();
+
+ while (time() < $deadline) {
+ $locked = $this->db->createCommand()
+ ->setSql("SELECT pg_try_advisory_lock(" . self::ADVISORY_LOCK_KEY . ")")
+ ->queryScalar();
+
+ if ($locked) {
+ return;
+ }
+
+ usleep(500000); // 0.5 sec
+ }
+
+ $msg = 'Failed to acquire advisory lock within ' . $this->getLockTimeoutSec() . ' seconds';
+ Yii::error($msg, 'stock-history');
+ $this->sendTelegram("CRITICAL: Stock History ETL — $msg");
+
+ throw new \RuntimeException($msg);
+ }
+
+ private function releaseLock(): void
+ {
+ $this->db->createCommand()
+ ->setSql("SELECT pg_advisory_unlock(" . self::ADVISORY_LOCK_KEY . ")")
+ ->queryScalar();
+ }
+
+ private function fetchBalances(): array
+ {
+ return $this->db->createCommand()
+ ->setSql("
+ SELECT
+ b.store_id,
+ ps.name as store_name,
+ b.product_id,
+ pp.name as product_name,
+ pp.articule,
+ pp.parent_id as father_id,
+ pp.components,
+ b.quantity,
+ b.reserv
+ FROM balances b
+ LEFT JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store'
+ LEFT JOIN products_1c pp ON pp.id = b.product_id
+ WHERE ps.id IS NOT NULL
+ ORDER BY b.store_id, b.product_id
+ ")
+ ->queryAll();
+ }
+
+ private function batchInsert(array $rows, string $date, string $time): int
+ {
+ if (empty($rows)) {
+ return 0;
+ }
+
+ $total = 0;
+ $chunks = array_chunk($rows, self::BATCH_SIZE);
+
+ foreach ($chunks as $chunk) {
+ $total += $this->insertChunkWithRetry($chunk, $date, $time);
+ }
+
+ return $total;
+ }
+
+ private function insertChunkWithRetry(array $chunk, string $date, string $time): int
+ {
+ $attempt = 0;
+
+ while ($attempt < self::MAX_RETRY) {
+ try {
+ return $this->insertChunk($chunk, $date, $time);
+ } catch (\yii\db\Exception $e) {
+ $attempt++;
+ if ($attempt >= self::MAX_RETRY) {
+ throw $e;
+ }
+ // Exponential backoff: 1s, 2s, 4s
+ sleep((int)pow(2, $attempt - 1));
+ Yii::warning("Batch INSERT retry #{$attempt}: " . $e->getMessage(), 'stock-history');
+ }
+ }
+
+ return 0;
+ }
+
+ private function insertChunk(array $chunk, string $date, string $time): int
+ {
+ $values = [];
+ $params = [];
+ $i = 0;
+
+ foreach ($chunk as $row) {
+ $components = $row['components'] ?? null;
+ if ($components !== null && !is_string($components)) {
+ $components = json_encode($components);
+ }
+
+ $values[] = "(:date_{$i}, :time_{$i}, :sid_{$i}, :sn_{$i}, :pid_{$i}, :pn_{$i}, :art_{$i}, :fid_{$i}, :comp_{$i}::jsonb, :qty_{$i}, :res_{$i}, NOW())";
+ $params[":date_{$i}"] = $date;
+ $params[":time_{$i}"] = $time;
+ $params[":sid_{$i}"] = $row['store_id'];
+ $params[":sn_{$i}"] = $row['store_name'] ?? null;
+ $params[":pid_{$i}"] = $row['product_id'];
+ $params[":pn_{$i}"] = $row['product_name'] ?? null;
+ $params[":art_{$i}"] = $row['articule'] ?? null;
+ $params[":fid_{$i}"] = $row['father_id'] ?? null;
+ $params[":comp_{$i}"] = $components;
+ $params[":qty_{$i}"] = $row['quantity'] ?? 0;
+ $params[":res_{$i}"] = $row['reserv'] ?? 0;
+ $i++;
+ }
+
+ $sql = "INSERT INTO stock_history (snapshot_date, snapshot_time, store_id, store_name, product_id, product_name, articule, father_id, components, quantity, reserv, created_at)
+ VALUES " . implode(', ', $values) . "
+ ON CONFLICT (snapshot_date, snapshot_time, store_id, product_id)
+ DO UPDATE SET
+ quantity = EXCLUDED.quantity,
+ reserv = EXCLUDED.reserv,
+ store_name = EXCLUDED.store_name,
+ product_name = EXCLUDED.product_name,
+ articule = EXCLUDED.articule,
+ father_id = EXCLUDED.father_id,
+ components = EXCLUDED.components,
+ created_at = NOW()";
+
+ return $this->db->createCommand()
+ ->setSql($sql)
+ ->bindValues($params)
+ ->execute();
+ }
+
+ private function logResult(string $date, string $time, int $rowCount, DqResult $dqResult): void
+ {
+ $status = $dqResult->allPassed() ? 'SUCCESS' : 'DQ_FAILURES';
+ Yii::info("Stock History ETL [{$date} {$time}]: {$status}, rows={$rowCount}", 'stock-history');
+ }
+
+ private function sendAlerts(DqResult $dqResult, string $date, string $time): void
+ {
+ $criticals = $dqResult->getCriticalFailures();
+ $majors = $dqResult->getMajorFailures();
+
+ if (empty($criticals) && empty($majors)) {
+ return;
+ }
+
+ $messages = [];
+ foreach ($criticals as $f) {
+ $messages[] = "CRITICAL [{$f['code']}]: {$f['message']}";
+ Yii::error("DQ {$f['code']}: {$f['message']}", 'stock-history');
+ }
+ foreach ($majors as $f) {
+ $messages[] = "MAJOR [{$f['code']}]: {$f['message']}";
+ Yii::warning("DQ {$f['code']}: {$f['message']}", 'stock-history');
+ }
+
+ $text = "Stock History DQ [{$date} {$time}]:\n" . implode("\n", $messages);
+ $this->sendTelegram($text);
+ }
+
+ private function sendTelegram(string $message): void
+ {
+ if ($this->telegramCallback !== null) {
+ ($this->telegramCallback)($message);
+ return;
+ }
+
+ try {
+ if (Yii::$app && Yii::$app->has('queue')) {
+ Yii::$app->queue->push(new \app\jobs\SendTelegramMessageJob([
+ 'message' => $message,
+ ]));
+ }
+ } catch (\Throwable $e) {
+ Yii::error("Failed to send Telegram alert: " . $e->getMessage(), 'stock-history');
+ }
+ }
+}