Sic.Framework-Nanjing-Baishi/MECF.Framework.Common/Aitex/Core/RT/DataCollection/DataCollectionManager.cs

226 lines
7.4 KiB
C#
Raw Normal View History

2023-04-13 11:51:03 +08:00
using System;
using System.Collections.Generic;
2023-07-10 18:21:15 +08:00
using System.Data.Common;
2023-04-13 11:51:03 +08:00
using System.Text;
using Aitex.Core.RT.DataCenter;
using Aitex.Core.RT.DBCore;
using Aitex.Core.RT.Log;
using Aitex.Core.RT.SCCore;
using Aitex.Core.Util;
namespace Aitex.Core.RT.DataCollection
{
2023-07-10 18:21:15 +08:00
/// <summary>
/// Periodically samples the values returned by <c>DATA.GetDBRecorderList()</c> and inserts one row
/// per module into a per-day table named <c>yyyyMMdd.Module</c>. The <c>time</c> column holds
/// <c>DateTime.Now.Ticks</c> as a 64-bit integer.
/// </summary>
public class DataCollectionManager
{
    // Module that receives every key that matches no other module prefix.
    private const string DefaultModule = "System";

    // Only values of these primitive types are recorded; everything else is ignored.
    private static readonly HashSet<Type> RecordableTypes = new HashSet<Type>
    {
        typeof(bool), typeof(double), typeof(float), typeof(int),
        typeof(ushort), typeof(short), typeof(long)
    };

    private int _dataCollectionInterval;
    private readonly object _locker = new object();

    // Date for which the per-module tables have been successfully prepared.
    private DateTime _date;
    private PeriodicJob _monitorJob;

    // Reused on every tick to avoid re-allocating large buffers.
    private readonly StringBuilder _columnBuilder = new StringBuilder(10000);
    private readonly StringBuilder _stringBuilder = new StringBuilder(10000);

    // Order matters: the first module whose prefix matches a key wins.
    private readonly string[] _modules = new string[] { "System", "IO.PMA", "IO.PMB", "IO.TM", "PMA", "PMB" };

    // module name -> current table name
    private readonly Dictionary<string, string> _dateModule = new Dictionary<string, string>();

    // data key -> getter, for every recordable item
    private readonly Dictionary<string, Func<object>> _subscribedData =
        new Dictionary<string, Func<object>>();

    // data key -> value type observed when the item was subscribed
    private readonly Dictionary<string, Type> _columnTypes = new Dictionary<string, Type>();

    // module name -> (data key -> getter)
    private readonly Dictionary<string, Dictionary<string, Func<object>>> _moduleDictionary =
        new Dictionary<string, Dictionary<string, Func<object>>>();

    /// <summary>
    /// Reads the collection interval from "System.DataCollectionInterval" (1000 when the item is
    /// absent), builds the subscription tables and starts the periodic collection job.
    /// </summary>
    public void Initialize()
    {
        _dataCollectionInterval = SC.ContainsItem("System.DataCollectionInterval")
            ? SC.GetValue<int>("System.DataCollectionInterval")
            : 1000;

        InitSubscribedRecorded();

        _monitorJob = new PeriodicJob(_dataCollectionInterval, MonitorData, "DataCollection", true);
    }

    /// <summary>
    /// Collects every recordable data item and assigns each one to the module table it belongs to.
    /// </summary>
    private void InitSubscribedRecorded()
    {
        // Start with an empty bucket for every known module.
        foreach (string module in _modules)
        {
            _moduleDictionary[module] = new Dictionary<string, Func<object>>();
        }

        SortedDictionary<string, Func<object>> dbRecordList = DATA.GetDBRecorderList();
        foreach (KeyValuePair<string, Func<object>> item in dbRecordList)
        {
            if (_subscribedData.ContainsKey(item.Key))
            {
                continue;
            }

            // Sample the value once to learn its type; null values and non-primitive types are excluded.
            object sample = item.Value();
            if (sample == null)
            {
                continue;
            }

            Type type = sample.GetType();
            if (!RecordableTypes.Contains(type))
            {
                continue;
            }

            _subscribedData[item.Key] = item.Value;
            // Remember the type so the schema can be built later even if the getter returns null then.
            _columnTypes[item.Key] = type;
        }

        // Decide which module table each subscribed item is stored in.
        foreach (KeyValuePair<string, Func<object>> entry in _subscribedData)
        {
            _moduleDictionary[FindModule(entry.Key)][entry.Key] = entry.Value;
        }
    }

    /// <summary>
    /// Returns the first module whose "Module." or "IO.Module." prefix matches <paramref name="key"/>,
    /// or the default module when none matches.
    /// </summary>
    private string FindModule(string key)
    {
        foreach (string module in _modules)
        {
            if (key.StartsWith(module + ".", StringComparison.Ordinal) ||
                key.StartsWith("IO." + module + ".", StringComparison.Ordinal))
            {
                return module;
            }
        }

        return DefaultModule;
    }

    /// <summary>
    /// Makes sure <paramref name="tableName"/> exists and has a column for every key in
    /// <paramref name="dataItem"/>: creates the table when it has no columns yet, otherwise adds
    /// the missing columns.
    /// </summary>
    /// <param name="tableName">Table to create or extend.</param>
    /// <param name="dataItem">Data key to getter map for the module stored in that table.</param>
    private void UpdateTableSchema(string tableName, Dictionary<string, Func<object>> dataItem)
    {
        if (dataItem.Count == 0)
        {
            return;
        }

        // tableName is built internally from the date and a fixed module list, not from external input.
        string sql = $"select column_name from information_schema.columns where table_name='{tableName}'";

        HashSet<string> existingColumns = new HashSet<string>();
        // 'using' guarantees the reader is closed even when Read() throws.
        using (DbDataReader dataReader = DB.Manager.ExecuteReader(sql))
        {
            while (dataReader.Read())
            {
                // The query selects a single column, so index 0 is the column name.
                existingColumns.Add(dataReader[0].ToString());
            }
        }

        if (existingColumns.Count == 0)
        {
            Dictionary<string, Type> columns = new Dictionary<string, Type>();
            foreach (string key in dataItem.Keys)
            {
                columns.Add(key, _columnTypes[key]);
            }

            // The "time" column maps to PostgreSQL bigint.
            DB.CreateTable(tableName, columns, "time", typeof(Int64));
            return;
        }

        StringBuilder alterSql = new StringBuilder();
        foreach (string key in dataItem.Keys)
        {
            if (existingColumns.Contains(key))
            {
                continue;
            }

            string columnSql = DB.GetSqlByNameType(key, _columnTypes[key]);
            alterSql.Append("ALTER TABLE \"").Append(tableName).Append("\" ADD COLUMN ")
                    .Append(columnSql).Append(";");
        }

        // Nothing to add: do not send an empty command to the database.
        if (alterSql.Length > 0)
        {
            DB.ExecuteNonQuery(alterSql.ToString());
        }
    }

    /// <summary>
    /// Prepares the table of every module for <paramref name="date"/> and records the table names.
    /// </summary>
    /// <param name="date">Day the tables are created for.</param>
    private void MonitorDateTime(DateTime date)
    {
        foreach (string module in _modules)
        {
            string tableName = $"{date:yyyyMMdd}.{module}";
            UpdateTableSchema(tableName, _moduleDictionary[module]);
            _dateModule[module] = tableName;
        }
    }

    /// <summary>
    /// Periodic job callback: switches to new tables when the day changes, then inserts one row
    /// per module that has data. Exceptions are logged, not propagated.
    /// </summary>
    /// <returns>Always <c>true</c>, including after a logged failure.</returns>
    private bool MonitorData()
    {
        try
        {
            lock (_locker)
            {
                // One clock snapshot per tick keeps the date check and the row timestamp consistent.
                DateTime now = DateTime.Now;
                DateTime today = now.Date;
                if (_date != today)
                {
                    // Commit the new date only after the tables are ready, so a failure is retried
                    // on the next tick instead of leaving stale or missing table names behind.
                    MonitorDateTime(today);
                    _date = today;
                }

                string timestamp = now.Ticks.ToString(System.Globalization.CultureInfo.InvariantCulture);
                foreach (string module in _modules)
                {
                    Dictionary<string, Func<object>> items = _moduleDictionary[module];

                    // Modules without data have no table and are skipped.
                    if (items.Count == 0)
                    {
                        continue;
                    }

                    DB.InsertSql(BuildInsertSql(_dateModule[module], timestamp, items));
                }
            }
        }
        catch (Exception ex)
        {
            LOG.Error(ex.Message, ex);
        }

        return true;
    }

    /// <summary>
    /// Builds the INSERT statement for one module row; every getter is invoked exactly once.
    /// </summary>
    private string BuildInsertSql(string tableName, string timestamp, Dictionary<string, Func<object>> items)
    {
        _columnBuilder.Length = 0;
        _columnBuilder.Append("INSERT INTO \"").Append(tableName).Append("\" (\"time\" ");

        _stringBuilder.Length = 0;
        _stringBuilder.Append("Values(").Append(timestamp);

        foreach (KeyValuePair<string, Func<object>> item in items)
        {
            _columnBuilder.Append(",\"").Append(item.Key).Append("\"");
            _stringBuilder.Append(",").Append(FormatValue(item.Value()));
        }

        _columnBuilder.Append(")");
        _stringBuilder.Append(");");

        return _columnBuilder.Append(_stringBuilder.ToString()).ToString();
    }

    /// <summary>
    /// Converts a sampled value to its SQL literal text; null is written as 0.
    /// </summary>
    private static string FormatValue(object value)
    {
        if (value == null)
        {
            return "0";
        }

        // Invariant culture keeps '.' as the decimal separator regardless of the OS locale;
        // a ',' separator would split one value into two and break the statement.
        return Convert.ToString(value, System.Globalization.CultureInfo.InvariantCulture);
    }

    /// <summary>
    /// Stops the periodic collection job, if one is running, and releases the reference to it.
    /// </summary>
    public void Terminate()
    {
        _monitorJob?.Stop();
        _monitorJob = null;
    }
}
2023-04-13 11:51:03 +08:00
}