317 lines
9.8 KiB
C#
317 lines
9.8 KiB
C#
/************************************************************************
|
||
*@file FrameworkLocal\UIClient\CenterViews\DataLogs\Core\TwoStagePipelineBasedTaskExecutor.cs
|
||
* @author Su Liang
|
||
* @Date 2022-08-01
|
||
*
|
||
* @copyright © Sicentury Inc.
|
||
*
|
||
* @brief The class is design to operate data querying and UI rendering processes in a 2-stage pipeline.
|
||
*
|
||
* @details
|
||
* 2 queues are used to buffer the querying functions and UI rendering functions, and execute them
|
||
* one by one.
|
||
*
|
||
* *****************************************************************************/
|
||
|
||
using System;
|
||
using System.Collections.Concurrent;
|
||
using System.Collections.Generic;
|
||
using System.Linq;
|
||
using System.Threading;
|
||
using System.Threading.Tasks;
|
||
using System.Windows.Media.TextFormatting;
|
||
|
||
namespace Sicentury.Core.Pipelines
|
||
{
|
||
public class TwoStagePipelineBasedTaskExecutor<TResult1, TResult2> : IDisposable
|
||
{
|
||
|
||
#region Variables
|
||
|
||
/// <summary>
|
||
/// 1级流水线中的任务全部完成事件。
|
||
/// </summary>
|
||
public event EventHandler Stage1Finished;
|
||
|
||
public event EventHandler Stage2Finished;
|
||
|
||
/// <summary>
|
||
/// 1级流水线中的单个任务启动事件。
|
||
/// </summary>
|
||
public event EventHandler Stage1ActionStarted;
|
||
|
||
/// <summary>
|
||
/// 1级流水线中的单个任务完成事件。
|
||
/// </summary>
|
||
public event EventHandler Stage1ActionFinished;
|
||
|
||
/// <summary>
|
||
/// 2级流水线中的单个任务启动事件。
|
||
/// </summary>
|
||
public event EventHandler Stage2ActionStarted;
|
||
|
||
/// <summary>
|
||
/// 2级流水线中的单个任务完成事件。
|
||
/// </summary>
|
||
public event EventHandler Stage2ActionFinished;
|
||
|
||
|
||
private readonly ConcurrentQueue<PipelineMethodInvoker<TResult1>> _queueAction1;
|
||
private readonly ConcurrentQueue<PipelineMethodInvoker<TResult1, TResult2>> _queueAction2;
|
||
private readonly ConcurrentQueue<TResult1> _queueAction1Result;
|
||
private readonly ConcurrentQueue<TResult2> _queueAction2Result;
|
||
|
||
private bool _isDisposed;
|
||
private Exception _lastException;
|
||
private readonly List<Task> _tskPipeline;
|
||
|
||
#endregion
|
||
|
||
#region Constructors
|
||
|
||
public TwoStagePipelineBasedTaskExecutor()
|
||
{
|
||
//ThreadPool.GetMinThreads(out _workerThreadsMin, out _mIocMin);
|
||
//ThreadPool.SetMinThreads(100, 100);
|
||
|
||
_queueAction1 = new ConcurrentQueue<PipelineMethodInvoker<TResult1>>();
|
||
_queueAction2 = new ConcurrentQueue<PipelineMethodInvoker<TResult1, TResult2>>();
|
||
_queueAction1Result = new ConcurrentQueue<TResult1>();
|
||
_queueAction2Result = new ConcurrentQueue<TResult2>();
|
||
|
||
_lastException = null;
|
||
_tskPipeline = new List<Task>();
|
||
}
|
||
|
||
#endregion
|
||
|
||
#region Properties
|
||
|
||
#endregion
|
||
|
||
#region Methods
|
||
|
||
private Task ExecuteFunc1(CancellationTokenSource cancellation)
|
||
{
|
||
return Task.Run(() =>
|
||
{
|
||
while (_isDisposed == false)
|
||
{
|
||
if (_queueAction1.Count <= 0)
|
||
{
|
||
Thread.Sleep(1);
|
||
if (cancellation?.Token.IsCancellationRequested == true)
|
||
return;
|
||
continue;
|
||
}
|
||
|
||
if (!_queueAction1.TryDequeue(out var invoker))
|
||
{
|
||
Thread.Sleep(1);
|
||
|
||
if (cancellation?.Token.IsCancellationRequested == true)
|
||
return;
|
||
|
||
continue;
|
||
}
|
||
|
||
if (invoker.IsEmpty)
|
||
{
|
||
// 列队结束
|
||
Stage1Finished?.Invoke(this, System.EventArgs.Empty);
|
||
break;
|
||
}
|
||
|
||
Stage1ActionStarted?.Invoke(this, System.EventArgs.Empty);
|
||
|
||
try
|
||
{
|
||
_queueAction1Result.Enqueue(invoker.Invoke());
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_lastException = ex;
|
||
throw;
|
||
}
|
||
|
||
Stage1ActionFinished?.Invoke(this, System.EventArgs.Empty);
|
||
|
||
Thread.Sleep(1);
|
||
|
||
if (cancellation?.Token.IsCancellationRequested == true)
|
||
return;
|
||
}
|
||
});
|
||
}
|
||
|
||
private Task ExecuteFunc2(CancellationTokenSource cancellation)
|
||
{
|
||
return Task.Run(() =>
|
||
{
|
||
while (_isDisposed == false)
|
||
{
|
||
if (_queueAction2.Count <= 0)
|
||
{
|
||
Thread.Sleep(1);
|
||
if (cancellation?.Token.IsCancellationRequested == true)
|
||
return;
|
||
continue;
|
||
}
|
||
|
||
if (!_queueAction2.TryDequeue(out var invoker))
|
||
{
|
||
Thread.Sleep(1);
|
||
|
||
if (cancellation?.Token.IsCancellationRequested == true)
|
||
return;
|
||
|
||
continue;
|
||
}
|
||
|
||
if (invoker.IsEmpty)
|
||
{
|
||
// 列队结束
|
||
Stage2Finished?.Invoke(this, System.EventArgs.Empty);
|
||
return;
|
||
}
|
||
|
||
while (_isDisposed == false)
|
||
{
|
||
// 等待 Stage 1 方法返回的结果。
|
||
if (_queueAction1Result.Count > 0)
|
||
break;
|
||
|
||
Thread.Sleep(1);
|
||
|
||
if (cancellation?.Token.IsCancellationRequested == true)
|
||
return;
|
||
|
||
// 如果流水线1有错误发生,则中断流水线2
|
||
if (_lastException != null)
|
||
{
|
||
return;
|
||
}
|
||
}
|
||
|
||
if (!_queueAction1Result.TryDequeue(out var arg))
|
||
{
|
||
Thread.Sleep(1);
|
||
|
||
if (cancellation?.Token.IsCancellationRequested == true)
|
||
return;
|
||
|
||
continue;
|
||
}
|
||
|
||
Stage2ActionStarted?.Invoke(this, System.EventArgs.Empty);
|
||
try
|
||
{
|
||
_queueAction2Result.Enqueue(invoker.Invoke(arg));
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_lastException = ex;
|
||
throw;
|
||
}
|
||
|
||
Stage2ActionFinished?.Invoke(this, System.EventArgs.Empty);
|
||
|
||
Thread.Sleep(1);
|
||
|
||
if (cancellation?.Token.IsCancellationRequested == true)
|
||
return;
|
||
}
|
||
});
|
||
}
|
||
|
||
/// <summary>
|
||
/// 添加 Stage1 功能函数。
|
||
/// </summary>
|
||
/// <param name="func"></param>
|
||
public void AppendFunc1(Func<TResult1> func)
|
||
{
|
||
_queueAction1.Enqueue(new PipelineMethodInvoker<TResult1>(func));
|
||
}
|
||
|
||
/// <summary>
|
||
/// 添加 Stage2 功能函数。
|
||
/// </summary>
|
||
public void AppendFunc2(Func<TResult1, TResult2> func)
|
||
{
|
||
_queueAction2.Enqueue(new PipelineMethodInvoker<TResult1, TResult2>(func));
|
||
}
|
||
|
||
/// <summary>
|
||
/// 启动流水线。
|
||
/// </summary>
|
||
public List<Task> Start(CancellationTokenSource cancellation)
|
||
{
|
||
if (_tskPipeline.Count > 0)
|
||
throw new InvalidOperationException("the pipeline has been started.");
|
||
|
||
_tskPipeline.Add(ExecuteFunc1(cancellation));
|
||
_tskPipeline.Add(ExecuteFunc2(cancellation));
|
||
|
||
return _tskPipeline;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 等待流水线执行结束。
|
||
/// </summary>
|
||
/// <returns></returns>
|
||
/// <exception cref="InvalidOperationException"></exception>
|
||
public async Task WaitDone()
|
||
{
|
||
if (_tskPipeline.Count <= 0)
|
||
throw new InvalidOperationException("the pipeline is not started.");
|
||
|
||
await Task.WhenAll(_tskPipeline);
|
||
|
||
// 检查是否有Pipeline线程抛异常。
|
||
var faultedTask = _tskPipeline.Where(t => t.IsFaulted).ToList();
|
||
if (faultedTask.Any())
|
||
{
|
||
var lstEx = new List<Exception>();
|
||
foreach (var t in faultedTask)
|
||
{
|
||
if (t.Exception?.InnerExceptions != null)
|
||
lstEx.AddRange(t.Exception.InnerExceptions);
|
||
}
|
||
|
||
var aex = new AggregateException(lstEx);
|
||
throw aex;
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 启动流水线并且等待执行完成。
|
||
/// </summary>
|
||
/// <param name="cancellation"></param>
|
||
/// <returns></returns>
|
||
public async Task StartAndWaitDone(CancellationTokenSource cancellation)
|
||
{
|
||
Start(cancellation);
|
||
await WaitDone();
|
||
}
|
||
|
||
public void Dispose()
|
||
{
|
||
_isDisposed = true;
|
||
_tskPipeline.Clear();
|
||
|
||
//try
|
||
//{
|
||
// ThreadPool.SetMinThreads(_workerThreadsMin, _mIocMin);
|
||
//}
|
||
//catch (Exception e)
|
||
//{
|
||
// // Ignore
|
||
//}
|
||
}
|
||
|
||
#endregion
|
||
|
||
|
||
}
|
||
}
|