Sic.Framework-Nanjing-Baishi/MECF.Framework.Common/Aitex/Core/RT/DataCollection/HighPerformance/DataRecorderManager.cs

372 lines
13 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Aitex.Core.RT.DataCenter;
using Aitex.Core.RT.DBCore;
using Aitex.Core.RT.Event;
using Aitex.Core.RT.Log;
using Aitex.Core.RT.SCCore;
using Aitex.Core.Util;
using MECF.Framework.Common.Equipment;
namespace Aitex.Core.RT.DataCollection.HighPerformance
{
public class DataRecorderManager : Singleton<DataRecorderManager>
{
#region Variables
/// <summary>
/// 当前记录器支持的数据类型。
/// </summary>
private readonly Type[] _supportedDataTypes =
{ typeof(bool), typeof(double), typeof(float), typeof(int), typeof(ushort), typeof(short) };
private IDataBase _db;
private readonly object _lock = new();
private readonly int _dataRecorderInterval;
private readonly int _dataRecorderCacheSize;
private readonly int _dataRecorderCachePeriodMs;
private string[] _modules = { "Data" };
private readonly Dictionary<string, Func<object>> _subscribedRecordedDataGetter = new();
private Dictionary<string, DataRecorderCache> _dictCachePerModule;
private DateTime _dateOfTableName;
private IDataRecorderCallback _callback;
#endregion
#region Constructors
public DataRecorderManager()
{
_dataRecorderInterval = (SC.ContainsItem("System.DataRecorderInterval")
? SC.GetValue<int>("System.DataCollectionInterval")
: 1000);
_dataRecorderCacheSize = (SC.ContainsItem("System.DataRecorderCacheSize")
? SC.GetValue<int>("System.DataRecorderCacheSize")
: DataRecorderCache.MIN_CACHE_SIZE);
_dataRecorderCachePeriodMs = (SC.ContainsItem("System.DataRecorderCachePeriod")
? SC.GetValue<int>("System.DataRecorderCachePeriod")
: DataRecorderCache.MIN_CACHE_PERIOD_MS);
}
#endregion
#region Methods
public void Initialize(IDataRecorderCallback callback)
{
IDataRecorderCallback callback2;
if (callback != null)
{
callback2 = callback;
}
else
{
IDataRecorderCallback dataCollectionCallback = new DefaultDataRecorderCallback();
callback2 = dataCollectionCallback;
}
_callback = callback2;
Task.Factory.StartNew(DataRecorderThread);
}
public void Initialize(string[] modules, string dbName)
{
_callback = new DefaultDataRecorderCallback(dbName);
if (modules != null && modules.Length != 0 && !string.IsNullOrEmpty(modules[0]))
{
_modules = modules;
}
Task.Factory.StartNew(DataRecorderThread);
}
public void Initialize(string dbName)
{
_callback = new DefaultDataRecorderCallback(dbName);
Task.Factory.StartNew(DataRecorderThread);
}
/// <summary>
/// 立即缓存所有模组的数据。
/// </summary>
public void ImmediateCache()
{
DoCache();
}
private void DataRecorderThread()
{
while (true)
{
Thread.Sleep(_dataRecorderInterval);
try
{
CreateCache();
// 开始周期性数据采集
var lastFlushTime = DateTime.Now;
while (true)
{
Thread.Sleep((int)(_dataRecorderInterval * 0.99));
// 跨天,退出循环并创建新表
if (DateTime.Now.Date != _dateOfTableName)
break;
// 立即缓存一次数据
DoCache();
// 每隔5秒钟将缓存写入数据库
if (lastFlushTime.AddSeconds(5) < DateTime.Now)
{
var ret = Persist();
lastFlushTime = DateTime.Now;
if (ret < 0) // 插入数据错误,退出
break;
}
}
}
catch (Exception ex)
{
LOG.Write(ex, "数据库操作记录发生异常");
}
}
}
/// <summary>
/// 扫描<see cref="DataManager"/>,获取所有可记录的数据。
/// </summary>
/// <remarks>
/// 仅数据管理器中注册的数据,且类型为<see cref="bool"/>、<see cref="double"/>、<see cref="float"/>、<see cref="int"/>、<see cref="ushort"/>、<see cref="short"/>的数据支持保存到数据库。
/// </remarks>>
private void GetRecordableDataSource()
{
var dBRecorderList = DATA.GetDBRecorderList();
foreach (var item in dBRecorderList)
{
if (_subscribedRecordedDataGetter.ContainsKey(item.Key))
{
continue;
}
var obj = item.Value();
if (obj != null)
{
var type = obj.GetType();
if (type == typeof(bool) || type == typeof(double) || type == typeof(float) ||
type == typeof(int) || type == typeof(ushort) || type == typeof(short))
{
_subscribedRecordedDataGetter[item.Key] = item.Value;
}
}
}
}
/// <summary>
/// 创建数据缓存对象。
/// </summary>
private void CreateCache()
{
lock (_lock)
{
GetRecordableDataSource();
_dictCachePerModule = new();
Dictionary<string, List<DataHolder>> dictModules = new();
var defaultModuleName = (_modules.Contains(ModuleName.System.ToString())
? ModuleName.System.ToString()
: (_modules.Contains("Data") ? "Data" : _modules[0]));
if (_subscribedRecordedDataGetter is { Count: > 0 })
{
foreach (var dataName in _subscribedRecordedDataGetter.Keys)
{
var isAvailableModuleName = false;
foreach (var module in _modules)
{
if (dataName.StartsWith(module + ".") ||
dataName.StartsWith("IO." + module + "."))
{
if (!dictModules.ContainsKey(module))
dictModules[module] = new List<DataHolder>();
dictModules[module].Add(new DataHolder(dictModules[module].Count,
dataName, _subscribedRecordedDataGetter[dataName]));
isAvailableModuleName = true;
break;
}
}
if (!isAvailableModuleName)
{
if (!dictModules.ContainsKey(defaultModuleName))
dictModules[defaultModuleName] = new List<DataHolder>();
dictModules[defaultModuleName].Add(new DataHolder(
dictModules[defaultModuleName].Count,
dataName, _subscribedRecordedDataGetter[dataName]));
}
}
}
_dateOfTableName = DateTime.Now.Date;
foreach (var module in dictModules.Keys)
{
var dhs = dictModules[module];
var tableName = $"{_dateOfTableName:yyyyMMdd}.{module}";
_dictCachePerModule[module] =
new DataRecorderCache(tableName, module, dhs, minCachePeriodMs: _dataRecorderCachePeriodMs,
maxCacheSize: _dataRecorderCacheSize);
UpdateTableSchema(tableName, dhs);
}
}
}
/// <summary>
/// 立即缓存所有模组的数据。
/// </summary>
/// <returns></returns>
private void DoCache()
{
lock (_lock)
{
// DO调用ForceCache方法可能发生在当前对象初始化之前此时_dictCachePerModule为null
if (_dictCachePerModule == null)
return;
// 立即缓存一次数据
foreach (var drc in _dictCachePerModule.Values)
drc.Cache();
}
}
/// <summary>
/// 缓存写入数据库。
/// </summary>
/// <returns>插入数据库的行数。-1 表示写入数据库错误。</returns>
public int Persist()
{
var num = 0;
lock (_lock)
{
foreach (var dcc in _dictCachePerModule.Values)
{
var sql = dcc.GetInsertSql();
if (!string.IsNullOrEmpty(sql))
{
try
{
num = DB.ExecuteNonQuery(sql);
}
catch (Exception ex)
{
LOG.Write(ex, $"{ModuleName.System.ToString()}写入数据库发生异常");
num = -1;
}
}
}
}
return num;
}
private bool UpdateTableSchema(string tblName, IEnumerable<DataHolder> dataCollectors)
{
var cmdText = $"select column_name from information_schema.columns where table_name = '{tblName}';";
var reader = DB.ExecuteReader(cmdText);
var sqlExprAddColumn = string.Empty;
var existedColumns = new List<string>();
while (reader.Read())
{
for (var i = 0; i < reader.FieldCount; i++)
{
existedColumns.Add(reader[i].ToString());
}
}
reader.Close();
if (existedColumns.Count > 0)
{
foreach (var dataItem in dataCollectors)
{
var colName = dataItem.Name;
if (!existedColumns.Contains(colName))
{
var type = dataItem.Read().GetType();
if (type == typeof(bool))
{
sqlExprAddColumn += $"ALTER TABLE \"{tblName}\" ADD COLUMN \"{colName}\" {"Boolean"};";
}
else if (_supportedDataTypes.Contains(type))
{
sqlExprAddColumn += $"ALTER TABLE \"{tblName}\" ADD COLUMN \"{colName}\" {"Real"};";
}
}
}
if (!string.IsNullOrEmpty(sqlExprAddColumn))
{
try
{
DB.ExecuteNonQuery(sqlExprAddColumn);
}
catch (Exception ex)
{
LOG.Write(ex);
return false;
}
}
}
else
{
var sqlExprCreateTable = $"CREATE TABLE \"{tblName}\"(Time bigint NOT NULL,";
foreach (var dataItem in dataCollectors)
{
var colName = dataItem.Name;
var dataType = dataItem.Read().GetType();
if (dataType == typeof(bool))
{
sqlExprCreateTable += $"\"{colName}\" Boolean,\n";
}
else if (_supportedDataTypes.Contains(dataType))
{
sqlExprCreateTable += $"\"{colName}\" Real,\n";
}
}
sqlExprCreateTable += $"CONSTRAINT \"{tblName}_pkey\" PRIMARY KEY (Time));";
sqlExprCreateTable += $"GRANT SELECT ON TABLE \"{tblName}\" TO postgres;";
try
{
DB.ExecuteNonQuery(sqlExprCreateTable.ToString());
}
catch (Exception ex2)
{
LOG.Write(ex2, "创建数据库表格失败");
return false;
}
}
return true;
}
}
#endregion
}