Modified Yarn driver to accept extra jar files
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@2786 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java b/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java
index a9c4520..e56285a 100644
--- a/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java
+++ b/hyracks/hyracks-yarn/hyracks-yarn-am/src/main/java/edu/uci/ics/hyracks/yarn/am/HyracksYarnApplicationMaster.java
@@ -23,14 +23,18 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -38,6 +42,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.kohsuke.args4j.CmdLineParser;
@@ -59,6 +64,10 @@
private final Map<AbstractProcess, AskRecord> proc2AskMap;
+ private final AtomicInteger lastResponseId;
+
+ private final ApplicationAttemptId appAttemptId;
+
private YarnConfiguration config;
private AMRMConnection amrmc;
@@ -73,6 +82,11 @@
asks = new ArrayList<ResourceRequest>();
resource2AskMap = new HashMap<Resource, Set<AskRecord>>();
proc2AskMap = new HashMap<AbstractProcess, AskRecord>();
+ lastResponseId = new AtomicInteger();
+
+ String containerIdStr = System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ appAttemptId = containerId.getApplicationAttemptId();
}
private void run() throws Exception {
@@ -103,7 +117,7 @@
rsrcRequest.setHostName(cSpec.getHostname());
Priority pri = Records.newRecord(Priority.class);
- pri.setPriority(100);
+ pri.setPriority(0);
rsrcRequest.setPriority(pri);
Resource capability = Records.newRecord(Resource.class);
@@ -124,7 +138,8 @@
arSet.add(ar);
proc2AskMap.put(proc, ar);
- System.err.println(proc + " -> " + rsrcRequest);
+ System.err.println(proc + " -> [" + rsrcRequest.getHostName() + ", " + rsrcRequest.getNumContainers() + ", "
+ + rsrcRequest.getPriority() + ", " + rsrcRequest.getCapability().getMemory() + "]");
asks.add(rsrcRequest);
}
@@ -162,11 +177,14 @@
private synchronized void populateAllocateRequest(AllocateRequest hb) {
hb.addAllAsks(asks);
- asks.clear();
+ hb.addAllReleases(new ArrayList<ContainerId>());
+ hb.setResponseId(lastResponseId.incrementAndGet());
+ hb.setApplicationAttemptId(appAttemptId);
}
private synchronized void processAllocation(List<Container> allocatedContainers,
List<ContainerStatus> completedContainers) {
+ System.err.println(allocatedContainers);
for (Container c : allocatedContainers) {
System.err.println("Got container: " + c.getContainerStatus());
NodeId nodeId = c.getNodeId();
diff --git a/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/edu/uci/ics/hyracks/yarn/client/LaunchHyracksApplication.java b/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/edu/uci/ics/hyracks/yarn/client/LaunchHyracksApplication.java
index e12891e..0eb930e 100644
--- a/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/edu/uci/ics/hyracks/yarn/client/LaunchHyracksApplication.java
+++ b/hyracks/hyracks-yarn/hyracks-yarn-client/src/main/java/edu/uci/ics/hyracks/yarn/client/LaunchHyracksApplication.java
@@ -53,7 +53,6 @@
File amZipFile = new File(System.getProperty("basedir") + "/hyracks-yarn-am/hyracks-yarn-am.zip");
localResources.put("archive", LocalResourceHelper.createArchiveResource(conf, amZipFile));
- localResources.put("manifest.xml", LocalResourceHelper.createFileResource(conf, options.hcManifest));
clCtx.setLocalResources(localResources);
String command = "./archive/bin/hyracks-yarn-am 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
@@ -84,10 +83,19 @@
@Option(name = "-application-name", required = true, usage = "Application Name")
String appName;
+ @Option(name = "-am-host", required = false, usage = "Application master host name (default: *). Currently has NO effect")
+ String amHostName = "*";
+
@Option(name = "-am-memory", required = false, usage = "Application Master memory requirements")
int amMemory = 128;
- @Option(name = "-hyracks-cluster-manifest", required = true, usage = "Hyracks Cluster Manifest file")
- File hcManifest;
+ @Option(name = "-workers", required = true, usage = "Number of worker containers")
+ int nWorkers;
+
+ @Option(name = "-worker-memory", required = true, usage = "Amount of memory to provide to each worker")
+ int workerMemory;
+
+ @Option(name = "-extra-jars", required = false, usage = "Other jars that need to be added to the classpath")
+ String extraJars = "";
}
}
\ No newline at end of file