- 论坛徽章:
- 59
|
有噬, m:n线程模板
- // -- Multi-threads --
namespace ntrt
{
    /// <summary>
    /// Work-item callback: receives one parameter and produces one result.
    /// </summary>
    /// <remarks>
    /// Sample (2048 work items executed by 64 worker threads; -1 is the
    /// placeholder left in the result slot of any item that never ran):
    /// <code>
    /// ntrt.ThreadAddress&lt;int, int&gt; thstart = delegate(int obj)
    /// {
    ///     Console.Write(string.Format("{0}\n", obj));
    ///     return obj;
    /// };
    /// ntrt.ParallelThread&lt;int, int&gt; thrds = new ntrt.ParallelThread&lt;int, int&gt;();
    /// for (int iC = 0; iC &lt; 2048; iC++)
    /// {
    ///     thrds.Add(thstart, iC);
    /// }
    /// int[] rts = ntrt.ParallelThread&lt;int, int&gt;.wait(thrds.exec(64, -1));
    /// Console.WriteLine("\n-------------------------------------------------");
    /// foreach (int iret in rts)
    /// {
    ///     Console.Write(string.Format("{0}\n", iret));
    /// }
    /// </code>
    /// </remarks>
    /// <typeparam name="TRET">return value type</typeparam>
    /// <typeparam name="TPARAM">parameter type</typeparam>
    /// <param name="obj">The parameter that was queued together with the callback.</param>
    /// <returns>The result stored in the slot of the corresponding work item.</returns>
    public delegate TRET ThreadAddress<TRET, TPARAM>(TPARAM obj);

    /// <summary>
    /// Handler invoked on a worker thread right after a single work item has returned.
    /// </summary>
    /// <typeparam name="TRET">return value type</typeparam>
    /// <typeparam name="TPARAM">parameter type</typeparam>
    /// <param name="sender">The list that owns the work item.</param>
    /// <param name="args">The parameter the work item was called with.</param>
    /// <param name="ret">The value the work item returned.</param>
    /// <param name="envs">The extra values that were passed to <c>exec</c>.</param>
    public delegate void FinishEventHandler<TRET, TPARAM>(ParallelThread<TRET, TPARAM> sender, TPARAM args, TRET ret, params object[] envs);

    /// <summary>
    /// A list of (callback, parameter) work items that can be executed by a fixed
    /// number of dedicated threads (m items : n threads). Each worker repeatedly
    /// claims the next unclaimed index until the list is exhausted or
    /// <see cref="isCancelled"/> becomes true.
    /// </summary>
    /// <typeparam name="TRET">return value type</typeparam>
    /// <typeparam name="TPARAM">parameter type</typeparam>
    public class ParallelThread<TRET, TPARAM> : List<KeyValuePair<ThreadAddress<TRET, TPARAM>, TPARAM>>
    {
        // Polled by the workers and written by other threads, hence volatile.
        private volatile bool cancelRequested;

        /// <summary>
        /// Starts the worker threads and returns immediately.
        /// </summary>
        /// <param name="nThread">
        /// Maximum number of worker threads; must be at least 1. No more threads
        /// than there are work items are started.
        /// </param>
        /// <param name="defBadRetval">
        /// Value pre-filled into every result slot; it stays there for items that
        /// have a null callback or were never reached.
        /// </param>
        /// <param name="envs">Extra values forwarded unchanged to <see cref="Onfinished"/>.</param>
        /// <returns>
        /// An opaque handle (results array plus thread array) to pass to
        /// <see cref="wait"/> or <see cref="abort"/>.
        /// </returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="nThread"/> is less than 1.</exception>
        public object exec(int nThread, TRET defBadRetval, params object[] envs)
        {
            if (nThread < 1)
            {
                // Zero threads would silently execute nothing and never signal completion.
                throw new ArgumentOutOfRangeException("nThread", nThread, "At least one worker thread is required.");
            }

            this.isCancelled = false;

            int itemCount = this.Count;
            TRET[] results = new TRET[itemCount];
            for (int slot = 0; slot < results.Length; slot++)
            {
                results[slot] = defBadRetval;
            }

            int workerCount = nThread < itemCount ? nThread : itemCount;
            Thread[] workers = new Thread[workerCount];
            object handle = new object[] { results, workers };

            if (workerCount == 0)
            {
                // Empty list: no worker exists that could raise the completion
                // event, so raise it here instead of leaving listeners waiting.
                this.RaiseCompleted();
                return handle;
            }

            InnerCounter counter = new InnerCounter(itemCount);
            ThreadStart workerBody = delegate ()
            {
                this.RunWorker(counter, results, workerCount, envs);
            };

            for (int iThd = 0; iThd < workers.Length; iThd++)
            {
                workers[iThd] = new Thread(workerBody);
                workers[iThd].Start();
            }

            return handle;
        }//end function: exec

        /// <summary>
        /// Queues a work item.
        /// </summary>
        /// <param name="thaddr">Callback to run; a null callback is skipped by the workers.</param>
        /// <param name="args">Parameter handed to the callback.</param>
        public void Add(ThreadAddress<TRET, TPARAM> thaddr, TPARAM args)
        {
            this.Add(new KeyValuePair<ThreadAddress<TRET, TPARAM>, TPARAM>(thaddr, args));
        }

        /// <summary>
        /// Cooperative cancellation flag. Workers check it after each work item;
        /// <see cref="exec"/> resets it to false.
        /// </summary>
        public bool isCancelled
        {
            set { this.cancelRequested = value; }
            get { return this.cancelRequested; }
        }

        /// <summary>Raised on a worker thread after each work item returns.</summary>
        public event FinishEventHandler<TRET, TPARAM> Onfinished;

        /// <summary>
        /// Raised when the last worker leaves its loop, when <see cref="exec"/> is
        /// called on an empty list, and at the end of <see cref="abort"/>.
        /// (The spelling is kept for compatibility with existing subscribers.)
        /// </summary>
        public event EventHandler OnCompeleted;

        /// <summary>Free slot for caller-defined data; not used by this class.</summary>
        public object UserData { set; get; }

        // -- Helper function --

        /// <summary>
        /// Requests cancellation, then tries to abort every worker thread of the
        /// given run and finally raises <see cref="OnCompeleted"/>.
        /// </summary>
        /// <param name="h">Handle returned by <see cref="exec"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="h"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="h"/> was not produced by <see cref="exec"/>.</exception>
        public void abort(object h)
        {
            TRET[] unusedResults;
            Thread[] workers;
            UnpackHandle(h, out unusedResults, out workers);

            // Set the flag first so that workers stop after their current item
            // even when Thread.Abort fails (it is documented as unsupported on
            // .NET Core / .NET 5+).
            this.isCancelled = true;

            foreach (Thread worker in workers)
            {
                try
                {
                    worker.Abort();
                }
                catch (Exception ex)
                {
                    // Best effort: keep going with the remaining threads, but
                    // leave a trace instead of dropping the failure silently.
                    System.Diagnostics.Debug.WriteLine(ex);
                }
            }

            this.RaiseCompleted();
        }

        /// <summary>
        /// Blocks until every worker thread of the given run has exited.
        /// </summary>
        /// <param name="h">Handle returned by <see cref="exec"/>.</param>
        /// <returns>The result array, indexed like the work-item list.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="h"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="h"/> was not produced by <see cref="exec"/>.</exception>
        public static TRET[] wait(object h)
        {
            TRET[] results;
            Thread[] workers;
            UnpackHandle(h, out results, out workers);

            foreach (Thread worker in workers)
            {
                worker.Join(); // wait for this worker to exit
            }

            return results;
        }//end function: wait

        /// <summary>Number of logical processors reported by the runtime.</summary>
        public static int nLogicCPUCores { get { return Environment.ProcessorCount; } }

        /// <summary>
        /// Worker loop: claims indices one at a time, runs the item, publishes
        /// the result, and signals completion when it is the last worker to leave.
        /// </summary>
        private void RunWorker(InnerCounter counter, TRET[] results, int workerCount, object[] envs)
        {
            do
            {
                // Atomic claim: every index is handed to exactly one worker.
                int index = Interlocked.Increment(ref counter.iCount);
                if (index >= counter.nCount)
                {
                    break;
                }

                KeyValuePair<ThreadAddress<TRET, TPARAM>, TPARAM> item = this[index];
                if (item.Key != null)
                {
                    results[index] = item.Key(item.Value);

                    // Copy the delegate so an unsubscribe between the null check
                    // and the call cannot cause a NullReferenceException.
                    FinishEventHandler<TRET, TPARAM> finished = this.Onfinished;
                    if (finished != null)
                    {
                        finished(this, item.Value, results[index], envs);
                    }
                }
            } while (!this.isCancelled);

            if (Interlocked.Increment(ref counter.nComplete) == workerCount)
            {
                this.RaiseCompleted();
            }
        }

        /// <summary>Raises <see cref="OnCompeleted"/> using a local copy of the delegate.</summary>
        private void RaiseCompleted()
        {
            EventHandler completed = this.OnCompeleted;
            if (completed != null)
            {
                completed(this, EventArgs.Empty);
            }
        }

        /// <summary>Validates a handle produced by <see cref="exec"/> and splits it into its two parts.</summary>
        private static void UnpackHandle(object h, out TRET[] results, out Thread[] workers)
        {
            if (h == null)
            {
                throw new ArgumentNullException("h");
            }

            object[] parts = h as object[];
            if (parts == null || parts.Length != 2)
            {
                throw new ArgumentException("The handle was not created by exec().", "h");
            }

            results = parts[0] as TRET[];
            workers = parts[1] as Thread[];
            if (results == null || workers == null)
            {
                throw new ArgumentException("The handle was not created by exec().", "h");
            }
        }

        /// <summary>Shared progress state of one run; updated only through Interlocked.</summary>
        private class InnerCounter
        {
            // Last index handed out; starts at -1 so the first claim yields 0.
            public int iCount = -1;

            // Number of work items in the run (fixed after construction).
            public readonly int nCount;

            // Number of workers that have left their loop.
            public int nComplete = 0;

            public InnerCounter(int itemCount)
            {
                this.nCount = itemCount;
            }
        }
    }//end class: class ParallelThread<TRET, TPARAM>
}//end namespace: ntrt
复制代码 |
|