当前位置:主页 > java教程 > Flink归约聚合

Flink实现特定统计的归约聚合reduce操作

发布:2023-04-08 15:30:01 59


给网友们整理相关的编程文章,网友敖浩荡根据主题投稿了本篇教程内容,涉及到Flink归约聚合、Flink reduce、Flink归约聚合相关内容,已被919网友关注,下面的电子资料对本篇知识点有更加详尽的解释。

Flink归约聚合

如果说简单聚合是对一些特定统计需求的实现,那么 reduce 算子就是一个一般化的聚合统计操作了。从大名鼎鼎的 MapReduce 开始,我们对 reduce 操作就不陌生:它可以对已有的

数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元

素数据类型,所以输出类型和输入类型是一样的。调用 KeyedStream 的 reduce 方法时,需要传入一个参数,实现 ReduceFunction 接口。接口在源码中的定义如下:

@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {
    /**
     * The core method of ReduceFunction, combining two values into one value of the same type. The
     * reduce function is consecutively applied to all values of a group until only a single value
     * remains.
     *
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}

ReduceFunction 接口里需要实现 reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再

将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据,这也就是 reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果”

作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

其实,reduce 的语义是针对列表进行规约操作,运算规则由 ReduceFunction 中的 reduce方法来定义,而在 ReduceFunction 内部会维护一个初始值为空的累加器,注意累加器的类型

和输入元素的类型相同,当第一条元素到来时,累加器的值更新为第一条元素的值,当新的元素到来时,新元素会和累加器进行累加操作,这里的累加操作就是 reduce 函数定义的运算规

则。然后将更新以后的累加器的值向下游输出。

我们可以单独定义一个函数类实现 ReduceFunction 接口,也可以直接传入一个匿名类。当然,同样也可以通过传入 Lambda 表达式实现类似的功能。与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStrema。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。下面我们来看一个稍复杂的例子。

我们将数据流按照用户 id 进行分区,然后用一个 reduce 算子实现 sum 的功能,统计每个用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce 算子实现 maxBy 的功能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。

package com.rosh.flink.test;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
 * 我们将数据流按照用户 id 进行分区,然后用一个 reduce 算子实现 sum 的功能,统计每个
 * 用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce 算子实现 maxBy 的功能,
 * 记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。
 */
public class TransReduceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //随机生成数据
        Random random = new Random();
        List<Integer> userIds = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            userIds.add(random.nextInt(5));
        }
        DataStreamSource<Integer> userIdDS = env.fromCollection(userIds);
        //每个ID访问记录一次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> mapDS = userIdDS.map(new MapFunction<Integer, Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> map(Integer value) throws Exception {
                return new Tuple2<>(value, 1L);
            }
        });
        //统计每个user访问多少次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> sumDS = mapDS.keyBy(tuple -> tuple.f0).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
            }
        });
        sumDS.print("sumDS  ->>>>>>>>>>>>>");
        //把所有分区合并,求出最大的访问量
        SingleOutputStreamOperator<Tuple2<Integer, Long>> maxDS = sumDS.keyBy(key -> true).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                if (value1.f1 > value2.f1) {
                    return value1;
                } else {
                    return value2;
                }
            }
        });
        maxDS.print("maxDS ->>>>>>>>>>>");
        env.execute("TransReduceTest");
    }
}

到此这篇关于Flink实现特定统计的归约聚合reduce操作的文章就介绍到这了,更多相关Flink归约聚合内容请搜索码农之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持码农之家!


参考资料

相关文章

网友讨论