checking in bug fix in PreClustered Group operator. The close on aggregate was not being called that prevented reducer from closing

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@178 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index 46b0f78..4b7aff3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -23,91 +23,98 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 
-public class DeserializedPreclusteredGroupOperator implements IOpenableDataWriterOperator {
-    private final int[] groupFields;
+public class DeserializedPreclusteredGroupOperator implements
+		IOpenableDataWriterOperator {
+	private final int[] groupFields;
 
-    private final IComparator[] comparators;
+	private final IComparator[] comparators;
 
-    private final IGroupAggregator aggregator;
+	private final IGroupAggregator aggregator;
 
-    private Object[] lastData;
+	private Object[] lastData;
 
-    private IOpenableDataWriter<Object[]> writer;
+	private IOpenableDataWriter<Object[]> writer;
 
-    private List<Object[]> buffer;
+	private List<Object[]> buffer;
 
-    private IOpenableDataReader<Object[]> reader;
+	private IOpenableDataReader<Object[]> reader;
 
-    public DeserializedPreclusteredGroupOperator(int[] groupFields, IComparator[] comparators, IGroupAggregator aggregator) {
-        this.groupFields = groupFields;
-        this.comparators = comparators;
-        this.aggregator = aggregator;
-        buffer = new ArrayList<Object[]>();
-        reader = new IOpenableDataReader<Object[]>() {
-            private int idx;
+	public DeserializedPreclusteredGroupOperator(int[] groupFields,
+			IComparator[] comparators, IGroupAggregator aggregator) {
+		this.groupFields = groupFields;
+		this.comparators = comparators;
+		this.aggregator = aggregator;
+		buffer = new ArrayList<Object[]>();
+		reader = new IOpenableDataReader<Object[]>() {
+			private int idx;
 
-            @Override
-            public void open() {
-                idx = 0;
-            }
+			@Override
+			public void open() {
+				idx = 0;
+			}
 
-            @Override
-            public void close() {
-            }
+			@Override
+			public void close() {
+			}
 
-            @Override
-            public Object[] readData() {
-                return idx >= buffer.size() ? null : buffer.get(idx++);
-            }
-        };
-    }
+			@Override
+			public Object[] readData() {
+				return idx >= buffer.size() ? null : buffer.get(idx++);
+			}
+		};
+	}
 
-    @Override
-    public void close() throws HyracksDataException {
-        if (!buffer.isEmpty()) {
-            aggregate();
-        }
-        writer.close();
-    }
+	@Override
+	public void close() throws HyracksDataException {
+		if (!buffer.isEmpty()) {
+			aggregate();
+		}
+		writer.close();
+		try {
+			aggregator.close();
+		} catch (Exception e) {
+			throw new HyracksDataException(e);
+		}
+	}
 
-    private void aggregate() throws HyracksDataException {
-        reader.open();
-        aggregator.aggregate(reader, writer);
-        reader.close();
-        buffer.clear();
-    }
+	private void aggregate() throws HyracksDataException {
+		reader.open();
+		aggregator.aggregate(reader, writer);
+		reader.close();
+		buffer.clear();
+	}
 
-    @Override
-    public void open() throws HyracksDataException {
-        lastData = null;
-        writer.open();
-    }
+	@Override
+	public void open() throws HyracksDataException {
+		lastData = null;
+		writer.open();
+	}
 
-    @Override
-    public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
-        if (index != 0) {
-            throw new IllegalArgumentException();
-        }
-        this.writer = writer;
-    }
+	@Override
+	public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
+		if (index != 0) {
+			throw new IllegalArgumentException();
+		}
+		this.writer = writer;
+	}
 
-    @Override
-    public void writeData(Object[] data) throws HyracksDataException {
-        if (lastData != null && compare(data, lastData) != 0) {
-            aggregate();
-        }
-        lastData = data;
-        buffer.add(data);
-    }
+	@Override
+	public void writeData(Object[] data) throws HyracksDataException {
+		if (lastData != null && compare(data, lastData) != 0) {
+			aggregate();
+		}
+		lastData = data;
+		buffer.add(data);
+	}
 
-    private int compare(Object[] d1, Object[] d2) {
-        for (int i = 0; i < groupFields.length; ++i) {
-            int fIdx = groupFields[i];
-            int c = comparators[i].compare(d1[fIdx], d2[fIdx]);
-            if (c != 0) {
-                return c;
-            }
-        }
-        return 0;
-    }
+	private int compare(Object[] d1, Object[] d2) {
+		for (int i = 0; i < groupFields.length; ++i) {
+			int fIdx = groupFields[i];
+			int c = comparators[i].compare(d1[fIdx], d2[fIdx]);
+			if (c != 0) {
+				return c;
+			}
+		}
+		return 0;
+	}
 }
\ No newline at end of file