using Serilog;using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading;using System.Threading.Tasks;namespace TaskManager
{classTaskFactoryMananger{//USEpublicstaticvoidRun(){try{while(true){LimitedConcurrencyLevelTaskScheduler lcts =newLimitedConcurrencyLevelTaskScheduler(10);TaskFactory factory =newTaskFactory(lcts);Task[] spiderTask =newTask[]{factory.StartNew(()=>{Log.Logger.Information("{0} Start on thread {1}","111", Thread.CurrentThread.ManagedThreadId); Log.Logger.Information("{0} Finish on thread {1}","111", Thread.CurrentThread.ManagedThreadId);}),factory.StartNew(()=>{Thread.Sleep(TimeSpan.FromSeconds(3));Log.Logger.Information("{0} Start on thread {1}","222", Thread.CurrentThread.ManagedThreadId);Log.Logger.Information("{0} Finish on thread {1}","222", Thread.CurrentThread.ManagedThreadId);}),factory.StartNew(()=>{Thread.Sleep(TimeSpan.FromSeconds(5));Log.Logger.Information("{0} Start on thread {1}","333", Thread.CurrentThread.ManagedThreadId);Log.Logger.Information("{0} Finish on thread {1}","333", Thread.CurrentThread.ManagedThreadId);})};Task.WaitAll(spiderTask);Thread.Sleep(TimeSpan.FromMinutes(1));}}catch(AggregateException ex){foreach(Exception inner in ex.InnerExceptions){Log.Logger.Error(inner.Message);}}}/// <summary>/// Provides a task scheduler that ensures a maximum concurrency level while/// running on top of the ThreadPool./// </summary>publicclassLimitedConcurrencyLevelTaskScheduler:TaskScheduler{/// <summary>Whether the current thread is processing work items.</summary>[ThreadStatic]privatestaticbool _currentThreadIsProcessingItems;/// <summary>The list of tasks to be executed.</summary>privatereadonly LinkedList<Task> _tasks =newLinkedList<Task>();// protected by lock(_tasks)/// <summary>The maximum concurrency level allowed by this scheduler.</summary>privatereadonlyint _maxDegreeOfParallelism;/// <summary>Whether the scheduler is currently processing work items.</summary>privateint _delegatesQueuedOrRunning =0;// protected by lock(_tasks)/// <summary>/// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the/// specified degree of parallelism./// </summary>/// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>publicLimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism){if(maxDegreeOfParallelism <1)thrownewArgumentOutOfRangeException("maxDegreeOfParallelism");_maxDegreeOfParallelism = maxDegreeOfParallelism;}/// <summary>Queues a task to the scheduler.</summary>/// <param name="task">The task to be queued.</param>protectedsealedoverridevoidQueueTask(Task task){// Add the task to the list of tasks to be processed. If there aren't enough// delegates currently queued or running to process tasks, schedule another.lock(_tasks){_tasks.AddLast(task);if(_delegatesQueuedOrRunning < _maxDegreeOfParallelism){++_delegatesQueuedOrRunning;NotifyThreadPoolOfPendingWork();}}}/// <summary>/// Informs the ThreadPool that there's work to be executed for this scheduler./// </summary>privatevoidNotifyThreadPoolOfPendingWork(){ThreadPool.UnsafeQueueUserWorkItem(_ =>{// Note that the current thread is now processing work items.// This is necessary to enable inlining of tasks into this thread._currentThreadIsProcessingItems =true;try{// Process all available items in the queue.while(true){Task item;lock(_tasks){// When there are no more items to be processed,// note that we're done processing, and get out.if(_tasks.Count ==0){--_delegatesQueuedOrRunning;break;}// Get the next item from the queueitem = _tasks.First.Value;_tasks.RemoveFirst();}// Execute the task we pulled out of the queuebase.TryExecuteTask(item);}}// We're done processing items on the current threadfinally{ _currentThreadIsProcessingItems =false;}},null);}/// <summary>Attempts to execute the specified task on the current thread.</summary>/// <param name="task">The task to be executed.</param>/// <param name="taskWasPreviouslyQueued"></param>/// <returns>Whether the task could be executed on the current thread.</returns>protectedsealedoverrideboolTryExecuteTaskInline(Task task,bool taskWasPreviouslyQueued){// If this thread isn't already processing a task, we don't support inliningif(!_currentThreadIsProcessingItems)returnfalse;// If the task was previously queued, remove it from the queueif(taskWasPreviouslyQueued)TryDequeue(task);// Try to run the task.returnbase.TryExecuteTask(task);}/// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>/// <param name="task">The task to be removed.</param>/// <returns>Whether the task could be found and removed.</returns>protectedsealedoverrideboolTryDequeue(Task task){lock(_tasks)return _tasks.Remove(task);}/// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>publicsealedoverrideint MaximumConcurrencyLevel {get{return _maxDegreeOfParallelism;}}/// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>/// <returns>An enumerable of the tasks currently scheduled.</returns>protectedsealedoverride IEnumerable<Task>GetScheduledTasks(){bool lockTaken =false;try{Monitor.TryEnter(_tasks,ref lockTaken);if(lockTaken)return _tasks.ToArray();elsethrownewNotSupportedException();}finally{if(lockTaken) Monitor.Exit(_tasks);}}}}}