package vc.thinker.absctacts.mqtt.publish; import akka.actor.ActorRef; import akka.actor.ActorSystem; import com.google.common.collect.Maps; import org.springframework.beans.factory.annotation.Autowired; import vc.thinker.absctacts.mqtt.topic.TopicDefinition; import vc.thinker.absctacts.mqtt.utils.StringFormatUtils; import vc.thinker.config.akka.extension.SpringExtension; import java.util.Map; /** * @author HeTongHao * @since 2021/7/9 14:35 */ public class TopicMqttPushService { @Autowired protected ActorSystem actorSystem; @Autowired protected SpringExtension springExtension; /** * topic 与actor映射 */ protected final Map<TopicDefinition, ActorRef> topicDefinitionActorRefMap = Maps.newConcurrentMap(); public ActorRef getActorRef(TopicDefinition topicDefinition) { return topicDefinitionActorRefMap.computeIfAbsent(topicDefinition, k -> actorSystem.actorOf(springExtension.props(StringFormatUtils.classNameLowercaseFirst(k.getActorClass())))); } public void push(TopicDefinition topicDefinition, Object message) { getActorRef(topicDefinition).tell(message, ActorRef.noSender()); } }