Try pregelix SerDeUtils.readVLong for k-mer hash partitioning


git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2929 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/pom.xml b/genomix/genomix-core/pom.xml
index 3dfba8f..f713b35 100644
--- a/genomix/genomix-core/pom.xml
+++ b/genomix/genomix-core/pom.xml
@@ -245,5 +245,10 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>pregelix-api</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
index a0a6b9a..f3efd45 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -4,6 +4,11 @@
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;

 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

 

+/**

+ * Aggregation sort: speed up 

+ * from hyracks

+ *

+ */

 public class Integer64NormalizedKeyComputerFactory implements

 		INormalizedKeyComputerFactory {

 	private static final long serialVersionUID = 8735044913496854551L;

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
index ac8a6bc..f8134dc 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
@@ -27,6 +27,9 @@
 				return r;

 			}

 

+			/**

+			 * one kmer

+			 */

 			@Override

 			public int normalize(byte[] bytes, int start, int length) {

 				long value = getLong(bytes, start);

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
index 4078718..aeaa2bf 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
@@ -5,6 +5,8 @@
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;

 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;

+import edu.uci.ics.pregelix.api.util.SerDeUtils;

+

 

 public class KmerHashPartitioncomputerFactory implements

 		ITuplePartitionComputerFactory {

@@ -28,8 +30,9 @@
 				int slotLength = accessor.getFieldSlotsLength();

 

 				ByteBuffer buf = accessor.getBuffer();

-				buf.position(startOffset + fieldOffset + slotLength);

-				long l = accessor.getBuffer().getLong();

+//				buf.position(startOffset + fieldOffset + slotLength);

+//				long l = accessor.getBuffer().getLong();

+				long l = SerDeUtils.readVLong(buf.array(), startOffset + fieldOffset, slotLength);

 				return (int) (l % nParts);

 			}

 		};

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
index 20c6a28..ed114ff 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -7,6 +7,10 @@
 import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;

 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;

 

+/**

+ * used by precluster groupby

+ *

+ */

 public class ConnectorPolicyAssignmentPolicy implements

 		IConnectorPolicyAssignmentPolicy {

 	private static final long serialVersionUID = 1L;

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index 30a2277..d6498e6 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -7,7 +7,6 @@
 import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;

 import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;

 import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

-import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;

 import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;

 import edu.uci.ics.genomix.data.std.primitive.VLongPointable;

 import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;

@@ -31,7 +30,6 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;

 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;

 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;

-import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;

 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;

 import edu.uci.ics.hyracks.control.nc.NodeControllerService;

 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index c715dbd..a16dcea 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -14,6 +14,10 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

 

+/**

+ * sum

+ *

+ */

 public class DistributedMergeLmerAggregateFactory implements

 		IAggregatorDescriptorFactory {

 	private static final long serialVersionUID = 1L;

@@ -46,6 +50,9 @@
 				});

 			}

 

+			/**

+			 * called when a new k-mer is first encountered

+			 */

 			@Override

 			public void init(ArrayTupleBuilder tupleBuilder,

 					IFrameTupleAccessor accessor, int tIndex,

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index b096887..b2f94dc 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -14,6 +14,10 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

 

+/**

+ * count

+ *

+ */

 public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {

 	private static final long serialVersionUID = 1L;

 	private static final int max = 255;

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index b0f88a2..3c4f331 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -79,6 +79,7 @@
 		try {
 			Map<String, NodeControllerInfo> ncMap = hcc
 					.getNodeControllerInfos();
+			LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
 			switch (planChoice) {
 			case BUILD_DEBRUJIN_GRAPH:
 			default:
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index 3b8c9aa..3a8bd90 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -2,19 +2,21 @@
 
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
-import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;
+import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;
 import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
 import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;
+import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;
+import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
 import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.genomix.dataflow.KMerWriterFactory;
-import edu.uci.ics.genomix.dataflow.PrinterOperatorDescriptor;
 import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
 import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
 import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
@@ -51,6 +53,7 @@
 		EXTERNAL, PRECLUSTER, HYBRIDHASH,
 	}
 
+	private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
 	private final Map<String, NodeControllerInfo> ncMap;
 	private Scheduler scheduler;
 	private String[] ncNodeNames;
@@ -80,6 +83,7 @@
 			System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
 					nodes.length);
 		}
+		LOG.info("nc nodes:" + ncNodeNames.length + ncNodeNames.toString());
 	}
 
 	private ExternalGroupOperatorDescriptor newExternalGroupby(
@@ -90,8 +94,8 @@
 				keyFields,
 				frameLimits,
 				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-						.of(LongPointable.FACTORY) },
-				new Integer64NormalizedKeyComputerFactory(),
+						.of(VLongPointable.FACTORY) },
+				new VLongNormalizedKeyComputerFactory(),
 				aggeragater,
 				new DistributedMergeLmerAggregateFactory(),
 				outputRec,
@@ -99,7 +103,7 @@
 						new FieldHashPartitionComputerFactory(
 								keyFields,
 								new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-										.of(LongPointable.FACTORY) }),
+										.of(VLongPointable.FACTORY) }),
 						tableSize), true);
 	}
 
@@ -118,11 +122,11 @@
 				tableSize,
 				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
 						.of(LongPointable.FACTORY) },
-				new IBinaryHashFunctionFamily[] { new LongBinaryHashFunctionFamily() },
+				new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
 				// new IBinaryHashFunctionFamily[]
 				// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
 				hashfuncStartLevel,
-				new Integer64NormalizedKeyComputerFactory(),
+				new VLongNormalizedKeyComputerFactory(),
 				new MergeKmerAggregateFactory(),
 				new DistributedMergeLmerAggregateFactory(), outputRec, true);
 	}
@@ -148,7 +152,7 @@
 					new KmerHashPartitioncomputerFactory(),
 					keyFields,
 					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(LongPointable.FACTORY) });
+							.of(VLongPointable.FACTORY) });
 			crossGrouper = new PreclusteredGroupOperatorDescriptor(
 					jobSpec,
 					keyFields,