Reverting the merge of the fullstack_hyracks_result_distribution branch until all the tests pass.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3033 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/pom.xml b/hyracks/hyracks-api/pom.xml
index 09307e4..72f0d8b 100644
--- a/hyracks/hyracks-api/pom.xml
+++ b/hyracks/hyracks-api/pom.xml
@@ -15,8 +15,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
index a4f0b29..a8f2fda 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
@@ -16,7 +16,7 @@
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IInputChannel {
@@ -30,7 +30,7 @@
public void recycleBuffer(ByteBuffer buffer);
- public void open(IHyracksCommonContext ctx) throws HyracksDataException;
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException;
public void close() throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index cd2b698..e34e60d 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -17,8 +17,6 @@
import java.io.Serializable;
import java.util.EnumSet;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -32,10 +30,6 @@
CREATE_JOB,
GET_JOB_STATUS,
START_JOB,
- GET_DATASET_DIRECTORY_SERIVICE_INFO,
- GET_DATASET_RESULT_STATUS,
- GET_DATASET_RECORD_DESCRIPTOR,
- GET_DATASET_RESULT_LOCATIONS,
WAIT_FOR_COMPLETION,
GET_NODE_CONTROLLERS_INFO
}
@@ -162,74 +156,6 @@
}
}
- public static class GetDatasetDirectoryServiceInfoFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.GET_DATASET_DIRECTORY_SERIVICE_INFO;
- }
- }
-
- public static class GetDatasetResultStatusFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- public GetDatasetResultStatusFunction(JobId jobId, ResultSetId rsId) {
- this.jobId = jobId;
- this.rsId = rsId;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.GET_DATASET_RESULT_STATUS;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public ResultSetId getResultSetId() {
- return rsId;
- }
- }
-
- public static class GetDatasetResultLocationsFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final DatasetDirectoryRecord[] knownRecords;
-
- public GetDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords) {
- this.jobId = jobId;
- this.rsId = rsId;
- this.knownRecords = knownRecords;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.GET_DATASET_RESULT_LOCATIONS;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public ResultSetId getResultSetId() {
- return rsId;
- }
-
- public DatasetDirectoryRecord[] getKnownRecords() {
- return knownRecords;
- }
- }
-
public static class WaitForCompletionFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 2ab42c0..4c06d42 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -17,7 +17,6 @@
import java.util.EnumSet;
import java.util.Map;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -77,12 +76,6 @@
}
@Override
- public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
- HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf = new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
- return (NetworkAddress) rpci.call(ipcHandle, gddsf);
- }
-
- @Override
public void waitForCompletion(JobId jobId) throws Exception {
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
jobId);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index e0fafb0..227524c 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -26,7 +26,6 @@
import org.apache.http.impl.client.DefaultHttpClient;
import edu.uci.ics.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -119,10 +118,6 @@
return hci.startJob(appName, JavaSerializationUtils.serialize(acggf), jobFlags);
}
- public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
- return hci.getDatasetDirectoryServiceInfo();
- }
-
@Override
public void waitForCompletion(JobId jobId) throws Exception {
hci.waitForCompletion(jobId);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 6333c22..bdbb544 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -18,7 +18,6 @@
import java.util.EnumSet;
import java.util.Map;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -101,14 +100,6 @@
throws Exception;
/**
- * Gets the IP Address and port for the DatasetDirectoryService wrapped in NetworkAddress
- *
- * @return {@link NetworkAddress}
- * @throws Exception
- */
- public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
-
- /**
* Waits until the specified job has completed, either successfully or has
* encountered a permanent failure.
*
@@ -132,4 +123,4 @@
* @throws Exception
*/
public ClusterTopology getClusterTopology() throws Exception;
-}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 22b0a8f..ef5906e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -17,7 +17,6 @@
import java.util.EnumSet;
import java.util.Map;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -36,8 +35,6 @@
public JobId startJob(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
- public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
-
public void waitForCompletion(JobId jobId) throws Exception;
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
index 73b5488..fd9218a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
@@ -27,14 +27,10 @@
private final NetworkAddress netAddress;
- private final NetworkAddress datasetNetworkAddress;
-
- public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress,
- NetworkAddress datasetNetworkAddress) {
+ public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress) {
this.nodeId = nodeId;
this.status = status;
this.netAddress = netAddress;
- this.datasetNetworkAddress = datasetNetworkAddress;
}
public String getNodeId() {
@@ -48,8 +44,4 @@
public NetworkAddress getNetworkAddress() {
return netAddress;
}
-
- public NetworkAddress getDatasetNetworkAddress() {
- return datasetNetworkAddress;
- }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
index a2ee977..e964d66 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.api.context;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
@@ -29,7 +28,5 @@
public ICounterContext getCounterContext();
- public IDatasetPartitionManager getDatasetPartitionManager();
-
public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializer.java
deleted file mode 100644
index ba2ff9a..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataflow.value;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IResultSerializer extends Serializable {
- /**
- * Initializes the serializer.
- */
- public void init() throws HyracksDataException;
-
- /**
- * Method to serialize the result and append it to the provided output stream
- *
- * @param tAccess
- * - A frame tuple accessor object that contains the original data to be serialized
- * @param tIdx
- * - Index of the tuple that should be serialized.
- * @return true if the tuple was appended successfully, else false.
- */
- public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException;
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializerFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializerFactory.java
deleted file mode 100644
index 92cf569..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializerFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataflow.value;
-
-import java.io.PrintStream;
-import java.io.Serializable;
-
-public interface IResultSerializerFactory extends Serializable {
- /**
- * Creates a result serialized appender
- *
- * @param printStream
- * - A print stream object to which the serialized results will be written.
- * @return A new instance of result serialized appender.
- */
- public IResultSerializer createResultSerializer(PrintStream printStream);
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/JSONSerializable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/JSONSerializable.java
deleted file mode 100644
index 1eca502..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/JSONSerializable.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataflow.value;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public interface JSONSerializable {
- /**
- * Returns the JSON representation of the object.
- *
- * @return A new JSONObject instance representing this Java object.
- */
- public JSONObject toJSON() throws JSONException;
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java
deleted file mode 100644
index 6316bba..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-
-public class DatasetDirectoryRecord implements Serializable {
- public enum Status {
- IDLE,
- RUNNING,
- SUCCESS,
- FAILED
- }
-
- private static final long serialVersionUID = 1L;
-
- private NetworkAddress address;
-
- private boolean readEOS;
-
- private Status status;
-
- public DatasetDirectoryRecord() {
- this.address = null;
- this.readEOS = false;
- this.status = Status.IDLE;
- }
-
- public void setNetworkAddress(NetworkAddress address) {
- this.address = address;
- }
-
- public NetworkAddress getNetworkAddress() {
- return address;
- }
-
- public void readEOS() {
- this.readEOS = true;
- }
-
- public boolean hasReachedReadEOS() {
- return readEOS;
- }
-
- public void start() {
- status = Status.RUNNING;
- }
-
- public void writeEOS() {
- status = Status.SUCCESS;
- }
-
- public void fail() {
- status = Status.FAILED;
- }
-
- public Status getStatus() {
- return status;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (!(o instanceof DatasetDirectoryRecord)) {
- return false;
- }
- return address.equals(((DatasetDirectoryRecord) o).address);
- }
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
deleted file mode 100644
index 5266333..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-
-public interface IDatasetDirectoryService {
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
- int nPartitions, NetworkAddress networkAddress);
-
- public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition);
-
- public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
-
- public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
-
- public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java
deleted file mode 100644
index 65ba1c7..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-
-public interface IDatasetInputChannelMonitor extends IInputChannelMonitor {
- public boolean eosReached();
-
- public boolean failed();
-
- public int getNFramesAvailable();
-
- public void notifyFrameRead();
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
deleted file mode 100644
index ae38c7f..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
-import edu.uci.ics.hyracks.api.job.JobId;
-
-public interface IDatasetPartitionManager {
- public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions) throws HyracksException;
-
- public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition)
- throws HyracksException;
-
- public void reportPartitionFailure(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
-
- public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
-
- public IWorkspaceFileFactory getFileFactory();
-
- public void close();
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
deleted file mode 100644
index 40db7a7..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-
-public interface IHyracksDataset {
- public void open(JobId jobId, ResultSetId resultSetId) throws IOException;
-
- public Status getResultStatus();
-
- public int read(ByteBuffer buffer) throws HyracksDataException;
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
deleted file mode 100644
index d49d5cd..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.job.JobId;
-
-public interface IHyracksDatasetDirectoryServiceConnection {
- /**
- * Gets the result status for the given result set.
- *
- * @param jobId
- * ID of the job
- * @param rsId
- * ID of the result set
- * @return {@link Status}
- * @throws Exception
- */
- public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
-
- /**
- * Gets the IP Addresses and ports for the partition generating the result for each location.
- *
- * @param jobId
- * ID of the job
- * @param rsId
- * ID of the result set
- * @param knownRecords
- * Locations that are already known to the client
- * @return {@link NetworkAddress[]}
- * @throws Exception
- */
- public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws Exception;
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
deleted file mode 100644
index ba21a84..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.job.JobId;
-
-public interface IHyracksDatasetDirectoryServiceInterface {
- /**
- * Gets the result status for the given result set.
- *
- * @param jobId
- * ID of the job
- * @param rsId
- * ID of the result set
- * @return {@link Status}
- * @throws Exception
- */
- public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
-
- /**
- * Gets the IP Addresses and ports for the partition generating the result for each location.
- *
- * @param jobId
- * ID of the job
- * @param rsId
- * ID of the result set
- * @param knownRecords
- * Locations from the dataset directory that are already known to the client
- * @return {@link NetworkAddress[]}
- * @throws Exception
- */
- public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws Exception;
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetId.java
deleted file mode 100644
index ae38ef3..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetId.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import java.io.Serializable;
-
-public class ResultSetId implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final long id;
-
- public ResultSetId(long id) {
- this.id = id;
- }
-
- public long getId() {
- return id;
- }
-
- @Override
- public int hashCode() {
- return (int) id;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (!(o instanceof ResultSetId)) {
- return false;
- }
- return ((ResultSetId) o).id == id;
- }
-
- @Override
- public String toString() {
- return "RSID:" + id;
- }
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 1fdff0f..7c523f1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -34,15 +34,12 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
private static final long serialVersionUID = 1L;
private final List<OperatorDescriptorId> roots;
- private final List<ResultSetId> resultSetIds;
-
private final Map<OperatorDescriptorId, IOperatorDescriptor> opMap;
private final Map<ConnectorDescriptorId, IConnectorDescriptor> connMap;
@@ -75,7 +72,6 @@
public JobSpecification() {
roots = new ArrayList<OperatorDescriptorId>();
- resultSetIds = new ArrayList<ResultSetId>();
opMap = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
connMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
opInputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
@@ -108,10 +104,6 @@
roots.add(op.getOperatorId());
}
- public void addResultSetId(ResultSetId rsId) {
- resultSetIds.add(rsId);
- }
-
public void connect(IConnectorDescriptor conn, IOperatorDescriptor producerOp, int producerPort,
IOperatorDescriptor consumerOp, int consumerPort) {
insertIntoIndexedMap(opInputMap, consumerOp.getOperatorId(), consumerPort, conn);
@@ -216,10 +208,6 @@
return roots;
}
- public List<ResultSetId> getResultSetIds() {
- return resultSetIds;
- }
-
public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
return connectorPolicyAssignmentPolicy;
}
diff --git a/hyracks/hyracks-cli/pom.xml b/hyracks/hyracks-cli/pom.xml
index 0456625..7bb6123 100644
--- a/hyracks/hyracks-cli/pom.xml
+++ b/hyracks/hyracks-cli/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-client/pom.xml b/hyracks/hyracks-client/pom.xml
deleted file mode 100644
index 854a009..0000000
--- a/hyracks/hyracks-client/pom.xml
+++ /dev/null
@@ -1,46 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>hyracks-client</artifactId>
- <name>hyracks-client</name>
- <parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.0.2</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-api</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-net</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-comm</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </dependency>
- </dependencies>
-</project>
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
deleted file mode 100644
index 8be4a8c..0000000
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.client.dataset;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
-import edu.uci.ics.hyracks.api.io.IIOManager;
-
-public class DatasetClientContext implements IHyracksCommonContext {
- private final int frameSize;
-
- public DatasetClientContext(int frameSize) {
- this.frameSize = frameSize;
- }
-
- @Override
- public int getFrameSize() {
- return frameSize;
- }
-
- @Override
- public IIOManager getIOManager() {
- return null;
- }
-
- @Override
- public ByteBuffer allocateFrame() {
- return ByteBuffer.allocate(frameSize);
- }
-
-}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
deleted file mode 100644
index 314b621..0000000
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.client.dataset;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
-import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
-import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.client.net.ClientNetworkManager;
-import edu.uci.ics.hyracks.comm.channels.DatasetNetworkInputChannel;
-
-// TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
-public class HyracksDataset implements IHyracksDataset {
- private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
-
- private final ClientNetworkManager netManager;
-
- private final DatasetClientContext datasetClientCtx;
-
- private JobId jobId;
-
- private ResultSetId resultSetId;
-
- private DatasetDirectoryRecord[] knownRecords;
-
- private IDatasetInputChannelMonitor[] monitors;
-
- private int lastReadPartition;
-
- private IDatasetInputChannelMonitor lastMonitor;
-
- private DatasetNetworkInputChannel resultChannel;
-
- private static int NUM_READ_BUFFERS = 1;
-
- public HyracksDataset(IHyracksClientConnection hcc, DatasetClientContext datasetClientCtx, int nReaders)
- throws Exception {
- NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo();
- datasetDirectoryServiceConnection = new HyracksDatasetDirectoryServiceConnection(new String(
- ddsAddress.getIpAddress()), ddsAddress.getPort());
-
- netManager = new ClientNetworkManager(nReaders);
-
- this.datasetClientCtx = datasetClientCtx;
-
- knownRecords = null;
- monitors = null;
- lastReadPartition = -1;
- lastMonitor = null;
- resultChannel = null;
- }
-
- @Override
- public void open(JobId jobId, ResultSetId resultSetId) throws IOException {
- this.jobId = jobId;
- this.resultSetId = resultSetId;
- netManager.start();
- }
-
- @Override
- public Status getResultStatus() {
- Status status = null;
- try {
- status = datasetDirectoryServiceConnection.getDatasetResultStatus(jobId, resultSetId);
- } catch (Exception e) {
- // TODO(madhusudancs): Decide what to do in case of error
- }
- return status;
- }
-
- @Override
- public int read(ByteBuffer buffer) throws HyracksDataException {
- ByteBuffer readBuffer;
- int readSize = 0;
-
- if (lastReadPartition == -1) {
- while (knownRecords == null || knownRecords[0] == null) {
- try {
- knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId, resultSetId,
- knownRecords);
- lastReadPartition = 0;
- resultChannel = new DatasetNetworkInputChannel(netManager,
- getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
- NUM_READ_BUFFERS);
- lastMonitor = getMonitor(lastReadPartition);
- resultChannel.open(datasetClientCtx);
- resultChannel.registerMonitor(lastMonitor);
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- } catch (UnknownHostException e) {
- throw new HyracksDataException(e);
- } catch (Exception e) {
- // Do nothing here.
- }
- }
- }
-
- while (readSize <= 0 && !((lastReadPartition == knownRecords.length - 1) && (lastMonitor.eosReached()))) {
- while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached()) {
- synchronized (lastMonitor) {
- try {
- lastMonitor.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- if (lastMonitor.getNFramesAvailable() <= 0 && lastMonitor.eosReached()) {
- if ((lastReadPartition == knownRecords.length - 1)) {
- break;
- } else {
- try {
- lastReadPartition++;
- while (knownRecords[lastReadPartition] == null) {
- try {
- knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
- resultSetId, knownRecords);
- } catch (Exception e) {
- // Do nothing here.
- }
- }
-
- resultChannel = new DatasetNetworkInputChannel(netManager,
- getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
- NUM_READ_BUFFERS);
- lastMonitor = getMonitor(lastReadPartition);
- resultChannel.open(datasetClientCtx);
- resultChannel.registerMonitor(lastMonitor);
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- } catch (UnknownHostException e) {
- throw new HyracksDataException(e);
- }
- }
- } else {
- readBuffer = resultChannel.getNextBuffer();
- lastMonitor.notifyFrameRead();
- if (readBuffer != null) {
- buffer.put(readBuffer);
- buffer.flip();
- readSize = buffer.limit();
- resultChannel.recycleBuffer(readBuffer);
- }
- }
- }
-
- return readSize;
- }
-
- private boolean nullExists(DatasetDirectoryRecord[] locations) {
- if (locations == null) {
- return true;
- }
- for (int i = 0; i < locations.length; i++) {
- if (locations[i] == null) {
- return true;
- }
- }
- return false;
- }
-
- private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
- NetworkAddress netAddr = addr.getNetworkAddress();
- return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
- }
-
- private IDatasetInputChannelMonitor getMonitor(int partition) throws HyracksException {
- if (knownRecords == null || knownRecords[partition] == null) {
- throw new HyracksException("Accessing monitors before the obtaining the corresponding addresses.");
- }
- if (monitors == null) {
- monitors = new DatasetInputChannelMonitor[knownRecords.length];
- }
- if (monitors[partition] == null) {
- monitors[partition] = new DatasetInputChannelMonitor();
- }
- return monitors[partition];
- }
-
- private class DatasetInputChannelMonitor implements IDatasetInputChannelMonitor {
- private final AtomicInteger nAvailableFrames;
-
- private final AtomicBoolean eos;
-
- private final AtomicBoolean failed;
-
- public DatasetInputChannelMonitor() {
- nAvailableFrames = new AtomicInteger(0);
- eos = new AtomicBoolean(false);
- failed = new AtomicBoolean(false);
- }
-
- @Override
- public synchronized void notifyFailure(IInputChannel channel) {
- failed.set(true);
- notifyAll();
- }
-
- @Override
- public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
- nAvailableFrames.addAndGet(nFrames);
- notifyAll();
- }
-
- @Override
- public synchronized void notifyEndOfStream(IInputChannel channel) {
- eos.set(true);
- notifyAll();
- }
-
- @Override
- public synchronized boolean eosReached() {
- return eos.get();
- }
-
- @Override
- public synchronized boolean failed() {
- return failed.get();
- }
-
- @Override
- public synchronized int getNFramesAvailable() {
- return nAvailableFrames.get();
- }
-
- @Override
- public synchronized void notifyFrameRead() {
- nAvailableFrames.decrementAndGet();
- }
-
- }
-}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
deleted file mode 100644
index 095fd7d..0000000
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.client.dataset;
-
-import java.net.InetSocketAddress;
-
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
-import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.RPCInterface;
-import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
-import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
-
-//TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
-public class HyracksDatasetDirectoryServiceConnection implements IHyracksDatasetDirectoryServiceConnection {
- private final IPCSystem ipc;
- private final IHyracksDatasetDirectoryServiceInterface ddsi;
-
- public HyracksDatasetDirectoryServiceConnection(String ddsHost, int ddsPort) throws Exception {
- RPCInterface rpci = new RPCInterface();
- ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
- ipc.start();
- IIPCHandle ddsIpchandle = ipc.getHandle(new InetSocketAddress(ddsHost, ddsPort));
- this.ddsi = new HyracksDatasetDirectoryServiceInterfaceRemoteProxy(ddsIpchandle, rpci);
- }
-
- @Override
- public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception {
- return ddsi.getDatasetResultStatus(jobId, rsId);
- }
-
- @Override
- public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws Exception {
- return ddsi.getDatasetResultLocations(jobId, rsId, knownRecords);
- }
-}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
deleted file mode 100644
index 47cdf97..0000000
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.client.dataset;
-
-import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.RPCInterface;
-
-//TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
-public class HyracksDatasetDirectoryServiceInterfaceRemoteProxy implements IHyracksDatasetDirectoryServiceInterface {
- private final IIPCHandle ipcHandle;
-
- private final RPCInterface rpci;
-
- public HyracksDatasetDirectoryServiceInterfaceRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) {
- this.ipcHandle = ipcHandle;
- this.rpci = rpci;
- }
-
- @Override
- public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception {
- HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction(
- jobId, rsId);
- return (Status) rpci.call(ipcHandle, gdrlf);
- }
-
- @Override
- public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws Exception {
- HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(
- jobId, rsId, knownRecords);
- return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
- }
-}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java
deleted file mode 100644
index 7aef8b9..0000000
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.client.net;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
-import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
-import edu.uci.ics.hyracks.net.exceptions.NetException;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-
-public class ClientNetworkManager implements IChannelConnectionFactory {
- private static final int MAX_CONNECTION_ATTEMPTS = 5;
-
- private final MuxDemux md;
-
- public ClientNetworkManager(int nThreads) throws IOException {
- /* This is a connect only socket and does not listen to any incoming connections, so pass null to
- * localAddress and listener.
- */
- md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS);
- }
-
- public void start() throws IOException {
- md.start();
- }
-
- public void stop() {
-
- }
-
- public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
- MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
- return mConn.openChannel();
- }
-
- public MuxDemuxPerformanceCounters getPerformanceCounters() {
- return md.getPerformanceCounters();
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-comm/pom.xml b/hyracks/hyracks-comm/pom.xml
deleted file mode 100644
index c3583699..0000000
--- a/hyracks/hyracks-comm/pom.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>hyracks-comm</artifactId>
- <name>hyracks-comm</name>
- <parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.0.2</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-api</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-net</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </dependency>
- </dependencies>
-</project>
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
deleted file mode 100644
index fac2949..0000000
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.comm.channels;
-
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.channels.IInputChannel;
-import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
-import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-
-public class DatasetNetworkInputChannel implements IInputChannel {
- private static final Logger LOGGER = Logger.getLogger(DatasetNetworkInputChannel.class.getName());
-
- static final int INITIAL_MESSAGE_SIZE = 20;
-
- private final IChannelConnectionFactory netManager;
-
- private final SocketAddress remoteAddress;
-
- private final JobId jobId;
-
- private final int partition;
-
- private final Queue<ByteBuffer> fullQueue;
-
- private final int nBuffers;
-
- private ChannelControlBlock ccb;
-
- private IInputChannelMonitor monitor;
-
- private Object attachment;
-
- public DatasetNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId,
- int partition, int nBuffers) {
- this.netManager = netManager;
- this.remoteAddress = remoteAddress;
- this.jobId = jobId;
- this.partition = partition;
- fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
- this.nBuffers = nBuffers;
- }
-
- @Override
- public void registerMonitor(IInputChannelMonitor monitor) {
- this.monitor = monitor;
- }
-
- @Override
- public void setAttachment(Object attachment) {
- this.attachment = attachment;
- }
-
- @Override
- public Object getAttachment() {
- return attachment;
- }
-
- @Override
- public synchronized ByteBuffer getNextBuffer() {
- return fullQueue.poll();
- }
-
- @Override
- public void recycleBuffer(ByteBuffer buffer) {
- buffer.clear();
- ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
- }
-
- @Override
- public void open(IHyracksCommonContext ctx) throws HyracksDataException {
- try {
- ccb = netManager.connect(remoteAddress);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
- ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
- for (int i = 0; i < nBuffers; ++i) {
- ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
- }
- ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
- writeBuffer.putLong(jobId.getId());
- writeBuffer.putInt(partition);
- writeBuffer.flip();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Sending partition request for JobId: " + jobId + " partition: " + partition + " on channel: "
- + ccb);
- }
- ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
- ccb.getWriteInterface().getFullBufferAcceptor().close();
- }
-
- @Override
- public void close() throws HyracksDataException {
-
- }
-
- private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
- @Override
- public void accept(ByteBuffer buffer) {
- fullQueue.add(buffer);
- monitor.notifyDataAvailability(DatasetNetworkInputChannel.this, 1);
- }
-
- @Override
- public void close() {
- monitor.notifyEndOfStream(DatasetNetworkInputChannel.this);
- }
-
- @Override
- public void error(int ecode) {
- monitor.notifyFailure(DatasetNetworkInputChannel.this);
- }
- }
-
- private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
- @Override
- public void accept(ByteBuffer buffer) {
- // do nothing
- }
- }
-}
\ No newline at end of file
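For context on the channel deleted above: its open() method sends a fixed-size initial message carrying only the job id (8 bytes) and the requested partition (4 bytes) before any data frames flow back. The following standalone sketch (illustrative class and method names, not part of the Hyracks API) shows that framing with a plain ByteBuffer.

    import java.nio.ByteBuffer;

    public class PartitionRequestCodec {
        // Same constant as the deleted channel; only 12 of the 20 bytes are actually written.
        static final int INITIAL_MESSAGE_SIZE = 20;

        static ByteBuffer encode(long jobId, int partition) {
            ByteBuffer buf = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
            buf.putLong(jobId);    // 8 bytes: job identifier
            buf.putInt(partition); // 4 bytes: requested result partition
            buf.flip();            // limit = bytes written, ready to hand to the write side
            return buf;
        }

        static long decodeJobId(ByteBuffer buf) {
            return buf.getLong(0);
        }

        static int decodePartition(ByteBuffer buf) {
            return buf.getInt(8);
        }

        public static void main(String[] args) {
            ByteBuffer msg = encode(42L, 3);
            System.out.println("jobId=" + decodeJobId(msg) + " partition=" + decodePartition(msg));
        }
    }

Because flip() caps the buffer's limit at the current position, only the 12 written bytes travel on the wire even though the buffer is allocated at INITIAL_MESSAGE_SIZE.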
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/IChannelConnectionFactory.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/IChannelConnectionFactory.java
deleted file mode 100644
index 33179ba..0000000
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/IChannelConnectionFactory.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.comm.channels;
-
-import java.net.SocketAddress;
-
-import edu.uci.ics.hyracks.net.exceptions.NetException;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-
-public interface IChannelConnectionFactory {
- public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException;
-}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index 95598da..c7eedb3 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -15,8 +15,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 82457fe..5a33891 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -35,17 +35,12 @@
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.context.ICCContext;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
-import edu.uci.ics.hyracks.control.cc.dataset.DatasetDirectoryService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.cc.work.ApplicationCreateWork;
@@ -53,23 +48,17 @@
import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStateChangeWork;
-import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
-import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
-import edu.uci.ics.hyracks.control.cc.work.GetResultStatusWork;
import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionRequestWork;
-import edu.uci.ics.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
import edu.uci.ics.hyracks.control.cc.work.RemoveDeadNodesWork;
import edu.uci.ics.hyracks.control.cc.work.ReportProfilesWork;
-import edu.uci.ics.hyracks.control.cc.work.ReportResultPartitionFailureWork;
-import edu.uci.ics.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
import edu.uci.ics.hyracks.control.cc.work.TaskCompleteWork;
import edu.uci.ics.hyracks.control.cc.work.TaskFailureWork;
import edu.uci.ics.hyracks.control.cc.work.UnregisterNodeWork;
@@ -126,8 +115,6 @@
private final DeadNodeSweeper sweeper;
- private final IDatasetDirectoryService datasetDirectoryService;
-
private long jobCounter;
public ClusterControllerService(final CCConfig ccConfig) throws Exception {
@@ -175,7 +162,6 @@
}
};
sweeper = new DeadNodeSweeper();
- datasetDirectoryService = new DatasetDirectoryService();
jobCounter = 0;
}
@@ -278,10 +264,6 @@
return clusterIPC;
}
- public NetworkAddress getDatasetDirectoryServiceInfo() {
- return new NetworkAddress(ccConfig.clientNetIpAddress.getBytes(), ccConfig.clientNetPort);
- }
-
private class DeadNodeSweeper extends TimerTask {
@Override
public void run() {
@@ -289,10 +271,6 @@
}
}
- public IDatasetDirectoryService getDatasetDirectoryService() {
- return datasetDirectoryService;
- }
-
private class HyracksClientInterfaceIPCI implements IIPCI {
@Override
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
@@ -343,27 +321,6 @@
return;
}
- case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
- workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this,
- new IPCResponder<NetworkAddress>(handle, mid)));
- return;
- }
-
- case GET_DATASET_RESULT_STATUS: {
- HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
- workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(), gdrlf
- .getResultSetId(), new IPCResponder<Status>(handle, mid)));
- return;
- }
-
- case GET_DATASET_RESULT_LOCATIONS: {
- HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
- workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
- .getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
- new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
- return;
- }
-
case WAIT_FOR_COMPLETION: {
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
@@ -459,28 +416,6 @@
return;
}
- case REGISTER_RESULT_PARTITION_LOCATION: {
- CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
- workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
- .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getPartition(), rrplf
- .getNPartitions(), rrplf.getNetworkAddress()));
- return;
- }
-
- case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
- CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf = (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
- workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
- rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
- return;
- }
-
- case REPORT_RESULT_PARTITION_FAILURE: {
- CCNCFunctions.ReportResultPartitionFailureFunction rrplf = (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
- workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this, rrplf
- .getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
- return;
- }
-
case APPLICATION_STATE_CHANGE_RESPONSE: {
CCNCFunctions.ApplicationStateChangeResponseFunction astrf = (CCNCFunctions.ApplicationStateChangeResponseFunction) fn;
workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index c96a319..c17acd0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -41,8 +41,6 @@
private final NetworkAddress dataPort;
- private final NetworkAddress datasetPort;
-
private final Set<JobId> activeJobIds;
private final String osName;
@@ -109,14 +107,6 @@
private final long[] netSignalingBytesWritten;
- private final long[] datasetNetPayloadBytesRead;
-
- private final long[] datasetNetPayloadBytesWritten;
-
- private final long[] datasetNetSignalingBytesRead;
-
- private final long[] datasetNetSignalingBytesWritten;
-
private final long[] ipcMessagesSent;
private final long[] ipcMessageBytesSent;
@@ -133,7 +123,6 @@
this.nodeController = nodeController;
ncConfig = reg.getNCConfig();
dataPort = reg.getDataPort();
- datasetPort = reg.getDatasetPort();
activeJobIds = new HashSet<JobId>();
osName = reg.getOSName();
@@ -175,10 +164,6 @@
netPayloadBytesWritten = new long[RRD_SIZE];
netSignalingBytesRead = new long[RRD_SIZE];
netSignalingBytesWritten = new long[RRD_SIZE];
- datasetNetPayloadBytesRead = new long[RRD_SIZE];
- datasetNetPayloadBytesWritten = new long[RRD_SIZE];
- datasetNetSignalingBytesRead = new long[RRD_SIZE];
- datasetNetSignalingBytesWritten = new long[RRD_SIZE];
ipcMessagesSent = new long[RRD_SIZE];
ipcMessageBytesSent = new long[RRD_SIZE];
ipcMessagesReceived = new long[RRD_SIZE];
@@ -211,10 +196,6 @@
netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
- datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
- datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
- datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
- datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
@@ -246,10 +227,6 @@
return dataPort;
}
- public NetworkAddress getDatasetPort() {
- return datasetPort;
- }
-
public JSONObject toSummaryJSON() throws JSONException {
JSONObject o = new JSONObject();
o.put("node-id", ncConfig.nodeId);
@@ -294,10 +271,6 @@
o.put("net-payload-bytes-written", netPayloadBytesWritten);
o.put("net-signaling-bytes-read", netSignalingBytesRead);
o.put("net-signaling-bytes-written", netSignalingBytesWritten);
- o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
- o.put("dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
- o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
- o.put("dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
o.put("ipc-messages-sent", ipcMessagesSent);
o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
o.put("ipc-messages-received", ipcMessagesReceived);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
deleted file mode 100644
index 13d0c30..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.dataset;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-
-/**
- * TODO(madhusudancs): The potential peril of this global dataset directory service implementation is that a job's
- * location information is never evicted from memory, so memory usage grows with the number of jobs in the system.
- * What we should possibly do is add an API call for the client to say that it has received everything it needs for
- * the job (i.e., after it receives all the results). Then we can simply get rid of the location information for that
- * job.
- */
-public class DatasetDirectoryService implements IDatasetDirectoryService {
- private final Map<JobId, Map<ResultSetId, ResultSetMetaData>> jobResultLocationsMap;
-
- public DatasetDirectoryService() {
- jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, ResultSetMetaData>>();
- }
-
- @Override
- public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions, NetworkAddress networkAddress) {
- Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
- if (rsMap == null) {
- rsMap = new HashMap<ResultSetId, ResultSetMetaData>();
- jobResultLocationsMap.put(jobId, rsMap);
- }
-
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
- if (resultSetMetaData == null) {
- resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]);
- rsMap.put(rsId, resultSetMetaData);
- }
-
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
- if (records[partition] == null) {
- records[partition] = new DatasetDirectoryRecord();
- }
- records[partition].setNetworkAddress(networkAddress);
- records[partition].start();
- notifyAll();
- }
-
- @Override
- public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
- DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
- ddr.writeEOS();
- }
-
- @Override
- public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
- DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
- ddr.fail();
- }
-
- @Override
- public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
- Map<ResultSetId, ResultSetMetaData> rsMap;
- while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
-
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
- if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
- throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
- }
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-
- ArrayList<Status> statuses = new ArrayList<Status>(records.length);
- for (int i = 0; i < records.length; i++) {
- statuses.add(records[i].getStatus());
- }
-
- // Default status is idle
- Status status = Status.IDLE;
- if (statuses.contains(Status.FAILED)) {
- // If there is even one failed entry, we should return FAILED status.
- return Status.FAILED;
- } else if (statuses.contains(Status.RUNNING)) {
- // If there is no failed entry and at least one running entry, we should return RUNNING status.
- return Status.RUNNING;
- } else {
- // Only if each and every partition has reported success do we report SUCCESS as the status.
- int successCount = 0;
- for (int i = 0; i < statuses.size(); i++) {
- if (statuses.get(i) == Status.SUCCESS) {
- successCount++;
- }
- }
- if (successCount == statuses.size()) {
- return Status.SUCCESS;
- }
- }
- return status;
- }
-
- @Override
- public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
- DatasetDirectoryRecord[] newRecords;
- while ((newRecords = updatedRecords(jobId, rsId, knownRecords)) == null) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- return newRecords;
- }
-
- public DatasetDirectoryRecord getDatasetDirectoryRecord(JobId jobId, ResultSetId rsId, int partition) {
- Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
- return records[partition];
- }
-
- /**
- * Compares the records already known by the client for the given job's result set id with the records that the
- * dataset directory service knows about and, if there are any newly discovered records, returns a whole array with
- * the new records filled in.
- * This method has fairly convoluted logic; here is how it works.
- * If the ordering constraint has to be enforced, the method finds the first null record in the known records,
- * always traversing the array in partition order from first to last.
- * If the known-records array is null, or its first element is null, and the record for that partition is now
- * known to the directory service, the method fills in that record and returns the array.
- * However, if the first null record is not the first element of the array then, by induction, all the records
- * before it are already known to the client and none of the records for the partitions after it are known to the
- * client yet. So we check whether the client has reached end of stream for the partition just before the first
- * null record, i.e. the last known non-null record. If not, we return null, because we cannot expose any new
- * locations until the client reaches end of stream for that last known record.
- * If the client has reached end of stream for the last known non-null record, we check whether the next record has
- * been discovered by the dataset directory service; if so, we fill it into the records array and return the array,
- * otherwise we return null.
- * If ordering is not required, we are free to return any newly discovered records, so we simply check whether the
- * arrays are equal and, if they are not, return the entire updated array.
- *
- * @param jobId
- * - Id of the job for which the directory records should be retrieved.
- * @param rsId
- * - Id of the result set for which the directory records should be retrieved.
- * @param knownRecords
- * - An array of directory records that the client is already aware of.
- * @return
- * - Returns null if there are no newly discovered partitions (subject to the ordering constraint); otherwise the updated array of known records.
- * @throws HyracksDataException
- * TODO(madhusudancs): Think about caching (while still staying stateless) instead of these ugly O(n) iterations on
- * every check. This already looks very expensive.
- */
- private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
- throws HyracksDataException {
- Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
- if (rsMap == null) {
- return null;
- }
-
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
- if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
- throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
- }
-
- boolean ordered = resultSetMetaData.getOrderedResult();
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
- /* If ordering is required, we should expose the dataset directory records only in the order, otherwise
- * we can simply check if there are any newly discovered records and send the whole array back if there are.
- */
- if (ordered) {
- // Iterate over the known records in partition order to find the first null entry (the one just after the last known non-null record).
- int i = 0;
- for (i = 0; i < records.length; i++) {
- if (knownRecords == null) {
- if (records[0] != null) {
- knownRecords = new DatasetDirectoryRecord[records.length];
- knownRecords[0] = records[0];
- return knownRecords;
- }
- return null;
- }
- if (knownRecords[i] == null) {
- if ((i == 0 || knownRecords[i - 1].hasReachedReadEOS()) && records[i] != null) {
- knownRecords[i] = records[i];
- return knownRecords;
- }
- return null;
- }
- }
- } else {
- if (!Arrays.equals(records, knownRecords)) {
- return records;
- }
- }
- return null;
- }
-
- private class ResultSetMetaData {
- private final boolean ordered;
-
- private final DatasetDirectoryRecord[] records;
-
- public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
- this.ordered = ordered;
- this.records = records;
- }
-
- public boolean getOrderedResult() {
- return ordered;
- }
-
- public DatasetDirectoryRecord[] getRecords() {
- return records;
- }
- }
-}
\ No newline at end of file
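For context on the directory service deleted above: getResultStatus() collapses per-partition statuses into a single result-set status with a simple precedence rule. The sketch below (hypothetical class name, simplified enum, not the Hyracks API) restates that rule in a self-contained form: FAILED wins outright, RUNNING comes next, SUCCESS is reported only when every partition succeeded, and IDLE is the fallback.

    import java.util.Arrays;
    import java.util.List;

    public class ResultStatusAggregator {
        enum Status { IDLE, RUNNING, SUCCESS, FAILED }

        static Status aggregate(List<Status> partitionStatuses) {
            if (partitionStatuses.contains(Status.FAILED)) {
                return Status.FAILED;   // even one failed partition fails the whole result set
            }
            if (partitionStatuses.contains(Status.RUNNING)) {
                return Status.RUNNING;  // no failures, but at least one partition is still producing
            }
            int successCount = 0;
            for (Status s : partitionStatuses) {
                if (s == Status.SUCCESS) {
                    successCount++;
                }
            }
            if (successCount == partitionStatuses.size()) {
                return Status.SUCCESS;  // every partition reported success
            }
            return Status.IDLE;         // default: nothing has started yet
        }

        public static void main(String[] args) {
            System.out.println(aggregate(Arrays.asList(Status.SUCCESS, Status.RUNNING)));  // RUNNING
            System.out.println(aggregate(Arrays.asList(Status.SUCCESS, Status.SUCCESS)));  // SUCCESS
        }
    }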
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
deleted file mode 100644
index 3ac6acc..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.IResultCallback;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetDatasetDirectoryServiceInfoWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
-
- private final IResultCallback<NetworkAddress> callback;
-
- public GetDatasetDirectoryServiceInfoWork(ClusterControllerService ccs, IResultCallback<NetworkAddress> callback) {
- this.ccs = ccs;
- this.callback = callback;
- }
-
- @Override
- public void doRun() {
- try {
- NetworkAddress addr = ccs.getDatasetDirectoryServiceInfo();
- callback.setValue(addr);
- } catch (Exception e) {
- callback.setException(e);
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index a787b9f..2f23a2c 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -39,8 +39,7 @@
Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
- result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort(), e
- .getValue().getDatasetPort()));
+ result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort()));
}
callback.setValue(result);
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
deleted file mode 100644
index fd1d418..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.IResultCallback;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetResultPartitionLocationsWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final DatasetDirectoryRecord[] knownRecords;
-
- private final IResultCallback<DatasetDirectoryRecord[]> callback;
-
- public GetResultPartitionLocationsWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.knownRecords = knownRecords;
- this.callback = callback;
- }
-
- @Override
- public void doRun() {
- final IDatasetDirectoryService dds = ccs.getDatasetDirectoryService();
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- DatasetDirectoryRecord[] partitionLocations = dds.getResultPartitionLocations(jobId, rsId,
- knownRecords);
- callback.setValue(partitionLocations);
- } catch (HyracksDataException e) {
- callback.setException(e);
- }
- }
- });
- }
-}
\ No newline at end of file
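For context on the work item deleted above: because getResultPartitionLocations() can block while waiting for new locations, the work merely hands the call off to the controller's executor and completes a callback later, so the work queue itself never stalls. Below is a minimal, self-contained sketch of that hand-off pattern (the ResultCallback interface and lookupAsync method are illustrative stand-ins, not the Hyracks IResultCallback API).

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AsyncLookupExample {
        // Illustrative stand-in for a result callback: exactly one of the two methods is invoked.
        interface ResultCallback<T> {
            void setValue(T value);
            void setException(Exception e);
        }

        static void lookupAsync(final ExecutorService executor, final ResultCallback<String[]> callback) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Stand-in for the potentially blocking directory lookup.
                        String[] locations = { "nc1:33000", "nc2:33000" };
                        callback.setValue(locations);
                    } catch (Exception e) {
                        callback.setException(e);
                    }
                }
            });
        }

        public static void main(String[] args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            lookupAsync(executor, new ResultCallback<String[]>() {
                public void setValue(String[] v) { System.out.println(v.length + " locations"); }
                public void setException(Exception e) { e.printStackTrace(); }
            });
            executor.shutdown();
        }
    }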
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
deleted file mode 100644
index d2dadf5..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.IResultCallback;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetResultStatusWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final IResultCallback<Status> callback;
-
- public GetResultStatusWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- IResultCallback<Status> callback) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.callback = callback;
- }
-
- @Override
- public void doRun() {
- try {
- Status status = ccs.getDatasetDirectoryService().getResultStatus(jobId, rsId);
- callback.setValue(status);
- } catch (HyracksDataException e) {
- callback.setException(e);
- }
- }
-
- @Override
- public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId;
- }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index b062d33..b6a33cd 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -16,7 +16,6 @@
import java.util.EnumSet;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
deleted file mode 100644
index f86e924..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-
-public class RegisterResultPartitionLocationWork extends AbstractWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final boolean orderedResult;
-
- private final int partition;
-
- private final int nPartitions;
-
- private final NetworkAddress networkAddress;
-
- public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- boolean orderedResult, int partition, int nPartitions, NetworkAddress networkAddress) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.orderedResult = orderedResult;
- this.partition = partition;
- this.nPartitions = nPartitions;
- this.networkAddress = networkAddress;
- }
-
- @Override
- public void run() {
- ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
- nPartitions, networkAddress);
- }
-
- @Override
- public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
- + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult;
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
deleted file mode 100644
index 4aea41e..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-
-public class ReportResultPartitionFailureWork extends AbstractWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final int partition;
-
- public ReportResultPartitionFailureWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, int partition) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.partition = partition;
- }
-
- @Override
- public void run() {
- ccs.getDatasetDirectoryService().reportResultPartitionFailure(jobId, rsId, partition);
- }
-
- @Override
- public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
- }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
deleted file mode 100644
index 313b730..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-
-public class ReportResultPartitionWriteCompletionWork extends AbstractWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final int partition;
-
- public ReportResultPartitionWriteCompletionWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- int partition) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.partition = partition;
- }
-
- @Override
- public void run() {
- ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
- }
-
- @Override
- public String toString() {
- return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
- }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
index 3fc46ff..ff9d8a0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
@@ -58,10 +58,6 @@
var netPayloadBytesWritten = result['net-payload-bytes-written'];
var netSignalingBytesRead = result['net-signaling-bytes-read'];
var netSignalingBytesWritten = result['net-signaling-bytes-written'];
- var datasetNetPayloadBytesRead = result['dataset-net-payload-bytes-read'];
- var datasetNetPayloadBytesWritten = result['dataset-net-payload-bytes-written'];
- var datasetNetSignalingBytesRead = result['dataset-net-signaling-bytes-read'];
- var datasetNetSignalingBytesWritten = result['dataset-net-signaling-bytes-written'];
var ipcMessagesSent = result['ipc-messages-sent'];
var ipcMessageBytesSent = result['ipc-message-bytes-sent'];
var ipcMessagesReceived = result['ipc-messages-received'];
@@ -121,13 +117,9 @@
}
if (i < sysLoad.length - 1) {
netPayloadReadBWArray.push([ i, computeRate(netPayloadBytesRead, rrdPtr) ]);
- netPayloadReadBWArray.push([ i, computeRate(datasetNetPayloadBytesRead, rrdPtr) ]);
netPayloadWriteBWArray.push([ i, computeRate(netPayloadBytesWritten, rrdPtr) ]);
- netPayloadWriteBWArray.push([ i, computeRate(datasetNetPayloadBytesWritten, rrdPtr) ]);
netSignalingReadBWArray.push([ i, computeRate(netSignalingBytesRead, rrdPtr) ]);
- netSignalingReadBWArray.push([ i, computeRate(datasetNetSignalingBytesRead, rrdPtr) ]);
netSignalingWriteBWArray.push([ i, computeRate(netSignalingBytesWritten, rrdPtr) ]);
- netSignalingWriteBWArray.push([ i, computeRate(etSignalingBytesWritten, rrdPtr) ]);
ipcMessageSendRateArray.push([ i, computeRate(ipcMessagesSent, rrdPtr) ]);
ipcMessageBytesSendRateArray.push([ i, computeRate(ipcMessageBytesSent, rrdPtr) ]);
ipcMessageReceiveRateArray.push([ i, computeRate(ipcMessagesReceived, rrdPtr) ]);
diff --git a/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks/hyracks-control/hyracks-control-common/pom.xml
index 2efdd42..096f2e1 100644
--- a/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 55e4479..0c5bb2f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -16,9 +16,7 @@
import java.util.List;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
@@ -48,13 +46,6 @@
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
- int nPartitions, NetworkAddress networkAddress) throws Exception;
-
- public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
-
- public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception;
-
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 9b34810..167eb4b 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -37,9 +37,6 @@
@Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
public String dataIPAddress;
- @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
- public String datasetIPAddress;
-
@Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
public String ioDevices = System.getProperty("java.io.tmpdir");
@@ -69,7 +66,6 @@
cList.add(nodeId);
cList.add("-data-ip-address");
cList.add(dataIPAddress);
- cList.add(datasetIPAddress);
cList.add("-iodevices");
cList.add(ioDevices);
cList.add("-dcache-client-servers");
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
index a897602..91cfecf 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
@@ -33,8 +33,6 @@
private final NetworkAddress dataPort;
- private final NetworkAddress datasetPort;
-
private final String osName;
private final String arch;
@@ -62,14 +60,13 @@
private final HeartbeatSchema hbSchema;
public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
- NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName,
- String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath,
- List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
+ String osName, String arch, String osVersion, int nProcessors, String vmName, String vmVersion,
+ String vmVendor, String classpath, String libraryPath, String bootClasspath, List<String> inputArguments,
+ Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
this.ncAddress = ncAddress;
this.nodeId = nodeId;
this.ncConfig = ncConfig;
this.dataPort = dataPort;
- this.datasetPort = datasetPort;
this.osName = osName;
this.arch = arch;
this.osVersion = osVersion;
@@ -101,10 +98,6 @@
return dataPort;
}
- public NetworkAddress getDatasetPort() {
- return datasetPort;
- }
-
public String getOSName() {
return osName;
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index 663c68a..1dba3bc 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -37,10 +37,6 @@
public long netPayloadBytesWritten;
public long netSignalingBytesRead;
public long netSignalingBytesWritten;
- public long datasetNetPayloadBytesRead;
- public long datasetNetPayloadBytesWritten;
- public long datasetNetSignalingBytesRead;
- public long datasetNetSignalingBytesWritten;
public long ipcMessagesSent;
public long ipcMessageBytesSent;
public long ipcMessagesReceived;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index b506b12..557a8cb 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -36,7 +36,6 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -69,9 +68,6 @@
REPORT_PROFILE,
REGISTER_PARTITION_PROVIDER,
REGISTER_PARTITION_REQUEST,
- REGISTER_RESULT_PARTITION_LOCATION,
- REPORT_RESULT_PARTITION_WRITE_COMPLETION,
- REPORT_RESULT_PARTITION_FAILURE,
APPLICATION_STATE_CHANGE_RESPONSE,
NODE_REGISTRATION_RESULT,
@@ -442,127 +438,6 @@
}
}
- public static class RegisterResultPartitionLocationFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final boolean orderedResult;
-
- private final int partition;
-
- private final int nPartitions;
-
- private NetworkAddress networkAddress;
-
- public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions, NetworkAddress networkAddress) {
- this.jobId = jobId;
- this.rsId = rsId;
- this.orderedResult = orderedResult;
- this.partition = partition;
- this.nPartitions = nPartitions;
- this.networkAddress = networkAddress;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REGISTER_RESULT_PARTITION_LOCATION;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public ResultSetId getResultSetId() {
- return rsId;
- }
-
- public boolean getOrderedResult() {
- return orderedResult;
- }
-
- public int getPartition() {
- return partition;
- }
-
- public int getNPartitions() {
- return nPartitions;
- }
-
- public NetworkAddress getNetworkAddress() {
- return networkAddress;
- }
- }
-
- public static class ReportResultPartitionWriteCompletionFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final int partition;
-
- public ReportResultPartitionWriteCompletionFunction(JobId jobId, ResultSetId rsId, int partition) {
- this.jobId = jobId;
- this.rsId = rsId;
- this.partition = partition;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REPORT_RESULT_PARTITION_WRITE_COMPLETION;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public ResultSetId getResultSetId() {
- return rsId;
- }
-
- public int getPartition() {
- return partition;
- }
- }
-
- public static class ReportResultPartitionFailureFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final int partition;
-
- public ReportResultPartitionFailureFunction(JobId jobId, ResultSetId rsId, int partition) {
- this.jobId = jobId;
- this.rsId = rsId;
- this.partition = partition;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REPORT_RESULT_PARTITION_FAILURE;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public ResultSetId getResultSetId() {
- return rsId;
- }
-
- public int getPartition() {
- return partition;
- }
- }
-
public static class ApplicationStateChangeResponseFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 091a5d2..bbaab4e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -16,9 +16,7 @@
import java.util.List;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
@@ -97,28 +95,6 @@
}
@Override
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
- int nPartitions, NetworkAddress networkAddress) throws Exception {
- CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
- jobId, rsId, orderedResult, partition, nPartitions, networkAddress);
- ipcHandle.send(-1, fn, null);
- }
-
- @Override
- public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception {
- CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn = new CCNCFunctions.ReportResultPartitionWriteCompletionFunction(
- jobId, rsId, partition);
- ipcHandle.send(-1, fn, null);
- }
-
- @Override
- public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception {
- CCNCFunctions.ReportResultPartitionFailureFunction fn = new CCNCFunctions.ReportResultPartitionFailureFunction(
- jobId, rsId, partition);
- ipcHandle.send(-1, fn, null);
- }
-
- @Override
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
CCNCFunctions.ApplicationStateChangeResponseFunction fn = new CCNCFunctions.ApplicationStateChangeResponseFunction(
nodeId, appName, status);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index f103f4a..9ecc083 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -15,8 +15,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
@@ -40,11 +40,6 @@
<artifactId>hyracks-net</artifactId>
<version>0.2.3-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-comm</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </dependency>
</dependencies>
<reporting>
<plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 290c59c..0195143 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -45,7 +45,6 @@
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
@@ -62,9 +61,7 @@
import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
-import edu.uci.ics.hyracks.control.nc.dataset.DatasetPartitionManager;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.control.nc.net.DatasetNetworkManager;
import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
@@ -97,10 +94,6 @@
private final NetworkManager netManager;
- private final IDatasetPartitionManager datasetPartitionManager;
-
- private final DatasetNetworkManager datasetNetworkManager;
-
private final WorkQueue queue;
private final Timer timer;
@@ -147,11 +140,7 @@
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
-
- datasetPartitionManager = new DatasetPartitionManager(this, executor);
- datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
- datasetPartitionManager, ncConfig.nNetThreads);
+ netManager = new NetworkManager(getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
@@ -216,7 +205,6 @@
LOGGER.log(Level.INFO, "Starting NodeControllerService");
ipc.start();
netManager.start();
- datasetNetworkManager.start();
IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -225,11 +213,10 @@
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
- datasetNetworkManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean
- .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
- .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
- .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
- runtimeMXBean.getSystemProperties(), hbSchema));
+ osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+ runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
+ .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+ runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
synchronized (this) {
while (registrationPending) {
@@ -260,10 +247,8 @@
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
executor.shutdownNow();
partitionManager.close();
- datasetPartitionManager.close();
heartbeatTask.cancel();
netManager.stop();
- datasetNetworkManager.stop();
queue.stop();
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
}
@@ -288,10 +273,6 @@
return netManager;
}
- public DatasetNetworkManager getDatasetNetworkManager() {
- return datasetNetworkManager;
- }
-
public PartitionManager getPartitionManager() {
return partitionManager;
}
@@ -316,7 +297,8 @@
return queue;
}
- private static InetAddress getIpAddress(String ipaddrStr) throws Exception {
+ private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
+ String ipaddrStr = ncConfig.dataIPAddress;
ipaddrStr = ipaddrStr.trim();
Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
Matcher m = pattern.matcher(ipaddrStr);
@@ -373,12 +355,6 @@
hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
- MuxDemuxPerformanceCounters datasetNetPC = datasetNetworkManager.getPerformanceCounters();
- hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
- hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
- hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
- hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
-
IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
@@ -483,10 +459,6 @@
}
}
- public IDatasetPartitionManager getDatasetPartitionManager() {
- return datasetPartitionManager;
- }
-
public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
ccs.sendApplicationMessageToCC(data, appName, nodeId);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 5a3e9dd..eba3ec9 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -34,7 +34,6 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -349,11 +348,6 @@
}
@Override
- public IDatasetPartitionManager getDatasetPartitionManager() {
- return ncs.getDatasetPartitionManager();
- }
-
- @Override
public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
this.ncs.sendApplicationMessageToCC(message, this.getJobletContext().getApplicationContext()
.getApplicationName(), nodeId);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
deleted file mode 100644
index 85f17b1..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.dataset;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
-import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
-
-public class DatasetPartitionManager implements IDatasetPartitionManager {
- private final NodeControllerService ncs;
-
- private final Executor executor;
-
- private final Map<JobId, DatasetPartitionWriter[]> partitionDatasetWriterMap;
-
- private final DefaultDeallocatableRegistry deallocatableRegistry;
-
- private final IWorkspaceFileFactory fileFactory;
-
- public DatasetPartitionManager(NodeControllerService ncs, Executor executor) {
- this.ncs = ncs;
- this.executor = executor;
- partitionDatasetWriterMap = new HashMap<JobId, DatasetPartitionWriter[]>();
- deallocatableRegistry = new DefaultDeallocatableRegistry();
- fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
- }
-
- @Override
- public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions) throws HyracksException {
- DatasetPartitionWriter dpw = null;
- JobId jobId = ctx.getJobletContext().getJobId();
- try {
- ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
- nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
- dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, executor);
-
- DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
- if (writers == null) {
- writers = new DatasetPartitionWriter[nPartitions];
- partitionDatasetWriterMap.put(jobId, writers);
- }
- writers[partition] = dpw;
- } catch (Exception e) {
- throw new HyracksException(e);
- }
-
- return dpw;
- }
-
- @Override
- public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
- try {
- ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- @Override
- public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
- try {
- ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- @Override
- public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
- throws HyracksException {
- DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
- if (writers == null) {
- throw new HyracksException("Unknown JobId " + jobId);
- }
-
- DatasetPartitionWriter dpw = writers[partition];
- if (dpw == null) {
- throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
- }
-
- dpw.writeTo(writer);
- }
-
- @Override
- public IWorkspaceFileFactory getFileFactory() {
- return fileFactory;
- }
-
- @Override
- public void close() {
- deallocatableRegistry.close();
- }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
deleted file mode 100644
index 0f5895e..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.dataset;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.partitions.IPartition;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
-
-public class DatasetPartitionWriter implements IFrameWriter, IPartition {
- private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
-
- private static final String FILE_PREFIX = "result_";
-
- private final IHyracksTaskContext ctx;
-
- private final IDatasetPartitionManager manager;
-
- private final JobId jobId;
-
- private final ResultSetId resultSetId;
-
- private final int partition;
-
- private final Executor executor;
-
- private final AtomicBoolean eos;
-
- private FileReference fRef;
-
- private IFileHandle handle;
-
- private long size;
-
- public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
- ResultSetId rsId, int partition, Executor executor) {
- this.ctx = ctx;
- this.manager = manager;
- this.jobId = jobId;
- this.resultSetId = rsId;
- this.partition = partition;
- this.executor = executor;
- eos = new AtomicBoolean(false);
- }
-
- @Override
- public void open() throws HyracksDataException {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("open(" + partition + ")");
- }
- fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(FILE_PREFIX + String.valueOf(partition));
- handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- size = 0;
- }
-
- @Override
- public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- size += ctx.getIOManager().syncWrite(handle, size, buffer);
- notifyAll();
- }
-
- @Override
- public void fail() throws HyracksDataException {
- try {
- manager.reportPartitionFailure(jobId, resultSetId, partition);
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public synchronized void close() throws HyracksDataException {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("close(" + partition + ")");
- }
-
- try {
- eos.set(true);
- notifyAll();
- manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public IHyracksTaskContext getTaskContext() {
- return ctx;
- }
-
- private synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
- while (offset >= size && !eos.get()) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- return ctx.getIOManager().syncRead(handle, offset, buffer);
- }
-
- @Override
- public void writeTo(final IFrameWriter writer) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- NetworkOutputChannel channel = (NetworkOutputChannel) writer;
- channel.setTaskContext(ctx);
- try {
- channel.open();
- try {
- long offset = 0;
- ByteBuffer buffer = ctx.allocateFrame();
- while (true) {
- buffer.clear();
- long size = read(offset, buffer);
- if (size < 0) {
- break;
- } else if (size < buffer.capacity()) {
- throw new HyracksDataException("Premature end of file");
- }
- offset += size;
- buffer.flip();
- channel.nextFrame(buffer);
- }
- } finally {
- channel.close();
- ctx.getIOManager().close(handle);
- }
- } catch (HyracksDataException e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
-
- @Override
- public boolean isReusable() {
- return true;
- }
-
- @Override
- public void deallocate() {
-
- }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
deleted file mode 100644
index 5b8b333..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.net;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
-import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
-import edu.uci.ics.hyracks.net.exceptions.NetException;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-
-public class DatasetNetworkManager implements IChannelConnectionFactory {
- private static final Logger LOGGER = Logger.getLogger(DatasetNetworkManager.class.getName());
-
- private static final int MAX_CONNECTION_ATTEMPTS = 5;
-
- static final int INITIAL_MESSAGE_SIZE = 20;
-
- private final IDatasetPartitionManager partitionManager;
-
- private final MuxDemux md;
-
- private NetworkAddress networkAddress;
-
- public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads)
- throws IOException {
- this.partitionManager = partitionManager;
- md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
- MAX_CONNECTION_ATTEMPTS);
- }
-
- public void start() throws IOException {
- md.start();
- InetSocketAddress sockAddr = md.getLocalAddress();
- networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
- }
-
- public NetworkAddress getNetworkAddress() {
- return networkAddress;
- }
-
- public void stop() {
-
- }
-
- public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
- MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
- return mConn.openChannel();
- }
-
- private class ChannelOpenListener implements IChannelOpenListener {
- @Override
- public void channelOpened(ChannelControlBlock channel) {
- channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
- channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
- }
- }
-
- private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
- private final ChannelControlBlock ccb;
-
- private NetworkOutputChannel noc;
-
- public InitialBufferAcceptor(ChannelControlBlock ccb) {
- this.ccb = ccb;
- }
-
- @Override
- public void accept(ByteBuffer buffer) {
- JobId jobId = new JobId(buffer.getLong());
- int partition = buffer.getInt();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
- + partition + " on channel: " + ccb);
- }
- noc = new NetworkOutputChannel(ccb, 1);
- try {
- partitionManager.initializeDatasetPartitionReader(jobId, partition, noc);
- } catch (HyracksException e) {
- noc.abort();
- }
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void error(int ecode) {
- if (noc != null) {
- noc.abort();
- }
- }
- }
-
- public MuxDemuxPerformanceCounters getPerformanceCounters() {
- return md.getPerformanceCounters();
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
similarity index 89%
rename from hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
rename to hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index aa37b16..1d5af84 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.comm.channels;
+package edu.uci.ics.hyracks.control.nc.net;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
@@ -33,9 +33,7 @@
public class NetworkInputChannel implements IInputChannel {
private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
- static final int INITIAL_MESSAGE_SIZE = 20;
-
- private final IChannelConnectionFactory netManager;
+ private final NetworkManager netManager;
private final SocketAddress remoteAddress;
@@ -51,8 +49,8 @@
private Object attachment;
- public NetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress,
- PartitionId partitionId, int nBuffers) {
+ public NetworkInputChannel(NetworkManager netManager, SocketAddress remoteAddress, PartitionId partitionId,
+ int nBuffers) {
this.netManager = netManager;
this.remoteAddress = remoteAddress;
this.partitionId = partitionId;
@@ -87,7 +85,7 @@
}
@Override
- public void open(IHyracksCommonContext ctx) throws HyracksDataException {
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException {
try {
ccb = netManager.connect(remoteAddress);
} catch (Exception e) {
@@ -98,7 +96,7 @@
for (int i = 0; i < nBuffers; ++i) {
ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
}
- ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
+ ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
writeBuffer.putLong(partitionId.getJobId().getId());
writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
writeBuffer.putInt(partitionId.getSenderIndex());
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index c8e4e94..b805595 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -27,8 +27,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.exceptions.NetException;
@@ -38,7 +36,7 @@
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-public class NetworkManager implements IChannelConnectionFactory {
+public class NetworkManager {
private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
private static final int MAX_CONNECTION_ATTEMPTS = 5;
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
similarity index 97%
rename from hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
rename to hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index df910be..9024e18 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.comm.channels;
+package edu.uci.ics.hyracks.control.nc.net;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
@@ -87,7 +87,7 @@
ccb.getWriteInterface().getFullBufferAcceptor().close();
}
- public void abort() {
+ void abort() {
ccb.getWriteInterface().getFullBufferAcceptor().error(1);
synchronized (NetworkOutputChannel.this) {
aborted = true;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index ba6e6c3..16e31f7 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -21,7 +21,7 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -82,7 +82,7 @@
}
@Override
- public void open(IHyracksCommonContext ctx) throws HyracksDataException {
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException {
for (int i = 0; i < nBuffers; ++i) {
emptyQueue.add(ctx.allocateFrame());
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index e532a0b..45c091a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -28,12 +28,12 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.net.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
public class PartitionManager {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index 7ed9d11..bb9669d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -22,10 +22,10 @@
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.comm.channels.NetworkInputChannel;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
public class ReportPartitionAvailabilityWork extends AbstractWork {
private final NodeControllerService ncs;
diff --git a/hyracks/hyracks-data/hyracks-data-std/pom.xml b/hyracks/hyracks-data/hyracks-data-std/pom.xml
index 85c8fd5..3f051f9 100644
--- a/hyracks/hyracks-data/hyracks-data-std/pom.xml
+++ b/hyracks/hyracks-data/hyracks-data-std/pom.xml
@@ -16,8 +16,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-dataflow-common/pom.xml b/hyracks/hyracks-dataflow-common/pom.xml
index 393e97f..a0ffb66 100644
--- a/hyracks/hyracks-dataflow-common/pom.xml
+++ b/hyracks/hyracks-dataflow-common/pom.xml
@@ -15,8 +15,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
deleted file mode 100644
index 95833b1..0000000
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.common.comm.io;
-
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-
-public class FrameOutputStream extends ByteArrayAccessibleOutputStream {
- private static final Logger LOGGER = Logger.getLogger(FrameOutputStream.class.getName());
-
- private final FrameTupleAppender frameTupleAppender;
-
- public FrameOutputStream(int frameSize) {
- this.frameTupleAppender = new FrameTupleAppender(frameSize);
- }
-
- public void reset(ByteBuffer buffer, boolean clear) {
- frameTupleAppender.reset(buffer, clear);
- }
-
- public int getTupleCount() {
- int tupleCount = frameTupleAppender.getTupleCount();
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("appendTuple(): tuple count: " + tupleCount);
- }
- return tupleCount;
- }
-
- public boolean appendTuple() {
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("appendTuple(): tuple size: " + count);
- }
- boolean appended = frameTupleAppender.append(buf, 0, count);
- count = 0;
- return appended;
- }
-}
diff --git a/hyracks/hyracks-dataflow-hadoop/pom.xml b/hyracks/hyracks-dataflow-hadoop/pom.xml
index c01a6a7..f9d5153 100644
--- a/hyracks/hyracks-dataflow-hadoop/pom.xml
+++ b/hyracks/hyracks-dataflow-hadoop/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-dataflow-std/pom.xml b/hyracks/hyracks-dataflow-std/pom.xml
index 0cb5516..bf27d20 100644
--- a/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks/hyracks-dataflow-std/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
deleted file mode 100644
index e500307..0000000
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.result;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameOutputStream;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-
-public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
-
- private final ResultSetId rsId;
-
- private final boolean ordered;
-
- private final RecordDescriptor recordDescriptor;
-
- private final IResultSerializerFactory resultSerializerFactory;
-
- public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
- RecordDescriptor recordDescriptor, IResultSerializerFactory resultSerializerFactory) throws IOException {
- super(spec, 1, 0);
- this.rsId = rsId;
- this.ordered = ordered;
- this.recordDescriptor = recordDescriptor;
- this.resultSerializerFactory = resultSerializerFactory;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
- final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
-
- final ByteBuffer outputBuffer = ctx.allocateFrame();
-
- final FrameOutputStream frameOutputStream = new FrameOutputStream(ctx.getFrameSize());
- frameOutputStream.reset(outputBuffer, true);
- PrintStream printStream = new PrintStream(frameOutputStream);
-
- final IResultSerializer resultSerializer = resultSerializerFactory.createResultSerializer(printStream);
-
- final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
-
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- IFrameWriter datasetPartitionWriter;
-
- @Override
- public void open() throws HyracksDataException {
- try {
- datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, partition, nPartitions);
- datasetPartitionWriter.open();
- resultSerializer.init();
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- frameTupleAccessor.reset(buffer);
- for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
- resultSerializer.appendTuple(frameTupleAccessor, tIndex);
- if (!frameOutputStream.appendTuple()) {
- datasetPartitionWriter.nextFrame(outputBuffer);
- frameOutputStream.reset(outputBuffer, true);
-
- /* TODO(madhusudancs): This works under the assumption that no single serialized record is
- * longer than the buffer size.
- */
- resultSerializer.appendTuple(frameTupleAccessor, tIndex);
- frameOutputStream.appendTuple();
- }
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- datasetPartitionWriter.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (frameOutputStream.getTupleCount() > 0) {
- datasetPartitionWriter.nextFrame(outputBuffer);
- frameOutputStream.reset(outputBuffer, true);
- }
- datasetPartitionWriter.close();
- }
- };
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-dist/pom.xml b/hyracks/hyracks-dist/pom.xml
index 58a4b1c..c86c4e6 100755
--- a/hyracks/hyracks-dist/pom.xml
+++ b/hyracks/hyracks-dist/pom.xml
@@ -23,8 +23,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml b/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
index dce275f..ba296ac 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
@@ -37,8 +37,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml b/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml
index d94feb7..8e90606 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml
@@ -41,8 +41,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
index 95cfaff..70c663d 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
@@ -147,8 +147,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
index d260601..dd96db8 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
@@ -33,8 +33,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
index f61b9e8..1e029be 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
@@ -32,8 +32,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
index 0df38e7..e31af75 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -16,8 +16,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
@@ -31,13 +31,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-client</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
<version>0.2.3-SNAPSHOT</version>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 2fcd626..023bdd9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -16,7 +16,6 @@
import java.io.File;
import java.io.IOException;
-import java.io.PrintStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -31,16 +30,9 @@
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
-import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
-import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
@@ -88,7 +80,6 @@
ncConfig1.ccPort = 39001;
ncConfig1.clusterNetIPAddress = "127.0.0.1";
ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -98,7 +89,6 @@
ncConfig2.ccPort = 39001;
ncConfig2.clusterNetIPAddress = "127.0.0.1";
ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
@@ -125,50 +115,6 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(jobId.toString());
}
-
- // My code
- int nReaders = 5;
- DatasetClientContext datasetClientCtx = new DatasetClientContext(spec.getFrameSize());
- IHyracksDataset hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, nReaders);
-
- hyracksDataset.open(jobId, spec.getResultSetIds().get(0));
- /* FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(),
- recordDescriptor);
-
- ByteBuffer readBuffer = datasetClientCtx.allocateFrame();
- ByteBufferInputStream bbis = new ByteBufferInputStream();
- DataInputStream di = new DataInputStream(bbis);
-
- while (true) {
- readBuffer.clear();
- int size = hyracksDataset.read(readBuffer);
- if (size <= 0) {
- break;
- }
- try {
- frameTupleAccessor.reset(readBuffer);
- System.out.println("Tuple count: " + recordDescriptor);
- for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
- int start = frameTupleAccessor.getTupleStartOffset(tIndex)
- + frameTupleAccessor.getFieldSlotsLength();
- bbis.setByteBuffer(readBuffer, start);
- Object[] record = new Object[recordDescriptor.getFieldCount()];
- for (int i = 0; i < record.length; ++i) {
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (i == 0) {
- System.out.print(String.valueOf(instance));
- } else {
- System.out.print(", " + String.valueOf(instance));
- }
- }
- System.out.println();
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- // End of my code
- */
hcc.waitForCompletion(jobId);
dumpOutputFiles();
}
@@ -198,27 +144,4 @@
outputFiles.add(tempFile);
return tempFile;
}
-
- protected IResultSerializerFactory getResultSerializedAppenderFactory() {
- return new IResultSerializerFactory() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public IResultSerializer createResultSerializer(final PrintStream printStream) {
- return new IResultSerializer() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void init() throws HyracksDataException {
-
- }
-
- @Override
- public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException {
- return true;
- }
- };
- }
- };
- }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index ab9605b..2c3fddf 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -24,7 +24,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -41,7 +40,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
@@ -74,9 +73,9 @@
ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
- ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, ordersDesc,
- getResultSerializedAppenderFactory());
+ IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+ createTempFile().getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -88,8 +87,6 @@
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
.of(UTF8StringPointable.FACTORY) }), sorter, 0, printer, 0);
- spec.addResultSetId(rsId);
-
runTest(spec);
}
@@ -121,9 +118,9 @@
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
- ResultSetId rsId = new ResultSetId(1);
- IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, ordersDesc,
- getResultSerializedAppenderFactory());
+ IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+ createTempFile().getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -138,8 +135,6 @@
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0,
printer, 0);
- spec.addResultSetId(rsId);
-
runTest(spec);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/text-example/textapp/pom.xml b/hyracks/hyracks-examples/text-example/textapp/pom.xml
index 3834d08..8ac69d8 100644
--- a/hyracks/hyracks-examples/text-example/textapp/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textapp/pom.xml
@@ -142,8 +142,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-examples/text-example/textclient/pom.xml b/hyracks/hyracks-examples/text-example/textclient/pom.xml
index af72b71..901f1fb2 100644
--- a/hyracks/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textclient/pom.xml
@@ -33,8 +33,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-examples/text-example/texthelper/pom.xml b/hyracks/hyracks-examples/text-example/texthelper/pom.xml
index 5095db8..8e32c8c 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/pom.xml
+++ b/hyracks/hyracks-examples/text-example/texthelper/pom.xml
@@ -36,8 +36,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index 0f8d8fc..6b3f603 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -29,8 +29,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-hadoop-compat/pom.xml b/hyracks/hyracks-hadoop-compat/pom.xml
index 9a907b8..3426293 100644
--- a/hyracks/hyracks-hadoop-compat/pom.xml
+++ b/hyracks/hyracks-hadoop-compat/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
index b33e8e2..e74d610 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
index 07b244f..27a1e33 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
@@ -17,8 +17,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index 6557b08..fccfec4 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
index 0087307..4b8a278 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -41,23 +41,17 @@
public void testSchedulerSimple() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
+ .getByName("10.0.0.1").getAddress(), 5099)));
ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
+ .getByName("10.0.0.2").getAddress(), 5099)));
ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
+ .getByName("10.0.0.3").getAddress(), 5099)));
ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
+ .getByName("10.0.0.4").getAddress(), 5099)));
ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
+ .getByName("10.0.0.5").getAddress(), 5099)));
ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ .getByName("10.0.0.6").getAddress(), 5099)));
InputSplit[] fileSplits = new InputSplit[6];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -85,23 +79,17 @@
public void testSchedulerLargerHDFS() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
+ .getByName("10.0.0.1").getAddress(), 5099)));
ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
+ .getByName("10.0.0.2").getAddress(), 5099)));
ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
+ .getByName("10.0.0.3").getAddress(), 5099)));
ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
+ .getByName("10.0.0.4").getAddress(), 5099)));
ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
+ .getByName("10.0.0.5").getAddress(), 5099)));
ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ .getByName("10.0.0.6").getAddress(), 5099)));
InputSplit[] fileSplits = new InputSplit[12];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -136,23 +124,17 @@
public void testSchedulerSmallerHDFS() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
+ .getByName("10.0.0.1").getAddress(), 5099)));
ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
+ .getByName("10.0.0.2").getAddress(), 5099)));
ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
+ .getByName("10.0.0.3").getAddress(), 5099)));
ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
+ .getByName("10.0.0.4").getAddress(), 5099)));
ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
+ .getByName("10.0.0.5").getAddress(), 5099)));
ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ .getByName("10.0.0.6").getAddress(), 5099)));
InputSplit[] fileSplits = new InputSplit[12];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -187,23 +169,17 @@
public void testSchedulerSmallerHDFSOdd() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
+ .getByName("10.0.0.1").getAddress(), 5099)));
ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
+ .getByName("10.0.0.2").getAddress(), 5099)));
ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
+ .getByName("10.0.0.3").getAddress(), 5099)));
ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
+ .getByName("10.0.0.4").getAddress(), 5099)));
ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
+ .getByName("10.0.0.5").getAddress(), 5099)));
ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ .getByName("10.0.0.6").getAddress(), 5099)));
InputSplit[] fileSplits = new InputSplit[13];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
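For reference, the SchedulerTest hunks above all make the same mechanical change: the second NetworkAddress argument is dropped from NodeControllerInfo, leaving the three-argument form shown in the added lines. A minimal standalone sketch of the resulting six-node setup follows; the package imports and the SchedulerTestFixture class name are assumptions for illustration, not code from this branch.

import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;

import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.client.NodeStatus;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;

public class SchedulerTestFixture {
    // Builds the same six-node map the test methods above construct by hand,
    // using the three-argument NodeControllerInfo constructor shown in this diff.
    public static Map<String, NodeControllerInfo> buildNcMap() throws Exception {
        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
        for (int i = 1; i <= 6; i++) {
            String ncName = "nc" + i;
            byte[] ipAddress = InetAddress.getByName("10.0.0." + i).getAddress();
            ncNameToNcInfos.put(ncName,
                    new NodeControllerInfo(ncName, NodeStatus.ALIVE, new NetworkAddress(ipAddress, 5099)));
        }
        return ncNameToNcInfos;
    }
}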
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
index 79f0874..ea2af13 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -46,23 +46,17 @@
public void testSchedulerSimple() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
+ .getByName("10.0.0.1").getAddress(), 5099)));
ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
+ .getByName("10.0.0.2").getAddress(), 5099)));
ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
+ .getByName("10.0.0.3").getAddress(), 5099)));
ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
+ .getByName("10.0.0.4").getAddress(), 5099)));
ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
+ .getByName("10.0.0.5").getAddress(), 5099)));
ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ .getByName("10.0.0.6").getAddress(), 5099)));
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -90,23 +84,17 @@
public void testSchedulerLargerHDFS() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
+ .getByName("10.0.0.1").getAddress(), 5099)));
ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
+ .getByName("10.0.0.2").getAddress(), 5099)));
ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
+ .getByName("10.0.0.3").getAddress(), 5099)));
ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
+ .getByName("10.0.0.4").getAddress(), 5099)));
ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
+ .getByName("10.0.0.5").getAddress(), 5099)));
ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ .getByName("10.0.0.6").getAddress(), 5099)));
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -141,23 +129,17 @@
public void testSchedulerSmallerHDFS() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
+ .getByName("10.0.0.1").getAddress(), 5099)));
ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
+ .getByName("10.0.0.2").getAddress(), 5099)));
ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
+ .getByName("10.0.0.3").getAddress(), 5099)));
ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
+ .getByName("10.0.0.4").getAddress(), 5099)));
ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
+ .getByName("10.0.0.5").getAddress(), 5099)));
ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ .getByName("10.0.0.6").getAddress(), 5099)));
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -192,23 +174,17 @@
public void testSchedulerSmallerHDFSOdd() throws Exception {
Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.1").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.1")
- .getAddress(), 5098)));
+ .getByName("10.0.0.1").getAddress(), 5099)));
ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.2").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.2")
- .getAddress(), 5098)));
+ .getByName("10.0.0.2").getAddress(), 5099)));
ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.3").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.3")
- .getAddress(), 5098)));
+ .getByName("10.0.0.3").getAddress(), 5099)));
ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.4").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.4")
- .getAddress(), 5098)));
+ .getByName("10.0.0.4").getAddress(), 5099)));
ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.5").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.5")
- .getAddress(), 5098)));
+ .getByName("10.0.0.5").getAddress(), 5099)));
ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress(InetAddress
- .getByName("10.0.0.6").getAddress(), 5099), new NetworkAddress(InetAddress.getByName("10.0.0.6")
- .getAddress(), 5098)));
+ .getByName("10.0.0.6").getAddress(), 5099)));
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
diff --git a/hyracks/hyracks-ipc/pom.xml b/hyracks/hyracks-ipc/pom.xml
index a5e3662..cb0de08 100644
--- a/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks/hyracks-ipc/pom.xml
@@ -15,8 +15,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml b/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
index e95f7f0..50c05da 100644
--- a/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
+++ b/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
@@ -17,8 +17,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/HyracksNCStartMojo.java b/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/HyracksNCStartMojo.java
index fc06a68..47de024 100644
--- a/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/HyracksNCStartMojo.java
+++ b/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/HyracksNCStartMojo.java
@@ -55,7 +55,6 @@
cmdLineBuffer.append(" -data-ip-address ").append(dataIpAddress);
cmdLineBuffer.append(" -node-id ").append(nodeId);
cmdLineBuffer.append(" -cluster-net-ip-address 127.0.0.1");
- cmdLineBuffer.append(" -result-ip-address 127.0.0.1");
if (ccPort != 0) {
cmdLineBuffer.append(" -cc-port ").append(ccPort);
}
diff --git a/hyracks/hyracks-net/pom.xml b/hyracks/hyracks-net/pom.xml
index 5eb88b5..a079306 100644
--- a/hyracks/hyracks-net/pom.xml
+++ b/hyracks/hyracks-net/pom.xml
@@ -15,8 +15,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index e4df6b9..c719bc4 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -48,9 +48,9 @@
* Constructor.
*
* @param localAddress
- * - TCP/IP socket address to listen on. Null for non-listening unidirectional sockets
+ * - TCP/IP socket address to listen on
* @param listener
- * - Callback interface to report channel events. Null for non-listening unidirectional sockets
+ * - Callback interface to report channel events
* @param nThreads
* - Number of threads to use for data transfer
* @param maxConnectionAttempts
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index a9061e1..d13a17e 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -45,23 +45,15 @@
}
public void start(InetSocketAddress localAddress) throws IOException {
- // Setup a server socket listening channel only if the TCPEndpoint is a listening endpoint.
- if (localAddress != null) {
- serverSocketChannel = ServerSocketChannel.open();
- ServerSocket serverSocket = serverSocketChannel.socket();
- serverSocket.bind(localAddress);
- this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
- }
-
+ serverSocketChannel = ServerSocketChannel.open();
+ ServerSocket serverSocket = serverSocketChannel.socket();
+ serverSocket.bind(localAddress);
+ this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
ioThreads = new IOThread[nThreads];
for (int i = 0; i < ioThreads.length; ++i) {
ioThreads[i] = new IOThread();
}
-
- if (localAddress != null) {
- ioThreads[0].registerServerSocket(serverSocketChannel);
- }
-
+ ioThreads[0].registerServerSocket(serverSocketChannel);
for (int i = 0; i < ioThreads.length; ++i) {
ioThreads[i].start();
}
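The TCPEndpoint change above reverts start() to unconditionally open a server socket channel, bind it, and record the actual bound address. A standalone JDK-only sketch of that bind-and-record pattern is below; the class name and loopback address are illustrative assumptions, not Hyracks code.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;

public class BindSketch {
    public static void main(String[] args) throws IOException {
        // Open a listening channel and bind it, as the reverted start() does unconditionally.
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: let the OS pick one
        // The effective address (including the assigned port) is what the endpoint records.
        InetSocketAddress localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
        System.out.println("bound to " + localAddress);
        serverSocketChannel.close();
    }
}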
diff --git a/hyracks/hyracks-server/pom.xml b/hyracks/hyracks-server/pom.xml
index f514820..6c6640e 100644
--- a/hyracks/hyracks-server/pom.xml
+++ b/hyracks/hyracks-server/pom.xml
@@ -15,8 +15,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks/hyracks-storage-am-btree/pom.xml
index ccb3b41..118d46d 100644
--- a/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks/hyracks-storage-am-btree/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-storage-am-common/pom.xml b/hyracks/hyracks-storage-am-common/pom.xml
index a90e91e..0b32733 100644
--- a/hyracks/hyracks-storage-am-common/pom.xml
+++ b/hyracks/hyracks-storage-am-common/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-storage-am-invertedindex/pom.xml b/hyracks/hyracks-storage-am-invertedindex/pom.xml
index 2ba4980..a647e9d 100644
--- a/hyracks/hyracks-storage-am-invertedindex/pom.xml
+++ b/hyracks/hyracks-storage-am-invertedindex/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-storage-am-rtree/pom.xml b/hyracks/hyracks-storage-am-rtree/pom.xml
index ade69ef..61620ec 100644
--- a/hyracks/hyracks-storage-am-rtree/pom.xml
+++ b/hyracks/hyracks-storage-am-rtree/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-storage-common/pom.xml b/hyracks/hyracks-storage-common/pom.xml
index ee507cc..289171a 100644
--- a/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks/hyracks-storage-common/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-test-support/pom.xml b/hyracks/hyracks-test-support/pom.xml
index 2f10556..25a5378 100644
--- a/hyracks/hyracks-test-support/pom.xml
+++ b/hyracks/hyracks-test-support/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index 0ca93b2..c122b25 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -20,7 +20,6 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -102,11 +101,6 @@
}
@Override
- public IDatasetPartitionManager getDatasetPartitionManager() {
- return null;
- }
-
- @Override
public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
// TODO Auto-generated method stub
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
index ebd8bcc..7b03a71 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
index bba6a0e..2cf6ce2 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml
index 72d9a78..ea86042 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
index 6063de5..bd10e13 100644
--- a/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
@@ -18,8 +18,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/hyracks-yarn/hyracks-yarn-am/pom.xml b/hyracks/hyracks-yarn/hyracks-yarn-am/pom.xml
index d33ddc5..9e453a6 100644
--- a/hyracks/hyracks-yarn/hyracks-yarn-am/pom.xml
+++ b/hyracks/hyracks-yarn/hyracks-yarn-am/pom.xml
@@ -14,8 +14,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-yarn/hyracks-yarn-client/pom.xml b/hyracks/hyracks-yarn/hyracks-yarn-client/pom.xml
index 649aa6c..08935a7 100644
--- a/hyracks/hyracks-yarn/hyracks-yarn-client/pom.xml
+++ b/hyracks/hyracks-yarn/hyracks-yarn-client/pom.xml
@@ -14,8 +14,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-yarn/hyracks-yarn-common/pom.xml b/hyracks/hyracks-yarn/hyracks-yarn-common/pom.xml
index fe210fd..3aaf4a2 100644
--- a/hyracks/hyracks-yarn/hyracks-yarn-common/pom.xml
+++ b/hyracks/hyracks-yarn/hyracks-yarn-common/pom.xml
@@ -14,8 +14,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks/pom.xml b/hyracks/pom.xml
index b699542..09925fb 100644
--- a/hyracks/pom.xml
+++ b/hyracks/pom.xml
@@ -83,8 +83,6 @@
<modules>
<module>hyracks-ipc</module>
<module>hyracks-api</module>
- <module>hyracks-comm</module>
- <module>hyracks-client</module>
<module>hyracks-dataflow-common</module>
<module>hyracks-dataflow-std</module>
<module>hyracks-dataflow-hadoop</module>