From: Aleksey Filippov Date: Wed, 25 Feb 2026 14:45:52 +0000 (+0300) Subject: feat(ERP-33): add stock history ETL — migration, model, service, command X-Git-Url: https://gitweb.erp-flowers.ru/?a=commitdiff_plain;h=c6ee891e57247809800038842f87997f93fcef3a;p=erp24_rep%2Fyii-erp24%2F.git feat(ERP-33): add stock history ETL — migration, model, service, command - migration: partitioned table stock_history, уникальный индекс ON CONFLICT - records/StockHistory: ActiveRecord модель - services/StockHistoryService: ETL с advisory lock, batch INSERT 1000, DQ assertions (DQ-1..6), Telegram alerts - services/CollectResult, DqResult: DTO - commands/StockHistoryController: actionCollect, actionCreatePartition, actionDropOldPartitions Crontab (в комментарии контроллера): 0 8,20 * * * php yii stock-history/collect 0 1 1 * * php yii stock-history/create-partition 0 2 1 * * php yii stock-history/drop-old-partitions Co-Authored-By: Claude Sonnet 4.6 --- diff --git a/erp24/commands/StockHistoryController.php b/erp24/commands/StockHistoryController.php new file mode 100644 index 00000000..5f398644 --- /dev/null +++ b/erp24/commands/StockHistoryController.php @@ -0,0 +1,121 @@ +> /var/log/stock-history.log 2>&1 + * 0 1 1 * * cd /www && php yii stock-history/create-partition >> /var/log/stock-history.log 2>&1 + * 0 2 1 * * cd /www && php yii stock-history/drop-old-partitions >> /var/log/stock-history.log 2>&1 + */ +class StockHistoryController extends Controller +{ + /** + * @var string Время среза (08:00 или 20:00). Если пусто — авто-определение. + */ + public string $time = ''; + + public function options($actionID): array + { + return array_merge(parent::options($actionID), ['time']); + } + + /** + * Собрать срез остатков. + */ + public function actionCollect(): int + { + $snapshotTime = $this->resolveTime(); + $this->stdout("Stock History ETL: collecting snapshot for {$snapshotTime}...\n"); + + $service = $this->createService(); + + try { + $result = $service->collect($snapshotTime); + + $this->stdout("Done: {$result->getRowCount()} rows inserted.\n"); + + $dq = $result->getDqResult(); + if ($dq !== null && !$dq->allPassed()) { + $this->stderr("DQ failures detected:\n"); + foreach ($dq->getAll() as $f) { + $this->stderr(" [{$f['level']}] {$f['code']}: {$f['message']}\n"); + } + return ExitCode::OK; // ETL прошёл, но DQ имеет замечания + } + + return ExitCode::OK; + } catch (\RuntimeException $e) { + $this->stderr("ERROR: {$e->getMessage()}\n"); + return ExitCode::SOFTWARE; + } catch (\Throwable $e) { + $this->stderr("FATAL: {$e->getMessage()}\n"); + Yii::error("Stock History ETL fatal: " . $e->getMessage(), 'stock-history'); + return ExitCode::SOFTWARE; + } + } + + /** + * Создать партицию на указанный месяц. + * @param string $month Формат "2026-03". По умолчанию — следующий месяц. + */ + public function actionCreatePartition(string $month = ''): int + { + if (empty($month)) { + $month = date('Y-m', strtotime('+1 month')); + } + + $this->stdout("Creating partition for {$month}...\n"); + + try { + $this->createService()->createPartition($month); + $this->stdout("Partition stock_history_{$month} created.\n"); + return ExitCode::OK; + } catch (\Throwable $e) { + $this->stderr("ERROR: {$e->getMessage()}\n"); + return ExitCode::SOFTWARE; + } + } + + /** + * Удалить старые партиции. + * @param int $months Retention в месяцах (default 24). + */ + public function actionDropOldPartitions(int $months = 24): int + { + $this->stdout("Dropping partitions older than {$months} months...\n"); + + try { + $this->createService()->dropOldPartitions($months); + $this->stdout("Done.\n"); + return ExitCode::OK; + } catch (\Throwable $e) { + $this->stderr("ERROR: {$e->getMessage()}\n"); + return ExitCode::SOFTWARE; + } + } + + private function resolveTime(): string + { + if (!empty($this->time)) { + return $this->time; + } + + $hour = (int)date('H'); + return $hour < 14 ? '08:00' : '20:00'; + } + + private function createService(): StockHistoryService + { + return new StockHistoryService(Yii::$app->db); + } +} diff --git a/erp24/migrations/m260221_100000_create_stock_history_table.php b/erp24/migrations/m260221_100000_create_stock_history_table.php new file mode 100644 index 00000000..17d235bf --- /dev/null +++ b/erp24/migrations/m260221_100000_create_stock_history_table.php @@ -0,0 +1,64 @@ +execute(" + CREATE TABLE {{%stock_history}} ( + id BIGSERIAL, + snapshot_date DATE NOT NULL, + snapshot_time TIME NOT NULL, + store_id VARCHAR(36) NOT NULL, + store_name VARCHAR(255), + product_id VARCHAR(36) NOT NULL, + product_name VARCHAR(255), + articule VARCHAR(36), + father_id VARCHAR(36), + components JSONB, + quantity NUMERIC(12,2) NOT NULL DEFAULT 0, + reserv NUMERIC(12,2) NOT NULL DEFAULT 0, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + PRIMARY KEY (id, snapshot_date) + ) PARTITION BY RANGE (snapshot_date) + "); + + // Партиция текущего месяца + $this->execute(" + CREATE TABLE IF NOT EXISTS {{%stock_history_2026_02}} + PARTITION OF {{%stock_history}} + FOR VALUES FROM ('2026-02-01') TO ('2026-03-01') + "); + + // Партиция следующего месяца + $this->execute(" + CREATE TABLE IF NOT EXISTS {{%stock_history_2026_03}} + PARTITION OF {{%stock_history}} + FOR VALUES FROM ('2026-03-01') TO ('2026-04-01') + "); + + // Уникальный индекс для idempotency (ON CONFLICT) + $this->execute(" + CREATE UNIQUE INDEX idx_stock_history_unique + ON {{%stock_history}} (snapshot_date, snapshot_time, store_id, product_id) + "); + + $this->createIndex('idx_stock_history_store', '{{%stock_history}}', 'store_id'); + $this->createIndex('idx_stock_history_product', '{{%stock_history}}', 'product_id'); + $this->createIndex('idx_stock_history_father', '{{%stock_history}}', 'father_id'); + $this->createIndex('idx_stock_history_date_time', '{{%stock_history}}', ['snapshot_date', 'snapshot_time']); + $this->createIndex('idx_stock_history_father_date', '{{%stock_history}}', ['father_id', 'snapshot_date']); + } + + public function safeDown() + { + $this->execute("DROP TABLE IF EXISTS {{%stock_history}} CASCADE"); + } +} diff --git a/erp24/records/StockHistory.php b/erp24/records/StockHistory.php new file mode 100644 index 00000000..6fedd4ea --- /dev/null +++ b/erp24/records/StockHistory.php @@ -0,0 +1,60 @@ + 36], + [['store_name', 'product_name'], 'string', 'max' => 255], + [['articule'], 'string', 'max' => 36], + [['quantity', 'reserv'], 'number', 'min' => 0], + [['quantity', 'reserv'], 'default', 'value' => 0], + [['snapshot_date'], 'date', 'format' => 'php:Y-m-d'], + [['snapshot_time'], 'string'], + [['components'], 'safe'], + ]; + } + + public function getStore(): \yii\db\ActiveQuery + { + return $this->hasOne(Products1c::class, ['id' => 'store_id']); + } + + public function getProduct(): \yii\db\ActiveQuery + { + return $this->hasOne(Products1c::class, ['id' => 'product_id']); + } +} diff --git a/erp24/services/CollectResult.php b/erp24/services/CollectResult.php new file mode 100644 index 00000000..e0167894 --- /dev/null +++ b/erp24/services/CollectResult.php @@ -0,0 +1,37 @@ +success = $success; + $this->rowCount = $rowCount; + $this->dqResult = $dqResult; + } + + public function isSuccess(): bool + { + return $this->success; + } + + public function getRowCount(): int + { + return $this->rowCount; + } + + public function getDqResult(): ?DqResult + { + return $this->dqResult; + } +} diff --git a/erp24/services/DqResult.php b/erp24/services/DqResult.php new file mode 100644 index 00000000..6b023254 --- /dev/null +++ b/erp24/services/DqResult.php @@ -0,0 +1,43 @@ +failures = $failures; + } + + public function allPassed(): bool + { + return empty($this->failures); + } + + public function getCriticalFailures(): array + { + return array_values(array_filter($this->failures, fn($f) => $f['level'] === 'CRITICAL')); + } + + public function getMajorFailures(): array + { + return array_values(array_filter($this->failures, fn($f) => $f['level'] === 'MAJOR')); + } + + public function getWarnings(): array + { + return array_values(array_filter($this->failures, fn($f) => $f['level'] === 'WARNING')); + } + + public function getAll(): array + { + return $this->failures; + } +} diff --git a/erp24/services/StockHistoryService.php b/erp24/services/StockHistoryService.php new file mode 100644 index 00000000..6e5b9dde --- /dev/null +++ b/erp24/services/StockHistoryService.php @@ -0,0 +1,407 @@ +db = $db; + $this->telegramCallback = $telegramCallback; + } + + /** + * Основной метод сбора остатков. + */ + public function collect(string $snapshotTime): CollectResult + { + $snapshotDate = date('Y-m-d'); + + // 1. Advisory lock + $this->acquireLock(); + + try { + // 2. SELECT данные + $rows = $this->fetchBalances(); + + // 3. Batch INSERT + $insertedCount = $this->batchInsert($rows, $snapshotDate, $snapshotTime); + + // 4. DQ assertions + $dqResult = $this->runDqAssertions($snapshotDate, $snapshotTime); + + // 5. Log + $this->logResult($snapshotDate, $snapshotTime, $insertedCount, $dqResult); + + return new CollectResult(true, $insertedCount, $dqResult); + } finally { + $this->releaseLock(); + } + } + + /** + * DQ assertions после сбора. + */ + public function runDqAssertions(string $date, string $time): DqResult + { + $failures = []; + + // DQ-1: stores count >= active stores + $activeStores = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(*) FROM products_1c WHERE tip = 'city_store'") + ->queryScalar(); + + $snapshotStores = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(DISTINCT store_id) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time") + ->bindValues([':date' => $date, ':time' => $time]) + ->queryScalar(); + + if ($snapshotStores < $activeStores) { + $failures[] = [ + 'level' => 'CRITICAL', + 'code' => 'DQ-1', + 'message' => "Stores in snapshot ($snapshotStores) < active stores ($activeStores)", + ]; + } + + // DQ-2: no NULL in required fields + $nullCount = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND (store_id IS NULL OR product_id IS NULL OR quantity IS NULL)") + ->bindValues([':date' => $date, ':time' => $time]) + ->queryScalar(); + + if ($nullCount > 0) { + $failures[] = [ + 'level' => 'CRITICAL', + 'code' => 'DQ-2', + 'message' => "Found $nullCount records with NULL in required fields", + ]; + } + + // DQ-3: no negative quantity + $negativeQty = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND quantity < 0") + ->bindValues([':date' => $date, ':time' => $time]) + ->queryScalar(); + + if ($negativeQty > 0) { + $failures[] = [ + 'level' => 'MAJOR', + 'code' => 'DQ-3', + 'message' => "Found $negativeQty records with negative quantity", + ]; + } + + // DQ-4: reserv <= quantity + $reservOverQty = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time AND reserv > quantity") + ->bindValues([':date' => $date, ':time' => $time]) + ->queryScalar(); + + if ($reservOverQty > 0) { + $failures[] = [ + 'level' => 'WARNING', + 'code' => 'DQ-4', + 'message' => "Found $reservOverQty records where reserv > quantity", + ]; + } + + // DQ-5: row count deviation + $currentCount = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(*) FROM stock_history WHERE snapshot_date = :date AND snapshot_time = :time") + ->bindValues([':date' => $date, ':time' => $time]) + ->queryScalar(); + + $previousCount = (int)$this->db->createCommand() + ->setSql("SELECT COUNT(*) FROM stock_history WHERE (snapshot_date, snapshot_time) < (:date, :time) ORDER BY snapshot_date DESC, snapshot_time DESC LIMIT 1") + ->bindValues([':date' => $date, ':time' => $time]) + ->queryScalar(); + + if ($previousCount > 0) { + $deviation = abs($currentCount - $previousCount) / $previousCount; + if ($deviation > self::DEVIATION_THRESHOLD) { + $pct = round($deviation * 100, 1); + $failures[] = [ + 'level' => 'MAJOR', + 'code' => 'DQ-5', + 'message' => "Row count deviation {$pct}% (current: $currentCount, previous: $previousCount)", + ]; + } + } + + // DQ-6: non-empty snapshot + if ($currentCount === 0) { + $failures[] = [ + 'level' => 'CRITICAL', + 'code' => 'DQ-6', + 'message' => "Empty snapshot: 0 records for $date $time", + ]; + } + + $dqResult = new DqResult($failures); + + // Отправить алерты при CRITICAL/MAJOR + $this->sendAlerts($dqResult, $date, $time); + + return $dqResult; + } + + /** + * Создание партиции на указанный месяц. + */ + public function createPartition(string $yearMonth): void + { + $start = $yearMonth . '-01'; + $end = date('Y-m-d', strtotime($start . ' +1 month')); + $suffix = str_replace('-', '_', $yearMonth); + + $this->db->createCommand() + ->setSql("CREATE TABLE IF NOT EXISTS stock_history_{$suffix} PARTITION OF stock_history FOR VALUES FROM ('{$start}') TO ('{$end}')") + ->execute(); + } + + /** + * Удаление партиций старше retention периода. + */ + public function dropOldPartitions(int $retentionMonths = 24): void + { + $cutoffDate = date('Y-m-d', strtotime("-{$retentionMonths} months")); + $cutoffYm = substr($cutoffDate, 0, 7); // "2024-02" + + $partitions = $this->db->createCommand() + ->setSql("SELECT tablename FROM pg_tables WHERE schemaname = 'erp24' AND tablename LIKE 'stock_history_%' ORDER BY tablename") + ->queryAll(); + + foreach ($partitions as $row) { + $name = $row['tablename']; + // Извлекаем дату из имени: stock_history_2024_01 → 2024-01 + if (preg_match('/stock_history_(\d{4})_(\d{2})$/', $name, $m)) { + $partitionYm = $m[1] . '-' . $m[2]; + if ($partitionYm < $cutoffYm) { + $this->db->createCommand() + ->setSql("DROP TABLE IF EXISTS {$name}") + ->execute(); + + Yii::info("Dropped old partition: {$name}", 'stock-history'); + } + } + } + } + + // ========================================================= + // Private methods + // ========================================================= + + protected function getLockTimeoutSec(): int + { + return self::LOCK_TIMEOUT_SEC; + } + + private function acquireLock(): void + { + $deadline = time() + $this->getLockTimeoutSec(); + + while (time() < $deadline) { + $locked = $this->db->createCommand() + ->setSql("SELECT pg_try_advisory_lock(" . self::ADVISORY_LOCK_KEY . ")") + ->queryScalar(); + + if ($locked) { + return; + } + + usleep(500000); // 0.5 sec + } + + $msg = 'Failed to acquire advisory lock within ' . $this->getLockTimeoutSec() . ' seconds'; + Yii::error($msg, 'stock-history'); + $this->sendTelegram("CRITICAL: Stock History ETL — $msg"); + + throw new \RuntimeException($msg); + } + + private function releaseLock(): void + { + $this->db->createCommand() + ->setSql("SELECT pg_advisory_unlock(" . self::ADVISORY_LOCK_KEY . ")") + ->queryScalar(); + } + + private function fetchBalances(): array + { + return $this->db->createCommand() + ->setSql(" + SELECT + b.store_id, + ps.name as store_name, + b.product_id, + pp.name as product_name, + pp.articule, + pp.parent_id as father_id, + pp.components, + b.quantity, + b.reserv + FROM balances b + LEFT JOIN products_1c ps ON ps.id = b.store_id AND ps.tip = 'city_store' + LEFT JOIN products_1c pp ON pp.id = b.product_id + WHERE ps.id IS NOT NULL + ORDER BY b.store_id, b.product_id + ") + ->queryAll(); + } + + private function batchInsert(array $rows, string $date, string $time): int + { + if (empty($rows)) { + return 0; + } + + $total = 0; + $chunks = array_chunk($rows, self::BATCH_SIZE); + + foreach ($chunks as $chunk) { + $total += $this->insertChunkWithRetry($chunk, $date, $time); + } + + return $total; + } + + private function insertChunkWithRetry(array $chunk, string $date, string $time): int + { + $attempt = 0; + + while ($attempt < self::MAX_RETRY) { + try { + return $this->insertChunk($chunk, $date, $time); + } catch (\yii\db\Exception $e) { + $attempt++; + if ($attempt >= self::MAX_RETRY) { + throw $e; + } + // Exponential backoff: 1s, 2s, 4s + sleep((int)pow(2, $attempt - 1)); + Yii::warning("Batch INSERT retry #{$attempt}: " . $e->getMessage(), 'stock-history'); + } + } + + return 0; + } + + private function insertChunk(array $chunk, string $date, string $time): int + { + $values = []; + $params = []; + $i = 0; + + foreach ($chunk as $row) { + $components = $row['components'] ?? null; + if ($components !== null && !is_string($components)) { + $components = json_encode($components); + } + + $values[] = "(:date_{$i}, :time_{$i}, :sid_{$i}, :sn_{$i}, :pid_{$i}, :pn_{$i}, :art_{$i}, :fid_{$i}, :comp_{$i}::jsonb, :qty_{$i}, :res_{$i}, NOW())"; + $params[":date_{$i}"] = $date; + $params[":time_{$i}"] = $time; + $params[":sid_{$i}"] = $row['store_id']; + $params[":sn_{$i}"] = $row['store_name'] ?? null; + $params[":pid_{$i}"] = $row['product_id']; + $params[":pn_{$i}"] = $row['product_name'] ?? null; + $params[":art_{$i}"] = $row['articule'] ?? null; + $params[":fid_{$i}"] = $row['father_id'] ?? null; + $params[":comp_{$i}"] = $components; + $params[":qty_{$i}"] = $row['quantity'] ?? 0; + $params[":res_{$i}"] = $row['reserv'] ?? 0; + $i++; + } + + $sql = "INSERT INTO stock_history (snapshot_date, snapshot_time, store_id, store_name, product_id, product_name, articule, father_id, components, quantity, reserv, created_at) + VALUES " . implode(', ', $values) . " + ON CONFLICT (snapshot_date, snapshot_time, store_id, product_id) + DO UPDATE SET + quantity = EXCLUDED.quantity, + reserv = EXCLUDED.reserv, + store_name = EXCLUDED.store_name, + product_name = EXCLUDED.product_name, + articule = EXCLUDED.articule, + father_id = EXCLUDED.father_id, + components = EXCLUDED.components, + created_at = NOW()"; + + return $this->db->createCommand() + ->setSql($sql) + ->bindValues($params) + ->execute(); + } + + private function logResult(string $date, string $time, int $rowCount, DqResult $dqResult): void + { + $status = $dqResult->allPassed() ? 'SUCCESS' : 'DQ_FAILURES'; + Yii::info("Stock History ETL [{$date} {$time}]: {$status}, rows={$rowCount}", 'stock-history'); + } + + private function sendAlerts(DqResult $dqResult, string $date, string $time): void + { + $criticals = $dqResult->getCriticalFailures(); + $majors = $dqResult->getMajorFailures(); + + if (empty($criticals) && empty($majors)) { + return; + } + + $messages = []; + foreach ($criticals as $f) { + $messages[] = "CRITICAL [{$f['code']}]: {$f['message']}"; + Yii::error("DQ {$f['code']}: {$f['message']}", 'stock-history'); + } + foreach ($majors as $f) { + $messages[] = "MAJOR [{$f['code']}]: {$f['message']}"; + Yii::warning("DQ {$f['code']}: {$f['message']}", 'stock-history'); + } + + $text = "Stock History DQ [{$date} {$time}]:\n" . implode("\n", $messages); + $this->sendTelegram($text); + } + + private function sendTelegram(string $message): void + { + if ($this->telegramCallback !== null) { + ($this->telegramCallback)($message); + return; + } + + try { + if (Yii::$app && Yii::$app->has('queue')) { + Yii::$app->queue->push(new \app\jobs\SendTelegramMessageJob([ + 'message' => $message, + ])); + } + } catch (\Throwable $e) { + Yii::error("Failed to send Telegram alert: " . $e->getMessage(), 'stock-history'); + } + } +}