1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package vc.thinker.absctacts.mqtt.publish;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import vc.thinker.config.DataReportQueueUtil;
import vc.thinker.config.entity.DataReportArrayQueue;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
/**
* mqtt 指令发送
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Slf4j
public class MqttCommandPush {
public MqttCommandPush(MqttClient mqttClient) {
this.mqttClient = mqttClient;
}
private final MqttClient mqttClient;
public MqttClient getMqttClient() {
return mqttClient;
}
public void sendMqttMessage(String topic, byte[] payload) throws MqttException {
MqttMessage message = new MqttMessage();
message.setPayload(payload);
message.setQos(1);
message.setRetained(false);
try {
if (mqttClient == null) {
log.error("命令推送失败 mqttClient is null");
// } else if (mqttClient.isConnected()) {
// mqttClient.publish(topic, message);
} else {
// mqttClient.connect();
mqttClient.publish(topic, message);
}
} catch (MqttException e) {
if (Objects.equals((int)MqttException.REASON_CODE_CLIENT_ALREADY_DISCONNECTED,e.getReasonCode())
|| Objects.equals((int)MqttException.REASON_CODE_CLIENT_NOT_CONNECTED,e.getReasonCode())
|| Objects.equals((int)MqttException.REASON_CODE_CONNECTION_LOST,e.getReasonCode())
){
ArrayBlockingQueue<DataReportArrayQueue> queue = DataReportQueueUtil.getArrayBlockingQueue();
DataReportArrayQueue obj = new DataReportArrayQueue();
obj.setPayload(payload);
obj.setTopic(topic);
queue.offer(obj);
log.error("broker 失去连接 数据添加到缓存队列中");
}
throw e;
}
}
}