Commit b8278465 authored by xieshaojun's avatar xieshaojun

更新代码 添加离线缓存 最大缓存

parent a79b1962
......@@ -9,7 +9,6 @@
</parent>
<artifactId>terminal-protocol-proxy-service</artifactId>
<name>${artifactId}</name>
<description>富士康设备服务</description>
<packaging>jar</packaging>
<properties>
<image>registry-vpc.cn-shenzhen.aliyuncs.com/thinker-vc/terminal-protocol-proxy-service</image>
......@@ -157,6 +156,11 @@
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
......@@ -170,19 +174,7 @@
<version>2.5.0</version>
<!-- 相关配置 -->
<configuration>
<!-- <finalName></finalName>-->
<includePrefix>vc.thinker</includePrefix>
<!-- <excludePrefix></excludePrefix>-->
<!-- <includeXmlPrefix></includeXmlPrefix>-->
<!-- <excludeXmlPrefix></excludeXmlPrefix>-->
<!-- <toCleanXmlChildElementName></toCleanXmlChildElementName>-->
<!-- <password></password>-->
<!-- <includeLibs></includeLibs>-->
<!-- <alreadyProtectedLibs></alreadyProtectedLibs>-->
<!-- <supportFile></supportFile>-->
<!-- <jvmArgCheck></jvmArgCheck>-->
<!-- <tips></tips>-->
<!-- <debug></debug>-->
</configuration>
<executions>
<execution>
......
......@@ -4,6 +4,15 @@ import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import vc.thinker.config.DataReportQueueUtil;
import vc.thinker.config.entity.DataReportArrayQueue;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
/**
* mqtt 指令发送
......@@ -23,7 +32,7 @@ public class MqttCommandPush {
return mqttClient;
}
public void sendMqttMessage(String topic, byte[] payload) {
public void sendMqttMessage(String topic, byte[] payload) throws MqttException {
MqttMessage message = new MqttMessage();
message.setPayload(payload);
message.setQos(1);
......@@ -31,14 +40,25 @@ public class MqttCommandPush {
try {
if (mqttClient == null) {
log.error("命令推送失败 mqttClient is null");
} else if (mqttClient.isConnected()) {
mqttClient.publish(topic, message);
// } else if (mqttClient.isConnected()) {
// mqttClient.publish(topic, message);
} else {
mqttClient.connect();
// mqttClient.connect();
mqttClient.publish(topic, message);
}
} catch (MqttException e) {
e.printStackTrace();
if (Objects.equals((int)MqttException.REASON_CODE_CLIENT_EXCEPTION,e.getReasonCode())
|| Objects.equals((int)MqttException.REASON_CODE_CLIENT_ALREADY_DISCONNECTED,e.getReasonCode())
|| Objects.equals((int)MqttException.REASON_CODE_CLIENT_NOT_CONNECTED,e.getReasonCode())
){
ArrayBlockingQueue<DataReportArrayQueue> queue = DataReportQueueUtil.getArrayBlockingQueue();
DataReportArrayQueue obj = new DataReportArrayQueue();
obj.setPayload(payload);
obj.setTopic(topic);
queue.offer(obj);
log.error("broker 失去连接 数据添加到缓存队列中");
}
throw e;
}
}
}
package vc.thinker.config;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
/**
* @author : xieshaojun
* @date : 2023/2/6 16:04
*/
@Configuration
public class Config {
/**
* 缓存
* @return
*/
@Bean
public Cache<String,Object> caffeineCache(){
return Caffeine.newBuilder()
.expireAfterWrite(1, TimeUnit.DAYS)
.initialCapacity(100)
.maximumSize(5000)
.build();
}
}
package vc.thinker.config;
import vc.thinker.config.entity.DataReportArrayQueue;
import java.util.concurrent.ArrayBlockingQueue;
/**
* @author : xieshaojun
* @date : 2023/2/7 16:27
*/
public class DataReportQueueUtil {
private volatile static ArrayBlockingQueue<DataReportArrayQueue> arrayBlockingQueue;
private DataReportQueueUtil(){
}
public static ArrayBlockingQueue<DataReportArrayQueue> getArrayBlockingQueue(){
if (arrayBlockingQueue == null){
synchronized (DataReportQueueUtil.class){
if(arrayBlockingQueue == null){
arrayBlockingQueue = new ArrayBlockingQueue<DataReportArrayQueue>(1000);
}
}
}
return arrayBlockingQueue;
}
}
package vc.thinker.config.entity;
import lombok.Data;
/**
* @author : xieshaojun
* @date : 2023/2/6 17:13
*/
@Data
public class DataReportArrayQueue {
private String topic;
private byte[] payload;
}
......@@ -64,25 +64,21 @@ public class LimitAspect {
}
@Around("serviceAspect()")
public Object around(ProceedingJoinPoint joinPoint){
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
ServiceLimit serviceLimit = method.getAnnotation(ServiceLimit.class);
ServiceLimit.LimitType limitType = serviceLimit.limitType();
String key = serviceLimit.key();
Object object;
try {
if (Objects.equals(limitType,ServiceLimit.LimitType.IP)){
if (Objects.equals(limitType, ServiceLimit.LimitType.IP)) {
key = IpUtils.getIpAddr(request);
}
RateLimiter rateLimiter = caches.get(key);
boolean acquire = rateLimiter.tryAcquire();
if (acquire){
if (acquire) {
object = joinPoint.proceed();
}else {
throw new CurrentLimitException("请求太频繁");
}
}catch (Throwable e){
} else {
throw new CurrentLimitException("请求太频繁");
}
return object;
......
package vc.thinker.exception;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.http.HttpStatus;
import org.springframework.validation.BindException;
import org.springframework.validation.FieldError;
......@@ -34,6 +35,15 @@ public class ExceptionAdvice {
return errorResponse;
}
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
@ExceptionHandler({MqttException.class})
public AbstractResponse mqttException(MqttException e) {
AbstractResponse errorResponse = new SimpleResponse();
errorResponse.setErrorInfo(HttpStatus.INTERNAL_SERVER_ERROR.value(), "mqtt连接异常 "+ e.getMessage());
return errorResponse;
}
/**
* 请求过多
* @param e
......
package vc.thinker.mqtt.forwardthinker;
import com.alibaba.fastjson.JSONObject;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
......@@ -25,7 +26,13 @@ public class ThinkerProdProtocolClient extends AbstractNotSubscriptionMqttProtoc
super(mqttConnectionProperties);
}
@SneakyThrows
public void report(String protocolCode, ThinkerReportDTO dto) {
super.getMqttCommandPush().sendMqttMessage(FoxGatewayTopicDefinition.report.genTopic(protocolCode), JSONObject.toJSONString(List.of(dto)).getBytes());
}
@SneakyThrows
public void reportBytes(String topic, byte[] payload) {
super.getMqttCommandPush().sendMqttMessage(topic, payload);
}
}
package vc.thinker.mqtt.proxyclient;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
......@@ -45,6 +46,7 @@ public class ThirdPartyProxyClientProtocol extends AbstractCommonEntityHandleMqt
* @param protocolCode 第三方协议编号
* @param payload 数据荷载
*/
@SneakyThrows
public void sendMessage(ThirdPartyProxyTopicDefinition topic, String protocolCode, byte[] payload) {
super.getMqttCommandPush().sendMqttMessage(topic.genTopic(protocolCode), payload);
}
......
......@@ -42,36 +42,6 @@ public class CentralAirProtocol extends AbstractThirdPartyProtocol implements Te
return deviceInfoConfig.getCode();
}
// @Override
// public void dataReport(DataReportInfoDTO dataReportInfoDTO) {
// LocalDateTime time = LocalDateTime.ofEpochSecond(dataReportInfoDTO.getTimestamp()/1000L,0, ZoneOffset.of("+8"));
// String deviceId = lastDeviceId(dataReportInfoDTO);
// DataReportParamDTO paramDTO = dataReportInfoDTO.getData();
// if (ObjectUtils.isEmpty(paramDTO)){
// return;
// }
// MeterReportEntity.MeterReportEntityBuilder builder = MeterReportEntity.builder(deviceId, time);
// if (!ObjectUtils.isEmpty(paramDTO.getElectricity())){
// builder.putValue(CentralAirMeterAttrDefine.electricity, dataReportInfoDTO.getData().getElectricity());
// }
// if (!ObjectUtils.isEmpty(paramDTO.getColdCapacity())) {
// builder.putValue(CentralAirMeterAttrDefine.coldCapacity, dataReportInfoDTO.getData().getColdCapacity());
// }
// if (!ObjectUtils.isEmpty(paramDTO.getSystemCop())) {
// builder.putValue(CentralAirMeterAttrDefine.systemCop, dataReportInfoDTO.getData().getSystemCop());
// }
// if (!ObjectUtils.isEmpty(paramDTO.getSupplyWaterTemp())) {
// builder.putValue(CentralAirMeterAttrDefine.supplyWaterTemp, dataReportInfoDTO.getData().getSupplyWaterTemp());
// }
// if (!ObjectUtils.isEmpty(paramDTO.getReturnWaterTemp())) {
// builder.putValue(CentralAirMeterAttrDefine.returnWaterTemp, dataReportInfoDTO.getData().getReturnWaterTemp());
// }
// MeterReportEntity meterReport = builder.build();
// System.out.println("____"+ JSONObject.toJSONString(meterReport));
// report(meterReport);
// }
@Override
public void dataReport(DataReportInfoDTO dataReportInfoDTO) {
ThinkerReportDTO dto = new ThinkerReportDTO();
......
......@@ -5,12 +5,17 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import vc.thinker.config.DataReportQueueUtil;
import vc.thinker.config.entity.DataReportArrayQueue;
import vc.thinker.config.interceptor.ServiceLimit;
import vc.thinker.mqtt.forwardthinker.ThinkerProdProtocolClient;
import vc.thinker.response.SimpleResponse;
import vc.thinker.thirdpartyproxy.ProtocolType;
import vc.thinker.thirdpartyproxy.protocolimpls.centralAir.CentralAirProtocol;
......@@ -20,6 +25,7 @@ import vc.thinker.web.vo.TerminalDataReportVO;
import javax.validation.Valid;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
/**
* @author HeTongHao
......@@ -34,12 +40,14 @@ public class CentralAirController {
@Autowired
private CentralAirProtocol centralAirProtocol;
@Autowired
private ThinkerProdProtocolClient thinkerProdProtocolClient;
@ApiOperation("设备数据上报接口")
@PostMapping("data-report")
@ServiceLimit(key = "data-report")
public SimpleResponse dataReport(@RequestBody @Valid TerminalDataReportVO reportVO) {
public SimpleResponse dataReport(@RequestBody @Valid TerminalDataReportVO reportVO) throws MqttException {
SimpleResponse simpleResponse = new SimpleResponse();
simpleResponse.setCode("200");
if (Objects.equals(reportVO.getProtocolCode(),ProtocolType.central_air_conditioning.getCode())){
......@@ -52,6 +60,14 @@ public class CentralAirController {
this.centralAirProtocol.dataReport(dto);
simpleResponse.setSuccess(true);
simpleResponse.setMessage("处理成功");
ArrayBlockingQueue<DataReportArrayQueue> queue = DataReportQueueUtil.getArrayBlockingQueue();
for (DataReportArrayQueue obj : queue){
DataReportArrayQueue poll = queue.poll();
if (!ObjectUtils.isEmpty(poll)){
this.thinkerProdProtocolClient.reportBytes(poll.getTopic(),poll.getPayload());
}
}
return simpleResponse;
}
simpleResponse.setSuccess(false);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment