.Net中的并行与异步

内容纲要

什么是并行?

通过计算机多CPU内核让多个进程或多个线程可以同步执行的操作就是并行操作。

理解进程,线程的概念

进程:进程是资源分配的单位,每个进程是一个应用程序的实例。每个进程是相互独立的,有自己的内存空间,一个进程可以有多个线程,并且它们共享进程分配到的资源。进程之间的通信(IPC)。一个进程挂掉不会使其他进程受到干扰,而一个前台线程挂掉会使进程也挂掉。 线程:每个线程具有计划优先级并维护系统用于保存线程执行暂停时线程上下文的一组结构。 线程上下文包含线程顺畅继续执行所需的全部信息,包括线程的一组 CPU 寄存器和堆栈。线程是CPU在进程内切换的单位,每个CPU核在同一时刻,只能执行一个线程。一个线程如果受到阻塞,并不会影响其他线程的执行。由于CPU有时间分片的概念,所以在多线程执行时,可以极大的条运行效率,CPU的切换保存了线程的上下文。

  • 对于单CPU多核,进程是并发执行,线程是并行执行。
  • 对于单CPU单核,进程是并发执行,线程也是并发执行。
  • 对于多CPU多核,进程是并行执行,线程也是并行执行。
  • 每个CPU调度进程中的线程,在自己的核心上执行对应线程。

非抢占调度算法:进程一直执行直至被阻塞或自动释放CPU。

抢占式调度算法:(unix)让进程占用CPU某个时间的最大值时,当到达最大时间时还未完成就会被挂起,执行下一个时间分片。(windows)让进程独占CPU执行,如果存在优先级更高的进程则挂起当前调度,切换进程,内部会在线程执行完之后或者被挂起,重新计算线程优先级。也是时间分片的形式(2ms),Thread.Sleep(0)中断线程占用cpu。

所以我们为了更好的让CPU核心都不空闲(并发),多线程是个不错的选择。 默认情况下,.NET 程序由单个线程(通常称为主线程)启动。 但是,它可以创建其他线程,以与主线程并行或同时执行代码。 这些线程通常称为工作线程。

.Net中的多线程

System.Threading和System.Threading.Tasks 平常使用线程的方式:Thread和Task

  • Thread常用用法和知识点
  1. Thread只能接受一个object参数或无参,并且返回值是void类型的函数。
  2. Thread.Start()开启一个线程。
  3. Thread.IsBackground开启一个后台线程,即主线程结束,后台线程也会结束。
  4. Thread默认创建的是前台线程,主线程结束时,必须等待前台线程也运行结束,当前进程才会结束。
  5. Thread.Join()将一个线程加入到某个调用线程中,让其等待完成,再继续执行调用线程。
  6. Thread.Abort()终止一个线程。
  7. Thread.Interrupt()终止处于休眠状态(WaitSleepJoin)的线程。
  • ThreadPool知识:
  1. 每个进程都有一个线程池,可以通过GetMaxThread和SetMaxThread读取和修改线程池线程数大小。
  2. ThreadPool会自己调整内部的线程数,来达到资源的有效利用。
  3. System.Threading.Tasks和PLinq默认都是用ThreadPool。
  4. 线程池为每个进程提供一个全局的FIFO(先进先出)的工作队列,每次调用QueueUserWorkItem时则添加任务到此队列,内部通过ConcurrentQueue类实现。顶级任务放在全局队列里面,而在任务上下文中创建的嵌套任务或子任务被放置在线程的本地队列中,线程执行任务时,优先访问本地队列LIFO(后进先出),当本地队列执行完成后,则去全局队列中查看,然后再查看其他线程的本地队列,如果找到可执行任务,则从尾部以FIFO(先进先出)的顺序执行,保证本地队列的数据位置。
  • Task常用用法和知识点
  1. Task.Factory.StartNew()支持有返回值或无返回值的函数,并且参数是object类型或无参。
  2. Task.Run支持无参的有返回值或无返回值的函数。
  3. Task.Factory.StartNew(前者)和Task.Run(后者)区别:后者是简化版的前者,它们两种都可以创建任务,只不过前者有更多的可选参数选择,通过设置TaskCreationOptions可以让任务有多种方式运行(例如长时间的运行,不会被线程池回收)。而Task.Run创建出来的人物是TaskCreationOptions.DenyChildAttach(作为子任务执行)。后者更加轻量。
  • Task和Thread区别
  1. Task默认是创建任务,然后等待线程池中线程执行任务,而Thread是创建独立线程,对于资源(线程)的利用来说,Task更合适。
  2. Task有更多的扩展方法可供使用,更好的控制任务的执行。

.Net中的锁

既然提到了多线程,就不得不再提到锁。而C#中包含的锁有:Monitor(混合锁),Mutex(互斥锁),Interlock(原子锁),ReaderWriterLockSlim(读写锁),SpinLock(自旋锁),SemaphoreSlim(信号锁),AutoResetEvent(事件锁)。

  • Monitor

提供了限制访问代码块的功能,通常称为临界区,当有线程拥有对象的锁的时候,任何其他线程都无法获取该锁。 用法如下:

/// <summary>
/// 排它锁
/// </summary>
static void MonitorTest()
{
    var taskA = Task.Run<string>(MonitorFunc);
    var taskB = Task.Run<string>(MonitorFunc);
    Console.WriteLine($"TaskA:{taskA.Result}");
    Console.WriteLine($"TaskB:{taskB.Result}");
}

static string MonitorFunc()
{
    Monitor.Enter(lockObj);
    try
    {
        if (CurrentNums > 0)
        {
            CurrentNums--;
            return "购买成功,库存充足";
        }
        else
        {
            return "库存不足了";
        }
    }
    finally
    {
        Monitor.Exit(lockObj);
    }
}

我们平常用的lock实际上是对Monitor的封装,lock代码块里面的内容被封装在try语句里面。

需要注意的是,我们在锁的时候,尽量使用引用类型对象(除string类型外),也不要使用值类型。因为值类型每次在Enter和Exit的时候都会进行一次装箱操作,会导致指向的不是同一个变量,会发生错误。string类型,.net内部会将string类型放到一个池子里,可能多个变量指向同一份引用,如果对string类型进行操作的话,就会出现和预期结果不一致的情况,如果用没有用过的字符串,则不会。

  • Mutex

Mutex是一个同步基元,该基元仅向一个线程授予对共享资源的独占访问权限。 如果线程获取互斥体,则将获取该互斥体的第二个线程将挂起,直到第一个线程释放该互斥体才能具有访问权限。还可在进程中控制同步基元。 用法如下:

private static bool flag = true;
private static Mutex mut = new Mutex(true, "TestName", out flag);
private static Mutex mutT = new Mutex();
/// <summary>
/// 互斥锁,它能对进程之间
/// </summary>
static void MutexTest()
{
    if(!flag)
    {
        Console.WriteLine("只能开启一个客户端");
    }
    var taskA = Task.Run(MutexFunc);
    var taskB = Task.Run(MutexFunc);
    Task.WaitAll(taskA, taskB);

}

static void MutexFunc()
{
    mutT.WaitOne();

    if (CurrentNums == 1)
    {
        for (var i = 0; i <= 10; i++)
        {
            Console.WriteLine($"{i}");
        }
    }
    else
    {
        Console.WriteLine("不能执行了");
    }
    CurrentNums--;
    mutT.ReleaseMutex();

}

Mutex分为本地互斥体(未命名的,仅存在于当前进程中)和系统互斥体(命名的,多个进程公用)。我们可以通过本地互斥体达到Monitor的锁效果,而系统互斥体的一个用法:控制一个服务器内只能开启一个相同的客户端。相比Monitor来说,它并不轻量,以系统资源为代价,性能不太好,除非需要进程同步资源时,考虑使用它。

  • Interlock

针对值类型做增量或减量操作,因为通常情况下对于值类型,如果多个线程同时进行操作时,可能会导致变量与最终预期结果不一致的情况,所以这时候可以通过这个类,达到变量原子操作。 用法如下:

/// <summary>
/// 值变量锁
/// </summary>
static void InterlockedTest()
{
    var taskA = Task.Run(InterlockedFunc);
    var taskB = Task.Run(InterlockedFunc);
    Task.WaitAll(taskA, taskB);
    Console.WriteLine($"{CurrentNums}");

}

static void InterlockedFunc()
{
    for(var i = 0;i < 10;i++)
    {
        Interlocked.Increment(ref CurrentNums);
        Interlocked.Add(ref CurrentNums,1);
    }
}

以上面的例子来说,这个值的结果,在两个线程的同时运行下,这个值会达到最后的预期结果,在CurrentNums的变量上总共递增40。

  • ReaderWriterLockSlim

允许多个线程对其进行资源的读取,但是只允许一个线程以独占方式写入。读写锁线程有三种模式:未输入模式,读取模式,写入模式和可升级读取模式。任意数量的线程都能处于读取模式,仅有一个线程可处于写入模式,并且也仅有一个线程可处于可升级读取模式。

无递归的读写锁:

/// <summary>
/// 无递归读写锁
/// </summary>
static void ReaderWriterLockedSlimTest()
{
    var taskA = Task.Run(() =>
    {
        for(var i = 0;i< 10;i++)
        {
            AddOrUpdateText("test", "222");
            Console.WriteLine(ReadText("test"));
        }
	});
    var taskB = Task.Run(() =>
    {
        for (var i = 0; i < 10; i++)
        {
            AddOrUpdateText("test", "111");
            Console.WriteLine(ReadText("test"));
        }
    });
    Task.WaitAll(taskA, taskB);
}

static string ReadText(string key)
{
    rwLock.EnterReadLock();
    try
    {
        if(dicTest.TryGetValue(key, out var val))
        {
            return val;
        }
        else
        {
            return "";
        }
    }
    finally
    {
        rwLock.ExitReadLock();
    }
}

static void AddOrUpdateText(string key, string val)
{
    rwLock.EnterUpgradeableReadLock();
    try
    {
        if(dicTest.ContainsKey(key))
        {
            rwLock.EnterWriteLock();
            try
            {
                dicTest[key] = val;
            }
            finally
            {
                rwLock.ExitWriteLock();
            }
        }
        else
        {
            rwLock.EnterWriteLock();
            try
            {
                dicTest.Add(key, val);
            }
            finally
            {
                rwLock.ExitWriteLock();
            }
        }
    }
    finally
    {
        rwLock.ExitUpgradeableReadLock();
    }
}

官方建议使用无递归读写锁,减少其内部复杂性,也可以减少死锁的情况。 特点:如果有线程等待进入写模式或者只有一个线程处于写模式,那么尝试进入读模式的线程就会阻塞。如果已经有线程处于可升级模式或者有线程等待进入写模式或者只有一个线程处于写模式,那么尝试进入可升级模式的线程就会阻塞。如果有线程处于这三种模式中的任何一种,那么尝试进入写模式的线程就会阻塞。 为了策略性的公平起见,当写入器排队时阻塞新的读取器是一种有利于写入器的锁公平策略。

  • SpinLock

提供一个相互排斥锁基元,在该基元中,尝试获取锁的线程将在重复检查的循环中等待,直至该锁变为可用为止。 用法如下:

 /// <summary>
/// 自旋锁
/// </summary>
static void SpinLockTest()
{
    var spinLockInstance = new SpinLock();
    Action actionF = () =>
    {
        var flag = false;
        try
        {
            spinLockInstance.Enter(ref flag);
            for (var i = 0; i < 10; i++)
            {
                CurrentNums++;
            }
        }
        finally
        {
            if (flag) spinLockInstance.Exit();
        }

    };
    Parallel.Invoke(actionF, actionF, actionF);
    Console.WriteLine($"{CurrentNums}");
}

自旋锁适合同步基元内非常短的代码执行,如果长时间的运行,其他线程会不断重试的占用cpu核心,影响其他线程的运行。

  • SemaphoreSlim

对同时可访问资源的线程数加以限制。可用于异步时的资源同步。 用法如下:

static void SemaphoreSlimTest()
        {
            var ssT = new SemaphoreSlim(0,2);
            Console.WriteLine($"剩余可授予的线程数数量:{ssT.CurrentCount}");
            Task[] tasks = new Task[5];
            for(var i = 0;i<5;i++)
            {
                tasks[i] = Task.Run(() =>
                {
                    ssT.Wait();
                    try
                    {
                        Console.WriteLine($"开始执行");
                        for (var i = 0; i < 10; i++)
                        {
                            if (i == 9)
                            {
                                Console.WriteLine(i);
                            };
                        }
                    }
                    finally
                    {
                        var currentCount = ssT.Release();
                        Console.WriteLine("剩余可授予的线程数数量:" + currentCount);
                    }
                });
            }
            ssT.Release(2);
            Console.WriteLine($"剩余可授予的线程数数量:{ssT.CurrentCount}");
            Task.WaitAll(tasks);
        }
  • AutoResetEvent

线程同步事件在一个等待线程释放后收到信号时自动重置。其主要用于线程之间的协调。 用法如下:

static void AutoResetEventTest()
{
    var resetEventA = new AutoResetEvent(false);
    var resetEventB = new AutoResetEvent(false);
    var taskA = Task.Run(() =>
        {
            for (var i = 0; i < 10; i = i + 2)
            {
                Console.WriteLine($"任务A:{i}");
                resetEventB.Set();
                resetEventA.WaitOne();
            }
        });
    var taskB = Task.Run(() =>
        {
            for (var i = 1; i < 10; i = i + 2)
            {
                resetEventB.WaitOne();
                Console.WriteLine($"任务B:{i}");
                resetEventA.Set();
            }
        });
    Task.WaitAll(taskA, taskB);
}

什么是异步?

可以直接处理多个CPU核上的阻塞 I/O 和并发操作。它可以增加服务器处理的请求(不升级硬件的情况下),并且能够让线程充分的被利用起来,减少处于空闲中的线程。

.Net中的异步

.Net中支持三种种异步编程模式:异步编程模型(APM),基于事件的异步模式(EAP),基于任务的异步模式(TAP,微软官方建议使用这种模式)。 这里只解析最后一种(TAP),其他的见文档:docs.microsoft.com/zh-cn/dotne…

我们知道Task是创建一个任务队列,然后由线程池中的线程来执行。在C#5.0时,微软提供了新特性Async和Await来更方便的让开发人员像写同步代码一样编写异步代码。这里我们就来认识一下,它到底是怎么工作的以及源码解析。 异步包含的返回类型有:Task,Task,ValueTask(C#7.0),IAsyncEnumerable(C#8.0)。

  • ValueTask和Task有什么区别:
  1. ValueTask是值类型,Task是引用类型。
  2. Task作为引用类型,意味着需要操作堆栈,创建对象和分配对象,分配的对象越多,GC需要做的工作就越多,在它上面花费的资源就越多。
  3. ValueTask使用时不能进行多次的await,因为对象可能已经被回收,正在被另一个操作使用,所以你得到的结果和预期结果可能不一致。不要调用AsTask多次。不要对一个未完成的任务执行GetAwaiter()和GetResult()。
  4. ValueTask更适合完成近乎同步完成的异步任务。这样的话就不需要分配过多的资源,毕竟是值类型,不用在堆上分配资源
  • IAsyncEnumerable用法

对指定类型,提供异步的迭代器,用法如下:

static async Task Main(string[] args)
{
    await foreach(var item in TestFuncAsync())
    {
        Console.WriteLine(item);
    }
    Console.ReadKey();
}

static async IAsyncEnumerable<string> TestFuncAsync()
{
    var data = @"i am test
    	i am rnm
        i am very cool";

    using var strReader = new StringReader(data);
    var line = await strReader.ReadLineAsync();
    while(line != null)
    {
        foreach(var word in line.Split(" ",StringSplitOptions.RemoveEmptyEntries))
        {
            yield return word;
        }
        line = await strReader.ReadLineAsync();
    }
}

通过Task.Run创建任务的时候,实际上是在线程池上的任务队列中添加一个任务,如果设置了LongRunning,则会创建一个非线程池内的后台线程执行任务。

解析异步执行过程

static async Task Main(string[] args)
{
    var test = await CallBackF();
    Console.WriteLine(test);
    Console.ReadKey();
}

static async Task<string> CallBackF()
{
    return await TestFuncAsyncA();
}

static Task<string> TestFuncAsyncA()
{
    return Task.Run<string>(() =>
    {
        return "我是大神A";
    });
}

将上面代码反编译:

internal class Program
{
    [DebuggerStepThrough]
    private static void <Main>(string[] args)
    {
        Main(args).GetAwaiter().GetResult();
    }
    
    [AsyncStateMachine(typeof(<Main>d__0))]
    [DebuggerStepThrough]
    private static Task Main(string[] args)
    {
        <Main>d__0 stateMachine = new <Main>d__0();
        stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
        stateMachine.args = args;
        stateMachine.<>1__state = -1;
        stateMachine.<>t__builder.Start(ref stateMachine);
        return stateMachine.<>t__builder.Task;
    }

    private static async Task<string> CallBackF()
    {
        return await TestFuncAsyncA();
    }

    private static Task<string> TestFuncAsyncA()
    {
        return Task.Run(() => "我是大神A");
    }
    
    [CompilerGenerated]
    private sealed class <Main>d__0 : IAsyncStateMachine
    {
        public int <>1__state;

        public AsyncTaskMethodBuilder <>t__builder;

        public string[] args;

        private string <test>5__1;

        private string <>s__2;

        private TaskAwaiter<string> <>u__1;

        private void MoveNext()
        {
            int num = <>1__state;
            try
            {
                TaskAwaiter<string> awaiter;
                if (num != 0)
                {
                    awaiter = CallBackF().GetAwaiter();
                    if (!awaiter.IsCompleted)
                    {
                        num = (<>1__state = 0);
                        <>u__1 = awaiter;
                        <Main>d__0 stateMachine = this;
                        <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
                        return;
                    }
                }
                else
                {
                    awaiter = <>u__1;
                    <>u__1 = default(TaskAwaiter<string>);
                    num = (<>1__state = -1);
                }
                <>s__2 = awaiter.GetResult();
                <test>5__1 = <>s__2;
                <>s__2 = null;
                Console.WriteLine(<test>5__1);
                Console.ReadKey();
            }
            catch (Exception exception)
            {
                <>1__state = -2;
                <test>5__1 = null;
                <>t__builder.SetException(exception);
                return;
            }
            <>1__state = -2;
            <test>5__1 = null;
            <>t__builder.SetResult();
        }

        void IAsyncStateMachine.MoveNext()
        {
            //ILSpy generated this explicit interface implementation from .override directive in MoveNext
            this.MoveNext();
        }

        [DebuggerHidden]
        private void SetStateMachine(IAsyncStateMachine stateMachine)
        {
        }

        void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
        {
            //ILSpy generated this explicit interface implementation from .override directive in SetStateMachine
            this.SetStateMachine(stateMachine);
        }
    }
    
    [CompilerGenerated]
    private sealed class <CallBackF>d__1 : IAsyncStateMachine
    {
        public int <>1__state;

        public AsyncTaskMethodBuilder<string> <>t__builder;

        private string <>s__1;

        private TaskAwaiter<string> <>u__1;

        private void MoveNext()
        {
            int num = <>1__state;
            string result;
            try
            {
                TaskAwaiter<string> awaiter;
                if (num != 0)
                {
                    awaiter = TestFuncAsyncA().GetAwaiter();
                    if (!awaiter.IsCompleted)
                    {
                        num = (<>1__state = 0);
                        <>u__1 = awaiter;
                        <CallBackF>d__1 stateMachine = this;
                        <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
                        return;
                    }
                }
                else
                {
                    awaiter = <>u__1;
                    <>u__1 = default(TaskAwaiter<string>);
                    num = (<>1__state = -1);
                }
                <>s__1 = awaiter.GetResult();
                result = <>s__1;
            }
            catch (Exception exception)
            {
                <>1__state = -2;
                <>t__builder.SetException(exception);
                return;
            }
            <>1__state = -2;
            <>t__builder.SetResult(result);
        }

        void IAsyncStateMachine.MoveNext()
        {
            //ILSpy generated this explicit interface implementation from .override directive in MoveNext
            this.MoveNext();
        }

        [DebuggerHidden]
        private void SetStateMachine(IAsyncStateMachine stateMachine)
        {
        }

        void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
        {
            //ILSpy generated this explicit interface implementation from .override directive in SetStateMachine
            this.SetStateMachine(stateMachine);
        }
    }
}

通过反编译的代码可以看到,asyn task Main方法,最终被编译成一个状态机。初始状态为-1,(17行)内部开始执行编译后的MoveNext方法。首先会将一个异步方法编译成一个动态生成的类,获取等待Task完成的对象awaiter ,然后判断是否完成(IsCompleted可以防止多次await时重复执行),然后进入awaitUnsafeOnCompleted方法(当指定awaiter完成时,执行下一操作,这个操作就是再次调用MoveNext方法)。当再次进入MoveNext时,任务已经完成,执行GetResult获取数据结果,再执行后续操作代码。可以看出,实际上异步最后还是同步操作。其中num值主要是为了对一个异步方法中多个await做顺序操作处理,来查看当前是在执行第几个await方法(从0开始)。所以一个异步方法会被编译成一个状态机,但是里面的多个await只会在一个MoveNext中控制执行。

ps:我们获取异步结果时,最好使用GetAwaiter().GetResult()获取,不要使用.Result获取,因为后者会把内部抛出的异常封装成一个AggregateException返回出来,拿不到真正的异常。

作者:蜗牛看技术
链接:https://juejin.cn/post/7125604680637349919

给TA打赏
共{{data.count}}人
人已打赏
.NET

如何用AWS ECS Fargate部署一个.NET容器

2022-8-4 9:52:23

.NET

.NET性能优化-使用SourceGenerator-Logger记录日志

2022-8-8 13:12:49

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
今日签到
有新私信 私信列表
搜索