Files
portal/app/app/Jobs/ImportLeadsJob.php
T
Дмитрий dffefe7fc0 docs(billing): Phase 3 cleanup — refresh orphan comments to live classes
After ProcessWebhookJob/WebhookReceiveController removal — обновлены 8
docblock/inline комментариев, ссылавшихся на удалённый код:

- DealController: ProcessWebhookJob → SupplierWebhookController/RouteSupplierLeadJob
- SupplierWebhookController: убрана legacy backward-compat note
- ImportLeadsJob: паритет с RouteSupplierLeadJob
- RouteSupplierLeadJob: убрана ссылка на ProcessWebhookJob-pattern
- NewLeadNotification mailable: триггер в RouteSupplierLeadJob
- FailedWebhookJob model: ссылка на RouteSupplierLeadJob::failed()
- SupplierLeadCost model: создаётся в LedgerService::chargeForDelivery
- CsvLeadsParser: паритет с RouteSupplierLeadJob парсером

Code-функциональность не затронута, только doc-rot fix.
2026-05-24 18:51:16 +03:00

148 lines
4.9 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Jobs;
use App\Mail\ImportCompletedNotification;
use App\Models\ImportLog;
use App\Models\User;
use App\Services\Import\CsvLeadsParser;
use App\Services\Import\HistoricalImportService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Mail;
use Illuminate\Support\Facades\Storage;
use RuntimeException;
use Throwable;
/**
* Асинхронная обработка CSV-импорта исторических лидов (ТЗ §6.6).
*
* Жизненный цикл import_log: pending → processing → done | failed.
* RLS: каждый доступ к БД задаёт SET LOCAL app.current_tenant_id (воркер
* вне middleware-контекста — паритет с RouteSupplierLeadJob).
*/
class ImportLeadsJob implements ShouldQueue
{
use Dispatchable;
use InteractsWithQueue;
use Queueable;
use SerializesModels;
public int $tries = 1;
public int $timeout = 600;
public function __construct(
public int $importLogId,
public int $tenantId,
) {}
public function handle(HistoricalImportService $service, CsvLeadsParser $parser): void
{
$log = $this->loadLog();
if ($log === null) {
Log::error('import.log_not_found', ['import_log_id' => $this->importLogId]);
return;
}
$this->updateLog($log->id, ['status' => 'processing', 'started_at' => now()]);
try {
if (! Storage::disk('local')->exists($log->file_path)) {
throw new RuntimeException("Файл импорта не найден: {$log->file_path}");
}
$content = (string) Storage::disk('local')->get($log->file_path);
$parsed = $parser->parse($content);
$result = $service->import($this->tenantId, $log->user_id, $log, $parsed->rows);
$this->updateLog($log->id, [
'status' => 'done',
'rows_total' => count($parsed->rows) + count($parsed->errors),
'rows_added' => $result->added,
'rows_updated' => $result->updated,
'rows_skipped' => count($parsed->errors) + $result->skipped,
'unknown_statuses_count' => count($result->unknownStatuses),
'finished_at' => now(),
]);
$this->notify($log->user_id, 'done');
} catch (Throwable $e) {
Log::error('import.job_failed', ['import_log_id' => $log->id, 'error' => $e->getMessage()]);
$this->updateLog($log->id, [
'status' => 'failed',
'error_message' => $e->getMessage(),
'finished_at' => now(),
]);
$this->notify($log->user_id, 'failed');
}
}
private function loadLog(): ?ImportLog
{
return DB::transaction(function (): ?ImportLog {
DB::statement('SET LOCAL app.current_tenant_id = '.$this->tenantId);
return ImportLog::query()->find($this->importLogId);
});
}
/**
* @param array<string, mixed> $attributes
*/
private function updateLog(int $logId, array $attributes): void
{
DB::transaction(function () use ($logId, $attributes): void {
DB::statement('SET LOCAL app.current_tenant_id = '.$this->tenantId);
ImportLog::query()->whereKey($logId)->update($attributes);
});
}
private function notify(int $userId, string $outcome): void
{
$log = $this->loadLog();
$user = DB::transaction(function () use ($userId): ?User {
DB::statement('SET LOCAL app.current_tenant_id = '.$this->tenantId);
return User::query()->find($userId);
});
if ($log === null || $user === null || $user->email === '') {
return;
}
try {
Mail::to($user->email)->send(new ImportCompletedNotification($log, $outcome));
} catch (Throwable $e) {
// Отказ почтового канала не должен валить успешный импорт.
Log::warning('import.mail_failed', ['import_log_id' => $log->id, 'error' => $e->getMessage()]);
}
}
/**
* Финальный callback после исчерпания ретраев ($tries=1).
*/
public function failed(Throwable $e): void
{
$this->updateLog($this->importLogId, [
'status' => 'failed',
'error_message' => $e->getMessage(),
'finished_at' => now(),
]);
Log::error('import.job_failed_permanently', [
'import_log_id' => $this->importLogId,
'exception' => $e->getMessage(),
]);
}
}