Merged fullstack_asterix_stabilization -r 2933:3157
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@3164 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks/hyracks-control/hyracks-control-common/pom.xml
index 096f2e1..ce1298e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -18,8 +18,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 3099bbb..627dd55 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -16,7 +16,9 @@
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
@@ -47,5 +49,12 @@
public void sendApplicationMessageToCC(byte[] data, String nodeId) throws Exception;
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+ int nPartitions, NetworkAddress networkAddress) throws Exception;
+
+ public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
+
+ public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception;
+
public void getNodeControllerInfos() throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 2e7b498..6a9747b 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -39,6 +39,9 @@
@Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
public String dataIPAddress;
+ @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
+ public String datasetIPAddress;
+
@Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
public String ioDevices = System.getProperty("java.io.tmpdir");
@@ -55,6 +58,9 @@
@Option(name = "--", handler = StopOptionHandler.class)
public List<String> appArgs;
+ @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
+ public int resultManagerMemory = -1;
+
public void toCommandLine(List<String> cList) {
cList.add("-cc-host");
cList.add(ccHost);
@@ -66,6 +72,7 @@
cList.add(nodeId);
cList.add("-data-ip-address");
cList.add(dataIPAddress);
+ cList.add(datasetIPAddress);
cList.add("-iodevices");
cList.add(ioDevices);
cList.add("-net-thread-count");
@@ -82,5 +89,7 @@
cList.add(appArg);
}
}
+ cList.add("-result-manager-memory");
+ cList.add(String.valueOf(resultManagerMemory));
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
index 91cfecf..a897602 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
@@ -33,6 +33,8 @@
private final NetworkAddress dataPort;
+ private final NetworkAddress datasetPort;
+
private final String osName;
private final String arch;
@@ -60,13 +62,14 @@
private final HeartbeatSchema hbSchema;
public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
- String osName, String arch, String osVersion, int nProcessors, String vmName, String vmVersion,
- String vmVendor, String classpath, String libraryPath, String bootClasspath, List<String> inputArguments,
- Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
+ NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName,
+ String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath,
+ List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
this.ncAddress = ncAddress;
this.nodeId = nodeId;
this.ncConfig = ncConfig;
this.dataPort = dataPort;
+ this.datasetPort = datasetPort;
this.osName = osName;
this.arch = arch;
this.osVersion = osVersion;
@@ -98,6 +101,10 @@
return dataPort;
}
+ public NetworkAddress getDatasetPort() {
+ return datasetPort;
+ }
+
public String getOSName() {
return osName;
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index 1dba3bc..663c68a 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -37,6 +37,10 @@
public long netPayloadBytesWritten;
public long netSignalingBytesRead;
public long netSignalingBytesWritten;
+ public long datasetNetPayloadBytesRead;
+ public long datasetNetPayloadBytesWritten;
+ public long datasetNetSignalingBytesRead;
+ public long datasetNetSignalingBytesWritten;
public long ipcMessagesSent;
public long ipcMessageBytesSent;
public long ipcMessagesReceived;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 8cc4a54..f6ab9ba 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -36,6 +36,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -67,6 +68,9 @@
REPORT_PROFILE,
REGISTER_PARTITION_PROVIDER,
REGISTER_PARTITION_REQUEST,
+ REGISTER_RESULT_PARTITION_LOCATION,
+ REPORT_RESULT_PARTITION_WRITE_COMPLETION,
+ REPORT_RESULT_PARTITION_FAILURE,
NODE_REGISTRATION_RESULT,
START_TASKS,
@@ -427,6 +431,127 @@
}
}
+ public static class RegisterResultPartitionLocationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final boolean orderedResult;
+
+ private final int partition;
+
+ private final int nPartitions;
+
+ private NetworkAddress networkAddress;
+
+ public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ int partition, int nPartitions, NetworkAddress networkAddress) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.orderedResult = orderedResult;
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_RESULT_PARTITION_LOCATION;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+
+ public boolean getOrderedResult() {
+ return orderedResult;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public int getNPartitions() {
+ return nPartitions;
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+ }
+
+ public static class ReportResultPartitionWriteCompletionFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final int partition;
+
+ public ReportResultPartitionWriteCompletionFunction(JobId jobId, ResultSetId rsId, int partition) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.partition = partition;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_RESULT_PARTITION_WRITE_COMPLETION;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+ }
+
+ public static class ReportResultPartitionFailureFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final int partition;
+
+ public ReportResultPartitionFailureFunction(JobId jobId, ResultSetId rsId, int partition) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.partition = partition;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_RESULT_PARTITION_FAILURE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+ }
+
public static class NodeRegistrationResult extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 5fd2302..057a0f4 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -16,7 +16,9 @@
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
@@ -99,6 +101,27 @@
ipcHandle.send(-1, fn, null);
}
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+ int nPartitions, NetworkAddress networkAddress) throws Exception {
+ CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
+ jobId, rsId, orderedResult, partition, nPartitions, networkAddress);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception {
+ CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn = new CCNCFunctions.ReportResultPartitionWriteCompletionFunction(
+ jobId, rsId, partition);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception {
+ CCNCFunctions.ReportResultPartitionFailureFunction fn = new CCNCFunctions.ReportResultPartitionFailureFunction(
+ jobId, rsId, partition);
+ ipcHandle.send(-1, fn, null);
+ }
+
@Override
public void getNodeControllerInfos() throws Exception {
ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null);