package vc.thinker.absctacts.mqtt.publish;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.absctacts.mqtt.utils.StringFormatUtils;
import vc.thinker.config.akka.extension.SpringExtension;

import java.util.Map;

/**
 * @author HeTongHao
 * @since 2021/7/9 14:35
 */
public class TopicMqttPushService {
    @Autowired
    protected ActorSystem actorSystem;
    @Autowired
    protected SpringExtension springExtension;

    /**
     * topic 与actor映射
     */
    protected final Map<TopicDefinition, ActorRef> topicDefinitionActorRefMap = Maps.newConcurrentMap();

    public ActorRef getActorRef(TopicDefinition topicDefinition) {
        return topicDefinitionActorRefMap.computeIfAbsent(topicDefinition, k ->
                actorSystem.actorOf(springExtension.props(StringFormatUtils.classNameLowercaseFirst(k.getActorClass()))));
    }

    public void push(TopicDefinition topicDefinition, Object message) {
        getActorRef(topicDefinition).tell(message, ActorRef.noSender());
    }
}