Conectar microservicios con colas de mensajes usando Spring y RabbitMQ

Es posible que desarrollando microservicios te hayas encontrado algún problema con los sistemas de colas de mensajes. En este post intentaremos arrojar un poco de luz sobre este tema aplicando un caso real con RabbitMQ y Spring.

Tenemos un problema

Nuestro punto de partida era el desarrollo de una solución que se integrara con un eCommerce de forma que los pedidos recibidos fueran asignados a tiendas en un ERP de la empresa usando la información de una tercera plataforma que centraliza el estado del stock.

Para implementarla, un aspecto importante era que los servicios debían estar desacoplados y, sobre todo, ser tolerante a fallos de forma que la caída de uno de ellos no afectara al flujo de pedidos y que al recuperarse el servicio se restableciera la comunicación y se procesaran las peticiones pendientes entre ellos.

Por supuesto, lo que nos viene a la mente es el uso de colas de mensajes.

El concepto de una cola de mensajes

Una cola es una estructura de datos que imita colas reales como las que podemos ver en el buzón de correo o delante de la pescadería de nuestro barrio.

Una cola de mensajes es una forma de comunicación asíncrona que se usa en arquitecturas de microservicios. Los mensajes se almacenan en la cola hasta que se reciben y eliminan siendo cada mensaje procesado una sola vez.

Esta estructura intermedia permite ajustar la velocidad y disponibilidad de diferentes microservicios de forma que puedan trabajar conjuntamente y los errores sean recuperables, lo que aumenta el nivel de tolerancia a fallos que es lo que necesitamos en este momento.

Patrón productor-consumidor

Los mensajes son publicados por una aplicación productora en un cola y la aplicación consumidora se suscribe a la cola para recibir estos mensajes.

Un potencia añadida a este patrón es que podemos tener un número variable de productores y consumidores de forma que en entornos de microservicios flexibles en la nube podemos poner en funcionamiento varias instancias de productores y consumidores según las necesidades del momento.

Estándares y protocolos

Como vemos, existen ya patrones para esta funcionalidad que buscamos, no hemos inventado nada; hay mucha gente y muy inteligente que ha estado pensando sobre esto y han llegado a soluciones en forma de estándares y protocolos con los que podemos resolver nuestro problema.

  • AMQP (Advanced Message Queuing Protocol). Protocolo de cola de mensajes avanzado que destaca por su fiabilidad. Hay servidores comerciales y OpenSource y clientes interoperables para muchos lenguajes lo que facilita su uso. Se utiliza en grandes corporaciones que procesan millones de mensajes.
  • MQTT (Message Queue Telemetry Transport). Es un protocolo ligero de cola de mensajes pensado especialmente para dispositivos y redes con pocos recursos, por lo que es ideal para aplicaciones tipo IoT.
  • STOMP (Streaming Text Oriented Messaging Protocol). Protocolo simple de mensajes orientado a texto (similar a HTTP) diseñado para ser ligero.

En un entorno de eCommerce en el que conectaremos varias plataformas web, no hay restricciones de red o necesidad de conectar dispositivos pequeños con un protocolo ligero. Además, pensando en la escalabilidad de la solución nuestra opción elegida fue AMQP, hay múltiples servidores y clientes fiables que implementan este protocolo.

¿Qué es AMQP exactamente?

Para tener una visión clara del protocolo y saber con lo que vamos a trabajar debemos conocer una serie de conceptos.

El mensaje es el elemento central de toda la comunicación y también hay una serie de elementos principales en el flujo:

  • El productor o publicador (publisher) crea un mensaje, lo envía y es publicado en un buzón (exchange).
  • El servidor (bróker de mensajería) distribuye o enruta el mensaje desde el buzón de acuerdo con unas reglas definidas (bindings) en diferentes colas (queue).
  • El consumidor (consumer) recupera el mensaje de la cola a la que está suscrito.

El buzón o intercambiador (exchange) recibe los mensajes y dirige los datos a una cola utilizando una vinculación (binding) que le permite decidir cuál es la correcta. Hay cuatro tipos de intercambiadores:

  • Directo (Direct). Envía mensajes a un receptor concreto y para ello se utilizan claves de enrutamiento (routing key). El mensaje se transmite con una clave y a su vez una cola tiene una clave de vinculación, si las claves coinciden el mensaje se envía a la cola.
  • Tema (Topic). Al igual que en un intercambiador directo, la clave de enrutamiento y la de vinculación se emparejan, sin embargo no debe existir una coincidencia exacta, en su lugar se utilizan patrones y de esta forma se pueden enrutar mensajes a varias colas.
  • Abanico (Fanout). El bróker ignora en este caso la clave de enrutamiento por completo y el intercambiador envía el mensaje a todas las colas vinculadas multiplicando el mensaje, se suele utilizar para un broadcast de mensajes.
  • Cabeceras (Headers). Actúa con el encabezamiento de un mensaje en lugar de utilizar una clave de enrutamiento. Según la coincidencia o reglas sobre una o varias cabeceras del mensaje se dirige a la cola correcta.

Implementaciones de servidores

Hay muchísimas opciones, ¡qué locura! Además muchos de ellos implementan varios protocolos de los que hemos visto.

  • Productos. IBM MQ, Microsoft Message Queue Server (MSMQ)
  • Productos en la nube. Amazon Simple Queue Service (SQS), Amazon MQ (activeMQ), StormMQ, Solace, IBM MQ, Windows Azure Service Bus
  • Código abierto. Apache ActiveMQ, Apache Kafka, Apache Qpid, Apache RocketMQ, Enduro / X, JBoss Messaging, JORAM, RabbitMQ, Sun Open Message Queue, Tarantool

Así que tenemos que ponernos a elegir. ¿Qué hacemos? ¿Sacamos una bola? ¿Nos vendamos los ojos y tiramos un dardo?

En nuestro caso conocíamos ActiveMQ y RabbitMQ de otros proyectos anteriores por lo que es un plus haberlos usado antes y no ir a ciegas. Además, la solución recomendada por los sysadmin del cliente fue Rabbit, principalmente porque permite la persistencia de las colas para no perder los datos ante una caída y además es una opción OpenSource.

¡Pues no le damos más vueltas! El elegido es RabbitMQ (si quieres ver un poco más en detalle como implementa AMQP puedes echarle un vistazo a su web).

Y como en nuestro equipo somos unos viciosos de Spring y usamos este framework para muchas cosas, si miramos todos los proyectos que ofrece encontramos que tienen una solución para un cliente AMQP que utilizaremos dentro de un proyecto con Spring Boot.

¡Mejor imposible! Ahora toca montar las piezas.

Servidor RabbitMQ

Para empezar a desarrollar lo primero que necesitamos es un servidor de Rabbit funcionando. Para facilitar las cosas existe la posibilidad de usar Docker, tener uno levantado en muy poco tiempo es relativamente sencillo.

Descargamos la imagen de docker con

docker pull rabbitmq:3-management

y la ejecutamos en local indicando los puertos por los que opera

docker run -d --hostname localhost --name rabbitmq -p 5672:5672 -p 5673:5673 -p         15672:15672 rabbitmq:3-management

Si todo funciona correctamente verás una web como esta al conectar a http://localhost:15672/, en ese puerto se levanta un servidor http con una consola de control y administración de RabbitMQ (user-pass: guest-guest) muy útil para ver que está pasando en cada momento o purgar una cola si es necesario.

Implementar publisher y consumer

Después de hablar de conceptos, volvemos al problema concreto que nos ocupa. Tenemos un pedido que tenemos que recibir de una plataforma de eCommerce y que pasará por diferentes fases en las que conectaremos con plataformas externas. La idea principal es colocar una cola en medio que nos permita ese desacoplamiento y reencolar en caso de error.

El primer paso es recibir el pedido y, como la plataforma no tiene implementado un plugin o conector que permita publicar directamente en una cola con AMQP, nosotros programamos un servicio HTTP que recibe una petición del eCommerce que ha configurado un webhook. Es decir, se dispara una petición ante el evento del pago del pedido y nosotros directamente, al recibir la petición en el web service, encolamos el pedido en la primera cola.

Tenemos un microservicio que ejecuta ese primer paso, para programar el publisher en el proyecto de Spring Boot primero necesitamos incluir la dependencia a la librería de Spring.


          org.springframework.boot
          spring-boot-starter-amqp
      

Configuramos el  servidor de Rabbit  al que conectaremos, teniendo los datos en un properties:

Podemos tener una clase de configuración similar a esta:

Configuration
public class RabbitConfiguration {

    public static final String QUEUE_NAME = "queue";

    public static final String EXCHANGE_NAME = "exchange";

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private Integer port;

    @Value("${spring.rabbitmq.username}")
    private String user;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtualhost}")
    private String virtualhost;

    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        final CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(user);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        return connectionFactory;
    }

Indicando la cola y las opciones instanciaremos un RabbitTemplate en la configuración que será el servicio para publicar mensajes. En este caso, la cola es única y todos los mensajes publicados irán directamente a la cola por lo que incluso hemos usado como routing key el propio nombre de la cola. Además, como los pedidos están en formato Json, hemos incluido un converter para usarlo directamente en el cuerpo de los mensajes.

@Bean
    public RabbitTemplate rabbitTemplate() throws Exception {
         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
         rabbitTemplate.setMessageConverter(jsonMessageConverter());
         return rabbitTemplate;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true, false, false);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME, true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(QUEUE_NAME);
    }
    
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

Lo siguiente es el servicio que hará de publisher y publicará un pedido en la cola. Aquí hay que comentar que, junto al template de Rabbit, hemos usado dos funcionalidades que también ofrece Spring de una manera bastante sencilla con anotaciones :

  • @Async. Hacemos el servicio asíncrono de forma que se ejecuta en un hilo nuevo y no haya que hacer una espera activa.
  • @Retryable. Hacemos que el servicio, ante fallos, automáticamente haga reintentos según la configuración de una política de los mismos, que se puede hacer mediante properties y definir unos valores por defecto.
@Slf4j
@Component
public class PublishController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // @Async debe de ir siempre antes de @Retryable para que los reintentos sean de
    // ejecución asíncrona y no bloqueen la respuesta al cliente
    @Async
    @Retryable(value = { AmqpException.class }, maxAttemptsExpression = "#{${server.retry.policy.max.attempts:3}}", backoff = @Backoff(delayExpression = "#{${server.retry.policy.delay:36000}}", multiplierExpression = "#{${server.retry.policy.multiplier:2}}", maxDelayExpression = "#{${server.retry.policy.max.delay:252000}}"))
    public void doSendMessageToRabbitMQ(String orderShopifyJson, String shopifyDomain) throws AmqpException {

        String correlationId = generateUniqueCorrelationId();

        log.debug("correlationId : {}", correlationId);
        log.debug("X-Shopify-Shop-Domain : {}", shopifyDomain);
        log.debug("Shopify payload {}", orderShopifyJson);

        rabbitTemplate.convertAndSend(RabbitConfiguration.QUEUE_NAME, orderShopifyJson, m -> {
            m.getMessageProperties().setCorrelationId(correlationId);
            m.getMessageProperties().getHeaders().put(PublishAPI.X_SHOPIFY_SHOP_DOMAIN, shopifyDomain);
            return m;
        });

    }

Por último, solo destacar que podemos enviar todas las cabeceras que consideremos necesarias para procesar el mensaje posteriormente y en nuestro caso necesitamos el dominio de la tienda.

En el web service donde recibimos el pedido del eCommerce hacemos una serie de validaciones de las cabeceras y encolamos llamando al método anterior.

@Slf4j
@Service
public class PublishAPIImpl implements PublishAPI {

    @Autowired
    PublishController publishController;

    @Autowired
    CacheableConnectorService cacheableConnectorService;

    @Autowired
    CryptoService cryptoService;

    @Override
    public void messageToRabbitMQ(@RequestBody String orderShopifyJson,
            @RequestHeader(value = X_SHOPIFY_TOPIC) String shopifyTopic,
            @RequestHeader(value = X_SHOPIFY_HMAC_SHA256) String shopifyHmac,
            @RequestHeader(value = X_SHOPIFY_SHOP_DOMAIN) String shopifyDomain,
            @RequestHeader(value = X_SHOPIFY_API_VERSION) String shopifyApiversion) throws Exception {

        Optional connector = cacheableConnectorService.findByShopifyShopDomain(shopifyDomain);

        if (connector.isPresent()) {
            ConnectorDTO connectorDto = connector.get();

            if (shopifyApiversion.equals(connectorDto.getApiVersion())) {

                String token = cryptoService.encrypt(orderShopifyJson, connectorDto.getShopifyShopApiKey());

                if (shopifyHmac.equals(token)) {
                    log.debug("VALID SHOPIFY WEBHOOK");
                    publishController.doSendMessageToRabbitMQ(orderShopifyJson, shopifyDomain);
                } else {
                    log.error("HMAC not valid from {}", shopifyDomain);
                    throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "HMAC not valid");
                }
            } else {
                log.error("Unexpected Api version {}", shopifyApiversion);
                throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Unexpected Api version or topic");
            }
        } else {
            log.error("Unsupported domain {}", shopifyDomain);
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Unsupported domain");

        }

    }

}

La parte del consumidor en el otro extremo de la comunicación es sencilla: la configuración es como la anterior para el servidor de Rabbit y añadiendo un listener:

@Bean
    public SimpleRabbitListenerContainerFactory customRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }

Mediante anotaciones indicando la cola a la que nos suscribimos y el listener que usaremos, instanciaremos un servicio consumidor que escucha.

@RabbitListener(queues = RabbitConfig.QUEUE_NAME, containerFactory = "customRabbitListenerContainerFactory")
    @RabbitHandler
    public void onMessageFromRabbitMQ(@Payload String payload, Message message, Channel channel)
            throws InterruptedException {
        try {
            String messageId = (String) message.getHeaders().get(AmqpHeaders.MESSAGE_ID);
            String correlationId = (String) message.getHeaders().get(AmqpHeaders.CORRELATION_ID);
            String shopifyDomain = (String) message.getHeaders().get(X_SHOPIFY_SHOP_DOMAIN);

            log.debug("rabbitMQ message id: {}", messageId);
            log.debug("correlation id : {}", correlationId);
            log.debug("X-Shopify-Shop-Domain : {}", shopifyDomain);
            log.debug("Shopify payload {}", payload);

            orderPersistenceService.processShopifyOrderPayLoad(payload, shopifyDomain);

        } catch (Exception e) {
            log.error("Error al guardar o asignar el pedido recibido de Shopify de cola RabbitMQ", e);
        }
    }

Leemos los mensajes de la primera cola y guardamos el pedido. En este caso no queremos reencolar en caso de error, ya que se trata de errores que no pueden recuperarse de esa forma y simplemente quedarán registrados en el log. Otra opción sería persistir los errores si queremos procesarlos de otra manera o mostrarlos en una consola de control.

Para las siguiente colas la implementación ha sido similar a la descrita anteriormente en cuanto a la creación de los publisher y consumer, solo que en caso de error en el procesamiento del pedido sí que optamos por reencolar los pedidos, capturando las excepciones y volviendo a enviar a la cola (en este caso el mensaje sólo incluye el id interno del pedido).

private void assignmentOrderInGroup(Long orderId, StoreType storeType, List totalEans,
            Set stocksEans) {
        log.debug("Procesando pedido de tienda tipo {} con id {}", storeType, orderId);
        try {
            Optional orderOpt = orderService.findOneEntity(orderId);
            if (orderOpt.isPresent()) {

                Order order = orderOpt.get();

                if (OrderState.PENDING.equals(order.getState())) {
                    self.assignmentOrderInGroupByPriority(order, storeType, totalEans, stocksEans);
                }
            }
        } catch (Exception e) {
            log.error("Error procesando asignaciones de pedido con id {}. Reencola trigger", orderId, e);
            publishController.doSendMessageToRabbitMQ(orderId);
        }
    }

Estableciendo este mecanismo de colas los servicios son capaces de procesar miles de pedidos y, ante caídas de otros nodos del sistema, se queda reencolando los pedidos recuperando finalmente las peticiones cuando vuelven a funcionar.

De hecho, al principio en la primera versión del sistema, algunas de las fases del proceso de pedidos estaban en el mismo paso o servicio. Tuvimos una primera experiencia en el Black Friday en la que los pedidos se quedaron atascados en la primera cola, unos 5.000 pedidos que llegaron en dos horas. Tuvieron que pasar seis horas para que se acabaran procesando y enviando al ERP. Pasamos un poco de nervios…

Vista la experiencia hicimos una reestructuración separando e incluyendo el resto de colas. Ya en las pruebas de carga con Jmeter a razón de 5 pedidos/segundo vimos que éramos capaces de procesar 3.000 pedidos en pocos minutos (eso sí, mockeando los servicios externos) lo que suponía una mejora exponencial.

En la siguiente campaña, también con unos 7.000 pedidos, el flujo no se detuvo y los pedidos se quedaron encolados pero ya en el ERP que gestiona los albaranes y los envíos a domicilio. Todo un éxito.

Aún con esos números, una posible mejora sería gestionar los errores en una cola distinta ya que corremos el riesgo de tener pedidos que se reencolen indefinidamente en el tiempo.

Tendríamos unas colas de error separadas en las que podemos controlar las veces que se ha reencolado con una cabecera de número de reintentos, de esa forma no afectaríamos al flujo principal pudiendo incluso procesar estos errores en otro momento.

El proyecto se finalizó con la implementación de la primera opción y no tuvimos ocasión de hacer esta última mejora. Cuidado con los “lo hago así y luego lo cambio” que al final siempre falta tiempo y esas cosas se quedan definitivamente en el código.

Espero que estas líneas hayan servido para que puedas tener una imagen más clara de las colas de mensajes, te haya ayudado con algún bloqueo a la hora de implementarlo y, si nunca las has empleado, te anime a usarlas sin miedo.

Alfredo Monereo,
Arquitecto de Software