minor fix

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2071 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 5852b72..45d3687 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -45,7 +45,6 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
@@ -118,8 +117,6 @@
         typeTraits[1] = new TypeTraits(false);
 
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
-                MessageList.class.getName());
         RecordDescriptor rdPartialAggregate = DataflowUtils
                 .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
@@ -128,12 +125,14 @@
                 vertexIdClass.getName(), vertexClass.getName());
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 4,
-                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate, rdFinal);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
@@ -166,11 +165,6 @@
         ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
         /**
-         * construct unnest operator
-         */
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
-                vertexIdClass.getName(), messageValueClass.getName());
-        /**
          * construct local sort operator
          */
         int[] keyFields = new int[] { 0 };
@@ -311,8 +305,6 @@
          */
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
-                MessageList.class.getName());
         RecordDescriptor rdPartialAggregate = DataflowUtils
                 .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
@@ -324,7 +316,7 @@
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 4, new ComputeUpdateFunctionFactory(confFactory),
-                preHookFactory, null, rdMessage, rdDummy, rdPartialAggregate, rdFinal);
+                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdFinal);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index f1951f2..8252555 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -45,7 +45,6 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
@@ -114,24 +113,23 @@
         typeTraits[1] = new TypeTraits(false);
 
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
-                MessageList.class.getName());
         RecordDescriptor rdPartialAggregate = DataflowUtils
                 .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
                 vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
+
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 3,
-                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
-                vertexIdClass.getName(), messageValueClass.getName());
         /**
          * construct local sort operator
          */
@@ -278,8 +276,6 @@
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
 
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
-                MessageList.class.getName());
         RecordDescriptor rdPartialAggregate = DataflowUtils
                 .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
@@ -291,7 +287,7 @@
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
                 keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
-                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, join);
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index d075929..68ea01f 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -44,7 +44,6 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
@@ -116,27 +115,25 @@
          * construct compute operator
          */
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
-                MessageList.class.getName());
         RecordDescriptor rdPartialAggregate = DataflowUtils
                 .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
                 vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 3,
-                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
          * construct global sort operator
          */
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
-                vertexIdClass.getName(), messageValueClass.getName());
         int[] keyFields = new int[] { 0 };
         INormalizedKeyComputerFactory nkmFactory = JobGenUtil
                 .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
@@ -270,8 +267,6 @@
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
 
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
-                MessageList.class.getName());
         RecordDescriptor rdPartialAggregate = DataflowUtils
                 .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
@@ -283,7 +278,7 @@
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
                 keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
-                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, join);
 
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 7c0b259..0792e1d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -44,7 +44,6 @@
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
@@ -112,24 +111,22 @@
         typeTraits[1] = new TypeTraits(false);
 
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
-                MessageList.class.getName());
         RecordDescriptor rdPartialAggregate = DataflowUtils
                 .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
                 vertexIdClass.getName(), vertexClass.getName());
+        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+                vertexIdClass.getName(), messageValueClass.getName());
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 3,
-                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
-        RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
-                vertexIdClass.getName(), messageValueClass.getName());
         /**
          * construct local sort operator
          */
@@ -282,8 +279,6 @@
         nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
 
         RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
-        RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
-                MessageList.class.getName());
         RecordDescriptor rdPartialAggregate = DataflowUtils
                 .getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
         IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
@@ -295,7 +290,7 @@
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
                 keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
-                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+                new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, join);