您现在的位置是:主页 > 数据库技术 > 数据库技术

Apache Flink如何设置并行度

IDCBT2021-12-28服务器技术人已围观

简介这篇文章将为大家详细讲解有关Apache Flink如何设置并行度,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。 在使用Apache Flink对数据进行处理时

这篇文章将为大家详细讲解有关Apache Flink如何设置并行度,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

在使用Apache Flink对数据进行处理时候,通常需要设置并行度。并行度是Apache Flink中一个非常重要的概念。设置合理的并行度能够加快数据的处理效率,不合理的并行度会造成效率降低甚至是任务出错。
Apache Flink程序包含多个任务(source,transformations/operators,sink)。这些任务使用几个并行实例所进行执行,这些并行的实例称之为并行度。

 
 如何设置并行度

Apache Flink支持在不同的级别设置并行度。配置文件、env级别、算子级别。

    配置文件默认
    在我们提交一个Job的时候如果没有考虑并行度的话,那么Flink会使用默认配置文件中的并行度。我们可以通过命令查看Flink配置文件的并行度。

$ cat flink-conf.yaml |grep "parallelism.default"
parallelism.default: 1
 

例如当前获取到的并行度为1。也就是说当你不设置并行度的时候它就会使用配置文件默认的并行度 1。
2.  env级别
env的级别就是Environment级别。也就是通过Execution Environment来设置整体的Job并行度。

val env = Stream...
env.setParallelism(5)
 

    客户端级别
    如果在执行Job时候,发现代码中没有设置并行度而又不修改配置文件的话,可以通过Client来设置Job的并行度。

./bin/flink run -p 5 ../wordCount-java*.jar
 

-p即设置WordCount的Job并行度为5。4.  算子级别
我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度,例如为了实现读取Kafka的最高效读取需要参考Kafka的partition的数量对并行度进行设置,在Sink时需要对于Sink的介质设置不同的并行度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置

val env = Stream...
val text = ...
text.keyBy(XXX)
   .flatMap(XXX).setParallelism(5)  //计算时设置为5
   .addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1
 

    并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism。

    从优先级上来看:  算子级别 > env级别 > Client级别 > 系统默认级别

    在实际的使用中,我们需要设置合理的并行度来保证数据的高效处理,在一般情况下例如source,Sink等可能会需要不同的并行度来保证数据的快速读取与写入负载等。

     

    标签:

    很赞哦! ()

本栏推荐