package com.sckj.opc.opcua;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.eventbus.AsyncEventBus;
import com.sckj.opc.dto.OPCPointDTO;
import com.sckj.opc.entity.OPCData;
import com.sckj.opc.entity.OPCPoint;
import com.sckj.opc.entity.OPCServer;
import com.sckj.opc.service.OPCDataServiceImpl;
import com.sckj.opc.service.OPCPointServiceImpl;
import com.sckj.opc.service.OPCServerServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.jinterop.dcom.common.JIException;
import org.openscada.opc.lib.common.ConnectionInformation;
import org.openscada.opc.lib.common.NotConnectedException;
import org.openscada.opc.lib.da.*;
import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;

import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;

/**
 * OPC DA client service: opens (and caches) one connection per configured
 * {@link OPCServer}, subscribes to points by polling them periodically, and
 * publishes every changed value on the {@link AsyncEventBus}.
 *
 * <p>Each active subscription occupies one {@code taskExecutor} thread, which
 * sleeps on a latch until the point is unsubscribed.
 */
@Service
@Slf4j
public class OPCDAServiceImpl {

    /** Polling period handed to {@link SyncAccess}, in milliseconds. */
    private static final int SUBSCRIPTION_PERIOD_MS = 100 * 10;

    /** How long {@link #createSubscription} waits for an early failure report. */
    private static final long SUBSCRIBE_RESULT_TIMEOUT_SECONDS = 1;

    @Resource
    OPCServerServiceImpl opcServerService;

    @Resource
    OPCPointServiceImpl opcPointService;

    @Resource
    OPCDataServiceImpl opcDataService;

    @Resource
    private AsyncEventBus asyncEventBus;

    @Resource(name = "taskExecutor")
    ThreadPoolTaskExecutor taskExecutor;

    /**
     * Open connections keyed by {@code OPCServer.getId()}. The key is typed
     * {@code Object} because the id type is declared on the entity, not here.
     */
    private final ConcurrentHashMap<Object, DaConnection> mOPCDaClientMap = new ConcurrentHashMap<>();

    /**
     * Active subscriptions keyed by {@code OPCPoint.getId()}. Counting the latch
     * down tells the subscription task to unbind and finish.
     */
    private final ConcurrentHashMap<Object, CountDownLatch> mOPCDaPointsMap = new ConcurrentHashMap<>();

    /**
     * Returns the connection for the given server, creating and caching it on
     * first use.
     *
     * <p>Creation is atomic per server id, so concurrent callers share a single
     * connection. The connection is started through an
     * {@link AutoReconnectController}.
     *
     * @param opcServer server configuration (host, DCOM user/password, CLSID)
     * @return the cached or newly created server handle
     */
    public Server createServer(OPCServer opcServer) {
        final Object serverId = opcServer.getId();
        return mOPCDaClientMap.computeIfAbsent(serverId, id -> openConnection(opcServer)).server;
    }

    /** Builds the connection info, creates the server and starts its auto-reconnect controller. */
    private DaConnection openConnection(OPCServer opcServer) {
        final ConnectionInformation ci = new ConnectionInformation();
        ci.setHost(opcServer.getIp());           // OPC DA host
        ci.setUser(opcServer.getUsername());     // user name set up when DCOM was configured
        ci.setPassword(opcServer.getPassword()); // password
        ci.setClsid(opcServer.getClsid());       // registry CLSID of the OPC server (e.g. KEPServer)

        // Log only non-sensitive fields: the entity carries the DCOM password and
        // its toString() may render it.
        log.info("Create Server : id={}, ip={}", opcServer.getId(), opcServer.getIp());
        log.info("availableProcessors:{}", Runtime.getRuntime().availableProcessors());

        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        final Server server = new Server(ci, scheduler);
        final AutoReconnectController controller = new AutoReconnectController(server);
        controller.connect();
        return new DaConnection(server, controller, scheduler);
    }

    /**
     * Starts a polling subscription for one point on a {@code taskExecutor}
     * thread and waits up to one second for it to report an early failure.
     *
     * <p>Nothing happens if {@code opcPoint} is {@code null} or the point is
     * already subscribed. A subscription that is still running when the wait
     * times out is treated as successfully started.
     *
     * @param opcServer server the point belongs to
     * @param opcPoint  point to subscribe
     * @return a {@link StringBuilder}; empty on success, otherwise holding the
     *         point name followed by the failure message
     */
    public Object createSubscription(OPCServer opcServer, OPCPoint opcPoint) {
        final StringBuilder sb = new StringBuilder();
        if (opcPoint == null) {
            return sb;
        }

        final Object pointId = opcPoint.getId();
        final CountDownLatch stopSignal = new CountDownLatch(1);
        // Atomic "register unless already subscribed".
        if (mOPCDaPointsMap.putIfAbsent(pointId, stopSignal) != null) {
            return sb;
        }

        final Callable<String> task = () -> runSubscription(opcServer, opcPoint, pointId, stopSignal);
        final Future<String> future;
        try {
            future = taskExecutor.submit(task);
        } catch (RuntimeException e) {
            // The task never started, so undo the registration before propagating.
            mOPCDaPointsMap.remove(pointId, stopSignal);
            throw e;
        }

        try {
            final String result = future.get(SUBSCRIBE_RESULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (ObjectUtils.isNotEmpty(result)) {
                sb.append(opcPoint.getPointName()).append("订阅异常:").append(result);
            }
        } catch (TimeoutException stillRunning) {
            // Expected on success: the task blocks until the point is unsubscribed.
            log.debug("{} subscription still running after {}s", opcPoint.getPointName(),
                    SUBSCRIBE_RESULT_TIMEOUT_SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for subscription of {}", opcPoint.getPointName());
        } catch (ExecutionException e) {
            log.error("Subscription task for {} failed", opcPoint.getPointName(), e);
        }
        return sb;
    }

    /**
     * Body of a subscription task: binds the point, blocks until the stop latch
     * is released, then unbinds.
     *
     * @return {@code null} if the point has no name, {@code ""} after a normal
     *         stop, otherwise a description of the failure
     */
    private String runSubscription(OPCServer opcServer, OPCPoint opcPoint, Object pointId,
                                   CountDownLatch stopSignal) {
        AccessBase access = null;
        boolean bound = false;
        try {
            final String configuredName = opcPoint.getPointName();
            if (ObjectUtils.isEmpty(configuredName)) {
                return null;
            }
            final String itemId = toItemId(configuredName);

            final Server server = createServer(opcServer);
            access = new SyncAccess(server, SUBSCRIPTION_PERIOD_MS);
            log.info("{} start subscribe", itemId);

            // Last value seen per item id, used to suppress unchanged samples.
            final Map<String, Object> lastValues = new LinkedHashMap<>();
            access.addItem(itemId, (item, itemState) -> onItemChanged(lastValues, item, itemState));
            access.bind();
            bound = true;

            // Park (no CPU use) until unsubscribe()/unsubscribeAvailable() releases the latch.
            stopSignal.await();
            return "";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Subscription of {} interrupted", opcPoint.getPointName());
            return "";
        } catch (Exception e) {
            log.error("Subscription of {} failed", opcPoint.getPointName(), e);
            final String message = e.getMessage();
            // Exceptions without a message would otherwise be reported as success.
            return ObjectUtils.isEmpty(message) ? e.toString() : message;
        } finally {
            // Conditional remove: do not clobber a newer subscription for the same point.
            mOPCDaPointsMap.remove(pointId, stopSignal);
            if (access != null) {
                try {
                    access.unbind();
                } catch (Exception e) {
                    log.warn("Failed to unbind {}", opcPoint.getPointName(), e);
                }
            }
            if (bound) {
                log.info("{} stop subscribe", opcPoint.getPointName());
            }
        }
    }

    /** Prefixes a configured point name with the channel (and, if it has no dot, the device) segment. */
    private static String toItemId(String pointName) {
        return pointName.contains(".") ? "channel." + pointName : "channel.device." + pointName;
    }

    /**
     * Handles one polled sample: publishes it on the event bus only when the
     * value differs from the previous sample of the same item.
     *
     * <p>DA access here is timer-driven, so identical consecutive samples are
     * expected and filtered out.
     */
    private void onItemChanged(Map<String, Object> lastValues, Item item, ItemState itemState) {
        final String itemId = item.getId();

        Object value = null;
        try {
            value = itemState.getValue().getObject();
        } catch (JIException e) {
            // Value stays null, exactly as if the item had no value.
            log.error("Failed to decode value of {}", itemId, e);
        }

        if (Objects.equals(lastValues.get(itemId), value)) {
            return;
        }
        lastValues.put(itemId, value);

        final Calendar timestamp = itemState.getTimestamp();
        final OPCData opcData = OPCData.builder()
                .data(value)
                .serverTime(timestamp.getTime())
                .sourceTime(timestamp.getTime())
                .statusCode((long) itemState.getErrorCode())
                .pointName(itemId)
                .build();
        // Hand the sample to the other modules listening on the bus.
        asyncEventBus.post(opcData);
        log.info("DA,{},{}", itemId, itemState.toString());
    }

    /**
     * Looks up the point together with its server configuration and subscribes
     * to it.
     *
     * @param opcPoint point to subscribe
     * @return the result of {@link #createSubscription}, or {@code null} if the
     *         point could not be resolved or any error occurred
     */
    public Object subscribe(OPCPoint opcPoint) {
        try {
            final OPCPointDTO opcPointDTO = opcPointService.selectInfoWithServer(opcPoint);
            if (opcPointDTO == null) {
                log.warn("No point/server information found for {}", opcPoint);
                return null;
            }
            // NOTE(review): copyProperties copies same-named properties, which may put the
            // point's id into opcServer.id (the connection cache key) - confirm against OPCPointDTO.
            final OPCServer opcServer = new OPCServer();
            BeanUtils.copyProperties(opcPointDTO, opcServer);
            return createSubscription(opcServer, opcPointDTO);
        } catch (Exception e) {
            log.error("Failed to subscribe {}", opcPoint, e);
            return null;
        }
    }

    /**
     * Cancels the subscription of one point, if it is active.
     *
     * @param opcPoint point to unsubscribe
     */
    public void unsubscribe(OPCPoint opcPoint) {
        final CountDownLatch stopSignal = mOPCDaPointsMap.remove(opcPoint.getId());
        if (stopSignal != null) {
            stopSignal.countDown();
        }
    }

    /**
     * Cancels every active subscription.
     */
    public void unsubscribeAvailable() {
        for (Object pointId : mOPCDaPointsMap.keySet()) {
            final CountDownLatch stopSignal = mOPCDaPointsMap.remove(pointId);
            if (stopSignal != null) {
                stopSignal.countDown();
            }
        }
    }

    /**
     * Subscribes every point with status {@code "1"} on every server with status
     * {@code "1"}; intended to run at application start-up.
     *
     * <p>A failure on one point is logged and does not stop the remaining points.
     */
    public void subscribeAvailable() {
        final QueryWrapper<OPCServer> serverQueryWrapper = new QueryWrapper<>();
        serverQueryWrapper.lambda().eq(OPCServer::getStatus, "1");
        final List<OPCServer> opcServerList = opcServerService.list(serverQueryWrapper);
        if (ObjectUtils.isEmpty(opcServerList)) {
            return;
        }

        for (OPCServer opcServer : opcServerList) {
            final QueryWrapper<OPCPoint> pointQueryWrapper = new QueryWrapper<>();
            pointQueryWrapper.lambda()
                    .eq(OPCPoint::getOpcServerId, opcServer.getId())
                    .eq(OPCPoint::getStatus, "1");
            final List<OPCPoint> opcPointList = opcPointService.list(pointQueryWrapper);
            if (ObjectUtils.isEmpty(opcPointList)) {
                continue;
            }
            for (OPCPoint opcPoint : opcPointList) {
                try {
                    createSubscription(opcServer, opcPoint);
                } catch (RuntimeException e) {
                    log.error("Failed to subscribe point {}", opcPoint, e);
                }
            }
        }
    }

    /**
     * Reads the current value of one point.
     *
     * <p>The item identifier can also be looked up with the UaExpert client; it
     * has the form {@code channelName.deviceName.tagName}.
     *
     * <p>NOTE(review): unlike subscriptions, the point name is used here without
     * a {@code channel.} prefix - confirm this difference is intended.
     *
     * @param opcPoint point to read
     * @return the value rendered with {@link String#valueOf(Object)}, or
     *         {@code null} if the point could not be resolved
     * @throws Exception if the group or item cannot be created or the read fails
     */
    public String readNodeValue(OPCPoint opcPoint) throws Exception {
        final OPCPointDTO opcPointDTO = opcPointService.selectInfoWithServer(opcPoint);
        if (ObjectUtils.isEmpty(opcPointDTO)) {
            return null;
        }
        final OPCServer opcServer = new OPCServer();
        BeanUtils.copyProperties(opcPointDTO, opcServer);

        final Server server = createServer(opcServer);
        final Group group = server.addGroup();
        try {
            final Item item = group.addItem(opcPointDTO.getPointName());
            return String.valueOf(item.read(true).getValue().getObject());
        } finally {
            // The group exists only for this read; remove it so groups do not pile up.
            try {
                group.remove();
            } catch (Exception e) {
                log.warn("Failed to remove temporary read group for {}", opcPointDTO.getPointName(), e);
            }
        }
    }

    /**
     * Shutdown hook: stops all subscriptions and closes every cached connection.
     */
    @PreDestroy
    public void releaseServer() {
        unsubscribeAvailable();
        for (Object serverId : mOPCDaClientMap.keySet()) {
            final DaConnection connection = mOPCDaClientMap.remove(serverId);
            if (connection != null) {
                connection.close();
            }
        }
    }

    /** A server handle together with the helpers that must be stopped with it. */
    private static final class DaConnection {

        private final Server server;
        private final AutoReconnectController controller;
        private final ScheduledExecutorService scheduler;

        private DaConnection(Server server, AutoReconnectController controller,
                             ScheduledExecutorService scheduler) {
            this.server = server;
            this.controller = controller;
            this.scheduler = scheduler;
        }

        /** Stops reconnecting, disposes the server and shuts the scheduler down; each step is best-effort. */
        private void close() {
            try {
                controller.disconnect();
            } catch (RuntimeException e) {
                log.warn("Failed to stop auto-reconnect controller", e);
            }
            try {
                server.dispose();
            } catch (RuntimeException e) {
                log.warn("Failed to dispose OPC DA server", e);
            }
            scheduler.shutdownNow();
        }
    }
}