增加DataRecorderManager及其相关对象。
This commit is contained in:
SL 2023-10-08 21:18:59 +08:00
parent 0b643ac5c9
commit 5521325263
9 changed files with 1018 additions and 7 deletions

View File

@ -5,9 +5,6 @@ using System.Collections.Generic;
using System.Configuration;
using System.Data.Common;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Aitex.Core.RT.DBCore
{
@ -16,18 +13,31 @@ namespace Aitex.Core.RT.DBCore
/// <summary>
/// 数据库名称
/// </summary>
public string DBName;
private readonly string _dbName;
/// <summary>
/// 连接字符串
/// </summary>
private string _connString = ConfigurationManager.ConnectionStrings["PostgreSQL"].ConnectionString;
private readonly string _connString;
#region Constructors
public PostgreSQLHelper(string dBName)
{
DBName = dBName;
_dbName = dBName;
_connString = ConfigurationManager.ConnectionStrings["PostgreSQL"].ConnectionString;
}
#endregion
#region Properties
public string DBName => _dbName;
public string ConnectionString => _connString;
#endregion
public string GetSqlByNameType(string name, Type type)
{
string sql = string.Empty;

View File

@ -0,0 +1,99 @@
using Aitex.Core.RT.Log;
using Aitex.Core.Util;
using System;
using System.Collections.Generic;
namespace Aitex.Core.RT.DataCollection.HighPerformance
{
/// <summary>
/// 数据收集器对象,用于收集在<see cref="DataManager"/>中注册的数据源的数据。
/// </summary>
public class DataHolder
{
#region Variables
private readonly Dictionary<long, object> _buffer = new ();
private readonly R_TRIG _rTrigReadFailed = new ();
#endregion
#region Constructors
public DataHolder(int index, string name, Func<object> read)
{
Index = index;
Name = name;
Read = read;
}
#endregion
#region Properties
/// <summary>
/// 返回数据获取器的序号。
/// </summary>
public int Index { get; }
/// <summary>
/// 返回数据获取器名称。
/// </summary>
public string Name { get; }
/// <summary>
/// 返回数据获取器。
/// </summary>
internal Func<object> Read{ get; }
/// <summary>
/// 返回缓存数据的长度。
/// </summary>
internal int CacheCount => _buffer.Count;
#endregion
#region Methods
/// <summary>
/// 缓存数据。
/// </summary>
/// <param name="timestamp"></param>
internal void Cache(long timestamp)
{
try
{
_buffer[timestamp] = Read();
_rTrigReadFailed.CLK = false;
}
catch (Exception ex)
{
// 首次发生错误时记录日志。
_rTrigReadFailed.CLK = true;
if(_rTrigReadFailed.Q)
LOG.Error($"数据获取器 {Name} 在 {timestamp} 时发生错误。", ex);
_buffer[timestamp] = null;
}
}
/// <summary>
/// 获取指定时间戳的数据。
/// </summary>
/// <param name="timestamp">时间戳。</param>
/// <returns></returns>
internal object Get(long timestamp)
{
if (_buffer.TryGetValue(timestamp, out var value))
{
_buffer.Remove(timestamp);
return value;
}
else
{
return null;
}
}
#endregion
}
}

View File

@ -0,0 +1,304 @@
using Aitex.Core.RT.Event;
using Aitex.Core.RT.Log;
using Aitex.Core.Util;
using MECF.Framework.Common.Equipment;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Aitex.Core.RT.DataCollection.HighPerformance
{
/// <summary>
/// 数据收集器缓存对象。
/// </summary>
/// <remarks>
/// 每个Table对应一个DataRecordCache对象。该对象负责缓存数据而不提供任何写数据库相关操作。
/// <see cref="DataRecorderManager"/>周期性调用<see cref="GetInsertSql"/>方法获取数据库写入表达式进行持久化操作。
/// </remarks>
internal class DataRecorderCache : IDataRecorderCache
{
#region Variables
internal const int SQL_BUILD_DURATION_TOO_SLOW_MS = 200;
internal const int MIN_CACHE_SIZE = 1000;
internal const int MIN_CACHE_PERIOD_MS = 10;
internal const int MIN_PERSIST_ITEMS = 100;
private const int MAX_SIZE_SQL_EXPR = 5000000; // approx. 5MB
private readonly object _syncRoot = new();
private readonly int _maxItemsPerPersist;
private readonly int _minCachePeriodMs;
private readonly int _maxCacheSize;
private readonly string _tableName;
private readonly string _insertExpression;
private readonly Queue<long> _qCachedTimestamps;
private readonly Stopwatch _stopwatch = new();
private readonly StringBuilder _sqlExpr = new (MAX_SIZE_SQL_EXPR); // preallocate 5MB
private readonly StringBuilder _sqlValuesList = new (MAX_SIZE_SQL_EXPR); // preallocate 5MB
private readonly R_TRIG _rTrigCacheHalfFull = new ();
private readonly R_TRIG _rTrigCacheAlmostFull = new ();
private readonly R_TRIG _rTrigCacheFull = new ();
private readonly R_TRIG _rTrigBuildSqlExprTooSlow = new ();
private int _lastPersistRows = 0;
private double _lastCachePeriodMs = 0;
#endregion
#region Constructors
/// <summary>
/// 创建数据记录器缓存对象。
/// </summary>
/// <param name="tableName">当前Cache对应的数据库表名称。</param>
/// <param name="module">当前Cache对应的分组名称。</param>
/// <param name="dataHolders">数据获取器对象集合保存当前Cache需要缓存的数据源。</param>
/// <param name="minCachePeriodMs">最小缓存周期,如果外部频繁调用<see cref="Cache"/>方法请求缓存数据,该周期内的重复请求被忽略,以避免高频缓存导致性能下降。</param>
/// <param name="maxItemsPerPersist">每次持久化的最大行数,避免一次性对数据库写入太多数据造成性能问题。</param>
/// <param name="maxCacheSize">数据缓存的最大项目数。</param>
/// <exception cref="ArgumentException"></exception>
public DataRecorderCache(string tableName, string module, IReadOnlyList<DataHolder> dataHolders, int minCachePeriodMs = 50, int maxItemsPerPersist = 2000, int maxCacheSize = 100)
{
Debug.Assert(dataHolders is { Count: > 0 }, "Incorrect Data Holders List.");
Module = module;
DataHolders = dataHolders;
_tableName = tableName;
_maxItemsPerPersist = maxItemsPerPersist < MIN_PERSIST_ITEMS ? MIN_PERSIST_ITEMS : maxItemsPerPersist;
_minCachePeriodMs = minCachePeriodMs < MIN_CACHE_PERIOD_MS ? MIN_CACHE_PERIOD_MS : minCachePeriodMs;
_maxCacheSize = maxCacheSize < MIN_CACHE_SIZE ? MIN_CACHE_SIZE : maxCacheSize;
_qCachedTimestamps = new Queue<long>(maxCacheSize);
// 必须传入有效的数据收集器列表。
if (dataHolders == null || dataHolders.Count == 0)
throw new ArgumentException("数据保持器列表不能为空。", nameof(dataHolders));
// 检查数据收集器列表中是否存在重复的序号。
if (dataHolders.GroupBy(dc => dc.Index).Any(g => g.Count() > 1))
throw new ArgumentException("数据保持器列表中存在重复的序号。", nameof(dataHolders));
// 检查数据收集器列表中是否存在重复的名称。
if (dataHolders.GroupBy(dc => dc.Name).Any(g => g.Count() > 1))
throw new ArgumentException("数据保持器列表中存在重复的名称。", nameof(dataHolders));
// 检查数据收集器列表中是否存在空的获取器。
if (dataHolders.Any(dc => dc.Read == null))
throw new ArgumentException("数据保持器列表中存在空的保持器。", nameof(dataHolders));
_insertExpression = BuildInsertExpression();
}
#endregion
#region Properties
/// <summary>
/// 返回当前缓存服务的模组名称。
/// </summary>
public string Module { get; }
/// <summary>
/// 返回数据获取器列表。
/// </summary>
public IReadOnlyList<DataHolder> DataHolders { get; }
#endregion
#region Methods
/// <summary>
/// 创建Insert语句。
/// </summary>
/// <returns></returns>
private string BuildInsertExpression()
{
var sqlExpr = new StringBuilder();
lock (_syncRoot)
{
sqlExpr.Append($"INSERT INTO \"{_tableName}\"(\"time\" ");
foreach (var holder in DataHolders)
{
sqlExpr.Append($",\"{holder.Name}\"");
}
sqlExpr.Append(")");
}
return sqlExpr.ToString();
}
/// <summary>
/// 立即缓存当前数据。
/// </summary>
public void Cache()
{
lock (_syncRoot)
{
#region Performance Monitor
// 限制最大Cache频率
if(_stopwatch.IsRunning && _stopwatch.ElapsedMilliseconds < _minCachePeriodMs)
return;
//TODO 需要限制Q的最大数据量并且检测HalfFull和Full事件并触发警告可能PC性能出现问题
if (_qCachedTimestamps.Count >= _maxCacheSize)
{
// 缓存已满,不再缓存数据
_rTrigCacheFull.CLK = true;
if(_rTrigCacheFull.Q)
EV.PostWarningLog(ModuleName.System.ToString(), $"[{Module}]DataRecorderCache Full, Capacity {_maxCacheSize} items");
return;
}
if (_qCachedTimestamps.Count >= _maxCacheSize / 2)
{
// 缓存的数据量已经超过一半,触发警告
_rTrigCacheHalfFull.CLK = true;
if(_rTrigCacheHalfFull.Q)
EV.PostWarningLog(ModuleName.System.ToString(), $"[{Module}]DataRecorderCache Half full, Capacity {_maxCacheSize} items");
}
else if (_qCachedTimestamps.Count >= _maxCacheSize * 0.9)
{
// 缓存的数据量已经超过90%,触发警告
_rTrigCacheAlmostFull.CLK = true;
if(_rTrigCacheAlmostFull.Q)
EV.PostWarningLog(ModuleName.System.ToString(), $"[{Module}]DataRecorderCache Almost full, Usage {_qCachedTimestamps.Count} of {_maxCacheSize} items");
}
else
{
_rTrigCacheFull.CLK = false;
_rTrigCacheHalfFull.CLK = false;
_rTrigCacheAlmostFull.CLK = false;
}
#endregion
_lastCachePeriodMs = _stopwatch.ElapsedMilliseconds;
_stopwatch.Restart();
var ts = DateTime.Now.Ticks;
if (_qCachedTimestamps.Contains(ts))
{
LOG.Error($"时间戳{ts}已经存在。");
return;
}
var ret = Parallel.ForEach(DataHolders, holder => { holder.Cache(ts); });
if (ret.IsCompleted)
{
_qCachedTimestamps.Enqueue(ts);
}
}
}
/// <summary>
/// 获取将缓存数据写入数据库的SQL语句。
/// </summary>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public string GetInsertSql()
{
lock (_syncRoot)
{
// 如果没数据,直接退出
if (_qCachedTimestamps.Count == 0)
return "";
var sw = new Stopwatch();
sw.Start();
var pickedCount = 0;
_sqlValuesList.Clear();
while (true)
{
// 没有缓存的数据了,退出
if (_qCachedTimestamps.Count == 0)
break;
var ts = _qCachedTimestamps.Dequeue();
{
_sqlValuesList.Append("(");
_sqlValuesList.Append($"'{ts}',");
foreach (var holder in DataHolders)
{
var value = holder.Get(ts);
switch (value)
{
case null:
_sqlValuesList.Append("'0'");
break;
case double or float:
var dValue = Convert.ToDouble(value);
if (double.IsNaN(dValue))
dValue = 0.0;
_sqlValuesList.Append($"'{dValue.ToString(CultureInfo.InvariantCulture)}'");
break;
case bool b:
_sqlValuesList.Append($"'{(b ? 1 : 0)}'");
break;
default:
_sqlValuesList.Append($"'{value}'");
break;
}
_sqlValuesList.Append(",");
}
_sqlValuesList.Remove(_sqlValuesList.Length - 1, 1);
_sqlValuesList.Append("),");
}
pickedCount++;
if (pickedCount >= _maxItemsPerPersist)
break;
}
if (_sqlValuesList.Length <= 0)
{
_lastPersistRows = 0;
return "";
}
else
{
_lastPersistRows = pickedCount;
}
_sqlValuesList.Remove(_sqlValuesList.Length - 1, 1);
_sqlValuesList.Append(";");
// 拼接完整的SQL语句
_sqlExpr.Clear();
_sqlExpr.Append(_insertExpression);
_sqlExpr.Append(" VALUES ");
_sqlExpr.Append(_sqlValuesList);
sw.Stop();
//TODO 加入性能监视器如果执行时间过长则引发Warn事件
_rTrigBuildSqlExprTooSlow.CLK = sw.ElapsedMilliseconds > SQL_BUILD_DURATION_TOO_SLOW_MS;
if (_rTrigBuildSqlExprTooSlow.Q)
EV.PostWarningLog(ModuleName.System.ToString(), $"DataRecorderCache Build SQL Expression Too Slow, Took {sw.ElapsedMilliseconds:F1}ms greater than {SQL_BUILD_DURATION_TOO_SLOW_MS:F1}ms");
Debug.WriteLine(
$"[{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}] GenSQL:Ln{_lastPersistRows}/{sw.ElapsedMilliseconds}ms/B{_sqlExpr.Length}, LastCachePeriod:{_lastCachePeriodMs}ms, CacheRemained: {_qCachedTimestamps.Count}",
$"DIAG DBRC - {Module}");
return _sqlExpr.ToString();
}
}
#endregion
}
}

View File

@ -0,0 +1,150 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Aitex.Core.RT.DataCollection.HighPerformance
{
internal class DataRecorderDataTableCache : IDataRecorderCache
{
#region Variables
private readonly object _syncRoot = new();
private readonly int _maxRowsPerFlush;
private readonly string _tableName;
private DataTable _dataTable;
private readonly Queue<long> _qCachedTimestamps = new();
private readonly int _minCachePeriodMs;
private readonly Stopwatch _stopwatch = new();
private readonly StringBuilder _sqlExpr = new(5000000); // preallocate 5MB
private readonly StringBuilder _sqlValuesList = new(5000000); // preallocate 5MB
private int _lastPersistRows = 0;
private double _lastCachePeriodMs = 0;
#endregion
#region Constructors
/// <summary>
/// 创建数据记录器缓存对象。
/// </summary>
/// <param name="tableName">当前Cache对应的数据库表名称。</param>
/// <param name="module">当前Cache对应的分组名称。</param>
/// <param name="dataHolders">数据获取器对象集合保存当前Cache需要缓存的数据源。</param>
/// <param name="minCachePeriodMs">最小缓存周期,如果外部频繁调用<see cref="Cache"/>方法请求缓存数据,该周期内的重复请求被忽略,以避免高频缓存导致性能下降。</param>
/// <param name="maxRowsPerFlush">每次持久化的最大行数,避免一次性对数据库写入太多数据造成性能问题。</param>
/// <exception cref="ArgumentException"></exception>
public DataRecorderDataTableCache(string tableName, string module, IReadOnlyList<DataHolder> dataHolders, int minCachePeriodMs = 50, int maxRowsPerFlush = 2000)
{
Debug.Assert(dataHolders is { Count: > 0 }, "Incorrect Data Collector List.");
Module = module;
DataHolders = dataHolders;
_tableName = tableName;
_maxRowsPerFlush = maxRowsPerFlush;
_minCachePeriodMs = minCachePeriodMs;
// 必须传入有效的数据收集器列表。
if (dataHolders == null || dataHolders.Count == 0)
throw new ArgumentException("数据收集器列表不能为空。", nameof(dataHolders));
// 检查数据收集器列表中是否存在重复的序号。
if (dataHolders.GroupBy(dc => dc.Index).Any(g => g.Count() > 1))
throw new ArgumentException("数据收集器列表中存在重复的序号。", nameof(dataHolders));
// 检查数据收集器列表中是否存在重复的名称。
if (dataHolders.GroupBy(dc => dc.Name).Any(g => g.Count() > 1))
throw new ArgumentException("数据收集器列表中存在重复的名称。", nameof(dataHolders));
// 检查数据收集器列表中是否存在空的获取器。
if (dataHolders.Any(dc => dc.Read == null))
throw new ArgumentException("数据收集器列表中存在空的获取器。", nameof(dataHolders));
_dataTable = CreateDataTable();
/*DATA.Subscribe($"{ModuleName.System}.DBRC.{Module}.CachedCount", ()=> _qCachedTimestamps.Count, SubscriptionAttribute.FLAG.IgnoreSaveDB);
DATA.Subscribe($"{ModuleName.System}.DBRC.{Module}.PersistRows", ()=> _lastPersistRows, SubscriptionAttribute.FLAG.IgnoreSaveDB);
DATA.Subscribe($"{ModuleName.System}.DBRC.{Module}.PersistPeriodMs", ()=> _lastPersistPeriodMs, SubscriptionAttribute.FLAG.IgnoreSaveDB);*/
}
#endregion
#region Properties
/// <summary>
/// 返回当前缓存服务的模组名称。
/// </summary>
public string Module { get; }
/// <summary>
/// 返回数据获取器列表。
/// </summary>
public IReadOnlyList<DataHolder> DataHolders { get; }
#endregion
#region Methods
/// <summary>
/// 创建临时数据表。
/// </summary>
/// <returns></returns>
private DataTable CreateDataTable()
{
lock (_syncRoot)
{
var dt = new DataTable(_tableName);
dt.Columns.Add("time", typeof(long));
foreach (var dh in DataHolders)
{
var ret = dh.Read();
dt.Columns.Add(dh.Name, ret.GetType());
}
return dt;
}
}
/// <summary>
/// 立即缓存当前数据。
/// </summary>
public void Cache()
{
lock (_syncRoot)
{
// 限制最大Cache频率
if (_stopwatch.IsRunning && _stopwatch.ElapsedMilliseconds < _minCachePeriodMs)
return;
//TODO 需要限制Q的最大数据量并且检测HalfFull和Full事件并触发警告可能PC性能出现问题
_lastCachePeriodMs = _stopwatch.ElapsedMilliseconds;
_stopwatch.Restart();
var ts = DateTime.Now.Ticks;
var row = _dataTable.NewRow();
row["time"] = ts;
var ret = Parallel.ForEach(DataHolders, dc =>
{
var ret = dc.Read();
row[dc.Name] = ret;
});
if (ret.IsCompleted)
{
_dataTable.Rows.Add(row);
}
}
}
#endregion
}
}

View File

@ -0,0 +1,371 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Aitex.Core.RT.DataCenter;
using Aitex.Core.RT.DBCore;
using Aitex.Core.RT.Event;
using Aitex.Core.RT.Log;
using Aitex.Core.RT.SCCore;
using Aitex.Core.Util;
using MECF.Framework.Common.Equipment;
namespace Aitex.Core.RT.DataCollection.HighPerformance
{
public class DataRecorderManager : Singleton<DataRecorderManager>
{
#region Variables
/// <summary>
/// 当前记录器支持的数据类型。
/// </summary>
private readonly Type[] _supportedDataTypes =
{ typeof(bool), typeof(double), typeof(float), typeof(int), typeof(ushort), typeof(short) };
private IDataBase _db;
private readonly object _lock = new();
private readonly int _dataRecorderInterval;
private readonly int _dataRecorderCacheSize;
private readonly int _dataRecorderCachePeriodMs;
private string[] _modules = { "Data" };
private readonly Dictionary<string, Func<object>> _subscribedRecordedDataGetter = new();
private Dictionary<string, DataRecorderCache> _dictCachePerModule;
private DateTime _dateOfTableName;
private IDataRecorderCallback _callback;
#endregion
#region Constructors
public DataRecorderManager()
{
_dataRecorderInterval = (SC.ContainsItem("System.DataRecorderInterval")
? SC.GetValue<int>("System.DataCollectionInterval")
: 1000);
_dataRecorderCacheSize = (SC.ContainsItem("System.DataRecorderCacheSize")
? SC.GetValue<int>("System.DataRecorderCacheSize")
: DataRecorderCache.MIN_CACHE_SIZE);
_dataRecorderCachePeriodMs = (SC.ContainsItem("System.DataRecorderCachePeriod")
? SC.GetValue<int>("System.DataRecorderCachePeriod")
: DataRecorderCache.MIN_CACHE_PERIOD_MS);
}
#endregion
#region Methods
public void Initialize(IDataRecorderCallback callback)
{
IDataRecorderCallback callback2;
if (callback != null)
{
callback2 = callback;
}
else
{
IDataRecorderCallback dataCollectionCallback = new DefaultDataRecorderCallback();
callback2 = dataCollectionCallback;
}
_callback = callback2;
Task.Factory.StartNew(DataRecorderThread);
}
public void Initialize(string[] modules, string dbName)
{
_callback = new DefaultDataRecorderCallback(dbName);
if (modules != null && modules.Length != 0 && !string.IsNullOrEmpty(modules[0]))
{
_modules = modules;
}
Task.Factory.StartNew(DataRecorderThread);
}
public void Initialize(string dbName)
{
_callback = new DefaultDataRecorderCallback(dbName);
Task.Factory.StartNew(DataRecorderThread);
}
/// <summary>
/// 立即缓存所有模组的数据。
/// </summary>
public void ImmediateCache()
{
DoCache();
}
private void DataRecorderThread()
{
while (true)
{
Thread.Sleep(_dataRecorderInterval);
try
{
CreateCache();
// 开始周期性数据采集
var lastFlushTime = DateTime.Now;
while (true)
{
Thread.Sleep((int)(_dataRecorderInterval * 0.99));
// 跨天,退出循环并创建新表
if (DateTime.Now.Date != _dateOfTableName)
break;
// 立即缓存一次数据
DoCache();
// 每隔5秒钟将缓存写入数据库
if (lastFlushTime.AddSeconds(5) < DateTime.Now)
{
var ret = Persist();
lastFlushTime = DateTime.Now;
if (ret < 0) // 插入数据错误,退出
break;
}
}
}
catch (Exception ex)
{
LOG.Write(ex, "数据库操作记录发生异常");
}
}
}
/// <summary>
/// 扫描<see cref="DataManager"/>,获取所有可记录的数据。
/// </summary>
/// <remarks>
/// 仅数据管理器中注册的数据,且类型为<see cref="bool"/>、<see cref="double"/>、<see cref="float"/>、<see cref="int"/>、<see cref="ushort"/>、<see cref="short"/>的数据支持保存到数据库。
/// </remarks>>
private void GetRecordableDataSource()
{
var dBRecorderList = DATA.GetDBRecorderList();
foreach (var item in dBRecorderList)
{
if (_subscribedRecordedDataGetter.ContainsKey(item.Key))
{
continue;
}
var obj = item.Value();
if (obj != null)
{
var type = obj.GetType();
if (type == typeof(bool) || type == typeof(double) || type == typeof(float) ||
type == typeof(int) || type == typeof(ushort) || type == typeof(short))
{
_subscribedRecordedDataGetter[item.Key] = item.Value;
}
}
}
}
/// <summary>
/// 创建数据缓存对象。
/// </summary>
private void CreateCache()
{
lock (_lock)
{
GetRecordableDataSource();
_dictCachePerModule = new();
Dictionary<string, List<DataHolder>> dictModules = new();
var defaultModuleName = (_modules.Contains(ModuleName.System.ToString())
? ModuleName.System.ToString()
: (_modules.Contains("Data") ? "Data" : _modules[0]));
if (_subscribedRecordedDataGetter is { Count: > 0 })
{
foreach (var dataName in _subscribedRecordedDataGetter.Keys)
{
var isAvailableModuleName = false;
foreach (var module in _modules)
{
if (dataName.StartsWith(module + ".") ||
dataName.StartsWith("IO." + module + "."))
{
if (!dictModules.ContainsKey(module))
dictModules[module] = new List<DataHolder>();
dictModules[module].Add(new DataHolder(dictModules[module].Count,
dataName, _subscribedRecordedDataGetter[dataName]));
isAvailableModuleName = true;
break;
}
}
if (!isAvailableModuleName)
{
if (!dictModules.ContainsKey(defaultModuleName))
dictModules[defaultModuleName] = new List<DataHolder>();
dictModules[defaultModuleName].Add(new DataHolder(
dictModules[defaultModuleName].Count,
dataName, _subscribedRecordedDataGetter[dataName]));
}
}
}
_dateOfTableName = DateTime.Now.Date;
foreach (var module in dictModules.Keys)
{
var dhs = dictModules[module];
var tableName = $"{_dateOfTableName:yyyyMMdd}.{module}";
_dictCachePerModule[module] =
new DataRecorderCache(tableName, module, dhs, minCachePeriodMs: _dataRecorderCachePeriodMs,
maxCacheSize: _dataRecorderCacheSize);
UpdateTableSchema(tableName, dhs);
}
}
}
/// <summary>
/// 立即缓存所有模组的数据。
/// </summary>
/// <returns></returns>
private void DoCache()
{
lock (_lock)
{
// DO调用ForceCache方法可能发生在当前对象初始化之前此时_dictCachePerModule为null
if (_dictCachePerModule == null)
return;
// 立即缓存一次数据
foreach (var drc in _dictCachePerModule.Values)
drc.Cache();
}
}
/// <summary>
/// 缓存写入数据库。
/// </summary>
/// <returns>插入数据库的行数。-1 表示写入数据库错误。</returns>
public int Persist()
{
var num = 0;
lock (_lock)
{
foreach (var dcc in _dictCachePerModule.Values)
{
var sql = dcc.GetInsertSql();
if (!string.IsNullOrEmpty(sql))
{
try
{
num = DB.ExecuteNonQuery(sql);
}
catch (Exception ex)
{
LOG.Write(ex, $"{ModuleName.System.ToString()}写入数据库发生异常");
num = -1;
}
}
}
}
return num;
}
private bool UpdateTableSchema(string tblName, IEnumerable<DataHolder> dataCollectors)
{
var cmdText = $"select column_name from information_schema.columns where table_name = '{tblName}';";
var reader = DB.ExecuteReader(cmdText);
var sqlExprAddColumn = string.Empty;
var existedColumns = new List<string>();
while (reader.Read())
{
for (var i = 0; i < reader.FieldCount; i++)
{
existedColumns.Add(reader[i].ToString());
}
}
reader.Close();
if (existedColumns.Count > 0)
{
foreach (var dataItem in dataCollectors)
{
var colName = dataItem.Name;
if (!existedColumns.Contains(colName))
{
var type = dataItem.Read().GetType();
if (type == typeof(bool))
{
sqlExprAddColumn += $"ALTER TABLE \"{tblName}\" ADD COLUMN \"{colName}\" {"Boolean"};";
}
else if (_supportedDataTypes.Contains(type))
{
sqlExprAddColumn += $"ALTER TABLE \"{tblName}\" ADD COLUMN \"{colName}\" {"Real"};";
}
}
}
if (!string.IsNullOrEmpty(sqlExprAddColumn))
{
try
{
DB.ExecuteNonQuery(sqlExprAddColumn);
}
catch (Exception ex)
{
LOG.Write(ex);
return false;
}
}
}
else
{
var sqlExprCreateTable = $"CREATE TABLE \"{tblName}\"(Time bigint NOT NULL,";
foreach (var dataItem in dataCollectors)
{
var colName = dataItem.Name;
var dataType = dataItem.Read().GetType();
if (dataType == typeof(bool))
{
sqlExprCreateTable += $"\"{colName}\" Boolean,\n";
}
else if (_supportedDataTypes.Contains(dataType))
{
sqlExprCreateTable += $"\"{colName}\" Real,\n";
}
}
sqlExprCreateTable += $"CONSTRAINT \"{tblName}_pkey\" PRIMARY KEY (Time));";
sqlExprCreateTable += $"GRANT SELECT ON TABLE \"{tblName}\" TO postgres;";
try
{
DB.ExecuteNonQuery(sqlExprCreateTable.ToString());
}
catch (Exception ex2)
{
LOG.Write(ex2, "创建数据库表格失败");
return false;
}
}
return true;
}
}
#endregion
}

View File

@ -0,0 +1,37 @@
using Aitex.Common.Util;
namespace Aitex.Core.RT.DataCollection.HighPerformance
{
public class DefaultDataRecorderCallback : IDataRecorderCallback
{
private string _db = "postgres";
public DefaultDataRecorderCallback()
{
}
public DefaultDataRecorderCallback(string db)
{
_db = db;
}
public void PostDBFailedEvent()
{
}
public string GetSqlUpdateFile()
{
return PathManager.GetCfgDir() + "SqlUpdate.sql";
}
public string GetDataTablePrefix()
{
return "Data";
}
public string GetDBName()
{
return _db;
}
}
}

View File

@ -0,0 +1,20 @@
using System.Collections.Generic;
namespace Aitex.Core.RT.DataCollection.HighPerformance
{
public interface IDataRecorderCache
{
/// <summary>
/// 返回当前缓存服务的模组名称。
/// </summary>
string Module { get; }
IReadOnlyList<DataHolder> DataHolders { get; }
/// <summary>
/// 立即缓存当前数据。
/// </summary>
void Cache();
}
}

View File

@ -0,0 +1,13 @@
namespace Aitex.Core.RT.DataCollection.HighPerformance
{
public interface IDataRecorderCallback
{
void PostDBFailedEvent();
string GetDBName();
string GetSqlUpdateFile();
string GetDataTablePrefix();
}
}

View File

@ -282,6 +282,13 @@
<Compile Include="Aitex\Core\RT\DataCenter\ICommonData.cs" />
<Compile Include="Aitex\Core\RT\DataCollection\DataCollectionManager.cs" />
<Compile Include="Aitex\Core\RT\DataCollection\DefaultDataCollectionCallback.cs" />
<Compile Include="Aitex\Core\RT\DataCollection\HighPerformance\DataHolder.cs" />
<Compile Include="Aitex\Core\RT\DataCollection\HighPerformance\DataRecorderCache.cs" />
<Compile Include="Aitex\Core\RT\DataCollection\HighPerformance\DataRecorderDataTableCache.cs" />
<Compile Include="Aitex\Core\RT\DataCollection\HighPerformance\DataRecorderManager.cs" />
<Compile Include="Aitex\Core\RT\DataCollection\HighPerformance\DefaultDataRecorderCallback.cs" />
<Compile Include="Aitex\Core\RT\DataCollection\HighPerformance\IDataRecorderCache.cs" />
<Compile Include="Aitex\Core\RT\DataCollection\HighPerformance\IDataRecorderCallback.cs" />
<Compile Include="Aitex\Core\RT\DataCollection\IDataCollectionCallback.cs" />
<Compile Include="Aitex\Core\RT\DBCore\DataBaseCleaner.cs" />
<Compile Include="Aitex\Core\RT\DBCore\DataBaseManager.cs" />