【.NET Core项目实战-统一认证平台】开篇及目录索引
上篇文章我介绍了如何在网关上增加自定义客户端授权功能,从设计到编码实现,一步一步详细讲解,相信大家也掌握了自定义中间件的开发技巧了,本篇我们将介绍如何实现自定义客户端的限流功能,来进一步完善网关的基础功能。
.netcore项目实战交流群(637326624),有兴趣的朋友可以在群里交流讨论。
一、功能描述
限流就是为了保证网关在高并发或瞬时并发时,在服务能承受范围内,牺牲部分请求为代价,保证系统的整体可用性而做的安全策略,避免单个服务影响整体网关的服务能力。
比如网关有商品查询接口 ,能接受的极限请求是每秒100次查询,如果此时不限流,可能因为瞬时请求太大,造成服务卡死或崩溃的情况,这种情况可以使用Ocelot客户端全局限流即可满足需求,现在又有一个需求,我需要把接口开放给A公司,他们也要查询这个商品接口,这时A公司请求频率也是我们设置的每秒100次请求,显然我们不希望A公司有这么高的请求频率,我只会给A公司最大每秒一次的请求,那怎么实现呢?这时我们就无法通过Ocelot配置限流来进行自定义控制了,这块就需要我们增加自定义限流管道来实现功能。
下面我们就该功能如何实现展开讲解,希望大家先理解下功能需求,然后在延伸到具体实现。
二、数据库设计
限流这块设计表结构和关系如下。
主要有限流规则表、路由限流规则表、限流组表、限流组策略表、客户端授权限流组表、客户端白名单表组成,设计思想就是客户端请求时先检查是否在白名单,如果白名单不存在,就检查是否在限流组里,如果在限流组里校验限流的规则是什么,然后比对这个规则和当前请求次数看是否能够继续访问,如果超过限流策略直接返回429状态,否则路由到下端请求。
梳理下后发现流程不是很复杂,最起码实现的思路非常清晰,然后我们就运用上篇自定义授权中间件的方式来开发我们第二个中间件,自定义限流中间件。
三、功能实现
1、功能开启配置
网关应该支持自定义客户端限流中间件是否启用,因为一些小型项目是不需要对每个客户端进行单独限流的,中型和大型项目才有可能遇到自定义配置情况,所以我们需要在配置文件增加配置选项。在AhphOcelotConfiguration.cs配置类中增加属性,默认不开启。
/// 
/// 金焰的世界
/// 2018-11-18
/// 是否开启自定义限流,默认不开启
/// 
public bool ClientRateLimit { get; set; } = false;
/// 
/// 金焰的世界
/// 2018-11-18
/// 客户端限流缓存时间,默认30分钟
/// 
public int ClientRateLimitCacheTime { get; set; } = 1800;
那我们如何把自定义的限流增加到网关流程里呢?这块我们就需要订制自己的限流中间件。
2、实现客户端限流中间件
首先我们定义一个自定义限流中间件AhphClientRateLimitMiddleware,需要继承OcelotMiddleware,然后我们要实现Invoke方法,详细代码如下。
using Ctr.AhphOcelot.Configuration;
using Ctr.AhphOcelot.Errors;
using Ocelot.Logging;
using Ocelot.Middleware;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Ctr.AhphOcelot.RateLimit.Middleware
{
    /// 
    /// 金焰的世界
    /// 2018-11-18
    /// 自定义客户端限流中间件
    /// 
    public class AhphClientRateLimitMiddleware : OcelotMiddleware
    {
        private readonly IClientRateLimitProcessor _clientRateLimitProcessor;
        private readonly OcelotRequestDelegate _next;
        private readonly AhphOcelotConfiguration _options;
        public AhphClientRateLimitMiddleware(OcelotRequestDelegate next,
            IOcelotLoggerFactory loggerFactory,
            IClientRateLimitProcessor clientRateLimitProcessor,
            AhphOcelotConfiguration options)
            : base(loggerFactory.CreateLogger
())
        {
            _next = next;
            _clientRateLimitProcessor = clientRateLimitProcessor;
            _options = options;
        }
        public async Task Invoke(DownstreamContext context)
        {
            var clientId = "client_cjy"; //使用默认的客户端
            if (!context.IsError)
            {
                if (!_options.ClientRateLimit)
                {
                    Logger.LogInformation($"未启用客户端限流中间件");
                    await _next.Invoke(context);
                }
                else
                {
                    //非认证的渠道
                    if (!context.DownstreamReRoute.IsAuthenticated)
                    {
                        if (context.HttpContext.Request.Headers.Keys.Contains(_options.ClientKey))
                        {
                            clientId = context.HttpContext.Request.Headers[_options.ClientKey].First();
                        }
                    }
                    else
                    {//认证过的渠道,从Claim中提取
                        var clientClaim = context.HttpContext.User.Claims.FirstOrDefault(p => p.Type == _options.ClientKey);
                        if (!string.IsNullOrEmpty(clientClaim?.Value))
                        {
                            clientId = clientClaim?.Value;
                        }
                    }
                    //路由地址
                    var path = context.DownstreamReRoute.UpstreamPathTemplate.OriginalValue;
                    //1、校验路由是否有限流策略
                    //2、校验客户端是否被限流了
                    //3、校验客户端是否启动白名单
                    //4、校验是否触发限流及计数
                    if (await _clientRateLimitProcessor.CheckClientRateLimitResultAsync(clientId, path))
                    {
                        await _next.Invoke(context);
                    }
                    else
                    {
                        var error = new RateLimitOptionsError($"请求路由 {context.HttpContext.Request.Path}触发限流策略");
                        Logger.LogWarning($"路由地址 {context.HttpContext.Request.Path} 触发限流策略. {error}");
                        SetPipelineError(context, error);
                    }
                }
            }
            else
            {
                await _next.Invoke(context);
            }
        }
    }
}
首先我们来分析下我们的代码,为了知道是哪个客户端请求了我们网关,需要提取clientId,分别从无需授权接口和需要授权接口两个方式提取,如果提取不到值直接给定默认值,放到全局限流里,防止绕过限流策略。然后根据客户端通过4步检验下是否允许访问(后面会介绍这4步怎么实现),如果满足限流策略直接返回限流错误提醒。
有了这个中间件,那么如何添加到Ocelot的管道里呢?上一篇介绍的非常详细,这篇我就不介绍了,自定义限流中间件扩展AhphClientRateLimitMiddlewareExtensions,代码如下。
using Ocelot.Middleware.Pipeline;
using System;
using System.Collections.Generic;
using System.Text;
namespace Ctr.AhphOcelot.RateLimit.Middleware
{
    /// 
    /// 金焰的世界
    /// 2018-11-18
    /// 限流中间件扩展
    /// 
    public static class AhphClientRateLimitMiddlewareExtensions
    {
        public static IOcelotPipelineBuilder UseAhphAuthenticationMiddleware(this IOcelotPipelineBuilder builder)
        {
            return builder.UseMiddleware();
        }
    }
}
有了这个中间件扩展后,我们就在管道的合适地方加入我们自定义的中间件。我们添加我们自定义的管道扩展OcelotPipelineExtensions,然后把自定义限流中间件加入到认证之后。
//添加自定义限流中间件 2018-11-18 金焰的世界
builder.UseAhphClientRateLimitMiddleware();
现在我们完成了网关的扩展和应用,是时候把定义的IClientRateLimitProcessor接口实现了 ,是不是感觉做一个中间件很简单呢?而且每一步都是层层关联,只要一步一步按照自己的想法往下写就能实现。
3、结合数据库实现校验及缓存
首先我们新建AhphClientRateLimitProcessor类来实现接口,中间增加必要的缓存和业务逻辑,详细代码如下。
using Ctr.AhphOcelot.Configuration;
using Ocelot.Cache;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Ctr.AhphOcelot.RateLimit
{
    /// 
    /// 金焰的世界
    /// 2018-11-19
    /// 实现客户端限流处理器
    /// 
    public class AhphClientRateLimitProcessor : IClientRateLimitProcessor
    {
        private readonly AhphOcelotConfiguration _options;
        private readonly IOcelotCache _ocelotCache;
        private readonly IOcelotCache _rateLimitRuleCache;
        private readonly IOcelotCache _clientRateLimitCounter;
        private readonly IClientRateLimitRepository _clientRateLimitRepository;
        private static readonly object _processLocker = new object();
        public AhphClientRateLimitProcessor(AhphOcelotConfiguration options,IClientRateLimitRepository clientRateLimitRepository, IOcelotCache clientRateLimitCounter, IOcelotCache ocelotCache, IOcelotCache rateLimitRuleCache)
        {
            _options = options;
            _clientRateLimitRepository = clientRateLimitRepository;
            _clientRateLimitCounter = clientRateLimitCounter;
            _ocelotCache = ocelotCache;
            _rateLimitRuleCache = rateLimitRuleCache;
        }
        /// 
        /// 校验客户端限流结果
        /// 
        /// 客户端ID
        /// 请求地址
        /// 
        public async Task CheckClientRateLimitResultAsync(string clientid, string path)
        {
            var result = false;
            var clientRule = new List();
            //1、校验路由是否有限流策略
            result = !await CheckReRouteRuleAsync(path);
            if (!result)
            {//2、校验客户端是否被限流了
                var limitResult = await CheckClientRateLimitAsync(clientid, path);
                result = !limitResult.RateLimit;
                clientRule = limitResult.rateLimitOptions;
            }
            if (!result)
            {//3、校验客户端是否启动白名单
                result = await CheckClientReRouteWhiteListAsync(clientid, path);
            }
            if (!result)
            {//4、校验是否触发限流及计数
                result = CheckRateLimitResult(clientRule);
            }
            return result;
        }
        /// 
        /// 检验是否启用限流规则
        /// 
        /// 请求地址
        /// 
        private async Task CheckReRouteRuleAsync(string path)
        {
            var region = _options.RedisKeyPrefix + "CheckReRouteRuleAsync";
            var key = region + path;
            var cacheResult = _ocelotCache.Get(key, region);
            if (cacheResult != null)
            {//提取缓存数据
                return cacheResult.Role;
            }
            else
            {//重新获取限流策略
                var result = await _clientRateLimitRepository.CheckReRouteRuleAsync(path);
                _ocelotCache.Add(key, new ClientRoleModel() { CacheTime = DateTime.Now, Role = result }, TimeSpan.FromSeconds(_options.ClientRateLimitCacheTime), region);
                return result;
            }
            
        }
        /// 
        /// 校验客户端限流规则
        /// 
        /// 客户端ID
        /// 请求地址
        /// 
        private async Task<(bool RateLimit, List rateLimitOptions)> CheckClientRateLimitAsync(string clientid, string path)
        {
            var region = _options.RedisKeyPrefix + "CheckClientRateLimitAsync";
            var key = region + clientid + path;
            var cacheResult = _rateLimitRuleCache.Get(key, region);
            if (cacheResult != null)
            {//提取缓存数据
                return (cacheResult.RateLimit, cacheResult.rateLimitOptions);
            }
            else
            {//重新获取限流策略
                var result = await _clientRateLimitRepository.CheckClientRateLimitAsync(clientid, path);
                _rateLimitRuleCache.Add(key, new RateLimitRuleModel() { RateLimit=result.RateLimit, rateLimitOptions=result.rateLimitOptions }, TimeSpan.FromSeconds(_options.ClientRateLimitCacheTime), region);
                return result;
            }
        }
        /// 
        /// 校验是否设置了路由白名单
        /// 
        /// 客户端ID
        /// 请求地址
        /// 
        private async Task CheckClientReRouteWhiteListAsync(string clientid, string path)
        {
            var region = _options.RedisKeyPrefix + "CheckClientReRouteWhiteListAsync";
            var key = region +clientid+ path;
            var cacheResult = _ocelotCache.Get(key, region);
            if (cacheResult != null)
            {//提取缓存数据
                return cacheResult.Role;
            }
            else
            {//重新获取限流策略
                var result = await _clientRateLimitRepository.CheckClientReRouteWhiteListAsync(clientid,path);
                _ocelotCache.Add(key, new ClientRoleModel() { CacheTime = DateTime.Now, Role = result }, TimeSpan.FromSeconds(_options.ClientRateLimitCacheTime), region);
                return result;
            }
        }
        /// 
        /// 校验完整的限流规则
        /// 
        /// 限流配置
        /// 
        private bool CheckRateLimitResult(List rateLimitOptions)
        {
            bool result = true;
            if (rateLimitOptions != null && rateLimitOptions.Count > 0)
            {//校验策略
                foreach (var op in rateLimitOptions)
                {
                    AhphClientRateLimitCounter counter = new AhphClientRateLimitCounter(DateTime.UtcNow, 1);
                    //分别对每个策略校验
                    var enablePrefix = _options.RedisKeyPrefix + "RateLimitRule";
                    var key = AhphOcelotHelper.ComputeCounterKey(enablePrefix, op.ClientId, op.Period, op.RateLimitPath);
                    var periodTimestamp = AhphOcelotHelper.ConvertToSecond(op.Period);
                    lock (_processLocker)
                    {
                        var rateLimitCounter = _clientRateLimitCounter.Get(key, enablePrefix);
                        if (rateLimitCounter.HasValue)
                        {//提取当前的计数情况
                         // 请求次数增长
                            var totalRequests = rateLimitCounter.Value.TotalRequests + 1;
                            // 深拷贝
                            counter = new AhphClientRateLimitCounter(rateLimitCounter.Value.Timestamp, totalRequests);
                        }
                        else
                        {//写入限流策略
                            _clientRateLimitCounter.Add(key, counter,TimeSpan.FromSeconds(periodTimestamp), enablePrefix);
                        }
                    }
                    if (counter.TotalRequests > op.Limit)
                    {//更新请求记录,并标记为失败
                        result = false;
                    }
                    if (counter.TotalRequests > 1 && counter.TotalRequests <= op.Limit)
                    {//更新缓存配置信息
                        //获取限流剩余时间
                        var cur = (int)(counter.Timestamp.AddSeconds(periodTimestamp) - DateTime.UtcNow).TotalSeconds;
                        _clientRateLimitCounter.Add(key, counter, TimeSpan.FromSeconds(cur), enablePrefix);
                    }
                }
            }
            return result;
        }
    }
}
我们来分析下这块代码,里面涉及了限流的提取和实现规则,首先我们注入了数据库实体接口和缓存信息,实现步骤是参照之前的流程。
主要流程如下:
1、路由是否启用限流,如果未启