// MqttCommandPush.java 2.29 KB
package vc.thinker.absctacts.mqtt.publish;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import vc.thinker.config.DataReportQueueUtil;
import vc.thinker.config.entity.DataReportArrayQueue;

import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * mqtt 指令发送
 *
 * @author HeTongHao
 * @since 2019-10-29 16:00
 */
@Slf4j
public class MqttCommandPush {
    public MqttCommandPush(MqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }

    private final MqttClient mqttClient;

    public MqttClient getMqttClient() {
        return mqttClient;
    }

    public void sendMqttMessage(String topic, byte[] payload) throws MqttException {
        MqttMessage message = new MqttMessage();
        message.setPayload(payload);
        message.setQos(1);
        message.setRetained(false);
        try {
            if (mqttClient == null) {
                log.error("命令推送失败 mqttClient is null");
//            } else if (mqttClient.isConnected()) {
//                mqttClient.publish(topic, message);
            } else {
//                mqttClient.connect();
                mqttClient.publish(topic, message);
            }
        } catch (MqttException e) {
            if (Objects.equals((int)MqttException.REASON_CODE_CLIENT_ALREADY_DISCONNECTED,e.getReasonCode())
                    || Objects.equals((int)MqttException.REASON_CODE_CLIENT_NOT_CONNECTED,e.getReasonCode())
                    || Objects.equals((int)MqttException.REASON_CODE_CONNECTION_LOST,e.getReasonCode())
            ){
                ArrayBlockingQueue<DataReportArrayQueue> queue = DataReportQueueUtil.getArrayBlockingQueue();
                DataReportArrayQueue obj = new DataReportArrayQueue();
                obj.setPayload(payload);
                obj.setTopic(topic);
                queue.offer(obj);
                log.error("broker 失去连接 数据添加到缓存队列中");
            }
            throw e;
        }
    }
}