
How to efficiently update Impala tables whose files are modified very frequently


Since nobody seems to have an answer to my problem, I would like to share the approach we took to make this processing more efficient. Comments are very welcome.

We discovered (the documentation is not very clear on this) that part of the information Spark stores in its "checkpoints" in HDFS is a set of metadata files describing when each Parquet file was written and how big it was:

$ hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata
-rw-r--r--   3 hdfs 68K   2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
-rw-r--r--   3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
-rw-r--r--   3 hdfs 68K   2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...
$ hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...
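Each of those JSON lines can be mapped onto a small case class to pull out the path and the size. A minimal parsing sketch, assuming json4s (which ships with Spark); FileEntry and parseMetadataLine are just illustrative names, not our actual code:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Only the fields we actually use; json4s ignores the extra JSON keys.
case class FileEntry(path: String, size: Long, action: String)

def parseMetadataLine(line: String): Option[FileEntry] = {
  implicit val formats: Formats = DefaultFormats
  // Every metadata file starts with a version marker ("v1"); keep only the JSON lines.
  if (line.startsWith("{")) Some(parse(line).extract[FileEntry]) else None
}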

So, what we did was the following (a code sketch of the whole job follows the list):

  • Build a Spark Streaming Job polling that _spark_metadata folder.
    • We use a fileStream since it allows us to define a filter for the files to pick up.
    • Each entry in that stream is one of those JSON lines, which is parsed to extract the file path and size.
  • Group the files by the parent folder they belong to (each folder maps to one Impala partition).
  • For each folder:
    • Load a DataFrame with only the targeted Parquet files (to avoid race conditions with the other job that is still writing files)
    • Calculate how many blocks to write (using the size field in the JSON and a target block size)
    • Coalesce the dataframe to the desired number of partitions and write it back to HDFS
    • Execute the DDL REFRESH TABLE myTable PARTITION ([partition keys derived from the new folder])
  • Finally, delete the source files
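
Putting those steps together, a rough Scala sketch of how such a job could be wired up is shown below. This is illustrative rather than our production code: the paths, batch interval, target block size and the refreshImpalaPartition / deleteSourceFiles helpers are placeholders, and parseMetadataLine is the function sketched earlier.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

val spark = SparkSession.builder().appName("parquet-compactor").getOrCreate()
val ssc   = new StreamingContext(spark.sparkContext, Seconds(300))   // batch time: configurable
ssc.checkpoint("hdfs://...../compactor_checkpoint")                  // the job's own checkpointing

val targetBlockSize = 128L * 1024 * 1024                             // target block size: configurable

// Placeholders for the environment-specific parts (Impala connection, HDFS client).
def refreshImpalaPartition(folder: String): Unit = ???   // issue the partition-level REFRESH through Impala
def deleteSourceFiles(paths: Seq[String]): Unit = ???    // remove the original small files

// Poll the _spark_metadata folder, skipping the periodic ".compact" files.
val metadataLines = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "hdfs://...../my_spark_job/_spark_metadata",
    (p: Path) => !p.getName.endsWith(".compact"),
    newFilesOnly = true
  ).map(_._2.toString)

metadataLines.foreachRDD { rdd =>
  // parseMetadataLine is the helper sketched above; keep only the newly added files.
  val files = rdd.flatMap(parseMetadataLine).filter(_.action == "add").collect()

  // Group by parent folder; each folder corresponds to one Impala partition.
  files.groupBy(f => new Path(f.path).getParent.toString).foreach { case (folder, entries) =>
    val paths     = entries.map(_.path).toSeq
    val totalSize = entries.map(_.size).sum
    val numFiles  = math.max(1, math.ceil(totalSize.toDouble / targetBlockSize).toInt)

    // Read only the targeted files (avoids racing with the job that is still writing them),
    // coalesce to the desired number of output files and write them back to the folder.
    spark.read.parquet(paths: _*)
      .coalesce(numFiles)
      .write.mode("append").parquet(folder)

    refreshImpalaPartition(folder)
    deleteSourceFiles(paths)
  }
}

ssc.start()
ssc.awaitTermination()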

What we achieved is:

  • Limit the number of DDL statements by issuing a single refresh per partition and batch.

  • Because the batch time and the target block size are configurable, we can adapt the product to deployment scenarios with bigger or smaller datasets.

  • The solution is quite flexible, since we can assign more or fewer resources to the Spark Streaming job (executors, cores, memory, etc.), and we can also stop and restart it safely thanks to its own checkpointing.

  • We are also studying the possibility of repartitioning the data while doing this process, so that the resulting partitions are as close as possible to the optimum size.