Commit 728e4d5f authored by xieshaojun's avatar xieshaojun

init

parent 7f857659
Pipeline #40430 failed with stages
in 12 minutes and 18 seconds
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/
image: registry.cn-shenzhen.aliyuncs.com/thinker-open/maven:3.6-jdk14
variables:
MAVEN_OPTS: "-Dmaven.repo.local=.m2/repository"
MAVEN_CLI_OPTS: "compile jib:build "
KUBECONFIG: "/etc/deploy/config"
# Cache downloaded dependencies and plugins between builds.
# To keep cache across branches add 'key: "$CI_JOB_NAME"'
cache:
paths:
- .m2/repository
stages:
- build
- deploy
build:jdk13-master:
stage: build
services:
- docker:18.09-dind
variables:
DOCKER_HOST: tcp://localhost:2375
DOCKER_DRIVER: overlay2
script:
- mvn $MAVEN_CLI_OPTS -Djib.to.auth.username=${docker_username} -Djib.to.auth.password=${docker_password}
only:
- master
docker-compose-down-and-up:
stage: deploy
tags:
- thinker-cloud-app
script:
- echo ">>>>docker-compose down and up"
- cd /data/thinker-devops-cloud/backend
- docker-compose pull thinker-protocol-proxy-service
- docker-compose up -d thinker-protocol-proxy-service
- docker system prune -f
- echo ">>>>done"
variables:
GIT_STRATEGY: clone
GIT_CHECKOUT: "false"
cache: {}
only:
- master
端数据上报代理
\ No newline at end of file
# 协议代理服务
协议代理服务,第三方物联网设备私有协议,在此服务中进行编码适配
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>vc.thinker</groupId>
<artifactId>spring-cloud-cabbage-parent</artifactId>
<version>1.0.14-SNAPSHOT</version>
</parent>
<artifactId>terminal-protocol-proxy-service</artifactId>
<name>${artifactId}</name>
<description>富士康设备服务</description>
<packaging>jar</packaging>
<properties>
<image>registry-vpc.cn-shenzhen.aliyuncs.com/thinker-vc/terminal-protocol-proxy-service</image>
<baseimage>registry-vpc.cn-shenzhen.aliyuncs.com/thinker-open/openjdk14-openj9</baseimage>
<mainClass>vc.thinker.ProtocolProxyApplication</mainClass>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk16</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.jeasy</groupId>
<artifactId>easy-rules-core</artifactId>
</dependency>
<dependency>
<groupId>net.sf.dozer</groupId>
<artifactId>dozer</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.craftsman</groupId>
<artifactId>dozer-jdk8-support</artifactId>
</dependency>
<!-- Alibaba Spring Context extension -->
<dependency>
<groupId>com.alibaba.spring</groupId>
<artifactId>spring-context-support</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>1.6.1</version>
<configuration>
<from>
<image>${baseimage}</image>
</from>
<to>
<image>${image}</image>
</to>
<container>
<creationTime>USE_CURRENT_TIMESTAMP</creationTime>
<mainClass>${mainClass}</mainClass>
</container>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>${mainClass}</mainClass>
<includeSystemScope>true</includeSystemScope>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package vc.thinker;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* <p>
* 服务
* </p>
*
* @author HeTongHao
* @since 2019-07-19
*/
@EnableAsync
@SpringBootApplication
@EnableScheduling
public class ProtocolProxyApplication {
public static void main(String[] args) {
SpringApplication.run(ProtocolProxyApplication.class, args);
}
}
package vc.thinker.absctacts.mqtt.connection;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import vc.thinker.absctacts.mqtt.protocol.IMqttProtocol;
/**
* Mqtt 回调实现
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Slf4j
public class MqttCallbackImpl implements MqttCallback {
public MqttCallbackImpl(MqttConnectionProperties connectionProperties, IMqttProtocol mqttProtocol) throws MqttException {
this.connectionProperties = connectionProperties;
this.mqttProtocol = mqttProtocol;
this.reconnectionInterval = connectionProperties.getReconnectionInterval();
initMqttClient();
}
private final MqttConnectionProperties connectionProperties;
private final IMqttProtocol mqttProtocol;
private MqttClient mqttClient;
/**
* 重连时间,秒
*/
private int reconnectionInterval;
@Override
public void connectionLost(Throwable throwable) {
log.error("mqtt connectionLost", throwable);
log.info("mqtt 连接异常,{}稍后重新连接", reconnectionInterval);
try {
Thread.sleep(reconnectionInterval * 1000L);
} catch (InterruptedException e) {
log.info("", e);
}
if (mqttClient != null && !mqttClient.isConnected()) {
try {
mqttClient.close();
} catch (MqttException e) {
e.printStackTrace();
}
try {
log.info("mqtt 重新连接");
initMqttClient();
} catch (Exception e) {
if (reconnectionInterval < connectionProperties.getReconnectionIntervalMax()) {
reconnectionInterval += connectionProperties.getReconnectionIncreasing();
}
connectionLost(e);
}
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
if (mqttMessage.isRetained()) {
return;
}
String q = new String(mqttMessage.getPayload());
if (isClientStatus(topic)) {
log.info("topic:{},payload:{}", topic, q);
clientConnected(q);
} else {
mqttProtocol.handleMessage(topic, mqttMessage.getPayload());
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
private MqttClient newMqttClient() throws MqttException {
MemoryPersistence persistence = new MemoryPersistence();
MqttClient mqttClient = new MqttClient(connectionProperties.getBroker()
, connectionProperties.getClientId() + "-" + System.currentTimeMillis(), persistence);
mqttClient.setCallback(this);
return mqttClient;
}
private void initMqttClient() throws MqttException {
this.mqttClient = newMqttClient();
mqttProtocol.setMqttClient(mqttClient);
connect(mqttClient, mqttProtocol.subscribe());
}
/**
* 连接配制
*
* @return
*/
private MqttConnectOptions getConnOpts() {
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
connOpts.setUserName(connectionProperties.getUsername());
connOpts.setPassword(connectionProperties.getPassword().toCharArray());
try {
// ssl
// connOpts.setSocketFactory(SslUtil.getSocketFactory(
// ResourceUtils.getURL("classpath:").getPath() + certFile));
} catch (Exception e) {
e.printStackTrace();
}
connOpts.setKeepAliveInterval(30);
connOpts.setConnectionTimeout(60 * 5);
connOpts.setCleanSession(true);
connOpts.setAutomaticReconnect(true);
return connOpts;
}
/**
* 判断连接状态
*
* @param topic
* @return
*/
private boolean isClientStatus(String topic) {
return topic.endsWith("connected");
}
/**
* 连接
*
* @throws MqttSecurityException
* @throws MqttException
*/
private void connect(MqttClient mqttClient, String[] topic) throws MqttSecurityException, MqttException {
mqttClient.connect(getConnOpts());
mqttClient.subscribe(topic);
log.info("mqtt client [{}] connect [{}] Success"
, connectionProperties.getClientId(), connectionProperties.getBroker());
}
/**
* 客户端连接状态处理
*
* @param q
*/
private void clientConnected(String q) {
JSONObject session = JSONObject.parseObject(q);
String clientId = session.getString("clientid");
if (session.containsKey("ipaddress")) {
log.info("{} 连接服务器", clientId);
} else {
log.info("{} 断开服务器", clientId);
}
}
}
package vc.thinker.absctacts.mqtt.connection;
import lombok.Data;
/**
* Mqtt连接基础信息
*
* @author xfy
* @since 2021/6/21 18:11
**/
@Data
public class MqttConnectionProperties {
/**
* broker地址
*/
private String broker;
/**
* 客户端id
*/
private String clientId;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 清除session
*/
private boolean cleanSession;
/**
* 重连初始时间,秒
*/
private int reconnectionInterval = 3;
/**
* 重连时间最大值
*/
private final int reconnectionIntervalMax = 30;
/**
* 重连时间递增
*/
private final int reconnectionIncreasing = 0;
}
package vc.thinker.absctacts.mqtt.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.io.Serial;
import java.io.Serializable;
/**
* Mqtt响应对象
*
* @author xfy
* @since 2021/6/22 18:00
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class MqttResponse implements Serializable {
@Serial
private static final long serialVersionUID = 4548586577055536060L;
/**
* topic
*/
private String topic;
/**
* 消息体
*/
private String message;
}
package vc.thinker.absctacts.mqtt.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.io.Serial;
import java.io.Serializable;
/**
* Mqtt响应对象
*
* @author xfy
* @since 2021/6/22 18:00
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class MqttResponseBytes implements Serializable {
@Serial
private static final long serialVersionUID = 4548586577055536060L;
/**
* topic
*/
private String topic;
/**
* 消息体
*/
private byte[] message;
}
package vc.thinker.absctacts.mqtt.protocol;
import akka.actor.ActorRef;
import akka.actor.Props;
import lombok.extern.slf4j.Slf4j;
import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties;
import vc.thinker.absctacts.mqtt.entity.MqttResponseBytes;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.absctacts.mqtt.utils.StringFormatUtils;
/**
* 抽象的Mqtt协议
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Slf4j
public abstract class AbstractCommonBytesHandleMqttProtocol extends AbstractCommonEntityHandleMqttProtocol {
public AbstractCommonBytesHandleMqttProtocol(MqttConnectionProperties mqttConnectionProperties) {
super(mqttConnectionProperties);
}
/**
* 处理消息
*
* @param topic 监听到消息的Topic
* @param message 监听到的消息
*/
@Override
public void handleMessage(String topic, byte[] message) {
ActorRef ref;
try {
ref = actorRefMap.get(topic);
if (ref == null) {
TopicDefinition topicDefinition = resolveByTopic(topic);
assert topicDefinition != null : "未定义topic";
ref = actorSystem.actorOf(springExtensionProps(topicDefinition));
actorRefMap.put(topic, ref);
}
ref.tell(new MqttResponseBytes(topic, message), ActorRef.noSender());
} catch (Exception e) {
log.error("消息处理失败,{}", e.getMessage());
}
}
private Props springExtensionProps(TopicDefinition topicDefinition) {
return springExtension.props(StringFormatUtils.classNameLowercaseFirst(topicDefinition.getActorClass()));
}
}
package vc.thinker.absctacts.mqtt.protocol;
import akka.actor.ActorRef;
import akka.actor.Props;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties;
import vc.thinker.absctacts.mqtt.entity.MqttResponse;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.absctacts.mqtt.utils.StringFormatUtils;
import java.util.Map;
/**
* 抽象的Mqtt协议
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Slf4j
public abstract class AbstractCommonEntityHandleMqttProtocol extends AbstractMqttProtocol {
public AbstractCommonEntityHandleMqttProtocol(MqttConnectionProperties mqttConnectionProperties) {
super(mqttConnectionProperties);
}
/**
* 当前 Topic 与Actor实例的映射
*/
protected final Map<String, ActorRef> actorRefMap = Maps.newHashMap();
/**
* 根据topic 解析一个topic定义枚举
*
* @param topic 消息接收的topic
* @return topic定义枚举
*/
protected abstract TopicDefinition resolveByTopic(String topic);
/**
* 处理消息
*
* @param topic 监听到消息的Topic
* @param message 监听到的消息
*/
@Override
public void handleMessage(String topic, byte[] message) {
ActorRef ref;
try {
ref = actorRefMap.get(topic);
if (ref == null) {
TopicDefinition topicDefinition = resolveByTopic(topic);
assert topicDefinition != null : "未定义topic";
ref = actorSystem.actorOf(springExtensionProps(topicDefinition));
actorRefMap.put(topic, ref);
}
ref.tell(response(topic, message), ActorRef.noSender());
} catch (Exception e) {
log.error("消息处理失败,{}", e.getMessage());
}
}
/**
* Mqtt响应体
*
* @param topic topic
* @param message message
*/
protected Object response(String topic, byte[] message) {
return new MqttResponse(topic, new String(message));
}
private Props springExtensionProps(TopicDefinition topicDefinition) {
return springExtension.props(StringFormatUtils.classNameLowercaseFirst(topicDefinition.getActorClass()));
}
}
package vc.thinker.absctacts.mqtt.protocol;
import akka.actor.ActorRef;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.absctacts.mqtt.utils.StringFormatUtils;
import java.util.Map;
/**
* 抽象的Mqtt协议
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Slf4j
public abstract class AbstractCommonHandleMqttProtocol extends AbstractMqttProtocol {
public AbstractCommonHandleMqttProtocol(MqttConnectionProperties mqttConnectionProperties) {
super(mqttConnectionProperties);
}
/**
* 当前 Topic 与Actor实例的映射
*/
private final Map<String, ActorRef> actorRefMap = Maps.newHashMap();
/**
* 根据topic 解析一个topic定义枚举
*
* @param topic 消息接收的topic
* @return topic定义枚举
*/
protected abstract TopicDefinition resolveByTopic(String topic);
/**
* 处理消息
*
* @param topic 监听到消息的Topic
* @param message 监听到的消息
*/
@Override
public void handleMessage(String topic, byte[] message) {
ActorRef ref;
try {
ref = actorRefMap.get(topic);
if (ref == null) {
TopicDefinition topicDefinition = resolveByTopic(topic);
assert topicDefinition != null : "未定义topic";
ref = actorSystem.actorOf(springExtension.props(
StringFormatUtils.classNameLowercaseFirst(topicDefinition.getActorClass()))
);
actorRefMap.put(topic, ref);
}
ref.tell(new String(message), ActorRef.noSender());
} catch (Exception e) {
log.error("消息处理失败,{}", e.getMessage());
}
}
}
package vc.thinker.absctacts.mqtt.protocol;
import akka.actor.ActorSystem;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Autowired;
import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties;
import vc.thinker.absctacts.mqtt.publish.MqttCommandPush;
import vc.thinker.config.akka.extension.SpringExtension;
/**
* 抽象的Mqtt协议
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
public abstract class AbstractMqttProtocol implements IMqttProtocol {
@Autowired
protected ActorSystem actorSystem;
@Autowired
protected SpringExtension springExtension;
protected final MqttConnectionProperties mqttConnectionProperties;
/**
* mqtt连接
*/
private MqttClient mqttClient;
/**
* 默认mqtt推送组件
*/
private MqttCommandPush mqttCommandPush;
@Override
public MqttConnectionProperties getProperties() {
return mqttConnectionProperties;
}
public AbstractMqttProtocol(MqttConnectionProperties mqttConnectionProperties) {
this.mqttConnectionProperties = mqttConnectionProperties;
}
@Override
public void setMqttClient(MqttClient mqttClient) {
this.mqttClient = mqttClient;
this.mqttCommandPush = new MqttCommandPush(mqttClient);
}
@Override
public MqttCommandPush getMqttCommandPush() {
return mqttCommandPush == null ? mqttCommandPush = new MqttCommandPush(mqttClient) : mqttCommandPush;
}
/**
* 处理消息
*
* @param topic 监听到消息的Topic
* @param message 监听到的消息
*/
@Override
public abstract void handleMessage(String topic, byte[] message);
}
package vc.thinker.absctacts.mqtt.protocol;
import cn.hutool.core.util.IdUtil;
import lombok.extern.slf4j.Slf4j;
import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties;
/**
* 抽象不订阅的Mqtt协议
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Slf4j
public abstract class AbstractNotSubscriptionMqttProtocol extends AbstractMqttProtocol {
public AbstractNotSubscriptionMqttProtocol(MqttConnectionProperties mqttConnectionProperties) {
super(mqttConnectionProperties);
}
@Override
public String[] subscribe() {
// 订阅一个不存在的Topic 防止启动报错
return new String[]{IdUtil.randomUUID()};
}
@Override
public void handleMessage(String topic, byte[] message) {
}
}
package vc.thinker.absctacts.mqtt.protocol;
import org.eclipse.paho.client.mqttv3.MqttClient;
import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties;
import vc.thinker.absctacts.mqtt.publish.MqttCommandPush;
/**
* mqtt 协议
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
public interface IMqttProtocol {
/**
* 订阅Topic列表
*
* @return Topic列表
*/
String[] subscribe();
/**
* 处理业务消息
*
* @param topic 监听到消息的Topic
* @param message 监听到的消息
*/
void handleMessage(String topic, byte[] message);
/**
* 得到消息推送
*
* @return 可以实现Mqtt消息推送
*/
MqttCommandPush getMqttCommandPush();
/**
* 设置这个协议对应的连接
*
* @param mqttClient Mqtt连接
*/
void setMqttClient(MqttClient mqttClient);
/**
* 获取Mqtt连接配置
*
* @return MqttConnectionProperties
*/
MqttConnectionProperties getProperties();
}
package vc.thinker.absctacts.mqtt.publish;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.absctacts.mqtt.utils.StringFormatUtils;
import vc.thinker.config.akka.extension.SpringExtension;
import java.util.Map;
/**
* @author HeTongHao
* @since 2021/7/9 14:35
*/
public class ConcurrentKeyMqttPushService {
@Autowired
protected ActorSystem actorSystem;
@Autowired
protected SpringExtension springExtension;
/**
* topic 与actor映射
*/
protected final Map<String, ActorRef> topicDefinitionActorRefMap = Maps.newConcurrentMap();
public ActorRef getActorRef(TopicDefinition topicDefinition, String key) {
return topicDefinitionActorRefMap.computeIfAbsent(topicDefinition + key, k ->
actorSystem.actorOf(springExtension.props(StringFormatUtils.classNameLowercaseFirst(topicDefinition.getActorClass()))));
}
public void push(TopicDefinition topicDefinition, String key, Object message) {
getActorRef(topicDefinition, key).tell(message, ActorRef.noSender());
}
}
package vc.thinker.absctacts.mqtt.publish;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* mqtt 指令发送
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Slf4j
public class MqttCommandPush {
public MqttCommandPush(MqttClient mqttClient) {
this.mqttClient = mqttClient;
}
private final MqttClient mqttClient;
public MqttClient getMqttClient() {
return mqttClient;
}
public void sendMqttMessage(String topic, byte[] payload) {
MqttMessage message = new MqttMessage();
message.setPayload(payload);
message.setQos(1);
message.setRetained(false);
try {
if (mqttClient == null) {
log.error("命令推送失败 mqttClient is null");
} else if (mqttClient.isConnected()) {
mqttClient.publish(topic, message);
} else {
mqttClient.connect();
mqttClient.publish(topic, message);
}
} catch (MqttException e) {
e.printStackTrace();
}
}
}
package vc.thinker.absctacts.mqtt.publish;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.absctacts.mqtt.utils.StringFormatUtils;
import vc.thinker.config.akka.extension.SpringExtension;
import java.util.Map;
/**
* @author HeTongHao
* @since 2021/7/9 14:35
*/
public class TopicMqttPushService {
@Autowired
protected ActorSystem actorSystem;
@Autowired
protected SpringExtension springExtension;
/**
* topic 与actor映射
*/
protected final Map<TopicDefinition, ActorRef> topicDefinitionActorRefMap = Maps.newConcurrentMap();
public ActorRef getActorRef(TopicDefinition topicDefinition) {
return topicDefinitionActorRefMap.computeIfAbsent(topicDefinition, k ->
actorSystem.actorOf(springExtension.props(StringFormatUtils.classNameLowercaseFirst(k.getActorClass()))));
}
public void push(TopicDefinition topicDefinition, Object message) {
getActorRef(topicDefinition).tell(message, ActorRef.noSender());
}
}
package vc.thinker.absctacts.mqtt.topic;
/**
* topic定义
*
* @author HeTongHao
* @since 2021/7/9 14:13
*/
public interface TopicDefinition {
/**
* topic定义
*
* @return topic定义
*/
String getTopic();
/**
* 对应处理该topic的actor
*
* @return 对应处理该topic的actor
*/
Class<?> getActorClass();
}
package vc.thinker.absctacts.mqtt.utils;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMReader;
import org.bouncycastle.openssl.PasswordFinder;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.X509Certificate;
public class SslUtil {
public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile, final String keyFile,
final String password) throws Exception {
Security.addProvider(new BouncyCastleProvider());
// load CA certificate
PEMReader reader = new PEMReader(
new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile)))));
X509Certificate caCert = (X509Certificate) reader.readObject();
reader.close();
// load client certificate
reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(crtFile)))));
X509Certificate cert = (X509Certificate) reader.readObject();
reader.close();
// load client private key
reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFile)))),
new PasswordFinder() {
@Override
public char[] getPassword() {
return password.toCharArray();
}
});
KeyPair key = (KeyPair) reader.readObject();
reader.close();
// CA certificate is used to authenticate server
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(caKs);
// client key and certificates are sent to server so it can authenticate
// us
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(null, null);
ks.setCertificateEntry("certificate", cert);
ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
new java.security.cert.Certificate[]{cert});
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, password.toCharArray());
// finally, create SSL socket factory
SSLContext context = SSLContext.getInstance("TLSv1");
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
return context.getSocketFactory();
}
public static SSLSocketFactory getSocketFactory(final String caCrtFile) throws Exception {
Security.addProvider(new BouncyCastleProvider());
// load CA certificate
PEMReader reader = new PEMReader(
new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile)))));
X509Certificate caCert = (X509Certificate) reader.readObject();
reader.close();
// CA certificate is used to authenticate server
KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
caKs.load(null, null);
caKs.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(caKs);
// finally, create SSL socket factory
SSLContext context = SSLContext.getInstance("TLSv1");
context.init(null, tmf.getTrustManagers(), null);
return context.getSocketFactory();
}
}
\ No newline at end of file
package vc.thinker.absctacts.mqtt.utils;
import org.slf4j.helpers.MessageFormatter;
/**
* String格式化简化操作类
*
* @author HeTongHao
* @since 2019-08-27 10:59
*/
public class StringFormatUtils {
/**
* 用{}作为占位符,格式化字符串
*
* @param format
* @param argArray
* @return
*/
public static String format(String format, Object... argArray) {
return MessageFormatter.arrayFormat(format, argArray).getMessage();
}
/**
* 首字母小写
*
* @param str
* @return
*/
public static String lowercaseFirst(String str) {
String firstChar = str.substring(0, 1);
return str.replaceFirst(firstChar, firstChar.toLowerCase());
}
/**
* 类名首字母小写
*
* @param clazz
* @return
*/
public static String classNameLowercaseFirst(Class clazz) {
return lowercaseFirst(clazz.getSimpleName());
}
}
package vc.thinker.absctacts.protocolproxy.dto;
import vc.thinker.absctacts.protocolproxy.enums.IMeterDataDefine;
import java.time.LocalDateTime;
import java.util.Map;
/**
* @author HeTongHao
* @since 2021/7/9 17:08
*/
public interface IMeterReportEntity {
/**
* 设备唯一code
*
* @return 设备唯一code
*/
String getDeviceCode();
/**
* 数据采集时间
*
* @return 数据采集时间
*/
LocalDateTime getCollectTime();
/**
* 数据上报集
*
* @return 数据上报集
*/
Map<IMeterDataDefine, Object> getDataMap();
}
package vc.thinker.absctacts.protocolproxy.dto;
import vc.thinker.absctacts.protocolproxy.enums.DataType;
import vc.thinker.absctacts.protocolproxy.enums.service.IServiceParamDefine;
import java.util.Map;
/**
* 服务执行命令
*
* @author HeTongHao
* @since 2021/7/9 17:08
*/
public interface IServiceExecuteCmd {
/**
* 流水号id
*
* @return 流水号id
*/
String getRequestId();
/**
* 设备唯一code
*
* @return 设备唯一code
*/
String getDeviceCode();
/**
* 服务编号
*
* @return 服务编号
*/
String getServiceCode();
/**
* 参数集合
*
* @return 参数集合
*/
Map<String, Object> getParams();
/**
* 参数集合
*
* @param paramDefine 参数定义
* @return 参数集合
*/
default <T> T getParamValue(IServiceParamDefine paramDefine) {
Object result = this.getParams().get(paramDefine.getCode());
if (result instanceof String) {
if (DataType.Int.equals(paramDefine.getDataType())) {
result = Integer.valueOf((String) result);
} else if (DataType.Float.equals(paramDefine.getDataType())) {
result = Float.valueOf((String) result);
}
}
return (T) result;
}
}
package vc.thinker.absctacts.protocolproxy.dto;
import vc.thinker.absctacts.protocolproxy.enums.ServiceExecuteResultType;
/**
* 服务执行结果
*
* @author HeTongHao
* @since 2021/7/9 17:08
*/
public interface IServiceExecuteResult {
/**
* 流水号id
*
* @return 流水号id
*/
String getRequestId();
/**
* get结果类型
*
* @return 结果类型
*/
ServiceExecuteResultType getType();
/**
* 消息
*
* @return 执行结果消息
*/
String getMessage();
}
package vc.thinker.absctacts.protocolproxy.dto;
import lombok.Builder;
import lombok.Getter;
/**
* 仪表信息
*
* @author HeTongHao
* @since 2022/8/19 15:59
*/
@Getter
@Builder
public class MeterInfo {
private String id;
private String code;
}
package vc.thinker.absctacts.protocolproxy.dto;
import com.google.common.collect.Maps;
import lombok.Getter;
import vc.thinker.absctacts.protocolproxy.enums.IMeterDataDefine;
import java.time.LocalDateTime;
import java.util.Map;
/**
* @author HeTongHao
* @since 2021/7/9 17:47
*/
@Getter
public class MeterReportEntity implements IMeterReportEntity {
private MeterReportEntity(String deviceCode, LocalDateTime collectTime, Map<IMeterDataDefine, Object> dataMap) {
this.deviceCode = deviceCode;
this.collectTime = collectTime;
this.dataMap = dataMap;
}
private final String deviceCode;
private final LocalDateTime collectTime;
private final Map<IMeterDataDefine, Object> dataMap;
public static MeterReportEntityBuilder builder(String deviceId, LocalDateTime collectTime) {
return new MeterReportEntityBuilder(deviceId, collectTime);
}
public static class MeterReportEntityBuilder {
private MeterReportEntityBuilder(String deviceId, LocalDateTime collectTime) {
this.deviceId = deviceId;
this.collectTime = collectTime;
}
private final String deviceId;
private final LocalDateTime collectTime;
private final Map<IMeterDataDefine, Object> dataMap = Maps.newConcurrentMap();
public <DD extends IMeterDataDefine> MeterReportEntityBuilder putValue(DD dataDefine, Object value) {
putData(dataDefine, value);
return this;
}
public <DD extends IMeterDataDefine> MeterReportEntityBuilder putValues(Map<DD, Object> dataDefineAndValueMap) {
dataDefineAndValueMap.forEach(this::putData);
return this;
}
private <DD extends IMeterDataDefine> void putData(DD dataDefine, Object value) {
dataMap.put(dataDefine, value);
}
public MeterReportEntity build() {
return new MeterReportEntity(deviceId, collectTime, dataMap);
}
}
}
package vc.thinker.absctacts.protocolproxy.dto;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Map;
/**
* 服务执行命令
*
* @author HeTongHao
* @since 2021/7/9 17:47
*/
@Getter
@AllArgsConstructor
public class ServiceExecuteCmd implements IServiceExecuteCmd {
private final String requestId;
private final String deviceCode;
private final String serviceCode;
private final Map<String, Object> params;
}
package vc.thinker.absctacts.protocolproxy.dto;
import lombok.Getter;
import vc.thinker.absctacts.protocolproxy.enums.ServiceExecuteResultType;
/**
* 服务执行结果
*
* @author HeTongHao
* @since 2021/8/11 17:47
*/
@Getter
public class ServiceExecuteResult implements IServiceExecuteResult {
private ServiceExecuteResult(String requestId, ServiceExecuteResultType type, String message) {
this.requestId = requestId;
this.type = type;
this.message = message;
}
private final String requestId;
private final ServiceExecuteResultType type;
private final String message;
public static ServiceExecuteResult ok(IServiceExecuteCmd cmd) {
return new ServiceExecuteResult(cmd.getRequestId(), ServiceExecuteResultType.ok, ServiceExecuteResultType.ok.getName());
}
public static ServiceExecuteResult fail(IServiceExecuteCmd cmd) {
return new ServiceExecuteResult(cmd.getRequestId(), ServiceExecuteResultType.fail, ServiceExecuteResultType.fail.getName());
}
public static ServiceExecuteResult fail(IServiceExecuteCmd cmd, String message) {
return new ServiceExecuteResult(cmd.getRequestId(), ServiceExecuteResultType.fail, message);
}
}
package vc.thinker.absctacts.protocolproxy.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 写入数据格式
*
* @author HeTongHao
* @since 2021/7/9 16:20
*/
@Getter
@AllArgsConstructor
public enum DataType {
/**
* 整形
*/
Int("整形"),
/**
* 浮点型
*/
Float("浮点型"),
;
/**
* 类型名称
*/
private final String name;
}
package vc.thinker.absctacts.protocolproxy.enums;
/**
* 属性定义 枚举
*
* @author HeTongHao
* @since 2021/7/9 16:38
*/
public interface IMeterAttrDefine extends IMeterDataDefine {
/**
* 数据类型
*
* @return 数据类型
*/
DataType getDataType();
}
package vc.thinker.absctacts.protocolproxy.enums;
/**
* 数据项定义 枚举
*
* @author HeTongHao
* @since 2021/7/9 16:38
*/
public interface IMeterDataDefine {
/**
* 数据编号
*
* @return 数据编号
*/
String getCode();
}
package vc.thinker.absctacts.protocolproxy.enums;
/**
* 事件定义 枚举
*
* @author HeTongHao
* @since 2021/7/9 16:38
*/
public interface IMeterEventDefine extends IMeterDataDefine {
}
package vc.thinker.absctacts.protocolproxy.enums;
/**
* 协议类型
*
* @author HeTongHao
* @since 2021/7/17 13:24
*/
public interface IProtocolType {
/**
* 协议编号
*
* @return 协议编号
*/
String getCode();
/**
* 协议名称
*
* @return 协议名称
*/
String getName();
}
package vc.thinker.absctacts.protocolproxy.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 服务执行结果类型
*
* @author HeTongHao
* @since 2022/8/11 18:26
*/
@Getter
@AllArgsConstructor
public enum ServiceExecuteResultType {
/**
* 服务执行结果类型
*/
ok("执行成功"),
fail("执行失败"),
;
private final String name;
}
package vc.thinker.absctacts.protocolproxy.enums.service;
import java.util.List;
/**
* 服务定义
*
* @author HeTongHao
* @since 2021/7/9 16:38
*/
public interface IMeterServiceDefine {
/**
* 服务编号
*
* @return 服务编号
*/
String getCode();
/**
* 获取参数定义
*
* @return 参数定义
*/
List<IServiceParamDefine> getParamDefines();
}
package vc.thinker.absctacts.protocolproxy.enums.service;
import vc.thinker.absctacts.protocolproxy.enums.DataType;
/**
* 服务参数定义
*
* @author HeTongHao
* @since 2021/11/24 17:09
*/
public interface IServiceParamDefine {
/**
* 覆盖,编号默认取枚举的名称
*
* @return 枚举的名称
*/
String getCode();
/**
* 参数数据类型
*
* @return 参数数据类型
*/
DataType getDataType();
}
package vc.thinker.absctacts.protocolproxy.enums.service;
import lombok.AllArgsConstructor;
import lombok.Getter;
import vc.thinker.absctacts.protocolproxy.enums.DataType;
/**
* 无参定义
*
* @author HeTongHao
* @since 2022/8/11 17:11
*/
@Getter
@AllArgsConstructor
public enum NoParam implements IServiceParamDefine {
/**
* 无参定义
*/
Instance("NoParam", null);
private final String code;
private final DataType dataType;
}
package vc.thinker.absctacts.protocolproxy.enums.service;
import lombok.Getter;
import vc.thinker.absctacts.protocolproxy.enums.DataType;
/**
* 参数定义
*
* @author HeTongHao
* @since 2022/8/11 17:11
*/
@Getter
public class ParamDefine implements IServiceParamDefine {
private ParamDefine(String code, DataType dataType) {
this.code = code;
this.dataType = dataType;
}
private final String code;
private final DataType dataType;
public static ParamDefine build(String code, DataType dataType) {
return new ParamDefine(code, dataType);
}
}
package vc.thinker.absctacts.protocolproxy.exception;
import vc.thinker.utils.StringFormaterUtils;
/**
* @author HeTongHao
* @since 2021/7/16 17:42
*/
public class VerifyException extends Exception {
public VerifyException(String message) {
super(message);
}
public VerifyException(String message, Object... argArray) {
super(StringFormaterUtils.format(message, argArray));
}
}
package vc.thinker.absctacts.protocolproxy.exceptions;
/**
* 执行超时
*
* @author HeTongHao
* @since 2022/9/3 17:07
*/
public class ExecTimeoutException extends RuntimeException {
public ExecTimeoutException() {
}
public ExecTimeoutException(String message) {
super(message);
}
public ExecTimeoutException(String message, Throwable cause) {
super(message, cause);
}
public ExecTimeoutException(Throwable cause) {
super(cause);
}
}
package vc.thinker.absctacts.protocolproxy.exceptions;
/**
* 仪表编号格式异常
*
* @author HeTongHao
* @since 2022/9/1 17:34
*/
public class MeterCodeFormatException extends IllegalArgumentException {
public MeterCodeFormatException(String message) {
super(message);
}
public MeterCodeFormatException(String message, Throwable cause) {
super(message, cause);
}
public MeterCodeFormatException(Throwable cause) {
super(cause);
}
}
package vc.thinker.absctacts.protocolproxy.exceptions;
/**
* 离线异常
*
* @author HeTongHao
* @since 2022/9/1 17:34
*/
public class MeterOfflineException extends RuntimeException {
public MeterOfflineException(String message) {
super(message);
}
public MeterOfflineException(String message, Throwable cause) {
super(message, cause);
}
public MeterOfflineException(Throwable cause) {
super(cause);
}
}
package vc.thinker.absctacts.protocolproxy.exceptions;
/**
* 服务执行超时
*
* @author HeTongHao
* @since 2022/9/3 17:07
*/
public class ServiceExecTimeoutException extends ExecTimeoutException {
public ServiceExecTimeoutException() {
}
public ServiceExecTimeoutException(String message) {
super(message);
}
public ServiceExecTimeoutException(String message, Throwable cause) {
super(message, cause);
}
public ServiceExecTimeoutException(Throwable cause) {
super(cause);
}
}
package vc.thinker.absctacts.protocolproxy.protocol;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import vc.thinker.absctacts.protocolproxy.dto.IMeterReportEntity;
import vc.thinker.absctacts.protocolproxy.dto.IServiceExecuteCmd;
import vc.thinker.absctacts.protocolproxy.dto.IServiceExecuteResult;
import vc.thinker.absctacts.protocolproxy.dto.ServiceExecuteResult;
import vc.thinker.absctacts.protocolproxy.enums.IMeterAttrDefine;
import vc.thinker.absctacts.protocolproxy.enums.IMeterEventDefine;
import vc.thinker.mqtt.proxyclient.entity.up.MeterData;
import vc.thinker.mqtt.proxyclient.entity.up.ThirdPartyProxyReport;
import vc.thinker.mqtt.proxyclient.service.ThirdPartyProxyClientService;
import vc.thinker.utils.LocalDateTimeUtils;
import java.util.List;
import java.util.stream.Collectors;
/**
* 第三方协议
*
* @author HeTongHao
* @since 2021/7/9 16:20
*/
@Slf4j
public abstract class AbstractThirdPartyProtocol implements IThirdPartyProtocol {
@Autowired
protected ThirdPartyProxyClientService thirdPartyProxyClientService;
@Override
public void report(List<IMeterReportEntity> meterReportEntityList) {
thirdPartyProxyClientService.report(new ThirdPartyProxyReport(protocolType().getCode()
, meterReportEntityList.stream()
.map(this::meterReportEntityToMeterData)
.collect(Collectors.toList())
)
);
}
/**
* 单组数据上报
*
* @param meterReportEntity meterReportEntity
*/
public void report(IMeterReportEntity meterReportEntity) {
report(Lists.newArrayList(meterReportEntity));
}
private MeterData meterReportEntityToMeterData(IMeterReportEntity meterReportEntity) {
MeterData meterData = new MeterData()
.setDeviceCode(meterReportEntity.getDeviceCode())
.setTimestamp(LocalDateTimeUtils.parseToTimestamp(meterReportEntity.getCollectTime()));
meterReportEntity.getDataMap().forEach((dataDefine, value) -> {
if (dataDefine instanceof IMeterAttrDefine) {
IMeterAttrDefine meterAttrDefine = (IMeterAttrDefine) dataDefine;
meterData.putAttrData(meterAttrDefine.getCode(), value);
} else if (dataDefine instanceof IMeterEventDefine) {
IMeterEventDefine meterEventDefine = (IMeterEventDefine) dataDefine;
if (value instanceof Boolean) {
value = (Boolean) value ? 1 : 0;
}
meterData.putEventData(meterEventDefine.getCode(), value);
}
});
return meterData;
}
@Override
public IServiceExecuteResult onExecutiveService(IServiceExecuteCmd cmd) {
return ServiceExecuteResult.fail(cmd, "未实现任何服务");
}
}
package vc.thinker.absctacts.protocolproxy.protocol;
import cn.hutool.core.thread.ThreadUtil;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import vc.thinker.absctacts.protocolproxy.dto.MeterInfo;
import vc.thinker.mqtt.proxyclient.entity.down.PlateFormMeterInfo;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* 第三方协议
*
* @author HeTongHao
* @since 2022/8/19 16:20
*/
@Slf4j
public abstract class AbstractThirdPartyProtocolSyncMeterInfo extends AbstractThirdPartyProtocol implements IThirdPartyProtocolSyncMeterInfo {
private final Map<String, MeterInfo> idMeterMap = Maps.newConcurrentMap();
@PostConstruct
protected void initMeterList() {
ThreadUtil.execute(() -> {
int retryCount = 0;
do {
try {
pullMeterList();
return;
} catch (TimeoutException e) {
log.warn("[{}]协议拉取仪表超时,第{}次重试中...", protocolType().getName(), ++retryCount);
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
} while (true);
});
}
public synchronized Collection<MeterInfo> getMetes() {
return idMeterMap.values();
}
@Override
public boolean pullMeterList() throws TimeoutException {
try {
List<PlateFormMeterInfo> plateFormMeterInfos = thirdPartyProxyClientService.pullMeterList(protocolType().getCode());
Map<String, MeterInfo> idMeterMap = plateFormMeterInfos.stream()
.map(plateFormMeterInfo -> MeterInfo
.builder()
.id(plateFormMeterInfo.getId())
.code(plateFormMeterInfo.getCode())
.build()
)
.collect(Collectors.toMap(MeterInfo::getId, meterInfo -> meterInfo));
synchronized (this) {
this.idMeterMap.clear();
this.idMeterMap.putAll(idMeterMap);
return true;
}
} catch (TimeoutException e) {
throw e;
} catch (Exception e) {
log.error("[{}]协议拉取仪表异常:{}", protocolType().getName(), e.getMessage());
e.printStackTrace();
return false;
}
}
@Override
public void putMeter(MeterInfo meterInfo) {
this.idMeterMap.put(meterInfo.getId(), meterInfo);
}
@Override
public void deleteMeter(String id) {
this.idMeterMap.remove(id);
}
}
package vc.thinker.absctacts.protocolproxy.protocol;
import vc.thinker.absctacts.protocolproxy.dto.IMeterReportEntity;
import vc.thinker.absctacts.protocolproxy.dto.IServiceExecuteCmd;
import vc.thinker.absctacts.protocolproxy.dto.IServiceExecuteResult;
import vc.thinker.absctacts.protocolproxy.enums.IProtocolType;
import java.util.List;
/**
* 第三方协议
*
* @author HeTongHao
* @since 2021/7/9 16:20
*/
public interface IThirdPartyProtocol {
/**
* 协议类型
*
* @return 协议类型
*/
IProtocolType protocolType();
/**
* 上报一个或多个仪表的数据
*
* @param meterReportEntityList 仪表上报实体列表
*/
void report(List<IMeterReportEntity> meterReportEntityList);
/**
* 接受执行服务
*
* @param serviceExecuteCmd 服务执行命令
* @return 服务执行结果
*/
IServiceExecuteResult onExecutiveService(IServiceExecuteCmd serviceExecuteCmd);
}
package vc.thinker.absctacts.protocolproxy.protocol;
import vc.thinker.absctacts.protocolproxy.dto.MeterInfo;
import java.util.concurrent.TimeoutException;
/**
* 第三方协议
* 支持同步仪表信息,让代理及时更新需要采集的仪表列表
*
* @author HeTongHao
* @since 2022/8/19 16:20
*/
public interface IThirdPartyProtocolSyncMeterInfo extends IThirdPartyProtocol {
/**
* 拉取仪表列表
*
* @return 仪表列表
* @throws TimeoutException 拉取超时
*/
boolean pullMeterList() throws TimeoutException;
/**
* 更新仪表
*
* @param meterInfo 设备信息
*/
void putMeter(MeterInfo meterInfo);
/**
* 删除仪表
*
* @param id 设备唯一id
*/
void deleteMeter(String id);
}
package vc.thinker.config.akka;
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import vc.thinker.config.akka.extension.SpringExtension;
import vc.thinker.utils.SpringContextHolder;
/**
* @author : xieshaojun
* @date : 2023/1/3 17:47
*/
@Configuration
public class AkkaConfig {
@Autowired
private ApplicationContext applicationContext;
@Bean
@Lazy(false)
public SpringContextHolder springContextHolder() {
return new SpringContextHolder();
}
/**
* Akka配置
*
* @return
*/
@Bean
public Config akkaConfiguration() {
return ConfigFactory.load();
}
/**
* spring 集成Akka
*
* @return
*/
@Bean
public SpringExtension springExtension() {
return new SpringExtension();
}
/**
* Actor system singleton for this application.
*/
@Bean
public ActorSystem actorSystem() {
ActorSystem system = ActorSystem.create("AkkaTaskProcessing", akkaConfiguration());
springExtension().initialize(applicationContext);
return system;
}
}
package vc.thinker.config.akka.extension;
import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;
/**
* @author : xieshaojun
* @date : 2023/1/3 17:48
*/
public class SpringActorProducer implements IndirectActorProducer {
private final ApplicationContext applicationContext;
private final String actorBeanName;
private Object arg0;
public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName) {
this.applicationContext = applicationContext;
this.actorBeanName = actorBeanName;
}
public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object arg0) {
this.applicationContext = applicationContext;
this.actorBeanName = actorBeanName;
this.arg0 = arg0;
}
@Override
public Actor produce() {
return arg0 != null ? (Actor) applicationContext.getBean(actorBeanName, arg0)
: (Actor) applicationContext.getBean(actorBeanName);
}
@Override
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
}
}
\ No newline at end of file
package vc.thinker.config.akka.extension;
import akka.actor.Extension;
import akka.actor.Props;
import org.springframework.context.ApplicationContext;
/**
* @author : xieshaojun
* @date : 2023/1/3 17:48
*/
public class SpringExtension implements Extension {
private ApplicationContext applicationContext;
/**
* Used to initialize the Spring application context for the extension.
*/
public void initialize(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
/**
* Create a Props for the specified actorBeanName using the SpringActorProducer
* class.
*/
public Props props(String actorBeanName, Object arg0) {
return Props.create(SpringActorProducer.class, applicationContext, actorBeanName, arg0);
}
public Props props(String actorBeanName) {
return Props.create(SpringActorProducer.class, applicationContext, actorBeanName);
}
}
package vc.thinker.exception;
import org.springframework.http.HttpStatus;
import org.springframework.validation.BindException;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import vc.thinker.response.AbstractResponse;
import vc.thinker.response.SimpleResponse;
import java.util.List;
/**
* @author : xieshaojun
* @date : 2023/1/3 15:21
*/
@RestControllerAdvice(annotations = RestController.class)
public class ExceptionAdvice {
@ResponseStatus(HttpStatus.BAD_REQUEST)
@ExceptionHandler({MethodArgumentNotValidException.class, BindException.class})
public AbstractResponse handleBodyValidException(MethodArgumentNotValidException e) {
AbstractResponse errorResponse = new SimpleResponse();
List<FieldError> fieldErrors = e.getBindingResult().getFieldErrors();
errorResponse.setErrorInfo(HttpStatus.BAD_REQUEST.value(), fieldErrors.get(0).getDefaultMessage());
return errorResponse;
}
}
\ No newline at end of file
package vc.thinker.mqtt.config;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Configuration;
import vc.thinker.absctacts.mqtt.connection.MqttCallbackImpl;
import vc.thinker.absctacts.mqtt.protocol.IMqttProtocol;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
/**
* Mqtt 协议配置
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Slf4j
@Configuration
@ConditionalOnExpression("${mqtt.run:true}")
public class MqttProtocolConfig {
/**
* 需要注册的协议列表
*/
private final List<IMqttProtocol> mqttProtocolList;
/**
* mqtt授权服务
*/
private final Map<IMqttProtocol, MqttCallback> protocolCallbackMap = Maps.newHashMap();
public MqttProtocolConfig(List<IMqttProtocol> mqttProtocolList) {
this.mqttProtocolList = mqttProtocolList;
}
@PostConstruct
private void initProtocol() {
mqttProtocolList.forEach(mqttProtocol -> {
Class<?> clazz = mqttProtocol.getClass();
try {
protocolCallbackMap.put(mqttProtocol, new MqttCallbackImpl(mqttProtocol.getProperties(), mqttProtocol));
log.info("[{}]协议注册成功!", clazz.getSimpleName());
} catch (MqttException e) {
log.error("构建Mqtt回调组件失败,[{}]协议将不可用,errorMessage:{}", clazz.getSimpleName(), e.getMessage());
}
});
log.info("注册协议总数:{}", protocolCallbackMap.size());
}
}
package vc.thinker.mqtt.config;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties;
/**
* 新科云 Mqtt 属性配置
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Data
@Configuration
@EqualsAndHashCode(callSuper = true)
@ConditionalOnExpression("${mqtt.run:false}")
@ConfigurationProperties(prefix = "mqtt.connection.thinker")
public class ThinkerMqttConnectionProperties extends MqttConnectionProperties {
}
package vc.thinker.mqtt.config;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties;
/**
* 新科云 Mqtt 属性配置
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Data
@Configuration
@EqualsAndHashCode(callSuper = true)
@ConditionalOnExpression("${mqtt.run:false}")
@ConfigurationProperties(prefix = "mqtt.connection.thinker-prod")
public class ThinkerProdMqttConnectionProperties extends MqttConnectionProperties {
}
package vc.thinker.mqtt.config;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import vc.thinker.absctacts.mqtt.connection.MqttConnectionProperties;
/**
* 新科云 Mqtt 属性配置
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Data
@Configuration
@EqualsAndHashCode(callSuper = true)
@ConditionalOnExpression("${mqtt.run:false}")
@ConfigurationProperties(prefix = "mqtt.connection.thinker-test")
public class ThinkerTestMqttConnectionProperties extends MqttConnectionProperties {
}
package vc.thinker.mqtt.forwardthinker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import vc.thinker.absctacts.mqtt.protocol.AbstractNotSubscriptionMqttProtocol;
import vc.thinker.mqtt.config.ThinkerProdMqttConnectionProperties;
import vc.thinker.mqtt.forwardthinker.enums.FoxGatewayTopicDefinition;
/**
* 富士康数据转发新科云 客户端
*
* @author HeTongHao
* @since 2022-06-28 16:00
*/
@Slf4j
@Component
@ConditionalOnBean(ThinkerProdMqttConnectionProperties.class)
public class ThinkerProdProtocolClient extends AbstractNotSubscriptionMqttProtocol {
public ThinkerProdProtocolClient(ThinkerProdMqttConnectionProperties mqttConnectionProperties) {
super(mqttConnectionProperties);
}
@Scheduled(cron = "0 0/1 * * * ?")
public void timeReport() {
String json = "[{\"deviceCode\":\"ZGZC_NXTX_001\",\"events\":{\"ddeedww\":1,\"21312312\":1,\"686787887\":1,\"78078078088\":1,\"532637828739\":1},\"timestamp\":" + System.currentTimeMillis() + "}]";
super.getMqttCommandPush().sendMqttMessage(FoxGatewayTopicDefinition.report.genTopic("CRRC_SMART_COCKPIT_SYSTEM"), json.getBytes());
}
}
package vc.thinker.mqtt.forwardthinker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import vc.thinker.absctacts.mqtt.protocol.AbstractCommonBytesHandleMqttProtocol;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.mqtt.config.ThinkerTestMqttConnectionProperties;
import vc.thinker.mqtt.forwardthinker.enums.FoxGatewayTopicDefinition;
/**
* 富士康网关数据上报
*
* @author HeTongHao
* @since 2022-06-28 16:00
*/
@Slf4j
@Component
@ConditionalOnBean(ThinkerTestMqttConnectionProperties.class)
public class ThinkerTestDataProtocol extends AbstractCommonBytesHandleMqttProtocol {
public ThinkerTestDataProtocol(ThinkerTestMqttConnectionProperties mqttConnectionProperties) {
super(mqttConnectionProperties);
}
@Override
public String[] subscribe() {
return new String[]{
FoxGatewayTopicDefinition.report.getTopic(),
FoxGatewayTopicDefinition.report_version.getTopic(),
};
}
@Override
protected TopicDefinition resolveByTopic(String topic) {
String protocolCode = FoxGatewayTopicDefinition.resolveProtocolCodeByTopic(topic);
String matchTopic = topic.replace(protocolCode, "+");
return FoxGatewayTopicDefinition.RESOLVE_BY_TOPIC.resolve(matchTopic);
}
}
package vc.thinker.mqtt.forwardthinker.actor.up;
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import vc.thinker.absctacts.mqtt.entity.MqttResponseBytes;
import vc.thinker.mqtt.forwardthinker.ThinkerProdProtocolClient;
import vc.thinker.mqtt.forwardthinker.enums.FoxGatewayTopicDefinition;
import javax.annotation.Resource;
import java.util.List;
/**
* 网关上报数据接收
*
* @author HeTongHao
* @since 2022/6/28 14:10
*/
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ThirdPartyReportDataActor extends AbstractActor {
@Resource
private ThinkerProdProtocolClient thinkerProdProtocolClient;
/**
* 转发的网关编号列表
*/
private static final List<String> PROTOCOL_CODES = List.of("CRRC_SMART_COCKPIT_SYSTEM", "WF_IOT_FUSE_SYSTEM");
public ThirdPartyReportDataActor() {
receive(ReceiveBuilder.match(MqttResponseBytes.class, mqttResponse -> {
String protocolCode = FoxGatewayTopicDefinition.resolveProtocolCodeByTopic(mqttResponse.getTopic());
// 指转发指定协议下的设备
if (PROTOCOL_CODES.contains(protocolCode)) {
thinkerProdProtocolClient.getMqttCommandPush()
.sendMqttMessage(mqttResponse.getTopic(), mqttResponse.getMessage());
}
}).build());
}
}
package vc.thinker.mqtt.forwardthinker.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.absctacts.mqtt.utils.StringFormatUtils;
import vc.thinker.mqtt.forwardthinker.actor.up.ThirdPartyReportDataActor;
import vc.thinker.utils.Resolve;
/**
* Topic-actor 定义
*
* @author HeTongHao
* @since 2022-06-28 16:00
*/
@Getter
@AllArgsConstructor
public enum FoxGatewayTopicDefinition implements TopicDefinition {
/**
* Topic-actor 定义
*/
report("third-party/+/up/report", ThirdPartyReportDataActor.class),
report_version("third-party/+/v2/up/report", ThirdPartyReportDataActor.class),
;
private final String topic;
private final Class<?> actorClass;
public static final Resolve<FoxGatewayTopicDefinition, String> RESOLVE_BY_TOPIC = new Resolve<>(values(), FoxGatewayTopicDefinition::getTopic);
public String getActorInstanceName() {
return StringFormatUtils.classNameLowercaseFirst(this.actorClass);
}
public String genTopic(String protocolCode) {
return this.topic.replace("+", protocolCode);
}
/**
* 根据topic解析第三方协议编号
*
* @param topic topic
* @return 第三方协议编号
*/
public static String resolveProtocolCodeByTopic(String topic) {
return topic.split("/")[1];
}
}
package vc.thinker.mqtt.proxyclient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import vc.thinker.absctacts.mqtt.protocol.AbstractCommonEntityHandleMqttProtocol;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.mqtt.config.ThinkerMqttConnectionProperties;
import vc.thinker.mqtt.proxyclient.enums.ThirdPartyProxyTopicDefinition;
/**
* 第三方协议代理协议 客户端
*
* @author HeTongHao
* @since 2019-10-29 16:00
*/
@Slf4j
@Component
@ConditionalOnBean(ThinkerMqttConnectionProperties.class)
public class ThirdPartyProxyClientProtocol extends AbstractCommonEntityHandleMqttProtocol {
public ThirdPartyProxyClientProtocol(ThinkerMqttConnectionProperties thinkerMqttConnectionProperties) {
super(thinkerMqttConnectionProperties);
}
@Override
public String[] subscribe() {
return new String[]{
ThirdPartyProxyTopicDefinition.executeService.getTopic(),
ThirdPartyProxyTopicDefinition.meterListPullResponse.getTopic(),
ThirdPartyProxyTopicDefinition.meterChange.getTopic()
};
}
@Override
protected TopicDefinition resolveByTopic(String topic) {
String protocolCode = ThirdPartyProxyTopicDefinition.resolveProtocolCodeByTopic(topic);
String matchTopic = topic.replace(protocolCode, "+");
return ThirdPartyProxyTopicDefinition.RESOLVE_BY_TOPIC.resolve(matchTopic);
}
/**
* 统一发送Mqtt消息入口
*
* @param topic topic定义
* @param protocolCode 第三方协议编号
* @param payload 数据荷载
*/
public void sendMessage(ThirdPartyProxyTopicDefinition topic, String protocolCode, byte[] payload) {
super.getMqttCommandPush().sendMqttMessage(topic.genTopic(protocolCode), payload);
}
}
package vc.thinker.mqtt.proxyclient.actor.down;
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import vc.thinker.absctacts.mqtt.entity.MqttResponse;
import vc.thinker.absctacts.protocolproxy.dto.MeterInfo;
import vc.thinker.absctacts.protocolproxy.protocol.IThirdPartyProtocolSyncMeterInfo;
import vc.thinker.mqtt.proxyclient.entity.down.MeterChange;
import vc.thinker.mqtt.proxyclient.entity.down.PlateFormMeterInfo;
import vc.thinker.mqtt.proxyclient.enums.ThirdPartyProxyTopicDefinition;
import javax.annotation.Resource;
import java.util.List;
/**
* 仪表改变通知
*
* @author HeTongHao
* @since 2022/18/19 15:43
*/
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MeterChangeActor extends AbstractActor {
@Resource
private List<IThirdPartyProtocolSyncMeterInfo> thirdPartyProtocols;
public MeterChangeActor() {
receive(ReceiveBuilder.match(MqttResponse.class, mqttResponse -> {
String topic = mqttResponse.getTopic();
String protocolType = ThirdPartyProxyTopicDefinition.resolveProtocolCodeByTopic(topic);
for (IThirdPartyProtocolSyncMeterInfo thirdPartyProtocol : thirdPartyProtocols) {
if (protocolType.equals(thirdPartyProtocol.protocolType().getCode())) {
MeterChange meterChange = JSONObject.parseObject(mqttResponse.getMessage(), MeterChange.class);
PlateFormMeterInfo meterInfo = meterChange.getMeterInfo();
// 发送到具体协议去执行
switch (meterChange.getOperateType()) {
case "PUT":
thirdPartyProtocol.putMeter(MeterInfo
.builder()
.id(meterInfo.getId())
.code(meterInfo.getCode())
.build()
);
break;
case "DELETE":
thirdPartyProtocol.deleteMeter(meterInfo.getId());
break;
default:
log.error("不支持操作:" + meterChange.getOperateType());
}
break;
}
}
}).build());
}
}
package vc.thinker.mqtt.proxyclient.actor.down;
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import vc.thinker.absctacts.mqtt.entity.MqttResponse;
import vc.thinker.mqtt.proxyclient.entity.down.MeterListResponse;
import vc.thinker.mqtt.proxyclient.service.ThirdPartyProxyClientService;
import javax.annotation.Resource;
/**
* 仪表列表拉取响应
*
* @author HeTongHao
* @since 2022/18/19 15:43
*/
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MeterListPullResponseActor extends AbstractActor {
@Resource
private ThirdPartyProxyClientService thirdPartyProxyClientService;
public MeterListPullResponseActor() {
receive(ReceiveBuilder.match(MqttResponse.class, mqttResponse -> {
MeterListResponse meterListResponse = JSONObject.parseObject(mqttResponse.getMessage(), MeterListResponse.class);
// 响应数据
thirdPartyProxyClientService.responseData(meterListResponse.getRequestId(), meterListResponse.getMeterList());
}).build());
}
}
package vc.thinker.mqtt.proxyclient.actor.down;
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import vc.thinker.absctacts.mqtt.entity.MqttResponse;
import vc.thinker.absctacts.protocolproxy.dto.IServiceExecuteResult;
import vc.thinker.absctacts.protocolproxy.dto.ServiceExecuteCmd;
import vc.thinker.absctacts.protocolproxy.protocol.IThirdPartyProtocol;
import vc.thinker.mqtt.proxyclient.entity.down.ServiceExecCmd;
import vc.thinker.mqtt.proxyclient.entity.up.ServiceExecResult;
import vc.thinker.mqtt.proxyclient.enums.ThirdPartyProxyTopicDefinition;
import vc.thinker.mqtt.proxyclient.service.ThirdPartyProxyClientService;
import javax.annotation.Resource;
import java.util.List;
/**
* 新科云执行服务到此第三方代理
*
* @author HeTongHao
* @since 2021/8/11 15:43
*/
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ThirdPartyServiceExecuteActor extends AbstractActor {
@Resource
private ThirdPartyProxyClientService thirdPartyProxyClientService;
@Resource
private List<IThirdPartyProtocol> thirdPartyProtocols;
public ThirdPartyServiceExecuteActor() {
receive(ReceiveBuilder.match(MqttResponse.class, mqttResponse -> {
String topic = mqttResponse.getTopic();
String protocolType = ThirdPartyProxyTopicDefinition.resolveProtocolCodeByTopic(topic);
for (IThirdPartyProtocol thirdPartyProtocol : thirdPartyProtocols) {
if (protocolType.equals(thirdPartyProtocol.protocolType().getCode())) {
List<ServiceExecCmd> serviceExecuteCmds =
JSONArray.parseArray(mqttResponse.getMessage(), ServiceExecCmd.class);
// 发送到具体协议去执行
executeAndResponse(thirdPartyProtocol, serviceExecuteCmds);
break;
}
}
}).build());
}
/**
* 执行并响应结果
*
* @param thirdPartyProtocol 分发到的协议协议
* @param serviceExecuteCmds 待执行的命令列表
*/
private void executeAndResponse(IThirdPartyProtocol thirdPartyProtocol, List<ServiceExecCmd> serviceExecuteCmds) {
serviceExecuteCmds.forEach(cmd ->
ThreadUtil.execute(() -> {
// 执行->获取结果
IServiceExecuteResult result = thirdPartyProtocol.onExecutiveService(
new ServiceExecuteCmd(cmd.getRequestId(), cmd.getDeviceCode(), cmd.getServiceCode(), cmd.getParams())
);
// 结果响应
thirdPartyProxyClientService.executeResponse(new ServiceExecResult(thirdPartyProtocol.protocolType().getCode(), result));
})
);
}
}
package vc.thinker.mqtt.proxyclient.actor.up;
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import vc.thinker.mqtt.proxyclient.ThirdPartyProxyClientProtocol;
import vc.thinker.mqtt.proxyclient.entity.up.MeterListPull;
import vc.thinker.mqtt.proxyclient.enums.ThirdPartyProxyTopicDefinition;
import javax.annotation.Resource;
/**
* 拉取仪表列表
*
* @author HeTongHao
* @since 2022/8/19 15:43
*/
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MeterListPullActor extends AbstractActor {
@Resource
private ThirdPartyProxyClientProtocol thirdPartyProxyClientProtocol;
public MeterListPullActor() {
receive(ReceiveBuilder.match(MeterListPull.class, meterListPull ->
thirdPartyProxyClientProtocol.sendMessage(ThirdPartyProxyTopicDefinition.meterListPull
, meterListPull.getProtocolCode()
, meterListPull.getRequestId().getBytes())
).build());
}
}
package vc.thinker.mqtt.proxyclient.actor.up;
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import vc.thinker.mqtt.proxyclient.ThirdPartyProxyClientProtocol;
import vc.thinker.mqtt.proxyclient.entity.up.ThirdPartyProxyReport;
import vc.thinker.mqtt.proxyclient.enums.ThirdPartyProxyTopicDefinition;
import javax.annotation.Resource;
/**
* 第三方数据上报到新科云
*
* @author HeTongHao
* @since 2021/7/7 20:43
*/
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ThirdPartyReportActor extends AbstractActor {
@Resource
private ThirdPartyProxyClientProtocol thirdPartyProxyClientProtocol;
public ThirdPartyReportActor() {
receive(ReceiveBuilder.match(ThirdPartyProxyReport.class, thirdPartyProxyReport ->
thirdPartyProxyClientProtocol.sendMessage(ThirdPartyProxyTopicDefinition.report
, thirdPartyProxyReport.getProtocolType()
, JSONArray.toJSONBytes(thirdPartyProxyReport.getPayload()))
).build());
}
}
package vc.thinker.mqtt.proxyclient.actor.up;
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import vc.thinker.mqtt.proxyclient.ThirdPartyProxyClientProtocol;
import vc.thinker.mqtt.proxyclient.entity.up.ServiceExecResult;
import vc.thinker.mqtt.proxyclient.enums.ThirdPartyProxyTopicDefinition;
import javax.annotation.Resource;
/**
* 拉取仪表列表
*
* @author HeTongHao
* @since 2022/8/19 15:43
*/
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ThirdPartyServiceExecuteResponseActor extends AbstractActor {
@Resource
private ThirdPartyProxyClientProtocol thirdPartyProxyClientProtocol;
public ThirdPartyServiceExecuteResponseActor() {
receive(ReceiveBuilder.match(ServiceExecResult.class, serviceExecResult ->
thirdPartyProxyClientProtocol.sendMessage(ThirdPartyProxyTopicDefinition.executeServiceResponse
, serviceExecResult.getProtocolCode()
, JSONArray.toJSONBytes(serviceExecResult))
).build());
}
}
package vc.thinker.mqtt.proxyclient.entity.down;
import lombok.Data;
/**
* 基础响应字段
*
* @author HeTongHao
* @since 2022/8/19 17:38
*/
@Data
public class BaseResponse {
private String requestId;
}
package vc.thinker.mqtt.proxyclient.entity.down;
import lombok.Data;
/**
* 仪表信息
*
* @author HeTongHao
* @since 2022/8/19 17:37
*/
@Data
public class MeterChange {
private String operateType;
private PlateFormMeterInfo meterInfo;
}
package vc.thinker.mqtt.proxyclient.entity.down;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.List;
/**
* 仪表列表响应体
*
* @author HeTongHao
* @since 2022/8/19 17:37
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class MeterListResponse extends BaseResponse {
private List<PlateFormMeterInfo> meterList;
}
package vc.thinker.mqtt.proxyclient.entity.down;
import lombok.Data;
/**
* 仪表信息
*
* @author HeTongHao
* @since 2022/8/19 17:37
*/
@Data
public class PlateFormMeterInfo {
private String id;
private String code;
}
package vc.thinker.mqtt.proxyclient.entity.down;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Map;
/**
* 服务执行命令
*
* @author HeTongHao
* @since 2021/7/9 17:47
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class ServiceExecCmd extends BaseResponse {
private String deviceCode;
private String serviceCode;
private Map<String, Object> params;
}
package vc.thinker.mqtt.proxyclient.entity.up;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.JSONSerializable;
import com.alibaba.fastjson.serializer.JSONSerializer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serial;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
/**
* 原始数据
*
* @author XFY
* @since 2022/10/24 18:03
**/
@Data
@Accessors(chain = true)
public class MeterData implements Serializable, JSONSerializable {
@Serial
private static final long serialVersionUID = -6256921296477327221L;
/**
* 时间戳
*/
private Long timestamp;
private Map<String, Object> meterDataMap;
private static final String TIME_PREFIX = "T_";
private static final String DEVICE_CODE = "deviceCode";
public MeterData() {
this.meterDataMap = Maps.newHashMap();
}
/**
* 设置设备编号
*/
public MeterData setDeviceCode(String deviceCode) {
meterDataMap.put(DEVICE_CODE, deviceCode);
return this;
}
/**
* 获取设备编号
*/
public String getDeviceCode() {
return String.valueOf(meterDataMap.get(DEVICE_CODE));
}
/**
* 添加属性数据
*
* @param code 属性编号
* @param value 属性值
* @return MeterData
*/
public MeterData putAttrData(String code, Object value) {
Objects.requireNonNull(timestamp, "时间戳不能为空");
putAttrData(timestamp, code, value);
return this;
}
/**
* 添加属性数据
*
* @param timestamp 时间戳
* @param code 属性编号
* @param value 属性值
* @return MeterData
*/
public MeterData putAttrData(Long timestamp, String code, Object value) {
String key = TIME_PREFIX + timestamp;
DataBucket dataBucket = (DataBucket) meterDataMap.computeIfAbsent(key, k -> new DataBucket());
dataBucket.putAttrData(code, value);
return this;
}
/**
* 添加事件数据
*
* @param code 事件编号
* @param value 事件值
* @return MeterData
*/
public MeterData putEventData(String code, Object value) {
Objects.requireNonNull(timestamp, "时间戳不能为空");
putEventData(timestamp, code, value);
return this;
}
/**
* 添加事件数据
*
* @param timestamp 时间戳
* @param code 事件编号
* @param value 事件值
* @return MeterData
*/
public MeterData putEventData(Long timestamp, String code, Object value) {
String key = TIME_PREFIX + timestamp;
DataBucket dataBucket = (DataBucket) meterDataMap.computeIfAbsent(key, k -> new DataBucket());
dataBucket.putEventData(code, value);
return this;
}
public static void main(String[] args) {
ArrayList<MeterData> list = Lists.newArrayList();
MeterData v2 = new MeterData();
v2.setDeviceCode("123").setTimestamp(System.currentTimeMillis());
v2.putAttrData("AirGapSensor_1.airGap", 3.45);
v2.putEventData("AirGapSensor_1.boot_up", 1);
list.add(v2);
System.out.println(v2);
System.out.println(JSONArray.toJSONString(list));
}
/**
* 数据包封装
* 标识一个仪表的一次上报,内容可包含0个或多个属性、事件
*
* @author HeTongHao
* @since 2022/8/8 20:39
*/
@Data
@Accessors(chain = true)
public static class DataBucket implements Serializable {
@Serial
private static final long serialVersionUID = -7756910224854163803L;
/**
* 属性数据键值对
*/
private Map<String, Object> datas;
/**
* 事件数据键值对
*/
private Map<String, Object> events;
public Map<String, Object> putAttrData(String code, Object value) {
if (datas == null) {
datas = Maps.newHashMap();
}
datas.put(code, value);
return datas;
}
public Map<String, Object> putEventData(String code, Object value) {
if (events == null) {
events = Maps.newHashMap();
}
events.put(code, value);
return events;
}
}
@Override
public void write(JSONSerializer serializer, Object fieldName, Type fieldType, int features) {
serializer.write(meterDataMap);
}
@Override
public String toString() {
return JSONObject.toJSONString(meterDataMap);
}
}
package vc.thinker.mqtt.proxyclient.entity.up;
import lombok.Builder;
import lombok.Getter;
/**
* 设备列表拉取
*
* @author HeTongHao
* @since 2022/8/19 17:24
*/
@Getter
@Builder
public class MeterListPull {
private String protocolCode;
private String requestId;
}
package vc.thinker.mqtt.proxyclient.entity.up;
import lombok.Getter;
import vc.thinker.absctacts.protocolproxy.dto.IServiceExecuteResult;
import vc.thinker.absctacts.protocolproxy.enums.ServiceExecuteResultType;
/**
* 服务执行结果
*
* @author HeTongHao
* @since 2021/8/11 17:47
*/
@Getter
public class ServiceExecResult {
public ServiceExecResult(String protocolCode, IServiceExecuteResult serviceExecuteResult) {
this.protocolCode = protocolCode;
this.requestId = serviceExecuteResult.getRequestId();
// 当前只有两种,简单实现:
this.resCode = ServiceExecuteResultType.ok.equals(serviceExecuteResult.getType()) ? "200" : "500";
this.resMsg = serviceExecuteResult.getMessage();
}
private final String protocolCode;
private final String requestId;
private final String resCode;
private final String resMsg;
}
package vc.thinker.mqtt.proxyclient.entity.up;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
/**
* 第三方代理上报对象
*
* @author HeTongHao
* @since 2021/7/7 20:56
*/
@Data
@Accessors(chain = true)
@AllArgsConstructor
public class ThirdPartyProxyReport {
/**
* 协议类型编号
*/
private String protocolType;
/**
* 数据列表
*/
private List<MeterData> payload;
}
package vc.thinker.mqtt.proxyclient.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import vc.thinker.absctacts.mqtt.topic.TopicDefinition;
import vc.thinker.mqtt.proxyclient.actor.down.MeterChangeActor;
import vc.thinker.mqtt.proxyclient.actor.down.MeterListPullResponseActor;
import vc.thinker.mqtt.proxyclient.actor.down.ThirdPartyServiceExecuteActor;
import vc.thinker.mqtt.proxyclient.actor.up.MeterListPullActor;
import vc.thinker.mqtt.proxyclient.actor.up.ThirdPartyReportActor;
import vc.thinker.mqtt.proxyclient.actor.up.ThirdPartyServiceExecuteResponseActor;
import vc.thinker.utils.Resolve;
/**
* Topic-actor 定义
*
* @author HeTongHao
* @since 2022/8/10 11:16
*/
@Getter
@AllArgsConstructor
public enum ThirdPartyProxyTopicDefinition implements TopicDefinition {
// 数据上报
report("third-party/+/v2/up/report", ThirdPartyReportActor.class),
// 服务执行
executeService("third-party/+/down/execute-service", ThirdPartyServiceExecuteActor.class),
executeServiceResponse("third-party/+/up/execute-service/response", ThirdPartyServiceExecuteResponseActor.class),
// 仪表同步
meterListPull("third-party/+/up/meter-list-pull", MeterListPullActor.class),
meterListPullResponse("third-party/+/down/meter-list-pull/response", MeterListPullResponseActor.class),
meterChange("third-party/+/down/meter-change", MeterChangeActor.class),
;
private final String topic;
private final Class<?> actorClass;
public String genTopic(String protocolCode) {
return this.topic.replace("+", protocolCode);
}
/**
* 根据topic解析第三方协议编号
*
* @param topic topic
* @return 第三方协议编号
*/
public static String resolveProtocolCodeByTopic(String topic) {
return topic.split("/")[1];
}
public static final Resolve<ThirdPartyProxyTopicDefinition, String> RESOLVE_BY_TOPIC =
new Resolve<>(values(), ThirdPartyProxyTopicDefinition::getTopic);
}
package vc.thinker.mqtt.proxyclient.service;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Service;
import vc.thinker.absctacts.mqtt.publish.TopicMqttPushService;
import vc.thinker.mqtt.proxyclient.entity.down.PlateFormMeterInfo;
import vc.thinker.mqtt.proxyclient.entity.up.MeterListPull;
import vc.thinker.mqtt.proxyclient.entity.up.ServiceExecResult;
import vc.thinker.mqtt.proxyclient.entity.up.ThirdPartyProxyReport;
import vc.thinker.mqtt.proxyclient.enums.ThirdPartyProxyTopicDefinition;
import vc.thinker.utils.CompletableFutureObserver;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* 适配4G网关方式协议适配
*
* @author HeTongHao
* @since 2021/7/9 14:05
*/
@Service
public class ThirdPartyProxyClientService extends TopicMqttPushService {
public final CompletableFutureObserver<Object> completableFutureObserver = new CompletableFutureObserver<>();
/**
* 响应数据
* 基于requestId 来实现同步响应数据
*
* @param requestId requestId
* @param data 数据对象
*/
public void responseData(String requestId, Object data) {
completableFutureObserver.onMessage(requestId, data);
}
/**
* 上报数据
*
* @param thirdPartyProxyReport 上报数据
*/
public void report(ThirdPartyProxyReport thirdPartyProxyReport) {
super.push(ThirdPartyProxyTopicDefinition.report, thirdPartyProxyReport);
}
/**
* 服务执行响应
*
* @param serviceExecResult 服务执行响应结果
*/
public void executeResponse(ServiceExecResult serviceExecResult) {
super.push(ThirdPartyProxyTopicDefinition.executeServiceResponse, serviceExecResult);
}
/**
* 拉取仪表列表
*
* @param protocolCode 协议编号
*/
public List<PlateFormMeterInfo> pullMeterList(String protocolCode) throws TimeoutException {
String requestId = UUID.randomUUID().toString();
super.push(ThirdPartyProxyTopicDefinition.meterListPull, MeterListPull
.builder()
.protocolCode(protocolCode)
.requestId(requestId)
.build()
);
CompletableFuture<Object> subscribe = completableFutureObserver.subscribe(requestId);
try {
return (List<PlateFormMeterInfo>) subscribe.get(3, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return Lists.newArrayList();
} catch (TimeoutException e) {
throw e;
}
}
}
package vc.thinker.response;
import com.fasterxml.jackson.annotation.JsonView;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.experimental.Accessors;
import org.springframework.context.MessageSource;
import org.springframework.web.servlet.support.RequestContextUtils;
import javax.servlet.http.HttpServletRequest;
import java.util.Locale;
/**
* @author : xieshaojun
* @date : 2023/1/3 17:59
*/
@Data
@Accessors(chain = true)
public abstract class AbstractResponse {
@ApiModelProperty(hidden = true)
private MessageSource messageSource;
@ApiModelProperty(hidden = true)
private Locale locale;
@ApiModelProperty("请求是否成功")
private boolean success = true;
@ApiModelProperty("自定义异常code")
private String code;
@ApiModelProperty("消息提示")
private String message;
public AbstractResponse() {
}
public AbstractResponse(MessageSource messageSource, HttpServletRequest request) {
this.messageSource = messageSource;
this.locale = RequestContextUtils.getLocale(request);
}
/**
* 设置错误信息
*
* @param code 自定义异常code
* @param message 消息提示
*/
public void setErrorInfo(String code, String message) {
this.success = false;
this.code = code;
this.message = message;
}
/**
* 设置错误信息
*
* @param code 自定义异常code
* @param message 消息提示
*/
public void setErrorInfo(int code, String message) {
this.success = false;
this.code = String.valueOf(code);
this.message = message;
}
/**
* 设置错误信息
*
* @param code 自定义异常code
* @param messageCode 用于国际化的消息编号
* @param defaultMessage 如果找不到对应的消息,返回默认的消息
*/
public void setErrorInfo(String code, String messageCode, String defaultMessage) {
if (messageSource == null) {
throw new RuntimeException("未初始化 messageSource 字段");
}
this.success = false;
this.code = code;
this.message = messageSource.getMessage(messageCode, null, defaultMessage, locale);
}
/**
* 设置错误信息
*
* @param code 自定义异常code
* @param messageCode 用于国际化的消息编号
* @param defaultMessage 如果找不到对应的消息,返回默认的消息
*/
public void setErrorInfo(int code, String messageCode, String defaultMessage) {
setErrorInfo(String.valueOf(code), messageCode, defaultMessage);
}
/**
* 设置错误信息
*
* @param code 自定义异常code
* @param messageCode 用于国际化的消息编号
* @param args 消息参数列表
* @param defaultMessage 如果找不到对应的消息,返回默认的消息
*/
public void setErrorInfo(String code, String messageCode, Object[] args, String defaultMessage) {
this.success = false;
this.code = code;
this.message = messageSource.getMessage(messageCode, args, defaultMessage, locale);
}
/**
* 设置错误信息
*
* @param code 自定义异常code
* @param messageCode 用于国际化的消息编号
* @param args 消息参数列表
* @param defaultMessage 如果找不到对应的消息,返回默认的消息
*/
public void setErrorInfo(int code, String messageCode, Object[] args, String defaultMessage) {
setErrorInfo(String.valueOf(code), messageCode, args, defaultMessage);
}
}
package vc.thinker.response;
import org.springframework.context.MessageSource;
import javax.servlet.http.HttpServletRequest;
/**
* @author : xieshaojun
* @date : 2023/1/3 17:58
*/
public class SimpleResponse extends AbstractResponse {
public SimpleResponse() {
}
public SimpleResponse(MessageSource messageSource, HttpServletRequest request) {
super(messageSource, request);
}
}
package vc.thinker.thirdpartyproxy;
import com.google.common.collect.Sets;
import lombok.AllArgsConstructor;
import lombok.Getter;
import vc.thinker.absctacts.protocolproxy.enums.IProtocolType;
import vc.thinker.web.validator.EnumValidate;
import java.util.Objects;
import java.util.Set;
/**
* 第三方协议类型
*
* @author HeTongHao
* @since 2021/7/9 16:20
*/
@Getter
@AllArgsConstructor
public enum ProtocolType implements IProtocolType, EnumValidate<String> {
/**
* 第三方协议类型
*/
test("test", "测试"),
/**
* 中央空调
*/
central_air_conditioning("central-air-conditioning","中央空调")
;
static {
int total = values().length;
Set<ProtocolType> distinctSet = Sets.newHashSet(values());
if (distinctSet.size() != total) {
throw new IllegalArgumentException("协议编号有重复,请检查");
}
}
/**
* 协议编号
*/
private final String code;
/**
* 协议名称
*/
private final String name;
@Override
public boolean existValidate(String value) {
if (value == null) {
return false;
}
for (ProtocolType protocolType : ProtocolType.values()) {
if (Objects.equals(protocolType.getCode(), value)) {
return true;
}
}
return false;
}
}
package vc.thinker.thirdpartyproxy.protocolimpls.centralAir;
import lombok.AllArgsConstructor;
import lombok.Getter;
import vc.thinker.absctacts.protocolproxy.enums.DataType;
import vc.thinker.absctacts.protocolproxy.enums.IMeterAttrDefine;
/**
* 中央空调属性协议属性定义
*
* @author HeTongHao
* @since 2021/7/9 16:20
*/
@Getter
@AllArgsConstructor
public enum CentralAirMeterAttrDefine implements IMeterAttrDefine {
/**
* 中央空调属性
*/
electricity("electricity", "电量", DataType.Float),
coldCapacity("coldCapacity", "冷量", DataType.Int),
systemCop("systemCop", "系统COP", DataType.Float),
supplyWaterTemp("supplyWaterTemp", "供水温度", DataType.Float),
returnWaterTemp("returnWaterTemp", "回水温度", DataType.Float),
;
/**
* 编号,设备下唯一
*/
private final String code;
/**
* 属性名称
*/
private final String name;
/**
* 数据类型
*/
private final DataType dataType;
}
package vc.thinker.thirdpartyproxy.protocolimpls.centralAir;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import vc.thinker.absctacts.protocolproxy.dto.MeterReportEntity;
import vc.thinker.absctacts.protocolproxy.enums.IProtocolType;
import vc.thinker.absctacts.protocolproxy.protocol.AbstractThirdPartyProtocolSyncMeterInfo;
import vc.thinker.thirdpartyproxy.ProtocolType;
import vc.thinker.thirdpartyproxy.protocolimpls.centralAir.dto.DataReportInfoDTO;
import vc.thinker.thirdpartyproxy.protocolimpls.centralAir.dto.DataReportParamDTO;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
/**
* 中央空调代理上报网关协议
*
* @author HeTongHao
* @since 2021/7/9 17:13
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class CentralAirProtocol extends AbstractThirdPartyProtocolSyncMeterInfo implements TerminalReportData {
@Override
public IProtocolType protocolType() {
return ProtocolType.central_air_conditioning;
}
@Override
public void dataReport(DataReportInfoDTO dataReportInfoDTO) {
LocalDateTime time = LocalDateTime.ofEpochSecond(dataReportInfoDTO.getTimestamp()/1000L,0, ZoneOffset.of("+8"));
String deviceId = lastDeviceId(dataReportInfoDTO);
DataReportParamDTO paramDTO = dataReportInfoDTO.getData();
if (ObjectUtils.isEmpty(paramDTO)){
return;
}
MeterReportEntity.MeterReportEntityBuilder builder = MeterReportEntity.builder(deviceId, time);
if (!ObjectUtils.isEmpty(paramDTO.getElectricity())){
builder.putValue(CentralAirMeterAttrDefine.electricity, dataReportInfoDTO.getData().getElectricity());
}
if (!ObjectUtils.isEmpty(paramDTO.getColdCapacity())) {
builder.putValue(CentralAirMeterAttrDefine.coldCapacity, dataReportInfoDTO.getData().getColdCapacity());
}
if (!ObjectUtils.isEmpty(paramDTO.getSystemCop())) {
builder.putValue(CentralAirMeterAttrDefine.systemCop, dataReportInfoDTO.getData().getSystemCop());
}
if (!ObjectUtils.isEmpty(paramDTO.getSupplyWaterTemp())) {
builder.putValue(CentralAirMeterAttrDefine.supplyWaterTemp, dataReportInfoDTO.getData().getSupplyWaterTemp());
}
if (!ObjectUtils.isEmpty(paramDTO.getReturnWaterTemp())) {
builder.putValue(CentralAirMeterAttrDefine.returnWaterTemp, dataReportInfoDTO.getData().getReturnWaterTemp());
}
MeterReportEntity meterReport = builder.build();
System.out.println("____"+ JSONObject.toJSONString(meterReport));
//report(meterReport);
}
}
package vc.thinker.thirdpartyproxy.protocolimpls.centralAir;
import vc.thinker.absctacts.protocolproxy.exceptions.MeterCodeFormatException;
import vc.thinker.thirdpartyproxy.protocolimpls.centralAir.dto.DataReportInfoDTO;
/**
* WfIot上报数据接口定义
*
* @author HeTongHao
* @since 2022/8/10 21:08
*/
public interface TerminalReportData {
/**
* 根据设备dto生成最终的设备id
*
* @param dataReportInfoDTO 设备上报信息dto
* @return deviceId
*/
default String lastDeviceId(DataReportInfoDTO dataReportInfoDTO) {
return dataReportInfoDTO.getProtocolCode() + "@" + dataReportInfoDTO.getDeviceAddress();
}
/**
* 根据仪表编号拆分成网关编号与deviceId
*
* @param meterCode 仪表编号
* @return deviceId
*/
default DataReportInfoDTO splitCode(String meterCode) {
String[] split = meterCode.split("@");
int codeItemNum = 2;
if (split.length != codeItemNum) {
throw new MeterCodeFormatException("CentralAir 仪表编号格式错误:" + meterCode);
}
return DataReportInfoDTO
.builder()
.protocolCode(split[0])
.deviceAddress(split[1])
.build();
}
/**
* 终端设备上报信息
*
* @param dataReportInfoDTO 设备参数信息
*/
void dataReport(DataReportInfoDTO dataReportInfoDTO);
}
package vc.thinker.thirdpartyproxy.protocolimpls.centralAir.dto;
import io.swagger.annotations.ApiModelProperty;
import lombok.Builder;
import lombok.Data;
/**
* @author : xieshaojun
* @date : 2023/1/3 14:23
*/
@Data
@Builder
public class DataReportInfoDTO {
@ApiModelProperty("协议编号")
private String protocolCode;
@ApiModelProperty("设备唯一地址")
private String deviceAddress;
@ApiModelProperty("时间戳")
private Long timestamp;
@ApiModelProperty("协议数据")
private DataReportParamDTO data;
}
package vc.thinker.thirdpartyproxy.protocolimpls.centralAir.dto;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.math.BigDecimal;
/**
* @author : xieshaojun
* @date : 2023/1/3 14:24
*/
@Data
public class DataReportParamDTO {
@ApiModelProperty("电量")
private BigDecimal electricity;
@ApiModelProperty("冷量")
private BigDecimal coldCapacity;
@ApiModelProperty("系统COP")
private BigDecimal systemCop;
@ApiModelProperty("供水温度")
private BigDecimal supplyWaterTemp;
@ApiModelProperty("回水温度")
private BigDecimal returnWaterTemp;
}
package vc.thinker.thirdpartyproxy.protocolimpls.test;
import lombok.AllArgsConstructor;
import lombok.Getter;
import vc.thinker.absctacts.protocolproxy.enums.DataType;
import vc.thinker.absctacts.protocolproxy.enums.IMeterAttrDefine;
/**
* 测试协议属性定义
*
* @author HeTongHao
* @since 2021/7/9 16:20
*/
@Getter
@AllArgsConstructor
public enum TestMeterAttrDefine implements IMeterAttrDefine {
/**
* 数据组
*/
current("current", "电压", DataType.Float),
voltage("voltage", "电流", DataType.Float),
Active_energy("Active_energy", "有功电能", DataType.Float),
;
/**
* 编号,设备下唯一
*/
private final String code;
/**
* 属性名称
*/
private final String name;
/**
* 数据类型
*/
private final DataType dataType;
}
package vc.thinker.thirdpartyproxy.protocolimpls.test;
import lombok.AllArgsConstructor;
import lombok.Getter;
import vc.thinker.absctacts.protocolproxy.enums.IMeterEventDefine;
/**
* 测试协议事件定义
*
* @author HeTongHao
* @since 2021/7/9 16:20
*/
@Getter
@AllArgsConstructor
public enum TestMeterEventDefine implements IMeterEventDefine {
/**
* 数据组
*/
alert("alert", "告警"),
shutdown("shutdown", "停机"),
;
/**
* 编号,设备下唯一
*/
private final String code;
/**
* 属性名称
*/
private final String name;
}
package vc.thinker.thirdpartyproxy.protocolimpls.test;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import vc.thinker.absctacts.protocolproxy.dto.IMeterReportEntity;
import vc.thinker.absctacts.protocolproxy.dto.MeterReportEntity;
import vc.thinker.absctacts.protocolproxy.enums.IMeterDataDefine;
import vc.thinker.absctacts.protocolproxy.enums.IProtocolType;
import vc.thinker.absctacts.protocolproxy.protocol.AbstractThirdPartyProtocol;
import vc.thinker.thirdpartyproxy.ProtocolType;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
/**
* @author HeTongHao
* @since 2021/7/9 17:13
*/
@Slf4j
@Component
public class TestProtocol extends AbstractThirdPartyProtocol {
@Override
public IProtocolType protocolType() {
return ProtocolType.test;
}
// @Scheduled(cron = "* * * * * *")
public void report() {
super.report(MeterReportEntity.builder("001", LocalDateTime.now())
.putValue(TestMeterAttrDefine.voltage, 111)
.putValue(TestMeterAttrDefine.current, 100)
.putValue(TestMeterAttrDefine.Active_energy, 123456.233)
.putValue(TestMeterEventDefine.alert, true)
.putValue(TestMeterEventDefine.shutdown, false)
.build()
);
}
// @Scheduled(cron = "* * * * * *")
public void reportMulti() {
List<IMeterReportEntity> meterReportEntityList = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
Map<IMeterDataDefine, Object> map = Maps.newLinkedHashMap();
map.put(TestMeterAttrDefine.current, 100);
map.put(TestMeterAttrDefine.voltage, 111);
map.put(TestMeterAttrDefine.Active_energy, 123456.233);
map.put(TestMeterEventDefine.alert, true);
map.put(TestMeterEventDefine.shutdown, false);
meterReportEntityList.add(MeterReportEntity.builder("001", LocalDateTime.now())
.putValues(map)
.build());
}
super.report(meterReportEntityList);
}
}
package vc.thinker.tools;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 用于确定线上发版更新正常
* 打印一个版本
*
* @author HeTongHao
* @since 2022/12/20 11:03
*/
@Slf4j
@Component
public class StartTool {
@PostConstruct
public void start() {
log.info("启动:1");
}
}
package vc.thinker.utils;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* 合并未来的观察者
* 用于维护一批需要合并同步结果的线程关系
*
* @author HeTongHao
* @since 2022/8/10 16:43
*/
public class CompletableFutureObserver<T> {
private final Map<String, CompletableFuture<T>> completableFutureMap = Maps.newConcurrentMap();
public CompletableFuture<T> subscribe(String key) {
CompletableFuture<T> completableFuture = new CompletableFuture<>();
completableFutureMap.put(key, completableFuture);
return completableFuture;
}
public void onMessage(String key, T msg) {
CompletableFuture<T> completableFuture = completableFutureMap.get(key);
if (completableFuture != null) {
completableFuture.complete(msg);
completableFutureMap.remove(key);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
completableFuture.complete("666");
}).start();
String o = completableFuture.get();
System.out.println(o);
}
}
package vc.thinker.utils;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.DESKeySpec;
import java.security.SecureRandom;
/**
* DES加密介绍
* DES是一种对称加密算法,所谓对称加密算法即:加密和解密使用相同密钥的算法。DES加密算法出自IBM的研究,
* 后来被美国政府正式采用,之后开始广泛流传,但是近些年使用越来越少,因为DES使用56位密钥,以现代计算能力,
* 24小时内即可被破解。虽然如此,在某些简单应用中,我们还是可以使用DES加密算法,本文简单讲解DES的JAVA实现
* 。
* 注意:DES加密和解密过程中,密钥长度都必须是8的倍数
*/
public class DES {
//测试
public static void main(String[] args) {
//待加密内容
String str = "测试内容";
//密码,长度要是8的倍数
String password = "9588028820109132570743325311898426347857298773549468758875018579537757772163084478873699447306034466200616411960574122434059469100235892702736860872901247123456";
byte[] result = DES.encrypt(str.getBytes(), password);
System.out.println("加密后:" + new String(result));
//直接将如上内容解密
try {
byte[] decryResult = DES.decrypt(result, password);
System.out.println("解密后:" + new String(decryResult));
} catch (Exception e1) {
e1.printStackTrace();
}
}
/**
* 加密
*
* @param datasource byte[]
* @param password String
* @return byte[]
*/
public static byte[] encrypt(byte[] datasource, String password) {
try {
SecureRandom random = new SecureRandom();
DESKeySpec desKey = new DESKeySpec(password.getBytes());
//创建一个密匙工厂,然后用它把DESKeySpec转换成
SecretKeyFactory keyFactory = SecretKeyFactory.getInstance("DES");
SecretKey securekey = keyFactory.generateSecret(desKey);
//Cipher对象实际完成加密操作
Cipher cipher = Cipher.getInstance("DES");
//用密匙初始化Cipher对象
cipher.init(Cipher.ENCRYPT_MODE, securekey, random);
//现在,获取数据并加密
//正式执行加密操作
return cipher.doFinal(datasource);
} catch (Throwable e) {
e.printStackTrace();
}
return null;
}
/**
* 解密
*
* @param src byte[]
* @param password String
* @return byte[]
* @throws Exception
*/
public static byte[] decrypt(byte[] src, String password) throws Exception {
// DES算法要求有一个可信任的随机数源
SecureRandom random = new SecureRandom();
// 创建一个DESKeySpec对象
DESKeySpec desKey = new DESKeySpec(password.getBytes());
// 创建一个密匙工厂
SecretKeyFactory keyFactory = SecretKeyFactory.getInstance("DES");
// 将DESKeySpec对象转换成SecretKey对象
SecretKey securekey = keyFactory.generateSecret(desKey);
// Cipher对象实际完成解密操作
Cipher cipher = Cipher.getInstance("DES");
// 用密匙初始化Cipher对象
cipher.init(Cipher.DECRYPT_MODE, securekey, random);
// 真正开始解密操作
return cipher.doFinal(src);
}
}
\ No newline at end of file
package vc.thinker.utils;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.TemporalAdjusters;
import java.util.Date;
/**
* @author : xieshaojun
* @date : 2023/1/3 17:55
*/
@Slf4j
public class LocalDateTimeUtils {
/**
* 默认时区
*/
private static final ZoneId SYSTEM_ZONE_ID = ZoneId.systemDefault();
/**
* LocalDateTime转换Date
*
* @param localDateTime
* @return
*/
public static Date toDate(final LocalDateTime localDateTime) {
return Date.from(localDateTime.atZone(SYSTEM_ZONE_ID).toInstant());
}
/**
* Date转换LocalDateTime
*
* @param date
* @return
*/
public static LocalDateTime toLocalDateTime(final Date date) {
return LocalDateTime.ofInstant(date.toInstant(), SYSTEM_ZONE_ID);
}
/**
* 获取一天的开始
*
* @return LocalDateTime
*/
public static LocalDateTime getTheStartOfTheDay() {
return LocalDateTime.now().withHour(0).withMinute(0).withSecond(0).withNano(0);
}
/**
* 获取一天的结束
*
* @return LocalDateTime
*/
public static LocalDateTime getTheEndOfTheDay() {
return LocalDateTime.now().withHour(23).withMinute(59).withSecond(59).withNano(0);
}
/**
* 设置一天的开始
*
* @param localDateTime
* @return LocalDateTime
*/
public static LocalDateTime setTheStartOfTheDay(final LocalDateTime localDateTime) {
return localDateTime.withHour(0).withMinute(0).withSecond(0).withNano(0);
}
/**
* 设置一天的结束
*
* @param localDateTime
* @return LocalDateTime
*/
public static LocalDateTime setTheEndOfTheDay(final LocalDateTime localDateTime) {
return localDateTime.withHour(23).withMinute(59).withSecond(59).withNano(0);
}
/**
* 设置一天的开始
*
* @param date
* @return Date
*/
public static Date setTheStartOfTheDay(final Date date) {
return toDate(setTheStartOfTheDay(toLocalDateTime(date)));
}
/**
* 设置一天的结束
*
* @param date
* @return Date
*/
public static Date setTheEndOfTheDay(final Date date) {
return toDate(setTheEndOfTheDay(toLocalDateTime(date)));
}
/**
* 填满时间的月
*/
public static Date fillMonth(final Date date) {
return toDate(toLocalDateTime(date).withMonth(12));
}
/**
* 填满时间的日
*/
public static Date fillDay(final Date date) {
return toDate(toLocalDateTime(date).with(TemporalAdjusters.lastDayOfMonth()));
}
/**
* 填满时间的月
*/
public static LocalDateTime fillMonth(final LocalDateTime date) {
return date.withMonth(12);
}
/**
* 填满时间的日
*/
public static LocalDateTime fillDay(final LocalDateTime date) {
return date.with(TemporalAdjusters.lastDayOfMonth());
}
/**
* 测试方法
*
* @param args
*/
public static void main(String[] args) {
System.out.println(intervalDay( LocalDateTime.of(2017, 8, 5, 00, 00), LocalDateTime.of(2019, 8, 6, 00, 00)));
}
/**
* 将LocalDateTime转为timestamp
*
* @param localDateTime
* @return
*/
public static long parseToTimestamp(LocalDateTime localDateTime) {
return localDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
}
/**
* 设置时间的月的开始
*
* @param localDateTime
* @return LocalDateTime
*/
public static LocalDateTime setTheStartOfTheMonth(final LocalDateTime localDateTime) {
return setTheStartOfTheDay(localDateTime).withDayOfMonth(1);
}
/**
* 设置时间的月的结束
*
* @param localDateTime
* @return LocalDateTime
*/
public static LocalDateTime setTheEndOfTheMonth(final LocalDateTime localDateTime) {
return setTheEndOfTheDay(localDateTime).withDayOfMonth(1).plusMonths(1).minusDays(1);
}
/**
* 设置时间的年的开始
*
* @param localDateTime
* @return LocalDateTime
*/
public static LocalDateTime setTheStartOfTheYear(final LocalDateTime localDateTime) {
return setTheStartOfTheMonth(localDateTime.withMonth(1));
}
/**
* 设置时间的年的结束
*
* @param localDateTime
* @return LocalDateTime
*/
public static LocalDateTime setTheEndOfTheYear(final LocalDateTime localDateTime) {
return setTheEndOfTheMonth(localDateTime.withMonth(12));
}
/**
* 两个时间间隔的毫秒数
*
* @param subtractedTimestamp 被减时间
* @param subtractTimestamp 减时间
* @return 时间戳差值
*/
public static long intervalTimestamp(LocalDateTime subtractedTimestamp, LocalDateTime subtractTimestamp) {
return parseToTimestamp(subtractedTimestamp) - parseToTimestamp(subtractTimestamp);
}
/**
* 两个时间间隔的分钟
*
* @param subtractedTimestamp 被减时间
* @param subtractTimestamp 减时间
* @return 分钟差值
*/
public static long intervalMinute(LocalDateTime subtractedTimestamp, LocalDateTime subtractTimestamp) {
return intervalTimestamp(subtractedTimestamp, subtractTimestamp) / 1000 / 60;
}
/**
* 两个时间间隔的分钟
*
* @param subtractedTimestamp 被减时间
* @param subtractTimestamp 减时间
* @return 分钟差值
*/
public static long intervalDay(LocalDateTime subtractedTimestamp, LocalDateTime subtractTimestamp) {
return intervalMinute(subtractedTimestamp, subtractTimestamp) / 60 / 24;
}
}
package vc.thinker.utils;
import com.google.common.collect.Maps;
import org.springframework.lang.Nullable;
import java.util.Map;
import java.util.function.Function;
/**
* 枚举解析器
*
* @author HeTongHao
* @since 2021/6/17 16:34
*/
public class Resolve<T, K> {
private final Map<K, T> keyValueMap = Maps.newHashMap();
/**
* 构造
*
* @param values 枚举数组
* @param resolveKey 根据那个字段解析
*/
public Resolve(T[] values, Function<T, K> resolveKey) {
for (T value : values) {
keyValueMap.put(resolveKey.apply(value), value);
}
}
@Nullable
public T resolve(K key) {
return keyValueMap.get(key);
}
}
package vc.thinker.utils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.springframework.lang.Nullable;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
/**
* 枚举解析器 解析出多个枚举
*
* @author HeTongHao
* @since 2021/6/17 16:34
*/
public class ResolveMultipart<T, K> {
private final Map<K, List<T>> keyValueMap = Maps.newHashMap();
/**
* 构造
*
* @param values 枚举数组
* @param resolveKey 根据那个字段解析
*/
public ResolveMultipart(T[] values, Function<T, K> resolveKey) {
for (T value : values) {
keyValueMap.computeIfAbsent(resolveKey.apply(value), k -> Lists.newLinkedList()).add(value);
}
}
public ResolveMultipart(List<T> values, Function<T, K> resolveKey) {
for (T value : values) {
keyValueMap.computeIfAbsent(resolveKey.apply(value), k -> Lists.newLinkedList()).add(value);
}
}
@Nullable
public List<T> resolve(K key) {
return keyValueMap.get(key);
}
}
package vc.thinker.utils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.io.DefaultResourceLoader;
import java.io.IOException;
/**
* @author : xieshaojun
* @date : 2023/1/3 17:51
*/
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {
private static ApplicationContext applicationContext = null;
private static Logger logger = LoggerFactory.getLogger(SpringContextHolder.class);
/**
* 取得存储在静态变量中的ApplicationContext.
*/
public static ApplicationContext getApplicationContext() {
assertContextInjected();
return applicationContext;
}
public static String getRootRealPath(){
String rootRealPath ="";
try {
rootRealPath=getApplicationContext().getResource("").getFile().getAbsolutePath();
} catch (IOException e) {
logger.warn("获取系统根目录失败");
}
return rootRealPath;
}
public static String getResourceRootRealPath(){
String rootRealPath ="";
try {
rootRealPath=new DefaultResourceLoader().getResource("").getFile().getAbsolutePath();
} catch (IOException e) {
logger.warn("获取资源根目录失败");
}
return rootRealPath;
}
/**
* 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) {
assertContextInjected();
return (T) applicationContext.getBean(name);
}
/**
* 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
*/
public static <T> T getBean(Class<T> requiredType) {
assertContextInjected();
return applicationContext.getBean(requiredType);
}
/**
* 清除SpringContextHolder中的ApplicationContext为Null.
*/
public static void clearHolder() {
if (logger.isDebugEnabled()){
logger.debug("清除SpringContextHolder中的ApplicationContext:" + applicationContext);
}
applicationContext = null;
}
/**
* 实现ApplicationContextAware接口, 注入Context到静态变量中.
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
// logger.debug("注入ApplicationContext到SpringContextHolder:{}", applicationContext);
if (SpringContextHolder.applicationContext != null) {
logger.info("SpringContextHolder中的ApplicationContext被覆盖, 原有ApplicationContext为:" + SpringContextHolder.applicationContext);
}
SpringContextHolder.applicationContext = applicationContext; // NOSONAR
}
/**
* 实现DisposableBean接口, 在Context关闭时清理静态变量.
*/
@Override
public void destroy() throws Exception {
SpringContextHolder.clearHolder();
}
/**
* 检查ApplicationContext不为空.
*/
private static void assertContextInjected() {
Validate.validState(applicationContext != null, "applicaitonContext属性未注入, 请在applicationContext.xml中定义SpringContextHolder.");
}
}
package vc.thinker.utils;
import org.slf4j.helpers.MessageFormatter;
/**
* @author : xieshaojun
* @date : 2023/1/3 17:54
*/
public class StringFormaterUtils {
/**
* 用{}作为占位符,格式化字符串
*
* @param format
* @param argArray
* @return
*/
public static String format(String format, Object... argArray) {
return MessageFormatter.arrayFormat(format, argArray).getMessage();
}
}
package vc.thinker.utils;
import lombok.Data;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 同步信号通信
*
* @author HeTongHao
* @since 2020/7/9 19:50
*/
@Data
public class SynchronizeSignal {
public SynchronizeSignal() {
this(new ReentrantLock());
}
public SynchronizeSignal(Lock lock) {
this.lock = lock;
this.condition = this.lock.newCondition();
}
private Lock lock;
private Condition condition;
public void signal() {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
public boolean await(Duration duration) {
return await(duration.toMillis(), TimeUnit.MILLISECONDS);
}
public boolean await(long time, TimeUnit unit) {
lock.lock();
try {
return condition.await(time, unit);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return false;
}
}
package vc.thinker.utils.dependhandle;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
/**
* 依赖属性查询工具
*
* @author HeTongHao
* @since 2021/1/5 21:02
*/
public class DependsInfoTools {
/**
* 批量设置依赖信息
*
* @param handleList 需要处理的对象列表
* @param foreignKey 需要处理的对象的外键(拿这个外键去查依赖)
* @param selectDependList 根据所有外键查询依赖列表的过程
* @param primaryKey 查询出依赖的主键(与外键对应)
* @param setValueHandler 依赖属性设置过程
* @param <T> 需要处理的对象类型
* @param <D> 依赖对象类型
* @param <K> 主键与外键的类型
*/
public static <T, D, K> void batchSetDependInfo(List<T> handleList, Function<T, K> foreignKey
, Function<List<K>, List<D>> selectDependList, Function<D, K> primaryKey
, SetValueHandler<T, D> setValueHandler) {
if (handleList.isEmpty()) {
return;
}
List<K> idList = genIds(handleList, foreignKey);
if (!idList.isEmpty()) {
List<D> dependList = selectDependList.apply(idList);
setDependProperty(handleList, foreignKey, dependList, primaryKey, setValueHandler);
}
}
/**
* 批量设置依赖信息
*
* @param handleList 需要处理的对象列表
* @param selectDependList 根据所有外键查询依赖列表的过程
* @param primaryKey 查询出依赖的主键(与外键对应)
* @param keyValueHandlerRelations 外键与值处理的映射关系
* @param <T> 需要处理的对象类型
* @param <D> 依赖对象类型
* @param <K> 主键与外键的类型
*/
public static <T, D, K> void batchSetDependInfo(List<T> handleList
, Function<List<K>, List<D>> selectDependList, Function<D, K> primaryKey
, List<KeyValueHandlerRelation<T, D, K>> keyValueHandlerRelations) {
if (handleList.isEmpty()) {
return;
}
List<Function<T, K>> foreignKeys
= keyValueHandlerRelations.stream().map(KeyValueHandlerRelation::getForeignKey).collect(Collectors.toList());
List<K> idList = genIds(handleList, foreignKeys);
if (!idList.isEmpty()) {
List<D> dependList = selectDependList.apply(idList);
setDependProperty(handleList, dependList, primaryKey, keyValueHandlerRelations);
}
}
@SafeVarargs
public static <T, D, K> void batchSetDependInfo(List<T> handleList
, Function<List<K>, List<D>> selectDependList, Function<D, K> primaryKey
, KeyValueHandlerRelation<T, D, K>... keyValueHandlerRelations) {
batchSetDependInfo(handleList, selectDependList, primaryKey, Lists.newArrayList(keyValueHandlerRelations));
}
private static <T, K> List<K> genIds(List<T> handleList, Function<T, K> foreignKey) {
return handleList.isEmpty() ? Lists.newArrayList() :
handleList.parallelStream().map(foreignKey).filter(Objects::nonNull).collect(Collectors.toList());
}
private static <T, K> List<K> genIds(List<T> handleList, List<Function<T, K>> foreignKeys) {
List<K> ids = Lists.newCopyOnWriteArrayList();
if (!handleList.isEmpty()) {
handleList.parallelStream().forEach(handle ->
foreignKeys.forEach(tkFunction ->
Optional.ofNullable(tkFunction.apply(handle)).ifPresent(ids::add)));
}
return ids;
}
private static <T, D, K> void setDependProperty(List<T> handleList, Function<T, K> foreignKey
, List<D> dependList, Function<D, K> primaryKey
, SetValueHandler<T, D> setValueHandler) {
dependList.forEach(depend -> {
K primaryId = primaryKey.apply(depend);
handleList.parallelStream().forEach(setObject ->
Optional.ofNullable(foreignKey.apply(setObject)).ifPresent(foreignId -> {
if (foreignId.equals(primaryId)) {
setValueHandler.set(setObject, depend);
}
})
);
});
}
private static <T, D, K> void setDependProperty(List<T> handleList
, List<D> dependList, Function<D, K> primaryKey
, List<KeyValueHandlerRelation<T, D, K>> keyValueHandlerRelations) {
Map<K, D> idDependsList = dependList.parallelStream().collect(Collectors.toMap(primaryKey, UnaryOperator.identity()));
handleList.parallelStream().forEach(setObject -> keyValueHandlerRelations.forEach(tdkKeyValueHandlerRelation -> {
K dependId = tdkKeyValueHandlerRelation.getForeignKey().apply(setObject);
Optional.ofNullable(idDependsList.get(dependId))
.ifPresent(depend -> tdkKeyValueHandlerRelation.getSetValueHandler().set(setObject, depend));
}));
}
}
package vc.thinker.utils.dependhandle;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.function.Function;
/**
* 外键与设置过程关系
*
* @author HeTongHao
* @since 2021/1/6 13:57
*/
@Data
@AllArgsConstructor
public class KeyValueHandlerRelation<T, D, K> {
public static <T, D, K> KeyValueHandlerRelation<T, D, K> of(Function<T, K> foreignKey
, SetValueHandler<T, D> setValueHandler) {
return new KeyValueHandlerRelation<>(foreignKey, setValueHandler);
}
private Function<T, K> foreignKey;
private SetValueHandler<T, D> setValueHandler;
}
package vc.thinker.utils.dependhandle;
/**
* 值设置过程
*
* @author HeTongHao
* @since 2021/1/6 10:02
*/
@FunctionalInterface
public interface SetValueHandler<T, D> {
/**
* 依赖设置过程
*
* @param setObject 被设置属性的对象
* @param value 待设置的值
*/
void set(T setObject, D value);
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment