merged hyracks_asterix_stabilization r1658:1659, r1660:1661, r1671:1672, r1673:1680

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1682 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-yarn/hyracks-yarn-am/pom.xml b/hyracks-yarn/hyracks-yarn-am/pom.xml
new file mode 100644
index 0000000..9e453a6
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/pom.xml
@@ -0,0 +1,84 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-yarn-am</artifactId>
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks-yarn</artifactId>
+    <version>0.2.1-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <!-- Compile for Java 1.6 source and target levels. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+      <!-- During package, generate the hyracks-yarn-am launcher for the application master main
+           class, with its jars in a flat repository directory named lib.
+           NOTE(review): no version is given for this plugin here; presumably it is managed by a
+           parent POM - confirm. -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <configuration>
+              <programs>
+                <program>
+                  <mainClass>edu.uci.ics.hyracks.yarn.am.HyracksYarnApplicationMaster</mainClass>
+                  <name>hyracks-yarn-am</name>
+                </program>
+              </programs>
+              <repositoryLayout>flat</repositoryLayout>
+              <repositoryName>lib</repositoryName>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>assemble</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Also during package, build the distribution described by binary-assembly.xml. That
+           descriptor reads target/appassembler, so it relies on the appassembler execution above
+           having already run (presumably guaranteed by declaration order - confirm). -->
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.2-beta-5</version>
+        <executions>
+          <execution>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>attached</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  <!-- Command-line parsing (org.kohsuke.args4j). -->
+  <dependency>
+  	<groupId>args4j</groupId>
+  	<artifactId>args4j</artifactId>
+  	<version>2.0.16</version>
+  </dependency>
+  <!-- Shared Hyracks YARN code; presumably the source of the AMRMConnection class - confirm. -->
+  <dependency>
+  	<groupId>edu.uci.ics.hyracks</groupId>
+  	<artifactId>hyracks-yarn-common</artifactId>
+  	<version>0.2.1-SNAPSHOT</version>
+  </dependency>
+  </dependencies>
+</project>
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml b/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..0500499
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,23 @@
+<assembly>
+  <id>binary-assembly</id>
+  <!-- Produce the distribution both as a zip archive and as an exploded directory. -->
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <!-- Place bin/ and lib/ at the top level instead of under a base directory. -->
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <!-- Files from target/appassembler/bin go to bin/ with mode 0755. -->
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <!-- Files from target/appassembler/lib go to lib/. -->
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java
new file mode 100644
index 0000000..a9c4520
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java
@@ -0,0 +1,335 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.Records;
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.hyracks.yarn.am.manifest.AbstractProcess;
+import edu.uci.ics.hyracks.yarn.am.manifest.ContainerSpecification;
+import edu.uci.ics.hyracks.yarn.am.manifest.HyracksCluster;
+import edu.uci.ics.hyracks.yarn.am.manifest.ManifestParser;
+import edu.uci.ics.hyracks.yarn.am.manifest.NodeController;
+import edu.uci.ics.hyracks.yarn.common.protocols.amrm.AMRMConnection;
+
+/**
+ * YARN application master for a Hyracks cluster.
+ * <p>
+ * On startup it registers with the resource manager, starts a periodic heartbeat, reads the cluster
+ * description from <code>manifest.xml</code> in the working directory and queues one container request per
+ * manifest process (the cluster controller plus every node controller). Containers handed back by the
+ * resource manager are matched against the outstanding requests; nothing is launched in them yet.
+ * <p>
+ * The ask list and both ask maps are shared between the main thread and the heartbeat timer thread and are
+ * only touched from <code>synchronized</code> methods.
+ */
+public class HyracksYarnApplicationMaster {
+    /** Parsed command-line options; no options are defined yet. */
+    private final Options options;
+
+    /** Daemon timer that drives the periodic heartbeat. */
+    private final Timer timer;
+
+    /** Resource requests queued for the next heartbeat. */
+    private final List<ResourceRequest> asks;
+
+    /** Ask records grouped by requested capability; used to match allocated containers to asks. */
+    private final Map<Resource, Set<AskRecord>> resource2AskMap;
+
+    /** Ask record created for each manifest process. */
+    private final Map<AbstractProcess, AskRecord> proc2AskMap;
+
+    private YarnConfiguration config;
+
+    private AMRMConnection amrmc;
+
+    /** Response of the registration call; stored but not read anywhere in this class. */
+    private RegisterApplicationMasterResponse registration;
+
+    private HyracksCluster hcManifest;
+
+    private HyracksYarnApplicationMaster(Options options) {
+        this.options = options;
+        timer = new Timer(true);
+        asks = new ArrayList<ResourceRequest>();
+        resource2AskMap = new HashMap<Resource, Set<AskRecord>>();
+        proc2AskMap = new HashMap<AbstractProcess, AskRecord>();
+    }
+
+    /**
+     * Connects to the resource manager, registers, starts the heartbeat, loads the manifest and queues the
+     * container requests. Afterwards the calling thread sleeps forever; the timer thread is a daemon, so
+     * this loop is what keeps the process (and with it the heartbeat) running.
+     */
+    private void run() throws Exception {
+        Configuration conf = new Configuration();
+        config = new YarnConfiguration(conf);
+        amrmc = new AMRMConnection(config);
+
+        performRegistration();
+        setupHeartbeats();
+        parseManifest();
+        setupAsks();
+        while (true) {
+            Thread.sleep(1000);
+        }
+    }
+
+    /**
+     * Queues one container request for the cluster controller and one per node controller.
+     *
+     * @throws IllegalStateException
+     *             if the manifest defines no cluster controller, or a process has no container specification
+     */
+    private synchronized void setupAsks() {
+        AbstractProcess cc = hcManifest.getClusterController();
+        if (cc == null) {
+            throw new IllegalStateException("manifest.xml does not define a cluster-controller");
+        }
+        setupAsk(cc);
+        for (NodeController nc : hcManifest.getNodeControllers()) {
+            setupAsk(nc);
+        }
+    }
+
+    /**
+     * Builds the resource request for one process (its host name, a fixed priority of 100, its memory and a
+     * single container), records it in both ask maps and queues it for the next heartbeat. Callers must hold
+     * this object's monitor.
+     * <p>
+     * NOTE(review): every request uses the same priority, so processes with an identical host name and
+     * memory produce identical requests; whether the resource manager counts those separately should be
+     * confirmed against the ResourceRequest documentation.
+     */
+    private void setupAsk(AbstractProcess proc) {
+        ContainerSpecification cSpec = proc.getContainerSpecification();
+        if (cSpec == null) {
+            // Point at the manifest instead of failing with a bare NullPointerException further down.
+            throw new IllegalStateException("manifest.xml has no container-specification for " + proc);
+        }
+        ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
+
+        rsrcRequest.setHostName(cSpec.getHostname());
+
+        Priority pri = Records.newRecord(Priority.class);
+        pri.setPriority(100);
+        rsrcRequest.setPriority(pri);
+
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(cSpec.getMemory());
+        rsrcRequest.setCapability(capability);
+
+        rsrcRequest.setNumContainers(1);
+
+        AskRecord ar = new AskRecord();
+        ar.req = rsrcRequest;
+        ar.proc = proc;
+
+        Set<AskRecord> arSet = resource2AskMap.get(capability);
+        if (arSet == null) {
+            arSet = new HashSet<AskRecord>();
+            resource2AskMap.put(capability, arSet);
+        }
+        arSet.add(ar);
+        proc2AskMap.put(proc, ar);
+
+        System.err.println(proc + " -> " + rsrcRequest);
+
+        asks.add(rsrcRequest);
+    }
+
+    /**
+     * Reads <code>manifest.xml</code> from the current working directory and parses it into the cluster
+     * description.
+     *
+     * @throws IllegalStateException
+     *             if parsing yields no cluster object
+     */
+    private void parseManifest() throws Exception {
+        String str = FileUtils.readFileToString(new File("manifest.xml"));
+        hcManifest = ManifestParser.parse(str);
+        if (hcManifest == null) {
+            throw new IllegalStateException("manifest.xml does not describe a hyracks-cluster");
+        }
+    }
+
+    /**
+     * Schedules the heartbeat on the timer. The period is the configured AM expiry interval capped at
+     * 1000 ms, and the first heartbeat is sent without delay.
+     */
+    private void setupHeartbeats() {
+        long heartbeatInterval = config.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+                YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+        System.err.println("Heartbeat interval: " + heartbeatInterval);
+        heartbeatInterval = Math.min(heartbeatInterval, 1000);
+        System.err.println("Heartbeat interval: " + heartbeatInterval);
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                heartbeat();
+            }
+        }, 0, heartbeatInterval);
+    }
+
+    /**
+     * Sends one allocate request carrying the queued asks and processes the containers in the response.
+     * <p>
+     * Nothing may propagate out of this method: an exception escaping a TimerTask terminates the timer
+     * thread, which would silently end all further heartbeats.
+     */
+    private void heartbeat() {
+        List<ResourceRequest> sent = null;
+        boolean delivered = false;
+        try {
+            AllocateRequest hb = Records.newRecord(AllocateRequest.class);
+            sent = populateAllocateRequest(hb);
+            hb.setApplicationAttemptId(amrmc.getApplicationAttemptId());
+            hb.setProgress(0);
+            AllocateResponse allocateResponse = amrmc.getAMRMProtocol().allocate(hb);
+            delivered = true;
+            List<Container> allocatedContainers = allocateResponse.getAMResponse().getAllocatedContainers();
+            List<ContainerStatus> completedContainers = allocateResponse.getAMResponse()
+                    .getCompletedContainersStatuses();
+            processAllocation(allocatedContainers, completedContainers);
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (!delivered && sent != null) {
+                // These asks left the queue but never reached the resource manager; retry them.
+                requeueAsks(sent);
+            }
+        }
+    }
+
+    /**
+     * Moves every queued ask into the given allocate request.
+     *
+     * @return the asks that were moved, so they can be queued again if the request cannot be sent
+     */
+    private synchronized List<ResourceRequest> populateAllocateRequest(AllocateRequest hb) {
+        List<ResourceRequest> drained = new ArrayList<ResourceRequest>(asks);
+        hb.addAllAsks(drained);
+        asks.clear();
+        return drained;
+    }
+
+    /** Puts the asks of a failed heartbeat back at the head of the queue for the next heartbeat. */
+    private synchronized void requeueAsks(List<ResourceRequest> unsent) {
+        asks.addAll(0, unsent);
+    }
+
+    /**
+     * Assigns each allocated container to a not yet satisfied ask with the same capability, preferring an
+     * ask for the container's host over a wildcard ("*") ask. Containers matching no ask are only reported.
+     * Completed containers are currently ignored.
+     */
+    private synchronized void processAllocation(List<Container> allocatedContainers,
+            List<ContainerStatus> completedContainers) {
+        for (Container c : allocatedContainers) {
+            System.err.println("Got container: " + c.getContainerStatus());
+            NodeId nodeId = c.getNodeId();
+            Resource resource = c.getResource();
+
+            Set<AskRecord> arSet = resource2AskMap.get(resource);
+            boolean found = false;
+            if (arSet != null) {
+                AskRecord wildcardMatch = null;
+                AskRecord nameMatch = null;
+                for (AskRecord ar : arSet) {
+                    ResourceRequest req = ar.req;
+                    if (ar.allocation == null) {
+                        if ("*".equals(req.getHostName()) && wildcardMatch == null) {
+                            wildcardMatch = ar;
+                        }
+                        if (req.getHostName().equals(nodeId.getHost())) {
+                            // An exact host match wins outright, so stop searching.
+                            nameMatch = ar;
+                            break;
+                        }
+                    }
+                }
+                if (nameMatch != null) {
+                    found = true;
+                    nameMatch.allocation = c;
+                } else if (wildcardMatch != null) {
+                    found = true;
+                    wildcardMatch.allocation = c;
+                }
+            }
+            if (!found) {
+                System.err.println("Unknown request satisfied: " + resource);
+            }
+        }
+    }
+
+    /** Registers this application attempt with the resource manager and keeps the response. */
+    private void performRegistration() throws YarnRemoteException {
+        RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
+        appMasterRequest.setApplicationAttemptId(amrmc.getApplicationAttemptId());
+
+        registration = amrmc.getAMRMProtocol().registerApplicationMaster(appMasterRequest);
+    }
+
+    /**
+     * Parses the command line and runs the application master. On a parse error the reason and the usage
+     * text are printed to standard error and the method returns without starting anything.
+     */
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        try {
+            parser.parseArgument(args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            parser.printUsage(System.err);
+            return;
+        }
+        new HyracksYarnApplicationMaster(options).run();
+    }
+
+    /** Command-line options holder for args4j; empty for now. */
+    private static class Options {
+    }
+
+    /** One container request together with the process it is for and, once granted, its container. */
+    private static class AskRecord {
+        ResourceRequest req;
+        AbstractProcess proc;
+        Container allocation;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/AbstractProcess.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/AbstractProcess.java
new file mode 100644
index 0000000..be6e263
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/AbstractProcess.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+/**
+ * Base type for a process described in the cluster manifest. It carries a container specification and a
+ * command-line arguments string, both of which are simply stored and returned.
+ */
+public abstract class AbstractProcess {
+    /** Container specification for this process; null until set. */
+    protected ContainerSpecification cSpec;
+
+    /** Command-line arguments string; null until set. */
+    protected String cmdLineArgs;
+
+    /** Returns the container specification, or null if none has been set. */
+    public ContainerSpecification getContainerSpecification() {
+        return cSpec;
+    }
+
+    /** Replaces the container specification. */
+    public void setContainerSpecification(ContainerSpecification cSpec) {
+        this.cSpec = cSpec;
+    }
+
+    /** Returns the command-line arguments string, or null if none has been set. */
+    public String getCommandLineArguments() {
+        return cmdLineArgs;
+    }
+
+    /** Replaces the command-line arguments string. */
+    public void setCommandLineArguments(String cmdLineArgs) {
+        this.cmdLineArgs = cmdLineArgs;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ClusterController.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ClusterController.java
new file mode 100644
index 0000000..b230ada
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ClusterController.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+/**
+ * Manifest entry for the cluster controller process. It adds nothing to {@link AbstractProcess}; the
+ * manifest parser instantiates it for the cluster-controller element.
+ */
+public class ClusterController extends AbstractProcess {    
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ContainerSpecification.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ContainerSpecification.java
new file mode 100644
index 0000000..07f12c8
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ContainerSpecification.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+/**
+ * Holds the host name and memory that the application master copies into the resource request for a
+ * process. The host name defaults to "*".
+ */
+public class ContainerSpecification {
+    /** Host name to request; "*" unless overridden. */
+    private String hostname = "*";
+
+    /** Memory to request; this class does not fix a unit. */
+    private int memory;
+
+    /** Returns the host name to request. */
+    public String getHostname() {
+        return hostname;
+    }
+
+    /** Sets the host name to request. */
+    public void setHostname(String hostname) {
+        this.hostname = hostname;
+    }
+
+    /** Returns the memory to request. */
+    public int getMemory() {
+        return memory;
+    }
+
+    /** Sets the memory to request. */
+    public void setMemory(int memory) {
+        this.memory = memory;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/HyracksCluster.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/HyracksCluster.java
new file mode 100644
index 0000000..4a66ed6
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/HyracksCluster.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Root object of a parsed manifest: a named cluster made up of one cluster controller and a list of node
+ * controllers.
+ */
+public class HyracksCluster {
+    private String name;
+
+    private ClusterController cc;
+
+    /** Node controllers in the order they were added. */
+    private List<NodeController> ncs = new ArrayList<NodeController>();
+
+    /** Returns the cluster name, or null if none has been set. */
+    public String getName() {
+        return name;
+    }
+
+    /** Sets the cluster name. */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** Returns the cluster controller, or null if none has been set. */
+    public ClusterController getClusterController() {
+        return cc;
+    }
+
+    /** Sets the cluster controller. */
+    public void setClusterController(ClusterController cc) {
+        this.cc = cc;
+    }
+
+    /** Appends a node controller to the cluster. */
+    public void addNodeController(NodeController nc) {
+        ncs.add(nc);
+    }
+
+    /** Returns the live, modifiable list of node controllers (not a copy). */
+    public List<NodeController> getNodeControllers() {
+        return ncs;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ManifestParser.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ManifestParser.java
new file mode 100644
index 0000000..bc5ebae
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/ManifestParser.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+import java.io.StringReader;
+
+import org.apache.commons.digester.Digester;
+
+/**
+ * Turns manifest XML into a {@link HyracksCluster} object graph using Commons Digester rules.
+ */
+public class ManifestParser {
+    private static final String CLUSTER = "hyracks-cluster";
+
+    private static final String CLUSTER_CONTROLLER = CLUSTER + "/cluster-controller";
+
+    private static final String NODE_CONTROLLER = CLUSTER + "/node-controllers/node-controller";
+
+    private static final String CONTAINER_SPEC = "*/container-specification";
+
+    /**
+     * Parses the given manifest document.
+     *
+     * @param mXML
+     *            the manifest XML text
+     * @return the object the digester produced for the document
+     * @throws Exception
+     *             if the digester fails to parse the text
+     */
+    public static HyracksCluster parse(String mXML) throws Exception {
+        Digester digester = createDigester();
+        Object root = digester.parse(new StringReader(mXML));
+        return (HyracksCluster) root;
+    }
+
+    /**
+     * Builds a non-validating digester. Every mapped element creates an object of the given class and
+     * fills its bean properties from the element's attributes; every element except the root is then
+     * passed to the object beneath it on the digester stack through the named method.
+     */
+    private static Digester createDigester() {
+        Digester digester = new Digester();
+        digester.setValidating(false);
+
+        digester.addObjectCreate(CLUSTER, HyracksCluster.class);
+        digester.addSetProperties(CLUSTER);
+
+        mapChild(digester, CLUSTER_CONTROLLER, ClusterController.class, "setClusterController");
+        mapChild(digester, NODE_CONTROLLER, NodeController.class, "addNodeController");
+        mapChild(digester, CONTAINER_SPEC, ContainerSpecification.class, "setContainerSpecification");
+        return digester;
+    }
+
+    /** Registers the create / set-properties / set-next rule triple for one element pattern. */
+    private static void mapChild(Digester digester, String pattern, Class<?> type, String parentMethod) {
+        digester.addObjectCreate(pattern, type);
+        digester.addSetProperties(pattern);
+        digester.addSetNext(pattern, parentMethod);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/NodeController.java b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/NodeController.java
new file mode 100644
index 0000000..f46eb7f
--- /dev/null
+++ b/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/manifest/NodeController.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.yarn.am.manifest;
+
+/**
+ * Manifest entry for a node controller process; adds an identifier to {@link AbstractProcess}.
+ */
+public class NodeController extends AbstractProcess {
+    /** Identifier of this node controller; null until set. */
+    private String id;
+
+    /** Returns the identifier, or null if none has been set. */
+    public String getId() {
+        return id;
+    }
+
+    /** Sets the identifier. */
+    public void setId(String id) {
+        this.id = id;
+    }
+}
\ No newline at end of file