简介:Flink 中的 Source 是数据的来源,用于生成数据流。这个示例演示了如何使用 Flink 的 FileSourceFunction 从文件中创建 Source。
在 Apache Flink 的流处理和批处理编程模型中,数据流被划分为三个核心部分:Source、Transformation 和 Sink。这些概念构成了 Flink 程序的基本结构。下面我们将通过一些基础示例来详细解释这三个概念。
一、Source
Source 是 Flink 程序的入口点,它产生数据流。Flink 支持多种数据源,例如文件、套接字、外部数据库等。这里我们使用一个简单的例子来展示如何创建一个 Source。
示例:从文件创建 Source
在这个例子中,我们将使用 Flink 的 FileSourceFunction 来从文件中读取数据。首先,确保你已经添加了 Flink 的相关依赖。然后,你可以创建一个实现 SourceFunction 接口的类,如下所示:
import org.apache.flink.api.common.io.InputFormat;import org.apache.flink.api.common.io.RichInputFormat;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.core.fs.Path;import org.apache.flink.core.io.InputSplit;import org.apache.flink.core.io.InputSplitAssigner;import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.util.Collector;public class FileSourceExample extends RichInputFormat<Tuple2<Long, String>> {private static final long serialVersionUID = 1L;private final String fileDir;public FileSourceExample(String fileDir) {this.fileDir = fileDir;}@Overridepublic void open(Configuration parameters) throws Exception {// Open the file specified by fileDir// and create a collection of records with Tuple2<Long, String> type// where Long represents line number and String represents the content of each line.}@Overridepublic void close() throws Exception {// Close the file source}@Overridepublic void openInputFormat() throws Exception {// Open the file source for reading}@Overridepublic void closeInputFormat() throws Exception {// Close the file source after all records are read.}}
在上面的代码中,我们定义了一个 FileSourceExample 类,该类实现了 RichInputFormat 接口。我们可以在 open() 方法中编写逻辑来打开文件并读取数据。当所有数据被读取后,closeInputFormat() 方法将被调用,以关闭文件源。