ITPub博客

首页 > 应用开发 > IT综合 > 【MQTT系列】Eclipse Mosquitto实战

【MQTT系列】Eclipse Mosquitto实战

原创 IT综合 作者:工匠小猪猪的技术世界 时间:2019-01-11 14:05:29 0 删除 编辑

虽然我强烈推荐使用EMQ,但是Mosquitto也是非常有必要了解的。

Mosquitto的英文意思是 蚊子 。

由于Mosquitto的安装比较简单,本文不会具体描述,本文测试介绍Mosquitto的概念、客户端以及如何使用Java代码跑通Mosquitto的功能。
在文章的最后,也会说明Mosquitto存在的一些问题以及我不选择它的一些理由。

Mosquitto介绍

Eclipse Mosquitto是一个开源(EPL / EDL许可)消息代理,它实现了MQTT协议版本3.1和3.1.1。Mosquitto重量轻,适用于从低功耗单板计算机到完整服务器的所有设备。

MQTT协议提供了一种使用发布/订阅模型执行消息传递的轻量级方法。这使其适用于物联网消息传递,例如低功率传感器或移动设备,如电话,嵌入式计算机或微控制器。

Mosquitto项目还提供了用于实现MQTT客户端的C库,以及非常流行的mosquitto_pub和mosquitto_sub命令行MQTT客户端。

Mosquitto是Eclipse Foundation的一部分,是一个iot.eclipse.org项目。

官网: http://mosquitto.org/



正常来说,如果是Java项目我们可以引入如下依赖

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

Mosquitto Java 客户端实现

Mosquitto 消息发送主要分为三个类:

  • PubMsg 客服端发布消息

  • PushCallback 消息回调

  • SubMsg 订阅消息

下面我们一起来看这个示例程序:

PubMsg

package com.example.mqtt.publish;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * 发送消息到MQTT
 */
public class PubMsg {
    private static int qos = 2; //只有一次
    private static String broker = "tcp://XXXXXXXXX:1883";
    private static String userName = "admin";
    private static String passWord = "admin";


    private static MqttClient connect(String clientId, String userName,
                                      String password) throws MqttException {
        MemoryPersistence persistence = new MemoryPersistence();
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setUserName(userName);
        connOpts.setPassword(password.toCharArray());
        connOpts.setConnectionTimeout(10);// 设置超时时间
        connOpts.setKeepAliveInterval(20); // 设置会话心跳时间
//      String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.207:1883"};
//      connOpts.setServerURIs(uris);  //起到负载均衡和高可用的作用

        // broker,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
        MqttClient mqttClient = new MqttClient(broker, clientId, persistence);


/*        // MQTT的连接设置
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(true);
        // 设置连接的用户名
        options.setUserName(userName);
        // 设置连接的密码
        options.setPassword(passWord.toCharArray());
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(10);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        MqttTopic topic = mqttClient.getTopic("test-topic");
        // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
        options.setWill(topic, "close".getBytes(), 2, true);*/

        mqttClient.setCallback(new PushCallback("test"));
        mqttClient.connect(connOpts);
        return mqttClient;
    }

    private static void pub(MqttClient sampleClient, String msg, String topic)
            throws MqttPersistenceException, MqttException {
        MqttMessage message = new MqttMessage("Hello Charles!".getBytes());
        message.setQos(qos);
        message.setRetained(false);
        sampleClient.publish(topic, message);
    }

    private static void publish(String str, String clientId, String topic) throws MqttException {
        MqttClient mqttClient = connect(clientId, userName, passWord);

        if (mqttClient != null) {
            pub(mqttClient, str, topic);
            System.out.println("pub-->" + str);
        }

        if (mqttClient != null) {
            mqttClient.disconnect();
        }
    }

    public static void main(String[] args) throws MqttException {
        publish("message content", "client-id-0", "test-topic");
    }
}

PushCallback

package com.example.mqtt.publish;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;


/**
 * 发布消息的回调类
 *
 * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 在回调中,将它用来标识已经启动了该回调的哪个实例。
 * 必须在回调类中实现三个方法:
 *
 * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
 *
 * public void connectionLost(Throwable cause)在断开连接时调用。
 *
 * public void deliveryComplete(MqttDeliveryToken token)) 接收到已经发布的 QoS 1 或 QoS 2
 * 消息的传递令牌时调用。 由 MqttClient.connect 激活此回调。
 *
 */
class PushCallback implements MqttCallback {
    private String threadId;

    public PushCallback(String threadId) {
        this.threadId = threadId;
    }

    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
       System.out.println("deliveryComplete---------" + token.isComplete());
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String msg = new String(message.getPayload());
        System.out.println("-------messageArrived-------"+threadId + " " + msg);
    }
}

SubMsg

package com.example.mqtt.subscribe;

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SubMsg {


    private static int qos = 2;
    private static String broker = "tcp://XXXXXXX:1883";


    private static MqttClient connect(String clientId) throws MqttException{
        MemoryPersistence persistence = new MemoryPersistence();
        MqttConnectOptions connOpts = new MqttConnectOptions();

        connOpts.setCleanSession(false);

        connOpts.setConnectionTimeout(10);
        connOpts.setKeepAliveInterval(20);
        connOpts.setUserName("admin");
        connOpts.setPassword("admin".toCharArray());

        MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
        mqttClient.connect(connOpts);
        return mqttClient;
    }

    public static void sub(MqttClient mqttClient,String topic) throws MqttException{
        int[] Qos  = {qos};
        String[] topics = {topic};
        mqttClient.subscribe(topics, Qos);
    }


    private static void runsub(String clientId, String topic) throws MqttException{
        MqttClient mqttClient = connect(clientId);
        if(mqttClient != null){
            sub(mqttClient,topic);
        }
        mqttClient.subscribe(topic,2, new IMqttMessageListener() {

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                System.out.println("接收消息主题 : " + topic);
                System.out.println("接收消息Qos : " + message.getQos());
                System.out.println("接收消息内容 : " + new String(message.getPayload()));
            }
        });
    }
    public static void main(String[] args) throws MqttException{

        runsub("testSub", "test-topic");
    }
}

运行结果

启动SubMsg订阅消息,再启动PubMsg发送消息

1)PubMsg显示发送成功:

2)SubMsg也会收到信息:

为什么线上环境不考虑使用Mosquitto?

线上环境需要使用集群模式,虽然官方可以使用bridge功能,连接多个mqtt broker,但是还是存在一些问题。原因主要有如下几个:

  • Mosquitto半年多没维护了,版本也不是最新的

  • 如果增加bridge节点,需要修改多个节点的配置,并且重启,维护成本高

  • 集群模式如果主节点down机,所有从服务器会变成孤立的节点

  • down机节点重新启动以后,容易造成负载不均衡

  • 我们有更好的方案EMQ

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/31555484/viewspace-2375531/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论

注册时间:2018-09-19

  • 博文量
    19
  • 访问量
    9932