hivemapjoin(hivemapjoin大表进内存)
本篇文章给大家谈谈hivemapjoin,以及hivemapjoin大表进内存对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、hive中mapjoin和普通join的区别
- 2、Spark 广播join 与 Hive map join
- 3、【Hive】Hive Join 介绍
- 4、hive优化方法
- 5、Hive常用算子实现原理简述--MapReduce版
- 6、Hive 如何使用mapjoin
hive中mapjoin和普通join的区别
hive支持’left join’写; hiveleft outer join:右边行物悄左边表应,每行都映射输;右罩仔渣边没行与左边行应,输左边行,右边表字段NULL; hiveleft semi join:相于SQLin语句戚哪 两测试数据表建表语句...
Spark 广播join 与 Hive map join
Sprak 广播变量
广播如斗早变量(Broadcast Variables)允许开发人员在每个节点(Worker or Executor)缓存只读变量,而不是在Task之间传递这些变量。调用sc.broadcast(v)创建一个广播变量,该广播变量的值封装在v变量中,可使用获取该变量value的方法进行访问。
原理 将小表的数据 广渣雀播到每个executor 所在的内存当中, 每个executor内可能有多个task,但是只存在一个小表的数据
Hive 对 大表和销裂小表join的优化 就会走Map join
每个block 就是一个task任务
原理 将小表数据 先读到每个task 任务所在的内存当中,有几个task 就有几个 bkl
【Hive】Hive Join 介绍
[TOC]
Hive 中的 Join 只支持等值 Join,也就是说 Join on 中的 on 里面表之间连接条件只能是 = ,不能是 , 等符号。此外,on中的等值连接之间只能是 and,不能是or。
Hive 执行引擎会将 HQL “翻译” 成为map-reduce 任务,在执行表的 Join 操作时,如果多个表中每个表都使用同一个列进行连接(出现在 Join on 子句中),则只会生成一个 MR Job:
三个表 a、b、c 都分别使用了同一个字段进行连接,亦即同一个字段同时出现在两个 Join 子句中,从而只生成一个 MR Job。
如果多表中,其中存在一个表使用了至少 2 个字段进行连接(同一个表的至少2个列出现在山碰 Join 子句中),则会至少生成 2 个MR Job:
三个表基于 2 个字段进行连接,这两个字段 b.key1 和 b.key2 同时出现在 b 表中。连接的过程是这样的:首先 a 和 b 表基于a.key 和 b.key1 进行连接,对应着第一个 MR Job;表 a 和 b 连接的结果,再和 c 进行连接,对应着第二个 MR Job。
这是因为 Map 输出时候以 Join on 条件中的列为 key,如果 Join 有多个关联键,则以这些关联键的组合作为 key,Map 根据 Key 分发数据给 Reduce 端,具体的 Join 是在 Reduce 操作中完成,因此,如果多表基于不同的列做 Join,则无法在一轮 MR 任务中将所有相关数据 shuffle 到同一个 Reduce 操作中。
Hive 支持常用的 SQL Join 语句,例如内连接、左外连接、右外连接以及 Hive 独有的 map 端连接。其中 map 端连接是用于优化 Hive 连接查询的一个重要技巧。
先准备三张表。
employee员工表:
dept部门表:
薪水表:
多张表进行内连接操作时,只有所有表中与 on 条件中相匹配的数据才会显示,类似取交集。
JOIN 操作符 左边表中符合 where 条件的所有记录都会被保留 ,JOIN 操作符右边表中如果没有符合 on 后面连接条件的记录,则从右边表中选出的列为NULL,如果没有 where 条件,则左边表中的记录都被保留。
标准查询关键字执行顺序为 from-on-where-group by-having-order by ,on 是先对表进行筛选后再关联的,left 关联则 on 只对右表有效,左表都要选出来。
对于大量的数据,在编写 SQL 时尽量用 where 条件过滤掉不符合条件的数据是有益的。但是对于左外连接和右外连接, where 条件是在 on 条件执行之后才会执行,on 条件会产生一个临时表,where 条件是对这个临时表进行过滤 。
因此为了优化 Hive SQL 执行的效率, 在需要使用外连接的场景,如果是要条件查询后才连接应该把查询件放置于 on 后,如果是想再连接完毕后才筛选就应把条件放置于 where 后面,对主表的筛选要用 where 条件 。
特别要注意的是,如果是需要对主表过滤之后再和从表做左关联,最好将主表写成子查询的形式,可以减少主表扒衫的数据量 :
RIGHT OUTER JOIN,与 LEFT OUTER JOIN 相对, JOIN 操作符右边表中符合where 条件的所有记录都会被保留 ,JOIN 操作符左边表中如果没有符合 on 后面连接条件的记录,则从左边表中选出的列为 NULL。
保留满足 where 条件的两个表的数据,类似并集,没有符合连接条件的字段使用 NULL 填充。
以 LEFT SEMI JOIN 关键字前面的表为主表,返回主表的 KEY 也在副表中的记录。在早期的 Hive 版本中,不支持标准 SQL 中的春唯腔 IN 或 EXISTS 的功能,可以使用LEFT SEMI JOIN 实现类似的功能。
需要强调的是:
笛卡尔积是一种连接,表示左边表的行数乘以右边表的行数。
Hive中的 Join 可分为 Common Join(Reduce 阶段完成 join)和 Map Join(Map 阶段完成 join)。
如果不指定 Map Join 或者不符合 Map Join 的条件,那么 Hive 解析器会默认把执行 Common Join,即在 Reduce 阶段完成 join。整个过程包含 Map、Shuffle、Reduce 阶段。
以下面 HQL 为例,图解其过程:
Map Join 通常用于一个很小的表和一个大表进行 join 的场景,具体小表有多小,由参数 hive.mapjoin.smalltable.filesize 来决定,该参数表示小表的总大小,默认值为 25000000 字节,即 25M。
Hive 0.7 之前,需要使用 hint 提示 / + mapjoin(table) / 才会执行Map Join,否则执行 Common Join,但在 0.7 版本之后,默认自动会转换 Map Join,由参数hive.auto.convert.join 来控制,默认为 true。
如上图中的流程, 首先Task A 在客户端本地执行,负责扫描小表 b 的数据,将其转换成一个 HashTable 的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache 中。
接下来是 Task B,该任务是一个没有 Reduce 的 MR,启动 MapTasks 扫描大表 a,在 Map 阶段,根据 a 的每一条记录去和 DistributeCache 中 b 表对应的 HashTable 关联,并直接输出结果。
由于 MapJoin 没有 Reduce,所以由 Map 直接输出结果文件,有多少个 Map Task,就有多少个结果文件。
Map Join 效率比 Common Join 效率好,但总会有“小表”条件不满足的时候。这就需要 bucket map join 了。
Bucket map join 需要待连接的两个表在连接字段上进行分桶(每个分桶对应hdfs上的一个文件),而且小表的桶数需要时大表桶数的倍数。
建立分桶表的例子:
这样,my_user 表就对应 32 个桶,数据根据 uid 的 hash value 与32 取余,然后被分发导不同的桶中。
如果两个表在连接字段上分桶,则可以执行 bucket map join 了,具体的:
对于 bucket map join 中的两个表,如果每个桶内分区字段也是有序的,则还可以进行 sort merge bucket map join。
建表语句为:
这样一来当两边 bucket 要做局部 join 的时候,只需要用类似 merge sort 算法中的 merge 操作一样把两个 bucket 顺序遍历一遍即可完成,小表的数据可以每次只读取一部分,然后还是用大表一行一行的去匹配,这样的join 没有限制内存的大小. 并且也可以执行全外连接。
进行sort merge bucket map join时,需要设置的属性为:
Join 的过程中,Map 结束之后,会将相同的 Key 的数据 shuffle 到同一个 Reduce中,如果数据分布均匀的话,每个Reduce 处理的数据量大体上是比较均衡的,但是若明显存在数据倾斜的时候,会出现某些 Reducer 处理的数据量过大,从而使得该节点的处理时间过长,成为瓶颈。
大表与大表关联,如果其中一张表的多是空值或者 0 比较多,容易 shuffle 给一个reduce,造成运行慢。
这种情况可以对异常值赋一个随机值来分散 key,均匀分配给多个 reduce 去执行,比如:
当 key 值都是有效值时,解决办法为:
设置以下参数:
Hive 在运行的时候无法判断哪个 key 会产生倾斜,所以使用 hive.skewjoin.key 参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到的 reduce,一般可以设置成待处理的总记录数/reduce 个数的 2-4 倍。
1. Hive基础二(join原理和机制,join的几种类型,数据倾斜简单处理)
2. Hive:JOIN及JOIN优化
3. hive中关于常见数据倾斜的处理
hive优化方法
1、列裁剪和分区裁剪
2、谓词下推
3、sort by 替换order by
4、group by 代替distinct
5、group by 配置调整
map 端预聚合:set hive.map.aggr=true ; set hive.groupby.mapaggr.checkinterval=100000
倾斜均衡配置项:set hive.groupby.skewindate=true
6、join优化
6.1 大小表,小表前置
6.2 多表Join时key相同
6.3 利用mapjoin特性
6.4 分桶表 mapjoin
6.5 倾斜均衡配置项:
设置hive.optimize.skewjoin=true,开启后,在join过程中hive会将计数超过阈值hive.skewjoin.key(默认100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生成结果。通过hive.skewjoin.mapjoin.map.task参数还可以控制第二个job的mapper数量,默认10000.
6.7 优化sql处理join数据倾斜
6.7.1、空值或无意义的值:若不需要空值数据,就提前写到where语句过滤掉,如果需要保留的话,将空值key用随机方式打散。
6.7.2、单独处理倾斜key
6.7.3、不同数据类型首侍,join的关联字段类型不一样,导致耗时长,所以需要注意做类型转换(join关联字段,一个是int,一个是string,,计算key的者禅吵hash值时默认时int类型做的,这样导致所有真正的string类型的key都分配到一个reducer上了)
6.7.4、mapjoin的小表比较大的时候,,无法直接使用mapjoin,则 select/*+mapjoin(b)*/ from a left join ( select /*+mapjoin (b)*/ from b inner join a )
6.8、mapreduce 优化
6.8.1、调整mapper数
mapper数量与输入文件的split数息息相关。如果想减少mapper数,就适当提高mapred.min.split.size,split数就减少了;如果想增大mapper数,除了降低maperd.min.split.size之外,也可以提高mapred.map.task;一般来讲,如果输入文件是少量大文件,就减少mapper数;如果是大量非小文件,就增大mapper数,如果是小文件,就喝吧小文件
6.8.2 调整reducer数
使用参数mapred.reduce.task可以直接设定reducer数量,如果袭滑不设置的话,hive会自行推测,推测逻辑如下:
参数hive.exec.reducers.bytes.per.reducer.设定每个reducer能够处理的最大的数据量,默认时1G
参数hive.exec.reducers.max设定每个job的最大的reducer数量,默认时999或者1009
得到reducer数:reducers_num=min(total_input_size/reducers.bytes.per.reducer,reducers.max)
reducer数量与输出文件的数量相关,如果reducer数太多,会产生大量小文件,对hdfs造成压力,如果reducer数太少,每个reducer要处理很多数据,容易拖慢运行时间或造成oom
6.8.3 合并小文件
输入阶段合并:需要更改hive的输入文件格式,即参数hive.input.format,默认值是org.apache.hadoop.hive.ql.io.hiveiputformat,我们该成org.apache.hadoop.hive.ql.io.combinehiveinputformat.
这样比调整mapper数时,又多出两个参数,分别是mapred.min.split.size.per.node和mapred.min.split.size.per.rack,含义是单节点和单机架上的最小split大小。如果发现又split大小小于这两个默认值100MB,则会进行合并
输出阶段合并:直接将hive.merge.mapfiles和hive.merge.mapredfiles都设置为true即可。
hive.merge.mapfiles表示将map-only任务的输出合并
hive.merge.mapredfiles表示将map-reduce任务的输出合并
另外hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值
hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值,默认时1G
6.9 启用压缩
压缩job的中间结果数据和输出数据,可以少量CPU时间节省出很多空间。压缩方式一般选择snappy,效率最高,要启用中间压缩,需要设定:
hive.exec.compress.intermediate为true,同时指定压缩方式hive.intermediate.compression.code为org.apache.hadoop.io.compress.snappycode.
另外,参数hive.intermediate.compression.type可以选对块block还是对记录record压缩,block压缩率比较高,输出压缩的配置基本相同:打开hive.exec.compress.output
6.10 jvm重用
在mr job中,默认时每执行一个task就会启动一个jvm,如果task非常小而且碎,那么jvm启动和关闭耗时都会比较长。可以通过调节参数mapred.job.reuse.jvm.num.task来重用。例如将这个参数设置为5,那么久代表同一个mr job中顺序执行的5各task可以重复使用一个jvm,减少启动和关闭开销,但是他对不同mr job的task无效
6.11 采用合适的存储格式
6.12 数据倾斜
数据倾斜原因:1、join 倾斜 2、聚合倾斜
group by 倾斜: group by 字段中某个字段的值过多,导致处理这个值得reduce耗时很久
解决方法:set hive.map.aggr=true; -- 表示开启map端聚合
set hive.groupby.skewindata=true; 注意:只能对单个字段聚合 , 生成两个map job ,第一个map job中map输出结果随机分布到reduce中,每个reduce做部分聚合操作,并输出结果,这样相同的groub key有可能被分发到不同的reduce中,从而达到负载均衡的目的;第二个map job再根据预处理的数据结果按照group by key分布到reduce中,这个过程可以保证相同的key被分到同一个reduce中,最后完成最终的聚合操作
6.13 优化in、exists语句
使用left semi join 替换in 、exists
6.14 合并 mapreduce操作
multi-group by 是hive的一个非常好的特性,它使得hive中利用中间结果变更非常方便
例如:
from ( select a.status,b.school from status_update a join profilees b on (a.userid=b.suerid)) subq1
insert overwirte table a1 partition(ds='2021-03-07')
select subq1.school,count(1) group by subq1.school
insert overwrite table b1 partition(ds='2021-03-07')
select subq1.status,count(1) group by subq1.status
上述语句使用了multi-group by特性联系group by 2次数据,使用不同的group by key,这一特性可以减少一次mapreduce操作
Hive常用算子实现原理简述--MapReduce版
Hive中的常用算子包括distinct、join、group by、order by、distribute by、sort by、count等,这些操作符在SQL中使用起来很方便,能快速达到我们想要的效果,但是这些算子在底层是怎么实现的呢?
order by很容易想到执行原理,在一个reduce中将所有记录按值排序即可。因此order by在数据量大的情况下执行时间非常长,容易out of memory,非特殊业务需求一般不使用。distribute by也比较明显,根据hash值将distribute的值分发到不同的reduce。sort by是小号的order by,只负责将本reducer中的值排序,达到局部有序的效果。sort by和distribute by配合使用风味更佳,二者可以合并简写为cluster by。count则更加明庆宽森晰,在combiner或reducer处按相同键累加值就能得到。
比较复杂的是distinct、join、group by,本文重点讨论这三个算子在MapReduce引擎中的大致实现原理。班门弄斧,抛砖引玉。
map阶段,将group by后的字段组合作为key,如果group by单字段那么key就一个。将group by之后要进行的聚合操作字段作为值,如要进行count,则value是1;如要sum另一个字段,则value就是该字段。
shuffle阶段,按照key的不同分发到不同的reducer。注意此时可能因为key分布不均匀而出现数据倾斜的问题。
reduce阶段,将相同key的值累加或作其他需要的聚合操作,得到结果。
对group by的过程讲解的比较清楚的是这篇文章 图文并茂,很生动。
实例如下图,对应语句是 select rank, isonline, count(*) from city group by rank, isonline;
如果巧虚group by出现数据倾斜,除去替换key为随机数、提前挑出大数量级key值等通用调优方法,适用于group by的特殊方法有以下几种:
(1)set hive.map.aggr=true,即开启map端的combiner,减少传到reducer的数据量,同时需设置参数hive.groupby.mapaggr.checkinterval 规定在 map 端进行聚合操作的条目数目。
(2)设置mapred.reduce.tasks为较大数量,降低每个reducer处理的数据量。
(3)set hive.groupby.skewindata=true,该参数可自动进行负载均衡。生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group ByKey 分布到 Reduce 中(这个过誉亩程可以保证相同的 Group By Key 被分布到同一个 Reduce中),最后完成最终的聚合操作。
Hive中有两种join方式:map join和common join
如果不显式指定map side join,或者没有达到触发自动map join的条件,那么会进行reduce端的join,即common join,这种join包含map、shuffle、reduce三个步骤。
(1)Map阶段
读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key。Map输出的value为join之后所关心的(select或者where中需要用到的)列;同时在value中还会包含表的Tag信息,用于标明此value对应哪个表。然后按照key进行排序。
(2)Shuffle阶段
根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中
(3)Reduce阶段
根据key的值完成join操作,期间通过Tag来识别不同表中的数据。
以下面的SQL为例,可用下图所示过程大致表达其join原理。
SELECT u.name, o.orderid FROM user u JOIN order o ON u.uid = o.uid;
关联字段是uid,因此以uid为map阶段的输出key,value为选取的字段name和标记源表的tag。shuffle阶段将相同key的键值对发到一起,reduce阶段将不同源表、同一key值的记录拼接起来,可能存在一对多的情况。
如果指定使用map join的方式,或者join的其中一张表小于某个体积(默认25MB),则会使用map join来执行。具体小表有多小,由参数 hive.mapjoin.smalltable.filesize 来决定。
Hive0.7之前,需要使用hint提示 /*+ mapjoin(table) */才会执行MapJoin,否则执行Common Join,但在0.7版本之后,默认自动会转换Map Join,由参数 hive.auto.convert.join 来控制,默认为true。
以下图为例说明map join如何执行,该图来自 ,博主是一个水平深厚又乐于分享的前辈,图片水印上也有其网址。
yarn会启动一个Local Task(在客户端本地执行的Task)--Task A,负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中。
接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。
由于MapJoin没有Reduce,所以由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。
distinct一般和group by同时出现。
当distinct一个字段时,将group by的字段和distinct的字段组合在一起作为map输出的key,value设置为1,同时将group by的字段定为分区键,这可以确保相同group by字段的记录都分到同一个reducer,并且map的输入天然就是按照组合key排好序的。根据分区键将记录分发到reduce端后,按顺序取出组合键中的distinct字段,这时distinct字段也是排好序的。依次遍历distinct字段,每找到一个不同值,计数器就自增1,即可得到count distinct结果。例如下面的SQL语句,过程可以下图示意。
我暂时没有理解这是怎么实现的,别人写的也没有看明白。有善良的学富五车的大佬指点一下吗?
Hive 如何使用mapjoin
MapJoin是Hive的一种优化操作,其适用于小表JOIN大表的场景,由于表的JOIN操作是在Map端且在内存进行的,所以其并不需要埋世李启动Reduce任务也就不需要经过shuffle阶段,从而能在一定程度上节省资源提高JOIN效率
简单总结一下,mapjoin的使用场景:
1. 关联操作中有一张表非常小
2.不等值的链接操作
具体使用:
方法一:
在Hive0.11前,必须使用MAPJOIN来标记显示地启动该优化操作,由于其需要将小表加载进内存所以要注意小表的大小
SELECT/*+ MAPJOIN(smalltable)*/.key,valueFROMsmalltableJOINbigtableONsmalltable.key=bigtable.key
方法二 :
在Hive0.11后,Hive默认启动该优化,也就是不在需要显示的使用MAPJOIN标记,其会在必要的时候触发该优化操作将普通JOIN转换成MapJoin,可以返正通过以下两个属性来设置该优化的触发时机
hive.auto.convert.join
默认值为true,自动开户MAPJOIN优化
hive.mapjoin.smalltable.filesize
默认值为2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中
注意:使用默认启动该优化的方式如果出现默名奇妙的BUG(比如MAPJOIN并不起作用),就将以下两个属性置为fase手动使用MAPJOIN标记来启动该优化
hive.auto.convert.join=false(关闭自动MAPJOIN转换操作)
hive.ignore.mapjoin.hint=false(不忽略MAPJOIN标记)
对于以下查询是不支持使用方法二(MAPJOIN标记)来启动该优化的
select/*+MAPJOIN(smallTableTwo)*/idOne, idTwo, valueFROM ( select/*+MAPJOIN(smallTableOne)*/idOne, idTwo, valueFROM bigTable JOINsmallTableOneon(bigTable.idOne= smallTableOne.idOne) 弯迟
) firstjoin
JOIN
smallTableTwo ON(firstjoin.idTwo=smallTableTwo.idTwo)
但是,如果使用的是方法一即没有MAPJOIN标记则以上查询语句将会被作为两个MJ执行,进一步的,如果预先知道表大小是能够被加载进内存的,则可以通过以下属性来将两个MJ合并成一个MJ
hive.auto.convert.join.noconditionaltask:Hive在基于输入文件大小的前提下将普通JOIN转换成MapJoin,并是否将多个MJ合并成一个
hive.auto.convert.join.noconditionaltask.size:多个MJ合并成一个MJ时,其表的总的大小须小于该值,同时hive.auto.convert.join.noconditionaltask必须为true
[img]关于hivemapjoin和hivemapjoin大表进内存的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。