From: Aleksey Filippov Date: Fri, 23 Jan 2026 18:06:30 +0000 (+0300) Subject: [ERP-40-j] Сохранение исторических данных каталога и привязки к фокусным группам. X-Git-Url: https://gitweb.erp-flowers.ru/?a=commitdiff_plain;h=41fb2b6ccc47294fe25cbdfc70f7713fca153ef5;p=erp24_rep%2Fyii-erp24%2F.git [ERP-40-j] Сохранение исторических данных каталога и привязки к фокусным группам. --- diff --git a/erp24/.gitignore b/erp24/.gitignore index eca74311..b54a79ca 100644 --- a/erp24/.gitignore +++ b/erp24/.gitignore @@ -47,4 +47,5 @@ tests/_support/_generated /widgets/app.log /web/dist-prod /pgsql_last.sql -/media/notification/* \ No newline at end of file +/media/notification/* +/api1/log/xml/ diff --git a/erp24/commands/Products1cClassDynamicController.php b/erp24/commands/Products1cClassDynamicController.php new file mode 100644 index 00000000..bed4400c --- /dev/null +++ b/erp24/commands/Products1cClassDynamicController.php @@ -0,0 +1,408 @@ + BATCH_LOADING_THRESHOLD, возвращается пустой массив + echo "Загрузка активных записей...\n"; + $activeRecords = $this->loadActiveRecordsIndexed(); + $useBatchLookup = !empty($activeRecords); + + if ($useBatchLookup) { + echo "Загружено активных записей: " . count($activeRecords) . " (batch mode)\n\n"; + } + + // Chunked processing: обрабатываем исходные данные порциями + $total = $this->getSourceDataCount(); + echo "Найдено товаров для синхронизации: {$total}\n\n"; + + $processed = 0; + + foreach ($this->getSourceDataChunked(self::CHUNK_SIZE) as $productData) { + $processed++; + + // Рассчитываем хэш новых данных + $newHash = Products1cClassDynamic::calculateHash($productData); + + // Получаем активную запись из кэша или БД + if ($useBatchLookup) { + $activeRecord = $activeRecords[$productData['product_id']] ?? null; + } else { + // Per-record lookup (fallback при большом количестве записей) + $activeRecord = Products1cClassDynamic::getActiveByProductId($productData['product_id']); + } + + if ($activeRecord === null) { + // Новый товар — создаём запись (changes = null для первичной загрузки) + $result = $this->createNewRecord($productData); + if ($result === true) { + $created++; + } else { + $errors++; + echo "Ошибка создания записи для product_id={$productData['product_id']}: {$result}\n"; + } + } elseif ($activeRecord->hash !== $newHash) { + // Хэш изменился — обновляем с защитой от race condition + $result = $this->updateRecordWithLock($activeRecord, $productData); + if ($result === true) { + $updated++; + // Удаляем из кэша старую запись (она закрыта) + if ($useBatchLookup) { + unset($activeRecords[$productData['product_id']]); + } + } else { + $errors++; + echo "Ошибка обновления записи для product_id={$productData['product_id']}: {$result}\n"; + } + } else { + // Хэш совпадает — данные не изменились + $unchanged++; + } + + // Прогресс каждые PROGRESS_INTERVAL записей + if ($processed % self::PROGRESS_INTERVAL === 0) { + echo "Обработано: {$processed}/{$total}\n"; + } + } + + $endTime = microtime(true); + $duration = round($endTime - $startTime, 2); + + echo "\n"; + echo "=================================\n"; + echo "Синхронизация завершена\n"; + echo "=================================\n"; + echo "Создано новых записей: {$created}\n"; + echo "Обновлено записей: {$updated}\n"; + echo "Без изменений: {$unchanged}\n"; + echo "Ошибок: {$errors}\n"; + echo "Время выполнения: {$duration} сек.\n"; + echo "Окончание: " . date('Y-m-d H:i:s') . "\n"; + + return $errors > 0 ? ExitCode::UNSPECIFIED_ERROR : ExitCode::OK; + } + + /** + * Загрузка всех активных записей, индексированных по product_id + * + * Устраняет N+1 проблему: один запрос вместо N + * + * При количестве записей > BATCH_LOADING_THRESHOLD возвращает пустой массив, + * что переключает обработку на per-record lookup (предотвращение OOM). + * + * @return Products1cClassDynamic[] Массив с ключами = product_id + */ + private function loadActiveRecordsIndexed(): array + { + // Проверка количества записей перед загрузкой для предотвращения OOM + $count = Products1cClassDynamic::find() + ->where(['active' => Products1cClassDynamic::ACTIVE]) + ->count(); + + if ($count > self::BATCH_LOADING_THRESHOLD) { + echo "ВНИМАНИЕ: {$count} активных записей превышает порог " . self::BATCH_LOADING_THRESHOLD . ".\n"; + echo "Используется per-record lookup для экономии памяти.\n\n"; + return []; + } + + return Products1cClassDynamic::find() + ->where(['active' => Products1cClassDynamic::ACTIVE]) + ->indexBy('product_id') + ->all(); + } + + /** + * Создание новой записи + * + * @param array $productData Данные товара + * @return true|string true при успехе, сообщение об ошибке при неудаче + */ + private function createNewRecord(array $productData) + { + $newRecord = Products1cClassDynamic::createFromData($productData, null); + + if ($newRecord->save()) { + return true; + } + + return implode(', ', $newRecord->getFirstErrors()); + } + + /** + * Обновление записи с защитой от race condition + * + * Использует SELECT FOR UPDATE для блокировки строки до конца транзакции. + * Это гарантирует, что два конкурентных процесса не создадут дублирующих записей. + * + * При PostgreSQL serialization_failure (40001) выполняет retry с exponential backoff. + * + * @param Products1cClassDynamic $cachedRecord Запись из кэша (может быть устаревшей) + * @param array $productData Новые данные товара + * @return true|string true при успехе, сообщение об ошибке при неудаче + */ + private function updateRecordWithLock(Products1cClassDynamic $cachedRecord, array $productData) + { + $retries = 0; + + while ($retries <= self::MAX_RETRIES) { + $transaction = Yii::$app->db->beginTransaction(Transaction::SERIALIZABLE); + + try { + // SELECT FOR UPDATE: блокируем строку, перечитываем актуальные данные + $activeRecord = Products1cClassDynamic::find() + ->where([ + 'product_id' => $productData['product_id'], + 'active' => Products1cClassDynamic::ACTIVE, + ]) + ->forUpdate() + ->one(); + + // Проверяем, не обработал ли кто-то эту запись параллельно + if ($activeRecord === null) { + // Запись уже была обновлена другим процессом — пропускаем + $transaction->commit(); + return true; + } + + // Пересчитываем хэш (на случай если данные изменились) + $newHash = Products1cClassDynamic::calculateHash($productData); + if ($activeRecord->hash === $newHash) { + // Данные уже актуальны — пропускаем + $transaction->commit(); + return true; + } + + // Определяем какие поля изменились + $changes = $activeRecord->detectChanges($productData); + + // Закрываем старую запись + $activeRecord->disableRecord(); + if (!$activeRecord->save()) { + throw new \Exception('Ошибка закрытия записи: ' . implode(', ', $activeRecord->getFirstErrors())); + } + + // Создаём новую запись с указанием изменённых полей + $newRecord = Products1cClassDynamic::createFromData($productData, $changes); + if (!$newRecord->save()) { + throw new \Exception('Ошибка создания записи: ' . implode(', ', $newRecord->getFirstErrors())); + } + + $transaction->commit(); + return true; + } catch (\yii\db\Exception $e) { + $transaction->rollBack(); + + // PostgreSQL serialization_failure = SQLSTATE 40001 + $sqlState = $e->errorInfo[0] ?? null; + if ($sqlState === '40001' && $retries < self::MAX_RETRIES) { + $retries++; + // Exponential backoff: 100ms, 200ms, 400ms + $delay = self::RETRY_BASE_DELAY_MS * (2 ** ($retries - 1)); + usleep($delay * 1000); + continue; + } + + return $e->getMessage(); + } catch (\Exception $e) { + $transaction->rollBack(); + return $e->getMessage(); + } + } + + return 'Max retries exceeded for serialization conflict'; + } + + /** + * Показать статус таблицы products_1c_class_dynamic + * + * @return int ExitCode + */ + public function actionStatus(): int + { + echo "Статус таблицы products_1c_class_dynamic\n"; + echo "========================================\n\n"; + + // Общее количество записей + $totalRecords = Products1cClassDynamic::find()->count(); + echo "Всего записей: {$totalRecords}\n"; + + // Активные записи + $activeRecords = Products1cClassDynamic::find() + ->where(['active' => Products1cClassDynamic::ACTIVE]) + ->count(); + echo "Активных записей: {$activeRecords}\n"; + + // Исторические записи + $historicalRecords = Products1cClassDynamic::find() + ->where(['active' => Products1cClassDynamic::NOT_ACTIVE]) + ->count(); + echo "Исторических записей: {$historicalRecords}\n\n"; + + // Статистика по class_tip + echo "Распределение по классам (активные):\n"; + $classTipStats = Products1cClassDynamic::find() + ->select(['class_tip', 'COUNT(*) as cnt']) + ->where(['active' => Products1cClassDynamic::ACTIVE]) + ->groupBy('class_tip') + ->asArray() + ->all(); + + foreach ($classTipStats as $stat) { + $classTip = $stat['class_tip'] ?? 'NULL'; + echo " {$classTip}: {$stat['cnt']}\n"; + } + + echo "\n"; + + // Статистика по tip + echo "Распределение по типам (активные):\n"; + $tipStats = Products1cClassDynamic::find() + ->select(['tip', 'COUNT(*) as cnt']) + ->where(['active' => Products1cClassDynamic::ACTIVE]) + ->groupBy('tip') + ->asArray() + ->all(); + + foreach ($tipStats as $stat) { + echo " {$stat['tip']}: {$stat['cnt']}\n"; + } + + return ExitCode::OK; + } + + /** + * Получение количества исходных данных для синхронизации + * + * @return int + */ + private function getSourceDataCount(): int + { + $sql = " + SELECT COUNT(*) + FROM products_1c p + WHERE p.tip IN (:tip_products, :tip_products_group) + "; + + return (int) Yii::$app->db->createCommand($sql, [ + ':tip_products' => Products1c::TYPE_PRODUCTS, + ':tip_products_group' => Products1c::TYPE_PRODUCTS_GROUP, + ])->queryScalar(); + } + + /** + * Получение исходных данных для синхронизации порциями (chunked) + * + * Использует генератор для экономии памяти. + * Вместо загрузки всех записей в память — читаем по $chunkSize за раз. + * + * @param int $chunkSize Размер порции + * @return \Generator + */ + private function getSourceDataChunked(int $chunkSize): \Generator + { + $offset = 0; + + while (true) { + $sql = " + SELECT + p.id as product_id, + p.parent_id, + p.tip, + p.code, + p.name, + p.articule, + p.view, + p.components, + p.type, + pc.category_id as class_category_id, + pc.tip as class_tip + FROM + products_1c p + LEFT JOIN + products_class pc ON p.parent_id = pc.category_id + WHERE + p.tip IN (:tip_products, :tip_products_group) + ORDER BY + p.id + LIMIT :limit OFFSET :offset + "; + + $rows = Yii::$app->db->createCommand($sql, [ + ':tip_products' => Products1c::TYPE_PRODUCTS, + ':tip_products_group' => Products1c::TYPE_PRODUCTS_GROUP, + ':limit' => $chunkSize, + ':offset' => $offset, + ])->queryAll(); + + if (empty($rows)) { + break; + } + + foreach ($rows as $row) { + yield $row; + } + + $offset += $chunkSize; + + // Освобождаем память после каждого чанка + if (function_exists('gc_collect_cycles')) { + gc_collect_cycles(); + } + } + } +} diff --git a/erp24/migrations/m250123_100000_create_table_products_1c_class_dynamic.php b/erp24/migrations/m250123_100000_create_table_products_1c_class_dynamic.php new file mode 100644 index 00000000..b6922555 --- /dev/null +++ b/erp24/migrations/m250123_100000_create_table_products_1c_class_dynamic.php @@ -0,0 +1,123 @@ +db->getTableSchema(self::TABLE_NAME); + + if (!isset($tableSchema)) { + $this->createTable(self::TABLE_NAME, [ + // Первичный ключ + 'id' => $this->bigPrimaryKey(), + + // Поля из products_1c + 'product_id' => $this->string(36)->notNull()->comment('GUID товара из products_1c.id'), + 'parent_id' => $this->string(36)->null()->comment('UUID родительской группы'), + 'tip' => $this->string(25)->notNull()->comment('Тип: products или products_group'), + 'code' => $this->string(36)->notNull()->comment('Код товара из 1С'), + 'name' => $this->string(255)->notNull()->comment('Наименование товара/группы'), + 'articule' => $this->string(36)->null()->comment('Артикул товара'), + 'view' => $this->integer()->null()->comment('Видимость: 1=видим, 0=скрыт'), + 'components' => $this->text()->null()->comment('Компоненты товара JSON'), + 'type' => $this->string(255)->null()->comment('Дополнительный тип'), + + // Поля из products_class + 'class_category_id' => $this->string(36)->null()->comment('GUID категории из products_class'), + 'class_tip' => $this->string(25)->null()->comment('Класс товара: wrap, potted, matrix и др.'), + + // Поля активности (SCD Type 2) + 'date_from' => $this->date()->notNull()->comment('Дата начала активности'), + 'date_to' => $this->date()->defaultValue('2100-01-01')->notNull()->comment('Дата окончания активности'), + 'active' => $this->smallInteger()->defaultValue(1)->notNull()->comment('Статус: 1=активна, 0=закрыта'), + + // Хэш и изменения + 'hash' => $this->string(32)->notNull()->comment('MD5-хэш ключевых полей'), + 'changes' => $this->text()->null()->comment('JSON-список изменённых колонок'), + + // Аудит + 'created_at' => $this->dateTime()->notNull()->defaultExpression('NOW()')->comment('Дата создания записи'), + 'updated_at' => $this->dateTime()->null()->comment('Дата обновления записи'), + ]); + + // Индексы + $this->createIndex( + 'idx_p1c_class_dyn_product_id', + self::TABLE_NAME, + 'product_id' + ); + + $this->createIndex( + 'idx_p1c_class_dyn_active', + self::TABLE_NAME, + 'active' + ); + + $this->createIndex( + 'idx_p1c_class_dyn_dates', + self::TABLE_NAME, + ['date_from', 'date_to'] + ); + + $this->createIndex( + 'idx_p1c_class_dyn_class_tip', + self::TABLE_NAME, + 'class_tip' + ); + + $this->createIndex( + 'idx_p1c_class_dyn_product_active', + self::TABLE_NAME, + ['product_id', 'active'] + ); + + $this->createIndex( + 'idx_p1c_class_dyn_parent_id', + self::TABLE_NAME, + 'parent_id' + ); + + // Covering index для исторических запросов по датам + // Оптимизирует: WHERE product_id = ? AND date_from <= ? AND date_to >= ? + $this->createIndex( + 'idx_p1c_class_dyn_product_dates', + self::TABLE_NAME, + ['product_id', 'date_from', 'date_to'] + ); + + // Partial unique index: только одна активная запись на товар + $this->execute(" + CREATE UNIQUE INDEX idx_p1c_class_dyn_product_active_unique + ON " . self::TABLE_NAME . " (product_id) + WHERE active = 1 + "); + + // Комментарий к таблице + $this->execute("COMMENT ON TABLE " . self::TABLE_NAME . " IS 'Версионированный каталог товаров с классификацией (SCD Type 2)'"); + } + } + + /** + * {@inheritdoc} + */ + public function safeDown() + { + $tableSchema = $this->db->getTableSchema(self::TABLE_NAME); + if (isset($tableSchema)) { + $this->dropTable(self::TABLE_NAME); + } + } +} diff --git a/erp24/records/Products1cClassDynamic.php b/erp24/records/Products1cClassDynamic.php new file mode 100644 index 00000000..ddb4d9c9 --- /dev/null +++ b/erp24/records/Products1cClassDynamic.php @@ -0,0 +1,383 @@ + [ + 'class' => TimestampBehavior::class, + 'createdAtAttribute' => 'created_at', + 'updatedAtAttribute' => 'updated_at', + 'value' => new Expression('NOW()'), + ], + ]; + } + + /** + * {@inheritdoc} + */ + public function rules(): array + { + return [ + [['product_id', 'tip', 'code', 'name', 'date_from', 'hash'], 'required'], + [['view', 'active'], 'integer'], + [['components', 'changes'], 'string'], + [['date_from', 'date_to', 'created_at', 'updated_at'], 'safe'], + [['product_id', 'parent_id', 'code', 'articule', 'class_category_id'], 'string', 'max' => 36], + [['tip', 'class_tip'], 'string', 'max' => 25], + [['name', 'type'], 'string', 'max' => 255], + [['hash'], 'string', 'max' => 32], + [['active'], 'default', 'value' => self::ACTIVE], + [['date_to'], 'default', 'value' => self::DATE_INFINITY], + ]; + } + + /** + * {@inheritdoc} + */ + public function attributeLabels(): array + { + return [ + 'id' => 'ID', + 'product_id' => 'GUID товара', + 'parent_id' => 'Родительская группа', + 'tip' => 'Тип записи', + 'code' => 'Код 1С', + 'name' => 'Наименование', + 'articule' => 'Артикул', + 'view' => 'Видимость', + 'components' => 'Компоненты', + 'type' => 'Тип', + 'class_category_id' => 'Категория класса', + 'class_tip' => 'Класс товара', + 'date_from' => 'Дата начала', + 'date_to' => 'Дата окончания', + 'active' => 'Активность', + 'hash' => 'Хэш данных', + 'changes' => 'Изменённые поля', + 'created_at' => 'Дата создания', + 'updated_at' => 'Дата обновления', + ]; + } + + /** + * Инициализация новой активной записи + * + * Устанавливает: + * - date_from = сегодня (первичная) или завтра (замена) + * - date_to = 2100-01-01 (бесконечность) + * - active = 1 + * + * @param bool $isReplacement true если это замена существующей записи + * @return self + */ + public function initActiveRecord(bool $isReplacement = false): self + { + // Первичная загрузка — сегодня + // Замена — завтра (чтобы не пересекаться с закрытой записью) + $this->date_from = $isReplacement + ? date('Y-m-d', strtotime('+1 day')) + : date('Y-m-d'); + $this->date_to = self::DATE_INFINITY; + $this->active = self::ACTIVE; + + return $this; + } + + /** + * Закрытие текущей активной записи + * + * Устанавливает: + * - date_to = текущая дата + * - active = 0 + * + * @return self + */ + public function disableRecord(): self + { + $this->date_to = date('Y-m-d'); + $this->active = self::NOT_ACTIVE; + + return $this; + } + + /** + * Получение записи товара + * + * Если $date = null — возвращает текущую активную запись (по флагу active) + * Если $date указана — возвращает запись, актуальную на эту дату (по диапазону дат) + * + * @param string $productId GUID товара + * @param string|null $date Дата для исторического запроса (null = текущая запись) + * @return self|null + */ + public static function getActiveByProductId(string $productId, ?string $date = null): ?self + { + $query = self::find()->where(['product_id' => $productId]); + + if ($date === null) { + // Текущая запись — только по флагу active + $query->andWhere(['active' => self::ACTIVE]); + } else { + // Историческая запись на дату — только по диапазону дат + // Интервал [date_from, date_to] — оба конца включены + $query->andWhere(['<=', 'date_from', $date]) + ->andWhere(['>=', 'date_to', $date]); + } + + return $query->one(); + } + + /** + * Получение всех активных записей на дату + * + * @param string|null $date Дата (по умолчанию текущая) + * @return self[] + */ + public static function getAllActive(?string $date = null): array + { + if ($date === null) { + $date = date('Y-m-d'); + } + + return self::find() + ->where(['active' => self::ACTIVE]) + ->andWhere(['<=', 'date_from', $date]) + ->andWhere(['>=', 'date_to', $date]) // Интервал [date_from, date_to] — оба конца включены + ->all(); + } + + /** + * Проверка наличия изменений товара в периоде + * + * @param string $productId GUID товара + * @param string $dateFrom Начало периода + * @param string $dateTo Конец периода + * @return bool + */ + public static function hasChangesInPeriod(string $productId, string $dateFrom, string $dateTo): bool + { + return self::find() + ->where(['product_id' => $productId]) + ->andWhere(['or', + ['and', + ['>=', 'date_from', $dateFrom], + ['<=', 'date_from', $dateTo] + ], + ['and', + ['>=', 'date_to', $dateFrom], + ['<=', 'date_to', $dateTo], + ['<>', 'date_to', self::DATE_INFINITY] + ] + ]) + ->exists(); + } + + /** + * Сравнение текущих данных записи с новыми + * + * Проверяет изменились ли ключевые поля товара + * + * @param array $newData Новые данные для сравнения + * @return bool true если данные изменились + */ + public function isDataChanged(array $newData): bool + { + return !empty($this->detectChanges($newData)); + } + + /** + * Расчёт MD5-хэша для данных товара + * + * Хэш рассчитывается из полей HASH_FIELDS, разделённых | + * + * @param array $data Данные товара + * @return string MD5-хэш (32 символа) + */ + public static function calculateHash(array $data): string + { + $values = []; + foreach (self::HASH_FIELDS as $field) { + $value = $data[$field] ?? ''; + // Нормализация: null и пустая строка → пустая строка + $values[] = $value === null ? '' : (string)$value; + } + + return md5(implode('|', $values)); + } + + /** + * Определение изменённых полей между текущей записью и новыми данными + * + * @param array $newData Новые данные + * @return array Массив названий изменённых полей + */ + public function detectChanges(array $newData): array + { + $changedFields = []; + + foreach (self::HASH_FIELDS as $field) { + $currentValue = $this->$field; + $newValue = $newData[$field] ?? null; + + // Нормализация для сравнения + if ($currentValue === '') { + $currentValue = null; + } + if ($newValue === '') { + $newValue = null; + } + + // Приведение типов для корректного сравнения + if (is_numeric($currentValue) && is_numeric($newValue)) { + $currentValue = (string)$currentValue; + $newValue = (string)$newValue; + } + + if ($currentValue !== $newValue) { + $changedFields[] = $field; + } + } + + return $changedFields; + } + + /** + * Создание новой версии записи на основе данных + * + * @param array $data Данные товара + * @param array|null $changes Список изменённых полей (null для первичной загрузки) + * @return self + */ + public static function createFromData(array $data, ?array $changes = null): self + { + $model = new self(); + $model->product_id = $data['product_id']; + $model->parent_id = $data['parent_id'] ?? null; + $model->tip = $data['tip']; + $model->code = $data['code']; + $model->name = $data['name']; + $model->articule = $data['articule'] ?? null; + $model->view = $data['view'] ?? null; + $model->components = $data['components'] ?? null; + $model->type = $data['type'] ?? null; + $model->class_category_id = $data['class_category_id'] ?? null; + $model->class_tip = $data['class_tip'] ?? null; + $model->hash = self::calculateHash($data); + $model->changes = $changes !== null ? json_encode($changes) : null; + + // Если есть changes — это замена (date_from = завтра) + // Если changes = null — первичная загрузка (date_from = сегодня) + $model->initActiveRecord($changes !== null); + + return $model; + } + + /** + * Связь с исходной записью products_1c + * + * @return ActiveQuery + */ + public function getProducts1c(): ActiveQuery + { + return $this->hasOne(Products1c::class, ['id' => 'product_id']); + } + + /** + * Связь с записью классификации + * + * ProductsClass имеет составной первичный ключ ['category_id', 'tip'], + * поэтому связь должна использовать оба поля. + * + * @return ActiveQuery + */ + public function getProductsClass(): ActiveQuery + { + return $this->hasOne(ProductsClass::class, [ + 'category_id' => 'class_category_id', + 'tip' => 'class_tip', + ]); + } + + /** + * Получение истории изменений товара + * + * @param string $productId GUID товара + * @return self[] + */ + public static function getHistory(string $productId): array + { + return self::find() + ->where(['product_id' => $productId]) + ->orderBy(['date_from' => SORT_DESC]) + ->all(); + } +}