[ Поиск ] - [ Пользователи ] - [ Календарь ]
Полная Версия: как объединить Ratchet с MQTT ?
arbuzmaster
Что-то совсем запутался, подскажите пожалуйста как объединить Ratchet с MQTT.
В общем есть Websocket server Rathet и есть отдельно MQTT клиент, не могу сообразить как их объединить, чтобы при получении сообщения от MQTT сервера, можно было отправить сообщение через Websocket server клиентам?
Сейчас они работают раздельно!

Websocket

<?php

/*
* Click
nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
* Click
nbfs://nbhost/SystemFileSystem/Templates/Scripting/PHPClass.php to edit this template
*/


/**
* Description of ChatController
*
https://packagist.org/packages/cboden/ratchet
* @author arbuzmaster
*
*/


namespace console\controllers;

use yii\console\Controller;
//use Ratchet\App;
use console\models\MyChat;
//use Ratchet\Server\EchoServer;
use Ratchet\Server\IoServer;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;


class ChatController extends Controller {

public function actionStart() {
//$app = new App('localhost', 8080);
//$app->route('/chatonline', new MyChat(), ['*']);
//$app->run();


$server = IoServer::factory(
new HttpServer(
$wsServer= new WsServer(
new MyChat()
)
),

8080,
'0.0.0.0'
);
$wsServer->enableKeepAlive($server->loop, 5);
$server->run();
}
}




<?php

namespace console\models;

use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;
use yii\helpers\Console;
use \yii\db\Query;
use Yii;

class MyChat implements MessageComponentInterface {

protected $clients;

public function __construct() {
$this->clients = new \SplObjectStorage();
}

public function onOpen(ConnectionInterface $conn) {
Console::stdout('Connection Open\n' . $conn->resourceId);

$moduleInfo = (new Query())->select(['module_id', 'type'])->from('modules')->where(['ip' => $conn->remoteAddress])->one();

foreach ($this->clients as $client) {
if (!empty($moduleInfo)) {
$client->send('{"INFO":[{"MODULE_ID":"' . $moduleInfo['module_id'] . '","IP":"' . $conn->remoteAddress . '","TYPE":"' . $moduleInfo['type'] . '","STATUS":"CONNECTED"}]}');

Yii::$app->db->createCommand('UPDATE modules SET status=1 WHERE ip="'.$conn->remoteAddress.'"')->execute();
}
}




$this->clients->attach($conn);
}

public function onMessage(ConnectionInterface $from, $msg) {
//Но не лучше ли было выставить ini_set('mysql.connect_timeout','0');
try {
Yii::$app->db->createCommand("DO 1")->execute();
} catch (\Throwable $e) {
Console::stdout('Reset DB connections');
Yii::$app->db->close();
Yii::$app->db->open();
}
//$numRecv = count($this->clients) - 1;
//echo sprintf('Connection %d sending message "%s" to %d other connection %s' . "\n"
// , $from->resourceId, $msg, $numRecv, $numRecv == 1 ? '' : 's');
// $conn->send(new Frame($frame->getPayload(), true, Frame::OP_PING));

foreach ($this->clients as $client) {
if ($from !== $client) {
// The sender is not the receiver, send to each client connected
//print_r($client->remoteAddress);

$client->send($msg);
}
}
}


public function onClose(ConnectionInterface $conn) {
//print_r("Отключкен".$conn->remoteAddress);
$moduleInfo = (new Query())->select(['module_id', 'type'])->from('modules')->where(['ip' => $conn->remoteAddress])->one();

foreach ($this->clients as $client) {

if (!empty($moduleInfo)) {
$client->send('{"INFO":[{"MODULE_ID":"' . $moduleInfo['module_id'] . '","IP":"' . $conn->remoteAddress . '","TYPE":"' . $moduleInfo['type'] . '","STATUS":"DISCONNECTED"}]}');
Yii::$app->db->createCommand('UPDATE modules SET status=0 WHERE ip="'.$conn->remoteAddress.'"')->execute();

}
}



//Console::stdout('Connection Close\n');

$client = $this->findConnection($conn);

if ($client !== null) {
$client_id = $client->id;
Console::stdout('Connection Detached\n');
$this->clients->detach($client);
}
}


public function onError(ConnectionInterface $conn, \Exception $e) {
Console::stdout('Connection Error ' . $e);
$conn->close();
}

public function findConnection(ConnectionInterface $conn) {
// Ищем пользователя, который написал
foreach ($this->clients as $client) {
if ($client->connection === $conn) {

}
}


return $client;
}
}


return null;


MQTT клиент

<?php

/*
* Click
nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
* Click
nbfs://nbhost/SystemFileSystem/Templates/Scripting/PHPClass.php to edit this template
*/


namespace console\controllers;
use yii\console\Controller;
use \PhpMqtt\Client\MqttClient;
use \PhpMqtt\Client\ConnectionSettings;
/**
* Description of MqttclientController
*
*
@author arbuzmaster
*/

class MqttclientController extends Controller {
//put your code here
public function actionStart() {

// Параметры подключения к брокеру
$server = 'test.ru'; // Адрес брокера
$port = 12345; // Порт TCP
$clientId = rand(5, 15); // Генерация случайного ID клиента
$username = 'name'; // Имя пользователя
$password = 'pass'; // Пароль
$clean_session = false; // Очистка сессии
$mqtt_version = MqttClient::MQTT_3_1_1; // Версия протокола MQTT

// Настройки подключения

$connectionSettings = (new ConnectionSettings)
->
setUsername($username)
->
setPassword($password)
->
setKeepAliveInterval(60) // Интервал активности (в секундах)
->setLastWillTopic('emqx/test/last-will') // Тема для последнего волеизъявления
->setLastWillMessage('client disconnect') // Сообщение для последнего волеизъявления
->setLastWillQualityOfService(1); // Уровень качества обслуживания для последнего волеизъявления

// Создание и подключение клиента MQTT

$mqtt = new MqttClient($server, $port, $clientId, $mqtt_version);
$mqtt->connect($connectionSettings, $clean_session);

printf("Клиент успешно подключился\n");

// Подписка на тему
$mqtt->subscribe('right_lamp', function ($topic, $message) {
printf("Получено сообщение на тему [%s]: %s\n", $topic, $message);
}, 0);

// Публикация сообщений
/*for ($i = 0; $i < 10; $i++) {
$payload = [
'protocol' => 'tcp',
'date' => date('Y-m-d H:i:s'),
'url' => '
https://github.com/emqx/MQTT-Client-Examples'
];

$mqtt->publish('emqx/test', json_encode($payload), 0, true); // QoS 0, флаг retain true
printf("Сообщение %i отправлено\n", $i);
sleep(1); // Пауза для контроля скорости отправки
}*/

// Запуск цикла для обработки входящих сообщений и очередей повторной отправки

$mqtt->loop(true);


}


_____________
Мой первый сайтик

Посмотри на свой XBMC под другим углом
Быстрый ответ:

 Графические смайлики |  Показывать подпись
Здесь расположена полная версия этой страницы.
Invision Power Board © 2001-2025 Invision Power Services, Inc.