Spark实战——日志分析

1-1 -用户行为日志概述 为什么要记录用户访问行为日志?   网站页面的访问量   网站的粘性   推荐   用户行为日志   Nginx ajax   用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击...)   用户行为轨迹、流量日志 日志数据内容   1)访问的系统属性:操作系统,浏览器等等   2)访问特征:点击的url,从哪个URL跳转过来的(referer),页面上的停留时间等   3) 访问信息:session_id,访问ip(访问城市)等 用户行为日志分析的意义   网站的眼睛 网站的神经 网站的大脑 1-2 -离线数据处理架构 数据处理流程   1)数据采集     flume: web日志写入到HDFS   2)数据清洗     脏数据     spark、hive、MapReduce 或者是其他的一些分布式计算框架     清洗完之后的数据可以存放到HDFS(Hive/spark sql)   3)数据处理     按照我们的需要进行相应的统计和分析     spark、hive、MapReduce 或者是其他的一些分布式计算框架   4)处理结果入库     结果可以存放在RDBMS、Nosql   5)数据的可视化     通过图形化展示出来:饼图、柱状图、地图、折线图     ECharts、HUE、Zepplin6 1-3-项目需求 1-4 日志内容构成 1-5 数据清洗之第一步原始日志解析 日志解析代码(使用spark完成数据清洗操作) 复制代码 package com.log import org.apache.spark.sql.SparkSession /** *第一步清洗:抽取出我们所需要的指定列的数据 */ object SparkStatFormatJob { def main(args: Array[String]): Unit = { val spark=SparkSession.builder().appName("SparkStatFormatJob") .master("local[2]").getOrCreate() val access=spark.sparkContext.textFile("E:\\data\\10000_access.log") //access.take(10).foreach(println) access.map(line=>{ val splits=line.split(" ") val ip=splits(0) /** * [10/Nov/2016:00:01:02 +0800]=>yyyy-mm-dd hh:mm:ss */ try{ val time=splits(3)+" "+splits(4) val url=splits(11).replaceAll("\"","") val traffic =splits(9) (ip,DataUtils.parse(time),url,traffic) DataUtils.parse(time)+"\t"+url+"\t"+traffic+"\t"+ip }catch { case e:Exception=>{ 0l } } }).saveAsTextFile("E:\\data\\output") spark.stop() } } 复制代码 数据清洗结果: 1-6 -二次清洗 复制代码 package com.log import org.apache.spark.sql.{SaveMode, SparkSession} /** *使用spark完成数据清洗操作 */ object SparkStatCleanJob { def main(args: Array[String]): Unit = { val spark=SparkSession.builder().appName("SparkStatCleanJob") .master("local[2]").getOrCreate() val accessRDD=spark.sparkContext.textFile("E:\\data\\spark\\access.log") // accessRDD.take(10).foreach(println) val accessDF=spark.createDataFrame(accessRDD.map(line=>AccessConverUtil.parseLog(line)), AccessConverUtil.struct) // accessDF.printSchema() // accessDF.show() //coalesce文件输出数量(默认是多个文件) // mode(SaveMode.Overwrite)默认每次重写文件 accessDF.coalesce(1).write.format("parquet").partitionBy("day") .mode(SaveMode.Overwrite).save("E:\\data\\spark\\clean") spark.stop() } } 复制代码 访问日志转换工具类(输入=》输出) AccessConverUtil 清洗结果 1-7-需求功能实现 1.使用DataFreame API完成统计分析 2.使用SQL API完成统计分析 3.将统计分析结果写入到MySQL数据库 TopNStatJob 1-8-统计结果可视化展示 https://www.cnblogs.com/aishanyishi/p/10319200.html
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信