hive调优(hive调优总结)
本篇文章给大家谈谈hive调优,以及hive调优总结对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
hive基础调优方法(一)
1.查看执行计划:Explain
查看执行计划:explain select kind, count(*) from table_name group by kind
常见名词示意:
STAGE DEPENDENCIES:阶段的依赖关系
FETCH Operator :抓取操作
limit: -1 未对数据做限制
TableScan:扫描的表
alias:查询的表名
Select Operator:查询操作
expressions:查询的列名
outputColumnNames:输出的别名
详改雹细执行计划:explain extended select kind, count(*) from table_name group by kind
2.分区表:分区对应不同文件夹。
查询时用where语句可以指定分区目录蔽配dt='20211112'。
建表时用partitioned by(dt string)。
加载时需要指定分区信息 into table partition_table partition(dt='20211112')。
增加分区alter partition_table add partition(dt='20211122')。
删除分区alter partition_table drop partition(dt='20211122')。
可同时增加或删除多个,增加只需空格,删除中间需要逗号隔开。
查看分区 show partitions partition_table;
分区字段可以指定多个。
3.动态分区:
动态分区:set hive.exec.dynamic.partition=true;
非严格状态:set hive.exec.dynamic.partition.mode=nonstrict;
最大可创建动态分区:set hive.exec.max.dynamic.partitions=1000;
单个MR最大可创建动态分区:set hive.exec.max.dynamic.partitions.pernode=100
MR Job 中,最大可以创建多少个 HDFS 文件:set hive.exec.max.created.files=100000
空分区时否需要核并帆抛出异常:set hive.error.on.empty.partition=false
4.分桶表:将数据放到不同的文件
创建表clustered by(id)
用于抽样tablesample(bucket 1 out of 4 on id);
5.文件存储和压缩格式
行存储TEXTFILE、SEQUENCEFILE
列存储ORC、PARQUET
LZO和SNAPPY有优秀的压缩比和压缩速度
6.裁剪
列裁剪,只读取需要的列
分区裁剪,只读取需要的分区
7.group by数据倾斜
Map端进行聚合:set hive.map.aggr = true;
Map端进行聚合操作的条目数目 set hive.groupby.mapaggr.checkinterval = 100000;
有数据倾斜的时候进行负载均衡:set hive.groupby.skewindata = true;
8.矢量计算,可以在似scan, filter, aggregation进行批量处理
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
9.left semi join用来替代exist/in
10.CBO优化
成本优化器
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
11.谓词下推
set hive.optimize.ppd = true;
12.mapjoin:大表left join小表
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=25000000;
13.大表和大表join(极少用到)
Sort Merge Bucket Join
建表时
clustered by(id)
sorted by(id)
into 3 buckets
开启桶join
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
14.笛卡尔积
hive.mapred.mode=strict;开启后不会出现笛卡尔积
hive优化方法
1、列裁剪和分区裁剪
2、谓词下推
3、sort by 替换order by
4、group by 代替distinct
5、group by 配置调整
map 端预聚合:set hive.map.aggr=true ; set hive.groupby.mapaggr.checkinterval=100000
倾斜均衡配置项:set hive.groupby.skewindate=true
6、join优化
6.1 大小表,小表前置
6.2 多表Join时key相同
6.3 利用mapjoin特性
6.4 分桶表 mapjoin
6.5 倾斜均衡配置项:
设置hive.optimize.skewjoin=true,开启后,在join过程中hive会将计数超过阈值hive.skewjoin.key(默认100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生成结果。通过hive.skewjoin.mapjoin.map.task参数还可以控制第二个job的mapper数量,默认10000.
6.7 优化sql处理join数据倾斜
6.7.1、空值或无意义的值:若不需要空值数据,就提前写到where语句过滤掉,如果需要保留的话,将空值key用随机方式打散。
6.7.2、单独处理倾斜key
6.7.3、不同数据类型首侍,join的关联字段类型不一样,导致耗时长,所以需要注意做类型转换(join关联字段,一个是int,一个是string,,计算key的者禅吵hash值时默认时int类型做的,这样导致所有真正的string类型的key都分配到一个reducer上了)
6.7.4、mapjoin的小表比较大的时候,,无法直接使用mapjoin,则 select/*+mapjoin(b)*/ from a left join ( select /*+mapjoin (b)*/ from b inner join a )
6.8、mapreduce 优化
6.8.1、调整mapper数
mapper数量与输入文件的split数息息相关。如果想减少mapper数,就适当提高mapred.min.split.size,split数就减少了;如果想增大mapper数,除了降低maperd.min.split.size之外,也可以提高mapred.map.task;一般来讲,如果输入文件是少量大文件,就减少mapper数;如果是大量非小文件,就增大mapper数,如果是小文件,就喝吧小文件
6.8.2 调整reducer数
使用参数mapred.reduce.task可以直接设定reducer数量,如果袭滑不设置的话,hive会自行推测,推测逻辑如下:
参数hive.exec.reducers.bytes.per.reducer.设定每个reducer能够处理的最大的数据量,默认时1G
参数hive.exec.reducers.max设定每个job的最大的reducer数量,默认时999或者1009
得到reducer数:reducers_num=min(total_input_size/reducers.bytes.per.reducer,reducers.max)
reducer数量与输出文件的数量相关,如果reducer数太多,会产生大量小文件,对hdfs造成压力,如果reducer数太少,每个reducer要处理很多数据,容易拖慢运行时间或造成oom
6.8.3 合并小文件
输入阶段合并:需要更改hive的输入文件格式,即参数hive.input.format,默认值是org.apache.hadoop.hive.ql.io.hiveiputformat,我们该成org.apache.hadoop.hive.ql.io.combinehiveinputformat.
这样比调整mapper数时,又多出两个参数,分别是mapred.min.split.size.per.node和mapred.min.split.size.per.rack,含义是单节点和单机架上的最小split大小。如果发现又split大小小于这两个默认值100MB,则会进行合并
输出阶段合并:直接将hive.merge.mapfiles和hive.merge.mapredfiles都设置为true即可。
hive.merge.mapfiles表示将map-only任务的输出合并
hive.merge.mapredfiles表示将map-reduce任务的输出合并
另外hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值
hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值,默认时1G
6.9 启用压缩
压缩job的中间结果数据和输出数据,可以少量CPU时间节省出很多空间。压缩方式一般选择snappy,效率最高,要启用中间压缩,需要设定:
hive.exec.compress.intermediate为true,同时指定压缩方式hive.intermediate.compression.code为org.apache.hadoop.io.compress.snappycode.
另外,参数hive.intermediate.compression.type可以选对块block还是对记录record压缩,block压缩率比较高,输出压缩的配置基本相同:打开hive.exec.compress.output
6.10 jvm重用
在mr job中,默认时每执行一个task就会启动一个jvm,如果task非常小而且碎,那么jvm启动和关闭耗时都会比较长。可以通过调节参数mapred.job.reuse.jvm.num.task来重用。例如将这个参数设置为5,那么久代表同一个mr job中顺序执行的5各task可以重复使用一个jvm,减少启动和关闭开销,但是他对不同mr job的task无效
6.11 采用合适的存储格式
6.12 数据倾斜
数据倾斜原因:1、join 倾斜 2、聚合倾斜
group by 倾斜: group by 字段中某个字段的值过多,导致处理这个值得reduce耗时很久
解决方法:set hive.map.aggr=true; -- 表示开启map端聚合
set hive.groupby.skewindata=true; 注意:只能对单个字段聚合 , 生成两个map job ,第一个map job中map输出结果随机分布到reduce中,每个reduce做部分聚合操作,并输出结果,这样相同的groub key有可能被分发到不同的reduce中,从而达到负载均衡的目的;第二个map job再根据预处理的数据结果按照group by key分布到reduce中,这个过程可以保证相同的key被分到同一个reduce中,最后完成最终的聚合操作
6.13 优化in、exists语句
使用left semi join 替换in 、exists
6.14 合并 mapreduce操作
multi-group by 是hive的一个非常好的特性,它使得hive中利用中间结果变更非常方便
例如:
from ( select a.status,b.school from status_update a join profilees b on (a.userid=b.suerid)) subq1
insert overwirte table a1 partition(ds='2021-03-07')
select subq1.school,count(1) group by subq1.school
insert overwrite table b1 partition(ds='2021-03-07')
select subq1.status,count(1) group by subq1.status
上述语句使用了multi-group by特性联系group by 2次数据,使用不同的group by key,这一特性可以减少一次mapreduce操作
[img]hive性能优化及参数调优
记录一下自己在工作中经常用到的几个参数设置,从调整的实际效果看还是有效果的。
企业相关服务器资源配置:平均600台active的节点,
每个节点可用的内存在200G左右,可用的memory total:116T
1、**set hive.exec.parallel=true;**
开启job的并行:基本每个hql脚本都会开启这个参数,默认并行度为8,
在集群资源充足的情况下,可以提高job并行的数量:
set hive.exec.parallel.thread.number=16; (企业生产中我是很少用到这个的,都是用的默认值,因为太消耗资源怕影响别的任务,搞不好会被运维抓住,邮件通报批评!当然使用时还是看具体情况吧!)
因为需求中一张表的job的数量每次基本都在20个以上,在相关维度多,涉及到的字段逻辑复杂的情况下,
一张表中job的数量会超过100个,之前做的一个需求中insert插入的脚本中job的数量达到了169个,
在测试环境运行的时候只用了一个小时就跑完了,数据量在一亿条左右,大概有一百多G。
2、**set hive.map.aggr=true;**
在map端中会做部分聚集操作,效率更高但需要更多的内存,可以根据自己企业的资源情况来设置,
如果我的脚本涉及到的数据量不大的话,我一般不会开启这个参数。
3、**set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;**
hive0.5开始的默认值,执行map前进行小文件合并,在一个job中生成的map的数量很多的时候,
和第二个参数一起开启配合使用,在实际生产中多次验证发现可以减少一倍以上的map数量。
在开启前我的一个job的map数量有577个,开启后的map的数量只有196个,极大提高程序的运行效率。
4、**set mapred.max.split.size=256000000;**
每个Map(一个切片的)最大输入大小(这个值决定了合并后文件的数量),和第3个参数配合一起使用
默认值也是256000000,
mapred.min.split.size默认值是10000000
dfs.block.size默认是128M,这个参数通过hive来更改是没有实际用的,只能通过hdfs来修改
***实际在hive中,并不是split的大小要小于等于blocksize,而是可以远大于blocksize,为什么???(map的数量)***
1当hive需要处理的文件是压缩,且压缩算法不支持文件切分的时候,决定map个数的因素主要是文件块实际存储的大小,
如果文件块本身很大,比如500Mb左右,那么每个map处理的splitsize至少要是500Mb左右。
这个时候我们不能人为通过参数降低每嫌物个map的splitsize来增加map个数,只能通过增加splitsize,减少map个数,
如果hive处理的文件是压缩模式,且压缩模式不支持文件切分,那么这个时候我们只能通过控制参数来减少map个数,而不能通过配置参数来增加map个数,所以Hive对于压缩不可切分文件的调优有限
2如果Hive处理的的文件为非压缩格式或者压缩可切分,且inputFormat为握此CombineHiveInputFormat时,
则控制map个数是由以下四个参数起作用,关于这四个参数作用优先级与使用注意事项请参考如下:
一般来讲这几个参数的结果大小要满足以下条件:
max.split.size = min.split.size = min.size.per.node = min.size.per.rack
几个参数的作用优先级为:
max.split.size = min.split.size = min.size.per.node = min.size.per.rack
总结:所以对于控制map的个数进行调优,首先需要看是否开启了压缩,压缩算法是段者迅否支持切分,参数的设置等等!
5、**set mapred.min.split.size.per.node=256000000;**
一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并) ,
和第3和第4个参数一起配合使用。
6、**set mapred.min.split.size.per.rack=256000000;**
一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并) ,
也适合第3,4,5的参数一起配合使用。
7、**set hive.exec.mode.local.auto=true;**
开启本地模式,这个参数在自己学习中可能经常用到,但是在实际生产中用到的还是比较少,
因为这个参数开启后,针对的是小数据集,在单台机器上处理所有的任务,对生产中的任务不适用!
8、**set hive.exec.reducers.bytes.per.reducer=512*1000*1000;**
每个reduce任务处理的数据量,默认为256M,在hive0.14.0之前默认是1G,我们公司设置的是512M,写的是512*1000*1000因为在网络传输中用的是1000,而不是1024机制,
将该参数值调小可以增加reduce的数量,提高运行的效率,
当然也不是reduce的数量越多越好,因为启动和初始化reduce都是会消耗资源和时间的,
而且有多少个reduce就会有多少个输出文件,如果这些文件作为下一个任务的输入,就会造成小文件过多的问题
9、**hive.exec.reducers.max**
每个任务最大的reduce数,默认为1009,在hive0.14.0之前默认是999
计算reducer数的公式很简单N=min(参数9,总输入数据量/参数8)
即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;
10、**set mapred.reduce.tasks = 15;**
设置reduce的个数(在实际生产中谨慎使用)
那么什么时候可以进行手动设定reduce数量呢?比如系统自动计算的reduce个数,因为集群资源不足,
造成程序运行出现OOM(内存溢出不足)时,可以根据推定的reduce个数手动增加数量,保证程序在跑的慢的基础上可以完整运行
那么在什么情况下只有一个reduce呢?
1、当map的输出文件小于hive.exec.reducers.bytes.per.reducer时
2、手动设置set mapred.reduce.tasks =1时
3、使用了order by时(全局排序会使用一个reduce去处理)
4、表关联时出现笛卡尔积
5、单独使用count时,比如:select count(*) from tablename,
如果改写加入了group by配合使用就不会出现一个reduce,比如:select sign_date,count(*) from tablename group by sign_date;
11、**set mapred.job.reuse.jvm.num.tasks=10;**
用于避免小文件的场景或者task特别多的场景,这类场景大多数执行时间都很短,因为hive调起mapreduce任务,JVM的启动过程会造成很大的开销,尤其是job有成千上万个task任务时,JVM重用可以使得JVM实例在同一个job中重新使用N次
12、**set hive.exec.dynamic.partition=true;**
表示开启动态分区功能
13、**set hive.exec.dynamic.partition.mode=nonstrict;**
表示允许所有分区都是动态的,
默认是strict,表示必须保证至少有一个分区是静态的
14、**set hive.groupby.skewindata=true;**
有数据倾斜的时候进行负载均衡 ,决定group by操作是否支持倾斜数据,其实说白了就相当于MR中的conbiner做了一次预聚合。
注意:只能对单个字段聚合。
控制生成两个MR Job,第一个MR Job Map的输出结果随机分配到reduce中减少某些key值条数过多某些key条数过小造成的数据倾斜问题。
在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可能分发到不同的reduce中,从而达到负载均衡的目的;
第二个 MapReduce 任务再根据预处理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作
15、**set hive.auto.convert.join=true;**
开启map join
16、**set hive.mapjoin.smalltable.filesize=512000000;**
map join的小表的大小,也是开启和关闭map join的阈值
17、**hive.exec.compress.output=true;**
开启压缩,我们公司使用的是默认的压缩算法deflate
压缩算法有:1、org.apache.hadoop.io.compress.GzipCodec,
2、org.apache.hadoop.io.compress.DefaultCodec,
3、com.hadoop.compression.lzo.LzoCodec,
4、com.hadoop.compression.lzo.LzopCodec,
5、org.apache.hadoop.io.compress.BZip2Codec
使用的压缩算法:
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
**针对上述小文件合并的三个参数值做以下解释:**
大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),进行合并
关于hive调优和hive调优总结的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。