Added Binary Preclustered Group Operator. Preclustered and Hash based group operators now use the same aggregator interface. Added a Frame Writing Operator.
git-svn-id: https://hyracks.googlecode.com/svn/trunk@132 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-cli/pom.xml b/hyracks/hyracks-cli/pom.xml
index 1fe64b8..825d5d2 100644
--- a/hyracks/hyracks-cli/pom.xml
+++ b/hyracks/hyracks-cli/pom.xml
@@ -59,6 +59,23 @@
</execution>
</executions>
</plugin>
+ <!-- Runs the assembly descriptor src/main/assembly/binary-assembly.xml as part of the package phase. -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <!-- NOTE(review): later maven-assembly-plugin releases deprecate 'attached' in favour of 'single'; confirm before upgrading the plugin version. -->
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
<dependencies>
diff --git a/hyracks/hyracks-cli/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-cli/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks/hyracks-cli/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<!-- Assembly descriptor: packages target/appassembler/bin and target/appassembler/lib as a zip archive and as a plain directory. -->
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <!-- No wrapping base directory: bin/ and lib/ sit at the root of the assembly. -->
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <!-- Files copied into bin/ get mode 0755 so they are executable. -->
+ <!-- NOTE(review): target/appassembler is presumably produced by an earlier plugin execution in the same build; confirm ordering. -->
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <!-- Files copied into lib/ keep the default file mode (none is set here). -->
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 1f9cba8..2a70577 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -46,7 +46,7 @@
import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.dataflow.std.group.IGroupAggregator;
-import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperator;
+import edu.uci.ics.hyracks.dataflow.std.group.DeserializedPreclusteredGroupOperator;
import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
public class HadoopReducerOperatorDescriptor<K2, V2, K3, V3> extends AbstractHadoopOperatorDescriptor {
@@ -226,7 +226,7 @@
this.comparatorFactory = new WritableComparingComparatorFactory(rawComparator.getClass());
}
}
- IOpenableDataWriterOperator op = new PreclusteredGroupOperator(new int[] { 0 },
+ IOpenableDataWriterOperator op = new DeserializedPreclusteredGroupOperator(new int[] { 0 },
new IComparator[] { comparatorFactory.createComparator() }, new ReducerAggregator(createReducer()));
return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
getOperatorId(), 0));
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
new file mode 100644
index 0000000..d0cb6bd
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory for aggregators that count tuples and emit the count as an int field value.
+ */
+public class CountAggregatorFactory implements IFieldValueResultingAggregatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a new, independent counting aggregator on every call.
+     */
+    @Override
+    public IFieldValueResultingAggregator createFieldValueResultingAggregator() {
+        return new CountAggregator();
+    }
+
+    /**
+     * Counts the number of accumulate calls made since the most recent init call and writes that number
+     * with {@link DataOutput#writeInt(int)}.
+     */
+    private static final class CountAggregator implements IFieldValueResultingAggregator {
+        private int tupleCount;
+
+        @Override
+        public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+            // init only resets the counter; the tuple passed here is not counted by this call.
+            tupleCount = 0;
+        }
+
+        @Override
+        public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+            tupleCount += 1;
+        }
+
+        @Override
+        public void output(DataOutput resultAcceptor) throws HyracksDataException {
+            try {
+                resultAcceptor.writeInt(tupleCount);
+            } catch (IOException ioe) {
+                // Surface stream failures through the dataflow exception type declared by the interface.
+                throw new HyracksDataException(ioe);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregator.java
new file mode 100644
index 0000000..783467b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregator.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Aggregator that is fed tuples through a frame accessor and writes its result to a {@link DataOutput}.
+ * MultiAggregatorFactory closes one output field after each call to {@link #output(DataOutput)}, so the
+ * bytes written there form the value of a single field.
+ */
+public interface IFieldValueResultingAggregator {
+ /**
+ * Called once per aggregator before calling accumulate for the first time. CountAggregatorFactory's
+ * implementation uses this call to reset its state.
+ * NOTE(review): whether the tuple passed here is afterwards also passed to accumulate is decided by
+ * the calling operator - confirm against the group operators.
+ *
+ * @param accessor
+ * - Accessor to the data tuple.
+ * @param tIndex
+ * - Index of the tuple in the accessor.
+ * @throws HyracksDataException
+ */
+ public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
+
+ /**
+ * Called once per tuple that belongs to this group.
+ *
+ * @param accessor
+ * - Accessor to the data tuple.
+ * @param tIndex
+ * - Index of the tuple in the accessor.
+ * @throws HyracksDataException
+ */
+ public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException;
+
+ /**
+ * Called last, to emit the aggregate result.
+ *
+ * @param resultAcceptor
+ * - Output to write the result bytes to.
+ * @throws HyracksDataException
+ */
+ public void output(DataOutput resultAcceptor) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ITupleAggregatorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
similarity index 77%
rename from hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ITupleAggregatorFactory.java
rename to hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
index afada81..d2d2e89 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ITupleAggregatorFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/IFieldValueResultingAggregatorFactory.java
@@ -16,8 +16,6 @@
import java.io.Serializable;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface ITupleAggregatorFactory extends Serializable {
- public ITupleAggregator createTupleAggregator() throws HyracksDataException;
+/**
+ * Serializable factory for {@link IFieldValueResultingAggregator} instances.
+ */
+public interface IFieldValueResultingAggregatorFactory extends Serializable {
+ /**
+ * Creates an aggregator.
+ *
+ * @return the aggregator instance (CountAggregatorFactory returns a new one on every call).
+ */
+ public IFieldValueResultingAggregator createFieldValueResultingAggregator();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ITupleAggregator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ITupleAggregator.java
deleted file mode 100644
index 9919935..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ITupleAggregator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface ITupleAggregator {
- void add(Object[] data);
-
- void init(Object[] data);
-
- void write(IDataWriter<Object[]> writer) throws HyracksDataException;
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
new file mode 100644
index 0000000..2d176a1
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MultiAggregatorFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.aggregators;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregator;
+import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregatorFactory;
+
+public class MultiAggregatorFactory implements IAccumulatingAggregatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private IFieldValueResultingAggregatorFactory[] aFactories;
+
+ public MultiAggregatorFactory(IFieldValueResultingAggregatorFactory[] aFactories) {
+ this.aFactories = aFactories;
+ }
+
+ @Override
+ public IAccumulatingAggregator createAggregator(RecordDescriptor inRecordDesc,
+ final RecordDescriptor outRecordDescriptor) {
+ final IFieldValueResultingAggregator aggregators[] = new IFieldValueResultingAggregator[aFactories.length];
+ for (int i = 0; i < aFactories.length; ++i) {
+ aggregators[i] = aFactories[i].createFieldValueResultingAggregator();
+ }
+ final ArrayTupleBuilder tb = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+ return new IAccumulatingAggregator() {
+ private boolean pending;
+
+ @Override
+ public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex,
+ int[] keyFieldIndexes) throws HyracksDataException {
+ if (!pending) {
+ for (int i = 0; i < keyFieldIndexes.length; ++i) {
+ tb.addField(accessor, tIndex, keyFieldIndexes[i]);
+ }
+ DataOutput dos = tb.getDataOutput();
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].output(dos);
+ tb.addFieldEndOffset();
+ }
+ }
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ pending = true;
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ tb.reset();
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].init(accessor, tIndex);
+ }
+ pending = false;
+ }
+
+ @Override
+ public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].accumulate(accessor, tIndex);
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ReflectionBasedTupleAggregatorFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ReflectionBasedTupleAggregatorFactory.java
deleted file mode 100644
index 2deef50..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/ReflectionBasedTupleAggregatorFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class ReflectionBasedTupleAggregatorFactory implements
- ITupleAggregatorFactory {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- private final Class<? extends ITupleAggregator> aggregatorClass;
-
- public ReflectionBasedTupleAggregatorFactory(
- Class<? extends ITupleAggregator> aggregatorClass) {
- this.aggregatorClass = aggregatorClass;
- }
-
- @Override
- public ITupleAggregator createTupleAggregator() throws HyracksDataException {
- try {
- return aggregatorClass.newInstance();
- } catch (InstantiationException e) {
- throw new HyracksDataException(e);
- } catch (IllegalAccessException e) {
- throw new HyracksDataException(e);
- }
- }
-
-}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumGroupAggregator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumGroupAggregator.java
deleted file mode 100644
index 88b38b3..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumGroupAggregator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import edu.uci.ics.hyracks.api.dataflow.IDataReader;
-import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.group.IGroupAggregator;
-
-public class SumGroupAggregator implements IGroupAggregator {
- private static final long serialVersionUID = 1L;
- private int keyIdx;
-
- public SumGroupAggregator(int keyIdx) {
- this.keyIdx = keyIdx;
- }
-
- @Override
- public void aggregate(IDataReader<Object[]> reader, IDataWriter<Object[]> writer) throws HyracksDataException {
- String key = "";
- Object[] data;
- int count = 0;
- while ((data = reader.readData()) != null) {
- key = (String) data[keyIdx];
- ++count;
- }
- writer.writeData(new Object[] { key, count });
- }
-
- @Override
- public void close() throws Exception {
- }
-
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumStringGroupAggregator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumStringGroupAggregator.java
deleted file mode 100644
index dd4e4eb..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumStringGroupAggregator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import edu.uci.ics.hyracks.api.dataflow.IDataReader;
-import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.group.IGroupAggregator;
-
-public class SumStringGroupAggregator implements IGroupAggregator {
- private static final long serialVersionUID = 1L;
- private int keyIdx;
-
- public SumStringGroupAggregator(int keyIdx) {
- this.keyIdx = keyIdx;
- }
-
- @Override
- public void aggregate(IDataReader<Object[]> reader, IDataWriter<Object[]> writer) throws HyracksDataException {
- String key = "";
- Object[] data;
- int count = 0;
- while ((data = reader.readData()) != null) {
- key = (String) data[keyIdx];
- ++count;
- }
- writer.writeData(new Object[] { key, String.valueOf(count) });
- }
-
- @Override
- public void close() throws Exception {
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumTupleAggregator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumTupleAggregator.java
deleted file mode 100644
index 6dee771..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumTupleAggregator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.aggregators;
-
-import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class SumTupleAggregator implements ITupleAggregator {
-
- private Object key;
- private int count;
-
- @Override
- public void add(Object[] data) {
- count++;
- }
-
- @Override
- public void init(Object[] data) {
- key = data[0];
- count = 0;
- }
-
- @Override
- public void write(IDataWriter<Object[]> writer) throws HyracksDataException {
- writer.writeData(new Object[] { key, count });
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
new file mode 100644
index 0000000..f36731a
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class FrameFileWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private IFileSplitProvider fileSplitProvider;
+
+ public FrameFileWriterOperatorDescriptor(JobSpecification spec, IFileSplitProvider fileSplitProvider) {
+ super(spec, 1, 0);
+ this.fileSplitProvider = fileSplitProvider;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ final FileSplit[] splits = fileSplitProvider.getFileSplits();
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+ private OutputStream out;
+
+ @Override
+ public void open() throws HyracksDataException {
+ try {
+ out = new FileOutputStream(splits[partition].getLocalFile());
+ } catch (FileNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ out.write(buffer.array());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ try {
+ out.flush();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ out.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
similarity index 93%
rename from hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperator.java
rename to hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index 637d8f0..46b0f78 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
-public class PreclusteredGroupOperator implements IOpenableDataWriterOperator {
+public class DeserializedPreclusteredGroupOperator implements IOpenableDataWriterOperator {
private final int[] groupFields;
private final IComparator[] comparators;
@@ -38,7 +38,7 @@
private IOpenableDataReader<Object[]> reader;
- public PreclusteredGroupOperator(int[] groupFields, IComparator[] comparators, IGroupAggregator aggregator) {
+ public DeserializedPreclusteredGroupOperator(int[] groupFields, IComparator[] comparators, IGroupAggregator aggregator) {
this.groupFields = groupFields;
this.comparators = comparators;
this.aggregator = aggregator;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
index 7d36b50..adfb183 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/GroupingHashTable.java
@@ -80,8 +80,8 @@
private final FrameTupleAccessor storedKeysAccessor;
GroupingHashTable(IHyracksContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
- ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
- RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
+ ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
+ RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
this.ctx = ctx;
appender = new FrameTupleAppender(ctx);
buffers = new ArrayList<ByteBuffer>();
@@ -160,7 +160,7 @@
}
int saIndex = accumulatorSize++;
aggregator = accumulators[saIndex] = aggregatorFactory.createAggregator(inRecordDescriptor,
- outRecordDescriptor);
+ outRecordDescriptor);
aggregator.init(accessor, tIndex);
link.add(sbIndex, stIndex, saIndex);
}
@@ -180,7 +180,7 @@
ByteBuffer keyBuffer = buffers.get(bIndex);
storedKeysAccessor.reset(keyBuffer);
IAccumulatingAggregator aggregator = accumulators[aIndex];
- while (!aggregator.output(appender, storedKeysAccessor, tIndex)) {
+ while (!aggregator.output(appender, storedKeysAccessor, tIndex, storedKeys)) {
flushFrame(appender, writer);
}
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregator.java
index 1b160de..b0da898 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregator.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/IAccumulatingAggregator.java
@@ -52,9 +52,11 @@
* - Accessor to access the key.
* @param tIndex
* - Tuple index of the key in the accessor.
+ * @param keyFieldIndexes
+ * - Indexes of the fields that make up the key.
* @return true if all output is written, false if the appender is full.
* @throws HyracksDataException
*/
- public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex)
+ public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex, int[] keyFieldIndexes)
throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
index d4cd57c..7b01a16 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
@@ -14,41 +14,138 @@
*/
package edu.uci.ics.hyracks.dataflow.std.group;
+import java.nio.ByteBuffer;
+
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
public class PreclusteredGroupOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private final int[] groupFields;
- private final IGroupAggregator aggregator;
- private final IComparatorFactory[] comparatorFactories;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final IAccumulatingAggregatorFactory aggregatorFactory;
private static final long serialVersionUID = 1L;
public PreclusteredGroupOperatorDescriptor(JobSpecification spec, int[] groupFields,
- IComparatorFactory[] comparatorFactories, IGroupAggregator aggregator, RecordDescriptor recordDescriptor) {
+ IBinaryComparatorFactory[] comparatorFactories, IAccumulatingAggregatorFactory aggregatorFactory,
+ RecordDescriptor recordDescriptor) {
super(spec, 1, 1);
this.groupFields = groupFields;
this.comparatorFactories = comparatorFactories;
- this.aggregator = aggregator;
+ this.aggregatorFactory = aggregatorFactory;
recordDescriptors[0] = recordDescriptor;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- IComparator[] comparators = new IComparator[comparatorFactories.length];
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createComparator();
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- return new DeserializedOperatorNodePushable(ctx, new PreclusteredGroupOperator(groupFields, comparators,
- aggregator), recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final IAccumulatingAggregator aggregator = aggregatorFactory.createAggregator(inRecordDesc,
+ recordDescriptors[0]);
+ final ByteBuffer copyFrame = ctx.getResourceManager().allocateFrame();
+ final FrameTupleAccessor inFrameAccessor = new FrameTupleAccessor(ctx, inRecordDesc);
+ final FrameTupleAccessor copyFrameAccessor = new FrameTupleAccessor(ctx, inRecordDesc);
+ copyFrameAccessor.reset(copyFrame);
+ ByteBuffer outFrame = ctx.getResourceManager().allocateFrame();
+ final FrameTupleAppender appender = new FrameTupleAppender(ctx);
+ appender.reset(outFrame, true);
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private boolean first;
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ first = true;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ inFrameAccessor.reset(buffer);
+ int nTuples = inFrameAccessor.getTupleCount();
+ for (int i = 0; i < nTuples; ++i) {
+ if (first) {
+ aggregator.init(inFrameAccessor, i);
+ first = false;
+ } else {
+ if (i == 0) {
+ switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1,
+ inFrameAccessor, i);
+ } else {
+ switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+ }
+ }
+ aggregator.accumulate(inFrameAccessor, i);
+ }
+ FrameUtils.copy(buffer, copyFrame);
+ }
+
+ private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+ FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+ if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+ writeOutput(prevTupleAccessor, prevTupleIndex);
+ aggregator.init(currTupleAccessor, currTupleIndex);
+ }
+ }
+
+ private void writeOutput(final FrameTupleAccessor lastTupleAccessor, int lastTupleIndex)
+ throws HyracksDataException {
+ if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
+ FrameUtils.flushFrame(appender.getBuffer(), writer);
+ appender.reset(appender.getBuffer(), true);
+ if (!aggregator.output(appender, lastTupleAccessor, lastTupleIndex, groupFields)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+ private boolean sameGroup(FrameTupleAccessor a1, int t1Idx, FrameTupleAccessor a2, int t2Idx) {
+ for (int i = 0; i < comparators.length; ++i) {
+ int fIdx = groupFields[i];
+ int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength()
+ + a1.getFieldStartOffset(t1Idx, fIdx);
+ int l1 = a1.getFieldLength(t1Idx, fIdx);
+ int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength()
+ + a2.getFieldStartOffset(t2Idx, fIdx);
+ int l2 = a2.getFieldLength(t2Idx, fIdx);
+ if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ FrameUtils.flushFrame(appender.getBuffer(), writer);
+ appender.reset(appender.getBuffer(), true);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (!first) {
+ writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(appender.getBuffer(), writer);
+ }
+ }
+ writer.close();
+ }
+ };
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index c06bd40..67bcd63 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -25,18 +25,20 @@
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.SumStringGroupAggregator;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -60,9 +62,11 @@
RecordDescriptor desc = new RecordDescriptor(
new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
- ','), desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
@@ -74,21 +78,28 @@
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ new int[] { 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ desc2);
PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
group.setPartitionConstraint(groupPartitionConstraint);
InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
+ RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
group2.setPartitionConstraint(groupPartitionConstraint2);
@@ -130,9 +141,11 @@
RecordDescriptor desc = new RecordDescriptor(
new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
- ','), desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
@@ -145,22 +158,29 @@
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ new int[] { 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ desc2);
PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
group.setPartitionConstraint(groupPartitionConstraint);
InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
+ RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
group2.setPartitionConstraint(groupPartitionConstraint2);
@@ -202,9 +222,11 @@
RecordDescriptor desc = new RecordDescriptor(
new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
- ','), desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
@@ -217,22 +239,29 @@
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ new int[] { 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+ new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ desc2);
PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
group.setPartitionConstraint(groupPartitionConstraint);
- ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 1 },
- new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
+ InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
+ RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
+ new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiAggregatorFactory(
+ new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }), desc3);
PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
group2.setPartitionConstraint(groupPartitionConstraint2);
diff --git a/hyracks/hyracks-examples/pom.xml b/hyracks/hyracks-examples/pom.xml
index c3371dc..9596b37 100644
--- a/hyracks/hyracks-examples/pom.xml
+++ b/hyracks/hyracks-examples/pom.xml
@@ -13,6 +13,7 @@
<modules>
<module>tpch-example</module>
+ <module>text-example</module>
<module>hyracks-integration-tests</module>
</modules>
</project>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.project b/hyracks/hyracks-examples/text-example/.project
similarity index 64%
copy from hyracks/hyracks-examples/tpch-example/tpchhelper/.project
copy to hyracks/hyracks-examples/text-example/.project
index 41b2574..4e057cd 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/.project
+++ b/hyracks/hyracks-examples/text-example/.project
@@ -1,16 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
- <name>tpchhelper</name>
+ <name>text-example</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
- <name>org.eclipse.jdt.core.javabuilder</name>
- <arguments>
- </arguments>
- </buildCommand>
- <buildCommand>
<name>org.maven.ide.eclipse.maven2Builder</name>
<arguments>
</arguments>
@@ -18,6 +13,5 @@
</buildSpec>
<natures>
<nature>org.maven.ide.eclipse.maven2Nature</nature>
- <nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-examples/text-example/.settings/org.maven.ide.eclipse.prefs
similarity index 88%
rename from hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
rename to hyracks/hyracks-examples/text-example/.settings/org.maven.ide.eclipse.prefs
index 032b327..4562b1a 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
+++ b/hyracks/hyracks-examples/text-example/.settings/org.maven.ide.eclipse.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
activeProfiles=
eclipse.preferences.version=1
fullBuildGoals=process-test-resources
diff --git a/hyracks/hyracks-examples/text-example/pom.xml b/hyracks/hyracks-examples/text-example/pom.xml
new file mode 100644
index 0000000..f6b8d8f
--- /dev/null
+++ b/hyracks/hyracks-examples/text-example/pom.xml
@@ -0,0 +1,19 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
+ <artifactId>text-example</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-examples</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ </parent>
+
+ <modules>
+ <module>texthelper</module>
+ <module>textclient</module>
+ <module>textapp</module>
+ </modules>
+</project>
diff --git a/hyracks/hyracks-examples/text-example/textapp/.classpath b/hyracks/hyracks-examples/text-example/textapp/.classpath
new file mode 100644
index 0000000..3f62785
--- /dev/null
+++ b/hyracks/hyracks-examples/text-example/textapp/.classpath
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.4"/>
+ <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.project b/hyracks/hyracks-examples/text-example/textapp/.project
similarity index 95%
copy from hyracks/hyracks-examples/tpch-example/tpchhelper/.project
copy to hyracks/hyracks-examples/text-example/textapp/.project
index 41b2574..4f3af14 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/.project
+++ b/hyracks/hyracks-examples/text-example/textapp/.project
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
- <name>tpchhelper</name>
+ <name>textapp</name>
<comment></comment>
<projects>
</projects>
@@ -17,7 +17,7 @@
</buildCommand>
</buildSpec>
<natures>
- <nature>org.maven.ide.eclipse.maven2Nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
</natures>
</projectDescription>
diff --git a/hyracks/hyracks-examples/text-example/textapp/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-examples/text-example/textapp/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..37272d9
--- /dev/null
+++ b/hyracks/hyracks-examples/text-example/textapp/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Tue Sep 28 14:37:42 PDT 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.4
+org.eclipse.jdt.core.compiler.compliance=1.4
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.4
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-examples/text-example/textapp/.settings/org.maven.ide.eclipse.prefs
similarity index 88%
copy from hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
copy to hyracks/hyracks-examples/text-example/textapp/.settings/org.maven.ide.eclipse.prefs
index 032b327..4562b1a 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
+++ b/hyracks/hyracks-examples/text-example/textapp/.settings/org.maven.ide.eclipse.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
activeProfiles=
eclipse.preferences.version=1
fullBuildGoals=process-test-resources
diff --git a/hyracks/hyracks-examples/text-example/textapp/pom.xml b/hyracks/hyracks-examples/text-example/textapp/pom.xml
new file mode 100644
index 0000000..706ae72
--- /dev/null
+++ b/hyracks/hyracks-examples/text-example/textapp/pom.xml
@@ -0,0 +1,58 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+ <artifactId>textapp</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
+ <artifactId>text-example</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>target/application/lib</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/app-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+ <artifactId>texthelper</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-examples/text-example/textapp/src/main/assembly/app-assembly.xml b/hyracks/hyracks-examples/text-example/textapp/src/main/assembly/app-assembly.xml
new file mode 100644
index 0000000..43ace6c
--- /dev/null
+++ b/hyracks/hyracks-examples/text-example/textapp/src/main/assembly/app-assembly.xml
@@ -0,0 +1,13 @@
+<assembly>
+ <id>app-assembly</id>
+ <formats>
+ <format>zip</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/application/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.classpath b/hyracks/hyracks-examples/text-example/textclient/.classpath
similarity index 100%
copy from hyracks/hyracks-examples/tpch-example/tpchhelper/.classpath
copy to hyracks/hyracks-examples/text-example/textclient/.classpath
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.project b/hyracks/hyracks-examples/text-example/textclient/.project
similarity index 95%
rename from hyracks/hyracks-examples/tpch-example/tpchhelper/.project
rename to hyracks/hyracks-examples/text-example/textclient/.project
index 41b2574..04307d3 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/.project
+++ b/hyracks/hyracks-examples/text-example/textclient/.project
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
- <name>tpchhelper</name>
+ <name>textclient</name>
<comment></comment>
<projects>
</projects>
@@ -17,7 +17,7 @@
</buildCommand>
</buildSpec>
<natures>
- <nature>org.maven.ide.eclipse.maven2Nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
</natures>
</projectDescription>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-examples/text-example/textclient/.settings/org.eclipse.jdt.core.prefs
similarity index 88%
rename from hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs
rename to hyracks/hyracks-examples/text-example/textclient/.settings/org.eclipse.jdt.core.prefs
index 4898439..8599738 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks/hyracks-examples/text-example/textclient/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-examples/text-example/textclient/.settings/org.maven.ide.eclipse.prefs
similarity index 88%
copy from hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
copy to hyracks/hyracks-examples/text-example/textclient/.settings/org.maven.ide.eclipse.prefs
index 032b327..4562b1a 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
+++ b/hyracks/hyracks-examples/text-example/textclient/.settings/org.maven.ide.eclipse.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
activeProfiles=
eclipse.preferences.version=1
fullBuildGoals=process-test-resources
diff --git a/hyracks/hyracks-examples/text-example/textclient/pom.xml b/hyracks/hyracks-examples/text-example/textclient/pom.xml
new file mode 100644
index 0000000..c56a6b8
--- /dev/null
+++ b/hyracks/hyracks-examples/text-example/textclient/pom.xml
@@ -0,0 +1,80 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+ <artifactId>textclient</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks.examples</groupId>
+ <artifactId>text-example</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+ <artifactId>texthelper</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.examples.text.client.WordCountMain</mainClass>
+ <name>textclient</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
new file mode 100644
index 0000000..96045d2
--- /dev/null
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.text.client;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.examples.text.WordTupleParserFactory;
+
+/**
+ * Command-line client that builds a word-count job and submits it to a Hyracks cluster controller.
+ * <p>
+ * The job scans the input file splits for words, groups equal words while counting them (either with a
+ * hash based group operator, or with an external sort followed by a preclustered group operator), and
+ * writes the resulting frames to the output file splits.
+ */
+public class WordCountMain {
+    /** Command-line options; args4j fills the fields in from the {@link Option} annotations. */
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+        public int port = 1099;
+
+        @Option(name = "-app", usage = "Hyracks Application name", required = true)
+        public String app;
+
+        @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
+        public String inFileSplits;
+
+        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+        public String outFileSplits;
+
+        // Optional on purpose: args4j's default handling of a boolean option is a presence flag, so
+        // marking it required would force it to true on every run and make the sort based plan in
+        // createJob unreachable.
+        @Option(name = "-use-hash", usage = "Use Hash based grouping (default: sort based grouping)", required = false)
+        public boolean useHash;
+
+        @Option(name = "-hashtable-size", usage = "Hash table size (default: 8191)", required = false)
+        public int htSize = 8191;
+
+        @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
+        public int sbSize = 32768;
+    }
+
+    /**
+     * Parses the command line, builds the word-count job, submits it through an RMI connection to the
+     * cluster controller and blocks until the job completes. The start time, end time and elapsed time
+     * (all in milliseconds) are printed to standard error.
+     *
+     * @param args command-line arguments as described by {@link Options}
+     * @throws Exception if the arguments are invalid or the job cannot be created, started or awaited
+     */
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+
+        JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
+                options.useHash, options.htSize, options.sbSize);
+
+        // The measured interval covers job creation, start and completion on the cluster.
+        long start = System.currentTimeMillis();
+        UUID jobId = hcc.createJob(options.app, job);
+        hcc.start(jobId);
+        hcc.waitForCompletion(jobId);
+        long end = System.currentTimeMillis();
+        System.err.println(start + " " + end + " " + (end - start));
+    }
+
+    /**
+     * Converts a comma separated list of {@code <node-name>:<path>} entries into file splits.
+     *
+     * @param fileSplits comma separated file-split descriptions; surrounding whitespace of each entry is ignored
+     * @return one {@link FileSplit} per entry, in input order
+     * @throws IllegalArgumentException if an entry has no ':' separator, an empty node name or an empty path
+     */
+    private static FileSplit[] parseFileSplits(String fileSplits) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            // Split at the first ':' only, so the path part may itself contain ':' characters.
+            int idx = s.indexOf(':');
+            if (idx <= 0 || idx == s.length() - 1) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new File(s.substring(idx + 1)));
+        }
+        return fSplits;
+    }
+
+    /**
+     * Builds the word-count job specification.
+     *
+     * @param inSplits file splits to scan; the scanner is placed on their nodes
+     * @param outSplits file splits to write; the grouping and writing operators are placed on their nodes
+     * @param useHash {@code true} to group with a hash table, {@code false} to sort and then group the sorted run
+     * @param htSize hash table size passed to the hash group operator (only used when {@code useHash})
+     * @param sbSize sort buffer size in frames passed to the external sort (only used when not {@code useHash})
+     * @return the job specification with the file writer registered as root
+     */
+    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, boolean useHash, int htSize,
+            int sbSize) {
+        JobSpecification spec = new JobSpecification();
+
+        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
+        RecordDescriptor wordDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor wordScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
+                new WordTupleParserFactory(), wordDesc);
+        wordScanner.setPartitionConstraint(createPartitionConstraint(inSplits));
+
+        // Output records are (word, count).
+        RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+        IOperatorDescriptor gBy;
+        if (useHash) {
+            gBy = new HashGroupOperatorDescriptor(spec, new int[] { 0 },
+                    new FieldHashPartitionComputerFactory(new int[] { 0 },
+                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new MultiAggregatorFactory(
+                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    groupResultDesc, htSize);
+            gBy.setPartitionConstraint(createPartitionConstraint(outSplits));
+            // Repartition on the word so that equal words reach the same grouping partition.
+            IConnectorDescriptor scanGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
+                    new FieldHashPartitionComputerFactory(new int[] { 0 },
+                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
+        } else {
+            ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, sbSize, new int[] { 0 },
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, wordDesc);
+            // The sorter and the group operator feed the writer through one-to-one connectors, so they
+            // get the writer's locations, matching what the hash based plan does for its group operator.
+            sorter.setPartitionConstraint(createPartitionConstraint(outSplits));
+
+            IConnectorDescriptor scanSortConn = new MToNHashPartitioningConnectorDescriptor(spec,
+                    new FieldHashPartitionComputerFactory(new int[] { 0 },
+                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(scanSortConn, wordScanner, 0, sorter, 0);
+
+            gBy = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new MultiAggregatorFactory(
+                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    groupResultDesc);
+            gBy.setPartitionConstraint(createPartitionConstraint(outSplits));
+            OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
+            spec.connect(sortGroupConn, sorter, 0, gBy, 0);
+        }
+
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
+        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        writer.setPartitionConstraint(createPartitionConstraint(outSplits));
+
+        IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(gbyPrinterConn, gBy, 0, writer, 0);
+
+        spec.addRoot(writer);
+        return spec;
+    }
+
+    /**
+     * Creates an explicit partition constraint with one absolute location per file split, using the
+     * split's node name as the location.
+     *
+     * @param splits file splits whose node names determine the locations
+     * @return the constraint, with locations in the same order as {@code splits}
+     */
+    private static PartitionConstraint createPartitionConstraint(FileSplit[] splits) {
+        LocationConstraint[] lConstraints = new LocationConstraint[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            lConstraints[i] = new AbsoluteLocationConstraint(splits[i].getNodeName());
+        }
+        return new ExplicitPartitionConstraint(lConstraints);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.classpath b/hyracks/hyracks-examples/text-example/texthelper/.classpath
similarity index 100%
rename from hyracks/hyracks-examples/tpch-example/tpchhelper/.classpath
rename to hyracks/hyracks-examples/text-example/texthelper/.classpath
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.project b/hyracks/hyracks-examples/text-example/texthelper/.project
similarity index 95%
copy from hyracks/hyracks-examples/tpch-example/tpchhelper/.project
copy to hyracks/hyracks-examples/text-example/texthelper/.project
index 41b2574..19ce234 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/.project
+++ b/hyracks/hyracks-examples/text-example/texthelper/.project
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
- <name>tpchhelper</name>
+ <name>texthelper</name>
<comment></comment>
<projects>
</projects>
@@ -17,7 +17,7 @@
</buildCommand>
</buildSpec>
<natures>
- <nature>org.maven.ide.eclipse.maven2Nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
</natures>
</projectDescription>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-examples/text-example/texthelper/.settings/org.eclipse.jdt.core.prefs
similarity index 88%
copy from hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs
copy to hyracks/hyracks-examples/text-example/texthelper/.settings/org.eclipse.jdt.core.prefs
index 4898439..8599738 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks/hyracks-examples/text-example/texthelper/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-examples/text-example/texthelper/.settings/org.maven.ide.eclipse.prefs
similarity index 88%
copy from hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
copy to hyracks/hyracks-examples/text-example/texthelper/.settings/org.maven.ide.eclipse.prefs
index 032b327..4562b1a 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/.settings/org.maven.ide.eclipse.prefs
+++ b/hyracks/hyracks-examples/text-example/texthelper/.settings/org.maven.ide.eclipse.prefs
@@ -1,4 +1,4 @@
-#Wed Aug 11 19:04:13 PDT 2010
+#Tue Sep 28 14:37:42 PDT 2010
activeProfiles=
eclipse.preferences.version=1
fullBuildGoals=process-test-resources
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/pom.xml b/hyracks/hyracks-examples/text-example/texthelper/pom.xml
similarity index 89%
rename from hyracks/hyracks-examples/tpch-example/tpchhelper/pom.xml
rename to hyracks/hyracks-examples/text-example/texthelper/pom.xml
index ed35716..b0dae86 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/pom.xml
+++ b/hyracks/hyracks-examples/text-example/texthelper/pom.xml
@@ -1,12 +1,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
- <artifactId>tpchhelper</artifactId>
+ <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+ <artifactId>texthelper</artifactId>
<version>0.1.3-SNAPSHOT</version>
<parent>
<groupId>edu.uci.ics.hyracks.examples</groupId>
- <artifactId>tpch-example</artifactId>
+ <artifactId>text-example</artifactId>
<version>0.1.3-SNAPSHOT</version>
</parent>
diff --git a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
new file mode 100644
index 0000000..9825fdf
--- /dev/null
+++ b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
@@ -0,0 +1,170 @@
+package edu.uci.ics.hyracks.examples.text;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParser;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+/**
+ * Factory for tuple parsers that split a character stream into whitespace-delimited words and emit
+ * one single-field tuple per word, written by the UTF8 string value parser.
+ */
+public class WordTupleParserFactory implements ITupleParserFactory {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a parser that reads words from an input stream and appends them, one tuple per word, to
+     * a frame that is flushed to the given writer whenever it fills up and once more at the end.
+     *
+     * @param ctx context used to allocate the output frame and to create the frame appender
+     * @return a new tuple parser
+     */
+    @Override
+    public ITupleParser createTupleParser(final IHyracksContext ctx) {
+        return new ITupleParser() {
+            @Override
+            public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+                try {
+                    ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+                    FrameTupleAppender appender = new FrameTupleAppender(ctx);
+                    appender.reset(frame, true);
+                    ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+                    DataOutput dos = tb.getDataOutput();
+
+                    IValueParser utf8StringParser = UTF8StringParserFactory.INSTANCE.createValueParser();
+                    // NOTE(review): the reader decodes with the platform default charset - confirm that this
+                    // matches the encoding of the input files.
+                    WordCursor cursor = new WordCursor(new InputStreamReader(in));
+                    while (cursor.nextWord()) {
+                        tb.reset();
+                        utf8StringParser.parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
+                        tb.addFieldEndOffset();
+                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                            // The frame is full: ship it and retry the same tuple on the emptied frame.
+                            FrameUtils.flushFrame(frame, writer);
+                            appender.reset(frame, true);
+                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                throw new IllegalStateException("Word tuple of " + tb.getSize()
+                                        + " bytes does not fit into an empty frame");
+                            }
+                        }
+                    }
+                    if (appender.getTupleCount() > 0) {
+                        FrameUtils.flushFrame(frame, writer);
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Cursor over the whitespace-delimited tokens of a reader. After a successful {@link #nextWord()} the
+     * current token is {@code buffer[fStart, fEnd)}. Consecutive whitespace characters produce empty
+     * tokens.
+     */
+    private static class WordCursor {
+        private static final int INITIAL_BUFFER_SIZE = 4096;
+        private static final int INCREMENT = 4096;
+
+        private char[] buffer;
+
+        // buffer[start, end) holds the characters that were read but not yet returned as part of a token.
+        private int start;
+        private int end;
+        private boolean eof;
+
+        // Bounds of the token found by the last successful nextWord() call.
+        private int fStart;
+        private int fEnd;
+        private Reader in;
+
+        public WordCursor(Reader in) {
+            this.in = in;
+            buffer = new char[INITIAL_BUFFER_SIZE];
+            start = 0;
+            end = 0;
+            eof = false;
+        }
+
+        /**
+         * Advances to the next token.
+         *
+         * @return {@code true} if a token is available in {@code buffer[fStart, fEnd)}, {@code false} once
+         *         the input is exhausted
+         * @throws IOException if reading from the underlying reader fails
+         */
+        public boolean nextWord() throws IOException {
+            if (eof) {
+                return false;
+            }
+
+            int p = start;
+            while (true) {
+                if (p >= end) {
+                    int s = start;
+                    eof = !readMore();
+                    if (eof) {
+                        // readMore() has moved the unconsumed characters to the front of the buffer, so the
+                        // last token is whatever is left there. Report it only if it is non-empty;
+                        // returning with the bounds of the previous token would emit a stale word.
+                        fStart = start;
+                        fEnd = end;
+                        return fEnd > fStart;
+                    }
+                    // Compaction shifted the pending characters left by s - start positions.
+                    p -= (s - start);
+                }
+                char ch = buffer[p];
+                if (Character.isWhitespace(ch)) {
+                    fStart = start;
+                    fEnd = p;
+                    start = p + 1;
+                    return true;
+                }
+                ++p;
+            }
+        }
+
+        /**
+         * Moves the pending characters to the front of the buffer, grows the buffer if it is full and
+         * reads more characters behind them.
+         *
+         * @return {@code false} if the reader has reached end of stream, {@code true} otherwise
+         * @throws IOException if reading from the underlying reader fails
+         */
+        private boolean readMore() throws IOException {
+            if (start > 0) {
+                System.arraycopy(buffer, start, buffer, 0, end - start);
+            }
+            end -= start;
+            start = 0;
+
+            if (end == buffer.length) {
+                buffer = Arrays.copyOf(buffer, buffer.length + INCREMENT);
+            }
+
+            int n = in.read(buffer, end, buffer.length - end);
+            if (n < 0) {
+                return false;
+            }
+            end += n;
+            return true;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/tpch-example/pom.xml b/hyracks/hyracks-examples/tpch-example/pom.xml
index d81d4ce..c30df2b 100644
--- a/hyracks/hyracks-examples/tpch-example/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/pom.xml
@@ -12,7 +12,6 @@
</parent>
<modules>
- <module>tpchhelper</module>
<module>tpchclient</module>
<module>tpchapp</module>
</modules>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchapp/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchapp/pom.xml
index d95511d..cc0b0b9 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchapp/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchapp/pom.xml
@@ -48,11 +48,11 @@
</plugins>
</build>
<dependencies>
- <dependency>
- <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
- <artifactId>tpchhelper</artifactId>
- <version>0.1.3-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.1.3-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index f69b7c8..7a49171 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -17,13 +17,6 @@
<version>0.1.3-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
- <artifactId>tpchhelper</artifactId>
- <version>0.1.3-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
</dependencies>
<build>
<plugins>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index d90f384..5bc5479 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -24,6 +24,9 @@
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -34,7 +37,6 @@
import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
-import edu.uci.ics.hyracks.examples.tpch.helper.CountAccumulatingAggregatorFactory;
public class Main {
public static void main(String[] args) throws Exception {
@@ -119,11 +121,14 @@
RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(spec, new int[] { 6 },
+ HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
+ spec,
+ new int[] { 6 },
new FieldHashPartitionComputerFactory(new int[] { 6 },
new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
- new CountAccumulatingAggregatorFactory(), groupResultDesc, 16);
+ new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+ groupResultDesc, 16);
gby.setPartitionConstraint(new PartitionCountConstraint(4));
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
diff --git a/hyracks/hyracks-examples/tpch-example/tpchhelper/src/main/java/edu/uci/ics/hyracks/examples/tpch/helper/CountAccumulatingAggregatorFactory.java b/hyracks/hyracks-examples/tpch-example/tpchhelper/src/main/java/edu/uci/ics/hyracks/examples/tpch/helper/CountAccumulatingAggregatorFactory.java
deleted file mode 100644
index cbc8a57..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchhelper/src/main/java/edu/uci/ics/hyracks/examples/tpch/helper/CountAccumulatingAggregatorFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package edu.uci.ics.hyracks.examples.tpch.helper;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregator;
-import edu.uci.ics.hyracks.dataflow.std.group.IAccumulatingAggregatorFactory;
-
-public class CountAccumulatingAggregatorFactory implements IAccumulatingAggregatorFactory {
- private static final long serialVersionUID = 1L;
-
- @Override
- public IAccumulatingAggregator createAggregator(RecordDescriptor inRecordDesc, RecordDescriptor outRecordDescriptor) {
- final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- return new IAccumulatingAggregator() {
- private int count;
-
- @Override
- public boolean output(FrameTupleAppender appender, IFrameTupleAccessor accessor, int tIndex)
- throws HyracksDataException {
- tb.reset();
- tb.addField(accessor, tIndex, 0);
- tb.addField(IntegerSerializerDeserializer.INSTANCE, count);
- return appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
- }
-
- @Override
- public void init(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- count = 0;
- }
-
- @Override
- public void accumulate(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- ++count;
- }
- };
- }
-}
\ No newline at end of file