adapt pregelix to the ioc changes
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_dynamic_deployment@3379 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 8f1c401..36ceb18 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -91,13 +91,6 @@
}
@Override
- public DeploymentId deployBinary(List<URL> binaryURLs) throws Exception {
- HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(
- binaryURLs, null);
- return (DeploymentId) rpci.call(ipcHandle, dbf);
- }
-
- @Override
public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception {
HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(
binaryURLs, deploymentId);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 93bb88f..2f02780 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -21,6 +21,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
@@ -30,6 +31,7 @@
import edu.uci.ics.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -120,12 +122,17 @@
}
@Override
- public void deployBinary(List<String> jars) throws Exception {
+ public DeploymentId deployBinary(List<String> jars) throws Exception {
+ DeploymentId deploymentId = new DeploymentId(UUID.randomUUID().toString());
List<URL> binaryURLs = new ArrayList<URL>();
if (jars != null && jars.size() > 0) {
HttpClient hc = new DefaultHttpClient();
for (String jar : jars) {
- String url = "http://" + ccHost + ":" + ccInfo.getWebPort() + "/applications/" + jar;
+ int slashIndex = jar.lastIndexOf('/');
+ String fileName = jar.substring(slashIndex + 1);
+ String url = "http://" + ccHost + ":" + ccInfo.getWebPort() + "/applications/"
+ + deploymentId.toString() + "&" + fileName;
+ System.out.print("put URL: " + url);
HttpPut put = new HttpPut(url);
put.setEntity(new FileEntity(new File(jar), "application/octet-stream"));
HttpResponse response = hc.execute(put);
@@ -135,6 +142,31 @@
binaryURLs.add(new URL(url));
}
}
- hci.deployBinary(binaryURLs);
+ hci.deployBinary(binaryURLs, deploymentId);
+ return deploymentId;
+ }
+
+ @Override
+ public void unDeployBinary(DeploymentId deploymentId) throws Exception {
+ hci.unDeployBinary(deploymentId);
+ }
+
+ @Override
+ public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception {
+ return startJob(deploymentId, jobSpec, EnumSet.noneOf(JobFlag.class));
+ }
+
+ @Override
+ public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
+ throws Exception {
+ JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
+ jobSpec);
+ return startJob(deploymentId, jsacggf, jobFlags);
+ }
+
+ @Override
+ public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
+ EnumSet<JobFlag> jobFlags) throws Exception {
+ return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 6e4dc73..41b07d7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -19,6 +19,7 @@
import java.util.Map;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -118,5 +119,53 @@
* @param jars
* a list of user-defined jars
*/
- public void deployBinary(List<String> jars) throws Exception;
+ public DeploymentId deployBinary(List<String> jars) throws Exception;
+
+ /**
+ * undeploy a certain deployment
+ *
+ * @param jars
+ * a list of user-defined jars
+ */
+ public void unDeployBinary(DeploymentId deploymentId) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param deploymentId
+ * the id of the specific deployment
+ * @param jobSpec
+ * Job Specification
+ * @throws Exception
+ */
+ public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param deploymentId
+ * the id of the specific deployment
+ * @param jobSpec
+ * Job Specification
+ * @param jobFlags
+ * Flags
+ * @throws Exception
+ */
+ public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
+ throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param deploymentId
+ * the id of the specific deployment
+ * @param acggf
+ * Activity Cluster Graph Generator Factory
+ * @param jobFlags
+ * Flags
+ * @throws Exception
+ */
+ public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
+ EnumSet<JobFlag> jobFlags) throws Exception;
+
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 9c0ba3b..aabc351 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -41,8 +41,6 @@
public ClusterTopology getClusterTopology() throws Exception;
- public DeploymentId deployBinary(List<URL> binaryURLs) throws Exception;
-
public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception;
public void unDeployBinary(DeploymentId deploymentId) throws Exception;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
index fad4300..f1cfee8 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
@@ -28,4 +28,6 @@
public ICounterContext getCounterContext();
public Object getGlobalJobData();
+
+ public Class<?> loadClass(String className);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
index d3188d6..1e3f0ad 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
@@ -27,6 +27,8 @@
public byte[] serialize(Serializable job) throws HyracksException;
+ public Class<?> loadClass(String className) throws HyracksException;
+
public void addClassPathURLs(List<URL> binaryURLs) throws HyracksException;
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
index b7e7920..59571c2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
@@ -47,4 +47,13 @@
throw new UnsupportedOperationException("Not supported by " + this.getClass().getName());
}
+ @Override
+ public Class<?> loadClass(String className) throws HyracksException {
+ try {
+ return this.getClass().getClassLoader().loadClass(className);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
index ed26304..35a1e8b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
@@ -31,9 +31,6 @@
return defaultJobSerDe;
}
IJobSerializerDeserializer jobSerDe = jobSerializerDeserializerMap.get(deploymentId);
- if (jobSerDe == null) {
- return defaultJobSerDe;
- }
return jobSerDe;
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
index cb90f4c..a3c8aed 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
@@ -68,6 +68,10 @@
}
}
+ public static Class<?> loadClass(String className) throws IOException, ClassNotFoundException {
+ return Class.forName(className);
+ }
+
private static class ClassLoaderObjectInputStream extends ObjectInputStream {
private ClassLoader classLoader;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
new file mode 100755
index 0000000..40a21c9
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/ApplicationInstallationHandler.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.web;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.eclipse.jetty.http.HttpMethods;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class ApplicationInstallationHandler extends AbstractHandler {
+ private ClusterControllerService ccs;
+
+ public ApplicationInstallationHandler(ClusterControllerService ccs) {
+ this.ccs = ccs;
+ }
+
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ try {
+ while (target.startsWith("/")) {
+ target = target.substring(1);
+ }
+ while (target.endsWith("/")) {
+ target = target.substring(0, target.length() - 1);
+ }
+ String[] parts = target.split("/");
+ if (parts.length != 1) {
+ return;
+ }
+ final String[] params = parts[0].split("&");
+ String deployIdString = params[0];
+ String rootDir = ccs.getServerContext().getBaseDir().toString();
+ final String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + "applications/" + deployIdString
+ : rootDir + File.separator + "/applications/" + File.separator + deployIdString;
+ if (HttpMethods.PUT.equals(request.getMethod())) {
+ class OutputStreamGetter extends SynchronizableWork {
+ private OutputStream os;
+
+ @Override
+ protected void doRun() throws Exception {
+ FileUtils.forceMkdir(new File(deploymentDir));
+ String fileName = params[1];
+ File jarFile = new File(deploymentDir, fileName);
+ os = new FileOutputStream(jarFile);
+ System.out.println("client put: " + jarFile.getAbsolutePath());
+ }
+ }
+ OutputStreamGetter r = new OutputStreamGetter();
+ try {
+ ccs.getWorkQueue().scheduleAndSync(r);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ try {
+ IOUtils.copyLarge(request.getInputStream(), r.os);
+ } finally {
+ r.os.close();
+ }
+ } else if (HttpMethods.GET.equals(request.getMethod())) {
+ class InputStreamGetter extends SynchronizableWork {
+ private InputStream is;
+
+ @Override
+ protected void doRun() throws Exception {
+ String fileName = params[1];
+ File jarFile = new File(deploymentDir, fileName);
+ is = new FileInputStream(jarFile);
+ System.out.println("client request: " + jarFile.getAbsolutePath());
+ }
+ }
+ InputStreamGetter r = new InputStreamGetter();
+ try {
+ ccs.getWorkQueue().scheduleAndSync(r);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ if (r.is == null) {
+ response.setStatus(HttpServletResponse.SC_NOT_FOUND);
+ } else {
+ response.setContentType("application/octet-stream");
+ response.setStatus(HttpServletResponse.SC_OK);
+ try {
+ IOUtils.copyLarge(r.is, response.getOutputStream());
+ } finally {
+ r.is.close();
+ }
+ }
+ }
+ baseRequest.setHandled(true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
index e39e766..c32cb97 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
@@ -70,6 +70,10 @@
addHandler(createAdminConsoleHandler());
addHandler(createStaticResourcesHandler());
+
+ handler = new ContextHandler("/applications");
+ handler.setHandler(new ApplicationInstallationHandler(ccs));
+ addHandler(handler);
}
private Handler createAdminConsoleHandler() {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
index a4f34af..fa44633 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -27,10 +27,10 @@
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.common.work.IPCResponder;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-public class CliDeployBinaryWork extends AbstractWork {
+public class CliDeployBinaryWork extends SynchronizableWork {
private ClusterControllerService ccs;
private List<URL> binaryURLs;
@@ -42,10 +42,11 @@
this.ccs = ncs;
this.binaryURLs = binaryURLs;
this.deploymentId = deploymentId;
+ this.callback = callback;
}
@Override
- public void run() {
+ public void doRun() {
try {
if (deploymentId == null) {
deploymentId = new DeploymentId(UUID.randomUUID().toString());
@@ -54,7 +55,7 @@
* Deploy for the cluster controller
*/
DeploymentUtils.deploy(deploymentId, binaryURLs, ccs.getApplicationContext()
- .getJobSerializerDeserializerContainer(), ccs.getServerContext());
+ .getJobSerializerDeserializerContainer(), ccs.getServerContext(), false);
/**
* Deploy for the node controllers
@@ -65,7 +66,7 @@
for (String nc : nodeControllerStateMap.keySet()) {
nodeIds.add(nc);
}
- DeploymentRun dRun = new DeploymentRun(nodeIds);
+ final DeploymentRun dRun = new DeploymentRun(nodeIds);
ccs.addDeploymentRun(deploymentId, dRun);
/***
@@ -75,12 +76,21 @@
ncs.getNodeController().deployBinary(deploymentId, binaryURLs);
}
- /**
- * wait for completion
- */
- dRun.waitForCompletion();
- ccs.removeDeploymentRun(deploymentId);
- callback.setValue(deploymentId);
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ /**
+ * wait for completion
+ */
+ dRun.waitForCompletion();
+ ccs.removeDeploymentRun(deploymentId);
+ callback.setValue(deploymentId);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+ });
} catch (Exception e) {
callback.setException(e);
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
index 584ab7c..d7e5df8 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
@@ -46,7 +46,8 @@
/**
* Deploy for the cluster controller
*/
- DeploymentUtils.undeploy(deploymentId, ccs.getApplicationContext().getJobSerializerDeserializerContainer());
+ DeploymentUtils.undeploy(deploymentId, ccs.getApplicationContext().getJobSerializerDeserializerContainer(),
+ ccs.getServerContext());
/**
* Deploy for the node controllers
@@ -66,7 +67,7 @@
for (NodeControllerState ncs : nodeControllerStateMap.values()) {
ncs.getNodeController().undeployBinary(deploymentId);
}
-
+
/**
* wait for completion
*/
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index bc8c314..b3550bf 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -32,7 +32,7 @@
@Override
protected void performEvent(TaskAttempt ta) {
JobRun run = ccs.getActiveRunMap().get(jobId);
- ccs.getDatasetDirectoryService().reportJobFailure(jobId);
+ //ccs.getDatasetDirectoryService().reportJobFailure(jobId);
ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
run.getScheduler().notifyTaskFailure(ta, ac, details);
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
index 0e4d366..7a0340a 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -10,7 +10,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializer;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
public class ClassLoaderJobSerializerDeserializer implements IJobSerializerDeserializer {
@@ -18,12 +17,12 @@
private URLClassLoader classLoader;
@Override
- public JobSpecification deserialize(byte[] jsBytes) throws HyracksException {
+ public Object deserialize(byte[] jsBytes) throws HyracksException {
try {
if (classLoader == null) {
- return (JobSpecification) JavaSerializationUtils.deserialize(jsBytes);
+ return JavaSerializationUtils.deserialize(jsBytes);
}
- return (JobSpecification) JavaSerializationUtils.deserialize(jsBytes, classLoader);
+ return JavaSerializationUtils.deserialize(jsBytes, classLoader);
} catch (Exception e) {
throw new HyracksException(e);
}
@@ -53,6 +52,9 @@
if (classLoader == null) {
/** crate a new classloader */
URL[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]);
+ for (URL url : urls) {
+ System.out.println("Class loader url " + url);
+ }
classLoader = new URLClassLoader(urls, this.getClass().getClassLoader());
} else {
/** add URLs to the existing classloader */
@@ -65,4 +67,13 @@
throw new HyracksException(e);
}
}
+
+ @Override
+ public Class<?> loadClass(String className) throws HyracksException {
+ try {
+ return classLoader.loadClass(className);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index ab84065..e95b85f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -1,11 +1,19 @@
package edu.uci.ics.hyracks.control.common.deployment;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
import edu.uci.ics.hyracks.api.application.IApplicationContext;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -17,24 +25,35 @@
public class DeploymentUtils {
- private static final String DEPLOYMENT = "deployment";
+ private static final String DEPLOYMENT = "applications";
- public static void undeploy(DeploymentId deploymentId, IJobSerializerDeserializerContainer container)
- throws HyracksException {
+ public static void undeploy(DeploymentId deploymentId, IJobSerializerDeserializerContainer container,
+ ServerContext ctx) throws HyracksException {
container.removeJobSerializerDeserializer(deploymentId);
+ String rootDir = ctx.getBaseDir().toString();
+ String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT + File.separator + deploymentId
+ : rootDir + File.separator + DEPLOYMENT + File.separator + deploymentId;
+ try {
+ File dFile = new File(deploymentDir);
+ if (dFile.exists()) {
+ FileUtils.forceDelete(dFile);
+ }
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
}
public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container,
- ServerContext ctx) throws HyracksException {
+ ServerContext ctx, boolean isNC) throws HyracksException {
IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeerializer(deploymentId);
if (jobSerDe == null) {
jobSerDe = new ClassLoaderJobSerializerDeserializer();
container.addJobSerializerDeserializer(deploymentId, jobSerDe);
}
String rootDir = ctx.getBaseDir().toString();
- String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT : rootDir + File.separator
- + DEPLOYMENT;
- jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir));
+ String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT + File.separator + deploymentId
+ : rootDir + File.separator + DEPLOYMENT + File.separator + deploymentId;
+ jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC));
}
public static Object deserialize(byte[] bytes, DeploymentId deploymentId, IApplicationContext appCtx)
@@ -50,7 +69,21 @@
}
}
- private static List<URL> downloadURLs(List<URL> urls, String deploymentDir) throws HyracksException {
+ public static Class<?> loadClass(String className, DeploymentId deploymentId, IApplicationContext appCtx)
+ throws HyracksException {
+ try {
+ IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
+ IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
+ .getJobSerializerDeerializer(deploymentId);
+ Class<?> cl = jobSerDe == null ? JavaSerializationUtils.loadClass(className) : jobSerDe
+ .loadClass(className);
+ return cl;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException {
try {
List<URL> downloadedFileURLs = new ArrayList<URL>();
File dir = new File(deploymentDir);
@@ -60,10 +93,23 @@
for (URL url : urls) {
String urlString = url.toString();
int slashIndex = urlString.lastIndexOf('/');
- String fileName = urlString.substring(slashIndex + 1);
+ String fileName = urlString.substring(slashIndex + 1).split("&")[1];
String filePath = deploymentDir + File.separator + fileName;
File targetFile = new File(filePath);
- FileUtils.copyURLToFile(url, targetFile);
+ if (isNC) {
+ HttpClient hc = new DefaultHttpClient();
+ System.out.println(url.toString());
+ HttpGet get = new HttpGet(url.toString());
+ HttpResponse response = hc.execute(get);
+ InputStream is = response.getEntity().getContent();
+ OutputStream os = new FileOutputStream(targetFile);
+ try {
+ IOUtils.copyLarge(is, os);
+ } finally {
+ os.close();
+ is.close();
+ }
+ }
downloadedFileURLs.add(targetFile.toURI().toURL());
}
return downloadedFileURLs;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 3855b4d..8ac17d9 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -43,6 +44,7 @@
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
@@ -58,6 +60,8 @@
private final INCApplicationContext appCtx;
+ private final DeploymentId deploymentId;
+
private final JobId jobId;
private final ActivityClusterGraph acg;
@@ -86,10 +90,11 @@
private boolean cleanupPending;
- public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx,
- ActivityClusterGraph acg) {
+ public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId,
+ INCApplicationContext appCtx, ActivityClusterGraph acg) {
this.nodeController = nodeController;
this.appCtx = appCtx;
+ this.deploymentId = deploymentId;
this.jobId = jobId;
this.frameSize = acg.getFrameSize();
this.acg = acg;
@@ -283,4 +288,13 @@
e.printStackTrace();
}
}
+
+ @Override
+ public Class<?> loadClass(String className) {
+ try {
+ return DeploymentUtils.loadClass(className, deploymentId, appCtx);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
index eecf67f..4f613a1 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
@@ -26,7 +26,7 @@
public void run() {
try {
DeploymentUtils.deploy(deploymentId, binaryURLs, ncs.getApplicationContext()
- .getJobSerializerDeserializerContainer(), ncs.getServerContext());
+ .getJobSerializerDeserializerContainer(), ncs.getServerContext(), true);
IClusterController ccs = ncs.getClusterController();
ccs.notifyDeployBinary(deploymentId, ncs.getId(), DeploymentStatus.SUCCEED);
} catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 939276a..2c6d38e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -89,7 +89,7 @@
public void run() {
try {
NCApplicationContext appCtx = ncs.getApplicationContext();
- final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, acgBytes == null ? null
+ final Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes == null ? null
: (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx));
final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
@@ -168,15 +168,15 @@
}
}
- private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx, ActivityClusterGraph acg)
- throws Exception {
+ private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCApplicationContext appCtx,
+ ActivityClusterGraph acg) throws Exception {
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji == null) {
if (acg == null) {
throw new NullPointerException("JobActivityGraph was null");
}
- ji = new Joblet(ncs, jobId, appCtx, acg);
+ ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
jobletMap.put(jobId, ji);
}
return ji;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java
index 38a764b..bd77fb5 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java
@@ -20,7 +20,8 @@
@Override
public void run() {
try {
- DeploymentUtils.undeploy(deploymentId, ncs.getApplicationContext().getJobSerializerDeserializerContainer());
+ DeploymentUtils.undeploy(deploymentId, ncs.getApplicationContext().getJobSerializerDeserializerContainer(),
+ ncs.getServerContext());
IClusterController ccs = ncs.getClusterController();
ccs.notifyDeployBinary(deploymentId, ncs.getId(), DeploymentStatus.SUCCEED);
} catch (Exception e) {
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
index 7612db9..e4d94ff 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
@@ -92,4 +92,13 @@
public Object getGlobalJobData() {
return null;
}
+
+ @Override
+ public Class<?> loadClass(String className) {
+ try {
+ return Class.forName(className);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 72256f9..d1a665c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -22,9 +22,6 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +30,7 @@
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -45,7 +43,6 @@
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.util.Utilities;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@SuppressWarnings("rawtypes")
@@ -54,9 +51,7 @@
private JobGen jobGen;
private boolean profiling;
- private String applicationName;
private IHyracksClientConnection hcc;
-
private Class exampleClass;
public Driver(Class exampleClass) {
@@ -71,7 +66,6 @@
@Override
public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException {
- applicationName = exampleClass.getSimpleName() + UUID.randomUUID();
try {
/** add hadoop configurations */
URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
@@ -121,13 +115,13 @@
for (URL url : urls)
if (url.toString().endsWith(".jar"))
jars.add(new File(url.getPath()));
- installApplication(jars);
+ DeploymentId deploymentId = installApplication(jars);
start = System.currentTimeMillis();
FileSystem dfs = FileSystem.get(job.getConfiguration());
dfs.delete(FileOutputFormat.getOutputPath(job), true);
- runCreate(jobGen);
- runDataLoad(jobGen);
+ runCreate(deploymentId, jobGen);
+ runDataLoad(deploymentId, jobGen);
end = System.currentTimeMillis();
time = end - start;
LOG.info("data loading finished " + time + "ms");
@@ -135,7 +129,7 @@
boolean terminate = false;
do {
start = System.currentTimeMillis();
- runLoopBodyIteration(jobGen, i);
+ runLoopBodyIteration(deploymentId, jobGen, i);
end = System.currentTimeMillis();
time = end - start;
LOG.info("iteration " + i + " finished " + time + "ms");
@@ -145,8 +139,8 @@
} while (!terminate);
start = System.currentTimeMillis();
- runHDFSWRite(jobGen);
- runCleanup(jobGen);
+ runHDFSWRite(deploymentId, jobGen);
+ runCleanup(deploymentId, jobGen);
end = System.currentTimeMillis();
time = end - start;
LOG.info("result writing finished " + time + "ms");
@@ -156,79 +150,74 @@
}
}
- private void runCreate(JobGen jobGen) throws Exception {
+ private void runCreate(DeploymentId deploymentId, JobGen jobGen) throws Exception {
try {
JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
- execute(treeCreateSpec);
+ execute(deploymentId, treeCreateSpec);
} catch (Exception e) {
throw e;
}
}
- private void runDataLoad(JobGen jobGen) throws Exception {
+ private void runDataLoad(DeploymentId deploymentId, JobGen jobGen) throws Exception {
try {
JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
- execute(bulkLoadJobSpec);
+ execute(deploymentId, bulkLoadJobSpec);
} catch (Exception e) {
throw e;
}
}
- private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
+ private void runLoopBodyIteration(DeploymentId deploymentId, JobGen jobGen, int iteration) throws Exception {
try {
JobSpecification loopBody = jobGen.generateJob(iteration);
- execute(loopBody);
+ execute(deploymentId, loopBody);
} catch (Exception e) {
throw e;
}
}
- private void runHDFSWRite(JobGen jobGen) throws Exception {
+ private void runHDFSWRite(DeploymentId deploymentId, JobGen jobGen) throws Exception {
try {
JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
- execute(scanSortPrintJobSpec);
+ execute(deploymentId, scanSortPrintJobSpec);
} catch (Exception e) {
throw e;
}
}
- private void runCleanup(JobGen jobGen) throws Exception {
+ private void runCleanup(DeploymentId deploymentId, JobGen jobGen) throws Exception {
try {
JobSpecification[] cleanups = jobGen.generateCleanup();
- runJobArray(cleanups);
+ runJobArray(deploymentId, cleanups);
} catch (Exception e) {
throw e;
}
}
- private void runJobArray(JobSpecification[] jobs) throws Exception {
+ private void runJobArray(DeploymentId deploymentId, JobSpecification[] jobs) throws Exception {
for (JobSpecification job : jobs) {
- execute(job);
+ execute(deploymentId, job);
}
}
- private void execute(JobSpecification job) throws Exception {
+ private void execute(DeploymentId deploymentId, JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc
- .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ JobId jobId = hcc.startJob(deploymentId, job,
+ profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
}
- public void installApplication(List<File> jars) throws Exception {
- Set<String> allJars = new TreeSet<String>();
+ public DeploymentId installApplication(List<File> jars) throws Exception {
+ List<String> allJars = new ArrayList<String>();
for (File jar : jars) {
allJars.add(jar.getAbsolutePath());
}
long start = System.currentTimeMillis();
- File appZip = Utilities.getHyracksArchive(applicationName, allJars);
+ DeploymentId deploymentId = hcc.deployBinary(allJars);
long end = System.currentTimeMillis();
- LOG.info("jar packing finished " + (end - start) + "ms");
-
- start = System.currentTimeMillis();
- // TODO: Fix this step to use Yarn
- //hcc.createApplication(applicationName, appZip);
- end = System.currentTimeMillis();
LOG.info("jar deployment finished " + (end - start) + "ms");
+ return deploymentId;
}
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
index d1d927d..145169e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.pregelix.core.runtime.touchpoint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -29,9 +30,9 @@
}
@Override
- public RecordDescriptor createRecordDescriptor() throws HyracksDataException {
+ public RecordDescriptor createRecordDescriptor(IHyracksTaskContext ctx) throws HyracksDataException {
try {
- return DataflowUtils.getRecordDescriptorFromWritableClasses(fieldClasses);
+ return DataflowUtils.getRecordDescriptorFromWritableClasses(ctx, fieldClasses);
} catch (HyracksException e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index bcf3ffc..25ad28a 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -17,6 +17,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -40,9 +41,10 @@
throws HyracksException {
RecordDescriptor recordDescriptor = null;
try {
+ ClassLoader loader = DataflowUtils.class.getClassLoader();
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) Class.forName(className1),
- (Class<? extends Writable>) Class.forName(className2));
+ (Class<? extends Writable>) loader.loadClass(className1),
+ (Class<? extends Writable>) loader.loadClass(className2));
} catch (ClassNotFoundException cnfe) {
throw new HyracksException(cnfe);
}
@@ -53,11 +55,12 @@
public static RecordDescriptor getRecordDescriptorFromWritableClasses(String... classNames) throws HyracksException {
RecordDescriptor recordDescriptor = null;
ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
+ ClassLoader loader = DataflowUtils.class.getClassLoader();
try {
int i = 0;
for (String className : classNames)
- serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) Class
- .forName(className));
+ serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) loader
+ .loadClass(className));
} catch (ClassNotFoundException cnfe) {
throw new HyracksException(cnfe);
}
@@ -79,4 +82,35 @@
new IAggregateFunctionFactory[] { aggFuncFactory });
return aggregatorFactory;
}
+
+ @SuppressWarnings("unchecked")
+ public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(IHyracksTaskContext ctx, String className1,
+ String className2) throws HyracksException {
+ RecordDescriptor recordDescriptor = null;
+ try {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) ctx
+ .getJobletContext().loadClass(className1), (Class<? extends Writable>) ctx.getJobletContext()
+ .loadClass(className2));
+ } catch (Exception cnfe) {
+ throw new HyracksException(cnfe);
+ }
+ return recordDescriptor;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public static RecordDescriptor getRecordDescriptorFromWritableClasses(IHyracksTaskContext ctx, String... classNames)
+ throws HyracksException {
+ RecordDescriptor recordDescriptor = null;
+ ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
+ try {
+ int i = 0;
+ for (String className : classNames)
+ serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) ctx
+ .getJobletContext().loadClass(className));
+ } catch (Exception cnfe) {
+ throw new HyracksException(cnfe);
+ }
+ recordDescriptor = new RecordDescriptor(serdes);
+ return recordDescriptor;
+ }
}
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc b/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc
new file mode 100755
index 0000000..461d76a
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc
@@ -0,0 +1,117 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Copyright 2001-2006 The Apache Software Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ----------------------------------------------------------------------------
+#
+# Copyright (c) 2001-2006 The Apache Software Foundation. All rights
+# reserved.
+
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`/"$link"
+ fi
+done
+
+PRGDIR=`dirname "$PRG"`
+BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
+
+
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ Darwin*) darwin=true
+ if [ -z "$JAVA_VERSION" ] ; then
+ JAVA_VERSION="CurrentJDK"
+ else
+ echo "Using Java version: $JAVA_VERSION"
+ fi
+ if [ -z "$JAVA_HOME" ] ; then
+ JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# If a specific java binary isn't specified search for the standard 'java' binary
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD=`which java`
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." 1>&2
+ echo " We cannot execute $JAVACMD" 1>&2
+ exit 1
+fi
+
+if [ -z "$REPO" ]
+then
+ REPO="$BASEDIR"/lib
+fi
+
+CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
+
+for f in ${REPO}/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f;
+done
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
+ [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
+ [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
+fi
+
+exec "$JAVACMD" $JAVA_OPTS \
+ -classpath "$CLASSPATH" \
+ -Dapp.name="pregelixcc" \
+ -Dapp.pid="$$" \
+ -Dapp.repo="$REPO" \
+ -Dapp.home="$BASEDIR" \
+ -Dbasedir="$BASEDIR" \
+ edu.uci.ics.hyracks.control.cc.CCDriver \
+ "$@"
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc b/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc
new file mode 100755
index 0000000..4a6ca1f
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc
@@ -0,0 +1,118 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Copyright 2001-2006 The Apache Software Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ----------------------------------------------------------------------------
+#
+# Copyright (c) 2001-2006 The Apache Software Foundation. All rights
+# reserved.
+
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`/"$link"
+ fi
+done
+
+PRGDIR=`dirname "$PRG"`
+BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
+
+
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ Darwin*) darwin=true
+ if [ -z "$JAVA_VERSION" ] ; then
+ JAVA_VERSION="CurrentJDK"
+ else
+ echo "Using Java version: $JAVA_VERSION"
+ fi
+ if [ -z "$JAVA_HOME" ] ; then
+ JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# If a specific java binary isn't specified search for the standard 'java' binary
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD=`which java`
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." 1>&2
+ echo " We cannot execute $JAVACMD" 1>&2
+ exit 1
+fi
+
+if [ -z "$REPO" ]
+then
+ REPO="$BASEDIR"/lib
+fi
+
+CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
+
+for f in ${REPO}/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f;
+done
+
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`2
+ [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
+ [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
+fi
+
+exec "$JAVACMD" $JAVA_OPTS \
+ -classpath "$CLASSPATH" \
+ -Dapp.name="pregelixnc" \
+ -Dapp.pid="$$" \
+ -Dapp.repo="$REPO" \
+ -Dapp.home="$BASEDIR" \
+ -Dbasedir="$BASEDIR" \
+ edu.uci.ics.hyracks.control.nc.NCDriver \
+ -app-nc-main-class edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint "$@"
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
index 133b604..74f0987 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -20,12 +20,15 @@
export JAVA_HOME=$JAVA_HOME
export JAVA_OPTS=$CCJAVA_OPTS
+PREGELIX_HOME=`pwd`
-chmod -R 755 $HYRACKS_HOME
+#Enter the temp dir
+cd $CCTMP_DIR
+
if [ -f "conf/topology.xml" ]; then
#Launch hyracks cc script with topology
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
+${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
else
#Launch hyracks cc script without toplogy
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
+${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
fi
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
index b059aad..3ab1b25 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -39,11 +39,10 @@
#Set JAVA_OPTS
export JAVA_OPTS=$NCJAVA_OPTS
-cd $HYRACKS_HOME
-HYRACKS_HOME=`pwd`
+PREGELIX_HOME=`pwd`
#Enter the temp dir
cd $NCTMP_DIR
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+${PREGELIX_HOME}/bin/pregelixnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh
index c2f525a..84f369f 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh
@@ -2,7 +2,18 @@
. conf/cluster.properties
#Kill process
-PID=`ps -ef|grep ${USER}|grep java|grep hyracks|awk '{print $2}'`
+#Kill process
+PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=pregelixcc'|awk '{print $2}'`
+
+if [ "$PID" == "" ]; then
+ PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
+ USERID=`id | sed 's/^uid=//;s/(.*$//'`
+ PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=pregelixcc'|awk '{print $2}'`
+fi
+
echo $PID
kill -9 $PID
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
index 35c4794..31de984 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
@@ -2,7 +2,7 @@
. conf/cluster.properties
#Kill process
-PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=pregelixnc'|awk '{print $2}'`
if [ "$PID" == "" ]; then
PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
@@ -10,7 +10,7 @@
if [ "$PID" == "" ]; then
USERID=`id | sed 's/^uid=//;s/(.*$//'`
- PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+ PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=pregelixnc'|awk '{print $2}'`
fi
echo $PID
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java
index e7de650..7454f2d 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java
@@ -16,11 +16,12 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IRecordDescriptorFactory extends Serializable {
- public RecordDescriptor createRecordDescriptor() throws HyracksDataException;
+ public RecordDescriptor createRecordDescriptor(IHyracksTaskContext ctx) throws HyracksDataException;
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
index 4cbd6c4..24d28b0 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -66,7 +66,7 @@
@Override
public void open() throws HyracksDataException {
rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
- : inputRdFactory.createRecordDescriptor();
+ : inputRdFactory.createRecordDescriptor(ctx);
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 99bca1a..f75dab2 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -56,7 +56,7 @@
* @throws HyracksDataException
*/
public void functionOpen() throws HyracksDataException {
- inputRd = inputRdFactory.createRecordDescriptor();
+ inputRd = inputRdFactory.createRecordDescriptor(ctx);
tupleDe = new TupleDeserializer(inputRd);
ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
index eda7754..62aeeea 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
@@ -60,7 +60,7 @@
@SuppressWarnings("rawtypes")
private GlobalAggregator aggregator = BspUtils.createGlobalAggregator(conf);
private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
- inputRdFactory.createRecordDescriptor());
+ inputRdFactory.createRecordDescriptor(ctx));
private ByteBufferInputStream inputStream = new ByteBufferInputStream();
private DataInput input = new DataInputStream(inputStream);
private Writable partialAggregateValue = BspUtils.createFinalAggregateValue(conf);
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 2402748..8bc91c2 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -75,7 +75,7 @@
@Override
public void open() throws HyracksDataException {
rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
- : inputRdFactory.createRecordDescriptor();
+ : inputRdFactory.createRecordDescriptor(ctx);
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
index d7cbb3a..5da0239 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
@@ -64,7 +64,7 @@
@Override
public void open() throws HyracksDataException {
rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
- : inputRdFactory.createRecordDescriptor();
+ : inputRdFactory.createRecordDescriptor(ctx);
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
try {
outputWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(splits[partition]