using System; using System.Collections.Generic; using System.Threading; using Developpez.Dotnet.Properties; namespace Developpez.Dotnet.Threading { /// /// Classe permettant d'effectuer des actions de manière asynchrone et simultanée sur une série d'objets /// /// Type des objets à traiter public class ThreadedWorker : IDisposable { #region Nested types /// /// Callback appelé pour effectuer les actions sur le type générique des arguments /// /// Objet TreadWorker ayant effectué l'appel"/> /// Objet de travail actuel public delegate void CallBackMethod(ThreadedWorker caller, T currentObject); /// /// Tâche à effectuer /// protected class WorkItem { /// /// Initialise une nouvelle instance de tâche à effectuer /// /// Tâche /// Méthode à appeler public WorkItem(T item, CallBackMethod method) { Item = item; Method = method; } /// /// Tâche /// public virtual T Item { get; set; } /// /// Méthode à appeler /// public virtual CallBackMethod Method { get; set; } /// /// Indique si la tâche a été exécutée ou non /// public virtual bool Finished { get; set; } } #endregion #region Worker threads private int _currentThreadCount = 0; /// /// Démarre les threads de travaux /// protected virtual void StartWorkers() { lock (sync) { for (int i = 0; i < MaxThreadCount; i++) { StartWorker(); } } } /// /// Initialise un nouveau thread de travaille et l'exécute /// protected virtual void StartWorker() { Thread work = new Thread(new ThreadStart(WorkThreadMethod)); work.Name = "ThreadWorker Work Thread"; work.IsBackground = true; _currentThreadCount++; work.Start(); } /// /// Méthode appelée par les threads afin d'exécuter les tâches /// protected virtual void WorkThreadMethod() { WorkItem actualWorkItem = null; while (true) { lock (sync) { if (_currentThreadCount > _maxThreadCount) { /* fin du thread */ _currentThreadCount--; return; } else if (_currentThreadCount < _maxThreadCount) { StartWorker(); } if (Items.Count > 0) { actualWorkItem = Items.Dequeue(); } else { actualWorkItem = null; } } if (actualWorkItem != null) { try { actualWorkItem.Method.Invoke(this, actualWorkItem.Item); } finally { actualWorkItem.Finished = true; } } else { /* CPU Sleep */ Thread.Sleep(50); } } } #endregion /// /// Initialise une nouvelle instance du gestionnaire de tâches asynchrones /// public ThreadedWorker() : this(5) { } /// /// Initialise une nouvelle instance du gestionnaire de tâches asynchrones /// /// Nombre maximum de threads dédiés à créer.
/// Cette valeur ne peut pas être inférieure à 1 public ThreadedWorker(int maxThreadCount) { if (maxThreadCount < 1) throw new ArgumentOutOfRangeException("maxThreadCount", ExceptionMessages.MaxThreadCountOutOfRange); MaxThreadCount = maxThreadCount; Items = new Queue.WorkItem>(); StartWorkers(); } /// /// Object de synchronisation /// protected object sync = new object(); private int _maxThreadCount = 5; /* Une modification de MaxThreadCount ne modifie pas le nombre * de threads instantanément, c'est juste une valeur que le ThreadWorker va tenter * de suivre */ /// /// Nombre de threads de travail. /// public virtual int MaxThreadCount { get { return _maxThreadCount; } set { if (value < 1) throw new ArgumentOutOfRangeException("value"); else { _maxThreadCount = value; } } } /// /// Travaux globaux /// protected virtual Queue Items { get; private set; } /* attention, il faut protéger (via des try-catch) callback sinon c'est un crash assuré. * pas de protection direct dans l'appel à callback car alors on ne saurai quoi faire vu que * cela dépend beaucoup de la situation actuelle */ /// /// Appelle callback pour chaque object de objects de manière asynchrone, callback pouvant /// être appellé en même temps sur plusieurs threads différents. /// Le nombre maximum de threads est définit par MaxThreadCounts. /// /// Objets constituants le travail /// Méthode à appeller pour le travail /// Indique si ForEach doit être bloquant jusqu'à la fin de l'exécution /// du travail sur tous les éléments de objects, ou s'il doit retourner immédiatement public virtual void ForEach(IEnumerable objects, CallBackMethod callback, bool wait) { if (objects == null) throw new ArgumentNullException("objects"); if (callback == null) throw new ArgumentNullException("callback"); if (wait) ExecuteWork(objects, callback); else { /* on crée un thread dédié à ça */ Thread th = new Thread(new ThreadStart(delegate { ExecuteWork(objects, callback); })); th.IsBackground = true; th.Name = "ThreadWorker manager thread"; th.Start(); } } /// /// Exécute une tâche dans un des threads dédiés /// /// Objet constituant le travail /// Méthode à appeller pour le travail /// Indique si ExecuteSingleTask doit être bloquant jusqu'à la fin de l'exécution /// du travail sur tous les éléments de objects, ou s'il doit retourner immédiatement public virtual void ExecuteSingleTask(T obj, CallBackMethod callback, bool wait) { if (obj == null) throw new ArgumentNullException("obj"); if (callback == null) throw new ArgumentNullException("callback"); if (!wait) { /* On a rien à gagner vis à vis de ForEach */ ForEach(new T[] { obj }, callback, wait); } else { /* là, on évite de créer une nouvelle collection et on passe directement au queue */ ThreadedWorker.WorkItem work = null; lock (sync) { work = new ThreadedWorker.WorkItem(obj, callback); Items.Enqueue(work); } while (!work.Finished) { /* CPU Sleep */ Thread.Sleep(10); } } } /// /// Exécute le travail /// /// Collection d'objets sur lesquels exécuter le travail /// Méthode de callback qui effectue le travail protected virtual void ExecuteWork(IEnumerable objects, CallBackMethod callback) { bool finished = false; /* on rajoute par paquet, c'est plus rapide */ lock (sync) { foreach (T item in objects) { Items.Enqueue(new ThreadedWorker.WorkItem(item, callback)); } /* pour permettre d'attendre, on rajoute un objet supplémentaire à la fin */ Items.Enqueue(new WorkItem(default(T), new CallBackMethod(delegate { finished = true; }))); } while (!finished) { /* CPU Sleep */ Thread.Sleep(10); } } /// /// Met fin à tous les travaux et termine tout les thread (à la fin de ce qu'ils ont a faire, éventuellement) /// public virtual void Release() { _maxThreadCount = 0; } #region IDisposable Membres /// /// Libère les ressources associées à cette classe et libère les threads utilisés /// public void Dispose() { Release(); } #endregion } }