fliflink dataset与datastream有什么区别(dataset和datastream的区别)

这里选用的工具是idea,java版本是1.8,需要注意java版本和flink的版本,如果选用高版本的java,很容易出现报错。

fliflink dataset与datastream有什么区别(dataset和datastream的区别)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dsy</groupId>
    <artifactId>flinkstudy</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
</project>

之后创建maven项目,在里面导入两个依赖,一个是flink-java,一个是flink-streaming-java_2.12。

对于dataset来说,它每次处理文件中的数据,其实还是以有界流的方式来处理。

fliflink dataset与datastream有什么区别(dataset和datastream的区别)

package com.dsy.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author liyuan
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        // 从文件中读取数据
        String inputString = "C:Users23085IdeaProjectsflinkstudysrcmainresourceshello.txt";
        // dataSource继承了dataset
        DataSource<String> dataSource = executionEnvironment.readTextFile(inputString);
        // 对数据集进行处理,按空格分词展开转换成(word,1)二元组统计
       DataSet<Tuple2<String, Integer>> resultSet =  dataSource.flatMap(new MyFlatMapper())
                // 按照元组第一个位置的元素进行分组
                .groupBy(0)
                // 按照元组第二个位置的元素进行求和
                .sum(1);
       // 需要抛异常
       resultSet.print();
    }
    /**
     * 自定义类 定义MyFlatFunction接口
     */
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String,Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // 空格分词
            String[] words = s.split("");
            // 遍历word 输出二元组
            for (String word:
                 words) {
                collector.collect(new Tuple2<>(word,1));
            }
        }
    }
}

Tuple是flink中的元组,后面的2表示里面有两个不同的元素。

下面是datastream的处理方式,虽然大部分代码相似,但很容易可以看出二者有很大区别。

fliflink dataset与datastream有什么区别(dataset和datastream的区别)

package com.dsy.wc;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author liyuan
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 创建流处理执行环境
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行数
        executionEnvironment.setParallelism(16);
        // 从文件中读取数据
        String inputString = "C:Users23085IdeaProjectsflinkstudysrcmainresourceshello.txt";
        // dataSource继承了dataset
        DataStream<String> dataStream = executionEnvironment.readTextFile(inputString);
        // 基于数据流进行转换计算
        SingleOutputStreamOperator<Tuple2<String, Integer>> singleOutputStreamOperator = dataStream.flatMap(new WordCount.MyFlatMapper())
                // DataStream中没有group by api,而是key by,根据key的哈希值进行分区操作
                .keyBy(0)
                .sum(1);
        // 这里并不会启动任务,而是定义任务的执行过程
        singleOutputStreamOperator.print();
        // 执行任务 会输出多个结果,因为来一次就输出一次
        executionEnvironment.execute();
    }
}

为了省力气,这里直接调用了上面的MyFlatMapper()方法,而且需要注意datastream中的算子和dataset中有很大的一个区别,当然这里只是本地的一个操作,真正的flink自然是需要在集群中去执行。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 xxx@163.com 举报,一经查实,本站将立刻删除。

发表评论

登录后才能评论