Ensure nextFrame and flush are not called in a failed pipeline.
Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1
Change-Id: I7859bf51e2203ef8518168e94a8a2d935193f9f3
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 2a0631c..a815f3a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -25,7 +25,6 @@
import java.io.IOException;
import java.net.Inet4Address;
import java.util.ArrayList;
-import java.util.EnumSet;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -42,9 +41,6 @@
import org.apache.hyracks.api.application.INCApplication;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -131,7 +127,8 @@
ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
ccConfig.setResultTTL(120000L);
ccConfig.setResultSweepThreshold(1000L);
- configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb"));
+ ccConfig.setEnforceFrameWriterProtocol(true);
+ configManager.set(ControllerConfig.Option.DEFAULT_DIR, getDefaultStoragePath());
return ccConfig;
}
@@ -150,7 +147,7 @@
ncConfig.setResultTTL(120000L);
ncConfig.setResultSweepThreshold(1000L);
ncConfig.setVirtualNC(true);
- configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb", ncName));
+ configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), ncName));
return ncConfig;
}
@@ -215,13 +212,6 @@
}
}
- public void runJob(JobSpecification spec) throws Exception {
- GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString());
- JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
- GlobalConfig.ASTERIX_LOGGER.info(jobId.toString());
- hcc.waitForCompletion(jobId);
- }
-
protected String getDefaultStoragePath() {
return joinPath("target", "io", "dir");
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 80c05db..bda57a7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -191,8 +192,10 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.UnmanagedFileSplit;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
/*
@@ -214,6 +217,7 @@
protected final IRewriterFactory rewriterFactory;
protected final IStorageComponentProvider componentProvider;
protected final ExecutorService executorService;
+ protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider,
@@ -228,6 +232,9 @@
rewriterFactory = compliationProvider.getRewriterFactory();
activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
this.executorService = executorService;
+ if (appCtx.getServiceContext().getAppConfig().getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)) {
+ this.jobFlags.add(JobFlag.ENFORCE_CONTRACT);
+ }
}
public SessionOutput getSessionOutput() {
@@ -621,7 +628,7 @@
progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
// #. runJob
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
// #. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -653,7 +660,7 @@
JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
@@ -900,7 +907,7 @@
"Failed to create job spec for replicating Files Index For external dataset");
}
filesIndexReplicated = true;
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
}
}
@@ -937,7 +944,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
// #. create the index artifact in NC.
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
@@ -948,7 +955,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
// #. begin new metadataTxn
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -986,7 +993,7 @@
ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
@@ -1005,7 +1012,7 @@
JobSpecification jobSpec = IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
} catch (Exception e2) {
e.addSuppressed(e2);
if (bActiveTxn) {
@@ -1189,7 +1196,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1226,7 +1233,7 @@
// remove the all indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
} catch (Exception e2) {
// do no throw exception since still the metadata needs to be compensated.
@@ -1405,7 +1412,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
// #. begin a new transaction
@@ -1466,7 +1473,7 @@
progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
// #. begin a new transaction
@@ -1496,7 +1503,7 @@
// remove the all indexes in NC
try {
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
} catch (Exception e2) {
// do no throw exception since still the metadata needs to be compensated.
@@ -1659,7 +1666,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
if (spec != null) {
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
}
} catch (Exception e) {
if (bActiveTxn) {
@@ -1725,7 +1732,7 @@
if (jobSpec == null) {
return jobSpec;
}
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
} finally {
locker.unlock();
}
@@ -1753,7 +1760,7 @@
bActiveTxn = false;
if (jobSpec != null && !compileOnly) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
return jobSpec;
} catch (Exception e) {
@@ -1935,7 +1942,7 @@
} else {
JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()));
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName);
}
@@ -2022,7 +2029,7 @@
activeEventHandler.registerListener(listener);
IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED);
feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
- JobUtils.runJob(hcc, feedJob,
+ JobUtils.runJob(hcc, feedJob, jobFlags,
Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)));
eventSubscriber.sync();
LOGGER.log(Level.INFO, "Submitted");
@@ -2211,7 +2218,7 @@
// #. run the jobs
for (JobSpecification jobSpec : jobsToExecute) {
- JobUtils.runJob(hcc, jobSpec, true);
+ runJob(hcc, jobSpec);
}
} catch (Exception e) {
if (bActiveTxn) {
@@ -2300,14 +2307,14 @@
}
break;
case IMMEDIATE:
- createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
+ createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
metadataProvider.findOutputRecordType());
}, clientContextId, ctx);
break;
case DEFERRED:
- createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
+ createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId));
if (outMetadata != null) {
outMetadata.getResultSets()
@@ -2325,7 +2332,7 @@
ResultSetId resultSetId, MutableBoolean printed) {
Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
try {
- createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> {
+ createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> {
final ResultHandle handle = new ResultHandle(id, resultSetId);
ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.RUNNING);
ResultUtil.printResultHandle(sessionOutput, handle);
@@ -2353,16 +2360,20 @@
}
}
- private static void createAndRunJob(IHyracksClientConnection hcc, Mutable<JobId> jId, IStatementCompiler compiler,
- IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, String clientContextId,
- IStatementExecutorContext ctx) throws Exception {
+ private void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec) throws Exception {
+ JobUtils.runJob(hcc, jobSpec, jobFlags, true);
+ }
+
+ private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
+ IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
+ String clientContextId, IStatementExecutorContext ctx) throws Exception {
locker.lock();
try {
final JobSpecification jobSpec = compiler.compile();
if (jobSpec == null) {
return;
}
- final JobId jobId = JobUtils.runJob(hcc, jobSpec, false);
+ final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
if (ctx != null && clientContextId != null) {
ctx.put(clientContextId, jobId); // Adds the running job into the context.
}
@@ -2507,14 +2518,14 @@
transactionState = TransactionState.BEGIN;
// run the files update job
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
for (Index index : indexes) {
if (!ExternalIndexingOperations.isFileIndex(index)) {
spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, addedFiles,
appendedFiles, metadataProvider);
// run the files update job
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
}
}
@@ -2533,7 +2544,7 @@
bActiveTxn = false;
transactionState = TransactionState.READY_TO_COMMIT;
// We don't release the latch since this job is expected to be quick
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
// Start a new metadata transaction to record the final state of the transaction
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2602,7 +2613,7 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
try {
- JobUtils.runJob(hcc, spec, true);
+ runJob(hcc, spec);
} catch (Exception e2) {
// This should never happen -- fix throw illegal
e.addSuppressed(e2);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index cbe551b..7d4b41d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -205,7 +205,7 @@
NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false);
BTreeSearchOperatorNodePushable searchOp =
searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), PARTITION, 1);
- emptyTupleOp.setFrameWriter(0, searchOp,
+ emptyTupleOp.setOutputFrameWriter(0, searchOp,
primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0));
searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc);
return emptyTupleOp;
@@ -312,7 +312,7 @@
public IHyracksTaskContext createTestContext(boolean withMessaging) throws HyracksDataException {
IHyracksTaskContext ctx = TestUtils.create(KB32);
if (withMessaging) {
- TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
}
ctx = Mockito.spy(ctx);
Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index 5e3da5f..b80f8f8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -40,6 +40,9 @@
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.control.common.controllers.CCConfig;
import org.junit.Assert;
import org.junit.Test;
@@ -59,6 +62,11 @@
ExternalProperties mockAsterixExternalProperties = mock(ExternalProperties.class);
when(mockAsterixAppContextInfo.getExternalProperties()).thenReturn(mockAsterixExternalProperties);
when(mockAsterixExternalProperties.getAPIServerPort()).thenReturn(19002);
+ ICCServiceContext mockServiceContext = mock(ICCServiceContext.class);
+ when(mockAsterixAppContextInfo.getServiceContext()).thenReturn(mockServiceContext);
+ IApplicationConfig mockApplicationConfig = mock(IApplicationConfig.class);
+ when(mockServiceContext.getAppConfig()).thenReturn(mockApplicationConfig);
+ when(mockApplicationConfig.getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)).thenReturn(true);
// Mocks AsterixClusterProperties.
Cluster mockCluster = mock(Cluster.class);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index 48ca338..b1c7ff3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -72,7 +72,7 @@
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
VSizeFrame message = new VSizeFrame(ctx);
VSizeFrame tempBuffer = new VSizeFrame(ctx);
- TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
message.getBuffer().clear();
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
message.getBuffer().flip();
@@ -152,7 +152,7 @@
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
VSizeFrame message = new VSizeFrame(ctx);
VSizeFrame tempBuffer = new VSizeFrame(ctx);
- TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE + 1);
ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
@@ -236,7 +236,7 @@
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
VSizeFrame message = new VSizeFrame(ctx);
VSizeFrame tempBuffer = new VSizeFrame(ctx);
- TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
message.getBuffer().clear();
writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE);
ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
@@ -294,7 +294,7 @@
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
VSizeFrame message = new VSizeFrame(ctx);
VSizeFrame tempBuffer = new VSizeFrame(ctx);
- TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
message.getBuffer().clear();
message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
message.getBuffer().flip();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index ef72c67..1e3960f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -136,7 +136,7 @@
marker.getBuffer().putLong(markerId);
marker.getBuffer().flip();
markerId++;
- TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, marker, ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, marker, ctx);
tupleAppender.flush(insertOp);
}
ITupleReference tuple = tupleGenerator.next();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index e8f018e..6f666f4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -11,7 +11,7 @@
"compiler\.joinmemory" : 262144,
"compiler\.parallelism" : 0,
"compiler\.sortmemory" : 327680,
- "default\.dir" : "target/io/dir/asterixdb",
+ "default\.dir" : "target/io/dir",
"instance\.name" : "DEFAULT_INSTANCE",
"log\.level" : "INFO",
"max\.wait\.active\.cluster" : 60,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index e099835..5f0c47b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -11,7 +11,7 @@
"compiler\.joinmemory" : 262144,
"compiler\.parallelism" : -1,
"compiler\.sortmemory" : 327680,
- "default\.dir" : "target/io/dir/asterixdb",
+ "default\.dir" : "target/io/dir",
"instance\.name" : "DEFAULT_INSTANCE",
"log\.level" : "WARNING",
"max\.wait\.active\.cluster" : 60,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index b3fe5cc..b03de79 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -11,7 +11,7 @@
"compiler\.joinmemory" : 262144,
"compiler\.parallelism" : 3,
"compiler\.sortmemory" : 327680,
- "default\.dir" : "target/io/dir/asterixdb",
+ "default\.dir" : "target/io/dir",
"instance\.name" : "DEFAULT_INSTANCE",
"log\.level" : "WARNING",
"max\.wait\.active\.cluster" : 60,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 3f9fba9..1c610ac 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -83,7 +83,7 @@
try {
if (isPrimary && ctx.getSharedObject() != null) {
PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback(lsmIndex);
- TaskUtil.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+ TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
}
writer.open();
modCallback =
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index df647d5..5fc1837 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -62,6 +62,7 @@
public static final int TYPE_CONVERT_INTEGER_SOURCE = 19;
public static final int TYPE_CONVERT_INTEGER_TARGET = 20;
public static final int TYPE_CONVERT_OUT_OF_BOUND = 21;
+ public static final int FIELD_SHOULD_HAVE_CONCRETE_TYPE = 22;
public static final int INSTANTIATION_ERROR = 100;
// Compilation errors
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index 41f9e67..cacbfbc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -19,7 +19,10 @@
package org.apache.asterix.common.utils;
+import java.util.EnumSet;
+
import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
@@ -32,8 +35,13 @@
public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
throws Exception {
+ return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), waitForCompletion);
+ }
+
+ public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags,
+ boolean waitForCompletion) throws Exception {
spec.setMaxReattempts(0);
- final JobId jobId = hcc.startJob(spec);
+ final JobId jobId = hcc.startJob(spec, jobFlags);
if (waitForCompletion) {
hcc.waitForCompletion(jobId);
}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 7204e34..e303ba1 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -55,6 +55,7 @@
19 = Can't convert integer types. The source type should be one of %1$s.
20 = Can't convert integer types. The target type should be one of %1$s.
21 = Source value %1$s is out of range that %2$s can hold - %2$s.MAX_VALUE: %3$s, %2$s.MIN_VALUE: %4$s
+22 = Field value has type tag ANY, but it should have a concrete type
100 = Unable to instantiate class %1$s
# Compile-time check errors
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 8a7bda9..2876ea6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -66,7 +66,7 @@
FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
adapterRuntimeManager = new AdapterRuntimeManager(ctx, runtimeId.getEntityId(), adapter, writer, partition);
IFrame message = new VSizeFrame(ctx);
- TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
/*
* Set null feed message. Feed pipeline carries with it a message with each frame
* Initially, the message is set to a null message that can be changed by feed adapters.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 0558ead..24a7462 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -106,7 +106,7 @@
this.feedManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
.getApplicationContext()).getActiveManager();
this.message = new VSizeFrame(ctx);
- TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
this.opDesc = feedMetaOperatorDescriptor;
this.recordDescProvider = recordDescProvider;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 662b543..97c1115d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -98,7 +98,7 @@
this.feedManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
.getApplicationContext()).getActiveManager();
this.message = new VSizeFrame(ctx);
- TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
this.recordDescProvider = recordDescProvider;
this.opDesc = feedMetaOperatorDescriptor;
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AOrderedListBinaryTokenizer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AOrderedListBinaryTokenizer.java
index 90af41b..c644bbe 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AOrderedListBinaryTokenizer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AOrderedListBinaryTokenizer.java
@@ -18,11 +18,11 @@
*/
package org.apache.asterix.dataflow.data.common;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
@@ -64,7 +64,7 @@
length = NonTaggedFormatUtil.getFieldValueLength(data, itemOffset, typeTag, false);
// Last param is a hack to pass the type tag.
token.reset(data, itemOffset, itemOffset + length, length, data[start + 1]);
- } catch (AsterixException e) {
+ } catch (HyracksDataException e) {
throw new IllegalStateException(e);
}
itemIndex++;
@@ -79,7 +79,7 @@
this.itemIndex = 0;
}
- protected int getItemOffset(byte[] data, int start, int itemIndex) throws AsterixException {
+ protected int getItemOffset(byte[] data, int start, int itemIndex) throws HyracksDataException {
return AOrderedListSerializerDeserializer.getItemOffset(data, start, itemIndex);
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AUnorderedListBinaryTokenizer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AUnorderedListBinaryTokenizer.java
index b90e87f..6926713 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AUnorderedListBinaryTokenizer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AUnorderedListBinaryTokenizer.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.dataflow.data.common;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
public class AUnorderedListBinaryTokenizer extends AOrderedListBinaryTokenizer {
@@ -29,7 +29,7 @@
}
@Override
- protected int getItemOffset(byte[] data, int start, int itemIndex) throws AsterixException {
+ protected int getItemOffset(byte[] data, int start, int itemIndex) throws HyracksDataException {
return AUnorderedListSerializerDeserializer.getItemOffset(data, start, itemIndex);
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AOrderedListSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AOrderedListSerializerDeserializer.java
index da95091..0a66d56 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AOrderedListSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AOrderedListSerializerDeserializer.java
@@ -24,7 +24,6 @@
import java.util.List;
import org.apache.asterix.builders.OrderedListBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.IAObject;
@@ -42,7 +41,8 @@
public class AOrderedListSerializerDeserializer implements ISerializerDeserializer<AOrderedList> {
private static final long serialVersionUID = 1L;
- public static final AOrderedListSerializerDeserializer SCHEMALESS_INSTANCE = new AOrderedListSerializerDeserializer();
+ public static final AOrderedListSerializerDeserializer SCHEMALESS_INSTANCE =
+ new AOrderedListSerializerDeserializer();
private final IAType itemType;
@SuppressWarnings("rawtypes")
@@ -74,18 +74,19 @@
ISerializerDeserializer currentDeserializer = deserializer;
if (itemType.getTypeTag() == ATypeTag.ANY && typeTag != ATypeTag.ANY) {
currentItemType = TypeTagUtil.getBuiltinTypeByTag(typeTag);
- currentDeserializer = SerializerDeserializerProvider.INSTANCE
- .getNonTaggedSerializerDeserializer(currentItemType);
+ currentDeserializer =
+ SerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(currentItemType);
}
- List<IAObject> items = new ArrayList<IAObject>();
+ List<IAObject> items = new ArrayList<>();
in.readInt(); // list size
int numberOfitems;
numberOfitems = in.readInt();
if (numberOfitems > 0) {
if (!NonTaggedFormatUtil.isFixedSizedCollection(currentItemType)) {
- for (int i = 0; i < numberOfitems; i++)
+ for (int i = 0; i < numberOfitems; i++) {
in.readInt();
+ }
}
for (int i = 0; i < numberOfitems; i++) {
IAObject v = (IAObject) currentDeserializer.deserialize(in);
@@ -119,14 +120,15 @@
}
public static int getNumberOfItems(byte[] serOrderedList, int offset) {
- if (serOrderedList[offset] == ATypeTag.ARRAY.serialize())
+ if (serOrderedList[offset] == ATypeTag.ARRAY.serialize()) {
// 6 = tag (1) + itemTag (1) + list size (4)
return AInt32SerializerDeserializer.getInt(serOrderedList, offset + 6);
- else
+ } else {
return -1;
+ }
}
- public static int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws AsterixException {
+ public static int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws HyracksDataException {
if (serOrderedList[offset] == ATypeTag.ARRAY.serialize()) {
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serOrderedList[offset + 1]);
if (NonTaggedFormatUtil.isFixedSizedCollection(typeTag)) {
@@ -136,8 +138,9 @@
return offset + AInt32SerializerDeserializer.getInt(serOrderedList, offset + 10 + (4 * itemIndex));
}
// 10 = tag (1) + itemTag (1) + list size (4) + number of items (4)
- } else
+ } else {
return -1;
+ }
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AUnorderedListSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AUnorderedListSerializerDeserializer.java
index dd95586..a6090af 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AUnorderedListSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AUnorderedListSerializerDeserializer.java
@@ -23,7 +23,6 @@
import java.util.ArrayList;
import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.AUnorderedList;
import org.apache.asterix.om.base.IACursor;
@@ -43,7 +42,8 @@
private static final long serialVersionUID = 1L;
- public static final AUnorderedListSerializerDeserializer SCHEMALESS_INSTANCE = new AUnorderedListSerializerDeserializer();
+ public static final AUnorderedListSerializerDeserializer SCHEMALESS_INSTANCE =
+ new AUnorderedListSerializerDeserializer();
private final IAType itemType;
private final AUnorderedListType unorderedlistType;
@@ -76,18 +76,19 @@
ISerializerDeserializer currentDeserializer = deserializer;
if (itemType.getTypeTag() == ATypeTag.ANY && typeTag != ATypeTag.ANY) {
currentItemType = TypeTagUtil.getBuiltinTypeByTag(typeTag);
- currentDeserializer = SerializerDeserializerProvider.INSTANCE
- .getNonTaggedSerializerDeserializer(currentItemType);
+ currentDeserializer =
+ SerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(currentItemType);
}
in.readInt(); // list size
int numberOfitems;
numberOfitems = in.readInt();
- ArrayList<IAObject> items = new ArrayList<IAObject>();
+ ArrayList<IAObject> items = new ArrayList<>();
if (numberOfitems > 0) {
if (!NonTaggedFormatUtil.isFixedSizedCollection(currentItemType)) {
- for (int i = 0; i < numberOfitems; i++)
+ for (int i = 0; i < numberOfitems; i++) {
in.readInt();
+ }
}
for (int i = 0; i < numberOfitems; i++) {
items.add((IAObject) currentDeserializer.deserialize(in));
@@ -121,14 +122,15 @@
}
public static int getNumberOfItems(byte[] serOrderedList, int offset) {
- if (serOrderedList[offset] == ATypeTag.MULTISET.serialize())
+ if (serOrderedList[offset] == ATypeTag.MULTISET.serialize()) {
// 6 = tag (1) + itemTag (1) + list size (4)
return AInt32SerializerDeserializer.getInt(serOrderedList, offset + 6);
- else
+ } else {
return -1;
+ }
}
- public static int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws AsterixException {
+ public static int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws HyracksDataException {
if (serOrderedList[offset] == ATypeTag.MULTISET.serialize()) {
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serOrderedList[offset + 1]);
if (NonTaggedFormatUtil.isFixedSizedCollection(typeTag)) {
@@ -138,8 +140,9 @@
return offset + AInt32SerializerDeserializer.getInt(serOrderedList, offset + 10 + (4 * itemIndex));
}
// 10 = tag (1) + itemTag (1) + list size (4) + number of items (4)
- } else
+ } else {
return -1;
+ }
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AListPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AListPointable.java
index f2e7299..7a2efe0 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AListPointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AListPointable.java
@@ -29,6 +29,7 @@
import org.apache.asterix.om.util.container.IObjectFactory;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.AbstractPointable;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IPointableFactory;
@@ -123,7 +124,7 @@
return NonTaggedFormatUtil.isFixedSizedCollection(inputType.getItemType());
}
- public int getFixedLength(AbstractCollectionType inputType) throws AsterixException {
+ public int getFixedLength(AbstractCollectionType inputType) throws HyracksDataException {
return NonTaggedFormatUtil.getFieldValueLength(bytes, 0, inputType.getItemType().getTypeTag(), false);
}
@@ -163,7 +164,7 @@
// Item accessors
// ----------------------
- public int getItemOffset(AbstractCollectionType inputType, int index) throws AsterixException {
+ public int getItemOffset(AbstractCollectionType inputType, int index) throws HyracksDataException {
if (isFixedType(inputType)) {
return getItemCountOffset() + getItemCountSize() + index * getFixedLength(inputType);
} else {
@@ -172,14 +173,14 @@
}
}
- public byte getItemTag(AbstractCollectionType inputType, int index) throws AsterixException {
+ public byte getItemTag(AbstractCollectionType inputType, int index) throws HyracksDataException {
if (getType() != ATypeTag.ANY.serialize()) {
return getType();
}
return bytes[getItemOffset(inputType, index)];
}
- public int getItemSize(AbstractCollectionType inputType, int index) throws AsterixException {
+ public int getItemSize(AbstractCollectionType inputType, int index) throws HyracksDataException {
if (isFixedType(inputType)) {
return getFixedLength(inputType);
} else {
@@ -188,8 +189,8 @@
}
}
- public void getItemValue(AbstractCollectionType inputType, int index, DataOutput dOut) throws IOException,
- AsterixException {
+ public void getItemValue(AbstractCollectionType inputType, int index, DataOutput dOut)
+ throws IOException, AsterixException {
if (getType() != ATypeTag.ANY.serialize()) {
dOut.writeByte(getType());
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index 190a3b2..0aa256f 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -32,6 +32,7 @@
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
import org.apache.asterix.om.utils.RecordUtil;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.AbstractPointable;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IPointableFactory;
@@ -252,7 +253,7 @@
return aType;
}
- public int getClosedFieldSize(ARecordType recordType, int fieldId) throws AsterixException {
+ public int getClosedFieldSize(ARecordType recordType, int fieldId) throws HyracksDataException {
if (isClosedFieldNull(recordType, fieldId)) {
return 0;
}
@@ -294,7 +295,7 @@
return getOpenFieldNameOffset(recordType, fieldId) + getOpenFieldNameSize(recordType, fieldId);
}
- public int getOpenFieldValueSize(ARecordType recordType, int fieldId) throws AsterixException {
+ public int getOpenFieldValueSize(ARecordType recordType, int fieldId) throws HyracksDataException {
int offset = getOpenFieldValueOffset(recordType, fieldId);
ATypeTag tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(getOpenFieldTag(recordType, fieldId));
return NonTaggedFormatUtil.getFieldValueLength(bytes, offset, tag, true);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/NonTaggedFormatUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/NonTaggedFormatUtil.java
index 37e8c05..01cea05 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/NonTaggedFormatUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/NonTaggedFormatUtil.java
@@ -19,7 +19,7 @@
package org.apache.asterix.om.utils;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
@@ -39,6 +39,7 @@
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import org.apache.hyracks.util.string.UTF8StringUtil;
@@ -106,12 +107,12 @@
}
public static int getFieldValueLength(byte[] serNonTaggedAObject, int offset, ATypeTag typeTag, boolean tagged)
- throws AsterixException {
+ throws HyracksDataException {
switch (typeTag) {
case ANY:
ATypeTag tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serNonTaggedAObject[offset]);
if (tag == ATypeTag.ANY) {
- throw new AsterixException("Field value has type tag ANY, but it should have a concrete type.");
+ throw HyracksDataException.create(ErrorCode.FIELD_SHOULD_HAVE_CONCRETE_TYPE);
}
return getFieldValueLength(serNonTaggedAObject, offset, tag, true) + 1;
case MISSING:
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java
index 379c362..c07aae2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.runtime.evaluators.common;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
@@ -72,32 +71,24 @@
@Override
public void next() throws HyracksDataException {
- try {
- pos = nextPos;
- ++count;
- nextPos = startOff + listLength;
- if (count + 1 < numberOfItems) {
- nextPos = getItemOffset(data, startOff, count + 1);
- }
- itemLen = nextPos - pos;
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
+ pos = nextPos;
+ ++count;
+ nextPos = startOff + listLength;
+ if (count + 1 < numberOfItems) {
+ nextPos = getItemOffset(data, startOff, count + 1);
}
+ itemLen = nextPos - pos;
}
@Override
public void reset() throws HyracksDataException {
count = 0;
- try {
- pos = getItemOffset(data, startOff, count);
- nextPos = startOff + listLength;
- if (count + 1 < numberOfItems) {
- nextPos = getItemOffset(data, startOff, count + 1);
- }
- itemLen = nextPos - pos;
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
+ pos = getItemOffset(data, startOff, count);
+ nextPos = startOff + listLength;
+ if (count + 1 < numberOfItems) {
+ nextPos = getItemOffset(data, startOff, count + 1);
}
+ itemLen = nextPos - pos;
}
@Override
@@ -153,7 +144,7 @@
reset();
}
- protected abstract int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws AsterixException;
+ protected abstract int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws HyracksDataException;
protected abstract int getNumberOfItems(byte[] serOrderedList, int offset);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ListAccessor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ListAccessor.java
index c03cfa6..3c97fc9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ListAccessor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ListAccessor.java
@@ -22,7 +22,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
import org.apache.asterix.om.types.ATypeTag;
@@ -76,7 +75,7 @@
return size;
}
- public int getItemOffset(int itemIndex) throws AsterixException {
+ public int getItemOffset(int itemIndex) throws HyracksDataException {
if (listType == ATypeTag.MULTISET) {
return AUnorderedListSerializerDeserializer.getItemOffset(listBytes, start, itemIndex);
} else {
@@ -84,12 +83,12 @@
}
}
- public int getItemLength(int itemOffset) throws AsterixException {
+ public int getItemLength(int itemOffset) throws HyracksDataException {
ATypeTag itemType = getItemType(itemOffset);
return NonTaggedFormatUtil.getFieldValueLength(listBytes, itemOffset, itemType, itemsAreSelfDescribing());
}
- public ATypeTag getItemType(int itemOffset) throws AsterixException {
+ public ATypeTag getItemType(int itemOffset) {
if (itemType == ATypeTag.ANY) {
return EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(listBytes[itemOffset]);
} else {
@@ -97,7 +96,7 @@
}
}
- public void writeItem(int itemIndex, DataOutput dos) throws AsterixException, IOException {
+ public void writeItem(int itemIndex, DataOutput dos) throws IOException {
int itemOffset = getItemOffset(itemIndex);
int itemLength = getItemLength(itemOffset);
if (itemsAreSelfDescribing()) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/OrderedListIterator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/OrderedListIterator.java
index f0c2f5a..c5add44 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/OrderedListIterator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/OrderedListIterator.java
@@ -18,13 +18,13 @@
*/
package org.apache.asterix.runtime.evaluators.common;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public final class OrderedListIterator extends AbstractAsterixListIterator {
@Override
- protected int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws AsterixException {
+ protected int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws HyracksDataException {
return AOrderedListSerializerDeserializer.getItemOffset(serOrderedList, offset, itemIndex);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java
index f519065..b70c6ad0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java
@@ -21,7 +21,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
@@ -72,8 +71,8 @@
// result
protected final AMutableFloat res = new AMutableFloat(0);
@SuppressWarnings("unchecked")
- protected final ISerializerDeserializer<AFloat> reusSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AFLOAT);
+ protected final ISerializerDeserializer<AFloat> reusSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
public SimilarityJaccardPrefixEvaluator(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
throws HyracksDataException {
@@ -91,8 +90,8 @@
// similarity threshold
sim = 0;
evalThreshold.evaluate(tuple, inputVal);
- float similarityThreshold = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(),
- inputVal.getStartOffset() + 1);
+ float similarityThreshold =
+ AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), inputVal.getStartOffset() + 1);
if (similarityThreshold != similarityThresholdCache || similarityFilters == null) {
similarityFilters = new SimilarityFiltersJaccard(similarityThreshold);
@@ -126,39 +125,23 @@
int lengthTokens1;
if (serList[startOffset] == ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG) {
- lengthTokens1 = AOrderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(),
- startOffset);
+ lengthTokens1 =
+ AOrderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(), startOffset);
// read tokens
for (i = 0; i < lengthTokens1; i++) {
- int itemOffset;
- int token;
- try {
- itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
- token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
- BuiltinFunctions.SIMILARITY_JACCARD.getName(), 1, serList, itemOffset,
- startOffset + 1);
+ int itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
+ int token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+ BuiltinFunctions.SIMILARITY_JACCARD.getName(), 1, serList, itemOffset, startOffset + 1);
tokens1.add(token);
}
} else {
- lengthTokens1 = AUnorderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(),
- startOffset);
+ lengthTokens1 =
+ AUnorderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(), startOffset);
// read tokens
for (i = 0; i < lengthTokens1; i++) {
- int itemOffset;
- int token;
-
- try {
- itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
-
- token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
- BuiltinFunctions.SIMILARITY_JACCARD.getName(), 1, serList, itemOffset,
- startOffset + 1);
+ int itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
+ int token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+ BuiltinFunctions.SIMILARITY_JACCARD.getName(), 1, serList, itemOffset, startOffset + 1);
tokens1.add(token);
}
}
@@ -181,39 +164,23 @@
int lengthTokens2;
if (serList[startOffset] == ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG) {
- lengthTokens2 = AOrderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(),
- startOffset);
+ lengthTokens2 =
+ AOrderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(), startOffset);
// read tokens
for (i = 0; i < lengthTokens2; i++) {
- int itemOffset;
- int token;
-
- try {
- itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
- token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
- BuiltinFunctions.SIMILARITY_JACCARD.getName(), 3, serList, itemOffset,
- startOffset + 1);
+ int itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
+ int token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+ BuiltinFunctions.SIMILARITY_JACCARD.getName(), 3, serList, itemOffset, startOffset + 1);
tokens2.add(token);
}
} else {
- lengthTokens2 = AUnorderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(),
- startOffset);
+ lengthTokens2 =
+ AUnorderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(), startOffset);
// read tokens
for (i = 0; i < lengthTokens2; i++) {
- int itemOffset;
- int token;
-
- try {
- itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
- token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
- BuiltinFunctions.SIMILARITY_JACCARD.getName(), 3, serList, itemOffset,
- startOffset + 1);
+ int itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
+ int token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+ BuiltinFunctions.SIMILARITY_JACCARD.getName(), 3, serList, itemOffset, startOffset + 1);
tokens2.add(token);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/UnorderedListIterator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/UnorderedListIterator.java
index ee58011..b755da2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/UnorderedListIterator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/UnorderedListIterator.java
@@ -18,13 +18,13 @@
*/
package org.apache.asterix.runtime.evaluators.common;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public final class UnorderedListIterator extends AbstractAsterixListIterator {
@Override
- protected int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws AsterixException {
+ protected int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws HyracksDataException {
return AUnorderedListSerializerDeserializer.getItemOffset(serOrderedList, offset, itemIndex);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AnyCollectionMemberDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AnyCollectionMemberDescriptor.java
index 714fabe..b78bc5c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AnyCollectionMemberDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AnyCollectionMemberDescriptor.java
@@ -21,7 +21,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
import org.apache.asterix.om.functions.BuiltinFunctions;
@@ -96,9 +95,8 @@
if (serList[offset] != ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG
&& serList[offset] != ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG) {
- throw new TypeMismatchException(BuiltinFunctions.ANY_COLLECTION_MEMBER, 0,
- serList[offset], ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG,
- ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG);
+ throw new TypeMismatchException(BuiltinFunctions.ANY_COLLECTION_MEMBER, 0, serList[offset],
+ ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG, ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG);
}
try {
@@ -127,8 +125,8 @@
if (selfDescList) {
itemTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serList[itemOffset]);
- itemLength = NonTaggedFormatUtil.getFieldValueLength(serList, itemOffset, itemTag, true)
- + 1;
+ itemLength =
+ NonTaggedFormatUtil.getFieldValueLength(serList, itemOffset, itemTag, true) + 1;
result.set(serList, itemOffset, itemLength);
} else {
itemLength = NonTaggedFormatUtil.getFieldValueLength(serList, itemOffset, itemTag, false);
@@ -137,9 +135,7 @@
result.set(resultStorage);
}
} catch (IOException e) {
- throw new HyracksDataException(e);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
};
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CodePointToStringDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CodePointToStringDescriptor.java
index 09e15b5..a59d03d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CodePointToStringDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CodePointToStringDescriptor.java
@@ -21,7 +21,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
@@ -99,39 +98,32 @@
throw new UnsupportedTypeException(getIdentifier(), serOrderedList[offset]);
}
}
-
- try {
- // calculate length first
- int utf_8_len = 0;
- for (int i = 0; i < size; i++) {
- int itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serOrderedList,
- offset, i);
- int codePoint = 0;
- codePoint = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
- getIdentifier().getName(), 0,
- serOrderedList, itemOffset, offset + 1);
- utf_8_len += UTF8StringUtil.codePointToUTF8(codePoint, currentUTF8);
- }
- out.writeByte(stringTypeTag);
- UTF8StringUtil.writeUTF8Length(utf_8_len, tempStoreForLength, out);
- for (int i = 0; i < size; i++) {
- int itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serOrderedList,
- offset, i);
- int codePoint = 0;
- codePoint = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
- getIdentifier().getName(), 0,
- serOrderedList, itemOffset, offset + 1);
- utf_8_len = UTF8StringUtil.codePointToUTF8(codePoint, currentUTF8);
- for (int j = 0; j < utf_8_len; j++) {
- out.writeByte(currentUTF8[j]);
- }
- }
- result.set(resultStorage);
- } catch (AsterixException ex) {
- throw new HyracksDataException(ex);
+ // calculate length first
+ int utf_8_len = 0;
+ for (int i = 0; i < size; i++) {
+ int itemOffset =
+ AOrderedListSerializerDeserializer.getItemOffset(serOrderedList, offset, i);
+ int codePoint = 0;
+ codePoint = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+ getIdentifier().getName(), 0, serOrderedList, itemOffset, offset + 1);
+ utf_8_len += UTF8StringUtil.codePointToUTF8(codePoint, currentUTF8);
}
- } catch (IOException e1) {
- throw new HyracksDataException(e1);
+ out.writeByte(stringTypeTag);
+ UTF8StringUtil.writeUTF8Length(utf_8_len, tempStoreForLength, out);
+ for (int i = 0; i < size; i++) {
+ int itemOffset =
+ AOrderedListSerializerDeserializer.getItemOffset(serOrderedList, offset, i);
+ int codePoint = 0;
+ codePoint = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+ getIdentifier().getName(), 0, serOrderedList, itemOffset, offset + 1);
+ utf_8_len = UTF8StringUtil.codePointToUTF8(codePoint, currentUTF8);
+ for (int j = 0; j < utf_8_len; j++) {
+ out.writeByte(currentUTF8[j]);
+ }
+ }
+ result.set(resultStorage);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
}
}
};
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CreatePolygonDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CreatePolygonDescriptor.java
index 76d9cf3..74f8757 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CreatePolygonDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CreatePolygonDescriptor.java
@@ -21,7 +21,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.APointSerializerDeserializer;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -75,10 +74,10 @@
private final IPointable inputArgList = new VoidPointable();
private final IScalarEvaluator evalList = listEvalFactory.createScalarEvaluator(ctx);
@SuppressWarnings("unchecked")
- private final ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- private final ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.
- INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
+ private final ISerializerDeserializer<ANull> nullSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ private final ISerializerDeserializer<AMissing> missingSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -95,51 +94,46 @@
ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG);
}
listAccessor.reset(listBytes, offset);
- try {
- // First check the list consists of a valid items
- for (int i = 0; i < listAccessor.size(); i++) {
- int itemOffset = listAccessor.getItemOffset(i);
- ATypeTag itemType = listAccessor.getItemType(itemOffset);
- if (itemType != ATypeTag.DOUBLE) {
- if (itemType == ATypeTag.NULL) {
- nullSerde.serialize(ANull.NULL, out);
- return;
- }
- if (itemType == ATypeTag.MISSING) {
- missingSerde.serialize(AMissing.MISSING, out);
- return;
- }
- throw new UnsupportedItemTypeException(BuiltinFunctions.CREATE_POLYGON,
- itemType.serialize());
+ // First check the list consists of a valid items
+ for (int i = 0; i < listAccessor.size(); i++) {
+ int itemOffset = listAccessor.getItemOffset(i);
+ ATypeTag itemType = listAccessor.getItemType(itemOffset);
+ if (itemType != ATypeTag.DOUBLE) {
+ if (itemType == ATypeTag.NULL) {
+ nullSerde.serialize(ANull.NULL, out);
+ return;
}
-
+ if (itemType == ATypeTag.MISSING) {
+ missingSerde.serialize(AMissing.MISSING, out);
+ return;
+ }
+ throw new UnsupportedItemTypeException(BuiltinFunctions.CREATE_POLYGON,
+ itemType.serialize());
}
- if (listAccessor.size() < 6) {
- throw new InvalidDataFormatException(getIdentifier(),
- ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
- } else if (listAccessor.size() % 2 != 0) {
- throw new InvalidDataFormatException(getIdentifier(),
- ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
- }
- out.writeByte(ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
- out.writeShort(listAccessor.size() / 2);
- final int skipTypeTag = listAccessor.itemsAreSelfDescribing() ? 1 : 0;
- for (int i = 0; i < listAccessor.size() / 2; i++) {
- int firstDoubleOffset = listAccessor.getItemOffset(i * 2) + skipTypeTag;
- int secondDobuleOffset = listAccessor.getItemOffset((i * 2) + 1) + skipTypeTag;
-
- APointSerializerDeserializer.serialize(
- ADoubleSerializerDeserializer.getDouble(listBytes, firstDoubleOffset),
- ADoubleSerializerDeserializer.getDouble(listBytes, secondDobuleOffset),
- out);
- }
- result.set(resultStorage);
- } catch (AsterixException ex) {
- throw new HyracksDataException(ex);
}
- } catch (IOException e1) {
- throw new HyracksDataException(e1);
+ if (listAccessor.size() < 6) {
+ throw new InvalidDataFormatException(getIdentifier(),
+ ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
+ } else if (listAccessor.size() % 2 != 0) {
+ throw new InvalidDataFormatException(getIdentifier(),
+ ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
+ }
+ out.writeByte(ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
+ out.writeShort(listAccessor.size() / 2);
+
+ final int skipTypeTag = listAccessor.itemsAreSelfDescribing() ? 1 : 0;
+ for (int i = 0; i < listAccessor.size() / 2; i++) {
+ int firstDoubleOffset = listAccessor.getItemOffset(i * 2) + skipTypeTag;
+ int secondDobuleOffset = listAccessor.getItemOffset((i * 2) + 1) + skipTypeTag;
+
+ APointSerializerDeserializer.serialize(
+ ADoubleSerializerDeserializer.getDouble(listBytes, firstDoubleOffset),
+ ADoubleSerializerDeserializer.getDouble(listBytes, secondDobuleOffset), out);
+ }
+ result.set(resultStorage);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
}
}
};
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
index 276c9ae..0c55ef1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
@@ -21,7 +21,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
@@ -106,8 +105,8 @@
itemIndex = ATypeHierarchy.getIntegerValue(BuiltinFunctions.GET_ITEM.getName(), 0,
indexBytes, indexOffset);
} else {
- throw new TypeMismatchException(BuiltinFunctions.GET_ITEM,
- 0, serOrderedList[offset], ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG);
+ throw new TypeMismatchException(BuiltinFunctions.GET_ITEM, 0, serOrderedList[offset],
+ ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG);
}
if (itemIndex < 0 || itemIndex >= AOrderedListSerializerDeserializer
@@ -124,26 +123,25 @@
serItemTypeTag = serOrderedList[offset + 1];
}
- itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serOrderedList, offset,
- itemIndex);
+ itemOffset =
+ AOrderedListSerializerDeserializer.getItemOffset(serOrderedList, offset, itemIndex);
if (selfDescList) {
itemTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serOrderedList[itemOffset]);
- itemLength = NonTaggedFormatUtil.getFieldValueLength(serOrderedList, itemOffset, itemTag,
- true) + 1;
+ itemLength =
+ NonTaggedFormatUtil.getFieldValueLength(serOrderedList, itemOffset, itemTag, true)
+ + 1;
result.set(serOrderedList, itemOffset, itemLength);
} else {
- itemLength = NonTaggedFormatUtil.getFieldValueLength(serOrderedList, itemOffset, itemTag,
- false);
+ itemLength =
+ NonTaggedFormatUtil.getFieldValueLength(serOrderedList, itemOffset, itemTag, false);
resultStorage.reset();
output.writeByte(serItemTypeTag);
output.write(serOrderedList, itemOffset, itemLength);
result.set(resultStorage);
}
} catch (IOException e) {
- throw new HyracksDataException(e);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
};
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SimilarityDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SimilarityDescriptor.java
index 385b2c1..8584d06 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SimilarityDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SimilarityDescriptor.java
@@ -21,7 +21,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.FunctionConstants;
import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
@@ -33,13 +32,13 @@
import org.apache.asterix.fuzzyjoin.similarity.SimilarityMetric;
import org.apache.asterix.om.base.ADouble;
import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
import org.apache.asterix.runtime.evaluators.common.SimilarityFiltersCache;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -55,8 +54,8 @@
public class SimilarityDescriptor extends AbstractScalarFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "similarity@7",
- 7);
+ private final static FunctionIdentifier FID =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "similarity@7", 7);
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
@Override
public IFunctionDescriptor createFunctionDescriptor() {
@@ -94,8 +93,8 @@
// result
private final AMutableDouble res = new AMutableDouble(0);
@SuppressWarnings("unchecked")
- private final ISerializerDeserializer<ADouble> doubleSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ private final ISerializerDeserializer<ADouble> doubleSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -120,8 +119,8 @@
throw new TypeMismatchException(getIdentifier(), 1, data[offset],
ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
}
- SimilarityFilters similarityFilters = similarityFiltersCache.get(similarityThreshold, data,
- offset, len);
+ SimilarityFilters similarityFilters =
+ similarityFiltersCache.get(similarityThreshold, data, offset, len);
evalLen1.evaluate(tuple, inputVal);
data = inputVal.getByteArray();
@@ -167,26 +166,16 @@
lengthTokens1 = AOrderedListSerializerDeserializer.getNumberOfItems(serList, offset);
// read tokens
for (i = 0; i < lengthTokens1; i++) {
- int itemOffset;
- try {
- itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, offset,
- i);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
+ int itemOffset =
+ AOrderedListSerializerDeserializer.getItemOffset(serList, offset, i);
tokens1.add(IntegerPointable.getInteger(serList, itemOffset));
}
} else {
lengthTokens1 = AUnorderedListSerializerDeserializer.getNumberOfItems(serList, offset);
// read tokens
for (i = 0; i < lengthTokens1; i++) {
- int itemOffset;
- try {
- itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, offset,
- i);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
+ int itemOffset =
+ AUnorderedListSerializerDeserializer.getItemOffset(serList, offset, i);
tokens1.add(IntegerPointable.getInteger(serList, itemOffset));
}
}
@@ -213,26 +202,16 @@
lengthTokens2 = AOrderedListSerializerDeserializer.getNumberOfItems(serList, offset);
// read tokens
for (i = 0; i < lengthTokens2; i++) {
- int itemOffset;
- try {
- itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, offset,
- i);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
+ int itemOffset =
+ AOrderedListSerializerDeserializer.getItemOffset(serList, offset, i);
tokens2.add(IntegerPointable.getInteger(serList, itemOffset));
}
} else {
lengthTokens2 = AUnorderedListSerializerDeserializer.getNumberOfItems(serList, offset);
// read tokens
for (i = 0; i < lengthTokens2; i++) {
- int itemOffset;
- try {
- itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, offset,
- i);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
+ int itemOffset =
+ AUnorderedListSerializerDeserializer.getItemOffset(serList, offset, i);
tokens2.add(IntegerPointable.getInteger(serList, itemOffset));
}
}
@@ -243,8 +222,8 @@
// -- - token prefix - --
evalTokenPrefix.evaluate(tuple, inputVal);
- int tokenPrefix = IntegerPointable.getInteger(inputVal.getByteArray(),
- inputVal.getStartOffset() + 1);
+ int tokenPrefix =
+ IntegerPointable.getInteger(inputVal.getByteArray(), inputVal.getStartOffset() + 1);
//
// -- - position filter - --
@@ -272,7 +251,7 @@
try {
doubleSerde.serialize(res, out);
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
result.set(resultStorage);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
index 8ad3255..851e1a4 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
@@ -21,11 +21,9 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.AMissing;
import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
@@ -33,6 +31,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
import org.apache.asterix.runtime.evaluators.common.ListAccessor;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
@@ -72,10 +71,10 @@
private final IPointable inputArgList = new VoidPointable();
private final IScalarEvaluator evalList = listEvalFactory.createScalarEvaluator(ctx);
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AMISSING);
+ private ISerializerDeserializer<ANull> nullSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ private ISerializerDeserializer<AMissing> missingSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
private final byte[] tempLengthArray = new byte[5];
@Override
@@ -93,49 +92,45 @@
ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG);
}
listAccessor.reset(listBytes, listOffset);
- try {
- // calculate length first
- int utf8Len = 0;
- for (int i = 0; i < listAccessor.size(); i++) {
- int itemOffset = listAccessor.getItemOffset(i);
- ATypeTag itemType = listAccessor.getItemType(itemOffset);
- // Increase the offset by 1 if the give list has heterogeneous elements,
- // since the item itself has a typetag.
- if (listAccessor.itemsAreSelfDescribing()) {
- itemOffset += 1;
- }
- if (itemType != ATypeTag.STRING) {
- if (itemType == ATypeTag.NULL) {
- nullSerde.serialize(ANull.NULL, out);
- result.set(resultStorage);
- return;
- }
- if (itemType == ATypeTag.MISSING) {
- missingSerde.serialize(AMissing.MISSING, out);
- result.set(resultStorage);
- return;
- }
- throw new UnsupportedItemTypeException(getIdentifier(), itemType.serialize());
- }
- utf8Len += UTF8StringUtil.getUTFLength(listBytes, itemOffset);
+ // calculate length first
+ int utf8Len = 0;
+ for (int i = 0; i < listAccessor.size(); i++) {
+ int itemOffset = listAccessor.getItemOffset(i);
+ ATypeTag itemType = listAccessor.getItemType(itemOffset);
+ // Increase the offset by 1 if the give list has heterogeneous elements,
+ // since the item itself has a typetag.
+ if (listAccessor.itemsAreSelfDescribing()) {
+ itemOffset += 1;
}
- out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
- int cbytes = UTF8StringUtil.encodeUTF8Length(utf8Len, tempLengthArray, 0);
- out.write(tempLengthArray, 0, cbytes);
- for (int i = 0; i < listAccessor.size(); i++) {
- int itemOffset = listAccessor.getItemOffset(i);
- if (listAccessor.itemsAreSelfDescribing()) {
- itemOffset += 1;
+ if (itemType != ATypeTag.STRING) {
+ if (itemType == ATypeTag.NULL) {
+ nullSerde.serialize(ANull.NULL, out);
+ result.set(resultStorage);
+ return;
}
- utf8Len = UTF8StringUtil.getUTFLength(listBytes, itemOffset);
- out.write(listBytes, UTF8StringUtil.getNumBytesToStoreLength(utf8Len) + itemOffset,
- utf8Len);
+ if (itemType == ATypeTag.MISSING) {
+ missingSerde.serialize(AMissing.MISSING, out);
+ result.set(resultStorage);
+ return;
+ }
+ throw new UnsupportedItemTypeException(getIdentifier(), itemType.serialize());
}
- } catch (AsterixException ex) {
- throw new HyracksDataException(ex);
+ utf8Len += UTF8StringUtil.getUTFLength(listBytes, itemOffset);
}
- } catch (IOException e1) {
- throw new HyracksDataException(e1);
+ out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+ int cbytes = UTF8StringUtil.encodeUTF8Length(utf8Len, tempLengthArray, 0);
+ out.write(tempLengthArray, 0, cbytes);
+ for (int i = 0; i < listAccessor.size(); i++) {
+ int itemOffset = listAccessor.getItemOffset(i);
+ if (listAccessor.itemsAreSelfDescribing()) {
+ itemOffset += 1;
+ }
+ utf8Len = UTF8StringUtil.getUTFLength(listBytes, itemOffset);
+ out.write(listBytes, UTF8StringUtil.getNumBytesToStoreLength(utf8Len) + itemOffset,
+ utf8Len);
+ }
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
}
result.set(resultStorage);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringJoinDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringJoinDescriptor.java
index 6cfdffc..70b3687 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringJoinDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringJoinDescriptor.java
@@ -21,7 +21,6 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.AMissing;
import org.apache.asterix.om.base.ANull;
@@ -74,10 +73,10 @@
private final IScalarEvaluator evalList = listEvalFactory.createScalarEvaluator(ctx);
private final IScalarEvaluator evalSep = sepEvalFactory.createScalarEvaluator(ctx);
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
- private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AMISSING);
+ private ISerializerDeserializer<ANull> nullSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+ private ISerializerDeserializer<AMissing> missingSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
private final byte[] tempLengthArray = new byte[5];
@Override
@@ -151,8 +150,8 @@
out.writeByte(sepBytes[sepOffset + 1 + sepMetaLen + j]);
}
}
- } catch (IOException | AsterixException ex) {
- throw new HyracksDataException(ex);
+ } catch (IOException ex) {
+ throw HyracksDataException.create(ex);
}
result.set(resultStorage);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
index b394062..907dfd3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
@@ -21,7 +21,6 @@
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.AMissing;
import org.apache.asterix.om.base.ANull;
@@ -72,11 +71,11 @@
private final ListAccessor listAccessor = new ListAccessor();
private final byte[] metaBuffer = new byte[5];
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
+ private ISerializerDeserializer<ANull> nullSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AMISSING);
+ private ISerializerDeserializer<AMissing> missingSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -118,9 +117,7 @@
itemOffset + ByteArrayPointable.getNumberBytesToStoreMeta(length), length);
}
} catch (IOException e) {
- throw new HyracksDataException(e);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
result.set(resultStorage);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
index 3321efc..d5918cb 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
@@ -21,9 +21,7 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -31,6 +29,7 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -90,15 +89,15 @@
int offset = inputArg0.getStartOffset();
if (serRecord[offset] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
- throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_BY_INDEX, 0,
- serRecord[offset], ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
+ throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_BY_INDEX, 0, serRecord[offset],
+ ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
}
eval1.evaluate(tuple, inputArg1);
byte[] indexBytes = inputArg1.getByteArray();
int indexOffset = inputArg1.getStartOffset();
if (indexBytes[indexOffset] != ATypeTag.SERIALIZED_INT32_TYPE_TAG) {
- throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_BY_INDEX, 1,
- indexBytes[offset], ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+ throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_BY_INDEX, 1, indexBytes[offset],
+ ATypeTag.SERIALIZED_INT32_TYPE_TAG);
}
fieldIndex = IntegerPointable.getInteger(indexBytes, indexOffset + 1);
fieldValueType = recordType.getFieldTypes()[fieldIndex];
@@ -137,9 +136,7 @@
out.write(serRecord, fieldValueOffset, fieldValueLength);
result.set(resultStorage);
} catch (IOException e) {
- throw new HyracksDataException(e);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
};
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
index 87fa292..2078921 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
@@ -21,13 +21,12 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -94,9 +93,7 @@
fieldValueTypeTag, true) + 1;
result.set(serRecord, fieldValueOffset, fieldValueLength);
} catch (IOException e) {
- throw new HyracksDataException(e);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
};
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
index 2da8e90..067a458 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.List;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.AMissing;
@@ -80,11 +79,11 @@
private final IPointable[] fieldPointables = new VoidPointable[fieldPath.size()];
private final RuntimeRecordTypeInfo[] recTypeInfos = new RuntimeRecordTypeInfo[fieldPath.size()];
@SuppressWarnings("unchecked")
- private final ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
+ private final ISerializerDeserializer<ANull> nullSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
@SuppressWarnings("unchecked")
- private final ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AMISSING);
+ private final ISerializerDeserializer<AMissing> missingSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
{
generateFieldsPointables();
@@ -100,8 +99,7 @@
ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
DataOutput out = storage.getDataOutput();
AString as = new AString(fieldPath.get(i));
- SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(as.getType()).serialize(as,
- out);
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(as.getType()).serialize(as, out);
fieldPointables[i] = new VoidPointable();
fieldPointables[i].set(storage);
}
@@ -118,8 +116,8 @@
int len = inputArg0.getLength();
if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
- throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_NESTED, 0,
- serRecord[start], ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
+ throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_NESTED, 0, serRecord[start],
+ ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
}
int subFieldIndex = -1;
@@ -141,8 +139,7 @@
subType = ((AUnionType) subType).getActualType();
byte serializedTypeTag = subType.getTypeTag().serialize();
if (serializedTypeTag != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
- throw new UnsupportedTypeException(
- BuiltinFunctions.FIELD_ACCESS_NESTED.getName(),
+ throw new UnsupportedTypeException(BuiltinFunctions.FIELD_ACCESS_NESTED.getName(),
serializedTypeTag);
}
if (subType.getTypeTag() == ATypeTag.OBJECT) {
@@ -198,8 +195,7 @@
// type check
if (pathIndex < fieldPointables.length - 1
&& serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
- throw new UnsupportedTypeException(BuiltinFunctions.FIELD_ACCESS_NESTED,
- serRecord[start]);
+ throw new UnsupportedTypeException(BuiltinFunctions.FIELD_ACCESS_NESTED, serRecord[start]);
}
}
@@ -215,8 +211,9 @@
}
subTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[subFieldOffset]);
- subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, subTypeTag,
- true) + 1;
+ subFieldLength =
+ NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, subTypeTag, true)
+ + 1;
if (pathIndex >= fieldPointables.length - 1) {
continue;
@@ -232,8 +229,8 @@
return;
}
if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
- throw new UnsupportedTypeException(
- BuiltinFunctions.FIELD_ACCESS_NESTED.getName(), serRecord[start]);
+ throw new UnsupportedTypeException(BuiltinFunctions.FIELD_ACCESS_NESTED.getName(),
+ serRecord[start]);
}
}
// emit the final result.
@@ -244,8 +241,8 @@
out.write(serRecord, subFieldOffset, subFieldLength);
result.set(resultStorage);
}
- } catch (IOException | AsterixException e) {
- throw new HyracksDataException(e);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
}
}
};
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
index c162e3d..95fb91a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
@@ -21,9 +21,7 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -31,6 +29,7 @@
import org.apache.asterix.om.types.runtime.RuntimeRecordTypeInfo;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -107,8 +106,8 @@
return;
}
ATypeTag fieldTypeTag = recordType.getFieldTypes()[subFieldIndex].getTypeTag();
- subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset,
- fieldTypeTag, false);
+ subFieldLength =
+ NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, fieldTypeTag, false);
// write result.
out.writeByte(fieldTypeTag.serialize());
out.write(serRecord, subFieldOffset, subFieldLength);
@@ -125,16 +124,15 @@
return;
}
// Get the field length.
- ATypeTag fieldValueTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(serRecord[subFieldOffset]);
- subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset,
- fieldValueTypeTag, true) + 1;
+ ATypeTag fieldValueTypeTag =
+ EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[subFieldOffset]);
+ subFieldLength =
+ NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, fieldValueTypeTag, true)
+ + 1;
// write result.
result.set(serRecord, subFieldOffset, subFieldLength);
} catch (IOException e) {
- throw new HyracksDataException(e);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
};
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 0a11684..1f41c82 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -199,7 +199,7 @@
try {
if (ctx.getSharedObject() != null) {
PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index);
- TaskUtil.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+ TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
}
missingTupleBuilder = new ArrayTupleBuilder(1);
DataOutput out = missingTupleBuilder.getDataOutput();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index 85e92c3..3957c06 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -20,8 +20,8 @@
import java.nio.ByteBuffer;
-import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ILockManager;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
index 377ad60..25306a6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
@@ -41,6 +41,9 @@
public void open() throws HyracksDataException {
try {
writer.open();
+ } catch (Throwable e) {
+ writer.fail();
+ throw e;
} finally {
writer.close();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
index 8ee10dc..3409e61 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
@@ -21,13 +21,12 @@
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.runtime.evaluators.common.ListAccessor;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
import org.apache.asterix.runtime.unnestingfunctions.base.AbstractUnnestingFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
@@ -112,9 +111,7 @@
}
}
} catch (IOException e) {
- throw new HyracksDataException(e);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
return false;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
index 84a59ac..47fbc4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
@@ -20,7 +20,6 @@
import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
@@ -80,8 +79,7 @@
try {
evalStart.evaluate(tuple, inputVal);
posStart = ATypeHierarchy.getIntegerValue(getIdentifier().getName(), 0,
- inputVal.getByteArray(),
- inputVal.getStartOffset());
+ inputVal.getByteArray(), inputVal.getStartOffset());
evalLen.evaluate(tuple, inputVal);
numItems = ATypeHierarchy.getIntegerValue(getIdentifier().getName(), 1,
@@ -128,8 +126,8 @@
int offset = inputVal.getStartOffset();
int itemLength = 0;
try {
- int itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, offset,
- posCrt);
+ int itemOffset =
+ AOrderedListSerializerDeserializer.getItemOffset(serList, offset, posCrt);
if (selfDescList) {
itemTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serList[itemOffset]);
}
@@ -141,9 +139,7 @@
resultStorage.getDataOutput().write(serList, itemOffset,
itemLength + (!selfDescList ? 0 : 1));
} catch (IOException e) {
- throw new HyracksDataException(e);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
result.set(resultStorage);
++posCrt;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index 71acecf..d0b7b47 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -21,28 +21,20 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
public class SinkPOperator extends AbstractPhysicalOperator {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java
new file mode 100644
index 0000000..26002e6
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.base;
+
+import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class EnforcePushRuntime extends EnforceFrameWriter implements IPushRuntime {
+
+ private final IPushRuntime pushRuntime;
+
+ private EnforcePushRuntime(IPushRuntime pushRuntime) {
+ super(pushRuntime);
+ this.pushRuntime = pushRuntime;
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ pushRuntime.setOutputFrameWriter(index, writer, recordDesc);
+ }
+
+ @Override
+ public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ pushRuntime.setInputRecordDescriptor(index, recordDescriptor);
+ }
+
+ public static IPushRuntime enforce(IPushRuntime pushRuntime) {
+ return pushRuntime instanceof EnforcePushRuntime || pushRuntime instanceof NestedTupleSourceRuntime
+ ? pushRuntime : new EnforcePushRuntime(pushRuntime);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
index 27d6900..d33927b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
@@ -22,7 +22,26 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
public interface IPushRuntime extends IFrameWriter {
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
- public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
+ /**
+ * Sets the output frame writer for this writer
+ *
+ * @param index,
+ * the index of the output channel.
+ * @param writer,
+ * the writer for writing output.
+ * @param recordDesc,
+ * the output record descriptor.
+ */
+ void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+ /**
+ * Sets the input record descriptor for this writer.
+ *
+ * @param index,
+ * the index of the input channel.
+ * @param recordDescriptor,
+ * the corresponding input record descriptor.
+ */
+ void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index 42e5157..e99b61b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -124,6 +124,7 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
if (isOpen) {
writer.fail();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index b397f23..893aa61 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
@@ -29,6 +30,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
@@ -52,7 +54,6 @@
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
-
final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length);
final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
@@ -91,8 +92,8 @@
}
@Override
- public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex,
- AggregateState state) throws HyracksDataException {
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
+ int tIndex, AggregateState state) throws HyracksDataException {
for (int i = 0; i < pipelines.length; i++) {
outputWriter.setInputIdx(i);
pipelines[i].close();
@@ -144,9 +145,13 @@
IFrameWriter start = writer;
IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+ // should enforce protocol
+ boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
- newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+ newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+ start = enforce ? EnforcePushRuntime.enforce(start) : start;
+ newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
if (i > 0) {
newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
} else {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 52245e1..8b8e320 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
@@ -28,8 +29,10 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -58,11 +61,14 @@
public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
final IFrameWriter writer) throws HyracksDataException {
- final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
- decorFieldIdx.length, writer);
+ final RunningAggregatorOutput outputWriter =
+ new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, decorFieldIdx.length, writer);
+ // should enforce protocol
+ boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+ IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter;
final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
- pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+ pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], enforcedWriter, ctx);
}
final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
@@ -136,13 +142,17 @@
private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
throws HyracksDataException {
+ // should enforce protocol
+ boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
// plug the operators
IFrameWriter start = writer;
IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
- newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+ newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+ start = enforce ? EnforceFrameWriter.enforce(start) : start;
+ newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
if (i > 0) {
newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
} else {
@@ -206,8 +216,9 @@
int start = 0;
int offset = 0;
for (int i = 0; i < fieldEnds.length; i++) {
- if (i > 0)
+ if (i > 0) {
start = fieldEnds[i - 1];
+ }
offset = fieldEnds[i] - start;
tb.addField(data, start, offset);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index 33e7c73..5cced8d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -29,7 +29,7 @@
protected boolean failed;
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
this.writer = writer;
this.outputRecordDesc = recordDesc;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
index e430461..d47199d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
@@ -26,7 +26,7 @@
protected RecordDescriptor inputRecordDesc;
@Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalStateException();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
index b7707d4..35563e0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -27,20 +27,29 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ // nextFrame will never be called on this runtime
throw new UnsupportedOperationException();
}
@Override
public void close() throws HyracksDataException {
+ // close is a no op since this operator completes operating in open()
}
@Override
public void fail() throws HyracksDataException {
- writer.fail();
+ // fail is a no op since if a failure happened, the operator would've already called fail() on downstream
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ // flush will never be called on this runtime
+ throw new UnsupportedOperationException();
}
@Override
public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ // setInputRecordDescriptor will never be called on this runtime since it has no input
throw new UnsupportedOperationException();
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 4678887..d3c9951 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -81,6 +81,7 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
pgw.fail();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 1294614..f6ebf19 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -112,6 +112,7 @@
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private IFrameWriter startOfPipeline;
+ private boolean opened = false;
@Override
public void open() throws HyracksDataException {
@@ -124,6 +125,7 @@
pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
startOfPipeline = pa.assemblePipeline(writer, ctx);
}
+ opened = true;
startOfPipeline.open();
}
@@ -134,12 +136,16 @@
@Override
public void close() throws HyracksDataException {
- startOfPipeline.close();
+ if (opened) {
+ startOfPipeline.close();
+ }
}
@Override
public void fail() throws HyracksDataException {
- startOfPipeline.fail();
+ if (opened) {
+ startOfPipeline.fail();
+ }
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 03e2aaf..e1081e0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -19,11 +19,14 @@
package org.apache.hyracks.algebricks.runtime.operators.meta;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
public class PipelineAssembler {
@@ -44,18 +47,21 @@
this.outputArity = outputArity;
}
- public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws
- HyracksDataException {
+ public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
+ // should enforce protocol
+ boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
// plug the operators
IFrameWriter start = writer;// this.writer;
for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
+ newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+ start = enforce ? EnforceFrameWriter.enforce(start) : start;
if (i == pipeline.getRuntimeFactories().length - 1) {
if (outputArity == 1) {
- newRuntime.setFrameWriter(0, start, pipelineOutputRecordDescriptor);
+ newRuntime.setOutputFrameWriter(0, start, pipelineOutputRecordDescriptor);
}
} else {
- newRuntime.setFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
+ newRuntime.setOutputFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
}
if (i > 0) {
newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 3e30f73..925ff93 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -79,11 +79,6 @@
}
@Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
public void close() throws HyracksDataException {
try {
frameSorter.sort();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index e123adf..496679f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -65,11 +65,6 @@
}
@Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
public void flush() throws HyracksDataException {
writer.flush();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 38fe7d1..b3e2b64 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -117,8 +117,9 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
if (isOpen) {
- super.fail();
+ writer.fail();
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index ebf3d3a..55146e2 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -81,6 +81,7 @@
@Override
public void fail() throws HyracksDataException {
+ // fail() is a no op. in close we will cleanup
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 0f57fd7..d286c3b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -108,8 +108,9 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
if (isOpen) {
- super.fail();
+ writer.fail();
}
}
@@ -118,6 +119,9 @@
if (isOpen) {
try {
flushIfNotFailed();
+ } catch (Throwable th) {
+ writer.fail();
+ throw th;
} finally {
writer.close();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index aa9232e..e2868ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -177,7 +177,7 @@
}
public StartJobFunction(JobId jobId) {
- this(null, null, null, jobId);
+ this(null, null, EnumSet.noneOf(JobFlag.class), jobId);
}
public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 4b3aff2..4f37b90 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -102,14 +102,14 @@
@Override
public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
- jobSpec);
+ IActivityClusterGraphGeneratorFactory jsacggf =
+ new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
return startJob(jsacggf, jobFlags);
}
@Override
public JobId distributeJob(JobSpecification jobSpec) throws Exception {
- JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
+ IActivityClusterGraphGeneratorFactory jsacggf =
new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
return distributeJob(jsacggf);
}
@@ -206,15 +206,14 @@
@Override
public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
throws Exception {
- JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
- jobSpec);
+ IActivityClusterGraphGeneratorFactory jsacggf =
+ new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
return startJob(deploymentId, jsacggf, jobFlags);
}
@Override
public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
- EnumSet<JobFlag> jobFlags)
- throws Exception {
+ EnumSet<JobFlag> jobFlags) throws Exception {
return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index c8e4cf8..df693b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.api.context;
import java.io.Serializable;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -26,6 +27,7 @@
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.IOperatorEnvironment;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatableRegistry;
@@ -48,4 +50,6 @@
void setSharedObject(Object object);
Object getSharedObject();
+
+ Set<JobFlag> getJobFlags();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
new file mode 100644
index 0000000..7dd50f1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class EnforceFrameWriter implements IFrameWriter {
+
+ private static final Logger LOGGER = Logger.getLogger(EnforceFrameWriter.class.getName());
+ private final IFrameWriter writer;
+ private boolean failed = false;
+ private Throwable failure = null;
+
+ protected EnforceFrameWriter(IFrameWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public final void open() throws HyracksDataException {
+ try {
+ failed = true;
+ writer.open();
+ failed = false;
+ } catch (Throwable th) {
+ failure = th;
+ throw th;
+ }
+ }
+
+ @Override
+ public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (failed) {
+ HyracksDataException hde = HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_FAILED_WRITER);
+ hde.addSuppressed(failure);
+ LOGGER.log(Level.WARNING, "nextFrame called on a failed writer", hde);
+ throw hde;
+ }
+ try {
+ failed = true;
+ writer.nextFrame(buffer);
+ failed = false;
+ } catch (Throwable th) {
+ failure = th;
+ throw th;
+ }
+ }
+
+ @Override
+ public final void flush() throws HyracksDataException {
+ if (failed) {
+ HyracksDataException hde = HyracksDataException.create(ErrorCode.FLUSH_ON_FAILED_WRITER);
+ hde.addSuppressed(failure);
+ LOGGER.log(Level.WARNING, "flush called on a failed writer", hde);
+ throw hde;
+ }
+ try {
+ failed = true;
+ writer.flush();
+ failed = false;
+ } catch (Throwable th) {
+ failure = th;
+ throw th;
+ }
+ }
+
+ @Override
+ public final void fail() throws HyracksDataException {
+ failed = true;
+ if (failure == null) {
+ failure = new Exception("Failed signal from upstream");
+ }
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+
+ public static IFrameWriter enforce(IFrameWriter writer) {
+ return writer instanceof EnforceFrameWriter ? writer : new EnforceFrameWriter(writer);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
index 9148d6b..fc46b9d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -20,12 +20,12 @@
import java.io.Serializable;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.constraints.IConstraintAcceptor;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
/**
* Descriptor for operators in Hyracks.
*
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
index f6c201e..82433e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
@@ -23,16 +23,15 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IOperatorNodePushable {
- public void initialize() throws HyracksDataException;
+ void initialize() throws HyracksDataException;
- public void deinitialize() throws HyracksDataException;
+ void deinitialize() throws HyracksDataException;
- public int getInputArity();
+ int getInputArity();
- public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
- throws HyracksDataException;
+ void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) throws HyracksDataException;
- public IFrameWriter getInputFrameWriter(int index);
+ IFrameWriter getInputFrameWriter(int index);
- public String getDisplayName();
+ String getDisplayName();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 3c70dba..92fe096 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -91,6 +91,8 @@
public static final int RESOURCE_DOES_NOT_EXIST = 55;
public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56;
public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57;
+ public static final int NEXT_FRAME_ON_FAILED_WRITER = 58;
+ public static final int FLUSH_ON_FAILED_WRITER = 59;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index a33c6c9a..7225cd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -19,5 +19,6 @@
package org.apache.hyracks.api.job;
public enum JobFlag {
- PROFILE_RUNTIME
+ PROFILE_RUNTIME,
+ ENFORCE_CONTRACT
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/ActivityClusterGraphRewriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
index 38e7fbe..21fb985 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
@@ -29,7 +29,6 @@
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.IActivity;
@@ -50,7 +49,8 @@
* @author yingyib
*/
public class ActivityClusterGraphRewriter {
- private static String ONE_TO_ONE_CONNECTOR = "org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor";
+ private static String ONE_TO_ONE_CONNECTOR =
+ "org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor";
/**
* rewrite an activity cluster graph to eliminate
@@ -82,22 +82,22 @@
private void rewriteInterActivityCluster(ActivityCluster ac,
Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = ac.getBlocked2BlockerMap();
- Map<ActivityId, ActivityId> invertedAid2SuperAidMap = new HashMap<ActivityId, ActivityId>();
+ Map<ActivityId, ActivityId> invertedAid2SuperAidMap = new HashMap<>();
for (Entry<IActivity, SuperActivity> entry : invertedActivitySuperActivityMap.entrySet()) {
invertedAid2SuperAidMap.put(entry.getKey().getActivityId(), entry.getValue().getActivityId());
}
- Map<ActivityId, Set<ActivityId>> replacedBlocked2BlockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+ Map<ActivityId, Set<ActivityId>> replacedBlocked2BlockerMap = new HashMap<>();
for (Entry<ActivityId, Set<ActivityId>> entry : blocked2BlockerMap.entrySet()) {
ActivityId blocked = entry.getKey();
ActivityId replacedBlocked = invertedAid2SuperAidMap.get(blocked);
Set<ActivityId> blockers = entry.getValue();
Set<ActivityId> replacedBlockers = null;
if (blockers != null) {
- replacedBlockers = new HashSet<ActivityId>();
+ replacedBlockers = new HashSet<>();
for (ActivityId blocker : blockers) {
replacedBlockers.add(invertedAid2SuperAidMap.get(blocker));
- ActivityCluster dependingAc = ac.getActivityClusterGraph().getActivityMap()
- .get(invertedAid2SuperAidMap.get(blocker));
+ ActivityCluster dependingAc =
+ ac.getActivityClusterGraph().getActivityMap().get(invertedAid2SuperAidMap.get(blocker));
if (!ac.getDependencies().contains(dependingAc)) {
ac.getDependencies().add(dependingAc);
}
@@ -128,12 +128,12 @@
Map<ActivityId, IActivity> activities = ac.getActivityMap();
Map<ActivityId, List<IConnectorDescriptor>> activityInputMap = ac.getActivityInputMap();
Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap = ac.getActivityOutputMap();
- Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap = ac
- .getConnectorActivityMap();
+ Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap =
+ ac.getConnectorActivityMap();
ActivityClusterGraph acg = ac.getActivityClusterGraph();
- Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
+ Map<ActivityId, IActivity> startActivities = new HashMap<>();
Map<ActivityId, SuperActivity> superActivities = new HashMap<ActivityId, SuperActivity>();
- Map<ActivityId, Queue<IActivity>> toBeExpendedMap = new HashMap<ActivityId, Queue<IActivity>>();
+ Map<ActivityId, Queue<IActivity>> toBeExpendedMap = new HashMap<>();
/**
* Build the initial super activities
@@ -185,8 +185,8 @@
List<IConnectorDescriptor> outputConnectors = activityOutputMap.get(expendingActivity.getActivityId());
if (outputConnectors != null) {
for (IConnectorDescriptor outputConn : outputConnectors) {
- Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints = connectorActivityMap
- .get(outputConn.getConnectorId());
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints =
+ connectorActivityMap.get(outputConn.getConnectorId());
IActivity newActivity = endPoints.getRight().getLeft();
SuperActivity existingSuperActivity = invertedActivitySuperActivityMap.get(newActivity);
if (outputConn.getClass().getName().contains(ONE_TO_ONE_CONNECTOR)) {
@@ -343,7 +343,7 @@
SuperActivity superActivity = new SuperActivity(acg.getActivityClusterGraph(), acg.getId(), activityId);
superActivities.put(activityId, superActivity);
superActivity.addActivity(activity);
- Queue<IActivity> toBeExpended = new LinkedList<IActivity>();
+ Queue<IActivity> toBeExpended = new LinkedList<>();
toBeExpended.add(activity);
toBeExpendedMap.put(activityId, toBeExpended);
invertedActivitySuperActivityMap.put(activity, superActivity);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
index 28e098f..476a744 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -25,7 +25,6 @@
import java.util.Map.Entry;
import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivity;
@@ -58,7 +57,7 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
- final Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
+ final Map<ActivityId, IActivity> startActivities = new HashMap<>();
Map<ActivityId, IActivity> activities = getActivityMap();
for (Entry<ActivityId, IActivity> entry : activities.entrySet()) {
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 314bf8b..7fdf106 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -35,12 +35,14 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
/**
* The runtime of a SuperActivity, which internally executes a DAG of one-to-one
@@ -90,18 +92,18 @@
private void init() throws HyracksDataException {
Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
List<IConnectorDescriptor> outputConnectors;
-
+ final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
/*
* Set up the source operators
*/
for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
- IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
- nPartitions);
+ IOperatorNodePushable opPushable =
+ entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
operatorNodePushablesBFSOrder.add(opPushable);
operatorNodePushables.put(entry.getKey(), opPushable);
inputArity += opPushable.getInputArity();
- outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(),
- Collections.emptyList());
+ outputConnectors =
+ MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), Collections.emptyList());
for (IConnectorDescriptor conn : outputConnectors) {
childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
}
@@ -131,7 +133,9 @@
/*
* construct the dataflow connection from a producer to a consumer
*/
- sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel),
+ IFrameWriter writer = destOp.getInputFrameWriter(inputChannel);
+ writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
+ sourceOp.setOutputFrameWriter(outputChannel, writer,
recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
/*
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 604b534..8add407 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -76,4 +76,7 @@
55 = Resource does not exist for %1$s
56 = LSM disk component scan is not allowed for a secondary index
57 = Couldn't find the matter tuple for anti-matter tuple in the primary index
+58 = nextFrame called on a failed writer
+59 = flush called on a failed writer
+
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java
index 57b8c50..3f46316 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java
@@ -48,9 +48,9 @@
import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
import org.apache.hyracks.control.cc.job.ActivityPlan;
import org.apache.hyracks.control.cc.job.JobRun;
-import org.apache.hyracks.control.cc.job.Task;
import org.apache.hyracks.control.cc.job.TaskCluster;
import org.apache.hyracks.control.cc.job.TaskClusterId;
+import org.apache.hyracks.control.cc.job.Task;
class ActivityClusterPlanner {
private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
@@ -102,12 +102,10 @@
ActivityCluster dAC = ac.getActivityClusterGraph().getActivityMap().get(danId);
ActivityClusterPlan dACP = jobRun.getActivityClusterPlanMap().get(dAC.getId());
assert dACP != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for "
- + "dependency AC: Encountered no plan for ActivityID "
- + danId;
+ + "dependency AC: Encountered no plan for ActivityID " + danId;
Task[] dATasks = dACP.getActivityPlanMap().get(danId).getTasks();
assert dATasks != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for"
- + " dependency AC: Encountered no plan for ActivityID "
- + danId;
+ + " dependency AC: Encountered no plan for ActivityID " + danId;
assert dATasks.length == tasks.length : "Dependency activity partitioned differently from "
+ "dependent: " + dATasks.length + " != " + tasks.length;
Task dTask = dATasks[i];
@@ -125,8 +123,8 @@
private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
Map<ActivityId, ActivityPlan> activityPlanMap) {
Set<ActivityId> activities = ac.getActivityMap().keySet();
- Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
- activityPlanMap, activities);
+ Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity =
+ computeTaskConnectivity(jobRun, activityPlanMap, activities);
TaskCluster[] taskClusters = ac.getActivityClusterGraph().isUseConnectorPolicyForScheduling()
? buildConnectorPolicyAwareTaskClusters(ac, activityPlanMap, taskConnectivity)
@@ -139,13 +137,13 @@
List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
if (cInfoList != null) {
for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
- Task targetTS = activityPlanMap.get(p.getLeft().getActivityId()).getTasks()[p.getLeft()
- .getPartition()];
+ Task targetTS =
+ activityPlanMap.get(p.getLeft().getActivityId()).getTasks()[p.getLeft().getPartition()];
TaskCluster targetTC = targetTS.getTaskCluster();
if (targetTC != tc) {
ConnectorDescriptorId cdId = p.getRight();
- PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(), p.getLeft()
- .getPartition());
+ PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(),
+ p.getLeft().getPartition());
tc.getProducedPartitions().add(pid);
targetTC.getRequiredPartitions().add(pid);
partitionProducingTaskClusterMap.put(pid, tc);
@@ -170,8 +168,8 @@
Task[] tasks = ap.getTasks();
taskStates.addAll(Arrays.asList(tasks));
}
- TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), 0), ac, taskStates.toArray(new Task[taskStates
- .size()]));
+ TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), 0), ac,
+ taskStates.toArray(new Task[taskStates.size()]));
for (Task t : tc.getTasks()) {
t.setTaskCluster(tc);
}
@@ -209,8 +207,8 @@
}
for (int i = 0; i < nProducers; ++i) {
c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity
- .get(ac1TaskStates[i].getTaskId());
+ List<Pair<TaskId, ConnectorDescriptorId>> cInfoList =
+ taskConnectivity.get(ac1TaskStates[i].getTaskId());
if (cInfoList == null) {
cInfoList = new ArrayList<>();
taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
@@ -358,9 +356,9 @@
int[] fanouts = new int[nProducers];
if (c.allProducersToAllConsumers()) {
- for (int i = 0; i < nProducers; ++i) {
- fanouts[i] = nConsumers;
- }
+ for (int i = 0; i < nProducers; ++i) {
+ fanouts[i] = nConsumers;
+ }
} else {
for (int i = 0; i < nProducers; ++i) {
c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
@@ -402,8 +400,8 @@
throw new HyracksException("No value found for " + lv);
}
if (!(value instanceof Number)) {
- throw new HyracksException("Unexpected type of value bound to " + lv + ": " + value.getClass() + "("
- + value + ")");
+ throw new HyracksException(
+ "Unexpected type of value bound to " + lv + ": " + value.getClass() + "(" + value + ")");
}
int nParts = ((Number) value).intValue();
if (nParts <= 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 55a7a82..95a6d9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,7 +20,6 @@
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -116,10 +115,10 @@
}
//Run a Pre-distributed job by passing the JobId
- public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
+ public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags,
PreDistributedJobDescriptor distributedJobDescriptor)
throws HyracksException {
- this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class),
+ this(deploymentId, jobId, jobFlags,
distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
this.scheduler = new JobExecutor(ccs, this, constaints, true);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index 2dbb631..e083d2a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -68,7 +68,7 @@
run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
} else {
//ActivityClusterGraph has already been distributed
- run = new JobRun(ccs, deploymentId, jobId,
+ run = new JobRun(ccs, deploymentId, jobId, jobFlags,
ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
}
jobManager.add(run);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index f83df3a..cbc6146 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.common.controllers;
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
@@ -64,7 +65,8 @@
CLUSTER_TOPOLOGY(STRING),
JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
JOB_QUEUE_CAPACITY(INTEGER, 4096),
- JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager");
+ JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
+ ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false);
private final IOptionType parser;
private Object defaultValue;
@@ -156,6 +158,9 @@
return "The maximum number of jobs to queue before rejecting new jobs";
case JOB_MANAGER_CLASS:
return "Specify the implementation class name for the job manager";
+ case ENFORCE_FRAME_WRITER_PROTOCOL:
+ return "A flag indicating if runtime should enforce frame writer protocol and detect "
+ + "bad behaving operators";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -357,4 +362,12 @@
public int getJobQueueCapacity() {
return getAppConfig().getInt(Option.JOB_QUEUE_CAPACITY);
}
+
+ public boolean getEnforceFrameWriterProtocol() {
+ return getAppConfig().getBoolean(Option.ENFORCE_FRAME_WRITER_PROTOCOL);
+ }
+
+ public void setEnforceFrameWriterProtocol(boolean enforce) {
+ configManager.set(Option.ENFORCE_FRAME_WRITER_PROTOCOL, enforce);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 86cc19e..caa3210 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -65,8 +65,10 @@
MESSAGING_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
MESSAGING_PUBLIC_PORT(INTEGER, MESSAGING_LISTEN_PORT),
CLUSTER_CONNECT_RETRIES(INTEGER, 5),
- IODEVICES(STRING_ARRAY, appConfig -> new String[] {
- FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
+ IODEVICES(
+ STRING_ARRAY,
+ appConfig -> new String[] {
+ FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
"<value of " + ControllerConfig.Option.DEFAULT_DIR.cmdline() + ">/iodevice"),
NET_THREAD_COUNT(INTEGER, 1),
NET_BUFFER_COUNT(INTEGER, 1),
@@ -97,7 +99,7 @@
}
<T> Option(IOptionType<T> parser, Function<IApplicationConfig, T> defaultValue,
- String defaultValueDescription) {
+ String defaultValueDescription) {
this.parser = parser;
this.defaultValue = defaultValue;
this.defaultValueDescription = defaultValueDescription;
@@ -252,6 +254,7 @@
return configManager;
}
+ @Override
public IApplicationConfig getAppConfig() {
return appConfig;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 7224b49..2ab9d9d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -48,6 +48,7 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.IOperatorEnvironment;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.partitions.PartitionId;
@@ -98,9 +99,13 @@
private Object sharedObject;
- public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
- NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
+ private final Set<JobFlag> jobFlags;
+
+ public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName,
+ ExecutorService executor, NodeControllerService ncs,
+ List<List<PartitionChannel>> inputChannelsFromConnectors) {
this.joblet = joblet;
+ this.jobFlags = jobFlags;
this.taskAttemptId = taskId;
this.displayName = displayName;
this.executorService = executor;
@@ -411,4 +416,9 @@
public Object getSharedObject() {
return sharedObject;
}
+
+ @Override
+ public Set<JobFlag> getJobFlags() {
+ return jobFlags;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 02e8051..95f3e83 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -131,12 +132,10 @@
}
final int partition = tid.getPartition();
List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
- task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
+ task = new Task(joblet, flags, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
createInputChannels(td, inputs));
IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
-
List<IPartitionCollector> collectors = new ArrayList<>();
-
if (inputs != null) {
for (int i = 0; i < inputs.size(); ++i) {
IConnectorDescriptor conn = inputs.get(i);
@@ -145,26 +144,28 @@
LOGGER.info("input: " + i + ": " + conn.getConnectorId());
}
RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
- IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
- recordDesc, cPolicy);
+ IPartitionCollector collector =
+ createPartitionCollector(td, partition, task, i, conn, recordDesc, cPolicy);
collectors.add(collector);
}
}
List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid);
if (outputs != null) {
+ final boolean enforce = flags.contains(JobFlag.ENFORCE_CONTRACT);
for (int i = 0; i < outputs.size(); ++i) {
final IConnectorDescriptor conn = outputs.get(i);
RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
- IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
- partition, taId, flags);
+ IPartitionWriterFactory pwFactory =
+ createPartitionWriterFactory(task, cPolicy, jobId, conn, partition, taId, flags);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("output: " + i + ": " + conn.getConnectorId());
}
- IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td
- .getPartitionCount(), td.getOutputPartitionCounts()[i]);
+ IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
+ td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+ writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
operator.setOutputFrameWriter(i, writer, recordDesc);
}
}
@@ -203,11 +204,11 @@
private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
throws HyracksDataException {
- IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td
- .getInputPartitionCounts()[i], td.getPartitionCount());
+ IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+ td.getInputPartitionCounts()[i], td.getPartitionCount());
if (cPolicy.materializeOnReceiveSide()) {
- return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
- .getTaskAttemptId(), ncs.getExecutor());
+ return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
+ task.getTaskAttemptId(), ncs.getExecutor());
} else {
return collector;
}
@@ -222,8 +223,9 @@
factory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
- conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+ return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(),
+ new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+ ncs.getExecutor());
}
};
} else {
@@ -231,9 +233,9 @@
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
- jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
- .getExecutor());
+ return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(),
+ new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+ ncs.getExecutor());
}
};
}
@@ -241,8 +243,8 @@
factory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn
- .getConnectorId(), senderIndex, receiverIndex), taId);
+ return new PipelinedPartition(ctx, ncs.getPartitionManager(),
+ new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId);
}
};
}
@@ -272,11 +274,14 @@
if (inputAddresses[i] != null) {
for (int j = 0; j < inputAddresses[i].length; j++) {
NetworkAddress networkAddress = inputAddresses[i][j];
- PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
- .getTaskAttemptId().getTaskId().getPartition());
- PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs
- .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
- .lookupIpAddress()), networkAddress.getPort()), pid, 5));
+ PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j,
+ td.getTaskAttemptId().getTaskId().getPartition());
+ PartitionChannel channel = new PartitionChannel(pid,
+ new NetworkInputChannel(ncs.getNetworkManager(),
+ new InetSocketAddress(
+ InetAddress.getByAddress(networkAddress.lookupIpAddress()),
+ networkAddress.getPort()),
+ pid, 5));
channels.add(channel);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
index 4a2021f..8472479 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
@@ -58,7 +58,7 @@
* @param ctx
* @param object
*/
- public static void putInSharedMap(String key, Object object, IHyracksTaskContext ctx) {
+ public static void put(String key, Object object, IHyracksTaskContext ctx) {
TaskUtil.getSharedMap(ctx, true).put(key, object);
}
@@ -74,4 +74,18 @@
Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, false);
return sharedMap == null ? null : (T) sharedMap.get(key);
}
+
+ /**
+ * get a <T> object from the shared map of the task, return the default if not found
+ *
+ * @param key
+ * @param ctx
+ * @param defaultValue
+ *
+ * @return the value associated with the key if found, defaultValue otherwise
+ */
+ public static <T> T get(String key, IHyracksTaskContext ctx, T defaultValue) {
+ T value = get(key, ctx);
+ return value == null ? defaultValue : value;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index a83273a..e338961 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -18,8 +18,6 @@
*/
package org.apache.hyracks.dataflow.std.base;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.constraints.IConstraintAcceptor;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -27,6 +25,9 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public abstract class AbstractOperatorDescriptor implements IOperatorDescriptor {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 5de272a..8b9fe00 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -110,8 +110,7 @@
private int[] probePSizeInTups;
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
- String probeRelName,
- String buildRelName, int[] probeKeys, int[] buildKeys, IBinaryComparator[] comparators,
+ String probeRelName, String buildRelName, int[] probeKeys, int[] buildKeys, IBinaryComparator[] comparators,
RecordDescriptor probeRd, RecordDescriptor buildRd, ITuplePartitionComputer probeHpc,
ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval, boolean isLeftOuter,
IMissingWriterFactory[] nullWriterFactories1) {
@@ -223,7 +222,8 @@
return writer;
}
- public void closeBuild() throws HyracksDataException {
+ public void closeBuild(boolean isFailed) throws HyracksDataException {
+ // TODO: Should use the isFailed flag to simply cleanup in case of failure
// Flushes the remaining chunks of the all spilled partitions to the disk.
closeAllSpilledPartitions(SIDE.BUILD);
@@ -259,8 +259,8 @@
break;
}
try {
- for (int pid = spilledStatus.nextSetBit(0); pid >= 0
- && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
+ for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
+ spilledStatus.nextSetBit(pid + 1)) {
if (bufferManager.getNumTuples(pid) > 0) {
bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
bufferManager.clearPartition(pid);
@@ -293,16 +293,15 @@
// For partitions in main memory, we deduct their size from the free space.
int inMemTupCount = 0;
- for (int p = spilledStatus.nextClearBit(0); p >= 0
- && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
+ for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
+ spilledStatus.nextClearBit(p + 1)) {
freeSpace -= bufferManager.getPhysicalSize(p);
inMemTupCount += buildPSizeInTups[p];
}
// Calculates the expected hash table size for the given number of tuples in main memory
// and deducts it from the free space.
- long hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
- frameSize);
+ long hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
freeSpace -= hashTableByteSizeForInMemTuples;
// In the case where free space is less than zero after considering the hash table size,
@@ -317,8 +316,9 @@
int pidToSpill = selectSinglePartitionToSpill(freeSpace, inMemTupCount, frameSize);
if (pidToSpill >= 0) {
// There is a suitable one. We spill that partition to the disk.
- long hashTableSizeDecrease = -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
- inMemTupCount, -buildPSizeInTups[pidToSpill], frameSize);
+ long hashTableSizeDecrease =
+ -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
+ -buildPSizeInTups[pidToSpill], frameSize);
freeSpace = freeSpace + bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
inMemTupCount -= buildPSizeInTups[pidToSpill];
spillPartition(pidToSpill);
@@ -327,8 +327,8 @@
} else {
// There is no single suitable partition. So, we need to spill multiple partitions to the disk
// in order to accommodate the hash table.
- for (int p = spilledStatus.nextClearBit(0); p >= 0
- && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
+ for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
+ spilledStatus.nextClearBit(p + 1)) {
int spaceToBeReturned = bufferManager.getPhysicalSize(p);
int numberOfTuplesToBeSpilled = buildPSizeInTups[p];
if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled == 0) {
@@ -340,9 +340,9 @@
// Since the number of tuples in memory has been decreased,
// the hash table size will be decreased, too.
// We put minus since the method returns a negative value to represent a newly reclaimed space.
- long expectedHashTableSizeDecrease = -SerializableHashTable
- .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, -numberOfTuplesToBeSpilled,
- frameSize);
+ long expectedHashTableSizeDecrease =
+ -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
+ -numberOfTuplesToBeSpilled, frameSize);
freeSpace = freeSpace + spaceToBeReturned + expectedHashTableSizeDecrease;
// Adjusts the hash table size
inMemTupCount -= numberOfTuplesToBeSpilled;
@@ -356,8 +356,7 @@
// If more partitions have been spilled to the disk, calculate the expected hash table size again
// before bringing some partitions to main memory.
if (moreSpilled) {
- hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
- frameSize);
+ hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
}
// Brings back some partitions if there is enough free space.
@@ -387,8 +386,8 @@
long minSpaceAfterSpill = (long) memSizeInFrames * frameSize;
int minSpaceAfterSpillPartID = -1;
- for (int p = spilledStatus.nextClearBit(0); p >= 0
- && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
+ for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
+ spilledStatus.nextClearBit(p + 1)) {
if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p) == 0) {
continue;
}
@@ -408,8 +407,8 @@
}
private int selectPartitionsToReload(long freeSpace, int pid, int inMemTupCount) {
- for (int i = spilledStatus.nextSetBit(pid); i >= 0
- && i < numOfPartitions; i = spilledStatus.nextSetBit(i + 1)) {
+ for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i < numOfPartitions; i =
+ spilledStatus.nextSetBit(i + 1)) {
int spilledTupleCount = buildPSizeInTups[i];
// Expected hash table size increase after reloading this partition
long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
@@ -632,8 +631,8 @@
buf.append("(A) Spilled partitions" + "\n");
int spilledTupleCount = 0;
int spilledPartByteSize = 0;
- for (int pid = spilledStatus.nextSetBit(0); pid >= 0
- && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
+ for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
+ spilledStatus.nextSetBit(pid + 1)) {
if (whichSide == SIDE.BUILD) {
spilledTupleCount += buildPSizeInTups[pid];
spilledPartByteSize += buildRFWriters[pid].getFileSize();
@@ -653,8 +652,8 @@
buf.append("(B) In-memory partitions" + "\n");
int inMemoryTupleCount = 0;
int inMemoryPartByteSize = 0;
- for (int pid = spilledStatus.nextClearBit(0); pid >= 0
- && pid < numOfPartitions; pid = spilledStatus.nextClearBit(pid + 1)) {
+ for (int pid = spilledStatus.nextClearBit(0); pid >= 0 && pid < numOfPartitions; pid =
+ spilledStatus.nextClearBit(pid + 1)) {
if (whichSide == SIDE.BUILD) {
inMemoryTupleCount += buildPSizeInTups[pid];
inMemoryPartByteSize += bufferManager.getPhysicalSize(pid);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index c44c583..e7dfaaf 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -268,17 +268,19 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
- : predEvaluatorFactory.createPredicateEvaluator());
+ final IPredicateEvaluator predEvaluator =
+ predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator();
- IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
- ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
- hashFunctionGeneratorFactories).createPartitioner(0);
- ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
- hashFunctionGeneratorFactories).createPartitioner(0);
+ ITuplePartitionComputer probeHpc =
+ new FieldHashPartitionComputerFamily(probeKeys, hashFunctionGeneratorFactories)
+ .createPartitioner(0);
+ ITuplePartitionComputer buildHpc =
+ new FieldHashPartitionComputerFamily(buildKeys, hashFunctionGeneratorFactories)
+ .createPartitioner(0);
boolean isFailed = false;
@Override
@@ -287,8 +289,8 @@
throw new HyracksDataException("Not enough memory is assigend for Hybrid Hash Join.");
}
state.memForJoin = memSizeInFrames - 2;
- state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor,
- nPartitions);
+ state.numOfPartitions =
+ getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
buildHpc, predEvaluator, isLeftOuter, nonMatchWriterFactories);
@@ -307,7 +309,7 @@
@Override
public void close() throws HyracksDataException {
- state.hybridHJ.closeBuild();
+ state.hybridHJ.closeBuild(isFailed);
if (isFailed) {
state.hybridHJ.clearBuildTempFiles();
} else {
@@ -324,7 +326,6 @@
}
};
- return op;
}
}
@@ -355,21 +356,21 @@
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- final ITuplePairComparator nljComparatorProbe2Build = tuplePairComparatorFactoryProbe2Build
- .createTuplePairComparator(ctx);
- final ITuplePairComparator nljComparatorBuild2Probe = tuplePairComparatorFactoryBuild2Probe
- .createTuplePairComparator(ctx);
- final IPredicateEvaluator predEvaluator = predEvaluatorFactory == null ? null
- : predEvaluatorFactory.createPredicateEvaluator();
+ final ITuplePairComparator nljComparatorProbe2Build =
+ tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
+ final ITuplePairComparator nljComparatorBuild2Probe =
+ tuplePairComparatorFactoryBuild2Probe.createTuplePairComparator(ctx);
+ final IPredicateEvaluator predEvaluator =
+ predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator();
for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- final IMissingWriter[] nonMatchWriter = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length]
- : null;
- final ArrayTupleBuilder nullTupleBuild = isLeftOuter ? new ArrayTupleBuilder(buildRd.getFieldCount())
- : null;
+ final IMissingWriter[] nonMatchWriter =
+ isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
+ final ArrayTupleBuilder nullTupleBuild =
+ isLeftOuter ? new ArrayTupleBuilder(buildRd.getFieldCount()) : null;
if (isLeftOuter) {
DataOutput out = nullTupleBuild.getDataOutput();
for (int i = 0; i < nonMatchWriterFactories.length; i++) {
@@ -379,7 +380,7 @@
}
}
- IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private BuildAndPartitionTaskState state;
private IFrame rPartbuff = new VSizeFrame(ctx);
@@ -432,8 +433,8 @@
}
BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
rPartbuff.reset();
- for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus
- .nextSetBit(pid + 1)) {
+ for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid =
+ partitionStatus.nextSetBit(pid + 1)) {
RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
@@ -474,10 +475,12 @@
//The buildSideReader should be always the original buildSideReader, so should the probeSideReader
private void joinPartitionPair(RunFileReader buildSideReader, RunFileReader probeSideReader,
int buildSizeInTuple, int probeSizeInTuple, int level) throws HyracksDataException {
- ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
- hashFunctionGeneratorFactories).createPartitioner(level);
- ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
- hashFunctionGeneratorFactories).createPartitioner(level);
+ ITuplePartitionComputer probeHpc =
+ new FieldHashPartitionComputerFamily(probeKeys, hashFunctionGeneratorFactories)
+ .createPartitioner(level);
+ ITuplePartitionComputer buildHpc =
+ new FieldHashPartitionComputerFamily(buildKeys, hashFunctionGeneratorFactories)
+ .createPartitioner(level);
int frameSize = ctx.getInitialFrameSize();
long buildPartSize = (long) Math.ceil((double) buildSideReader.getFileSize() / (double) frameSize);
@@ -492,10 +495,10 @@
}
// Calculate the expected hash table size for the both side.
- long expectedHashTableSizeForBuildInFrame = SerializableHashTable
- .getExpectedTableFrameCount(buildSizeInTuple, frameSize);
- long expectedHashTableSizeForProbeInFrame = SerializableHashTable
- .getExpectedTableFrameCount(probeSizeInTuple, frameSize);
+ long expectedHashTableSizeForBuildInFrame =
+ SerializableHashTable.getExpectedTableFrameCount(buildSizeInTuple, frameSize);
+ long expectedHashTableSizeForProbeInFrame =
+ SerializableHashTable.getExpectedTableFrameCount(probeSizeInTuple, frameSize);
//Apply in-Mem HJ if possible
if (!skipInMemoryHJ && ((buildPartSize + expectedHashTableSizeForBuildInFrame < state.memForJoin)
@@ -580,15 +583,19 @@
rHHj.setIsReversed(isReversed);
try {
buildSideReader.open();
+ boolean isFailed = false;
try {
rHHj.initBuild();
rPartbuff.reset();
while (buildSideReader.nextFrame(rPartbuff)) {
rHHj.build(rPartbuff.getBuffer());
}
+ } catch (Throwable th) {
+ isFailed = true;
+ throw th;
} finally {
// Makes sure that files are always properly closed.
- rHHj.closeBuild();
+ rHHj.closeBuild(isFailed);
}
} finally {
buildSideReader.close();
@@ -645,9 +652,8 @@
} else { //Case 2.1.2 - Switch to NLJ
if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine(
- "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH "
- + "(isLeftOuter || build<probe) - [Level " + level + "]");
+ LOGGER.fine("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH "
+ + "(isLeftOuter || build<probe) - [Level " + level + "]");
}
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
@@ -695,8 +701,8 @@
probeTupleAccessor.reset(rPartbuff.getBuffer());
for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
- nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
- nullTupleBuild.getSize());
+ nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
+ nullTupleBuild.getSize());
}
}
nullResultAppender.write(writer, true);
@@ -712,8 +718,8 @@
boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
&& bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
- IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx,
- state.memForJoin * ctx.getInitialFrameSize());
+ IDeallocatableFramePool framePool =
+ new DeallocatableFramePool(ctx, state.memForJoin * ctx.getInitialFrameSize());
ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
ISerializableTable table = new SerializableHashTable(tabSize, ctx, bufferManager);
@@ -776,11 +782,11 @@
// Hence the reverse relation is different.
boolean isReversed = outerRd == buildRd && innerRd == probeRd;
assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
- ITuplePairComparator nljComptorOuterInner = isReversed ? nljComparatorBuild2Probe
- : nljComparatorProbe2Build;
- NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd),
- new FrameTupleAccessor(innerRd), nljComptorOuterInner, memorySize, predEvaluator,
- isLeftOuter, nonMatchWriter);
+ ITuplePairComparator nljComptorOuterInner =
+ isReversed ? nljComparatorBuild2Probe : nljComparatorProbe2Build;
+ NestedLoopJoin nlj =
+ new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd), new FrameTupleAccessor(innerRd),
+ nljComptorOuterInner, memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
nlj.setIsReversed(isReversed);
IFrame cacheBuff = new VSizeFrame(ctx);
@@ -814,7 +820,6 @@
}
}
};
- return op;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index 48e6b35..9ac9296 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -18,15 +18,11 @@
*/
package org.apache.hyracks.dataflow.std.misc;
-import java.nio.ByteBuffer;
-
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -38,22 +34,6 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- @Override
- public void open() throws HyracksDataException {
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- }
-
- @Override
- public void close() throws HyracksDataException {
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
- };
+ return new SinkOperatorNodePushable();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
index fe64472..d987a35 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
@@ -19,15 +19,11 @@
package org.apache.hyracks.dataflow.std.misc;
-import java.nio.ByteBuffer;
-
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
public class SinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -39,22 +35,6 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- @Override
- public void open() throws HyracksDataException {
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- }
-
- @Override
- public void close() throws HyracksDataException {
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
- };
+ return new SinkOperatorNodePushable();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
new file mode 100644
index 0000000..85e1bb8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class SinkOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+
+ @Override
+ public void open() throws HyracksDataException {
+ // Does nothing.
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ // Does nothing.
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // Does nothing.
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ // Does nothing.
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index f1a777b..d3120bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
-import java.util.logging.Logger;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
index bcebb7d..bf2268d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -63,6 +63,13 @@
flushWriter.open();
try {
getSorter().flush(flushWriter);
+ } catch (Throwable th) {
+ try {
+ flushWriter.fail();
+ } catch (Throwable secondFailure) {
+ th.addSuppressed(secondFailure);
+ }
+ throw th;
} finally {
flushWriter.close();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index c27e4ec..752e6e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -27,7 +27,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 82fedb0..3845f2f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -86,23 +86,55 @@
@Override
public void close() throws HyracksDataException {
+ boolean failed = false;
+ Throwable closeException = null;
try {
// bulkloader can be null if an exception is thrown before it is initialized.
if (bulkLoader != null) {
bulkLoader.end();
}
} catch (Throwable th) {
- throw HyracksDataException.create(th);
+ failed = true;
+ closeException = th;
+ try {
+ writer.fail();
+ } catch (Throwable failFailure) {
+ closeException.addSuppressed(failFailure);
+ }
} finally {
if (index != null) {
// If index was opened!
try {
indexHelper.close();
+ } catch (Throwable th) {
+ if (closeException == null) {
+ closeException = th;
+ } else {
+ closeException.addSuppressed(th);
+ }
+ if (!failed) {
+ try {
+ writer.fail();
+ } catch (Throwable failFailure) {
+ closeException.addSuppressed(failFailure);
+ }
+ }
} finally {
- writer.close();
+ try {
+ writer.close();
+ } catch (Throwable th) {
+ if (closeException == null) {
+ closeException = th;
+ } else {
+ closeException.addSuppressed(th);
+ }
+ }
}
}
}
+ if (closeException != null) { // NOSONAR false positive
+ throw HyracksDataException.create(closeException);
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 509607c..840ede4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -78,6 +78,7 @@
protected final boolean appendIndexFilter;
protected ArrayTupleBuilder nonFilterTupleBuild;
protected final ISearchOperationCallbackFactory searchCallbackFactory;
+ protected boolean failed = false;
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
@@ -118,6 +119,7 @@
@Override
public void open() throws HyracksDataException {
+ failed = true;
writer.open();
indexHelper.open();
index = indexHelper.getIndexInstance();
@@ -148,8 +150,9 @@
frameTuple = new FrameTupleReference();
}
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
+ failed = false;
}
protected void writeSearchResults(int tupleIndex) throws Exception {
@@ -183,6 +186,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ failed = true;
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
try {
@@ -193,33 +197,44 @@
writeSearchResults(i);
}
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
+ failed = false;
}
@Override
public void flush() throws HyracksDataException {
+ failed = true;
appender.flush(writer);
+ failed = false;
}
@Override
public void close() throws HyracksDataException {
- HyracksDataException closeException = null;
+ Throwable closeException = null;
if (index != null) {
// if index == null, then the index open was not successful
- try {
- if (appender.getTupleCount() > 0) {
- appender.write(writer, true);
+ if (!failed) {
+ try {
+ if (appender.getTupleCount() > 0) {
+ appender.write(writer, true);
+ }
+ } catch (Throwable th) {
+ closeException = th;
+ failed = true;
+ try {
+ writer.fail();
+ } catch (Throwable failException) {
+ closeException.addSuppressed(failException);
+ }
}
- } catch (Throwable th) {
- closeException = new HyracksDataException(th);
}
try {
cursor.close();
} catch (Throwable th) {
if (closeException == null) {
- closeException = new HyracksDataException(th);
+ closeException = th;
} else {
closeException.addSuppressed(th);
}
@@ -228,7 +243,7 @@
indexHelper.close();
} catch (Throwable th) {
if (closeException == null) {
- closeException = new HyracksDataException(th);
+ closeException = th;
} else {
closeException.addSuppressed(th);
}
@@ -239,18 +254,19 @@
writer.close();
} catch (Throwable th) {
if (closeException == null) {
- closeException = new HyracksDataException(th);
+ closeException = th;
} else {
closeException.addSuppressed(th);
}
}
if (closeException != null) {
- throw closeException;
+ throw HyracksDataException.create(closeException);
}
}
@Override
public void fail() throws HyracksDataException {
+ failed = true;
writer.fail();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 2171122..b100300 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -20,8 +20,10 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -33,6 +35,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -155,4 +158,9 @@
public Object getSharedObject() {
return sharedObject;
}
+
+ @Override
+ public Set<JobFlag> getJobFlags() {
+ return EnumSet.noneOf(JobFlag.class);
+ }
}