1. let hyracks-hdfs support the new Hadoop API
2. let Pregelix use hyracks-hdfs functionality
3. increase degree-of-parallelism in tests
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2816 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 69936a4..bc44db8 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -108,7 +108,6 @@
synchronized (executed) {
if (executed[i] == false) {
executed[i] = true;
- System.out.println("thread " + Thread.currentThread().getId() + " setting " + i);
} else {
continue;
}
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
new file mode 100644
index 0000000..d843d27
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ConfFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hyracks.hdfs2.dataflow;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.Serializable;
+
+import org.apache.hadoop.mapreduce.Job;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ConfFactory implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private byte[] confBytes;
+
+ public ConfFactory(Job conf) throws HyracksDataException {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ conf.getConfiguration().write(dos);
+ confBytes = bos.toByteArray();
+ dos.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public Job getConf() throws HyracksDataException {
+ try {
+ Job conf = new Job();
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(confBytes));
+ conf.getConfiguration().readFields(dis);
+ dis.close();
+ return conf;
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
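Note: the new-API Job/Configuration objects are not Serializable, which is why ConfFactory ships the configuration as a byte array and rebuilds it on each node controller. A minimal round-trip sketch (the configuration key below is illustrative, not part of this patch):

    import org.apache.hadoop.mapreduce.Job;

    public class ConfFactoryExample {
        public static void main(String[] args) throws Exception {
            Job job = new Job();
            job.getConfiguration().set("mapred.max.split.size", "2048"); // assumed setting

            ConfFactory factory = new ConfFactory(job); // client side: Job -> byte[]
            Job rebuilt = factory.getConf();            // NC side: byte[] -> Job
            System.out.println(rebuilt.getConfiguration().get("mapred.max.split.size")); // prints 2048
        }
    }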
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
new file mode 100644
index 0000000..54d60c0
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.dataflow;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+@SuppressWarnings("rawtypes")
+public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final ConfFactory confFactory;
+ private final InputSplit[] inputSplits;
+ private final String[] scheduledLocations;
+ private final IKeyValueParserFactory tupleParserFactory;
+ private final boolean[] executed;
+
+ /**
+ * The constructor of HDFSReadOperatorDescriptor.
+ *
+ * @param spec
+ * the JobSpecification object
+ * @param rd
+ * the output record descriptor
+ * @param conf
+ * the Hadoop Job object, which contains the input format and the input paths
+ * @param splits
+ * the array of InputSplits (HDFS chunks)
+ * @param scheduledLocations
+ * the node controller names to scan the InputSplits, which is a one-to-one mapping. The String array
+ * is obtained from edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler.getLocationConstraints(List<InputSplit>).
+ * @param tupleParserFactory
+ * the IKeyValueParserFactory implementation instance
+ * @throws HyracksException
+ */
+ public HDFSReadOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, Job conf, InputSplit[] splits,
+ String[] scheduledLocations, IKeyValueParserFactory tupleParserFactory) throws HyracksException {
+ super(spec, 0, 1);
+ try {
+ this.inputSplits = splits;
+ this.confFactory = new ConfFactory(conf);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ this.scheduledLocations = scheduledLocations;
+ this.executed = new boolean[scheduledLocations.length];
+ Arrays.fill(executed, false);
+ this.tupleParserFactory = tupleParserFactory;
+ this.recordDescriptors[0] = rd;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+ final Job conf = confFactory.getConf();
+
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initialize() throws HyracksDataException {
+ try {
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
+ writer.open();
+ InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(),
+ conf.getConfiguration());
+ for (int i = 0; i < inputSplits.length; i++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (scheduledLocations[i].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (!executed[i]) {
+ executed[i] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ TaskAttemptContext context = new TaskAttemptContext(conf.getConfiguration(),
+ new TaskAttemptID());
+ RecordReader reader = inputFormat.createRecordReader(inputSplits[i], context);
+ reader.initialize(inputSplits[i], context);
+ while (reader.nextKeyValue()) {
+ parser.parse(reader.getCurrentKey(), reader.getCurrentValue(), writer);
+ }
+ }
+ }
+ parser.flush(writer);
+ writer.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+}
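For context, the intended client-side wiring: derive splits from the new-API InputFormat, ask the hdfs2 Scheduler (added below) for one location per split, then pass both to this descriptor. A hedged sketch; jobSpec, recordDesc, parserFactory, the input path, and the cluster controller coordinates are assumptions for illustration:

    static HDFSReadOperatorDescriptor buildReader(JobSpecification jobSpec, RecordDescriptor recordDesc,
            IKeyValueParserFactory parserFactory, String ccHost, int ccPort) throws Exception {
        Job job = new Job();
        FileInputFormat.setInputPaths(job, new Path("/customer/input")); // assumed input path
        InputFormat inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
        List<InputSplit> splits = inputFormat.getSplits(new JobContext(job.getConfiguration(), new JobID()));

        Scheduler scheduler = new Scheduler(ccHost, ccPort); // hdfs2 scheduler from this patch
        String[] locations = scheduler.getLocationConstraints(splits);

        HDFSReadOperatorDescriptor reader = new HDFSReadOperatorDescriptor(jobSpec, recordDesc, job,
                splits.toArray(new InputSplit[splits.size()]), locations, parserFactory);
        PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, reader, locations);
        return reader;
    }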
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
new file mode 100644
index 0000000..441815b
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ private final ConfFactory confFactory;
+ private final ITupleWriterFactory tupleWriterFactory;
+ private final String outputPath;
+
+ /**
+ * The constructor of HDFSWriteOperatorDescriptor.
+ *
+ * @param spec
+ * the JobSpecification object
+ * @param conf
+ * the Hadoop JobConf which contains the output path
+ * @param tupleWriterFactory
+ * the ITupleWriterFactory implementation object
+ * @throws HyracksException
+ */
+ public HDFSWriteOperatorDescriptor(JobSpecification spec, Job conf, String outputPath,
+ ITupleWriterFactory tupleWriterFactory) throws HyracksException {
+ super(spec, 1, 0);
+ this.confFactory = new ConfFactory(conf);
+ this.tupleWriterFactory = tupleWriterFactory;
+ this.outputPath = outputPath;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+ final Job conf = confFactory.getConf();
+
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+ private String fileName = outputPath + Path.SEPARATOR + "part-" + partition; // HDFS paths always use '/'
+ private FSDataOutputStream dos;
+ private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
+ private FrameTupleReference tuple = new FrameTupleReference();
+ private ITupleWriter tupleWriter;
+
+ @Override
+ public void open() throws HyracksDataException {
+ tupleWriter = tupleWriterFactory.getTupleWriter();
+ try {
+ FileSystem dfs = FileSystem.get(conf.getConfiguration());
+ dos = dfs.create(new Path(fileName), true);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ tuple.reset(accessor, i);
+ tupleWriter.write(dos, tuple);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ // no-op: close() is still invoked afterwards and releases the HDFS output stream
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ dos.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ };
+ }
+}
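The sink side is symmetric: each partition opens part-<partition> under the output path, so the number of output files equals the operator's degree of parallelism. A sketch under the same assumed names:

    static HDFSWriteOperatorDescriptor buildWriter(JobSpecification jobSpec, ITupleWriterFactory writerFactory)
            throws Exception {
        Job job = new Job(); // carries the HDFS configuration
        HDFSWriteOperatorDescriptor writer = new HDFSWriteOperatorDescriptor(jobSpec, job,
                "/customer/output", writerFactory); // assumed output path
        PartitionConstraintHelper.addPartitionCountConstraint(jobSpec, writer, 4); // assumed 4 partitions
        return writer;
    }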
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
new file mode 100644
index 0000000..5b70a4e
--- /dev/null
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/Scheduler.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.hdfs2.scheduler;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+/**
+ * The scheduler conducts data-local scheduling for data reads on HDFS
+ */
+public class Scheduler {
+
+ /** a list of NCs */
+ private String[] NCs;
+
+ /** a map from an IP address to the NCs running on it */
+ private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
+
+ /** a map from the NC name to the index */
+ private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+
+ /**
+ * The constructor of the scheduler
+ *
+ * @param ipAddress
+ * the IP address of the cluster controller
+ * @param port
+ * the port of the cluster controller
+ * @throws HyracksException
+ */
+ public Scheduler(String ipAddress, int port) throws HyracksException {
+ try {
+ IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+ loadIPAddressToNCMap(ncNameToNcInfos);
+ }
+
+ /**
+ * Set location constraints for a file scan operator with a list of file splits
+ *
+ * @throws HyracksDataException
+ */
+ public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
+ int[] capacity = new int[NCs.length];
+ Arrays.fill(capacity, 0);
+ String[] locations = new String[splits.size()];
+ int slots = splits.size() % capacity.length == 0 ? (splits.size() / capacity.length) : (splits.size()
+ / capacity.length + 1);
+
+ try {
+ Random random = new Random(System.currentTimeMillis());
+ boolean scheduled[] = new boolean[splits.size()];
+ Arrays.fill(scheduled, false);
+
+ for (int i = 0; i < splits.size(); i++) {
+ /**
+ * get the location of all the splits
+ */
+ String[] loc = splits.get(i).getLocations();
+ if (loc.length > 0) {
+ for (int j = 0; j < loc.length; j++) {
+ /**
+ * get all the IP addresses from the name
+ */
+ InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
+ /**
+ * iterate over all the IPs
+ */
+ for (InetAddress ip : allIps) {
+ /**
+ * if the node controller exists
+ */
+ if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+ /**
+ * set the ncs
+ */
+ List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
+ int arrayPos = random.nextInt(dataLocations.size());
+ String nc = dataLocations.get(arrayPos);
+ int pos = ncNameToIndex.get(nc);
+ /**
+ * check if the node is already full
+ */
+ if (capacity[pos] < slots) {
+ locations[i] = nc;
+ capacity[pos]++;
+ scheduled[i] = true;
+ // stop scanning this split's IPs so its capacity is only charged once
+ break;
+ }
+ }
+ }
+
+ /**
+ * break the loop for data-locations if the schedule has already been found
+ */
+ if (scheduled[i]) {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * find the lowest-index NC that still has available slots
+ */
+ int currentAvailableNC = 0;
+ for (int i = 0; i < capacity.length; i++) {
+ if (capacity[i] < slots) {
+ currentAvailableNC = i;
+ break;
+ }
+ }
+
+ /**
+ * schedule non-local file reads
+ */
+ for (int i = 0; i < splits.size(); i++) {
+ // if there is no data-local NC choice, choose a random one
+ if (!scheduled[i]) {
+ locations[i] = NCs[currentAvailableNC];
+ capacity[currentAvailableNC]++;
+ scheduled[i] = true;
+
+ /**
+ * move the available NC cursor to the next one
+ */
+ for (int j = currentAvailableNC; j < capacity.length; j++) {
+ if (capacity[j] < slots) {
+ currentAvailableNC = j;
+ break;
+ }
+ }
+ }
+ }
+ return locations;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ /**
+ * Load the IP-address-to-NC map from the NCNameToNCInfoMap
+ *
+ * @param ncNameToNcInfos
+ * @throws HyracksException
+ */
+ private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
+ try {
+ NCs = new String[ncNameToNcInfos.size()];
+ int i = 0;
+
+ /**
+ * build the IP address to NC map
+ */
+ for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
+ String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
+ .getHostAddress();
+ List<String> matchedNCs = ipToNcMapping.get(ipAddr);
+ if (matchedNCs == null) {
+ matchedNCs = new ArrayList<String>();
+ ipToNcMapping.put(ipAddr, matchedNCs);
+ }
+ matchedNCs.add(entry.getKey());
+ NCs[i] = entry.getKey();
+ i++;
+ }
+
+ /**
+ * set up the NC name to index mapping
+ */
+ for (i = 0; i < NCs.length; i++) {
+ ncNameToIndex.put(NCs[i], i);
+ }
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+}
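The balancing bound in getLocationConstraints is slots = ceil(|splits| / |NCs|): each NC accepts data-local splits until it hits the ceiling, and the leftovers round-robin across NCs that still have room. The ceiling in isolation, for an assumed 3-NC, 7-split run:

    int numSplits = 7, numNCs = 3; // assumed cluster shape
    int slots = numSplits % numNCs == 0 ? numSplits / numNCs : numSplits / numNCs + 1;
    System.out.println(slots); // 3, so no NC is assigned more than 3 splits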
diff --git a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml b/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml
index a8680ec..39b6505 100644
--- a/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/hyracks/hyracks-hdfs/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,7 +18,7 @@
<value>20</value>
</property>
<property>
- <name>mapred.min.split.size</name>
+ <name>mapred.max.split.size</name>
<value>2048</value>
</property>