package vc.thinker.absctacts.mqtt.protocol;


import akka.actor.ActorRef;
import akka.actor.Props;
import lombok.extern.slf4j.Slf4j;
import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties;
import vc.thinker.absctacts.mqtt.entity.MqttResponseBytes;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.absctacts.mqtt.utils.StringFormatUtils;

/**
 * 抽象的Mqtt协议
 *
 * @author HeTongHao
 * @since 2019-10-29 16:00
 */
@Slf4j
public abstract class AbstractCommonBytesHandleMqttProtocol extends AbstractCommonEntityHandleMqttProtocol {
    public AbstractCommonBytesHandleMqttProtocol(MqttConnectionProperties mqttConnectionProperties) {
        super(mqttConnectionProperties);
    }

    /**
     * 处理消息
     *
     * @param topic   监听到消息的Topic
     * @param message 监听到的消息
     */
    @Override
    public void handleMessage(String topic, byte[] message) {
        ActorRef ref;
        try {
            ref = actorRefMap.get(topic);
            if (ref == null) {
                TopicDefinition topicDefinition = resolveByTopic(topic);
                assert topicDefinition != null : "未定义topic";
                ref = actorSystem.actorOf(springExtensionProps(topicDefinition));
                actorRefMap.put(topic, ref);
            }
            ref.tell(new MqttResponseBytes(topic, message), ActorRef.noSender());
        } catch (Exception e) {
            log.error("消息处理失败,{}", e.getMessage());
        }
    }

    private Props springExtensionProps(TopicDefinition topicDefinition) {
        return springExtension.props(StringFormatUtils.classNameLowercaseFirst(topicDefinition.getActorClass()));
    }
}