Sic.Framework-Nanjing-Baishi/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataTraceManager.cs

592 lines
23 KiB
C#
Raw Normal View History

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Aitex.Core.RT.DataCenter;
using Aitex.Core.RT.DBCore;
using Aitex.Core.RT.IOCore;
using Aitex.Core.RT.Log;
using Aitex.Core.RT.SCCore;
using Aitex.Core.Util;
using MECF.Framework.Common.Equipment;
using ThreadState = System.Threading.ThreadState;
namespace Aitex.Core.RT.DataCollection.HighPerformance
{
public class DataTraceManager : Singleton<DataTraceManager>
{
#region Variables
private readonly object _syncRoot = new();
private readonly int _dataRecorderInterval;
private readonly int _dataRecorderCacheSize;
private readonly int _dataRecorderCachePeriodMs;
private string[] _dataTableCategory = { ModuleName.System.ToString() };
private readonly Dictionary<string, Func<object>> _dataGetters = new();
private readonly R_TRIG _rTrigPersistThreadError = new();
private readonly CancellationTokenSource _ctsDataRecorderThread;
private readonly CancellationToken _ctDataRecorderThread;
private Dictionary<string, DataTraceCache> _dictDataRecCachePerDataTable;
private DateTime _datePrefixTableName;
private readonly Thread _threadCache;
private readonly Thread _threadPersist;
private bool isAllowTraceAOHopping = false;
private bool isAllowTraceDOHopping = false;
#endregion
#region Constructors
public DataTraceManager()
{
_dataRecorderInterval = (SC.ContainsItem("System.DataRecorderInterval")
? SC.GetValue<int>("System.DataCollectionInterval")
: 1000);
_dataRecorderCacheSize = (SC.ContainsItem("System.DataRecorderCacheSize")
? SC.GetValue<int>("System.DataRecorderCacheSize")
: DataTraceCache.MIN_CACHE_SIZE);
_dataRecorderCachePeriodMs = (SC.ContainsItem("System.DataRecorderCachePeriod")
? SC.GetValue<int>("System.DataRecorderCachePeriod")
: DataTraceCache.MIN_CACHE_PERIOD_MS);
_ctsDataRecorderThread = new();
_ctDataRecorderThread = _ctsDataRecorderThread.Token;
_threadCache = new Thread(CacheThread);
_threadCache.Name = nameof(CacheThread);
_threadPersist = new Thread(PersistThread);
_threadPersist.Name = nameof(PersistThread);
}
#endregion
#region Methods
/// <summary>
/// 初始化<see cref="DataTraceManager"/>对象。
/// </summary>
/// <param name="dataTableCategory"></param>
public void Initialize(string[] dataTableCategory)
{
DATA.Subscribe($"{ModuleName.System}.{ModuleName.Diagnosis}.DataTrace.PersistCostMs", () => _statPersistCostMs);
DATA.Subscribe($"{ModuleName.System}.{ModuleName.Diagnosis}.DataTrace.PersistInvMs", () => _statPersistInvMs);
DATA.Subscribe($"{ModuleName.System}.{ModuleName.Diagnosis}.DataTrace.PersistLines", () => _statPersistLines);
DATA.Subscribe($"{ModuleName.System}.{ModuleName.Diagnosis}.DataTrace.PersistCounter", () => _statPersistCnt);
DATA.Subscribe($"{ModuleName.System}.{ModuleName.Diagnosis}.DataTrace.PersistErrorCounter", () => _statPersistErrorCnt);
DATA.Subscribe($"{ModuleName.System}.{ModuleName.Diagnosis}.DataTrace.CacheCostMs", () => _statCacheCostMs);
DATA.Subscribe($"{ModuleName.System}.{ModuleName.Diagnosis}.DataTrace.CacheInvMs", () => _statCacheInvMs);
DATA.Subscribe($"{ModuleName.System}.{ModuleName.Diagnosis}.DataTrace.CacheCounter", () => _statCacheCnt);
#if CHECK_DATA_TRACE_OVERRUN_ISSUE
DATA.Subscribe($"{ModuleName.System}.{ModuleName.Diagnosis}.DataTrace.CacheOverrunDuration", () => _statCacheOverrunDuration);
#endif
const string SC_ALLOW_TRACE_AO_HOPPING = "System.AllowTraceAOHopping";
const string SC_ALLOW_TRACE_DO_HOPPING = "System.AllowTraceDOHopping";
isAllowTraceAOHopping = SC.GetValue(SC_ALLOW_TRACE_AO_HOPPING, true);
SC.RegisterValueChangedCallback(SC_ALLOW_TRACE_AO_HOPPING, o =>
{
if(!bool.TryParse(o.ToString(), out isAllowTraceAOHopping))
LOG.Error($"Unable to convert value {o} of [{SC_ALLOW_TRACE_AO_HOPPING}] to boolean.");
});
isAllowTraceDOHopping = SC.GetValue(SC_ALLOW_TRACE_DO_HOPPING, true);
SC.RegisterValueChangedCallback(SC_ALLOW_TRACE_DO_HOPPING, o =>
{
if(!bool.TryParse(o.ToString(), out isAllowTraceDOHopping))
LOG.Error($"Unable to convert value {o} of [{SC_ALLOW_TRACE_DO_HOPPING}] to boolean.");
});
_dataTableCategory = dataTableCategory;
_threadCache.Start();
}
/// <summary>
/// 立即缓存所有模组的数据。
/// </summary>
public void ImmediateCache(IIOAccessor io = null, CacheDiagnosisInfo diagnosisInfo = null)
{
if (io is AOAccessor)
{
if (!isAllowTraceAOHopping)
return;
// maybe DB free disk space too low to trace AO hopping
var obj = DATA.Poll($"{ModuleName.System}.{ModuleName.Diagnosis}.DiskMon.IsDBFreeDiskSpaceLow");
if (obj is true)
return;
}
if (io is DOAccessor && !isAllowTraceDOHopping)
return;
if (diagnosisInfo != null)
Debug.WriteLine($"{diagnosisInfo.Module}.{diagnosisInfo.IoName} changed to {diagnosisInfo.Value}",
$"{nameof(DataTraceManager)} - {nameof(ImmediateCache)}");
lock (_syncRoot)
{
_tmrCacheThreadDelay.Stop();
}
DoCache();
}
#region Background Threads
private readonly DeviceTimer _tmrPersistDelay = new();
private void PersistThread()
{
try
{
while (true)
{
_ctDataRecorderThread.ThrowIfCancellationRequested();
Thread.Sleep(10);
if (!_tmrPersistDelay.IsIdle() && !_tmrPersistDelay.IsTimeout())
continue;
// next persist will be called 5sec later
_tmrPersistDelay.Start(5000);
var ret = Persist();
// check whether the DB write operation is succeeded
_rTrigPersistThreadError.CLK = ret < 0;
if (_rTrigPersistThreadError.Q) // 插入数据错误,退出
{
// DB Write Error
LOG.Error(
$"{nameof(DataTraceManager)}.{nameof(PersistThread)} Unable to persist the cache.");
}
}
}
catch (OperationCanceledException)
{
LOG.Info($"{nameof(DataTraceManager)}.{nameof(PersistThread)} terminated.");
}
catch (Exception ex)
{
LOG.Write(ex, $"{nameof(DataTraceManager)}.{nameof(PersistThread)} terminated unexpectedly{ex.Message}");
}
finally
{
try
{
Persist();
}
catch (Exception ex)
{
LOG.Write(ex,
$"{nameof(DataTraceManager)}.{nameof(PersistThread)} Failed to persist cache when exit the thread, {ex.Message}");
}
}
}
private readonly DeviceTimer _tmrCacheThreadDelay = new();
private void CacheThread()
{
try
{
while (true)
{
Thread.Sleep(_dataRecorderInterval);
_ctDataRecorderThread.ThrowIfCancellationRequested();
CreateCache();
_ctDataRecorderThread.ThrowIfCancellationRequested();
// start persist thread once the cache instances are built.
// note the persist thread must be started before CreateCache() method.
if (_threadPersist.ThreadState == ThreadState.Unstarted)
_threadPersist.Start();
// 开始周期性数据采集
while (true)
{
_ctDataRecorderThread.ThrowIfCancellationRequested();
Thread.Sleep(10);
// 跨天,退出循环并创建新表
if (DateTime.Now.Date != _datePrefixTableName)
break;
// 立即缓存一次数据
DoCache();
}
}
}
catch (OperationCanceledException)
{
LOG.Info($"{nameof(DataTraceManager)}.{nameof(CacheThread)} terminated.");
}
catch (Exception ex)
{
LOG.Write(ex, $"{nameof(DataTraceManager)}.{nameof(CacheThread)}线程意外终止,{ex.Message}");
}
}
#endregion
/// <summary>
/// 扫描<see cref="DataManager"/>,获取所有可记录的数据。
/// </summary>
/// <remarks>
/// 仅数据管理器中注册的数据,且类型为<see cref="bool"/>、<see cref="double"/>、<see cref="float"/>、<see cref="int"/>、<see cref="ushort"/>、<see cref="short"/>的数据支持保存到数据库。
/// </remarks>>
private void GetRecordableDataSource()
{
var dBRecorderList = DATA.GetDBRecorderList();
foreach (var item in dBRecorderList)
{
if (_dataGetters.ContainsKey(item.Key))
continue;
var getter = item.Value;
var value = getter.Invoke();
if (value != null)
{
var type = value.GetType();
if(DataTraceHelper.SupportedValueTypes.Contains(type))
_dataGetters[item.Key] = getter;
}
}
}
/// <summary>
/// 创建数据缓存对象。
/// </summary>
private void CreateCache()
{
lock (_syncRoot)
{
// persist the existing data since the date is changing.
if (_dictDataRecCachePerDataTable != null)
Persist();
GetRecordableDataSource();
_dictDataRecCachePerDataTable = new();
Dictionary<string, List<IDataBuffer>> dictDataBufferPerTable = new();
var defaultModuleName = (_dataTableCategory.Contains(ModuleName.System.ToString())
? ModuleName.System.ToString()
: (_dataTableCategory.Contains(ModuleName.System.ToString())
? ModuleName.System.ToString()
: _dataTableCategory[0]));
if (_dataGetters is { Count: > 0 })
{
foreach (var dataName in _dataGetters.Keys)
{
Type valueType;
Func<object> getter;
try
{
// determine the type of the data.
getter = _dataGetters[dataName];
var value = getter.Invoke();
valueType = value.GetType();
Debug.Assert(valueType != null, $"Unable to determine the type of DATA item {dataName}");
}
catch (Exception ex)
{
LOG.Write(ex, $"Unable to determine the type of DATA item {dataName}, {ex.Message}");
Debug.Assert(false, $"Unable to determine the type of DATA item {dataName}");
continue;
}
var isTableAssigned = false;
var categoryName = "";
foreach (var category in _dataTableCategory)
{
if (dataName.StartsWith(category + ".") ||
dataName.StartsWith("IO." + category + "."))
{
if (!dictDataBufferPerTable.ContainsKey(category))
dictDataBufferPerTable[category] = [];
categoryName = category;
isTableAssigned = true;
break;
}
}
if (!isTableAssigned)
{
if (!dictDataBufferPerTable.ContainsKey(defaultModuleName))
dictDataBufferPerTable[defaultModuleName] = [];
categoryName = defaultModuleName;
}
// create DataHolder and add to dictDataHolderPerDataTable.
var index = dictDataBufferPerTable[categoryName].Count;
var type = typeof(DataBuffer<>).MakeGenericType(valueType);
var holder = (IDataBuffer)Activator.CreateInstance(type, index, dataName, getter);
if (holder == null)
{
LOG.Error($"Unable to create instance of DataBuffer for {dataName}.");
Debug.Assert(holder != null, "Unable to create instance of DataBuffer.");
}
else
{
dictDataBufferPerTable[categoryName].Add(holder);
}
}
}
_datePrefixTableName = DateTime.Now.Date;
foreach (var perTable in dictDataBufferPerTable.Keys)
{
var holders = dictDataBufferPerTable[perTable];
var tableName = $"{_datePrefixTableName:yyyyMMdd}.{perTable}";
_dictDataRecCachePerDataTable[perTable] =
new DataTraceCache(tableName, perTable, holders, minCachePeriodMs: _dataRecorderCachePeriodMs,
maxCacheSize: _dataRecorderCacheSize);
UpdateTableSchema(tableName, holders);
}
}
}
private int _statCacheCostMs;
private int _statCacheInvMs;
private int _statCacheCnt;
private int _statCacheOverrunDuration;
private readonly R_TRIG _rTrigTraceOverrun = new();
private readonly Stopwatch _swTraceOverrun = new();
private readonly Stopwatch _swCacheCostMsStat = new();
private readonly Stopwatch _swCacheInvMsStat = new();
/// <summary>
/// 立即缓存所有模组的数据。
/// </summary>
/// <returns></returns>
private void DoCache()
{
lock (_syncRoot)
{
if (!_tmrCacheThreadDelay.IsIdle() && !_tmrCacheThreadDelay.IsTimeout())
return;
_tmrCacheThreadDelay.Start(_dataRecorderInterval);
// statistic cache called interval in ms.
_swCacheInvMsStat.Stop();
_statCacheInvMs = (int)_swCacheInvMsStat.Elapsed.TotalMilliseconds;
#if CHECK_DATA_TRACE_OVERRUN_ISSUE
const int OVERRUN_ALARM_THRESHOLD_M = 60;
// Check data-trace-overrun issue
_rTrigTraceOverrun.CLK = _statCacheInvMs < 900;
if (_rTrigTraceOverrun.Q && !_swTraceOverrun.IsRunning)
{
_swTraceOverrun.Restart();
}
else if (!_rTrigTraceOverrun.M && _swTraceOverrun.IsRunning)
{
_swTraceOverrun.Stop();
}
if (_swTraceOverrun.Elapsed.TotalSeconds > TimeSpan.FromMinutes(OVERRUN_ALARM_THRESHOLD_M).TotalSeconds)
{
_swTraceOverrun.Stop();
_rTrigTraceOverrun.RST = true;
Debug.Assert(false, "Data-Trace-Overrun Issue Detected",
$"The {nameof(DataTraceManager)} continues to trace data at a high frequency " +
$"within {OVERRUN_ALARM_THRESHOLD_M} minutes. Please check whether there are any Monitor() " +
$"methods running frequently to assign values to DO and AO.");
}
_statCacheOverrunDuration = (int)_swTraceOverrun.Elapsed.TotalSeconds;
#endif
_swCacheCostMsStat.Restart();
// DO调用ForceCache方法可能发生在当前对象初始化之前此时_dictCachePerModule为null
if (_dictDataRecCachePerDataTable == null)
return;
// 立即缓存一次数据
foreach (var drc in _dictDataRecCachePerDataTable.Values)
drc.Cache();
_swCacheCostMsStat.Stop();
_statCacheCostMs = (int)_swCacheCostMsStat.Elapsed.TotalMilliseconds;
_swCacheInvMsStat.Restart();
_statCacheCnt++;
}
}
private readonly Stopwatch _swPersisCostMsStat = new();
private readonly Stopwatch _swPersisInvMsStat = new();
private int _statPersistCostMs;
private int _statPersistInvMs;
private int _statPersistCnt;
private int _statPersistErrorCnt;
private int _statPersistLines;
/// <summary>
/// 缓存写入数据库。
/// </summary>
/// <returns>插入数据库的行数。-1 表示写入数据库错误。</returns>
public int Persist()
{
var effectedLines = 0;
lock (_syncRoot)
{
// statistic persist called interval in ms.
_swPersisInvMsStat.Stop();
_statPersistInvMs = (int)_swPersisInvMsStat.Elapsed.TotalMilliseconds;
// persist data every 5sec
_swPersisCostMsStat.Restart();
foreach (var dcc in _dictDataRecCachePerDataTable.Values)
{
var sql = dcc.GetInsertSql();
if (!string.IsNullOrEmpty(sql))
{
try
{
var num = DB.ExecuteNonQuery(sql);
effectedLines += num;
}
catch (Exception ex)
{
LOG.Write(ex, $"{ModuleName.System.ToString()}写入数据库发生异常");
effectedLines = int.MinValue;
}
}
}
_swPersisCostMsStat.Stop();
_statPersistCostMs = (int)_swPersisCostMsStat.Elapsed.TotalMilliseconds;
}
if (effectedLines < 0)
{
_statPersistErrorCnt++;
_statPersistLines = -1;
}
else
{
_statPersistCnt++;
_statPersistLines = effectedLines;
}
// start to calculate the interval of persist operation
_swPersisInvMsStat.Restart();
return effectedLines;
}
public void Terminate()
{
_ctsDataRecorderThread.Cancel();
}
private bool UpdateTableSchema(string tblName, IEnumerable<IDataBuffer> holders)
{
var cmdText = $"select column_name from information_schema.columns where table_name = '{tblName}';";
var reader = DB.ExecuteReader(cmdText);
var sqlExprAddColumn = string.Empty;
var existedColumns = new List<string>();
while (reader.Read())
{
for (var i = 0; i < reader.FieldCount; i++)
{
existedColumns.Add(reader[i].ToString());
}
}
reader.Close();
if (existedColumns.Count > 0)
{
foreach (var holder in holders)
{
var colName = holder.Name;
if (!existedColumns.Contains(colName))
{
var type = holder.ValueType;
if (type == typeof(bool))
{
sqlExprAddColumn += $"ALTER TABLE \"{tblName}\" ADD COLUMN \"{colName}\" {"Boolean"};";
}
else if (DataTraceHelper.SupportedValueTypes.Contains(type))
{
sqlExprAddColumn += $"ALTER TABLE \"{tblName}\" ADD COLUMN \"{colName}\" {"Real"};";
}
}
}
if (!string.IsNullOrEmpty(sqlExprAddColumn))
{
try
{
DB.ExecuteNonQuery(sqlExprAddColumn);
}
catch (Exception ex)
{
LOG.Write(ex);
return false;
}
}
}
else
{
var sqlExprCreateTable = $"CREATE TABLE \"{tblName}\"(Time bigint NOT NULL,";
foreach (var dataItem in holders)
{
var colName = dataItem.Name;
var dataType = dataItem.Read().GetType();
if (dataType == typeof(bool))
{
sqlExprCreateTable += $"\"{colName}\" Boolean,\n";
}
else if (DataTraceHelper.SupportedValueTypes.Contains(dataType))
{
sqlExprCreateTable += $"\"{colName}\" Real,\n";
}
}
sqlExprCreateTable += $"CONSTRAINT \"{tblName}_pkey\" PRIMARY KEY (Time));";
sqlExprCreateTable += $"GRANT SELECT ON TABLE \"{tblName}\" TO postgres;";
try
{
DB.ExecuteNonQuery(sqlExprCreateTable);
}
catch (Exception ex2)
{
LOG.Write(ex2, "创建数据库表格失败");
return false;
}
}
return true;
}
#endregion
}
}