Update issue #52:

Added a Min/Max string aggregator using the new aggregator interface; added two test cases, one each for the in-memory and external hash group operators.
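
For reference, the factory takes the index of the string field to aggregate and a flag selecting max (true) or min (false). A minimal sketch of how it plugs into a group-by aggregator list, mirroring the tests below:

    new IFieldAggregateDescriptorFactory[] {
            new IntSumAggregatorFactory(1),
            new MinMaxStringAggregatorFactory(15, true) } // true => max over string field 15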

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@863 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
new file mode 100644
index 0000000..5f8a7c1
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregations/aggregators/MinMaxStringAggregatorFactory.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IAggregateState;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.IFieldAggregateDescriptorFactory;
+
+/**
+ * Aggregator factory for the minimum or maximum value of a string field,
+ * selected by the isMax flag. Strings are compared by length rather than
+ * lexicographically.
+ */
+public class MinMaxStringAggregatorFactory implements
+        IFieldAggregateDescriptorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int aggField;
+
+    private final boolean isMax;
+
+    public MinMaxStringAggregatorFactory(int aggField, boolean isMax) {
+        this.aggField = aggField;
+        this.isMax = isMax;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see edu.uci.ics.hyracks.dataflow.std.aggregations.
+     * IFieldAggregateDescriptorFactory#createAggregator(
+     * edu.uci.ics.hyracks.api.context.IHyracksTaskContext,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor,
+     * edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor)
+     */
+    @Override
+    public IFieldAggregateDescriptor createAggregator(IHyracksTaskContext ctx,
+            RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+        return new IFieldAggregateDescriptor() {
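+            // The running value is kept as a Java object in the IAggregateState:
+            // with a binary (frame-resident) state, the frame stores only an
+            // index into an Object[] pool held by the state; without one, the
+            // state holds the String value directly.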
+
+            @Override
+            public void reset(IAggregateState state) {
+                state.reset();
+            }
+
+            @Override
+            public void outputPartialResult(DataOutput fieldOutput,
+                    byte[] data, int offset, IAggregateState state)
+                    throws HyracksDataException {
+                try {
+                    if (data != null) {
+                        int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
+                        Object[] storedState = (Object[]) state.getState();
+                        fieldOutput.writeUTF((String)storedState[stateIdx]);
+                    } else {
+                        fieldOutput.writeUTF((String) state.getState());
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(
+                            "I/O exception when writing a string to the output writer in MinMaxStringAggregatorFactory.");
+                }
+            }
+
+            @Override
+            public void outputFinalResult(DataOutput fieldOutput, byte[] data,
+                    int offset, IAggregateState state)
+                    throws HyracksDataException {
+                try {
+                    if (data != null) {
+                        int stateIdx = IntegerSerializerDeserializer.getInt(data, offset);
+                        Object[] storedState = (Object[]) state.getState();
+                        fieldOutput.writeUTF((String)storedState[stateIdx]);
+                    } else {
+                        fieldOutput.writeUTF((String) state.getState());
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(
+                            "I/O exception when writing a string to the output writer in MinMaxStringAggregatorFactory.");
+                }
+            }
+
+            @Override
+            public void init(IFrameTupleAccessor accessor, int tIndex,
+                    DataOutput fieldOutput, IAggregateState state)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                int fieldLength = accessor.getFieldLength(tIndex, aggField);
+                String strField = UTF8StringSerializerDeserializer.INSTANCE
+                        .deserialize(new DataInputStream(
+                                new ByteArrayInputStream(accessor.getBuffer()
+                                        .array(), tupleOffset
+                                        + accessor.getFieldSlotsLength()
+                                        + fieldStart, fieldLength)));
+                if (fieldOutput != null) {
+                    // Object-binary-state
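+                    // storedState[0] keeps the number of strings stored so far;
+                    // slots 1..count hold one running value per group.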
+                    Object[] storedState = (Object[]) state.getState();
+                    if (storedState == null) {
+                        storedState = new Object[8];
+                        storedState[0] = new Integer(0);
+                        state.setState(storedState);
+                    }
+                    int stateCount = (Integer) (storedState[0]);
+                    if (stateCount + 1 >= storedState.length) {
+                        storedState = Arrays.copyOf(storedState,
+                                storedState.length * 2);
+                        state.setState(storedState);
+                    }
+
+                    stateCount++;
+                    storedState[0] = stateCount;
+                    storedState[stateCount] = strField;
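+                    // Write this slot's index into the binary state so that
+                    // aggregate() and the output methods can find the group's
+                    // running value in the object pool.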
+                    try {
+                        fieldOutput.writeInt(stateCount);
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e.fillInStackTrace());
+                    }
+                } else {
+                    // Only object-state
+                    state.setState(strField);
+                }
+            }
+
+            @Override
+            public IAggregateState createState() {
+                return new IAggregateState() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    Object state = null;
+
+                    @Override
+                    public void setState(Object obj) {
+                        state = obj;
+                    }
+
+                    @Override
+                    public void reset() {
+                        state = null;
+                    }
+
+                    @Override
+                    public Object getState() {
+                        return state;
+                    }
+
+                    @Override
+                    public int getLength() {
+                        return -1;
+                    }
+                };
+            }
+
+            @Override
+            public void close() {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+                    byte[] data, int offset, IAggregateState state)
+                    throws HyracksDataException {
+                int tupleOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldStart = accessor.getFieldStartOffset(tIndex, aggField);
+                int fieldLength = accessor.getFieldLength(tIndex, aggField);
+                String strField = UTF8StringSerializerDeserializer.INSTANCE
+                        .deserialize(new DataInputStream(
+                                new ByteArrayInputStream(accessor.getBuffer()
+                                        .array(), tupleOffset
+                                        + accessor.getFieldSlotsLength()
+                                        + fieldStart, fieldLength)));
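+                // Pick up the running value, either through the index stored in
+                // the binary state or directly from the object state; comparison
+                // is by string length.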
+                if (data != null) {
+                    int stateIdx = IntegerSerializerDeserializer.getInt(data,
+                            offset);
+                    Object[] storedState = (Object[]) state.getState();
+
+                    if (isMax) {
+                        if (strField.length() > ((String) (storedState[stateIdx]))
+                                .length()) {
+                            storedState[stateIdx] = strField;
+                        }
+                    } else {
+                        if (strField.length() < ((String) (storedState[stateIdx]))
+                                .length()) {
+                            storedState[stateIdx] = strField;
+                        }
+                    }
+                } else {
+                    if (isMax) {
+                        if (strField.length() > ((String) (state.getState()))
+                                .length()) {
+                            state.setState(strField);
+                        }
+                    } else {
+                        if (strField.length() < ((String) (state.getState()))
+                                .length()) {
+                            state.setState(strField);
+                        }
+                    }
+                }
+            }
+        };
+    }
+
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index c1bc9e9..1e6766a 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -45,6 +45,7 @@
 import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.AvgMergeAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.IntSumAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregations.aggregators.MinMaxStringAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -343,5 +344,119 @@
         runTest(spec);
     }
 
+    @Test
+    public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+        int tableSize = 8;
+
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new IFieldAggregateDescriptorFactory[] {
+                        new IntSumAggregatorFactory(1),
+                        new MinMaxStringAggregatorFactory(15, true) }, outputRec, tableSize);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyMinMaxStringInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void singleKeyMinMaxStringExtGroupTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+        int frameLimits = 3;
+        int tableSize = 8;
+
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new IFieldAggregateDescriptorFactory[] {
+                        new IntSumAggregatorFactory(1),
+                        new MinMaxStringAggregatorFactory(15, true) },
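+                // The merge-phase aggregators below consume the partial-result
+                // record, so the string field is at index 2 of outputRec.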
+                new IFieldAggregateDescriptorFactory[] {
+                        new IntSumAggregatorFactory(1),
+                        new MinMaxStringAggregatorFactory(2, true) },
+                outputRec,
+                new HashSpillableTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "singleKeyMinMaxStringExtGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
     
 }