Repost notice: article source: https://blog.csdn.net/baidu_35760874/article/details/124724594
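Introduction
Apache Flink is a framework for processing streaming data, and it is especially useful in scenarios such as real-time computation.
Data streams generally fall into two kinds:
Bounded data streams (the start and the end of the data are known, for example the contents of a txt file)
Unbounded data streams (the end of the data is not known, for example Kafka messages or socket data)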
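To make the distinction concrete, here is a minimal plain-Java sketch. It uses no Flink APIs. The class name `BoundedVsUnboundedSketch` and its methods are made up for illustration. The idea is that a bounded input allows one final answer, while an unbounded input only ever gives a running result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

// Illustrative only: plain Java, no Flink APIs involved.
class BoundedVsUnboundedSketch {

    // Bounded input: the end is known, so one final answer can be computed.
    static long countBounded(List<String> lines) {
        return lines.size();
    }

    // A stand-in for an unbounded source: it yields "msg-0", "msg-1", ... forever.
    static Iterator<String> endlessSource() {
        return Stream.iterate(0, i -> i + 1).map(i -> "msg-" + i).iterator();
    }

    // Unbounded input: there is no final answer, only a running result
    // after each element. We stop observing after `observe` elements.
    static List<Long> runningCounts(Iterator<String> source, int observe) {
        List<Long> counts = new ArrayList<>();
        long seen = 0;
        while (counts.size() < observe && source.hasNext()) {
            source.next();
            seen++;
            counts.add(seen);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countBounded(Arrays.asList("a", "b", "c")));
        System.out.println(runningCounts(endlessSource(), 3));
    }
}
```

A stream processor works like `runningCounts`: it handles elements as they arrive, because it cannot wait for the end of the input.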
Java demo
Add dependencies
<properties>
    <java.version>1.8</java.version>
    <flink.version>1.12.2</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
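Example code
Unbounded data stream demo
Set up netcat: download nc.exe from the internet.
Open cmd in the installation directory and run the following command, which makes netcat listen on port 9000: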
nc -L -p 9000 -v
Once the command succeeds, netcat is listening on port 9000.
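If nc.exe is not at hand, a small Java program can play a similar role. The following is only a sketch and is not part of the original article. `LineServerSketch`, `serveOnce` and `roundTrip` are hypothetical names. It accepts one client, sends a fixed list of lines, and closes the connection. The `main` method runs a loopback self-check on a free port rather than on port 9000.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: a tiny stand-in for "nc -L -p <port>", stdlib only.
class LineServerSketch {

    // Accepts exactly one client, writes each line followed by '\n',
    // then closes the connection.
    static void serveOnce(ServerSocket server, List<String> lines) {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.print(line);
                out.print('\n');
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Loopback self-check: serves the lines on a free port and reads them back.
    static List<String> roundTrip(List<String> lines) {
        try (ServerSocket server = new ServerSocket(0)) { // port 0 = any free port
            Thread serverThread = new Thread(() -> serveOnce(server, lines));
            serverThread.start();
            try (Socket socket = new Socket("127.0.0.1", server.getLocalPort());
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
                // Read until the server closes the connection.
                List<String> received = in.lines().collect(Collectors.toList());
                serverThread.join();
                return received;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("hello,flink", "a,b,c")));
    }
}
```

To feed a job that reads from port 9000, one would presumably call `serveOnce(new ServerSocket(9000), lines)` before starting the job. Unlike an interactive netcat session, this sketch sends its lines once and then disconnects.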
Next, write the code. This demo splits each incoming string on commas and then converts every piece to upper case.
package com.greenutility.mask.util;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkTest {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the runtime mode (AUTOMATIC lets Flink choose based on the sources)
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. Load the data source: text lines from the socket netcat is serving
        DataStreamSource<String> elementsSource = env.socketTextStream("127.0.0.1", 9000);
        // 3. Transform the data: split every line on commas, one output element per piece
        DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String element, Collector<String> out) {
                String[] wordArr = element.split(",");
                for (String word : wordArr) {
                    out.collect(word);
                }
            }
        });
        // SingleOutputStreamOperator below is a subclass of DataStream
        SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });
        // 4. Output the data to the console
        source.print();
        // 5. Execute the program
        env.execute("flink-hello-world");
    }
}
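Run the main method. Flink is now reading the socket data served by netcat, so type some strings into the cmd window
and then watch the console output.
We can see that the string data from the socket has been processed as expected.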
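The split-then-upper-case logic can also be checked without starting a Flink job. The sketch below is plain Java with a hypothetical class name, `SplitUpperSketch`. It mirrors what the `flatMap` and `map` steps do to a single line. It also shows how `String.split(",")` treats empty pieces: empty strings in the middle are kept and trailing ones are dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: the same per-line logic as the demo, without Flink.
class SplitUpperSketch {

    // Splits one line on commas and upper-cases every piece.
    static List<String> splitAndUpper(String line) {
        List<String> out = new ArrayList<>();
        for (String word : line.split(",")) {
            out.add(word.toUpperCase());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(splitAndUpper("hello,flink")); // prints [HELLO, FLINK]
        // The middle empty piece is kept, the trailing one is dropped.
        System.out.println(splitAndUpper("a,,b,").size()); // prints 3
    }
}
```

In the Flink job each piece becomes its own stream element, so an input line such as `hello,flink` produces two separate output records.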
Bounded data stream demo
First, create a test.txt file locally and type in a few strings.
Then replace the line that loads the data source in the previous demo with:
DataStreamSource<String> elementsSource = env.readTextFile("D:\\test.txt");
Run the main method.
The data is processed successfully!
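Instead of creating the file by hand at a fixed location such as D:\test.txt, the sample file could be generated in code. This is a sketch under assumptions: `SampleFileSketch` and its methods are hypothetical names, and the file goes to the system temp directory. The returned path, converted to a string, is what one would pass to `env.readTextFile(...)`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Illustrative only: creates a small bounded input file for the demo.
class SampleFileSketch {

    // Writes the given lines to a new temp file and returns its path.
    static Path writeSample(List<String> lines) {
        try {
            Path file = Files.createTempFile("flink-test-", ".txt");
            Files.write(file, lines, StandardCharsets.UTF_8);
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the file back, to confirm what a bounded source would see.
    static List<String> readBack(Path file) {
        try {
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = writeSample(Arrays.asList("hello,flink", "a,b,c"));
        System.out.println(file.toAbsolutePath());
        System.out.println(readBack(file));
    }
}
```

Because the file has a known end, it is a bounded stream in the sense described in the introduction.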