RabbitMQ is a message bus that allows you to send and receive messages via the Pub/Sub pattern . It is a great way to decouple different responsibilities of your system. In this post I will show you how to write a simple consumer ( subscriber ) in PHP.

prerequisites:

  • PHP
  • composer
  • docker
  • docker-compose

Setup the required composer packages

First a composer file:

{
  "name": "arnovr/consumer",
  "type": "project",
  "license": "MIT",
  "autoload": {
    "psr-4": {
      "Arnovr\\": "src/"
    }
  },
  "autoload-dev": {
    "psr-4": {
      "Tests\\Arnovr\\": "tests"
    }
  },
  "require": {
    "php-amqplib/php-amqplib": "^2.6",
    "symfony/console": "^3.1",
    "symfony/yaml": "^3.1"
  }
}

I will be using symfony/console to create an easy to use console command, symfony/yaml to read out some config, and php-ampqlib for the consumer.

Install the required packages:

composer update

Setup config for RabbitMQ

The next step would be some config, i will be using the symfony structure for this, i will have a app/config/parameters.yml file with the content:

---
parameters:
  rabbitmq_host:      rabbitmq.dev
  rabbitmq_user:      guest
  rabbitmq_password:  guest
  rabbitmq_vhost:     /

Now your project has composer.json, composer.lock, app directory and a vendor directory.

Create a console entry point

Create the following file at bin/console

#!/usr/bin/env php
<?php

require __DIR__.'/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPLazyConnection;
use Symfony\Component\Console\Application;
use Symfony\Component\Yaml\Yaml;

$application = new Application();
// Read out the configuration file
$configuration = Yaml::parse(file_get_contents( __DIR__ . '/../app/config/parameters.yml'));

// Setup a RabbitMQ connection with the parameters defined in app/config/parameters.yml
$rabbitMQConnection = new AMQPLazyConnection(
    $configuration['parameters']['rabbitmq_host'],
    5672,
    $configuration['parameters']['rabbitmq_user'],
    $configuration['parameters']['rabbitmq_password'],
    $configuration['parameters']['rabbitmq_vhost']
);

// Run the application
$application->run();

Don’t forget to chmod +x bin/console

When you run it, it should give default symfony console output:

Console Tool

Usage:
  command [options] [arguments]

Options:
  -h, --help            Display this help message
  -q, --quiet           Do not output any message
  -V, --version         Display this application version
      --ansi            Force ANSI output
      --no-ansi         Disable ANSI output
  -n, --no-interaction  Do not ask any interactive question
  -v|vv|vvv, --verbose  Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug

Available commands:
  help  Displays help for a command
  list  Lists commands

Create a consumer command

The consumer will do only 1 thing, that is echo anything that it comes from RabbitMQ

Created an Echo Service that is callable and recieves an AMQPMessage, AMQPMessage will hold all the information from the RabbitMQ Message.

Create an Echo Service

<?php
declare(strict_types = 1);

namespace Arnovr\Service;

use PhpAmqpLib\Message\AMQPMessage;

class EchoService
{
    /**
     * @param \PhpAmqpLib\Message\AMQPMessage $message
     */
    public function __invoke(AMQPMessage $message): void
    {
        var_dump($message->getBody());

        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    }
}

And create a command that chains console and the service together

Create the Command

<?php
declare(strict_types = 1);

namespace Arnovr\Command;

use Arnovr\Service\EchoService;
use PhpAmqpLib\Connection\AMQPLazyConnection;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class HelloWorldConsumerCommand extends Command
{
    /**
     * @var AMQPLazyConnection
     */
    private $connection;

    /**
     * @var EchoService
     */
    private $service;

    /**
     * @param EchoService        $service
     * @param AMQPLazyConnection $connection
     */
    public function __construct(EchoService $service, AMQPLazyConnection $connection)
    {
        $this->connection = $connection;
        $this->service = $service;

        parent::__construct();
    }

    /**
     * @return void
     */
    protected function configure()
    {
        $this->setName('arnovr:consumer-hello-world')
            ->setDescription('Start consumer to echo hello world');
    }

    /**
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return void
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $queue = 'consume';
        $exchange = 'awesomeness';

        $channel = $this->connection->channel();
        $channel->queue_declare($queue, false, true, false, false);
        $channel->exchange_declare($exchange, 'topic', false, true, false);
        $channel->queue_bind($queue, $exchange);
        $channel->basic_consume($queue, "consume", false, false, false, false, $this->service);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

}

As you can see the method $channel->basic_consume expects the echo service as one of the parameters, and it expects it to be a callable. Hence the __invoke on the EchoService

Add your PHP Consumer Command to bin/console

<?php
$application->add(
    new Arnovr\Command\HelloWorldConsumerCommand(
        new Arnovr\Service\EchoService(),
        $rabbitMQConnection
    )
);

When you run the console command, it should show an extra command:

 arnovr
  arnovr:consumer-hello-world  Start consumer to echo hello world

When you try to run your command ./bin/console arnovr:consumer-hello-world, it will throw an error

   stream_socket_client(): unable to connect to tcp://rabbitmq.dev:5672 (Connection refused)

Logically, we don’t have RabbitMQ yet !

Setup the docker environment

A docker compose file that will setup our consumer and RabbitMQ

version: '2'

services:
  consumer:
    container_name: consumer
    build:
      context: .
    image: arnovr/consumer:latest
    environment:
      SYMFONY_ENV: "prod"
  rabbitmq:
    container_name: rabbitmq
    image: rabbitmq:3.6-management
    networks:
      default:
        aliases:
         - rabbitmq.dev
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      SERVICE_15672_NAME: rabbitmq

And define our service with PHP7.1

FROM php:7.1

RUN apt-get update

RUN docker-php-ext-install bcmath

COPY composer.lock composer.lock
COPY app app
COPY src src
COPY vendor vendor
COPY bin/console bin/console

CMD /bin/console arnovr:consumer-hello-world

Lets start our service!

docker-compose up -d rabbitmq

RabbitMQ needs some time to get up, there for i will up each service separately.

docker-compose up consumer

See if it works

Browse to: http://localhost:15672/#/

You should see 1 consumer and 1 queue:

Consumer and Queue

Click on “queues” and “publish message”

Publish a message

You should see it posted on your consumer:

Echo Service

Conclusion

Developers say that PHP is bad in long lived processes. They are partially correct, it is obvious that the language is not designed to be long lived. However when you look out for memory problems, it is not a problem, and PHP will run for months without problems.