一、工程创建与准备
使用maven进行工程创建,且采用提供的flink-quickstart模版,便利很多。😄
$ mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.6.2本实验的数据采用自拟电影评分数据(userId, movieId, rating, timestamp),userId和movieId范围分别为1-100和1-200的随机数,rating范围为[0:0.5:5.0]一共10个档位,timestamp为10000-20000之间的随机数,且数据顺序采用timestamp的升序排列。(2.1-2.6节的数据是乱序)
由于该文只是为了熟悉操作符的用法,所以数据自拟更有针对性。
二、操作符
2.0 Baseline
以下是本次实验的baseline,源source为kafka提供,所以还需要建立一个将数据一个个放入kafka的class。
这里的流对象使用的是POJO类型,即MovieRate类,在之后的各种操作符的使用中也更加方便。
public class Baseline { public static void main(String[] args) throws Exception { // 1. Get an ExecutionEnvironment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test2"); properties.setProperty("auto.offset.reset", "earliest"); // 2. Get the source FlinkKafkaConsumer011<MovieRate> myConsumer = new FlinkKafkaConsumer011<MovieRate>( "test2", new MovieRateSchema(), properties); DataStream<MovieRate> rates = env .addSource(myConsumer); // 3. Set the sink rates.print(); // 4. Execute env.execute(); } } ... 1> 50,181,1.5,13667 1> 53,83,1,11838 1> 87,112,3.5,11701 1> 66,199,5,12427 ...2.1 Filter
对读入的每个element执行bool操作,保留返回True的element。
这里,我们新建一个MovieFilter过滤器,对movieId大于100的电影过滤掉。
// Baseline DataStream<MovieRate> filteredRate = rates .filter(new MovieFilter());// MovieFilter public static class MovieFilter implements FilterFunction<MovieRate>{ @Override public boolean filter(MovieRate movieRate) throws Exception { if (movieRate.movieId > 100){ return false; } else { return true; } } }运行后,可以看到movieId大于100的日志已经被过滤:
... 1> 74,36,3.5,14522 1> 90,46,4.5,14166 1> 3,52,1.5,12222 1> 19,36,1.0,12055 ...2.2 Map
DataStream ➡ DataStream
对流中的每一个元素进行转换。
如下列表示将刚才处理后的每个element的评分✖️2
