checking in bug fix in PreClustered Group operator. The close on aggregate was not being called that prevented reducer from closing
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@178 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index 46b0f78..4b7aff3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -23,91 +23,98 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-public class DeserializedPreclusteredGroupOperator implements IOpenableDataWriterOperator {
- private final int[] groupFields;
+public class DeserializedPreclusteredGroupOperator implements
+ IOpenableDataWriterOperator {
+ private final int[] groupFields;
- private final IComparator[] comparators;
+ private final IComparator[] comparators;
- private final IGroupAggregator aggregator;
+ private final IGroupAggregator aggregator;
- private Object[] lastData;
+ private Object[] lastData;
- private IOpenableDataWriter<Object[]> writer;
+ private IOpenableDataWriter<Object[]> writer;
- private List<Object[]> buffer;
+ private List<Object[]> buffer;
- private IOpenableDataReader<Object[]> reader;
+ private IOpenableDataReader<Object[]> reader;
- public DeserializedPreclusteredGroupOperator(int[] groupFields, IComparator[] comparators, IGroupAggregator aggregator) {
- this.groupFields = groupFields;
- this.comparators = comparators;
- this.aggregator = aggregator;
- buffer = new ArrayList<Object[]>();
- reader = new IOpenableDataReader<Object[]>() {
- private int idx;
+ public DeserializedPreclusteredGroupOperator(int[] groupFields,
+ IComparator[] comparators, IGroupAggregator aggregator) {
+ this.groupFields = groupFields;
+ this.comparators = comparators;
+ this.aggregator = aggregator;
+ buffer = new ArrayList<Object[]>();
+ reader = new IOpenableDataReader<Object[]>() {
+ private int idx;
- @Override
- public void open() {
- idx = 0;
- }
+ @Override
+ public void open() {
+ idx = 0;
+ }
- @Override
- public void close() {
- }
+ @Override
+ public void close() {
+ }
- @Override
- public Object[] readData() {
- return idx >= buffer.size() ? null : buffer.get(idx++);
- }
- };
- }
+ @Override
+ public Object[] readData() {
+ return idx >= buffer.size() ? null : buffer.get(idx++);
+ }
+ };
+ }
- @Override
- public void close() throws HyracksDataException {
- if (!buffer.isEmpty()) {
- aggregate();
- }
- writer.close();
- }
+ @Override
+ public void close() throws HyracksDataException {
+ if (!buffer.isEmpty()) {
+ aggregate();
+ }
+ writer.close();
+ try {
+ aggregator.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
- private void aggregate() throws HyracksDataException {
- reader.open();
- aggregator.aggregate(reader, writer);
- reader.close();
- buffer.clear();
- }
+ private void aggregate() throws HyracksDataException {
+ reader.open();
+ aggregator.aggregate(reader, writer);
+ reader.close();
+ buffer.clear();
+ }
- @Override
- public void open() throws HyracksDataException {
- lastData = null;
- writer.open();
- }
+ @Override
+ public void open() throws HyracksDataException {
+ lastData = null;
+ writer.open();
+ }
- @Override
- public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
- if (index != 0) {
- throw new IllegalArgumentException();
- }
- this.writer = writer;
- }
+ @Override
+ public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
+ if (index != 0) {
+ throw new IllegalArgumentException();
+ }
+ this.writer = writer;
+ }
- @Override
- public void writeData(Object[] data) throws HyracksDataException {
- if (lastData != null && compare(data, lastData) != 0) {
- aggregate();
- }
- lastData = data;
- buffer.add(data);
- }
+ @Override
+ public void writeData(Object[] data) throws HyracksDataException {
+ if (lastData != null && compare(data, lastData) != 0) {
+ aggregate();
+ }
+ lastData = data;
+ buffer.add(data);
+ }
- private int compare(Object[] d1, Object[] d2) {
- for (int i = 0; i < groupFields.length; ++i) {
- int fIdx = groupFields[i];
- int c = comparators[i].compare(d1[fIdx], d2[fIdx]);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
+ private int compare(Object[] d1, Object[] d2) {
+ for (int i = 0; i < groupFields.length; ++i) {
+ int fIdx = groupFields[i];
+ int c = comparators[i].compare(d1[fIdx], d2[fIdx]);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
}
\ No newline at end of file