Support IFrameWriter contract checking.
- add an instance-level flag for injecting operators that
check IFrameWriter contract violations;
- check contract violations in runtime tests.
Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1618
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
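For reference, the enforced IFrameWriter protocol is: open() exactly once, then any
number of nextFrame()/flush() calls, fail() at most once if an error occurs, and
close() exactly once at the end. A minimal sketch of a conforming producer loop
(pushFrames and its frame source are illustrative, not part of this change):

    import java.nio.ByteBuffer;

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    public final class FrameWriterProtocolSketch {
        // Push all frames downstream while honoring the IFrameWriter contract:
        // open() once, nextFrame() zero or more times, fail() once on error,
        // and close() exactly once no matter what.
        static void pushFrames(IFrameWriter writer, Iterable<ByteBuffer> frames)
                throws HyracksDataException {
            writer.open();
            try {
                for (ByteBuffer frame : frames) {
                    writer.nextFrame(frame);
                }
            } catch (HyracksDataException e) {
                writer.fail(); // signal the failure downstream before closing
                throw e;
            } finally {
                writer.close(); // always close, whether or not fail() was called
            }
        }
    }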
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 9d01d63..ba98f73 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -25,7 +25,6 @@
import java.io.IOException;
import java.net.Inet4Address;
import java.util.ArrayList;
-import java.util.EnumSet;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -42,9 +41,6 @@
import org.apache.hyracks.api.application.INCApplication;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -131,6 +127,7 @@
ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
ccConfig.setResultTTL(120000L);
ccConfig.setResultSweepThreshold(1000L);
+ ccConfig.setEnforceFrameWriterProtocol(true);
configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb"));
return ccConfig;
}
@@ -215,13 +212,6 @@
}
}
- public void runJob(JobSpecification spec) throws Exception {
- GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString());
- JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
- GlobalConfig.ASTERIX_LOGGER.info(jobId.toString());
- hcc.waitForCompletion(jobId);
- }
-
protected String getDefaultStoragePath() {
return joinPath("target", "io", "dir");
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 80c05db..7ce9df6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -191,8 +192,10 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.UnmanagedFileSplit;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
/*
@@ -214,6 +217,7 @@
protected final IRewriterFactory rewriterFactory;
protected final IStorageComponentProvider componentProvider;
protected final ExecutorService executorService;
+ protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider,
@@ -228,6 +232,9 @@
rewriterFactory = compliationProvider.getRewriterFactory();
activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
this.executorService = executorService;
+ if (appCtx.getServiceContext().getAppConfig().getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)) {
+ this.jobFlags.add(JobFlag.ENFORCE_CONTRACT);
+ }
}
public SessionOutput getSessionOutput() {
@@ -621,7 +628,7 @@
progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
// #. runJob
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
// #. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -653,7 +660,7 @@
JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
@@ -900,7 +907,7 @@
"Failed to create job spec for replicating Files Index For external dataset");
}
filesIndexReplicated = true;
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
}
}
@@ -937,7 +944,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
// #. create the index artifact in NC.
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
@@ -948,7 +955,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
// #. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -986,7 +993,7 @@
ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
@@ -1005,7 +1012,7 @@
JobSpecification jobSpec = IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
@@ -1189,7 +1196,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1226,7 +1233,7 @@
// remove all the indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
} catch (Exception e2) {
// do not throw exception since the metadata still needs to be compensated.
@@ -1405,7 +1412,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
// #. begin a new transaction
@@ -1466,7 +1473,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
// #. begin a new transaction
@@ -1496,7 +1503,7 @@
// remove all the indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
} catch (Exception e2) {
// do not throw exception since the metadata still needs to be compensated.
@@ -1659,7 +1666,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
if (spec != null) {
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
}
} catch (Exception e) {
if (bActiveTxn) {
@@ -1725,7 +1732,7 @@
if (jobSpec == null) {
return jobSpec;
}
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
} finally {
locker.unlock();
}
@@ -1753,7 +1760,7 @@
bActiveTxn = false;
if (jobSpec != null && !compileOnly) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
return jobSpec;
} catch (Exception e) {
@@ -1935,7 +1942,7 @@
} else {
JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()));
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName);
}
@@ -2022,6 +2029,9 @@
activeEventHandler.registerListener(listener);
IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED);
feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
+
+ // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
+ // We will need to design a general exception handling mechanism for feeds.
JobUtils.runJob(hcc, feedJob,
Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)));
eventSubscriber.sync();
@@ -2211,7 +2221,7 @@
// #. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
} catch (Exception e) {
if (bActiveTxn) {
@@ -2300,14 +2310,14 @@
}
break;
case IMMEDIATE:
- createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
+ createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
metadataProvider.findOutputRecordType());
}, clientContextId, ctx);
break;
case DEFERRED:
- createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
+ createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId));
if (outMetadata != null) {
outMetadata.getResultSets()
@@ -2325,7 +2335,7 @@
ResultSetId resultSetId, MutableBoolean printed) {
Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
try {
- createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> {
+ createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> {
final ResultHandle handle = new ResultHandle(id, resultSetId);
ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.RUNNING);
ResultUtil.printResultHandle(sessionOutput, handle);
@@ -2353,16 +2363,20 @@
}
}
- private static void createAndRunJob(IHyracksClientConnection hcc, Mutable<JobId> jId, IStatementCompiler compiler,
- IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, String clientContextId,
- IStatementExecutorContext ctx) throws Exception {
+ private void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec) throws Exception {
+ JobUtils.runJob(hcc, jobSpec, jobFlags, true);
+ }
+
+ private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
+ IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
+ String clientContextId, IStatementExecutorContext ctx) throws Exception {
locker.lock();
try {
final JobSpecification jobSpec = compiler.compile();
if (jobSpec == null) {
return;
}
- final JobId jobId = JobUtils.runJob(hcc, jobSpec, false);
+ final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
if (ctx != null && clientContextId != null) {
ctx.put(clientContextId, jobId); // Adds the running job into the context.
}
@@ -2507,14 +2521,14 @@
transactionState = TransactionState.BEGIN;
// run the files update job
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
for (Index index : indexes) {
if (!ExternalIndexingOperations.isFileIndex(index)) {
spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, addedFiles,
appendedFiles, metadataProvider);
// run the files update job
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
}
}
@@ -2533,7 +2547,7 @@
bActiveTxn = false;
transactionState = TransactionState.READY_TO_COMMIT;
// We don't release the latch since this job is expected to be quick
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
// Start a new metadata transaction to record the final state of the transaction
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2602,7 +2616,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
try {
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
} catch (Exception e2) {
// This should never happen -- fix throw illegal
e.addSuppressed(e2);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index f99793a..7d4b41d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -205,7 +205,7 @@
NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false);
BTreeSearchOperatorNodePushable searchOp =
searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), PARTITION, 1);
- emptyTupleOp.setFrameWriter(0, searchOp,
+ emptyTupleOp.setOutputFrameWriter(0, searchOp,
primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0));
searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc);
return emptyTupleOp;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index 5e3da5f..b80f8f8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -40,6 +40,9 @@
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.control.common.controllers.CCConfig;
import org.junit.Assert;
import org.junit.Test;
@@ -59,6 +62,11 @@
ExternalProperties mockAsterixExternalProperties = mock(ExternalProperties.class);
when(mockAsterixAppContextInfo.getExternalProperties()).thenReturn(mockAsterixExternalProperties);
when(mockAsterixExternalProperties.getAPIServerPort()).thenReturn(19002);
+ ICCServiceContext mockServiceContext = mock(ICCServiceContext.class);
+ when(mockAsterixAppContextInfo.getServiceContext()).thenReturn(mockServiceContext);
+ IApplicationConfig mockApplicationConfig = mock(IApplicationConfig.class);
+ when(mockServiceContext.getAppConfig()).thenReturn(mockApplicationConfig);
+ when(mockApplicationConfig.getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)).thenReturn(true);
// Mocks AsterixClusterProperties.
Cluster mockCluster = mock(Cluster.class);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index 41f9e67..cacbfbc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -19,7 +19,10 @@
package org.apache.asterix.common.utils;
+import java.util.EnumSet;
+
import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
@@ -32,8 +35,13 @@
public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
throws Exception {
+ return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), waitForCompletion);
+ }
+
+ public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags,
+ boolean waitForCompletion) throws Exception {
spec.setMaxReattempts(0);
- final JobId jobId = hcc.startJob(spec);
+ final JobId jobId = hcc.startJob(spec, jobFlags);
if (waitForCompletion) {
hcc.waitForCompletion(jobId);
}
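With the new overload, a call site can opt into the contract check per job. A hedged
usage sketch (hcc and spec are assumed to be in scope):

    import java.util.EnumSet;

    import org.apache.hyracks.api.job.JobFlag;
    import org.apache.hyracks.api.job.JobId;

    // Hypothetical call site: run a job with the IFrameWriter contract check enabled.
    EnumSet<JobFlag> jobFlags = EnumSet.of(JobFlag.ENFORCE_CONTRACT);
    JobId jobId = JobUtils.runJob(hcc, spec, jobFlags, true /* waitForCompletion */);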
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index 71acecf..d0b7b47 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -21,28 +21,20 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
public class SinkPOperator extends AbstractPhysicalOperator {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java
new file mode 100644
index 0000000..26002e6
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.base;
+
+import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class EnforcePushRuntime extends EnforceFrameWriter implements IPushRuntime {
+
+ private final IPushRuntime pushRuntime;
+
+ private EnforcePushRuntime(IPushRuntime pushRuntime) {
+ super(pushRuntime);
+ this.pushRuntime = pushRuntime;
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ pushRuntime.setOutputFrameWriter(index, writer, recordDesc);
+ }
+
+ @Override
+ public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ pushRuntime.setInputRecordDescriptor(index, recordDescriptor);
+ }
+
+ public static IPushRuntime enforce(IPushRuntime pushRuntime) {
+ return pushRuntime instanceof EnforcePushRuntime || pushRuntime instanceof NestedTupleSourceRuntime
+ ? pushRuntime : new EnforcePushRuntime(pushRuntime);
+ }
+}
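EnforcePushRuntime mirrors EnforceFrameWriter for operator pipelines: the checking
logic lives in the inherited IFrameWriter methods, while the two IPushRuntime setters
simply delegate. A sketch of the wiring pattern used by the factories below (factory
and ctx are assumed to be in scope):

    IPushRuntime runtime = factory.createPushRuntime(ctx);
    if (ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT)) {
        // enforce() is a no-op for already-wrapped runtimes and nested-tuple sources
        runtime = EnforcePushRuntime.enforce(runtime);
    }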
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
index 27d6900..de00697 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
@@ -22,7 +22,26 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
public interface IPushRuntime extends IFrameWriter {
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
- public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
+ /**
+ * Sets the output frame writer for this writer.
+ *
+ * @param index,
+ * the index of the output channel.
+ * @param writer,
+ * the writer for writing output.
+ * @param recordDesc,
+ * the output record descriptor.
+ */
+ void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+ /**
+ * Sets the input record descriptor for this writer.
+ *
+ * @param index,
+ * the index of the input channel.
+ * @param recordDescriptor,
+ * the corresponding input record descriptor.
+ */
+ void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index 42e5157..e99b61b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -124,6 +124,7 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
if (isOpen) {
writer.fail();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index b397f23..893aa61 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
@@ -29,6 +30,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
@@ -52,7 +54,6 @@
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
-
final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length);
final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
@@ -91,8 +92,8 @@
}
@Override
- public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex,
- AggregateState state) throws HyracksDataException {
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
+ int tIndex, AggregateState state) throws HyracksDataException {
for (int i = 0; i < pipelines.length; i++) {
outputWriter.setInputIdx(i);
pipelines[i].close();
@@ -144,9 +145,13 @@
IFrameWriter start = writer;
IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+ // determine whether the IFrameWriter protocol should be enforced
+ boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
- newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+ newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+ start = enforce ? EnforcePushRuntime.enforce(start) : start;
+ newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
if (i > 0) {
newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
} else {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 52245e1..8b8e320 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
@@ -28,8 +29,10 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -58,11 +61,14 @@
public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
final IFrameWriter writer) throws HyracksDataException {
- final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
- decorFieldIdx.length, writer);
+ final RunningAggregatorOutput outputWriter =
+ new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, decorFieldIdx.length, writer);
+ // determine whether the IFrameWriter protocol should be enforced
+ boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+ IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter;
final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
- pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+ pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], enforcedWriter, ctx);
}
final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
@@ -136,13 +142,17 @@
private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
throws HyracksDataException {
+ // determine whether the IFrameWriter protocol should be enforced
+ boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
// plug the operators
IFrameWriter start = writer;
IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
- newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+ newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+ start = enforce ? EnforceFrameWriter.enforce(start) : start;
+ newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
if (i > 0) {
newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
} else {
@@ -206,8 +216,9 @@
int start = 0;
int offset = 0;
for (int i = 0; i < fieldEnds.length; i++) {
- if (i > 0)
+ if (i > 0) {
start = fieldEnds[i - 1];
+ }
offset = fieldEnds[i] - start;
tb.addField(data, start, offset);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index 33e7c73..5cced8d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -29,7 +29,7 @@
protected boolean failed;
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
this.writer = writer;
this.outputRecordDesc = recordDesc;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
index e430461..d47199d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
@@ -26,7 +26,7 @@
protected RecordDescriptor inputRecordDesc;
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalStateException();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
index b7707d4..35563e0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -27,20 +27,29 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ // nextFrame will never be called on this runtime
throw new UnsupportedOperationException();
}
@Override
public void close() throws HyracksDataException {
+ // close() is a no-op since this operator completes all of its work in open()
}
@Override
public void fail() throws HyracksDataException {
- writer.fail();
+ // fail() is a no-op: if a failure happened, this operator has already called fail() downstream
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ // flush will never be called on this runtime
+ throw new UnsupportedOperationException();
}
@Override
public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ // setInputRecordDescriptor will never be called on this runtime since it has no input
throw new UnsupportedOperationException();
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 1294614..f6ebf19 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -112,6 +112,7 @@
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private IFrameWriter startOfPipeline;
+ private boolean opened = false;
@Override
public void open() throws HyracksDataException {
@@ -124,6 +125,7 @@
pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
startOfPipeline = pa.assemblePipeline(writer, ctx);
}
+ opened = true;
startOfPipeline.open();
}
@@ -134,12 +136,16 @@
@Override
public void close() throws HyracksDataException {
- startOfPipeline.close();
+ if (opened) {
+ startOfPipeline.close();
+ }
}
@Override
public void fail() throws HyracksDataException {
- startOfPipeline.fail();
+ if (opened) {
+ startOfPipeline.fail();
+ }
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 03e2aaf..e1081e0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -19,11 +19,14 @@
package org.apache.hyracks.algebricks.runtime.operators.meta;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
public class PipelineAssembler {
@@ -44,18 +47,21 @@
this.outputArity = outputArity;
}
- public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws
- HyracksDataException {
+ public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
+ // determine whether the IFrameWriter protocol should be enforced
+ boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
// plug the operators
IFrameWriter start = writer;// this.writer;
for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
+ newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+ start = enforce ? EnforceFrameWriter.enforce(start) : start;
if (i == pipeline.getRuntimeFactories().length - 1) {
if (outputArity == 1) {
- newRuntime.setFrameWriter(0, start, pipelineOutputRecordDescriptor);
+ newRuntime.setOutputFrameWriter(0, start, pipelineOutputRecordDescriptor);
}
} else {
- newRuntime.setFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
+ newRuntime.setOutputFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
}
if (i > 0) {
newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 3e30f73..925ff93 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -79,11 +79,6 @@
}
@Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
public void close() throws HyracksDataException {
try {
frameSorter.sort();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 2b7c2da..3ccceed 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -56,6 +56,11 @@
}
@Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
public void close() throws HyracksDataException {
writer.close();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index e123adf..496679f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -65,11 +65,6 @@
}
@Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
public void flush() throws HyracksDataException {
writer.flush();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 38fe7d1..33b7725 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -118,7 +118,7 @@
@Override
public void fail() throws HyracksDataException {
if (isOpen) {
- super.fail();
+ writer.fail();
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index ebf3d3a..55146e2 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -81,6 +81,7 @@
@Override
public void fail() throws HyracksDataException {
+ // fail() is a no-op; cleanup happens in close()
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 0f57fd7..171544d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -85,7 +85,6 @@
private IScalarEvaluator eval;
private IMissingWriter missingWriter = null;
private ArrayTupleBuilder missingTupleBuilder = null;
- private boolean isOpen = false;
@Override
public void open() throws HyracksDataException {
@@ -93,7 +92,6 @@
initAccessAppendFieldRef(ctx);
eval = cond.createScalarEvaluator(ctx);
}
- isOpen = true;
writer.open();
//prepare nullTupleBuilder
@@ -107,20 +105,11 @@
}
@Override
- public void fail() throws HyracksDataException {
- if (isOpen) {
- super.fail();
- }
- }
-
- @Override
public void close() throws HyracksDataException {
- if (isOpen) {
- try {
- flushIfNotFailed();
- } finally {
- writer.close();
- }
+ try {
+ flushIfNotFailed();
+ } finally {
+ writer.close();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index aa9232e..e2868ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -177,7 +177,7 @@
}
public StartJobFunction(JobId jobId) {
- this(null, null, null, jobId);
+ this(null, null, EnumSet.noneOf(JobFlag.class), jobId);
}
public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index ad54110..75cbf61 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -102,14 +102,14 @@
@Override
public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
- jobSpec);
+ IActivityClusterGraphGeneratorFactory jsacggf =
+ new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
return startJob(jsacggf, jobFlags);
}
@Override
public JobId distributeJob(JobSpecification jobSpec) throws Exception {
- JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
+ IActivityClusterGraphGeneratorFactory jsacggf =
new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
return distributeJob(jsacggf);
}
@@ -212,15 +212,14 @@
@Override
public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
throws Exception {
- JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
- jobSpec);
+ IActivityClusterGraphGeneratorFactory jsacggf =
+ new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
return startJob(deploymentId, jsacggf, jobFlags);
}
@Override
public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
- EnumSet<JobFlag> jobFlags)
- throws Exception {
+ EnumSet<JobFlag> jobFlags) throws Exception {
return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index c8e4cf8..df693b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.api.context;
import java.io.Serializable;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -26,6 +27,7 @@
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.IOperatorEnvironment;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatableRegistry;
@@ -48,4 +50,6 @@
void setSharedObject(Object object);
Object getSharedObject();
+
+ Set<JobFlag> getJobFlags();
}
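Exposing the job flags on the task context lets each runtime decide locally whether
to wrap its writers. The recurring pattern in this change is (ctx and writer are
assumed to be in scope):

    boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
    IFrameWriter checkedWriter = enforce ? EnforceFrameWriter.enforce(writer) : writer;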
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
new file mode 100644
index 0000000..bf54e01
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class EnforceFrameWriter implements IFrameWriter {
+
+ // The downstream data consumer of this writer.
+ private final IFrameWriter writer;
+
+ // A flag that indicates whether the data consumer of this writer has failed.
+ private boolean downstreamFailed = false;
+
+ // A flag that indicates whether the data producer of this writer has called fail() on this writer.
+ // There could be two cases:
+ // CASE 1: the downstream of this writer fails and the exception is propagated to the source operator, which
+ // cascades to the fail() of this writer;
+ // CASE 2: the failure happens in the upstream of this writer and the source operator cascades to the fail()
+ // of this writer.
+ private boolean failCalledByUpstream = false;
+
+ // A flag that indicates whether the downstream of this writer is open.
+ private boolean downstreamOpen = false;
+
+ protected EnforceFrameWriter(IFrameWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public final void open() throws HyracksDataException {
+ try {
+ if (downstreamOpen) {
+ throw HyracksDataException.create(ErrorCode.OPEN_ON_OPEN_WRITER);
+ }
+ if (downstreamFailed || failCalledByUpstream) {
+ throw HyracksDataException.create(ErrorCode.OPEN_ON_FAILED_WRITER);
+ }
+ writer.open();
+ downstreamOpen = true;
+ } catch (Throwable th) {
+ downstreamFailed = true;
+ throw th;
+ }
+ }
+
+ @Override
+ public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (!downstreamOpen) {
+ throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_CLOSED_WRITER);
+ }
+ if (downstreamFailed || failCalledByUpstream) {
+ throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_FAILED_WRITER);
+ }
+ try {
+ writer.nextFrame(buffer);
+ } catch (Throwable th) {
+ downstreamFailed = true;
+ throw th;
+ }
+ }
+
+ @Override
+ public final void flush() throws HyracksDataException {
+ if (!downstreamOpen) {
+ throw HyracksDataException.create(ErrorCode.FLUSH_ON_CLOSED_WRITER);
+ }
+ if (downstreamFailed || failCalledByUpstream) {
+ throw HyracksDataException.create(ErrorCode.FLUSH_ON_FAILED_WRITER);
+ }
+ try {
+ writer.flush();
+ } catch (Throwable th) {
+ downstreamFailed = true;
+ throw th;
+ }
+ }
+
+ @Override
+ public final void fail() throws HyracksDataException {
+ writer.fail();
+ if (failCalledByUpstream) {
+ throw HyracksDataException.create(ErrorCode.FAIL_ON_FAILED_WRITER);
+ }
+ failCalledByUpstream = true;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ downstreamOpen = false;
+ if (downstreamFailed && !failCalledByUpstream) {
+ throw HyracksDataException.create(ErrorCode.MISSED_FAIL_CALL);
+ }
+ }
+
+ public static IFrameWriter enforce(IFrameWriter writer) {
+ return writer instanceof EnforceFrameWriter ? writer : new EnforceFrameWriter(writer);
+ }
+}
\ No newline at end of file
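EnforceFrameWriter is a stateful decorator: it tracks whether the downstream is open
and whether fail() has been seen, and turns protocol violations into the new error
codes. A hedged illustration (downstream and frame are assumed to be in scope):

    IFrameWriter checked = EnforceFrameWriter.enforce(downstream); // avoids double wrapping
    checked.open();
    // checked.open();           // would throw OPEN_ON_OPEN_WRITER
    checked.nextFrame(frame);
    checked.close();
    // checked.nextFrame(frame); // would throw NEXT_FRAME_ON_CLOSED_WRITER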
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
index f6c201e..82433e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
@@ -23,16 +23,15 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IOperatorNodePushable {
- public void initialize() throws HyracksDataException;
+ void initialize() throws HyracksDataException;
- public void deinitialize() throws HyracksDataException;
+ void deinitialize() throws HyracksDataException;
- public int getInputArity();
+ int getInputArity();
- public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
- throws HyracksDataException;
+ void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) throws HyracksDataException;
- public IFrameWriter getInputFrameWriter(int index);
+ IFrameWriter getInputFrameWriter(int index);
- public String getDisplayName();
+ String getDisplayName();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index b52a6a5..8f36fcd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -92,6 +92,14 @@
public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56;
public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57;
public static final int TASK_ABORTED = 58;
+ public static final int OPEN_ON_OPEN_WRITER = 59;
+ public static final int OPEN_ON_FAILED_WRITER = 60;
+ public static final int NEXT_FRAME_ON_FAILED_WRITER = 61;
+ public static final int NEXT_FRAME_ON_CLOSED_WRITER = 62;
+ public static final int FLUSH_ON_FAILED_WRITER = 63;
+ public static final int FLUSH_ON_CLOSED_WRITER = 64;
+ public static final int FAIL_ON_FAILED_WRITER = 65;
+ public static final int MISSED_FAIL_CALL = 66;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index a33c6c9a..7225cd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -19,5 +19,6 @@
package org.apache.hyracks.api.job;
public enum JobFlag {
- PROFILE_RUNTIME
+ PROFILE_RUNTIME,
+ ENFORCE_CONTRACT
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 314bf8b..7fdf106 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -35,12 +35,14 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
/**
* The runtime of a SuperActivity, which internally executes a DAG of one-to-one
@@ -90,18 +92,18 @@
private void init() throws HyracksDataException {
Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
List<IConnectorDescriptor> outputConnectors;
-
+ final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
/*
* Set up the source operators
*/
for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
- IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
- nPartitions);
+ IOperatorNodePushable opPushable =
+ entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
operatorNodePushablesBFSOrder.add(opPushable);
operatorNodePushables.put(entry.getKey(), opPushable);
inputArity += opPushable.getInputArity();
- outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(),
- Collections.emptyList());
+ outputConnectors =
+ MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), Collections.emptyList());
for (IConnectorDescriptor conn : outputConnectors) {
childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
}
@@ -131,7 +133,9 @@
/*
* construct the dataflow connection from a producer to a consumer
*/
- sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel),
+ IFrameWriter writer = destOp.getInputFrameWriter(inputChannel);
+ writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
+ sourceOp.setOutputFrameWriter(outputChannel, writer,
recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
/*
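EnforceFrameWriter itself is added elsewhere in this change; the following is a hypothetical, simplified sketch of the state tracking such a decorator needs in order to raise the codes defined above, not the actual implementation (in particular, detecting the missed-fail case behind code 66 requires extra bookkeeping):

    import java.nio.ByteBuffer;

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.ErrorCode;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Hypothetical, simplified contract-checking decorator.
    final class CheckingWriter implements IFrameWriter {
        private final IFrameWriter delegate;
        private boolean opened;
        private boolean closed;
        private boolean failed;

        CheckingWriter(IFrameWriter delegate) {
            this.delegate = delegate;
        }

        @Override
        public void open() throws HyracksDataException {
            if (opened) {
                throw HyracksDataException.create(ErrorCode.OPEN_ON_OPEN_WRITER);
            }
            if (failed) {
                throw HyracksDataException.create(ErrorCode.OPEN_ON_FAILED_WRITER);
            }
            delegate.open();
            opened = true;
        }

        @Override
        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
            if (failed) {
                throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_FAILED_WRITER);
            }
            if (closed) {
                throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_CLOSED_WRITER);
            }
            delegate.nextFrame(buffer);
        }

        @Override
        public void flush() throws HyracksDataException {
            // Guards elided for brevity; the same checks apply (codes 63 and 64).
            delegate.flush();
        }

        @Override
        public void fail() throws HyracksDataException {
            if (failed) {
                throw HyracksDataException.create(ErrorCode.FAIL_ON_FAILED_WRITER);
            }
            failed = true;
            delegate.fail();
        }

        @Override
        public void close() throws HyracksDataException {
            closed = true;
            delegate.close();
        }
    }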
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 35a2fc5..4bf069c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -75,6 +75,13 @@
56 = LSM disk component scan is not allowed for a secondary index
57 = Couldn't find the matter tuple for anti-matter tuple in the primary index
58 = Task %1$s was aborted
+59 = Data pipeline protocol violation: open() is called on an opened writer
+60 = Data pipeline protocol violation: open() is called on a failed writer
+61 = Data pipeline protocol violation: nextFrame() is called on a failed writer
+62 = Data pipeline protocol violation: nextFrame() is called on a closed writer
+63 = Data pipeline protocol violation: flush() is called on a failed writer
+64 = Data pipeline protocol violation: flush() is called on a closed writer
+65 = Data pipeline protocol violation: fail() is called twice on a writer
+66 = Data pipeline protocol violation: fail() is not called by the upstream operator when a failure occurs downstream
-
# 10000 ---- 19999: compilation errors
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 55a7a82..95a6d9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,7 +20,6 @@
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -116,10 +115,10 @@
}
//Run a Pre-distributed job by passing the JobId
- public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
+ public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags,
PreDistributedJobDescriptor distributedJobDescriptor)
throws HyracksException {
- this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class),
+ this(deploymentId, jobId, jobFlags,
distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
this.scheduler = new JobExecutor(ccs, this, constaints, true);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index 2dbb631..e083d2a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -68,7 +68,7 @@
run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
} else {
//ActivityClusterGraph has already been distributed
- run = new JobRun(ccs, deploymentId, jobId,
+ run = new JobRun(ccs, deploymentId, jobId, jobFlags,
ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
}
jobManager.add(run);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index f83df3a..cbc6146 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.common.controllers;
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
@@ -64,7 +65,8 @@
CLUSTER_TOPOLOGY(STRING),
JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
JOB_QUEUE_CAPACITY(INTEGER, 4096),
- JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager");
+ JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
+ ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false);
private final IOptionType parser;
private Object defaultValue;
@@ -156,6 +158,9 @@
return "The maximum number of jobs to queue before rejecting new jobs";
case JOB_MANAGER_CLASS:
return "Specify the implementation class name for the job manager";
+ case ENFORCE_FRAME_WRITER_PROTOCOL:
+ return "A flag indicating if runtime should enforce frame writer protocol and detect "
+ + "bad behaving operators";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -357,4 +362,12 @@
public int getJobQueueCapacity() {
return getAppConfig().getInt(Option.JOB_QUEUE_CAPACITY);
}
+
+ public boolean getEnforceFrameWriterProtocol() {
+ return getAppConfig().getBoolean(Option.ENFORCE_FRAME_WRITER_PROTOCOL);
+ }
+
+ public void setEnforceFrameWriterProtocol(boolean enforce) {
+ configManager.set(Option.ENFORCE_FRAME_WRITER_PROTOCOL, enforce);
+ }
}
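A consumer of the new option would typically fold it into the job flags at job-start time; a hedged sketch (ccConfig, hcc, and spec are assumed to be in scope):

    // Illustrative only: map the cluster-level option to a per-job flag.
    EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
    if (ccConfig.getEnforceFrameWriterProtocol()) {
        jobFlags.add(JobFlag.ENFORCE_CONTRACT);
    }
    hcc.startJob(spec, jobFlags);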
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 42726a7..0fc3be7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -64,8 +64,10 @@
MESSAGING_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
MESSAGING_PUBLIC_PORT(INTEGER, MESSAGING_LISTEN_PORT),
CLUSTER_CONNECT_RETRIES(INTEGER, 5),
- IODEVICES(STRING_ARRAY, appConfig -> new String[] {
- FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
+ IODEVICES(
+ STRING_ARRAY,
+ appConfig -> new String[] {
+ FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
"<value of " + ControllerConfig.Option.DEFAULT_DIR.cmdline() + ">/iodevice"),
NET_THREAD_COUNT(INTEGER, 1),
NET_BUFFER_COUNT(INTEGER, 1),
@@ -95,7 +97,7 @@
}
<T> Option(IOptionType<T> parser, Function<IApplicationConfig, T> defaultValue,
- String defaultValueDescription) {
+ String defaultValueDescription) {
this.parser = parser;
this.defaultValue = defaultValue;
this.defaultValueDescription = defaultValueDescription;
@@ -246,6 +248,7 @@
return configManager;
}
+ @Override
public IApplicationConfig getAppConfig() {
return appConfig;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 04d48f3..d689bc0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -52,6 +52,7 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.IOperatorEnvironment;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.partitions.PartitionId;
@@ -104,9 +105,13 @@
private Object sharedObject;
- public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
- NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
+ private final Set<JobFlag> jobFlags;
+
+ public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName,
+ ExecutorService executor, NodeControllerService ncs,
+ List<List<PartitionChannel>> inputChannelsFromConnectors) {
this.joblet = joblet;
+ this.jobFlags = jobFlags;
this.taskAttemptId = taskId;
this.displayName = displayName;
this.executorService = executor;
@@ -426,4 +431,9 @@
public Object getSharedObject() {
return sharedObject;
}
+
+ @Override
+ public Set<JobFlag> getJobFlags() {
+ return jobFlags;
+ }
}
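With the flags carried on the Task, any operator runtime can consult them through its task context, exactly as the SuperActivity runtime does above:

    final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);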
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 02e8051..95f3e83 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -131,12 +132,10 @@
}
final int partition = tid.getPartition();
List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
- task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
+ task = new Task(joblet, flags, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
createInputChannels(td, inputs));
IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
-
List<IPartitionCollector> collectors = new ArrayList<>();
-
if (inputs != null) {
for (int i = 0; i < inputs.size(); ++i) {
IConnectorDescriptor conn = inputs.get(i);
@@ -145,26 +144,28 @@
LOGGER.info("input: " + i + ": " + conn.getConnectorId());
}
RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
- IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
- recordDesc, cPolicy);
+ IPartitionCollector collector =
+ createPartitionCollector(td, partition, task, i, conn, recordDesc, cPolicy);
collectors.add(collector);
}
}
List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid);
if (outputs != null) {
+ final boolean enforce = flags.contains(JobFlag.ENFORCE_CONTRACT);
for (int i = 0; i < outputs.size(); ++i) {
final IConnectorDescriptor conn = outputs.get(i);
RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
- IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
- partition, taId, flags);
+ IPartitionWriterFactory pwFactory =
+ createPartitionWriterFactory(task, cPolicy, jobId, conn, partition, taId, flags);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("output: " + i + ": " + conn.getConnectorId());
}
- IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td
- .getPartitionCount(), td.getOutputPartitionCounts()[i]);
+ IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
+ td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+ writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
operator.setOutputFrameWriter(i, writer, recordDesc);
}
}
@@ -203,11 +204,11 @@
private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
throws HyracksDataException {
- IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td
- .getInputPartitionCounts()[i], td.getPartitionCount());
+ IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+ td.getInputPartitionCounts()[i], td.getPartitionCount());
if (cPolicy.materializeOnReceiveSide()) {
- return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
- .getTaskAttemptId(), ncs.getExecutor());
+ return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
+ task.getTaskAttemptId(), ncs.getExecutor());
} else {
return collector;
}
@@ -222,8 +223,9 @@
factory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
- conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+ return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(),
+ new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+ ncs.getExecutor());
}
};
} else {
@@ -231,9 +233,9 @@
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
- jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
- .getExecutor());
+ return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(),
+ new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+ ncs.getExecutor());
}
};
}
@@ -241,8 +243,8 @@
factory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn
- .getConnectorId(), senderIndex, receiverIndex), taId);
+ return new PipelinedPartition(ctx, ncs.getPartitionManager(),
+ new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId);
}
};
}
@@ -272,11 +274,14 @@
if (inputAddresses[i] != null) {
for (int j = 0; j < inputAddresses[i].length; j++) {
NetworkAddress networkAddress = inputAddresses[i][j];
- PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
- .getTaskAttemptId().getTaskId().getPartition());
- PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs
- .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
- .lookupIpAddress()), networkAddress.getPort()), pid, 5));
+ PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j,
+ td.getTaskAttemptId().getTaskId().getPartition());
+ PartitionChannel channel = new PartitionChannel(pid,
+ new NetworkInputChannel(ncs.getNetworkManager(),
+ new InetSocketAddress(
+ InetAddress.getByAddress(networkAddress.lookupIpAddress()),
+ networkAddress.getPort()),
+ pid, 5));
channels.add(channel);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7901141..3105d42 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -190,6 +190,9 @@
}
aggregator.close();
aggregateState.close();
+ } catch (Exception e) {
+ appenderWrapper.fail();
+ throw e;
} finally {
appenderWrapper.close();
}
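This fix (and the matching one in AbstractSortRunGenerator below) restores the producer-side lifecycle idiom that the checker enforces:

    // The writer lifecycle contract, in outline:
    writer.open();
    try {
        writer.nextFrame(buffer); // zero or more times; flush() is optional
    } catch (Exception e) {
        writer.fail();            // signal the failure before closing
        throw e;
    } finally {
        writer.close();           // close exactly once, on every path
    }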
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index 48e6b35..9ac9296 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -18,15 +18,11 @@
*/
package org.apache.hyracks.dataflow.std.misc;
-import java.nio.ByteBuffer;
-
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -38,22 +34,6 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- @Override
- public void open() throws HyracksDataException {
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- }
-
- @Override
- public void close() throws HyracksDataException {
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
- };
+ return new SinkOperatorNodePushable();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
index fe64472..d987a35 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
@@ -19,15 +19,11 @@
package org.apache.hyracks.dataflow.std.misc;
-import java.nio.ByteBuffer;
-
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
public class SinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -39,22 +35,6 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- @Override
- public void open() throws HyracksDataException {
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- }
-
- @Override
- public void close() throws HyracksDataException {
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
- };
+ return new SinkOperatorNodePushable();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
new file mode 100644
index 0000000..85e1bb8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class SinkOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+
+ @Override
+ public void open() throws HyracksDataException {
+ // Does nothing.
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ // Does nothing.
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // Does nothing.
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ // Does nothing.
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
index bcebb7d..3c11669 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -63,6 +63,9 @@
flushWriter.open();
try {
getSorter().flush(flushWriter);
+ } catch (Exception e) {
+ flushWriter.fail();
+ throw e;
} finally {
flushWriter.close();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 509607c..0352cea 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -78,6 +78,7 @@
protected final boolean appendIndexFilter;
protected ArrayTupleBuilder nonFilterTupleBuild;
protected final ISearchOperationCallbackFactory searchCallbackFactory;
+ protected boolean failed = false;
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
@@ -193,7 +194,7 @@
writeSearchResults(i);
}
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -207,12 +208,15 @@
HyracksDataException closeException = null;
if (index != null) {
// if index == null, then the index open was not successful
- try {
- if (appender.getTupleCount() > 0) {
- appender.write(writer, true);
+ if (!failed) {
+ try {
+ if (appender.getTupleCount() > 0) {
+ appender.write(writer, true);
+ }
+ } catch (Throwable th) {
+ writer.fail();
+ closeException = new HyracksDataException(th);
}
- } catch (Throwable th) {
- closeException = new HyracksDataException(th);
}
try {
@@ -251,6 +255,7 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
writer.fail();
}
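The new failed flag is the consumer-side counterpart of the same contract: once fail() has been observed, close() must not push any more frames downstream. Simplified from the change above:

    // Simplified: flush pending tuples only on the success path.
    if (!failed && appender.getTupleCount() > 0) {
        appender.write(writer, true);
    }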
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 2171122..b100300 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -20,8 +20,10 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -33,6 +35,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -155,4 +158,9 @@
public Object getSharedObject() {
return sharedObject;
}
+
+ @Override
+ public Set<JobFlag> getJobFlags() {
+ return EnumSet.noneOf(JobFlag.class);
+ }
}