From 55213252636f82ca4080b312158cd068976fe99d Mon Sep 17 00:00:00 2001 From: SL <123@123.com> Date: Sun, 8 Oct 2023 21:18:59 +0800 Subject: [PATCH] =?UTF-8?q?[Common]=20=E5=A2=9E=E5=8A=A0DataRecorderManage?= =?UTF-8?q?r=E5=8F=8A=E5=85=B6=E7=9B=B8=E5=85=B3=E5=AF=B9=E8=B1=A1?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Aitex/Core/RT/DBCore/PostgresqlHelper.cs | 24 +- .../HighPerformance/DataHolder.cs | 99 +++++ .../HighPerformance/DataRecorderCache.cs | 304 ++++++++++++++ .../DataRecorderDataTableCache.cs | 150 +++++++ .../HighPerformance/DataRecorderManager.cs | 371 ++++++++++++++++++ .../DefaultDataRecorderCallback.cs | 37 ++ .../HighPerformance/IDataRecorderCache.cs | 20 + .../HighPerformance/IDataRecorderCallback.cs | 13 + .../MECF.Framework.Common.csproj | 7 + 9 files changed, 1018 insertions(+), 7 deletions(-) create mode 100644 MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataHolder.cs create mode 100644 MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderCache.cs create mode 100644 MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderDataTableCache.cs create mode 100644 MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderManager.cs create mode 100644 MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DefaultDataRecorderCallback.cs create mode 100644 MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/IDataRecorderCache.cs create mode 100644 MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/IDataRecorderCallback.cs diff --git a/MECF.Framework.Common/Aitex/Core/RT/DBCore/PostgresqlHelper.cs b/MECF.Framework.Common/Aitex/Core/RT/DBCore/PostgresqlHelper.cs index c3ea492..5f622d9 100644 --- a/MECF.Framework.Common/Aitex/Core/RT/DBCore/PostgresqlHelper.cs +++ b/MECF.Framework.Common/Aitex/Core/RT/DBCore/PostgresqlHelper.cs @@ -5,9 +5,6 @@ using System.Collections.Generic; using System.Configuration; using System.Data.Common; using System.Data; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace Aitex.Core.RT.DBCore { @@ -16,17 +13,30 @@ namespace Aitex.Core.RT.DBCore /// /// 数据库名称 /// - public string DBName; + private readonly string _dbName; /// /// 连接字符串 /// - private string _connString = ConfigurationManager.ConnectionStrings["PostgreSQL"].ConnectionString; + private readonly string _connString; + + #region Constructors public PostgreSQLHelper(string dBName) { - DBName = dBName; - } + _dbName = dBName; + _connString = ConfigurationManager.ConnectionStrings["PostgreSQL"].ConnectionString; + } + + #endregion + + #region Properties + + public string DBName => _dbName; + + public string ConnectionString => _connString; + + #endregion public string GetSqlByNameType(string name, Type type) { diff --git a/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataHolder.cs b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataHolder.cs new file mode 100644 index 0000000..7ba3a53 --- /dev/null +++ b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataHolder.cs @@ -0,0 +1,99 @@ +using Aitex.Core.RT.Log; +using Aitex.Core.Util; +using System; +using System.Collections.Generic; + +namespace Aitex.Core.RT.DataCollection.HighPerformance +{ + /// + /// 数据收集器对象,用于收集在中注册的数据源的数据。 + /// + public class DataHolder + { + #region Variables + + private readonly Dictionary _buffer = new (); + private readonly R_TRIG _rTrigReadFailed = new (); + + #endregion + + #region Constructors + + public DataHolder(int index, string name, Func read) + { + Index = index; + Name = name; + Read = read; + } + + #endregion + + #region Properties + + /// + /// 返回数据获取器的序号。 + /// + public int Index { get; } + + /// + /// 返回数据获取器名称。 + /// + public string Name { get; } + + /// + /// 返回数据获取器。 + /// + internal Func Read{ get; } + + /// + /// 返回缓存数据的长度。 + /// + internal int CacheCount => _buffer.Count; + + #endregion + + #region Methods + + /// + /// 缓存数据。 + /// + /// + internal void Cache(long timestamp) + { + try + { + _buffer[timestamp] = Read(); + _rTrigReadFailed.CLK = false; + } + catch (Exception ex) + { + // 首次发生错误时记录日志。 + _rTrigReadFailed.CLK = true; + if(_rTrigReadFailed.Q) + LOG.Error($"数据获取器 {Name} 在 {timestamp} 时发生错误。", ex); + + _buffer[timestamp] = null; + } + } + + /// + /// 获取指定时间戳的数据。 + /// + /// 时间戳。 + /// + internal object Get(long timestamp) + { + if (_buffer.TryGetValue(timestamp, out var value)) + { + _buffer.Remove(timestamp); + return value; + } + else + { + return null; + } + } + + #endregion + } +} diff --git a/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderCache.cs b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderCache.cs new file mode 100644 index 0000000..46adaca --- /dev/null +++ b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderCache.cs @@ -0,0 +1,304 @@ +using Aitex.Core.RT.Event; +using Aitex.Core.RT.Log; +using Aitex.Core.Util; +using MECF.Framework.Common.Equipment; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Aitex.Core.RT.DataCollection.HighPerformance +{ + /// + /// 数据收集器缓存对象。 + /// + /// + /// 每个Table对应一个DataRecordCache对象。该对象负责缓存数据,而不提供任何写数据库相关操作。 + /// 周期性调用方法获取数据库写入表达式进行持久化操作。 + /// + internal class DataRecorderCache : IDataRecorderCache + { + #region Variables + + internal const int SQL_BUILD_DURATION_TOO_SLOW_MS = 200; + internal const int MIN_CACHE_SIZE = 1000; + internal const int MIN_CACHE_PERIOD_MS = 10; + internal const int MIN_PERSIST_ITEMS = 100; + private const int MAX_SIZE_SQL_EXPR = 5000000; // approx. 5MB + + private readonly object _syncRoot = new(); + private readonly int _maxItemsPerPersist; + private readonly int _minCachePeriodMs; + private readonly int _maxCacheSize; + private readonly string _tableName; + private readonly string _insertExpression; + private readonly Queue _qCachedTimestamps; + private readonly Stopwatch _stopwatch = new(); + private readonly StringBuilder _sqlExpr = new (MAX_SIZE_SQL_EXPR); // preallocate 5MB + private readonly StringBuilder _sqlValuesList = new (MAX_SIZE_SQL_EXPR); // preallocate 5MB + + private readonly R_TRIG _rTrigCacheHalfFull = new (); + private readonly R_TRIG _rTrigCacheAlmostFull = new (); + private readonly R_TRIG _rTrigCacheFull = new (); + private readonly R_TRIG _rTrigBuildSqlExprTooSlow = new (); + + private int _lastPersistRows = 0; + private double _lastCachePeriodMs = 0; + + #endregion + + #region Constructors + + /// + /// 创建数据记录器缓存对象。 + /// + /// 当前Cache对应的数据库表名称。 + /// 当前Cache对应的分组名称。 + /// 数据获取器对象集合,保存当前Cache需要缓存的数据源。 + /// 最小缓存周期,如果外部频繁调用方法请求缓存数据,该周期内的重复请求被忽略,以避免高频缓存导致性能下降。 + /// 每次持久化的最大行数,避免一次性对数据库写入太多数据造成性能问题。 + /// 数据缓存的最大项目数。 + /// + public DataRecorderCache(string tableName, string module, IReadOnlyList dataHolders, int minCachePeriodMs = 50, int maxItemsPerPersist = 2000, int maxCacheSize = 100) + { + Debug.Assert(dataHolders is { Count: > 0 }, "Incorrect Data Holders List."); + + Module = module; + DataHolders = dataHolders; + _tableName = tableName; + _maxItemsPerPersist = maxItemsPerPersist < MIN_PERSIST_ITEMS ? MIN_PERSIST_ITEMS : maxItemsPerPersist; + _minCachePeriodMs = minCachePeriodMs < MIN_CACHE_PERIOD_MS ? MIN_CACHE_PERIOD_MS : minCachePeriodMs; + _maxCacheSize = maxCacheSize < MIN_CACHE_SIZE ? MIN_CACHE_SIZE : maxCacheSize; + _qCachedTimestamps = new Queue(maxCacheSize); + + // 必须传入有效的数据收集器列表。 + if (dataHolders == null || dataHolders.Count == 0) + throw new ArgumentException("数据保持器列表不能为空。", nameof(dataHolders)); + + // 检查数据收集器列表中是否存在重复的序号。 + if (dataHolders.GroupBy(dc => dc.Index).Any(g => g.Count() > 1)) + throw new ArgumentException("数据保持器列表中存在重复的序号。", nameof(dataHolders)); + + // 检查数据收集器列表中是否存在重复的名称。 + if (dataHolders.GroupBy(dc => dc.Name).Any(g => g.Count() > 1)) + throw new ArgumentException("数据保持器列表中存在重复的名称。", nameof(dataHolders)); + + // 检查数据收集器列表中是否存在空的获取器。 + if (dataHolders.Any(dc => dc.Read == null)) + throw new ArgumentException("数据保持器列表中存在空的保持器。", nameof(dataHolders)); + + _insertExpression = BuildInsertExpression(); + } + + #endregion + + #region Properties + + /// + /// 返回当前缓存服务的模组名称。 + /// + public string Module { get; } + + /// + /// 返回数据获取器列表。 + /// + public IReadOnlyList DataHolders { get; } + + #endregion + + #region Methods + + /// + /// 创建Insert语句。 + /// + /// + private string BuildInsertExpression() + { + var sqlExpr = new StringBuilder(); + + lock (_syncRoot) + { + sqlExpr.Append($"INSERT INTO \"{_tableName}\"(\"time\" "); + foreach (var holder in DataHolders) + { + sqlExpr.Append($",\"{holder.Name}\""); + } + + sqlExpr.Append(")"); + } + + return sqlExpr.ToString(); + } + + /// + /// 立即缓存当前数据。 + /// + public void Cache() + { + lock (_syncRoot) + { + #region Performance Monitor + + // 限制最大Cache频率 + if(_stopwatch.IsRunning && _stopwatch.ElapsedMilliseconds < _minCachePeriodMs) + return; + + //TODO 需要限制Q的最大数据量,并且检测HalfFull和Full事件,并触发警告,可能PC性能出现问题 + if (_qCachedTimestamps.Count >= _maxCacheSize) + { + // 缓存已满,不再缓存数据 + _rTrigCacheFull.CLK = true; + if(_rTrigCacheFull.Q) + EV.PostWarningLog(ModuleName.System.ToString(), $"[{Module}]DataRecorderCache Full, Capacity {_maxCacheSize} items"); + return; + } + + if (_qCachedTimestamps.Count >= _maxCacheSize / 2) + { + // 缓存的数据量已经超过一半,触发警告 + _rTrigCacheHalfFull.CLK = true; + if(_rTrigCacheHalfFull.Q) + EV.PostWarningLog(ModuleName.System.ToString(), $"[{Module}]DataRecorderCache Half full, Capacity {_maxCacheSize} items"); + } + else if (_qCachedTimestamps.Count >= _maxCacheSize * 0.9) + { + // 缓存的数据量已经超过90%,触发警告 + _rTrigCacheAlmostFull.CLK = true; + if(_rTrigCacheAlmostFull.Q) + EV.PostWarningLog(ModuleName.System.ToString(), $"[{Module}]DataRecorderCache Almost full, Usage {_qCachedTimestamps.Count} of {_maxCacheSize} items"); + } + else + { + _rTrigCacheFull.CLK = false; + _rTrigCacheHalfFull.CLK = false; + _rTrigCacheAlmostFull.CLK = false; + } + + #endregion + + _lastCachePeriodMs = _stopwatch.ElapsedMilliseconds; + _stopwatch.Restart(); + var ts = DateTime.Now.Ticks; + if (_qCachedTimestamps.Contains(ts)) + { + LOG.Error($"时间戳{ts}已经存在。"); + return; + } + + var ret = Parallel.ForEach(DataHolders, holder => { holder.Cache(ts); }); + + if (ret.IsCompleted) + { + _qCachedTimestamps.Enqueue(ts); + } + } + } + + /// + /// 获取将缓存数据写入数据库的SQL语句。 + /// + /// + /// + public string GetInsertSql() + { + lock (_syncRoot) + { + // 如果没数据,直接退出 + if (_qCachedTimestamps.Count == 0) + return ""; + + var sw = new Stopwatch(); + sw.Start(); + + var pickedCount = 0; + _sqlValuesList.Clear(); + + while (true) + { + // 没有缓存的数据了,退出 + if (_qCachedTimestamps.Count == 0) + break; + + var ts = _qCachedTimestamps.Dequeue(); + { + _sqlValuesList.Append("("); + _sqlValuesList.Append($"'{ts}',"); + foreach (var holder in DataHolders) + { + var value = holder.Get(ts); + switch (value) + { + case null: + _sqlValuesList.Append("'0'"); + break; + case double or float: + + var dValue = Convert.ToDouble(value); + if (double.IsNaN(dValue)) + dValue = 0.0; + + _sqlValuesList.Append($"'{dValue.ToString(CultureInfo.InvariantCulture)}'"); + break; + + case bool b: + _sqlValuesList.Append($"'{(b ? 1 : 0)}'"); + break; + + default: + _sqlValuesList.Append($"'{value}'"); + break; + } + + _sqlValuesList.Append(","); + } + + _sqlValuesList.Remove(_sqlValuesList.Length - 1, 1); + _sqlValuesList.Append("),"); + } + + pickedCount++; + + if (pickedCount >= _maxItemsPerPersist) + break; + } + + if (_sqlValuesList.Length <= 0) + { + _lastPersistRows = 0; + return ""; + } + else + { + _lastPersistRows = pickedCount; + } + + _sqlValuesList.Remove(_sqlValuesList.Length - 1, 1); + _sqlValuesList.Append(";"); + + // 拼接完整的SQL语句 + _sqlExpr.Clear(); + _sqlExpr.Append(_insertExpression); + _sqlExpr.Append(" VALUES "); + _sqlExpr.Append(_sqlValuesList); + + sw.Stop(); + + //TODO 加入性能监视器,如果执行时间过长,则引发Warn事件; + _rTrigBuildSqlExprTooSlow.CLK = sw.ElapsedMilliseconds > SQL_BUILD_DURATION_TOO_SLOW_MS; + if (_rTrigBuildSqlExprTooSlow.Q) + EV.PostWarningLog(ModuleName.System.ToString(), $"DataRecorderCache Build SQL Expression Too Slow, Took {sw.ElapsedMilliseconds:F1}ms greater than {SQL_BUILD_DURATION_TOO_SLOW_MS:F1}ms"); + + Debug.WriteLine( + $"[{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}] GenSQL:Ln{_lastPersistRows}/{sw.ElapsedMilliseconds}ms/B{_sqlExpr.Length}, LastCachePeriod:{_lastCachePeriodMs}ms, CacheRemained: {_qCachedTimestamps.Count}", + $"DIAG DBRC - {Module}"); + + return _sqlExpr.ToString(); + } + } + + #endregion + } +} diff --git a/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderDataTableCache.cs b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderDataTableCache.cs new file mode 100644 index 0000000..b077ca9 --- /dev/null +++ b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderDataTableCache.cs @@ -0,0 +1,150 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Aitex.Core.RT.DataCollection.HighPerformance +{ + internal class DataRecorderDataTableCache : IDataRecorderCache + { + #region Variables + + private readonly object _syncRoot = new(); + private readonly int _maxRowsPerFlush; + private readonly string _tableName; + + private DataTable _dataTable; + private readonly Queue _qCachedTimestamps = new(); + private readonly int _minCachePeriodMs; + private readonly Stopwatch _stopwatch = new(); + + private readonly StringBuilder _sqlExpr = new(5000000); // preallocate 5MB + private readonly StringBuilder _sqlValuesList = new(5000000); // preallocate 5MB + + private int _lastPersistRows = 0; + private double _lastCachePeriodMs = 0; + + #endregion + + #region Constructors + + /// + /// 创建数据记录器缓存对象。 + /// + /// 当前Cache对应的数据库表名称。 + /// 当前Cache对应的分组名称。 + /// 数据获取器对象集合,保存当前Cache需要缓存的数据源。 + /// 最小缓存周期,如果外部频繁调用方法请求缓存数据,该周期内的重复请求被忽略,以避免高频缓存导致性能下降。 + /// 每次持久化的最大行数,避免一次性对数据库写入太多数据造成性能问题。 + /// + public DataRecorderDataTableCache(string tableName, string module, IReadOnlyList dataHolders, int minCachePeriodMs = 50, int maxRowsPerFlush = 2000) + { + Debug.Assert(dataHolders is { Count: > 0 }, "Incorrect Data Collector List."); + + Module = module; + DataHolders = dataHolders; + _tableName = tableName; + _maxRowsPerFlush = maxRowsPerFlush; + _minCachePeriodMs = minCachePeriodMs; + + // 必须传入有效的数据收集器列表。 + if (dataHolders == null || dataHolders.Count == 0) + throw new ArgumentException("数据收集器列表不能为空。", nameof(dataHolders)); + + // 检查数据收集器列表中是否存在重复的序号。 + if (dataHolders.GroupBy(dc => dc.Index).Any(g => g.Count() > 1)) + throw new ArgumentException("数据收集器列表中存在重复的序号。", nameof(dataHolders)); + + // 检查数据收集器列表中是否存在重复的名称。 + if (dataHolders.GroupBy(dc => dc.Name).Any(g => g.Count() > 1)) + throw new ArgumentException("数据收集器列表中存在重复的名称。", nameof(dataHolders)); + + // 检查数据收集器列表中是否存在空的获取器。 + if (dataHolders.Any(dc => dc.Read == null)) + throw new ArgumentException("数据收集器列表中存在空的获取器。", nameof(dataHolders)); + + _dataTable = CreateDataTable(); + + + + /*DATA.Subscribe($"{ModuleName.System}.DBRC.{Module}.CachedCount", ()=> _qCachedTimestamps.Count, SubscriptionAttribute.FLAG.IgnoreSaveDB); + DATA.Subscribe($"{ModuleName.System}.DBRC.{Module}.PersistRows", ()=> _lastPersistRows, SubscriptionAttribute.FLAG.IgnoreSaveDB); + DATA.Subscribe($"{ModuleName.System}.DBRC.{Module}.PersistPeriodMs", ()=> _lastPersistPeriodMs, SubscriptionAttribute.FLAG.IgnoreSaveDB);*/ + } + + #endregion + + #region Properties + + /// + /// 返回当前缓存服务的模组名称。 + /// + public string Module { get; } + + /// + /// 返回数据获取器列表。 + /// + public IReadOnlyList DataHolders { get; } + + #endregion + + #region Methods + + /// + /// 创建临时数据表。 + /// + /// + private DataTable CreateDataTable() + { + lock (_syncRoot) + { + var dt = new DataTable(_tableName); + dt.Columns.Add("time", typeof(long)); + foreach (var dh in DataHolders) + { + var ret = dh.Read(); + dt.Columns.Add(dh.Name, ret.GetType()); + } + + return dt; + } + } + + + /// + /// 立即缓存当前数据。 + /// + public void Cache() + { + lock (_syncRoot) + { + // 限制最大Cache频率 + if (_stopwatch.IsRunning && _stopwatch.ElapsedMilliseconds < _minCachePeriodMs) + return; + + //TODO 需要限制Q的最大数据量,并且检测HalfFull和Full事件,并触发警告,可能PC性能出现问题 + + _lastCachePeriodMs = _stopwatch.ElapsedMilliseconds; + _stopwatch.Restart(); + var ts = DateTime.Now.Ticks; + var row = _dataTable.NewRow(); + row["time"] = ts; + var ret = Parallel.ForEach(DataHolders, dc => + { + var ret = dc.Read(); + row[dc.Name] = ret; + }); + + if (ret.IsCompleted) + { + _dataTable.Rows.Add(row); + } + } + } + + #endregion + } +} diff --git a/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderManager.cs b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderManager.cs new file mode 100644 index 0000000..253da65 --- /dev/null +++ b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderManager.cs @@ -0,0 +1,371 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Aitex.Core.RT.DataCenter; +using Aitex.Core.RT.DBCore; +using Aitex.Core.RT.Event; +using Aitex.Core.RT.Log; +using Aitex.Core.RT.SCCore; +using Aitex.Core.Util; +using MECF.Framework.Common.Equipment; + +namespace Aitex.Core.RT.DataCollection.HighPerformance +{ + + public class DataRecorderManager : Singleton + { + #region Variables + + /// + /// 当前记录器支持的数据类型。 + /// + private readonly Type[] _supportedDataTypes = + { typeof(bool), typeof(double), typeof(float), typeof(int), typeof(ushort), typeof(short) }; + + private IDataBase _db; + private readonly object _lock = new(); + private readonly int _dataRecorderInterval; + private readonly int _dataRecorderCacheSize; + private readonly int _dataRecorderCachePeriodMs; + private string[] _modules = { "Data" }; + private readonly Dictionary> _subscribedRecordedDataGetter = new(); + private Dictionary _dictCachePerModule; + private DateTime _dateOfTableName; + private IDataRecorderCallback _callback; + + #endregion + + + #region Constructors + + public DataRecorderManager() + { + _dataRecorderInterval = (SC.ContainsItem("System.DataRecorderInterval") + ? SC.GetValue("System.DataCollectionInterval") + : 1000); + + _dataRecorderCacheSize = (SC.ContainsItem("System.DataRecorderCacheSize") + ? SC.GetValue("System.DataRecorderCacheSize") + : DataRecorderCache.MIN_CACHE_SIZE); + + _dataRecorderCachePeriodMs = (SC.ContainsItem("System.DataRecorderCachePeriod") + ? SC.GetValue("System.DataRecorderCachePeriod") + : DataRecorderCache.MIN_CACHE_PERIOD_MS); + } + + #endregion + + #region Methods + + public void Initialize(IDataRecorderCallback callback) + { + IDataRecorderCallback callback2; + if (callback != null) + { + callback2 = callback; + } + else + { + IDataRecorderCallback dataCollectionCallback = new DefaultDataRecorderCallback(); + callback2 = dataCollectionCallback; + } + + _callback = callback2; + Task.Factory.StartNew(DataRecorderThread); + } + + public void Initialize(string[] modules, string dbName) + { + _callback = new DefaultDataRecorderCallback(dbName); + if (modules != null && modules.Length != 0 && !string.IsNullOrEmpty(modules[0])) + { + _modules = modules; + } + + Task.Factory.StartNew(DataRecorderThread); + } + + public void Initialize(string dbName) + { + _callback = new DefaultDataRecorderCallback(dbName); + Task.Factory.StartNew(DataRecorderThread); + } + + /// + /// 立即缓存所有模组的数据。 + /// + public void ImmediateCache() + { + DoCache(); + } + + private void DataRecorderThread() + { + while (true) + { + Thread.Sleep(_dataRecorderInterval); + try + { + CreateCache(); + + // 开始周期性数据采集 + var lastFlushTime = DateTime.Now; + while (true) + { + Thread.Sleep((int)(_dataRecorderInterval * 0.99)); + + // 跨天,退出循环并创建新表 + if (DateTime.Now.Date != _dateOfTableName) + break; + + // 立即缓存一次数据 + DoCache(); + + // 每隔5秒钟将缓存写入数据库 + if (lastFlushTime.AddSeconds(5) < DateTime.Now) + { + var ret = Persist(); + lastFlushTime = DateTime.Now; + if (ret < 0) // 插入数据错误,退出 + break; + } + } + } + catch (Exception ex) + { + LOG.Write(ex, "数据库操作记录发生异常"); + } + } + } + + /// + /// 扫描,获取所有可记录的数据。 + /// + /// + /// 仅数据管理器中注册的数据,且类型为的数据支持保存到数据库。 + /// > + private void GetRecordableDataSource() + { + var dBRecorderList = DATA.GetDBRecorderList(); + foreach (var item in dBRecorderList) + { + if (_subscribedRecordedDataGetter.ContainsKey(item.Key)) + { + continue; + } + + var obj = item.Value(); + if (obj != null) + { + var type = obj.GetType(); + if (type == typeof(bool) || type == typeof(double) || type == typeof(float) || + type == typeof(int) || type == typeof(ushort) || type == typeof(short)) + { + _subscribedRecordedDataGetter[item.Key] = item.Value; + } + } + } + } + + /// + /// 创建数据缓存对象。 + /// + private void CreateCache() + { + lock (_lock) + { + GetRecordableDataSource(); + + _dictCachePerModule = new(); + Dictionary> dictModules = new(); + + var defaultModuleName = (_modules.Contains(ModuleName.System.ToString()) + ? ModuleName.System.ToString() + : (_modules.Contains("Data") ? "Data" : _modules[0])); + + + if (_subscribedRecordedDataGetter is { Count: > 0 }) + { + foreach (var dataName in _subscribedRecordedDataGetter.Keys) + { + var isAvailableModuleName = false; + foreach (var module in _modules) + { + if (dataName.StartsWith(module + ".") || + dataName.StartsWith("IO." + module + ".")) + { + if (!dictModules.ContainsKey(module)) + dictModules[module] = new List(); + + dictModules[module].Add(new DataHolder(dictModules[module].Count, + dataName, _subscribedRecordedDataGetter[dataName])); + isAvailableModuleName = true; + break; + } + } + + if (!isAvailableModuleName) + { + + if (!dictModules.ContainsKey(defaultModuleName)) + dictModules[defaultModuleName] = new List(); + + dictModules[defaultModuleName].Add(new DataHolder( + dictModules[defaultModuleName].Count, + dataName, _subscribedRecordedDataGetter[dataName])); + } + } + } + + _dateOfTableName = DateTime.Now.Date; + foreach (var module in dictModules.Keys) + { + var dhs = dictModules[module]; + var tableName = $"{_dateOfTableName:yyyyMMdd}.{module}"; + _dictCachePerModule[module] = + new DataRecorderCache(tableName, module, dhs, minCachePeriodMs: _dataRecorderCachePeriodMs, + maxCacheSize: _dataRecorderCacheSize); + + UpdateTableSchema(tableName, dhs); + } + } + } + + /// + /// 立即缓存所有模组的数据。 + /// + /// + private void DoCache() + { + lock (_lock) + { + // DO调用ForceCache方法可能发生在当前对象初始化之前,此时_dictCachePerModule为null + if (_dictCachePerModule == null) + return; + + + // 立即缓存一次数据 + foreach (var drc in _dictCachePerModule.Values) + drc.Cache(); + } + + } + + /// + /// 缓存写入数据库。 + /// + /// 插入数据库的行数。-1 表示写入数据库错误。 + public int Persist() + { + var num = 0; + + lock (_lock) + { + foreach (var dcc in _dictCachePerModule.Values) + { + var sql = dcc.GetInsertSql(); + + if (!string.IsNullOrEmpty(sql)) + { + try + { + num = DB.ExecuteNonQuery(sql); + } + catch (Exception ex) + { + LOG.Write(ex, $"{ModuleName.System.ToString()}写入数据库发生异常"); + num = -1; + } + } + } + } + + return num; + } + + private bool UpdateTableSchema(string tblName, IEnumerable dataCollectors) + { + var cmdText = $"select column_name from information_schema.columns where table_name = '{tblName}';"; + var reader = DB.ExecuteReader(cmdText); + var sqlExprAddColumn = string.Empty; + var existedColumns = new List(); + while (reader.Read()) + { + for (var i = 0; i < reader.FieldCount; i++) + { + existedColumns.Add(reader[i].ToString()); + } + } + + reader.Close(); + if (existedColumns.Count > 0) + { + foreach (var dataItem in dataCollectors) + { + var colName = dataItem.Name; + if (!existedColumns.Contains(colName)) + { + var type = dataItem.Read().GetType(); + if (type == typeof(bool)) + { + sqlExprAddColumn += $"ALTER TABLE \"{tblName}\" ADD COLUMN \"{colName}\" {"Boolean"};"; + } + else if (_supportedDataTypes.Contains(type)) + { + sqlExprAddColumn += $"ALTER TABLE \"{tblName}\" ADD COLUMN \"{colName}\" {"Real"};"; + } + } + } + + if (!string.IsNullOrEmpty(sqlExprAddColumn)) + { + try + { + DB.ExecuteNonQuery(sqlExprAddColumn); + } + catch (Exception ex) + { + LOG.Write(ex); + return false; + } + } + } + else + { + var sqlExprCreateTable = $"CREATE TABLE \"{tblName}\"(Time bigint NOT NULL,"; + foreach (var dataItem in dataCollectors) + { + var colName = dataItem.Name; + var dataType = dataItem.Read().GetType(); + if (dataType == typeof(bool)) + { + sqlExprCreateTable += $"\"{colName}\" Boolean,\n"; + } + else if (_supportedDataTypes.Contains(dataType)) + { + sqlExprCreateTable += $"\"{colName}\" Real,\n"; + } + } + + sqlExprCreateTable += $"CONSTRAINT \"{tblName}_pkey\" PRIMARY KEY (Time));"; + sqlExprCreateTable += $"GRANT SELECT ON TABLE \"{tblName}\" TO postgres;"; + try + { + DB.ExecuteNonQuery(sqlExprCreateTable.ToString()); + } + catch (Exception ex2) + { + LOG.Write(ex2, "创建数据库表格失败"); + return false; + } + } + + return true; + } + } + + #endregion + +} diff --git a/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DefaultDataRecorderCallback.cs b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DefaultDataRecorderCallback.cs new file mode 100644 index 0000000..bbd80e1 --- /dev/null +++ b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DefaultDataRecorderCallback.cs @@ -0,0 +1,37 @@ +using Aitex.Common.Util; + +namespace Aitex.Core.RT.DataCollection.HighPerformance +{ + public class DefaultDataRecorderCallback : IDataRecorderCallback + { + private string _db = "postgres"; + + public DefaultDataRecorderCallback() + { + } + + public DefaultDataRecorderCallback(string db) + { + _db = db; + } + + public void PostDBFailedEvent() + { + } + + public string GetSqlUpdateFile() + { + return PathManager.GetCfgDir() + "SqlUpdate.sql"; + } + + public string GetDataTablePrefix() + { + return "Data"; + } + + public string GetDBName() + { + return _db; + } + } +} diff --git a/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/IDataRecorderCache.cs b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/IDataRecorderCache.cs new file mode 100644 index 0000000..a3317c9 --- /dev/null +++ b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/IDataRecorderCache.cs @@ -0,0 +1,20 @@ +using System.Collections.Generic; + +namespace Aitex.Core.RT.DataCollection.HighPerformance +{ + public interface IDataRecorderCache + { + /// + /// 返回当前缓存服务的模组名称。 + /// + string Module { get; } + + + IReadOnlyList DataHolders { get; } + + /// + /// 立即缓存当前数据。 + /// + void Cache(); + } +} diff --git a/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/IDataRecorderCallback.cs b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/IDataRecorderCallback.cs new file mode 100644 index 0000000..2a5a780 --- /dev/null +++ b/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/IDataRecorderCallback.cs @@ -0,0 +1,13 @@ +namespace Aitex.Core.RT.DataCollection.HighPerformance +{ + public interface IDataRecorderCallback + { + void PostDBFailedEvent(); + + string GetDBName(); + + string GetSqlUpdateFile(); + + string GetDataTablePrefix(); + } +} diff --git a/MECF.Framework.Common/MECF.Framework.Common.csproj b/MECF.Framework.Common/MECF.Framework.Common.csproj index 35ae965..61eeb58 100644 --- a/MECF.Framework.Common/MECF.Framework.Common.csproj +++ b/MECF.Framework.Common/MECF.Framework.Common.csproj @@ -282,6 +282,13 @@ + + + + + + +