Try Pregelix's SerDeUtils.readVLong in KmerHashPartitioncomputerFactory
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2929 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/pom.xml b/genomix/genomix-core/pom.xml
index 3dfba8f..f713b35 100644
--- a/genomix/genomix-core/pom.xml
+++ b/genomix/genomix-core/pom.xml
@@ -245,5 +245,10 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix-api</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
index a0a6b9a..f3efd45 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -4,6 +4,11 @@
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+/**
+ * Normalized key computer used to speed up the sort in aggregation.
+ * Originally from Hyracks.
+ *
+ */
public class Integer64NormalizedKeyComputerFactory implements
INormalizedKeyComputerFactory {
private static final long serialVersionUID = 8735044913496854551L;
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
index ac8a6bc..f8134dc 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
@@ -27,6 +27,9 @@
return r;
}
+ /**
+ * Normalizes a single kmer, read as a long from {@code bytes} at {@code start}.
+ */
@Override
public int normalize(byte[] bytes, int start, int length) {
long value = getLong(bytes, start);
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
index 4078718..aeaa2bf 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
@@ -5,6 +5,8 @@
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+
public class KmerHashPartitioncomputerFactory implements
ITuplePartitionComputerFactory {
@@ -28,8 +30,9 @@
int slotLength = accessor.getFieldSlotsLength();
ByteBuffer buf = accessor.getBuffer();
- buf.position(startOffset + fieldOffset + slotLength);
- long l = accessor.getBuffer().getLong();
+// buf.position(startOffset + fieldOffset + slotLength);
+// long l = accessor.getBuffer().getLong();
+ long l = SerDeUtils.readVLong(buf.array(), startOffset + fieldOffset, slotLength);
return (int) (l % nParts);
}
};
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
index 20c6a28..ed114ff 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -7,6 +7,10 @@
import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+/**
+ * Connector policy assignment used by the precluster group-by.
+ *
+ */
public class ConnectorPolicyAssignmentPolicy implements
IConnectorPolicyAssignmentPolicy {
private static final long serialVersionUID = 1L;
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index 30a2277..d6498e6 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -7,7 +7,6 @@
import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;
import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;
import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;
import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
@@ -31,7 +30,6 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index c715dbd..a16dcea 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -14,6 +14,10 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+/**
+ * Aggregator factory that performs a sum.
+ *
+ */
public class DistributedMergeLmerAggregateFactory implements
IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
@@ -46,6 +50,9 @@
});
}
+ /**
+ * Called when a new kmer is met.
+ */
@Override
public void init(ArrayTupleBuilder tupleBuilder,
IFrameTupleAccessor accessor, int tIndex,
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index b096887..b2f94dc 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -14,6 +14,10 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+/**
+ * Aggregator factory that performs a count.
+ *
+ */
public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
private static final int max = 255;
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index b0f88a2..3c4f331 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -79,6 +79,7 @@
try {
Map<String, NodeControllerInfo> ncMap = hcc
.getNodeControllerInfos();
+ LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
switch (planChoice) {
case BUILD_DEBRUJIN_GRAPH:
default:
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index 3b8c9aa..3a8bd90 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -2,19 +2,21 @@
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;
+import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;
import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;
+import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;
+import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.dataflow.KMerWriterFactory;
-import edu.uci.ics.genomix.dataflow.PrinterOperatorDescriptor;
import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
@@ -51,6 +53,7 @@
EXTERNAL, PRECLUSTER, HYBRIDHASH,
}
+ private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
private final Map<String, NodeControllerInfo> ncMap;
private Scheduler scheduler;
private String[] ncNodeNames;
@@ -80,6 +83,7 @@
System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
nodes.length);
}
+ LOG.info("nc nodes:" + ncNodeNames.length + " " + java.util.Arrays.toString(ncNodeNames)); // Arrays.toString lists the node names; String[].toString() would log only the array's type and identity hash
}
private ExternalGroupOperatorDescriptor newExternalGroupby(
@@ -90,8 +94,8 @@
keyFields,
frameLimits,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(LongPointable.FACTORY) },
- new Integer64NormalizedKeyComputerFactory(),
+ .of(VLongPointable.FACTORY) },
+ new VLongNormalizedKeyComputerFactory(),
aggeragater,
new DistributedMergeLmerAggregateFactory(),
outputRec,
@@ -99,7 +103,7 @@
new FieldHashPartitionComputerFactory(
keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(LongPointable.FACTORY) }),
+ .of(VLongPointable.FACTORY) }),
tableSize), true);
}
@@ -118,11 +122,11 @@
tableSize,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
.of(LongPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] { new LongBinaryHashFunctionFamily() },
+ new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
// new IBinaryHashFunctionFamily[]
// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
hashfuncStartLevel,
- new Integer64NormalizedKeyComputerFactory(),
+ new VLongNormalizedKeyComputerFactory(),
new MergeKmerAggregateFactory(),
new DistributedMergeLmerAggregateFactory(), outputRec, true);
}
@@ -148,7 +152,7 @@
new KmerHashPartitioncomputerFactory(),
keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(LongPointable.FACTORY) });
+ .of(VLongPointable.FACTORY) });
crossGrouper = new PreclusteredGroupOperatorDescriptor(
jobSpec,
keyFields,