Decompress all Gzip files in a Hadoop HDFS directory


You need a CompressionCodec to decompress the file. The implementation for gzip is GzipCodec. You get a CompressionInputStream via the codec and write out the result with simple IO. Something like this, say you have a file file.gz:

import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

// path of the file
String uri = "/uri/to/file.gz";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
// the correct codec will be discovered by the extension of the file
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
    System.err.println("No codec found for " + uri);
    System.exit(1);
}
// remove the .gz extension
String outputUri =
    CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
InputStream is = codec.createInputStream(fs.open(inputPath));
OutputStream out = fs.create(new Path(outputUri));
IOUtils.copyBytes(is, out, conf);
// close the streams
IOUtils.closeStream(is);
IOUtils.closeStream(out);

UPDATE

If you need to get all the files in a directory, then you should get the FileStatuses like

FileSystem fs = FileSystem.get(new Configuration());
FileStatus[] statuses = fs.listStatus(new Path("hdfs/path/to/dir"));

Then just loop

for (FileStatus status : statuses) {
    CompressionCodec codec = factory.getCodec(status.getPath());
    ...
    InputStream is = codec.createInputStream(fs.open(status.getPath()));
    ...
}
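Putting the two snippets together, a complete sketch might look like this (the class name and directory path are placeholders; note that listStatus is not recursive, so only files directly inside the directory are processed):

import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class DecompressDirectory {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        FileStatus[] statuses = fs.listStatus(new Path("/hdfs/path/to/dir"));
        for (FileStatus status : statuses) {
            Path inputPath = status.getPath();
            // null means no codec matched the extension (plain files, subdirectories, ...)
            CompressionCodec codec = factory.getCodec(inputPath);
            if (codec == null) {
                continue;
            }
            // strip the codec's extension, e.g. file.gz -> file
            String outputUri = CompressionCodecFactory.removeSuffix(
                    inputPath.toString(), codec.getDefaultExtension());
            InputStream is = codec.createInputStream(fs.open(inputPath));
            OutputStream out = fs.create(new Path(outputUri));
            try {
                IOUtils.copyBytes(is, out, conf);
            } finally {
                IOUtils.closeStream(is);
                IOUtils.closeStream(out);
            }
        }
    }
}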


I use an identity-map Hadoop job that I wrote in Scalding to change the compression / change the split size / etc.

class IdentityMap(args: Args) extends ConfiguredJob(args) {
  CombineFileMultipleTextLine(args.list("in"): _*)
    .read
    .mapTo[String, String]('line -> 'line)(identity)
    .write(if (args.boolean("compress")) TsvCompressed(args("out")) else TextLine(args("out")))
}
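For context, a Scalding job like this is normally launched through com.twitter.scalding.Tool; the jar name and paths below are placeholders. The values after --in feed args.list("in"), and the bare --compress flag is what args.boolean("compress") checks:

hadoop jar my-job.jar com.twitter.scalding.Tool IdentityMap --hdfs \
  --in /path/to/input --out /path/to/output --compress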

General configuration abstract class:

abstract class ConfiguredJob(args: Args) extends Job(args) {
  override def config(implicit mode: Mode): Map[AnyRef, AnyRef] = {
    val Megabyte = 1024 * 1024
    val conf = super.config(mode)
    val splitSizeMax = args.getOrElse("splitSizeMax", "1024").toInt * Megabyte
    val splitSizeMin = args.getOrElse("splitSizeMin", "512").toInt * Megabyte
    val jobPriority = args.getOrElse("jobPriority", "NORMAL")
    val maxHeap = args.getOrElse("maxHeap", "512m")
    conf ++ Map("mapred.child.java.opts" -> ("-Xmx" + maxHeap),
      "mapred.output.compress" -> (if (args.boolean("compress")) "true" else "false"),
      "mapred.min.split.size" -> splitSizeMin.toString,
      "mapred.max.split.size" -> splitSizeMax.toString,
//      "mapred.output.compression.codec" -> args.getOrElse("codec", "org.apache.hadoop.io.compress.BZip2Codec"), // Does not work, has to be a -D flag
      "mapred.job.priority" -> jobPriority)
  }
}
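As the commented-out line says, setting mapred.output.compression.codec from config did not take effect for me; the codec has to be passed as a -D flag on the command line instead, something like (same placeholder invocation as above):

hadoop jar my-job.jar com.twitter.scalding.Tool \
  -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \
  IdentityMap --hdfs --in /path/to/input --out /path/to/output --compress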