Flink流处理操作符

 

一、工程创建与准备

使用maven进行工程创建,且采用提供的flink-quickstart模版,便利很多。😄

$ mvn archetype:generate \                             -DarchetypeGroupId=org.apache.flink  \             -DarchetypeArtifactId=flink-quickstart-java  \     -DarchetypeVersion=1.6.2

本实验的数据采用自拟电影评分数据(userId, movieId, rating, timestamp),userId和movieId范围分别为1-100和1-200的随机数,rating范围为[0:0.5:5.0]一共10个档位,timestamp为10000-20000之间的随机数,且数据顺序采用timestamp的升序排列。(2.1-2.6节的数据是乱序)

由于该文只是为了熟悉操作符的用法,所以数据自拟更有针对性。

二、操作符

2.0 Baseline

以下是本次实验的baseline,源source为kafka提供,所以还需要建立一个将数据一个个放入kafka的class。

这里的流对象使用的是POJO类型,即MovieRate类,在之后的各种操作符的使用中也更加方便。

public class Baseline {     public static void main(String[] args) throws Exception {         // 1. Get an ExecutionEnvironment         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          Properties properties = new Properties();         properties.setProperty("zookeeper.connect", "localhost:2181");         properties.setProperty("bootstrap.servers", "localhost:9092");         properties.setProperty("group.id", "test2");         properties.setProperty("auto.offset.reset", "earliest");          // 2. Get the source         FlinkKafkaConsumer011<MovieRate> myConsumer = new FlinkKafkaConsumer011<MovieRate>(                 "test2",                 new MovieRateSchema(),                 properties);         DataStream<MovieRate> rates = env                 .addSource(myConsumer);          // 3. Set the sink         rates.print();          // 4. Execute         env.execute();     } } 
... 1> 50,181,1.5,13667 1> 53,83,1,11838 1> 87,112,3.5,11701 1> 66,199,5,12427 ...

2.1 Filter

对读入的每个element执行bool操作,保留返回True的element。

这里,我们新建一个MovieFilter过滤器,对movieId大于100的电影过滤掉。

// Baseline DataStream<MovieRate> filteredRate = rates                 .filter(new MovieFilter());
// MovieFilter public static class MovieFilter implements FilterFunction<MovieRate>{         @Override         public boolean filter(MovieRate movieRate) throws Exception {             if (movieRate.movieId > 100){                 return false;             } else {                 return true;             }         }     }

运行后,可以看到movieId大于100的日志已经被过滤:

... 1> 74,36,3.5,14522 1> 90,46,4.5,14166 1> 3,52,1.5,12222 1> 19,36,1.0,12055 ...

2.2 Map

DataStream ➡ DataStream

对流中的每一个元素进行转换。

如下列表示将刚才处理后的每个element的评分✖️2

                        
关键字:
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信