Hive 基础

请谈一下 Hive 的特点,Hive 和 RDBMS 有什么异同?

Hive 是基于 Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的 SQL 查询功能,可以将 SQL 语句转换为 MapReduce 任务进行运行。其优点是学习成本低,可以通过类 SQL 语句快速实现简单的 MapReduce 统计,不必开发专门的 MapReduce 应用,十分适合数据仓库的统计分析,但是 Hive 不支持实时查询。

Hive 与关系型数据库的区别:

比较项SQLHiveQL
ANSI SQL支持不完全支持
更新UPDATE\INSERT\DELETEINSERT OVERWRITE\INTO TABLE
事务支持不支持
模式写模式读模式
数据保存块设备、本地文件系统HDFS
延时
多表插入不支持支持
子查询完全支持只能用在 From 子句中
视图UpdatableRead-only
可扩展性
数据规模
...............

Hive 的架构原理?

Hive 架构原理

  1. 两种客户端

    • CLI(command-line interface):命令行客户端(可以在 shell 中操作);
    • JDBC 客户端
  2. 四种驱动 Driver

    • SQL Parser 解析器:检查 SQL 语法是否有错误;
    • Physical Plan 编译器:把 SQL 语句转化成 MR 程序;
    • Query Optimizer 优化器:优化 SQL 语句;
    • Execution 执行器:执行 MR 程序。
  3. 元数据库 Meta store

    • Meta store 是 Hive 数据库中的一个库,用于存储处理数据的元数据,包括:表名、表所属的数据库(默认是 default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等。
    • Meta store 默认存储在 Hive 自带的 Derby 数据库中,但因为 Hive 的元数据可能要面临不断地更新、修改和读取操作,Derby 无法实现并发,实际工作中一般将 Derby 替换为 MySQL。
    • Hive 中处理的数据分两部分存放:
      • 处理的结构化数据,存储在 HDFS 中;
      • 表的元数据存储在元数据库 MySQL 中。

这样,有了表的元数据信息,就能找到对应的字段;然后把字段映射到结构化数据中,这样就可以通过映射形成一张虚表;

也就是说,表的元数据和 MySQL 中的结构化数据,通过映射,构成一张虚表,用于 Hive 查询数据分析。

Hive 的存储过程:

启动 Hive 时,会初始化 Hive,这时会在 MySQL 中生成大约 36 张表(后续随着业务的复杂会增加),然后创建表,会在 MySQL 中存放这个表的信息(不是以表的形式存在的,而是把表的属性以数据的形式放在 MySQL 中,这样在 Hive 中使用 SQL 命令一样是能够查到这张表的)。然后把本地的文本文件使用 Hive 命令格式化导入到表中,这样这些数据就存放到 HDFS 中,而不是在 MySQL 或 Hive 中。

Hive 有哪些方式保存元数据,各有哪些特点?

Hive 支持三种不同的元存储服务器,分别为:内嵌式元存储服务器、本地元存储服务器、远程元存储服务器,每种存储方式使用不同的配置参数。

  • 内嵌式元存储主要用于单元测试,在该模式下每次只有一个进程可以连接到元存储,Derby 是内嵌式元存储的默认数据库。
  • 在本地模式下,每个 Hive 客户端都会打开到数据存储的连接并在该连接上请求 SQL 查询。
  • 在远程模式下,所有的 Hive 客户端都将打开一个到元数据服务器的连接,该服务器依次查询元数据,元数据服务器和客户端之间使用 Thrift 协议通信。

Hive 内部表和外部表的区别?

  • 创建表时:创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。
  • 删除表时:在删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据。

Hive 中的压缩格式 TextFile、SequenceFile、RCFile 、ORCFile 各有什么区别?

  • TextFile

    默认格式,存储方式为行存储,数据不做压缩,磁盘开销大,数据解析开销大。可结合 Gzip、Bzip2 使用(系统自动检查,执行查询时自动解压),但使用这种方式,压缩后的文件不支持 split,Hive 不会对数据进行切分,从而无法对数据进行并行操作。并且在反序列化过程中,必须逐个字符判断是不是分隔符和行结束符,因此反序列化开销会比 SequenceFile 高几十倍。

  • SequenceFile

    SequenceFile 是 Hadoop API 提供的一种二进制文件支持,存储方式为行存储,其具有使用方便、可分割、可压缩的特点。

    SequenceFile 支持三种压缩选择:NONE,RECORD,BLOCK。Record 压缩率低,一般建议使用 BLOCK 压缩。

    优势是文件和 Hadoop API 中的 MapFile 是相互兼容的。

  • RCFile

    存储方式:数据按行分块,每块按列存储。结合了行存储和列存储的优点:

    首先,RCFile 保证同一行的数据位于同一节点,因此元组重构的开销很低;

    其次,像列存储一样,RCFile 能够利用列维度的数据压缩,并且能跳过不必要的列读取;

  • ORCFile

    存储方式:数据按行分块 每块按照列存储。

    压缩快、快速列存取。

    效率比 RCFile 高,是 RCFile 的改良版本。

总结:相比 TextFile 和 SequenceFile,RCFile 由于列式存储方式,数据加载时性能消耗较大,但是具有较好的压缩比和查询响应。数据仓库的特点是一次写入、多次读取,因此,整体来看,RCFile 相比其余两种格式具有较明显的优势。

说说对 Hive 桶表的理解?

桶表是对数据进行哈希取值,然后放到不同文件中存储。

数据加载到桶表时,会对字段取 hash 值,然后与桶的数量取模。把数据放到对应的文件中。物理上,每个桶就是表(或分区)目录里的一个文件,一个作业产生的桶(输出文件)和 reduce 任务个数相同。

桶表专门用于抽样查询,是很专业性的,不是日常用来存储数据的表,需要抽样查询时,才创建和使用桶表。

Hive 的 HiveSQL 转换为 MapReduce 的过程?

HiveSQL 转换为 MapReduce 的过程

  1. SQL Parser:Antlr 定义 SQL 的语法规则,完成 SQL 词法,语法解析,将 SQL 转化为抽象语法树 AST Tree;
  2. Semantic Analyzer:遍历 AST Tree,抽象出查询的基本组成单元 QueryBlock;
  3. Logical plan:遍历 QueryBlock,翻译为执行操作树 OperatorTree;
  4. Logical plan optimizer: 逻辑层优化器进行 OperatorTree 变换,合并不必要的 ReduceSinkOperator,减少 shuffle 数据量;
  5. Physical plan:遍历 OperatorTree,翻译为 MapReduce 任务;
  6. Logical plan optimizer:物理层优化器进行 MapReduce 任务的变换,生成最终的执行计划。

所有的 Hive 任务都会有 MapReduce 的执行吗?

不是,从 Hive0.10.0 版本开始,对于简单的不需要聚合的类似 SELECT * FROM LIMIT n 语句,不需要起 MapReduce job,直接通过 Fetch task 获取数据。

什么是 Fetch 抓取

Fetch 抓取是指,Hive 中对某些情况的查询可以不必使用 MapReduce 计算。例如:SELECT * FROM employees,在这种情况下,Hive 可以简单地读取 employee 对应的存储目录下的文件,然后输出查询结果到控制台。

hive-default.xml.template 文件中 hive.fetch.task.conversion 默认是 more,老版本 Hive 默认是 minimal,该属性修改为 more 以后,在全局查找、字段查找、limit 查找等都不走 MapReduce。

什么是本地模式

大多数的 Hadoop Job 是需要 Hadoop 提供的完整的可扩展性来处理大数据集的。不过,有时 Hive 的输入数据量是非常小的。在这种情况下,为查询触发执行任务时消耗可能会比实际 job 的执行时间要多的多。对于大多数这种情况,Hive 可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。

用户可以通过设置 hive.exec.mode.local.auto 的值为 true,来让 Hive 在适当的时候自动启动这个优化。

Hive 的函数:UDF、UDAF、UDTF 的区别?

  • UDF:User-Defined Function,用户定义(普通)函数,只对单行数值产生作用;单行进入,单行输出。
  • UDAF:User-Defined Aggregation Function,用户定义聚合函数,可对多行数据产生作用;等同与 SQL 中常用的 SUM()AVG(),也是聚合函数;多行进入,单行输出。
  • UDTF:User-Defined Table-Generating Functions,用户定义表生成函数,用来解决输入一行输出多行,如 lateral view explore();单行输入,多行输出。

请说明 Hive 中 Order By,Sort By,Distribute By,Cluster By 各代表什么意思?

  • order by:会对输入做全局排序,因此只有一个 reducer(多个 reducer 无法保证全局有序)。只有一个 reducer,会导致当输入规模较大时,需要较长的计算时间。
  • sort by:不是全局排序,其在数据进入 reducer 前完成排序。
  • distribute by:按照指定的字段对数据进行划分输出到不同的 reduce 中。
  • cluster by:除了具有 distribute by 的功能外还兼具 sort by 的功能。

写出 Hive 中 split、coalesce 及 collect_list 函数的用法(可举例)?

  • split 将字符串转化为数组,即:split('a,b,c,d', ',') ==> ["a","b","c","d"]
  • coalesce(T v1, T v2, …) 返回参数中的第一个非空值;如果所有值都为 NULL,那么返回 NULL
  • collect_list 列出该字段所有的值,不去重 => select collect_list(id) from table

Hive 的两张表关联,使用 MapReduce 怎么实现?

  • 如果其中有一张表为小表,直接使用 map 端 join 的方式(map 端加载小表)进行聚合。

  • 如果两张都是大表,那么采用联合 key,联合 key 的第一个组成部分是 join on 中的公共字段,第二部分是一个 flag,0 代表表 A,1 代表表 B,由此让 Reduce 区分客户信息和订单信息;在 Mapper 中同时处理两张表的信息,将 join on 公共字段相同的数据划分到同一个分区中,进而传递到一个 Reduce 中,然后在 Reduce 中实现聚合。

Hive 优化

小表、大表 JOIN 优化

将 key 相对分散,并且数据量小的表放在 JOIN 的左边,这样可以有效减少内存溢出错误发生的几率;再进一步,可以使用 Group 让小的维度表(1000 条以下的记录条数)先进内存。在 map 端完成 reduce。

实际测试发现:新版的 Hive 已经对小表 JOIN 大表和大表 JOIN 小表进行了优化。小表放在左边和右边已经没有明显区别。

大表 JOIN 大表优化

  • 空 KEY 过滤

    有时 JOIN 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同的 reducer 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key,很多情况下,这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。例如 key 对应的字段为空。

  • 空 key 转换

    有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在 JOIN 的结果中,此时我们可以表 A 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的 reducer 上。

Group By 优化

默认情况下,Map 阶段同一 Key 数据分发给一个 reduce,当一个 key 数据过大时就倾斜了。

并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果。

开启 Map 端聚合参数设置

  • 是否在 Map 端进行聚合,默认为 true:hive.map.aggr = true
  • 在 Map 端进行聚合操作的条目数目:hive.groupby.mapaggr.checkinterval = 100000
  • 有数据倾斜的时候进行负载均衡(默认是 false):hive.groupby.skewindata = true

当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

替换 COUNT(Distinct) 去重统计

数据量小的时候无所谓,数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个 Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成,一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换。

避免笛卡尔积

尽量避免笛卡尔积,join 的时候不加 on 条件,或者无效的 on 条件,Hive 只能使用 1 个 reducer 来完成笛卡尔积。

行列过滤

  • 列处理:在 SELECT 中,只拿需要的列,如果有,尽量使用分区过滤,少用 SELECT *
  • 行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在 where 后面,那么就会先全表关联,之后再过滤。

Hive 表关联查询,如何解决数据倾斜的问题?

  • 倾斜原因:map 输出数据按 key Hash 的分配到 reduce 中,由于 key 分布不均匀、业务数据本身的特点、建表时考虑不周等原因造成的 reduce 上的数据量差异过大。

    • key 分布不均匀;
    • 业务数据本身的特性;
    • 建表时考虑不周;
    • 某些 SQL 语句本身就有数据倾斜;

      如何避免:对于 key 为空产生的数据倾斜,可以对其赋予一个随机值。

  • 解决方案

    • 参数调节:

      hive.map.aggr=true

      hive.groupby.skewindata=true

      有数据倾斜的时候进行负载均衡,当选项设定位 true,生成的查询计划会有两个 MR Job。

      第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;

      第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

    • SQL 语句调节:

      • 选用 join key 分布最均匀的表作为驱动表。做好列裁剪和 filter 操作,以达到两表做 join 的时候,数据量相对变小的效果。
      • 大小表 join:使用 map join 让小的维度表(1000 条以下的记录条数)先进内存。在 map 端完成 reduce。
      • 大表 join 大表:把空值的 key 变成一个字符串加上随机数,把倾斜的数据分到不同的 reduce 上,由于 null 值关联不上,处理后并不影响最终结果。
      • count distinct 大量相同特殊值:count distinct 时,将值为空的情况单独处理,如果是计算 count distinct,可以不用处理,直接过滤,在最后结果中加 1。如果还有其他计算,需要进行 group by,可以先将值为空的记录单独处理,再和其他计算结果进行 union。

优化 Map 数

  1. 通常情况下,作业会通过 input 的目录产生一个或者多个 map 任务。

    主要的决定因素有:input 的文件总个数,input 的文件大小,集群设置的文件块大小。

  2. 是不是 map 数越多越好?

    不一定。如果一个任务有很多小文件(远远小于块大小 128MB),则每个小文件也会被当做一个块,用一个 map 任务来完成,而一个 map 任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的 map 数是受限的。

  3. 是不是保证每个 map 处理接近 128MB 的文件块,就高枕无忧了?

    答案也是不一定。比如有一个 127MB 的文件,正常会用一个 map 去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果 map 处理的逻辑比较复杂,用一个 map 任务去做,肯定也比较耗时。

注意:针对上面的问题 2 和 3,我们需要采取两种方式来解决:即减少 map 数和增加 map 数。

小文件进行合并

在 map 执行前合并小文件,减少 map 数:CombineHiveInputFormat 具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat 没有对小文件合并功能。

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

复杂文件增加 Map 数

当 input 的文件都很大,任务逻辑复杂,map 执行非常慢的时候,可以考虑增加 Map 数,来使得每个 map 处理的数据量减少,从而提高任务的执行效率。

增加 map 的方法为:根据 computeSliteSize(Math.max(minSize, Math.min(maxSize, blocksize))) = blocksize = 128MB 公式,调整 maxSize 最大值。让 maxSize 最大值低于 blocksize 就可以增加 map 的个数。

优化 Reduce 数

  1. 调整 reduce 个数方法一

    • 每个 Reduce 处理的数据量默认是 256MB

      hive.exec.reducers.bytes.per.reducer=256000000

    • 每个任务最大的 reduce 数,默认为 1009

      hive.exec.reducers.max=1009

    • 计算 reducer 数的公式

      N = min(参数 2, 总输入数据量/参数 1)

  2. 调整 reduce 个数方法二

    在 hadoop 的 mapred-default.xml 文件中修改设置每个 job 的 Reduce 个数

    set mapreduce.job.reduces = 15

  3. reduce 个数并不是越多越好

    • 过多的启动和初始化 reduce 也会消耗时间和资源;
    • 另外,有多少个 reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;

      在设置 reduce 个数的时候也需要考虑这两个原则:处理大数据量利用合适的 reduce 数;使单个 reduce 任务处理数据量大小要合适。

并行执行

Hive 会将一个查询转化成一个或者多个阶段。这样的阶段可以是 MapReduce 阶段、抽样阶段、合并阶段、limit 阶段。或者 Hive 执行过程中可能需要的其他阶段。默认情况下,Hive 一次只会执行一个阶段。不过,某个特定的 job 可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个 job 的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么 job 可能就越快完成。

通过设置参数 hive.exec.parallel=true,就可以开启并发执行。不过,在共享集群中,需要注意下,如果 job 中并行阶段增多,那么集群利用率就会增加。