/************************************************************************ *@file FrameworkLocal\UIClient\CenterViews\DataLogs\Core\TwoStagePipelineBasedTaskExecutor.cs * @author Su Liang * @Date 2022-08-01 * * @copyright © Sicentury Inc. * * @brief The class is design to operate data querying and UI rendering processes in a 2-stage pipeline. * * @details * 2 queues are used to buffer the querying functions and UI rendering functions, and execute them * one by one. * * *****************************************************************************/ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Windows.Media.TextFormatting; namespace Sicentury.Core.Pipelines { public class TwoStagePipelineBasedTaskExecutor : IDisposable { #region Variables /// /// 1级流水线中的任务全部完成事件。 /// public event EventHandler Stage1Finished; public event EventHandler Stage2Finished; /// /// 1级流水线中的单个任务启动事件。 /// public event EventHandler Stage1ActionStarted; /// /// 1级流水线中的单个任务完成事件。 /// public event EventHandler Stage1ActionFinished; /// /// 2级流水线中的单个任务启动事件。 /// public event EventHandler Stage2ActionStarted; /// /// 2级流水线中的单个任务完成事件。 /// public event EventHandler Stage2ActionFinished; private readonly ConcurrentQueue> _queueAction1; private readonly ConcurrentQueue> _queueAction2; private readonly ConcurrentQueue _queueAction1Result; private readonly ConcurrentQueue _queueAction2Result; private bool _isDisposed; private Exception _lastException; private readonly List _tskPipeline; #endregion #region Constructors public TwoStagePipelineBasedTaskExecutor() { //ThreadPool.GetMinThreads(out _workerThreadsMin, out _mIocMin); //ThreadPool.SetMinThreads(100, 100); _queueAction1 = new ConcurrentQueue>(); _queueAction2 = new ConcurrentQueue>(); _queueAction1Result = new ConcurrentQueue(); _queueAction2Result = new ConcurrentQueue(); _lastException = null; _tskPipeline = new List(); } #endregion #region Properties #endregion #region Methods private Task ExecuteFunc1(CancellationTokenSource cancellation) { return Task.Run(() => { while (_isDisposed == false) { if (_queueAction1.Count <= 0) { Thread.Sleep(1); if (cancellation?.Token.IsCancellationRequested == true) return; continue; } if (!_queueAction1.TryDequeue(out var invoker)) { Thread.Sleep(1); if (cancellation?.Token.IsCancellationRequested == true) return; continue; } if (invoker.IsEmpty) { // 列队结束 Stage1Finished?.Invoke(this, System.EventArgs.Empty); break; } Stage1ActionStarted?.Invoke(this, System.EventArgs.Empty); try { _queueAction1Result.Enqueue(invoker.Invoke()); } catch (Exception ex) { _lastException = ex; throw; } Stage1ActionFinished?.Invoke(this, System.EventArgs.Empty); Thread.Sleep(1); if (cancellation?.Token.IsCancellationRequested == true) return; } }); } private Task ExecuteFunc2(CancellationTokenSource cancellation) { return Task.Run(() => { while (_isDisposed == false) { if (_queueAction2.Count <= 0) { Thread.Sleep(1); if (cancellation?.Token.IsCancellationRequested == true) return; continue; } if (!_queueAction2.TryDequeue(out var invoker)) { Thread.Sleep(1); if (cancellation?.Token.IsCancellationRequested == true) return; continue; } if (invoker.IsEmpty) { // 列队结束 Stage2Finished?.Invoke(this, System.EventArgs.Empty); return; } while (_isDisposed == false) { // 等待 Stage 1 方法返回的结果。 if (_queueAction1Result.Count > 0) break; Thread.Sleep(1); if (cancellation?.Token.IsCancellationRequested == true) return; // 如果流水线1有错误发生,则中断流水线2 if (_lastException != null) { return; } } if (!_queueAction1Result.TryDequeue(out var arg)) { Thread.Sleep(1); if (cancellation?.Token.IsCancellationRequested == true) return; continue; } Stage2ActionStarted?.Invoke(this, System.EventArgs.Empty); try { _queueAction2Result.Enqueue(invoker.Invoke(arg)); } catch (Exception ex) { _lastException = ex; throw; } Stage2ActionFinished?.Invoke(this, System.EventArgs.Empty); Thread.Sleep(1); if (cancellation?.Token.IsCancellationRequested == true) return; } }); } /// /// 添加 Stage1 功能函数。 /// /// public void AppendFunc1(Func func) { _queueAction1.Enqueue(new PipelineMethodInvoker(func)); } /// /// 添加 Stage2 功能函数。 /// public void AppendFunc2(Func func) { _queueAction2.Enqueue(new PipelineMethodInvoker(func)); } /// /// 启动流水线。 /// public List Start(CancellationTokenSource cancellation) { if (_tskPipeline.Count > 0) throw new InvalidOperationException("the pipeline has been started."); _tskPipeline.Add(ExecuteFunc1(cancellation)); _tskPipeline.Add(ExecuteFunc2(cancellation)); return _tskPipeline; } /// /// 等待流水线执行结束。 /// /// /// public async Task WaitDone() { if (_tskPipeline.Count <= 0) throw new InvalidOperationException("the pipeline is not started."); await Task.WhenAll(_tskPipeline); // 检查是否有Pipeline线程抛异常。 var faultedTask = _tskPipeline.Where(t => t.IsFaulted).ToList(); if (faultedTask.Any()) { var lstEx = new List(); foreach (var t in faultedTask) { if (t.Exception?.InnerExceptions != null) lstEx.AddRange(t.Exception.InnerExceptions); } var aex = new AggregateException(lstEx); throw aex; } } /// /// 启动流水线并且等待执行完成。 /// /// /// public async Task StartAndWaitDone(CancellationTokenSource cancellation) { Start(cancellation); await WaitDone(); } public void Dispose() { _isDisposed = true; _tskPipeline.Clear(); //try //{ // ThreadPool.SetMinThreads(_workerThreadsMin, _mIocMin); //} //catch (Exception e) //{ // // Ignore //} } #endregion } }