merged hyracks_asterix_stabilization r1658:1659, r1660:1661, r1671:1672, r1673:1680
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1682 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-yarn/hyracks-yarn-am/pom.xml b/hyracks-yarn/hyracks-yarn-am/pom.xml
new file mode 100644
index 0000000..9e453a6
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/pom.xml
@@ -0,0 +1,74 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-yarn-am</artifactId>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-yarn</artifactId>
+ <version>0.2.1-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.yarn.am.HyracksYarnApplicationMaster</mainClass>
+ <name>hyracks-yarn-am</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>args4j</groupId>
+ <artifactId>args4j</artifactId>
+ <version>2.0.16</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-yarn-common</artifactId>
+ <version>0.2.1-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml b/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java
new file mode 100644
index 0000000..a9c4520
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.Records;
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.hyracks.yarn.am.manifest.AbstractProcess;
+import edu.uci.ics.hyracks.yarn.am.manifest.ContainerSpecification;
+import edu.uci.ics.hyracks.yarn.am.manifest.HyracksCluster;
+import edu.uci.ics.hyracks.yarn.am.manifest.ManifestParser;
+import edu.uci.ics.hyracks.yarn.am.manifest.NodeController;
+import edu.uci.ics.hyracks.yarn.common.protocols.amrm.AMRMConnection;
+
+public class HyracksYarnApplicationMaster {
+ private final Options options;
+
+ private final Timer timer;
+
+ private final List<ResourceRequest> asks;
+
+ private final Map<Resource, Set<AskRecord>> resource2AskMap;
+
+ private final Map<AbstractProcess, AskRecord> proc2AskMap;
+
+ private YarnConfiguration config;
+
+ private AMRMConnection amrmc;
+
+ private RegisterApplicationMasterResponse registration;
+
+ private HyracksCluster hcManifest;
+
+ private HyracksYarnApplicationMaster(Options options) {
+ this.options = options;
+ timer = new Timer(true);
+ asks = new ArrayList<ResourceRequest>();
+ resource2AskMap = new HashMap<Resource, Set<AskRecord>>();
+ proc2AskMap = new HashMap<AbstractProcess, AskRecord>();
+ }
+
+ private void run() throws Exception {
+ Configuration conf = new Configuration();
+ config = new YarnConfiguration(conf);
+ amrmc = new AMRMConnection(config);
+
+ performRegistration();
+ setupHeartbeats();
+ parseManifest();
+ setupAsks();
+ while (true) {
+ Thread.sleep(1000);
+ }
+ }
+
+ private synchronized void setupAsks() {
+ setupAsk(hcManifest.getClusterController());
+ for (NodeController nc : hcManifest.getNodeControllers()) {
+ setupAsk(nc);
+ }
+ }
+
+ private void setupAsk(AbstractProcess proc) {
+ ContainerSpecification cSpec = proc.getContainerSpecification();
+ ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
+
+ rsrcRequest.setHostName(cSpec.getHostname());
+
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(100);
+ rsrcRequest.setPriority(pri);
+
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(cSpec.getMemory());
+ rsrcRequest.setCapability(capability);
+
+ rsrcRequest.setNumContainers(1);
+
+ AskRecord ar = new AskRecord();
+ ar.req = rsrcRequest;
+ ar.proc = proc;
+
+ Set<AskRecord> arSet = resource2AskMap.get(capability);
+ if (arSet == null) {
+ arSet = new HashSet<AskRecord>();
+ resource2AskMap.put(capability, arSet);
+ }
+ arSet.add(ar);
+ proc2AskMap.put(proc, ar);
+
+ System.err.println(proc + " -> " + rsrcRequest);
+
+ asks.add(rsrcRequest);
+ }
+
+ private void parseManifest() throws Exception {
+ String str = FileUtils.readFileToString(new File("manifest.xml"));
+ hcManifest = ManifestParser.parse(str);
+ }
+
+ private void setupHeartbeats() {
+ long heartbeatInterval = config.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+ System.err.println("Heartbeat interval: " + heartbeatInterval);
+ heartbeatInterval = Math.min(heartbeatInterval, 1000);
+ System.err.println("Heartbeat interval: " + heartbeatInterval);
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ AllocateRequest hb = Records.newRecord(AllocateRequest.class);
+ populateAllocateRequest(hb);
+ hb.setApplicationAttemptId(amrmc.getApplicationAttemptId());
+ hb.setProgress(0);
+ try {
+ AllocateResponse allocateResponse = amrmc.getAMRMProtocol().allocate(hb);
+ List<Container> allocatedContainers = allocateResponse.getAMResponse().getAllocatedContainers();
+ List<ContainerStatus> completedContainers = allocateResponse.getAMResponse()
+ .getCompletedContainersStatuses();
+ processAllocation(allocatedContainers, completedContainers);
+ } catch (YarnRemoteException e) {
+ e.printStackTrace();
+ }
+ }
+ }, 0, heartbeatInterval);
+ }
+
+ private synchronized void populateAllocateRequest(AllocateRequest hb) {
+ hb.addAllAsks(asks);
+ asks.clear();
+ }
+
+ private synchronized void processAllocation(List<Container> allocatedContainers,
+ List<ContainerStatus> completedContainers) {
+ for (Container c : allocatedContainers) {
+ System.err.println("Got container: " + c.getContainerStatus());
+ NodeId nodeId = c.getNodeId();
+ Resource resource = c.getResource();
+
+ Set<AskRecord> arSet = resource2AskMap.get(resource);
+ boolean found = false;
+ if (arSet != null) {
+ AskRecord wildcardMatch = null;
+ AskRecord nameMatch = null;
+ for (AskRecord ar : arSet) {
+ ResourceRequest req = ar.req;
+ if (ar.allocation == null) {
+ if ("*".equals(req.getHostName()) && wildcardMatch == null) {
+ wildcardMatch = ar;
+ }
+ if (req.getHostName().equals(nodeId.getHost()) && nameMatch == null) {
+ nameMatch = ar;
+ break;
+ }
+ }
+ }
+ if (nameMatch != null) {
+ found = true;
+ nameMatch.allocation = c;
+ } else if (wildcardMatch != null) {
+ found = true;
+ wildcardMatch.allocation = c;
+ }
+ }
+ if (!found) {
+ System.err.println("Unknown request satisfied: " + resource);
+ }
+ }
+ }
+
+ private void performRegistration() throws YarnRemoteException {
+ RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
+ appMasterRequest.setApplicationAttemptId(amrmc.getApplicationAttemptId());
+
+ registration = amrmc.getAMRMProtocol().registerApplicationMaster(appMasterRequest);
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ try {
+ parser.parseArgument(args);
+ } catch (Exception e) {
+ parser.printUsage(System.err);
+ return;
+ }
+ new HyracksYarnApplicationMaster(options).run();
+ }
+
+ private static class Options {
+ }
+
+ private static class AskRecord {
+ ResourceRequest req;
+ AbstractProcess proc;
+ Container allocation;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/AbstractProcess.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/AbstractProcess.java
new file mode 100644
index 0000000..be6e263
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/AbstractProcess.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+public abstract class AbstractProcess {
+ protected ContainerSpecification cSpec;
+
+ protected String cmdLineArgs;
+
+ public void setContainerSpecification(ContainerSpecification cSpec) {
+ this.cSpec = cSpec;
+ }
+
+ public ContainerSpecification getContainerSpecification() {
+ return cSpec;
+ }
+
+ public void setCommandLineArguments(String cmdLineArgs) {
+ this.cmdLineArgs = cmdLineArgs;
+ }
+
+ public String getCommandLineArguments() {
+ return cmdLineArgs;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ClusterController.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ClusterController.java
new file mode 100644
index 0000000..b230ada
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ClusterController.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+public class ClusterController extends AbstractProcess {
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ContainerSpecification.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ContainerSpecification.java
new file mode 100644
index 0000000..07f12c8
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ContainerSpecification.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+public class ContainerSpecification {
+ private String hostname;
+
+ private int memory;
+
+ public ContainerSpecification() {
+ hostname = "*";
+ }
+
+ public void setHostname(String hostname) {
+ this.hostname = hostname;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public void setMemory(int memory) {
+ this.memory = memory;
+ }
+
+ public int getMemory() {
+ return memory;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/HyracksCluster.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/HyracksCluster.java
new file mode 100644
index 0000000..4a66ed6
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/HyracksCluster.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HyracksCluster {
+ private String name;
+
+ private ClusterController cc;
+
+ private List<NodeController> ncs;
+
+ public HyracksCluster() {
+ ncs = new ArrayList<NodeController>();
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setClusterController(ClusterController cc) {
+ this.cc = cc;
+ }
+
+ public ClusterController getClusterController() {
+ return cc;
+ }
+
+ public void addNodeController(NodeController nc) {
+ ncs.add(nc);
+ }
+
+ public List<NodeController> getNodeControllers() {
+ return ncs;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ManifestParser.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ManifestParser.java
new file mode 100644
index 0000000..bc5ebae
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ManifestParser.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+import java.io.StringReader;
+
+import org.apache.commons.digester.Digester;
+
+public class ManifestParser {
+ public static HyracksCluster parse(String mXML) throws Exception {
+ Digester d = createDigester();
+ return (HyracksCluster) d.parse(new StringReader(mXML));
+ }
+
+ private static Digester createDigester() {
+ Digester d = new Digester();
+ d.setValidating(false);
+
+ d.addObjectCreate("hyracks-cluster", HyracksCluster.class);
+ d.addSetProperties("hyracks-cluster");
+
+ d.addObjectCreate("hyracks-cluster/cluster-controller", ClusterController.class);
+ d.addSetProperties("hyracks-cluster/cluster-controller");
+ d.addSetNext("hyracks-cluster/cluster-controller", "setClusterController");
+
+ d.addObjectCreate("hyracks-cluster/node-controllers/node-controller", NodeController.class);
+ d.addSetProperties("hyracks-cluster/node-controllers/node-controller");
+ d.addSetNext("hyracks-cluster/node-controllers/node-controller", "addNodeController");
+
+ d.addObjectCreate("*/container-specification", ContainerSpecification.class);
+ d.addSetProperties("*/container-specification");
+ d.addSetNext("*/container-specification", "setContainerSpecification");
+ return d;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/NodeController.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/NodeController.java
new file mode 100644
index 0000000..f46eb7f
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/NodeController.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+public class NodeController extends AbstractProcess {
+ private String id;
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getId() {
+ return id;
+ }
+}
\ No newline at end of file