Aller au contenu principal

Maîtriser Symfony Messenger : files, retries, idempotence et patterns de résilience

Corentin Boutillier
8 min de lecture
1 vues

La plupart des systèmes modernes doivent absorber des pics de charge, s’intégrer à des services externes capricieux et garantir l’exactitude des traitements. Symfony Messenger est un excellent allié pour y parvenir, à condition d’embrasser ses modèles d’exécution asynchrones, ses échecs… et ses relectures. Dans cet article, nous allons au-delà des bases pour construire des pipelines robustes: choix de transports (AMQP/Redis/Doctrine), retries avec backoff, idempotence, DLQ et patterns de résilience éprouvés.

Comprendre le modèle de livraison de Messenger

Messenger propose une sémantique “at-least-once”: un message peut être livré plusieurs fois, jamais zéro si tout va bien. Cela impose:

  • Des handlers idempotents (supportant les doublons).
  • La séparation des responsabilités (un message fait une chose unitaire).
  • Une gestion explicite des échecs, relectures et mises à l’écart (DLQ).

Acceptez qu’un message puisse être traité 0, 1, 2+ fois; concevez vos handlers pour qu’un doublon n’ait pas d’effet secondaire indésirable.

Configurer les transports: AMQP, Redis et Doctrine

Choisissez le transport selon vos contraintes:

  • AMQP (RabbitMQ): robuste, features avancées (DLX, prefetch), haute performance.
  • Redis: simple à opérer, latence faible, bon pour la rapidité et la simplicité.
  • Doctrine: stockage en base de données, idéal comme “outbox” transactionnelle.

Exemple de configuration YAML

## config/packages/messenger.yaml
framework:
  messenger:
    transports:
      async_amqp:
        dsn: '%env(MESSENGER_TRANSPORT_DSN_AMQP)%' # amqp://guest:guest@rabbitmq:5672/%2f/messages
        options:
          exchange:
            name: app.exchange
            type: direct
          queues:
            app.queue: ~
        retry_strategy:
          max_retries: 5
          delay: 1000           # 1s
          multiplier: 2         # 1s, 2s, 4s, 8s, 16s
          max_delay: 60000      # cap à 60s
      async_redis:
        dsn: '%env(MESSENGER_TRANSPORT_DSN_REDIS)%'  # redis://localhost:6379/messages
      outbox_doctrine:
        dsn: '%env(MESSENGER_TRANSPORT_DSN_DOCTRINE)%' # doctrine://default?auto_setup=1

      failed: 'doctrine://default?queue_name=failed_messages'

    failure_transport: failed

    routing:
      'App\Message\SendInvoice': async_amqp
      'App\Message\GenerateThumbnail': async_redis
      'App\Message\OrderPlaced': outbox_doctrine
  • Utilisez failure_transport pour isoler les messages en échec durable.
  • La routing mappe message → transport pour créer des “bulkheads” (cloisons) fonctionnels.

Retries, backoff, DLQ et relecture

Faire échouer proprement pour déclencher un retry

Le handler doit lever une exception pour que Messenger retente. Utilisez une exception spécifique pour distinguer les erreurs récupérables.

namespace App\MessageHandler;

use App\Message\SendInvoice;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;

#[AsMessageHandler]
final class SendInvoiceHandler
{
    public function __invoke(SendInvoice $message): void
    {
        try {
            // Appel API tiers
        } catch (\Throwable $e) {
            // Erreur temporaire → retry
            throw new RecoverableMessageHandlingException('API indisponible', 0, $e);
        }
    }
}
  • Les erreurs “récupérables” déclenchent la stratégie de retry du transport.
  • Les erreurs “non récupérables” (ex. validation) doivent être capturées et loguées, puis le message peut être considéré consommé (sans retry).

Personnaliser le backoff

Le backoff exponentiel est souvent un bon choix. Vous pouvez ajouter du “jitter” pour éviter les effets de thundering herd.

## config/services.yaml
services:
  App\Messenger\Retry\JitterRetryStrategy:
    arguments:
      $maxRetries: 7
      $baseDelayMs: 500
      $multiplier: 1.8
      $maxDelayMs: 120000

## config/packages/messenger.yaml
framework:
  messenger:
    transports:
      async_amqp:
        retry_strategy:
          service: App\Messenger\Retry\JitterRetryStrategy
namespace App\Messenger\Retry;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;

final class JitterRetryStrategy implements RetryStrategyInterface
{
    public function __construct(
        private int $maxRetries,
        private int $baseDelayMs,
        private float $multiplier,
        private int $maxDelayMs
    ) {}

    public function isRetryable(Envelope $message, ?\Throwable $throwable = null): bool
    {
        return true;
    }

    public function getWaitingTime(Envelope $message, ?\Throwable $throwable = null): int
    {
        $attempt = $message->last(\\Symfony\\Component\\Messenger\\Stamp\\RedeliveryStamp::class)?->getRetryCount() ?? 0;
        $delay = min((int)($this->baseDelayMs * ($this->multiplier ** $attempt)), $this->maxDelayMs);
        $jitter = random_int(0, (int)($delay * 0.2));
        return $delay + $jitter;
    }
}

Dead Letter Queue (DLQ) et relecture

  • Après max_retries, le message atterrit dans failure_transport.
  • Inspectez et relisez quand il est sûr de le faire:
    • bin/console messenger:failed:show
    • bin/console messenger:failed:retry --force
    • bin/console messenger:failed:remove

Pensez à enrichir les logs avec l’ID de message et le contexte (voir section idempotence).

Rendre les handlers idempotents

L’idempotence garantit que traiter deux fois le même message n’a pas d’effet secondaire en double. Deux approches complémentaires:

  1. Idempotence applicative (clé d’idempotence + storage).
  2. Outbox/transaction pour publier des messages de manière atomique.

1) Clé d’idempotence + Redis

  • Donnez un identifiant unique au message (UUID).
  • Avant d’exécuter, vérifiez et “réservez” cette clé via un SET NX (write-once).
  • Si la clé existe, on retourne immédiatement (déjà traité).
namespace App\Message;

use Symfony\Component\Uid\Uuid;

final class SendInvoice
{
    public function __construct(
        public readonly string $orderId,
        public readonly string $messageId = new Uuid()
    ) {}
}
namespace App\Messenger\Middleware;

use Predis\Client;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

final class IdempotencyMiddleware implements MiddlewareInterface
{
    public function __construct(private Client $redis, private int $ttlSeconds = 604800) {}

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $message = $envelope->getMessage();
        if (!property_exists($message, 'messageId')) {
            return $stack->next()->handle($envelope, $stack);
        }

        $key = sprintf('idm:%s', $message->messageId);
        // SETNX + TTL
        $created = $this->redis->set($key, '1', 'NX', 'EX', $this->ttlSeconds);
        if ($created !== true) {
            // déjà traité
            return $envelope;
        }

        return $stack->next()->handle($envelope, $stack);
    }
}
## services.yaml
services:
  App\Messenger\Middleware\IdempotencyMiddleware:
    tags: ['messenger.middleware']
  • Le TTL protège de la croissance infinie.
  • Stockez suffisamment longtemps pour couvrir les relectures et retards maximums.

2) Outbox transactionnelle avec Doctrine

Quand vous modifiez votre modèle et publiez un événement/commande, protégez-vous des “pertes de message” par le pattern outbox. Avec le transport Doctrine, le dispatch s’inscrit dans la même transaction DB:

use Symfony\Component\Messenger\MessageBusInterface;

$em->wrapInTransaction(function () use ($em, $bus, $order) {
    $order->markPaid();
    $em->persist($order);

    // Ce dispatch est atomique avec le commit (Doctrine transport)
    $bus->dispatch(new \App\Message\OrderPlaced($order->getId()));
});
  • Le consumer lira la table messenger_messages et publiera de façon fiable.
  • Combinez avec des handlers idempotents pour la livraison “at-least-once”.

Patterns de résilience à appliquer

Bulkhead (cloisonnement)

  • Séparez les flux par file/transport pour éviter la contagion:
    • ex: SendEmailasync_amqp, GenerateThumbnailasync_redis.
  • Ajustez l’allocation de workers par file pour absorber les pics ciblés.

Circuit Breaker autour des appels externes

  • Coupez court après N erreurs rapides pour laisser le système respirer.
  • Implémentez-le via un middleware ou au sein du handler (stockage Redis):
    • État: closed → open → half-open.
    • Associez-le à des timeouts agressifs côté HTTP client.

Timeouts, limites et backpressure

  • bin/console messenger:consume async_amqp --time-limit=3600 --memory-limit=256M --sleep=100000
  • AMQP: configurez prefetch_count pour contrôler le nombre de messages “en vol”.
  • Appliquez des timeouts au niveau des clients externes (HTTP, DB).

DLQ et poison messages

  • Un message mal formé ou non rejouable doit rester en DLQ.
  • Automatisez des alertes sur le volume/ancienneté des messages en échec.

Observabilité: logs, métriques, traces

  • Ajoutez des logs structurés par messageId, type, durée, status.
  • Exposez des métriques: durée de traitement, taux d’erreur, backlog par file.
  • Trace distribuée: propagez un correlationId dans le message; ajoutez-le aux entêtes AMQP/Redis si nécessaire.

Exemple minimal de middleware de mesure:

namespace App\Messenger\Middleware;

use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

final class MetricsMiddleware implements MiddlewareInterface
{
    public function __construct(private LoggerInterface $logger) {}

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $start = microtime(true);
        try {
            return $stack->next()->handle($envelope, $stack);
        } finally {
            $ms = (int)((microtime(true) - $start) * 1000);
            $this->logger->info('message_processed', [
                'type' => $envelope->getMessage()::class,
                'duration_ms' => $ms,
            ]);
        }
    }
}

Bonnes pratiques de production

  • Gardez les messages petits et sérialisables; passez des IDs, pas des blobs.
  • Versionnez les schémas (ajoutez des champs optionnels; évitez les breaking changes).
  • Fixez des retries par type de message (certaines erreurs ne sont pas récupérables).
  • Testez vos handlers avec des messages du monde réel et des erreurs simulées.
  • Déployez des workers via Supervisor/systemd/Kubernetes, avec redémarrage automatique.
  • Planifiez la maintenance: vidage progressif (--stop-when-empty), désactivation de file, puis migration.
  • Sécurité: si nécessaire, chiffrez certains champs avant sérialisation.

Checklist de résilience (rapide)

  1. Chaque message a un messageId (UUID).
  2. Handler idempotent (clé d’idempotence, upsert, “check-before-do”).
  3. Retry avec backoff + jitter, exceptions classifiées.
  4. DLQ isolée + procédure de relecture et d’alerte.
  5. Cloisonnement par file (bulkhead) + sizing des workers.
  6. Circuit breaker + timeouts sur les IO externes.
  7. Observabilité: logs structurés, métriques, traces.
  8. Outbox transactionnelle quand il y a des effets DB + publication.

Conclusion

Bien configuré, Messenger est un formidable socle pour des traitements asynchrones résilients: files dédiées, retries intelligents, idempotence stricte et DLQ maîtrisées. L’essentiel est d’accepter la duplication potentielle, de s’y préparer et d’observer votre pipeline en continu.

Vous souhaitez auditer votre configuration Messenger, dimensionner vos files ou mettre en place une outbox transactionnelle? Contactez-nous pour un diagnostic et un plan d’amélioration sur mesure: découvrez nos offres et ressources dédiées à Messenger.

Partager cet article

Logo Vulcain Développement - Développeur Symfony expert vulcain.agency

Développeur Full-Stack freelance expert
Créateur d'applications web sur mesure

📧 vulcain.developpement@gmail.com
📍 Saint-Lô, France

🏗️ Développement Symfony

🔗 API Platform

🏢 Solutions Métier

Liens Rapides