宣布 .NET 的速率限制

内容纲要

我们很高兴地宣布,作为 .NET 7 的一部分,内置的速率限制支持。速率限制提供了一种保护资源的方法,以避免使应用不堪重负,并将流量保持在安全级别。

什么是速率限制?

速率限制是限制可以访问的资源量的概念。例如,您知道应用程序访问的数据库每分钟可以安全地处理 1000 个请求,但不确定它是否能处理更多请求。您可以在应用程序中放置一个速率限制器,该限制器每分钟允许 1000 个请求,并在请求访问数据库之前拒绝任何更多请求。因此,速率限制了数据库,并允许应用程序处理安全数量的请求,而不会出现来自数据库的严重故障。

有多种不同的速率限制算法来控制请求流。我们将介绍其中的 4 个,这些内容将在 .NET 7 中提供。

并发限制

并发限制器限制可以访问资源的并发请求数。如果您的限制为 10,则一次可以访问一个资源 10 个请求,并且不允许第 11 个请求。请求完成后,允许的请求数增加到 1,当第二个请求完成时,该请求数增加到 2,依此类推。这是通过配置一个RateLimitLease来完成的,我们将在后面讨论。

令牌存储桶限制

令牌存储桶是一种算法,它的名称来源于描述其工作原理。想象一下,有一个桶装满了令牌。当请求传入时,它会获取一个令牌并永久保留它。经过一段时间后,有人将预先确定数量的令牌添加回存储桶,永远不会添加超过存储桶可以容纳的令牌。如果存储桶为空,则当请求传入时,将拒绝该请求访问该资源。

举一个更具体的例子,假设存储桶可以容纳 10 个令牌,每分钟有 2 个令牌添加到存储桶中。当一个请求进来时,它需要一个令牌,所以我们剩下9个,还有3个请求进来,每个请求都拿一个令牌,给我们留下6个令牌,一分钟后,我们得到2个新令牌,使我们处于8。8个请求进来并拿走剩余的令牌,留下0。如果另一个请求传入,则不允许访问资源,直到我们获得更多令牌,这每分钟都会发生一次。在没有请求的 5 分钟后,存储桶将再次拥有所有 10 个令牌,并且不会在随后的几分钟内添加更多令牌,除非请求需要更多令牌。

固定窗口限制

固定窗口算法使用窗口的概念,该概念也将在下一个算法中使用。窗口是我们在进入下一个窗口之前应用限制的时间量。在固定窗口的情况下,移动到下一个窗口意味着将限制重置回其起点。让我们想象一下,有一个电影院,有一个可容纳100人的单人房间,电影播放时间为2个小时。当电影开始时,我们让人们开始排队等待下一个放映,这将是2小时后,在我们开始告诉他们在其他时间回来之前,最多允许100人排队。一旦2小时的电影完成,0到100人的线就可以进入电影院,我们重新启动线。这与在固定窗口算法中移动窗口相同。

滑动窗口限制

滑动窗口算法类似于固定窗口算法,但添加了线段。一个段是窗口的一部分,如果我们采用前2个小时的窗口并将其拆分为4个段,我们现在有4个30分钟的段。还有一个当前区段索引,它将始终指向窗口中的最新区段。30 分钟内的请求将进入当前段,窗口每 30 分钟滑动一段。如果在分段期间有任何请求,窗口会滑过,这些请求现在会刷新,并且我们的限制会增加该数量。如果没有任何请求,我们的限制保持不变。

例如,让我们使用具有 3 个 10 分钟段和 100 个请求限制的滑动窗口算法。我们的初始状态是 3 个区段,所有区段计数均为 0,而我们当前的区段索引指向第 3 个区段。

在前 10 分钟内,我们收到 50 个请求,所有这些请求都在第 3 段(我们当前的段索引)中跟踪。10 分钟后,我们将窗口滑动 1 段,并将当前段索引移动到第 4 段。第 1 段中任何已使用的请求现在都已添加回我们的限制。由于没有限制,我们的限制是50(因为50已经在第三段中使用)。

在接下来的 10 分钟内,我们又收到了 20 个请求,因此现在第 3 段有 50 个请求,第 4 段有 20 个请求。同样,我们在 10 分钟后滑动窗口,因此我们当前的段索引指向 5,并且我们将来自段 2 的任何请求添加到我们的限制中。

10 分钟后,我们再次滑动窗口,这次当窗口滑动时,当前段索引位于 6,段 3(具有 50 个请求的段)现在位于窗口之外。因此,我们取回 50 个请求并将它们添加到我们的限制中,现在限制为 80,因为第 4 段仍有 20 个正在使用。

速率限制器 API

在 .NET 7 中引入新的 nuget 包 System.Threading.RateLimiting!

此软件包提供了用于写入速率限制器的基元,并提供了一些内置的常用算法。主要类型是抽象基类 。RateLimiter

public abstract class RateLimiter : IAsyncDisposable, IDisposable
{
    public abstract int GetAvailablePermits();
    public abstract TimeSpan? IdleDuration { get; }

    public RateLimitLease Acquire(int permitCount = 1);
    public ValueTask<RateLimitLease> WaitAsync(int permitCount = 1, CancellationToken cancellationToken = default);

    public void Dispose();
    public ValueTask DisposeAsync();
}

RateLimiter包含 和 作为尝试为受保护的资源获得许可的核心方法。根据应用程序的不同,受保护的资源可能需要获取 1 个以上的许可证,因此两者都接受可选参数。 是一种同步方法,它将检查是否有足够的许可证可用,并返回 a,其中包含有关您是否成功获得许可证的信息。 与它类似,只是它可以支持排队许可请求,这些请求可以在将来某个时候当许可证变为可用时取消排队,这就是为什么它是异步的,并且接受允许取消排队请求的可选选项。AcquireWaitAsyncAcquireWaitAsyncpermitCountAcquireRateLimitLeaseWaitAsyncAcquireCancellationToken

RateLimitLease有一个财产,用于查看是否获得了许可证。此外,可能包含元数据,例如如果租约失败,则建议的重试期(将在后面的示例中显示)。最后,是一次性的,应在使用受保护的资源完成代码时释放。处置将让知情者根据获得的许可证数量更新其限制。下面是使用 a 尝试获取具有 1 个许可证的资源的示例。IsAcquiredRateLimitLeaseRateLimitLeaseRateLimiterRateLimiter

RateLimiter limiter = GetLimiter();
using RateLimitLease lease = limiter.Acquire(permitCount: 1);
if (lease.IsAcquired)
{
    // Do action that is protected by limiter
}
else
{
    // Error handling or add retry logic
}

在上面的示例中,我们尝试使用同步方法获取 1 个许可证。我们还用于确保在完成资源处理后处置租约。然后检查租约以查看我们请求的许可证是否已获得,如果是,则我们可以使用受保护的资源,否则我们可能希望进行一些日志记录或错误处理,以通知用户或应用程序由于达到速率限制而未使用该资源。Acquireusing

尝试获取许可证的另一种方法是 。此方法允许对许可证进行排队,并等待许可证变为可用(如果许可证不可用)。让我们展示另一个示例来解释排队概念。WaitAsync

RateLimiter limiter = new ConcurrencyLimiter(
    new ConcurrencyLimiterOptions(permitLimit: 2, queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 2));

// thread 1:
using RateLimitLease lease = limiter.Acquire(permitCount: 2);
if (lease.IsAcquired) { }

// thread 2:
using RateLimitLease lease = await limiter.WaitAsync(permitCount: 2);
if (lease.IsAcquired) { }

在这里,我们展示了使用内置速率限制实现之一的第一个示例。我们创建的最大允许限制为 2,队列限制为 2 的限制。这意味着任何时候最多可以获得2个许可证,我们允许排队呼叫,总共最多有2个许可证请求。ConcurrencyLimiterWaitAsync

该参数确定队列中项目的处理顺序,它可以是 (FIFO) 或 (LIFO) 的值。需要注意的一个有趣的行为是,在队列已满时使用将完成最旧的排队调用,并失败,直到队列中有空间用于最新的队列项。queueProcessingOrderQueueProcessingOrder.OldestFirstQueueProcessingOrder.NewestFirstQueueProcessingOrder.NewestFirstWaitAsyncRateLimitLease

在此示例中,有 2 个线程尝试获取许可证。如果线程 1 首先运行,它将成功获取 2 个许可证,并且 in 线程 2 将排队等待 in 线程 1 被释放。此外,如果另一个线程尝试使用任何一个线程获取许可证,或者它将立即收到属性等于 false 的 a,因为 和 已经用完了。WaitAsyncRateLimitLeaseAcquireWaitAsyncRateLimitLeaseIsAcquiredpermitLimitqueueLimit

如果线程 2 首先运行,它将立即获得等于 true 的 a,并且当线程 1 接下来运行时(假设线程 2 中的租约尚未释放),它将同步获取属性等于 false 的 a,因为不会排队并且调用会用完。RateLimitLeaseIsAcquiredRateLimitLeaseIsAcquiredAcquirepermitLimitWaitAsync

到目前为止,我们已经看到了,我们在包装盒内提供了另外3个限制器。、和所有这些实现抽象类,抽象类本身实现 。 介绍了该方法以及用于观察限制器上常见设置的几个属性。 在展示这些速率限制器的一些示例后,将进行解释。ConcurrencyLimiterTokenBucketRateLimiterFixedWindowRateLimiterSlidingWindowRateLimiterReplenishingRateLimiterRateLimiterReplenishingRateLimiterTryReplenishTryReplenish

RateLimiter limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(tokenLimit: 5, queueProcessingOrder: QueueProcessingOrder.OldestFirst,
    queueLimit: 1, replenishmentPeriod: TimeSpan.FromSeconds(5), tokensPerPeriod: 1, autoReplenishment: true));

using RateLimitLease lease = await limiter.WaitAsync(5);

// will complete after ~5 seconds
using RateLimitLease lease2 = await limiter.WaitAsync();

在这里,我们显示 ,它比 .这是新令牌(与许可证的概念相同,只是在令牌存储桶的上下文中有一个更好的名称)被添加回限制的频率。在此示例中,值为 1,为 5 秒,因此每 5 秒将 1 个令牌添加回最大值 5。最后,设置为 true,这意味着限制器将在内部创建一个以处理每 5 秒补充一次令牌。TokenBucketRateLimiterConcurrencyLimiterreplenishmentPeriodtokensPerPeriodreplenishmentPeriodtokenLimitautoReplenishmentTimer

如果设置为 false,则由开发人员调用限制器。当管理多个实例并希望通过创建单个实例并自行管理补充调用来降低开销时,这很有用,而不是让每个限制器创建一个 .autoReplenishmentTryReplenishReplenishingRateLimiterTimerTimer

ReplenishingRateLimiter[] limiters = GetLimiters();
Timer rateLimitTimer = new Timer(static state =>
{
    var replenishingLimiters = (ReplenishingRateLimiter[])state;
    foreach (var limiter in replenishingLimiters)
    {
        limiter.TryReplenish();
    }
}, limiters, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));

FixedWindowRateLimiter有一个选项,用于定义窗口更新所需的时间。window

new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions(permitLimit: 2,
    queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 1, window: TimeSpan.FromSeconds(10), autoReplenishment: true));

并且还有一个选项,除了该选项之外,还可以指定有多少段以及窗口滑动的频率。SlidingWindowRateLimitersegmentsPerWindowwindow

new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions(permitLimit: 2,
    queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 1, window: TimeSpan.FromSeconds(10), segmentsPerWindow: 5, autoReplenishment: true));

回到前面提到的元数据,让我们展示一个元数据可能有用的示例。

class RateLimitedHandler : DelegatingHandler
{
    private readonly RateLimiter _rateLimiter;

    public RateLimitedHandler(RateLimiter limiter) : base(new HttpClientHandler())
    {
        _rateLimiter = limiter;
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        using RateLimitLease lease = await _rateLimiter.WaitAsync(1, cancellationToken);
        if (lease.IsAcquired)
        {
            return await base.SendAsync(request, cancellationToken);
        }
        var response = new HttpResponseMessage(System.Net.HttpStatusCode.TooManyRequests);
        if (lease.TryGetMetadata(MetadataName.RetryAfter, out var retryAfter))
        {
            response.Headers.Add(HeaderNames.RetryAfter, ((int)retryAfter.TotalSeconds).ToString(NumberFormatInfo.InvariantInfo));
        }
        return response;
    }
}

RateLimiter limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(tokenLimit: 5, queueProcessingOrder: QueueProcessingOrder.OldestFirst,
    queueLimit: 1, replenishmentPeriod: TimeSpan.FromSeconds(5), tokensPerPeriod: 1, autoReplenishment: true));;
HttpClient client = new HttpClient(new RateLimitedHandler(limiter));
await client.GetAsync("https://example.com");

在此示例中,我们限制了速率,如果未能获得请求的许可,我们希望返回具有429状态代码(太多请求)的失败http请求,而不是向我们的下游资源发出HTTP请求。此外,429 个响应可以包含“重试后”标头,让使用者知道重试何时可能成功。我们通过查找有关 using 和 的元数据来实现此目的。我们还使用,因为它能够计算请求的令牌数量何时可用的估计值,因为它知道它补充令牌的频率。而没有办法知道许可证何时可用,因此它不会提供任何元数据。HttpClientRateLimitLeaseTryGetMetadataMetadataName.RetryAfterTokenBucketRateLimiterConcurrencyLimiterRetryAfter

MetadataName是一个静态类,它提供了几个预先创建的实例,即我们刚刚看到的实例,它的类型为 , 和 ,它被类型为 。还有一个静态方法可用于创建自己的强类型命名元数据键。 有 2 个重载,一个用于具有参数的强类型,另一个接受元数据名称的字符串并具有参数。MetadataName<T>MetadataName.RetryAfterMetadataName<TimeSpan>MetadataName.ReasonPhraseMetadataName<string>MetadataName.Create<T>(string name)RateLimitLease.TryGetMetadataMetadataName<T>out Tout object

现在让我们看一下正在引入的另一个 API,以帮助处理更复杂的场景,即 !PartitionedRateLimiter

分区速率限制器

也包含在 System.Threading.RateLimiting nuget 包中。这是一个与类非常相似的抽象,只是它接受一个实例作为其上的方法的参数。例如现在:.这对于您可能希望根据传入的速率更改速率限制行为的情况非常有用。这可以是针对不同 s 的独立并发限制,也可以是更复杂的方案,例如将 X 和 Y 分组到相同的并发限制下,但将 W 和 Z 分组到令牌存储桶限制下。PartitionedRateLimiter<TResource>RateLimiterTResourceAcquireAcquire(TResource resourceID, int permitCount = 1)TResourceTResource

为了帮助解决常见用法,我们提供了一种构造 via .PartitionedRateLimiter<TResource>PartitionedRateLimiter.Create<TResource, TPartitionKey>(...)

enum MyPolicyEnum
{
    One,
    Two,
    Admin,
    Default
}

PartitionedRateLimiter<string> limiter = PartitionedRateLimiter.Create<string, MyPolicyEnum>(resource =>
{
    if (resource == "Policy1")
    {
        return RateLimitPartition.Create(MyPolicyEnum.One, key => new MyCustomLimiter());
    }
    else if (resource == "Policy2")
    {
        return RateLimitPartition.CreateConcurrencyLimiter(MyPolicyEnum.Two, key =>
            new ConcurrencyLimiterOptions(permitLimit: 2, queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 2));
    }
    else if (resource == "Admin")
    {
        return RateLimitPartition.CreateNoLimiter(MyPolicyEnum.Admin);
    }
    else
    {
        return RateLimitPartition.CreateTokenBucketLimiter(MyPolicyEnum.Default, key =>
            new TokenBucketRateLimiterOptions(tokenLimit: 5, queueProcessingOrder: QueueProcessingOrder.OldestFirst,
                queueLimit: 1, replenishmentPeriod: TimeSpan.FromSeconds(5), tokensPerPeriod: 1, autoReplenishment: true));
    }
});
RateLimitLease lease = limiter.Acquire(resourceID: "Policy1", permitCount: 1);

// ...

RateLimitLease lease = limiter.Acquire(resourceID: "Policy2", permitCount: 1);

// ...

RateLimitLease lease = limiter.Acquire(resourceID: "Admin", permitCount: 12345678);

// ...

RateLimitLease lease = limiter.Acquire(resourceID: "other value", permitCount: 1);

PartitionedRateLimiter.Create有 2 个泛型类型参数,第一个表示资源类型,该资源类型也将是返回的 。第二个泛型类型是分区键类型,在上面的示例中,我们将其用作键类型。该键用于区分具有相同限制器的一组实例,这就是我们所说的分区。 接受一个我们称之为分区器的分区程序。每次与 via 或 交互时都会调用此函数,并且从函数返回 a。 包含一个方法,该方法表示用户如何指定分区将具有的标识符以及将与该标识符关联的限制器。TResourcePartitionedRateLimiter<TResource>MyPolicyEnumTResourcePartitionedRateLimiter.CreateFunc<TResource, RateLimitPartition<TPartitionKey>>PartitionedRateLimiterAcquireWaitAsyncRateLimitPartition<TKey>RateLimitPartition<TKey>Create

在上面的第一个代码块中,我们正在检查资源是否与“Policy1”相等,如果它们匹配,我们将创建一个带有键的分区,并返回一个用于创建自定义的工厂。工厂被调用一次,然后缓存速率限制器,以便将来对密钥的访问将使用相同的速率限制器实例。MyPolicyEnum.OneRateLimiterMyPolicyEnum.One

查看第一个条件,当资源等于“Policy2”时,我们同样创建一个分区,这次我们使用方便的方法创建一个.我们对此分区使用新的分区键 ,并为将要生成的 指定的选项。现在,每个 或 对于“Policy2”都将使用相同的实例。else ifCreateConcurrencyLimiterConcurrencyLimiterMyPolicyEnum.TwoConcurrencyLimiterAcquireWaitAsyncConcurrencyLimiter

我们的第三个条件是我们的“管理员”资源,我们不想限制我们的管理员,所以我们使用不会应用任何限制。我们还为此分区分配分区键。CreateNoLimiterMyPolicyEnum.Admin

最后,我们有一个回退,让所有其他资源使用实例,并将 的键分配给此分区。对我们的条件未涵盖的资源的任何请求都将使用此 .通常,最好使用非 noop 回退限制器,以防将来未涵盖所有条件或向应用程序添加新行为。TokenBucketLimiterMyPolicyEnum.DefaultifTokenBucketLimiter

在下一个示例中,让我们将 与前面的自定义组合。我们将用作 的资源类型,这是我们在 的方法中获得的类型。以及一个用于我们的分区键,因为我们将基于url路径进行分区。PartitionedRateLimiterHttpClientHttpRequestMessagePartitionedRateLimiterSendAsyncDelegatingHandlerstring

PartitionedRateLimiter<HttpRequestMessage> limiter = PartitionedRateLimiter.Create<HttpRequestMessage, string>(resource =>
{
    if (resource.RequestUri?.IsLoopback)
    {
        return RateLimitPartition.CreateNoLimiter("loopback");
    }

    string[]? segments = resource.RequestUri?.Segments;
    if (segments?.Length >= 2 && segments[1] == "api/")
    {
        // segments will be [] { "/", "api/", "next_path_segment", etc.. }
        return RateLimitPartition.CreateConcurrencyLimiter(segments[2].Trim('/'), key =>
            new ConcurrencyLimiterOptions(permitLimit: 2, queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 2));
    }

    return RateLimitPartition.Create("default", key => new MyCustomLimiter());
});

class RateLimitedHandler : DelegatingHandler
{
    private readonly PartitionedRateLimiter<HttpRequestMessage> _rateLimiter;

    public RateLimitedHandler(PartitionedRateLimiter<HttpRequestMessage> limiter) : base(new HttpClientHandler())
    {
        _rateLimiter = limiter;
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        using RateLimitLease lease = await _rateLimiter.WaitAsync(request, 1, cancellationToken);
        if (lease.IsAcquired)
        {
            return await base.SendAsync(request, cancellationToken);
        }
        var response = new HttpResponseMessage(System.Net.HttpStatusCode.TooManyRequests);
        if (lease.TryGetMetadata(MetadataName.RetryAfter, out var retryAfter))
        {
            response.Headers.Add(HeaderNames.RetryAfter, ((int)retryAfter.TotalSeconds).ToString(NumberFormatInfo.InvariantInfo));
        }
        return response;
    }
}

仔细看看上面的例子,我们的第一个检查是针对localhost的,我们已经决定,如果用户在本地做事,我们不想限制他们,他们不会使用我们试图保护的上游资源。下一个检查更有趣,我们正在查看url路径并查找对端点的任何请求。如果请求匹配,我们获取路径的一部分,并为该特定路径创建一个分区。这意味着任何请求都将使用我们的一个实例,而任何请求都将使用我们的.这是因为我们对这些请求使用不同的分区键,因此我们的限制器工厂为不同的分区生成新的限制器。最后,我们对任何不是针对本地主机或终结点的请求都有回退限制。PartitionedRateLimiter/api/<something><something>/api/apple/*ConcurrencyLimiter/api/orange/*ConcurrencyLimiter/api/*

还显示了更新的,它现在接受a而不是a并传入调用,否则代码的其余部分保持不变。RateLimitedHandlerPartitionedRateLimiter<HttpRequestMessage>RateLimiterrequestWaitAsync

在此示例中,有几件事值得指出。如果发出大量唯一请求,我们可能会创建许多分区,这将导致我们的.返回的确实有一些逻辑,可以在限制器有一段时间未使用时将其删除,以帮助缓解这种情况,但应用程序开发人员也应该意识到创建无限分区,并尽可能避免这种情况。此外,对于分区键,调用是避免在使用的情况下使用不同的限制器,因为这些限制器在使用时会产生不同的段。/api/*PartitionedRateLimiterPartitionedRateLimiterPartitionedRateLimiter.Createsegments[2].Trim('/')Trim/api/apple/api/apple/Uri.Segments

也可以在不使用该方法的情况下编写自定义实现。下面是为每个资源使用并发限制的自定义实现的示例。所以资源有自己的极限,有自己的极限,等等。这具有更灵活和更有效的优点,但代价是维护成本更高。PartitionedRateLimiter<T>PartitionedRateLimiter.Createint12

public sealed class PartitionedConcurrencyLimiter : PartitionedRateLimiter<int>
{
    private ConcurrentDictionary<int, int> _keyLimits = new();
    private int _permitLimit;

    private static readonly RateLimitLease FailedLease = new Lease(null, 0, 0);

    public PartitionedConcurrencyLimiter(int permitLimit)
    {
        _permitLimit = permitLimit;
    }

    public override int GetAvailablePermits(int resourceID)
    {
        if (_keyLimits.TryGetValue(resourceID, out int value))
        {
            return value;
        }
        return 0;
    }

    protected override RateLimitLease AcquireCore(int resourceID, int permitCount)
    {
        if (_permitLimit < permitCount)
        {
            return FailedLease;
        }

        bool wasUpdated = false;
        _keyLimits.AddOrUpdate(resourceID, (key) =>
        {
            wasUpdated = true;
            return _permitLimit - permitCount;
        }, (key, currentValue) =>
        {
            if (currentValue >= permitCount)
            {
                wasUpdated = true;
                currentValue -= permitCount;
            }
            return currentValue;
        });

        if (wasUpdated)
        {
            return new Lease(this, resourceID, permitCount);
        }
        return FailedLease;
    }

    protected override ValueTask<RateLimitLease> WaitAsyncCore(int resourceID, int permitCount, CancellationToken cancellationToken)
    {
        return new ValueTask<RateLimitLease>(AcquireCore(resourceID, permitCount));
    }

    private void Release(int resourceID, int permitCount)
    {
        _keyLimits.AddOrUpdate(resourceID, _permitLimit, (key, currentValue) =>
        {
            currentValue += permitCount;
            return currentValue;
        });
    }

    private sealed class Lease : RateLimitLease
    {
        private readonly int _permitCount;
        private readonly int _resourceId;
        private PartitionedConcurrencyLimiter? _limiter;

        public Lease(PartitionedConcurrencyLimiter? limiter, int resourceId, int permitCount)
        {
            _limiter = limiter;
            _resourceId = resourceId;
            _permitCount = permitCount;
        }

        public override bool IsAcquired => _limiter is not null;

        public override IEnumerable<string> MetadataNames => throw new NotImplementedException();

        public override bool TryGetMetadata(string metadataName, out object? metadata)
        {
            throw new NotImplementedException();
        }

        protected override void Dispose(bool disposing)
        {
            if (_limiter is null)
            {
                return;
            }

            _limiter.Release(_resourceId, _permitCount);
            _limiter = null;
        }
    }
}

PartitionedRateLimiter<int> limiter = new PartitionedConcurrencyLimiter(permitLimit: 10);
// both will be successful acquisitions as they use different resource IDs
RateLimitLease lease = limiter.Acquire(resourceID: 1, permitCount: 10);
RateLimitLease lease2 = limiter.Acquire(resourceID: 2, permitCount: 7);

此实现确实存在一些问题,例如从不删除字典中的条目,不支持队列以及在访问元数据时抛出,因此请将其用作实现自定义的灵感,并且不要在没有修改的情况下复制到代码中。PartitionedRateLimiter<T>

现在我们已经了解了主要的 API,让我们来看看 ASP.NET 核心中利用这些基元的速率限制中间件。

速率限制中间件

此中间件通过 Microsoft.AspNetCore.RateLimiting NuGet 包提供。主要使用模式是配置一些速率限制策略,然后将这些策略附加到终端节点。策略是命名的 ,它与方法采用的内容相同,现在在哪里,并且仍然是用户定义的键。当您想要为策略配置单个限制器而不需要不同的分区时,4 个内置速率限制器也有扩展方法。Func<HttpContext, RateLimitPartition<TPartitionKey>>PartitionedRateLimiter.CreateTResourceHttpContextTPartitionKey

var app = WebApplication.Create(args);

app.UseRateLimiter(new RateLimiterOptions()
    .AddConcurrencyLimiter(policyName: "get", new ConcurrencyLimiterOptions(permitLimit: 2, queueProcessingOrder: QueueProcessingOrder.OldestFirst, queueLimit: 2))
    .AddNoLimiter(policyName: "admin")
    .AddPolicy(policyName: "post", partitioner: httpContext =>
    {
        if (!StringValues.IsNullOrEmpty(httpContext.Request.Headers["token"]))
        {
            return RateLimitPartition.CreateTokenBucketLimiter("token", key =>
                new TokenBucketRateLimiterOptions(tokenLimit: 5, queueProcessingOrder: QueueProcessingOrder.OldestFirst,
                    queueLimit: 1, replenishmentPeriod: TimeSpan.FromSeconds(5), tokensPerPeriod: 1, autoReplenishment: true));
        }
        else
        {
            return RateLimitPartition.Create("default", key => new MyCustomLimiter());
        }
    }));

app.MapGet("/get", context => context.Response.WriteAsync("get")).RequireRateLimiting("get");

app.MapGet("/admin", context => context.Response.WriteAsync("admin")).RequireRateLimiting("admin").RequireAuthorization("admin");

app.MapPost("/post", context => context.Response.WriteAsync("post")).RequireRateLimiting("post");

app.Run();

此示例演示如何添加中间件、配置某些策略以及将不同的策略应用于不同的终结点。从顶部开始,我们使用 将中间件添加到中间件管道中。接下来,我们使用便利方法将一些策略添加到我们的选项中,并分别为其中 2 个策略添加一些策略,分别命名为 和。然后,我们使用允许根据传入的资源配置不同分区的方法(对于中间件)。最后,我们在各个端点上使用该方法,让速率限制中间件知道在哪个端点上运行什么策略。(请注意,在此最小示例中,终结点上的用法不会执行任何操作,假设已配置身份验证和授权)UseRateLimiterAddConcurrencyLimiterAddNoLimiter"get""admin"AddPolicyHttpContextRequireRateLimitingRequireAuthorization/admin

该方法还有 2 个使用 的重载。此接口公开一个回调,与我在下面描述的回调相同,以及一个将 作为参数并返回 .第一个重载 采用 的实例,第二个重载将 的实现作为泛型参数。泛型参数将使用依赖关系注入来调用构造函数并为您实例化。AddPolicyIRateLimiterPolicy<TPartitionKey>OnRejectedRateLimiterOptionsGetPartitionHttpContextRateLimitPartition<TPartitionKey>AddPolicyIRateLimiterPolicyIRateLimiterPolicyIRateLimiterPolicy

public class CustomRateLimiterPolicy<string> : IRateLimiterPolicy<string>
{
    private readonly ILogger _logger;

    public CustomRateLimiterPolicy(ILogger<CustomRateLimiterPolicy<string>> logger)
    {
        _logger = logger;
    }

    public Func<OnRejectedContext, CancellationToken, ValueTask>? OnRejected
    {
        get => (context, lease) =>
        {
            context.HttpContext.Response.StatusCode = 429;
            _logger.LogDebug("Request rejected");
            return new ValueTask();
        };
    }

    public RateLimitPartition<string> GetPartition(HttpContext context)
    {
        if (!StringValues.IsNullOrEmpty(httpContext.Request.Headers["token"]))
        {
            return RateLimitPartition.CreateTokenBucketLimiter("token", key =>
                new TokenBucketRateLimiterOptions(tokenLimit: 5, queueProcessingOrder: QueueProcessingOrder.OldestFirst,
                    queueLimit: 1, replenishmentPeriod: TimeSpan.FromSeconds(5), tokensPerPeriod: 1, autoReplenishment: true));
        }
        else
        {
            return RateLimitPartition.Create("default", key => new MyCustomLimiter());
        }
    }
}

var app = WebApplication.Create(args);
var logger = app.Services.GetRequiredService<ILogger<CustomRateLimiterPolicy<string>>>();

app.UseRateLimiter(new RateLimitOptions()
    .AddPolicy("a", new CustomRateLimiterPolicy<string>(logger))
    .AddPolicy<CustomRateLimiterPolicy<string>>("b"));

其他配置 包括 这是在无法获取租约时将返回的状态代码,默认情况下返回 503。对于更高级的用法,还有一个函数,该函数将在使用后被调用并作为参数接收。RateLimiterOptionsRejectionStatusCodeOnRejectedRejectionStatusCodeOnRejectedContext

new RateLimiterOptions()
{
    OnRejected = (context, cancellationToken) =>
    {
        context.HttpContext.StatusCode = StatusCodes.Status429TooManyRequests;
        return new ValueTask();
    }
};

最后但并非最不重要的一点是,允许通过 配置全局。如果提供了 ,它将在终结点上指定的任何策略之前运行。例如,如果要将应用程序限制为处理 1000 个并发请求(无论指定了什么终结点策略),则可以使用这些设置配置 a 并设置属性。RateLimiterOptionsPartitionedRateLimiter<HttpContext>RateLimiterOptions.GlobalLimiterGlobalLimiterPartitionedRateLimiterGlobalLimiter

总结

请尝试速率限制,并让我们知道您的想法!对于 System.Threading.RateLimiting 命名空间中的 RateLimiting API,请使用 nuget 包 System.Threading.RateLimiting,并在 Runtime GitHub 存储库中提供反馈。对于 RateLimiting 中间件,请使用 nuget 包 Microsoft.AspNetCore.RateLimiting,并在 AspNetCore GitHub 存储库中提供反馈。

给TA打赏
共{{data.count}}人
人已打赏
.NET.Net Core

.NET Framework, .NET Core 和.NET Standard的区别和联系

2022-7-22 19:05:03

.NET

自定义EventSource(一)EventCounter

2022-7-25 11:19:23

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
今日签到
有新私信 私信列表
搜索