clean up genomix branch
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2803 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs-scheduler/pom.xml b/hyracks/hyracks-hdfs-scheduler/pom.xml
deleted file mode 100644
index 09de5f7..0000000
--- a/hyracks/hyracks-hdfs-scheduler/pom.xml
+++ /dev/null
@@ -1,71 +0,0 @@
-<?xml version="1.0"?>
-<project
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>hyracks</artifactId>
- <groupId>edu.uci.ics.hyracks</groupId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
-
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-hdfs-scheduler</artifactId>
- <version>1.0-SNAPSHOT</version>
- <name>hyracks-hdfs-scheduler</name>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.0.2</version>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.7.2</version>
- <configuration>
- <forkMode>pertest</forkMode>
- <includes>
- <include>**/*TestSuite.java</include>
- <include>**/*Test.java</include>
- </includes>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>3.8.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-api</artifactId>
- <version>0.2.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
diff --git a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uc/ics/hyracks/hdfs/scheduler/Scheduler.java b/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uc/ics/hyracks/hdfs/scheduler/Scheduler.java
deleted file mode 100644
index 83e4269..0000000
--- a/hyracks/hyracks-hdfs-scheduler/src/main/java/edu/uc/ics/hyracks/hdfs/scheduler/Scheduler.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uc.ics.hyracks.hdfs.scheduler;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-
-/**
- * The scheduler conducts data-local scheduling for data stored on HDFS.
- */
-@SuppressWarnings({ "rawtypes", "deprecation" })
-public class Scheduler {
-
- /** a list of NCs */
- private String[] NCs;
-
- /** a map from ip to NCs */
- private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
-
- /** a map from the NC name to the index */
- private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
-
- /**
- * The constructor of the scheduler
- *
- * @param ipAddress the IP address of the Hyracks cluster controller
- * @param port the client port of the Hyracks cluster controller
- * @throws HyracksException
- */
- public Scheduler(String ipAddress, int port) throws HyracksException {
- try {
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
- loadIPAddressToNCMap(ncNameToNcInfos);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
- loadIPAddressToNCMap(ncNameToNcInfos);
- }
-
- /**
- * Get the location constraints for a file scan operator, using an input format class to compute the splits
- *
- * @param conf the Hadoop job configuration
- * @param inputFormatClass the Hadoop input format class used to compute the splits
- * @throws HyracksException
- */
- public String[] getLocationConstraints(JobConf conf, Class<InputFormat> inputFormatClass) throws HyracksException {
- try {
- InputFormat inputFormat = inputFormatClass.newInstance();
- InputSplit[] splits = inputFormat.getSplits(conf, NCs.length);
- return getLocationConstraints(splits);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- /**
- * Get the location constraints for a file scan operator, given a list of file splits
- *
- * @param splits the HDFS file splits to schedule
- * @throws HyracksException
- */
- public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
- int[] capacity = new int[NCs.length];
- Arrays.fill(capacity, 0);
- String[] locations = new String[splits.length];
- int slots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
- / capacity.length + 1);
-
- try {
- Random random = new Random(System.currentTimeMillis());
- boolean scheduled[] = new boolean[splits.length];
- Arrays.fill(scheduled, false);
-
- for (int i = 0; i < splits.length; i++) {
- /**
- * get the locations of the current split
- */
- String[] loc = splits[i].getLocations();
- if (loc.length > 0) {
- for (int j = 0; j < loc.length; j++) {
- /**
- * resolve the host name to all of its IP addresses
- */
- InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
- /**
- * iterate over all IP addresses
- */
- for (InetAddress ip : allIps) {
- /**
- * if some node controller is mapped to this IP address
- */
- if (ipToNcMapping.get(ip.getHostAddress()) != null) {
- /**
- * pick a random NC registered at this IP address
- */
- List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
- int arrayPos = random.nextInt(dataLocations.size());
- String nc = dataLocations.get(arrayPos);
- int pos = ncNameToIndex.get(nc);
- /**
- * check if the node is already full
- */
- if (capacity[pos] < slots) {
- locations[i] = nc;
- capacity[pos]++;
- scheduled[i] = true;
- }
- }
- }
-
- /**
- * stop scanning data locations once this split has been scheduled
- */
- if (scheduled[i]) {
- break;
- }
- }
- }
- }
-
- /**
- * find the lowest-indexed NC that still has spare capacity
- */
- int currentAvailableNC = 0;
- for (int i = 0; i < capacity.length; i++) {
- if (capacity[i] < slots) {
- currentAvailableNC = i;
- break;
- }
- }
-
- /**
- * schedule non-data-local file reads
- */
- for (int i = 0; i < splits.length; i++) {
- // if there was no data-local NC for this split, assign it to the next NC with spare capacity
- if (!scheduled[i]) {
- locations[i] = NCs[currentAvailableNC];
- capacity[currentAvailableNC]++;
- scheduled[i] = true;
-
- /**
- * move the available NC cursor to the next one
- */
- for (int j = currentAvailableNC; j < capacity.length; j++) {
- if (capacity[j] < slots) {
- currentAvailableNC = j;
- break;
- }
- }
- }
- }
- return locations;
- } catch (IOException e) {
- throw new HyracksException(e);
- }
- }
-
- /**
- * Load the IP-address-to-NC map from the NCNameToNCInfoMap
- *
- * @param ncNameToNcInfos the map from NC names to node controller information
- * @throws HyracksException
- */
- private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
- try {
- NCs = new String[ncNameToNcInfos.size()];
- int i = 0;
-
- /**
- * build the IP address to NC map
- */
- for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
- String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
- .getHostAddress();
- List<String> matchedNCs = ipToNcMapping.get(ipAddr);
- if (matchedNCs == null) {
- matchedNCs = new ArrayList<String>();
- ipToNcMapping.put(ipAddr, matchedNCs);
- }
- matchedNCs.add(entry.getKey());
- NCs[i] = entry.getKey();
- i++;
- }
-
- /**
- * set up the NC name to index mapping
- */
- for (i = 0; i < NCs.length; i++) {
- ncNameToIndex.put(NCs[i], i);
- }
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-}
diff --git a/hyracks/hyracks-hdfs-scheduler/src/test/java/edu/uc/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs-scheduler/src/test/java/edu/uc/ics/hyracks/hdfs/scheduler/SchedulerTest.java
deleted file mode 100644
index ffc6e05..0000000
--- a/hyracks/hyracks-hdfs-scheduler/src/test/java/edu/uc/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uc.ics.hyracks.hdfs.scheduler;
-
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-
-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.client.NodeStatus;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-
-@SuppressWarnings("deprecation")
-public class SchedulerTest extends TestCase {
-
- /**
- * Test the scheduler for the case where the Hyracks cluster is co-located with the HDFS cluster
- *
- * @throws Exception
- */
- public void testSchedulerSimple() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099)));
-
- InputSplit[] fileSplits = new InputSplit[6];
- fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
- fileSplits[1] = new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[2] = new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
- fileSplits[3] = new FileSplit(new Path("part-4"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
- fileSplits[4] = new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
-
- Scheduler scheduler = new Scheduler(ncNameToNcInfos);
- String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
-
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc5", "nc6" };
-
- for (int i = 0; i < locationConstraints.length; i++) {
- Assert.assertEquals(expectedResults[i], locationConstraints[i]);
- }
- }
-
- /**
- * Test the case where the HDFS cluster is larger than the Hyracks cluster
- *
- * @throws Exception
- */
- public void testSchedulerLargerHDFS() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099)));
-
- InputSplit[] fileSplits = new InputSplit[12];
- fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
- fileSplits[1] = new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[2] = new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
- fileSplits[3] = new FileSplit(new Path("part-4"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
- fileSplits[4] = new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
- fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
- fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[8] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
- fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
- fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.7" });
- fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
-
- Scheduler scheduler = new Scheduler(ncNameToNcInfos);
- String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
-
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
- "nc6", "nc5" };
-
- for (int i = 0; i < locationConstraints.length; i++) {
- Assert.assertEquals(expectedResults[i], locationConstraints[i]);
- }
- }
-
- /**
- * Test the case where the HDFS cluster is smaller than the Hyracks cluster
- *
- * @throws Exception
- */
- public void testSchedulerSmallerHDFS() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099)));
-
- InputSplit[] fileSplits = new InputSplit[12];
- fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
- fileSplits[1] = new FileSplit(new Path("part-2"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[2] = new FileSplit(new Path("part-3"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.3" });
- fileSplits[3] = new FileSplit(new Path("part-4"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.3" });
- fileSplits[4] = new FileSplit(new Path("part-5"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
- fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
- fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[8] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.1" });
- fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.2" });
- fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[11] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
-
- Scheduler scheduler = new Scheduler(ncNameToNcInfos);
- String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
-
- String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
- "nc5", "nc6" };
-
- for (int i = 0; i < locationConstraints.length; i++) {
- Assert.assertEquals(expectedResults[i], locationConstraints[i]);
- }
- }
-
-}
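
A minimal usage sketch of the scheduler removed above (illustrative only; the cluster-controller address, port, and input path are hypothetical placeholders, and the JobConf-based getLocationConstraints variant from the deleted Scheduler.java is assumed):

import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

import edu.uc.ics.hyracks.hdfs.scheduler.Scheduler;

public class SchedulerUsageSketch {
    @SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
    public static void main(String[] args) throws Exception {
        // Ask a (hypothetical) Hyracks cluster controller for the live node controllers.
        Scheduler scheduler = new Scheduler("127.0.0.1", 1098);

        // Describe the HDFS input; the path is a placeholder.
        JobConf conf = new JobConf();
        conf.setInputFormat(TextInputFormat.class);
        FileInputFormat.setInputPaths(conf, "/example/input");

        // One NC name per HDFS split, chosen data-locally where possible; a Hyracks job
        // would use these as absolute location constraints for its file-scan operator.
        String[] locations = scheduler.getLocationConstraints(conf, (Class) TextInputFormat.class);
        for (String nc : locations) {
            System.out.println(nc);
        }
    }
}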