Reverting the merge of the fullstack_hyracks_result_distribution branch until all the tests pass.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3033 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index 95598da..c7eedb3 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -15,8 +15,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 82457fe..5a33891 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -35,17 +35,12 @@
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.context.ICCContext;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
-import edu.uci.ics.hyracks.control.cc.dataset.DatasetDirectoryService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.cc.work.ApplicationCreateWork;
@@ -53,23 +48,17 @@
import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStateChangeWork;
-import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
-import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
-import edu.uci.ics.hyracks.control.cc.work.GetResultStatusWork;
import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionRequestWork;
-import edu.uci.ics.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
import edu.uci.ics.hyracks.control.cc.work.RemoveDeadNodesWork;
import edu.uci.ics.hyracks.control.cc.work.ReportProfilesWork;
-import edu.uci.ics.hyracks.control.cc.work.ReportResultPartitionFailureWork;
-import edu.uci.ics.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
import edu.uci.ics.hyracks.control.cc.work.TaskCompleteWork;
import edu.uci.ics.hyracks.control.cc.work.TaskFailureWork;
import edu.uci.ics.hyracks.control.cc.work.UnregisterNodeWork;
@@ -126,8 +115,6 @@
private final DeadNodeSweeper sweeper;
- private final IDatasetDirectoryService datasetDirectoryService;
-
private long jobCounter;
public ClusterControllerService(final CCConfig ccConfig) throws Exception {
@@ -175,7 +162,6 @@
}
};
sweeper = new DeadNodeSweeper();
- datasetDirectoryService = new DatasetDirectoryService();
jobCounter = 0;
}
@@ -278,10 +264,6 @@
return clusterIPC;
}
- public NetworkAddress getDatasetDirectoryServiceInfo() {
- return new NetworkAddress(ccConfig.clientNetIpAddress.getBytes(), ccConfig.clientNetPort);
- }
-
private class DeadNodeSweeper extends TimerTask {
@Override
public void run() {
@@ -289,10 +271,6 @@
}
}
- public IDatasetDirectoryService getDatasetDirectoryService() {
- return datasetDirectoryService;
- }
-
private class HyracksClientInterfaceIPCI implements IIPCI {
@Override
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
@@ -343,27 +321,6 @@
return;
}
- case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
- workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this,
- new IPCResponder<NetworkAddress>(handle, mid)));
- return;
- }
-
- case GET_DATASET_RESULT_STATUS: {
- HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
- workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(), gdrlf
- .getResultSetId(), new IPCResponder<Status>(handle, mid)));
- return;
- }
-
- case GET_DATASET_RESULT_LOCATIONS: {
- HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
- workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
- .getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
- new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
- return;
- }
-
case WAIT_FOR_COMPLETION: {
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
@@ -459,28 +416,6 @@
return;
}
- case REGISTER_RESULT_PARTITION_LOCATION: {
- CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
- workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
- .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getPartition(), rrplf
- .getNPartitions(), rrplf.getNetworkAddress()));
- return;
- }
-
- case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
- CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf = (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
- workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
- rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
- return;
- }
-
- case REPORT_RESULT_PARTITION_FAILURE: {
- CCNCFunctions.ReportResultPartitionFailureFunction rrplf = (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
- workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this, rrplf
- .getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
- return;
- }
-
case APPLICATION_STATE_CHANGE_RESPONSE: {
CCNCFunctions.ApplicationStateChangeResponseFunction astrf = (CCNCFunctions.ApplicationStateChangeResponseFunction) fn;
workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index c96a319..c17acd0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -41,8 +41,6 @@
private final NetworkAddress dataPort;
- private final NetworkAddress datasetPort;
-
private final Set<JobId> activeJobIds;
private final String osName;
@@ -109,14 +107,6 @@
private final long[] netSignalingBytesWritten;
- private final long[] datasetNetPayloadBytesRead;
-
- private final long[] datasetNetPayloadBytesWritten;
-
- private final long[] datasetNetSignalingBytesRead;
-
- private final long[] datasetNetSignalingBytesWritten;
-
private final long[] ipcMessagesSent;
private final long[] ipcMessageBytesSent;
@@ -133,7 +123,6 @@
this.nodeController = nodeController;
ncConfig = reg.getNCConfig();
dataPort = reg.getDataPort();
- datasetPort = reg.getDatasetPort();
activeJobIds = new HashSet<JobId>();
osName = reg.getOSName();
@@ -175,10 +164,6 @@
netPayloadBytesWritten = new long[RRD_SIZE];
netSignalingBytesRead = new long[RRD_SIZE];
netSignalingBytesWritten = new long[RRD_SIZE];
- datasetNetPayloadBytesRead = new long[RRD_SIZE];
- datasetNetPayloadBytesWritten = new long[RRD_SIZE];
- datasetNetSignalingBytesRead = new long[RRD_SIZE];
- datasetNetSignalingBytesWritten = new long[RRD_SIZE];
ipcMessagesSent = new long[RRD_SIZE];
ipcMessageBytesSent = new long[RRD_SIZE];
ipcMessagesReceived = new long[RRD_SIZE];
@@ -211,10 +196,6 @@
netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
- datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
- datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
- datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
- datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
@@ -246,10 +227,6 @@
return dataPort;
}
- public NetworkAddress getDatasetPort() {
- return datasetPort;
- }
-
public JSONObject toSummaryJSON() throws JSONException {
JSONObject o = new JSONObject();
o.put("node-id", ncConfig.nodeId);
@@ -294,10 +271,6 @@
o.put("net-payload-bytes-written", netPayloadBytesWritten);
o.put("net-signaling-bytes-read", netSignalingBytesRead);
o.put("net-signaling-bytes-written", netSignalingBytesWritten);
- o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
- o.put("dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
- o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
- o.put("dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
o.put("ipc-messages-sent", ipcMessagesSent);
o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
o.put("ipc-messages-received", ipcMessagesReceived);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
deleted file mode 100644
index 13d0c30..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.dataset;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-
-/**
- * TODO(madhusudancs): The potential peril of this global dataset directory service implementation is that a job's
- * location information is never evicted from memory, so memory usage grows as the number of jobs in the system
- * grows. What we should probably do is add an API call for the client to indicate that it has received everything
- * it needs for the job (i.e., after it has received all the results); then we can simply discard the location
- * information for that job.
- */
-public class DatasetDirectoryService implements IDatasetDirectoryService {
- private final Map<JobId, Map<ResultSetId, ResultSetMetaData>> jobResultLocationsMap;
-
- public DatasetDirectoryService() {
- jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, ResultSetMetaData>>();
- }
-
- @Override
- public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions, NetworkAddress networkAddress) {
- Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
- if (rsMap == null) {
- rsMap = new HashMap<ResultSetId, ResultSetMetaData>();
- jobResultLocationsMap.put(jobId, rsMap);
- }
-
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
- if (resultSetMetaData == null) {
- resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]);
- rsMap.put(rsId, resultSetMetaData);
- }
-
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
- if (records[partition] == null) {
- records[partition] = new DatasetDirectoryRecord();
- }
- records[partition].setNetworkAddress(networkAddress);
- records[partition].start();
- notifyAll();
- }
-
- @Override
- public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
- DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
- ddr.writeEOS();
- }
-
- @Override
- public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
- DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
- ddr.fail();
- }
-
- @Override
- public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
- Map<ResultSetId, ResultSetMetaData> rsMap;
- while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
-
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
- if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
- throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
- }
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-
- ArrayList<Status> statuses = new ArrayList<Status>(records.length);
- for (int i = 0; i < records.length; i++) {
- statuses.add(records[i].getStatus());
- }
-
- // Default status is idle
- Status status = Status.IDLE;
- if (statuses.contains(Status.FAILED)) {
- // If there is even one failed entry, we should return the failed status.
- return Status.FAILED;
- } else if (statuses.contains(Status.RUNNING)) {
- // If there are no failed entries and there is at least one running entry, we should return the running status.
- return Status.RUNNING;
- } else {
- // Only if each and every partition has reported success do we report success as the status.
- int successCount = 0;
- for (int i = 0; i < statuses.size(); i++) {
- if (statuses.get(i) == Status.SUCCESS) {
- successCount++;
- }
- }
- if (successCount == statuses.size()) {
- return Status.SUCCESS;
- }
- }
- return status;
- }
-
- @Override
- public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
- DatasetDirectoryRecord[] newRecords;
- while ((newRecords = updatedRecords(jobId, rsId, knownRecords)) == null) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- return newRecords;
- }
-
- public DatasetDirectoryRecord getDatasetDirectoryRecord(JobId jobId, ResultSetId rsId, int partition) {
- Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
- return records[partition];
- }
-
- /**
- * Compares the records already known by the client for the given job's result set id with the records that the
- * dataset directory service knows about, and if there are any newly discovered records, returns a whole array with
- * the new records filled in.
- * The logic of this method is quite convoluted; here is how it works.
- * If the ordering constraint has to be enforced, the method finds the first null record among the known records in
- * partition order, always traversing the array from first to last.
- * If the known-records array is null, or its first element is null, but the record for that partition is now known
- * to the directory service, the method fills that record into the array and returns the array.
- * However, if the first null entry in the known records is not the first element of the array, then by induction
- * all records before it are already known to the client and none of the records for the later partitions are known
- * by the client yet. So, we check whether the client has reached end of stream for the partition corresponding to
- * the record just before the first null entry, i.e. the last known non-null record. If not, we simply return null,
- * because we cannot expose any new locations until the client reaches end of stream for the last known record.
- * If the client has reached end of stream for the last known non-null record, we check whether the next record has
- * been discovered by the dataset directory service; if so, we fill it into the records array and return it, and
- * otherwise we return null.
- * If the ordering is not required, we are free to return any newly discovered records back, so we just check if
- * arrays are equal and if they are not we send the entire new updated array.
- *
- * @param jobId
- * - Id of the job for which the directory records should be retrieved.
- * @param rsId
- * - Id of the result set for which the directory records should be retrieved.
- * @param knownRecords
- * - An array of directory records that the client is already aware of.
- * @return
- * - Returns null if there are no newly discovered records to expose (subject to the ordering constraint, if any)
- * @throws HyracksDataException
- * TODO(madhusudancs): Think about caching (while still remaining stateless) instead of this ugly O(n) iteration
- * for every check. This already looks very expensive.
- */
- private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
- throws HyracksDataException {
- Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
- if (rsMap == null) {
- return null;
- }
-
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
- if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
- throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
- }
-
- boolean ordered = resultSetMetaData.getOrderedResult();
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
- /* If ordering is required, we should expose the dataset directory records only in the order, otherwise
- * we can simply check if there are any newly discovered records and send the whole array back if there are.
- */
- if (ordered) {
- // Iterate over the known records and find the last record which is not null.
- int i = 0;
- for (i = 0; i < records.length; i++) {
- if (knownRecords == null) {
- if (records[0] != null) {
- knownRecords = new DatasetDirectoryRecord[records.length];
- knownRecords[0] = records[0];
- return knownRecords;
- }
- return null;
- }
- if (knownRecords[i] == null) {
- if ((i == 0 || knownRecords[i - 1].hasReachedReadEOS()) && records[i] != null) {
- knownRecords[i] = records[i];
- return knownRecords;
- }
- return null;
- }
- }
- } else {
- if (!Arrays.equals(records, knownRecords)) {
- return records;
- }
- }
- return null;
- }
-
- private class ResultSetMetaData {
- private final boolean ordered;
-
- private final DatasetDirectoryRecord[] records;
-
- public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
- this.ordered = ordered;
- this.records = records;
- }
-
- public boolean getOrderedResult() {
- return ordered;
- }
-
- public DatasetDirectoryRecord[] getRecords() {
- return records;
- }
- }
-}
\ No newline at end of file
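The Javadoc above describes the ordered-exposure rule of updatedRecords() in fairly dense prose. The following stand-alone Java sketch, which is not part of the patch, restates that rule under simplified assumptions: SimpleRecord and its readEOS flag are hypothetical stand-ins for DatasetDirectoryRecord and hasReachedReadEOS(), and revealNext() plays the role of updatedRecords().

// Illustrative sketch only; all names here are hypothetical stand-ins, not Hyracks classes.
final class OrderedExposureSketch {
    // Stand-in for DatasetDirectoryRecord: a location plus a "client reached read-EOS" flag.
    static final class SimpleRecord {
        final String address;
        boolean readEOS;
        SimpleRecord(String address) { this.address = address; }
    }

    // Returns the known array with at most one newly revealed record filled in, or null if
    // nothing new may be exposed yet: partition i is revealed only after the client has
    // reached read-EOS on partition i - 1, mirroring the ordered branch of updatedRecords().
    static SimpleRecord[] revealNext(SimpleRecord[] known, SimpleRecord[] discovered) {
        if (known == null) {
            if (discovered[0] == null) {
                return null;                              // partition 0 not registered yet
            }
            SimpleRecord[] fresh = new SimpleRecord[discovered.length];
            fresh[0] = discovered[0];                     // expose only the first partition
            return fresh;
        }
        for (int i = 0; i < discovered.length; i++) {
            if (known[i] == null) {
                boolean previousDrained = (i == 0) || known[i - 1].readEOS;
                if (previousDrained && discovered[i] != null) {
                    known[i] = discovered[i];             // reveal exactly one new partition
                    return known;
                }
                return null;                              // still draining i - 1, or i unknown
            }
        }
        return null;                                      // client already knows every partition
    }

    public static void main(String[] args) {
        SimpleRecord[] discovered = { new SimpleRecord("nc1:10001"), new SimpleRecord("nc2:10001") };
        SimpleRecord[] known = revealNext(null, discovered);          // exposes partition 0 only
        System.out.println(revealNext(known, discovered));            // null: partition 0 not drained yet
        known[0].readEOS = true;                                      // client finishes reading partition 0
        System.out.println(revealNext(known, discovered)[1].address); // partition 1 is now revealed
    }
}

The main method walks through the typical progression: partition 0 is exposed first, and partition 1 only after the client has reached end of stream on partition 0.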
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
deleted file mode 100644
index 3ac6acc..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.IResultCallback;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetDatasetDirectoryServiceInfoWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
-
- private final IResultCallback<NetworkAddress> callback;
-
- public GetDatasetDirectoryServiceInfoWork(ClusterControllerService ccs, IResultCallback<NetworkAddress> callback) {
- this.ccs = ccs;
- this.callback = callback;
- }
-
- @Override
- public void doRun() {
- try {
- NetworkAddress addr = ccs.getDatasetDirectoryServiceInfo();
- callback.setValue(addr);
- } catch (Exception e) {
- callback.setException(e);
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index a787b9f..2f23a2c 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -39,8 +39,7 @@
Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
- result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort(), e
- .getValue().getDatasetPort()));
+ result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort()));
}
callback.setValue(result);
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
deleted file mode 100644
index fd1d418..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.IResultCallback;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetResultPartitionLocationsWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final DatasetDirectoryRecord[] knownRecords;
-
- private final IResultCallback<DatasetDirectoryRecord[]> callback;
-
- public GetResultPartitionLocationsWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.knownRecords = knownRecords;
- this.callback = callback;
- }
-
- @Override
- public void doRun() {
- final IDatasetDirectoryService dds = ccs.getDatasetDirectoryService();
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- DatasetDirectoryRecord[] partitionLocations = dds.getResultPartitionLocations(jobId, rsId,
- knownRecords);
- callback.setValue(partitionLocations);
- } catch (HyracksDataException e) {
- callback.setException(e);
- }
- }
- });
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
deleted file mode 100644
index d2dadf5..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.IResultCallback;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetResultStatusWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final IResultCallback<Status> callback;
-
- public GetResultStatusWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- IResultCallback<Status> callback) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.callback = callback;
- }
-
- @Override
- public void doRun() {
- try {
- Status status = ccs.getDatasetDirectoryService().getResultStatus(jobId, rsId);
- callback.setValue(status);
- } catch (HyracksDataException e) {
- callback.setException(e);
- }
- }
-
- @Override
- public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId;
- }
-}
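GetResultStatusWork, deleted above, simply forwarded to DatasetDirectoryService.getResultStatus(), whose aggregation rule (also deleted earlier in this patch) is: FAILED if any partition failed, RUNNING if none failed but at least one is still running, SUCCESS only when every partition reported success, and IDLE otherwise. Below is a minimal sketch of that rule, not part of the patch, using a hypothetical Status enum in place of DatasetDirectoryRecord.Status.

import java.util.Arrays;
import java.util.List;

// Sketch only: Status is a hypothetical stand-in for DatasetDirectoryRecord.Status, and the
// aggregation mirrors the deleted DatasetDirectoryService.getResultStatus() logic.
final class ResultStatusSketch {
    enum Status { IDLE, RUNNING, SUCCESS, FAILED }

    static Status aggregate(List<Status> partitionStatuses) {
        if (partitionStatuses.contains(Status.FAILED)) {
            return Status.FAILED;               // a single failed partition fails the result set
        }
        if (partitionStatuses.contains(Status.RUNNING)) {
            return Status.RUNNING;              // no failures, but at least one partition is still writing
        }
        for (Status s : partitionStatuses) {
            if (s != Status.SUCCESS) {
                return Status.IDLE;             // not every partition has reported success yet
            }
        }
        return Status.SUCCESS;                  // every partition reported success
    }

    public static void main(String[] args) {
        System.out.println(aggregate(Arrays.asList(Status.SUCCESS, Status.RUNNING))); // RUNNING
        System.out.println(aggregate(Arrays.asList(Status.SUCCESS, Status.SUCCESS))); // SUCCESS
        System.out.println(aggregate(Arrays.asList(Status.FAILED, Status.RUNNING)));  // FAILED
    }
}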
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index b062d33..b6a33cd 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -16,7 +16,6 @@
import java.util.EnumSet;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
deleted file mode 100644
index f86e924..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-
-public class RegisterResultPartitionLocationWork extends AbstractWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final boolean orderedResult;
-
- private final int partition;
-
- private final int nPartitions;
-
- private final NetworkAddress networkAddress;
-
- public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- boolean orderedResult, int partition, int nPartitions, NetworkAddress networkAddress) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.orderedResult = orderedResult;
- this.partition = partition;
- this.nPartitions = nPartitions;
- this.networkAddress = networkAddress;
- }
-
- @Override
- public void run() {
- ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
- nPartitions, networkAddress);
- }
-
- @Override
- public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
- + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult;
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
deleted file mode 100644
index 4aea41e..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-
-public class ReportResultPartitionFailureWork extends AbstractWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final int partition;
-
- public ReportResultPartitionFailureWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, int partition) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.partition = partition;
- }
-
- @Override
- public void run() {
- ccs.getDatasetDirectoryService().reportResultPartitionFailure(jobId, rsId, partition);
- }
-
- @Override
- public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
- }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
deleted file mode 100644
index 313b730..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-
-public class ReportResultPartitionWriteCompletionWork extends AbstractWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final int partition;
-
- public ReportResultPartitionWriteCompletionWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- int partition) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.partition = partition;
- }
-
- @Override
- public void run() {
- ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
- }
-
- @Override
- public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
- }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
index 3fc46ff..ff9d8a0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
@@ -58,10 +58,6 @@
var netPayloadBytesWritten = result['net-payload-bytes-written'];
var netSignalingBytesRead = result['net-signaling-bytes-read'];
var netSignalingBytesWritten = result['net-signaling-bytes-written'];
- var datasetNetPayloadBytesRead = result['dataset-net-payload-bytes-read'];
- var datasetNetPayloadBytesWritten = result['dataset-net-payload-bytes-written'];
- var datasetNetSignalingBytesRead = result['dataset-net-signaling-bytes-read'];
- var datasetNetSignalingBytesWritten = result['dataset-net-signaling-bytes-written'];
var ipcMessagesSent = result['ipc-messages-sent'];
var ipcMessageBytesSent = result['ipc-message-bytes-sent'];
var ipcMessagesReceived = result['ipc-messages-received'];
@@ -121,13 +117,9 @@
}
if (i < sysLoad.length - 1) {
netPayloadReadBWArray.push([ i, computeRate(netPayloadBytesRead, rrdPtr) ]);
- netPayloadReadBWArray.push([ i, computeRate(datasetNetPayloadBytesRead, rrdPtr) ]);
netPayloadWriteBWArray.push([ i, computeRate(netPayloadBytesWritten, rrdPtr) ]);
- netPayloadWriteBWArray.push([ i, computeRate(datasetNetPayloadBytesWritten, rrdPtr) ]);
netSignalingReadBWArray.push([ i, computeRate(netSignalingBytesRead, rrdPtr) ]);
- netSignalingReadBWArray.push([ i, computeRate(datasetNetSignalingBytesRead, rrdPtr) ]);
netSignalingWriteBWArray.push([ i, computeRate(netSignalingBytesWritten, rrdPtr) ]);
- netSignalingWriteBWArray.push([ i, computeRate(etSignalingBytesWritten, rrdPtr) ]);
ipcMessageSendRateArray.push([ i, computeRate(ipcMessagesSent, rrdPtr) ]);
ipcMessageBytesSendRateArray.push([ i, computeRate(ipcMessageBytesSent, rrdPtr) ]);
ipcMessageReceiveRateArray.push([ i, computeRate(ipcMessagesReceived, rrdPtr) ]);