COLLECT_SET() in Hive, keep duplicates?
Try to use COLLECT_LIST(col) after Hive 0.13.0
SELECT hash_id, COLLECT_LIST(num_of_cats) AS aggr_setFROM tablenameWHERE blablablaGROUP BY hash_id;
There is nothing built in, but creating user defined functions, including aggregates, isn't that bad. The only rough part is trying to make them type generic, but here is a collect example.
package com.example;import java.util.ArrayList;import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.parse.SemanticException;import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;public class CollectAll extends AbstractGenericUDAFResolver{ @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis) throws SemanticException { if (tis.length != 1) { throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected."); } if (tis[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + tis[0].getTypeName() + " was passed as parameter 1."); } return new CollectAllEvaluator(); } public static class CollectAllEvaluator extends GenericUDAFEvaluator { private PrimitiveObjectInspector inputOI; private StandardListObjectInspector loi; private StandardListObjectInspector internalMergeOI; @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); if (m == Mode.PARTIAL1) { inputOI = (PrimitiveObjectInspector) parameters[0]; return ObjectInspectorFactory .getStandardListObjectInspector((PrimitiveObjectInspector) ObjectInspectorUtils .getStandardObjectInspector(inputOI)); } else { if (!(parameters[0] instanceof StandardListObjectInspector)) { inputOI = (PrimitiveObjectInspector) ObjectInspectorUtils .getStandardObjectInspector(parameters[0]); return (StandardListObjectInspector) ObjectInspectorFactory .getStandardListObjectInspector(inputOI); } else { internalMergeOI = (StandardListObjectInspector) parameters[0]; inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector(); loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI); return loi; } } } static class ArrayAggregationBuffer implements AggregationBuffer { ArrayList<Object> container; } @Override public void reset(AggregationBuffer ab) throws HiveException { ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>(); } @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { ArrayAggregationBuffer ret = new ArrayAggregationBuffer(); reset(ret); return ret; } @Override public void iterate(AggregationBuffer ab, Object[] parameters) throws HiveException { assert (parameters.length == 1); Object p = parameters[0]; if (p != null) { ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI)); } } @Override public Object terminatePartial(AggregationBuffer ab) throws HiveException { ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); ret.addAll(agg.container); return ret; } @Override public void merge(AggregationBuffer ab, Object o) throws HiveException { ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o); for(Object i : partial) { agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI)); } } @Override public Object terminate(AggregationBuffer ab) throws HiveException { ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); ret.addAll(agg.container); return ret; } }}
Then in hive, just issue add jar Whatever.jar;
and CREATE TEMPORARY FUNCTION collect_all AS 'com.example.CollectAll';
You should them be able to use it as expected.
hive> SELECT hash_id, collect_all(num_of_cats) FROM test GROUP BY hash_id;OKad3jkfk [4,4,2]fkjh43f [1,8,8]rjkhd93 [7,4,7]
It's worth noting that the order of the elements should be considered undefined, so if you intend to use this to feed information into n_grams you might need to expand it a bit to sort the data as needed.
Modified Jeff Mc's code to remove the restriction (presumably inherited from collect_set) that input must be primitive types. This version can collect structs, maps and arrays as well as primitives.
package com.example;import java.util.ArrayList;import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.parse.SemanticException;import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;public class CollectAll extends AbstractGenericUDAFResolver{ @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis) throws SemanticException { if (tis.length != 1) { throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected."); } return new CollectAllEvaluator(); } public static class CollectAllEvaluator extends GenericUDAFEvaluator { private ObjectInspector inputOI; private StandardListObjectInspector loi; private StandardListObjectInspector internalMergeOI; @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); if (m == Mode.PARTIAL1) { inputOI = parameters[0]; return ObjectInspectorFactory .getStandardListObjectInspector(ObjectInspectorUtils .getStandardObjectInspector(inputOI)); } else { if (!(parameters[0] instanceof StandardListObjectInspector)) { inputOI = ObjectInspectorUtils .getStandardObjectInspector(parameters[0]); return (StandardListObjectInspector) ObjectInspectorFactory .getStandardListObjectInspector(inputOI); } else { internalMergeOI = (StandardListObjectInspector) parameters[0]; inputOI = internalMergeOI.getListElementObjectInspector(); loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI); return loi; } } } static class ArrayAggregationBuffer implements AggregationBuffer { ArrayList<Object> container; } @Override public void reset(AggregationBuffer ab) throws HiveException { ((ArrayAggregationBuffer) ab).container = new ArrayList<Object>(); } @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { ArrayAggregationBuffer ret = new ArrayAggregationBuffer(); reset(ret); return ret; } @Override public void iterate(AggregationBuffer ab, Object[] parameters) throws HiveException { assert (parameters.length == 1); Object p = parameters[0]; if (p != null) { ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI)); } } @Override public Object terminatePartial(AggregationBuffer ab) throws HiveException { ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); ret.addAll(agg.container); return ret; } @Override public void merge(AggregationBuffer ab, Object o) throws HiveException { ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o); for(Object i : partial) { agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI)); } } @Override public Object terminate(AggregationBuffer ab) throws HiveException { ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab; ArrayList<Object> ret = new ArrayList<Object>(agg.container.size()); ret.addAll(agg.container); return ret; } }}