Part 4 - Recommendation System - Data ETL

This module performs the data cleaning and loads the cleaned data into Hive tables.

Prerequisites: Spark + Hive

vim $SPARK_HOME/conf/hive-site.xml

```xml
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop001:9083</value>
    </property>
</configuration>
```

Start the Hive metastore server:

```
[root@hadoop001 conf]# nohup hive --service metastore &
[root@hadoop001 conf]# netstat -tanp | grep 9083
tcp        0      0 0.0.0.0:9083        0.0.0.0:*       LISTEN      24787/java
[root@hadoop001 conf]#
```

Test:

```
[root@hadoop001 ~]# spark-shell --master local[2]
scala> spark.sql("select * from liuge_db.dept").show
+------+-------+-----+
|deptno|  dname|  loc|
+------+-------+-----+
|     1|  caiwu| 3lou|
|     2|  renli| 4lou|
|     3|  kaifa| 5lou|
|     4|qiantai| 1lou|
|     5|lingdao|4 lou|
+------+-------+-----+
```

==> The point is to make sure Spark SQL can reach the Hive metastore.

Since we are running Spark in standalone mode, the master and worker also need to be started:

```
[root@hadoop001 sbin]# pwd
/root/app/spark-2.4.3-bin-2.6.0-cdh5.7.0/sbin
[root@hadoop001 sbin]# ./start-all.sh
[root@hadoop001 sbin]# jps
26023 Master
26445 Worker
```

Commonly used Spark ports:

| Port  | Config item            | Purpose              |
|-------|------------------------|----------------------|
| 8080  | spark.master.ui.port   | Master WebUI         |
| 8081  | spark.worker.ui.port   | Worker WebUI         |
| 18080 | spark.history.ui.port  | History Server WebUI |
| 7077  | SPARK_MASTER_PORT      | Master port          |
| 6066  | spark.master.rest.port | Master REST port     |
| 4040  | spark.ui.port          | Driver WebUI         |

At this point open http://hadoop001:8080/ and check the Master WebUI.

Start coding the project. The project is built with IDEA + Scala + Maven.

Step 1: After creating the Scala project, adjust the pom.xml along the following lines:

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.csylh</groupId>
    <artifactId>movie-recommend</artifactId>
    <version>1.0</version>
    <inceptionYear>2008</inceptionYear>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.3</spark.version>
    </properties>

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.39</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```

Step 2: Create com.csylh.recommend.dataclearer.SourceDataETLApp

```scala
import com.csylh.recommend.entity.{Links, Movies, Ratings, Tags}
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * Description:
  *   files under file:///root/data/ml/ml-latest on hadoop001
  *   ====> Spark SQL ETL
  *   ====> load data into the Hive data warehouse
  *
  * @Author: 留歌36
  * @Date: 2019-07-12 13:48
  */
object SourceDataETLApp {
  def main(args: Array[String]): Unit = {
    // Program against the SparkSession API
    val spark = SparkSession.builder()
//      .master("local[2]")
      .enableHiveSupport() // enable Hive access; hive-site.xml etc. must be under Spark's conf directory
      .getOrCreate()

    val sc = spark.sparkContext

    // As a rule of thumb, set the number of RDD partitions to an integer multiple of the CPU
    // cores allocated to the application; on a 4-core / 8 GB setup, 8 is fine.
    // Question 1: why an integer multiple of the number of CPU cores?
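    // (Why a multiple of the core count: Spark runs one task per partition, and each executor
    //  core runs one task at a time, so tasks execute in waves of <total cores> tasks. If the
    //  partition count is a multiple of the core count, every wave keeps all cores busy; with
    //  4 cores and 6 partitions, for example, the second wave runs only 2 tasks and half the
    //  cores sit idle. Far more, tiny partitions mostly add task-scheduling overhead.)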
    // Question 2: data skew. Tasks that receive the oversized partitions take a long time to
    // process, so when doing data preprocessing you need to consider whether skew can occur.
    val minPartitions = 8

    // In production, be sure to set spark.sql.shuffle.partitions; the default is 200, i.e. the
    // number of shuffle partitions has to be configured explicitly.
    val shuffleMinPartitions = "8"
    spark.sqlContext.setConf("spark.sql.shuffle.partitions", shuffleMinPartitions)

    /**
      * 1. links
      */
    import spark.implicits._
    val links = sc.textFile("file:///root/data/ml/ml-latest/links.txt", minPartitions) // DRIVER
      .filter(!_.endsWith(","))                                                        // EXECUTOR
      .map(_.split(","))                                                               // EXECUTOR
      .map(x => Links(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toInt))              // EXECUTOR
      .toDF()
    println("===============links===================:", links.count())
    links.show()

    // Write the data to HDFS
    links.write.mode(SaveMode.Overwrite).parquet("/tmp/links")

    // Load the data from HDFS into the Hive data warehouse
    spark.sql("drop table if exists links")
    spark.sql("create table if not exists links(movieId int,imdbId int,tmdbId int) stored as parquet")
    spark.sql("load data inpath '/tmp/links' overwrite into table links")

    /**
      * 2. movies
      */
    val movies = sc.textFile("file:///root/data/ml/ml-latest/movies.txt", minPartitions)
      .filter(!_.endsWith(","))
      .map(_.split(","))
      .map(x => Movies(x(0).trim.toInt, x(1).trim.toString, x(2).trim.toString))
      .toDF()
    println("===============movies===================:", movies.count())
    movies.show()

    // Write the data to HDFS
    movies.write.mode(SaveMode.Overwrite).parquet("/tmp/movies")

    // Load the data from HDFS into the Hive data warehouse
    spark.sql("drop table if exists movies")
    spark.sql("create table if not exists movies(movieId int,title String,genres String) stored as parquet")
    spark.sql("load data inpath '/tmp/movies' overwrite into table movies")

    /**
      * 3. ratings
      */
    val ratings = sc.textFile("file:///root/data/ml/ml-latest/ratings.txt", minPartitions)
      .filter(!_.endsWith(","))
      .map(_.split(","))
      .map(x => Ratings(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toDouble, x(3).trim.toInt))
      .toDF()
    println("===============ratings===================:", ratings.count())
    ratings.show()

    // Write the data to HDFS
    ratings.write.mode(SaveMode.Overwrite).parquet("/tmp/ratings")

    // Load the data from HDFS into the Hive data warehouse
    spark.sql("drop table if exists ratings")
    spark.sql("create table if not exists ratings(userId int,movieId int,rating Double,timestamp int) stored as parquet")
    spark.sql("load data inpath '/tmp/ratings' overwrite into table ratings")

    /**
      * 4. tags
      */
    val tags = sc.textFile("file:///root/data/ml/ml-latest/tags.txt", minPartitions)
      .filter(!_.endsWith(","))
      .map(x => rebuild(x)) // note how this pitfall is handled: tag values may contain commas
      .map(_.split(","))
      .map(x => Tags(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toString, x(3).trim.toInt))
      .toDF()
    tags.show()

    // Write the data to HDFS
    tags.write.mode(SaveMode.Overwrite).parquet("/tmp/tags")

    // Load the data from HDFS into the Hive data warehouse
    spark.sql("drop table if exists tags")
    spark.sql("create table if not exists tags(userId int,movieId int,tag String,timestamp int) stored as parquet")
    spark.sql("load data inpath '/tmp/tags' overwrite into table tags")
  }

  /**
    * Normalizes records that do not match the expected format
    * (the tag field may itself contain commas and quotes).
    *
    * @param input raw line
    * @return cleaned line
    */
  private def rebuild(input: String): String = {
    val a = input.split(",")
    val head = a.take(2).mkString(",")
    val tail = a.takeRight(1).mkString
    val tag = a.drop(2).dropRight(1).mkString.replaceAll("\"", "")
    val output = head + "," + tag + "," + tail
    output
  }
}
```
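To make the rebuild trick concrete, here is a minimal, self-contained sketch of what it does (RebuildDemo is just an illustrative name, and the input line is made-up sample data in the MovieLens tags format, where the tag value is quoted and contains a comma):

```scala
object RebuildDemo {

  // Same logic as SourceDataETLApp.rebuild, copied here so the sketch runs on its own.
  private def rebuild(input: String): String = {
    val a = input.split(",")
    val head = a.take(2).mkString(",")  // userId,movieId
    val tail = a.takeRight(1).mkString  // timestamp
    val tag  = a.drop(2).dropRight(1).mkString.replaceAll("\"", "")  // everything in between, quotes stripped
    head + "," + tag + "," + tail
  }

  def main(args: Array[String]): Unit = {
    // A raw tag record whose tag field is quoted and contains a comma:
    val raw = "15,339,\"billy crystal, meg ryan\",1138038639"
    println(rebuild(raw))
    // prints: 15,339,billy crystal meg ryan,1138038639
  }
}
```

After this cleanup every line splits into exactly four comma-separated fields again, so the later `.map(_.split(","))` step cannot blow up on such records; the trade-off is that commas inside a tag value are silently dropped.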
Besides the main class above, there are the case classes it references; you can think of them as Java-style entity classes. Each lives in its own file under the same package:

```scala
package com.csylh.recommend.entity

/**
  * Description: schema of the data
  *
  * @Author: 留歌36
  * @Date: 2019-07-12
  */
case class Links(movieId: Int, imdbId: Int, tmdbId: Int)

case class Movies(movieId: Int, title: String, genres: String)

case class Ratings(userId: Int, movieId: Int, rating: Double, timestamp: Int)

case class Tags(userId: Int, movieId: Int, tag: String, timestamp: Int)
```

Step 3: Package the project and upload the jar to the server

```
mvn clean package -Dmaven.test.skip=true
```

```
[root@hadoop001 ml]# ll -h movie-recommend-1.0.jar
-rw-r--r--. 1 root root 156K 10月 20 13:56 movie-recommend-1.0.jar
[root@hadoop001 ml]#
```

Step 4: Write a shell script that submits the jar

```
[root@hadoop001 ml]# vim etl.sh

export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop

$SPARK_HOME/bin/spark-submit \
  --class com.csylh.recommend.dataclearer.SourceDataETLApp \
  --master spark://hadoop001:7077 \
  --name SourceDataETLApp \
  --driver-memory 10g \
  --executor-memory 5g \
  /root/data/ml/movie-recommend-1.0.jar
```

Step 5: Just run sh etl.sh. The job first writes the data to HDFS, then creates the Hive tables, then loads the data into them.

Before sh etl.sh:

```
[root@hadoop001 ml]# hadoop fs -ls /tmp
19/10/20 19:26:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwx------   - root supergroup          0 2019-04-01 16:27 /tmp/hadoop-yarn
drwx-wx-wx   - root supergroup          0 2019-04-02 09:33 /tmp/hive
[root@hadoop001 ml]# hadoop fs -ls /user/hive/warehouse
19/10/20 19:27:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[root@hadoop001 ml]#
```

After sh etl.sh (the script above submits to Spark standalone; later we will switch to Spark on YARN, and in practice either works):

```
[root@hadoop001 ~]# hadoop fs -ls /tmp
19/10/20 19:43:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 6 items
drwx------   - root supergroup          0 2019-04-01 16:27 /tmp/hadoop-yarn
drwx-wx-wx   - root supergroup          0 2019-04-02 09:33 /tmp/hive
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/links
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/movies
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/ratings
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/tags
[root@hadoop001 ~]# hadoop fs -ls /user/hive/warehouse
19/10/20 19:43:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /user/hive/warehouse/links
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /user/hive/warehouse/movies
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /user/hive/warehouse/ratings
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /user/hive/warehouse/tags
[root@hadoop001 ~]#
```

With that, the data has been ETL'd into our data warehouse; next we will build on this base data for further processing.

If you have any questions, feel free to leave a comment so we can discuss.

More articles in this series (Movie Recommendation System Based on Spark):
https://blog.csdn.net/liuge36/column/info/29285
https://www.cnblogs.com/liuge36/p/11713148.html
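P.S. A quick way to sanity-check the load (a hypothetical spark-shell session against the tables created above; in spark-shell, `spark` is already a Hive-enabled SparkSession):

```scala
// spark-shell --master spark://hadoop001:7077
spark.sql("show tables").show()                   // links, movies, ratings, tags should be listed
spark.sql("select count(*) from ratings").show()  // row count of the ratings table
spark.sql("select * from movies").show(10)        // peek at a few cleaned movie records
```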