Sic.Framework-Nanjing-Baishi/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderManager.cs

362 lines
13 KiB
C#
Raw Normal View History

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Aitex.Core.RT.DataCenter;
using Aitex.Core.RT.DBCore;
using Aitex.Core.RT.Log;
using Aitex.Core.RT.SCCore;
using Aitex.Core.Util;
using MECF.Framework.Common.Equipment;
namespace Aitex.Core.RT.DataCollection.HighPerformance
{
/// <summary>
/// Periodically samples the values registered for database recording, caches them per
/// module, and flushes the caches into per-day, per-module database tables.
/// </summary>
public class DataRecorderManager : Singleton<DataRecorderManager>
{
    #region Variables

    /// <summary>
    /// Number of seconds between two flushes of the caches to the database.
    /// </summary>
    private const int PersistIntervalSeconds = 5;

    /// <summary>
    /// Data types currently supported by the recorder.
    /// </summary>
    private readonly Type[] _supportedDataTypes =
        { typeof(bool), typeof(double), typeof(float), typeof(int), typeof(ushort), typeof(short) };

    /// <summary>
    /// Guards <see cref="_dictCachePerModule"/> and the subscription dictionary.
    /// </summary>
    private readonly object _lock = new();

    private readonly int _dataRecorderInterval;
    private readonly int _dataRecorderCacheSize;
    private readonly int _dataRecorderCachePeriodMs;

    /// <summary>
    /// Category (module) names; each category with data gets its own table.
    /// </summary>
    private string[] _dataTableCategory = { ModuleName.System.ToString() };

    /// <summary>
    /// Getters of all data items selected for recording, keyed by data name.
    /// </summary>
    private readonly Dictionary<string, Func<object>> _subscribedRecordedDataGetter = new();

    private readonly CancellationTokenSource _ctsDataRecorderThread;
    private readonly CancellationToken _ctDataRecorderThread;

    /// <summary>
    /// Cache per category; <c>null</c> until <see cref="CreateCache"/> has run once.
    /// </summary>
    private Dictionary<string, DataRecorderCache> _dictCachePerModule;

    /// <summary>
    /// Date used in the names of the tables currently written to.
    /// </summary>
    private DateTime _dateOfTableName;

    private IDataRecorderCallback _callback;

    #endregion

    #region Constructors

    /// <summary>
    /// Reads the recorder settings from the system configuration, falling back to defaults
    /// for items that are not configured.
    /// </summary>
    public DataRecorderManager()
    {
        // Read the same key whose presence was checked; the previous code checked
        // "System.DataRecorderInterval" but then read "System.DataCollectionInterval".
        _dataRecorderInterval = SC.ContainsItem("System.DataRecorderInterval")
            ? SC.GetValue<int>("System.DataRecorderInterval")
            : 1000;

        _dataRecorderCacheSize = SC.ContainsItem("System.DataRecorderCacheSize")
            ? SC.GetValue<int>("System.DataRecorderCacheSize")
            : DataRecorderCache.MIN_CACHE_SIZE;

        _dataRecorderCachePeriodMs = SC.ContainsItem("System.DataRecorderCachePeriod")
            ? SC.GetValue<int>("System.DataRecorderCachePeriod")
            : DataRecorderCache.MIN_CACHE_PERIOD_MS;

        _ctsDataRecorderThread = new();
        _ctDataRecorderThread = _ctsDataRecorderThread.Token;
    }

    #endregion

    #region Methods

    /// <summary>
    /// Sets the table categories and starts the background recording loop.
    /// </summary>
    /// <param name="dataTableCategory">
    /// Category names used to group data into tables. A <c>null</c> or empty array is
    /// ignored and the current (default) categories are kept.
    /// </param>
    public void Initialize(string[] dataTableCategory)
    {
        // An empty category list would make the default-category lookup in CreateCache fail.
        if (dataTableCategory is { Length: > 0 })
        {
            _dataTableCategory = dataTableCategory;
        }

        // The loop blocks for the lifetime of the application, so it is started as a
        // long-running task instead of occupying a thread-pool worker.
        Task.Factory.StartNew(
            DataRecorderThread,
            _ctDataRecorderThread,
            TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Immediately caches the data of all modules.
    /// </summary>
    public void ImmediateCache()
    {
        DoCache();
    }

    /// <summary>
    /// Body of the recording loop: (re)creates the caches, samples data every interval and
    /// flushes it to the database every <see cref="PersistIntervalSeconds"/> seconds. The
    /// caches are rebuilt when the date changes or when a flush reports an error.
    /// </summary>
    private void DataRecorderThread()
    {
        try
        {
            while (true)
            {
                WaitOrThrowIfCancelled(_dataRecorderInterval);
                CreateCache();

                // Start periodic data acquisition.
                var lastFlushTime = DateTime.Now;
                while (true)
                {
                    // Wait slightly less than the interval, presumably to compensate for
                    // the time spent in the loop body.
                    WaitOrThrowIfCancelled((int)(_dataRecorderInterval * 0.99));

                    // The day changed: leave the loop so that new tables are created.
                    if (DateTime.Now.Date != _dateOfTableName)
                    {
                        // Flush what is still cached for the old day; otherwise the samples
                        // collected since the last flush are dropped with the old caches.
                        Persist();
                        break;
                    }

                    // Cache one sample immediately.
                    DoCache();

                    // Write the caches to the database every few seconds.
                    if (lastFlushTime.AddSeconds(PersistIntervalSeconds) < DateTime.Now)
                    {
                        var ret = Persist();
                        lastFlushTime = DateTime.Now;
                        if (ret < 0) // Insert failed: leave the loop and rebuild the caches.
                        {
                            break;
                        }
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            LOG.Info($"{nameof(DataRecorderManager)}.{nameof(DataRecorderThread)} terminated.");
        }
        catch (Exception ex)
        {
            LOG.Write(ex, $"{nameof(DataRecorderManager)}.{nameof(DataRecorderThread)}线程意外终止,{ex.Message}");
        }
    }

    /// <summary>
    /// Waits for the given time or until cancellation is requested, whichever comes first,
    /// and throws <see cref="OperationCanceledException"/> if cancellation was requested.
    /// </summary>
    /// <param name="milliseconds">Maximum time to wait, in milliseconds.</param>
    private void WaitOrThrowIfCancelled(int milliseconds)
    {
        // Waiting on the token's handle (instead of Thread.Sleep) lets Terminate() end the
        // loop immediately rather than after the remaining sleep time.
        _ctDataRecorderThread.WaitHandle.WaitOne(milliseconds);
        _ctDataRecorderThread.ThrowIfCancellationRequested();
    }

    /// <summary>
    /// Scans <see cref="DATA"/> for recordable data and subscribes the items not yet known.
    /// </summary>
    /// <remarks>
    /// Only items whose current value is non-null and of one of the
    /// <see cref="_supportedDataTypes"/> (<see cref="bool"/>, <see cref="double"/>,
    /// <see cref="float"/>, <see cref="int"/>, <see cref="ushort"/>, <see cref="short"/>)
    /// are subscribed.
    /// </remarks>
    private void GetRecordableDataSource()
    {
        foreach (var item in DATA.GetDBRecorderList())
        {
            if (_subscribedRecordedDataGetter.ContainsKey(item.Key))
            {
                continue;
            }

            var value = item.Value();
            if (value is not null && _supportedDataTypes.Contains(value.GetType()))
            {
                _subscribedRecordedDataGetter[item.Key] = item.Value;
            }
        }
    }

    /// <summary>
    /// Creates the data cache objects: groups all subscribed data by category, creates one
    /// cache per category for today's table and brings the table schema up to date.
    /// </summary>
    private void CreateCache()
    {
        lock (_lock)
        {
            GetRecordableDataSource();

            // Data that matches no category goes to "System", else "Data", else the first category.
            var systemName = ModuleName.System.ToString();
            var defaultModuleName = _dataTableCategory.Contains(systemName)
                ? systemName
                : (_dataTableCategory.Contains("Data") ? "Data" : _dataTableCategory[0]);

            var holdersPerCategory = new Dictionary<string, List<DataHolder>>();
            foreach (var subscription in _subscribedRecordedDataGetter)
            {
                var dataName = subscription.Key;

                // Data names are identifiers, so compare ordinally rather than by culture.
                var category = _dataTableCategory.FirstOrDefault(c =>
                                   dataName.StartsWith(c + ".", StringComparison.Ordinal) ||
                                   dataName.StartsWith("IO." + c + ".", StringComparison.Ordinal))
                               ?? defaultModuleName;

                if (!holdersPerCategory.TryGetValue(category, out var holders))
                {
                    holders = new List<DataHolder>();
                    holdersPerCategory[category] = holders;
                }

                // The first argument is the holder's position within its category.
                holders.Add(new DataHolder(holders.Count, dataName, subscription.Value));
            }

            var today = DateTime.Now.Date;
            var caches = new Dictionary<string, DataRecorderCache>();
            foreach (var entry in holdersPerCategory)
            {
                var tableName = $"{today:yyyyMMdd}.{entry.Key}";
                caches[entry.Key] = new DataRecorderCache(tableName, entry.Key, entry.Value,
                    minCachePeriodMs: _dataRecorderCachePeriodMs,
                    maxCacheSize: _dataRecorderCacheSize);
                UpdateTableSchema(tableName, entry.Value);
            }

            _dateOfTableName = today;
            _dictCachePerModule = caches;
        }
    }

    /// <summary>
    /// Immediately caches the data of all modules.
    /// </summary>
    private void DoCache()
    {
        lock (_lock)
        {
            // A caller may force caching before this object has created its caches;
            // _dictCachePerModule is still null in that case.
            if (_dictCachePerModule is null)
            {
                return;
            }

            foreach (var cache in _dictCachePerModule.Values)
            {
                cache.Cache();
            }
        }
    }

    /// <summary>
    /// Writes the caches to the database.
    /// </summary>
    /// <returns>
    /// The total number of rows inserted over all modules; 0 when the caches have not been
    /// created yet; -1 when writing any module failed.
    /// </returns>
    public int Persist()
    {
        lock (_lock)
        {
            // Persist is public and may be called before the first CreateCache.
            if (_dictCachePerModule is null)
            {
                return 0;
            }

            var totalRows = 0;
            var failed = false;
            foreach (var cache in _dictCachePerModule.Values)
            {
                var sql = cache.GetInsertSql();
                if (string.IsNullOrEmpty(sql))
                {
                    continue;
                }

                try
                {
                    int rows = DB.ExecuteNonQuery(sql);
                    if (rows < 0)
                    {
                        failed = true;
                    }
                    else
                    {
                        totalRows += rows;
                    }
                }
                catch (Exception ex)
                {
                    LOG.Write(ex, $"{ModuleName.System.ToString()}写入数据库发生异常");
                    failed = true;
                }
            }

            // A failure must not be masked by a later module that was written successfully.
            return failed ? -1 : totalRows;
        }
    }

    /// <summary>
    /// Requests the recording loop to stop.
    /// </summary>
    public void Terminate()
    {
        _ctsDataRecorderThread.Cancel();
    }

    /// <summary>
    /// Creates the table if it has no columns yet, otherwise adds the columns that are missing.
    /// </summary>
    /// <param name="tblName">Name of the table.</param>
    /// <param name="dataCollectors">Data items that need a column in the table.</param>
    /// <returns><c>true</c> on success; <c>false</c> when a database command failed.</returns>
    private bool UpdateTableSchema(string tblName, IEnumerable<DataHolder> dataCollectors)
    {
        HashSet<string> existedColumns;
        try
        {
            existedColumns = ReadExistingColumns(tblName);
        }
        catch (Exception ex)
        {
            // Report the failure through the return value like the other database errors
            // here, instead of letting it escape into the recording loop.
            LOG.Write(ex, $"Failed to query the existing columns of table {tblName}");
            return false;
        }

        // Column definitions ("name" type) for every item that has no column yet.
        var columnDefinitions = new List<string>();
        foreach (var dataItem in dataCollectors)
        {
            var colName = dataItem.Name;
            if (existedColumns.Contains(colName))
            {
                continue;
            }

            var sqlType = GetColumnSqlType(dataItem);
            if (sqlType is not null)
            {
                columnDefinitions.Add($"\"{colName}\" {sqlType}");
            }
        }

        if (existedColumns.Count > 0)
        {
            // The table exists: add the missing columns, if any.
            if (columnDefinitions.Count == 0)
            {
                return true;
            }

            var sqlExprAddColumn = string.Concat(
                columnDefinitions.Select(def => $"ALTER TABLE \"{tblName}\" ADD COLUMN {def};"));
            try
            {
                DB.ExecuteNonQuery(sqlExprAddColumn);
            }
            catch (Exception ex)
            {
                LOG.Write(ex);
                return false;
            }
        }
        else
        {
            // No columns found: create the table with a Time primary key.
            var sqlExprCreateTable =
                $"CREATE TABLE \"{tblName}\"(Time bigint NOT NULL," +
                string.Concat(columnDefinitions.Select(def => def + ",\n")) +
                $"CONSTRAINT \"{tblName}_pkey\" PRIMARY KEY (Time));" +
                $"GRANT SELECT ON TABLE \"{tblName}\" TO postgres;";
            try
            {
                DB.ExecuteNonQuery(sqlExprCreateTable);
            }
            catch (Exception ex2)
            {
                LOG.Write(ex2, "创建数据库表格失败");
                return false;
            }
        }

        return true;
    }

    /// <summary>
    /// Reads the names of the columns of the given table from information_schema.
    /// </summary>
    /// <param name="tblName">Name of the table.</param>
    /// <returns>The column names; empty when no column was found.</returns>
    private static HashSet<string> ReadExistingColumns(string tblName)
    {
        var columns = new HashSet<string>();
        var cmdText = $"select column_name from information_schema.columns where table_name = '{tblName}';";
        var reader = DB.ExecuteReader(cmdText);
        try
        {
            while (reader.Read())
            {
                for (var i = 0; i < reader.FieldCount; i++)
                {
                    columns.Add(reader[i].ToString());
                }
            }
        }
        finally
        {
            // Close the reader even when reading fails part-way.
            reader.Close();
        }

        return columns;
    }

    /// <summary>
    /// Maps the current value of a data item to the SQL column type used for it.
    /// </summary>
    /// <param name="dataItem">The data item to inspect.</param>
    /// <returns>
    /// "Boolean" for <see cref="bool"/>, "Real" for the other supported types, or <c>null</c>
    /// when the current value is null or of an unsupported type.
    /// </returns>
    private string GetColumnSqlType(DataHolder dataItem)
    {
        object value = dataItem.Read();
        if (value == null)
        {
            // No type can be derived from a null value; the column is skipped this time.
            return null;
        }

        var type = value.GetType();
        if (type == typeof(bool))
        {
            return "Boolean";
        }

        return _supportedDataTypes.Contains(type) ? "Real" : null;
    }

    #endregion
}
}