Modbus TCP 批量写线圈状态
使用 Socket 方式实现 Modbus TCP 批量写线圈(功能码 0x0F),并限制写入频率,防止 PLC 负载过大:
/// <summary>
/// Process-wide Modbus TCP client that writes coils to a PLC over a raw <see cref="Socket"/>.
/// </summary>
/// <remarks>
/// Callers enqueue writes through <see cref="Write"/>; a dedicated background thread sends
/// them one at a time, no more often than once every <c>MinWriteInterval</c> milliseconds, so
/// the PLC is not flooded. A timer sends a heartbeat request when the link has been idle.
/// Every request is a full request/response transaction: the reply frame is always read
/// completely so the TCP stream never accumulates unread responses.
/// Once <see cref="Dispose"/> has been called the singleton cannot be used again.
/// </remarks>
public sealed class ModbusTcpTool : IDisposable
{
    private static readonly Lazy<ModbusTcpTool> _instance = new Lazy<ModbusTcpTool>(() => new ModbusTcpTool());

    /// <summary>Gets the lazily created singleton instance.</summary>
    public static ModbusTcpTool Instance => _instance.Value;

    // Monotonic clock used for every interval measurement; unlike DateTime.Now it is
    // immune to wall-clock adjustments (NTP, DST, manual changes).
    private static readonly Stopwatch _clock = Stopwatch.StartNew();

    private const int MinWriteInterval = 25;    // ms between two consecutive writes
    private const int HeartbeatInterval = 1000; // ms between heartbeat checks
    private const int HeartbeatTimeout = 2000;  // ms to wait for a heartbeat reply
    private const int MaxWriteAttempts = 3;

    private const byte UnitId = 0x01;
    private const byte ReadHoldingRegisters = 0x03;
    private const byte WriteMultipleCoils = 0x0F;
    private const int MbapHeaderLength = 7;     // TID(2) + protocol(2) + length(2) + unit(1)
    private const int MaxPduLength = 253;       // Modbus limit for function code + data

    // Configuration
    /// <summary>Receive timeout in milliseconds used while waiting for a write reply.</summary>
    public int ReadTimeout { get; set; } = 500;

    /// <summary>Send timeout in milliseconds applied when a socket is created.</summary>
    public int WriteTimeout { get; set; } = 500;

    /// <summary>Maximum time in milliseconds to wait for the TCP connection to be established.</summary>
    public int ConnectTimeout { get; set; } = 1000;

    /// <summary>PLC address, read from <c>ConfigManager.Current.PlcIP</c> at construction.</summary>
    public string IP { get; private set; }

    /// <summary>PLC port, read from <c>ConfigManager.Current.PlcPort</c> at construction.</summary>
    public int Port { get; private set; }

    // Connection management
    private Socket _socket;
    private readonly Logger _logger = LogManager.GetCurrentClassLogger();
    private volatile bool _isConnected = false;
    private readonly object _connectionLock = new object();
    private long _lastSuccessfulOperationMs; // _clock milliseconds; accessed via Interlocked
    private int _tid = 0;

    // Write pipeline
    private readonly ConcurrentQueue<WriteTask> _writeQueue = new ConcurrentQueue<WriteTask>();
    private readonly object _throttleLock = new object();
    private long _lastEnqueueMs = -MinWriteInterval; // guarded by _throttleLock
    private Thread _writeThread;
    private volatile bool _disposed = false;         // polled by the write thread

    // Heartbeat
    private Timer _heartbeatTimer;

    private ModbusTcpTool()
    {
        IP = ConfigManager.Current.PlcIP;
        Port = ConfigManager.Current.PlcPort;
        InitializeSocket();
        StartWriteThread();
        StartHeartbeat();
    }

    /// <summary>
    /// Gets whether the client believes the socket is connected and the socket reports no error.
    /// </summary>
    public bool IsConnected
    {
        get
        {
            lock (_connectionLock)
            {
                try
                {
                    return _isConnected
                        && _socket != null
                        && _socket.Connected
                        && !_socket.Poll(100, SelectMode.SelectError);
                }
                catch
                {
                    return false;
                }
            }
        }
    }

    /// <summary>Replaces the current socket (if any) with a fresh, unconnected one.</summary>
    private void InitializeSocket()
    {
        lock (_connectionLock)
        {
            try
            {
                DisposeSocket();
                _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
                {
                    SendTimeout = WriteTimeout,
                    ReceiveTimeout = ReadTimeout,
                    NoDelay = true,                          // disable Nagle's algorithm
                    LingerState = new LingerOption(false, 0), // do not linger on close
                    SendBufferSize = 8192,
                    ReceiveBufferSize = 8192
                };
                _isConnected = false;
                _logger.Info("Socket初始化完成");
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "Socket初始化失败");
                _isConnected = false;
            }
        }
    }

    /// <summary>Shuts down and releases the current socket; never throws.</summary>
    private void DisposeSocket()
    {
        // Clear the field first so a failing Close() cannot leave a broken socket behind.
        Socket socket = _socket;
        _socket = null;
        _isConnected = false;

        if (socket == null)
        {
            return;
        }

        try
        {
            if (socket.Connected)
            {
                try
                {
                    socket.Shutdown(SocketShutdown.Both);
                }
                catch
                {
                    // Best effort: the peer may already have dropped the connection.
                }
            }

            socket.Close();
        }
        catch (Exception ex)
        {
            _logger.Warn(ex, "释放Socket时出错");
        }
    }

    /// <summary>
    /// Connects to the PLC, waiting at most <see cref="ConnectTimeout"/> milliseconds.
    /// Failures are logged and leave the client disconnected; no exception is thrown.
    /// </summary>
    public void Connect()
    {
        if (_disposed || IsConnected)
        {
            return;
        }

        lock (_connectionLock)
        {
            if (_disposed || IsConnected)
            {
                return;
            }

            try
            {
                // After Disconnect() or a failed initialization there is no socket to use.
                if (_socket == null)
                {
                    InitializeSocket();
                }

                if (_socket == null)
                {
                    return; // InitializeSocket has already logged the failure.
                }

                var connectResult = _socket.BeginConnect(IP, Port, null, null);
                if (!connectResult.AsyncWaitHandle.WaitOne(ConnectTimeout))
                {
                    throw new SocketException((int)SocketError.TimedOut);
                }

                _socket.EndConnect(connectResult);
                _isConnected = true;
                MarkOperationSucceeded();
                _logger.Info("PLC连接成功");
            }
            catch (Exception ex)
            {
                _logger.Warn(ex, "PLC连接失败");
                _isConnected = false;

                // A socket with a pending or failed connect (or one that was connected
                // before) cannot be reused for another BeginConnect, so start over.
                if (!_disposed)
                {
                    InitializeSocket();
                }
            }
        }
    }

    /// <summary>
    /// Queues a "write multiple coils" request for the background write thread.
    /// Blocks the caller briefly so that requests are enqueued at most once every
    /// <c>MinWriteInterval</c> milliseconds.
    /// </summary>
    /// <param name="start">Address of the first coil.</param>
    /// <param name="funcode">Stored with the task but currently not used: the frame is always built with function code 0x0F.</param>
    /// <param name="count">Number of coils to write.</param>
    /// <param name="datas">Packed coil values; the array is copied, so the caller may reuse it.</param>
    /// <exception cref="ArgumentNullException"><paramref name="datas"/> is null.</exception>
    public void Write(ushort start, byte funcode, ushort count, byte[] datas)
    {
        if (datas == null)
        {
            throw new ArgumentNullException(nameof(datas));
        }

        var task = new WriteTask
        {
            Start = start,
            Funcode = funcode,
            Count = count,
            Datas = (byte[])datas.Clone() // defensive copy
        };

        lock (_throttleLock)
        {
            long wait = MinWriteInterval - (_clock.ElapsedMilliseconds - _lastEnqueueMs);
            if (wait > 0)
            {
                Thread.Sleep((int)wait);
            }

            _writeQueue.Enqueue(task);

            // Stamp after sleeping so the next caller measures from the real enqueue time.
            _lastEnqueueMs = _clock.ElapsedMilliseconds;
        }
    }

    private void StartWriteThread()
    {
        _writeThread = new Thread(WriteLoop)
        {
            IsBackground = true,
            Priority = ThreadPriority.Highest
        };
        _writeThread.Start();
    }

    /// <summary>Background loop: dequeues tasks and sends them at a limited rate until disposed.</summary>
    private void WriteLoop()
    {
        long lastWriteMs = -MinWriteInterval;
        while (!_disposed)
        {
            try
            {
                long wait = MinWriteInterval - (_clock.ElapsedMilliseconds - lastWriteMs);
                if (wait > 0)
                {
                    Thread.Sleep((int)wait);
                }

                if (_writeQueue.TryDequeue(out var task))
                {
                    ExecuteWriteTask(task);
                    lastWriteMs = _clock.ElapsedMilliseconds;
                }
                else
                {
                    Thread.Sleep(1); // idle: yield briefly instead of spinning
                }
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "写入线程发生错误");
                Thread.Sleep(10);
            }
        }
    }

    /// <summary>
    /// Sends one write request and waits for the PLC's reply, reconnecting and retrying
    /// on transport errors. Errors are logged; nothing is thrown to the caller.
    /// </summary>
    private void ExecuteWriteTask(WriteTask task)
    {
        int retryCount = 0;
        while (retryCount < MaxWriteAttempts && !_disposed)
        {
            try
            {
                lock (_connectionLock)
                {
                    if (!IsConnected)
                    {
                        Connect();
                    }

                    if (!IsConnected)
                    {
                        throw new SocketException((int)SocketError.NotConnected);
                    }

                    int tid = CreateTID();
                    byte[] request = BuildRequest(tid, task.Start, task.Count, task.Datas);
                    byte[] pdu = Transact(request, tid, ReadTimeout);

                    if ((pdu[0] & 0x80) != 0)
                    {
                        // Modbus exception response: the PLC rejected the request itself,
                        // so resending the same frame would not help.
                        int exceptionCode = pdu.Length > 1 ? pdu[1] : 0;
                        throw new InvalidOperationException(
                            $"PLC returned Modbus exception 0x{exceptionCode:X2} for function 0x{pdu[0] & 0x7F:X2}.");
                    }

                    if (pdu[0] != WriteMultipleCoils)
                    {
                        throw new ModbusFrameException($"Unexpected function code 0x{pdu[0]:X2} in write reply.");
                    }

                    MarkOperationSucceeded();
                    return;
                }
            }
            catch (SocketException sex) when (sex.SocketErrorCode == SocketError.ConnectionReset)
            {
                _logger.Warn("连接被PLC重置,触发快速重连");
                HandleSocketError();
                retryCount++;
            }
            catch (Exception ex) when (ex is SocketException || ex is ModbusFrameException)
            {
                // Transport failure or a desynchronized stream: reconnect and retry.
                retryCount++;
                _logger.Warn(ex, $"写入失败(尝试 {retryCount}/3)");
                if (retryCount >= MaxWriteAttempts)
                {
                    _logger.Error("达到最大重试次数");
                    return;
                }

                HandleSocketError();
                Thread.Sleep(10 * retryCount); // linear back-off
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "写入操作发生意外错误");
                return;
            }
        }
    }

    private void StartHeartbeat()
    {
        _heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval);
    }

    /// <summary>
    /// Timer callback: when connected and idle for at least <c>HeartbeatInterval</c>,
    /// sends a "read holding registers" request and reconnects if no valid reply arrives.
    /// </summary>
    private void HeartbeatCallback(object state)
    {
        if (_disposed)
        {
            return;
        }

        try
        {
            if (!IsConnected)
            {
                return;
            }

            long idleMs = _clock.ElapsedMilliseconds - Interlocked.Read(ref _lastSuccessfulOperationMs);
            if (idleMs < HeartbeatInterval)
            {
                return; // recent traffic already proves the link is alive
            }

            lock (_connectionLock)
            {
                if (_disposed || !IsConnected)
                {
                    return;
                }

                int tid = CreateTID();
                var heartbeat = new byte[]
                {
                    (byte)(tid >> 8), (byte)(tid & 0xFF), // transaction id
                    0x00, 0x00,                           // protocol id
                    0x00, 0x06,                           // length
                    UnitId,
                    ReadHoldingRegisters,
                    0x00, 0x00,                           // start address
                    0x00, 0x01                            // register count
                };

                // Any well-formed reply (including a Modbus exception) proves liveness.
                Transact(heartbeat, tid, HeartbeatTimeout);
                MarkOperationSucceeded();
            }
        }
        catch (Exception ex)
        {
            _logger.Warn(ex, "心跳检测失败");
            HandleSocketError();
        }
    }

    /// <summary>
    /// Sends <paramref name="request"/> and reads the complete reply frame.
    /// Must be called while holding <c>_connectionLock</c>.
    /// </summary>
    /// <returns>The reply PDU (function code followed by data), without the MBAP header.</returns>
    /// <exception cref="SocketException">Send/receive failed, timed out, or the peer closed the connection.</exception>
    /// <exception cref="ModbusFrameException">The reply header is malformed or belongs to another transaction.</exception>
    private byte[] Transact(byte[] request, int tid, int receiveTimeout)
    {
        int bytesSent = _socket.Send(request);
        if (bytesSent != request.Length)
        {
            throw new SocketException((int)SocketError.MessageSize);
        }

        // Set per transaction so a heartbeat never leaves its longer timeout behind.
        _socket.ReceiveTimeout = receiveTimeout;

        var header = new byte[MbapHeaderLength];
        ReceiveExact(header, header.Length);

        int responseTid = (header[0] << 8) | header[1];
        int protocolId = (header[2] << 8) | header[3];
        int length = (header[4] << 8) | header[5]; // unit id + PDU

        if (protocolId != 0 || length < 2 || length > MaxPduLength + 1)
        {
            throw new ModbusFrameException("Malformed MBAP header in reply.");
        }

        var pdu = new byte[length - 1];
        ReceiveExact(pdu, pdu.Length);

        if (responseTid != tid)
        {
            throw new ModbusFrameException($"Reply transaction id {responseTid} does not match request {tid}.");
        }

        return pdu;
    }

    /// <summary>Reads exactly <paramref name="count"/> bytes; TCP may deliver a frame in pieces.</summary>
    private void ReceiveExact(byte[] buffer, int count)
    {
        int offset = 0;
        while (offset < count)
        {
            int read = _socket.Receive(buffer, offset, count - offset, SocketFlags.None);
            if (read == 0)
            {
                // Zero bytes means the peer closed the connection.
                throw new SocketException((int)SocketError.ConnectionReset);
            }

            offset += read;
        }
    }

    /// <summary>Builds a Modbus TCP "write multiple coils" (0x0F) frame for unit 1.</summary>
    /// <exception cref="ArgumentException"><paramref name="datas"/> does not fit the one-byte byte-count field.</exception>
    private byte[] BuildRequest(int tid, ushort start, ushort count, byte[] datas)
    {
        if (datas.Length > byte.MaxValue)
        {
            throw new ArgumentException("Coil data exceeds 255 bytes and cannot be encoded.", nameof(datas));
        }

        int length = 7 + datas.Length; // unit + function + start(2) + count(2) + byte count + data
        var request = new byte[13 + datas.Length];
        request[0] = (byte)(tid >> 8);
        request[1] = (byte)(tid & 0xFF);
        request[2] = 0x00; // protocol id
        request[3] = 0x00;
        request[4] = (byte)(length >> 8);
        request[5] = (byte)(length & 0xFF);
        request[6] = UnitId;
        request[7] = WriteMultipleCoils;
        request[8] = (byte)(start >> 8);
        request[9] = (byte)(start & 0xFF);
        request[10] = (byte)(count >> 8);
        request[11] = (byte)(count & 0xFF);
        request[12] = (byte)datas.Length;
        Buffer.BlockCopy(datas, 0, request, 13, datas.Length);
        return request;
    }

    /// <summary>Drops the current socket and tries to reconnect; does nothing once disposed.</summary>
    private void HandleSocketError()
    {
        if (_disposed)
        {
            return;
        }

        lock (_connectionLock)
        {
            if (_disposed)
            {
                return;
            }

            try
            {
                _logger.Info("处理Socket错误,尝试重连...");
                InitializeSocket(); // disposes the old socket first
                Connect();
            }
            catch (Exception ex)
            {
                _logger.Warn(ex, "重连过程中出错");
            }
        }
    }

    /// <summary>Returns the next transaction id in the range 0..65535.</summary>
    private int CreateTID()
    {
        // Masking (rather than %) keeps the id non-negative after the counter wraps.
        return Interlocked.Increment(ref _tid) & 0xFFFF;
    }

    private void MarkOperationSucceeded()
    {
        Interlocked.Exchange(ref _lastSuccessfulOperationMs, _clock.ElapsedMilliseconds);
    }

    /// <summary>Closes the connection. A later queued write will reconnect automatically.</summary>
    public void Disconnect()
    {
        lock (_connectionLock)
        {
            DisposeSocket();
        }
    }

    /// <summary>
    /// Stops the heartbeat and the write thread (waiting up to two seconds for it) and
    /// closes the socket. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _heartbeatTimer?.Dispose();
        if (_writeThread != null && _writeThread.IsAlive)
        {
            _writeThread.Join(2000);
        }

        Disconnect();
    }

    /// <summary>Raised internally when a reply frame is malformed or out of sequence.</summary>
    private sealed class ModbusFrameException : Exception
    {
        public ModbusFrameException(string message)
            : base(message)
        {
        }
    }

    /// <summary>One queued coil-write request.</summary>
    private class WriteTask
    {
        public ushort Start { get; set; }
        public byte Funcode { get; set; }
        public ushort Count { get; set; }
        public byte[] Datas { get; set; }
    }
}