TopicMqttPushService.java 1.16 KB
Newer Older
xieshaojun's avatar
xieshaojun committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
package vc.thinker.absctacts.mqtt.publish;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.absctacts.mqtt.utils.StringFormatUtils;
import vc.thinker.config.akka.extension.SpringExtension;

import java.util.Map;

/**
 * Delivers messages to Akka actors selected by {@link TopicDefinition}.
 * <p>
 * An actor is created the first time its topic definition is requested and is
 * then reused for every later request with an equal topic definition.
 *
 * @author HeTongHao
 * @since 2021/7/9 14:35
 */
public class TopicMqttPushService {

    /**
     * Actor system in which the per-topic actors are created.
     */
    @Autowired
    protected ActorSystem actorSystem;

    /**
     * Produces the actor props from the name derived from a topic's actor class.
     */
    @Autowired
    protected SpringExtension springExtension;

    /**
     * Mapping from topic definition to the actor that handles it.
     * Backed by a concurrent map; entries are added lazily by {@link #getActorRef(TopicDefinition)}.
     */
    protected final Map<TopicDefinition, ActorRef> topicDefinitionActorRefMap = Maps.newConcurrentMap();

    /**
     * Returns the actor associated with the given topic definition, creating and
     * registering it if no actor has been stored for that definition yet.
     *
     * @param topicDefinition the topic whose actor is wanted
     * @return the stored (or newly created) actor reference for the topic
     */
    public ActorRef getActorRef(TopicDefinition topicDefinition) {
        return topicDefinitionActorRefMap.computeIfAbsent(topicDefinition, this::createActorFor);
    }

    /**
     * Sends {@code message} to the actor of the given topic, without a sender.
     *
     * @param topicDefinition the topic whose actor receives the message
     * @param message         the payload handed to the actor
     */
    public void push(TopicDefinition topicDefinition, Object message) {
        final ActorRef target = getActorRef(topicDefinition);
        target.tell(message, ActorRef.noSender());
    }

    /**
     * Creates a new actor for a topic definition: the topic's actor class is turned
     * into a name via {@code StringFormatUtils.classNameLowercaseFirst}, that name is
     * handed to the Spring extension to obtain props, and the actor system creates the actor.
     */
    private ActorRef createActorFor(TopicDefinition definition) {
        return actorSystem.actorOf(
                springExtension.props(
                        StringFormatUtils.classNameLowercaseFirst(definition.getActorClass())));
    }
}