La plupart des systèmes modernes doivent absorber des pics de charge, s’intégrer à des services externes capricieux et garantir l’exactitude des traitements. Symfony Messenger est un excellent allié pour y parvenir, à condition d’embrasser ses modèles d’exécution asynchrones, ses échecs… et ses relectures. Dans cet article, nous allons au-delà des bases pour construire des pipelines robustes: choix de transports (AMQP/Redis/Doctrine), retries avec backoff, idempotence, DLQ et patterns de résilience éprouvés.
Comprendre le modèle de livraison de Messenger
Messenger propose une sémantique “at-least-once”: un message peut être livré plusieurs fois, jamais zéro si tout va bien. Cela impose:
- Des handlers idempotents (supportant les doublons).
- La séparation des responsabilités (un message fait une chose unitaire).
- Une gestion explicite des échecs, relectures et mises à l’écart (DLQ).
Acceptez qu’un message puisse être traité 0, 1, 2+ fois; concevez vos handlers pour qu’un doublon n’ait pas d’effet secondaire indésirable.
Configurer les transports: AMQP, Redis et Doctrine
Choisissez le transport selon vos contraintes:
- AMQP (RabbitMQ): robuste, features avancées (DLX, prefetch), haute performance.
- Redis: simple à opérer, latence faible, bon pour la rapidité et la simplicité.
- Doctrine: stockage en base de données, idéal comme “outbox” transactionnelle.
Exemple de configuration YAML
## config/packages/messenger.yaml
framework:
messenger:
transports:
async_amqp:
dsn: '%env(MESSENGER_TRANSPORT_DSN_AMQP)%' # amqp://guest:guest@rabbitmq:5672/%2f/messages
options:
exchange:
name: app.exchange
type: direct
queues:
app.queue: ~
retry_strategy:
max_retries: 5
delay: 1000 # 1s
multiplier: 2 # 1s, 2s, 4s, 8s, 16s
max_delay: 60000 # cap à 60s
async_redis:
dsn: '%env(MESSENGER_TRANSPORT_DSN_REDIS)%' # redis://localhost:6379/messages
outbox_doctrine:
dsn: '%env(MESSENGER_TRANSPORT_DSN_DOCTRINE)%' # doctrine://default?auto_setup=1
failed: 'doctrine://default?queue_name=failed_messages'
failure_transport: failed
routing:
'App\Message\SendInvoice': async_amqp
'App\Message\GenerateThumbnail': async_redis
'App\Message\OrderPlaced': outbox_doctrine
- Utilisez
failure_transport
pour isoler les messages en échec durable. - La
routing
mappe message → transport pour créer des “bulkheads” (cloisons) fonctionnels.
Retries, backoff, DLQ et relecture
Faire échouer proprement pour déclencher un retry
Le handler doit lever une exception pour que Messenger retente. Utilisez une exception spécifique pour distinguer les erreurs récupérables.
namespace App\MessageHandler;
use App\Message\SendInvoice;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
#[AsMessageHandler]
final class SendInvoiceHandler
{
public function __invoke(SendInvoice $message): void
{
try {
// Appel API tiers
} catch (\Throwable $e) {
// Erreur temporaire → retry
throw new RecoverableMessageHandlingException('API indisponible', 0, $e);
}
}
}
- Les erreurs “récupérables” déclenchent la stratégie de retry du transport.
- Les erreurs “non récupérables” (ex. validation) doivent être capturées et loguées, puis le message peut être considéré consommé (sans retry).
Personnaliser le backoff
Le backoff exponentiel est souvent un bon choix. Vous pouvez ajouter du “jitter” pour éviter les effets de thundering herd.
## config/services.yaml
services:
App\Messenger\Retry\JitterRetryStrategy:
arguments:
$maxRetries: 7
$baseDelayMs: 500
$multiplier: 1.8
$maxDelayMs: 120000
## config/packages/messenger.yaml
framework:
messenger:
transports:
async_amqp:
retry_strategy:
service: App\Messenger\Retry\JitterRetryStrategy
namespace App\Messenger\Retry;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
final class JitterRetryStrategy implements RetryStrategyInterface
{
public function __construct(
private int $maxRetries,
private int $baseDelayMs,
private float $multiplier,
private int $maxDelayMs
) {}
public function isRetryable(Envelope $message, ?\Throwable $throwable = null): bool
{
return true;
}
public function getWaitingTime(Envelope $message, ?\Throwable $throwable = null): int
{
$attempt = $message->last(\\Symfony\\Component\\Messenger\\Stamp\\RedeliveryStamp::class)?->getRetryCount() ?? 0;
$delay = min((int)($this->baseDelayMs * ($this->multiplier ** $attempt)), $this->maxDelayMs);
$jitter = random_int(0, (int)($delay * 0.2));
return $delay + $jitter;
}
}
Dead Letter Queue (DLQ) et relecture
- Après
max_retries
, le message atterrit dansfailure_transport
. - Inspectez et relisez quand il est sûr de le faire:
bin/console messenger:failed:show
bin/console messenger:failed:retry --force
bin/console messenger:failed:remove
Pensez à enrichir les logs avec l’ID de message et le contexte (voir section idempotence).
Rendre les handlers idempotents
L’idempotence garantit que traiter deux fois le même message n’a pas d’effet secondaire en double. Deux approches complémentaires:
- Idempotence applicative (clé d’idempotence + storage).
- Outbox/transaction pour publier des messages de manière atomique.
1) Clé d’idempotence + Redis
- Donnez un identifiant unique au message (UUID).
- Avant d’exécuter, vérifiez et “réservez” cette clé via un SET NX (write-once).
- Si la clé existe, on retourne immédiatement (déjà traité).
namespace App\Message;
use Symfony\Component\Uid\Uuid;
final class SendInvoice
{
public function __construct(
public readonly string $orderId,
public readonly string $messageId = new Uuid()
) {}
}
namespace App\Messenger\Middleware;
use Predis\Client;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
final class IdempotencyMiddleware implements MiddlewareInterface
{
public function __construct(private Client $redis, private int $ttlSeconds = 604800) {}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$message = $envelope->getMessage();
if (!property_exists($message, 'messageId')) {
return $stack->next()->handle($envelope, $stack);
}
$key = sprintf('idm:%s', $message->messageId);
// SETNX + TTL
$created = $this->redis->set($key, '1', 'NX', 'EX', $this->ttlSeconds);
if ($created !== true) {
// déjà traité
return $envelope;
}
return $stack->next()->handle($envelope, $stack);
}
}
## services.yaml
services:
App\Messenger\Middleware\IdempotencyMiddleware:
tags: ['messenger.middleware']
- Le TTL protège de la croissance infinie.
- Stockez suffisamment longtemps pour couvrir les relectures et retards maximums.
2) Outbox transactionnelle avec Doctrine
Quand vous modifiez votre modèle et publiez un événement/commande, protégez-vous des “pertes de message” par le pattern outbox. Avec le transport Doctrine, le dispatch s’inscrit dans la même transaction DB:
use Symfony\Component\Messenger\MessageBusInterface;
$em->wrapInTransaction(function () use ($em, $bus, $order) {
$order->markPaid();
$em->persist($order);
// Ce dispatch est atomique avec le commit (Doctrine transport)
$bus->dispatch(new \App\Message\OrderPlaced($order->getId()));
});
- Le consumer lira la table
messenger_messages
et publiera de façon fiable. - Combinez avec des handlers idempotents pour la livraison “at-least-once”.
Patterns de résilience à appliquer
Bulkhead (cloisonnement)
- Séparez les flux par file/transport pour éviter la contagion:
- ex:
SendEmail
→async_amqp
,GenerateThumbnail
→async_redis
.
- ex:
- Ajustez l’allocation de workers par file pour absorber les pics ciblés.
Circuit Breaker autour des appels externes
- Coupez court après N erreurs rapides pour laisser le système respirer.
- Implémentez-le via un middleware ou au sein du handler (stockage Redis):
- État: closed → open → half-open.
- Associez-le à des timeouts agressifs côté HTTP client.
Timeouts, limites et backpressure
bin/console messenger:consume async_amqp --time-limit=3600 --memory-limit=256M --sleep=100000
- AMQP: configurez
prefetch_count
pour contrôler le nombre de messages “en vol”. - Appliquez des timeouts au niveau des clients externes (HTTP, DB).
DLQ et poison messages
- Un message mal formé ou non rejouable doit rester en DLQ.
- Automatisez des alertes sur le volume/ancienneté des messages en échec.
Observabilité: logs, métriques, traces
- Ajoutez des logs structurés par messageId, type, durée, status.
- Exposez des métriques: durée de traitement, taux d’erreur, backlog par file.
- Trace distribuée: propagez un correlationId dans le message; ajoutez-le aux entêtes AMQP/Redis si nécessaire.
Exemple minimal de middleware de mesure:
namespace App\Messenger\Middleware;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
final class MetricsMiddleware implements MiddlewareInterface
{
public function __construct(private LoggerInterface $logger) {}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$start = microtime(true);
try {
return $stack->next()->handle($envelope, $stack);
} finally {
$ms = (int)((microtime(true) - $start) * 1000);
$this->logger->info('message_processed', [
'type' => $envelope->getMessage()::class,
'duration_ms' => $ms,
]);
}
}
}
Bonnes pratiques de production
- Gardez les messages petits et sérialisables; passez des IDs, pas des blobs.
- Versionnez les schémas (ajoutez des champs optionnels; évitez les breaking changes).
- Fixez des retries par type de message (certaines erreurs ne sont pas récupérables).
- Testez vos handlers avec des messages du monde réel et des erreurs simulées.
- Déployez des workers via Supervisor/systemd/Kubernetes, avec redémarrage automatique.
- Planifiez la maintenance: vidage progressif (
--stop-when-empty
), désactivation de file, puis migration. - Sécurité: si nécessaire, chiffrez certains champs avant sérialisation.
Checklist de résilience (rapide)
- Chaque message a un messageId (UUID).
- Handler idempotent (clé d’idempotence, upsert, “check-before-do”).
- Retry avec backoff + jitter, exceptions classifiées.
- DLQ isolée + procédure de relecture et d’alerte.
- Cloisonnement par file (bulkhead) + sizing des workers.
- Circuit breaker + timeouts sur les IO externes.
- Observabilité: logs structurés, métriques, traces.
- Outbox transactionnelle quand il y a des effets DB + publication.
Conclusion
Bien configuré, Messenger est un formidable socle pour des traitements asynchrones résilients: files dédiées, retries intelligents, idempotence stricte et DLQ maîtrisées. L’essentiel est d’accepter la duplication potentielle, de s’y préparer et d’observer votre pipeline en continu.
Vous souhaitez auditer votre configuration Messenger, dimensionner vos files ou mettre en place une outbox transactionnelle? Contactez-nous pour un diagnostic et un plan d’amélioration sur mesure: découvrez nos offres et ressources dédiées à Messenger.