How to prevent a Hadoop job from failing on a corrupted input file


It depends on where your job is failing. If a line is corrupt and an exception is thrown somewhere in your map method, you should be able to wrap the body of the map method in a try / catch and just log the error:

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // parse value to an int
            int val = Integer.parseInt(value.toString());
            // do something with key and val..
        } catch (NumberFormatException nfe) {
            // log error and continue
        }
    }
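To show the pattern in isolation, here is a minimal sketch in plain Java with no Hadoop dependency. `TolerantParser` and its fields are hypothetical names invented for the illustration; the point is only that a malformed line is counted and skipped instead of aborting the whole run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a mapper: parses each line as an int and
// skips (but counts) the lines that cannot be parsed.
class TolerantParser {
    final List<Integer> parsed = new ArrayList<>();
    long malformed = 0; // a real job would likely use a Hadoop counter instead

    void map(String value) {
        try {
            parsed.add(Integer.parseInt(value.trim()));
        } catch (NumberFormatException nfe) {
            malformed++; // record the bad line and carry on
        }
    }
}
```

In a real mapper, something like `context.getCounter("MyJob", "MALFORMED_LINES").increment(1)` (the group and counter names here are made up) is probably a better choice than a field, since counters are aggregated across tasks and show up in the job summary.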

But if the error is thrown by your InputFormat's RecordReader, then you'll need to amend the mapper's run(..) method, whose default implementation is as follows:

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }

So you could amend this to catch the exception thrown by the context.nextKeyValue() call, but be careful about blindly ignoring errors thrown by the reader: an IOException, for example, may not be 'skippable' by just ignoring the error.

If you have written your own InputFormat / RecordReader, and you have a specific exception which denotes record failure but will allow you to skip over and continue parsing, then something like this will probably work:

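    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (true) {
            try {
                if (!context.nextKeyValue()) {
                    break;
                }
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            } catch (SkippableRecordException sre) {
                // SkippableRecordException is your own exception type
                // (for example a subclass of IOException); log and move on
            }
        }
        cleanup(context);
    }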

But just to reiterate: your RecordReader must be able to recover after an error, otherwise the above code could send you into an infinite loop.
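The recovery requirement can be illustrated without Hadoop. The sketch below uses hypothetical stand-ins (`LineReader`, `SkippingRunner`, and a checked `SkippableRecordException`) and treats any line starting with `#CORRUPT` as a bad record, which is purely an assumption for the demo. What matters is that the reader moves past the record before it throws; if it threw without advancing, the loop would hit the same record forever.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical exception marking a record that is bad but skippable.
class SkippableRecordException extends Exception {
    SkippableRecordException(String msg) { super(msg); }
}

// Hypothetical stand-in for a RecordReader over in-memory lines.
class LineReader {
    private final Iterator<String> lines;
    private String current;

    LineReader(List<String> input) { this.lines = input.iterator(); }

    // Consumes the record BEFORE validating it, so a failure never
    // leaves the reader stuck on the same corrupt record.
    boolean nextKeyValue() throws SkippableRecordException {
        if (!lines.hasNext()) return false;
        String line = lines.next();
        if (line.startsWith("#CORRUPT")) {
            throw new SkippableRecordException("bad record: " + line);
        }
        current = line;
        return true;
    }

    String getCurrentValue() { return current; }
}

class SkippingRunner {
    // Same shape as the run(..) loop: skip bad records, keep the rest.
    static List<String> run(LineReader reader) {
        List<String> mapped = new ArrayList<>();
        while (true) {
            try {
                if (!reader.nextKeyValue()) break;
                mapped.add(reader.getCurrentValue());
            } catch (SkippableRecordException sre) {
                System.err.println("skipping: " + sre.getMessage());
            }
        }
        return mapped;
    }
}
```

A real RecordReader has to do the equivalent with its underlying stream, for example by seeking to the next record boundary, before it signals the skippable failure.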

For your specific case - if you just want to ignore a file upon the first failure then you can update the run method to something much simpler:

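    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } catch (Exception e) {
            // log error and abandon the rest of this input
        }
        // called even after a failure, so anything the mapper flushes here is not lost
        cleanup(context);
    }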

Some final words of warning:

  • You need to make sure that it isn't your mapper code which is causing the exception to be thrown, otherwise you'll be ignoring files for the wrong reason.
  • Files that are treated as GZip compressed (for example because of their extension) but are not actually GZip compressed will fail during the initialization of the record reader, so the above will not catch this type of error (you'll need to write your own record reader implementation). This is true for any file error that is thrown during record reader creation.
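One way to address the first warning is to guard only the read call, so that exceptions from the mapper logic still fail the task. The following is a sketch in plain Java under stated assumptions: `RecordSource`, `ListSource` and `FileAbandoningRunner` are hypothetical stand-ins for the reader side of the mapper context, and the element `"!"` simulates a read failure.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the reader side of a Mapper.Context.
interface RecordSource {
    boolean nextKeyValue() throws IOException;
    String getCurrentValue();
}

// In-memory source for the sketch: the element "!" simulates a read failure.
class ListSource implements RecordSource {
    private final Iterator<String> it;
    private String current;

    ListSource(List<String> values) { it = values.iterator(); }

    public boolean nextKeyValue() throws IOException {
        if (!it.hasNext()) return false;
        current = it.next();
        if (current.equals("!")) throw new IOException("simulated corrupt input");
        return true;
    }

    public String getCurrentValue() { return current; }
}

class FileAbandoningRunner {
    final List<String> mapped = new ArrayList<>();
    boolean abandoned = false;

    // Mapper logic; anything thrown here is a bug and should fail the task.
    void map(String value) {
        if (value.isEmpty()) throw new IllegalStateException("mapper bug");
        mapped.add(value.toUpperCase());
    }

    void run(RecordSource source) {
        while (true) {
            boolean hasNext;
            try {
                hasNext = source.nextKeyValue(); // only the read is guarded
            } catch (IOException e) {
                abandoned = true; // give up on the rest of this input
                break;
            }
            if (!hasNext) break;
            map(source.getCurrentValue()); // deliberately outside the try block
        }
    }
}
```

With this shape, a corrupt input ends the loop early, while a bug in `map` propagates as usual instead of being silently mistaken for a bad file. It does not help with failures during record reader creation, which happen before `run` is ever called.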


This is what Failure Traps are used for in Cascading:

Whenever an operation fails and throws an exception, if there is an associated trap, the offending Tuple is saved to the resource specified by the trap Tap. This allows the job to continue processing without any data loss.

This essentially lets your job continue and lets you check your corrupt files later.

If you are somewhat familiar with Cascading, you add the trap in your flow definition statement:

    new FlowDef().addTrap( String branchName, Tap trap );

Failure Traps


There is also another possible way: the mapred.max.map.failures.percent configuration option, which sets the percentage of map tasks that may fail without the job as a whole failing. Of course, solving the problem this way could also hide other problems occurring during the map phase.
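To get a feel for what a given percentage tolerates, here is a small arithmetic sketch. The class and method names are hypothetical, and the exact comparison is an assumption about how the framework applies the threshold (job fails once failed maps times 100 exceeds the percentage times total maps), not something taken from the Hadoop source.

```java
// Sketch of the tolerance check, ASSUMING the job is failed once
// failedMaps * 100 > maxFailuresPercent * totalMaps.
class MapFailureBudget {
    static boolean jobFails(int failedMaps, int totalMaps, int maxFailuresPercent) {
        // long arithmetic avoids overflow for very large task counts
        return (long) failedMaps * 100 > (long) maxFailuresPercent * totalMaps;
    }
}
```

Under that assumption, a job with 200 map tasks and the option set to 5 would survive 10 failed map tasks but not 11. Note that the budget is counted in map tasks rather than files, so how many corrupt files it covers depends on how the input is split.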