feat(supplier): Plan 3 Task 8 — RetryFailedSupplierJobsCommand + 5 Schedule entries

Components:
- supplier:retry-failed Console command (hourly cron):
  Re-dispatch RouteSupplierLeadJob для failed_webhook_jobs eligible
  (retried_at IS NULL OR < NOW()-1h; max-age guard via failed_at).

5 Schedule entries в routes/console.php:
- RefreshSupplierSessionJob hourly + dailyAt('20:15') МСК
- SyncSupplierProjectsJob dailyAt('20:30') МСК
- CleanupInactiveSupplierProjectsJob dailyAt('02:00') МСК
- supplier:retry-failed hourly

NB: ->onOneServer() НЕ применяется — нет cache_locks таблицы (см.
project_state фаза 1). Все операции идемпотентны.

+9 tests (subagent built per actual failed_webhook_jobs schema —
retried_at/retry_count columns). PHPStan baseline +21 Pest TestCall
+ property access entries (Mockery+Pest compat pattern).

Schedule verified via `artisan schedule:list`: 5 supplier entries listed
alongside existing projects:reset-delivered-today.
This commit is contained in:
Дмитрий
2026-05-11 02:51:14 +03:00
parent c6859859a3
commit ecb6314e3b
4 changed files with 413 additions and 0 deletions
@@ -0,0 +1,151 @@
<?php
declare(strict_types=1);
namespace App\Console\Commands;
use App\Jobs\RouteSupplierLeadJob;
use Illuminate\Console\Command;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
/**
* Hourly cron: re-dispatch RouteSupplierLeadJob для supplier-marked rows в
* failed_webhook_jobs.
*
* Spec:
* - docs/superpowers/specs/2026-05-11-plan3-supplier-sync-design.md §7 (retry-mechanism).
*
* Schema adaptation (db/schema.sql v8.11 failed_webhook_jobs):
* - НЕТ supplier_lead_id колонки марка supplier-flow rows:
* tenant_id IS NULL AND raw_payload->>'supplier_lead_id' IS NOT NULL.
* Эти rows вставляются исключительно RouteSupplierLeadJob::failed() (BYPASSRLS
* через DB_CONNECTION='pgsql_supplier').
* - НЕТ retry_attempts/last_retried_at используем existing колонки:
* - retry_count (INT) counter оставшихся manual retry-attempts.
* - retried_at (TIMESTAMPTZ) last retry timestamp (cooldown 1h).
* - resolved_at (TIMESTAMPTZ) терминальное состояние (исключает retry).
* - failed_at (TIMESTAMPTZ) 24h window (старше skip).
*
* Selection criteria:
* 1. tenant_id IS NULL (supplier-flow marker)
* 2. raw_payload ? 'supplier_lead_id' (JSONB key existence)
* 3. resolved_at IS NULL (не закрыт)
* 4. failed_at >= NOW() - 24h (window safety cap)
* 5. retry_count > 0 (есть оставшиеся попытки)
* 6. retried_at IS NULL OR retried_at < NOW() - 1h (cooldown)
*
* On dispatch per row:
* - RouteSupplierLeadJob::dispatch(supplier_lead_id из raw_payload)
* - retried_at = NOW()
* - retry_count = retry_count - 1
* - если retry_count - 1 <= 0: resolved_at = NOW() (exhausted, не будет re-tried)
*
* Использует connection `pgsql_supplier` (BYPASSRLS-роль crm_supplier_worker) для
* доступа к row'ам с tenant_id=NULL (политика RLS под обычной ролью их скрывает).
*/
final class RetryFailedSupplierJobsCommand extends Command
{
/** @var string */
protected $signature = 'supplier:retry-failed';
/** @var string */
protected $description = 'Re-dispatch RouteSupplierLeadJob для supplier-flow failed_webhook_jobs (hourly cron, max retries cap)';
private const DB_CONNECTION = 'pgsql_supplier';
private const MAX_AGE_HOURS = 24;
private const RETRY_COOLDOWN_HOURS = 1;
public function handle(): int
{
$now = Carbon::now();
$ageCutoff = $now->copy()->subHours(self::MAX_AGE_HOURS);
$cooldownCutoff = $now->copy()->subHours(self::RETRY_COOLDOWN_HOURS);
$eligible = DB::connection(self::DB_CONNECTION)
->table('failed_webhook_jobs')
->whereNull('tenant_id')
// PG JSONB key-existence operator `?` коллизирует с PDO placeholder.
// Escape `?` как `??` (Laravel-конвенция) для прохода raw `?` в SQL.
->whereRaw("raw_payload ?? 'supplier_lead_id'")
->whereNull('resolved_at')
->where('failed_at', '>=', $ageCutoff)
->where('retry_count', '>', 0)
->where(function ($q) use ($cooldownCutoff) {
$q->whereNull('retried_at')
->orWhere('retried_at', '<', $cooldownCutoff);
})
->orderBy('id')
->get();
$dispatched = 0;
$exhausted = 0;
foreach ($eligible as $row) {
$payload = $this->decodePayload($row->raw_payload);
$supplierLeadId = isset($payload['supplier_lead_id'])
? (int) $payload['supplier_lead_id']
: null;
if ($supplierLeadId === null || $supplierLeadId <= 0) {
// Defensive: whereRaw уже фильтрует, но decode мог дать null.
Log::warning('supplier.retry_failed.invalid_payload', [
'failed_webhook_job_id' => $row->id,
]);
continue;
}
RouteSupplierLeadJob::dispatch($supplierLeadId);
$newRetryCount = max((int) $row->retry_count - 1, 0);
$update = [
'retried_at' => $now,
'retry_count' => $newRetryCount,
];
if ($newRetryCount === 0) {
$update['resolved_at'] = $now;
$exhausted++;
}
DB::connection(self::DB_CONNECTION)
->table('failed_webhook_jobs')
->where('id', $row->id)
->update($update);
$dispatched++;
}
Log::info('supplier.retry_failed', [
'dispatched' => $dispatched,
'exhausted' => $exhausted,
]);
$this->info("Re-dispatched {$dispatched} failed supplier webhook job(s); exhausted {$exhausted}.");
return self::SUCCESS;
}
/**
* Decode raw_payload JSONB column возвращается как string из DB::table().
*
* @return array<string, mixed>
*/
private function decodePayload(mixed $raw): array
{
if (is_array($raw)) {
return $raw;
}
if (! is_string($raw)) {
return [];
}
$decoded = json_decode($raw, true);
return is_array($decoded) ? $decoded : [];
}
}
+24
View File
@@ -900,6 +900,30 @@ parameters:
count: 3
path: tests/Feature/SetTenantContextTest.php
-
message: '#^Call to an undefined method Pest\\PendingCalls\\TestCall\:\:artisan\(\)\.$#'
identifier: method.notFound
count: 9
path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php
-
message: '#^Cannot access property \$resolved_at on array\<mixed\>\|object\.$#'
identifier: property.nonObject
count: 1
path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php
-
message: '#^Cannot access property \$retried_at on array\<mixed\>\|object\.$#'
identifier: property.nonObject
count: 4
path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php
-
message: '#^Cannot access property \$retry_count on array\<mixed\>\|object\.$#'
identifier: property.nonObject
count: 7
path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php
-
message: '#^Call to an undefined method App\\Services\\Supplier\\PlaywrightBridge\:\:shouldReceive\(\)\.$#'
identifier: method.notFound
+22
View File
@@ -1,5 +1,8 @@
<?php
use App\Jobs\Supplier\CleanupInactiveSupplierProjectsJob;
use App\Jobs\Supplier\RefreshSupplierSessionJob;
use App\Jobs\Supplier\SyncSupplierProjectsJob;
use Illuminate\Foundation\Inspiring;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Support\Facades\Schedule;
@@ -18,3 +21,22 @@ Artisan::command('inspire', function () {
Schedule::command('projects:reset-delivered-today')
->dailyAt('00:00')
->timezone('Europe/Moscow');
// Plan 3 Task 8: 5 Schedule entries для supplier-flow.
//
// NB: ->onOneServer() требует cache_locks таблицу, которой у нас нет
// (см. project_state.md фаза 1). Операции идемпотентны: SyncSupplierProjectsJob
// делает diff'ы (skip-no-diff), CleanupJob — UPDATE WHERE conditions, RefreshSession
// — Cache::lock guard внутри handle, RetryFailedSupplierJobs — WHERE retried_at
// фильтр. На multi-server prod может потребовать cache_locks таблицу.
Schedule::job(new RefreshSupplierSessionJob)->hourly();
Schedule::job(new RefreshSupplierSessionJob)
->dailyAt('20:15')
->timezone('Europe/Moscow');
Schedule::job(new SyncSupplierProjectsJob)
->dailyAt('20:30')
->timezone('Europe/Moscow');
Schedule::job(new CleanupInactiveSupplierProjectsJob)
->dailyAt('02:00')
->timezone('Europe/Moscow');
Schedule::command('supplier:retry-failed')->hourly();
@@ -0,0 +1,216 @@
<?php
declare(strict_types=1);
use App\Jobs\RouteSupplierLeadJob;
use App\Models\SupplierLead;
use App\Models\Tenant;
use Illuminate\Foundation\Testing\DatabaseTransactions;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\DB;
use Tests\Concerns\SharesSupplierPdo;
/**
* Plan 3 Task 8: RetryFailedSupplierJobsCommand.
*
* Schema adaptation (db/schema.sql v8.11 failed_webhook_jobs):
* - НЕТ supplier_lead_id колонки марка supplier-flow rows:
* tenant_id IS NULL AND raw_payload->>'supplier_lead_id' IS NOT NULL
* (см. RouteSupplierLeadJob::failed() он вставляет именно так).
* - НЕТ retry_attempts/last_retried_at используем existing колонки:
* - retry_count (INT) счётчик оставшихся попыток (decrement при каждом retry).
* - retried_at (TIMESTAMPTZ) last retry timestamp (cooldown 1h).
* - resolved_at (TIMESTAMPTZ) терминальное состояние (исключает retry).
* - failed_at (TIMESTAMPTZ) window 24h (старше skip).
*
* Semantics retry_count: при создании row в failed_webhook_jobs RouteSupplierLeadJob
* сетит retry_count = $tries = 3 (max попыток queue-уровня). Command интерпретирует
* это значение как "оставшиеся manual retries"; при каждом retry decrement; при
* достижении 0 set resolved_at=NOW() со статусом "exhausted" (через JSON marker
* в raw_payload).
*/
uses(DatabaseTransactions::class);
uses(SharesSupplierPdo::class);
beforeEach(function (): void {
Bus::fake();
// Очищаем persistent garbage из failed_webhook_jobs (рудимент старых
// test-сессий, когда pgsql_supplier не был частью DatabaseTransactions).
// Этот DELETE сам выполняется в текущей pgsql-транзакции через shared PDO
// (SharesSupplierPdo trait) и откатится по завершении теста — реальные
// production-данные не страдают.
DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->delete();
});
/**
* Helper: вставка supplier-marked failed_webhook_jobs row.
*
* @param array<string, mixed> $overrides
*/
function insertFailedSupplierRow(array $overrides = []): int
{
$supplierLead = SupplierLead::factory()->create([
'processed_at' => null,
]);
$payload = [
'supplier_lead_id' => $supplierLead->id,
];
$defaults = [
'tenant_id' => null,
'webhook_log_id' => null,
'raw_payload' => json_encode($payload, JSON_UNESCAPED_UNICODE),
'exception' => 'Test failure',
'retry_count' => 3,
'failed_at' => now()->subMinutes(30),
'retried_at' => null,
'resolved_at' => null,
];
$row = array_merge($defaults, $overrides);
return (int) DB::connection('pgsql_supplier')
->table('failed_webhook_jobs')
->insertGetId($row);
}
test('dispatches RouteSupplierLeadJob for each eligible supplier-flow row', function (): void {
$id1 = insertFailedSupplierRow();
$id2 = insertFailedSupplierRow();
$this->artisan('supplier:retry-failed')->assertExitCode(0);
// suppress unused warning — IDs needed for fresh() verification below.
expect($id1)->toBeGreaterThan(0)->and($id2)->toBeGreaterThan(0);
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 2);
// Both rows должны иметь обновлённый retried_at и decremented retry_count.
foreach ([$id1, $id2] as $id) {
$row = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($id);
expect($row->retried_at)->not->toBeNull();
expect((int) $row->retry_count)->toBe(2);
}
});
test('skips rows recently retried within cooldown (<1h)', function (): void {
$recentlyRetried = insertFailedSupplierRow([
'retried_at' => now()->subMinutes(30), // < 1h ago
]);
$eligible = insertFailedSupplierRow([
'retried_at' => now()->subHours(2), // > 1h ago
]);
$this->artisan('supplier:retry-failed')->assertExitCode(0);
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1);
// Recently retried row — не тронут.
$skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($recentlyRetried);
expect((int) $skipped->retry_count)->toBe(3); // не decremented
// Eligible row — retried.
$processed = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($eligible);
expect((int) $processed->retry_count)->toBe(2);
});
test('decrements retry_count and updates retried_at on dispatched row', function (): void {
$id = insertFailedSupplierRow(['retry_count' => 3]);
$this->artisan('supplier:retry-failed')->assertExitCode(0);
$row = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($id);
expect((int) $row->retry_count)->toBe(2);
expect($row->retried_at)->not->toBeNull();
});
test('marks resolved_at when retry_count reaches 0 (max attempts exhausted)', function (): void {
// retry_count=1 → после dispatch'а станет 0 → set resolved_at=NOW().
$id = insertFailedSupplierRow(['retry_count' => 1]);
$this->artisan('supplier:retry-failed')->assertExitCode(0);
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1);
$row = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($id);
expect((int) $row->retry_count)->toBe(0);
expect($row->resolved_at)->not->toBeNull();
});
test('skips rows older than 24h (window safety cap)', function (): void {
$tooOld = insertFailedSupplierRow([
'failed_at' => now()->subDays(2),
]);
$fresh = insertFailedSupplierRow([
'failed_at' => now()->subHours(12),
]);
$this->artisan('supplier:retry-failed')->assertExitCode(0);
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1);
$skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($tooOld);
expect($skipped->retried_at)->toBeNull(); // не тронут
});
test('skips rows already resolved', function (): void {
$resolved = insertFailedSupplierRow([
'resolved_at' => now()->subHours(1),
]);
$unresolved = insertFailedSupplierRow();
$this->artisan('supplier:retry-failed')->assertExitCode(0);
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1);
$skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($resolved);
expect((int) $skipped->retry_count)->toBe(3); // не тронут
});
test('skips non-supplier rows (tenant_id IS NOT NULL OR missing supplier_lead_id)', function (): void {
// Обычный tenant-bound failed webhook (НЕ supplier-flow).
$tenant = Tenant::factory()->create();
$tenantBoundId = DB::connection('pgsql_supplier')
->table('failed_webhook_jobs')
->insertGetId([
'tenant_id' => $tenant->id,
'webhook_log_id' => null,
'raw_payload' => json_encode(['foo' => 'bar'], JSON_UNESCAPED_UNICODE),
'exception' => 'Other failure',
'retry_count' => 3,
'failed_at' => now()->subMinutes(30),
'retried_at' => null,
'resolved_at' => null,
]);
// Supplier-flow row (tenant_id NULL, supplier_lead_id present).
$supplierId = insertFailedSupplierRow();
$this->artisan('supplier:retry-failed')->assertExitCode(0);
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1);
$skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($tenantBoundId);
expect($skipped->retried_at)->toBeNull();
expect((int) $skipped->retry_count)->toBe(3);
});
test('reports dispatched count via output and Log', function (): void {
insertFailedSupplierRow();
insertFailedSupplierRow();
insertFailedSupplierRow();
$this->artisan('supplier:retry-failed')
->expectsOutputToContain('Re-dispatched 3')
->assertExitCode(0);
});
test('handles empty queue gracefully (0 eligible rows)', function (): void {
// No rows inserted.
$this->artisan('supplier:retry-failed')
->expectsOutputToContain('Re-dispatched 0')
->assertExitCode(0);
Bus::assertNotDispatched(RouteSupplierLeadJob::class);
});