jhlee8804
5/3/2014 - 7:11 PM

ParallelForEach

using Microsoft.ConcurrencyVisualizer.Instrumentation;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication3
{
    /// <summary>
    /// Minimal thread-pool based counterpart to <c>Parallel.ForEach</c>.
    /// </summary>
    public class MyParallel
    {
        /// <summary>
        /// Queues <paramref name="action"/> on the thread pool once per element of
        /// <paramref name="items"/> and blocks until every queued invocation has finished.
        /// </summary>
        /// <typeparam name="T">Element type of the sequence.</typeparam>
        /// <param name="items">Sequence to process; enumerated on the calling thread.</param>
        /// <param name="action">
        /// Callback run for each element. It may run concurrently on several pool
        /// threads, so it has to be safe to call in parallel.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="items"/> or <paramref name="action"/> is null.
        /// </exception>
        /// <exception cref="AggregateException">
        /// One or more invocations of <paramref name="action"/> threw; the individual
        /// exceptions are available through <c>InnerExceptions</c>.
        /// </exception>
        public static void ForEach<T>(IEnumerable<T> items, Action<T> action)
        {
            if (items == null)
                throw new ArgumentNullException("items");
            if (action == null)
                throw new ArgumentNullException("action");

            // Exceptions thrown by the callbacks; guarded by lock (failures).
            var failures = new List<Exception>();

            // A single countdown replaces one wait handle per element. The initial
            // count of 1 belongs to the calling thread, so the event cannot reach
            // zero while items are still being queued.
            using (var pending = new CountdownEvent(1))
            {
                try
                {
                    foreach (var item in items)
                    {
                        // Fresh local per iteration: every closure gets its own element
                        // regardless of how the compiler scopes the foreach variable,
                        // and value types are not boxed through an object state argument.
                        T current = item;

                        pending.AddCount();
                        try
                        {
                            ThreadPool.QueueUserWorkItem(state =>
                            {
                                try
                                {
                                    action(current);
                                }
                                catch (Exception ex)
                                {
                                    // Letting this escape on a pool thread would tear down
                                    // the process; hand it to the caller instead.
                                    lock (failures)
                                    {
                                        failures.Add(ex);
                                    }
                                }
                                finally
                                {
                                    // Always signal, otherwise the caller would wait forever.
                                    pending.Signal();
                                }
                            });
                        }
                        catch
                        {
                            // The work item was never queued, so nobody else will signal for it.
                            pending.Signal();
                            throw;
                        }
                    }
                }
                finally
                {
                    // Release the caller's own count, then wait for everything already
                    // queued - even when enumeration failed - so that no work item can
                    // touch the event after it has been disposed.
                    pending.Signal();
                    pending.Wait();
                }
            }

            if (failures.Count > 0)
                throw new AggregateException(failures);
        }
    }

    /// <summary>
    /// Enumerable that yields one and the same byte array instance a fixed number of times.
    /// </summary>
    public class IEnum : IEnumerable<byte[]>
    {
        // How many times the buffer is handed out per enumeration.
        int repetitions;

        // The buffer returned for every element; it is never copied.
        byte[] payload;

        /// <summary>
        /// Creates a sequence that repeats <paramref name="data"/> <paramref name="count"/> times.
        /// </summary>
        /// <param name="data">Array instance yielded for every element.</param>
        /// <param name="count">Number of elements to produce; zero or less produces none.</param>
        public IEnum(byte[] data, int count)
        {
            payload = data;
            repetitions = count;
        }

        /// <summary>
        /// Lazily yields the stored array once per requested repetition.
        /// </summary>
        IEnumerator<byte[]> IEnumerable<byte[]>.GetEnumerator()
        {
            int remaining = repetitions;
            while (remaining-- > 0)
            {
                yield return payload;
            }
        }

        /// <summary>
        /// Non-generic enumeration; forwards to the generic implementation.
        /// </summary>
        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            IEnumerable<byte[]> typed = this;
            return typed.GetEnumerator();
        }
    }


    /// <summary>
    /// Console benchmark that times <c>Parallel.ForEach</c> against
    /// <c>MyParallel.ForEach</c> on the same gzip-decompression workload and
    /// emits Concurrency Visualizer markers around each measured phase.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Runs <paramref name="todo"/> once and returns the elapsed wall-clock time.
        /// </summary>
        /// <param name="todo">The work to measure.</param>
        /// <returns>Time spent inside <paramref name="todo"/>.</returns>
        static TimeSpan Time(Action todo)
        {
            var sw = Stopwatch.StartNew();
            todo();
            sw.Stop();
            return sw.Elapsed;
        }

        /// <summary>
        /// Builds a compressed test buffer, warms both implementations up, then
        /// measures each one and prints the two durations.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            const int sourceSize = 100000;   // bytes of random input that get compressed
            const int itemsPerRun = 1000;    // decompressions per ForEach call
            const int runsPerMeasure = 10;   // ForEach calls per timed section
            const int bufferSize = 4096;     // read buffer used while decompressing

            // Dispose the writer when the benchmark is done instead of leaking it.
            // NOTE(review): relies on MarkerWriter implementing IDisposable as described
            // in the Concurrency Visualizer SDK documentation.
            using (var mw = new MarkerWriter(MarkerWriter.DefaultProviderGuid))
            {
                var rand = new Random();
                var source = new byte[sourceSize];
                rand.NextBytes(source);

                byte[] compressed;
                using (var out_ms = new MemoryStream())
                {
                    // The gzip stream has to be closed before the buffer is read,
                    // otherwise the trailing compressed data is not flushed yet.
                    using (var gzp = new GZipStream(out_ms, CompressionMode.Compress))
                    {
                        gzp.Write(source, 0, source.Length);
                    }
                    compressed = out_ms.ToArray();
                }

                // Per-item workload: inflate the buffer into memory and drop the result.
                Action<byte[]> body = b =>
                {
                    using (var in_ms = new MemoryStream(b))
                    using (var gzp = new GZipStream(in_ms, CompressionMode.Decompress))
                    using (var out_ms = new MemoryStream())
                    {
                        byte[] buffer = new byte[bufferSize];
                        int read;
                        while ((read = gzp.Read(buffer, 0, buffer.Length)) > 0)
                        {
                            out_ms.Write(buffer, 0, read);
                        }

                        // The copy is part of the measured work; its result is intentionally unused.
                        out_ms.ToArray();
                    }
                };

                Action msImpl = () =>
                {
                    for (int i = 0; i < runsPerMeasure; i++)
                    {
                        Parallel.ForEach(new IEnum(compressed, itemsPerRun), body);
                    }
                };
                Action myImpl = () =>
                {
                    for (int i = 0; i < runsPerMeasure; i++)
                    {
                        MyParallel.ForEach(new IEnum(compressed, itemsPerRun), body);
                    }
                };

                // Untimed pass so JIT compilation and thread-pool ramp-up do not skew the numbers.
                mw.DefaultSeries.WriteFlag("Starting warmup");
                msImpl();
                myImpl();

                var mssspan = mw.DefaultSeries.EnterSpan("Microsoft IMPL");
                var durationMS = Time(msImpl);
                mssspan.Leave();

                var mysspan = mw.DefaultSeries.EnterSpan("My IMPL");
                var durationMy = Time(myImpl);
                mysspan.Leave();

                mw.DefaultSeries.WriteFlag("Finished");
                Console.WriteLine("MS took {0}", durationMS);
                Console.WriteLine("MY took {0}", durationMy);
            }
        }
    }
}