OPCDAServiceImpl.java 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package com.sckj.opc.opcua;
  2. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  3. import com.google.common.eventbus.AsyncEventBus;
  4. import com.sckj.opc.dto.OPCPointDTO;
  5. import com.sckj.opc.entity.OPCData;
  6. import com.sckj.opc.entity.OPCPoint;
  7. import com.sckj.opc.entity.OPCServer;
  8. import com.sckj.opc.service.OPCDataServiceImpl;
  9. import com.sckj.opc.service.OPCPointServiceImpl;
  10. import com.sckj.opc.service.OPCServerServiceImpl;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.apache.commons.lang3.ObjectUtils;
  13. import org.jinterop.dcom.common.JIException;
  14. import org.openscada.opc.lib.common.ConnectionInformation;
  15. import org.openscada.opc.lib.common.NotConnectedException;
  16. import org.openscada.opc.lib.da.*;
  17. import org.springframework.beans.BeanUtils;
  18. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  19. import org.springframework.stereotype.Service;
  20. import javax.annotation.PreDestroy;
  21. import javax.annotation.Resource;
  22. import java.net.UnknownHostException;
  23. import java.util.*;
  24. import java.util.concurrent.*;
  25. import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
  26. @Service
  27. @Slf4j
  28. public class OPCDAServiceImpl {
  29. @Resource
  30. OPCServerServiceImpl opcServerService;
  31. @Resource
  32. OPCPointServiceImpl opcPointService;
  33. @Resource
  34. OPCDataServiceImpl opcDataService;
  35. @Resource
  36. private AsyncEventBus asyncEventBus;
  37. @Resource(name = "taskExecutor")
  38. ThreadPoolTaskExecutor taskExecutor;
  39. private ConcurrentHashMap<Long, Server> mOPCDaClientMap = new ConcurrentHashMap<>();
  40. private ConcurrentHashMap<Long, Object> mOPCDaPointsMap = new ConcurrentHashMap<>();
  41. /**
  42. * 获得基础的连接信息
  43. */
  44. public Server createServer(OPCServer opcServer) {
  45. if (mOPCDaClientMap.containsKey(opcServer.getId())) {
  46. return mOPCDaClientMap.get(opcServer.getId());
  47. }
  48. final ConnectionInformation ci = new ConnectionInformation();
  49. ci.setHost(opcServer.getIp()); // OPCDA IP
  50. ci.setUser(opcServer.getUsername()); // 用户名,配置DCOM时配置的
  51. ci.setPassword(opcServer.getPassword()); // 密码
  52. ci.setClsid(opcServer.getClsid()); //KEPServer的注册表ID
  53. log.info("Create Server : {}", opcServer);
  54. log.info("availableProcessors:{}",Runtime.getRuntime().availableProcessors());
  55. // 启动服务
  56. final Server server = new Server(ci, Executors.newSingleThreadScheduledExecutor());
  57. AutoReconnectController controller = new AutoReconnectController(server);
  58. // 连接到服务
  59. controller.connect();
  60. mOPCDaClientMap.put(opcServer.getId(), server);
  61. return server;
  62. }
  63. public Object createSubscription(OPCServer opcServer, OPCPoint opcPoint) {
  64. final int PERIOD = 100 * 10;
  65. StringBuilder sb = new StringBuilder();
  66. if (ObjectUtils.isNotEmpty(opcPoint) && !mOPCDaPointsMap.containsKey(opcPoint.getId())) {
  67. Map<String, Object> pointChangedMap = new LinkedHashMap<>();
  68. Future<String> future = taskExecutor.submit(new Callable<String>() {
  69. @Override
  70. public String call() {
  71. try {
  72. Server server = createServer(opcServer);
  73. AccessBase access = new SyncAccess(server, PERIOD);
  74. String newPointName = opcPoint.getPointName();
  75. if(ObjectUtils.isEmpty(newPointName)){
  76. return null;
  77. }
  78. if(newPointName.contains(".")){
  79. newPointName = "channel." + newPointName;
  80. }else{
  81. newPointName = "channel.device." + newPointName;
  82. }
  83. log.info("{} start subscribe",newPointName);
  84. access.addItem(newPointName, (item, itemstate) -> {
  85. int errorCode = itemstate.getErrorCode();
  86. String pointName = item.getId();
  87. Calendar calendar = itemstate.getTimestamp();
  88. Object object = null;
  89. try {
  90. object = itemstate.getValue().getObject();
  91. } catch (JIException e) {
  92. e.printStackTrace();
  93. }
  94. //DA中订阅是按照定时计算的,存在重复的数据项,进行过滤
  95. if (!Objects.equals(pointChangedMap.get(item.getId()), object)) {
  96. pointChangedMap.put(item.getId(), object);
  97. OPCData opcData = OPCData.builder()
  98. .data(object)
  99. .serverTime(calendar.getTime())
  100. .sourceTime(calendar.getTime())
  101. .statusCode((long) errorCode)
  102. .pointName(pointName)
  103. .build();
  104. //post给其他模块使用
  105. asyncEventBus.post(opcData);
  106. //opcDataService.save(opcData);
  107. log.info("DA,{},{}",item.getId(), itemstate.toString());
  108. }
  109. });
  110. access.bind();
  111. mOPCDaPointsMap.put(opcPoint.getId(), opcPoint);
  112. while (mOPCDaPointsMap.containsKey(opcPoint.getId())) {
  113. //阻塞,直到关闭订阅
  114. }
  115. access.unbind();
  116. log.info("{} stop subscribe",opcPoint.getPointName());
  117. } catch (Exception e) {
  118. e.printStackTrace();
  119. return e.getMessage();
  120. }
  121. return "";
  122. }
  123. });
  124. try {
  125. String result = future.get(1,TimeUnit.SECONDS); //
  126. log.error(">>> error:{}", result);
  127. if (ObjectUtils.isNotEmpty(result)) {
  128. sb.append(opcPoint.getPointName() + "订阅异常:" + result);
  129. }
  130. } catch (Exception e) {
  131. e.printStackTrace();
  132. }
  133. }
  134. return sb;
  135. }
  136. public Object subscribe(OPCPoint opcPoint) {
  137. try {
  138. OPCPointDTO opcPointDTO = opcPointService.selectInfoWithServer(opcPoint);
  139. QueryWrapper<OPCPoint> pointQueryWrapper = new QueryWrapper<>();
  140. pointQueryWrapper.lambda().eq(OPCPoint::getStatus, "1");
  141. OPCServer opcServer = new OPCServer();
  142. BeanUtils.copyProperties(opcPointDTO, opcServer);
  143. return createSubscription(opcServer, opcPointDTO);
  144. } catch (Exception e) {
  145. e.printStackTrace();
  146. }
  147. return null;
  148. }
  149. /***
  150. * 取消订阅
  151. * @param opcPoint
  152. */
  153. public void unsubscribe(OPCPoint opcPoint) {
  154. mOPCDaPointsMap.remove(opcPoint.getId());
  155. }
  156. /***
  157. * 取消可用订阅
  158. */
  159. public void unsubscribeAvailable() {
  160. mOPCDaPointsMap.clear();
  161. }
  162. /**
  163. * 项目启动时自动创建 PLC连接并订阅节点
  164. */
  165. public void subscribeAvailable() {
  166. QueryWrapper<OPCServer> serverQueryWrapper = new QueryWrapper<>();
  167. serverQueryWrapper.lambda().eq(OPCServer::getStatus, "1");
  168. List<OPCServer> opcServerList = opcServerService.list(serverQueryWrapper);
  169. if (ObjectUtils.isEmpty(opcServerList)) {
  170. return;
  171. }
  172. for (OPCServer opcServer : opcServerList) {
  173. QueryWrapper<OPCPoint> pointQueryWrapper = new QueryWrapper<>();
  174. pointQueryWrapper.lambda().eq(OPCPoint::getOpcServerId, opcServer.getId()).eq(OPCPoint::getStatus, "1");
  175. List<OPCPoint> opcPointList = opcPointService.list(pointQueryWrapper);
  176. if (ObjectUtils.isNotEmpty(opcPointList)) {
  177. for (OPCPoint opcPoint : opcPointList) {
  178. createSubscription(opcServer, opcPoint);
  179. }
  180. }
  181. }
  182. }
  183. /**
  184. * 读取节点数据
  185. * <p>
  186. * identifier也可以通过UaExpert客户端去查询,这个值=通道名称.设备名称.标记名称
  187. *
  188. * @param opcPoint
  189. * @throws Exception
  190. */
  191. public String readNodeValue(OPCPoint opcPoint) throws Exception {
  192. OPCPointDTO opcPointDTO = opcPointService.selectInfoWithServer(opcPoint);
  193. if (ObjectUtils.isEmpty(opcPointDTO)) {
  194. return null;
  195. }
  196. OPCServer opcServer = new OPCServer();
  197. BeanUtils.copyProperties(opcPointDTO, opcServer);
  198. Server server = createServer(opcServer);
  199. Group group = server.addGroup();
  200. Item item = group.addItem(opcPointDTO.getPointName());
  201. return String.valueOf(item.read(true).getValue().getObject());
  202. }
  203. @PreDestroy
  204. public void releaseServer() {
  205. if (ObjectUtils.isNotEmpty(mOPCDaClientMap)) {
  206. for (Server server : mOPCDaClientMap.values()) {
  207. server.dispose();
  208. }
  209. }
  210. }
  211. }