Блог о программировании

Работа в PHP по протоколу Stomp

Категория: PHP
 16 июля 2017 г. 19:49

В последнее время все более активно продвигается идея построения событийно-ориентированной архитектуры программного обеспечения. Такая архитектура предполагает, что её компоненты - отдельные сервисы, общаются между собой путем отправки асинхронных событий через общую шину ESB. Событие – это сообщение (например в терминологии AMQP), у которого есть название и набор параметров. Обычно событием является запрос какой-то информации, либо это может быть просто уведомление о чем-то заинтересованных сторон. В общем случае, событие отправляется безадресно, то есть без конкретных знаний, какой сервис должен получить данное событие. Конкретные же сервисы слушаю общую шину на предмет появления в них тех или иных событий или запросов с целью реагирования. Прелесть построения событийно-ориентированной архитектуры в том, что сервис-отправить может отправить событие в шину и, не дожидаясь ответа, приступить к выполнению других важных задач. Ответ будет получен позже также через шину.

Схема использования ESB

Схема использования ESB

Шина обычно представляет собой набор очередей, внутри которых и находятся сообщения. В очередь можно как записать сообщение, так и получить его, удалив из очереди. Программное обеспечение, реализующее и управляющее очередями, предоставляющее API для работы с ними, называется брокером сообщений. Наиболее известные брокеры сообщений это ZeroMQ, ActiveMQ, RabbitMQ.

Очереди сообщений

Очереди сообщений

В PHP для взаимодействия с брокерами сообщений обычно используется протокол Stomp. Уже давно существует PECL-расширение для PHP, реализующее программный интерфейс для работы по этому протоколу. Его-то и следует использовать. Инструкция по установке PECL-расширений

Как было описано выше, взаимодействие сервисов в контексте событийно-ориентированной архитектуры приложений сводится к двум задачам:

  • возможность отправлять событие в общую шину;
  • возможность слушать общую шину на предмет интересных и нужных данному компоненту событий.

Для решения этих задач можно воспользоваться довольно простой библиотекой stomp-utils. Установить можно с помощью команды composer:

composer require stomp-utils

Теперь можно его использовать. Для начала разберемся, как отправлять сообщения в шину - это более простая задача:

publish.php:
<?php

use Tochka\Integration\Stomp\{
    StompClient,
    Publisher
};

//указываем реквизиты для установки подключения к брокеру-сообщений по протоколу Stomp
$stompClient = new StompClient('tcp://localhost-smx:61613', 'admin', 'password');

//создаем объект, который умеет отправлять сообщения
$publisher = new Publisher($stompClient);

// формируем сообщение на отправку:
$requestContent = <<<REQ
<?xml version="1.0" encoding="UTF-8"?>
<request sender="test" timestamp="2016-12-07T07:27:26.503+00:00">
 <data code="1234567" />
</request>
REQ;

// Указываем необходимые заголовки
$headers = [
    'field' => 'test data'
];

// отправляем сообщение в очередь
$publisher->send('q.test.queue', $requestContent, $headers, uniqid());

Запустив скрипт publish.php отправляется сообщение в очередь. Все очень просто.

Более сложной задачей является получение сообщений из шины. Для этого необходимо знать, из какой-очереди нужно получать сообщения.

Для начала необходимо реализовать класс, который будет обрабатывать полученное сообщение. Он должен наследоваться от класса Tochka\Integration\Stomp\BaseWorker и реализовывать метод handle:

TestWorker.php:
<?php

use Tochka\Integration\Stomp\BaseWorker;

class TestWorker extends BaseWorker {

    public function handle(): bool {
        echo 'Processing: ' . print_r($this->mapper) . PHP_EOL;
        return true;
    }
}

Теперь необходимо реализовать класс, который будет слушать сообщения из очереди. Класс должен наследоваться от Tochka\Integration\Stomp\Listener. В классе нужно переопределить метод generateHandler, который должен возвращать объект-обработчик сообщения, который только что был написан:

TestListener.php:
<?php

use Tochka\Integration\Stomp\{
    Listener,
    FrameMapper,
    BaseWorker
};

class TestListener extends Listener {

    protected function generateHandler(FrameMapper $mapper): BaseWorker {
        return new TestWorker($mapper);
    }
}

Теперь можно написать скрипт, который будет слушать очередь:

listen.php:
<?php

require __DIR__ . '/../vendor/autoload.php';
require __DIR__ . '/TestWorker.php';
require __DIR__ . '/TestListener.php';

$stompClient = new Tochka\Integration\Stomp\StompClient('tcp://localhost-smx:61613', 'admin', 'password');

$listener = new TestListener("q.test.queue", $stompClient);
$listener->pull();

Скрипт listen.php работает постоянно подключен к очереди, и если в ней появится новое необработанное сообщение, скрипт обязательно его считает.

Теги:  php  stomp  ActiveMQ  RabbitMQ  php7 

Поделиться статьей

Оставить комментарий