package com.sckj.opc.dataservice;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.eventbus.AsyncEventBus;
import com.sckj.opc.dto.OPCPointDTO;
import com.sckj.opc.entity.OPCData;
import com.sckj.opc.entity.OPCPoint;
import com.sckj.opc.entity.OPCServer;
import com.sckj.opc.service.OPCDataServiceImpl;
import com.sckj.opc.service.OPCPointServiceImpl;
import com.sckj.opc.service.OPCServerServiceImpl;
import com.sckj.opc.utils.CustomUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.jinterop.dcom.common.JIException;
import org.openscada.opc.lib.common.ConnectionInformation;
import org.openscada.opc.lib.da.*;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.Calendar;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

/**
 * OPC-DA access service: caches one client connection per configured server,
 * subscribes to points, publishes changed values on the {@link AsyncEventBus},
 * and offers a one-shot read of a single point.
 *
 * <p>Each active subscription occupies one {@code taskExecutor} thread for its
 * whole lifetime (the task polls the subscription registry until its point is
 * removed), so the pool must be sized for the number of subscribed points.
 */
@Service
@Slf4j
public class OPCDAServiceImpl {

    @Resource
    OPCServerServiceImpl opcServerService;

    @Resource
    OPCPointServiceImpl opcPointService;

    @Resource
    OPCDataServiceImpl opcDataService;

    @Resource
    private AsyncEventBus asyncEventBus;

    @Resource(name = "taskExecutor")
    ThreadPoolTaskExecutor taskExecutor;

    @Value("${spring.profiles.active}")
    protected String activeProfiles;

    /**
     * Server id -> connection resources. The key is declared as {@code Object}
     * because the declared type of {@code OPCServer#getId()} is not visible in
     * this file.
     */
    private final ConcurrentHashMap<Object, ManagedServer> mOPCDaClientMap = new ConcurrentHashMap<>();

    /**
     * Subscription registry: point name -> last published item state.
     * Presence of a key means "subscribed"; removing the key is the signal
     * that makes the corresponding subscription task stop.
     */
    private final ConcurrentHashMap<String, ItemState> mOPCDaPointsMap = new ConcurrentHashMap<>();

    /** Everything that has to be released when a cached server connection is torn down. */
    private static final class ManagedServer {
        private final Server server;
        private final AutoReconnectController controller;
        private final ScheduledExecutorService scheduler;

        ManagedServer(Server server, AutoReconnectController controller, ScheduledExecutorService scheduler) {
            this.server = server;
            this.controller = controller;
            this.scheduler = scheduler;
        }
    }

    /**
     * Returns the cached client for the given server, creating and connecting
     * it (through an {@link AutoReconnectController}) on first use.
     *
     * @param opcServer server definition supplying id, host, credentials and CLSID
     * @return the client associated with {@code opcServer.getId()}
     */
    public Server createServer(OPCServer opcServer) {
        return mOPCDaClientMap.computeIfAbsent(opcServer.getId(), id -> {
            final ConnectionInformation ci = new ConnectionInformation();
            ci.setHost(opcServer.getIp());
            ci.setUser(opcServer.getUsername());
            ci.setPassword(opcServer.getPassword());
            ci.setClsid(opcServer.getClsid());
            // The entity carries credentials, so log only non-sensitive identifying fields.
            log.info("Create Server : id={}, host={}, clsid={}", id, opcServer.getIp(), opcServer.getClsid());
            log.info("availableProcessors:{}", Runtime.getRuntime().availableProcessors());

            final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            try {
                final Server server = new Server(ci, scheduler);
                final AutoReconnectController controller = new AutoReconnectController(server);
                controller.connect();
                return new ManagedServer(server, controller, scheduler);
            } catch (RuntimeException e) {
                // Nothing was cached, so nobody else would ever shut this scheduler down.
                scheduler.shutdownNow();
                throw e;
            }
        }).server;
    }

    /**
     * Starts a background subscription for a point unless one is already registered.
     *
     * <p>The point name is reserved in the registry before the task is submitted,
     * so two concurrent calls for the same point cannot both start a subscription.
     * The task result is intentionally not awaited; failures are logged by the task.
     *
     * @param opcServer server the point belongs to
     * @param opcPoint  point to subscribe; ignored when {@code null} or unnamed
     * @return an empty {@link StringBuilder} (kept for backward compatibility)
     */
    public Object createSubscription(OPCServer opcServer, OPCPoint opcPoint) {
        final StringBuilder sb = new StringBuilder();
        if (opcPoint == null) {
            return sb;
        }
        final String pointName = opcPoint.getPointName();
        // Must be checked before touching the registry: ConcurrentHashMap rejects null keys.
        if (ObjectUtils.isEmpty(pointName)) {
            return sb;
        }
        if (mOPCDaPointsMap.putIfAbsent(pointName, new ItemState()) != null) {
            // Already subscribed (or being subscribed) by another caller.
            return sb;
        }
        final Callable<String> task = () -> runSubscription(opcServer, opcPoint, pointName);
        try {
            taskExecutor.submit(task);
        } catch (RuntimeException e) {
            // The task never started, so release the reservation before propagating.
            mOPCDaPointsMap.remove(pointName);
            throw e;
        }
        return sb;
    }

    /**
     * Body of a subscription task: binds a {@link SyncAccess} for the point and
     * blocks until the point is removed from the registry, then unbinds.
     *
     * @return an empty string on a normal stop, otherwise the failure message
     */
    private String runSubscription(OPCServer opcServer, OPCPoint opcPoint, String pointName) {
        AccessBase access = null;
        try {
            final Server server = createServer(opcServer);
            access = new SyncAccess(server, opcPoint.getPeriod() * 1000);
            final String itemId = CustomUtil.createNewPointName(pointName, activeProfiles, "1");
            log.info("{} start subscribe", itemId);
            access.addItem(itemId, (item, itemState) -> onItemChanged(pointName, item, itemState));
            access.bind();
            // Block until the subscription is cancelled (key removed from the registry).
            while (mOPCDaPointsMap.containsKey(pointName)) {
                Thread.sleep(100); // avoid busy-spinning
            }
            return "";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            mOPCDaPointsMap.remove(pointName);
            log.warn("{} subscription interrupted", pointName);
            return e.getMessage();
        } catch (Exception e) {
            // Drop the registration so the point can be subscribed again later.
            mOPCDaPointsMap.remove(pointName);
            log.error("{} subscribe failed", pointName, e);
            return e.getMessage();
        } finally {
            if (access != null) {
                try {
                    access.unbind();
                } catch (Exception e) {
                    log.warn("{} unbind failed", pointName, e);
                }
            }
            log.info("{} stop subscribe", pointName);
        }
    }

    /**
     * Callback for one polled item state. DA subscriptions are polled on a
     * timer, so the same value is delivered repeatedly; only a value that
     * differs from the last published one is posted to the event bus.
     */
    private void onItemChanged(String pointName, Item item, ItemState itemState) {
        Object currentValue = null;
        try {
            if (itemState.getValue() != null) {
                currentValue = itemState.getValue().getObject();
            }
        } catch (JIException e) {
            log.warn("{} failed to decode current value", pointName, e);
        }

        synchronized (mOPCDaPointsMap) {
            final ItemState previous = mOPCDaPointsMap.get(pointName);
            if (previous == null) {
                // Subscription was cancelled; do not re-register the point.
                return;
            }
            // Compare the raw objects to avoid string-conversion artefacts.
            boolean isNewData = true;
            try {
                isNewData = previous.getValue() == null
                        || !Objects.equals(previous.getValue().getObject(), currentValue);
            } catch (JIException e) {
                log.warn("{} failed to decode previous value", pointName, e);
            }
            if (!isNewData) {
                return;
            }
            // replace() only succeeds while the key is still present, so a concurrent
            // unsubscribe cannot be undone by a late callback.
            if (mOPCDaPointsMap.replace(pointName, itemState) == null) {
                return;
            }
        }

        final Calendar timestamp = itemState.getTimestamp();
        final OPCData opcData = OPCData.builder()
                .data(currentValue)
                .serverTime(timestamp.getTime())
                .sourceTime(timestamp.getTime())
                .statusCode((long) itemState.getErrorCode())
                .pointName(item.getId())
                .build();
        // Publish for other modules.
        asyncEventBus.post(opcData);
        log.debug("DA,{},{}", item.getId(), itemState);
    }

    /**
     * Resolves the point together with its server information and starts a subscription.
     *
     * @param opcPoint point to look up and subscribe
     * @return the result of {@link #createSubscription}, or {@code null} when the
     *         point cannot be resolved or an error occurs
     */
    public Object subscribe(OPCPoint opcPoint) {
        try {
            final OPCPointDTO opcPointDTO = opcPointService.selectInfoWithServer(opcPoint);
            if (ObjectUtils.isEmpty(opcPointDTO)) {
                log.warn("subscribe: point not found: {}", opcPoint);
                return null;
            }
            final OPCServer opcServer = new OPCServer();
            // NOTE(review): this copies same-named properties from the point DTO; if the DTO's
            // "id" is the point id, the server cache ends up keyed by point id - confirm.
            BeanUtils.copyProperties(opcPointDTO, opcServer);
            return createSubscription(opcServer, opcPointDTO);
        } catch (Exception e) {
            log.error("subscribe failed: {}", opcPoint, e);
        }
        return null;
    }

    /**
     * Cancels the subscription of a point.
     *
     * <p>The registry is keyed by point name. When the given entity carries no
     * name, it is resolved through the point service, as {@link #subscribe} does.
     *
     * @param opcPoint point to unsubscribe
     */
    public void unsubscribe(OPCPoint opcPoint) {
        if (opcPoint == null) {
            return;
        }
        String pointName = opcPoint.getPointName();
        if (ObjectUtils.isEmpty(pointName)) {
            try {
                final OPCPointDTO opcPointDTO = opcPointService.selectInfoWithServer(opcPoint);
                pointName = opcPointDTO == null ? null : opcPointDTO.getPointName();
            } catch (Exception e) {
                log.warn("unsubscribe: failed to resolve point: {}", opcPoint, e);
                return;
            }
        }
        if (ObjectUtils.isNotEmpty(pointName)) {
            mOPCDaPointsMap.remove(pointName);
        }
    }

    /**
     * Cancels every active subscription.
     */
    public void unsubscribeAvailable() {
        mOPCDaPointsMap.clear();
    }

    /**
     * Connects to every enabled server (status "1") and subscribes all of its
     * enabled points; intended to be called at application start-up.
     */
    public void subscribeAvailable() {
        final QueryWrapper<OPCServer> serverQueryWrapper = new QueryWrapper<>();
        serverQueryWrapper.lambda().eq(OPCServer::getStatus, "1");
        final List<OPCServer> opcServerList = opcServerService.list(serverQueryWrapper);
        if (ObjectUtils.isEmpty(opcServerList)) {
            return;
        }
        for (OPCServer opcServer : opcServerList) {
            final QueryWrapper<OPCPoint> pointQueryWrapper = new QueryWrapper<>();
            pointQueryWrapper.lambda()
                    .eq(OPCPoint::getOpcServerId, opcServer.getId())
                    .eq(OPCPoint::getStatus, "1");
            final List<OPCPoint> opcPointList = opcPointService.list(pointQueryWrapper);
            if (ObjectUtils.isEmpty(opcPointList)) {
                continue;
            }
            log.info("start point list:");
            for (OPCPoint opcPoint : opcPointList) {
                // Never use data as the format string.
                log.info("{}", opcPoint.getPointName());
                try {
                    createSubscription(opcServer, opcPoint);
                } catch (RuntimeException e) {
                    // One point failing to start must not prevent the remaining ones.
                    log.error("{} could not be scheduled for subscription", opcPoint.getPointName(), e);
                }
            }
        }
    }

    /**
     * Reads the current value of a point directly from the device.
     *
     * <p>The identifier can also be looked up with the UaExpert client; its value
     * is {@code channelName.deviceName.tagName}.
     *
     * @param opcPoint point to read
     * @return the value rendered with {@link String#valueOf(Object)}, or
     *         {@code null} when the point cannot be resolved
     * @throws Exception if the group/item cannot be created or the read fails
     */
    public String readNodeValue(OPCPoint opcPoint) throws Exception {
        final OPCPointDTO opcPointDTO = opcPointService.selectInfoWithServer(opcPoint);
        if (ObjectUtils.isEmpty(opcPointDTO)) {
            return null;
        }
        final OPCServer opcServer = new OPCServer();
        BeanUtils.copyProperties(opcPointDTO, opcServer);
        final Server server = createServer(opcServer);
        final Group group = server.addGroup();
        try {
            final Item item = group.addItem(opcPointDTO.getPointName());
            return String.valueOf(item.read(true).getValue().getObject());
        } finally {
            // The group exists only for this read; remove it so groups do not pile up on the server.
            try {
                group.remove();
            } catch (Exception e) {
                log.warn("failed to remove temporary read group for {}", opcPointDTO.getPointName(), e);
            }
        }
    }

    /**
     * Releases all OPC resources on bean destruction: cancels subscriptions,
     * stops auto-reconnect, disposes the clients and shuts their schedulers down.
     */
    @PreDestroy
    public void releaseServer() {
        // Lets the subscription tasks leave their polling loop.
        mOPCDaPointsMap.clear();
        for (ManagedServer managed : mOPCDaClientMap.values()) {
            try {
                // Stop the controller first so it does not try to reconnect the disposed client.
                managed.controller.disconnect();
                managed.server.dispose();
            } catch (RuntimeException e) {
                log.warn("failed to release OPC server", e);
            } finally {
                managed.scheduler.shutdownNow();
            }
        }
        mOPCDaClientMap.clear();
    }
}