Programmation PHP/RabbitMQ

Un livre de Wikilivres.
Sauter à la navigation Sauter à la recherche


RabbitMQ est un logiciel de messages en protocole AMQP. Il permet donc à des processus de produire des messages JSON dans des files d'attente pour que d'autres les consomme ensuite[1].

Installation[modifier | modifier le wikicode]

composer require php-amqplib/php-amqplib

Une interface graphique existe pour lire et manipuler les messages manuellement. On la trouve par exemple sur Docker[2].

Connexion[modifier | modifier le wikicode]

Les identifiants par défaut de RabbitMQ dépendent des versions. On trouve soit le login / mot de passe "user / password", soit "guest / guest".

$connection = new AMQPStreamConnection($host, $port, $login, $password);
...
$connection->close();

Création de queue et routage[modifier | modifier le wikicode]

Pour créer une queue simple prête à recevoir des messages :

        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue1', false, false, false, false);

Exchange[modifier | modifier le wikicode]

icône image Image externe
Schéma des différents types de routage RabbitMQ sur le site : (en) Jyoti Sachdeva, « Getting Started With RabbitMQ: Python », 20/12/2018

Une autre manière de poster des messages est en passant par un exchange. On en distingue plusieurs types[3] :

  • direct : une seule queue recevra le message (patron de conception producteur/consommateur).
  • fanout : toutes les queues liée à l’exchange recevront le message (patron de conception producteur/abonné).
  • topic : les queues de l’exchange inscrites aux sujets concernés recevront le message (selon un pattern dans la "routing key" où "*" représente un seul mot séparé par un point, et "#" au moins un)[4].
  • headers.

Dans cet exemple, on rattache la queue à un exchange "Bus" :

        $this->rabbitMqConnection->getChannel()->exchange_declare('Bus', 'fanout', false, true, false);
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue2', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue2', 'Bus');
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue3', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue3', 'Bus');

Exemple de topic : on ne publie pas dans la queue mais dans l’exchange qui leur routera ensuite le message.

        $this->rabbitMqConnection->getChannel()->exchange_declare('Topic_bus', 'topic', false, false, false);
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue4', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue4', 'Topic_bus');
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue5', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue5', 'Topic_bus');

QoS[modifier | modifier le wikicode]

Pour demander à RabbitMQ de ne pas surcharger les consommateurs d'une queue en leur répartissant les messages que s'ils ont terminé de traiter le précédent :

        $this->rabbitMqConnection->getChannel()->basic_qos(null, 1, null);

DLX[modifier | modifier le wikicode]

Le mode DLX (Dead Letter Exchanges) permet de transférer un message d'une queue dans un autre après un certain temps[5].

Production[modifier | modifier le wikicode]

        $amqpMessage = new AMQPMessage(json_encode('Hello World!'),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        $this->rabbitMqConnection->getChannel()->basic_publish($amqpMessage, 'Bus', 'Wikibooks.Queue1');

Consommation[modifier | modifier le wikicode]

Par défaut on consomme un seul message de la queue. Pour tous les lire un par un, utiliser basic_ack() après basic_consume().

        $this->rabbitMqConnection->getChannel()->basic_consume(
            'Wikibooks.Queue1',
            gethostname() . '#' . rand(1, 9999),
            false,
            false,
            false,
            false,
            [$this, 'consumeCallback']
        );

        while (count($this->rabbitMqConnection->getChannel()->callbacks)) {
            $this->rabbitMqConnection->getChannel()->wait();
        }

    public function consumeCallback(?AMQPMessage $msg)
    {
        if (empty($msg)) {
            return null;
        }

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

        var_dump(json_decode($msg->getBody()));
    }

En mode "topic", on peut remplacer 'Wikibooks.Queue1' par 'Wikibooks.*' pour récupérer toutes les queues.

Références[modifier | modifier le wikicode]