123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- package com.sckj.opc.dataservice;
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.google.common.eventbus.AsyncEventBus;
- import com.sckj.opc.dto.OPCPointDTO;
- import com.sckj.opc.entity.OPCData;
- import com.sckj.opc.entity.OPCPoint;
- import com.sckj.opc.entity.OPCServer;
- import com.sckj.opc.service.OPCDataServiceImpl;
- import com.sckj.opc.service.OPCPointServiceImpl;
- import com.sckj.opc.service.OPCServerServiceImpl;
- import com.sckj.opc.utils.CustomUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.ObjectUtils;
- import org.jinterop.dcom.common.JIException;
- import org.openscada.opc.lib.common.ConnectionInformation;
- import org.openscada.opc.lib.da.*;
- import org.springframework.beans.BeanUtils;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.stereotype.Service;
- import javax.annotation.PreDestroy;
- import javax.annotation.Resource;
- import java.util.Calendar;
- import java.util.List;
- import java.util.Objects;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- @Service
- @Slf4j
- public class OPCDAServiceImpl {
- @Resource
- OPCServerServiceImpl opcServerService;
- @Resource
- OPCPointServiceImpl opcPointService;
- @Resource
- OPCDataServiceImpl opcDataService;
- @Resource
- private AsyncEventBus asyncEventBus;
- @Resource(name = "taskExecutor")
- ThreadPoolTaskExecutor taskExecutor;
- @Value("${spring.profiles.active}")
- protected String activeProfiles;
- private ConcurrentHashMap<Long, Server> mOPCDaClientMap = new ConcurrentHashMap<>();
- private ConcurrentHashMap<String, ItemState> mOPCDaPointsMap = new ConcurrentHashMap<>();
- /**
- * 获得基础的连接信息
- */
- public Server createServer(OPCServer opcServer) {
- return mOPCDaClientMap.computeIfAbsent(opcServer.getId(), id -> {
- final ConnectionInformation ci = new ConnectionInformation();
- ci.setHost(opcServer.getIp());
- ci.setUser(opcServer.getUsername());
- ci.setPassword(opcServer.getPassword());
- ci.setClsid(opcServer.getClsid());
- log.info("Create Server : {}", opcServer);
- log.info("availableProcessors:{}", Runtime.getRuntime().availableProcessors());
- final Server server = new Server(ci, Executors.newSingleThreadScheduledExecutor());
- AutoReconnectController controller = new AutoReconnectController(server);
- controller.connect();
- return server;
- });
- }
- public Object createSubscription(OPCServer opcServer, OPCPoint opcPoint) {
- StringBuilder sb = new StringBuilder();
- if (ObjectUtils.isNotEmpty(opcPoint) && !mOPCDaPointsMap.containsKey(opcPoint.getPointName())) {
- Future<String> future = taskExecutor.submit(new Callable<String>() {
- @Override
- public String call() {
- try {
- Server server = createServer(opcServer);
- AccessBase access = new SyncAccess(server, opcPoint.getPeriod() * 1000);
- String newPointName = opcPoint.getPointName();
- if (ObjectUtils.isEmpty(newPointName)) {
- return null;
- }
- newPointName = CustomUtil.createNewPointName(newPointName,activeProfiles,"1");
- log.info("{} start subscribe", newPointName);
- mOPCDaPointsMap.put(opcPoint.getPointName(), new ItemState());
- access.addItem(newPointName, (item, itemstate) -> {
- int errorCode = itemstate.getErrorCode();
- String pointName = item.getId();
- Calendar calendar = itemstate.getTimestamp();
- Object object = null;
- try {
- object = itemstate.getValue().getObject();
- } catch (JIException e) {
- e.printStackTrace();
- }
- synchronized (mOPCDaPointsMap) {
- //DA中订阅是按照定时计算的,存在重复的数据项,进行过滤
- ItemState previousData = mOPCDaPointsMap.get(opcPoint.getPointName());
- Object currentData = object;
- //直接比较原始对象,避免字符串转换误差
- boolean isNewData = true;
- try {
- isNewData = previousData == null || previousData.getValue() == null ||
- !Objects.equals(previousData.getValue().getObject(), currentData);
- } catch (JIException e) {
- e.printStackTrace();
- }
- // 添加数值精度处理(处理小数点后2位)
- // if (!isNewData && currentData instanceof Number) {
- // DecimalFormat df = new DecimalFormat("#.##");
- // String prev = df.format(previousData.getData());
- // String curr = df.format(currentData);
- // isNewData = !prev.equals(curr);
- // }
- if (isNewData) {
- OPCData opcData = OPCData.builder()
- .data(object)
- .serverTime(calendar.getTime())
- .sourceTime(calendar.getTime())
- .statusCode((long) errorCode)
- .pointName(pointName)
- .build();
- // 使用put原子操作更新数据
- mOPCDaPointsMap.put(opcPoint.getPointName(), itemstate);
- //post给其他模块使用
- asyncEventBus.post(opcData);
- log.debug("DA,{},{}", item.getId(), itemstate);
- }
- }
- });
- access.bind();
- while (mOPCDaPointsMap.containsKey(opcPoint.getPointName())) {
- //阻塞,直到关闭订阅
- Thread.sleep(100); // 避免 CPU 空转
- }
- access.unbind();
- log.info("{} stop subscribe", opcPoint.getPointName());
- } catch (Exception e) {
- e.printStackTrace();
- return e.getMessage();
- }
- return "";
- }
- });
- // try {
- // String result = future.get(10, TimeUnit.SECONDS); //
- // log.error(">>> error:{}", result);
- // if (ObjectUtils.isNotEmpty(result)) {
- // sb.append(opcPoint.getPointName() + "订阅异常:" + result);
- // }
- // } catch (Exception e) {
- // e.printStackTrace();
- // log.info("订阅异常:{}", opcPoint.getPointName());
- // }
- }
- return sb;
- }
- public Object subscribe(OPCPoint opcPoint) {
- try {
- OPCPointDTO opcPointDTO = opcPointService.selectInfoWithServer(opcPoint);
- QueryWrapper<OPCPoint> pointQueryWrapper = new QueryWrapper<>();
- pointQueryWrapper.lambda().eq(OPCPoint::getStatus, "1");
- OPCServer opcServer = new OPCServer();
- BeanUtils.copyProperties(opcPointDTO, opcServer);
- return createSubscription(opcServer, opcPointDTO);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
- /***
- * 取消订阅
- * @param opcPoint
- */
- public void unsubscribe(OPCPoint opcPoint) {
- mOPCDaPointsMap.remove(opcPoint.getId());
- }
- /***
- * 取消可用订阅
- */
- public void unsubscribeAvailable() {
- mOPCDaPointsMap.clear();
- }
- /**
- * 项目启动时自动创建 PLC连接并订阅节点
- */
- public void subscribeAvailable() {
- QueryWrapper<OPCServer> serverQueryWrapper = new QueryWrapper<>();
- serverQueryWrapper.lambda().eq(OPCServer::getStatus, "1");
- List<OPCServer> opcServerList = opcServerService.list(serverQueryWrapper);
- if (ObjectUtils.isEmpty(opcServerList)) {
- return;
- }
- for (OPCServer opcServer : opcServerList) {
- QueryWrapper<OPCPoint> pointQueryWrapper = new QueryWrapper<>();
- pointQueryWrapper.lambda().eq(OPCPoint::getOpcServerId, opcServer.getId()).eq(OPCPoint::getStatus, "1");
- List<OPCPoint> opcPointList = opcPointService.list(pointQueryWrapper);
- if (ObjectUtils.isNotEmpty(opcPointList)) {
- log.info("start point list:");
- for (OPCPoint opcPoint : opcPointList) {
- log.info(opcPoint.getPointName());
- createSubscription(opcServer, opcPoint);
- }
- }
- }
- }
- /**
- * 读取节点数据
- * <p>
- * identifier也可以通过UaExpert客户端去查询,这个值=通道名称.设备名称.标记名称
- *
- * @param opcPoint
- * @throws Exception
- */
- public String readNodeValue(OPCPoint opcPoint) throws Exception {
- OPCPointDTO opcPointDTO = opcPointService.selectInfoWithServer(opcPoint);
- if (ObjectUtils.isEmpty(opcPointDTO)) {
- return null;
- }
- OPCServer opcServer = new OPCServer();
- BeanUtils.copyProperties(opcPointDTO, opcServer);
- Server server = createServer(opcServer);
- Group group = server.addGroup();
- Item item = group.addItem(opcPointDTO.getPointName());
- return String.valueOf(item.read(true).getValue().getObject());
- }
- @PreDestroy
- public void releaseServer() {
- if (ObjectUtils.isNotEmpty(mOPCDaClientMap)) {
- for (Server server : mOPCDaClientMap.values()) {
- server.dispose();
- }
- }
- }
- }
|