changes to clean up(compensate) pending DDL operations from the metadata
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1134 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
index 100c837..201a593 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
@@ -78,6 +78,9 @@
recoveryMgr.checkpoint();
if (isMetadataNode) {
+ //#. clean-up incomplete DDL operations, which is DDLRecovery
+ MetadataBootstrap.startDDLRecovery();
+
// Start a sub-component for the API server. This server is only connected to by the
// API server that lives on the CC and never by a client wishing to execute AQL.
// TODO: The API sub-system will change dramatically in the future and this code will go away,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 3989a0f..63ed290 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -157,6 +157,15 @@
}
ctx.dropDataverse(dataverseName);
}
+
+ @Override
+ public List<Dataverse> getDataverses(MetadataTransactionContext ctx) throws MetadataException {
+ try {
+ return metadataNode.getDataverses(ctx.getJobId());
+ } catch (RemoteException e) {
+ throw new MetadataException(e);
+ }
+ }
@Override
public Dataverse getDataverse(MetadataTransactionContext ctx, String dataverseName) throws MetadataException {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index ad1bcc8..a6be4fa 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -580,6 +580,23 @@
}
@Override
+ public List<Dataverse> getDataverses(JobId jobId) throws MetadataException, RemoteException {
+ try {
+ DataverseTupleTranslator tupleReaderWriter = new DataverseTupleTranslator(false);
+ IValueExtractor<Dataverse> valueExtractor = new MetadataEntityValueExtractor<Dataverse>(tupleReaderWriter);
+ List<Dataverse> results = new ArrayList<Dataverse>();
+ searchIndex(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, null, valueExtractor, results);
+ if (results.isEmpty()) {
+ return null;
+ }
+ return results;
+ } catch (Exception e) {
+ throw new MetadataException(e);
+ }
+
+ }
+
+ @Override
public Dataverse getDataverse(JobId jobId, String dataverseName) throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName);
@@ -847,12 +864,18 @@
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor();
- IBinaryComparator[] searchCmps = new IBinaryComparator[searchKey.getFieldCount()];
- for (int i = 0; i < searchKey.getFieldCount(); i++) {
- searchCmps[i] = comparatorFactories[i].createBinaryComparator();
+
+ IBinaryComparator[] searchCmps = null;
+ MultiComparator searchCmp = null;
+ RangePredicate rangePred = null;
+ if (searchKey != null) {
+ searchCmps = new IBinaryComparator[searchKey.getFieldCount()];
+ for (int i = 0; i < searchKey.getFieldCount(); i++) {
+ searchCmps[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ searchCmp = new MultiComparator(searchCmps);
}
- MultiComparator searchCmp = new MultiComparator(searchCmps);
- RangePredicate rangePred = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp);
+ rangePred = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp);
indexAccessor.search(rangeCursor, rangePred);
try {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index ad08f18..fdf2d60 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -120,6 +120,16 @@
public void addDataverse(MetadataTransactionContext ctx, Dataverse dataverse) throws MetadataException;
/**
+ * Retrieves all dataverses
+ *
+ * @param ctx
+ * MetadataTransactionContext of an active metadata transaction.
+ * @return A list of dataverse instances.
+ * @throws MetadataException
+ */
+ List<Dataverse> getDataverses(MetadataTransactionContext ctx) throws MetadataException;
+
+ /**
* Retrieves a dataverse with given name.
*
* @param ctx
@@ -169,7 +179,7 @@
* For example, if the dataset already exists.
*/
public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException;
-
+
/**
* Retrieves a dataset within a given dataverse.
*
@@ -441,4 +451,6 @@
public void acquireReadLatch();
public void releaseReadLatch();
+
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index fc9fe18..836d42f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -100,6 +100,19 @@
public void addDataverse(JobId jobId, Dataverse dataverse) throws MetadataException, RemoteException;
/**
+ * Retrieves all dataverses, acquiring local locks on behalf of
+ * the given transaction id.
+ *
+ * @param jobId
+ * A globally unique id for an active metadata transaction.
+ * @return A list of dataverse instances.
+ * @throws MetadataException
+ * For example, if the dataverse does not exist.
+ * @throws RemoteException
+ */
+ public List<Dataverse> getDataverses(JobId jobId) throws MetadataException, RemoteException;
+
+ /**
* Retrieves a dataverse with given name, acquiring local locks on behalf of
* the given transaction id.
*
@@ -460,4 +473,5 @@
public void addAdapter(JobId jobId, DatasourceAdapter adapter) throws MetadataException, RemoteException;
public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException;
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 35dae4d..b038e2b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -16,6 +16,7 @@
package edu.uci.ics.asterix.metadata.bootstrap;
import java.io.File;
+import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -31,6 +32,7 @@
import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
import edu.uci.ics.asterix.metadata.IDatasetDetails;
+import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
@@ -64,7 +66,6 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
@@ -364,4 +365,51 @@
return metadataNodeName;
}
+ public static void startDDLRecovery() throws RemoteException, ACIDException, MetadataException {
+ //#. clean up any record which has pendingAdd/DelOp flag
+ // as traversing all records from DATAVERSE_DATASET to DATASET_DATASET, and then to INDEX_DATASET.
+ String dataverseName = null;
+ String datasetName = null;
+ String indexName = null;
+ MetadataTransactionContext mdTxnCtx = null;
+
+ MetadataManager.INSTANCE.acquireWriteLatch();
+
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+ List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+ for (Dataverse dataverse : dataverses) {
+ dataverseName = dataverse.getDataverseName();
+ if (dataverse.getPendingOp() != IMetadataEntity.PENDING_NO_OP) {
+ //drop pending dataverse
+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+ } else {
+ List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
+ for (Dataset dataset : datasets) {
+ datasetName = dataset.getDatasetName();
+ if (dataset.getPendingOp() != IMetadataEntity.PENDING_NO_OP) {
+ //drop pending dataset
+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+ } else {
+ List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
+ datasetName);
+ for (Index index : indexes) {
+ indexName = index.getIndexName();
+ if (index.getPendingOp() != IMetadataEntity.PENDING_NO_OP) {
+ //drop pending index
+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+ }
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw new MetadataException(e);
+ } finally {
+ MetadataManager.INSTANCE.releaseWriteLatch();
+ }
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
index f9eaf5a..d23cdd0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
@@ -67,7 +67,7 @@
ARecord dataverseRecord = recordSerDes.deserialize(in);
return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
- ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue());
+ ((AInt32) dataverseRecord.getValueByPos(3)).getIntegerValue());
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java
index 4d1b9a5..f4bb73a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java
@@ -5,16 +5,22 @@
public class CheckpointObject implements Serializable, Comparable<CheckpointObject> {
private static final long serialVersionUID = 1L;
-
+
+ private final long checkpointLSN;
private final long minMCTFirstLSN;
private final int maxJobId;
private final long timeStamp;
- public CheckpointObject(long minMCTFirstLSN, int maxJobId, long timeStamp) {
+ public CheckpointObject(long checkpointLSN, long minMCTFirstLSN, int maxJobId, long timeStamp) {
+ this.checkpointLSN = checkpointLSN;
this.minMCTFirstLSN = minMCTFirstLSN;
this.maxJobId = maxJobId;
this.timeStamp = timeStamp;
}
+
+ public long getCheckpointLSN() {
+ return checkpointLSN;
+ }
public long getMinMCTFirstLSN() {
return minMCTFirstLSN;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 8bb1acf..b16b2e3 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -53,6 +53,7 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
@@ -107,9 +108,12 @@
new ACIDException("Checkpoint file doesn't exist", e);
}
- //#. if minMCTFirstLSN is equal to -1,
+ //#. if minMCTFirstLSN is equal to -1 &&
+ // checkpointLSN in the checkpoint file is equal to the lastLSN in the log file,
// then return healthy state. Otherwise, return corrupted.
- if (checkpointObject.getMinMCTFirstLSN() == -1) {
+ LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
+ if (checkpointObject.getMinMCTFirstLSN() == -1
+ && checkpointObject.getCheckpointLSN() == logMgr.getCurrentLsn().get()) {
state = SystemState.HEALTHY;
return state;
} else {
@@ -129,7 +133,8 @@
ILogManager logManager = txnSubsystem.getLogManager();
ILogRecordHelper logRecordHelper = logManager.getLogRecordHelper();
ITransactionManager txnManager = txnSubsystem.getTransactionManager();
- TransactionalResourceManagerRepository txnResourceRepository = txnSubsystem.getTransactionalResourceRepository();
+ TransactionalResourceManagerRepository txnResourceRepository = txnSubsystem
+ .getTransactionalResourceRepository();
//winnerTxnTable is used to add pairs, <committed TxnId, the most recent commit LSN of the TxnId>
Map<TxnId, Long> winnerTxnTable = new HashMap<TxnId, Long>();
@@ -244,6 +249,25 @@
index = indexLifecycleManager.getIndex(resourceId);
if (index == null) {
localResource = localResourceRepository.getResourceById(resourceId);
+
+ /*******************************************************************
+ * [Notice]
+ * -> Issue
+ * Delete index may cause a problem during redo.
+ * The index operation to be redone couldn't be redone because the corresponding index
+ * may not exist in NC due to the possible index drop DDL operation.
+ * -> Approach
+ * Avoid the problem during redo.
+ * More specifically, the problem will be detected when the localResource of
+ * the corresponding index is retrieved, which will end up with 'null' return from
+ * localResourceRepository. If null is returned, then just go and process the next
+ * log record.
+ *******************************************************************/
+ if (localResource == null) {
+ continue;
+ }
+ /*******************************************************************/
+
localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
index = localResourceMetadata.createIndexInstance(appRuntimeContext,
localResource.getResourceName(), localResource.getPartition());
@@ -283,6 +307,8 @@
throw new ACIDException("Unsupported LogType: " + logType);
}
}
+
+ JobIdFactory.initJobId(maxJobId);
}
@Override
@@ -300,8 +326,8 @@
//TODO
//put the correct minMCTFirstLSN by getting from Zach. :)
long minMCTFirstLSM = -1;
- CheckpointObject checkpointObject = new CheckpointObject(minMCTFirstLSM, txnMgr.getMaxJobId(),
- System.currentTimeMillis());
+ CheckpointObject checkpointObject = new CheckpointObject(logMgr.getCurrentLsn().get(), minMCTFirstLSM,
+ txnMgr.getMaxJobId(), System.currentTimeMillis());
FileOutputStream fos = null;
ObjectOutputStream oosToFos = null;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/JobIdFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/JobIdFactory.java
index da86199..cac4f8e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/JobIdFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/JobIdFactory.java
@@ -25,4 +25,8 @@
public static JobId generateJobId() {
return new JobId(Id.incrementAndGet());
}
+
+ public static void initJobId(int id) {
+ Id.set(id);
+ }
}
\ No newline at end of file