Merge fullstack_hyracks_result_distribution back to fullstack_asterix_stabilization.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3024 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index a969372..32cfb9a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -7,6 +7,7 @@
BTREE_SEARCH,
STATS,
DATASOURCE_SCAN,
+ DISTRIBUTE_RESULT,
EMPTY_TUPLE_SOURCE,
EXTERNAL_GROUP_BY,
IN_MEMORY_HASH_JOIN,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 899b633..82187e3 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -51,6 +51,10 @@
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
throws AlgebricksException;
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+ JobSpecification spec) throws AlgebricksException;
+
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
new file mode 100644
index 0000000..d835da4
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class DistributeResultPOperator extends AbstractPhysicalOperator {
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.DISTRIBUTE_RESULT;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ ILogicalOperator op2 = op.getInputs().get(0).getValue();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ WriteOperator write = (WriteOperator) op;
+ IDataSink sink = write.getDataSink();
+ IPartitioningProperty pp = sink.getPartitioningProperty();
+ StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
+ return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ WriteOperator resultOp = (WriteOperator) op;
+ IMetadataProvider mp = context.getMetadataProvider();
+
+ JobSpecification spec = builder.getJobSpec();
+
+ int[] columns = new int[resultOp.getExpressions().size()];
+ int i = 0;
+ for (Mutable<ILogicalExpression> exprRef : resultOp.getExpressions()) {
+ ILogicalExpression expr = exprRef.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new NotImplementedException("Only writing variable expressions is supported.");
+ }
+ VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+ LogicalVariable v = varRef.getVariableReference();
+ columns[i++] = inputSchemas[0].findVariable(v);
+ }
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+ IPrinterFactory[] pf = JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op),
+ context, columns);
+
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
+ resultOp.getDataSink(), columns, pf, inputDesc, false, spec);
+
+ builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
+ builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+ ILogicalOperator src = resultOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, resultOp, 0);
+ }
+}
diff --git a/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
new file mode 100644
index 0000000..a6ebf01
--- /dev/null
+++ b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.data;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public interface IResultSerializerFactoryProvider extends Serializable {
+ /**
+ * Returns a result serializer factory
+ *
+ * @param fields
+ * - A position of the fields in the order it should be written in the output.
+ * @param printerFactories
+ * - A printer factory array to print the tuple containing different fields.
+ * @param writerFactory
+ * - A writer factory to write the serialized data to the print stream.
+ * @param inputRecordDesc
+ * - The record descriptor describing the input frame to be serialized.
+ * @return A new instance of result serialized appender.
+ */
+ public IResultSerializerFactory getAqlResultSerializerFactoryProvider(int[] fields,
+ IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, RecordDescriptor inputRecordDesc);
+}
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index d678803..0efe0ae 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -62,13 +62,10 @@
@SuppressWarnings("unchecked")
@Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
- IDataSource<String> dataSource,
- List<LogicalVariable> scanVariables,
- List<LogicalVariable> projectVariables, boolean projectPushed,
- IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
- JobGenContext context, JobSpecification jobSpec)
- throws AlgebricksException {
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<String> dataSource,
+ List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+ IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
+ throws AlgebricksException {
PigletFileDataSource ds = (PigletFileDataSource) dataSource;
FileSplit[] fileSplits = ds.getFileSplits();
@@ -139,6 +136,13 @@
}
@Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+ JobSpecification spec) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public IDataSourceIndex<String, String> findDataSourceIndex(String indexId, String dataSourceId)
throws AlgebricksException {
return null;
@@ -187,7 +191,7 @@
// TODO Auto-generated method stub
return null;
}
-
+
@Override
public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
return FN_MAP.get(fid);
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 38cf96e..666f01a 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
@@ -50,7 +51,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamDiePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
@@ -242,7 +242,7 @@
break;
}
case WRITE: {
- op.setPhysicalOperator(new SinkWritePOperator());
+ op.setPhysicalOperator(new DistributeResultPOperator());
break;
}
case WRITE_RESULT: {
@@ -267,8 +267,8 @@
List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
- op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys,
- opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+ op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys, opInsDel
+ .getFilterExpression(), opInsDel.getDataSourceIndex()));
break;
}
case SINK: {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index 148f087..28597a7 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -70,6 +70,7 @@
for (int t = 0; t < nTuple; t++) {
try {
writer.printTuple(tAccess, t);
+ printStream.println();
} catch (AlgebricksException ae) {
throw new HyracksDataException(ae);
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
index 9c53241..762e133 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/writers/PrinterBasedWriterFactory.java
@@ -47,7 +47,6 @@
}
printers[i].print(tAccess.getBuffer().array(), fldStart, fldLen, printStream);
}
- printStream.println();
}
};
}
diff --git a/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java b/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
index 5782703..fe24bc9 100644
--- a/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
+++ b/hivesterix/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
@@ -121,6 +121,13 @@
}
@Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+ JobSpecification spec) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
IDataSourceIndex<T, S> dataSource,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
index a8f2fda..a4f0b29 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
@@ -16,7 +16,7 @@
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IInputChannel {
@@ -30,7 +30,7 @@
public void recycleBuffer(ByteBuffer buffer);
- public void open(IHyracksTaskContext ctx) throws HyracksDataException;
+ public void open(IHyracksCommonContext ctx) throws HyracksDataException;
public void close() throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index e34e60d..cd2b698 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -17,6 +17,8 @@
import java.io.Serializable;
import java.util.EnumSet;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -30,6 +32,10 @@
CREATE_JOB,
GET_JOB_STATUS,
START_JOB,
+ GET_DATASET_DIRECTORY_SERIVICE_INFO,
+ GET_DATASET_RESULT_STATUS,
+ GET_DATASET_RECORD_DESCRIPTOR,
+ GET_DATASET_RESULT_LOCATIONS,
WAIT_FOR_COMPLETION,
GET_NODE_CONTROLLERS_INFO
}
@@ -156,6 +162,74 @@
}
}
+ public static class GetDatasetDirectoryServiceInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_DATASET_DIRECTORY_SERIVICE_INFO;
+ }
+ }
+
+ public static class GetDatasetResultStatusFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ public GetDatasetResultStatusFunction(JobId jobId, ResultSetId rsId) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_DATASET_RESULT_STATUS;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+ }
+
+ public static class GetDatasetResultLocationsFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final DatasetDirectoryRecord[] knownRecords;
+
+ public GetDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.knownRecords = knownRecords;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_DATASET_RESULT_LOCATIONS;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+
+ public DatasetDirectoryRecord[] getKnownRecords() {
+ return knownRecords;
+ }
+ }
+
public static class WaitForCompletionFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 4c06d42..2ab42c0 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -17,6 +17,7 @@
import java.util.EnumSet;
import java.util.Map;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -76,6 +77,12 @@
}
@Override
+ public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
+ HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf = new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
+ return (NetworkAddress) rpci.call(ipcHandle, gddsf);
+ }
+
+ @Override
public void waitForCompletion(JobId jobId) throws Exception {
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
jobId);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 227524c..e0fafb0 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -26,6 +26,7 @@
import org.apache.http.impl.client.DefaultHttpClient;
import edu.uci.ics.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -118,6 +119,10 @@
return hci.startJob(appName, JavaSerializationUtils.serialize(acggf), jobFlags);
}
+ public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
+ return hci.getDatasetDirectoryServiceInfo();
+ }
+
@Override
public void waitForCompletion(JobId jobId) throws Exception {
hci.waitForCompletion(jobId);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index bdbb544..6333c22 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -18,6 +18,7 @@
import java.util.EnumSet;
import java.util.Map;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -100,6 +101,14 @@
throws Exception;
/**
+ * Gets the IP Address and port for the DatasetDirectoryService wrapped in NetworkAddress
+ *
+ * @return {@link NetworkAddress}
+ * @throws Exception
+ */
+ public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
+
+ /**
* Waits until the specified job has completed, either successfully or has
* encountered a permanent failure.
*
@@ -123,4 +132,4 @@
* @throws Exception
*/
public ClusterTopology getClusterTopology() throws Exception;
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index ef5906e..22b0a8f 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -17,6 +17,7 @@
import java.util.EnumSet;
import java.util.Map;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -35,6 +36,8 @@
public JobId startJob(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
+ public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
+
public void waitForCompletion(JobId jobId) throws Exception;
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
index fd9218a..73b5488 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
@@ -27,10 +27,14 @@
private final NetworkAddress netAddress;
- public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress) {
+ private final NetworkAddress datasetNetworkAddress;
+
+ public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress,
+ NetworkAddress datasetNetworkAddress) {
this.nodeId = nodeId;
this.status = status;
this.netAddress = netAddress;
+ this.datasetNetworkAddress = datasetNetworkAddress;
}
public String getNodeId() {
@@ -44,4 +48,8 @@
public NetworkAddress getNetworkAddress() {
return netAddress;
}
+
+ public NetworkAddress getDatasetNetworkAddress() {
+ return datasetNetworkAddress;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
index e964d66..a2ee977 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.context;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
@@ -28,5 +29,7 @@
public ICounterContext getCounterContext();
+ public IDatasetPartitionManager getDatasetPartitionManager();
+
public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializer.java
new file mode 100644
index 0000000..ba2ff9a
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializer.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IResultSerializer extends Serializable {
+ /**
+ * Initializes the serializer.
+ */
+ public void init() throws HyracksDataException;
+
+ /**
+ * Method to serialize the result and append it to the provided output stream
+ *
+ * @param tAccess
+ * - A frame tuple accessor object that contains the original data to be serialized
+ * @param tIdx
+ * - Index of the tuple that should be serialized.
+ * @return true if the tuple was appended successfully, else false.
+ */
+ public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializerFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializerFactory.java
new file mode 100644
index 0000000..92cf569
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IResultSerializerFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+public interface IResultSerializerFactory extends Serializable {
+ /**
+ * Creates a result serialized appender
+ *
+ * @param printStream
+ * - A print stream object to which the serialized results will be written.
+ * @return A new instance of result serialized appender.
+ */
+ public IResultSerializer createResultSerializer(PrintStream printStream);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/JSONSerializable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/JSONSerializable.java
new file mode 100644
index 0000000..1eca502
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/JSONSerializable.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public interface JSONSerializable {
+ /**
+ * Returns the JSON representation of the object.
+ *
+ * @return A new JSONObject instance representing this Java object.
+ */
+ public JSONObject toJSON() throws JSONException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java
new file mode 100644
index 0000000..6316bba
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+
+public class DatasetDirectoryRecord implements Serializable {
+ public enum Status {
+ IDLE,
+ RUNNING,
+ SUCCESS,
+ FAILED
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private NetworkAddress address;
+
+ private boolean readEOS;
+
+ private Status status;
+
+ public DatasetDirectoryRecord() {
+ this.address = null;
+ this.readEOS = false;
+ this.status = Status.IDLE;
+ }
+
+ public void setNetworkAddress(NetworkAddress address) {
+ this.address = address;
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return address;
+ }
+
+ public void readEOS() {
+ this.readEOS = true;
+ }
+
+ public boolean hasReachedReadEOS() {
+ return readEOS;
+ }
+
+ public void start() {
+ status = Status.RUNNING;
+ }
+
+ public void writeEOS() {
+ status = Status.SUCCESS;
+ }
+
+ public void fail() {
+ status = Status.FAILED;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof DatasetDirectoryRecord)) {
+ return false;
+ }
+ return address.equals(((DatasetDirectoryRecord) o).address);
+ }
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
new file mode 100644
index 0000000..5266333
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public interface IDatasetDirectoryService {
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+ int nPartitions, NetworkAddress networkAddress);
+
+ public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition);
+
+ public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
+
+ public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
+
+ public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
+ DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java
new file mode 100644
index 0000000..65ba1c7
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+
+public interface IDatasetInputChannelMonitor extends IInputChannelMonitor {
+ public boolean eosReached();
+
+ public boolean failed();
+
+ public int getNFramesAvailable();
+
+ public void notifyFrameRead();
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
new file mode 100644
index 0000000..ae38c7f
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public interface IDatasetPartitionManager {
+ public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+ int partition, int nPartitions) throws HyracksException;
+
+ public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition)
+ throws HyracksException;
+
+ public void reportPartitionFailure(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
+
+ public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
+
+ public IWorkspaceFileFactory getFileFactory();
+
+ public void close();
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
new file mode 100644
index 0000000..40db7a7
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public interface IHyracksDataset {
+ public void open(JobId jobId, ResultSetId resultSetId) throws IOException;
+
+ public Status getResultStatus();
+
+ public int read(ByteBuffer buffer) throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
new file mode 100644
index 0000000..d49d5cd
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public interface IHyracksDatasetDirectoryServiceConnection {
+ /**
+ * Gets the result status for the given result set.
+ *
+ * @param jobId
+ * ID of the job
+ * @param rsId
+ * ID of the result set
+ * @return {@link Status}
+ * @throws Exception
+ */
+ public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
+
+ /**
+ * Gets the IP Addresses and ports for the partition generating the result for each location.
+ *
+ * @param jobId
+ * ID of the job
+ * @param rsId
+ * ID of the result set
+ * @param knownRecords
+ * Locations that are already known to the client
+ * @return {@link NetworkAddress[]}
+ * @throws Exception
+ */
+ public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
+ DatasetDirectoryRecord[] knownRecords) throws Exception;
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
new file mode 100644
index 0000000..ba21a84
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public interface IHyracksDatasetDirectoryServiceInterface {
+ /**
+ * Gets the result status for the given result set.
+ *
+ * @param jobId
+ * ID of the job
+ * @param rsId
+ * ID of the result set
+ * @return {@link Status}
+ * @throws Exception
+ */
+ public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
+
+ /**
+ * Gets the IP Addresses and ports for the partition generating the result for each location.
+ *
+ * @param jobId
+ * ID of the job
+ * @param rsId
+ * ID of the result set
+ * @param knownRecords
+ * Locations from the dataset directory that are already known to the client
+ * @return {@link NetworkAddress[]}
+ * @throws Exception
+ */
+ public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
+ DatasetDirectoryRecord[] knownRecords) throws Exception;
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetId.java
new file mode 100644
index 0000000..ae38ef3
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetId.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import java.io.Serializable;
+
+public class ResultSetId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final long id;
+
+ public ResultSetId(long id) {
+ this.id = id;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof ResultSetId)) {
+ return false;
+ }
+ return ((ResultSetId) o).id == id;
+ }
+
+ @Override
+ public String toString() {
+ return "RSID:" + id;
+ }
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 7c523f1..1fdff0f 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -34,12 +34,15 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
private static final long serialVersionUID = 1L;
private final List<OperatorDescriptorId> roots;
+ private final List<ResultSetId> resultSetIds;
+
private final Map<OperatorDescriptorId, IOperatorDescriptor> opMap;
private final Map<ConnectorDescriptorId, IConnectorDescriptor> connMap;
@@ -72,6 +75,7 @@
public JobSpecification() {
roots = new ArrayList<OperatorDescriptorId>();
+ resultSetIds = new ArrayList<ResultSetId>();
opMap = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
connMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
opInputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
@@ -104,6 +108,10 @@
roots.add(op.getOperatorId());
}
+ public void addResultSetId(ResultSetId rsId) {
+ resultSetIds.add(rsId);
+ }
+
public void connect(IConnectorDescriptor conn, IOperatorDescriptor producerOp, int producerPort,
IOperatorDescriptor consumerOp, int consumerPort) {
insertIntoIndexedMap(opInputMap, consumerOp.getOperatorId(), consumerPort, conn);
@@ -208,6 +216,10 @@
return roots;
}
+ public List<ResultSetId> getResultSetIds() {
+ return resultSetIds;
+ }
+
public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
return connectorPolicyAssignmentPolicy;
}
diff --git a/hyracks/hyracks-client/pom.xml b/hyracks/hyracks-client/pom.xml
new file mode 100644
index 0000000..854a009
--- /dev/null
+++ b/hyracks/hyracks-client/pom.xml
@@ -0,0 +1,46 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-client</artifactId>
+ <name>hyracks-client</name>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-net</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-comm</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
new file mode 100644
index 0000000..8be4a8c
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.client.dataset;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+
+public class DatasetClientContext implements IHyracksCommonContext {
+ private final int frameSize;
+
+ public DatasetClientContext(int frameSize) {
+ this.frameSize = frameSize;
+ }
+
+ @Override
+ public int getFrameSize() {
+ return frameSize;
+ }
+
+ @Override
+ public IIOManager getIOManager() {
+ return null;
+ }
+
+ @Override
+ public ByteBuffer allocateFrame() {
+ return ByteBuffer.allocate(frameSize);
+ }
+
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
new file mode 100644
index 0000000..314b621
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.client.dataset;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.client.net.ClientNetworkManager;
+import edu.uci.ics.hyracks.comm.channels.DatasetNetworkInputChannel;
+
+// TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
+public class HyracksDataset implements IHyracksDataset {
+ private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
+
+ private final ClientNetworkManager netManager;
+
+ private final DatasetClientContext datasetClientCtx;
+
+ private JobId jobId;
+
+ private ResultSetId resultSetId;
+
+ private DatasetDirectoryRecord[] knownRecords;
+
+ private IDatasetInputChannelMonitor[] monitors;
+
+ private int lastReadPartition;
+
+ private IDatasetInputChannelMonitor lastMonitor;
+
+ private DatasetNetworkInputChannel resultChannel;
+
+ private static int NUM_READ_BUFFERS = 1;
+
+ public HyracksDataset(IHyracksClientConnection hcc, DatasetClientContext datasetClientCtx, int nReaders)
+ throws Exception {
+ NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo();
+ datasetDirectoryServiceConnection = new HyracksDatasetDirectoryServiceConnection(new String(
+ ddsAddress.getIpAddress()), ddsAddress.getPort());
+
+ netManager = new ClientNetworkManager(nReaders);
+
+ this.datasetClientCtx = datasetClientCtx;
+
+ knownRecords = null;
+ monitors = null;
+ lastReadPartition = -1;
+ lastMonitor = null;
+ resultChannel = null;
+ }
+
+ @Override
+ public void open(JobId jobId, ResultSetId resultSetId) throws IOException {
+ this.jobId = jobId;
+ this.resultSetId = resultSetId;
+ netManager.start();
+ }
+
+ @Override
+ public Status getResultStatus() {
+ Status status = null;
+ try {
+ status = datasetDirectoryServiceConnection.getDatasetResultStatus(jobId, resultSetId);
+ } catch (Exception e) {
+ // TODO(madhusudancs): Decide what to do in case of error
+ }
+ return status;
+ }
+
+ @Override
+ public int read(ByteBuffer buffer) throws HyracksDataException {
+ ByteBuffer readBuffer;
+ int readSize = 0;
+
+ if (lastReadPartition == -1) {
+ while (knownRecords == null || knownRecords[0] == null) {
+ try {
+ knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId, resultSetId,
+ knownRecords);
+ lastReadPartition = 0;
+ resultChannel = new DatasetNetworkInputChannel(netManager,
+ getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
+ NUM_READ_BUFFERS);
+ lastMonitor = getMonitor(lastReadPartition);
+ resultChannel.open(datasetClientCtx);
+ resultChannel.registerMonitor(lastMonitor);
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ } catch (UnknownHostException e) {
+ throw new HyracksDataException(e);
+ } catch (Exception e) {
+ // Do nothing here.
+ }
+ }
+ }
+
+ while (readSize <= 0 && !((lastReadPartition == knownRecords.length - 1) && (lastMonitor.eosReached()))) {
+ while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached()) {
+ synchronized (lastMonitor) {
+ try {
+ lastMonitor.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ if (lastMonitor.getNFramesAvailable() <= 0 && lastMonitor.eosReached()) {
+ if ((lastReadPartition == knownRecords.length - 1)) {
+ break;
+ } else {
+ try {
+ lastReadPartition++;
+ while (knownRecords[lastReadPartition] == null) {
+ try {
+ knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
+ resultSetId, knownRecords);
+ } catch (Exception e) {
+ // Do nothing here.
+ }
+ }
+
+ resultChannel = new DatasetNetworkInputChannel(netManager,
+ getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
+ NUM_READ_BUFFERS);
+ lastMonitor = getMonitor(lastReadPartition);
+ resultChannel.open(datasetClientCtx);
+ resultChannel.registerMonitor(lastMonitor);
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ } catch (UnknownHostException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ } else {
+ readBuffer = resultChannel.getNextBuffer();
+ lastMonitor.notifyFrameRead();
+ if (readBuffer != null) {
+ buffer.put(readBuffer);
+ buffer.flip();
+ readSize = buffer.limit();
+ resultChannel.recycleBuffer(readBuffer);
+ }
+ }
+ }
+
+ return readSize;
+ }
+
+ private boolean nullExists(DatasetDirectoryRecord[] locations) {
+ if (locations == null) {
+ return true;
+ }
+ for (int i = 0; i < locations.length; i++) {
+ if (locations[i] == null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
+ NetworkAddress netAddr = addr.getNetworkAddress();
+ return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
+ }
+
+ private IDatasetInputChannelMonitor getMonitor(int partition) throws HyracksException {
+ if (knownRecords == null || knownRecords[partition] == null) {
+ throw new HyracksException("Accessing monitors before the obtaining the corresponding addresses.");
+ }
+ if (monitors == null) {
+ monitors = new DatasetInputChannelMonitor[knownRecords.length];
+ }
+ if (monitors[partition] == null) {
+ monitors[partition] = new DatasetInputChannelMonitor();
+ }
+ return monitors[partition];
+ }
+
+ private class DatasetInputChannelMonitor implements IDatasetInputChannelMonitor {
+ private final AtomicInteger nAvailableFrames;
+
+ private final AtomicBoolean eos;
+
+ private final AtomicBoolean failed;
+
+ public DatasetInputChannelMonitor() {
+ nAvailableFrames = new AtomicInteger(0);
+ eos = new AtomicBoolean(false);
+ failed = new AtomicBoolean(false);
+ }
+
+ @Override
+ public synchronized void notifyFailure(IInputChannel channel) {
+ failed.set(true);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+ nAvailableFrames.addAndGet(nFrames);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void notifyEndOfStream(IInputChannel channel) {
+ eos.set(true);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized boolean eosReached() {
+ return eos.get();
+ }
+
+ @Override
+ public synchronized boolean failed() {
+ return failed.get();
+ }
+
+ @Override
+ public synchronized int getNFramesAvailable() {
+ return nAvailableFrames.get();
+ }
+
+ @Override
+ public synchronized void notifyFrameRead() {
+ nAvailableFrames.decrementAndGet();
+ }
+
+ }
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
new file mode 100644
index 0000000..095fd7d
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.client.dataset;
+
+import java.net.InetSocketAddress;
+
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+//TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
+public class HyracksDatasetDirectoryServiceConnection implements IHyracksDatasetDirectoryServiceConnection {
+ private final IPCSystem ipc;
+ private final IHyracksDatasetDirectoryServiceInterface ddsi;
+
+ public HyracksDatasetDirectoryServiceConnection(String ddsHost, int ddsPort) throws Exception {
+ RPCInterface rpci = new RPCInterface();
+ ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
+ ipc.start();
+ IIPCHandle ddsIpchandle = ipc.getHandle(new InetSocketAddress(ddsHost, ddsPort));
+ this.ddsi = new HyracksDatasetDirectoryServiceInterfaceRemoteProxy(ddsIpchandle, rpci);
+ }
+
+ @Override
+ public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception {
+ return ddsi.getDatasetResultStatus(jobId, rsId);
+ }
+
+ @Override
+ public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
+ DatasetDirectoryRecord[] knownRecords) throws Exception {
+ return ddsi.getDatasetResultLocations(jobId, rsId, knownRecords);
+ }
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
new file mode 100644
index 0000000..47cdf97
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.client.dataset;
+
+import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+
+//TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
+public class HyracksDatasetDirectoryServiceInterfaceRemoteProxy implements IHyracksDatasetDirectoryServiceInterface {
+ private final IIPCHandle ipcHandle;
+
+ private final RPCInterface rpci;
+
+ public HyracksDatasetDirectoryServiceInterfaceRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) {
+ this.ipcHandle = ipcHandle;
+ this.rpci = rpci;
+ }
+
+ @Override
+ public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception {
+ HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction(
+ jobId, rsId);
+ return (Status) rpci.call(ipcHandle, gdrlf);
+ }
+
+ @Override
+ public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
+ DatasetDirectoryRecord[] knownRecords) throws Exception {
+ HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(
+ jobId, rsId, knownRecords);
+ return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
+ }
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java
new file mode 100644
index 0000000..7aef8b9
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.client.net;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class ClientNetworkManager implements IChannelConnectionFactory {
+ private static final int MAX_CONNECTION_ATTEMPTS = 5;
+
+ private final MuxDemux md;
+
+ public ClientNetworkManager(int nThreads) throws IOException {
+ /* This is a connect only socket and does not listen to any incoming connections, so pass null to
+ * localAddress and listener.
+ */
+ md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS);
+ }
+
+ public void start() throws IOException {
+ md.start();
+ }
+
+ public void stop() {
+
+ }
+
+ public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+ MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+ return mConn.openChannel();
+ }
+
+ public MuxDemuxPerformanceCounters getPerformanceCounters() {
+ return md.getPerformanceCounters();
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-comm/pom.xml b/hyracks/hyracks-comm/pom.xml
new file mode 100644
index 0000000..c3583699
--- /dev/null
+++ b/hyracks/hyracks-comm/pom.xml
@@ -0,0 +1,36 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-comm</artifactId>
+ <name>hyracks-comm</name>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-net</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
similarity index 72%
copy from hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
copy to hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
index 1d5af84..fac2949 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.nc.net;
+package edu.uci.ics.hyracks.comm.channels;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -23,21 +23,25 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-public class NetworkInputChannel implements IInputChannel {
- private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
+public class DatasetNetworkInputChannel implements IInputChannel {
+ private static final Logger LOGGER = Logger.getLogger(DatasetNetworkInputChannel.class.getName());
- private final NetworkManager netManager;
+ static final int INITIAL_MESSAGE_SIZE = 20;
+
+ private final IChannelConnectionFactory netManager;
private final SocketAddress remoteAddress;
- private final PartitionId partitionId;
+ private final JobId jobId;
+
+ private final int partition;
private final Queue<ByteBuffer> fullQueue;
@@ -49,11 +53,12 @@
private Object attachment;
- public NetworkInputChannel(NetworkManager netManager, SocketAddress remoteAddress, PartitionId partitionId,
- int nBuffers) {
+ public DatasetNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId,
+ int partition, int nBuffers) {
this.netManager = netManager;
this.remoteAddress = remoteAddress;
- this.partitionId = partitionId;
+ this.jobId = jobId;
+ this.partition = partition;
fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
this.nBuffers = nBuffers;
}
@@ -85,7 +90,7 @@
}
@Override
- public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+ public void open(IHyracksCommonContext ctx) throws HyracksDataException {
try {
ccb = netManager.connect(remoteAddress);
} catch (Exception e) {
@@ -96,14 +101,13 @@
for (int i = 0; i < nBuffers; ++i) {
ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
}
- ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
- writeBuffer.putLong(partitionId.getJobId().getId());
- writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
- writeBuffer.putInt(partitionId.getSenderIndex());
- writeBuffer.putInt(partitionId.getReceiverIndex());
+ ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
+ writeBuffer.putLong(jobId.getId());
+ writeBuffer.putInt(partition);
writeBuffer.flip();
if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Sending partition request: " + partitionId + " on channel: " + ccb);
+ LOGGER.fine("Sending partition request for JobId: " + jobId + " partition: " + partition + " on channel: "
+ + ccb);
}
ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
ccb.getWriteInterface().getFullBufferAcceptor().close();
@@ -118,17 +122,17 @@
@Override
public void accept(ByteBuffer buffer) {
fullQueue.add(buffer);
- monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
+ monitor.notifyDataAvailability(DatasetNetworkInputChannel.this, 1);
}
@Override
public void close() {
- monitor.notifyEndOfStream(NetworkInputChannel.this);
+ monitor.notifyEndOfStream(DatasetNetworkInputChannel.this);
}
@Override
public void error(int ecode) {
- monitor.notifyFailure(NetworkInputChannel.this);
+ monitor.notifyFailure(DatasetNetworkInputChannel.this);
}
}
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/IChannelConnectionFactory.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/IChannelConnectionFactory.java
new file mode 100644
index 0000000..33179ba
--- /dev/null
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/IChannelConnectionFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.channels;
+
+import java.net.SocketAddress;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public interface IChannelConnectionFactory {
+ public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException;
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
similarity index 89%
rename from hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
rename to hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
index 1d5af84..aa37b16 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.nc.net;
+package edu.uci.ics.hyracks.comm.channels;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
@@ -33,7 +33,9 @@
public class NetworkInputChannel implements IInputChannel {
private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
- private final NetworkManager netManager;
+ static final int INITIAL_MESSAGE_SIZE = 20;
+
+ private final IChannelConnectionFactory netManager;
private final SocketAddress remoteAddress;
@@ -49,8 +51,8 @@
private Object attachment;
- public NetworkInputChannel(NetworkManager netManager, SocketAddress remoteAddress, PartitionId partitionId,
- int nBuffers) {
+ public NetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress,
+ PartitionId partitionId, int nBuffers) {
this.netManager = netManager;
this.remoteAddress = remoteAddress;
this.partitionId = partitionId;
@@ -85,7 +87,7 @@
}
@Override
- public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+ public void open(IHyracksCommonContext ctx) throws HyracksDataException {
try {
ccb = netManager.connect(remoteAddress);
} catch (Exception e) {
@@ -96,7 +98,7 @@
for (int i = 0; i < nBuffers; ++i) {
ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
}
- ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
+ ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
writeBuffer.putLong(partitionId.getJobId().getId());
writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
writeBuffer.putInt(partitionId.getSenderIndex());
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
similarity index 97%
rename from hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
rename to hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
index 9024e18..df910be 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.nc.net;
+package edu.uci.ics.hyracks.comm.channels;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
@@ -87,7 +87,7 @@
ccb.getWriteInterface().getFullBufferAcceptor().close();
}
- void abort() {
+ public void abort() {
ccb.getWriteInterface().getFullBufferAcceptor().error(1);
synchronized (NetworkOutputChannel.this) {
aborted = true;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5a33891..82457fe 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -35,12 +35,17 @@
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.context.ICCContext;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.cc.dataset.DatasetDirectoryService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.cc.work.ApplicationCreateWork;
@@ -48,17 +53,23 @@
import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
import edu.uci.ics.hyracks.control.cc.work.ApplicationStateChangeWork;
+import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
+import edu.uci.ics.hyracks.control.cc.work.GetResultStatusWork;
import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionRequestWork;
+import edu.uci.ics.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
import edu.uci.ics.hyracks.control.cc.work.RemoveDeadNodesWork;
import edu.uci.ics.hyracks.control.cc.work.ReportProfilesWork;
+import edu.uci.ics.hyracks.control.cc.work.ReportResultPartitionFailureWork;
+import edu.uci.ics.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
import edu.uci.ics.hyracks.control.cc.work.TaskCompleteWork;
import edu.uci.ics.hyracks.control.cc.work.TaskFailureWork;
import edu.uci.ics.hyracks.control.cc.work.UnregisterNodeWork;
@@ -115,6 +126,8 @@
private final DeadNodeSweeper sweeper;
+ private final IDatasetDirectoryService datasetDirectoryService;
+
private long jobCounter;
public ClusterControllerService(final CCConfig ccConfig) throws Exception {
@@ -162,6 +175,7 @@
}
};
sweeper = new DeadNodeSweeper();
+ datasetDirectoryService = new DatasetDirectoryService();
jobCounter = 0;
}
@@ -264,6 +278,10 @@
return clusterIPC;
}
+ public NetworkAddress getDatasetDirectoryServiceInfo() {
+ return new NetworkAddress(ccConfig.clientNetIpAddress.getBytes(), ccConfig.clientNetPort);
+ }
+
private class DeadNodeSweeper extends TimerTask {
@Override
public void run() {
@@ -271,6 +289,10 @@
}
}
+ public IDatasetDirectoryService getDatasetDirectoryService() {
+ return datasetDirectoryService;
+ }
+
private class HyracksClientInterfaceIPCI implements IIPCI {
@Override
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
@@ -321,6 +343,27 @@
return;
}
+ case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
+ workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this,
+ new IPCResponder<NetworkAddress>(handle, mid)));
+ return;
+ }
+
+ case GET_DATASET_RESULT_STATUS: {
+ HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+ workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(), gdrlf
+ .getResultSetId(), new IPCResponder<Status>(handle, mid)));
+ return;
+ }
+
+ case GET_DATASET_RESULT_LOCATIONS: {
+ HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+ workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
+ .getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
+ new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
+ return;
+ }
+
case WAIT_FOR_COMPLETION: {
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
@@ -416,6 +459,28 @@
return;
}
+ case REGISTER_RESULT_PARTITION_LOCATION: {
+ CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+ workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
+ .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getPartition(), rrplf
+ .getNPartitions(), rrplf.getNetworkAddress()));
+ return;
+ }
+
+ case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
+ CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf = (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
+ workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
+ rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
+ return;
+ }
+
+ case REPORT_RESULT_PARTITION_FAILURE: {
+ CCNCFunctions.ReportResultPartitionFailureFunction rrplf = (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
+ workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this, rrplf
+ .getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
+ return;
+ }
+
case APPLICATION_STATE_CHANGE_RESPONSE: {
CCNCFunctions.ApplicationStateChangeResponseFunction astrf = (CCNCFunctions.ApplicationStateChangeResponseFunction) fn;
workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index c17acd0..c96a319 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -41,6 +41,8 @@
private final NetworkAddress dataPort;
+ private final NetworkAddress datasetPort;
+
private final Set<JobId> activeJobIds;
private final String osName;
@@ -107,6 +109,14 @@
private final long[] netSignalingBytesWritten;
+ private final long[] datasetNetPayloadBytesRead;
+
+ private final long[] datasetNetPayloadBytesWritten;
+
+ private final long[] datasetNetSignalingBytesRead;
+
+ private final long[] datasetNetSignalingBytesWritten;
+
private final long[] ipcMessagesSent;
private final long[] ipcMessageBytesSent;
@@ -123,6 +133,7 @@
this.nodeController = nodeController;
ncConfig = reg.getNCConfig();
dataPort = reg.getDataPort();
+ datasetPort = reg.getDatasetPort();
activeJobIds = new HashSet<JobId>();
osName = reg.getOSName();
@@ -164,6 +175,10 @@
netPayloadBytesWritten = new long[RRD_SIZE];
netSignalingBytesRead = new long[RRD_SIZE];
netSignalingBytesWritten = new long[RRD_SIZE];
+ datasetNetPayloadBytesRead = new long[RRD_SIZE];
+ datasetNetPayloadBytesWritten = new long[RRD_SIZE];
+ datasetNetSignalingBytesRead = new long[RRD_SIZE];
+ datasetNetSignalingBytesWritten = new long[RRD_SIZE];
ipcMessagesSent = new long[RRD_SIZE];
ipcMessageBytesSent = new long[RRD_SIZE];
ipcMessagesReceived = new long[RRD_SIZE];
@@ -196,6 +211,10 @@
netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
+ datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
+ datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
+ datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
+ datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
@@ -227,6 +246,10 @@
return dataPort;
}
+ public NetworkAddress getDatasetPort() {
+ return datasetPort;
+ }
+
public JSONObject toSummaryJSON() throws JSONException {
JSONObject o = new JSONObject();
o.put("node-id", ncConfig.nodeId);
@@ -271,6 +294,10 @@
o.put("net-payload-bytes-written", netPayloadBytesWritten);
o.put("net-signaling-bytes-read", netSignalingBytesRead);
o.put("net-signaling-bytes-written", netSignalingBytesWritten);
+ o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
+ o.put("dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
+ o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
+ o.put("dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
o.put("ipc-messages-sent", ipcMessagesSent);
o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
o.put("ipc-messages-received", ipcMessagesReceived);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
new file mode 100644
index 0000000..13d0c30
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.dataset;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+/**
+ * TODO(madhusudancs): The potential perils of this global dataset directory service implementation is that, the jobs
+ * location information is never evicted from the memory and the memory usage grows as the number of jobs in the system
+ * grows. What we should possibly do is, add an API call for the client to say that it received everything it has to for
+ * the job (after it receives all the results) completely. Then we can just get rid of the location information for that
+ * job.
+ */
+public class DatasetDirectoryService implements IDatasetDirectoryService {
+ private final Map<JobId, Map<ResultSetId, ResultSetMetaData>> jobResultLocationsMap;
+
+ public DatasetDirectoryService() {
+ jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, ResultSetMetaData>>();
+ }
+
+ @Override
+ public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ int partition, int nPartitions, NetworkAddress networkAddress) {
+ Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
+ if (rsMap == null) {
+ rsMap = new HashMap<ResultSetId, ResultSetMetaData>();
+ jobResultLocationsMap.put(jobId, rsMap);
+ }
+
+ ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ if (resultSetMetaData == null) {
+ resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]);
+ rsMap.put(rsId, resultSetMetaData);
+ }
+
+ DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+ if (records[partition] == null) {
+ records[partition] = new DatasetDirectoryRecord();
+ }
+ records[partition].setNetworkAddress(networkAddress);
+ records[partition].start();
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
+ DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
+ ddr.writeEOS();
+ }
+
+ @Override
+ public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
+ DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
+ ddr.fail();
+ }
+
+ @Override
+ public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
+ Map<ResultSetId, ResultSetMetaData> rsMap;
+ while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
+ throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
+ }
+ DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+
+ ArrayList<Status> statuses = new ArrayList<Status>(records.length);
+ for (int i = 0; i < records.length; i++) {
+ statuses.add(records[i].getStatus());
+ }
+
+ // Default status is idle
+ Status status = Status.IDLE;
+ if (statuses.contains(Status.FAILED)) {
+ // Even if there is at least one failed entry we should return failed status.
+ return Status.FAILED;
+ } else if (statuses.contains(Status.RUNNING)) {
+ // If there are not failed entry and if there is at least one running entry we should return running status.
+ return Status.RUNNING;
+ } else {
+ // If each and every partition has reported success do we report success as the status.
+ int successCount = 0;
+ for (int i = 0; i < statuses.size(); i++) {
+ if (statuses.get(i) == Status.SUCCESS) {
+ successCount++;
+ }
+ }
+ if (successCount == statuses.size()) {
+ return Status.SUCCESS;
+ }
+ }
+ return status;
+ }
+
+ @Override
+ public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
+ DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
+ DatasetDirectoryRecord[] newRecords;
+ while ((newRecords = updatedRecords(jobId, rsId, knownRecords)) == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ return newRecords;
+ }
+
+ public DatasetDirectoryRecord getDatasetDirectoryRecord(JobId jobId, ResultSetId rsId, int partition) {
+ Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
+ ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+ return records[partition];
+ }
+
+ /**
+ * Compares the records already known by the client for the given job's result set id with the records that the
+ * dataset directory service knows and if there are any newly discovered records returns a whole array with the
+ * new records filled in.
+ * This method has a very convoluted logic. Here is the explanation of how it works.
+ * If the ordering constraint has to be enforced, the method obtains the first null record in the known records in
+ * the order of the partitions. It always traverses the array in the first to last order!
+ * If known records array or the first element in that array is null in the but the record for that partition now
+ * known to the directory service, the method fills in that record in the array and returns the array back.
+ * However, if the first known null record is not a first element in the array, by induction, all the previous
+ * known records should be known already be known to client and none of the records for the partitions ahead is
+ * known by the client yet. So, we check if the client has reached the end of stream for the partition corresponding
+ * to the record before the first known null record, i.e. the last known non-null record. If not, we just return
+ * null because we cannot expose any new locations until the client reaches end of stream for the last known record.
+ * If the client has reached the end of stream record for the last known non-null record, we check if the next record
+ * is discovered by the dataset directory service and if so, we fill it in the records array and return it back or
+ * send null otherwise.
+ * If the ordering is not required, we are free to return any newly discovered records back, so we just check if
+ * arrays are equal and if they are not we send the entire new updated array.
+ *
+ * @param jobId
+ * - Id of the job for which the directory records should be retrieved.
+ * @param rsId
+ * - Id of the result set for which the directory records should be retrieved.
+ * @param knownRecords
+ * - An array of directory records that the client is already aware of.
+ * @return
+ * - Returns null if there aren't any newly discovered partitions enforcing the ordering constraint
+ * @throws HyracksDataException
+ * TODO(madhusudancs): Think about caching (and still be stateless) instead of this ugly O(n) iterations for
+ * every check. This already looks very expensive.
+ */
+ private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
+ throws HyracksDataException {
+ Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
+ if (rsMap == null) {
+ return null;
+ }
+
+ ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
+ throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
+ }
+
+ boolean ordered = resultSetMetaData.getOrderedResult();
+ DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+ /* If ordering is required, we should expose the dataset directory records only in the order, otherwise
+ * we can simply check if there are any newly discovered records and send the whole array back if there are.
+ */
+ if (ordered) {
+ // Iterate over the known records and find the last record which is not null.
+ int i = 0;
+ for (i = 0; i < records.length; i++) {
+ if (knownRecords == null) {
+ if (records[0] != null) {
+ knownRecords = new DatasetDirectoryRecord[records.length];
+ knownRecords[0] = records[0];
+ return knownRecords;
+ }
+ return null;
+ }
+ if (knownRecords[i] == null) {
+ if ((i == 0 || knownRecords[i - 1].hasReachedReadEOS()) && records[i] != null) {
+ knownRecords[i] = records[i];
+ return knownRecords;
+ }
+ return null;
+ }
+ }
+ } else {
+ if (!Arrays.equals(records, knownRecords)) {
+ return records;
+ }
+ }
+ return null;
+ }
+
+ private class ResultSetMetaData {
+ private final boolean ordered;
+
+ private final DatasetDirectoryRecord[] records;
+
+ public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
+ this.ordered = ordered;
+ this.records = records;
+ }
+
+ public boolean getOrderedResult() {
+ return ordered;
+ }
+
+ public DatasetDirectoryRecord[] getRecords() {
+ return records;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
new file mode 100644
index 0000000..3ac6acc
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetDatasetDirectoryServiceInfoWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+
+ private final IResultCallback<NetworkAddress> callback;
+
+ public GetDatasetDirectoryServiceInfoWork(ClusterControllerService ccs, IResultCallback<NetworkAddress> callback) {
+ this.ccs = ccs;
+ this.callback = callback;
+ }
+
+ @Override
+ public void doRun() {
+ try {
+ NetworkAddress addr = ccs.getDatasetDirectoryServiceInfo();
+ callback.setValue(addr);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 2f23a2c..a787b9f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -39,7 +39,8 @@
Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
- result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort()));
+ result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort(), e
+ .getValue().getDatasetPort()));
}
callback.setValue(result);
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
new file mode 100644
index 0000000..fd1d418
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetResultPartitionLocationsWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final DatasetDirectoryRecord[] knownRecords;
+
+ private final IResultCallback<DatasetDirectoryRecord[]> callback;
+
+ public GetResultPartitionLocationsWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+ DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.knownRecords = knownRecords;
+ this.callback = callback;
+ }
+
+ @Override
+ public void doRun() {
+ final IDatasetDirectoryService dds = ccs.getDatasetDirectoryService();
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ DatasetDirectoryRecord[] partitionLocations = dds.getResultPartitionLocations(jobId, rsId,
+ knownRecords);
+ callback.setValue(partitionLocations);
+ } catch (HyracksDataException e) {
+ callback.setException(e);
+ }
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
new file mode 100644
index 0000000..d2dadf5
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetResultStatusWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final IResultCallback<Status> callback;
+
+ public GetResultStatusWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+ IResultCallback<Status> callback) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.callback = callback;
+ }
+
+ @Override
+ public void doRun() {
+ try {
+ Status status = ccs.getDatasetDirectoryService().getResultStatus(jobId, rsId);
+ callback.setValue(status);
+ } catch (HyracksDataException e) {
+ callback.setException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JobId@" + jobId + " ResultSetId@" + rsId;
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index b6a33cd..b062d33 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -16,6 +16,7 @@
import java.util.EnumSet;
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
new file mode 100644
index 0000000..f86e924
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class RegisterResultPartitionLocationWork extends AbstractWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final boolean orderedResult;
+
+ private final int partition;
+
+ private final int nPartitions;
+
+ private final NetworkAddress networkAddress;
+
+ public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+ boolean orderedResult, int partition, int nPartitions, NetworkAddress networkAddress) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.orderedResult = orderedResult;
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public void run() {
+ ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
+ nPartitions, networkAddress);
+ }
+
+ @Override
+ public String toString() {
+ return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
+ + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
new file mode 100644
index 0000000..4aea41e
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class ReportResultPartitionFailureWork extends AbstractWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final int partition;
+
+ public ReportResultPartitionFailureWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, int partition) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.partition = partition;
+ }
+
+ @Override
+ public void run() {
+ ccs.getDatasetDirectoryService().reportResultPartitionFailure(jobId, rsId, partition);
+ }
+
+ @Override
+ public String toString() {
+ return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
new file mode 100644
index 0000000..313b730
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class ReportResultPartitionWriteCompletionWork extends AbstractWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final int partition;
+
+ public ReportResultPartitionWriteCompletionWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+ int partition) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.partition = partition;
+ }
+
+ @Override
+ public void run() {
+ ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+ }
+
+ @Override
+ public String toString() {
+ return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
index ff9d8a0..3fc46ff 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
@@ -58,6 +58,10 @@
var netPayloadBytesWritten = result['net-payload-bytes-written'];
var netSignalingBytesRead = result['net-signaling-bytes-read'];
var netSignalingBytesWritten = result['net-signaling-bytes-written'];
+ var datasetNetPayloadBytesRead = result['dataset-net-payload-bytes-read'];
+ var datasetNetPayloadBytesWritten = result['dataset-net-payload-bytes-written'];
+ var datasetNetSignalingBytesRead = result['dataset-net-signaling-bytes-read'];
+ var datasetNetSignalingBytesWritten = result['dataset-net-signaling-bytes-written'];
var ipcMessagesSent = result['ipc-messages-sent'];
var ipcMessageBytesSent = result['ipc-message-bytes-sent'];
var ipcMessagesReceived = result['ipc-messages-received'];
@@ -117,9 +121,13 @@
}
if (i < sysLoad.length - 1) {
netPayloadReadBWArray.push([ i, computeRate(netPayloadBytesRead, rrdPtr) ]);
+ netPayloadReadBWArray.push([ i, computeRate(datasetNetPayloadBytesRead, rrdPtr) ]);
netPayloadWriteBWArray.push([ i, computeRate(netPayloadBytesWritten, rrdPtr) ]);
+ netPayloadWriteBWArray.push([ i, computeRate(datasetNetPayloadBytesWritten, rrdPtr) ]);
netSignalingReadBWArray.push([ i, computeRate(netSignalingBytesRead, rrdPtr) ]);
+ netSignalingReadBWArray.push([ i, computeRate(datasetNetSignalingBytesRead, rrdPtr) ]);
netSignalingWriteBWArray.push([ i, computeRate(netSignalingBytesWritten, rrdPtr) ]);
+ netSignalingWriteBWArray.push([ i, computeRate(etSignalingBytesWritten, rrdPtr) ]);
ipcMessageSendRateArray.push([ i, computeRate(ipcMessagesSent, rrdPtr) ]);
ipcMessageBytesSendRateArray.push([ i, computeRate(ipcMessageBytesSent, rrdPtr) ]);
ipcMessageReceiveRateArray.push([ i, computeRate(ipcMessagesReceived, rrdPtr) ]);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 0c5bb2f..55e4479 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -16,7 +16,9 @@
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
@@ -46,6 +48,13 @@
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+ int nPartitions, NetworkAddress networkAddress) throws Exception;
+
+ public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
+
+ public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception;
+
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 167eb4b..9b34810 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -37,6 +37,9 @@
@Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
public String dataIPAddress;
+ @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
+ public String datasetIPAddress;
+
@Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
public String ioDevices = System.getProperty("java.io.tmpdir");
@@ -66,6 +69,7 @@
cList.add(nodeId);
cList.add("-data-ip-address");
cList.add(dataIPAddress);
+ cList.add(datasetIPAddress);
cList.add("-iodevices");
cList.add(ioDevices);
cList.add("-dcache-client-servers");
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
index 91cfecf..a897602 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
@@ -33,6 +33,8 @@
private final NetworkAddress dataPort;
+ private final NetworkAddress datasetPort;
+
private final String osName;
private final String arch;
@@ -60,13 +62,14 @@
private final HeartbeatSchema hbSchema;
public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
- String osName, String arch, String osVersion, int nProcessors, String vmName, String vmVersion,
- String vmVendor, String classpath, String libraryPath, String bootClasspath, List<String> inputArguments,
- Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
+ NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName,
+ String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath,
+ List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
this.ncAddress = ncAddress;
this.nodeId = nodeId;
this.ncConfig = ncConfig;
this.dataPort = dataPort;
+ this.datasetPort = datasetPort;
this.osName = osName;
this.arch = arch;
this.osVersion = osVersion;
@@ -98,6 +101,10 @@
return dataPort;
}
+ public NetworkAddress getDatasetPort() {
+ return datasetPort;
+ }
+
public String getOSName() {
return osName;
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index 1dba3bc..663c68a 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -37,6 +37,10 @@
public long netPayloadBytesWritten;
public long netSignalingBytesRead;
public long netSignalingBytesWritten;
+ public long datasetNetPayloadBytesRead;
+ public long datasetNetPayloadBytesWritten;
+ public long datasetNetSignalingBytesRead;
+ public long datasetNetSignalingBytesWritten;
public long ipcMessagesSent;
public long ipcMessageBytesSent;
public long ipcMessagesReceived;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 557a8cb..b506b12 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -36,6 +36,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -68,6 +69,9 @@
REPORT_PROFILE,
REGISTER_PARTITION_PROVIDER,
REGISTER_PARTITION_REQUEST,
+ REGISTER_RESULT_PARTITION_LOCATION,
+ REPORT_RESULT_PARTITION_WRITE_COMPLETION,
+ REPORT_RESULT_PARTITION_FAILURE,
APPLICATION_STATE_CHANGE_RESPONSE,
NODE_REGISTRATION_RESULT,
@@ -438,6 +442,127 @@
}
}
+ public static class RegisterResultPartitionLocationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final boolean orderedResult;
+
+ private final int partition;
+
+ private final int nPartitions;
+
+ private NetworkAddress networkAddress;
+
+ public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ int partition, int nPartitions, NetworkAddress networkAddress) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.orderedResult = orderedResult;
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_RESULT_PARTITION_LOCATION;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+
+ public boolean getOrderedResult() {
+ return orderedResult;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public int getNPartitions() {
+ return nPartitions;
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+ }
+
+ public static class ReportResultPartitionWriteCompletionFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final int partition;
+
+ public ReportResultPartitionWriteCompletionFunction(JobId jobId, ResultSetId rsId, int partition) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.partition = partition;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_RESULT_PARTITION_WRITE_COMPLETION;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+ }
+
+ public static class ReportResultPartitionFailureFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final int partition;
+
+ public ReportResultPartitionFailureFunction(JobId jobId, ResultSetId rsId, int partition) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.partition = partition;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_RESULT_PARTITION_FAILURE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+ }
+
public static class ApplicationStateChangeResponseFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index bbaab4e..091a5d2 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -16,7 +16,9 @@
import java.util.List;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
@@ -95,6 +97,28 @@
}
@Override
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+ int nPartitions, NetworkAddress networkAddress) throws Exception {
+ CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
+ jobId, rsId, orderedResult, partition, nPartitions, networkAddress);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception {
+ CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn = new CCNCFunctions.ReportResultPartitionWriteCompletionFunction(
+ jobId, rsId, partition);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception {
+ CCNCFunctions.ReportResultPartitionFailureFunction fn = new CCNCFunctions.ReportResultPartitionFailureFunction(
+ jobId, rsId, partition);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
CCNCFunctions.ApplicationStateChangeResponseFunction fn = new CCNCFunctions.ApplicationStateChangeResponseFunction(
nodeId, appName, status);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index c662a75..f103f4a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -40,6 +40,11 @@
<artifactId>hyracks-net</artifactId>
<version>0.2.3-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-comm</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
</dependencies>
<reporting>
<plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 0195143..290c59c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -45,6 +45,7 @@
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
@@ -61,7 +62,9 @@
import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.control.nc.dataset.DatasetPartitionManager;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.net.DatasetNetworkManager;
import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
@@ -94,6 +97,10 @@
private final NetworkManager netManager;
+ private final IDatasetPartitionManager datasetPartitionManager;
+
+ private final DatasetNetworkManager datasetNetworkManager;
+
private final WorkQueue queue;
private final Timer timer;
@@ -140,7 +147,11 @@
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
+ netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
+
+ datasetPartitionManager = new DatasetPartitionManager(this, executor);
+ datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
+ datasetPartitionManager, ncConfig.nNetThreads);
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
@@ -205,6 +216,7 @@
LOGGER.log(Level.INFO, "Starting NodeControllerService");
ipc.start();
netManager.start();
+ datasetNetworkManager.start();
IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -213,10 +225,11 @@
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
- osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
- runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
- .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
- runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
+ datasetNetworkManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean
+ .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
+ .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
+ .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
+ runtimeMXBean.getSystemProperties(), hbSchema));
synchronized (this) {
while (registrationPending) {
@@ -247,8 +260,10 @@
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
executor.shutdownNow();
partitionManager.close();
+ datasetPartitionManager.close();
heartbeatTask.cancel();
netManager.stop();
+ datasetNetworkManager.stop();
queue.stop();
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
}
@@ -273,6 +288,10 @@
return netManager;
}
+ public DatasetNetworkManager getDatasetNetworkManager() {
+ return datasetNetworkManager;
+ }
+
public PartitionManager getPartitionManager() {
return partitionManager;
}
@@ -297,8 +316,7 @@
return queue;
}
- private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
- String ipaddrStr = ncConfig.dataIPAddress;
+ private static InetAddress getIpAddress(String ipaddrStr) throws Exception {
ipaddrStr = ipaddrStr.trim();
Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
Matcher m = pattern.matcher(ipaddrStr);
@@ -355,6 +373,12 @@
hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
+ MuxDemuxPerformanceCounters datasetNetPC = datasetNetworkManager.getPerformanceCounters();
+ hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
+ hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
+ hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
+ hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
+
IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
@@ -459,6 +483,10 @@
}
}
+ public IDatasetPartitionManager getDatasetPartitionManager() {
+ return datasetPartitionManager;
+ }
+
public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
ccs.sendApplicationMessageToCC(data, appName, nodeId);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index eba3ec9..5a3e9dd 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -348,6 +349,11 @@
}
@Override
+ public IDatasetPartitionManager getDatasetPartitionManager() {
+ return ncs.getDatasetPartitionManager();
+ }
+
+ @Override
public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
this.ncs.sendApplicationMessageToCC(message, this.getJobletContext().getApplicationContext()
.getApplicationName(), nodeId);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
new file mode 100644
index 0000000..85f17b1
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+
+public class DatasetPartitionManager implements IDatasetPartitionManager {
+ private final NodeControllerService ncs;
+
+ private final Executor executor;
+
+ private final Map<JobId, DatasetPartitionWriter[]> partitionDatasetWriterMap;
+
+ private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+ private final IWorkspaceFileFactory fileFactory;
+
+ public DatasetPartitionManager(NodeControllerService ncs, Executor executor) {
+ this.ncs = ncs;
+ this.executor = executor;
+ partitionDatasetWriterMap = new HashMap<JobId, DatasetPartitionWriter[]>();
+ deallocatableRegistry = new DefaultDeallocatableRegistry();
+ fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
+ }
+
+ @Override
+ public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+ int partition, int nPartitions) throws HyracksException {
+ DatasetPartitionWriter dpw = null;
+ JobId jobId = ctx.getJobletContext().getJobId();
+ try {
+ ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
+ nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
+ dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, executor);
+
+ DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
+ if (writers == null) {
+ writers = new DatasetPartitionWriter[nPartitions];
+ partitionDatasetWriterMap.put(jobId, writers);
+ }
+ writers[partition] = dpw;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+
+ return dpw;
+ }
+
+ @Override
+ public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
+ try {
+ ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
+ try {
+ ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
+ throws HyracksException {
+ DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
+ if (writers == null) {
+ throw new HyracksException("Unknown JobId " + jobId);
+ }
+
+ DatasetPartitionWriter dpw = writers[partition];
+ if (dpw == null) {
+ throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
+ }
+
+ dpw.writeTo(writer);
+ }
+
+ @Override
+ public IWorkspaceFileFactory getFileFactory() {
+ return fileFactory;
+ }
+
+ @Override
+ public void close() {
+ deallocatableRegistry.close();
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
new file mode 100644
index 0000000..0f5895e
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
+
+public class DatasetPartitionWriter implements IFrameWriter, IPartition {
+ private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
+
+ private static final String FILE_PREFIX = "result_";
+
+ private final IHyracksTaskContext ctx;
+
+ private final IDatasetPartitionManager manager;
+
+ private final JobId jobId;
+
+ private final ResultSetId resultSetId;
+
+ private final int partition;
+
+ private final Executor executor;
+
+ private final AtomicBoolean eos;
+
+ private FileReference fRef;
+
+ private IFileHandle handle;
+
+ private long size;
+
+ public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
+ ResultSetId rsId, int partition, Executor executor) {
+ this.ctx = ctx;
+ this.manager = manager;
+ this.jobId = jobId;
+ this.resultSetId = rsId;
+ this.partition = partition;
+ this.executor = executor;
+ eos = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("open(" + partition + ")");
+ }
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(FILE_PREFIX + String.valueOf(partition));
+ handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ size = 0;
+ }
+
+ @Override
+ public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ size += ctx.getIOManager().syncWrite(handle, size, buffer);
+ notifyAll();
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ try {
+ manager.reportPartitionFailure(jobId, resultSetId, partition);
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public synchronized void close() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("close(" + partition + ")");
+ }
+
+ try {
+ eos.set(true);
+ notifyAll();
+ manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public IHyracksTaskContext getTaskContext() {
+ return ctx;
+ }
+
+ private synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ while (offset >= size && !eos.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ return ctx.getIOManager().syncRead(handle, offset, buffer);
+ }
+
+ @Override
+ public void writeTo(final IFrameWriter writer) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ NetworkOutputChannel channel = (NetworkOutputChannel) writer;
+ channel.setTaskContext(ctx);
+ try {
+ channel.open();
+ try {
+ long offset = 0;
+ ByteBuffer buffer = ctx.allocateFrame();
+ while (true) {
+ buffer.clear();
+ long size = read(offset, buffer);
+ if (size < 0) {
+ break;
+ } else if (size < buffer.capacity()) {
+ throw new HyracksDataException("Premature end of file");
+ }
+ offset += size;
+ buffer.flip();
+ channel.nextFrame(buffer);
+ }
+ } finally {
+ channel.close();
+ ctx.getIOManager().close(handle);
+ }
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public boolean isReusable() {
+ return true;
+ }
+
+ @Override
+ public void deallocate() {
+
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
new file mode 100644
index 0000000..5b8b333
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class DatasetNetworkManager implements IChannelConnectionFactory {
+ private static final Logger LOGGER = Logger.getLogger(DatasetNetworkManager.class.getName());
+
+ private static final int MAX_CONNECTION_ATTEMPTS = 5;
+
+ static final int INITIAL_MESSAGE_SIZE = 20;
+
+ private final IDatasetPartitionManager partitionManager;
+
+ private final MuxDemux md;
+
+ private NetworkAddress networkAddress;
+
+ public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads)
+ throws IOException {
+ this.partitionManager = partitionManager;
+ md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
+ MAX_CONNECTION_ATTEMPTS);
+ }
+
+ public void start() throws IOException {
+ md.start();
+ InetSocketAddress sockAddr = md.getLocalAddress();
+ networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+
+ public void stop() {
+
+ }
+
+ public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+ MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+ return mConn.openChannel();
+ }
+
+ private class ChannelOpenListener implements IChannelOpenListener {
+ @Override
+ public void channelOpened(ChannelControlBlock channel) {
+ channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+ channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+ }
+ }
+
+ private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+ private final ChannelControlBlock ccb;
+
+ private NetworkOutputChannel noc;
+
+ public InitialBufferAcceptor(ChannelControlBlock ccb) {
+ this.ccb = ccb;
+ }
+
+ @Override
+ public void accept(ByteBuffer buffer) {
+ JobId jobId = new JobId(buffer.getLong());
+ int partition = buffer.getInt();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
+ + partition + " on channel: " + ccb);
+ }
+ noc = new NetworkOutputChannel(ccb, 1);
+ try {
+ partitionManager.initializeDatasetPartitionReader(jobId, partition, noc);
+ } catch (HyracksException e) {
+ noc.abort();
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void error(int ecode) {
+ if (noc != null) {
+ noc.abort();
+ }
+ }
+ }
+
+ public MuxDemuxPerformanceCounters getPerformanceCounters() {
+ return md.getPerformanceCounters();
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index b805595..c8e4e94 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -27,6 +27,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.exceptions.NetException;
@@ -36,7 +38,7 @@
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-public class NetworkManager {
+public class NetworkManager implements IChannelConnectionFactory {
private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
private static final int MAX_CONNECTION_ATTEMPTS = 5;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 16e31f7..ba6e6c3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -21,7 +21,7 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -82,7 +82,7 @@
}
@Override
- public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+ public void open(IHyracksCommonContext ctx) throws HyracksDataException {
for (int i = 0; i < nBuffers; ++i) {
emptyQueue.add(ctx.allocateFrame());
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index 45c091a..e532a0b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -28,12 +28,12 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
-import edu.uci.ics.hyracks.control.nc.net.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
public class PartitionManager {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index bb9669d..7ed9d11 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -22,10 +22,10 @@
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.comm.channels.NetworkInputChannel;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
public class ReportPartitionAvailabilityWork extends AbstractWork {
private final NodeControllerService ncs;
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
new file mode 100644
index 0000000..95833b1
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.comm.io;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+public class FrameOutputStream extends ByteArrayAccessibleOutputStream {
+ private static final Logger LOGGER = Logger.getLogger(FrameOutputStream.class.getName());
+
+ private final FrameTupleAppender frameTupleAppender;
+
+ public FrameOutputStream(int frameSize) {
+ this.frameTupleAppender = new FrameTupleAppender(frameSize);
+ }
+
+ public void reset(ByteBuffer buffer, boolean clear) {
+ frameTupleAppender.reset(buffer, clear);
+ }
+
+ public int getTupleCount() {
+ int tupleCount = frameTupleAppender.getTupleCount();
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("appendTuple(): tuple count: " + tupleCount);
+ }
+ return tupleCount;
+ }
+
+ public boolean appendTuple() {
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("appendTuple(): tuple size: " + count);
+ }
+ boolean appended = frameTupleAppender.append(buf, 0, count);
+ count = 0;
+ return appended;
+ }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
new file mode 100644
index 0000000..e500307
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.result;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private final ResultSetId rsId;
+
+ private final boolean ordered;
+
+ private final RecordDescriptor recordDescriptor;
+
+ private final IResultSerializerFactory resultSerializerFactory;
+
+ public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
+ RecordDescriptor recordDescriptor, IResultSerializerFactory resultSerializerFactory) throws IOException {
+ super(spec, 1, 0);
+ this.rsId = rsId;
+ this.ordered = ordered;
+ this.recordDescriptor = recordDescriptor;
+ this.resultSerializerFactory = resultSerializerFactory;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
+ final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
+
+ final ByteBuffer outputBuffer = ctx.allocateFrame();
+
+ final FrameOutputStream frameOutputStream = new FrameOutputStream(ctx.getFrameSize());
+ frameOutputStream.reset(outputBuffer, true);
+ PrintStream printStream = new PrintStream(frameOutputStream);
+
+ final IResultSerializer resultSerializer = resultSerializerFactory.createResultSerializer(printStream);
+
+ final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+ IFrameWriter datasetPartitionWriter;
+
+ @Override
+ public void open() throws HyracksDataException {
+ try {
+ datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, partition, nPartitions);
+ datasetPartitionWriter.open();
+ resultSerializer.init();
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ frameTupleAccessor.reset(buffer);
+ for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+ resultSerializer.appendTuple(frameTupleAccessor, tIndex);
+ if (!frameOutputStream.appendTuple()) {
+ datasetPartitionWriter.nextFrame(outputBuffer);
+ frameOutputStream.reset(outputBuffer, true);
+
+ /* TODO(madhusudancs): This works under the assumption that no single serialized record is
+ * longer than the buffer size.
+ */
+ resultSerializer.appendTuple(frameTupleAccessor, tIndex);
+ frameOutputStream.appendTuple();
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ datasetPartitionWriter.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (frameOutputStream.getTupleCount() > 0) {
+ datasetPartitionWriter.nextFrame(outputBuffer);
+ frameOutputStream.reset(outputBuffer, true);
+ }
+ datasetPartitionWriter.close();
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
index f4cf908..0df38e7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -31,6 +31,13 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-client</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
<version>0.2.3-SNAPSHOT</version>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 023bdd9..d7c20d7 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -16,6 +16,7 @@
import java.io.File;
import java.io.IOException;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -30,9 +31,15 @@
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
@@ -80,6 +87,7 @@
ncConfig1.ccPort = 39001;
ncConfig1.clusterNetIPAddress = "127.0.0.1";
ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -89,6 +97,7 @@
ncConfig2.ccPort = 39001;
ncConfig2.clusterNetIPAddress = "127.0.0.1";
ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
@@ -115,6 +124,50 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(jobId.toString());
}
+
+ // My code
+ int nReaders = 5;
+ DatasetClientContext datasetClientCtx = new DatasetClientContext(spec.getFrameSize());
+ IHyracksDataset hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, nReaders);
+
+ hyracksDataset.open(jobId, spec.getResultSetIds().get(0));
+ /* FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(),
+ recordDescriptor);
+
+ ByteBuffer readBuffer = datasetClientCtx.allocateFrame();
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream di = new DataInputStream(bbis);
+
+ while (true) {
+ readBuffer.clear();
+ int size = hyracksDataset.read(readBuffer);
+ if (size <= 0) {
+ break;
+ }
+ try {
+ frameTupleAccessor.reset(readBuffer);
+ System.out.println("Tuple count: " + recordDescriptor);
+ for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+ int start = frameTupleAccessor.getTupleStartOffset(tIndex)
+ + frameTupleAccessor.getFieldSlotsLength();
+ bbis.setByteBuffer(readBuffer, start);
+ Object[] record = new Object[recordDescriptor.getFieldCount()];
+ for (int i = 0; i < record.length; ++i) {
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ if (i == 0) {
+ System.out.print(String.valueOf(instance));
+ } else {
+ System.out.print(", " + String.valueOf(instance));
+ }
+ }
+ System.out.println();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ // End of my code
+ */
hcc.waitForCompletion(jobId);
dumpOutputFiles();
}
@@ -144,4 +197,27 @@
outputFiles.add(tempFile);
return tempFile;
}
+
+ protected IResultSerializerFactory getResultSerializedAppenderFactory() {
+ return new IResultSerializerFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IResultSerializer createResultSerializer(final PrintStream printStream) {
+ return new IResultSerializer() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void init() throws HyracksDataException {
+
+ }
+
+ @Override
+ public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException {
+ return true;
+ }
+ };
+ }
+ };
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 2c3fddf..ab9605b 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -40,7 +41,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
@@ -73,9 +74,9 @@
ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, ordersDesc,
+ getResultSerializedAppenderFactory());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -87,6 +88,8 @@
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
.of(UTF8StringPointable.FACTORY) }), sorter, 0, printer, 0);
+ spec.addResultSetId(rsId);
+
runTest(spec);
}
@@ -118,9 +121,9 @@
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ ResultSetId rsId = new ResultSetId(1);
+ IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, ordersDesc,
+ getResultSerializedAppenderFactory());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -135,6 +138,8 @@
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0,
printer, 0);
+ spec.addResultSetId(rsId);
+
runTest(spec);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/HyracksNCStartMojo.java b/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/HyracksNCStartMojo.java
index 47de024..fc06a68 100644
--- a/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/HyracksNCStartMojo.java
+++ b/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/edu/uci/ics/hyracks/maven/plugin/HyracksNCStartMojo.java
@@ -55,6 +55,7 @@
cmdLineBuffer.append(" -data-ip-address ").append(dataIpAddress);
cmdLineBuffer.append(" -node-id ").append(nodeId);
cmdLineBuffer.append(" -cluster-net-ip-address 127.0.0.1");
+ cmdLineBuffer.append(" -result-ip-address 127.0.0.1");
if (ccPort != 0) {
cmdLineBuffer.append(" -cc-port ").append(ccPort);
}
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index c719bc4..e4df6b9 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -48,9 +48,9 @@
* Constructor.
*
* @param localAddress
- * - TCP/IP socket address to listen on
+ * - TCP/IP socket address to listen on. Null for non-listening unidirectional sockets
* @param listener
- * - Callback interface to report channel events
+ * - Callback interface to report channel events. Null for non-listening unidirectional sockets
* @param nThreads
* - Number of threads to use for data transfer
* @param maxConnectionAttempts
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index d13a17e..a9061e1 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -45,15 +45,23 @@
}
public void start(InetSocketAddress localAddress) throws IOException {
- serverSocketChannel = ServerSocketChannel.open();
- ServerSocket serverSocket = serverSocketChannel.socket();
- serverSocket.bind(localAddress);
- this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
+ // Setup a server socket listening channel only if the TCPEndpoint is a listening endpoint.
+ if (localAddress != null) {
+ serverSocketChannel = ServerSocketChannel.open();
+ ServerSocket serverSocket = serverSocketChannel.socket();
+ serverSocket.bind(localAddress);
+ this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
+ }
+
ioThreads = new IOThread[nThreads];
for (int i = 0; i < ioThreads.length; ++i) {
ioThreads[i] = new IOThread();
}
- ioThreads[0].registerServerSocket(serverSocketChannel);
+
+ if (localAddress != null) {
+ ioThreads[0].registerServerSocket(serverSocketChannel);
+ }
+
for (int i = 0; i < ioThreads.length; ++i) {
ioThreads[i].start();
}
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index c122b25..0ca93b2 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -101,6 +102,11 @@
}
@Override
+ public IDatasetPartitionManager getDatasetPartitionManager() {
+ return null;
+ }
+
+ @Override
public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
// TODO Auto-generated method stub
diff --git a/hyracks/pom.xml b/hyracks/pom.xml
index 09925fb..b699542 100644
--- a/hyracks/pom.xml
+++ b/hyracks/pom.xml
@@ -83,6 +83,8 @@
<modules>
<module>hyracks-ipc</module>
<module>hyracks-api</module>
+ <module>hyracks-comm</module>
+ <module>hyracks-client</module>
<module>hyracks-dataflow-common</module>
<module>hyracks-dataflow-std</module>
<module>hyracks-dataflow-hadoop</module>