support job specific frame size in Pregelix
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2682 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index bba2229..8b6d1b6 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -64,6 +64,8 @@
public static final String INCREASE_STATE_LENGTH = "pregelix.incStateLength";
/** job id */
public static final String JOB_ID = "pregelix.jobid";
+ /** frame size */
+ public static final String FRAME_SIZE = "pregelix.framesize";
/**
* Constructor that will instantiate the configuration
@@ -154,4 +156,14 @@
final public void setDynamicVertexValueSize(boolean incStateLengthDynamically) {
getConfiguration().setBoolean(INCREASE_STATE_LENGTH, incStateLengthDynamically);
}
+
+ /**
+ * Set the frame size for a job
+ *
+ * @param frameSize
+ * the desired frame size
+ */
+ final public void setFrameSize(int frameSize) {
+ getConfiguration().setInt(FRAME_SIZE, frameSize);
+ }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 6066dfb..ff9724d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -421,4 +421,15 @@
public static boolean getDynamicVertexValueSize(Configuration conf) {
return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
}
+
+ /**
+ * Get the specified frame size
+ *
+ * @param conf
+ * the job configuration
+ * @return the specified frame size, or -1 if it has not been set by the user
+ */
+ public static int getFrameSize(Configuration conf) {
+ return conf.getInt(PregelixJob.FRAME_SIZE, -1);
+ }
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index de29dbc..991d969 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -86,377 +86,462 @@
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
public abstract class JobGen implements IJobGen {
- private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
- protected static final int MB = 1048576;
- protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
- protected static final int frameSize = ClusterConfig.getFrameSize();
- protected static final int maxFrameSize = (int) (((long) 32 * MB) / frameSize);
- protected static final int tableSize = 10485767;
- protected static final String PRIMARY_INDEX = "primary";
- protected final Configuration conf;
- protected final PregelixJob giraphJob;
- protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
- protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
- protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+ private static final Logger LOGGER = Logger.getLogger(JobGen.class
+ .getName());
+ protected static final int MB = 1048576;
+ protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+ protected static final int tableSize = 10485767;
+ protected static final String PRIMARY_INDEX = "primary";
+ protected final Configuration conf;
+ protected final PregelixJob giraphJob;
+ protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+ protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+ protected String jobId = new UUID(System.currentTimeMillis(),
+ System.nanoTime()).toString();
+ protected int frameSize = ClusterConfig.getFrameSize();
+ protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
- protected static final String SECONDARY_INDEX_ODD = "secondary1";
- protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+ protected static final String SECONDARY_INDEX_ODD = "secondary1";
+ protected static final String SECONDARY_INDEX_EVEN = "secondary2";
- public JobGen(PregelixJob job) {
- this.conf = job.getConfiguration();
- this.giraphJob = job;
- this.initJobConfiguration();
- job.setJobId(jobId);
- }
+ public JobGen(PregelixJob job) {
+ this.conf = job.getConfiguration();
+ this.giraphJob = job;
+ this.initJobConfiguration();
+ job.setJobId(jobId);
+
+ // Override the cluster default with the user-specified frame size, if one was provided.
+ int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
+ if (specifiedFrameSize > 0) {
+ frameSize = specifiedFrameSize;
+ maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ }
+ if (maxFrameNumber <= 0) {
+ maxFrameNumber = 1;
+ }
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private void initJobConfiguration() {
- Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
- List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
- Type vertexIndexType = parameterTypes.get(0);
- Type vertexValueType = parameterTypes.get(1);
- Type edgeValueType = parameterTypes.get(2);
- Type messageValueType = parameterTypes.get(3);
- conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
- conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
- conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
- conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void initJobConfiguration() {
+ Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS,
+ Vertex.class);
+ List<Type> parameterTypes = ReflectionUtils.getTypeArguments(
+ Vertex.class, vertexClass);
+ Type vertexIndexType = parameterTypes.get(0);
+ Type vertexValueType = parameterTypes.get(1);
+ Type edgeValueType = parameterTypes.get(2);
+ Type messageValueType = parameterTypes.get(3);
+ conf.setClass(PregelixJob.VERTEX_INDEX_CLASS,
+ (Class<?>) vertexIndexType, WritableComparable.class);
+ conf.setClass(PregelixJob.VERTEX_VALUE_CLASS,
+ (Class<?>) vertexValueType, Writable.class);
+ conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType,
+ Writable.class);
+ conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS,
+ (Class<?>) messageValueType, Writable.class);
- Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
- if (!aggregatorClass.equals(GlobalAggregator.class)) {
- List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
- Type partialAggregateValueType = argTypes.get(4);
- conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, (Class<?>) partialAggregateValueType,
- Writable.class);
- Type finalAggregateValueType = argTypes.get(5);
- conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
- }
+ Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
+ if (!aggregatorClass.equals(GlobalAggregator.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(
+ GlobalAggregator.class, aggregatorClass);
+ Type partialAggregateValueType = argTypes.get(4);
+ conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS,
+ (Class<?>) partialAggregateValueType, Writable.class);
+ Type finalAggregateValueType = argTypes.get(5);
+ conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS,
+ (Class<?>) finalAggregateValueType, Writable.class);
+ }
- Class combinerClass = BspUtils.getMessageCombinerClass(conf);
- if (!combinerClass.equals(MessageCombiner.class)) {
- List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
- Type partialCombineValueType = argTypes.get(2);
- conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, (Class<?>) partialCombineValueType, Writable.class);
- }
- }
+ Class combinerClass = BspUtils.getMessageCombinerClass(conf);
+ if (!combinerClass.equals(MessageCombiner.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(
+ MessageCombiner.class, combinerClass);
+ Type partialCombineValueType = argTypes.get(2);
+ conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS,
+ (Class<?>) partialCombineValueType, Writable.class);
+ }
+ }
- public String getJobId() {
- return jobId;
- }
+ public String getJobId() {
+ return jobId;
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateCreatingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- JobSpecification spec = new JobSpecification();
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateCreatingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+ .getVertexIndexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+ WritableComparator.get(vertexIdClass).getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeCreate);
- return spec;
- }
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(
+ spec, storageManagerInterface, treeRegistryProvider,
+ fileSplitProvider, typeTraits, comparatorFactories,
+ new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeCreate);
+ return spec;
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateLoadingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateLoadingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+ .getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
- LOGGER.info("number of splits: " + splits.size());
- for (InputSplit split : splits)
- LOGGER.info(split.toString());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
- confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner, splits);
+ /**
+ * the graph file scan operator and use count constraint first, will use
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob,
+ fileSplitProvider.getFileSplits().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils
+ .getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
+ spec, recordDescriptor, splits, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner, splits);
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameSize, sortFields,
- nkmFactory, comparatorFactories, recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, sorter);
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+ WritableComparator.get(vertexIdClass).getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+ spec, maxFrameNumber, sortFields, nkmFactory,
+ comparatorFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sorter);
- /**
- * construct tree bulk load operator
- */
- int[] fieldPermutation = new int[2];
- fieldPermutation[0] = 0;
- fieldPermutation[1] = 1;
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ /**
+ * construct tree bulk load operator
+ */
+ int[] fieldPermutation = new int[2];
+ fieldPermutation[0] = 0;
+ fieldPermutation[1] = 1;
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
+ spec, storageManagerInterface, treeRegistryProvider,
+ fileSplitProvider, typeTraits, comparatorFactories,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
+ new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
- return spec;
- }
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec,
+ hashPartitionComputerFactory), scanner, 0, sorter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
+ btreeBulkLoad, 0);
+ return spec;
+ }
- @Override
- public JobSpecification generateJob(int iteration) throws HyracksException {
- if (iteration <= 0)
- throw new IllegalStateException("iteration number cannot be less than 1");
- if (iteration == 1)
- return generateFirstIteration(iteration);
- else
- return generateNonFirstIteration(iteration);
- }
+ @Override
+ public JobSpecification generateJob(int iteration) throws HyracksException {
+ if (iteration <= 0)
+ throw new IllegalStateException(
+ "iteration number cannot be less than 1");
+ if (iteration == 1)
+ return generateFirstIteration(iteration);
+ else
+ return generateNonFirstIteration(iteration);
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanSortPrintGraph(String nodeName, String path) throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanSortPrintGraph(String nodeName, String path)
+ throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+ .getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
- confFactory);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner, splits.size());
+ /**
+ * the graph file scan operator and use count constraint first, will use
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob,
+ fileSplitProvider.getFileSplits().length);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils
+ .getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
+ spec, recordDescriptor, splits, confFactory);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner,
+ splits.size());
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
- nkmFactory, comparatorFactories, recordDescriptor);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter, splits.size());
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+ WritableComparator.get(vertexIdClass).getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
+ spec, maxFrameLimit, sortFields, nkmFactory,
+ comparatorFactories, recordDescriptor);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter,
+ splits.size());
- /**
- * construct write file operator
- */
- FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
- FileSplit[] results = new FileSplit[1];
- results[0] = resultFile;
- IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
- IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
- resultFileSplitProvider, preHookFactory, null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
- PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(
+ new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
+ results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(
+ spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
+ null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
+ new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
- comparatorFactories), sorter, 0, writer, 0);
- return spec;
- }
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter,
+ 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
+ hashPartitionComputerFactory, sortFields, comparatorFactories),
+ sorter, 0, writer, 0);
+ return spec;
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexPrintGraph(String nodeName, String path)
+ throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+ .getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
- /**
- * construct empty tuple operator
- */
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
+ spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
+ tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
- /**
- * construct btree search operator
- */
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils
+ .getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+ WritableComparator.get(vertexIdClass).getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
+ spec, recordDescriptor, storageManagerInterface,
+ treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, null, true, true,
+ new BTreeDataflowHelperFactory(), false,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * construct write file operator
- */
- FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
- FileSplit[] results = new FileSplit[1];
- results[0] = resultFile;
- IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
- IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
- resultFileSplitProvider, preHookFactory, null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
- PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(
+ new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
+ results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(
+ spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
+ null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
+ new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
- /**
- * connect operator descriptors
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
- comparatorFactories), scanner, 0, writer, 0);
- spec.setFrameSize(frameSize);
- return spec;
- }
+ /**
+ * connect operator descriptors
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
+ 0, scanner, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
+ hashPartitionComputerFactory, sortFields, comparatorFactories),
+ scanner, 0, writer, 0);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanIndexWriteGraph() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexWriteGraph() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
+ .getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
- /**
- * construct empty tuple operator
- */
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
+ spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
+ tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
- /**
- * construct btree search operator
- */
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
- .getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils
+ .getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
+ WritableComparator.get(vertexIdClass).getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, PRIMARY_INDEX);
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
+ spec, recordDescriptor, storageManagerInterface,
+ treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, null, true, true,
+ new BTreeDataflowHelperFactory(), false,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * construct write file operator
- */
- IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
- ClusterConfig.setLocationConstraint(spec, writer);
+ /**
+ * construct write file operator
+ */
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils
+ .getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(
+ spec, confFactory, inputRdFactory);
+ ClusterConfig.setLocationConstraint(spec, writer);
- /**
- * connect operator descriptors
- */
- spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
- return spec;
- }
+ /**
+ * connect operator descriptors
+ */
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
+ 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer,
+ 0);
+ return spec;
+ }
- /***
- * drop the sindex
- *
- * @return JobSpecification
- * @throws HyracksException
- */
- protected JobSpecification dropIndex(String indexName) throws HyracksException {
- JobSpecification spec = new JobSpecification();
+ /***
+ * drop the index
+ *
+ * @return JobSpecification
+ * @throws HyracksException
+ */
+ protected JobSpecification dropIndex(String indexName)
+ throws HyracksException {
+ JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
- TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider);
+ IFileSplitProvider fileSplitProvider = ClusterConfig
+ .getFileSplitProvider(jobId, indexName);
+ TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(
+ spec, storageManagerInterface, treeRegistryProvider,
+ fileSplitProvider);
- ClusterConfig.setLocationConstraint(spec, drop);
- spec.addRoot(drop);
- return spec;
- }
+ ClusterConfig.setLocationConstraint(spec, drop);
+ spec.addRoot(drop);
+ return spec;
+ }
- /** generate non-first iteration job */
- protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
+ /** generate non-first iteration job */
+ protected abstract JobSpecification generateNonFirstIteration(int iteration)
+ throws HyracksException;
- /** generate first iteration job */
- protected abstract JobSpecification generateFirstIteration(int iteration) throws HyracksException;
+ /** generate first iteration job */
+ protected abstract JobSpecification generateFirstIteration(int iteration)
+ throws HyracksException;
- /** generate clean-up job */
- public abstract JobSpecification[] generateCleanup() throws HyracksException;
+ /** generate clean-up job */
+ public abstract JobSpecification[] generateCleanup()
+ throws HyracksException;
}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 00cdf07..04a6e42 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -173,7 +173,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -342,7 +342,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 3847aa7..35f5e61 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -139,7 +139,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -299,7 +299,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ec783a7..328d7d1 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -140,7 +140,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);
@@ -290,7 +290,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index bb939e3..812e697 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -136,7 +136,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -152,7 +152,7 @@
/**
* construct global sort operator
*/
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);
@@ -303,7 +303,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -319,7 +319,7 @@
/**
* construct global sort operator
*/
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);