这里选用的工具是idea,java版本是1.8,需要注意java版本和flink的版本,如果选用高版本的java,很容易出现报错。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dsy</groupId>
<artifactId>flinkstudy</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
</project>
之后创建maven项目,在里面导入两个依赖,一个是flink-java,一个是flink-streaming-java_2.12。
对于dataset来说,它每次处理文件中的数据,其实还是以有界流的方式来处理。
package com.dsy.wc;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* @author liyuan
*/
public class WordCount {
public static void main(String[] args) throws Exception {
// 创建执行环境
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
// 从文件中读取数据
String inputString = "C:Users23085IdeaProjectsflinkstudysrcmainresourceshello.txt";
// dataSource继承了dataset
DataSource<String> dataSource = executionEnvironment.readTextFile(inputString);
// 对数据集进行处理,按空格分词展开转换成(word,1)二元组统计
DataSet<Tuple2<String, Integer>> resultSet = dataSource.flatMap(new MyFlatMapper())
// 按照元组第一个位置的元素进行分组
.groupBy(0)
// 按照元组第二个位置的元素进行求和
.sum(1);
// 需要抛异常
resultSet.print();
}
/**
* 自定义类 定义MyFlatFunction接口
*/
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String,Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
// 空格分词
String[] words = s.split("");
// 遍历word 输出二元组
for (String word:
words) {
collector.collect(new Tuple2<>(word,1));
}
}
}
}
Tuple是flink中的元组,后面的2表示里面有两个不同的元素。
下面是datastream的处理方式,虽然大部分代码相似,但很容易可以看出二者有很大区别。
package com.dsy.wc;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author liyuan
*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 创建流处理执行环境
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行数
executionEnvironment.setParallelism(16);
// 从文件中读取数据
String inputString = "C:Users23085IdeaProjectsflinkstudysrcmainresourceshello.txt";
// dataSource继承了dataset
DataStream<String> dataStream = executionEnvironment.readTextFile(inputString);
// 基于数据流进行转换计算
SingleOutputStreamOperator<Tuple2<String, Integer>> singleOutputStreamOperator = dataStream.flatMap(new WordCount.MyFlatMapper())
// DataStream中没有group by api,而是key by,根据key的哈希值进行分区操作
.keyBy(0)
.sum(1);
// 这里并不会启动任务,而是定义任务的执行过程
singleOutputStreamOperator.print();
// 执行任务 会输出多个结果,因为来一次就输出一次
executionEnvironment.execute();
}
}
为了省力气,这里直接调用了上面的MyFlatMapper()方法,而且需要注意datastream中的算子和dataset中有很大的一个区别,当然这里只是本地的一个操作,真正的flink自然是需要在集群中去执行。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 xxx@163.com 举报,一经查实,本站将立刻删除。