Merged fullstack_asterix_stabilization -r 2933:3157
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@3164 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index c7eedb3..d644673 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -15,8 +15,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 137cf05..a8055c9 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -35,26 +35,37 @@
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.context.ICCContext;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.cc.dataset.DatasetDirectoryService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
+import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
+import edu.uci.ics.hyracks.control.cc.work.GetResultStatusWork;
import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionRequestWork;
+import edu.uci.ics.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
import edu.uci.ics.hyracks.control.cc.work.RemoveDeadNodesWork;
import edu.uci.ics.hyracks.control.cc.work.ReportProfilesWork;
+import edu.uci.ics.hyracks.control.cc.work.ReportResultPartitionFailureWork;
+import edu.uci.ics.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
import edu.uci.ics.hyracks.control.cc.work.TaskCompleteWork;
import edu.uci.ics.hyracks.control.cc.work.TaskFailureWork;
import edu.uci.ics.hyracks.control.cc.work.UnregisterNodeWork;
@@ -111,6 +122,8 @@
private final DeadNodeSweeper sweeper;
+ private final IDatasetDirectoryService datasetDirectoryService;
+
private long jobCounter;
public ClusterControllerService(final CCConfig ccConfig) throws Exception {
@@ -157,6 +170,7 @@
}
};
sweeper = new DeadNodeSweeper();
+ datasetDirectoryService = new DatasetDirectoryService();
jobCounter = 0;
}
@@ -272,6 +286,10 @@
return clusterIPC;
}
+ public NetworkAddress getDatasetDirectoryServiceInfo() {
+ return new NetworkAddress(ccConfig.clientNetIpAddress.getBytes(), ccConfig.clientNetPort);
+ }
+
private class DeadNodeSweeper extends TimerTask {
@Override
public void run() {
@@ -279,6 +297,10 @@
}
}
+ public IDatasetDirectoryService getDatasetDirectoryService() {
+ return datasetDirectoryService;
+ }
+
private class HyracksClientInterfaceIPCI implements IIPCI {
@Override
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
@@ -308,6 +330,27 @@
return;
}
+ case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
+ workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this,
+ new IPCResponder<NetworkAddress>(handle, mid)));
+ return;
+ }
+
+ case GET_DATASET_RESULT_STATUS: {
+ HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+ workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(), gdrlf
+ .getResultSetId(), new IPCResponder<Status>(handle, mid)));
+ return;
+ }
+
+ case GET_DATASET_RESULT_LOCATIONS: {
+ HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+ workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
+ .getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
+ new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
+ return;
+ }
+
case WAIT_FOR_COMPLETION: {
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
@@ -403,6 +446,28 @@
return;
}
+ case REGISTER_RESULT_PARTITION_LOCATION: {
+ CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+ workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
+ .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getPartition(), rrplf
+ .getNPartitions(), rrplf.getNetworkAddress()));
+ return;
+ }
+
+ case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
+ CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf = (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
+ workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
+ rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
+ return;
+ }
+
+ case REPORT_RESULT_PARTITION_FAILURE: {
+ CCNCFunctions.ReportResultPartitionFailureFunction rrplf = (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
+ workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this, rrplf
+ .getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
+ return;
+ }
+
case SEND_APPLICATION_MESSAGE: {
CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(), rsf
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index c17acd0..c96a319 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -41,6 +41,8 @@
private final NetworkAddress dataPort;
+ private final NetworkAddress datasetPort;
+
private final Set<JobId> activeJobIds;
private final String osName;
@@ -107,6 +109,14 @@
private final long[] netSignalingBytesWritten;
+ private final long[] datasetNetPayloadBytesRead;
+
+ private final long[] datasetNetPayloadBytesWritten;
+
+ private final long[] datasetNetSignalingBytesRead;
+
+ private final long[] datasetNetSignalingBytesWritten;
+
private final long[] ipcMessagesSent;
private final long[] ipcMessageBytesSent;
@@ -123,6 +133,7 @@
this.nodeController = nodeController;
ncConfig = reg.getNCConfig();
dataPort = reg.getDataPort();
+ datasetPort = reg.getDatasetPort();
activeJobIds = new HashSet<JobId>();
osName = reg.getOSName();
@@ -164,6 +175,10 @@
netPayloadBytesWritten = new long[RRD_SIZE];
netSignalingBytesRead = new long[RRD_SIZE];
netSignalingBytesWritten = new long[RRD_SIZE];
+ datasetNetPayloadBytesRead = new long[RRD_SIZE];
+ datasetNetPayloadBytesWritten = new long[RRD_SIZE];
+ datasetNetSignalingBytesRead = new long[RRD_SIZE];
+ datasetNetSignalingBytesWritten = new long[RRD_SIZE];
ipcMessagesSent = new long[RRD_SIZE];
ipcMessageBytesSent = new long[RRD_SIZE];
ipcMessagesReceived = new long[RRD_SIZE];
@@ -196,6 +211,10 @@
netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
+ datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
+ datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
+ datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
+ datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
@@ -227,6 +246,10 @@
return dataPort;
}
+ public NetworkAddress getDatasetPort() {
+ return datasetPort;
+ }
+
public JSONObject toSummaryJSON() throws JSONException {
JSONObject o = new JSONObject();
o.put("node-id", ncConfig.nodeId);
@@ -271,6 +294,10 @@
o.put("net-payload-bytes-written", netPayloadBytesWritten);
o.put("net-signaling-bytes-read", netSignalingBytesRead);
o.put("net-signaling-bytes-written", netSignalingBytesWritten);
+ o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
+ o.put("dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
+ o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
+ o.put("dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
o.put("ipc-messages-sent", ipcMessagesSent);
o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
o.put("ipc-messages-received", ipcMessagesReceived);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
new file mode 100644
index 0000000..13d0c30
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.dataset;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+/**
+ * TODO(madhusudancs): The potential perils of this global dataset directory service implementation is that, the jobs
+ * location information is never evicted from the memory and the memory usage grows as the number of jobs in the system
+ * grows. What we should possibly do is, add an API call for the client to say that it received everything it has to for
+ * the job (after it receives all the results) completely. Then we can just get rid of the location information for that
+ * job.
+ */
+public class DatasetDirectoryService implements IDatasetDirectoryService {
+ private final Map<JobId, Map<ResultSetId, ResultSetMetaData>> jobResultLocationsMap;
+
+ public DatasetDirectoryService() {
+ jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, ResultSetMetaData>>();
+ }
+
+ @Override
+ public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ int partition, int nPartitions, NetworkAddress networkAddress) {
+ Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
+ if (rsMap == null) {
+ rsMap = new HashMap<ResultSetId, ResultSetMetaData>();
+ jobResultLocationsMap.put(jobId, rsMap);
+ }
+
+ ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ if (resultSetMetaData == null) {
+ resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]);
+ rsMap.put(rsId, resultSetMetaData);
+ }
+
+ DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+ if (records[partition] == null) {
+ records[partition] = new DatasetDirectoryRecord();
+ }
+ records[partition].setNetworkAddress(networkAddress);
+ records[partition].start();
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
+ DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
+ ddr.writeEOS();
+ }
+
+ @Override
+ public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
+ DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
+ ddr.fail();
+ }
+
+ @Override
+ public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
+ Map<ResultSetId, ResultSetMetaData> rsMap;
+ while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
+ throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
+ }
+ DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+
+ ArrayList<Status> statuses = new ArrayList<Status>(records.length);
+ for (int i = 0; i < records.length; i++) {
+ statuses.add(records[i].getStatus());
+ }
+
+ // Default status is idle
+ Status status = Status.IDLE;
+ if (statuses.contains(Status.FAILED)) {
+ // Even if there is at least one failed entry we should return failed status.
+ return Status.FAILED;
+ } else if (statuses.contains(Status.RUNNING)) {
+ // If there are not failed entry and if there is at least one running entry we should return running status.
+ return Status.RUNNING;
+ } else {
+ // If each and every partition has reported success do we report success as the status.
+ int successCount = 0;
+ for (int i = 0; i < statuses.size(); i++) {
+ if (statuses.get(i) == Status.SUCCESS) {
+ successCount++;
+ }
+ }
+ if (successCount == statuses.size()) {
+ return Status.SUCCESS;
+ }
+ }
+ return status;
+ }
+
+ @Override
+ public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
+ DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
+ DatasetDirectoryRecord[] newRecords;
+ while ((newRecords = updatedRecords(jobId, rsId, knownRecords)) == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ return newRecords;
+ }
+
+ public DatasetDirectoryRecord getDatasetDirectoryRecord(JobId jobId, ResultSetId rsId, int partition) {
+ Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
+ ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+ return records[partition];
+ }
+
+ /**
+ * Compares the records already known by the client for the given job's result set id with the records that the
+ * dataset directory service knows and if there are any newly discovered records returns a whole array with the
+ * new records filled in.
+ * This method has a very convoluted logic. Here is the explanation of how it works.
+ * If the ordering constraint has to be enforced, the method obtains the first null record in the known records in
+ * the order of the partitions. It always traverses the array in the first to last order!
+ * If known records array or the first element in that array is null in the but the record for that partition now
+ * known to the directory service, the method fills in that record in the array and returns the array back.
+ * However, if the first known null record is not a first element in the array, by induction, all the previous
+ * known records should be known already be known to client and none of the records for the partitions ahead is
+ * known by the client yet. So, we check if the client has reached the end of stream for the partition corresponding
+ * to the record before the first known null record, i.e. the last known non-null record. If not, we just return
+ * null because we cannot expose any new locations until the client reaches end of stream for the last known record.
+ * If the client has reached the end of stream record for the last known non-null record, we check if the next record
+ * is discovered by the dataset directory service and if so, we fill it in the records array and return it back or
+ * send null otherwise.
+ * If the ordering is not required, we are free to return any newly discovered records back, so we just check if
+ * arrays are equal and if they are not we send the entire new updated array.
+ *
+ * @param jobId
+ * - Id of the job for which the directory records should be retrieved.
+ * @param rsId
+ * - Id of the result set for which the directory records should be retrieved.
+ * @param knownRecords
+ * - An array of directory records that the client is already aware of.
+ * @return
+ * - Returns null if there aren't any newly discovered partitions enforcing the ordering constraint
+ * @throws HyracksDataException
+ * TODO(madhusudancs): Think about caching (and still be stateless) instead of this ugly O(n) iterations for
+ * every check. This already looks very expensive.
+ */
+ private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
+ throws HyracksDataException {
+ Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
+ if (rsMap == null) {
+ return null;
+ }
+
+ ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
+ throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
+ }
+
+ boolean ordered = resultSetMetaData.getOrderedResult();
+ DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+ /* If ordering is required, we should expose the dataset directory records only in the order, otherwise
+ * we can simply check if there are any newly discovered records and send the whole array back if there are.
+ */
+ if (ordered) {
+ // Iterate over the known records and find the last record which is not null.
+ int i = 0;
+ for (i = 0; i < records.length; i++) {
+ if (knownRecords == null) {
+ if (records[0] != null) {
+ knownRecords = new DatasetDirectoryRecord[records.length];
+ knownRecords[0] = records[0];
+ return knownRecords;
+ }
+ return null;
+ }
+ if (knownRecords[i] == null) {
+ if ((i == 0 || knownRecords[i - 1].hasReachedReadEOS()) && records[i] != null) {
+ knownRecords[i] = records[i];
+ return knownRecords;
+ }
+ return null;
+ }
+ }
+ } else {
+ if (!Arrays.equals(records, knownRecords)) {
+ return records;
+ }
+ }
+ return null;
+ }
+
+ private class ResultSetMetaData {
+ private final boolean ordered;
+
+ private final DatasetDirectoryRecord[] records;
+
+ public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
+ this.ordered = ordered;
+ this.records = records;
+ }
+
+ public boolean getOrderedResult() {
+ return ordered;
+ }
+
+ public DatasetDirectoryRecord[] getRecords() {
+ return records;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
new file mode 100644
index 0000000..3ac6acc
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetDatasetDirectoryServiceInfoWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+
+ private final IResultCallback<NetworkAddress> callback;
+
+ public GetDatasetDirectoryServiceInfoWork(ClusterControllerService ccs, IResultCallback<NetworkAddress> callback) {
+ this.ccs = ccs;
+ this.callback = callback;
+ }
+
+ @Override
+ public void doRun() {
+ try {
+ NetworkAddress addr = ccs.getDatasetDirectoryServiceInfo();
+ callback.setValue(addr);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 2f23a2c..a787b9f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -39,7 +39,8 @@
Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
- result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort()));
+ result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort(), e
+ .getValue().getDatasetPort()));
}
callback.setValue(result);
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
new file mode 100644
index 0000000..fd1d418
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetResultPartitionLocationsWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final DatasetDirectoryRecord[] knownRecords;
+
+ private final IResultCallback<DatasetDirectoryRecord[]> callback;
+
+ public GetResultPartitionLocationsWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+ DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.knownRecords = knownRecords;
+ this.callback = callback;
+ }
+
+ @Override
+ public void doRun() {
+ final IDatasetDirectoryService dds = ccs.getDatasetDirectoryService();
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ DatasetDirectoryRecord[] partitionLocations = dds.getResultPartitionLocations(jobId, rsId,
+ knownRecords);
+ callback.setValue(partitionLocations);
+ } catch (HyracksDataException e) {
+ callback.setException(e);
+ }
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
new file mode 100644
index 0000000..d2dadf5
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetResultStatusWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final IResultCallback<Status> callback;
+
+ public GetResultStatusWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+ IResultCallback<Status> callback) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.callback = callback;
+ }
+
+ @Override
+ public void doRun() {
+ try {
+ Status status = ccs.getDatasetDirectoryService().getResultStatus(jobId, rsId);
+ callback.setValue(status);
+ } catch (HyracksDataException e) {
+ callback.setException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JobId@" + jobId + " ResultSetId@" + rsId;
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
new file mode 100644
index 0000000..f86e924
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class RegisterResultPartitionLocationWork extends AbstractWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final boolean orderedResult;
+
+ private final int partition;
+
+ private final int nPartitions;
+
+ private final NetworkAddress networkAddress;
+
+ public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+ boolean orderedResult, int partition, int nPartitions, NetworkAddress networkAddress) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.orderedResult = orderedResult;
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public void run() {
+ ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
+ nPartitions, networkAddress);
+ }
+
+ @Override
+ public String toString() {
+ return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
+ + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
new file mode 100644
index 0000000..4aea41e
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class ReportResultPartitionFailureWork extends AbstractWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final int partition;
+
+ public ReportResultPartitionFailureWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, int partition) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.partition = partition;
+ }
+
+ @Override
+ public void run() {
+ ccs.getDatasetDirectoryService().reportResultPartitionFailure(jobId, rsId, partition);
+ }
+
+ @Override
+ public String toString() {
+ return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
new file mode 100644
index 0000000..313b730
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class ReportResultPartitionWriteCompletionWork extends AbstractWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final int partition;
+
+ public ReportResultPartitionWriteCompletionWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+ int partition) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.partition = partition;
+ }
+
+ @Override
+ public void run() {
+ ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+ }
+
+ @Override
+ public String toString() {
+ return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
index ff9d8a0..3fc46ff 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
@@ -58,6 +58,10 @@
var netPayloadBytesWritten = result['net-payload-bytes-written'];
var netSignalingBytesRead = result['net-signaling-bytes-read'];
var netSignalingBytesWritten = result['net-signaling-bytes-written'];
+ var datasetNetPayloadBytesRead = result['dataset-net-payload-bytes-read'];
+ var datasetNetPayloadBytesWritten = result['dataset-net-payload-bytes-written'];
+ var datasetNetSignalingBytesRead = result['dataset-net-signaling-bytes-read'];
+ var datasetNetSignalingBytesWritten = result['dataset-net-signaling-bytes-written'];
var ipcMessagesSent = result['ipc-messages-sent'];
var ipcMessageBytesSent = result['ipc-message-bytes-sent'];
var ipcMessagesReceived = result['ipc-messages-received'];
@@ -117,9 +121,13 @@
}
if (i < sysLoad.length - 1) {
netPayloadReadBWArray.push([ i, computeRate(netPayloadBytesRead, rrdPtr) ]);
+ netPayloadReadBWArray.push([ i, computeRate(datasetNetPayloadBytesRead, rrdPtr) ]);
netPayloadWriteBWArray.push([ i, computeRate(netPayloadBytesWritten, rrdPtr) ]);
+ netPayloadWriteBWArray.push([ i, computeRate(datasetNetPayloadBytesWritten, rrdPtr) ]);
netSignalingReadBWArray.push([ i, computeRate(netSignalingBytesRead, rrdPtr) ]);
+ netSignalingReadBWArray.push([ i, computeRate(datasetNetSignalingBytesRead, rrdPtr) ]);
netSignalingWriteBWArray.push([ i, computeRate(netSignalingBytesWritten, rrdPtr) ]);
+ netSignalingWriteBWArray.push([ i, computeRate(etSignalingBytesWritten, rrdPtr) ]);
ipcMessageSendRateArray.push([ i, computeRate(ipcMessagesSent, rrdPtr) ]);
ipcMessageBytesSendRateArray.push([ i, computeRate(ipcMessageBytesSent, rrdPtr) ]);
ipcMessageReceiveRateArray.push([ i, computeRate(ipcMessagesReceived, rrdPtr) ]);
diff --git a/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks/hyracks-control/hyracks-control-common/pom.xml
index 096f2e1..ce1298e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -18,8 +18,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 3099bbb..627dd55 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -16,7 +16,9 @@
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
@@ -47,5 +49,12 @@
public void sendApplicationMessageToCC(byte[] data, String nodeId) throws Exception;
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+ int nPartitions, NetworkAddress networkAddress) throws Exception;
+
+ public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
+
+ public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception;
+
public void getNodeControllerInfos() throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 2e7b498..6a9747b 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -39,6 +39,9 @@
@Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
public String dataIPAddress;
+ @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
+ public String datasetIPAddress;
+
@Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
public String ioDevices = System.getProperty("java.io.tmpdir");
@@ -55,6 +58,9 @@
@Option(name = "--", handler = StopOptionHandler.class)
public List<String> appArgs;
+ @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
+ public int resultManagerMemory = -1;
+
public void toCommandLine(List<String> cList) {
cList.add("-cc-host");
cList.add(ccHost);
@@ -66,6 +72,7 @@
cList.add(nodeId);
cList.add("-data-ip-address");
cList.add(dataIPAddress);
+ cList.add(datasetIPAddress);
cList.add("-iodevices");
cList.add(ioDevices);
cList.add("-net-thread-count");
@@ -82,5 +89,7 @@
cList.add(appArg);
}
}
+ cList.add("-result-manager-memory");
+ cList.add(String.valueOf(resultManagerMemory));
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
index 91cfecf..a897602 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
@@ -33,6 +33,8 @@
private final NetworkAddress dataPort;
+ private final NetworkAddress datasetPort;
+
private final String osName;
private final String arch;
@@ -60,13 +62,14 @@
private final HeartbeatSchema hbSchema;
public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
- String osName, String arch, String osVersion, int nProcessors, String vmName, String vmVersion,
- String vmVendor, String classpath, String libraryPath, String bootClasspath, List<String> inputArguments,
- Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
+ NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName,
+ String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath,
+ List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
this.ncAddress = ncAddress;
this.nodeId = nodeId;
this.ncConfig = ncConfig;
this.dataPort = dataPort;
+ this.datasetPort = datasetPort;
this.osName = osName;
this.arch = arch;
this.osVersion = osVersion;
@@ -98,6 +101,10 @@
return dataPort;
}
+ public NetworkAddress getDatasetPort() {
+ return datasetPort;
+ }
+
public String getOSName() {
return osName;
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index 1dba3bc..663c68a 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -37,6 +37,10 @@
public long netPayloadBytesWritten;
public long netSignalingBytesRead;
public long netSignalingBytesWritten;
+ public long datasetNetPayloadBytesRead;
+ public long datasetNetPayloadBytesWritten;
+ public long datasetNetSignalingBytesRead;
+ public long datasetNetSignalingBytesWritten;
public long ipcMessagesSent;
public long ipcMessageBytesSent;
public long ipcMessagesReceived;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 8cc4a54..f6ab9ba 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -36,6 +36,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -67,6 +68,9 @@
REPORT_PROFILE,
REGISTER_PARTITION_PROVIDER,
REGISTER_PARTITION_REQUEST,
+ REGISTER_RESULT_PARTITION_LOCATION,
+ REPORT_RESULT_PARTITION_WRITE_COMPLETION,
+ REPORT_RESULT_PARTITION_FAILURE,
NODE_REGISTRATION_RESULT,
START_TASKS,
@@ -427,6 +431,127 @@
}
}
+ public static class RegisterResultPartitionLocationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final boolean orderedResult;
+
+ private final int partition;
+
+ private final int nPartitions;
+
+ private NetworkAddress networkAddress;
+
+ public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ int partition, int nPartitions, NetworkAddress networkAddress) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.orderedResult = orderedResult;
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_RESULT_PARTITION_LOCATION;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+
+ public boolean getOrderedResult() {
+ return orderedResult;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public int getNPartitions() {
+ return nPartitions;
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+ }
+
+ public static class ReportResultPartitionWriteCompletionFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final int partition;
+
+ public ReportResultPartitionWriteCompletionFunction(JobId jobId, ResultSetId rsId, int partition) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.partition = partition;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_RESULT_PARTITION_WRITE_COMPLETION;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+ }
+
+ public static class ReportResultPartitionFailureFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final int partition;
+
+ public ReportResultPartitionFailureFunction(JobId jobId, ResultSetId rsId, int partition) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.partition = partition;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_RESULT_PARTITION_FAILURE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+ }
+
public static class NodeRegistrationResult extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 5fd2302..057a0f4 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -16,7 +16,9 @@
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
@@ -99,6 +101,27 @@
ipcHandle.send(-1, fn, null);
}
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+ int nPartitions, NetworkAddress networkAddress) throws Exception {
+ CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
+ jobId, rsId, orderedResult, partition, nPartitions, networkAddress);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception {
+ CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn = new CCNCFunctions.ReportResultPartitionWriteCompletionFunction(
+ jobId, rsId, partition);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception {
+ CCNCFunctions.ReportResultPartitionFailureFunction fn = new CCNCFunctions.ReportResultPartitionFailureFunction(
+ jobId, rsId, partition);
+ ipcHandle.send(-1, fn, null);
+ }
+
@Override
public void getNodeControllerInfos() throws Exception {
ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index 874cbab..e163d2b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -15,8 +15,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
</plugins>
@@ -34,6 +35,11 @@
<artifactId>hyracks-net</artifactId>
<version>0.2.3-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-comm</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
</dependencies>
<reporting>
<plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 53a4d73..10e0dba 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -46,6 +46,7 @@
import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
@@ -62,7 +63,9 @@
import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.control.nc.dataset.DatasetPartitionManager;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.net.DatasetNetworkManager;
import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
@@ -93,6 +96,10 @@
private final NetworkManager netManager;
+ private final IDatasetPartitionManager datasetPartitionManager;
+
+ private final DatasetNetworkManager datasetNetworkManager;
+
private final WorkQueue queue;
private final Timer timer;
@@ -141,7 +148,11 @@
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
+ netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
+
+ datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory);
+ datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
+ datasetPartitionManager, ncConfig.nNetThreads);
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
@@ -212,6 +223,7 @@
startApplication();
+ datasetNetworkManager.start();
IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -220,10 +232,11 @@
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
- osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
- runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
- .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
- runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
+ datasetNetworkManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean
+ .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
+ .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
+ .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
+ runtimeMXBean.getSystemProperties(), hbSchema));
synchronized (this) {
while (registrationPending) {
@@ -270,8 +283,10 @@
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
executor.shutdownNow();
partitionManager.close();
+ datasetPartitionManager.close();
heartbeatTask.cancel();
netManager.stop();
+ datasetNetworkManager.stop();
queue.stop();
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
}
@@ -292,6 +307,10 @@
return netManager;
}
+ public DatasetNetworkManager getDatasetNetworkManager() {
+ return datasetNetworkManager;
+ }
+
public PartitionManager getPartitionManager() {
return partitionManager;
}
@@ -316,8 +335,7 @@
return queue;
}
- private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
- String ipaddrStr = ncConfig.dataIPAddress;
+ private static InetAddress getIpAddress(String ipaddrStr) throws Exception {
ipaddrStr = ipaddrStr.trim();
Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
Matcher m = pattern.matcher(ipaddrStr);
@@ -374,6 +392,12 @@
hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
+ MuxDemuxPerformanceCounters datasetNetPC = datasetNetworkManager.getPerformanceCounters();
+ hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
+ hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
+ hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
+ hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
+
IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
@@ -468,4 +492,8 @@
public void sendApplicationMessageToCC(byte[] data, String nodeId) throws Exception {
ccs.sendApplicationMessageToCC(data, nodeId);
}
+
+ public IDatasetPartitionManager getDatasetPartitionManager() {
+ return datasetPartitionManager;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 310878f..d6ea111 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -348,6 +349,11 @@
}
@Override
+ public IDatasetPartitionManager getDatasetPartitionManager() {
+ return ncs.getDatasetPartitionManager();
+ }
+
+ @Override
public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
this.ncs.sendApplicationMessageToCC(message, nodeId);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
new file mode 100644
index 0000000..cecd677
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
+import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
+
+public class DatasetMemoryManager {
+ private final Set<Page> availPages;
+
+ private final LeastRecentlyUsedList leastRecentlyUsedList;
+
+ private final Map<ResultSetPartitionId, PartitionNode> resultPartitionNodesMap;
+
+ private final static int FRAME_SIZE = 32768;
+
+ public DatasetMemoryManager(int availableMemory) {
+ availPages = new HashSet<Page>();
+
+ // Atleast have one page for temporarily storing the results.
+ if (availableMemory <= 0)
+ availableMemory = FRAME_SIZE;
+
+ while (availableMemory >= FRAME_SIZE) {
+ /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame()
+ * instead of direct ByteBuffer.allocate()?
+ */
+ availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
+ availableMemory -= FRAME_SIZE;
+ }
+
+ leastRecentlyUsedList = new LeastRecentlyUsedList();
+ resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
+ }
+
+ public Page requestPage(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter dpw)
+ throws OutOfMemoryError, HyracksDataException {
+ Page page;
+ if (availPages.isEmpty()) {
+ page = evictPage();
+ } else {
+ page = getAvailablePage();
+ }
+
+ page.clear();
+
+ /*
+ * It is extremely important to update the reference after obtaining the page because, in the cases where memory
+ * manager is allocated only one page of memory, the front of the LRU list should not be created by the
+ * update reference call before a page is pushed on to the element of the LRU list. So we first obtain the page,
+ * then make a updateReference call which in turn creates a new node in the LRU list and then add the page to it.
+ */
+ PartitionNode pn = updateReference(resultSetPartitionId, dpw);
+ pn.add(page);
+ return page;
+ }
+
+ public void pageReferenced(ResultSetPartitionId resultSetPartitionId) {
+ // When a page is referenced the dataset partition writer should already be known, so we pass null.
+ updateReference(resultSetPartitionId, null);
+ }
+
+ public int getPageSize() {
+ return FRAME_SIZE;
+ }
+
+ protected void insertPartitionNode(ResultSetPartitionId resultSetPartitionId, PartitionNode pn) {
+ leastRecentlyUsedList.add(pn);
+ resultPartitionNodesMap.put(resultSetPartitionId, pn);
+ }
+
+ protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
+ IDatasetPartitionWriter dpw) {
+ PartitionNode pn = null;
+
+ if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
+ if (dpw != null) {
+ pn = new PartitionNode(resultSetPartitionId, dpw);
+ insertPartitionNode(resultSetPartitionId, pn);
+ }
+ return pn;
+ }
+ pn = resultPartitionNodesMap.get(resultSetPartitionId);
+ leastRecentlyUsedList.remove(pn);
+ insertPartitionNode(resultSetPartitionId, pn);
+
+ return pn;
+ }
+
+ protected synchronized Page evictPage() throws HyracksDataException {
+ PartitionNode pn = leastRecentlyUsedList.getFirst();
+ IDatasetPartitionWriter dpw = pn.getDatasetPartitionWriter();
+ Page page = dpw.returnPage();
+
+ /* If the partition holding the pages breaks the contract by not returning the page or it has no page, just take
+ * away all the pages allocated to it and add to the available pages set.
+ */
+ if (page == null) {
+ availPages.addAll(pn);
+ pn.clear();
+ resultPartitionNodesMap.remove(pn.getResultSetPartitionId());
+ leastRecentlyUsedList.remove(pn);
+
+ /* Based on the assumption that if the dataset partition writer returned a null page, it should be lying about
+ * the number of pages it holds in which case we just evict all the pages it holds and should thus be able to
+ * add all those pages to available set and we have at least one page to allocate back.
+ */
+ page = getAvailablePage();
+ } else {
+ pn.remove(page);
+
+ // If the partition no more holds any pages, remove it from the linked list and the hash map.
+ if (pn.isEmpty()) {
+ resultPartitionNodesMap.remove(pn.getResultSetPartitionId());
+ leastRecentlyUsedList.remove(pn);
+ }
+ }
+
+ return page;
+ }
+
+ protected synchronized Page getAvailablePage() {
+ Iterator<Page> iter = availPages.iterator();
+ Page page = iter.next();
+ iter.remove();
+ return page;
+ }
+
+ private class LeastRecentlyUsedList {
+ private PartitionNode head;
+
+ private PartitionNode tail;
+
+ public LeastRecentlyUsedList() {
+ head = null;
+ tail = null;
+ }
+
+ public void add(PartitionNode node) {
+ if (head == null) {
+ head = tail = node;
+ return;
+ }
+ tail.setNext(node);
+ node.setPrev(tail);
+ tail = node;
+ }
+
+ public void remove(PartitionNode node) {
+ if ((node == head) && (node == tail)) {
+ head = tail = null;
+ return;
+ } else if (node == head) {
+ head = head.getNext();
+ head.setPrev(null);
+ return;
+ } else if (node == tail) {
+ tail = tail.getPrev();
+ tail.setNext(null);
+ return;
+ } else {
+ PartitionNode prev = node.getPrev();
+ PartitionNode next = node.getNext();
+ prev.setNext(next);
+ next.setPrev(prev);
+ }
+ }
+
+ public PartitionNode getFirst() {
+ return head;
+ }
+ }
+
+ private class PartitionNode extends HashSet<Page> {
+ private static final long serialVersionUID = 1L;
+
+ private final ResultSetPartitionId resultSetPartitionId;
+
+ private final IDatasetPartitionWriter datasetPartitionWriter;
+
+ private PartitionNode prev;
+
+ private PartitionNode next;
+
+ public PartitionNode(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter datasetPartitionWriter) {
+ this.resultSetPartitionId = resultSetPartitionId;
+ this.datasetPartitionWriter = datasetPartitionWriter;
+ prev = null;
+ next = null;
+ }
+
+ public ResultSetPartitionId getResultSetPartitionId() {
+ return resultSetPartitionId;
+ }
+
+ public IDatasetPartitionWriter getDatasetPartitionWriter() {
+ return datasetPartitionWriter;
+ }
+
+ public void setPrev(PartitionNode node) {
+ prev = node;
+ }
+
+ public PartitionNode getPrev() {
+ return prev;
+ }
+
+ public void setNext(PartitionNode node) {
+ next = node;
+ }
+
+ public PartitionNode getNext() {
+ return next;
+ }
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
new file mode 100644
index 0000000..1cad54b
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+
+public class DatasetPartitionManager implements IDatasetPartitionManager {
+ private final NodeControllerService ncs;
+
+ private final Executor executor;
+
+ private final Map<JobId, ResultState[]> partitionResultStateMap;
+
+ private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+ private final IWorkspaceFileFactory fileFactory;
+
+ private final DatasetMemoryManager datasetMemoryManager;
+
+ public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory) {
+ this.ncs = ncs;
+ this.executor = executor;
+ partitionResultStateMap = new HashMap<JobId, ResultState[]>();
+ deallocatableRegistry = new DefaultDeallocatableRegistry();
+ fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
+ datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+ }
+
+ @Override
+ public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+ int partition, int nPartitions) throws HyracksException {
+ DatasetPartitionWriter dpw = null;
+ JobId jobId = ctx.getJobletContext().getJobId();
+ try {
+ ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
+ nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
+ dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
+
+ ResultState[] resultStates = partitionResultStateMap.get(jobId);
+ if (resultStates == null) {
+ resultStates = new ResultState[nPartitions];
+ partitionResultStateMap.put(jobId, resultStates);
+ }
+ resultStates[partition] = dpw.getResultState();
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+
+ return dpw;
+ }
+
+ @Override
+ public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
+ try {
+ ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
+ try {
+ ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
+ throws HyracksException {
+ ResultState[] resultStates = partitionResultStateMap.get(jobId);
+
+ if (resultStates == null) {
+ throw new HyracksException("Unknown JobId " + jobId);
+ }
+
+ ResultState resultState = resultStates[partition];
+ if (resultState == null) {
+ throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
+ }
+
+ IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+ dpr.writeTo(writer);
+ }
+
+ @Override
+ public IWorkspaceFileFactory getFileFactory() {
+ return fileFactory;
+ }
+
+ @Override
+ public void close() {
+ deallocatableRegistry.close();
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
new file mode 100644
index 0000000..296c502
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
+import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
+
+public class DatasetPartitionReader implements IDatasetPartitionReader {
+ private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
+
+ private final DatasetMemoryManager datasetMemoryManager;
+
+ private final Executor executor;
+
+ private final ResultState resultState;
+
+ private IFileHandle fileHandle;
+
+ public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
+ this.datasetMemoryManager = datasetMemoryManager;
+ this.executor = executor;
+ this.resultState = resultState;
+ }
+
+ private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ long readSize = 0;
+ synchronized (resultState) {
+ while (offset >= resultState.getSize() && !resultState.getEOS()) {
+ try {
+ resultState.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ if (offset >= resultState.getSize() && resultState.getEOS()) {
+ return readSize;
+ }
+
+ if (offset < resultState.getPersistentSize()) {
+ readSize = resultState.getIOManager().syncRead(fileHandle, offset, buffer);
+ }
+
+ if (readSize < buffer.capacity()) {
+ long localPageOffset = offset - resultState.getPersistentSize();
+ int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
+ int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
+ Page page = resultState.getPage(localPageIndex);
+ readSize += buffer.remaining();
+ buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+ }
+
+ datasetMemoryManager.pageReferenced(resultState.getResultSetPartitionId());
+ return readSize;
+ }
+
+ @Override
+ public void writeTo(final IFrameWriter writer) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ NetworkOutputChannel channel = (NetworkOutputChannel) writer;
+ channel.setFrameSize(resultState.getFrameSize());
+ try {
+ fileHandle = resultState.getIOManager().open(resultState.getValidFileReference(),
+ IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ channel.open();
+ try {
+ long offset = 0;
+ ByteBuffer buffer = ByteBuffer.allocate(resultState.getFrameSize());
+ while (true) {
+ buffer.clear();
+ long size = read(offset, buffer);
+ if (size <= 0) {
+ break;
+ } else if (size < buffer.limit()) {
+ throw new HyracksDataException("Premature end of file - readSize: " + size
+ + " buffer limit: " + buffer.limit());
+ }
+ offset += size;
+ buffer.flip();
+ channel.nextFrame(buffer);
+ }
+ } finally {
+ channel.close();
+ resultState.getIOManager().close(fileHandle);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
+ }
+ }
+ });
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
new file mode 100644
index 0000000..f6ae540
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
+import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
+
+public class DatasetPartitionWriter implements IDatasetPartitionWriter {
+ private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
+
+ private static final String FILE_PREFIX = "result_";
+
+ private final IDatasetPartitionManager manager;
+
+ private final JobId jobId;
+
+ private final ResultSetId resultSetId;
+
+ private final int partition;
+
+ private final DatasetMemoryManager datasetMemoryManager;
+
+ private final ResultSetPartitionId resultSetPartitionId;
+
+ private final ResultState resultState;
+
+ private IFileHandle fileHandle;
+
+ public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
+ ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager) {
+ this.manager = manager;
+ this.jobId = jobId;
+ this.resultSetId = rsId;
+ this.partition = partition;
+ this.datasetMemoryManager = datasetMemoryManager;
+
+ resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
+ resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), ctx.getFrameSize());
+ }
+
+ public ResultState getResultState() {
+ return resultState;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("open(" + partition + ")");
+ }
+ String fName = FILE_PREFIX + String.valueOf(partition);
+ FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
+ fileHandle = resultState.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ resultState.init(fRef);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ int srcOffset = 0;
+ Page destPage = resultState.getLastPage();
+
+ while (srcOffset < buffer.limit()) {
+ if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+ destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+ resultState.addPage(destPage);
+ }
+ int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+ destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+ srcOffset += srcLength;
+ resultState.incrementSize(srcLength);
+ }
+
+ synchronized (resultState) {
+ resultState.notifyAll();
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ try {
+ manager.reportPartitionFailure(jobId, resultSetId, partition);
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("close(" + partition + ")");
+ }
+
+ try {
+ synchronized (resultState) {
+ resultState.setEOS(true);
+ resultState.notifyAll();
+ }
+ manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public Page returnPage() throws HyracksDataException {
+ Page page = resultState.removePage(0);
+
+ IIOManager ioManager = resultState.getIOManager();
+
+ // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
+ if (page == null) {
+ ioManager.close(fileHandle);
+ return null;
+ }
+
+ page.getBuffer().flip();
+
+ long delta = ioManager.syncWrite(fileHandle, resultState.getPersistentSize(), page.getBuffer());
+ resultState.incrementPersistentSize(delta);
+ return page;
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
new file mode 100644
index 0000000..3db3fd9
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
+
+public class ResultState implements IStateObject {
+ private final ResultSetPartitionId resultSetPartitionId;
+
+ private final int frameSize;
+
+ private final IIOManager ioManager;
+
+ private final AtomicBoolean eos;
+
+ private final AtomicBoolean readEOS;
+
+ private final List<Page> localPageList;
+
+ private FileReference fileRef;
+
+ private long size;
+
+ private long persistentSize;
+
+ ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, int frameSize) {
+ this.resultSetPartitionId = resultSetPartitionId;
+ this.ioManager = ioManager;
+ this.frameSize = frameSize;
+ eos = new AtomicBoolean(false);
+ readEOS = new AtomicBoolean(false);
+ localPageList = new ArrayList<Page>();
+ }
+
+ public synchronized void init(FileReference fileRef) {
+ this.fileRef = fileRef;
+
+ size = 0;
+ persistentSize = 0;
+ notifyAll();
+ }
+
+ public ResultSetPartitionId getResultSetPartitionId() {
+ return resultSetPartitionId;
+ }
+
+ public int getFrameSize() {
+ return frameSize;
+ }
+
+ public IIOManager getIOManager() {
+ return ioManager;
+ }
+
+ public synchronized void incrementSize(long delta) {
+ size += delta;
+ }
+
+ public synchronized long getSize() {
+ return size;
+ }
+
+ public synchronized void incrementPersistentSize(long delta) {
+ persistentSize += delta;
+ }
+
+ public synchronized long getPersistentSize() {
+ return persistentSize;
+ }
+
+ public void setEOS(boolean eos) {
+ this.eos.set(eos);
+ }
+
+ public boolean getEOS() {
+ return eos.get();
+ }
+
+ public boolean getReadEOS() {
+ return readEOS.get();
+ }
+
+ public synchronized void addPage(Page page) {
+ localPageList.add(page);
+ }
+
+ public synchronized Page removePage(int index) {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.remove(index);
+ }
+ return page;
+ }
+
+ public synchronized Page getPage(int index) {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.get(index);
+ }
+ return page;
+ }
+
+ public synchronized Page getLastPage() {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.get(localPageList.size() - 1);
+ }
+ return page;
+ }
+
+ public synchronized Page getFirstPage() {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.get(0);
+ }
+ return page;
+ }
+
+ public synchronized FileReference getValidFileReference() throws InterruptedException {
+ while (fileRef == null)
+ wait();
+ return fileRef;
+ }
+
+ @Override
+ public JobId getJobId() {
+ return resultSetPartitionId.getJobId();
+ }
+
+ @Override
+ public Object getId() {
+ return resultSetPartitionId;
+ }
+
+ @Override
+ public long getMemoryOccupancy() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
new file mode 100644
index 0000000..5b8b333
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class DatasetNetworkManager implements IChannelConnectionFactory {
+ private static final Logger LOGGER = Logger.getLogger(DatasetNetworkManager.class.getName());
+
+ private static final int MAX_CONNECTION_ATTEMPTS = 5;
+
+ static final int INITIAL_MESSAGE_SIZE = 20;
+
+ private final IDatasetPartitionManager partitionManager;
+
+ private final MuxDemux md;
+
+ private NetworkAddress networkAddress;
+
+ public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads)
+ throws IOException {
+ this.partitionManager = partitionManager;
+ md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
+ MAX_CONNECTION_ATTEMPTS);
+ }
+
+ public void start() throws IOException {
+ md.start();
+ InetSocketAddress sockAddr = md.getLocalAddress();
+ networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+
+ public void stop() {
+
+ }
+
+ public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+ MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+ return mConn.openChannel();
+ }
+
+ private class ChannelOpenListener implements IChannelOpenListener {
+ @Override
+ public void channelOpened(ChannelControlBlock channel) {
+ channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+ channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+ }
+ }
+
+ private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+ private final ChannelControlBlock ccb;
+
+ private NetworkOutputChannel noc;
+
+ public InitialBufferAcceptor(ChannelControlBlock ccb) {
+ this.ccb = ccb;
+ }
+
+ @Override
+ public void accept(ByteBuffer buffer) {
+ JobId jobId = new JobId(buffer.getLong());
+ int partition = buffer.getInt();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
+ + partition + " on channel: " + ccb);
+ }
+ noc = new NetworkOutputChannel(ccb, 1);
+ try {
+ partitionManager.initializeDatasetPartitionReader(jobId, partition, noc);
+ } catch (HyracksException e) {
+ noc.abort();
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void error(int ecode) {
+ if (noc != null) {
+ noc.abort();
+ }
+ }
+ }
+
+ public MuxDemuxPerformanceCounters getPerformanceCounters() {
+ return md.getPerformanceCounters();
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
deleted file mode 100644
index 1d5af84..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.net;
-
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
-import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
-import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-
-public class NetworkInputChannel implements IInputChannel {
- private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
-
- private final NetworkManager netManager;
-
- private final SocketAddress remoteAddress;
-
- private final PartitionId partitionId;
-
- private final Queue<ByteBuffer> fullQueue;
-
- private final int nBuffers;
-
- private ChannelControlBlock ccb;
-
- private IInputChannelMonitor monitor;
-
- private Object attachment;
-
- public NetworkInputChannel(NetworkManager netManager, SocketAddress remoteAddress, PartitionId partitionId,
- int nBuffers) {
- this.netManager = netManager;
- this.remoteAddress = remoteAddress;
- this.partitionId = partitionId;
- fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
- this.nBuffers = nBuffers;
- }
-
- @Override
- public void registerMonitor(IInputChannelMonitor monitor) {
- this.monitor = monitor;
- }
-
- @Override
- public void setAttachment(Object attachment) {
- this.attachment = attachment;
- }
-
- @Override
- public Object getAttachment() {
- return attachment;
- }
-
- @Override
- public synchronized ByteBuffer getNextBuffer() {
- return fullQueue.poll();
- }
-
- @Override
- public void recycleBuffer(ByteBuffer buffer) {
- buffer.clear();
- ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
- }
-
- @Override
- public void open(IHyracksTaskContext ctx) throws HyracksDataException {
- try {
- ccb = netManager.connect(remoteAddress);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
- ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
- for (int i = 0; i < nBuffers; ++i) {
- ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
- }
- ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
- writeBuffer.putLong(partitionId.getJobId().getId());
- writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
- writeBuffer.putInt(partitionId.getSenderIndex());
- writeBuffer.putInt(partitionId.getReceiverIndex());
- writeBuffer.flip();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Sending partition request: " + partitionId + " on channel: " + ccb);
- }
- ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
- ccb.getWriteInterface().getFullBufferAcceptor().close();
- }
-
- @Override
- public void close() throws HyracksDataException {
-
- }
-
- private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
- @Override
- public void accept(ByteBuffer buffer) {
- fullQueue.add(buffer);
- monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
- }
-
- @Override
- public void close() {
- monitor.notifyEndOfStream(NetworkInputChannel.this);
- }
-
- @Override
- public void error(int ecode) {
- monitor.notifyFailure(NetworkInputChannel.this);
- }
- }
-
- private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
- @Override
- public void accept(ByteBuffer buffer) {
- // do nothing
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index b805595..c8e4e94 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -27,6 +27,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.exceptions.NetException;
@@ -36,7 +38,7 @@
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-public class NetworkManager {
+public class NetworkManager implements IChannelConnectionFactory {
private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
private static final int MAX_CONNECTION_ATTEMPTS = 5;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
deleted file mode 100644
index 9024e18..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.net;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Deque;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-
-public class NetworkOutputChannel implements IFrameWriter {
- private final ChannelControlBlock ccb;
-
- private final int nBuffers;
-
- private final Deque<ByteBuffer> emptyStack;
-
- private boolean aborted;
-
- public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
- this.ccb = ccb;
- this.nBuffers = nBuffers;
- emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
- ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
- }
-
- public void setTaskContext(IHyracksTaskContext ctx) {
- for (int i = 0; i < nBuffers; ++i) {
- emptyStack.push(ByteBuffer.allocateDirect(ctx.getFrameSize()));
- }
- }
-
- @Override
- public void open() throws HyracksDataException {
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- ByteBuffer destBuffer = null;
- synchronized (this) {
- while (true) {
- if (aborted) {
- throw new HyracksDataException("Connection has been aborted");
- }
- destBuffer = emptyStack.poll();
- if (destBuffer != null) {
- break;
- }
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- }
- buffer.position(0);
- buffer.limit(destBuffer.capacity());
- destBuffer.clear();
- destBuffer.put(buffer);
- destBuffer.flip();
- ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- ccb.getWriteInterface().getFullBufferAcceptor().error(1);
- }
-
- @Override
- public void close() throws HyracksDataException {
- ccb.getWriteInterface().getFullBufferAcceptor().close();
- }
-
- void abort() {
- ccb.getWriteInterface().getFullBufferAcceptor().error(1);
- synchronized (NetworkOutputChannel.this) {
- aborted = true;
- NetworkOutputChannel.this.notifyAll();
- }
- }
-
- private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
- @Override
- public void accept(ByteBuffer buffer) {
- synchronized (NetworkOutputChannel.this) {
- emptyStack.push(buffer);
- NetworkOutputChannel.this.notifyAll();
- }
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 16e31f7..ba6e6c3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -21,7 +21,7 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -82,7 +82,7 @@
}
@Override
- public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+ public void open(IHyracksCommonContext ctx) throws HyracksDataException {
for (int i = 0; i < nBuffers; ++i) {
emptyQueue.add(ctx.allocateFrame());
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index 45c091a..ea88a75 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -28,12 +28,12 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
-import edu.uci.ics.hyracks.control.nc.net.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
public class PartitionManager {
@@ -98,7 +98,7 @@
List<IPartition> pList = partitionMap.get(partitionId);
if (pList != null && !pList.isEmpty()) {
IPartition partition = pList.get(0);
- writer.setTaskContext(partition.getTaskContext());
+ writer.setFrameSize(partition.getTaskContext().getFrameSize());
partition.writeTo(writer);
if (!partition.isReusable()) {
partitionMap.remove(partitionId);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index bb9669d..7ed9d11 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -22,10 +22,10 @@
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.comm.channels.NetworkInputChannel;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
public class ReportPartitionAvailabilityWork extends AbstractWork {
private final NodeControllerService ncs;