diff --git a/app/app/Console/Commands/RetryFailedSupplierJobsCommand.php b/app/app/Console/Commands/RetryFailedSupplierJobsCommand.php new file mode 100644 index 00000000..4dd3804c --- /dev/null +++ b/app/app/Console/Commands/RetryFailedSupplierJobsCommand.php @@ -0,0 +1,151 @@ +>'supplier_lead_id' IS NOT NULL. + * Эти rows вставляются исключительно RouteSupplierLeadJob::failed() (BYPASSRLS + * через DB_CONNECTION='pgsql_supplier'). + * - НЕТ retry_attempts/last_retried_at → используем existing колонки: + * - retry_count (INT) — counter оставшихся manual retry-attempts. + * - retried_at (TIMESTAMPTZ) — last retry timestamp (cooldown 1h). + * - resolved_at (TIMESTAMPTZ) — терминальное состояние (исключает retry). + * - failed_at (TIMESTAMPTZ) — 24h window (старше — skip). + * + * Selection criteria: + * 1. tenant_id IS NULL (supplier-flow marker) + * 2. raw_payload ? 'supplier_lead_id' (JSONB key existence) + * 3. resolved_at IS NULL (не закрыт) + * 4. failed_at >= NOW() - 24h (window safety cap) + * 5. retry_count > 0 (есть оставшиеся попытки) + * 6. retried_at IS NULL OR retried_at < NOW() - 1h (cooldown) + * + * On dispatch per row: + * - RouteSupplierLeadJob::dispatch(supplier_lead_id из raw_payload) + * - retried_at = NOW() + * - retry_count = retry_count - 1 + * - если retry_count - 1 <= 0: resolved_at = NOW() (exhausted, не будет re-tried) + * + * Использует connection `pgsql_supplier` (BYPASSRLS-роль crm_supplier_worker) для + * доступа к row'ам с tenant_id=NULL (политика RLS под обычной ролью их скрывает). + */ +final class RetryFailedSupplierJobsCommand extends Command +{ + /** @var string */ + protected $signature = 'supplier:retry-failed'; + + /** @var string */ + protected $description = 'Re-dispatch RouteSupplierLeadJob для supplier-flow failed_webhook_jobs (hourly cron, max retries cap)'; + + private const DB_CONNECTION = 'pgsql_supplier'; + + private const MAX_AGE_HOURS = 24; + + private const RETRY_COOLDOWN_HOURS = 1; + + public function handle(): int + { + $now = Carbon::now(); + $ageCutoff = $now->copy()->subHours(self::MAX_AGE_HOURS); + $cooldownCutoff = $now->copy()->subHours(self::RETRY_COOLDOWN_HOURS); + + $eligible = DB::connection(self::DB_CONNECTION) + ->table('failed_webhook_jobs') + ->whereNull('tenant_id') + // PG JSONB key-existence operator `?` коллизирует с PDO placeholder. + // Escape `?` как `??` (Laravel-конвенция) для прохода raw `?` в SQL. + ->whereRaw("raw_payload ?? 'supplier_lead_id'") + ->whereNull('resolved_at') + ->where('failed_at', '>=', $ageCutoff) + ->where('retry_count', '>', 0) + ->where(function ($q) use ($cooldownCutoff) { + $q->whereNull('retried_at') + ->orWhere('retried_at', '<', $cooldownCutoff); + }) + ->orderBy('id') + ->get(); + + $dispatched = 0; + $exhausted = 0; + + foreach ($eligible as $row) { + $payload = $this->decodePayload($row->raw_payload); + $supplierLeadId = isset($payload['supplier_lead_id']) + ? (int) $payload['supplier_lead_id'] + : null; + + if ($supplierLeadId === null || $supplierLeadId <= 0) { + // Defensive: whereRaw уже фильтрует, но decode мог дать null. + Log::warning('supplier.retry_failed.invalid_payload', [ + 'failed_webhook_job_id' => $row->id, + ]); + + continue; + } + + RouteSupplierLeadJob::dispatch($supplierLeadId); + + $newRetryCount = max((int) $row->retry_count - 1, 0); + $update = [ + 'retried_at' => $now, + 'retry_count' => $newRetryCount, + ]; + if ($newRetryCount === 0) { + $update['resolved_at'] = $now; + $exhausted++; + } + + DB::connection(self::DB_CONNECTION) + ->table('failed_webhook_jobs') + ->where('id', $row->id) + ->update($update); + + $dispatched++; + } + + Log::info('supplier.retry_failed', [ + 'dispatched' => $dispatched, + 'exhausted' => $exhausted, + ]); + + $this->info("Re-dispatched {$dispatched} failed supplier webhook job(s); exhausted {$exhausted}."); + + return self::SUCCESS; + } + + /** + * Decode raw_payload — JSONB column возвращается как string из DB::table(). + * + * @return array + */ + private function decodePayload(mixed $raw): array + { + if (is_array($raw)) { + return $raw; + } + + if (! is_string($raw)) { + return []; + } + + $decoded = json_decode($raw, true); + + return is_array($decoded) ? $decoded : []; + } +} diff --git a/app/phpstan-baseline.neon b/app/phpstan-baseline.neon index 30fe55c8..c8c830a1 100644 --- a/app/phpstan-baseline.neon +++ b/app/phpstan-baseline.neon @@ -900,6 +900,30 @@ parameters: count: 3 path: tests/Feature/SetTenantContextTest.php + - + message: '#^Call to an undefined method Pest\\PendingCalls\\TestCall\:\:artisan\(\)\.$#' + identifier: method.notFound + count: 9 + path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php + + - + message: '#^Cannot access property \$resolved_at on array\\|object\.$#' + identifier: property.nonObject + count: 1 + path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php + + - + message: '#^Cannot access property \$retried_at on array\\|object\.$#' + identifier: property.nonObject + count: 4 + path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php + + - + message: '#^Cannot access property \$retry_count on array\\|object\.$#' + identifier: property.nonObject + count: 7 + path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php + - message: '#^Call to an undefined method App\\Services\\Supplier\\PlaywrightBridge\:\:shouldReceive\(\)\.$#' identifier: method.notFound diff --git a/app/routes/console.php b/app/routes/console.php index 0c0d5ebe..e7d6874b 100644 --- a/app/routes/console.php +++ b/app/routes/console.php @@ -1,5 +1,8 @@ dailyAt('00:00') ->timezone('Europe/Moscow'); + +// Plan 3 Task 8: 5 Schedule entries для supplier-flow. +// +// NB: ->onOneServer() требует cache_locks таблицу, которой у нас нет +// (см. project_state.md фаза 1). Операции идемпотентны: SyncSupplierProjectsJob +// делает diff'ы (skip-no-diff), CleanupJob — UPDATE WHERE conditions, RefreshSession +// — Cache::lock guard внутри handle, RetryFailedSupplierJobs — WHERE retried_at +// фильтр. На multi-server prod может потребовать cache_locks таблицу. +Schedule::job(new RefreshSupplierSessionJob)->hourly(); +Schedule::job(new RefreshSupplierSessionJob) + ->dailyAt('20:15') + ->timezone('Europe/Moscow'); +Schedule::job(new SyncSupplierProjectsJob) + ->dailyAt('20:30') + ->timezone('Europe/Moscow'); +Schedule::job(new CleanupInactiveSupplierProjectsJob) + ->dailyAt('02:00') + ->timezone('Europe/Moscow'); +Schedule::command('supplier:retry-failed')->hourly(); diff --git a/app/tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php b/app/tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php new file mode 100644 index 00000000..67df819a --- /dev/null +++ b/app/tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php @@ -0,0 +1,216 @@ +>'supplier_lead_id' IS NOT NULL + * (см. RouteSupplierLeadJob::failed() — он вставляет именно так). + * - НЕТ retry_attempts/last_retried_at → используем existing колонки: + * - retry_count (INT) — счётчик оставшихся попыток (decrement при каждом retry). + * - retried_at (TIMESTAMPTZ) — last retry timestamp (cooldown 1h). + * - resolved_at (TIMESTAMPTZ) — терминальное состояние (исключает retry). + * - failed_at (TIMESTAMPTZ) — window 24h (старше — skip). + * + * Semantics retry_count: при создании row в failed_webhook_jobs RouteSupplierLeadJob + * сетит retry_count = $tries = 3 (max попыток queue-уровня). Command интерпретирует + * это значение как "оставшиеся manual retries"; при каждом retry decrement; при + * достижении 0 — set resolved_at=NOW() со статусом "exhausted" (через JSON marker + * в raw_payload). + */ +uses(DatabaseTransactions::class); +uses(SharesSupplierPdo::class); + +beforeEach(function (): void { + Bus::fake(); + + // Очищаем persistent garbage из failed_webhook_jobs (рудимент старых + // test-сессий, когда pgsql_supplier не был частью DatabaseTransactions). + // Этот DELETE сам выполняется в текущей pgsql-транзакции через shared PDO + // (SharesSupplierPdo trait) и откатится по завершении теста — реальные + // production-данные не страдают. + DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->delete(); +}); + +/** + * Helper: вставка supplier-marked failed_webhook_jobs row. + * + * @param array $overrides + */ +function insertFailedSupplierRow(array $overrides = []): int +{ + $supplierLead = SupplierLead::factory()->create([ + 'processed_at' => null, + ]); + + $payload = [ + 'supplier_lead_id' => $supplierLead->id, + ]; + + $defaults = [ + 'tenant_id' => null, + 'webhook_log_id' => null, + 'raw_payload' => json_encode($payload, JSON_UNESCAPED_UNICODE), + 'exception' => 'Test failure', + 'retry_count' => 3, + 'failed_at' => now()->subMinutes(30), + 'retried_at' => null, + 'resolved_at' => null, + ]; + + $row = array_merge($defaults, $overrides); + + return (int) DB::connection('pgsql_supplier') + ->table('failed_webhook_jobs') + ->insertGetId($row); +} + +test('dispatches RouteSupplierLeadJob for each eligible supplier-flow row', function (): void { + $id1 = insertFailedSupplierRow(); + $id2 = insertFailedSupplierRow(); + + $this->artisan('supplier:retry-failed')->assertExitCode(0); + // suppress unused warning — IDs needed for fresh() verification below. + expect($id1)->toBeGreaterThan(0)->and($id2)->toBeGreaterThan(0); + + Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 2); + + // Both rows должны иметь обновлённый retried_at и decremented retry_count. + foreach ([$id1, $id2] as $id) { + $row = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($id); + expect($row->retried_at)->not->toBeNull(); + expect((int) $row->retry_count)->toBe(2); + } +}); + +test('skips rows recently retried within cooldown (<1h)', function (): void { + $recentlyRetried = insertFailedSupplierRow([ + 'retried_at' => now()->subMinutes(30), // < 1h ago + ]); + $eligible = insertFailedSupplierRow([ + 'retried_at' => now()->subHours(2), // > 1h ago + ]); + + $this->artisan('supplier:retry-failed')->assertExitCode(0); + + Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1); + + // Recently retried row — не тронут. + $skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($recentlyRetried); + expect((int) $skipped->retry_count)->toBe(3); // не decremented + + // Eligible row — retried. + $processed = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($eligible); + expect((int) $processed->retry_count)->toBe(2); +}); + +test('decrements retry_count and updates retried_at on dispatched row', function (): void { + $id = insertFailedSupplierRow(['retry_count' => 3]); + + $this->artisan('supplier:retry-failed')->assertExitCode(0); + + $row = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($id); + expect((int) $row->retry_count)->toBe(2); + expect($row->retried_at)->not->toBeNull(); +}); + +test('marks resolved_at when retry_count reaches 0 (max attempts exhausted)', function (): void { + // retry_count=1 → после dispatch'а станет 0 → set resolved_at=NOW(). + $id = insertFailedSupplierRow(['retry_count' => 1]); + + $this->artisan('supplier:retry-failed')->assertExitCode(0); + + Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1); + + $row = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($id); + expect((int) $row->retry_count)->toBe(0); + expect($row->resolved_at)->not->toBeNull(); +}); + +test('skips rows older than 24h (window safety cap)', function (): void { + $tooOld = insertFailedSupplierRow([ + 'failed_at' => now()->subDays(2), + ]); + $fresh = insertFailedSupplierRow([ + 'failed_at' => now()->subHours(12), + ]); + + $this->artisan('supplier:retry-failed')->assertExitCode(0); + + Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1); + + $skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($tooOld); + expect($skipped->retried_at)->toBeNull(); // не тронут +}); + +test('skips rows already resolved', function (): void { + $resolved = insertFailedSupplierRow([ + 'resolved_at' => now()->subHours(1), + ]); + $unresolved = insertFailedSupplierRow(); + + $this->artisan('supplier:retry-failed')->assertExitCode(0); + + Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1); + + $skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($resolved); + expect((int) $skipped->retry_count)->toBe(3); // не тронут +}); + +test('skips non-supplier rows (tenant_id IS NOT NULL OR missing supplier_lead_id)', function (): void { + // Обычный tenant-bound failed webhook (НЕ supplier-flow). + $tenant = Tenant::factory()->create(); + $tenantBoundId = DB::connection('pgsql_supplier') + ->table('failed_webhook_jobs') + ->insertGetId([ + 'tenant_id' => $tenant->id, + 'webhook_log_id' => null, + 'raw_payload' => json_encode(['foo' => 'bar'], JSON_UNESCAPED_UNICODE), + 'exception' => 'Other failure', + 'retry_count' => 3, + 'failed_at' => now()->subMinutes(30), + 'retried_at' => null, + 'resolved_at' => null, + ]); + + // Supplier-flow row (tenant_id NULL, supplier_lead_id present). + $supplierId = insertFailedSupplierRow(); + + $this->artisan('supplier:retry-failed')->assertExitCode(0); + + Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1); + + $skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($tenantBoundId); + expect($skipped->retried_at)->toBeNull(); + expect((int) $skipped->retry_count)->toBe(3); +}); + +test('reports dispatched count via output and Log', function (): void { + insertFailedSupplierRow(); + insertFailedSupplierRow(); + insertFailedSupplierRow(); + + $this->artisan('supplier:retry-failed') + ->expectsOutputToContain('Re-dispatched 3') + ->assertExitCode(0); +}); + +test('handles empty queue gracefully (0 eligible rows)', function (): void { + // No rows inserted. + $this->artisan('supplier:retry-failed') + ->expectsOutputToContain('Re-dispatched 0') + ->assertExitCode(0); + + Bus::assertNotDispatched(RouteSupplierLeadJob::class); +});