asp.net core mcroservices 架构之 分布式日志(三):集成kafka

一 kafka介绍 kafka是基于zookeeper的一个分布式流平台,既然是流,那么大家都能猜到它的存储结构基本上就是线性的了。硬盘大家都知道读写非常的慢,那是因为在随机情况下,线性下,硬盘的读写非常快。kafka官方文档,一直拿传统的消息队列来和kafka对比,这样大家会触类旁通更快了解kafka的特性。最熟悉的消息队列框架有ActiveMQ 和 RabbitMQ.熟悉消息队列的,最熟悉的特性就是队列和发布订阅功能,因为这是大家最常用的,kafka实现了一些特有的机制,去规避传统的消息队列的一些瓶颈,比如并发,rabbitMQ在多个处理程序下,并不能保证执行顺序,还是必须自己去处理独占,而kafka使用consumer group的方式,实现了可以多个处理程序处理一个topic下的记录。如图: image 每个分区的记录保证能被每个组接受,这样可以并发去处理一个topic的记录,而且扩展组,则可以随意根据应用需求去扩展你的应用程序,但是每个组的消费者不能超过分区的数量。 kafka Distribution 提供了容错的功能,每一个partition都有一个服务器叫leader,还有零个或者一个以上的服务器叫follower,当这些follower都在同步数据的时候,leader扛起所有的写和读,当leader挂掉,follower会随机选取一个服务器当leader,当然必须有几个follower同步时 in-sync的。还有kafka虽然的那个记录具有原子性,但是并不支持事务。 因为这一篇并不是专门讲解kafka,所以点到为止。 二 扩展服务 开发 以前讲过,netcore的一个很重要的特性就是支持依赖注入,在这里一切皆服务。那么如果需要kafka作为日志服务的终端,就首先需要kafka服务,下面咱们就开发一个kafka服务。 首先,服务就是需要构建,这是netcore开发服务的第一步,我们首先建立一个IKafkaBuilder.cs接口类,如下: 复制代码 homusing Microsoft.Extensions.DependencyInjection; namespace Walt.Freamwork.Service { public interface IKafkaBuilder { /// /// Gets the where Logging services are configured. /// IServiceCollection Services { get; } } } 复制代码 再实现它,KafkaBuilder.cs 复制代码 using Microsoft.Extensions.DependencyInjection; namespace Walt.Freamwork.Service { public class KafkaBuilder : IKafkaBuilder { public IServiceCollection Services {get;} public KafkaBuilder(IServiceCollection services) { Services=services; } } } 复制代码 再利用扩展方法为serviceCollection类加上扩展方法: 复制代码 using System; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Walt.Framework.Service.Kafka; namespace Walt.Framework.Service { public static class ServiceCollectionExtensions { /// /// Adds logging services to the specified . /// /// The to add services to. /// The so that additional calls can be chained. public static IServiceCollection AddKafka(this IServiceCollection services) { return AddKafka(services, builder => { }); } public static IServiceCollection AddKafka(this IServiceCollection services , Action configure) { if (services == null) { throw new ArgumentNullException(nameof(services)); } services.AddOptions(); configure(new KafkaBuilder(services)); services.TryAddSingleton(); //kafka的服务类 return services; } } } 复制代码 KafkaService的实现: 复制代码 using System; using System.Collections.Generic; using System.Threading.Tasks; using Confluent.Kafka; using Microsoft.Extensions.Options; namespace Walt.Framework.Service.Kafka { public class KafkaService : IKafkaService { private KafkaOptions _kafkaOptions; private Producer _producer; public KafkaService(IOptionsMonitor kafkaOptions) { _kafkaOptions=kafkaOptions.CurrentValue; kafkaOptions.OnChange((kafkaOpt,s)=>{ _kafkaOptions=kafkaOpt; System.Diagnostics.Debug .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s); }); _producer=new Producer(_kafkaOptions.Properties); } private byte[] ConvertToByte(string str) { return System.Text.Encoding.Default.GetBytes(str); } public async Task Producer(string topic,string key,string value) { if(string.IsNullOrEmpty(topic) ||string.IsNullOrEmpty(value)) { throw new ArgumentNullException("topic或者value不能为null."); } var task= await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(value)); return task; } } } 复制代码 那么咱们是不是忘记什么了,看上面的代码,是不是那个配置类KafkaOptions 还没有说明? 在image这个位置添加kafka的配置类KafkaConfigurationOptions: 复制代码 using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Options; using Walt.Freamwork.Service; namespace Walt.Freamwork.Configuration { public class KafkaConfigurationOptions : IConfigureOptions { private readonly IConfiguration _configuration; public KafkaConfigurationOptions(IConfiguration configuration) { _configuration=configuration; } public void Configure(KafkaOptions options) { //这里仅仅自定义一些你自己的代码,使用上面configuration配置中的配置节,处理程序没法自动绑定的 一些事情。 } } } 复制代码 然后,将配置类添加进服务: 复制代码 using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Options; using Walt.Framework.Service; namespace Walt.Framework.Configuration { public static class KafkaConfigurationExtensioncs { public static IKafkaBuilder AddConfiguration(this IKafkaBuilder builder ,IConfiguration configuration) { InitService( builder,configuration); return builder; } public static void InitService(IKafkaBuilder builder,IConfiguration configuration) { builder.Services.TryAddSingleton>( new KafkaConfigurationOptions(configuration)); //配置类和配置内容 builder.Services.TryAddSingleton (ServiceDescriptor.Singleton>( new ConfigurationChangeTokenSource(configuration)) );//这个是观察类,如果更改,会激发onchange方法 builder.Services .TryAddEnumerable(ServiceDescriptor.Singleton> (new ConfigureFromConfigurationOptions(configuration))); //这个是option类,没这个,配置无法将类绑定 builder.Services.AddSingleton(new KafkaConfiguration(configuration)); } } } 复制代码 ok,推送nuget,业务部分调用。 三 kafka服务调用 在project中引用然后restore: image 引入命名空间: image 调用: 复制代码 using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Walt.Framework.Log; using Walt.Framework.Configuration; using Walt.Framework.Service; namespace Walt.TestMcroServoces.Webapi { public class Program { public static void Main(string[] args) { var host = new WebHostBuilder() .ConfigureAppConfiguration((hostingContext, configContext) =>{ var en=hostingContext.HostingEnvironment; if(en.IsDevelopment()) { configContext.AddJsonFile($"appsettings.{en.EnvironmentName}.json"); } else { configContext.AddJsonFile("appsettings.json"); } configContext.AddCommandLine(args) .AddEnvironmentVariables() .SetBasePath(Directory.GetCurrentDirectory()).Build(); }).ConfigureServices((context,configureServices)=>{ configureServices.AddKafka(KafkaBuilder=>{ KafkaBuilder.AddConfiguration(context.Configuration.GetSection("KafkaService")); }); }) //kafka的调用。 .ConfigureLogging((hostingContext, logging) => { logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging")) .AddCustomizationLogger(); }).UseKestrel(KestrelServerOption=>{ KestrelServerOption.ListenAnyIP(801); }) .UseStartup().Build(); host.Run(); Console.ReadKey(); } } } 复制代码 然后提交git,让jenkins构建docker发布运行: jenkin是是非常牛的一款构建工具,不仅仅根据插件可以扩展不同环境,还支持分布式构建. image 这是我们用jenikins构建的的: image然 让它跑起来: image 调用看看: image 这个方法是输出Properties数组的,这个配置结构只是演示,后面的结构要变,因为要放kafka的配置,比如连接服务ip等, 改动也很简单,在配置好configuration和service后,改动这个类KafkaOptions和配置文件中kafka节点中的json结构就行。: image 四 集成kafka kafka的接口不多,看看都有那些: https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Producer.html image Consumer和Producer是咱们发布消息和消费消息的两个主类,代码在上文已经实现的service。 客户端代码: 使用my-replicated-topic-morepart这儿topic,还是希望多分区,因为后面consumer使用分布式计算读取。 image consumer先在客户端监听: image product端的调用代码: image 执行这个接口后,再看consumer接收到的消息: image 最后一步,将咱们kafka日志部分替换为真实的kafka环境,看结果: image 那么最后的配置是这样的: 复制代码 { "Logging": { "LogLevel": { "Default": "Debug", "System": "Debug", "Microsoft": "Debug" }, "KafkaLog":{ "Prix":"这是我的自定义日志提供程序" } }, "KafkaService":{ "Properties":{ "bootstrap.servers":"192.168.249.106:9092" } } } 复制代码 log使用这个kafka服务就很简单了,在前面文章中实现的log扩展类中,直接构造函数注入这个kafkaService,就可以以使用了。 分布式日志到这里结束,可能大家觉得后面还有日志索引和日志展现,因为这个读kafka需要分布式去处理, 我下面刚好要写分布式计算的文章,所以到时可以拿这个当例子,承前继后。https://www.cnblogs.com/ck0074451665/p/10211725.html
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信