Merge commit 'bd728afe' from stabilization-f69489
Change-Id: Ic382adcb9e6f0a084a029206a0785d979a45faad
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index b034f47..62f2c02 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -92,9 +92,9 @@
private AlgebricksAbsolutePartitionConstraint locations;
protected ActivityState prevState;
protected JobId jobId;
- protected long statsTimestamp;
+ protected volatile long statsTimestamp;
protected String stats;
- protected boolean isFetchingStats;
+ protected volatile boolean isFetchingStats;
protected int numRegistered;
protected int numDeRegistered;
protected volatile RecoveryTask rt;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 3c62d99..420585a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -55,7 +55,8 @@
}
@Override
- public synchronized void init(long validComponentSequence, long lsn) throws HyracksDataException {
+ public synchronized void init(long validComponentSequence, long lsn, long validComponentId)
+ throws HyracksDataException {
List<IndexCheckpoint> checkpoints;
try {
checkpoints = getCheckpoints();
@@ -66,7 +67,7 @@
LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath);
delete();
}
- IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn);
+ IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn, validComponentId);
persist(firstCheckpoint);
}
@@ -196,6 +197,9 @@
// ensure it was written correctly by reading it
read(checkpointPath);
return;
+ } catch (ClosedByInterruptException e) {
+ LOGGER.info("interrupted while writing checkpoint at {}", checkpointPath);
+ throw HyracksDataException.create(e);
} catch (IOException e) {
if (i == MAX_CHECKPOINT_WRITE_ATTEMPTS) {
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan
index 22bc323..a81a142 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan
@@ -32,26 +32,26 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- HYBRID_HASH_JOIN [$$prefixTokenLeft][$$prefixTokenRight] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$prefixTokenLeft] |PARTITIONED|
- -- UNNEST |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$95] |PARTITIONED|
- {
- -- AGGREGATE |LOCAL|
- -- STREAM_SELECT |LOCAL|
- -- NESTED_TUPLE_SOURCE |LOCAL|
- }
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$95(ASC), $$i(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$95] |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped] |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$prefixTokenRight] |PARTITIONED|
+ -- UNNEST |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$97] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$97(ASC), $$i(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$97] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- REPLICATE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped] |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$tokenUnranked] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- UNNEST |PARTITIONED|
@@ -62,24 +62,19 @@
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$tokenGroupped] |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- RUNNING_AGGREGATE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- SORT_MERGE_EXCHANGE [$$99(ASC), $$tokenGroupped(ASC) ] |PARTITIONED|
- -- STABLE_SORT [$$99(ASC), $$tokenGroupped(ASC)] |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- SORT_GROUP_BY[$$128] |PARTITIONED|
- {
- -- AGGREGATE |LOCAL|
- -- NESTED_TUPLE_SOURCE |LOCAL|
- }
- -- HASH_PARTITION_EXCHANGE [$$128] |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$tokenGroupped] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- RUNNING_AGGREGATE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- SORT_MERGE_EXCHANGE [$$102(ASC), $$tokenGroupped(ASC) ] |PARTITIONED|
+ -- STABLE_SORT [$$102(ASC), $$tokenGroupped(ASC)] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- REPLICATE |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- SORT_GROUP_BY[$$130] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- HASH_PARTITION_EXCHANGE [$$130] |PARTITIONED|
-- SORT_GROUP_BY[$$token] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
@@ -97,23 +92,23 @@
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$prefixTokenRight] |PARTITIONED|
- -- UNNEST |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$97] |PARTITIONED|
- {
- -- AGGREGATE |LOCAL|
- -- STREAM_SELECT |LOCAL|
- -- NESTED_TUPLE_SOURCE |LOCAL|
- }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$prefixTokenRight] |PARTITIONED|
+ -- UNNEST |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$97(ASC), $$i(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$97] |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$97] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$97(ASC), $$i(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$97] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- REPLICATE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped] |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$tokenUnranked] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- UNNEST |PARTITIONED|
@@ -124,21 +119,19 @@
-- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$tokenGroupped] |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- RUNNING_AGGREGATE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- SORT_MERGE_EXCHANGE [$$102(ASC), $$tokenGroupped(ASC) ] |PARTITIONED|
- -- STABLE_SORT [$$102(ASC), $$tokenGroupped(ASC)] |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- SORT_GROUP_BY[$$130] |PARTITIONED|
- {
- -- AGGREGATE |LOCAL|
- -- NESTED_TUPLE_SOURCE |LOCAL|
- }
- -- HASH_PARTITION_EXCHANGE [$$130] |PARTITIONED|
- -- REPLICATE |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$tokenGroupped] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- RUNNING_AGGREGATE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- SORT_MERGE_EXCHANGE [$$102(ASC), $$tokenGroupped(ASC) ] |PARTITIONED|
+ -- STABLE_SORT [$$102(ASC), $$tokenGroupped(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- SORT_GROUP_BY[$$130] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- HASH_PARTITION_EXCHANGE [$$130] |PARTITIONED|
-- SORT_GROUP_BY[$$token] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
index c636836..fff35d0 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
@@ -51,16 +51,19 @@
-- HYBRID_HASH_JOIN [$$120][$$130] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- REPLICATE |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- BTREE_SEARCH |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
@@ -83,50 +86,49 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- HYBRID_HASH_JOIN [$$134][$$133] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$134] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- STREAM_SELECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$142] |PARTITIONED|
- {
- -- AGGREGATE |LOCAL|
- -- AGGREGATE |LOCAL|
- -- STREAM_SELECT |LOCAL|
- -- NESTED_TUPLE_SOURCE |LOCAL|
- }
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$171] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$142][$$144] |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$176] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- REPLICATE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$176][$$177] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- BTREE_SEARCH |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- REPLICATE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$177] |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- REPLICATE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$177] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$133] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
@@ -178,7 +180,7 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- HYBRID_HASH_JOIN [$$158][$$159] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
@@ -211,44 +213,46 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- HYBRID_HASH_JOIN [$$171][$$170] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$171] |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$171] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- STREAM_SELECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$176] |PARTITIONED|
- {
- -- AGGREGATE |LOCAL|
- -- AGGREGATE |LOCAL|
- -- STREAM_SELECT |LOCAL|
- -- NESTED_TUPLE_SOURCE |LOCAL|
- }
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$176] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$176][$$177] |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- REPLICATE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$176][$$177] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- BTREE_SEARCH |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- REPLICATE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$177] |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- REPLICATE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$177] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$170] |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
index 38d0b5f..d0afd76 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
@@ -45,24 +45,29 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- HYBRID_HASH_JOIN [$$85][$$142] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$85] |PARTITIONED|
- -- STREAM_PROJECT |UNPARTITIONED|
- -- ASSIGN |UNPARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
- -- REPLICATE |UNPARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
- -- AGGREGATE |UNPARTITIONED|
- -- AGGREGATE |UNPARTITIONED|
- -- RANDOM_MERGE_EXCHANGE |PARTITIONED|
- -- AGGREGATE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- STREAM_SELECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$151] |PARTITIONED|
+ -- STREAM_PROJECT |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- REPLICATE |UNPARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- AGGREGATE |UNPARTITIONED|
+ -- AGGREGATE |UNPARTITIONED|
+ -- RANDOM_MERGE_EXCHANGE |PARTITIONED|
+ -- AGGREGATE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$142] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
@@ -122,24 +127,26 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- HYBRID_HASH_JOIN [$$151][$$152] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$151] |PARTITIONED|
- -- STREAM_PROJECT |UNPARTITIONED|
- -- ASSIGN |UNPARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
- -- REPLICATE |UNPARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$151] |PARTITIONED|
+ -- STREAM_PROJECT |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
-- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
- -- AGGREGATE |UNPARTITIONED|
- -- AGGREGATE |UNPARTITIONED|
- -- RANDOM_MERGE_EXCHANGE |PARTITIONED|
- -- AGGREGATE |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- STREAM_SELECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- REPLICATE |UNPARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- AGGREGATE |UNPARTITIONED|
+ -- AGGREGATE |UNPARTITIONED|
+ -- RANDOM_MERGE_EXCHANGE |PARTITIONED|
+ -- AGGREGATE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$152] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index dd9ede5..2f0eddf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -29,9 +29,10 @@
*
* @param validComponentSequence
* @param lsn
+ * @param validComponentId
* @throws HyracksDataException
*/
- void init(long validComponentSequence, long lsn) throws HyracksDataException;
+ void init(long validComponentSequence, long lsn, long validComponentId) throws HyracksDataException;
/**
* Called when a new LSM disk component is flushed. When called, the index checkpoint is updated
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 9654473..cb34600 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -23,7 +23,6 @@
import java.util.Map;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -42,12 +41,12 @@
private long lastComponentId;
private Map<Long, Long> masterNodeFlushMap;
- public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark) {
+ public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark, long validComponentId) {
IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
firstCheckpoint.lowWatermark = lowWatermark;
firstCheckpoint.validComponentSequence = lastComponentSequence;
- firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId();
+ firstCheckpoint.lastComponentId = validComponentId;
firstCheckpoint.masterNodeFlushMap = new HashMap<>();
return firstCheckpoint;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 448613b..e778cce 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -45,9 +45,11 @@
public class CheckpointPartitionIndexesTask implements IReplicaTask {
private final int partition;
+ private final long maxComponentId;
- public CheckpointPartitionIndexesTask(int partition) {
+ public CheckpointPartitionIndexesTask(int partition, long maxComponentId) {
this.partition = partition;
+ this.maxComponentId = maxComponentId;
}
@Override
@@ -75,7 +77,7 @@
maxComponentSequence =
Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
}
- indexCheckpointManager.init(maxComponentSequence, currentLSN);
+ indexCheckpointManager.init(maxComponentSequence, currentLSN, maxComponentId);
}
ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
}
@@ -90,6 +92,7 @@
try {
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(partition);
+ dos.writeLong(maxComponentId);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
@@ -98,7 +101,8 @@
public static CheckpointPartitionIndexesTask create(DataInput input) throws HyracksDataException {
try {
int partition = input.readInt();
- return new CheckpointPartitionIndexesTask(partition);
+ long maxComponentId = input.readLong();
+ return new CheckpointPartitionIndexesTask(partition, maxComponentId);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index f53d448..ae36c13 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -40,6 +40,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -98,7 +99,8 @@
final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
indexCheckpointManager.delete();
- indexCheckpointManager.init(Long.MIN_VALUE, currentLSN);
+ indexCheckpointManager.init(Long.MIN_VALUE, currentLSN,
+ LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
LOGGER.info(() -> "Checkpoint index: " + indexRef);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index ef85977..09f1205 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -25,6 +25,8 @@
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* Performs the steps required to ensure any newly added replica
@@ -60,9 +62,17 @@
}
private void checkpointReplicaIndexes() throws IOException {
+ final int partition = replica.getIdentifier().getPartition();
CheckpointPartitionIndexesTask task =
- new CheckpointPartitionIndexesTask(replica.getIdentifier().getPartition());
+ new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition));
ReplicationProtocol.sendTo(replica, task);
ReplicationProtocol.waitForAck(replica);
}
+
+ private long getPartitionMaxComponentId(int partition) throws HyracksDataException {
+ final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+ final PersistentLocalResourceRepository localResourceRepository =
+ (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+ return localResourceRepository.getReplicatedIndexesMaxComponentId(partition, replStrategy);
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index c0da095..8f870c0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -66,8 +66,8 @@
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.util.ExitUtil;
@@ -196,7 +196,8 @@
byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
final Path path = Paths.get(resourceFile.getAbsolutePath());
Files.write(path, bytes);
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0);
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0,
+ LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
deleteResourceFileMask(resourceFile);
} catch (Exception e) {
cleanup(resourceFile);
@@ -393,6 +394,21 @@
return partitionReplicatedFiles;
}
+ public long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy)
+ throws HyracksDataException {
+ long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
+ final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
+ for (LocalResource lr : partitionResources.values()) {
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
+ if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+ final IIndexCheckpointManager indexCheckpointManager =
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr));
+ maxComponentId = Math.max(maxComponentId, indexCheckpointManager.getLatest().getLastComponentId());
+ }
+ }
+ return maxComponentId;
+ }
+
private List<String> getIndexFiles(File indexDir) {
final List<String> indexFiles = new ArrayList<>();
if (indexDir.isDirectory()) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 64e50ed..c18d76c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -89,6 +89,10 @@
IPartitioningProperty pp = childProp.getPartitioningProperty();
Map<LogicalVariable, LogicalVariable> ppSubstMap = computePartitioningPropertySubstitutionMap(gby, pp);
if (ppSubstMap != null) {
+ // We cannot modify pp directly, since it is owned by the input operator.
+ // Otherwise, the partitioning property would be modified even before this group by operator,
+ // which will be undesirable.
+ pp = pp.clonePartitioningProperty();
pp.substituteColumnVars(ppSubstMap);
}
List<ILocalStructuralProperty> childLocals = childProp.getLocalProperties();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
index bc6a45d..3e78fd2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
@@ -60,6 +60,12 @@
@Override
public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+
+ }
+
+ @Override
+ public IPartitioningProperty clonePartitioningProperty() {
+ return new BroadcastPartitioningProperty(domain);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
index f41d197..5164192 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
@@ -77,6 +77,8 @@
void setNodeDomain(INodeDomain domain);
void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap);
+
+ IPartitioningProperty clonePartitioningProperty();
}
class UnpartitionedProperty implements IPartitioningProperty {
@@ -116,4 +118,9 @@
public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> variableMap) {
// No partition columns are maintained for UNPARTITIONED.
}
+
+ @Override
+ public IPartitioningProperty clonePartitioningProperty() {
+ return new UnpartitionedProperty();
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index 23c8273..b5a2bb5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -28,7 +28,7 @@
public class OrderedPartitionedProperty implements IPartitioningProperty {
- private List<OrderColumn> orderColumns;
+ private final List<OrderColumn> orderColumns;
private INodeDomain domain;
public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain) {
@@ -92,4 +92,9 @@
}
}
+ @Override
+ public IPartitioningProperty clonePartitioningProperty() {
+ return new OrderedPartitionedProperty(new ArrayList<>(orderColumns), domain);
+ }
+
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
index bbd835c..951a031 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
@@ -65,6 +65,12 @@
@Override
public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+
+ }
+
+ @Override
+ public IPartitioningProperty clonePartitioningProperty() {
+ return new RandomPartitioningProperty(domain);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
index f59638c..5966407 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -77,4 +78,9 @@
});
}
+ @Override
+ public IPartitioningProperty clonePartitioningProperty() {
+ return new UnorderedPartitionedProperty(new ListSet<>(columnSet), domain);
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index f02654e..a67b40e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -79,9 +79,8 @@
response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
keepAlive = HttpUtil.isKeepAlive(request);
- if (keepAlive) {
- response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
- }
+ response.headers().set(HttpHeaderNames.CONNECTION,
+ keepAlive ? HttpHeaderValues.KEEP_ALIVE : HttpHeaderValues.CLOSE);
}
@Override
@@ -119,17 +118,10 @@
}
future = ctx.channel().close();
} else {
- if (keepAlive && response.status() != HttpResponseStatus.UNAUTHORIZED) {
- response.headers().remove(HttpHeaderNames.CONNECTION);
- }
- // we didn't send anything to the user, we need to send an unchunked error response
+ // we didn't send anything to the user, we need to send an non-chunked error response
fullResponse(response.protocolVersion(), response.status(),
error == null ? ctx.alloc().buffer(0, 0) : error, response.headers());
}
- if (response.status() != HttpResponseStatus.UNAUTHORIZED) {
- // since the request failed, we need to close the channel on complete
- future.addListener(ChannelFutureListener.CLOSE);
- }
}
done = true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 3d928a4..9199fbb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -577,7 +577,7 @@
if (c != EmptyComponent.INSTANCE) {
diskComponents.add(0, c);
}
- assert checkComponentIds();
+ validateComponentIds();
}
@Override
@@ -588,7 +588,7 @@
if (newComponent != EmptyComponent.INSTANCE) {
diskComponents.add(swapIndex, newComponent);
}
- assert checkComponentIds();
+ validateComponentIds();
}
/**
@@ -597,16 +597,16 @@
*
* @throws HyracksDataException
*/
- private boolean checkComponentIds() throws HyracksDataException {
+ private void validateComponentIds() throws HyracksDataException {
for (int i = 0; i < diskComponents.size() - 1; i++) {
ILSMComponentId id1 = diskComponents.get(i).getId();
ILSMComponentId id2 = diskComponents.get(i + 1).getId();
IdCompareResult cmp = id1.compareTo(id2);
if (cmp != IdCompareResult.UNKNOWN && cmp != IdCompareResult.GREATER_THAN) {
- return false;
+ throw new IllegalStateException(
+ "found non-decreasing component ids (" + id1 + " -> " + id2 + ") on index " + this);
}
}
- return true;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
index ab59b59..3fa45c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
@@ -101,8 +101,8 @@
for (int i = 0; i < bulkloadersCount; i++) {
bulkloaderChain.get(i).cleanupArtifacts();;
}
+ diskComponent.deactivateAndDestroy();
}
- diskComponent.deactivateAndDestroy();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 3a43ba7..c739ad0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
@@ -31,6 +32,7 @@
private final AbstractLSMIndex lsmIndex;
private final ILSMDiskComponentBulkLoader componentBulkLoader;
private final ILSMIndexOperationContext opCtx;
+ private boolean failed = false;
public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext opCtx, float fillFactor,
boolean verifyInput, long numElementsHint) throws HyracksDataException {
@@ -68,20 +70,10 @@
@Override
public void end() throws HyracksDataException {
try {
- try {
- lsmIndex.getIOOperationCallback().afterOperation(opCtx.getIoOperation());
- componentBulkLoader.end();
- } catch (Throwable th) { // NOSONAR Must not call afterFinalize without setting failure
- opCtx.getIoOperation().setStatus(LSMIOOperationStatus.FAILURE);
- opCtx.getIoOperation().setFailure(th);
- throw th;
- } finally {
- lsmIndex.getIOOperationCallback().afterFinalize(opCtx.getIoOperation());
- }
- if (opCtx.getIoOperation().getStatus() == LSMIOOperationStatus.SUCCESS
- && opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) {
- lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation());
- }
+ presistComponentToDisk();
+ } catch (Throwable th) { // NOSONAR must cleanup in case of any failure
+ fail(th);
+ throw th;
} finally {
lsmIndex.getIOOperationCallback().completed(opCtx.getIoOperation());
}
@@ -116,4 +108,28 @@
return opCtx.getIoOperation().getFailure();
}
+ private void presistComponentToDisk() throws HyracksDataException {
+ try {
+ lsmIndex.getIOOperationCallback().afterOperation(opCtx.getIoOperation());
+ componentBulkLoader.end();
+ } catch (Throwable th) { // NOSONAR Must not call afterFinalize without setting failure
+ fail(th);
+ throw th;
+ } finally {
+ lsmIndex.getIOOperationCallback().afterFinalize(opCtx.getIoOperation());
+ }
+ if (opCtx.getIoOperation().getStatus() == LSMIOOperationStatus.SUCCESS
+ && opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) {
+ lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation());
+ }
+ }
+
+ private void fail(Throwable th) {
+ if (!failed) {
+ failed = true;
+ final ILSMIOOperation loadOp = opCtx.getIoOperation();
+ loadOp.setFailure(th);
+ lsmIndex.cleanUpFilesForFailedOperation(loadOp);
+ }
+ }
}
\ No newline at end of file