One whole exception would be split into 2 maps while using Hadoop to catch exceptions from raw logs



I would go for a preprocessing job where you tag the exceptions with XML tags. Then you can use an XmlInputFormat to process the files. (This is only the start of a solution; based on your feedback we can make things more concrete.)

This link provides a tutorial for writing your own XmlInputFormat, which you can customize to look for 'exception' characteristics. The main point of the tutorial is this sentence:

In the event that a record spans an InputSplit boundary, the record reader will take care of this so we will not have to worry about this.

I will copy the information from the website here, since the site might go offline in the future, which would be very frustrating for anyone reading this later:

The InputFormat:

package org.undercloud.mapreduce.example3;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class XmlInputFormat extends FileInputFormat<Text, XmlContent> {

    public RecordReader<Text, XmlContent> getRecordReader(InputSplit input,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(input.toString());
        return new XmlRecordReader(job, (FileSplit) input);
    }
}

The record reader:

NOTE: The logic for reading past the end of the split is in the readUntilMatch function, which reads past the end of the split if there is an open tag. This is really what you are looking for, I think!

package org.undercloud.mapreduce.example3;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class XmlRecordReader implements RecordReader<Text, XmlContent> {

    // NOTE: the tag strings were stripped from the original post (the HTML
    // ate the angle brackets). "<exception>"/"</exception>" is an assumption,
    // matching the tags suggested elsewhere in this thread; use whatever tags
    // your pre-processing job writes.
    private String startTagS = "<exception>";
    private String endTagS = "</exception>";
    private byte[] startTag;
    private byte[] endTag;
    private long start;
    private long end;
    private FSDataInputStream fsin;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public XmlRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);
        lineKey = lineReader.createKey();
        lineValue = lineReader.createValue();
        startTag = startTagS.getBytes();
        endTag = endTagS.getBytes();
        // Open the file and seek to the start of the split
        start = split.getStart();
        end = start + split.getLength();
        Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job);
        fsin = fs.open(split.getPath());
        fsin.seek(start);
    }

    public boolean next(Text key, XmlContent value) throws IOException {
        // Scan for the next start tag, then buffer everything up to and
        // including the matching end tag
        if (fsin.getPos() < end) {
            if (readUntilMatch(startTag, false)) {
                try {
                    buffer.write(startTag);
                    if (readUntilMatch(endTag, true)) {
                        key.set(Long.toString(fsin.getPos()));
                        value.bufferData = buffer.getData();
                        value.offsetData = 0;
                        value.lengthData = buffer.getLength();
                        return true;
                    }
                } finally {
                    buffer.reset();
                }
            }
        }
        return false;
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
        int i = 0;
        while (true) {
            int b = fsin.read();
            // End of file -> no match
            if (b == -1) return false;
            // Inside a block -> save to buffer
            if (withinBlock) buffer.write(b);
            if (b == match[i]) {
                i++;
                if (i >= match.length) return true;
            } else i = 0;
            // See if we've passed the stop point. This is the key part: while
            // inside an open tag (withinBlock == true) we keep reading past
            // the end of the split, so a record spanning two splits is still
            // read as a whole.
            if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
        }
    }

    public Text createKey() {
        return new Text("");
    }

    public XmlContent createValue() {
        return new XmlContent();
    }

    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    public void close() throws IOException {
        lineReader.close();
        fsin.close(); // also close the stream this reader opened itself
    }

    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }
}

And finally the writable:

package org.undercloud.mapreduce.example3;

import java.io.*;

import org.apache.hadoop.io.*;

public class XmlContent implements Writable {

    public byte[] bufferData;
    public int offsetData;
    public int lengthData;

    public XmlContent(byte[] bufferData, int offsetData, int lengthData) {
        this.bufferData = bufferData;
        this.offsetData = offsetData;
        this.lengthData = lengthData;
    }

    public XmlContent() {
        this(null, 0, 0);
    }

    public void write(DataOutput out) throws IOException {
        // Write the length before the bytes so readFields knows how large a
        // buffer to allocate
        out.writeInt(lengthData);
        out.write(bufferData, offsetData, lengthData);
    }

    public void readFields(DataInput in) throws IOException {
        lengthData = in.readInt();
        bufferData = new byte[lengthData];
        in.readFully(bufferData);
        offsetData = 0; // the record now starts at the beginning of the buffer
    }

    public String toString() {
        return offsetData + ", " + lengthData + ", "
                + new String(bufferData, offsetData, lengthData);
    }
}

This looks like a really useful tutorial, addressing the issue of records spanning multiple splits. Let me know if you are able to adapt this example to your problem.
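To actually run these classes you still need a job driver. Here is a minimal sketch using the old mapred API to match the classes above; the driver class name, the paths taken from args, and the identity map-only setup are my own assumptions, not part of the tutorial:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.IdentityMapper;

public class XmlJobDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(XmlJobDriver.class);
        conf.setJobName("exception-extraction");
        conf.setInputFormat(XmlInputFormat.class); // the input format above
        conf.setMapperClass(IdentityMapper.class); // pass records straight through
        conf.setNumReduceTasks(0);                 // map-only job
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(XmlContent.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}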


The classes TextInputFormat and NLineInputFormat might be helpful. TextInputFormat splits the file by line, so if each exception ends with a newline (and contains none within it) this should work. If the exceptions contain a fixed number of lines, NLineInputFormat should be what you want, since you can set the number of lines to take (a configuration sketch follows below).
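A minimal configuration sketch for the fixed-line-count case (new mapreduce API); the line count of 4 is just an arbitrary example value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "fixed-size-exceptions");
job.setInputFormatClass(NLineInputFormat.class);
// Limit each input split to this many lines. Note that every line is still
// delivered to the mapper as a separate record, so the mapper has to
// assemble the lines of one exception itself.
NLineInputFormat.setNumLinesPerSplit(job, 4);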

Unfortunately, if the exceptions can contain a variable number of newlines within them, this won't work.

In that case I recommend looking at Mahout's XmlInputFormat. It crosses split boundaries, so it will work for most cases: just run a pre-processor to wrap the exceptions in an <exception></exception> tag, and specify that as the start/end tags.
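For reference, a minimal sketch of wiring it up; xmlinput.start and xmlinput.end are the configuration keys Mahout's record reader checks. The class has moved between packages across Mahout versions, so verify the import for your version:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
// package varies by Mahout version, e.g. in older releases:
import org.apache.mahout.classifier.bayes.XmlInputFormat;

Configuration conf = new Configuration();
conf.set("xmlinput.start", "<exception>");  // record start tag
conf.set("xmlinput.end", "</exception>");   // record end tag
Job job = Job.getInstance(conf, "exception-extraction");
job.setInputFormatClass(XmlInputFormat.class);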

Example pre-processor, using a regex to identify exceptions:

import java.io.BufferedWriter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

String input;      // set this to the input string
String regex;      // set this to the exception regex
BufferedWriter bw; // point this at the file where output will be stored

String toProcess = input;
Pattern p = Pattern.compile(regex); // compile once, outside the loop
boolean continueLoop = true;
while (continueLoop) {
    Matcher m = p.matcher(toProcess);
    if (m.find()) {
        // Wrap each match in the tags the input format will look for
        bw.write("<exception>" + toProcess.substring(m.start(), m.end()) + "</exception>");
        toProcess = toProcess.substring(m.end());
    } else {
        continueLoop = false;
    }
}
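The regex itself depends entirely on your log format. Purely as a hypothetical starting point for Java stack traces (not from the original answer; adjust to your logs):

// Hypothetical pattern: a header line containing "Exception" followed by
// one or more indented "at ..." stack-frame lines.
String regex = "(?m)^.*Exception.*(?:\\n\\s+at .*)+";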


Thanks for all your solutions. I think they will be useful for me.

Especially notice this comment from the answer above:

"In the event that a record spans a InputSplit boundary, the record reader will take care of this so we will not have to worry about this."

Then I looked into the source code to see how LineRecordReader reads data from a split, and found that LineRecordReader actually already has logic for reading a record that spans an InputSplit boundary: the line record at the bottom of a split regularly gets divided across 2 different splits by the block size limit. So I think what I need to do is increase the amount of data that LineRecordReader reads across the split boundary.

Now my solution is to override the method nextKeyValue() in LineRecordReader.

public boolean nextKeyValue() throws IOException {
    if (key == null) {
        key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
        value = new Text();
    }
    int newSize = 0;
    while (pos < end) {
        newSize = in.readLine(value, maxLineLength,
                Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                        maxLineLength));
        // ... (rest of the method as in the stock LineRecordReader)

Then change the line while (pos < end) to while (pos < end + {param}).

Here {param} means the size of the redundant data that the record reader is allowed to read across the split boundary.
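A hedged sketch of what that change might look like, assuming the overlap size is passed in through a custom, made-up configuration key:

// Read the overlap size once, e.g. in initialize(); the property name
// "custom.linereader.overlap.bytes" is made up for this sketch.
int overlap = conf.getInt("custom.linereader.overlap.bytes", 0);

// Modified loop condition: keep reading up to 'overlap' bytes past the end
// of the split so a record broken at the boundary can be completed.
while (pos < end + overlap) {
    newSize = in.readLine(value, maxLineLength,
            Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                    maxLineLength));
    // ... rest of the loop unchanged
}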