Sic.Framework-Nanjing-Baishi/Sicentury.Core/Pipelines/TwoStagePipelineBasedTaskEx...

317 lines
9.8 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/************************************************************************
*@file FrameworkLocal\UIClient\CenterViews\DataLogs\Core\TwoStagePipelineBasedTaskExecutor.cs
* @author Su Liang
* @Date 2022-08-01
*
* @copyright &copy Sicentury Inc.
*
* @brief The class is designed to run data-querying and UI-rendering processes in a 2-stage pipeline.
*
* @details
* 2 queues are used to buffer the querying functions and UI rendering functions, and execute them
* one by one.
*
* *****************************************************************************/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Media.TextFormatting;
namespace Sicentury.Core.Pipelines
{
/// <summary>
/// Runs two chained stages of work on background tasks. Each stage-1 function produces a
/// <typeparamref name="TResult1"/>; stage 2 takes the queued results in FIFO order and passes
/// each one to the next queued stage-2 function, which produces a <typeparamref name="TResult2"/>.
/// </summary>
/// <remarks>
/// <para>
/// Functions are queued with <see cref="AppendFunc1"/> and <see cref="AppendFunc2"/> and are
/// executed one by one per stage. Both stages poll their queues, sleeping between polls.
/// </para>
/// <para>
/// A stage stops when it dequeues an invoker whose <c>IsEmpty</c> is true (end-of-queue marker),
/// when cancellation is requested, or after <see cref="Dispose"/>. How an empty invoker is
/// produced depends on <c>PipelineMethodInvoker</c>, which is not defined in this file
/// (presumably by appending a <c>null</c> function - confirm against that type).
/// </para>
/// <para>
/// If a stage-1 function throws, stage 2 drains the results already produced and then stops.
/// Stage 1 does not react to stage-2 failures.
/// </para>
/// </remarks>
/// <typeparam name="TResult1">Result type of the stage-1 functions (input of stage 2).</typeparam>
/// <typeparam name="TResult2">Result type of the stage-2 functions.</typeparam>
public class TwoStagePipelineBasedTaskExecutor<TResult1, TResult2> : IDisposable
{
    #region Variables

    /// <summary>
    /// Raised when stage 1 dequeues the end-of-queue marker, i.e. all stage-1 functions are done.
    /// </summary>
    public event EventHandler Stage1Finished;

    /// <summary>
    /// Raised when stage 2 dequeues the end-of-queue marker, i.e. all stage-2 functions are done.
    /// </summary>
    public event EventHandler Stage2Finished;

    /// <summary>
    /// Raised right before a single stage-1 function is invoked.
    /// </summary>
    public event EventHandler Stage1ActionStarted;

    /// <summary>
    /// Raised after a single stage-1 function has returned successfully.
    /// </summary>
    public event EventHandler Stage1ActionFinished;

    /// <summary>
    /// Raised right before a single stage-2 function is invoked.
    /// </summary>
    public event EventHandler Stage2ActionStarted;

    /// <summary>
    /// Raised after a single stage-2 function has returned successfully.
    /// </summary>
    public event EventHandler Stage2ActionFinished;

    /// <summary>
    /// Delay, in milliseconds, between two polls of a queue.
    /// </summary>
    private const int PollIntervalMs = 1;

    private readonly ConcurrentQueue<PipelineMethodInvoker<TResult1>> _queueAction1;
    private readonly ConcurrentQueue<PipelineMethodInvoker<TResult1, TResult2>> _queueAction2;
    private readonly ConcurrentQueue<TResult1> _queueAction1Result;
    private readonly ConcurrentQueue<TResult2> _queueAction2Result;

    // Both fields are written by one thread and polled by the stage tasks, hence volatile.
    private volatile bool _isDisposed;
    private volatile Exception _lastException;

    private readonly List<Task> _tskPipeline;

    #endregion

    #region Constructors

    /// <summary>
    /// Creates an executor with empty function and result queues; nothing runs until
    /// <see cref="Start"/> is called.
    /// </summary>
    public TwoStagePipelineBasedTaskExecutor()
    {
        _queueAction1 = new ConcurrentQueue<PipelineMethodInvoker<TResult1>>();
        _queueAction2 = new ConcurrentQueue<PipelineMethodInvoker<TResult1, TResult2>>();
        _queueAction1Result = new ConcurrentQueue<TResult1>();
        _queueAction2Result = new ConcurrentQueue<TResult2>();
        _lastException = null;
        _tskPipeline = new List<Task>();
    }

    #endregion

    #region Properties
    #endregion

    #region Methods

    /// <summary>
    /// Sleeps for one poll interval and then reports whether cancellation has been requested.
    /// </summary>
    /// <param name="cancellation">Optional cancellation source; <c>null</c> means "never cancelled".</param>
    /// <returns><c>true</c> when the caller should stop because cancellation was requested.</returns>
    private static bool PauseThenIsCancelled(CancellationTokenSource cancellation)
    {
        Thread.Sleep(PollIntervalMs);
        return cancellation?.Token.IsCancellationRequested == true;
    }

    /// <summary>
    /// Starts the stage-1 loop on a thread-pool task.
    /// </summary>
    private Task ExecuteFunc1(CancellationTokenSource cancellation)
    {
        return Task.Run(() => RunStage1(cancellation));
    }

    /// <summary>
    /// Stage-1 loop: dequeues stage-1 functions one by one and enqueues their results for stage 2.
    /// </summary>
    private void RunStage1(CancellationTokenSource cancellation)
    {
        while (!_isDisposed)
        {
            if (!_queueAction1.TryDequeue(out var invoker))
            {
                // Nothing queued yet: wait a little and poll again.
                if (PauseThenIsCancelled(cancellation))
                {
                    return;
                }

                continue;
            }

            if (invoker.IsEmpty)
            {
                // End-of-queue marker.
                Stage1Finished?.Invoke(this, System.EventArgs.Empty);
                return;
            }

            Stage1ActionStarted?.Invoke(this, System.EventArgs.Empty);

            try
            {
                _queueAction1Result.Enqueue(invoker.Invoke());
            }
            catch (Exception ex)
            {
                // Remember the failure so stage 2 stops waiting for results that will never come,
                // then let the task fault so WaitDone can report it.
                _lastException = ex;
                throw;
            }

            Stage1ActionFinished?.Invoke(this, System.EventArgs.Empty);

            if (PauseThenIsCancelled(cancellation))
            {
                return;
            }
        }
    }

    /// <summary>
    /// Starts the stage-2 loop on a thread-pool task.
    /// </summary>
    private Task ExecuteFunc2(CancellationTokenSource cancellation)
    {
        return Task.Run(() => RunStage2(cancellation));
    }

    /// <summary>
    /// Stage-2 loop: dequeues stage-2 functions one by one, waits for the next stage-1 result
    /// and invokes the function with it.
    /// </summary>
    private void RunStage2(CancellationTokenSource cancellation)
    {
        while (!_isDisposed)
        {
            if (!_queueAction2.TryDequeue(out var invoker))
            {
                // Nothing queued yet: wait a little and poll again.
                if (PauseThenIsCancelled(cancellation))
                {
                    return;
                }

                continue;
            }

            if (invoker.IsEmpty)
            {
                // End-of-queue marker.
                Stage2Finished?.Invoke(this, System.EventArgs.Empty);
                return;
            }

            // Wait for the result returned by the matching stage-1 function.
            TResult1 arg;
            while (!_queueAction1Result.TryDequeue(out arg))
            {
                if (_isDisposed)
                {
                    return;
                }

                if (PauseThenIsCancelled(cancellation))
                {
                    return;
                }

                // Stage 1 has failed, so no further result will arrive: abort stage 2.
                if (_lastException != null)
                {
                    return;
                }
            }

            Stage2ActionStarted?.Invoke(this, System.EventArgs.Empty);

            try
            {
                _queueAction2Result.Enqueue(invoker.Invoke(arg));
            }
            catch (Exception ex)
            {
                _lastException = ex;
                throw;
            }

            Stage2ActionFinished?.Invoke(this, System.EventArgs.Empty);

            if (PauseThenIsCancelled(cancellation))
            {
                return;
            }
        }
    }

    /// <summary>
    /// Appends a function to the stage-1 queue.
    /// </summary>
    /// <param name="func">The function to run in stage 1; its result is handed to stage 2.</param>
    public void AppendFunc1(Func<TResult1> func)
    {
        _queueAction1.Enqueue(new PipelineMethodInvoker<TResult1>(func));
    }

    /// <summary>
    /// Appends a function to the stage-2 queue.
    /// </summary>
    /// <param name="func">The function to run in stage 2; it receives the next stage-1 result.</param>
    public void AppendFunc2(Func<TResult1, TResult2> func)
    {
        _queueAction2.Enqueue(new PipelineMethodInvoker<TResult1, TResult2>(func));
    }

    /// <summary>
    /// Starts both pipeline stages.
    /// </summary>
    /// <param name="cancellation">
    /// Optional cancellation source polled by both stages; may be <c>null</c>.
    /// </param>
    /// <returns>The list holding the stage-1 task followed by the stage-2 task.</returns>
    /// <exception cref="InvalidOperationException">The pipeline has already been started.</exception>
    public List<Task> Start(CancellationTokenSource cancellation)
    {
        if (_tskPipeline.Count > 0)
        {
            throw new InvalidOperationException("the pipeline has been started.");
        }

        _tskPipeline.Add(ExecuteFunc1(cancellation));
        _tskPipeline.Add(ExecuteFunc2(cancellation));
        return _tskPipeline;
    }

    /// <summary>
    /// Waits until both pipeline stages have ended.
    /// </summary>
    /// <returns>A task that completes when both stage tasks have completed.</returns>
    /// <exception cref="InvalidOperationException">The pipeline has not been started.</exception>
    /// <exception cref="AggregateException">
    /// One or both stages faulted; contains the exceptions of every faulted stage.
    /// </exception>
    public async Task WaitDone()
    {
        if (_tskPipeline.Count <= 0)
        {
            throw new InvalidOperationException("the pipeline is not started.");
        }

        // Work on a snapshot so that a concurrent Dispose(), which clears the list,
        // cannot disturb the checks below.
        var pipelineTasks = _tskPipeline.ToArray();

        try
        {
            await Task.WhenAll(pipelineTasks);
        }
        catch (Exception) when (pipelineTasks.Any(t => t.IsFaulted))
        {
            // Awaiting WhenAll rethrows only the first failure. Swallow it here and
            // report the failures of all stages together below.
        }

        var failures = pipelineTasks
            .Where(t => t.IsFaulted && t.Exception != null)
            .SelectMany(t => t.Exception.InnerExceptions)
            .ToList();

        if (failures.Count > 0)
        {
            throw new AggregateException(failures);
        }
    }

    /// <summary>
    /// Starts the pipeline and waits until it has ended.
    /// </summary>
    /// <param name="cancellation">
    /// Optional cancellation source polled by both stages; may be <c>null</c>.
    /// </param>
    /// <returns>A task that completes when both stage tasks have completed.</returns>
    /// <exception cref="InvalidOperationException">The pipeline has already been started.</exception>
    /// <exception cref="AggregateException">One or both stages faulted.</exception>
    public async Task StartAndWaitDone(CancellationTokenSource cancellation)
    {
        Start(cancellation);
        await WaitDone();
    }

    /// <summary>
    /// Signals both stage loops to stop at their next poll and forgets the stage tasks.
    /// </summary>
    public void Dispose()
    {
        _isDisposed = true;
        _tskPipeline.Clear();
    }

    #endregion
}
}