AbstractCommonEntityHandleMqttProtocol.java 2.27 KB
Newer Older
xieshaojun's avatar
xieshaojun committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
package vc.thinker.absctacts.mqtt.protocol;


import akka.actor.ActorRef;
import akka.actor.Props;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties;
import vc.thinker.absctacts.mqtt.entity.MqttResponse;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.absctacts.mqtt.utils.StringFormatUtils;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * 抽象的Mqtt协议
 *
 * @author HeTongHao
 * @since 2019-10-29 16:00
 */
@Slf4j
public abstract class AbstractCommonEntityHandleMqttProtocol extends AbstractMqttProtocol {
    public AbstractCommonEntityHandleMqttProtocol(MqttConnectionProperties mqttConnectionProperties) {
        super(mqttConnectionProperties);
    }

    /**
     * 当前 Topic 与Actor实例的映射
     */
    protected final Map<String, ActorRef> actorRefMap = Maps.newHashMap();

    /**
     * 根据topic 解析一个topic定义枚举
     *
     * @param topic 消息接收的topic
     * @return topic定义枚举
     */
    protected abstract TopicDefinition resolveByTopic(String topic);

    /**
     * 处理消息
     *
     * @param topic   监听到消息的Topic
     * @param message 监听到的消息
     */
    @Override
    public void handleMessage(String topic, byte[] message) {
        ActorRef ref;
        try {
            ref = actorRefMap.get(topic);
            if (ref == null) {
                TopicDefinition topicDefinition = resolveByTopic(topic);
                assert topicDefinition != null : "未定义topic";
                ref = actorSystem.actorOf(springExtensionProps(topicDefinition));
                actorRefMap.put(topic, ref);
            }
            ref.tell(response(topic, message), ActorRef.noSender());
        } catch (Exception e) {
            log.error("消息处理失败,{}", e.getMessage());
        }
    }

    /**
     * Mqtt响应体
     *
     * @param topic   topic
     * @param message message
     */
    protected Object response(String topic, byte[] message) {
        return new MqttResponse(topic, new String(message));
    }

    private Props springExtensionProps(TopicDefinition topicDefinition) {
        return springExtension.props(StringFormatUtils.classNameLowercaseFirst(topicDefinition.getActorClass()));
    }
}