// Sic.Framework-Nanjing-Baishi/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataTraceManager.cs
//
// 592 lines
// 23 KiB
// C#
// Raw Blame History
//
// This file contains ambiguous Unicode characters
//
// This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Aitex.Core.RT.DataCenter;
using Aitex.Core.RT.DBCore;
using Aitex.Core.RT.IOCore;
using Aitex.Core.RT.Log;
using Aitex.Core.RT.SCCore;
using Aitex.Core.Util;
using MECF.Framework.Common.Equipment;
using ThreadState = System.Threading.ThreadState;
namespace Aitex.Core.RT.DataCollection.HighPerformance
{
public class DataTraceManager : Singleton<DataTraceManager>
{
#region Variables
// Lock object taken by ImmediateCache(), CreateCache(), DoCache() and Persist().
private readonly object _syncRoot = new();
// Value read from the system configuration by the constructor; used as the Thread.Sleep()
// argument at the top of the cache thread loop and as the cache delay timer period in DoCache().
private readonly int _dataRecorderInterval;
// Passed to DataTraceCache as maxCacheSize when the caches are created.
private readonly int _dataRecorderCacheSize;
// Passed to DataTraceCache as minCachePeriodMs when the caches are created.
private readonly int _dataRecorderCachePeriodMs;
// Category names used by CreateCache() to decide which table a data item belongs to.
// Replaced by the array given to Initialize().
private string[] _dataTableCategory = { ModuleName.System.ToString() };
// Data name -> value getter for every recordable data item found by GetRecordableDataSource().
private readonly Dictionary<string, Func<object>> _dataGetters = new();
// Trigger fed with "persist failed" in PersistThread(); its Q output gates the error log there.
// NOTE(review): presumably a rising-edge trigger - confirm against R_TRIG.
private readonly R_TRIG _rTrigPersistThreadError = new();
// Cancellation source cancelled by Terminate().
private readonly CancellationTokenSource _ctsDataRecorderThread;
// Token of _ctsDataRecorderThread, polled by CacheThread() and PersistThread().
private readonly CancellationToken _ctDataRecorderThread;
// Category name -> cache instance. Stays null until CreateCache() runs for the first time.
private Dictionary<string, DataTraceCache> _dictDataRecCachePerDataTable;
// Date used as the table name prefix; CacheThread() compares it with today's date
// to detect that new tables must be created.
private DateTime _datePrefixTableName;
// Thread running CacheThread(); started by Initialize().
private readonly Thread _threadCache;
// Thread running PersistThread(); started by CacheThread() after CreateCache() has run.
private readonly Thread _threadPersist;
// When false, ImmediateCache() ignores requests whose io argument is an AOAccessor.
private bool isAllowTraceAOHopping = false;
// When false, ImmediateCache() ignores requests whose io argument is a DOAccessor.
private bool isAllowTraceDOHopping = false;
#endregion
#region Constructors
/// <summary>
/// Reads the recorder settings from the system configuration and creates (but does not start)
/// the cache thread and the persist thread.
/// </summary>
public DataTraceManager()
{
    const string SC_DATA_RECORDER_INTERVAL = "System.DataRecorderInterval";
    const string SC_DATA_COLLECTION_INTERVAL = "System.DataCollectionInterval";
    const int DEFAULT_INTERVAL_MS = 1000;

    // Always read the same item that was tested for existence. The previous code tested
    // "System.DataRecorderInterval" but read "System.DataCollectionInterval", so the check
    // and the read could disagree. Both names are honoured, the recorder-specific one first.
    int interval;
    if (SC.ContainsItem(SC_DATA_RECORDER_INTERVAL))
        interval = SC.GetValue<int>(SC_DATA_RECORDER_INTERVAL);
    else if (SC.ContainsItem(SC_DATA_COLLECTION_INTERVAL))
        interval = SC.GetValue<int>(SC_DATA_COLLECTION_INTERVAL);
    else
        interval = DEFAULT_INTERVAL_MS;

    // The interval is handed to Thread.Sleep() and to the cache delay timer;
    // a non-positive value is replaced by the default.
    _dataRecorderInterval = interval > 0 ? interval : DEFAULT_INTERVAL_MS;

    _dataRecorderCacheSize = SC.ContainsItem("System.DataRecorderCacheSize")
        ? SC.GetValue<int>("System.DataRecorderCacheSize")
        : DataTraceCache.MIN_CACHE_SIZE;

    _dataRecorderCachePeriodMs = SC.ContainsItem("System.DataRecorderCachePeriod")
        ? SC.GetValue<int>("System.DataRecorderCachePeriod")
        : DataTraceCache.MIN_CACHE_PERIOD_MS;

    _ctsDataRecorderThread = new();
    _ctDataRecorderThread = _ctsDataRecorderThread.Token;

    _threadCache = new Thread(CacheThread) { Name = nameof(CacheThread) };
    _threadPersist = new Thread(PersistThread) { Name = nameof(PersistThread) };
}
#endregion
#region Methods
/// <summary>
/// Initializes the <see cref="DataTraceManager"/>: publishes the diagnostic statistics,
/// binds the AO/DO hopping trace switches to the system configuration and starts the cache thread.
/// </summary>
/// <param name="dataTableCategory">
/// Category names used to assign data items to tables.
/// When null or empty, the categories currently in use are kept.
/// </param>
public void Initialize(string[] dataTableCategory)
{
    var diagnosisPrefix = $"{ModuleName.System}.{ModuleName.Diagnosis}.DataTrace";
    DATA.Subscribe($"{diagnosisPrefix}.PersistCostMs", () => _statPersistCostMs);
    DATA.Subscribe($"{diagnosisPrefix}.PersistInvMs", () => _statPersistInvMs);
    DATA.Subscribe($"{diagnosisPrefix}.PersistLines", () => _statPersistLines);
    DATA.Subscribe($"{diagnosisPrefix}.PersistCounter", () => _statPersistCnt);
    DATA.Subscribe($"{diagnosisPrefix}.PersistErrorCounter", () => _statPersistErrorCnt);
    DATA.Subscribe($"{diagnosisPrefix}.CacheCostMs", () => _statCacheCostMs);
    DATA.Subscribe($"{diagnosisPrefix}.CacheInvMs", () => _statCacheInvMs);
    DATA.Subscribe($"{diagnosisPrefix}.CacheCounter", () => _statCacheCnt);
#if CHECK_DATA_TRACE_OVERRUN_ISSUE
    DATA.Subscribe($"{diagnosisPrefix}.CacheOverrunDuration", () => _statCacheOverrunDuration);
#endif

    const string SC_ALLOW_TRACE_AO_HOPPING = "System.AllowTraceAOHopping";
    const string SC_ALLOW_TRACE_DO_HOPPING = "System.AllowTraceDOHopping";

    isAllowTraceAOHopping = SC.GetValue(SC_ALLOW_TRACE_AO_HOPPING, true);
    SC.RegisterValueChangedCallback(SC_ALLOW_TRACE_AO_HOPPING, o =>
    {
        // Parse into a local first: bool.TryParse() writes false to its out argument on failure,
        // which would silently switch tracing off when the value is merely malformed.
        if (bool.TryParse(Convert.ToString(o), out var isAllowed))
            isAllowTraceAOHopping = isAllowed;
        else
            LOG.Error($"Unable to convert value {o} of [{SC_ALLOW_TRACE_AO_HOPPING}] to boolean.");
    });

    isAllowTraceDOHopping = SC.GetValue(SC_ALLOW_TRACE_DO_HOPPING, true);
    SC.RegisterValueChangedCallback(SC_ALLOW_TRACE_DO_HOPPING, o =>
    {
        // Same rule as above: keep the current setting when the new value cannot be parsed.
        if (bool.TryParse(Convert.ToString(o), out var isAllowed))
            isAllowTraceDOHopping = isAllowed;
        else
            LOG.Error($"Unable to convert value {o} of [{SC_ALLOW_TRACE_DO_HOPPING}] to boolean.");
    });

    // CreateCache() indexes and enumerates the category array, so never store a null or empty one.
    if (dataTableCategory is { Length: > 0 })
        _dataTableCategory = dataTableCategory;
    else
        LOG.Error($"{nameof(DataTraceManager)}.{nameof(Initialize)} No data table category specified, the current categories are kept.");

    _threadCache.Start();
}
/// <summary>
/// Requests an immediate cache of the data of all modules, bypassing the cache delay timer.
/// </summary>
/// <param name="io">
/// The IO accessor that triggered the request, if any. Requests from an <c>AOAccessor</c> or a
/// <c>DOAccessor</c> are dropped when the matching hopping trace switch is off.
/// </param>
/// <param name="diagnosisInfo">Optional information written to the debug output.</param>
public void ImmediateCache(IIOAccessor io = null, CacheDiagnosisInfo diagnosisInfo = null)
{
    var isAnalogOutput = io is AOAccessor;

    if (isAnalogOutput && !isAllowTraceAOHopping)
        return;

    if (isAnalogOutput)
    {
        // AO hopping is not traced while the disk monitor reports low free space for the DB.
        var isDiskSpaceLow =
            DATA.Poll($"{ModuleName.System}.{ModuleName.Diagnosis}.DiskMon.IsDBFreeDiskSpaceLow");
        if (isDiskSpaceLow is true)
            return;
    }

    if (!isAllowTraceDOHopping && io is DOAccessor)
        return;

    if (diagnosisInfo != null)
    {
        var category = $"{nameof(DataTraceManager)} - {nameof(ImmediateCache)}";
        var message = $"{diagnosisInfo.Module}.{diagnosisInfo.IoName} changed to {diagnosisInfo.Value}";
        Debug.WriteLine(message, category);
    }

    // Stop the delay timer so the DoCache() call below is not rejected by it.
    lock (_syncRoot)
    {
        _tmrCacheThreadDelay.Stop();
    }

    DoCache();
}
#region Background Threads
// Timer that spaces the Persist() calls made by PersistThread().
private readonly DeviceTimer _tmrPersistDelay = new();

/// <summary>
/// Body of the persist thread: writes the cached data to the database every 5 seconds until
/// cancellation is requested, then persists one last time before the thread ends.
/// </summary>
private void PersistThread()
{
    try
    {
        for (;;)
        {
            _ctDataRecorderThread.ThrowIfCancellationRequested();
            Thread.Sleep(10);

            var isPersistDue = _tmrPersistDelay.IsIdle() || _tmrPersistDelay.IsTimeout();
            if (isPersistDue)
            {
                // Schedule the next persist 5 seconds from now.
                _tmrPersistDelay.Start(5000);

                var persistResult = Persist();

                // A negative result means the DB write failed; the trigger output gates the log.
                _rTrigPersistThreadError.CLK = persistResult < 0;
                if (_rTrigPersistThreadError.Q)
                {
                    LOG.Error(
                        $"{nameof(DataTraceManager)}.{nameof(PersistThread)} Unable to persist the cache.");
                }
            }
        }
    }
    catch (OperationCanceledException)
    {
        LOG.Info($"{nameof(DataTraceManager)}.{nameof(PersistThread)} terminated.");
    }
    catch (Exception ex)
    {
        LOG.Write(ex, $"{nameof(DataTraceManager)}.{nameof(PersistThread)} terminated unexpectedly{ex.Message}");
    }
    finally
    {
        // Flush whatever is still cached before the thread exits.
        try
        {
            Persist();
        }
        catch (Exception ex)
        {
            LOG.Write(ex,
                $"{nameof(DataTraceManager)}.{nameof(PersistThread)} Failed to persist cache when exit the thread, {ex.Message}");
        }
    }
}
// Timer that limits how often DoCache() actually samples; stopped by ImmediateCache().
private readonly DeviceTimer _tmrCacheThreadDelay = new();

/// <summary>
/// Body of the cache thread: (re)creates the caches, starts the persist thread on the first pass
/// and then samples periodically until the date changes or cancellation is requested.
/// </summary>
private void CacheThread()
{
    try
    {
        for (;;)
        {
            Thread.Sleep(_dataRecorderInterval);
            _ctDataRecorderThread.ThrowIfCancellationRequested();

            CreateCache();
            _ctDataRecorderThread.ThrowIfCancellationRequested();

            // The persist thread is started only after CreateCache() has run,
            // so the cache instances exist before the first Persist() call.
            if (_threadPersist.ThreadState == ThreadState.Unstarted)
            {
                _threadPersist.Start();
            }

            // Periodic sampling; the loop ends when the date no longer matches
            // the table name prefix, so that new tables get created.
            bool isSameDay;
            do
            {
                _ctDataRecorderThread.ThrowIfCancellationRequested();
                Thread.Sleep(10);

                isSameDay = DateTime.Now.Date == _datePrefixTableName;
                if (isSameDay)
                {
                    // DoCache() decides itself whether a sample is due.
                    DoCache();
                }
            } while (isSameDay);
        }
    }
    catch (OperationCanceledException)
    {
        LOG.Info($"{nameof(DataTraceManager)}.{nameof(CacheThread)} terminated.");
    }
    catch (Exception ex)
    {
        LOG.Write(ex, $"{nameof(DataTraceManager)}.{nameof(CacheThread)}线程意外终止,{ex.Message}");
    }
}
#endregion
/// <summary>
/// Scans the DB recorder list published by <c>DATA</c> and registers every recordable data item.
/// </summary>
/// <remarks>
/// An item is registered only when its getter currently returns a non-null value whose type is
/// listed in <c>DataTraceHelper.SupportedValueTypes</c>. Items that are already registered are
/// skipped. A getter that throws is logged and skipped, so one faulty data item cannot abort
/// the scan (and with it the calling thread).
/// </remarks>
private void GetRecordableDataSource()
{
    foreach (var item in DATA.GetDBRecorderList())
    {
        if (_dataGetters.ContainsKey(item.Key))
            continue;

        var getter = item.Value;
        if (getter == null)
            continue;

        object value;
        try
        {
            value = getter.Invoke();
        }
        catch (Exception ex)
        {
            LOG.Write(ex, $"Unable to read DATA item {item.Key}, {ex.Message}");
            continue;
        }

        // The type can only be determined from an actual value.
        if (value != null && DataTraceHelper.SupportedValueTypes.Contains(value.GetType()))
            _dataGetters[item.Key] = getter;
    }
}
/// <summary>
/// Creates the data cache objects: one <c>DataTraceCache</c> per table category, each holding
/// one data buffer per recordable data item assigned to that category.
/// </summary>
/// <remarks>
/// Existing caches are persisted first. A data item goes to the first category whose name
/// prefixes it (as "Category." or "IO.Category."), otherwise to the default category.
/// A data item whose type cannot be determined or whose buffer cannot be created is logged
/// and skipped.
/// </remarks>
private void CreateCache()
{
    lock (_syncRoot)
    {
        // Persist the existing data before the caches are replaced.
        if (_dictDataRecCachePerDataTable != null)
            Persist();

        GetRecordableDataSource();

        _dictDataRecCachePerDataTable = new();
        Dictionary<string, List<IDataBuffer>> dictDataBufferPerTable = new();

        // Default table: the System category when it is configured (or when nothing is
        // configured at all), otherwise the first configured category.
        var categories = _dataTableCategory ?? Array.Empty<string>();
        var systemCategory = ModuleName.System.ToString();
        var defaultModuleName = categories.Length == 0 || categories.Contains(systemCategory)
            ? systemCategory
            : categories[0];

        foreach (var pair in _dataGetters)
        {
            var dataName = pair.Key;
            var getter = pair.Value;

            // Determine the type of the data from its current value.
            Type valueType;
            try
            {
                var value = getter.Invoke();
                if (value == null)
                {
                    LOG.Error($"Unable to determine the type of DATA item {dataName}, the value is null.");
                    continue;
                }

                valueType = value.GetType();
            }
            catch (Exception ex)
            {
                LOG.Write(ex, $"Unable to determine the type of DATA item {dataName}, {ex.Message}");
                Debug.Assert(false, $"Unable to determine the type of DATA item {dataName}");
                continue;
            }

            // Data names are identifiers, so compare them ordinally.
            var categoryName = categories.FirstOrDefault(category =>
                                   dataName.StartsWith(category + ".", StringComparison.Ordinal) ||
                                   dataName.StartsWith("IO." + category + ".", StringComparison.Ordinal))
                               ?? defaultModuleName;

            if (!dictDataBufferPerTable.TryGetValue(categoryName, out var buffers))
            {
                buffers = new();
                dictDataBufferPerTable[categoryName] = buffers;
            }

            // Create the typed buffer; its index is its position within the table.
            IDataBuffer holder;
            try
            {
                var bufferType = typeof(DataBuffer<>).MakeGenericType(valueType);
                holder = (IDataBuffer)Activator.CreateInstance(bufferType, buffers.Count, dataName, getter);
            }
            catch (Exception ex)
            {
                LOG.Write(ex, $"Unable to create instance of DataBuffer for {dataName}, {ex.Message}");
                continue;
            }

            if (holder == null)
            {
                LOG.Error($"Unable to create instance of DataBuffer for {dataName}.");
                Debug.Assert(holder != null, "Unable to create instance of DataBuffer.");
                continue;
            }

            buffers.Add(holder);
        }

        _datePrefixTableName = DateTime.Now.Date;

        foreach (var entry in dictDataBufferPerTable)
        {
            var tableName = $"{_datePrefixTableName:yyyyMMdd}.{entry.Key}";
            _dictDataRecCachePerDataTable[entry.Key] =
                new DataTraceCache(tableName, entry.Key, entry.Value,
                    minCachePeriodMs: _dataRecorderCachePeriodMs,
                    maxCacheSize: _dataRecorderCacheSize);

            if (!UpdateTableSchema(tableName, entry.Value))
                LOG.Error($"{nameof(DataTraceManager)}.{nameof(CreateCache)} Unable to update the schema of table {tableName}.");
        }
    }
}
// Duration in ms of the most recent caching pass (see DoCache()).
private int _statCacheCostMs;
// Time in ms measured by _swCacheInvMsStat when DoCache() last passed the timer gate.
private int _statCacheInvMs;
// Number of completed caching passes.
private int _statCacheCnt;
// Seconds measured by _swTraceOverrun; only written when CHECK_DATA_TRACE_OVERRUN_ISSUE is defined.
private int _statCacheOverrunDuration;
// Trigger and stopwatch used by the overrun check inside DoCache().
private readonly R_TRIG _rTrigTraceOverrun = new();
private readonly Stopwatch _swTraceOverrun = new();
// Stopwatches behind _statCacheCostMs and _statCacheInvMs.
private readonly Stopwatch _swCacheCostMsStat = new();
private readonly Stopwatch _swCacheInvMsStat = new();
/// <summary>
/// Caches the data of all modules once, unless the cache delay timer rejects the call.
/// </summary>
/// <remarks>
/// Runs under <c>_syncRoot</c>. The call returns without caching when the delay timer is neither
/// idle nor timed out, or when the cache dictionary has not been created yet.
/// </remarks>
private void DoCache()
{
lock (_syncRoot)
{
// Timer gate: skip this call unless the delay timer is idle or has timed out.
if (!_tmrCacheThreadDelay.IsIdle() && !_tmrCacheThreadDelay.IsTimeout())
return;
// Re-arm the gate for the next recorder interval.
_tmrCacheThreadDelay.Start(_dataRecorderInterval);
// Record the time in ms since the interval stopwatch was restarted at the end of the last completed pass.
_swCacheInvMsStat.Stop();
_statCacheInvMs = (int)_swCacheInvMsStat.Elapsed.TotalMilliseconds;
#if CHECK_DATA_TRACE_OVERRUN_ISSUE
const int OVERRUN_ALARM_THRESHOLD_M = 60;
// Check data-trace-overrun issue: the trigger input is true while passes are less than 900 ms apart.
// NOTE(review): the meaning of the Q, M and RST members depends on R_TRIG - confirm there.
_rTrigTraceOverrun.CLK = _statCacheInvMs < 900;
if (_rTrigTraceOverrun.Q && !_swTraceOverrun.IsRunning)
{
_swTraceOverrun.Restart();
}
else if (!_rTrigTraceOverrun.M && _swTraceOverrun.IsRunning)
{
_swTraceOverrun.Stop();
}
// Raise a debug assertion once the overrun stopwatch exceeds the threshold.
if (_swTraceOverrun.Elapsed.TotalSeconds > TimeSpan.FromMinutes(OVERRUN_ALARM_THRESHOLD_M).TotalSeconds)
{
_swTraceOverrun.Stop();
_rTrigTraceOverrun.RST = true;
Debug.Assert(false, "Data-Trace-Overrun Issue Detected",
$"The {nameof(DataTraceManager)} continues to trace data at a high frequency " +
$"within {OVERRUN_ALARM_THRESHOLD_M} minutes. Please check whether there are any Monitor() " +
$"methods running frequently to assign values to DO and AO.");
}
_statCacheOverrunDuration = (int)_swTraceOverrun.Elapsed.TotalSeconds;
#endif
// Start measuring the cost of this caching pass.
_swCacheCostMsStat.Restart();
// A cache request triggered by a DO may arrive before this object has been initialized;
// in that case the cache dictionary is still null and there is nothing to cache.
if (_dictDataRecCachePerDataTable == null)
return;
// Cache the data of every table once.
foreach (var drc in _dictDataRecCachePerDataTable.Values)
drc.Cache();
_swCacheCostMsStat.Stop();
_statCacheCostMs = (int)_swCacheCostMsStat.Elapsed.TotalMilliseconds;
// Start measuring the interval until the next pass.
_swCacheInvMsStat.Restart();
_statCacheCnt++;
}
}
// Stopwatches behind _statPersistCostMs and _statPersistInvMs.
private readonly Stopwatch _swPersisCostMsStat = new();
private readonly Stopwatch _swPersisInvMsStat = new();
// Duration in ms of the most recent Persist() call.
private int _statPersistCostMs;
// Time in ms between the end of the previous Persist() call and the start of the most recent one.
private int _statPersistInvMs;
// Number of successful Persist() calls.
private int _statPersistCnt;
// Number of failed Persist() calls.
private int _statPersistErrorCnt;
// Rows inserted by the most recent Persist() call; -1 when it failed.
private int _statPersistLines;

/// <summary>
/// Writes the cached data of every table to the database.
/// </summary>
/// <remarks>
/// The whole operation, including the statistics update, runs under <c>_syncRoot</c>.
/// A failure on one table is logged and the remaining tables are still written.
/// </remarks>
/// <returns>
/// The number of rows inserted into the database; 0 when no cache has been created yet;
/// -1 when writing to the database failed.
/// </returns>
public int Persist()
{
    lock (_syncRoot)
    {
        // This method is public and may be called before CreateCache() has built the caches.
        if (_dictDataRecCachePerDataTable == null)
            return 0;

        // Time since the previous persist call ended.
        _swPersisInvMsStat.Stop();
        _statPersistInvMs = (int)_swPersisInvMsStat.Elapsed.TotalMilliseconds;

        _swPersisCostMsStat.Restart();

        var effectedLines = 0;
        var hasError = false;
        foreach (var dcc in _dictDataRecCachePerDataTable.Values)
        {
            var sql = dcc.GetInsertSql();
            if (string.IsNullOrEmpty(sql))
                continue;

            try
            {
                effectedLines += DB.ExecuteNonQuery(sql);
            }
            catch (Exception ex)
            {
                LOG.Write(ex, $"{ModuleName.System.ToString()}写入数据库发生异常");
                hasError = true;
            }
        }

        _swPersisCostMsStat.Stop();
        _statPersistCostMs = (int)_swPersisCostMsStat.Elapsed.TotalMilliseconds;

        // A negative row total is treated as a failure as well, as it was before.
        if (hasError || effectedLines < 0)
        {
            _statPersistErrorCnt++;
            _statPersistLines = -1;
            effectedLines = -1;
        }
        else
        {
            _statPersistCnt++;
            _statPersistLines = effectedLines;
        }

        // Start measuring the interval until the next persist operation.
        _swPersisInvMsStat.Restart();
        return effectedLines;
    }
}
/// <summary>
/// Requests cancellation of the token polled by the cache thread and the persist thread.
/// </summary>
public void Terminate() => _ctsDataRecorderThread.Cancel();
/// <summary>
/// Makes sure the database table has a column for every data buffer: adds the missing columns
/// when the table already has columns, otherwise creates the table.
/// </summary>
/// <param name="tblName">Name of the table to update or create.</param>
/// <param name="holders">Data buffers whose names become column names.</param>
/// <returns>
/// true when the schema is up to date; false when the existing columns could not be queried or
/// when the ALTER/CREATE statement failed.
/// </returns>
private bool UpdateTableSchema(string tblName, IEnumerable<IDataBuffer> holders)
{
    // Maps a value type to its SQL column type; null when the type is not recordable.
    static string ToSqlColumnType(Type valueType)
    {
        if (valueType == typeof(bool))
            return "Boolean";
        return DataTraceHelper.SupportedValueTypes.Contains(valueType) ? "Real" : null;
    }

    var cmdText = $"select column_name from information_schema.columns where table_name = '{tblName}';";
    var reader = DB.ExecuteReader(cmdText);
    if (reader == null)
    {
        LOG.Error($"Unable to query the columns of table {tblName}.");
        return false;
    }

    // A set gives constant-time lookups when checking each holder below.
    var existedColumns = new HashSet<string>();
    try
    {
        while (reader.Read())
        {
            for (var i = 0; i < reader.FieldCount; i++)
                existedColumns.Add(reader[i].ToString());
        }
    }
    finally
    {
        // Release the reader even when reading fails.
        reader.Close();
    }

    if (existedColumns.Count > 0)
    {
        // The table exists: add a column for every holder that has none yet.
        var alterStatements = new List<string>();
        foreach (var holder in holders)
        {
            var colName = holder.Name;

            // Add() returns false for a column that exists already (or was just scheduled).
            if (!existedColumns.Add(colName))
                continue;

            var sqlType = ToSqlColumnType(holder.ValueType);
            if (sqlType != null)
                alterStatements.Add($"ALTER TABLE \"{tblName}\" ADD COLUMN \"{colName}\" {sqlType};");
        }

        if (alterStatements.Count == 0)
            return true;

        try
        {
            DB.ExecuteNonQuery(string.Concat(alterStatements));
        }
        catch (Exception ex)
        {
            LOG.Write(ex);
            return false;
        }

        return true;
    }

    // No columns found: create the table with one column per recordable holder.
    var columnDefinitions = new List<string>();
    foreach (var dataItem in holders)
    {
        var sqlType = ToSqlColumnType(dataItem.Read().GetType());
        if (sqlType != null)
            columnDefinitions.Add($"\"{dataItem.Name}\" {sqlType},\n");
    }

    var sqlExprCreateTable =
        $"CREATE TABLE \"{tblName}\"(Time bigint NOT NULL," +
        string.Concat(columnDefinitions) +
        $"CONSTRAINT \"{tblName}_pkey\" PRIMARY KEY (Time));" +
        $"GRANT SELECT ON TABLE \"{tblName}\" TO postgres;";

    try
    {
        DB.ExecuteNonQuery(sqlExprCreateTable);
    }
    catch (Exception ex2)
    {
        LOG.Write(ex2, "创建数据库表格失败");
        return false;
    }

    return true;
}
#endregion
}
}