using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Aitex.Core.RT.DataCenter;
using Aitex.Core.RT.DBCore;
using Aitex.Core.RT.Log;
using Aitex.Core.RT.SCCore;
using Aitex.Core.Util;
using MECF.Framework.Common.Equipment;
using ThreadState = System.Threading.ThreadState;

namespace Aitex.Core.RT.DataCollection.HighPerformance
{
    /// <summary>
    /// Samples the recordable DATA items into per-table caches on one thread and
    /// writes the cached rows to the database on a second thread.
    /// </summary>
    public class DataRecorderManager : Singleton<DataRecorderManager>
    {
        #region Variables

        /// <summary>Fallback sampling interval (ms) when the configuration item is missing.</summary>
        private const int DefaultRecorderIntervalMs = 1000;

        /// <summary>Period (ms) between two persist operations.</summary>
        private const int PersistPeriodMs = 5000;

        /// <summary>Polling granularity (ms) of both worker loops.</summary>
        private const int ThreadPollMs = 10;

        // Guards _dataGetters and _dictDataRecCachePerDataTable, which are touched by the cache
        // thread, the persist thread and any caller of ImmediateCache()/Persist().
        private readonly object _syncRoot = new();

        private readonly int _dataRecorderInterval;
        private readonly int _dataRecorderCacheSize;
        private readonly int _dataRecorderCachePeriodMs;

        // Table categories; a data item is routed to the first category its name starts with.
        private string[] _dataTableCategory = { ModuleName.System.ToString() };

        private readonly Dictionary<string, Func<object>> _dataGetters = new();
        private readonly R_TRIG _rTrigPersistThreadError = new();
        private readonly CancellationTokenSource _ctsDataRecorderThread;
        private readonly CancellationToken _ctDataRecorderThread;

        // Null until the cache thread has run CreateCache() for the first time.
        private Dictionary<string, DataRecorderCache> _dictDataRecCachePerDataTable;

        // Date used as the table-name prefix of the current caches.
        private DateTime _datePrefixTableName;

        private readonly Thread _threadCache;
        private readonly Thread _threadPersist;

        private readonly DeviceTimer _tmrPersistDelay = new();
        private readonly DeviceTimer _tmrCacheThreadDelay = new();

        // Diagnostics published through DATA.Subscribe() in Initialize().
        private int _statPersistCostMs;
        private int _statPersistInvMs;
        private int _statPersistCnt;
        private int _statPersistErrorCnt;
        private int _statCacheCostMs;
        private int _statCacheInvMs;
        private int _statCacheCnt;

        #endregion

        #region Constructors

        /// <summary>
        /// Reads the recorder settings from the system configuration and creates
        /// (but does not start) the two worker threads.
        /// </summary>
        public DataRecorderManager()
        {
            // The original code tested "System.DataRecorderInterval" but then read
            // "System.DataCollectionInterval". Test and read now use the same key.
            // NOTE(review): confirm "System.DataRecorderInterval" is the intended item name.
            _dataRecorderInterval =
                ReadIntConfig("System.DataRecorderInterval", DefaultRecorderIntervalMs);
            _dataRecorderCacheSize =
                ReadIntConfig("System.DataRecorderCacheSize", DataRecorderCache.MIN_CACHE_SIZE);
            _dataRecorderCachePeriodMs =
                ReadIntConfig("System.DataRecorderCachePeriod", DataRecorderCache.MIN_CACHE_PERIOD_MS);

            _ctsDataRecorderThread = new();
            _ctDataRecorderThread = _ctsDataRecorderThread.Token;

            _threadCache = new Thread(CacheThread) { Name = nameof(CacheThread) };
            _threadPersist = new Thread(PersistThread) { Name = nameof(PersistThread) };
        }

        #endregion

        #region Methods

        /// <summary>
        /// Returns the integer configuration item <paramref name="key"/>, or
        /// <paramref name="fallback"/> when the item does not exist.
        /// </summary>
        private static int ReadIntConfig(string key, int fallback)
        {
            return SC.ContainsItem(key) ? SC.GetValue<int>(key) : fallback;
        }

        /// <summary>
        /// Publishes the diagnostic counters, stores the table categories and starts the cache thread.
        /// </summary>
        /// <param name="dataTableCategory">
        /// Category names used to split the data into tables. When null or empty the
        /// default category (System) is kept.
        /// </param>
        public void Initialize(string[] dataTableCategory)
        {
            var diagnosisPath = ModuleHelper.GetDiagnosisPath(ModuleName.System);

            DATA.Subscribe($"{diagnosisPath}.DCPersistCostMs", () => _statPersistCostMs);
            DATA.Subscribe($"{diagnosisPath}.DCPersistInvMs", () => _statPersistInvMs);
            DATA.Subscribe($"{diagnosisPath}.DCPersistCounter", () => _statPersistCnt);
            DATA.Subscribe($"{diagnosisPath}.DCPersistErrorCounter", () => _statPersistErrorCnt);

            DATA.Subscribe($"{diagnosisPath}.DCCacheCostMs", () => _statCacheCostMs);
            DATA.Subscribe($"{diagnosisPath}.DCCacheInvMs", () => _statCacheInvMs);
            DATA.Subscribe($"{diagnosisPath}.DCCacheCounter", () => _statCacheCnt);

            // A null/empty array would make CreateCache() fail on _dataTableCategory[0].
            if (dataTableCategory is { Length: > 0 })
            {
                _dataTableCategory = dataTableCategory;
            }

            _threadCache.Start();
        }

        /// <summary>
        /// Caches the data of all tables immediately, outside the periodic schedule.
        /// </summary>
        /// <param name="diagnosisInfo">Optional description of the change that triggered the call; only traced.</param>
        public void ImmediateCache(CacheDiagnosisInfo diagnosisInfo = null)
        {
            if (diagnosisInfo != null)
            {
                Debug.WriteLine(
                    $"{diagnosisInfo.Module}.{diagnosisInfo.IoName} changed to {diagnosisInfo.Value}",
                    $"{nameof(DataRecorderManager)} - {nameof(ImmediateCache)}");
            }

            DoCache();
        }

        /// <summary>
        /// Worker loop that calls <see cref="Persist"/> every <see cref="PersistPeriodMs"/> ms
        /// until cancellation is requested, and maintains the persist statistics.
        /// </summary>
        private void PersistThread()
        {
            var costWatch = new Stopwatch();
            var intervalWatch = new Stopwatch();

            try
            {
                while (true)
                {
                    _ctDataRecorderThread.ThrowIfCancellationRequested();
                    Thread.Sleep(ThreadPollMs);

                    // Wait until the delay timer is idle or has timed out.
                    if (!_tmrPersistDelay.IsIdle() && !_tmrPersistDelay.IsTimeout())
                    {
                        continue;
                    }

                    // Schedule the next persist.
                    _tmrPersistDelay.Start(PersistPeriodMs);

                    // Interval between the end of the previous persist and now.
                    intervalWatch.Stop();
                    _statPersistInvMs = (int)intervalWatch.Elapsed.TotalMilliseconds;

                    costWatch.Restart();
                    var ret = Persist();
                    costWatch.Stop();
                    _statPersistCostMs = (int)costWatch.Elapsed.TotalMilliseconds;

                    var failed = ret < 0;

                    // The trigger only decides whether to LOG, so a persistent DB failure
                    // does not flood the log (presumably Q is true only when CLK goes
                    // from false to true).
                    _rTrigPersistThreadError.CLK = failed;
                    if (_rTrigPersistThreadError.Q)
                    {
                        LOG.Error(
                            $"{nameof(DataRecorderManager)}.{nameof(PersistThread)} Unable to persist the cache.");
                    }

                    // The counters follow the actual result. Previously they followed the
                    // trigger output, so repeated failures were counted as successes.
                    if (failed)
                    {
                        _statPersistErrorCnt++;
                    }
                    else
                    {
                        _statPersistCnt++;
                    }

                    intervalWatch.Restart();
                }
            }
            catch (OperationCanceledException)
            {
                LOG.Info($"{nameof(DataRecorderManager)}.{nameof(PersistThread)} terminated.");
            }
            catch (Exception ex)
            {
                LOG.Write(ex, $"{nameof(DataRecorderManager)}.{nameof(PersistThread)}线程意外终止,{ex.Message}");
            }
        }

        /// <summary>
        /// Worker loop that (re)builds the caches, starts the persist thread once, and then
        /// samples the data every <c>_dataRecorderInterval</c> ms. When the date changes the
        /// inner loop is left so that caches for the new day's tables are created.
        /// </summary>
        private void CacheThread()
        {
            try
            {
                var costWatch = new Stopwatch();
                var intervalWatch = new Stopwatch();

                while (true)
                {
                    Thread.Sleep(_dataRecorderInterval);
                    _ctDataRecorderThread.ThrowIfCancellationRequested();

                    CreateCache();

                    _ctDataRecorderThread.ThrowIfCancellationRequested();

                    // The persist thread is started only after the caches exist.
                    // ThreadState is a flags enum, hence the bit test.
                    if ((_threadPersist.ThreadState & ThreadState.Unstarted) != 0)
                    {
                        _threadPersist.Start();
                    }

                    // Periodic sampling.
                    while (true)
                    {
                        _ctDataRecorderThread.ThrowIfCancellationRequested();
                        Thread.Sleep(ThreadPollMs);

                        if (!_tmrCacheThreadDelay.IsIdle() && !_tmrCacheThreadDelay.IsTimeout())
                        {
                            continue;
                        }

                        _tmrCacheThreadDelay.Start(_dataRecorderInterval);

                        // Interval between the end of the previous sampling and now.
                        intervalWatch.Stop();
                        _statCacheInvMs = (int)intervalWatch.Elapsed.TotalMilliseconds;

                        // Date changed: leave the loop so that new tables are created.
                        if (DateTime.Now.Date != _datePrefixTableName)
                        {
                            break;
                        }

                        costWatch.Restart();
                        DoCache();
                        costWatch.Stop();
                        _statCacheCostMs = (int)costWatch.Elapsed.TotalMilliseconds;

                        intervalWatch.Restart();
                        _statCacheCnt++;
                    }
                }
            }
            catch (OperationCanceledException)
            {
                LOG.Info($"{nameof(DataRecorderManager)}.{nameof(CacheThread)} terminated.");
            }
            catch (Exception ex)
            {
                LOG.Write(ex, $"{nameof(DataRecorderManager)}.{nameof(CacheThread)}线程意外终止,{ex.Message}");
            }
        }

        /// <summary>
        /// Scans the DB recorder list of the data manager and registers every new item
        /// whose current value is recordable.
        /// </summary>
        /// <remarks>
        /// An item is registered only when its getter currently returns a non-null value whose
        /// type is contained in <c>DataRecorderHelper.SupportedValueTypes</c>.
        /// Must be called while holding <c>_syncRoot</c>.
        /// </remarks>
        private void GetRecordableDataSource()
        {
            foreach (var item in DATA.GetDBRecorderList())
            {
                if (_dataGetters.ContainsKey(item.Key))
                {
                    continue;
                }

                var getter = item.Value;
                object value;
                try
                {
                    value = getter.Invoke();
                }
                catch (Exception ex)
                {
                    // One faulty getter must not terminate the cache thread.
                    LOG.Write(ex, $"Unable to read DATA item {item.Key}, {ex.Message}");
                    continue;
                }

                if (value != null && DataRecorderHelper.SupportedValueTypes.Contains(value.GetType()))
                {
                    _dataGetters[item.Key] = getter;
                }
            }
        }

        /// <summary>
        /// Builds one <see cref="DataRecorderCache"/> per table category for the current date
        /// and makes sure the matching database tables/columns exist.
        /// </summary>
        private void CreateCache()
        {
            lock (_syncRoot)
            {
                // When the caches are rebuilt (date rollover) the rows collected since the last
                // persist would be dropped together with the old cache objects; write them first.
                if (_dictDataRecCachePerDataTable != null && Persist() < 0)
                {
                    LOG.Error(
                        $"{nameof(DataRecorderManager)}.{nameof(CreateCache)} Unable to persist the cache before rebuilding it.");
                }

                GetRecordableDataSource();

                _dictDataRecCachePerDataTable = new();
                var holdersPerTable = new Dictionary<string, List<DataHolder>>();

                // Items that match no category go to "System" when it is configured,
                // otherwise to the first category.
                var systemName = ModuleName.System.ToString();
                var defaultCategory = _dataTableCategory.Contains(systemName)
                    ? systemName
                    : _dataTableCategory[0];

                foreach (var entry in _dataGetters)
                {
                    var dataName = entry.Key;
                    var getter = entry.Value;

                    if (!CanDetermineType(dataName, getter))
                    {
                        continue;
                    }

                    var category = ResolveCategory(dataName, defaultCategory);
                    if (!holdersPerTable.TryGetValue(category, out var holders))
                    {
                        holders = new List<DataHolder>();
                        holdersPerTable[category] = holders;
                    }

                    // The first argument is the index of the holder within its table.
                    holders.Add(new DataHolder(holders.Count, dataName, getter));
                }

                _datePrefixTableName = DateTime.Now.Date;

                foreach (var entry in holdersPerTable)
                {
                    var category = entry.Key;
                    var holders = entry.Value;
                    var tableName = $"{_datePrefixTableName:yyyyMMdd}.{category}";

                    _dictDataRecCachePerDataTable[category] = new DataRecorderCache(
                        tableName,
                        category,
                        holders,
                        minCachePeriodMs: _dataRecorderCachePeriodMs,
                        maxCacheSize: _dataRecorderCacheSize);

                    UpdateTableSchema(tableName, holders);
                }
            }
        }

        /// <summary>
        /// Checks that <paramref name="getter"/> currently yields a value whose type can be determined.
        /// Failures are logged and reported as <c>false</c>.
        /// </summary>
        private static bool CanDetermineType(string dataName, Func<object> getter)
        {
            try
            {
                if (getter.Invoke() != null)
                {
                    return true;
                }

                Debug.Fail($"Unable to determine the type of DATA item {dataName}");
                LOG.Error($"Unable to determine the type of DATA item {dataName}, the value is null");
                return false;
            }
            catch (Exception ex)
            {
                Debug.Fail($"Unable to determine the type of DATA item {dataName}");
                LOG.Write(ex, $"Unable to determine the type of DATA item {dataName}, {ex.Message}");
                return false;
            }
        }

        /// <summary>
        /// Returns the first category for which <paramref name="dataName"/> starts with
        /// "<c>category.</c>" or "<c>IO.category.</c>", or <paramref name="defaultCategory"/> when none matches.
        /// </summary>
        private string ResolveCategory(string dataName, string defaultCategory)
        {
            foreach (var categoryName in _dataTableCategory)
            {
                if (dataName.StartsWith(categoryName + ".", StringComparison.Ordinal)
                    || dataName.StartsWith("IO." + categoryName + ".", StringComparison.Ordinal))
                {
                    return categoryName;
                }
            }

            return defaultCategory;
        }

        /// <summary>
        /// Samples the data of every table cache once.
        /// </summary>
        private void DoCache()
        {
            lock (_syncRoot)
            {
                // ImmediateCache() may be called before the cache thread has built the
                // caches; the dictionary is still null in that case.
                if (_dictDataRecCachePerDataTable == null)
                {
                    return;
                }

                foreach (var cache in _dictDataRecCachePerDataTable.Values)
                {
                    cache.Cache();
                }
            }
        }

        /// <summary>
        /// Writes the cached rows of every table to the database.
        /// </summary>
        /// <returns>
        /// The total number of rows reported by the database for all tables, 0 when there is
        /// nothing to write (or the caches have not been created yet), or -1 when writing
        /// at least one table failed.
        /// </returns>
        public int Persist()
        {
            var totalRows = 0;
            var failed = false;

            lock (_syncRoot)
            {
                if (_dictDataRecCachePerDataTable == null)
                {
                    return 0;
                }

                foreach (var cache in _dictDataRecCachePerDataTable.Values)
                {
                    var sql = cache.GetInsertSql();
                    if (string.IsNullOrEmpty(sql))
                    {
                        continue;
                    }

                    try
                    {
                        var rows = DB.ExecuteNonQuery(sql);
                        if (rows < 0)
                        {
                            // Remember the failure; a later successful table must not hide it.
                            failed = true;
                        }
                        else
                        {
                            totalRows += rows;
                        }
                    }
                    catch (Exception ex)
                    {
                        LOG.Write(ex, $"{ModuleName.System.ToString()}写入数据库发生异常");
                        failed = true;
                    }
                }
            }

            return failed ? -1 : totalRows;
        }

        /// <summary>
        /// Requests cancellation of the cache and persist threads. Does not wait for them to exit.
        /// </summary>
        public void Terminate()
        {
            _ctsDataRecorderThread.Cancel();
        }

        /// <summary>
        /// Makes sure table <paramref name="tblName"/> exists and has a column for every holder:
        /// creates the table when no column is found, otherwise adds the missing columns.
        /// </summary>
        /// <param name="tblName">Name of the database table.</param>
        /// <param name="holders">Data holders that need a column in the table.</param>
        /// <returns><c>true</c> on success; <c>false</c> when the DDL statement failed (the error is logged).</returns>
        private bool UpdateTableSchema(string tblName, IEnumerable<DataHolder> holders)
        {
            var existedColumns = ReadExistingColumns(tblName);

            return existedColumns.Count > 0
                ? AddMissingColumns(tblName, holders, existedColumns)
                : CreateTable(tblName, holders);
        }

        /// <summary>
        /// Queries information_schema for the column names of <paramref name="tblName"/>.
        /// </summary>
        private static HashSet<string> ReadExistingColumns(string tblName)
        {
            var cmdText = $"select column_name from information_schema.columns where table_name = '{tblName}';";
            var columns = new HashSet<string>();

            var reader = DB.ExecuteReader(cmdText);
            try
            {
                while (reader.Read())
                {
                    for (var i = 0; i < reader.FieldCount; i++)
                    {
                        columns.Add(reader[i].ToString());
                    }
                }
            }
            finally
            {
                // Release the reader even when reading throws.
                reader.Close();
            }

            return columns;
        }

        /// <summary>
        /// Maps a value type to the SQL column type: Boolean for <see cref="bool"/>, Real for any
        /// other supported type, <c>null</c> for unsupported types.
        /// </summary>
        private static string ResolveSqlType(Type type)
        {
            if (type == typeof(bool))
            {
                return "Boolean";
            }

            return DataRecorderHelper.SupportedValueTypes.Contains(type) ? "Real" : null;
        }

        /// <summary>
        /// Adds a column for every holder whose name is not in <paramref name="existedColumns"/>.
        /// </summary>
        private static bool AddMissingColumns(
            string tblName, IEnumerable<DataHolder> holders, HashSet<string> existedColumns)
        {
            var statements = new List<string>();
            foreach (var holder in holders)
            {
                var colName = holder.Name;
                if (existedColumns.Contains(colName))
                {
                    continue;
                }

                var sqlType = ResolveSqlType(holder.ValueType);
                if (sqlType != null)
                {
                    statements.Add($"ALTER TABLE \"{tblName}\" ADD COLUMN \"{colName}\" {sqlType};");
                }
            }

            if (statements.Count == 0)
            {
                return true;
            }

            try
            {
                DB.ExecuteNonQuery(string.Concat(statements));
                return true;
            }
            catch (Exception ex)
            {
                LOG.Write(ex);
                return false;
            }
        }

        /// <summary>
        /// Creates table <paramref name="tblName"/> with a "Time" primary key and one column per
        /// supported holder, and grants SELECT on it to the postgres role.
        /// </summary>
        private static bool CreateTable(string tblName, IEnumerable<DataHolder> holders)
        {
            var columnDefinitions = new List<string>();
            foreach (var holder in holders)
            {
                // The column type is taken from the current value; when the value is null right
                // now, fall back to the holder's ValueType instead of throwing.
                var dataType = holder.Read()?.GetType() ?? holder.ValueType;
                var sqlType = ResolveSqlType(dataType);
                if (sqlType != null)
                {
                    columnDefinitions.Add($"\"{holder.Name}\" {sqlType},\n");
                }
            }

            var sqlExprCreateTable =
                $"CREATE TABLE \"{tblName}\"(Time bigint NOT NULL,"
                + string.Concat(columnDefinitions)
                + $"CONSTRAINT \"{tblName}_pkey\" PRIMARY KEY (Time));"
                + $"GRANT SELECT ON TABLE \"{tblName}\" TO postgres;";

            try
            {
                DB.ExecuteNonQuery(sqlExprCreateTable);
                return true;
            }
            catch (Exception ex)
            {
                LOG.Write(ex, "创建数据库表格失败");
                return false;
            }
        }

        #endregion
    }
}