
Hive join optimization


1. Optimize Joins

We can improve join performance by enabling Auto Convert Map Joins and by enabling skew join optimization.

Auto Map Joins

Auto Map Join is a very useful feature when joining a big table with a small table. If we enable this feature, the small table is saved in the local cache on each node and then joined with the big table in the Map phase. Enabling Auto Map Join provides two advantages. First, loading the small table into cache saves read time on each data node. Second, it avoids skew joins in the Hive query, since the join operation has already been done in the Map phase for each block of data.
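
As a minimal sketch of how this might be switched on for a session, the commands below use the hive.auto.convert.join and hive.mapjoin.smalltable.filesize properties. The threshold shown is the commonly documented default and may differ on your cluster, and the tables sales and stores are hypothetical names used only for illustration.

```sql
-- Let Hive convert eligible common joins into map joins automatically.
SET hive.auto.convert.join=true;
-- Size threshold in bytes below which a table is treated as "small"
-- (assumed value; check the default on your installation).
SET hive.mapjoin.smalltable.filesize=25000000;

-- Hypothetical tables: sales is large, stores is small enough to cache.
SELECT s.store_id, st.store_name, SUM(s.amount) AS total_amount
FROM sales s
JOIN stores st ON s.store_id = st.store_id
GROUP BY s.store_id, st.store_name;
```

Running EXPLAIN on the query is one way to check whether the plan contains a map join operator rather than a reduce-side join.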

Skew Joins

We can enable optimization of skew joins, i.e. imbalanced joins, by setting the hive.optimize.skewjoin property to true, either via the SET command in the Hive shell or in the hive-site.xml file.
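
For the session-level route, the SET form might look like the sketch below. The threshold value is simply the one used in the hive-site.xml settings that follow and should be tuned for your data.

```sql
-- Enable runtime skew join handling for the current session.
SET hive.optimize.skewjoin=true;
-- Rows per join key above which the key is treated as skewed (assumed value).
SET hive.skewjoin.key=100000;
```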

  <property>
    <name>hive.optimize.skewjoin</name>
    <value>true</value>
    <description>
      Whether to enable skew join optimization.
      The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of
      processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce
      job, process those skewed keys. The same key need not be skewed for all the tables, and so,
      the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a
      map-join.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.key</name>
    <value>100000</value>
    <description>
      Determine if we get a skew key in join. If we see more than the specified number of rows with the same key in join operator,
      we think the key as a skew join key.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.map.tasks</name>
    <value>10000</value>
    <description>
      Determine the number of map task used in the follow up map join job for a skew join.
      It should be used together with hive.skewjoin.mapjoin.min.split to perform a fine grained control.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.min.split</name>
    <value>33554432</value>
    <description>
      Determine the number of map task at most used in the follow up map join job for a skew join by specifying
      the minimum split size. It should be used together with hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.
    </description>
  </property>

2. Enable Bucketed Map Joins

If tables are bucketed by a particular column and those tables are used in joins, we can enable bucketed map joins to improve performance.

  <property>
    <name>hive.optimize.bucketmapjoin</name>
    <value>true</value>
    <description>Whether to try bucket mapjoin</description>
  </property>
  <property>
    <name>hive.optimize.bucketmapjoin.sortedmerge</name>
    <value>true</value>
    <description>Whether to try sorted bucket merge map join</description>
  </property>
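
To make the precondition concrete, here is a rough sketch of two tables bucketed and sorted on the join column, followed by a join that could qualify for a bucketed map join. The table names, columns, and bucket count are hypothetical, and whether the optimizer actually chooses this join type depends on the Hive version and other settings.

```sql
-- Hypothetical tables, both bucketed and sorted on the join key
-- with the same number of buckets.
CREATE TABLE orders_bucketed (
  order_id    BIGINT,
  customer_id BIGINT,
  amount      DOUBLE
)
CLUSTERED BY (customer_id) SORTED BY (customer_id) INTO 32 BUCKETS;

CREATE TABLE customers_bucketed (
  customer_id BIGINT,
  name        STRING
)
CLUSTERED BY (customer_id) SORTED BY (customer_id) INTO 32 BUCKETS;

-- Session-level equivalents of the two properties above.
SET hive.optimize.bucketmapjoin=true;
SET hive.optimize.bucketmapjoin.sortedmerge=true;

-- Join on the bucketing column so matching buckets can be joined directly.
SELECT o.order_id, c.name, o.amount
FROM orders_bucketed o
JOIN customers_bucketed c ON o.customer_id = c.customer_id;
```

The data must actually be written into the tables in bucketed form for this to help. On older Hive releases that typically means loading with INSERT while bucketing enforcement is switched on.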


3. Enable Tez Execution Engine

Instead of running Hive queries on the older MapReduce engine, we can run them on the Tez execution engine, which can improve query performance by roughly 100% to 300% in many cases, depending on the workload. We can enable the Tez engine with the property below from the Hive shell.

hive> set hive.execution.engine=tez;


4. Enable Parallel Execution

Hive converts a query into one or more stages. A stage can be a MapReduce stage, a sampling stage, a merge stage, or a limit stage. By default, Hive executes these stages one at a time. A particular job may, however, consist of some stages that do not depend on each other and could be executed in parallel, possibly allowing the overall job to complete more quickly. Parallel execution can be enabled by setting the properties below.

  <property>
    <name>hive.exec.parallel</name>
    <value>true</value>
    <description>Whether to execute jobs in parallel</description>
  </property>
  <property>
    <name>hive.exec.parallel.thread.number</name>
    <value>8</value>
    <description>How many jobs at most can be executed in parallel</description>
  </property>
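
As an illustration of the kind of query that might benefit, the sketch below combines two aggregations that do not depend on each other, so their stages are candidates for running concurrently. The tables web_logs and app_logs are hypothetical, and the actual stage plan depends on the Hive version and execution engine.

```sql
-- Session-level equivalents of the two properties above.
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=8;

-- Hypothetical tables: each branch of the UNION ALL is an independent
-- aggregation, so neither has to wait for the other to finish.
SELECT t.src, t.cnt
FROM (
  SELECT 'web' AS src, COUNT(*) AS cnt FROM web_logs
  UNION ALL
  SELECT 'app' AS src, COUNT(*) AS cnt FROM app_logs
) t;
```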


5. Enable Vectorization

Vectorized query execution was first introduced in Hive 0.13. It can improve the performance of operations such as scans, aggregations, filters, and joins by processing them in batches of 1024 rows at a time instead of one row at a time.

We can enable vectorized query execution by setting the three properties below in either the Hive shell or the hive-site.xml file.

hive> set hive.vectorized.execution.enabled = true;
hive> set hive.vectorized.execution.reduce.enabled = true;
hive> set hive.vectorized.execution.reduce.groupby.enabled = true;
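
One way to check whether a query is actually being vectorized is to look at its plan. The sketch below assumes the data is stored as ORC, which early releases of this feature require, and uses a hypothetical table page_views_orc. The exact wording of the plan output varies between Hive versions.

```sql
-- Hypothetical table stored in ORC format.
CREATE TABLE page_views_orc (
  user_id   BIGINT,
  url       STRING,
  view_time BIGINT
)
STORED AS ORC;

SET hive.vectorized.execution.enabled=true;

-- Inspect the plan; vectorized stages are usually marked in the output
-- with a line such as "Execution mode: vectorized".
EXPLAIN
SELECT url, COUNT(*) AS views
FROM page_views_orc
WHERE view_time > 0
GROUP BY url;
```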


6. Enable Cost Based Optimization

Recent Hive releases provide cost-based optimization (CBO). With it, Hive can make further optimization decisions based on query cost, potentially resulting in different choices: how to order joins, which type of join to perform, the degree of parallelism, and others.

Cost-based optimization can be enabled by setting the properties below in the hive-site.xml file.

  <property>
    <name>hive.cbo.enable</name>
    <value>true</value>
    <description>Flag to control enabling Cost Based Optimizations using Calcite framework.</description>
  </property>
  <property>
    <name>hive.compute.query.using.stats</name>
    <value>true</value>
    <description>
      When set to true Hive will answer a few queries like count(1) purely using stats
      stored in metastore. For basic stats collection turn on the config hive.stats.autogather to true.
      For more advanced stats collection need to run analyze table queries.
    </description>
  </property>
  <property>
    <name>hive.stats.fetch.partition.stats</name>
    <value>true</value>
    <description>
      Annotation of operator tree with statistics information requires partition level basic
      statistics like number of rows, data size and file size. Partition statistics are fetched from
      metastore. Fetching partition statistics for each needed partition can be expensive when the
      number of partitions is high. This flag can be used to disable fetching of partition statistics
      from metastore. When this flag is disabled, Hive will make calls to filesystem to get file sizes
      and will estimate the number of rows from row schema.
    </description>
  </property>
  <property>
    <name>hive.stats.fetch.column.stats</name>
    <value>true</value>
    <description>
      Annotation of operator tree with statistics information requires column statistics.
      Column statistics are fetched from metastore. Fetching column statistics for each needed column
      can be expensive when the number of columns is high. This flag can be used to disable fetching
      of column statistics from metastore.
    </description>
  </property>
  <property>
    <name>hive.stats.autogather</name>
    <value>true</value>
    <description>A flag to gather statistics automatically during the INSERT OVERWRITE command.</description>
  </property>
  <property>
    <name>hive.stats.dbclass</name>
    <value>fs</value>
    <description>
      Expects one of the pattern in [jdbc(:.*), hbase, counter, custom, fs].
      The storage that stores temporary Hive statistics. In filesystem based statistics collection ('fs'),
      each task writes statistics it has collected in a file on the filesystem, which will be aggregated
      after the job has finished. Supported values are fs (filesystem), jdbc:database (where database
      can be derby, mysql, etc.), hbase, counter, and custom as defined in StatsSetupConst.java.
    </description>
  </property>
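
The optimizer can only make cost-based decisions when statistics are available. As the description of hive.compute.query.using.stats notes, more advanced statistics require running analyze table queries. A minimal sketch follows, assuming a hypothetical unpartitioned table sales and a hypothetical table sales_by_day partitioned by ds:

```sql
-- Basic table-level statistics (row count, data size, and so on).
ANALYZE TABLE sales COMPUTE STATISTICS;

-- Column-level statistics for the columns used in joins and filters.
ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS store_id, amount;

-- Statistics for a single partition of a partitioned table.
ANALYZE TABLE sales_by_day PARTITION (ds='2015-01-01') COMPUTE STATISTICS;
```

Statistics gathered this way go stale as the data changes, so it may be worth re-running these commands after large loads.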