1. let hyracks-hdfs support hadoop new API\n 2. let Pregelix use hyracks-hdfs functionality;\n 3. increase degree-of-parallelism in tests
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2816 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index efdbd41..0b1be61 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -74,9 +74,9 @@
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.core.util.DatatypeHelper;
-import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
@@ -86,462 +86,390 @@
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
public abstract class JobGen implements IJobGen {
- private static final Logger LOGGER = Logger.getLogger(JobGen.class
- .getName());
- protected static final int MB = 1048576;
- protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
- protected static final int tableSize = 10485767;
- protected static final String PRIMARY_INDEX = "primary";
- protected final Configuration conf;
- protected final PregelixJob giraphJob;
- protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
- protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
- protected String jobId = new UUID(System.currentTimeMillis(),
- System.nanoTime()).toString();
- protected int frameSize = ClusterConfig.getFrameSize();
- protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
+ protected static final int MB = 1048576;
+ protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+ protected static final int tableSize = 10485767;
+ protected static final String PRIMARY_INDEX = "primary";
+ protected final Configuration conf;
+ protected final PregelixJob giraphJob;
+ protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+ protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+ protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+ protected int frameSize = ClusterConfig.getFrameSize();
+ protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
- protected static final String SECONDARY_INDEX_ODD = "secondary1";
- protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+ protected static final String SECONDARY_INDEX_ODD = "secondary1";
+ protected static final String SECONDARY_INDEX_EVEN = "secondary2";
- public JobGen(PregelixJob job) {
- this.conf = job.getConfiguration();
- this.giraphJob = job;
- this.initJobConfiguration();
- job.setJobId(jobId);
-
- //set the frame size to be the one user specified if the user did specify.
- int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
- if (specifiedFrameSize > 0) {
- frameSize = specifiedFrameSize;
- maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
- }
- if (maxFrameNumber <= 0) {
- maxFrameNumber = 1;
- }
- }
+ public JobGen(PregelixJob job) {
+ this.conf = job.getConfiguration();
+ this.giraphJob = job;
+ this.initJobConfiguration();
+ job.setJobId(jobId);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private void initJobConfiguration() {
- Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS,
- Vertex.class);
- List<Type> parameterTypes = ReflectionUtils.getTypeArguments(
- Vertex.class, vertexClass);
- Type vertexIndexType = parameterTypes.get(0);
- Type vertexValueType = parameterTypes.get(1);
- Type edgeValueType = parameterTypes.get(2);
- Type messageValueType = parameterTypes.get(3);
- conf.setClass(PregelixJob.VERTEX_INDEX_CLASS,
- (Class<?>) vertexIndexType, WritableComparable.class);
- conf.setClass(PregelixJob.VERTEX_VALUE_CLASS,
- (Class<?>) vertexValueType, Writable.class);
- conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType,
- Writable.class);
- conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS,
- (Class<?>) messageValueType, Writable.class);
+ // set the frame size to be the one user specified if the user did
+ // specify.
+ int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
+ if (specifiedFrameSize > 0) {
+ frameSize = specifiedFrameSize;
+ maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ }
+ if (maxFrameNumber <= 0) {
+ maxFrameNumber = 1;
+ }
+ }
- Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
- if (!aggregatorClass.equals(GlobalAggregator.class)) {
- List<Type> argTypes = ReflectionUtils.getTypeArguments(
- GlobalAggregator.class, aggregatorClass);
- Type partialAggregateValueType = argTypes.get(4);
- conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS,
- (Class<?>) partialAggregateValueType, Writable.class);
- Type finalAggregateValueType = argTypes.get(5);
- conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS,
- (Class<?>) finalAggregateValueType, Writable.class);
- }
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void initJobConfiguration() {
+ Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
+ List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
+ Type vertexIndexType = parameterTypes.get(0);
+ Type vertexValueType = parameterTypes.get(1);
+ Type edgeValueType = parameterTypes.get(2);
+ Type messageValueType = parameterTypes.get(3);
+ conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
+ conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
+ conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
+ conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
- Class combinerClass = BspUtils.getMessageCombinerClass(conf);
- if (!combinerClass.equals(MessageCombiner.class)) {
- List<Type> argTypes = ReflectionUtils.getTypeArguments(
- MessageCombiner.class, combinerClass);
- Type partialCombineValueType = argTypes.get(2);
- conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS,
- (Class<?>) partialCombineValueType, Writable.class);
- }
- }
+ Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
+ if (!aggregatorClass.equals(GlobalAggregator.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
+ Type partialAggregateValueType = argTypes.get(4);
+ conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, (Class<?>) partialAggregateValueType,
+ Writable.class);
+ Type finalAggregateValueType = argTypes.get(5);
+ conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
+ }
- public String getJobId() {
- return jobId;
- }
+ Class combinerClass = BspUtils.getMessageCombinerClass(conf);
+ if (!combinerClass.equals(MessageCombiner.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
+ Type partialCombineValueType = argTypes.get(2);
+ conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, (Class<?>) partialCombineValueType, Writable.class);
+ }
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateCreatingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- JobSpecification spec = new JobSpecification();
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
+ public String getJobId() {
+ return jobId;
+ }
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
- TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider,
- fileSplitProvider, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeCreate);
- return spec;
- }
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateCreatingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateLoadingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeCreate);
+ return spec;
+ }
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(giraphJob,
- fileSplitProvider.getFileSplits().length);
- LOGGER.info("number of splits: " + splits.size());
- for (InputSplit split : splits)
- LOGGER.info(split.toString());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
- spec, recordDescriptor, splits, confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner, splits);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateLoadingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
- spec, maxFrameNumber, sortFields, nkmFactory,
- comparatorFactories, recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, sorter);
+ /**
+ * the graph file scan operator and use count constraint first, will use
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+ readSchedule, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * construct tree bulk load operator
- */
- int[] fieldPermutation = new int[2];
- fieldPermutation[0] = 0;
- fieldPermutation[1] = 1;
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider,
- fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
- new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
+ nkmFactory, comparatorFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sorter);
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new MToNPartitioningConnectorDescriptor(spec,
- hashPartitionComputerFactory), scanner, 0, sorter, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
- btreeBulkLoad, 0);
- return spec;
- }
+ /**
+ * construct tree bulk load operator
+ */
+ int[] fieldPermutation = new int[2];
+ fieldPermutation[0] = 0;
+ fieldPermutation[1] = 1;
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
- @Override
- public JobSpecification generateJob(int iteration) throws HyracksException {
- if (iteration <= 0)
- throw new IllegalStateException(
- "iteration number cannot be less than 1");
- if (iteration == 1)
- return generateFirstIteration(iteration);
- else
- return generateNonFirstIteration(iteration);
- }
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+ return spec;
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanSortPrintGraph(String nodeName, String path)
- throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ @Override
+ public JobSpecification generateJob(int iteration) throws HyracksException {
+ if (iteration <= 0)
+ throw new IllegalStateException("iteration number cannot be less than 1");
+ if (iteration == 1)
+ return generateFirstIteration(iteration);
+ else
+ return generateNonFirstIteration(iteration);
+ }
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(giraphJob,
- fileSplitProvider.getFileSplits().length);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
- spec, recordDescriptor, splits, confFactory);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner,
- splits.size());
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanSortPrintGraph(String nodeName, String path) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
- spec, maxFrameLimit, sortFields, nkmFactory,
- comparatorFactories, recordDescriptor);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter,
- splits.size());
+ /**
+ * the graph file scan operator and use count constraint first, will use
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+ readSchedule, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * construct write file operator
- */
- FileSplit resultFile = new FileSplit(nodeName, new FileReference(
- new File(path)));
- FileSplit[] results = new FileSplit[1];
- results[0] = resultFile;
- IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
- results);
- IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
- IRecordDescriptorFactory inputRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
- spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
- null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
- new String[] { "nc1" });
- PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
+ nkmFactory, comparatorFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sorter);
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter,
- 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
- hashPartitionComputerFactory, sortFields, comparatorFactories),
- sorter, 0, writer, 0);
- return spec;
- }
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
+ resultFileSplitProvider, preHookFactory, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanIndexPrintGraph(String nodeName, String path)
- throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+ comparatorFactories), sorter, 0, writer, 0);
+ return spec;
+ }
- /**
- * construct empty tuple operator
- */
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
- spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
- tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
- /**
- * construct btree search operator
- */
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
- spec, recordDescriptor, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, null, true, true,
- new BTreeDataflowHelperFactory(), false,
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
- /**
- * construct write file operator
- */
- FileSplit resultFile = new FileSplit(nodeName, new FileReference(
- new File(path)));
- FileSplit[] results = new FileSplit[1];
- results[0] = resultFile;
- IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
- results);
- IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
- IRecordDescriptorFactory inputRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
- spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
- null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
- new String[] { "nc1" });
- PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * connect operator descriptors
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
- 0, scanner, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
- hashPartitionComputerFactory, sortFields, comparatorFactories),
- scanner, 0, writer, 0);
- spec.setFrameSize(frameSize);
- return spec;
- }
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
+ resultFileSplitProvider, preHookFactory, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanIndexWriteGraph() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ /**
+ * connect operator descriptors
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+ comparatorFactories), scanner, 0, writer, 0);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
- /**
- * construct empty tuple operator
- */
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
- spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
- tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexWriteGraph() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
- /**
- * construct btree search operator
- */
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
- spec, recordDescriptor, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, null, true, true,
- new BTreeDataflowHelperFactory(), false,
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * construct write file operator
- */
- IRecordDescriptorFactory inputRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(
- spec, confFactory, inputRdFactory);
- ClusterConfig.setLocationConstraint(spec, writer);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * connect operator descriptors
- */
- spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
- 0, scanner, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer,
- 0);
- return spec;
- }
+ /**
+ * construct write file operator
+ */
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
+ ClusterConfig.setLocationConstraint(spec, writer);
- /***
- * drop the sindex
- *
- * @return JobSpecification
- * @throws HyracksException
- */
- protected JobSpecification dropIndex(String indexName)
- throws HyracksException {
- JobSpecification spec = new JobSpecification();
+ /**
+ * connect operator descriptors
+ */
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+ return spec;
+ }
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, indexName);
- TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider,
- fileSplitProvider);
+ /***
+ * drop the sindex
+ *
+ * @return JobSpecification
+ * @throws HyracksException
+ */
+ protected JobSpecification dropIndex(String indexName) throws HyracksException {
+ JobSpecification spec = new JobSpecification();
- ClusterConfig.setLocationConstraint(spec, drop);
- spec.addRoot(drop);
- return spec;
- }
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+ TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
+ treeRegistryProvider, fileSplitProvider);
- /** generate non-first iteration job */
- protected abstract JobSpecification generateNonFirstIteration(int iteration)
- throws HyracksException;
+ ClusterConfig.setLocationConstraint(spec, drop);
+ spec.addRoot(drop);
+ return spec;
+ }
- /** generate first iteration job */
- protected abstract JobSpecification generateFirstIteration(int iteration)
- throws HyracksException;
+ /** generate non-first iteration job */
+ protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
- /** generate clean-up job */
- public abstract JobSpecification[] generateCleanup()
- throws HyracksException;
+ /** generate first iteration job */
+ protected abstract JobSpecification generateFirstIteration(int iteration) throws HyracksException;
+
+ /** generate clean-up job */
+ public abstract JobSpecification[] generateCleanup() throws HyracksException;
}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 8eadab9..d26e637 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -40,6 +40,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler;
public class ClusterConfig {
@@ -49,6 +50,7 @@
private static Properties clusterProperties = new Properties();
private static Map<String, List<String>> ipToNcMapping;
private static String[] stores;
+ private static Scheduler hdfsScheduler;
/**
* let tests set config path to be whatever
@@ -211,6 +213,8 @@
NCs[i] = entry.getKey();
i++;
}
+
+ hdfsScheduler = new Scheduler(ipAddress, port);
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -218,4 +222,8 @@
loadClusterProperties();
loadStores();
}
+
+ public static Scheduler getHdfsScheduler() {
+ return hdfsScheduler;
+ }
}
diff --git a/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..f89dd79 100644
--- a/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
<value>20</value>
</property>
<property>
- <name>mapred.min.split.size</name>
- <value>65536</value>
+ <name>mapred.max.split.size</name>
+ <value>4096</value>
</property>
</configuration>
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 36b0eb8..6c039bf 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -144,5 +144,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index f1b98f5..0736e1d 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -17,6 +17,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;
@@ -55,15 +56,20 @@
private final List<InputSplit> splits;
private final IConfigurationFactory confFactory;
private final int fieldSize = 2;
+ private final String[] scheduledLocations;
+ private final boolean[] executed;
/**
* @param spec
*/
public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
- IConfigurationFactory confFactory) throws HyracksException {
+ String[] scheduledLocations, IConfigurationFactory confFactory) throws HyracksException {
super(spec, 0, 1);
this.splits = splits;
this.confFactory = confFactory;
+ this.scheduledLocations = scheduledLocations;
+ this.executed = new boolean[scheduledLocations.length];
+ Arrays.fill(executed, false);
this.recordDescriptors[0] = rd;
}
@@ -78,7 +84,21 @@
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
writer.open();
- loadVertices(ctx, partition);
+ for (int i = 0; i < scheduledLocations.length; i++) {
+ if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
+ /**
+ * pick one from the FileSplit queue
+ */
+ synchronized (executed) {
+ if (!executed[i]) {
+ executed[i] = true;
+ } else {
+ continue;
+ }
+ }
+ loadVertices(ctx, i);
+ }
+ }
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
@@ -104,13 +124,13 @@
VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
- InputSplit split = splits.get(partition);
+ InputSplit split = splits.get(partitionId);
if (split instanceof FileSplit) {
FileSplit fileSplit = (FileSplit) split;
LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
+ " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
- + partition);
+ + partitionId);
}
VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
vertexReader.initialize(split, context);
@@ -122,7 +142,7 @@
* set context
*/
Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- splits.get(partition));
+ splits.get(partitionId));
Vertex.setContext(mapperContext);
/**
@@ -166,5 +186,4 @@
}
};
}
-
}
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..71450f1 100644
--- a/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
<value>20</value>
</property>
<property>
- <name>mapred.min.split.size</name>
- <value>65536</value>
+ <name>mapred.max.split.size</name>
+ <value>128</value>
</property>
</configuration>