package vc.thinker.absctacts.mqtt.protocol; import akka.actor.ActorRef; import akka.actor.Props; import lombok.extern.slf4j.Slf4j; import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties; import vc.thinker.absctacts.mqtt.entity.MqttResponseBytes; import vc.thinker.absctacts.mqtt.topic.TopicDefinition; import vc.thinker.absctacts.mqtt.utils.StringFormatUtils; /** * 抽象的Mqtt协议 * * @author HeTongHao * @since 2019-10-29 16:00 */ @Slf4j public abstract class AbstractCommonBytesHandleMqttProtocol extends AbstractCommonEntityHandleMqttProtocol { public AbstractCommonBytesHandleMqttProtocol(MqttConnectionProperties mqttConnectionProperties) { super(mqttConnectionProperties); } /** * 处理消息 * * @param topic 监听到消息的Topic * @param message 监听到的消息 */ @Override public void handleMessage(String topic, byte[] message) { ActorRef ref; try { ref = actorRefMap.get(topic); if (ref == null) { TopicDefinition topicDefinition = resolveByTopic(topic); assert topicDefinition != null : "未定义topic"; ref = actorSystem.actorOf(springExtensionProps(topicDefinition)); actorRefMap.put(topic, ref); } ref.tell(new MqttResponseBytes(topic, message), ActorRef.noSender()); } catch (Exception e) { log.error("消息处理失败,{}", e.getMessage()); } } private Props springExtensionProps(TopicDefinition topicDefinition) { return springExtension.props(StringFormatUtils.classNameLowercaseFirst(topicDefinition.getActorClass())); } }