时代,一大技术特征是对海量数据采集、存储和分析的多组件解决方案。而其中对来自于传感器、APP的SDK和各类互联网应用的原生日志数据的采集存储则是基本中的基本。本系列文章将从0到1,概述一下搭建基于Kafka、Flume、Zookeeper、HDFS、Hive的海量数据分析系统的框架、核心应用和关键模块。
项目源代码存储于GitHub:源码
系统架构概述
本系列文章所介绍的数据分析系统,定位于一种通用的大数据分析系统,可用于电商、互联网和物联网的实际解决方案中。该应用主要解决从多种多样的互联网应用、APP、传感器、小程序等网络客户端中预设的接口采集数据,并进行分布式存储,通过RESTful或服务订阅的方式,连接BI应用或者嵌入了机器学习模块的业务数据分析系统。其项目架构如下:

项目主体实现了从各种互联网客户端的日志数据到集中的BI分析系统的全过程,主要包括以下构件:
1. 日志收集Web应用:基于REST风格的接口,处理从网络客户端回传的数据文件,其中包括了对数据对象的定义、核心Web应用和模拟客户端测试程序。
2. Kafka集群:Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
2. Zookeeper集群:是一个为Kafka的分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。Zookeeper可以实现封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
3. Flume:用于存储数据到HDFS。Flume的意义在于:当收集数据的速度超过将写入数据的时候,也就是当收集信息遇到峰值时,这时候收集的信息非常大,甚至超过了系统的写入数据能力,这时候,Flume会在数据生产者和数据收容器间做出调整,保证其能够在两者之间提供平稳的数据。
4. HDFS:提供高吞吐量的分布式存储方案。
5. Hive:Hive是建立在 Hadoop 上的数据仓库基础构架,定义了简单的类 SQL 查询语言,便于快速搭建基于SQL的数据应用。
6. Hive Server2:一种可选服务,允许远程客户端可以使用各种编程语言向Hive提交请求并检索结果。
7. Dubbo和RPC:Dubbo是阿里开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,轻松实现面向服务的应用开发。
数据收集应用
数据收集应用的目标是提供一个对外的接口,基于实时或准实时的要求收集来自海量客户端应用所上传的数据文件,因此可以根据需求进行集群化和添加负载均衡机制。以常规的日志数据收集应用为例,一个数据应用应该实现的主要功能包括:数据属性拷贝、数据对象封装、时间校对、地理数据提取和缓存、发送数据至Kafka,以及一个可选的模拟客户端上传数据应用。
一、应用结构
基于Maven的多模块应用布局方案,具体包括:
——EasyBI-Parent:父组件,仅维护一个pom文件,作为个子组件的parent pom文件,定义了统一的项目版本、依赖管理和Maven插件管理。
|——EasyBI-Common:子组件,定义了日志数据对象和通用的工具类方法。
|——EasyBI-Logs-Collect-Web:核心组件,基于Rest风格收集日志数据,封装数据对象并发送至Kafka,其中对一些数据进行初级加工。
|——EasyBI-Logs-MockApp:模拟一个客户端上传数据的应用,可选。
二、Common组件
数据对象
数据对象以日志对象为载体,里面封装了从客户端发送过来的不同日志的POJO对象,其类图为:

AppBaseLog为日志类型的统一父类,定义了一些公共的数据属性,被用于各个具体日志实现类继承。
Startup、Event、Page、Usage和Error分别对应了应用启动、事件、页面、功能和错误的日志记录,继承了公共基类并维护了各自的特有属性。
APPLogEntity是按客户端为单位的日志对象,组合了各个不同的子日志对象,作为整个数据分析系统的核心数据模型。
通用的工具类
主要包括两个部分,分别是复制各子日志对象的属性至LogEntity对象的一个工具方法,以及一个提取IP位置信息的工具方法。
拷贝日志属性的工具类,核心代码如下:
1 public class PropertiesUtil { 2 /* 3 * 通过内省进行属性复制(对象到对象) 4 */ 5 public static void copyProperties(Object src, Object dest) { 6 7 try { 8 //源对象的BeanInfo 9 BeanInfo srcBeanInfo = Introspector.getBeanInfo(src.getClass()); 10 //获取属性描述符11 PropertyDescriptor[] descriptors = srcBeanInfo.getPropertyDescriptors(); 12 for (PropertyDescriptor descriptor : descriptors) { 13 //获取getter和setter方法14 Method getter = descriptor.getReadMethod(); 15 Method setter = descriptor.getWriteMethod(); 16 //获取set方法名称17 String setterName = setter.getName(); 18 //获取setter方法参数19 Class<?>[] parameterTypes = setter.getParameterTypes(); 20 21 Object value = gett

