feat(supplier): Plan 3 Task 8 — RetryFailedSupplierJobsCommand + 5 Schedule entries
Components:
- supplier:retry-failed Console command (hourly cron):
Re-dispatch RouteSupplierLeadJob для failed_webhook_jobs eligible
(retried_at IS NULL OR < NOW()-1h; max-age guard via failed_at).
5 Schedule entries в routes/console.php:
- RefreshSupplierSessionJob hourly + dailyAt('20:15') МСК
- SyncSupplierProjectsJob dailyAt('20:30') МСК
- CleanupInactiveSupplierProjectsJob dailyAt('02:00') МСК
- supplier:retry-failed hourly
NB: ->onOneServer() НЕ применяется — нет cache_locks таблицы (см.
project_state фаза 1). Все операции идемпотентны.
+9 tests (subagent built per actual failed_webhook_jobs schema —
retried_at/retry_count columns). PHPStan baseline +21 Pest TestCall
+ property access entries (Mockery+Pest compat pattern).
Schedule verified via `artisan schedule:list`: 5 supplier entries listed
alongside existing projects:reset-delivered-today.
This commit is contained in:
@@ -0,0 +1,151 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Console\Commands;
|
||||
|
||||
use App\Jobs\RouteSupplierLeadJob;
|
||||
use Illuminate\Console\Command;
|
||||
use Illuminate\Support\Carbon;
|
||||
use Illuminate\Support\Facades\DB;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
||||
/**
|
||||
* Hourly cron: re-dispatch RouteSupplierLeadJob для supplier-marked rows в
|
||||
* failed_webhook_jobs.
|
||||
*
|
||||
* Spec:
|
||||
* - docs/superpowers/specs/2026-05-11-plan3-supplier-sync-design.md §7 (retry-mechanism).
|
||||
*
|
||||
* Schema adaptation (db/schema.sql v8.11 failed_webhook_jobs):
|
||||
* - НЕТ supplier_lead_id колонки → марка supplier-flow rows:
|
||||
* tenant_id IS NULL AND raw_payload->>'supplier_lead_id' IS NOT NULL.
|
||||
* Эти rows вставляются исключительно RouteSupplierLeadJob::failed() (BYPASSRLS
|
||||
* через DB_CONNECTION='pgsql_supplier').
|
||||
* - НЕТ retry_attempts/last_retried_at → используем existing колонки:
|
||||
* - retry_count (INT) — counter оставшихся manual retry-attempts.
|
||||
* - retried_at (TIMESTAMPTZ) — last retry timestamp (cooldown 1h).
|
||||
* - resolved_at (TIMESTAMPTZ) — терминальное состояние (исключает retry).
|
||||
* - failed_at (TIMESTAMPTZ) — 24h window (старше — skip).
|
||||
*
|
||||
* Selection criteria:
|
||||
* 1. tenant_id IS NULL (supplier-flow marker)
|
||||
* 2. raw_payload ? 'supplier_lead_id' (JSONB key existence)
|
||||
* 3. resolved_at IS NULL (не закрыт)
|
||||
* 4. failed_at >= NOW() - 24h (window safety cap)
|
||||
* 5. retry_count > 0 (есть оставшиеся попытки)
|
||||
* 6. retried_at IS NULL OR retried_at < NOW() - 1h (cooldown)
|
||||
*
|
||||
* On dispatch per row:
|
||||
* - RouteSupplierLeadJob::dispatch(supplier_lead_id из raw_payload)
|
||||
* - retried_at = NOW()
|
||||
* - retry_count = retry_count - 1
|
||||
* - если retry_count - 1 <= 0: resolved_at = NOW() (exhausted, не будет re-tried)
|
||||
*
|
||||
* Использует connection `pgsql_supplier` (BYPASSRLS-роль crm_supplier_worker) для
|
||||
* доступа к row'ам с tenant_id=NULL (политика RLS под обычной ролью их скрывает).
|
||||
*/
|
||||
final class RetryFailedSupplierJobsCommand extends Command
|
||||
{
|
||||
/** @var string */
|
||||
protected $signature = 'supplier:retry-failed';
|
||||
|
||||
/** @var string */
|
||||
protected $description = 'Re-dispatch RouteSupplierLeadJob для supplier-flow failed_webhook_jobs (hourly cron, max retries cap)';
|
||||
|
||||
private const DB_CONNECTION = 'pgsql_supplier';
|
||||
|
||||
private const MAX_AGE_HOURS = 24;
|
||||
|
||||
private const RETRY_COOLDOWN_HOURS = 1;
|
||||
|
||||
public function handle(): int
|
||||
{
|
||||
$now = Carbon::now();
|
||||
$ageCutoff = $now->copy()->subHours(self::MAX_AGE_HOURS);
|
||||
$cooldownCutoff = $now->copy()->subHours(self::RETRY_COOLDOWN_HOURS);
|
||||
|
||||
$eligible = DB::connection(self::DB_CONNECTION)
|
||||
->table('failed_webhook_jobs')
|
||||
->whereNull('tenant_id')
|
||||
// PG JSONB key-existence operator `?` коллизирует с PDO placeholder.
|
||||
// Escape `?` как `??` (Laravel-конвенция) для прохода raw `?` в SQL.
|
||||
->whereRaw("raw_payload ?? 'supplier_lead_id'")
|
||||
->whereNull('resolved_at')
|
||||
->where('failed_at', '>=', $ageCutoff)
|
||||
->where('retry_count', '>', 0)
|
||||
->where(function ($q) use ($cooldownCutoff) {
|
||||
$q->whereNull('retried_at')
|
||||
->orWhere('retried_at', '<', $cooldownCutoff);
|
||||
})
|
||||
->orderBy('id')
|
||||
->get();
|
||||
|
||||
$dispatched = 0;
|
||||
$exhausted = 0;
|
||||
|
||||
foreach ($eligible as $row) {
|
||||
$payload = $this->decodePayload($row->raw_payload);
|
||||
$supplierLeadId = isset($payload['supplier_lead_id'])
|
||||
? (int) $payload['supplier_lead_id']
|
||||
: null;
|
||||
|
||||
if ($supplierLeadId === null || $supplierLeadId <= 0) {
|
||||
// Defensive: whereRaw уже фильтрует, но decode мог дать null.
|
||||
Log::warning('supplier.retry_failed.invalid_payload', [
|
||||
'failed_webhook_job_id' => $row->id,
|
||||
]);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
RouteSupplierLeadJob::dispatch($supplierLeadId);
|
||||
|
||||
$newRetryCount = max((int) $row->retry_count - 1, 0);
|
||||
$update = [
|
||||
'retried_at' => $now,
|
||||
'retry_count' => $newRetryCount,
|
||||
];
|
||||
if ($newRetryCount === 0) {
|
||||
$update['resolved_at'] = $now;
|
||||
$exhausted++;
|
||||
}
|
||||
|
||||
DB::connection(self::DB_CONNECTION)
|
||||
->table('failed_webhook_jobs')
|
||||
->where('id', $row->id)
|
||||
->update($update);
|
||||
|
||||
$dispatched++;
|
||||
}
|
||||
|
||||
Log::info('supplier.retry_failed', [
|
||||
'dispatched' => $dispatched,
|
||||
'exhausted' => $exhausted,
|
||||
]);
|
||||
|
||||
$this->info("Re-dispatched {$dispatched} failed supplier webhook job(s); exhausted {$exhausted}.");
|
||||
|
||||
return self::SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decode raw_payload — JSONB column возвращается как string из DB::table().
|
||||
*
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
private function decodePayload(mixed $raw): array
|
||||
{
|
||||
if (is_array($raw)) {
|
||||
return $raw;
|
||||
}
|
||||
|
||||
if (! is_string($raw)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
$decoded = json_decode($raw, true);
|
||||
|
||||
return is_array($decoded) ? $decoded : [];
|
||||
}
|
||||
}
|
||||
@@ -900,6 +900,30 @@ parameters:
|
||||
count: 3
|
||||
path: tests/Feature/SetTenantContextTest.php
|
||||
|
||||
-
|
||||
message: '#^Call to an undefined method Pest\\PendingCalls\\TestCall\:\:artisan\(\)\.$#'
|
||||
identifier: method.notFound
|
||||
count: 9
|
||||
path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php
|
||||
|
||||
-
|
||||
message: '#^Cannot access property \$resolved_at on array\<mixed\>\|object\.$#'
|
||||
identifier: property.nonObject
|
||||
count: 1
|
||||
path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php
|
||||
|
||||
-
|
||||
message: '#^Cannot access property \$retried_at on array\<mixed\>\|object\.$#'
|
||||
identifier: property.nonObject
|
||||
count: 4
|
||||
path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php
|
||||
|
||||
-
|
||||
message: '#^Cannot access property \$retry_count on array\<mixed\>\|object\.$#'
|
||||
identifier: property.nonObject
|
||||
count: 7
|
||||
path: tests/Feature/Supplier/RetryFailedSupplierJobsCommandTest.php
|
||||
|
||||
-
|
||||
message: '#^Call to an undefined method App\\Services\\Supplier\\PlaywrightBridge\:\:shouldReceive\(\)\.$#'
|
||||
identifier: method.notFound
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
<?php
|
||||
|
||||
use App\Jobs\Supplier\CleanupInactiveSupplierProjectsJob;
|
||||
use App\Jobs\Supplier\RefreshSupplierSessionJob;
|
||||
use App\Jobs\Supplier\SyncSupplierProjectsJob;
|
||||
use Illuminate\Foundation\Inspiring;
|
||||
use Illuminate\Support\Facades\Artisan;
|
||||
use Illuminate\Support\Facades\Schedule;
|
||||
@@ -18,3 +21,22 @@ Artisan::command('inspire', function () {
|
||||
Schedule::command('projects:reset-delivered-today')
|
||||
->dailyAt('00:00')
|
||||
->timezone('Europe/Moscow');
|
||||
|
||||
// Plan 3 Task 8: 5 Schedule entries для supplier-flow.
|
||||
//
|
||||
// NB: ->onOneServer() требует cache_locks таблицу, которой у нас нет
|
||||
// (см. project_state.md фаза 1). Операции идемпотентны: SyncSupplierProjectsJob
|
||||
// делает diff'ы (skip-no-diff), CleanupJob — UPDATE WHERE conditions, RefreshSession
|
||||
// — Cache::lock guard внутри handle, RetryFailedSupplierJobs — WHERE retried_at
|
||||
// фильтр. На multi-server prod может потребовать cache_locks таблицу.
|
||||
Schedule::job(new RefreshSupplierSessionJob)->hourly();
|
||||
Schedule::job(new RefreshSupplierSessionJob)
|
||||
->dailyAt('20:15')
|
||||
->timezone('Europe/Moscow');
|
||||
Schedule::job(new SyncSupplierProjectsJob)
|
||||
->dailyAt('20:30')
|
||||
->timezone('Europe/Moscow');
|
||||
Schedule::job(new CleanupInactiveSupplierProjectsJob)
|
||||
->dailyAt('02:00')
|
||||
->timezone('Europe/Moscow');
|
||||
Schedule::command('supplier:retry-failed')->hourly();
|
||||
|
||||
@@ -0,0 +1,216 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use App\Jobs\RouteSupplierLeadJob;
|
||||
use App\Models\SupplierLead;
|
||||
use App\Models\Tenant;
|
||||
use Illuminate\Foundation\Testing\DatabaseTransactions;
|
||||
use Illuminate\Support\Facades\Bus;
|
||||
use Illuminate\Support\Facades\DB;
|
||||
use Tests\Concerns\SharesSupplierPdo;
|
||||
|
||||
/**
|
||||
* Plan 3 Task 8: RetryFailedSupplierJobsCommand.
|
||||
*
|
||||
* Schema adaptation (db/schema.sql v8.11 failed_webhook_jobs):
|
||||
* - НЕТ supplier_lead_id колонки → марка supplier-flow rows:
|
||||
* tenant_id IS NULL AND raw_payload->>'supplier_lead_id' IS NOT NULL
|
||||
* (см. RouteSupplierLeadJob::failed() — он вставляет именно так).
|
||||
* - НЕТ retry_attempts/last_retried_at → используем existing колонки:
|
||||
* - retry_count (INT) — счётчик оставшихся попыток (decrement при каждом retry).
|
||||
* - retried_at (TIMESTAMPTZ) — last retry timestamp (cooldown 1h).
|
||||
* - resolved_at (TIMESTAMPTZ) — терминальное состояние (исключает retry).
|
||||
* - failed_at (TIMESTAMPTZ) — window 24h (старше — skip).
|
||||
*
|
||||
* Semantics retry_count: при создании row в failed_webhook_jobs RouteSupplierLeadJob
|
||||
* сетит retry_count = $tries = 3 (max попыток queue-уровня). Command интерпретирует
|
||||
* это значение как "оставшиеся manual retries"; при каждом retry decrement; при
|
||||
* достижении 0 — set resolved_at=NOW() со статусом "exhausted" (через JSON marker
|
||||
* в raw_payload).
|
||||
*/
|
||||
uses(DatabaseTransactions::class);
|
||||
uses(SharesSupplierPdo::class);
|
||||
|
||||
beforeEach(function (): void {
|
||||
Bus::fake();
|
||||
|
||||
// Очищаем persistent garbage из failed_webhook_jobs (рудимент старых
|
||||
// test-сессий, когда pgsql_supplier не был частью DatabaseTransactions).
|
||||
// Этот DELETE сам выполняется в текущей pgsql-транзакции через shared PDO
|
||||
// (SharesSupplierPdo trait) и откатится по завершении теста — реальные
|
||||
// production-данные не страдают.
|
||||
DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->delete();
|
||||
});
|
||||
|
||||
/**
|
||||
* Helper: вставка supplier-marked failed_webhook_jobs row.
|
||||
*
|
||||
* @param array<string, mixed> $overrides
|
||||
*/
|
||||
function insertFailedSupplierRow(array $overrides = []): int
|
||||
{
|
||||
$supplierLead = SupplierLead::factory()->create([
|
||||
'processed_at' => null,
|
||||
]);
|
||||
|
||||
$payload = [
|
||||
'supplier_lead_id' => $supplierLead->id,
|
||||
];
|
||||
|
||||
$defaults = [
|
||||
'tenant_id' => null,
|
||||
'webhook_log_id' => null,
|
||||
'raw_payload' => json_encode($payload, JSON_UNESCAPED_UNICODE),
|
||||
'exception' => 'Test failure',
|
||||
'retry_count' => 3,
|
||||
'failed_at' => now()->subMinutes(30),
|
||||
'retried_at' => null,
|
||||
'resolved_at' => null,
|
||||
];
|
||||
|
||||
$row = array_merge($defaults, $overrides);
|
||||
|
||||
return (int) DB::connection('pgsql_supplier')
|
||||
->table('failed_webhook_jobs')
|
||||
->insertGetId($row);
|
||||
}
|
||||
|
||||
test('dispatches RouteSupplierLeadJob for each eligible supplier-flow row', function (): void {
|
||||
$id1 = insertFailedSupplierRow();
|
||||
$id2 = insertFailedSupplierRow();
|
||||
|
||||
$this->artisan('supplier:retry-failed')->assertExitCode(0);
|
||||
// suppress unused warning — IDs needed for fresh() verification below.
|
||||
expect($id1)->toBeGreaterThan(0)->and($id2)->toBeGreaterThan(0);
|
||||
|
||||
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 2);
|
||||
|
||||
// Both rows должны иметь обновлённый retried_at и decremented retry_count.
|
||||
foreach ([$id1, $id2] as $id) {
|
||||
$row = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($id);
|
||||
expect($row->retried_at)->not->toBeNull();
|
||||
expect((int) $row->retry_count)->toBe(2);
|
||||
}
|
||||
});
|
||||
|
||||
test('skips rows recently retried within cooldown (<1h)', function (): void {
|
||||
$recentlyRetried = insertFailedSupplierRow([
|
||||
'retried_at' => now()->subMinutes(30), // < 1h ago
|
||||
]);
|
||||
$eligible = insertFailedSupplierRow([
|
||||
'retried_at' => now()->subHours(2), // > 1h ago
|
||||
]);
|
||||
|
||||
$this->artisan('supplier:retry-failed')->assertExitCode(0);
|
||||
|
||||
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1);
|
||||
|
||||
// Recently retried row — не тронут.
|
||||
$skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($recentlyRetried);
|
||||
expect((int) $skipped->retry_count)->toBe(3); // не decremented
|
||||
|
||||
// Eligible row — retried.
|
||||
$processed = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($eligible);
|
||||
expect((int) $processed->retry_count)->toBe(2);
|
||||
});
|
||||
|
||||
test('decrements retry_count and updates retried_at on dispatched row', function (): void {
|
||||
$id = insertFailedSupplierRow(['retry_count' => 3]);
|
||||
|
||||
$this->artisan('supplier:retry-failed')->assertExitCode(0);
|
||||
|
||||
$row = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($id);
|
||||
expect((int) $row->retry_count)->toBe(2);
|
||||
expect($row->retried_at)->not->toBeNull();
|
||||
});
|
||||
|
||||
test('marks resolved_at when retry_count reaches 0 (max attempts exhausted)', function (): void {
|
||||
// retry_count=1 → после dispatch'а станет 0 → set resolved_at=NOW().
|
||||
$id = insertFailedSupplierRow(['retry_count' => 1]);
|
||||
|
||||
$this->artisan('supplier:retry-failed')->assertExitCode(0);
|
||||
|
||||
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1);
|
||||
|
||||
$row = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($id);
|
||||
expect((int) $row->retry_count)->toBe(0);
|
||||
expect($row->resolved_at)->not->toBeNull();
|
||||
});
|
||||
|
||||
test('skips rows older than 24h (window safety cap)', function (): void {
|
||||
$tooOld = insertFailedSupplierRow([
|
||||
'failed_at' => now()->subDays(2),
|
||||
]);
|
||||
$fresh = insertFailedSupplierRow([
|
||||
'failed_at' => now()->subHours(12),
|
||||
]);
|
||||
|
||||
$this->artisan('supplier:retry-failed')->assertExitCode(0);
|
||||
|
||||
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1);
|
||||
|
||||
$skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($tooOld);
|
||||
expect($skipped->retried_at)->toBeNull(); // не тронут
|
||||
});
|
||||
|
||||
test('skips rows already resolved', function (): void {
|
||||
$resolved = insertFailedSupplierRow([
|
||||
'resolved_at' => now()->subHours(1),
|
||||
]);
|
||||
$unresolved = insertFailedSupplierRow();
|
||||
|
||||
$this->artisan('supplier:retry-failed')->assertExitCode(0);
|
||||
|
||||
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1);
|
||||
|
||||
$skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($resolved);
|
||||
expect((int) $skipped->retry_count)->toBe(3); // не тронут
|
||||
});
|
||||
|
||||
test('skips non-supplier rows (tenant_id IS NOT NULL OR missing supplier_lead_id)', function (): void {
|
||||
// Обычный tenant-bound failed webhook (НЕ supplier-flow).
|
||||
$tenant = Tenant::factory()->create();
|
||||
$tenantBoundId = DB::connection('pgsql_supplier')
|
||||
->table('failed_webhook_jobs')
|
||||
->insertGetId([
|
||||
'tenant_id' => $tenant->id,
|
||||
'webhook_log_id' => null,
|
||||
'raw_payload' => json_encode(['foo' => 'bar'], JSON_UNESCAPED_UNICODE),
|
||||
'exception' => 'Other failure',
|
||||
'retry_count' => 3,
|
||||
'failed_at' => now()->subMinutes(30),
|
||||
'retried_at' => null,
|
||||
'resolved_at' => null,
|
||||
]);
|
||||
|
||||
// Supplier-flow row (tenant_id NULL, supplier_lead_id present).
|
||||
$supplierId = insertFailedSupplierRow();
|
||||
|
||||
$this->artisan('supplier:retry-failed')->assertExitCode(0);
|
||||
|
||||
Bus::assertDispatchedTimes(RouteSupplierLeadJob::class, 1);
|
||||
|
||||
$skipped = DB::connection('pgsql_supplier')->table('failed_webhook_jobs')->find($tenantBoundId);
|
||||
expect($skipped->retried_at)->toBeNull();
|
||||
expect((int) $skipped->retry_count)->toBe(3);
|
||||
});
|
||||
|
||||
test('reports dispatched count via output and Log', function (): void {
|
||||
insertFailedSupplierRow();
|
||||
insertFailedSupplierRow();
|
||||
insertFailedSupplierRow();
|
||||
|
||||
$this->artisan('supplier:retry-failed')
|
||||
->expectsOutputToContain('Re-dispatched 3')
|
||||
->assertExitCode(0);
|
||||
});
|
||||
|
||||
test('handles empty queue gracefully (0 eligible rows)', function (): void {
|
||||
// No rows inserted.
|
||||
$this->artisan('supplier:retry-failed')
|
||||
->expectsOutputToContain('Re-dispatched 0')
|
||||
->assertExitCode(0);
|
||||
|
||||
Bus::assertNotDispatched(RouteSupplierLeadJob::class);
|
||||
});
|
||||
Reference in New Issue
Block a user