OPCDAServiceImpl.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package com.sckj.opc.dataservice;
  2. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  3. import com.google.common.eventbus.AsyncEventBus;
  4. import com.sckj.opc.dto.OPCPointDTO;
  5. import com.sckj.opc.entity.OPCData;
  6. import com.sckj.opc.entity.OPCPoint;
  7. import com.sckj.opc.entity.OPCServer;
  8. import com.sckj.opc.service.OPCDataServiceImpl;
  9. import com.sckj.opc.service.OPCPointServiceImpl;
  10. import com.sckj.opc.service.OPCServerServiceImpl;
  11. import com.sckj.opc.utils.CustomUtil;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.apache.commons.lang3.ObjectUtils;
  14. import org.jinterop.dcom.common.JIException;
  15. import org.openscada.opc.lib.common.ConnectionInformation;
  16. import org.openscada.opc.lib.da.*;
  17. import org.springframework.beans.BeanUtils;
  18. import org.springframework.beans.factory.annotation.Value;
  19. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  20. import org.springframework.stereotype.Service;
  21. import javax.annotation.PreDestroy;
  22. import javax.annotation.Resource;
  23. import java.util.Calendar;
  24. import java.util.List;
  25. import java.util.Objects;
  26. import java.util.concurrent.Callable;
  27. import java.util.concurrent.ConcurrentHashMap;
  28. import java.util.concurrent.Executors;
  29. import java.util.concurrent.Future;
  30. @Service
  31. @Slf4j
  32. public class OPCDAServiceImpl {
  33. @Resource
  34. OPCServerServiceImpl opcServerService;
  35. @Resource
  36. OPCPointServiceImpl opcPointService;
  37. @Resource
  38. OPCDataServiceImpl opcDataService;
  39. @Resource
  40. private AsyncEventBus asyncEventBus;
  41. @Resource(name = "taskExecutor")
  42. ThreadPoolTaskExecutor taskExecutor;
  43. @Value("${spring.profiles.active}")
  44. protected String activeProfiles;
  45. private ConcurrentHashMap<Long, Server> mOPCDaClientMap = new ConcurrentHashMap<>();
  46. private ConcurrentHashMap<String, ItemState> mOPCDaPointsMap = new ConcurrentHashMap<>();
  47. /**
  48. * 获得基础的连接信息
  49. */
  50. public Server createServer(OPCServer opcServer) {
  51. return mOPCDaClientMap.computeIfAbsent(opcServer.getId(), id -> {
  52. final ConnectionInformation ci = new ConnectionInformation();
  53. ci.setHost(opcServer.getIp());
  54. ci.setUser(opcServer.getUsername());
  55. ci.setPassword(opcServer.getPassword());
  56. ci.setClsid(opcServer.getClsid());
  57. log.info("Create Server : {}", opcServer);
  58. log.info("availableProcessors:{}", Runtime.getRuntime().availableProcessors());
  59. final Server server = new Server(ci, Executors.newSingleThreadScheduledExecutor());
  60. AutoReconnectController controller = new AutoReconnectController(server);
  61. controller.connect();
  62. return server;
  63. });
  64. }
  65. public Object createSubscription(OPCServer opcServer, OPCPoint opcPoint) {
  66. StringBuilder sb = new StringBuilder();
  67. if (ObjectUtils.isNotEmpty(opcPoint) && !mOPCDaPointsMap.containsKey(opcPoint.getPointName())) {
  68. Future<String> future = taskExecutor.submit(new Callable<String>() {
  69. @Override
  70. public String call() {
  71. try {
  72. Server server = createServer(opcServer);
  73. AccessBase access = new SyncAccess(server, opcPoint.getPeriod() * 1000);
  74. String newPointName = opcPoint.getPointName();
  75. if (ObjectUtils.isEmpty(newPointName)) {
  76. return null;
  77. }
  78. newPointName = CustomUtil.createNewPointName(newPointName,activeProfiles,"1");
  79. log.info("{} start subscribe", newPointName);
  80. mOPCDaPointsMap.put(opcPoint.getPointName(), new ItemState());
  81. access.addItem(newPointName, (item, itemstate) -> {
  82. int errorCode = itemstate.getErrorCode();
  83. String pointName = item.getId();
  84. Calendar calendar = itemstate.getTimestamp();
  85. Object object = null;
  86. try {
  87. object = itemstate.getValue().getObject();
  88. } catch (JIException e) {
  89. e.printStackTrace();
  90. }
  91. synchronized (mOPCDaPointsMap) {
  92. //DA中订阅是按照定时计算的,存在重复的数据项,进行过滤
  93. ItemState previousData = mOPCDaPointsMap.get(opcPoint.getPointName());
  94. Object currentData = object;
  95. //直接比较原始对象,避免字符串转换误差
  96. boolean isNewData = true;
  97. try {
  98. isNewData = previousData == null || previousData.getValue() == null ||
  99. !Objects.equals(previousData.getValue().getObject(), currentData);
  100. } catch (JIException e) {
  101. e.printStackTrace();
  102. }
  103. // 添加数值精度处理(处理小数点后2位)
  104. // if (!isNewData && currentData instanceof Number) {
  105. // DecimalFormat df = new DecimalFormat("#.##");
  106. // String prev = df.format(previousData.getData());
  107. // String curr = df.format(currentData);
  108. // isNewData = !prev.equals(curr);
  109. // }
  110. if (isNewData) {
  111. OPCData opcData = OPCData.builder()
  112. .data(object)
  113. .serverTime(calendar.getTime())
  114. .sourceTime(calendar.getTime())
  115. .statusCode((long) errorCode)
  116. .pointName(pointName)
  117. .build();
  118. // 使用put原子操作更新数据
  119. mOPCDaPointsMap.put(opcPoint.getPointName(), itemstate);
  120. //post给其他模块使用
  121. asyncEventBus.post(opcData);
  122. log.debug("DA,{},{}", item.getId(), itemstate);
  123. }
  124. }
  125. });
  126. access.bind();
  127. while (mOPCDaPointsMap.containsKey(opcPoint.getPointName())) {
  128. //阻塞,直到关闭订阅
  129. Thread.sleep(100); // 避免 CPU 空转
  130. }
  131. access.unbind();
  132. log.info("{} stop subscribe", opcPoint.getPointName());
  133. } catch (Exception e) {
  134. e.printStackTrace();
  135. return e.getMessage();
  136. }
  137. return "";
  138. }
  139. });
  140. // try {
  141. // String result = future.get(10, TimeUnit.SECONDS); //
  142. // log.error(">>> error:{}", result);
  143. // if (ObjectUtils.isNotEmpty(result)) {
  144. // sb.append(opcPoint.getPointName() + "订阅异常:" + result);
  145. // }
  146. // } catch (Exception e) {
  147. // e.printStackTrace();
  148. // log.info("订阅异常:{}", opcPoint.getPointName());
  149. // }
  150. }
  151. return sb;
  152. }
  153. public Object subscribe(OPCPoint opcPoint) {
  154. try {
  155. OPCPointDTO opcPointDTO = opcPointService.selectInfoWithServer(opcPoint);
  156. QueryWrapper<OPCPoint> pointQueryWrapper = new QueryWrapper<>();
  157. pointQueryWrapper.lambda().eq(OPCPoint::getStatus, "1");
  158. OPCServer opcServer = new OPCServer();
  159. BeanUtils.copyProperties(opcPointDTO, opcServer);
  160. return createSubscription(opcServer, opcPointDTO);
  161. } catch (Exception e) {
  162. e.printStackTrace();
  163. }
  164. return null;
  165. }
  166. /***
  167. * 取消订阅
  168. * @param opcPoint
  169. */
  170. public void unsubscribe(OPCPoint opcPoint) {
  171. mOPCDaPointsMap.remove(opcPoint.getId());
  172. }
  173. /***
  174. * 取消可用订阅
  175. */
  176. public void unsubscribeAvailable() {
  177. mOPCDaPointsMap.clear();
  178. }
  179. /**
  180. * 项目启动时自动创建 PLC连接并订阅节点
  181. */
  182. public void subscribeAvailable() {
  183. QueryWrapper<OPCServer> serverQueryWrapper = new QueryWrapper<>();
  184. serverQueryWrapper.lambda().eq(OPCServer::getStatus, "1");
  185. List<OPCServer> opcServerList = opcServerService.list(serverQueryWrapper);
  186. if (ObjectUtils.isEmpty(opcServerList)) {
  187. return;
  188. }
  189. for (OPCServer opcServer : opcServerList) {
  190. QueryWrapper<OPCPoint> pointQueryWrapper = new QueryWrapper<>();
  191. pointQueryWrapper.lambda().eq(OPCPoint::getOpcServerId, opcServer.getId()).eq(OPCPoint::getStatus, "1");
  192. List<OPCPoint> opcPointList = opcPointService.list(pointQueryWrapper);
  193. if (ObjectUtils.isNotEmpty(opcPointList)) {
  194. log.info("start point list:");
  195. for (OPCPoint opcPoint : opcPointList) {
  196. log.info(opcPoint.getPointName());
  197. createSubscription(opcServer, opcPoint);
  198. }
  199. }
  200. }
  201. }
  202. /**
  203. * 读取节点数据
  204. * <p>
  205. * identifier也可以通过UaExpert客户端去查询,这个值=通道名称.设备名称.标记名称
  206. *
  207. * @param opcPoint
  208. * @throws Exception
  209. */
  210. public String readNodeValue(OPCPoint opcPoint) throws Exception {
  211. OPCPointDTO opcPointDTO = opcPointService.selectInfoWithServer(opcPoint);
  212. if (ObjectUtils.isEmpty(opcPointDTO)) {
  213. return null;
  214. }
  215. OPCServer opcServer = new OPCServer();
  216. BeanUtils.copyProperties(opcPointDTO, opcServer);
  217. Server server = createServer(opcServer);
  218. Group group = server.addGroup();
  219. Item item = group.addItem(opcPointDTO.getPointName());
  220. return String.valueOf(item.read(true).getValue().getObject());
  221. }
  222. @PreDestroy
  223. public void releaseServer() {
  224. if (ObjectUtils.isNotEmpty(mOPCDaClientMap)) {
  225. for (Server server : mOPCDaClientMap.values()) {
  226. server.dispose();
  227. }
  228. }
  229. }
  230. }