Sic.Framework-Nanjing-Baishi/Sicentury.Core/Pipelines/TwoStagePipelineBasedTaskEx...

317 lines
9.8 KiB
C#
Raw Normal View History

2023-04-13 11:51:03 +08:00
/************************************************************************
*@file FrameworkLocal\UIClient\CenterViews\DataLogs\Core\TwoStagePipelineBasedTaskExecutor.cs
* @author Su Liang
* @Date 2022-08-01
*
* @copyright &copy Sicentury Inc.
*
* @brief The class is design to operate data querying and UI rendering processes in a 2-stage pipeline.
*
* @details
* 2 queues are used to buffer the querying functions and UI rendering functions, and execute them
* one by one.
*
* *****************************************************************************/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Media.TextFormatting;
namespace Sicentury.Core.Pipelines
{
public class TwoStagePipelineBasedTaskExecutor<TResult1, TResult2> : IDisposable
{
#region Variables
/// <summary>
/// 1级流水线中的任务全部完成事件。
/// </summary>
public event EventHandler Stage1Finished;
public event EventHandler Stage2Finished;
/// <summary>
/// 1级流水线中的单个任务启动事件。
/// </summary>
public event EventHandler Stage1ActionStarted;
/// <summary>
/// 1级流水线中的单个任务完成事件。
/// </summary>
public event EventHandler Stage1ActionFinished;
/// <summary>
/// 2级流水线中的单个任务启动事件。
/// </summary>
public event EventHandler Stage2ActionStarted;
/// <summary>
/// 2级流水线中的单个任务完成事件。
/// </summary>
public event EventHandler Stage2ActionFinished;
private readonly ConcurrentQueue<PipelineMethodInvoker<TResult1>> _queueAction1;
private readonly ConcurrentQueue<PipelineMethodInvoker<TResult1, TResult2>> _queueAction2;
private readonly ConcurrentQueue<TResult1> _queueAction1Result;
private readonly ConcurrentQueue<TResult2> _queueAction2Result;
private bool _isDisposed;
private Exception _lastException;
private readonly List<Task> _tskPipeline;
#endregion
#region Constructors
public TwoStagePipelineBasedTaskExecutor()
{
//ThreadPool.GetMinThreads(out _workerThreadsMin, out _mIocMin);
//ThreadPool.SetMinThreads(100, 100);
_queueAction1 = new ConcurrentQueue<PipelineMethodInvoker<TResult1>>();
_queueAction2 = new ConcurrentQueue<PipelineMethodInvoker<TResult1, TResult2>>();
_queueAction1Result = new ConcurrentQueue<TResult1>();
_queueAction2Result = new ConcurrentQueue<TResult2>();
_lastException = null;
_tskPipeline = new List<Task>();
}
#endregion
#region Properties
#endregion
#region Methods
private Task ExecuteFunc1(CancellationTokenSource cancellation)
{
return Task.Run(() =>
{
while (_isDisposed == false)
{
if (_queueAction1.Count <= 0)
{
Thread.Sleep(1);
if (cancellation?.Token.IsCancellationRequested == true)
return;
continue;
}
if (!_queueAction1.TryDequeue(out var invoker))
{
Thread.Sleep(1);
if (cancellation?.Token.IsCancellationRequested == true)
return;
continue;
}
if (invoker.IsEmpty)
{
// 列队结束
Stage1Finished?.Invoke(this, System.EventArgs.Empty);
break;
}
Stage1ActionStarted?.Invoke(this, System.EventArgs.Empty);
try
{
_queueAction1Result.Enqueue(invoker.Invoke());
}
catch (Exception ex)
{
_lastException = ex;
throw;
}
Stage1ActionFinished?.Invoke(this, System.EventArgs.Empty);
Thread.Sleep(1);
if (cancellation?.Token.IsCancellationRequested == true)
return;
}
});
}
private Task ExecuteFunc2(CancellationTokenSource cancellation)
{
return Task.Run(() =>
{
while (_isDisposed == false)
{
if (_queueAction2.Count <= 0)
{
Thread.Sleep(1);
if (cancellation?.Token.IsCancellationRequested == true)
return;
continue;
}
if (!_queueAction2.TryDequeue(out var invoker))
{
Thread.Sleep(1);
if (cancellation?.Token.IsCancellationRequested == true)
return;
continue;
}
if (invoker.IsEmpty)
{
// 列队结束
Stage2Finished?.Invoke(this, System.EventArgs.Empty);
return;
}
while (_isDisposed == false)
{
// 等待 Stage 1 方法返回的结果。
if (_queueAction1Result.Count > 0)
break;
Thread.Sleep(1);
if (cancellation?.Token.IsCancellationRequested == true)
return;
// 如果流水线1有错误发生则中断流水线2
if (_lastException != null)
{
return;
}
}
if (!_queueAction1Result.TryDequeue(out var arg))
{
Thread.Sleep(1);
if (cancellation?.Token.IsCancellationRequested == true)
return;
continue;
}
Stage2ActionStarted?.Invoke(this, System.EventArgs.Empty);
try
{
_queueAction2Result.Enqueue(invoker.Invoke(arg));
}
catch (Exception ex)
{
_lastException = ex;
throw;
}
Stage2ActionFinished?.Invoke(this, System.EventArgs.Empty);
Thread.Sleep(1);
if (cancellation?.Token.IsCancellationRequested == true)
return;
}
});
}
/// <summary>
/// 添加 Stage1 功能函数。
/// </summary>
/// <param name="func"></param>
public void AppendFunc1(Func<TResult1> func)
{
_queueAction1.Enqueue(new PipelineMethodInvoker<TResult1>(func));
}
/// <summary>
/// 添加 Stage2 功能函数。
/// </summary>
public void AppendFunc2(Func<TResult1, TResult2> func)
{
_queueAction2.Enqueue(new PipelineMethodInvoker<TResult1, TResult2>(func));
}
/// <summary>
/// 启动流水线。
/// </summary>
public List<Task> Start(CancellationTokenSource cancellation)
{
if (_tskPipeline.Count > 0)
throw new InvalidOperationException("the pipeline has been started.");
_tskPipeline.Add(ExecuteFunc1(cancellation));
_tskPipeline.Add(ExecuteFunc2(cancellation));
return _tskPipeline;
}
/// <summary>
/// 等待流水线执行结束。
/// </summary>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public async Task WaitDone()
{
if (_tskPipeline.Count <= 0)
throw new InvalidOperationException("the pipeline is not started.");
await Task.WhenAll(_tskPipeline);
// 检查是否有Pipeline线程抛异常。
var faultedTask = _tskPipeline.Where(t => t.IsFaulted).ToList();
if (faultedTask.Any())
{
var lstEx = new List<Exception>();
foreach (var t in faultedTask)
{
if (t.Exception?.InnerExceptions != null)
lstEx.AddRange(t.Exception.InnerExceptions);
}
var aex = new AggregateException(lstEx);
throw aex;
}
}
/// <summary>
/// 启动流水线并且等待执行完成。
/// </summary>
/// <param name="cancellation"></param>
/// <returns></returns>
public async Task StartAndWaitDone(CancellationTokenSource cancellation)
{
Start(cancellation);
await WaitDone();
}
public void Dispose()
{
_isDisposed = true;
_tskPipeline.Clear();
//try
//{
// ThreadPool.SetMinThreads(_workerThreadsMin, _mIocMin);
//}
//catch (Exception e)
//{
// // Ignore
//}
}
#endregion
}
}