using System;
using System.Collections.Generic;
using System.Threading;
using Developpez.Dotnet.Properties;
namespace Developpez.Dotnet.Threading
{
///
/// Classe permettant d'effectuer des actions de manière asynchrone et simultanée sur une série d'objets
///
/// Type des objets à traiter
public class ThreadedWorker : IDisposable
{
#region Nested types
///
/// Callback appelé pour effectuer les actions sur le type générique des arguments
///
/// Objet TreadWorker ayant effectué l'appel"/>
/// Objet de travail actuel
public delegate void CallBackMethod(ThreadedWorker caller, T currentObject);
///
/// Tâche à effectuer
///
protected class WorkItem
{
///
/// Initialise une nouvelle instance de tâche à effectuer
///
/// Tâche
/// Méthode à appeler
public WorkItem(T item, CallBackMethod method)
{
Item = item;
Method = method;
}
///
/// Tâche
///
public virtual T Item { get; set; }
///
/// Méthode à appeler
///
public virtual CallBackMethod Method { get; set; }
///
/// Indique si la tâche a été exécutée ou non
///
public virtual bool Finished { get; set; }
}
#endregion
#region Worker threads
private int _currentThreadCount = 0;
///
/// Démarre les threads de travaux
///
protected virtual void StartWorkers()
{
lock (sync)
{
for (int i = 0; i < MaxThreadCount; i++)
{
StartWorker();
}
}
}
///
/// Initialise un nouveau thread de travaille et l'exécute
///
protected virtual void StartWorker()
{
Thread work = new Thread(new ThreadStart(WorkThreadMethod));
work.Name = "ThreadWorker Work Thread";
work.IsBackground = true;
_currentThreadCount++;
work.Start();
}
///
/// Méthode appelée par les threads afin d'exécuter les tâches
///
protected virtual void WorkThreadMethod()
{
WorkItem actualWorkItem = null;
while (true)
{
lock (sync)
{
if (_currentThreadCount > _maxThreadCount)
{
/* fin du thread */
_currentThreadCount--;
return;
}
else if (_currentThreadCount < _maxThreadCount)
{
StartWorker();
}
if (Items.Count > 0)
{
actualWorkItem = Items.Dequeue();
}
else
{
actualWorkItem = null;
}
}
if (actualWorkItem != null)
{
try
{
actualWorkItem.Method.Invoke(this, actualWorkItem.Item);
}
finally
{
actualWorkItem.Finished = true;
}
}
else
{
/* CPU Sleep */
Thread.Sleep(50);
}
}
}
#endregion
///
/// Initialise une nouvelle instance du gestionnaire de tâches asynchrones
///
public ThreadedWorker()
: this(5)
{
}
///
/// Initialise une nouvelle instance du gestionnaire de tâches asynchrones
///
/// Nombre maximum de threads dédiés à créer.
/// Cette valeur ne peut pas être inférieure à 1
public ThreadedWorker(int maxThreadCount)
{
if (maxThreadCount < 1)
throw new ArgumentOutOfRangeException("maxThreadCount", ExceptionMessages.MaxThreadCountOutOfRange);
MaxThreadCount = maxThreadCount;
Items = new Queue.WorkItem>();
StartWorkers();
}
///
/// Object de synchronisation
///
protected object sync = new object();
private int _maxThreadCount = 5;
/* Une modification de MaxThreadCount ne modifie pas le nombre
* de threads instantanément, c'est juste une valeur que le ThreadWorker va tenter
* de suivre
*/
///
/// Nombre de threads de travail.
///
public virtual int MaxThreadCount
{
get
{
return _maxThreadCount;
}
set
{
if (value < 1)
throw new ArgumentOutOfRangeException("value");
else
{
_maxThreadCount = value;
}
}
}
///
/// Travaux globaux
///
protected virtual Queue Items { get; private set; }
/* attention, il faut protéger (via des try-catch) callback sinon c'est un crash assuré.
* pas de protection direct dans l'appel à callback car alors on ne saurai quoi faire vu que
* cela dépend beaucoup de la situation actuelle
*/
///
/// Appelle callback pour chaque object de objects de manière asynchrone, callback pouvant
/// être appellé en même temps sur plusieurs threads différents.
/// Le nombre maximum de threads est définit par MaxThreadCounts.
///
/// Objets constituants le travail
/// Méthode à appeller pour le travail
/// Indique si ForEach doit être bloquant jusqu'à la fin de l'exécution
/// du travail sur tous les éléments de objects, ou s'il doit retourner immédiatement
public virtual void ForEach(IEnumerable objects, CallBackMethod callback, bool wait)
{
if (objects == null)
throw new ArgumentNullException("objects");
if (callback == null)
throw new ArgumentNullException("callback");
if (wait)
ExecuteWork(objects, callback);
else
{
/* on crée un thread dédié à ça */
Thread th = new Thread(new ThreadStart(delegate
{
ExecuteWork(objects, callback);
}));
th.IsBackground = true;
th.Name = "ThreadWorker manager thread";
th.Start();
}
}
///
/// Exécute une tâche dans un des threads dédiés
///
/// Objet constituant le travail
/// Méthode à appeller pour le travail
/// Indique si ExecuteSingleTask doit être bloquant jusqu'à la fin de l'exécution
/// du travail sur tous les éléments de objects, ou s'il doit retourner immédiatement
public virtual void ExecuteSingleTask(T obj, CallBackMethod callback, bool wait)
{
if (obj == null)
throw new ArgumentNullException("obj");
if (callback == null)
throw new ArgumentNullException("callback");
if (!wait)
{
/* On a rien à gagner vis à vis de ForEach */
ForEach(new T[] { obj }, callback, wait);
}
else
{
/* là, on évite de créer une nouvelle collection et on passe directement au queue */
ThreadedWorker.WorkItem work = null;
lock (sync)
{
work = new ThreadedWorker.WorkItem(obj, callback);
Items.Enqueue(work);
}
while (!work.Finished)
{
/* CPU Sleep */
Thread.Sleep(10);
}
}
}
///
/// Exécute le travail
///
/// Collection d'objets sur lesquels exécuter le travail
/// Méthode de callback qui effectue le travail
protected virtual void ExecuteWork(IEnumerable objects, CallBackMethod callback)
{
bool finished = false;
/* on rajoute par paquet, c'est plus rapide */
lock (sync)
{
foreach (T item in objects)
{
Items.Enqueue(new ThreadedWorker.WorkItem(item, callback));
}
/* pour permettre d'attendre, on rajoute un objet supplémentaire à la fin */
Items.Enqueue(new WorkItem(default(T), new CallBackMethod(delegate
{
finished = true;
})));
}
while (!finished)
{
/* CPU Sleep */
Thread.Sleep(10);
}
}
///
/// Met fin à tous les travaux et termine tout les thread (à la fin de ce qu'ils ont a faire, éventuellement)
///
public virtual void Release()
{
_maxThreadCount = 0;
}
#region IDisposable Membres
///
/// Libère les ressources associées à cette classe et libère les threads utilisés
///
public void Dispose()
{
Release();
}
#endregion
}
}