Make genomix-hyracks tests compilable
diff --git a/genomix/genomix-hyracks/pom.xml b/genomix/genomix-hyracks/pom.xml
index 0d31998..b8dc757 100644
--- a/genomix/genomix-hyracks/pom.xml
+++ b/genomix/genomix-hyracks/pom.xml
@@ -241,6 +241,13 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
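+ <!-- test-jar provides the shared HDFS mini-cluster test utilities (e.g. HyracksUtils) reused by the genomix-hyracks tests -->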
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.5-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>genomix-data</artifactId>
<version>0.2.5-SNAPSHOT</version>
<type>jar</type>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
new file mode 100644
index 0000000..39cc6fc
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
@@ -0,0 +1,179 @@
+package edu.uci.ics.genomix.job;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.util.ByteComparatorFactory;
+import edu.uci.ics.genomix.util.StatCountAggregateFactory;
+import edu.uci.ics.genomix.util.StatReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.util.StatSumAggregateFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenStatistic extends JobGen {
+ private int kmers;
+ private JobConf hadoopjob;
+ private RecordDescriptor readOutputRec;
+ private String[] ncNodeNames;
+ private Scheduler scheduler;
+ private RecordDescriptor combineOutputRec;
+
+ public JobGenStatistic(GenomixJob job) {
+ super(job);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ protected void initJobConfiguration() {
+ kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ hadoopjob = new JobConf(conf);
+ hadoopjob.setInputFormat(SequenceFileInputFormat.class);
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
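+ // the read tuples carry two byte fields: [0] = encoded node degree, [1] = kmer count (see StatReadsKeyValueParserFactory)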
+ int[] degreeFields = { 0 };
+ int[] countFields = { 1 };
+ JobSpecification jobSpec = new JobSpecification();
+ /** specify the record fields after read */
+ readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ null, ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+ combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ null, ByteSerializerDeserializer.INSTANCE });
+ /** the reader */
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+ readOperator, ncNodeNames);
+
+ /** the combiner aggregator */
+ AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(
+ jobSpec, degreeFields, readOperator);
+ AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(
+ jobSpec, countFields, readOperator);
+
+ /** the final aggregator */
+ AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(
+ jobSpec, degreeFields, degreeLocal);
+ AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(
+ jobSpec, countFields, countLocal);
+
+ /** writer */
+ AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(
+ jobSpec, degreeFields, degreeMerger);
+ AbstractFileWriteOperatorDescriptor writeCount = connectWriter(
+ jobSpec, countFields, countMerger);
+ jobSpec.addRoot(writeDegree);
+ jobSpec.addRoot(writeCount);
+ return jobSpec;
+ }
+
+ private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
+ throws HyracksDataException {
+ try {
+
+ InputSplit[] splits = hadoopjob.getInputFormat().getSplits(
+ hadoopjob, ncNodeNames.length);
+
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec,
+ hadoopjob, splits, readSchedule,
+ new StatReadsKeyValueParserFactory());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private ExternalGroupOperatorDescriptor newExternalGroupby(
+ JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggregatorFactory) {
+ return new ExternalGroupOperatorDescriptor(jobSpec, keyFields,
+ GenomixJob.DEFAULT_FRAME_LIMIT, new IBinaryComparatorFactory[] {
+ new ByteComparatorFactory() }, null, aggregatorFactory,
+ new StatSumAggregateFactory(),
+ combineOutputRec, new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] {
+ new ByteComparatorFactory() }),
+ GenomixJob.DEFAULT_TABLE_SIZE), true);
+ }
+
+ private AbstractOperatorDescriptor connectLocalAggregateByField(
+ JobSpecification jobSpec, int[] fields,
+ HDFSReadOperatorDescriptor readOperator) {
+ AbstractOperatorDescriptor localAggregator = newExternalGroupby(
+ jobSpec, fields, new StatCountAggregateFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+ localAggregator, ncNodeNames);
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
+ jobSpec);
+ jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
+ return localAggregator;
+ }
+
+ private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec,
+ int[] fields, AbstractOperatorDescriptor localAggregator) {
+ AbstractOperatorDescriptor finalAggregator = newExternalGroupby(
+ jobSpec, fields, new StatSumAggregateFactory());
+ // only need one reducer
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+ finalAggregator, ncNodeNames[fields[0] % ncNodeNames.length]);
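+ // the merging connector routes every tuple to partition 0, so the final aggregate is computed on that single node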
+ IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(
+ jobSpec,
+ new ITuplePartitionComputerFactory(){
+ private static final long serialVersionUID = 1L;
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer(){
+ @Override
+ public int partition(IFrameTupleAccessor accessor,
+ int tIndex, int nParts)
+ throws HyracksDataException {
+ return 0;
+ }
+ };
+ }
+ },
+ fields,
+ new IBinaryComparatorFactory[]{new ByteComparatorFactory()});
+ jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
+ return finalAggregator;
+ }
+
+ private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int [] fields, AbstractOperatorDescriptor finalAggregator){
+ LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(
+ jobSpec, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+ writeOperator, ncNodeNames[fields[0] % ncNodeNames.length]);
+
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
+ jobSpec);
+ jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
+ return writeOperator;
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
new file mode 100644
index 0000000..b469be0
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.genomix.util;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
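+/**
+ * Compares and hashes using only the first byte of a field; sufficient here
+ * because the grouped statistic fields are single bytes.
+ */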
+public class ByteComparatorFactory implements IBinaryComparatorFactory, IBinaryHashFunctionFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator(){
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
+ int l2) {
+ return b1[s1]-b2[s2];
+ }
+
+ };
+ }
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction() {
+ return new IBinaryHashFunction(){
+
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ return bytes[offset];
+ }
+
+ };
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
new file mode 100644
index 0000000..49a7453
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
@@ -0,0 +1,121 @@
+package edu.uci.ics.genomix.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class StatCountAggregateFactory implements
+ IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public class CountAggregator implements IAggregatorDescriptor {
+
+ @Override
+ public AggregateState createAggregateStates() {
+ // no separate state object is needed: the running count is kept directly in the group-by frame
+ return null;
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int count = 1;
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when initializing the aggregator.");
+ }
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+ AggregateState state) throws HyracksDataException {
+ int count = 1;
+
+ int statetupleOffset = stateAccessor
+ .getTupleStartOffset(stateTupleIndex);
+ int countfieldStart = stateAccessor.getFieldStartOffset(
+ stateTupleIndex, 1);
+ int countoffset = statetupleOffset
+ + stateAccessor.getFieldSlotsLength() + countfieldStart;
+
+ byte[] data = stateAccessor.getBuffer().array();
+ count += IntegerSerializerDeserializer.getInt(data, countoffset);
+ IntegerSerializerDeserializer.putInt(count, data, countoffset);
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+
+ }
+
+ protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+ int countoffset = tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart;
+ byte[] data = accessor.getBuffer().array();
+
+ return IntegerSerializerDeserializer.getInt(data, countoffset);
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields,
+ int[] keyFieldsInPartialResults) throws HyracksDataException {
+ return new CountAggregator();
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
new file mode 100644
index 0000000..4c2205f
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
@@ -0,0 +1,85 @@
+package edu.uci.ics.genomix.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.type.KmerUtil;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<BytesWritable,KmerCountValue> {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IKeyValueParser<BytesWritable,KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
+ throws HyracksDataException {
+
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+ final ByteBuffer outputBuffer = ctx.allocateFrame();
+ final FrameTupleAppender outputAppender = new FrameTupleAppender(
+ ctx.getFrameSize());
+ outputAppender.reset(outputBuffer, true);
+
+ return new IKeyValueParser<BytesWritable,KmerCountValue>(){
+
+ @Override
+ public void open(IFrameWriter writer) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void parse(BytesWritable key, KmerCountValue value,
+ IFrameWriter writer) throws HyracksDataException {
+ byte adjMap = value.getAdjBitMap();
+ byte count = value.getCount();
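+ // pack both degrees into a single byte: tens digit = in-degree, ones digit = out-degree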
+ insertToFrame((byte) (KmerUtil.inDegree(adjMap) * 10 + KmerUtil.outDegree(adjMap)), count, writer);
+ }
+
+ @Override
+ public void close(IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+
+ private void insertToFrame(byte degree, byte count,
+ IFrameWriter writer) {
+ try {
+ tupleBuilder.reset();
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,degree);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,count);
+
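+ // if the current frame is full, flush it to the writer and retry the append once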
+ if (!outputAppender.append(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record size is too large.");
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
new file mode 100644
index 0000000..ce00667
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
@@ -0,0 +1,122 @@
+package edu.uci.ics.genomix.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class StatSumAggregateFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public class DistributeAggregatorDescriptor implements
+ IAggregatorDescriptor {
+
+ @Override
+ public AggregateState createAggregateStates() {
+ // no separate state object is needed: the running count is kept directly in the group-by frame
+ return null;
+ }
+
+ protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+ int countoffset = tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart;
+ byte[] data = accessor.getBuffer().array();
+ return IntegerSerializerDeserializer.getInt(data, countoffset);
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when initializing the aggregator.");
+ }
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+ AggregateState state) throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+
+ int statetupleOffset = stateAccessor
+ .getTupleStartOffset(stateTupleIndex);
+ int countfieldStart = stateAccessor.getFieldStartOffset(
+ stateTupleIndex, 1);
+ int countoffset = statetupleOffset
+ + stateAccessor.getFieldSlotsLength() + countfieldStart;
+
+ byte[] data = stateAccessor.getBuffer().array();
+ count += IntegerSerializerDeserializer.getInt(data, countoffset);
+ IntegerSerializerDeserializer.putInt(count, data, countoffset);
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields,
+ int[] keyFieldsInPartialResults) throws HyracksDataException {
+ return new DistributeAggregatorDescriptor();
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/HyracksUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/HyracksUtils.java
deleted file mode 100644
index f1b2008..0000000
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/HyracksUtils.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.example.jobrun;
-import java.util.EnumSet;
-
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-
-public class HyracksUtils {
-
- public static final String NC1_ID = "nc1";
- public static final String NC2_ID = "nc2";
-
- public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
- public static final int TEST_HYRACKS_CC_PORT = 1099;
- public static final int TEST_HYRACKS_CC_CLIENT_PORT = 2099;
- public static final String CC_HOST = "localhost";
-
- public static final int FRAME_SIZE = 65536;
-
- private static ClusterControllerService cc;
- private static NodeControllerService nc1;
- private static NodeControllerService nc2;
- private static IHyracksClientConnection hcc;
-
- public static void init() throws Exception {
- CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = CC_HOST;
- ccConfig.clusterNetIpAddress = CC_HOST;
- ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
- ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
- ccConfig.defaultMaxJobAttempts = 0;
- ccConfig.jobHistorySize = 0;
- ccConfig.profileDumpPeriod = -1;
-
- // cluster controller
- cc = new ClusterControllerService(ccConfig);
- cc.start();
-
- // two node controllers
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.clusterNetIPAddress = "localhost";
- ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.datasetIPAddress = "127.0.0.1";
- ncConfig1.nodeId = NC1_ID;
- nc1 = new NodeControllerService(ncConfig1);
- nc1.start();
-
- NCConfig ncConfig2 = new NCConfig();
- ncConfig2.ccHost = "localhost";
- ncConfig2.clusterNetIPAddress = "localhost";
- ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
- ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.datasetIPAddress = "127.0.0.1";
- ncConfig2.nodeId = NC2_ID;
- nc2 = new NodeControllerService(ncConfig2);
- nc2.start();
-
- // hyracks connection
- hcc = new HyracksConnection(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
- }
-
- public static void deinit() throws Exception {
- nc2.stop();
- nc1.stop();
- cc.stop();
- }
-
- public static void runJob(JobSpecification spec, String appName)
- throws Exception {
- spec.setFrameSize(FRAME_SIZE);
- JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
- hcc.waitForCompletion(jobId);
- }
-
-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index de68f39..ba0e5b9 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -30,12 +30,13 @@
import edu.uci.ics.genomix.job.GenomixJob;
import edu.uci.ics.genomix.type.Kmer;
import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.example.jobrun.TestUtils;
public class JobRunTest {
private static final String ACTUAL_RESULT_DIR = "actual";
private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_PATH = "src/test/resources/data/mergeTest/ThreeKmer";
+ private static final String DATA_PATH = "src/test/resources/data/webmap/text.txt";
private static final String HDFS_INPUT_PATH = "/webmap";
private static final String HDFS_OUTPUT_PATH = "/webmap_result";
@@ -59,7 +60,7 @@
@Before
public void setUp() throws Exception {
cleanupStores();
- HyracksUtils.init();
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
startHDFS();
@@ -68,8 +69,8 @@
FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
conf.setInt(GenomixJob.KMER_LENGTH, 5);
- driver = new Driver(HyracksUtils.CC_HOST,
- HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
+ driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
numPartitionPerMachine);
}
@@ -94,7 +95,7 @@
Path dest = new Path(HDFS_INPUT_PATH);
Path result = new Path(HDFS_OUTPUT_PATH);
dfs.mkdirs(dest);
- // dfs.mkdirs(result);
+ //dfs.mkdirs(result);
dfs.copyFromLocalFile(src, dest);
DataOutputStream confOutput = new DataOutputStream(
@@ -116,7 +117,7 @@
}
@Test
- public void TestAll() throws Exception {
+ public void TestAll() throws Exception{
cleanUpReEntry();
TestExternalGroupby();
cleanUpReEntry();
@@ -131,7 +132,7 @@
cleanUpReEntry();
TestHybridReversedGroupby();
}
-
+
public void TestExternalGroupby() throws Exception {
conf.set(GenomixJob.GROUPBY_TYPE, "external");
System.err.println("Testing ExternalGroupBy");
@@ -152,24 +153,22 @@
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults(EXPECTED_PATH));
}
-
- public void TestExternalReversedGroupby() throws Exception {
+
+ public void TestExternalReversedGroupby() throws Exception{
conf.set(GenomixJob.GROUPBY_TYPE, "external");
conf.setBoolean(GenomixJob.REVERSED_KMER, true);
System.err.println("Testing ExternalGroupBy + Reversed");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
}
-
- public void TestPreClusterReversedGroupby() throws Exception {
+ public void TestPreClusterReversedGroupby() throws Exception{
conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
conf.setBoolean(GenomixJob.REVERSED_KMER, true);
System.err.println("Testing PreclusterGroupBy + Reversed");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
}
-
- public void TestHybridReversedGroupby() throws Exception {
+ public void TestHybridReversedGroupby() throws Exception{
conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
conf.setBoolean(GenomixJob.REVERSED_KMER, true);
System.err.println("Testing HybridGroupBy + Reversed");
@@ -187,21 +186,22 @@
DUMPED_RESULT), false, conf, null);
dumped = new File(DUMPED_RESULT);
} else {
-
- FileSystem.getLocal(new Configuration()).mkdirs(
- new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
+
+ FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR
+ + HDFS_OUTPUT_PATH));
File filePathTo = new File(CONVERT_RESULT);
BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
String partname = "/part-" + i;
- FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
- + partname), FileSystem.getLocal(new Configuration()),
- new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH
- + partname), false, conf);
-
- Path path = new Path(HDFS_OUTPUT_PATH + partname);
+// FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
+// + partname), FileSystem.getLocal(new Configuration()),
+// new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
+
+
+ Path path = new Path(HDFS_OUTPUT_PATH
+ + partname);
FileSystem dfs = FileSystem.get(conf);
- if (dfs.getFileStatus(path).getLen() == 0) {
+ if (dfs.getFileStatus(path).getLen() == 0){
continue;
}
SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path,
@@ -213,14 +213,15 @@
int k = conf.getInt(GenomixJob.KMER_LENGTH, 25);
while (reader.next(key, value)) {
- if (key == null || value == null) {
+ if (key == null || value == null){
break;
}
bw.write(Kmer.recoverKmerFrom(k, key.getBytes(), 0,
key.getLength())
+ "\t" + value.toString());
- System.out.println(Kmer.recoverKmerFrom(k, key.getBytes(),
- 0, key.getLength()) + "\t" + value.toString());
+ System.out.println(Kmer.recoverKmerFrom(k, key.getBytes(), 0,
+ key.getLength())
+ + "\t" + value.toString());
bw.newLine();
}
reader.close();
@@ -230,13 +231,13 @@
dumped = new File(CONVERT_RESULT);
}
- TestUtils.compareWithResult(new File(expectedPath), dumped);
+ TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
return true;
}
@After
public void tearDown() throws Exception {
- HyracksUtils.deinit();
+ edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
cleanupHDFS();
}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
index 4c36eca..22688e0 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
@@ -18,8 +18,47 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.Collections;
public class TestUtils {
+ /**
+ * Compare the actual result file against the sorted expected file.
+ * The actual file need not be sorted; its lines are sorted in memory before comparison.
+ * @param expectedFile the expected (sorted) result file
+ * @param actualFile the actual result file produced by the job
+ */
+ public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception{
+ BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
+ BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
+ ArrayList<String> actualLines = new ArrayList<String>();
+ String lineExpected, lineActual;
+ try{
+ while ( (lineActual = readerActual.readLine())!=null){
+ actualLines.add(lineActual);
+ }
+ Collections.sort(actualLines);
+ int num = 1;
+ for(String actualLine : actualLines){
+ lineExpected = readerExpected.readLine();
+ if (lineExpected == null){
+ throw new Exception("Actual result changed at line " + num + ":\n< " + actualLine + "\n> ");
+ }
+ if ( !equalStrings(lineExpected, actualLine)){
+ throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
+ + actualLine);
+ }
+ ++num;
+ }
+ lineExpected = readerExpected.readLine();
+ if (lineExpected != null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineExpected);
+ }
+ } finally{
+ readerActual.close();
+ readerExpected.close();
+ }
+ }
public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index 436e09f..5de9f37 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -1,5 +1,6 @@
<?xml version="1.0"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>hyracks-hdfs-core</artifactId>
<name>hyracks-hdfs-core</name>
@@ -52,6 +53,18 @@
</filesets>
</configuration>
</plugin>
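+ <!-- package the test classes as a test-jar so downstream modules (e.g. genomix-hyracks) can reuse the HDFS mini-cluster utilities -->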
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>