分享
3 - Flink高级API开发.docx
下载文档

ID:3488928

大小:6.90MB

页数:134页

格式:DOCX

时间:2024-05-09

收藏 分享赚钱
温馨提示:
1. 部分包含数学公式或PPT动画的文件,查看预览时可能会显示错乱或异常,文件下载后无此问题,请放心下载。
2. 本文档由用户上传,版权归属用户,汇文网负责整理代发布。如果您对本文档版权有争议请及时联系客服。
3. 下载前请仔细阅读文档内容,确认文档内容符合您的需求后进行下载,若出现内容与标题不符可向本站投诉处理。
4. 下载文档时可能由于网络波动等原因无法下载或下载错误,付费完成后未能成功下载的用户请联系客服处理。
网站客服:3074922707
Flink高级API开发 Flink 高级 API 开发
第三章 Flink 高级API开发 课程目标 l 掌握Flink的Time分类及各自作用 l 掌握Flink的Window操作及原理 l 掌握Flink的State操作及原理 l 掌握Flink的Checkpoint操作及原理 l 了解Flink的任务链 1. Flink的Window操作 Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。Flink 提供了非常完善的窗口机制。 1.1 为什么需要Window 在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。 在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。 Windows 是处理无限流的核心。Windows 将流拆分为有限大小的“桶”,我们可以对其进行计算。 1.2 Flink窗口应用代码结构 Flink的窗口算子为我们提供了方便易用的API,我们可以将数据流切分成一个个窗口,对窗口内的数据进行处理。本文将介绍如何在Flink上进行窗口的计算。 一个Flink窗口应用的大致骨架结构如下所示: l Keyed Window // Keyed Window stream .keyBy(...) <- 按照一个Key进行分组 .window(...) <- 将数据流中的元素分配到相应的窗口中 [.trigger(...)] <- 指定触发器Trigger(可选) [.evictor(...)] <- 指定清除器Evictor(可选) .reduce/aggregate/process() <- 窗口处理函数Window Function l Non-Keyed Window // Non-Keyed Window stream .windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中 [.trigger(...)] <- 指定触发器Trigger(可选) [.evictor(...)] <- 指定清除器Evictor(可选) .reduce/aggregate/process() <- 窗口处理函数Window Function 在上面,方括号 ([…]) 中的命令是可选的。这表明 Flink 允许您以多种不同的方式自定义窗口逻辑,使其最适合您的需求。 首先:我们要决定是否对一个DataStream按照Key进行分组,这一步必须在窗口计算之前进行。经过keyBy的数据流将形成多组数据,下游算子的多个实例可以并行计算。windowAll不对数据流进行分组,所有数据将发送到下游算子单个实例上。决定是否分组之后,窗口的后续操作基本相同,经过windowAll的算子是不分组的窗口(Non-Keyed Window),它们的原理和操作与Keyed Window类似,唯一的区别在于所有数据将发送给下游的单个实例,或者说下游算子的并行度为1。 1.3 Window类型和概念 Window可以分成两类: l CountWindow:按照指定的数据条数生成一个Window,与时间无关。 n 滚动计数窗口,每隔N条数据,统计前N条数据 n 滑动计数窗口,每隔N条数据,统计前M条数据 l TimeWindow:按照时间生成Window。 n 滚动时间窗口,每隔N时间,统计前N时间范围内的数据,窗口长度N,滑动距离N n 滑动时间窗口,每隔N时间,统计前M时间范围内的数据,窗口长度M,滑动距离N n 会话窗口,按照会话划定的窗口 1.3.1 滚动窗口 - TumblingWindow概念 流是连续的,无界的(有明确的开始,无明确的结束) 假设有个红绿灯,提出个问题:计算一下通过这个路口的汽车数量 对于这个问题,肯定是无法回答的,为何? 因为,统计是一种对固定数据进行计算的动作。 因为流的数据是源源不断的,无法满足固定数据的要求(因为不知道何时结束) 那么,我们换个问题:统计1分钟内通过的汽车数量 那么,对于这个问题,我们就可以解答了。因为这个问题确定了数据的边界,从无界的流数据中,取出了一部分有边界的数据子集合进行计算。 那么,这个行为或者说这个统计的数据边界,就称之为窗口。 同时,我们的问题,是以时间来划分被处理的数据边界的,那么按照时间划分边界的就称之为:时间窗口 反之,如果换个问题,统计100辆通过的车里面有多少宝马品牌,那么这个边界的划分就是按照数量的,这样的称之为:计数窗口 同时,这样的窗口被称之为 滚动窗口,按照窗口划分依据分为:滚动时间窗口、滚动计数窗口 1.3.2 滑动窗口 – SlidingWindow概念 同样是需求,改为: 每隔1分钟,统计前面2分钟内通过的车辆数 对于这个需求我们可以看出,窗口长度是2分钟,每隔1分钟统计一次 或者:每通过100辆车,统计前面通过的50辆车的品牌占比 对于这个需求可以看出,窗口长度是50辆车,但是每隔100辆车统计一次 对于这样的窗口,我们称之为滑动窗口 那么在这里面,统计多少数据是窗口长度(如统计2分钟内的数据,统计50辆车中的数据) 隔多久统计一次称之为滑动距离(如,每隔1分钟,每隔100辆车) 那么可以看出,滑动窗口,就是滑动距离 不等于 窗口长度的一种窗口 比如,每隔1分钟 统计先前5分钟的数据,窗口长度5分钟,滑动距离1分钟,不相等 比如,每隔100条数据,统计先前50条数据,窗口长度50条,滑动距离100条,不相等 那如果相等呢?相等就是比如:每隔1分钟统计前面1分钟的数据,窗口长度1分钟,滑动距离1分钟,相等。 对于这样的需求可以简化成:每隔1分钟统计一次数据,这就是前面说的滚动窗口咯 那么,我们可以看出: 滚动窗口: 窗口长度 = 滑动距离 滑动窗口: 窗口长度 != 滑动距离 其中可以发现,对于滑动窗口: 滑动距离 > 窗口长度, 会漏掉数据,比如:每隔5分钟,统计前面1分钟的数据(滑动距离5分钟,窗口长度1分钟,漏掉4分钟的数据) 滑动距离 < 窗口长度, 会重复处理数据,比如:每隔1分钟,统计前面5分钟的数据(滑动距离1分钟,窗口长度5分钟,重复处理4分钟的数据) 滑动距离 = 窗口长度, 不漏也不会重复,也就是滚动窗口 1.4 Time - Flink的三种时间语义 我们抛开计数窗口,先看时间窗口 对于时间窗口最主要的就是时间,比如1分钟的窗口长度,那么这个1分钟是如何定义呢? Flink中针对时间有3种类型 l EventTime[事件时间] 事件发生的时间,例如:点击网站上的某个链接的时间 l IngestionTime[摄入时间] 某个Flink节点的source operator接收到数据的时间,例如:某个source消费到kafka中的数据 l ProcessingTime[处理时间] 某个Flink节点执行某个operation的时间,例如:timeWindow接收到数据的时间 l 事件时间 event time 事件真实发生的时间。Flink1.12版本起默认事件时间。 xxx.window(TumblingEventTimeWindows) l 处理时间 process time Flink处理start-log中这条数据时的设备时间。Flink1.12之前默认处理时间。 xxx.window(TumblingProcessingTimeWindows) Flink1.12版本之前,如何指定为事件时间呢? // 设置按照事件时间来进行计算env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 1.4.1 Event Time 在大数据领域,日志服务器生成的一条数据也可以称为一个事件。Event Time是指在数据产生时该设备上对应的时间,这个时间在进入Flink之前已经存在于数据记录中了。以后数据被Flink处理数据,如果使用Event Time作为时间标准,那么数据并不是按照Event Time的先后顺序被处理的,由于数据可能产生在多个不同的日志服务器,然后通常是再将数据写入到分布性消息中间件,然后被被Flink拉取进行处理时,处理的实际时间相对于数据产生的实际肯定有一定的延迟,并且Event Time可能也是乱序的。那么为什么还要使用Event Time呢?是因为使用Event Time时,Flink程序可以处理乱序事件和延迟数据。并且最重要的功能就是可以统计在数据产生时,对应时间的数据指标。 总之,使用Event Time的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。 1.4.2 Processing Time Processing Time是指事件数据被Operator处理时所在机器的系统时间,它提供了最好的性能和最低的延迟。但是,Flink是一个在分布式的计算框架,数据从产生到被处理会有一定的延迟(例如从消息队列拉取数据到Source,Source再到处理的Operator会有一定的延迟),所以Processing Time无法精准的体现出数据在产生的那个时刻的变化情况。 1.4.3 Ingestion Time Ingestion Time指的是事件数据进入到Flink的时间。每条数据的Ingestion Time就是进入到Source Operator时所在机器的系统时间。比如Flink从Kafka消息中间件消费数据,每一条数据的Ingestion Time就是FlinkKafkaConsumer拉取数据进入到TaskManager对应的时间。Ingestion Time介于Event Time和Processing Time之间,与 Event Time 相比,Ingestion Time程序无法处理任何无序事件或延迟数据,并且程序不必指定如何生成水,Flink会自动分配时间戳和自动生成水位线。 1.5 窗口的使用 1.5.1 滚动窗口 滚动窗口下窗口之间之间不重叠,且窗口长度是固定的。我们可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows创建一个基于Event Time或Processing Time的滚动时间窗口。窗口的长度可以用org.apache.flink.streaming.api.windowing.time.Time中的seconds、minutes、hours和days来设置。 下面的代码展示了如何使用滚动窗口。代码中最后一个例子,我们在固定长度的基础上设置了偏移(Offset)。默认情况下,时间窗口会做一个对齐,比如设置一个一小时的窗口,那么窗口的起止时间是[0:00:00.000 - 0:59:59.999)。如果设置了Offset,那么窗口的起止时间将变为[0:15:00.000 - 1:14:59.999)。Offset可以用在全球不同时区设置上,如果系统时间基于格林威治标准时间(UTC-0),中国的当地时间可以设置offset为Time.hours(-8)。 DataStream<T> input = ... // 基于Event Time的滚动窗口 input .keyBy(<KeySelector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<window function>(...) // 基于Processing Time的滚动窗口 input .keyBy(<KeySelector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<window function>(...) // 在小时级滚动窗口上设置15分钟的Offset偏移 input .keyBy(<KeySelector>) .window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))) .<window function>(...) 注意: 时间窗口使用的是timeWindow()也可以使用window(),比如,input.keyBy(...).timeWindow(Time.seconds(1))。timeWindow()是一种简写。 当我们在执行环境设置了TimeCharacteristic.EventTime时,Flink对应调用TumblingEventTimeWindows;如果我们基于TimeCharacteristic.ProcessingTime,Flink使用TumblingProcessingTimeWindows,但是这种方式被废弃。 1.5.2 滑动窗口 滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。使用时,我们要设置Slide和Size。Slide的大小决定了Flink以多快的速度来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个元素会被分配到多个窗口;Slide大于Size,有些元素可能被丢掉。 跟前面介绍的一样,我们使用Time类中的时间单位来定义Slide和Size,也可以设置offset。同样,timeWindow是一种缩写,根据执行环境中设置的时间语义来选择相应的方法初始化窗口。 val input: DataStream[T] = ... // sliding event-time windows input .keyBy(...) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<window function>(...) // sliding processing-time windows input .keyBy(<...>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<window function>(...) // sliding processing-time windows offset by -8 hours input .keyBy(<...>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<window function>(...) 1.5.3 会话窗口 会话窗口根据Session gap切分不同的窗口,当一个窗口在大于Session gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的。我们可以设置定长的Session gap,也可以使用SessionWindowTimeGapExtractor动态地确定Session gap的长度。 下面的代码展示了如何使用定长和可变的Session gap来建立会话窗口,其中SessionWindowTimeGapExtractor[T]的泛型T为数据流的类型,我们可以根据数据流中的元素来生成Session gap。 val input: DataStream[T] = ... // event-time session windows with static gap input .keyBy(...) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<window function>(...) // event-time session windows with dynamic gap input .keyBy(...) .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] { override def extract(element: T): Long = { // determine and return session gap } })) .<window function>(...) // processing-time session windows with static gap input .keyBy(...) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<window function>(...) // processing-time session windows with dynamic gap input .keyBy(...) .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] { override def extract(element: T): Long = { // determine and return session gap } })) .<window function>(...) 1.6 窗口的范围 窗口的判断是按照毫秒为单位 如果窗口长度是5秒 窗口的开始: start 窗口的结束: start + 窗口长度 -1 毫秒 比如窗口长度是5秒, 从0开始 那么窗口结束是: 0 + 5000 -1 = 4999 1.6.1 窗口的关闭和触发 窗口不会一直存在,当达到某些条件后,窗口就会执行触发计算 + 关闭窗口的动作 窗口的关闭和触发是两个步骤 1. 触发窗口计算,对窗口内的数据进行计算 2. 关闭窗口,数据无法进入窗口了 这两者是同步进行的 后面会学习到先触发,然后等待一段时间后才关闭的情况,后面再说 1.6.2 窗口关闭、触发的条件 每个窗口都会有: - 开始时间 - 结束时间 窗口的时间单位是毫秒 1.6.3 如何确定数据进入哪个窗口 开始时间 和 结束时间两者结合 决定了数据是属于哪个窗口的 数据的时间要满足: - 大于等于开始时间 - 小于等于结束时间 如 5秒的窗口,假设窗口开始是0,结束是5000(毫秒) 那么时间1000属于这个窗口 时间6000不属于这个窗口 时间4999属于这个窗口,时间5000不属于这个窗口 1.6.4 窗口如何确定执行触发和关闭 结束时间决定了窗口何时关闭和触发计算,规则是:数据的时间 满足 大于等于 结束时间 - 1毫秒 1.6.4.1 使用处理时间(Processing Time)的情况 如果使用处理时间,那么窗口按照系统时间进行判断 如果当前系统时间,大于等于窗口的结束时间,那么这个窗口就会被关闭,并且被触发计算 比如 0 – 5000的窗口 当系统时间走到了: 大于 等于 4999就会触发窗口计算和关闭 1.6.4.2 使用事件时间(Event Time)的情况 如果使用事件时间,那么: 当新进入的一条数据,其事件时间大于等于某个窗口的结束时间,那么这个窗口被关闭并触发计算 比如:两个窗口 窗口A是0-5000,窗口B是5000-10000 当数据事件时间是大于等于4999(5000 – 1)的数据进来,会导致窗口A进行关闭和触发计算。 1.6.4.3 使用水印的情况 如果使用水印,那么: 当新进入的一条数据,其水印时间,大于等于某个窗口的结束时间,那么这个窗口被关闭并触发计算 1.6.4.4 总结 l 处理时间:通过当前系统时间决定窗口触发和关闭 当前系统时间会不停的向前走,所以这样的情况下,窗口的关闭和触发很稳定,比如5秒窗口,就每隔5秒触发一次 l 事件时间:通过进入到Flink的数据,所带的 事件时间来决定是否关闭窗口 数据如果不进入Flink,那么这个窗口就一直不会被关闭。 所以事件时间窗口的开关不稳定,取决于数据 l 水印时间:基于数据的事件时间,同样开闭不稳定,取决于数据是否到来以及到来的数据的事件时间是多少,后面学习水印机制的时候细说 1.7 Window API 1.7.1 Window API的调用方式 我们如果想要对数据加窗口可以调用以下两种方法 1.7.1.1 window方法 仅针对keyby后的流可以使用 对分流后的每个子流加窗口 如图,可见有8个快捷方法可以使用 底层是帮组我们调用的window和windowAll方法 具体根据需要使用即可 1.7.1.2 windowAll方法 使用了keyby分流后的流或者未使用keyby分流后的流,均可使用 作用是:对数据进行加窗口操作,并且会忽略是否进行了keyby分流 区别在于: l 使用keyby分流后的流如果调用windowAll, 相当于未分流的效果, Flink会忽略分流后的各个子流,而是将全量数据一起进行窗口计算 l 而未使用keyby分流后的数据,只能调用windowAll方法,无法调用window方法 这两个方法均需要传入一个WindowAssigner对象的实例 WindowAssigner对象就是指窗口的类型具体是什么?是时间窗口还是计数窗口还是会话窗口 如图,WindowAssigner是一个抽象类,我们不能直接实例化它,一般使用它的子类 如图,这些是WindowAssigner的一些子类 我们一般常用的有: l TumblingEventTimeWindows 滚动时间窗口, 以event时间为时间依据 实例化方式:TumblingEventTimeWindows.of(滚动窗口时间) l TumblingProcessingTimeWindows 滚动时间窗口, 以processing时间为依据 实例化方式:TumblingProcessingTimeWindows.of() l SlidingEventTimeWindows 滑动时间窗口, 以event时间为依据 实例化方式:SlidingEventTimeWindows of(窗口长度, 滑动距离) l SlidingProcessingTimeWindows 滑动时间窗口, 以processing时间为依据 实例化方式:SlidingProcessingTimeWindows.of(窗口长度, 滑动距离) l GlobalWindows 全局窗口, 滚动计数, 滑动计数均使用这个窗口来实现 实例化方式:GlobalWindows.create() l EventTimeSessionWindows 会话时间窗口, 以event时间为依据 实例化方式:EventTimeSessionWindows.withGap(会话gap时间) l ProcessingTimeSessionWindows 会话时间窗口, 以processing时间为依据 实例化方式:ProcessingTimeSessionWindows.withGap(会话gap时间) 1.8 Time Window 案例 1.8.1 tumbling-time-window (滚动窗口-无重叠数据) 窗口可以作用与DataStream之上。 如果数据是未分流(keyby)的,那么就对全量数据加窗口 如果数据是分流后的,那么针对每个流加窗口(类似SQL的group by 后对每个分组做聚合) 可以看出,未分流的数据,只能使用带ALL关键字的方法 l 案例: n 自定义一个Source, 每隔1秒产生一个的k,v k是hadoop spark flink 其中某一个, v是随机数字 n 对数据加窗口, 窗口1对未分流的数据统计数字总和 n 窗口2对按key分组后的数据统计每个key对应的数字总和 l 代码实现 /** * 滚动-时间-窗口演示 * 自定义一个Source, 每隔1秒产生一个的k,v k是hadoop spark flink 其中某一个, v是随机数字 * 用时间窗口统计和 */ public class TumblingTimeWindowDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Source DataStreamSource<Tuple2<String, Integer>> randomIntSource = env.addSource(new GenerateRandomNumEverySecond()); // 如果直接对source执行窗口的话, 是执行windowAll系列的方法 // 对未分组的数据统计总和, 每5秒统计一次 SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = randomIntSource .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1); // 安装key进行分流, 对分流后每个组进行求和统计, 窗口是滚动窗口, 每5秒一次 SingleOutputStreamOperator<Tuple2<String, Integer>> sumEachKey = randomIntSource .keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1); sumOfAll.print("Sum of all:"); sumEachKey.print("Sum each key:"); env.execute(); } /* 自定义Source 每隔1秒产生一个的k,v k是hadoop spark flink 其中某一个, v是随机数字 */ public static class GenerateRandomNumEverySecond implements SourceFunction<Tuple2<String, Integer>> { private boolean isRun = true; private final Random random = new Random(); private final List<String> keyList = Arrays.asList("hadoop", "spark", "flink"); @Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { while (this.isRun) { String key = keyList.get(random.nextInt(3)); ctx.collect(Tuple2.of(key, random.nextInt(99))); Thread.sleep(1000L); } } @Override public void cancel() { this.isRun = false; } } } 1.8.2 sliding-time-window (滑动窗口-有重叠数据) 按照时间来进行窗口划分,每次窗口的滑动距离小于窗口的长度,这样数据就会有一部分重复计算,我们参考上面的案例 /** * 滑动时间窗口案例 * 自定义一个Source, 每隔1秒产生一个的k,v k是hadoop spark flink 其中某一个, v是随机数字 * 每隔5秒统计前10秒的数据, 分别统计 * 1. 全量数字之和 * 2. 分组后每个key对应的数字之和 */ public class SlidingTimeWindowDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Source DataStreamSource<Tuple2<String, Integer>> source = env.addSource(new GenerateRandomNumEverySecond()); // 统计全量的滑动窗口 SingleOutputStreamOperator<Tuple2<String, Integer>> sumAll = source .timeWindowAll(Time.seconds(10), Time.seconds(5)).sum(1); // 按照key分组后统计 SingleOutputStreamOperator<Tuple2<String, Integer>> sumEachKey = source .keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)).sum(1); sumAll.print("Sum all>>>"); sumEachKey.print("Sum each key>>>"); env.execute(); } /* 自定义Source 每隔1秒产生一个的k,v k是hadoop spark flink 其中某一个, v是随机数字 */ public static class GenerateRandomNumEverySecond implements SourceFunction<Tuple2<String, Integer>> { private boolean isRun = true; private final Random random = new Random(); private final List<String> keyList = Arrays.asList("hadoop", "spark", "flink"); @Override public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { while (this.isRun) { String key = keyList.get(random.nextInt(3)); Tuple2<String, Integer> value = Tuple2.of(key, random.nextInt(99)); ctx.collect(value); System.out.println("------: " + value); Thread.sleep(1000L); } } @Override public void cancel() { this.isRun = false; } } } 结果: ------: (spark,33) ------: (spark,66) ------: (flink,25) ------: (flink,57) Sum each key>>>:10> (flink,82) Sum each key>>>:1> (spark,99) Sum all>>>:11> (spark,181) ------: (spark,25) ------: (spark,80) ------: (spark,4) ------: (flink,61) ------: (hadoop,2) Sum each key>>>:11> (hadoop,2) Sum each key>>>:1> (spark,208) Sum ea

此文档下载收益归作者所有

下载文档
你可能关注的文档
收起
展开