Apache Flink 是一个分布式流处理框架,支持批处理和流处理。在 Flink 中,API 是核心部分,允许用户定义数据流处理逻辑、配置作业并执行操作。Flink 提供了多种 API 来满足不同的需求,包括 DataStream API、DataSet API(批处理 API)、Table API 和 SQL API。
1. Flink DataStream API(流处理)
DataStream API 是 Flink 最常用的 API,专为实时数据流处理而设计。它支持通过流式操作对数据进行处理,并生成一个数据流结果。
典型的数据流处理操作
以下是一些常用的 DataStream API 操作示例:
创建流:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("input.txt");
映射操作:
DataStream<Integer> lengths = text.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return value.length();
}
});
过滤操作:
DataStream<String> filtered = text.filter(value -> value.contains("Flink"));
窗口操作:
DataStream<Integer> windowedStream = text
.map(value -> value.length())
.keyBy(value -> 1) // 使用常量键值进行分区
.timeWindow(Time.seconds(5))
.sum(0);
窗口内聚合:
DataStream<Integer> sumStream = text
.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return value.length();
}
})
.keyBy(value -> 1)
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) {
return value1 + value2;
}
});
Sink操作(输出):
sumStream.addSink(new SinkFunction<Integer>() {
@Override
public void invoke(Integer value, Context context) throws Exception {
System.out.println("Result: " + value);
}
});
执行作业:
env.execute("Flink Stream Job");
2. Flink DataSet API(批处理)
DataSet API 主要用于处理批数据,也就是一次性加载到内存中的数据集。批处理作业通常不涉及实时数据流,而是对静态数据源进行处理。
典型的批处理操作
创建数据集:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("input.txt");
映射操作:
DataSet<Integer> lengths = text.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return value.length();
}
});
过滤操作:
DataSet<String> filtered = text.filter(value -> value.contains("Flink"));
聚合操作:
DataSet<Integer> sum = text
.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return value.length();
}
})
.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) {
return value1 + value2;
}
});
输出结果:
sum.writeAsText("output.txt");
执行作业:
env.execute("Flink Batch Job");
3. Flink Table API & SQL API
Flink 的 Table API 和 SQL API 是一种更高级的抽象,允许用户以类似 SQL 的方式操作流数据和批数据。它们提供了一种声明式的方式来表达流处理逻辑。
Table API 示例
创建表环境:
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
注册表:
tableEnv.createTemporaryTable("MyTable", tableDescriptor);
查询表:
Table result = tableEnv.from("MyTable")
.select("column1, column2")
.filter("column1 > 100");
转换为流:
DataStream<Row> rowStream = tableEnv.toDataStream(result);
SQL API 示例
创建表环境:
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
查询 SQL:
String query = "SELECT column1, column2 FROM MyTable WHERE column1 > 100";
Table result = tableEnv.sqlQuery(query);
执行 SQL 查询:
tableEnv.executeSql("CREATE TABLE ...");
4. Flink API 组合使用
Flink 的强大之处在于可以将不同类型的 API 进行组合使用。例如,你可以通过 DataStream API 和 Table API 的结合来实现更复杂的流处理逻辑。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 使用 DataStream API 读取数据
DataStream<String> text = env.readTextFile("input.txt");
// 将 DataStream 转换为 Table
Table table = tableEnv.fromDataStream(text, "columnName");
// 使用 SQL API 执行 SQL 查询
Table result = tableEnv.sqlQuery("SELECT * FROM " + table);
// 将结果转换回 DataStream
DataStream<Row> resultStream = tableEnv.toDataStream(result);
5. Flink API 中的 KeyedStream 和 Window
Flink 提供了丰富的窗口操作和状态管理功能,支持按照键(Key)对数据进行分区,进而进行窗口计算。
KeyedStream
KeyBy 操作:java
DataStream<String> keyedStream = text.keyBy(value -> value);
Window 操作
时间窗口:
DataStream<Integer> result = text
.map(value -> value.length())
.keyBy(value -> 1)
.timeWindow(Time.seconds(10))
.sum(0);
滚动窗口:
DataStream<Integer> result = text
.map(value -> value.length())
.keyBy(value -> 1)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.sum(0);
总结
DataStream API 适用于流式数据的实时处理,提供了丰富的转换、过滤、聚合等操作。
DataSet API 适用于批处理数据,支持类似于 MapReduce 的操作。
Table API 和 SQL API 提供了更高级的抽象,允许通过 SQL 查询来处理数据。
窗口和状态管理 支持对流数据进行按时间或按事件划分的窗口操作。
Flink 提供的多种 API 能够支持各种不同的处理需求,从简单的流处理到复杂的事件驱动计算。如果你需要更高层次的抽象和更易用的 API,可以考虑使用 Table API 或 SQL API。
发布者:myrgd,转载请注明出处:https://www.object-c.cn/4963