1. let hyracks-hdfs support hadoop new API\n 2. let Pregelix use hyracks-hdfs functionality;\n 3. increase degree-of-parallelism in tests
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2816 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 69936a4..bc44db8 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -108,7 +108,6 @@
synchronized (executed) {
if (executed[i] == false) {
executed[i] = true;
- System.out.println("thread " + Thread.currentThread().getId() + " setting " + i);
} else {
continue;
}
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
new file mode 100644
index 0000000..d843d27
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hyracks.hdfs2.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapreduce.Job;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ConfFactory implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private byte[] confBytes;
+
+ public ConfFactory(Job conf) throws HyracksDataException {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ conf.getConfiguration().write(dos);
+ confBytes = bos.toByteArray();
+ dos.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public Job getConf() throws HyracksDataException {
+ try {
+ Job conf = new Job();
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(confBytes));
+ conf.getConfiguration().readFields(dis);
+ dis.close();
+ return conf;
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
new file mode 100644
index 0000000..54d60c0
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.dataflow;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+@SuppressWarnings("rawtypes")
+public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final ConfFactory confFactory;
+ private final InputSplit[] inputSplits;
+ private final String[] scheduledLocations;
+ private final IKeyValueParserFactory tupleParserFactory;
+ private final boolean[] executed;
+
+ /**
+ * The constructor of HDFSReadOperatorDescriptor.
+ *
+ * @param spec
+ * the JobSpecification object
+ * @param rd
+ * the output record descriptor
+ * @param conf
+ * the Hadoop JobConf object, which contains the input format and the input paths
+ * @param splits
+ * the array of FileSplits (HDFS chunks).
+ * @param scheduledLocations
+ * the node controller names to scan the FileSplits, which is an one-to-one mapping. The String array
+ * is obtained from the edu.cui.ics.hyracks.hdfs.scheduler.Scheduler.getLocationConstraints(InputSplits[]).
+ * @param tupleParserFactory
+ * the ITupleParserFactory implementation instance.
+ * @throws HyracksException
+ */
+ public HDFSReadOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, Job conf, InputSplit[] splits,
+ String[] scheduledLocations, IKeyValueParserFactory tupleParserFactory) throws HyracksException {
+ super(spec, 0, 1);
+ try {
+ this.inputSplits = splits;
+ this.confFactory = new ConfFactory(conf);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ this.scheduledLocations = scheduledLocations;
+ this.executed = new boolean[scheduledLocations.length];
+ Arrays.fill(executed, false);
+ this.tupleParserFactory = tupleParserFactory;
+ this.recordDescriptors[0] = rd;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+ final Job conf = confFactory.getConf();
+
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initialize() throws HyracksDataException {
+ try {
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
+ writer.open();
+ InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(),
+ conf.getConfiguration());
+ for (int i = 0; i < inputSplits.length; i++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (scheduledLocations[i].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (executed[i] == false) {
+ executed[i] = true;
+ System.out.println("thread " + Thread.currentThread().getId() + " setting " + i);
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ TaskAttemptContext context = new TaskAttemptContext(conf.getConfiguration(),
+ new TaskAttemptID());
+ RecordReader reader = inputFormat.createRecordReader(inputSplits[i], context);
+ reader.initialize(inputSplits[i], context);
+ while (reader.nextKeyValue() == true) {
+ parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer);
+ }
+ }
+ }
+ parser.flush(writer);
+ writer.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+}
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
new file mode 100644
index 0000000..441815b
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.dataflow;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private ConfFactory confFactory;
+ private ITupleWriterFactory tupleWriterFactory;
+ private String outputPath;
+
+ /**
+ * The constructor of HDFSWriteOperatorDescriptor.
+ *
+ * @param spec
+ * the JobSpecification object
+ * @param conf
+ * the Hadoop JobConf which contains the output path
+ * @param tupleWriterFactory
+ * the ITupleWriterFactory implementation object
+ * @throws HyracksException
+ */
+ public HDFSWriteOperatorDescriptor(JobSpecification spec, Job conf, String outputPath,
+ ITupleWriterFactory tupleWriterFactory) throws HyracksException {
+ super(spec, 1, 0);
+ this.confFactory = new ConfFactory(conf);
+ this.tupleWriterFactory = tupleWriterFactory;
+ this.outputPath = outputPath;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+ final Job conf = confFactory.getConf();
+
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+ private String fileName = outputPath + File.separator + "part-" + partition;
+ private FSDataOutputStream dos;
+ private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
+ private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
+ private FrameTupleReference tuple = new FrameTupleReference();
+ private ITupleWriter tupleWriter;
+
+ @Override
+ public void open() throws HyracksDataException {
+ tupleWriter = tupleWriterFactory.getTupleWriter();
+ try {
+ FileSystem dfs = FileSystem.get(conf.getConfiguration());
+ dos = dfs.create(new Path(fileName), true);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ tuple.reset(accessor, i);
+ tupleWriter.write(dos, tuple);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ dos.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ };
+ }
+}
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
new file mode 100644
index 0000000..5b70a4e
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.scheduler;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+/**
+ * The scheduler conduct data-local scheduling for data on HDFS
+ */
+public class Scheduler {
+
+ /** a list of NCs */
+ private String[] NCs;
+
+ /** a map from ip to NCs */
+ private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
+
+ /** a map from the NC name to the index */
+ private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+
+ /**
+ * The constructor of the scheduler
+ *
+ * @param ncNameToNcInfos
+ * @throws HyracksException
+ */
+ public Scheduler(String ipAddress, int port) throws HyracksException {
+ try {
+ IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ }
+
+ /**
+ * Set location constraints for a file scan operator with a list of file splits
+ *
+ * @throws HyracksDataException
+ */
+ public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
+ int[] capacity = new int[NCs.length];
+ Arrays.fill(capacity, 0);
+ String[] locations = new String[splits.size()];
+ int slots = splits.size() % capacity.length == 0 ? (splits.size() / capacity.length) : (splits.size()
+ / capacity.length + 1);
+
+ try {
+ Random random = new Random(System.currentTimeMillis());
+ boolean scheduled[] = new boolean[splits.size()];
+ Arrays.fill(scheduled, false);
+
+ for (int i = 0; i < splits.size(); i++) {
+ /**
+ * get the location of all the splits
+ */
+ String[] loc = splits.get(i).getLocations();
+ if (loc.length > 0) {
+ for (int j = 0; j < loc.length; j++) {
+ /**
+ * get all the IP addresses from the name
+ */
+ InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
+ /**
+ * iterate overa all ips
+ */
+ for (InetAddress ip : allIps) {
+ /**
+ * if the node controller exists
+ */
+ if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+ /**
+ * set the ncs
+ */
+ List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
+ int arrayPos = random.nextInt(dataLocations.size());
+ String nc = dataLocations.get(arrayPos);
+ int pos = ncNameToIndex.get(nc);
+ /**
+ * check if the node is already full
+ */
+ if (capacity[pos] < slots) {
+ locations[i] = nc;
+ capacity[pos]++;
+ scheduled[i] = true;
+ }
+ }
+ }
+
+ /**
+ * break the loop for data-locations if the schedule has already been found
+ */
+ if (scheduled[i] == true) {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * find the lowest index the current available NCs
+ */
+ int currentAvailableNC = 0;
+ for (int i = 0; i < capacity.length; i++) {
+ if (capacity[i] < slots) {
+ currentAvailableNC = i;
+ break;
+ }
+ }
+
+ /**
+ * schedule no-local file reads
+ */
+ for (int i = 0; i < splits.size(); i++) {
+ // if there is no data-local NC choice, choose a random one
+ if (!scheduled[i]) {
+ locations[i] = NCs[currentAvailableNC];
+ capacity[currentAvailableNC]++;
+ scheduled[i] = true;
+
+ /**
+ * move the available NC cursor to the next one
+ */
+ for (int j = currentAvailableNC; j < capacity.length; j++) {
+ if (capacity[j] < slots) {
+ currentAvailableNC = j;
+ break;
+ }
+ }
+ }
+ }
+ return locations;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ /**
+ * Load the IP-address-to-NC map from the NCNameToNCInfoMap
+ *
+ * @param ncNameToNcInfos
+ * @throws HyracksException
+ */
+ private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+ try {
+ NCs = new String[ncNameToNcInfos.size()];
+ int i = 0;
+
+ /**
+ * build the IP address to NC map
+ */
+ for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
+ String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
+ .getHostAddress();
+ List<String> matchedNCs = ipToNcMapping.get(ipAddr);
+ if (matchedNCs == null) {
+ matchedNCs = new ArrayList<String>();
+ ipToNcMapping.put(ipAddr, matchedNCs);
+ }
+ matchedNCs.add(entry.getKey());
+ NCs[i] = entry.getKey();
+ i++;
+ }
+
+ /**
+ * set up the NC name to index mapping
+ */
+ for (i = 0; i < NCs.length; i++) {
+ ncNameToIndex.put(NCs[i], i);
+ }
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+}
diff --git a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml b/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml
index a8680ec..39b6505 100644
--- a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,7 +18,7 @@
<value>20</value>
</property>
<property>
- <name>mapred.min.split.size</name>
+ <name>mapred.max.split.size</name>
<value>2048</value>
</property>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index efdbd41..0b1be61 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -74,9 +74,9 @@
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.core.util.DatatypeHelper;
-import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
@@ -86,462 +86,390 @@
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
public abstract class JobGen implements IJobGen {
- private static final Logger LOGGER = Logger.getLogger(JobGen.class
- .getName());
- protected static final int MB = 1048576;
- protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
- protected static final int tableSize = 10485767;
- protected static final String PRIMARY_INDEX = "primary";
- protected final Configuration conf;
- protected final PregelixJob giraphJob;
- protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
- protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
- protected String jobId = new UUID(System.currentTimeMillis(),
- System.nanoTime()).toString();
- protected int frameSize = ClusterConfig.getFrameSize();
- protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
+ protected static final int MB = 1048576;
+ protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+ protected static final int tableSize = 10485767;
+ protected static final String PRIMARY_INDEX = "primary";
+ protected final Configuration conf;
+ protected final PregelixJob giraphJob;
+ protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+ protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+ protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+ protected int frameSize = ClusterConfig.getFrameSize();
+ protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
- protected static final String SECONDARY_INDEX_ODD = "secondary1";
- protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+ protected static final String SECONDARY_INDEX_ODD = "secondary1";
+ protected static final String SECONDARY_INDEX_EVEN = "secondary2";
- public JobGen(PregelixJob job) {
- this.conf = job.getConfiguration();
- this.giraphJob = job;
- this.initJobConfiguration();
- job.setJobId(jobId);
-
- //set the frame size to be the one user specified if the user did specify.
- int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
- if (specifiedFrameSize > 0) {
- frameSize = specifiedFrameSize;
- maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
- }
- if (maxFrameNumber <= 0) {
- maxFrameNumber = 1;
- }
- }
+ public JobGen(PregelixJob job) {
+ this.conf = job.getConfiguration();
+ this.giraphJob = job;
+ this.initJobConfiguration();
+ job.setJobId(jobId);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private void initJobConfiguration() {
- Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS,
- Vertex.class);
- List<Type> parameterTypes = ReflectionUtils.getTypeArguments(
- Vertex.class, vertexClass);
- Type vertexIndexType = parameterTypes.get(0);
- Type vertexValueType = parameterTypes.get(1);
- Type edgeValueType = parameterTypes.get(2);
- Type messageValueType = parameterTypes.get(3);
- conf.setClass(PregelixJob.VERTEX_INDEX_CLASS,
- (Class<?>) vertexIndexType, WritableComparable.class);
- conf.setClass(PregelixJob.VERTEX_VALUE_CLASS,
- (Class<?>) vertexValueType, Writable.class);
- conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType,
- Writable.class);
- conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS,
- (Class<?>) messageValueType, Writable.class);
+ // set the frame size to be the one user specified if the user did
+ // specify.
+ int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
+ if (specifiedFrameSize > 0) {
+ frameSize = specifiedFrameSize;
+ maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ }
+ if (maxFrameNumber <= 0) {
+ maxFrameNumber = 1;
+ }
+ }
- Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
- if (!aggregatorClass.equals(GlobalAggregator.class)) {
- List<Type> argTypes = ReflectionUtils.getTypeArguments(
- GlobalAggregator.class, aggregatorClass);
- Type partialAggregateValueType = argTypes.get(4);
- conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS,
- (Class<?>) partialAggregateValueType, Writable.class);
- Type finalAggregateValueType = argTypes.get(5);
- conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS,
- (Class<?>) finalAggregateValueType, Writable.class);
- }
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void initJobConfiguration() {
+ Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
+ List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
+ Type vertexIndexType = parameterTypes.get(0);
+ Type vertexValueType = parameterTypes.get(1);
+ Type edgeValueType = parameterTypes.get(2);
+ Type messageValueType = parameterTypes.get(3);
+ conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
+ conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
+ conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
+ conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
- Class combinerClass = BspUtils.getMessageCombinerClass(conf);
- if (!combinerClass.equals(MessageCombiner.class)) {
- List<Type> argTypes = ReflectionUtils.getTypeArguments(
- MessageCombiner.class, combinerClass);
- Type partialCombineValueType = argTypes.get(2);
- conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS,
- (Class<?>) partialCombineValueType, Writable.class);
- }
- }
+ Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
+ if (!aggregatorClass.equals(GlobalAggregator.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
+ Type partialAggregateValueType = argTypes.get(4);
+ conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, (Class<?>) partialAggregateValueType,
+ Writable.class);
+ Type finalAggregateValueType = argTypes.get(5);
+ conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
+ }
- public String getJobId() {
- return jobId;
- }
+ Class combinerClass = BspUtils.getMessageCombinerClass(conf);
+ if (!combinerClass.equals(MessageCombiner.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
+ Type partialCombineValueType = argTypes.get(2);
+ conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, (Class<?>) partialCombineValueType, Writable.class);
+ }
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateCreatingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- JobSpecification spec = new JobSpecification();
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
+ public String getJobId() {
+ return jobId;
+ }
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
- TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider,
- fileSplitProvider, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeCreate);
- return spec;
- }
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateCreatingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateLoadingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeCreate);
+ return spec;
+ }
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(giraphJob,
- fileSplitProvider.getFileSplits().length);
- LOGGER.info("number of splits: " + splits.size());
- for (InputSplit split : splits)
- LOGGER.info(split.toString());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
- spec, recordDescriptor, splits, confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner, splits);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateLoadingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
- spec, maxFrameNumber, sortFields, nkmFactory,
- comparatorFactories, recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, sorter);
+ /**
+ * the graph file scan operator and use count constraint first, will use
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+ readSchedule, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * construct tree bulk load operator
- */
- int[] fieldPermutation = new int[2];
- fieldPermutation[0] = 0;
- fieldPermutation[1] = 1;
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider,
- fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
- new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
+ nkmFactory, comparatorFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sorter);
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new MToNPartitioningConnectorDescriptor(spec,
- hashPartitionComputerFactory), scanner, 0, sorter, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
- btreeBulkLoad, 0);
- return spec;
- }
+ /**
+ * construct tree bulk load operator
+ */
+ int[] fieldPermutation = new int[2];
+ fieldPermutation[0] = 0;
+ fieldPermutation[1] = 1;
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
- @Override
- public JobSpecification generateJob(int iteration) throws HyracksException {
- if (iteration <= 0)
- throw new IllegalStateException(
- "iteration number cannot be less than 1");
- if (iteration == 1)
- return generateFirstIteration(iteration);
- else
- return generateNonFirstIteration(iteration);
- }
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+ return spec;
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanSortPrintGraph(String nodeName, String path)
- throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ @Override
+ public JobSpecification generateJob(int iteration) throws HyracksException {
+ if (iteration <= 0)
+ throw new IllegalStateException("iteration number cannot be less than 1");
+ if (iteration == 1)
+ return generateFirstIteration(iteration);
+ else
+ return generateNonFirstIteration(iteration);
+ }
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(giraphJob,
- fileSplitProvider.getFileSplits().length);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
- spec, recordDescriptor, splits, confFactory);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner,
- splits.size());
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanSortPrintGraph(String nodeName, String path) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
- spec, maxFrameLimit, sortFields, nkmFactory,
- comparatorFactories, recordDescriptor);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter,
- splits.size());
+ /**
+ * the graph file scan operator and use count constraint first, will use
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+ readSchedule, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * construct write file operator
- */
- FileSplit resultFile = new FileSplit(nodeName, new FileReference(
- new File(path)));
- FileSplit[] results = new FileSplit[1];
- results[0] = resultFile;
- IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
- results);
- IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
- IRecordDescriptorFactory inputRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
- spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
- null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
- new String[] { "nc1" });
- PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
+ nkmFactory, comparatorFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sorter);
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter,
- 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
- hashPartitionComputerFactory, sortFields, comparatorFactories),
- sorter, 0, writer, 0);
- return spec;
- }
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
+ resultFileSplitProvider, preHookFactory, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanIndexPrintGraph(String nodeName, String path)
- throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+ comparatorFactories), sorter, 0, writer, 0);
+ return spec;
+ }
- /**
- * construct empty tuple operator
- */
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
- spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
- tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
- /**
- * construct btree search operator
- */
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
- spec, recordDescriptor, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, null, true, true,
- new BTreeDataflowHelperFactory(), false,
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
- /**
- * construct write file operator
- */
- FileSplit resultFile = new FileSplit(nodeName, new FileReference(
- new File(path)));
- FileSplit[] results = new FileSplit[1];
- results[0] = resultFile;
- IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
- results);
- IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
- IRecordDescriptorFactory inputRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
- spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
- null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
- new String[] { "nc1" });
- PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * connect operator descriptors
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
- 0, scanner, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
- hashPartitionComputerFactory, sortFields, comparatorFactories),
- scanner, 0, writer, 0);
- spec.setFrameSize(frameSize);
- return spec;
- }
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
+ resultFileSplitProvider, preHookFactory, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanIndexWriteGraph() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ /**
+ * connect operator descriptors
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+ comparatorFactories), scanner, 0, writer, 0);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
- /**
- * construct empty tuple operator
- */
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
- spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
- tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexWriteGraph() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
- /**
- * construct btree search operator
- */
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
- spec, recordDescriptor, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, null, true, true,
- new BTreeDataflowHelperFactory(), false,
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * construct write file operator
- */
- IRecordDescriptorFactory inputRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(
- spec, confFactory, inputRdFactory);
- ClusterConfig.setLocationConstraint(spec, writer);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * connect operator descriptors
- */
- spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
- 0, scanner, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer,
- 0);
- return spec;
- }
+ /**
+ * construct write file operator
+ */
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
+ ClusterConfig.setLocationConstraint(spec, writer);
- /***
- * drop the sindex
- *
- * @return JobSpecification
- * @throws HyracksException
- */
- protected JobSpecification dropIndex(String indexName)
- throws HyracksException {
- JobSpecification spec = new JobSpecification();
+ /**
+ * connect operator descriptors
+ */
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+ return spec;
+ }
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, indexName);
- TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider,
- fileSplitProvider);
+ /***
+ * drop the sindex
+ *
+ * @return JobSpecification
+ * @throws HyracksException
+ */
+ protected JobSpecification dropIndex(String indexName) throws HyracksException {
+ JobSpecification spec = new JobSpecification();
- ClusterConfig.setLocationConstraint(spec, drop);
- spec.addRoot(drop);
- return spec;
- }
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+ TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
+ treeRegistryProvider, fileSplitProvider);
- /** generate non-first iteration job */
- protected abstract JobSpecification generateNonFirstIteration(int iteration)
- throws HyracksException;
+ ClusterConfig.setLocationConstraint(spec, drop);
+ spec.addRoot(drop);
+ return spec;
+ }
- /** generate first iteration job */
- protected abstract JobSpecification generateFirstIteration(int iteration)
- throws HyracksException;
+ /** generate non-first iteration job */
+ protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
- /** generate clean-up job */
- public abstract JobSpecification[] generateCleanup()
- throws HyracksException;
+ /** generate first iteration job */
+ protected abstract JobSpecification generateFirstIteration(int iteration) throws HyracksException;
+
+ /** generate clean-up job */
+ public abstract JobSpecification[] generateCleanup() throws HyracksException;
}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 8eadab9..d26e637 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -40,6 +40,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler;
public class ClusterConfig {
@@ -49,6 +50,7 @@
private static Properties clusterProperties = new Properties();
private static Map<String, List<String>> ipToNcMapping;
private static String[] stores;
+ private static Scheduler hdfsScheduler;
/**
* let tests set config path to be whatever
@@ -211,6 +213,8 @@
NCs[i] = entry.getKey();
i++;
}
+
+ hdfsScheduler = new Scheduler(ipAddress, port);
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -218,4 +222,8 @@
loadClusterProperties();
loadStores();
}
+
+ public static Scheduler getHdfsScheduler() {
+ return hdfsScheduler;
+ }
}
diff --git a/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..f89dd79 100644
--- a/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
<value>20</value>
</property>
<property>
- <name>mapred.min.split.size</name>
- <value>65536</value>
+ <name>mapred.max.split.size</name>
+ <value>4096</value>
</property>
</configuration>
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 36b0eb8..6c039bf 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -144,5 +144,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index f1b98f5..0736e1d 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -17,6 +17,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;
@@ -55,15 +56,20 @@
private final List<InputSplit> splits;
private final IConfigurationFactory confFactory;
private final int fieldSize = 2;
+ private final String[] scheduledLocations;
+ private final boolean[] executed;
/**
* @param spec
*/
public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
- IConfigurationFactory confFactory) throws HyracksException {
+ String[] scheduledLocations, IConfigurationFactory confFactory) throws HyracksException {
super(spec, 0, 1);
this.splits = splits;
this.confFactory = confFactory;
+ this.scheduledLocations = scheduledLocations;
+ this.executed = new boolean[scheduledLocations.length];
+ Arrays.fill(executed, false);
this.recordDescriptors[0] = rd;
}
@@ -78,7 +84,21 @@
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
writer.open();
- loadVertices(ctx, partition);
+ for (int i = 0; i < scheduledLocations.length; i++) {
+ if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
+ /**
+ * pick one from the FileSplit queue
+ */
+ synchronized (executed) {
+ if (!executed[i]) {
+ executed[i] = true;
+ } else {
+ continue;
+ }
+ }
+ loadVertices(ctx, i);
+ }
+ }
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
@@ -104,13 +124,13 @@
VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
- InputSplit split = splits.get(partition);
+ InputSplit split = splits.get(partitionId);
if (split instanceof FileSplit) {
FileSplit fileSplit = (FileSplit) split;
LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
+ " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
- + partition);
+ + partitionId);
}
VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
vertexReader.initialize(split, context);
@@ -122,7 +142,7 @@
* set context
*/
Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- splits.get(partition));
+ splits.get(partitionId));
Vertex.setContext(mapperContext);
/**
@@ -166,5 +186,4 @@
}
};
}
-
}
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..71450f1 100644
--- a/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
<value>20</value>
</property>
<property>
- <name>mapred.min.split.size</name>
- <value>65536</value>
+ <name>mapred.max.split.size</name>
+ <value>128</value>
</property>
</configuration>