最近在学Flink,准备用Flink搭建一个实时的推荐系统。找到一个好的网站(也算作是flink创始者的官方网站),上面有关于Flink的上手教程,用来练练手,熟悉熟悉,下文仅仅是我的笔记。
1. 数据集
网站New York City Taxi & Limousine Commission提供了关于纽约市从2009-1015年关于出租车驾驶的公共数据集。
具体数据下载方法,可见# Taxi Data Streams,下载完数据后,不要解压缩。
我们的第一个数据集包含纽约市的出租车出行的信息,每一次出行包含两个事件:START和END,可以分别理解为开始和结束该行程。每一个事件又包括11个属性,详细介绍如下:
taxiId         : Long      // a unique id for each taxi driverId       : Long      // a unique id for each driver isStart        : Boolean   // TRUE for ride start events, FALSE for ride end events startTime      : DateTime  // the start time of a ride endTime        : DateTime  // the end time of a ride,                            //   "1970-01-01 00:00:00" for start events startLon       : Float     // the longitude of the ride start location startLat       : Float     // the latitude of the ride start location endLon         : Float     // the longitude of the ride end location endLat         : Float     // the latitude of the ride end location passengerCnt   : Short     // number of passengers on the ride另一个数据集包含出租车的费用信息,与每一次行程对应:
taxiId         : Long      // a unique id for each taxi driverId       : Long      // a unique id for each driver startTime      : DateTime  // the start time of a ride paymentType    : String    // CSH or CRD tip            : Float     // tip(小费) for this ride tolls          : Float     // tolls for this ride totalFare      : Float     // total fare collected2. 生成数据流
首先定义TaxiRide事件,即数据集中的每一个record。
我们使用Flink的source函数(TaxiRideSource)读取TaxiRide流,这个source是基于事件时间进行的。同样的,费用事件TaxiFare的流通过函数TaxiFareSource进行传送。为了让生成的流更加真实,事件传送的时间是与timestamp成比例的。两个真实相隔十分钟发生的事件在流中也相差十分钟。此外,我们可以定义一个变量speed-up factor为60,该变量为加速因子,那么真实事件中的一分钟在流中只有1秒钟,缩短60倍嘛。不仅如此,我们还可以定义最大服务延时,这个延时使得每个事件在最大服务延时之内随机出现,这么做的目的是让这个流的事件产生与在real-world发生的不确定性更接近。
对于这个应用,我们设置speed-up factor为600(即10分钟相当于1秒),以及最大延时时间为60。
所有的行动都应使用事件时间(event time)(相对于处理时间(processing time))来实现。
Event-time decouples the program semantics from serving speed and guarantees consistent results even in case of historic data or data which is delivered out-of-order.
事件时间(event time)将程序语义与服务速度分离开,即使在历史数据或无序传送的数据的情况下也能保证一致的结果。简单来说就是,在数据处理的过程中,依赖的时间跟在流中出现的时间无关,只跟该事件发生的时间有关。
private void generateUnorderedStream(SourceContext<TaxiRide> sourceContext) throws Exception {        // 设置服务开始时间servingStartTime     long servingStartTime = Calendar.getInstance().getTimeInMillis();        // 数据开始时间dataStartTime,即第一个ride的timestamp     long dataStartTime;        Random rand = new Random(7452);        // 使用优先队列进行emit,其比较方式为他们的等待时间     PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(            32,                new Comparator<Tuple2<Long, Object>>() {                 @Override                 public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {                          return o1.f0.compareTo(o2.f0); }                 });        // 读取第一个ride,并将第一个ride插入到schedule里     String line;     TaxiRide ride;     if (reader.ready() && (line = reader.readLine()) != null
                    