[NO ISSUE][TX]: Multiple fixes to atomic statements
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Correctly computes the partition count in the case of
  parallel execution.
- Correctly computes the node count when a node group is
  used (see the first sketch below).
- Fix for the case when a job-prepared message is received
  after the transaction has already been aborted.
- The job commit message is now sent from the commitTransaction
  method. This ensures that commit messages are sent only
  after the job has completed successfully (see the second
  sketch below).
- Fix for the case when a job is aborted/cancelled after
  flushes have been scheduled on atomic datasets but have
  not yet finished.
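
The partition/node counting fix is easiest to see in the NodeJobTracker
hunk further down: instead of taking every constraint in the job
specification, the tracker now only considers constraints attached to
operators of a given class. The following is a minimal, self-contained
sketch of that idea; the Hyracks types are the ones the patch already
imports, while the class name PartitionCountSketch and the orElse(0)
fallback are illustrative assumptions, not part of the actual change.

  import java.util.List;
  import java.util.Map;
  import java.util.stream.Collectors;

  import org.apache.hyracks.api.constraints.Constraint;
  import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
  import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
  import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
  import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
  import org.apache.hyracks.api.job.JobSpecification;

  public final class PartitionCountSketch {

      // Max PARTITION_COUNT over only those operators that are instances of
      // operatorClass (e.g. the LSM insert/update/delete operator), so scans,
      // exchanges, etc. in the same job do not inflate the count.
      static int participatingPartitions(JobSpecification spec,
              Class<? extends IOperatorDescriptor> operatorClass) {
          List<OperatorDescriptorId> interesting = spec.getOperatorMap().entrySet().stream()
                  .filter(e -> operatorClass.isInstance(e.getValue()))
                  .map(Map.Entry::getKey)
                  .collect(Collectors.toList());
          return spec.getUserConstraints().stream()
                  .filter(c -> c.getLValue() instanceof PartitionCountExpression
                          && interesting.contains(
                                  ((PartitionCountExpression) c.getLValue()).getOperatorDescriptorId()))
                  .map(Constraint::getRValue)
                  .map(ConstantExpression.class::cast)
                  .map(ConstantExpression::getValue)
                  .map(Object::toString)
                  .map(Integer::parseInt)
                  .max(Integer::compare)
                  .orElse(0); // assumption: no matching operator means zero partitions
      }
  }

In the patch, QueryTranslator passes
LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class as the operator
class, so only the insert/update/delete pipeline contributes to the
partition and node counts of an atomic statement.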
Change-Id: Ieb029d6273f19fa4bd0e7edfb8897f894c1f5b6e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17643
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
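
Before the diff itself, here is a condensed model of the reworked commit
flow in GlobalTxManager: commitTransaction now waits for the job-prepared
acks, marks the context PREPARED and persists it, sends the job-commit
messages, and then waits a second time for the commit acks before removing
the context. The sketch below models that two-wait protocol with plain
Java monitors; TxContextModel, CommitFlowModel, ack and awaitAcks are
invented names for illustration only and are not the AsterixDB API.

  import java.util.concurrent.atomic.AtomicInteger;

  final class TxContextModel {
      final int numPartitions;
      final int numNodes;
      final AtomicInteger acks = new AtomicInteger();

      TxContextModel(int numPartitions, int numNodes) {
          this.numPartitions = numPartitions;
          this.numNodes = numNodes;
      }

      // Called by the message handlers: count an ack and wake the committer
      // once the expected number of acks has arrived.
      void ack(int expected) {
          if (acks.incrementAndGet() == expected) {
              synchronized (this) {
                  notifyAll();
              }
          }
      }

      synchronized void awaitAcks(int expected) throws InterruptedException {
          while (acks.get() != expected) {
              wait();
          }
      }
  }

  final class CommitFlowModel {
      // Phase 1: wait for a "job prepared" ack from every participating
      // partition, then persist the PREPARED state and send the job-commit
      // messages. Phase 2: wait for a commit ack from every participating
      // node, and only then clean up the transaction context.
      void commit(TxContextModel ctx) throws InterruptedException {
          ctx.awaitAcks(ctx.numPartitions);   // prepared acks (one per partition)
          // persist PREPARED state here, then reset the counter
          ctx.acks.set(0);
          // send job-commit messages to the participating nodes here
          ctx.awaitAcks(ctx.numNodes);        // commit acks (one per node)
          // remove the transaction context here
      }
  }

In the real manager, the first wait corresponds to the prepared acks
counted in handleJobPreparedMessage and the second to the commit acks
counted in the handler that finally calls sendEnableMergeMessages, both
shown in the GlobalTxManager hunks below.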
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 10f118c0..bb93fbc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -554,7 +554,7 @@
public static AlgebricksAbsolutePartitionConstraint getJobLocations(JobSpecification spec,
INodeJobTracker jobTracker, AlgebricksAbsolutePartitionConstraint clusterLocations) {
- final Set<String> jobParticipatingNodes = jobTracker.getJobParticipatingNodes(spec);
+ final Set<String> jobParticipatingNodes = jobTracker.getJobParticipatingNodes(spec, null);
return new AlgebricksAbsolutePartitionConstraint(Arrays.stream(clusterLocations.getLocations())
.filter(jobParticipatingNodes::contains).toArray(String[]::new));
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index beb55d8..4dbf38b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -69,8 +69,7 @@
@Override
public void commitTransaction(JobId jobId) throws ACIDException {
IGlobalTransactionContext context = getTransactionContext(jobId);
- if (context.getTxnStatus() == TransactionStatus.ACTIVE
- || context.getTxnStatus() == TransactionStatus.PREPARED) {
+ if (context.getAcksReceived() != context.getNumPartitions()) {
synchronized (context) {
try {
context.wait();
@@ -80,6 +79,19 @@
}
}
}
+ context.setTxnStatus(TransactionStatus.PREPARED);
+ context.persist(ioManager);
+ context.resetAcksReceived();
+ sendJobCommitMessages(context);
+
+ synchronized (context) {
+ try {
+ context.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ACIDException(e);
+ }
+ }
txnContextRepository.remove(jobId);
}
@@ -95,17 +107,21 @@
@Override
public void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
Map<String, ILSMComponentId> componentIdMap) {
- IGlobalTransactionContext context = getTransactionContext(jobId);
+ IGlobalTransactionContext context = txnContextRepository.get(jobId);
+ if (context == null) {
+ LOGGER.warn("JobPreparedMessage received for jobId " + jobId
+ + ", which does not exist. The transaction for the job is already aborted");
+ return;
+ }
if (context.getNodeResourceMap().containsKey(nodeId)) {
context.getNodeResourceMap().get(nodeId).putAll(componentIdMap);
} else {
context.getNodeResourceMap().put(nodeId, componentIdMap);
}
if (context.incrementAndGetAcksReceived() == context.getNumPartitions()) {
- context.setTxnStatus(TransactionStatus.PREPARED);
- context.persist(ioManager);
- context.resetAcksReceived();
- sendJobCommitMessages(context);
+ synchronized (context) {
+ context.notifyAll();
+ }
}
}
@@ -127,10 +143,10 @@
if (context.incrementAndGetAcksReceived() == context.getNumNodes()) {
context.delete(ioManager);
context.setTxnStatus(TransactionStatus.COMMITTED);
- sendEnableMergeMessages(context);
synchronized (context) {
context.notifyAll();
}
+ sendEnableMergeMessages(context);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f390237..9f292e7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -261,6 +261,7 @@
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.TokenizerCategory;
import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.logging.log4j.Level;
@@ -3524,8 +3525,11 @@
if (spec != null && !isCompileOnly()) {
atomic = dataset.isAtomic();
if (atomic) {
- int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(spec).size();
- int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec);
+ int numParticipatingNodes = appCtx.getNodeJobTracker()
+ .getJobParticipatingNodes(spec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
+ .size();
+ int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec,
+ LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
List<Integer> participatingDatasetIds = new ArrayList<>();
participatingDatasetIds.add(dataset.getDatasetId());
spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds,
@@ -3613,8 +3617,11 @@
((InsertStatement) stmt).getDatasetName());
atomic = ds.isAtomic();
if (atomic) {
- int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
- int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+ int numParticipatingNodes = appCtx.getNodeJobTracker()
+ .getJobParticipatingNodes(jobSpec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
+ .size();
+ int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec,
+ LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
List<Integer> participatingDatasetIds = new ArrayList<>();
participatingDatasetIds.add(ds.getDatasetId());
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
@@ -3672,8 +3679,11 @@
Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
atomic = ds.isAtomic();
if (atomic) {
- int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
- int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+ int numParticipatingNodes = appCtx.getNodeJobTracker()
+ .getJobParticipatingNodes(jobSpec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
+ .size();
+ int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec,
+ LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
List<Integer> participatingDatasetIds = new ArrayList<>();
participatingDatasetIds.add(ds.getDatasetId());
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
@@ -4791,8 +4801,11 @@
((InsertStatement) atomicStatement).getDatasetName());
atomic = ds.isAtomic();
if (atomic) {
- int numParticipatingNodes = appCtx.getNodeJobTracker().getJobParticipatingNodes(jobSpec).size();
- int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec);
+ int numParticipatingNodes = appCtx.getNodeJobTracker()
+ .getJobParticipatingNodes(jobSpec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class)
+ .size();
+ int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(jobSpec,
+ LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class);
List<Integer> participatingDatasetIds = new ArrayList<>();
participatingDatasetIds.add(ds.getDatasetId());
jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.1.ddl.sqlpp
new file mode 100644
index 0000000..d1216e1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.1.ddl.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset page_views primary key (user:string);
+
+create dataset tmp primary key (id:uuid) autogenerated;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.2.update.sqlpp
new file mode 100644
index 0000000..a29e59b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.2.update.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+load dataset page_views using localfs
+(("path"="asterix_nc1://data/page_views.adm"),("format"="adm"));
+
+insert into tmp(
+ from page_views as t
+ group by t.user
+ group as g
+ select value {
+ "groups": (select value g.t from g)
+ }
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.3.query.sqlpp
new file mode 100644
index 0000000..b7a8bdc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-3/atomic-statements-3.3.query.sqlpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+WITH beth AS(
+ FROM tmp AS i, i.groups AS item
+ SELECT DISTINCT VALUE
+ {
+ "user": item.user,
+ "action": item.action
+ }
+)
+,
+rev AS(
+ FROM tmp i, i.groups AS item
+ SELECT DISTINCT VALUE
+ {
+ "user": item.user,
+ "estimated_revenue": item.estimated_revenue
+ }
+)
+,
+ts AS (
+ FROM tmp i, i.groups AS item
+ SELECT DISTINCT VALUE
+ {
+ "user": item.user,
+ "timespent": item.timespent
+ }
+)
+
+FROM beth AS a,
+ ts AS b,
+ rev AS c
+WHERE a.user=b.user AND a.user=c.user AND b.user=c.user
+SELECT VALUE a.user
+ORDER BY a.user;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.1.ddl.sqlpp
new file mode 100644
index 0000000..8035b89
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.1.ddl.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+drop nodegroup group_test if exists;
+create nodegroup group_test on
+ asterix_nc1
+;
+
+create dataset Points
+primary key (x:int, y:int)
+WITH {"node-group":{"name":"group_test"}};;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.2.update.sqlpp
new file mode 100644
index 0000000..a8b70cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.2.update.sqlpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into Points
+{"x": 9, "y": 15};
+
+insert into Points
+{"x": 15, "y": 40};
+
+insert into Points
+{"x": 20, "y": 50};
+
+insert into Points
+{"x": 50, "y": 200};
+
+insert into Points
+{"x": 60, "y": 40};
+
+insert into Points
+{"x": 101, "y": 80};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.3.query.sqlpp
new file mode 100644
index 0000000..777499c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/atomic-statements/atomic-statements-4/atomic-statements-4.3.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+use test;
+
+select value p
+from Points p
+where p.x>10 and p.x<100
+and p.y>10 and p.y<100
+order by p.x;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-3/atomic-statements-3.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-3/atomic-statements-3.3.adm
new file mode 100644
index 0000000..80d0c73
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-3/atomic-statements-3.3.adm
@@ -0,0 +1,2 @@
+"Bill"
+"John"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-4/atomic-statements-4.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-4/atomic-statements-4.3.adm
new file mode 100644
index 0000000..625f0fd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/atomic-statements/atomic-statements-4/atomic-statements-4.3.adm
@@ -0,0 +1,3 @@
+{ "x": 15, "y": 40 }
+{ "x": 20, "y": 50 }
+{ "x": 60, "y": 40 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index d1a59a3..d24873e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -4145,24 +4145,24 @@
-->
</test-group>
<test-group name="ddl">
-<!-- <test-case FilePath="ddl">-->
-<!-- <compilation-unit name="create-dataset-1">-->
-<!-- <output-dir compare="Clean-JSON">create-dataset-1</output-dir>-->
-<!-- </compilation-unit>-->
-<!-- </test-case>-->
-<!-- <test-case FilePath="ddl">-->
-<!-- <compilation-unit name="create-dataset-2">-->
-<!-- <output-dir compare="Clean-JSON">create-dataset-2</output-dir>-->
-<!-- <source-location>false</source-location>-->
-<!-- <expected-error>type mismatch: missing a required closed field my_id: string</expected-error>-->
-<!-- </compilation-unit>-->
-<!-- </test-case>-->
-<!-- <test-case FilePath="ddl">-->
-<!-- <compilation-unit name="create-dataset-3">-->
-<!-- <output-dir compare="Clean-JSON">create-dataset-3</output-dir>-->
-<!-- <expected-error>ASX1077: Cannot find dataset non_existent in dataverse test nor an alias with name non_existent (in line 23, at column 21)</expected-error>-->
-<!-- </compilation-unit>-->
-<!-- </test-case>-->
+ <test-case FilePath="ddl">
+ <compilation-unit name="create-dataset-1">
+ <output-dir compare="Clean-JSON">create-dataset-1</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="ddl">
+ <compilation-unit name="create-dataset-2">
+ <output-dir compare="Clean-JSON">create-dataset-2</output-dir>
+ <source-location>false</source-location>
+ <expected-error>type mismatch: missing a required closed field my_id: string</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="ddl">
+ <compilation-unit name="create-dataset-3">
+ <output-dir compare="Clean-JSON">create-dataset-3</output-dir>
+ <expected-error>ASX1077: Cannot find dataset non_existent in dataverse test nor an alias with name non_existent (in line 23, at column 21)</expected-error>
+ </compilation-unit>
+ </test-case>
<test-case FilePath="ddl">
<compilation-unit name="analyze-dataset-1">
<output-dir compare="Text">analyze-dataset-1</output-dir>
@@ -16321,25 +16321,35 @@
</compilation-unit>
</test-case>
</test-group>
-<!-- <test-group name="copy">-->
-<!-- <test-case FilePath="copy">-->
-<!-- <compilation-unit name="copy-1">-->
-<!-- <output-dir compare="Text">copy-1</output-dir>-->
-<!-- </compilation-unit>-->
-<!-- </test-case>-->
-<!-- </test-group>-->
-<!-- <test-group name="atomic-statements">-->
-<!-- <test-case FilePath="atomic-statements">-->
-<!-- <compilation-unit name="atomic-statements-1">-->
-<!-- <output-dir compare="Clean-JSON">atomic-statements-1</output-dir>-->
-<!-- <expected-error>HYR0033: Inserting duplicate keys into the primary storage</expected-error>-->
-<!-- <source-location>false</source-location>-->
-<!-- </compilation-unit>-->
-<!-- </test-case>-->
-<!-- <test-case FilePath="atomic-statements">-->
-<!-- <compilation-unit name="atomic-statements-2">-->
-<!-- <output-dir compare="Clean-JSON">atomic-statements-2</output-dir>-->
-<!-- </compilation-unit>-->
-<!-- </test-case>-->
-<!-- </test-group>-->
+ <test-group name="copy">
+ <test-case FilePath="copy">
+ <compilation-unit name="copy-1">
+ <output-dir compare="Text">copy-1</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+ <test-group name="atomic-statements">
+ <test-case FilePath="atomic-statements">
+ <compilation-unit name="atomic-statements-1">
+ <output-dir compare="Clean-JSON">atomic-statements-1</output-dir>
+ <expected-error>HYR0033: Inserting duplicate keys into the primary storage</expected-error>
+ <source-location>false</source-location>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="atomic-statements">
+ <compilation-unit name="atomic-statements-2">
+ <output-dir compare="Clean-JSON">atomic-statements-2</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="atomic-statements">
+ <compilation-unit name="atomic-statements-3">
+ <output-dir compare="Clean-JSON">atomic-statements-3</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="atomic-statements">
+ <compilation-unit name="atomic-statements-4">
+ <output-dir compare="Clean-JSON">atomic-statements-4</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
</test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
index 27a3d49..66ca95e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INodeJobTracker.java
@@ -21,6 +21,7 @@
import java.util.Set;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
@@ -44,8 +45,8 @@
* @param spec
* @return The participating nodes in the job execution
*/
- Set<String> getJobParticipatingNodes(JobSpecification spec);
+ Set<String> getJobParticipatingNodes(JobSpecification spec, Class<? extends IOperatorDescriptor> operatorClass);
- int getNumParticipatingPartitions(JobSpecification spec);
+ int getNumParticipatingPartitions(JobSpecification spec, Class<? extends IOperatorDescriptor> operatorClass);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index a1980b6..3ec2d22 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -294,6 +294,8 @@
}
public void clear() throws HyracksDataException {
+ List<FlushOperation> flushes = new ArrayList<>(getScheduledFlushes());
+ LSMIndexUtil.waitFor(flushes);
deleteMemoryComponent(false);
Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
for (ILSMIndex lsmIndex : indexes) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index 52fba60..56d3eea 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -34,6 +34,9 @@
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
@@ -46,7 +49,7 @@
@Override
public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) {
- getJobParticipatingNodes(spec).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
+ getJobParticipatingNodes(spec, null).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
}
@Override
@@ -76,17 +79,42 @@
}
@Override
- public Set<String> getJobParticipatingNodes(JobSpecification spec) {
- return spec.getUserConstraints().stream().map(Constraint::getRValue)
- .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast)
- .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
- .collect(Collectors.toSet());
+ public Set<String> getJobParticipatingNodes(JobSpecification spec,
+ Class<? extends IOperatorDescriptor> operatorClass) {
+ if (operatorClass != null) {
+ List<OperatorDescriptorId> operatorDescriptorIds =
+ spec.getOperatorMap().entrySet().stream().filter(op -> operatorClass.isInstance(op.getValue()))
+ .map(Map.Entry::getKey).collect(Collectors.toList());
+ return spec.getUserConstraints().stream()
+ .filter(ce -> ce.getLValue().getTag() == ExpressionTag.PARTITION_LOCATION && operatorDescriptorIds
+ .contains(((PartitionLocationExpression) ce.getLValue()).getOperatorDescriptorId()))
+ .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue)
+ .map(Object::toString).filter(nodeJobs::containsKey).collect(Collectors.toSet());
+ } else {
+ return spec.getUserConstraints().stream().map(Constraint::getRValue)
+ .filter(ce -> ce.getTag() == ExpressionTag.CONSTANT).map(ConstantExpression.class::cast)
+ .map(ConstantExpression::getValue).map(Object::toString).filter(nodeJobs::containsKey)
+ .collect(Collectors.toSet());
+ }
}
@Override
- public int getNumParticipatingPartitions(JobSpecification spec) {
- return spec.getUserConstraints().stream().filter(ce -> ce.getLValue() instanceof PartitionCountExpression)
- .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue)
- .map(Object::toString).map(Integer::parseInt).max(Integer::compare).get();
+ public int getNumParticipatingPartitions(JobSpecification spec,
+ Class<? extends IOperatorDescriptor> operatorClass) {
+ if (operatorClass != null) {
+ List<OperatorDescriptorId> operatorDescriptorIds =
+ spec.getOperatorMap().entrySet().stream().filter(op -> operatorClass.isInstance(op.getValue()))
+ .map(Map.Entry::getKey).collect(Collectors.toList());
+ return spec.getUserConstraints().stream()
+ .filter(ce -> ce.getLValue().getTag() == ExpressionTag.PARTITION_COUNT && operatorDescriptorIds
+ .contains(((PartitionCountExpression) ce.getLValue()).getOperatorDescriptorId()))
+ .map(Constraint::getRValue).map(ConstantExpression.class::cast).map(ConstantExpression::getValue)
+ .map(Object::toString).map(Integer::parseInt).max(Integer::compare).get();
+ } else {
+ return spec.getUserConstraints().stream()
+ .filter(ce -> ce.getLValue().getTag() == ExpressionTag.PARTITION_COUNT).map(Constraint::getRValue)
+ .map(ConstantExpression.class::cast).map(ConstantExpression::getValue).map(Object::toString)
+ .map(Integer::parseInt).max(Integer::compare).get();
+ }
}
}