Modified Yarn driver to accept extra jar files

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@2786 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java b/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java
index a9c4520..e56285a 100644
--- a/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java
+++ b/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java
@@ -23,14 +23,18 @@
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -38,6 +42,7 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.kohsuke.args4j.CmdLineParser;
 
@@ -59,6 +64,10 @@
 
     private final Map<AbstractProcess, AskRecord> proc2AskMap;
 
+    private final AtomicInteger lastResponseId;
+
+    private final ApplicationAttemptId appAttemptId;
+
     private YarnConfiguration config;
 
     private AMRMConnection amrmc;
@@ -73,6 +82,11 @@
         asks = new ArrayList<ResourceRequest>();
         resource2AskMap = new HashMap<Resource, Set<AskRecord>>();
         proc2AskMap = new HashMap<AbstractProcess, AskRecord>();
+        lastResponseId = new AtomicInteger();
+
+        String containerIdStr = System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+        ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+        appAttemptId = containerId.getApplicationAttemptId();
     }
 
     private void run() throws Exception {
@@ -103,7 +117,7 @@
         rsrcRequest.setHostName(cSpec.getHostname());
 
         Priority pri = Records.newRecord(Priority.class);
-        pri.setPriority(100);
+        pri.setPriority(0);
         rsrcRequest.setPriority(pri);
 
         Resource capability = Records.newRecord(Resource.class);
@@ -124,7 +138,8 @@
         arSet.add(ar);
         proc2AskMap.put(proc, ar);
 
-        System.err.println(proc + " -> " + rsrcRequest);
+        System.err.println(proc + " -> [" + rsrcRequest.getHostName() + ", " + rsrcRequest.getNumContainers() + ", "
+                + rsrcRequest.getPriority() + ", " + rsrcRequest.getCapability().getMemory() + "]");
 
         asks.add(rsrcRequest);
     }
@@ -162,11 +177,14 @@
 
     private synchronized void populateAllocateRequest(AllocateRequest hb) {
         hb.addAllAsks(asks);
-        asks.clear();
+        hb.addAllReleases(new ArrayList<ContainerId>());
+        hb.setResponseId(lastResponseId.incrementAndGet());
+        hb.setApplicationAttemptId(appAttemptId);
     }
 
     private synchronized void processAllocation(List<Container> allocatedContainers,
             List<ContainerStatus> completedContainers) {
+        System.err.println(allocatedContainers);
         for (Container c : allocatedContainers) {
             System.err.println("Got container: " + c.getContainerStatus());
             NodeId nodeId = c.getNodeId();
diff --git a/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/edu/uci/ics/hyracks/yarn/client/LaunchHyracksApplication.java b/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/edu/uci/ics/hyracks/yarn/client/LaunchHyracksApplication.java
index e12891e..0eb930e 100644
--- a/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/edu/uci/ics/hyracks/yarn/client/LaunchHyracksApplication.java
+++ b/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/edu/uci/ics/hyracks/yarn/client/LaunchHyracksApplication.java
@@ -53,7 +53,6 @@
 
         File amZipFile = new File(System.getProperty("basedir") + "/hyracks-yarn-am/hyracks-yarn-am.zip");
         localResources.put("archive", LocalResourceHelper.createArchiveResource(conf, amZipFile));
-        localResources.put("manifest.xml", LocalResourceHelper.createFileResource(conf, options.hcManifest));
         clCtx.setLocalResources(localResources);
 
         String command = "./archive/bin/hyracks-yarn-am 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
@@ -84,10 +83,19 @@
         @Option(name = "-application-name", required = true, usage = "Application Name")
         String appName;
 
+        @Option(name = "-am-host", required = false, usage = "Application master host name (default: *). Currently has NO effect")
+        String amHostName = "*";
+
         @Option(name = "-am-memory", required = false, usage = "Application Master memory requirements")
         int amMemory = 128;
 
-        @Option(name = "-hyracks-cluster-manifest", required = true, usage = "Hyracks Cluster Manifest file")
-        File hcManifest;
+        @Option(name = "-workers", required = true, usage = "Number of worker containers")
+        int nWorkers;
+
+        @Option(name = "-worker-memory", required = true, usage = "Amount of memory to provide to each worker")
+        int workerMemory;
+
+        @Option(name = "-extra-jars", required = false, usage = "Other jars that need to be added to the classpath")
+        String extraJars = "";
     }
 }
\ No newline at end of file