Flink 编程模型中的 Source、Transformation 和 Sink:基础示例

作者:公子世无双2024.02.04 12:58浏览量:3

简介:Flink 中的 Source 是数据的来源,用于生成数据流。这个示例演示了如何使用 Flink 的 FileSourceFunction 从文件中创建 Source。

在 Apache Flink 的流处理和批处理编程模型中,数据流被划分为三个核心部分:Source、Transformation 和 Sink。这些概念构成了 Flink 程序的基本结构。下面我们将通过一些基础示例来详细解释这三个概念。
一、Source
Source 是 Flink 程序的入口点,它产生数据流。Flink 支持多种数据源,例如文件、套接字、外部数据库等。这里我们使用一个简单的例子来展示如何创建一个 Source。
示例:从文件创建 Source
在这个例子中,我们将使用 Flink 的 FileSourceFunction 来从文件中读取数据。首先,确保你已经添加了 Flink 的相关依赖。然后,你可以创建一个实现 SourceFunction 接口的类,如下所示:

  1. import org.apache.flink.api.common.io.InputFormat;
  2. import org.apache.flink.api.common.io.RichInputFormat;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.configuration.Configuration;
  5. import org.apache.flink.core.fs.Path;
  6. import org.apache.flink.core.io.InputSplit;
  7. import org.apache.flink.core.io.InputSplitAssigner;
  8. import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.apache.flink.streaming.api.datastream.DataStream;
  11. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  12. import org.apache.flink.util.Collector;
  13. public class FileSourceExample extends RichInputFormat<Tuple2<Long, String>> {
  14. private static final long serialVersionUID = 1L;
  15. private final String fileDir;
  16. public FileSourceExample(String fileDir) {
  17. this.fileDir = fileDir;
  18. }
  19. @Override
  20. public void open(Configuration parameters) throws Exception {
  21. // Open the file specified by fileDir
  22. // and create a collection of records with Tuple2<Long, String> type
  23. // where Long represents line number and String represents the content of each line.
  24. }
  25. @Override
  26. public void close() throws Exception {
  27. // Close the file source
  28. }
  29. @Override
  30. public void openInputFormat() throws Exception {
  31. // Open the file source for reading
  32. }
  33. @Override
  34. public void closeInputFormat() throws Exception {
  35. // Close the file source after all records are read.
  36. }
  37. }

在上面的代码中,我们定义了一个 FileSourceExample 类,该类实现了 RichInputFormat 接口。我们可以在 open() 方法中编写逻辑来打开文件并读取数据。当所有数据被读取后,closeInputFormat() 方法将被调用,以关闭文件源。