COLLECT_SET() in Hive, keep duplicates? COLLECT_SET() in Hive, keep duplicates? hadoop hadoop

COLLECT_SET() in Hive, keep duplicates?


Use COLLECT_LIST(col), which is available in Hive 0.13.0 and later. Unlike COLLECT_SET, it keeps duplicate values.

-- Aggregate num_of_cats per hash_id into an array with COLLECT_LIST (Hive 0.13.0+).
-- `tablename` and `blablabla` are placeholders for the real table and filter predicate.
SELECT
    hash_id,
    COLLECT_LIST(num_of_cats) AS aggr_set
FROM
    tablename
WHERE
    blablabla
GROUP BY
    hash_id;


There is nothing built in, but creating user defined functions, including aggregates, isn't that bad. The only rough part is trying to make them type generic, but here is a collect example.

package com.example;import java.util.ArrayList;import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.parse.SemanticException;import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;public class CollectAll extends AbstractGenericUDAFResolver{    @Override    public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis)            throws SemanticException    {        if (tis.length != 1)        {            throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected.");        }        if (tis[0].getCategory() != ObjectInspector.Category.PRIMITIVE)        {            throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + tis[0].getTypeName() + " was passed as parameter 1.");        }        return new CollectAllEvaluator();    }    public static class CollectAllEvaluator extends GenericUDAFEvaluator    {        private PrimitiveObjectInspector inputOI;        private StandardListObjectInspector loi;        private StandardListObjectInspector internalMergeOI;        @Override        public ObjectInspector init(Mode m, ObjectInspector[] parameters)                throws HiveException        {            super.init(m, parameters);            if (m == Mode.PARTIAL1)            {                inputOI = (PrimitiveObjectInspector) parameters[0];                return ObjectInspectorFactory                        
.getStandardListObjectInspector((PrimitiveObjectInspector) ObjectInspectorUtils                        .getStandardObjectInspector(inputOI));            }            else            {                if (!(parameters[0] instanceof StandardListObjectInspector))                {                    inputOI = (PrimitiveObjectInspector)  ObjectInspectorUtils                            .getStandardObjectInspector(parameters[0]);                    return (StandardListObjectInspector) ObjectInspectorFactory                            .getStandardListObjectInspector(inputOI);                }                else                {                    internalMergeOI = (StandardListObjectInspector) parameters[0];                    inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();                    loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);                    return loi;                }            }        }        static class ArrayAggregationBuffer implements AggregationBuffer        {            ArrayList<Object> container;        }        @Override        public void reset(AggregationBuffer ab)                throws HiveException        {            ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>();        }        @Override        public AggregationBuffer getNewAggregationBuffer()                throws HiveException        {            ArrayAggregationBuffer ret = new ArrayAggregationBuffer();            reset(ret);            return ret;        }        @Override        public void iterate(AggregationBuffer ab, Object[] parameters)                throws HiveException        {            assert (parameters.length == 1);            Object p = parameters[0];            if (p != null)            {                ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;                agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI));     
       }        }        @Override        public Object terminatePartial(AggregationBuffer ab)                throws HiveException        {            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;            ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());            ret.addAll(agg.container);            return ret;        }        @Override        public void merge(AggregationBuffer ab, Object o)                throws HiveException        {            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;            ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o);            for(Object i : partial)            {                agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI));            }        }        @Override        public Object terminate(AggregationBuffer ab)                throws HiveException        {            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;            ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());            ret.addAll(agg.container);            return ret;        }    }}

Then in Hive, just issue `add jar Whatever.jar;` and `CREATE TEMPORARY FUNCTION collect_all AS 'com.example.CollectAll';`. You should then be able to use it as expected.

hive> SELECT hash_id, collect_all(num_of_cats) FROM test GROUP BY hash_id;
OK
ad3jkfk [4,4,2]
fkjh43f [1,8,8]
rjkhd93 [7,4,7]

It's worth noting that the order of the elements should be considered undefined, so if you intend to use this to feed information into n_grams you might need to expand it a bit to sort the data as needed.


Modified Jeff Mc's code to remove the restriction (presumably inherited from collect_set) that input must be primitive types. This version can collect structs, maps and arrays as well as primitives.

package com.example;import java.util.ArrayList;import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.parse.SemanticException;import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;public class CollectAll extends AbstractGenericUDAFResolver{    @Override    public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis)            throws SemanticException    {        if (tis.length != 1)        {            throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected.");        }        return new CollectAllEvaluator();    }    public static class CollectAllEvaluator extends GenericUDAFEvaluator    {        private ObjectInspector inputOI;        private StandardListObjectInspector loi;        private StandardListObjectInspector internalMergeOI;        @Override        public ObjectInspector init(Mode m, ObjectInspector[] parameters)                throws HiveException        {            super.init(m, parameters);            if (m == Mode.PARTIAL1)            {                inputOI = parameters[0];                return ObjectInspectorFactory                        .getStandardListObjectInspector(ObjectInspectorUtils                        .getStandardObjectInspector(inputOI));            }            else            {                if (!(parameters[0] instanceof StandardListObjectInspector))                {                    inputOI = ObjectInspectorUtils                            
.getStandardObjectInspector(parameters[0]);                    return (StandardListObjectInspector) ObjectInspectorFactory                            .getStandardListObjectInspector(inputOI);                }                else                {                    internalMergeOI = (StandardListObjectInspector) parameters[0];                    inputOI = internalMergeOI.getListElementObjectInspector();                    loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);                    return loi;                }            }        }        static class ArrayAggregationBuffer implements AggregationBuffer        {            ArrayList<Object> container;        }        @Override        public void reset(AggregationBuffer ab)                throws HiveException        {            ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>();        }        @Override        public AggregationBuffer getNewAggregationBuffer()                throws HiveException        {            ArrayAggregationBuffer ret = new ArrayAggregationBuffer();            reset(ret);            return ret;        }        @Override        public void iterate(AggregationBuffer ab, Object[] parameters)                throws HiveException        {            assert (parameters.length == 1);            Object p = parameters[0];            if (p != null)            {                ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;                agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI));            }        }        @Override        public Object terminatePartial(AggregationBuffer ab)                throws HiveException        {            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;            ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());            ret.addAll(agg.container);            return ret;        }        @Override        public void 
merge(AggregationBuffer ab, Object o)                throws HiveException        {            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;            ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o);            for(Object i : partial)            {                agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI));            }        }        @Override        public Object terminate(AggregationBuffer ab)                throws HiveException        {            ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;            ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());            ret.addAll(agg.container);            return ret;        }    }}