Cleanup and bug fixes in Feeds pipeline
The bug fix is for the MessagingFrameTupleAppender. It used
to consume one extra byte per frame.
Cleanups are for feeds. These include:
1. Remove the use of feed dataflow marker. Feeds which
need to send progress can and should do that without
a marker thread.
2. Lock the memory component for feed commit to be able
to add information to the memory component's metadata
page safely.
In addition, this change introduces a frame level callback
for index operations.
Change-Id: Ie97b2133ebecb7380cf0ba336e60ed714d06f8ee
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1523
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 86faa6c..c6ea045 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -237,8 +237,7 @@
}
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), datasetName, indexName,
- dataset.getDatasetDetails().isTemp());
+ metadataProvider.getSplitProviderAndConstraints(dataset, indexName);
// TODO: Here we assume there is only one search key field.
int queryField = keyFields[0];
// Get tokenizer and search modifier factories.
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
index f7e70a3..6c6f2af 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
@@ -54,7 +54,7 @@
private transient IStatementExecutorFactory statementExecutorFactory;
/**
- * Initialize {@code CompilerExtensionManager} from configuration
+ * Initialize {@link org.apache.asterix.app.cc.CCExtensionManager} from configuration
*
* @param list
* a list of extensions
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
index d1edec0..99f641b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
@@ -20,7 +20,6 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -45,7 +44,7 @@
runtimeContext.unexportMetadataNodeStub();
}
} catch (Exception e) {
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
index 8e842a9..8604364 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
@@ -21,7 +21,6 @@
import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -42,7 +41,7 @@
try {
ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode);
} catch (Exception e) {
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
index 203e453..bca39b0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
@@ -24,7 +24,6 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -45,7 +44,7 @@
try {
runtimeContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions);
} catch (IOException | ACIDException e) {
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
index ab19573..f7c33a4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -20,7 +20,6 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
@@ -38,7 +37,7 @@
SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
appContext.initializeMetadata(state == SystemState.PERMANENT_DATA_LOSS);
} catch (Exception e) {
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
index d060f61..17fde86 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
@@ -20,7 +20,6 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
@@ -43,7 +42,7 @@
//Start replication after the state of remote replicas has been initialized.
replicationManager.startReplicationThreads();
} catch (Exception e) {
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index 8d8a0f2..45f96ac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -56,7 +56,6 @@
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
@@ -495,7 +494,7 @@
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
} catch (Exception e) {
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
index 5a7036a..c40e236 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
@@ -47,7 +47,6 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
@@ -162,7 +161,7 @@
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
} catch (Exception e) {
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
index 51defaa..b8b3c49 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
@@ -41,7 +41,6 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
@@ -107,7 +106,7 @@
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
} catch (Exception e) {
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
index 2d423f9..16a800b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
@@ -24,7 +24,6 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
@@ -75,7 +74,7 @@
remoteRecoeryManager.completeFailbackProcess();
} catch (IOException | InterruptedException e) {
LOGGER.log(Level.SEVERE, "Failure during completion of failback process", e);
- hde = ExceptionUtils.convertToHyracksDataException(e);
+ hde = HyracksDataException.create(e);
} finally {
CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(planId,
requestId, partitions);
@@ -83,7 +82,7 @@
broker.sendMessageToCC(reponse);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failure sending message to CC", e);
- hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ hde = HyracksDataException.suppress(hde, e);
}
}
if (hde != null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
index 2104f9c..8188c44 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
@@ -24,7 +24,6 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -91,7 +90,7 @@
appContext.unexportMetadataNodeStub();
} catch (RemoteException e) {
LOGGER.log(Level.SEVERE, "Failed unexporting metadata stub", e);
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
} else {
//close all non-metadata datasets
@@ -114,7 +113,7 @@
broker.sendMessageToCC(reponse);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
index 96ae8be..e0bc49d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
@@ -23,7 +23,6 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -53,7 +52,7 @@
broker.sendMessageToCC(reponse);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
index be42a9d..472a89c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
@@ -18,7 +18,9 @@
*/
package org.apache.asterix.app.replication.message;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -27,9 +29,6 @@
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
public class StartupTaskRequestMessage implements INCLifecycleMessage {
private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName());
@@ -48,7 +47,7 @@
((INCMessageBroker) cs.getApplicationContext().getMessageBroker()).sendMessageToCC(msg);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e);
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
index 6a72776..922ac89 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
@@ -23,7 +23,6 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -62,7 +61,7 @@
broker.sendMessageToCC(result);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
index 8ce12dd..3be3eab 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
@@ -22,7 +22,6 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,7 +45,7 @@
appContext.exportMetadataNodeStub();
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
- hde = new HyracksDataException(e);
+ hde = HyracksDataException.create(e);
} finally {
TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
appContext.getTransactionSubsystem().getId());
@@ -54,7 +53,7 @@
broker.sendMessageToCC(reponse);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
- hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ hde = HyracksDataException.suppress(hde, e);
}
}
if (hde != null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
index 4e415de..7f9cc2b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
@@ -24,7 +24,6 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
@@ -86,7 +85,7 @@
remoteRecoeryManager.takeoverPartitons(partitions);
} catch (IOException | ACIDException e) {
LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
- hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ hde = HyracksDataException.suppress(hde, e);
} finally {
//send response after takeover is completed
TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId,
@@ -95,7 +94,7 @@
broker.sendMessageToCC(reponse);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
- hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ hde = HyracksDataException.suppress(hde, e);
}
}
if (hde != null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index d785cce..da93fb8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.messaging;
-import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -32,7 +31,7 @@
public class CCMessageBroker implements ICCMessageBroker {
- private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
private final ClusterControllerService ccs;
public CCMessageBroker(ClusterControllerService ccs) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index bc8a79e..d766827 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -62,8 +62,7 @@
spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, indexName,
- dataset.getDatasetDetails().isTemp());
+ metadataProvider.getSplitProviderAndConstraints(dataset, indexName);
AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index d04217c..48ca338 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -159,8 +159,8 @@
BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
RecordDescriptor rDesc = new RecordDescriptor(serdes);
TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
- IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
- CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+ IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, CURRENT_PRODUCER,
+ NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
partitioner.open();
FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
List<TestFrameWriter> recipients = new ArrayList<>();
@@ -263,11 +263,11 @@
}
partitioner.nextFrame(frame.getBuffer());
partitioner.flush();
- Assert.assertEquals(partitionWriterFactory.getWriters().get(0).nextFrameCount(), 1);
- Assert.assertEquals(partitionWriterFactory.getWriters().get(1).nextFrameCount(), 2);
- Assert.assertEquals(partitionWriterFactory.getWriters().get(2).nextFrameCount(), 1);
- Assert.assertEquals(partitionWriterFactory.getWriters().get(3).nextFrameCount(), 2);
- Assert.assertEquals(partitionWriterFactory.getWriters().get(4).nextFrameCount(), 2);
+ Assert.assertEquals(1, partitionWriterFactory.getWriters().get(0).nextFrameCount());
+ Assert.assertEquals(2, partitionWriterFactory.getWriters().get(1).nextFrameCount());
+ Assert.assertEquals(1, partitionWriterFactory.getWriters().get(2).nextFrameCount());
+ Assert.assertEquals(2, partitionWriterFactory.getWriters().get(3).nextFrameCount());
+ Assert.assertEquals(2, partitionWriterFactory.getWriters().get(4).nextFrameCount());
for (TestFrameWriter writer : recipients) {
fta.reset(writer.getLastFrame());
Assert.assertEquals(fta.getTupleCount(), 1);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index cbb4868..1a8ccae 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -39,6 +39,7 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.am.common.api.IIndex;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -72,7 +73,7 @@
}
@Override
- public synchronized IIndex get(String resourcePath) throws HyracksDataException {
+ public synchronized ILSMIndex get(String resourcePath) throws HyracksDataException {
validateDatasetLifecycleManagerState();
int datasetID = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -80,7 +81,7 @@
}
@Override
- public synchronized IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
+ public synchronized ILSMIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
validateDatasetLifecycleManagerState();
DatasetResource datasetResource = datasets.get(datasetID);
if (datasetResource == null) {
@@ -556,7 +557,7 @@
while (used + additionalSize > capacity) {
if (!evictCandidateDataset()) {
throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID()
- + " memory since memory budget would be exceeded.");
+ + " memory since memory budget would be exceeded.");
}
}
used += additionalSize;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 403d3cb..41e587d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -81,7 +81,7 @@
return datasetVirtualBufferCaches;
}
- public IIndex getIndex(long resourceID) {
+ public ILSMIndex getIndex(long resourceID) {
IndexInfo iInfo = getIndexInfo(resourceID);
return (iInfo == null) ? null : iInfo.getIndex();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
new file mode 100644
index 0000000..3952b11
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.dataflow;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+public class NoOpFrameOperationCallbackFactory implements IFrameOperationCallbackFactory {
+ private static final long serialVersionUID = 1L;
+ private static final NoOpFrameOperationCallback CALLBACK = new NoOpFrameOperationCallback();
+ public static final NoOpFrameOperationCallbackFactory INSTANCE = new NoOpFrameOperationCallbackFactory();
+
+ private NoOpFrameOperationCallbackFactory() {
+ }
+
+ @Override
+ public IFrameOperationCallback createFrameOperationCallback(IHyracksTaskContext ctx,
+ ILSMIndexAccessor indexAccessor) {
+ return CALLBACK;
+ }
+
+ private static class NoOpFrameOperationCallback implements IFrameOperationCallback {
+ @Override
+ public void frameCompleted(boolean modified) throws HyracksDataException {
+ // No Op
+ }
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 4898e40..517e243 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -21,7 +21,6 @@
import java.io.InputStream;
import java.util.Map;
-import org.apache.asterix.event.schema.cluster.FaultTolerance;
import org.apache.hyracks.api.util.ErrorMessageUtil;
// Error code:
@@ -85,6 +84,7 @@
public static final int COMPILATION_AQLPLUS_IDENTIFIER_NOT_FOUND = 1024;
public static final int COMPILATION_AQLPLUS_NO_SUCH_JOIN_TYPE = 1025;
public static final int COMPILATION_FUNC_EXPRESSION_CANNOT_UTILIZE_INDEX = 1026;
+ public static final int COMPILATION_DATASET_TYPE_DOES_NOT_HAVE_PRIMARY_INDEX = 1027;
// Feed errors
public static final int DATAFLOW_ILLEGAL_STATE = 3001;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index b9d187d..649f1f5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.common.exceptions;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
public class ExceptionUtils {
public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
public static final String MISSING_PARAMETER = "Missing parameter.\n";
@@ -34,29 +32,4 @@
return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + System.lineSeparator() + EXPECTED_VALUE
+ expectedValue + System.lineSeparator() + PASSED_VALUE + passedValue;
}
-
- public static HyracksDataException suppressIntoHyracksDataException(HyracksDataException hde, Throwable th) {
- if (hde == null) {
- return new HyracksDataException(th);
- } else {
- hde.addSuppressed(th);
- return hde;
- }
- }
-
- public static Throwable suppress(Throwable suppressor, Throwable suppressed) {
- if (suppressor == null) {
- return suppressed;
- } else if (suppressed != null) {
- suppressor.addSuppressed(suppressed);
- }
- return suppressor;
- }
-
- public static HyracksDataException convertToHyracksDataException(Throwable throwable) {
- if (throwable == null || throwable instanceof HyracksDataException) {
- return (HyracksDataException) throwable;
- }
- return new HyracksDataException(throwable);
- }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
index b977c4d..bbe2c4f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
@@ -119,4 +119,8 @@
pointable.setLong(lsn);
index.getCurrentMemoryComponent().getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY, pointable);
}
+
+ public ILSMIndex getIndex() {
+ return index;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index aae7050..bf9c6f9 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -70,6 +70,7 @@
1024 = Identifier %1$s is not found in AQL+ meta-scope
1025 = There is no such join type in AQL+
1026 = The given function expression %1$s cannot utilize index
+1027 = Dataset of type %1$s doesn't have a primary index
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index d03f9df..37262b7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -61,7 +61,7 @@
private boolean isFeed;
private FileSplit[] feedLogFileSplits;
private ARecordType metaType;
- private FeedLogManager feedLogManager = null;
+ private transient FeedLogManager feedLogManager;
@Override
public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
@@ -75,8 +75,7 @@
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
- throws HyracksDataException, AlgebricksException {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
return dataSourceFactory.getPartitionConstraint();
}
@@ -86,12 +85,12 @@
@Override
public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
- IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
+ IAppRuntimeContext runtimeCtx =
+ (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
try {
restoreExternalObjects(runtimeCtx.getLibraryManager());
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
if (isFeed) {
if (feedLogManager == null) {
@@ -184,6 +183,11 @@
this.metaType = metaType;
}
+ /**
+ * used by extensions to access shared datasource factory for a job
+ *
+ * @return the data source factory
+ */
public IExternalDataSourceFactory getDataSourceFactory() {
return dataSourceFactory;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 3ea3bb1..df1b43e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -40,7 +40,7 @@
*
* @return the display name
*/
- public String getAlias();
+ String getAlias();
/**
* Gets a list of partition constraints. A partition constraint can be a
@@ -54,10 +54,8 @@
* running on the node with the given IP address.
*
* @throws AlgebricksException
- * @throws HyracksDataException
*/
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
- throws HyracksDataException, AlgebricksException;
+ AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException;
/**
* Creates an instance of IDatasourceAdapter.
@@ -67,22 +65,40 @@
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
- public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
+ IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
/**
+ * Configure the adapter
+ *
* @param libraryManager
* @param configuration
* @throws AlgebricksException
* @throws HyracksDataException
*/
- public void configure(ILibraryManager libraryManager, Map<String, String> configuration)
+ void configure(ILibraryManager libraryManager, Map<String, String> configuration)
throws HyracksDataException, AlgebricksException;
- public void setOutputType(ARecordType outputType);
+ /**
+ * Set the expected record output type of the adapter
+ *
+ * @param outputType
+ */
+ void setOutputType(ARecordType outputType);
- public void setMetaType(ARecordType metaType);
+ /**
+ * Set the expected meta output type of the adapter
+ *
+ * @param metaType
+ */
+ void setMetaType(ARecordType metaType);
- public ARecordType getOutputType();
+ /**
+ * @return the adapter record output type
+ */
+ ARecordType getOutputType();
- public ARecordType getMetaType();
+ /**
+ * @return the adapter meta output type
+ */
+ ARecordType getMetaType();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index e2274b9..5538369 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -53,8 +53,7 @@
* @return
* @throws AsterixException
*/
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
- throws AlgebricksException, HyracksDataException;
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException;
/**
* Configure the data parser factory. The passed map contains key value pairs from the
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index d85fe65..57e79c3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.dataflow;
-import java.io.IOException;
-
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordWithPKDataParser;
@@ -34,9 +32,9 @@
public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
final FeedLogManager feedLogManager, final int numOfOutputFields,
- final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
- throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
+ final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
+ throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
this.dataParser = dataParser;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 4c88b0f..22fa8be 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.dataflow;
-import java.io.IOException;
-
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordWithMetadataParser;
@@ -32,14 +30,14 @@
public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
final FeedLogManager feedLogManager, final int numOfOutputFields,
- final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
- throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
+ final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader)
+ throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
}
@Override
protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record)
throws HyracksDataException {
- ((IRecordWithMetadataParser<T>) dataParser).appendLastParsedPrimaryKeyToTuple(tb);
+ dataParser.appendLastParsedPrimaryKeyToTuple(tb);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 2e687ba..7ba3ae4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -19,71 +19,52 @@
package org.apache.asterix.external.dataflow;
import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.external.api.IFeedMarker;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
-import org.apache.hyracks.dataflow.common.utils.TaskUtil;
import org.apache.log4j.Logger;
public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
private static final Logger LOGGER = Logger.getLogger(FeedRecordDataFlowController.class.getName());
- protected final IRecordDataParser<T> dataParser;
- protected final IRecordReader<? extends T> recordReader;
+ private final IRecordDataParser<T> dataParser;
+ private final IRecordReader<T> recordReader;
protected final AtomicBoolean closed = new AtomicBoolean(false);
protected static final long INTERVAL = 1000;
- protected final Object mutex = new Object();
- protected final boolean sendMarker;
protected boolean failed = false;
- private FeedRecordDataFlowController<T>.DataflowMarker dataflowMarker;
- private Future<?> dataflowMarkerResult;
public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
FeedLogManager feedLogManager, int numOfOutputFields, IRecordDataParser<T> dataParser,
- IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
+ IRecordReader<T> recordReader) throws HyracksDataException {
super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
this.dataParser = dataParser;
this.recordReader = recordReader;
- this.sendMarker = sendMarker;
recordReader.setFeedLogManager(feedLogManager);
recordReader.setController(this);
}
@Override
public void start(IFrameWriter writer) throws HyracksDataException {
- startDataflowMarker();
HyracksDataException hde = null;
try {
failed = false;
tupleForwarder.initialize(ctx, writer);
while (recordReader.hasNext()) {
- // synchronized on mutex before we call next() so we don't a marker before its record
- synchronized (mutex) {
- IRawRecord<? extends T> record = recordReader.next();
- if (record == null) {
- flush();
- mutex.wait(INTERVAL);
- continue;
- }
- tb.reset();
- parseAndForward(record);
+ IRawRecord<? extends T> record = recordReader.next();
+ if (record == null) {
+ flush();
+ Thread.sleep(INTERVAL); // NOSONAR: No one notifies the sleeping thread
+ continue;
}
+ tb.reset();
+ parseAndForward(record);
}
} catch (InterruptedException e) {
//TODO: Find out what could cause an interrupted exception beside termination of a job/feed
@@ -93,24 +74,20 @@
failed = true;
tupleForwarder.flush();
LOGGER.warn("Failure while operating a feed source", e);
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
- stopDataflowMarker();
try {
tupleForwarder.close();
} catch (Throwable th) {
- hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
+ hde = HyracksDataException.suppress(hde, th);
}
try {
recordReader.close();
} catch (Throwable th) {
LOGGER.warn("Failure during while operating a feed sourcec", th);
- hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
+ hde = HyracksDataException.suppress(hde, th);
} finally {
closeSignal();
- if (sendMarker && dataflowMarkerResult != null) {
- dataflowMarkerResult.cancel(true);
- }
}
if (hde != null) {
throw hde;
@@ -118,20 +95,18 @@
}
private void parseAndForward(IRawRecord<? extends T> record) throws IOException {
- synchronized (dataParser) {
- try {
- dataParser.parse(record, tb.getDataOutput());
- } catch (Exception e) {
- LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
- feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
- // continue the outer loop
- return;
- }
- tb.addFieldEndOffset();
- addMetaPart(tb, record);
- addPrimaryKeys(tb, record);
- tupleForwarder.addTuple(tb);
+ try {
+ dataParser.parse(record, tb.getDataOutput());
+ } catch (Exception e) {
+ LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
+ feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
+ // continue the outer loop
+ return;
}
+ tb.addFieldEndOffset();
+ addMetaPart(tb, record);
+ addPrimaryKeys(tb, record);
+ tupleForwarder.addTuple(tb);
}
protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
@@ -140,21 +115,6 @@
protected void addPrimaryKeys(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
}
- private void startDataflowMarker() {
- ExecutorService executorService = sendMarker ? Executors.newSingleThreadExecutor() : null;
- if (sendMarker && dataflowMarker == null) {
- dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(),
- TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx));
- dataflowMarkerResult = executorService.submit(dataflowMarker);
- }
- }
-
- private void stopDataflowMarker() {
- if (dataflowMarker != null) {
- dataflowMarker.stop();
- }
- }
-
private void closeSignal() {
synchronized (closed) {
closed.set(true);
@@ -172,7 +132,6 @@
@Override
public boolean stop() throws HyracksDataException {
- stopDataflowMarker();
HyracksDataException hde = null;
if (recordReader.stop()) {
if (failed) {
@@ -180,12 +139,12 @@
try {
tupleForwarder.close();
} catch (Throwable th) {
- hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
+ hde = HyracksDataException.suppress(hde, th);
}
try {
recordReader.close();
} catch (Throwable th) {
- hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
+ hde = HyracksDataException.suppress(hde, th);
}
if (hde != null) {
throw hde;
@@ -194,7 +153,7 @@
try {
waitForSignal();
} catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
return true;
@@ -208,52 +167,11 @@
return recordReader.handleException(th);
}
- private class DataflowMarker implements Runnable {
- private final IFeedMarker marker;
- private final VSizeFrame mark;
- private volatile boolean stopped = false;
+ public IRecordReader<T> getReader() {
+ return recordReader;
+ }
- public DataflowMarker(IFeedMarker marker, VSizeFrame mark) {
- this.marker = marker;
- this.mark = mark;
- }
-
- public synchronized void stop() {
- stopped = true;
- notify();
- }
-
- @Override
- public void run() {
- try {
- while (true) {
- synchronized (this) {
- if (!stopped) {
- // TODO (amoudi): find a better reactive way to do this
- // sleep for two seconds
- wait(TimeUnit.SECONDS.toMillis(2));
- } else {
- break;
- }
- }
- synchronized (mutex) {
- if (marker.mark(mark)) {
- // broadcast
- tupleForwarder.flush();
- // clear
- mark.getBuffer().clear();
- mark.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
- mark.getBuffer().flip();
- }
- }
- }
- } catch (InterruptedException e) {
- LOGGER.warn("Marker stopped", e);
- Thread.currentThread().interrupt();
- return;
- } catch (Exception e) {
- LOGGER.warn("Marker stopped", e);
- }
- }
+ public IRecordDataParser<T> getParser() {
+ return dataParser;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index d31e074..4177ea6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -28,12 +28,9 @@
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
-import org.apache.hyracks.dataflow.common.utils.TaskUtil;
public class FeedTupleForwarder implements ITupleForwarder {
@@ -58,11 +55,6 @@
this.frame = new VSizeFrame(ctx);
this.writer = writer;
this.appender = new FrameTupleAppender(frame);
- // Set null feed message
- VSizeFrame message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
- // a null message
- message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
- message.getBuffer().flip();
initialized = true;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 45ae52b..c7f6d9c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -28,15 +28,18 @@
public class FeedWithMetaDataFlowController<T> extends FeedRecordDataFlowController<T> {
+ protected final IRecordWithMetadataParser<T> dataParser;
+
public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser,
- IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
+ IRecordReader<T> recordReader) throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ this.dataParser = dataParser;
}
@Override
protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws HyracksDataException {
- ((IRecordWithMetadataParser<T>) dataParser).parseMeta(tb.getDataOutput());
+ dataParser.parseMeta(tb.getDataOutput());
tb.addFieldEndOffset();
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 4649559..9b23e38 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -52,8 +52,7 @@
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
- throws HyracksDataException, AlgebricksException {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
return streamFactory.getPartitionConstraint();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index 7995091..964508f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -23,7 +23,6 @@
import java.net.ServerSocket;
import java.net.Socket;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Logger;
@@ -108,7 +107,7 @@
}
connectionStream = null;
} catch (IOException e) {
- hde = new HyracksDataException(e);
+ hde = HyracksDataException.create(e);
}
try {
if (socket != null) {
@@ -116,14 +115,14 @@
}
socket = null;
} catch (IOException e) {
- hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ hde = HyracksDataException.suppress(hde, e);
}
try {
if (server != null) {
server.close();
}
} catch (IOException e) {
- hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+ hde = HyracksDataException.suppress(hde, e);
} finally {
server = null;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index fe2d4ec..8a7bda9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -27,11 +27,13 @@
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
+import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
/**
@@ -58,12 +60,21 @@
@Override
protected void start() throws HyracksDataException, InterruptedException {
- writer.open();
try {
+ writer.open();
Thread.currentThread().setName("Intake Thread");
FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
adapterRuntimeManager = new AdapterRuntimeManager(ctx, runtimeId.getEntityId(), adapter, writer, partition);
- TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+ IFrame message = new VSizeFrame(ctx);
+ TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+ /*
+ * Set null feed message. Feed pipeline carries with it a message with each frame
+ * Initially, the message is set to a null message that can be changed by feed adapters.
+ * One use case is adapters which consume data sources that allow restartability. Such adapters
+ * can propagate progress information through the ingestion pipeline to storage nodes
+ */
+ message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+ message.getBuffer().flip();
adapterRuntimeManager.start();
synchronized (adapterRuntimeManager) {
while (!adapterRuntimeManager.isDone()) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index a369fe3..78f24a5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -69,7 +69,6 @@
IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
- boolean sendMarker = ExternalDataUtils.isSendMarker(configuration);
if (indexingOp) {
return new IndexingDataFlowController(ctx,
DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
@@ -83,19 +82,18 @@
if (isChangeFeed) {
int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
- numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader,
- sendMarker);
+ numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader);
} else {
return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
- (IRecordWithMetadataParser) dataParser, recordReader, sendMarker);
+ (IRecordWithMetadataParser) dataParser, recordReader);
}
} else if (isChangeFeed) {
int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
- (IRecordWithPKDataParser) dataParser, recordReader, sendMarker);
+ (IRecordWithPKDataParser) dataParser, recordReader);
} else {
return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
- recordReader, sendMarker);
+ recordReader);
}
} else {
return new RecordDataFlowController(ctx,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a89d13e..3b6e7ff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -41,8 +41,6 @@
public static final String KEY_FILESYSTEM = "fs";
// specifies the address of the HDFS name node
public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
- // specifies whether a feed sends progress markers or not
- public static final String KEY_SEND_MARKER = "send-marker";
// specifies the class implementation of the accessed instance of HDFS
public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index d009960..a09ff9b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.external.util;
-import java.util.HashMap;
+import java.util.EnumMap;
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -162,7 +162,7 @@
private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
- Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
+ Map<ATypeTag, IValueParserFactory> m = new EnumMap<>(ATypeTag.class);
m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
@@ -339,8 +339,4 @@
}
return intIndicators;
}
-
- public static boolean isSendMarker(Map<String, String> configuration) {
- return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_SEND_MARKER));
- }
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index d286ff9..3863920 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -79,12 +79,10 @@
ITupleForwarder forwarder;
ArrayTupleBuilder tb;
IPropertiesProvider propertiesProvider =
- (IPropertiesProvider) ((NodeControllerService) ctx
- .getJobletContext().getApplicationContext().getControllerService())
- .getApplicationContext()
- .getApplicationObject();
- ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions()
- .get(nodeId)[0];
+ (IPropertiesProvider) ((NodeControllerService) ctx.getJobletContext().getApplicationContext()
+ .getControllerService()).getApplicationContext().getApplicationObject();
+ ClusterPartition nodePartition =
+ propertiesProvider.getMetadataProperties().getNodePartitions().get(nodeId)[0];
parser = new ADMDataParser(outputType, true);
forwarder = DataflowUtils.getTupleForwarder(configuration,
FeedUtils.getFeedLogManager(ctx,
@@ -144,5 +142,4 @@
public ARecordType getMetaType() {
return null;
}
-
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f5c6d9a..6901e1d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -310,8 +310,8 @@
}
public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
- String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
- : dataverse;
+ String dv =
+ dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName()) : dataverse;
if (dv == null) {
return null;
}
@@ -353,25 +353,15 @@
throws AlgebricksException {
DataSource source = findDataSource(dataSourceId);
Dataset dataset = ((DatasetDataSource) source).getDataset();
- try {
- String indexName = indexId;
- Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
- if (secondaryIndex != null) {
- return new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
- } else {
- Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
- dataset.getDatasetName(), dataset.getDatasetName());
- if (primaryIndex.getIndexName().equals(indexId)) {
- return new DataSourceIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(),
- this);
- } else {
- return null;
- }
- }
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
+ String indexName = indexId;
+ Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+ return (secondaryIndex != null)
+ ? new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this)
+ : null;
+ }
+
+ public Index getIndex(String dataverseName, String datasetName, String indexName) throws AlgebricksException {
+ return MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
}
public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
@@ -405,8 +395,7 @@
protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
throws AlgebricksException {
- ExternalScanOperatorDescriptor dataScanner =
- new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
+ ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
AlgebricksPartitionConstraint constraint;
try {
constraint = adapterFactory.getPartitionConstraint();
@@ -462,8 +451,8 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
boolean retainMissing, Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields,
- boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes,
- int[] maxFilterFieldIndexes) throws AlgebricksException {
+ boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes)
+ throws AlgebricksException {
boolean isSecondary = true;
int numSecondaryKeys = 0;
try {
@@ -530,8 +519,7 @@
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- spPc = getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(), indexName,
- temp);
+ spPc = getSplitProviderAndConstraints(dataset, theIndex.getIndexName());
ISearchOperationCallbackFactory searchCallbackFactory;
if (isSecondary) {
@@ -586,8 +574,7 @@
JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
try {
- ARecordType recType =
- (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+ ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
boolean temp = dataset.getDatasetDetails().isTemp();
@@ -630,8 +617,8 @@
ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = getSplitProviderAndConstraints(
- dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
+ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
ARecordType metaType = null;
if (dataset.hasMetaPart()) {
metaType =
@@ -751,7 +738,6 @@
isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
- String indexName = primaryIndex.getIndexName();
ARecordType metaType = dataset.hasMetaPart()
? (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
: null;
@@ -763,8 +749,7 @@
itemType, metaType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
- temp);
+ getSplitProviderAndConstraints(dataset);
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -779,9 +764,9 @@
new TreeIndexBulkLoadOperatorDescriptor(spec, null, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false,
- numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
- metaType, compactionInfo.first, compactionInfo.second),
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR,
+ false, numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex,
+ itemType, metaType, compactionInfo.first, compactionInfo.second),
metadataPageManagerFactory);
return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
} catch (MetadataException me) {
@@ -951,12 +936,6 @@
numKeyFields / 2);
}
- public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(String dataverseName,
- String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
- FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
- return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
- }
-
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(String dataverse) {
return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse);
}
@@ -970,8 +949,7 @@
throws MetadataException {
DatasourceAdapter adapter;
// search in default namespace (built-in adapter)
- adapter =
- MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+ adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
// search in dataverse (user-defined adapter)
if (adapter == null) {
@@ -985,8 +963,7 @@
}
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
- String dataverseName, String datasetName, String targetIdxName, boolean create)
- throws AlgebricksException {
+ String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(mdTxnCtx, dataverseName, datasetName,
targetIdxName, create);
}
@@ -1104,8 +1081,7 @@
IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
- temp);
+ getSplitProviderAndConstraints(dataset);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
@@ -1171,7 +1147,7 @@
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, null, true, indexName,
context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory, null,
- metadataPageManagerFactory);
+ metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
op.setType(itemType);
op.setFilterIndex(fieldIdx);
return new Pair<>(op, splitsAndConstraint.second);
@@ -1222,8 +1198,7 @@
Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i),
(hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
IAType keyType = keyPairType.first;
- comparatorFactories[i] =
- BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+ comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
@@ -1245,8 +1220,7 @@
} catch (AsterixException e) {
throw new AlgebricksException(e);
}
- comparatorFactories[i] =
- BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+ comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
@@ -1295,8 +1269,7 @@
dataset.getDatasetName(), dataset.getDatasetName());
String indexName = primaryIndex.getIndexName();
ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
- .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName())
- .getDatatype();
+ .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
@@ -1304,8 +1277,7 @@
IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
- temp);
+ getSplitProviderAndConstraints(dataset);
// prepare callback
int datasetId = dataset.getDatasetId();
@@ -1462,10 +1434,11 @@
itemType = (ARecordType) MetadataManager.INSTANCE
.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
validateRecordType(itemType);
- ARecordType metaType = dataset.hasMetaPart()
- ? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
- dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype()
- : null;
+ ARecordType metaType =
+ dataset.hasMetaPart()
+ ? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
+ dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype()
+ : null;
// Index parameters.
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -1488,8 +1461,8 @@
ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
for (i = 0; i < secondaryKeys.size(); ++i) {
- Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
- secondaryKeyNames.get(i), itemType);
+ Pair<IAType, Boolean> keyPairType =
+ Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i), secondaryKeyNames.get(i), itemType);
IAType keyType = keyPairType.first;
comparatorFactories[i] =
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
@@ -1506,18 +1479,17 @@
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
IModificationOperationCallbackFactory modificationCallbackFactory = temp
? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_BTREE)
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
: new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_BTREE, dataset.hasMetaPart());
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
+ dataset.hasMetaPart());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1536,7 +1508,7 @@
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false,
indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
- prevFieldPermutation, metadataPageManagerFactory);
+ prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
@@ -1648,7 +1620,7 @@
dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
int[] btreeFields = new int[primaryComparatorFactories.length];
for (int k = 0; k < btreeFields.length; k++) {
btreeFields[k] = k + numSecondaryKeys;
@@ -1671,11 +1643,10 @@
int datasetId = dataset.getDatasetId();
IModificationOperationCallbackFactory modificationCallbackFactory = temp
? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_RTREE)
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
: new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_RTREE, dataset.hasMetaPart());
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
+ dataset.hasMetaPart());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1694,13 +1665,13 @@
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, null, fieldPermutation, indexDataflowHelperFactory, filterFactory, false,
indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
- prevFieldPermutation, metadataPageManagerFactory);
+ prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory,
- filterFactory, false, indexName, null, modificationCallbackFactory,
- NoOpOperationCallbackFactory.INSTANCE, metadataPageManagerFactory);
+ comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory, filterFactory,
+ false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+ metadataPageManagerFactory);
}
return new Pair<>(op, splitsAndConstraint.second);
} catch (MetadataException e) {
@@ -1874,7 +1845,7 @@
IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2055,8 +2026,7 @@
secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- getSplitProviderAndConstraints(dataverseName, datasetName, indexName,
- dataset.getDatasetDetails().isTemp());
+ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
// Generate Output Record format
ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
@@ -2125,4 +2095,18 @@
public IStorageComponentProvider getStorageComponentProvider() {
return storaegComponentProvider;
}
+
+ public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds)
+ throws AlgebricksException {
+ FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), ds.getDatasetName(),
+ ds.getDatasetDetails().isTemp());
+ return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+ }
+
+ public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds,
+ String indexName) throws AlgebricksException {
+ FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), indexName,
+ ds.getDatasetDetails().isTemp());
+ return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 2e328f9..34faf63 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -29,14 +29,15 @@
import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.utils.JobUtils;
@@ -84,6 +85,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -409,8 +411,8 @@
case LENGTH_PARTITIONED_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case SINGLE_PARTITION_WORD_INVIX:
- return invertedIndexDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, this,
- index, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits,
+ return invertedIndexDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, this, index,
+ recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits,
filterCmpFactories);
default:
throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
@@ -577,4 +579,30 @@
metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
datasetPartitions, isSink);
}
+
+ /**
+ * Get the index dataflow helper factory for the dataset's primary index
+ *
+ * @param mdProvider
+ * an instance of metadata provider that is used to fetch metadata information
+ * @throws AlgebricksException
+ */
+ public IIndexDataflowHelperFactory getIndexDataflowHelperFactory(MetadataProvider mdProvider)
+ throws AlgebricksException {
+ if (getDatasetType() != DatasetType.INTERNAL) {
+ throw new AlgebricksException(ErrorCode.ASTERIX,
+ ErrorCode.COMPILATION_DATASET_TYPE_DOES_NOT_HAVE_PRIMARY_INDEX, getDatasetType());
+ }
+ Index index = mdProvider.getIndex(getDataverseName(), getDatasetName(), getDatasetName());
+ ARecordType recordType = (ARecordType) mdProvider.findType(getItemTypeDataverseName(), getItemTypeName());
+ ARecordType metaType = (ARecordType) mdProvider.findType(getMetaItemTypeDataverseName(), getMetaItemTypeName());
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+ DatasetUtil.getMergePolicyFactory(this, mdProvider.getMetadataTxnContext());
+ return getIndexDataflowHelperFactory(mdProvider, index, recordType, metaType, compactionInfo.first,
+ compactionInfo.second);
+ }
+
+ public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
+ return NoOpFrameOperationCallbackFactory.INSTANCE;
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 80792b5..572cc75 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -330,14 +330,12 @@
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return RuntimeUtils.createJobSpecification();
}
- boolean temp = dataset.getDatasetDetails().isTemp();
ARecordType itemType =
(ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
- dataset.getDatasetName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
@@ -386,15 +384,12 @@
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return RuntimeUtils.createJobSpecification();
}
-
- boolean temp = dataset.getDatasetDetails().isTemp();
ARecordType itemType =
(ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
- dataset.getDatasetName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
@@ -429,7 +424,6 @@
}
Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName, datasetName);
- boolean temp = dataset.getDatasetDetails().isTemp();
ARecordType itemType =
(ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
// get meta item type
@@ -451,7 +445,7 @@
int[] btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset);
FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < fs.length; i++) {
@@ -495,7 +489,6 @@
if (dataset == null) {
throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
}
- boolean temp = dataset.getDatasetDetails().isTemp();
ARecordType itemType =
(ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
ARecordType metaItemType = DatasetUtil.getMetaType(metadataProvider, dataset);
@@ -505,7 +498,7 @@
ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
int[] blooFilterKeyFields = DatasetUtil.createBloomFilterKeyFields(dataset);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
index 249f035..edaa73e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
@@ -452,12 +452,11 @@
JobSpecification spec = RuntimeUtils.createJobSpecification();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
- boolean temp = ds.getDatasetDetails().isTemp();
ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds,
+ IndexingConstants.getFilesIndexName(ds.getDatasetName()));
IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -472,8 +471,7 @@
for (Index index : indexes) {
if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- index.getIndexName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
treeDataflowHelperFactories.add(indexDataflowHelperFactory);
@@ -499,11 +497,9 @@
DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-
- boolean temp = ds.getDatasetDetails().isTemp();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds,
+ IndexingConstants.getFilesIndexName(ds.getDatasetName()));
IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -518,8 +514,7 @@
for (Index index : indexes) {
if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- index.getIndexName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
treeDataflowHelperFactories.add(indexDataflowHelperFactory);
@@ -546,11 +541,9 @@
DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
- boolean temp = ds.getDatasetDetails().isTemp();
-
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds,
+ IndexingConstants.getFilesIndexName(ds.getDatasetName()));
IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -565,8 +558,7 @@
for (Index index : indexes) {
if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
- index.getIndexName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
treeDataflowHelperFactories.add(indexDataflowHelperFactory);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index c6e0a6b..701d0d6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -101,10 +101,8 @@
throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification();
IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
- boolean temp = dataset.getDatasetDetails().isTemp();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
- index.getIndexName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
ARecordType recordType =
@@ -153,11 +151,9 @@
public static JobSpecification buildDropSecondaryIndexJobSpec(Index index, MetadataProvider metadataProvider,
Dataset dataset) throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification();
- boolean temp = dataset.getDatasetDetails().isTemp();
IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
- index.getIndexName(), temp);
+ metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
ARecordType recordType =
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index f7e569c..d731603 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -190,8 +190,7 @@
metaSerde =
metaType == null ? null : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
- index.getIndexName(), dataset.getDatasetDetails().isTemp());
+ metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
@@ -203,8 +202,7 @@
numFilterFields = 0;
}
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(),
- dataset.getDatasetName(), dataset.getDatasetName(), dataset.getDatasetDetails().isTemp());
+ metadataProvider.getSplitProviderAndConstraints(dataset);
primaryFileSplitProvider = primarySplitsAndConstraint.first;
primaryPartitionConstraint = primarySplitsAndConstraint.second;
setPrimaryRecDescAndComparators();
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index 2fae304..190a3b2 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -71,6 +71,7 @@
public class ARecordPointable extends AbstractPointable {
private final UTF8StringWriter utf8Writer = new UTF8StringWriter();
+ public static final ARecordPointableFactory FACTORY = new ARecordPointableFactory();
public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
private static final long serialVersionUID = 1L;
@@ -86,11 +87,15 @@
}
};
- public static final IPointableFactory FACTORY = new IPointableFactory() {
+ public static class ARecordPointableFactory implements IPointableFactory {
+
private static final long serialVersionUID = 1L;
+ private ARecordPointableFactory() {
+ }
+
@Override
- public IPointable createPointable() {
+ public ARecordPointable createPointable() {
return new ARecordPointable();
}
@@ -98,7 +103,8 @@
public ITypeTraits getTypeTraits() {
return TYPE_TRAITS;
}
- };
+
+ }
public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = new IObjectFactory<IPointable, ATypeTag>() {
@Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index dd7335a..042837b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -22,7 +22,6 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
@@ -65,7 +64,7 @@
((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 2fedcca..8739948 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -20,7 +20,6 @@
import java.util.Set;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.transactions.IResourceIdManager;
@@ -57,7 +56,7 @@
}
broker.sendApplicationMessageToNC(reponse, src);
} catch (Exception e) {
- throw ExceptionUtils.convertToHyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 6869523..037945a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -56,6 +56,9 @@
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
@@ -83,13 +86,17 @@
private final int filterFieldIndex;
private final int metaFieldIndex;
private LockThenSearchOperationCallback searchCallback;
+ private IFrameOperationCallback frameOpCallback;
+ private final IFrameOperationCallbackFactory frameOpCallbackFactory;
- public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
- ARecordType recordType, int filterFieldIndex) throws HyracksDataException {
+ public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+ int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
+ ARecordType recordType, int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory)
+ throws HyracksDataException {
super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
this.key = new PermutingFrameTupleReference();
this.numOfPrimaryKeys = numOfPrimaryKeys;
+ this.frameOpCallbackFactory = frameOpCallbackFactory;
missingWriter = opDesc.getMissingWriterFactory().createMissingWriter();
int[] searchKeyPermutations = new int[numOfPrimaryKeys];
for (int i = 0; i < searchKeyPermutations.length; i++) {
@@ -104,7 +111,7 @@
isFiltered = true;
this.recordType = recordType;
this.presetFieldIndex = filterFieldIndex;
- this.recPointable = (ARecordPointable) ARecordPointable.FACTORY.createPointable();
+ this.recPointable = ARecordPointable.FACTORY.createPointable();
this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
}
@@ -140,17 +147,19 @@
tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
dos = tb.getDataOutput();
appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
- modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
- indexHelper.getResource(), ctx, this);
+ modCallback = opDesc.getModificationOpCallbackFactory()
+ .createModificationOperationCallback(indexHelper.getResource(), ctx, this);
searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory()
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
indexAccessor = index.createAccessor(modCallback, searchCallback);
cursor = indexAccessor.createSearchCursor(false);
frameTuple = new FrameTupleReference();
- IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
+ IAppRuntimeContext runtimeCtx =
+ (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
runtimeCtx.getTransactionSubsystem().getLogManager());
+ frameOpCallback =
+ frameOpCallbackFactory.createFrameOperationCallback(ctx, (ILSMIndexAccessor) indexAccessor);
} catch (Exception e) {
indexHelper.close();
throw new HyracksDataException(e);
@@ -188,7 +197,6 @@
tb.addFieldEndOffset();
}
- //TODO: use tryDelete/tryInsert in order to prevent deadlocks
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
@@ -221,8 +229,7 @@
}
// if with filters, append the filter
if (isFiltered) {
- dos.write(prevTuple.getFieldData(filterFieldIndex),
- prevTuple.getFieldStart(filterFieldIndex),
+ dos.write(prevTuple.getFieldData(filterFieldIndex), prevTuple.getFieldStart(filterFieldIndex),
prevTuple.getFieldLength(filterFieldIndex));
tb.addFieldEndOffset();
}
@@ -258,6 +265,8 @@
writeOutput(i, recordWasInserted, prevTuple != null);
i++;
}
+ // callback here before calling nextFrame on the next operator
+ frameOpCallback.frameCompleted(!firstModification);
appender.write(writer, true);
} catch (IndexException | IOException | AsterixException e) {
throw new HyracksDataException(e);
@@ -318,6 +327,6 @@
@Override
public void flush() throws HyracksDataException {
- writer.flush();
+ // No op since nextFrame flushes by default
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
index fe69a04..b37ecae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
@@ -37,12 +37,14 @@
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.common.IStorageManager;
public class LSMTreeUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperatorDescriptor {
private static final long serialVersionUID = 1L;
private final int[] prevValuePermutation;
+ private final IFrameOperationCallbackFactory frameOpCallbackFactory;
private ARecordType type;
private int filterIndex = -1;
@@ -54,12 +56,13 @@
boolean isPrimary, String indexName, IMissingWriterFactory missingWriterFactory,
IModificationOperationCallbackFactory modificationOpCallbackProvider,
ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation,
- IPageManagerFactory pageManagerFactory) {
+ IPageManagerFactory pageManagerFactory, IFrameOperationCallbackFactory frameOpCallbackFactory) {
super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, IndexOperation.UPSERT,
dataflowHelperFactory, tupleFilterFactory, isPrimary, indexName, missingWriterFactory,
modificationOpCallbackProvider, searchOpCallbackProvider, pageManagerFactory);
this.prevValuePermutation = prevValuePermutation;
+ this.frameOpCallbackFactory = frameOpCallbackFactory;
}
@Override
@@ -67,7 +70,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return isPrimary()
? new LSMPrimaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
- recordDescProvider, comparatorFactories.length, type, filterIndex)
+ recordDescProvider, comparatorFactories.length, type, filterIndex, frameOpCallbackFactory)
: new LSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
recordDescProvider, prevValuePermutation);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 33078ff..90f6bbf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -32,7 +32,7 @@
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,7 +46,7 @@
public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
- private final static long SEED = 0L;
+ protected static final long SEED = 0L;
protected final ITransactionManager transactionManager;
protected final ILogManager logMgr;
@@ -85,8 +85,7 @@
try {
transactionContext = transactionManager.getTransactionContext(jobId, false);
transactionContext.setWriteTxn(isWriteTransaction);
- ILogMarkerCallback callback =
- TaskUtil.<ILogMarkerCallback>get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+ ILogMarkerCallback callback = TaskUtil.get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
logRecord = new LogRecord(callback);
if (isSink) {
return;
@@ -112,6 +111,8 @@
* active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing
* flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too.
*/
+ // TODO: Fix this for upserts. an upsert tuple right now expect to notify the opTracker twice (one for
+ // delete and one for insert)
transactionContext.notifyOptracker(false);
} else {
tRef.reset(tAccess, t);
@@ -126,7 +127,7 @@
}
}
}
- VSizeFrame message = TaskUtil.<VSizeFrame>get(HyracksConstants.KEY_MESSAGE, ctx);
+ IFrame message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
if (message != null
&& MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
try {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 536e657..cfe2a25 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -29,14 +29,14 @@
private static final long serialVersionUID = 1L;
- private final JobId jobId;
- private final int datasetId;
- private final int[] primaryKeyFields;
- private final boolean isTemporaryDatasetWriteJob;
- private final boolean isWriteTransaction;
- private final int upsertVarIdx;
- private int[] datasetPartitions;
- private final boolean isSink;
+ protected final JobId jobId;
+ protected final int datasetId;
+ protected final int[] primaryKeyFields;
+ protected final boolean isTemporaryDatasetWriteJob;
+ protected final boolean isWriteTransaction;
+ protected final int upsertVarIdx;
+ protected int[] datasetPartitions;
+ protected final boolean isSink;
public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
index 06538af..80dd19b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
@@ -43,6 +43,7 @@
buffer = ctx.allocateFrame(frameSize);
}
+ @Override
public ByteBuffer getBuffer() {
return buffer;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 6c581f0..4c0eb1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -28,8 +28,10 @@
*/
public class HyracksDataException extends HyracksException {
+ private static final long serialVersionUID = 1L;
+
public static HyracksDataException create(Throwable cause) {
- if (cause instanceof HyracksDataException) {
+ if (cause instanceof HyracksDataException || cause == null) {
return (HyracksDataException) cause;
}
return new HyracksDataException(cause);
@@ -48,6 +50,14 @@
.getParams());
}
+ public static HyracksDataException suppress(HyracksDataException root, Throwable th) {
+ if (root == null) {
+ return HyracksDataException.create(th);
+ }
+ root.addSuppressed(th);
+ return root;
+ }
+
public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
Serializable... params) {
super(component, errorCode, message, cause, nodeId, params);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 21b9dcf..77404b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -463,4 +463,8 @@
return CCApplicationEntryPoint.INSTANCE;
}
}
+
+ public ICCApplicationEntryPoint getApplication() {
+ return aep;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
index 05417a8..77f18ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
@@ -25,6 +25,19 @@
protected int length;
+ /**
+ * copies the content of this pointable to the passed byte array.
+ * the array is expected to be at least of length = length of this pointable
+ *
+ * @param copy
+ * the array to write into
+ * @throws ArrayIndexOutOfBoundsException
+ * if the passed array size is smaller than length
+ */
+ public void copyInto(byte[] copy) {
+ System.arraycopy(bytes, start, copy, 0, length);
+ }
+
@Override
public void set(byte[] bytes, int start, int length) {
this.bytes = bytes;
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
index 74ced4f..2e8071c 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
@@ -18,8 +18,27 @@
*/
package org.apache.hyracks.data.std.api;
+/**
+ * Point to range over byte array
+ */
public interface IPointable extends IValueReference {
- public void set(byte[] bytes, int start, int length);
+ /**
+ * Point to the range from position = start with length = length over the byte array bytes
+ *
+ * @param bytes
+ * the byte array
+ * @param start
+ * the start offset
+ * @param length
+ * the length of the range
+ */
+ void set(byte[] bytes, int start, int length);
- public void set(IValueReference pointer);
+ /**
+ * Point to the same range pointed to by the passed pointer
+ *
+ * @param pointer
+ * the pointer to the targetted range
+ */
+ void set(IValueReference pointer);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
index ee00163..51c155e 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
@@ -20,10 +20,10 @@
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.data.std.api.AbstractPointable;
-import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IPointableFactory;
public final class VoidPointable extends AbstractPointable {
+ public static final VoidPointableFactory FACTORY = new VoidPointableFactory();
public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
private static final long serialVersionUID = 1L;
@@ -38,11 +38,14 @@
}
};
- public static final IPointableFactory FACTORY = new IPointableFactory() {
+ public static class VoidPointableFactory implements IPointableFactory {
private static final long serialVersionUID = 1L;
+ private VoidPointableFactory() {
+ }
+
@Override
- public IPointable createPointable() {
+ public VoidPointable createPointable() {
return new VoidPointable();
}
@@ -50,5 +53,5 @@
public ITypeTraits getTypeTraits() {
return TYPE_TRAITS;
}
- };
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index 57f8072..efdd963 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -108,4 +108,11 @@
return false;
}
+ @Override
+ public void flush(IFrameWriter writer) throws HyracksDataException {
+ if (tupleCount > 0) {
+ write(writer, true);
+ }
+ writer.flush();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index 8f005d8..77020f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -22,8 +22,8 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.HyracksConstants;
@@ -36,24 +36,24 @@
* This appender must only be used on network boundary
*/
public class MessagingFrameTupleAppender extends FrameTupleAppender {
-
- private final IHyracksTaskContext ctx;
- private static final int NULL_MESSAGE_SIZE = 1;
+ public static final int NULL_MESSAGE_SIZE = 1;
public static final byte NULL_FEED_MESSAGE = 0x01;
public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
public static final byte MARKER_MESSAGE = 0x03;
+
+ private final IHyracksTaskContext ctx;
private boolean initialized = false;
- private VSizeFrame message;
+ private IFrame message;
public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
this.ctx = ctx;
}
- public static void printMessage(VSizeFrame message, PrintStream out) throws HyracksDataException {
+ public static void printMessage(IFrame message, PrintStream out) throws HyracksDataException {
out.println(getMessageString(message));
}
- public static String getMessageString(VSizeFrame message) throws HyracksDataException {
+ public static String getMessageString(IFrame message) throws HyracksDataException {
StringBuilder aString = new StringBuilder();
aString.append("Message Type: ");
switch (getMessageType(message)) {
@@ -76,7 +76,7 @@
return aString.toString();
}
- public static byte getMessageType(VSizeFrame message) throws HyracksDataException {
+ public static byte getMessageType(IFrame message) throws HyracksDataException {
switch (message.getBuffer().array()[0]) {
case NULL_FEED_MESSAGE:
return NULL_FEED_MESSAGE;
@@ -105,15 +105,13 @@
@Override
public int getTupleCount() {
- // if message is set, there is always a message. that message could be a null message (TODO: optimize)
- return tupleCount + ((message == null) ? 0 : 1);
+ return tupleCount + 1;
}
@Override
public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
if (!initialized) {
- message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
- initialized = true;
+ init();
}
// If message fits, we append it, otherwise, we append a null message, then send a message only
// frame with the message
@@ -125,7 +123,7 @@
} else {
ByteBuffer buffer = message.getBuffer();
int messageSize = buffer.limit() - buffer.position();
- if (hasEnoughSpace(1, messageSize)) {
+ if (hasEnoughSpace(0, messageSize)) {
appendMessage(buffer);
forward(outWriter);
} else {
@@ -133,7 +131,7 @@
appendNullMessage();
forward(outWriter);
}
- if (!hasEnoughSpace(1, messageSize)) {
+ if (!hasEnoughSpace(0, messageSize)) {
frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
reset(frame.getBuffer(), true);
}
@@ -143,6 +141,11 @@
}
}
+ private void init() {
+ message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
+ initialized = true;
+ }
+
private void forward(IFrameWriter outWriter) throws HyracksDataException {
getBuffer().clear();
outWriter.nextFrame(getBuffer());
@@ -168,4 +171,13 @@
++tupleCount;
IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
}
+
+ /*
+ * Always write and then flush to send out the message if exists
+ */
+ @Override
+ public void flush(IFrameWriter writer) throws HyracksDataException {
+ write(writer, true);
+ writer.flush();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 6d87d89..dbd3afa 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -41,25 +41,30 @@
private final ITuplePartitionComputer tpc;
private final IHyracksTaskContext ctx;
private boolean[] allocatedFrames;
+ private boolean failed = false;
public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory,
RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
+ this.ctx = ctx;
+ this.tpc = tpc;
this.consumerPartitionCount = consumerPartitionCount;
pWriters = new IFrameWriter[consumerPartitionCount];
isOpen = new boolean[consumerPartitionCount];
allocatedFrames = new boolean[consumerPartitionCount];
appenders = new FrameTupleAppender[consumerPartitionCount];
+ tupleAccessor = new FrameTupleAccessor(recordDescriptor);
+ initializeAppenders(pwFactory);
+ }
+
+ protected void initializeAppenders(IPartitionWriterFactory pwFactory) throws HyracksDataException {
for (int i = 0; i < consumerPartitionCount; ++i) {
try {
pWriters[i] = pwFactory.createFrameWriter(i);
appenders[i] = createTupleAppender(ctx);
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
- tupleAccessor = new FrameTupleAccessor(recordDescriptor);
- this.tpc = tpc;
- this.ctx = ctx;
}
protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
@@ -71,25 +76,17 @@
HyracksDataException closeException = null;
for (int i = 0; i < pWriters.length; ++i) {
if (isOpen[i]) {
- if (allocatedFrames[i] && appenders[i].getTupleCount() > 0) {
+ if (allocatedFrames[i] && appenders[i].getTupleCount() > 0 && !failed) {
try {
appenders[i].write(pWriters[i], true);
} catch (Throwable th) {
- if (closeException == null) {
- closeException = new HyracksDataException(th);
- } else {
- closeException.addSuppressed(th);
- }
+ closeException = HyracksDataException.suppress(closeException, th);
}
}
try {
pWriters[i].close();
} catch (Throwable th) {
- if (closeException == null) {
- closeException = new HyracksDataException(th);
- } else {
- closeException.addSuppressed(th);
- }
+ closeException = HyracksDataException.suppress(closeException, th);
}
}
}
@@ -126,17 +123,14 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
HyracksDataException failException = null;
for (int i = 0; i < appenders.length; ++i) {
if (isOpen[i]) {
try {
pWriters[i].fail();
} catch (Throwable th) {
- if (failException == null) {
- failException = new HyracksDataException(th);
- } else {
- failException.addSuppressed(th);
- }
+ failException = HyracksDataException.suppress(failException, th);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
index a985b4d..b89922e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
@@ -27,7 +27,7 @@
@FunctionalInterface
public interface IModificationOperationCallbackFactory extends Serializable {
- public IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
+ IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
index 627994c..a19e69a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
@@ -23,10 +23,6 @@
public class MutableArrayValueReference implements IValueReference {
private byte[] array;
- public MutableArrayValueReference() {
- //mutable array. user doesn't need to specify the array in advance
- }
-
public MutableArrayValueReference(byte[] array) {
this.array = array;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
new file mode 100644
index 0000000..de72690
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An interface that is used to enable frame level operation on indexes
+ */
+@FunctionalInterface
+public interface IFrameOperationCallback {
+ /**
+ * Called once processing the frame is done before calling nextFrame on the next IFrameWriter in
+ * the pipeline
+ *
+ * @param modified
+ * true if the index was modified during the processing of the frame, false otherwise
+ * @throws HyracksDataException
+ */
+ void frameCompleted(boolean modified) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java
new file mode 100644
index 0000000..8031d32
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A factory for {@link IFrameOperationCallback}
+ */
+@FunctionalInterface
+public interface IFrameOperationCallbackFactory extends Serializable {
+ /**
+ * Create a {@link IFrameOperationCallback} for an index operator
+ *
+ * @param ctx
+ * the task context
+ * @param indexAccessor
+ * the accessor for the index
+ * @return an instance of {@link IFrameOperationCallback}
+ */
+ IFrameOperationCallback createFrameOperationCallback(IHyracksTaskContext ctx, ILSMIndexAccessor indexAccessor);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 7a2bc7c..f21c8a3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexCursor;
import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
@@ -28,34 +29,174 @@
public interface ILSMHarness {
+ /**
+ * Force modification even if memory component is full
+ *
+ * @param ctx
+ * the operation context
+ * @param tuple
+ * the operation tuple
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException, IndexException;
+ /**
+ * Modify the index if the memory component is not full, wait for a new memory component if the current one is full
+ *
+ * @param ctx
+ * the operation context
+ * @param tryOperation
+ * true if IO operation
+ * @param tuple
+ * the operation tuple
+ * @return
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
throws HyracksDataException, IndexException;
+ /**
+ * Search the index
+ *
+ * @param ctx
+ * the search operation context
+ * @param cursor
+ * the index cursor
+ * @param pred
+ * the search predicate
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred)
throws HyracksDataException, IndexException;
+ /**
+ * End the search
+ *
+ * @param ctx
+ * @throws HyracksDataException
+ */
void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException;
+ /**
+ * Schedule a merge
+ *
+ * @param ctx
+ * @param callback
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException;
+ /**
+ * Schedule full merge
+ *
+ * @param ctx
+ * @param callback
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException;
+ /**
+ * Perform a merge operation
+ *
+ * @param ctx
+ * @param operation
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException, IndexException;
+ /**
+ * Schedule a flush
+ *
+ * @param ctx
+ * @param callback
+ * @throws HyracksDataException
+ */
void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException;
+ /**
+ * Perform a flush
+ *
+ * @param ctx
+ * @param operation
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException, IndexException;
+ /**
+ * Add bulk loaded component
+ *
+ * @param index
+ * the new component
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void addBulkLoadedComponent(ILSMDiskComponent index) throws HyracksDataException, IndexException;
+ /**
+ * Get index operation tracker
+ */
ILSMOperationTracker getOperationTracker();
+ /**
+ * Schedule replication
+ *
+ * @param ctx
+ * the operation context
+ * @param diskComponents
+ * the disk component to be replicated
+ * @param bulkload
+ * true if the components were bulk loaded, false otherwise
+ * @param opType
+ * The operation type
+ * @throws HyracksDataException
+ */
void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents, boolean bulkload,
LSMOperationType opType) throws HyracksDataException;
+ /**
+ * End a replication operation
+ *
+ * @param ctx
+ * the operation context
+ * @throws HyracksDataException
+ */
void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException;
+
+ /**
+ * Update the metadata of the memory component of the index. Waiting for a new memory component if
+ * the current memory component is full
+ *
+ * @param ctx
+ * the operation context
+ * @param key
+ * the meta key
+ * @param value
+ * the meta value
+ * @throws HyracksDataException
+ */
+ void updateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value)
+ throws HyracksDataException;
+
+ /**
+ * Force updating the metadata of the memory component of the index even if memory component is full
+ *
+ * @param ctx
+ * the operation context
+ * @param key
+ * the meta key
+ * @param value
+ * the meta value
+ * @throws HyracksDataException
+ */
+ void forceUpdateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value)
+ throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index fecc674..90c70aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexAccessor;
import org.apache.hyracks.storage.am.common.api.IndexException;
@@ -34,16 +35,43 @@
* concurrent operations).
*/
public interface ILSMIndexAccessor extends IIndexAccessor {
+ /**
+ * Schedule a flush operation
+ *
+ * @param callback
+ * the IO operation callback
+ * @throws HyracksDataException
+ */
void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException;
+ /**
+ * Schedule a merge operation
+ *
+ * @param callback
+ * the merge operation callback
+ * @param components
+ * the components to be merged
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components)
throws HyracksDataException, IndexException;
+ /**
+ * Schedule a full merge
+ *
+ * @param callback
+ * the merge operation callback
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
/**
- * Deletes the tuple from the memory component only.
+ * Delete the tuple from the memory component only. Don't replace with antimatter tuple
*
+ * @param tuple
+ * the tuple to be deleted
* @throws HyracksDataException
* @throws IndexException
*/
@@ -113,12 +141,49 @@
*/
boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException;
+ /**
+ * Delete the tuple from the memory component only. Don't replace with antimatter tuple
+ * Perform operation even if the memory component is full
+ *
+ * @param tuple
+ * the tuple to delete
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void forcePhysicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
+ /**
+ * Insert a new tuple (failing if duplicate key entry is found)
+ *
+ * @param tuple
+ * the tuple to insert
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void forceInsert(ITupleReference tuple) throws HyracksDataException, IndexException;
+ /**
+ * Force deleting an index entry even if the memory component is full
+ * replace the entry if found with an antimatter tuple, otherwise, simply insert the antimatter tuple
+ *
+ * @param tuple
+ * tuple to delete
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
+ /**
+ * Schedule a replication for disk components
+ *
+ * @param diskComponents
+ * the components to be replicated
+ * @param bulkload
+ * true if the components were bulkloaded, false otherwise
+ * @param opType
+ * the operation type
+ * @throws HyracksDataException
+ */
void scheduleReplication(List<ILSMDiskComponent> diskComponents, boolean bulkload, LSMOperationType opType)
throws HyracksDataException;
@@ -137,4 +202,24 @@
* @throws TreeIndexException
*/
void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException;
+
+ /**
+ * Update the metadata of the memory component, wait for the new component if the current one is UNWRITABLE
+ *
+ * @param key
+ * the key
+ * @param value
+ * the value
+ * @throws HyracksDataException
+ */
+ void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException;
+
+ /**
+ * Force update the metadata of the current memory component even if it is UNWRITABLE
+ *
+ * @param key
+ * @param value
+ * @throws HyracksDataException
+ */
+ void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 6bf9312..01e85d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -28,16 +28,17 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexCursor;
import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
import org.apache.hyracks.storage.am.common.api.IndexException;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -360,6 +361,44 @@
return modify(ctx, tryOperation, tuple, opType);
}
+ @Override
+ public void updateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value)
+ throws HyracksDataException {
+ if (!lsmIndex.isMemoryComponentsAllocated()) {
+ lsmIndex.allocateMemoryComponents();
+ }
+ getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
+ try {
+ lsmIndex.getCurrentMemoryComponent().getMetadata().put(key, value);
+ } finally {
+ exitAndComplete(ctx, LSMOperationType.MODIFICATION);
+ }
+ }
+
+ private void exitAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException {
+ try {
+ exitComponents(ctx, op, null, false);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ } finally {
+ opTracker.completeOperation(null, op, null, ctx.getModificationCallback());
+ }
+ }
+
+ @Override
+ public void forceUpdateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value)
+ throws HyracksDataException {
+ if (!lsmIndex.isMemoryComponentsAllocated()) {
+ lsmIndex.allocateMemoryComponents();
+ }
+ getAndEnterComponents(ctx, LSMOperationType.FORCE_MODIFICATION, false);
+ try {
+ lsmIndex.getCurrentMemoryComponent().getMetadata().put(key, value);
+ } finally {
+ exitAndComplete(ctx, LSMOperationType.FORCE_MODIFICATION);
+ }
+ }
+
private boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple,
LSMOperationType opType) throws HyracksDataException, IndexException {
if (!lsmIndex.isMemoryComponentsAllocated()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 4199cfb..0fa69ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexCursor;
import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
@@ -164,4 +165,18 @@
ctx.setOperation(IndexOperation.DELETE);
lsmHarness.forceModify(ctx, tuple);
}
+
+ @Override
+ public void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException {
+ // a hack because delete only gets the memory component
+ ctx.setOperation(IndexOperation.DELETE);
+ lsmHarness.updateMeta(ctx,key,value);
+ }
+
+ @Override
+ public void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException {
+ // a hack because delete only gets the memory component
+ ctx.setOperation(IndexOperation.DELETE);
+ lsmHarness.forceUpdateMeta(ctx, key, value);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index cef4257..0a6ffd7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexCursor;
import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
@@ -185,4 +186,18 @@
throw new UnsupportedOperationException("Cannot open inverted list cursor on lsm inverted index.");
}
+ @Override
+ public void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException {
+ // a hack because delete only gets the memory component
+ ctx.setOperation(IndexOperation.DELETE);
+ lsmHarness.updateMeta(ctx, key, value);
+ }
+
+ @Override
+ public void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException {
+ // a hack because delete only gets the memory component
+ ctx.setOperation(IndexOperation.DELETE);
+ lsmHarness.forceUpdateMeta(ctx, key, value);
+ }
+
}