Symfony messenger

Компонент Messenger позволяет управлять асинхронным кодом в Symfony.

В нем заложен ряд архитектурных решений, которые делают этот компонент очень гибким в настройке.

С некоторыми из них мы скоро познакомимся.

Пример, конфигурации с транспортом amqp может выглядеть так:

# messenger.yaml parameters: messenger: # Это кастомный блок. На его основе генерируется конфигурация supervisor. workers: - async: numprocs: 5 # Количество запущенных процессов # queues: [test, mail] # Позволяет указать очереди с которыми будет работать consumer - sync: framework: messenger: buses: messenger.bus.default: middleware: - 'SVEAK\MessengerBundle\Middleware\ChainMiddleware' - 'SVEAK\MessengerBundle\Middleware\ParallelMiddleware' - 'SVEAK\MessengerBundle\Middleware\AutoRoutingMiddleware' - 'SVEAK\MessengerBundle\Middleware\TtlMiddleware' - 'SVEAK\MessengerBundle\Middleware\LockMiddleware' - 'SVEAK\MessengerBundle\Middleware\SendTimestampMiddleware' - 'SVEAK\MessengerBundle\Middleware\LockRoutingMiddleware' # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling. # failure_transport: failed transports: # https://symfony.com/doc/current/messenger.html#transport-configuration # async: '%env(MESSENGER_TRANSPORT_DSN)%' # failed: 'doctrine://default?queue_name=failed' # sync: 'sync://' async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' retry_strategy: max_retries: 0 options: exchange: name: async type: direct default_publish_routing_key: lost queues: lost: binding_keys: [ lost ] arguments: x-dead-letter-exchange: async.fallback test: binding_keys: [ test ] arguments: x-dead-letter-exchange: async.fallback async_fallback: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: exchange: name: async.fallback type: direct default_publish_routing_key: lost.fallback queues: lost.fallback: binding_keys: [ lost ] arguments: x-message-ttl: 60000 x-dead-letter-exchange: async test.fallback: binding_keys: [ test ] arguments: x-message-ttl: 60000 x-dead-letter-exchange: async lock: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: exchange: name: lock type: direct sync: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: exchange: name: sync type: direct default_publish_routing_key: sync queues: sync: binding_keys: [ sync ] arguments: x-single-active-consumer: true x-dead-letter-exchange: sync.fallback sync_fallback: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' options: exchange: name: sync.fallback type: direct default_publish_routing_key: sync queues: sync.fallback: binding_keys: [ sync ] arguments: x-message-ttl: 60000 x-dead-letter-exchange: sync routing: 'App\Message\Test': async

Здесь можно выделить ряд настроек.

  • parameters.messenger.workers - на его основе генерируется конфигурация supervisord. Можно настраивать количество worker`ов и какие очереди должны обрабатываться каким воркером.
  • framework.messenger.buses - этой настройкой можно сконфигурировать продюсера сообщений. То есть задать ряд обработчиков сообщения, отключить штатные обработчики, указать способ сериализации и т.д.
  • framework.messenger.transports - в этом блоке конфигурируются транспорты. Например: можно указать хранилище сообщений (redis/ rabbitmq / postgresql), задать режим синхронного выполнения, зафиксировать набор очередей / exchange и их параметры
  • framework.messenger.routing - так можно привязать конкретную DTO к определенном транспорту, чтобы продюсер знал куда отправлять сообщение.

Отладка

bin/console debug:container --show-arguments messenger.bus.default.inner

С помощью команды можно посмотреть как сконфигурирован транспорт. Посмотреть всю цепочку обработки сообщения перед отправкой и обработкой.

Information for Service "debug.traced.messenger.bus.default.inner" ================================================================== ---------------- --------------------------------------------------------------------------- Option Value ---------------- --------------------------------------------------------------------------- Service ID debug.traced.messenger.bus.default.inner Class Symfony\Component\Messenger\MessageBus Tags - Public no Synthetic no Lazy no Shared yes Abstract no Autowired no Autoconfigured no Arguments Iterator (14 element(s)) - Service(messenger.bus.default.middleware.traceable) - Service(messenger.bus.default.middleware.add_bus_name_stamp_middleware) - Service(messenger.middleware.reject_redelivered_message_middleware) - Service(messenger.middleware.dispatch_after_current_bus) - Service(messenger.middleware.failed_message_processing_middleware) - Service(SVEAK\MessengerBundle\Middleware\ChainMiddleware) - Service(SVEAK\MessengerBundle\Middleware\ParallelMiddleware) - Service(SVEAK\MessengerBundle\Middleware\AutoRoutingMiddleware) - Service(SVEAK\MessengerBundle\Middleware\TtlMiddleware) - Service(SVEAK\MessengerBundle\Middleware\LockMiddleware) - Service(SVEAK\MessengerBundle\Middleware\SendTimestampMiddleware) - Service(SVEAK\MessengerBundle\Middleware\LockRoutingMiddleware) - Service(messenger.middleware.send_message) - Service(messenger.bus.default.middleware.handle_message) ---------------- --------------------------------------------------------------------------- ! [NOTE] The "debug.traced.messenger.bus.default.inner" service or alias has been removed or inlined when the container ! was compiled.

Messenger использует паттерн - "Цепочка обязанностей" для обработки сообщений.

Здесь мы видим, что наши кастомные middleware`ы были добавлены в середину цепочки между служебными и core обработчиками.

Служебные отвечают за перенаправление сообщения обработанного с ошибкой в другую fallback очередь, за механизм повтора обработки, за выполнение определенных сообщений после того как обработается текущее и т.д.

Core обработчики отвечают за непосредственно отправку сообщения и конечную обработку

Использование

Чтобы начать использование messenger нужно создать DTO:

// src/Message/CommentMessage.php namespace App\Message; class CommentMessage { public function __construct( private int $id, private array $context = [], ) { } public function getId(): int { return $this->id; } public function getContext(): array { return $this->context; } }

И затем handler:

// src/MessageHandler/CommentMessageHandler.php namespace App\MessageHandler; use App\Message\CommentMessage; use App\Repository\CommentRepository; use App\SpamChecker; use Doctrine\ORM\EntityManagerInterface; use Symfony\Component\Messenger\Attribute\AsMessageHandler; #[AsMessageHandler] class CommentMessageHandler { public function __construct( private EntityManagerInterface $entityManager, private SpamChecker $spamChecker, private CommentRepository $commentRepository, ) { } public function __invoke(CommentMessage $message) { $comment = $this->commentRepository->find($message->getId()); if (!$comment) { return; } if (2 === $this->spamChecker->getSpamScore($comment, $message->getContext())) { $comment->setState('spam'); } else { $comment->setState('published'); } $this->entityManager->flush(); } }

Допустимо создавать множество handler`ов на одну DTO. В этом случае вызовутся все обработчики.

Отправка сообщения

В простом виде для того чтобы отправить сообщение нужно "заинжектить" продюсер в конструкторе и затем отправить через него DTO.

class ConferenceController extends AbstractController { public function __construct( private CommentRepository $commentRepository, private MessageBusInterface $bus, ) { } public function exampleAction( Request $request, ): Response { $comment = $this->commentRepository->find(1); $this->bus->dispatch(new CommentMessage($comment->getId(), [])); return $this->redirectToRoute('conference', ['slug' => $conference->getSlug()]); } }

В этом случае сообщение пройдет всю цепочку обработки через все middleware`ы.

В messenger.middleware.send_message обработчике сообщение будет сериализовано и отправлено в очередь.

В более сложных вариантах использования может потребоваться сделать какую-то обработку сообщения.

Например:

  • Добавить доп данные в сообщение
  • Прервать выполнение сообщения
  • Поменять трансфер или очередь
  • Создать ttl очередь и перенаправить в нее сообщение
  • и т.д.

В этом случае нужно обернуть DTO в спец объект Envelope, который помимо самого сообщения может содержать вспомогательную информацию - объекты имплементирующие интерфейс Symfony\Component\Messenger\Stamp\StampInterface. Например:

// Наш кастомный штамп <?php declare(strict_types=1); namespace SVEAK\MessengerBundle\Stamp; use Symfony\Component\Messenger\Stamp\StampInterface; class ContextStamp implements StampInterface { private ?string $key = null; private ?int $ttl = null; private ?bool $useLock = null; private ?bool $parallel = null; private ?int $timestamp = null; private ?bool $needResolveOrderConflict = null; private array $messages = []; private ?object $afterParallelMessage = null; public function getKey(): ?string { return $this->key; } public function setKey(?string $key): self { $this->key = $key; return $this; } public function getTtl(): ?int { return $this->ttl; } public function setTtl(?int $ttl): self { $this->ttl = $ttl; return $this; } public function getUseLock(): ?bool { return $this->useLock; } public function setUseLock(?bool $useLock): self { $this->useLock = $useLock; return $this; } public function getParallel(): ?bool { return $this->parallel; } public function setParallel(?bool $parallel): self { $this->parallel = $parallel; return $this; } public function getAfterParallelMessage(): ?object { return $this->afterParallelMessage; } public function setAfterParallelMessage(?object $afterParallelMessage): self { $this->afterParallelMessage = $afterParallelMessage; return $this; } public function getTimestamp(): ?int { return $this->timestamp; } public function setTimestamp(?int $timestamp): self { $this->timestamp = $timestamp; return $this; } public function getNeedResolveOrderConflict(): ?bool { return $this->needResolveOrderConflict; } public function setNeedResolveOrderConflict(?bool $needResolveOrderConflict): self { $this->needResolveOrderConflict = $needResolveOrderConflict; return $this; } public function getMessages(): array { return $this->messages; } public function addMessage(object $message): self { $this->messages[] = $message; return $this; } public function popMessage(): ?object { return array_shift($this->messages); } public function getLockKey(): string { $this->assertHasKey(); return $this->key . ':lock'; } public function getTimestampKey(): string { $this->assertHasKey(); return $this->key . ':timestamp'; } public function getOrderConflictKey(): string { $this->assertHasKey(); return $this->key . ':order_conflict'; } public function getParallelKey(): string { $this->assertHasKey(); return $this->key . ':parallel'; } public function assertHasKey(): void { if (!$this->key) { throw new \InvalidArgumentException('Key not found'); } } }

Привязать штамп к сообщению можно так:

use Symfony\Component\Messenger\Envelope; $context = new ContextStamp(); $context ->key('user_1') ->lock(); $envelope = Envelope::wrap($data); $envelope = $envelope->with($context); $this->bus->dispatch($envelope);

Эта информация будет доступна в middleware и ее можно использовать, например так:

// bundles/MessengerBundle/src/Middleware/LockMiddleware.php <?php declare(strict_types=1); namespace SVEAK\MessengerBundle\Middleware; use SVEAK\MessengerBundle\Stamp\ContextStamp; use SVEAK\RedisBundle\Service\RedisClient; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Middleware\MiddlewareInterface; use Symfony\Component\Messenger\Middleware\StackInterface; use Symfony\Component\Messenger\Stamp\ReceivedStamp; class LockMiddleware implements MiddlewareInterface { public function __construct( private RedisClient $client, ) { } public function handle(Envelope $envelope, StackInterface $stack): Envelope { /** @var ?ContextStamp $stamp */ $stamp = $envelope->last(ContextStamp::class); if (!$stamp || !$stamp->getUseLock()) { return $stack->next()->handle($envelope, $stack); } $receivedStamp = $envelope->last(ReceivedStamp::class); // Если воркер обработал сообщение, то снимаем lock. Даже если код упал. if ($receivedStamp) { try { return $stack->next()->handle($envelope, $stack); } finally { $this->client->unlock($stamp->getLockKey()); } } // Если отправка залочена, то прекращаем обработку if (!$this->client->lock($stamp->getLockKey())) { return $envelope; } return $stack->next()->handle($envelope, $stack); } }

В начале middeware есть проверка на наличие нашего штампа. Если его нет, то обработчик пропускает и запускается следующий элемент цепочки.

Далее мы проверяем наличие ReceivedStamp. Если его нет, значит мы находимся на этапе отправки. Мы пробуем поставить блокировку в редисе и если нам удалось, то идем дальше, иначе останавливаем обработку сообщения. Если же ReceivedStamp есть, то значит мы в консамере. Здесь нам нужно прокинуть сообщение дальше по цепочке и после обработки снимаем блокировку. Даже в случае ошибки. Чтобы не заблочить выполнение других сообщений.

Получаем сообщение

Для того чтобы получить сообщение из очереди и обработать его, нужно выполнить команду

bin/console messenger:consume async

Команда будет работать в режиме демона. То есть работать в бесконечном цикле, засыпая на каждой итерации.

На каждой итерации messenger будет проходиться по всем сконфигурированным транспортам, которые должны обработаться командой.

В каждом транспорте компонент в цикле проходит все сконфигурированные очереди. То есть читает сообщения из первой очереди. Как только все сообщения в первой очереди обработаны, переходим к следующей очереди и т. д. В бесконечном цикле читаем сообщения из очередей по кругу.

Для локальной разработки чтобы при переходе с ветки на ветку код обновлялся в демоне было принято решение добавить опцию --time-limit=10 в команду консамера, чтобы каждые 10 секунд демон перезагружался

Наша реализация

Для более удобного использования было принято решения сделать сервис для работы с messenger.

<?php declare(strict_types=1); namespace SVEAK\MessengerBundle\Service; use SVEAK\MessengerBundle\Stamp\ContextStamp; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBusInterface; class Bus { private ?ContextStamp $context = null; public function __construct( private MessageBusInterface $bus, ) { } public function send(): void { $context = $this->grabContextStamp(); $data = $context->popMessage(); if (!$data) { throw new \InvalidArgumentException('Message not found!!!'); } $envelope = Envelope::wrap($data); $envelope = $envelope->with($context); $this->bus->dispatch($envelope); $this->context = null; } public function key(string $key): self { $context = $this->grabContextStamp(); $context->setKey($key); return $this; } public function ttl(int $ttl): self { $context = $this->grabContextStamp(); $context->setTtl($ttl); return $this; } public function lock(): self { $context = $this->grabContextStamp(); $context->setUseLock(true); return $this; } public function parallel(): self { $context = $this->grabContextStamp(); $context->setParallel(true); return $this; } public function ifNewer(int $timestamp): self { $context = $this->grabContextStamp(); $context->setTimestamp($timestamp); return $this; } public function resolveOrderConflict(): self { $context = $this->grabContextStamp(); $context->setNeedResolveOrderConflict(true); return $this; } public function addMessage(object $message): self { $context = $this->grabContextStamp(); $context->addMessage($message); return $this; } public function setAfterParallelMessage(object $message): self { $context = $this->grabContextStamp(); $context->setAfterParallelMessage($message); return $this; } private function grabContextStamp(): ContextStamp { return $this->context = ($this->context ?? new ContextStamp()); } }

Он реализован по паттерну builder, где все состояние хранится в объекте контексте, который всегда очищается после вызова метода send

В сервисе заложен набор фич с реализацией определенных кейсов работы с очередями, которые неоднократно применялись на проектах.

Кейсы:

Задача должна выполнить 1 раз. Следующая может начаться только если 1 закончилась.

В данном случае ставится лок в редис. Если лок есть, то новое сообщение игнорируется

$this->superBus ->key('user_1') // Ключ запроса ->lock() // Флаг блокировки. Если стоит, то применится механизм. Ключ будет построен на основе key ->addMessage(new Test('asdf')) ->send();

Нужно игнорировать сообщения дата отправки которых меньше, чем те которые уже получили

В данном случае в редис пишется метка времени последнего отправленного сообщения Следующее сообщение будет сравниваться с установленным значением. Если значение меньше, сообщение игнорится. Если больше, то отправляем и ставим новое значение в редис.

$this->superBus ->key('user_1') ->ifNewer(121) ->addMessage(new Test('asdf')) ->send(); // Отправится, так как первое $this->superBus ->key('user_1') ->ifNewer(120) ->addMessage(new Test('asdf')) ->send(); // Будет проигнорировано $this->superBus ->key('user_1') ->ifNewer(122) ->addMessage(new Test('asdf')) ->send(); // Отправится, так как оно более свежее

Отправка сообщения, которое должно обработаться через 23 секунды

В данном случае добавляется stamp с delay меткой. Messenger сам поймет что сообщение ttl. Создаст новую ttl ветку и отправит туда сообщение.

$this->superBus ->ttl(23) ->addMessage(new Test('asdf')) ->send();

Гарантирование порядка выполнения сообщений

1. 1 очередь и много воркеров. 2 сообщение может обработаться быстрее более быстрым воркером чем первое

2. Сообщения в разных очередях и разные воркеры. Когда есть 2 зависимые задачи. Например, в одну очередь отправляем сообщение о добавлении продукта, а в другую о добавлении настроек продуктов.

В данном случае в редисе ставится лок на отправку сообщений. Если ключ уже залочен, то будет создана лок очередь и сообщения будут отправлены туда. Как только базовое сообщение обработается, воркер заберет остальные сообщения из лок очереди и выполнит их в этом же потоке. Если базовое сообщение не успеет обработаться в течение часа, сообщения будет перенаправлено в sync очередь, где обработаются синхронно 1 воркером

$this->superBus ->key('product_1') ->resolveOrderConflict() ->addMessage(new Product('111')) ->send(); $this->superBus ->key('product_1') ->resolveOrderConflict() ->addMessage(new ProductSettings('111')) ->send();

Цепочка задач. Когда нужно гарантировать порядок выполнения задач и все задачи заранее известны.

В данном случае в rabbit улетят все сообщения разом и обработаются в такой же последовательности

$this->superBus ->addMessage(new Product('111')) ->addMessage(new Product('222')) ->addMessage(new Product('333')) ->addMessage(new Product('444')) ->send();

Запустить задачи параллельно

В данном случае в rabbit улетят все сообщения отдельно. Последовательность обработки не гарантирована

$this->superBus ->parallel() ->addMessage(new Product('111')) ->addMessage(new Product('222')) ->addMessage(new Product('333')) ->addMessage(new Product('444')) ->send();

Выполнить задачу после выполнения ряда параллельных задач

В данном случае в rabbit улетят все параллельные сообщения отдельно. Последовательность обработки не гарантирована. После того как все сообщения выполнятся будет снята блокировка в редисе и выполнится последнее сообщение

$this->superBus ->key('user_2') ->parallel() ->addMessage(new Test('111')) ->addMessage(new Test('222')) ->addMessage(new Test('333')) ->addMessage(new Test('444')) ->setAfterParallelMessage(new Test('555')) ->send();

Сравнение

GO worker

Первоначальный механизм работы с очередями выглядит так. Есть демон на go, который поддерживает постоянное соединение с rabbitmq. Как только демон получает сообщение из очереди он запускает команду symfony и передает в нее полученное сообщение.

При таком подходе мы видим в интерфейсе rabbitmq подключенные воркеры, гарантируем что при каждом запуске команды не будет какого-либо накопленного состояния, которое повлияет на ход обработки. Так как команда после выполнения завершается, то наши демоны не могут начать потреблять больше памяти, что делает их более стабильными.

Так как команда завершается и освобождает всю память, то становится возможным выделить под каждую очередь отдельного демона

Так как у демонов разделена ответственность, то при попадании 1 из них в вечный цикл остальные продолжат работать.

Выполнение сообщения всегда требует запуска ядра symfony, что увеличивает время обработки сообщений.

На графиках в интерфейсе rabbitmq можно наблюдать что от момент отдачи сообщения до его обработки проходит обычно несколько секунд

В dev окружении отправил 1000 сообщений в очередь и go consumer работал 10 минут

Messenger

В случае с messenger наш демон построен на php. По мере выполнения сообщений происходит autoload классов, подгрузка кеша, накопление разного рода состояний. Все это делает наших демонов более прожорливыми. Требует выделения на каждый воркер определенного объема оперативной памяти (в нашем случае 150-200 мегабайт).

Из-за того что инстансы живут долго, то было принято решение поднять только 5 воркеров, которые будут обрабатывать все очереди разом.

В этом случае если у нас появится очередь в которой будет много сообщений, которые долго обрабатываются, может случиться, что все воркеры начнут обрабатывать только эту очередь и в остальных начнут копиться сообщения. В данном случае придется перенастраивать ответственность воркеров, выделяя проблемной очереди отдельного демона и конфигурировать, чтобы остальные демоны не трогали эту очередь.

В случае, если на проекте есть код который попадает в вечный цикл, может произойти так, что все воркеры попадут в него и больше некому будет обрабатывать сообщения.

Благодаря тому что код всегда загружен в память и демону не нужно каждый раз загружать ядро, старт обработки сообщения начинается максимально быстро, что улучшает общий перфоманс бизнес процесса

На графиках в интерфейсе rabbitmq для прогретого демона видно, что сообщения обрабатываются быстрее. Часто можно увидеть только точку (Наличие факта обработки).

В dev окружении отправил 1000 сообщений в очередь и messenger работал 14 секунд.

Разница в 42 раза. Не совсем честное сравнение, так как в dev окружении, например не используется кеш, работает profiler. Так как основное время съело запуск ядра симфони и прогрев кеша, то в долгих командах разницы может вовсе не быть.

В данном примере в воркере происхоило получение документа из монги и отправка http запроса в стороннее апи.

На практике разница сильно заметна и 5 воркеров справляются с обработкой всех сообщений.

Ресурсы: