minor fix
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2071 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 5852b72..45d3687 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -45,7 +45,6 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
@@ -118,8 +117,6 @@
typeTraits[1] = new TypeTraits(false);
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
- MessageList.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils
.getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
@@ -128,12 +125,14 @@
vertexIdClass.getName(), vertexClass.getName());
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 4,
- new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate, rdFinal);
ClusterConfig.setLocationConstraint(spec, scanner);
@@ -166,11 +165,6 @@
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
- * construct unnest operator
- */
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), messageValueClass.getName());
- /**
* construct local sort operator
*/
int[] keyFields = new int[] { 0 };
@@ -311,8 +305,6 @@
*/
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
- MessageList.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils
.getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
@@ -324,7 +316,7 @@
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 4, new ComputeUpdateFunctionFactory(confFactory),
- preHookFactory, null, rdMessage, rdDummy, rdPartialAggregate, rdFinal);
+ preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdFinal);
ClusterConfig.setLocationConstraint(spec, join);
/**
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index f1951f2..8252555 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -45,7 +45,6 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
@@ -114,24 +113,23 @@
typeTraits[1] = new TypeTraits(false);
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
- MessageList.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils
.getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
vertexIdClass.getName(), vertexClass.getName());
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
+
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 3,
- new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, scanner);
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), messageValueClass.getName());
/**
* construct local sort operator
*/
@@ -278,8 +276,6 @@
nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
- MessageList.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils
.getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
@@ -291,7 +287,7 @@
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
- new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, join);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index d075929..68ea01f 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -44,7 +44,6 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
@@ -116,27 +115,25 @@
* construct compute operator
*/
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
- MessageList.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils
.getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
vertexIdClass.getName(), vertexClass.getName());
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 3,
- new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
* construct global sort operator
*/
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), messageValueClass.getName());
int[] keyFields = new int[] { 0 };
INormalizedKeyComputerFactory nkmFactory = JobGenUtil
.getINormalizedKeyComputerFactory(iteration, vertexIdClass);
@@ -270,8 +267,6 @@
nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
- MessageList.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils
.getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
@@ -283,7 +278,7 @@
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
- new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, join);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 7c0b259..0792e1d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -44,7 +44,6 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
@@ -112,24 +111,22 @@
typeTraits[1] = new TypeTraits(false);
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
- MessageList.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils
.getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
vertexIdClass.getName(), vertexClass.getName());
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 3,
- new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, scanner);
- RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), messageValueClass.getName());
/**
* construct local sort operator
*/
@@ -282,8 +279,6 @@
nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
- RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
- MessageList.class.getName());
RecordDescriptor rdPartialAggregate = DataflowUtils
.getRecordDescriptorFromWritableClasses(partialAggregateValueClass.getName());
IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
@@ -295,7 +290,7 @@
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
- new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdMessage, rdDummy,
+ new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
rdPartialAggregate);
ClusterConfig.setLocationConstraint(spec, join);