changes to clean up(compensate) pending DDL operations from the metadata

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1134 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
index 100c837..201a593 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
@@ -78,6 +78,9 @@
         recoveryMgr.checkpoint();
         
         if (isMetadataNode) {
+            //#. clean-up incomplete DDL operations, which is DDLRecovery
+            MetadataBootstrap.startDDLRecovery();
+            
             // Start a sub-component for the API server. This server is only connected to by the 
             // API server that lives on the CC and never by a client wishing to execute AQL.
             // TODO: The API sub-system will change dramatically in the future and this code will go away, 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 3989a0f..63ed290 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -157,6 +157,15 @@
         }
         ctx.dropDataverse(dataverseName);
     }
+    
+    @Override
+    public List<Dataverse> getDataverses(MetadataTransactionContext ctx) throws MetadataException {
+        try {
+            return metadataNode.getDataverses(ctx.getJobId());
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+    }
 
     @Override
     public Dataverse getDataverse(MetadataTransactionContext ctx, String dataverseName) throws MetadataException {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index ad1bcc8..a6be4fa 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -580,6 +580,23 @@
     }
 
     @Override
+    public List<Dataverse> getDataverses(JobId jobId) throws MetadataException, RemoteException {
+        try {
+            DataverseTupleTranslator tupleReaderWriter = new DataverseTupleTranslator(false);
+            IValueExtractor<Dataverse> valueExtractor = new MetadataEntityValueExtractor<Dataverse>(tupleReaderWriter);
+            List<Dataverse> results = new ArrayList<Dataverse>();
+            searchIndex(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, null, valueExtractor, results);
+            if (results.isEmpty()) {
+                return null;
+            }
+            return results;
+        } catch (Exception e) {
+            throw new MetadataException(e);
+        }
+
+    }
+
+    @Override
     public Dataverse getDataverse(JobId jobId, String dataverseName) throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createTuple(dataverseName);
@@ -847,12 +864,18 @@
         IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
         ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor();
-        IBinaryComparator[] searchCmps = new IBinaryComparator[searchKey.getFieldCount()];
-        for (int i = 0; i < searchKey.getFieldCount(); i++) {
-            searchCmps[i] = comparatorFactories[i].createBinaryComparator();
+
+        IBinaryComparator[] searchCmps = null;
+        MultiComparator searchCmp = null;
+        RangePredicate rangePred = null;
+        if (searchKey != null) {
+            searchCmps = new IBinaryComparator[searchKey.getFieldCount()];
+            for (int i = 0; i < searchKey.getFieldCount(); i++) {
+                searchCmps[i] = comparatorFactories[i].createBinaryComparator();
+            }
+            searchCmp = new MultiComparator(searchCmps);
         }
-        MultiComparator searchCmp = new MultiComparator(searchCmps);
-        RangePredicate rangePred = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp);
+        rangePred = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp);
         indexAccessor.search(rangeCursor, rangePred);
 
         try {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index ad08f18..fdf2d60 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -120,6 +120,16 @@
     public void addDataverse(MetadataTransactionContext ctx, Dataverse dataverse) throws MetadataException;
 
     /**
+     * Retrieves all dataverses
+     * 
+     * @param ctx
+     *            MetadataTransactionContext of an active metadata transaction.
+     * @return A list of dataverse instances.
+     * @throws MetadataException
+     */
+    List<Dataverse> getDataverses(MetadataTransactionContext ctx) throws MetadataException;
+    
+    /**
      * Retrieves a dataverse with given name.
      * 
      * @param ctx
@@ -169,7 +179,7 @@
      *             For example, if the dataset already exists.
      */
     public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException;
-
+    
     /**
      * Retrieves a dataset within a given dataverse.
      * 
@@ -441,4 +451,6 @@
     public void acquireReadLatch();
 
     public void releaseReadLatch();
+
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index fc9fe18..836d42f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -100,6 +100,19 @@
     public void addDataverse(JobId jobId, Dataverse dataverse) throws MetadataException, RemoteException;
 
     /**
+     * Retrieves all dataverses, acquiring local locks on behalf of
+     * the given transaction id.
+     * 
+     * @param jobId
+     *            A globally unique id for an active metadata transaction.
+     * @return A list of dataverse instances.
+     * @throws MetadataException
+     *             For example, if the dataverse does not exist.
+     * @throws RemoteException
+     */
+    public List<Dataverse> getDataverses(JobId jobId) throws MetadataException, RemoteException;
+
+    /**
      * Retrieves a dataverse with given name, acquiring local locks on behalf of
      * the given transaction id.
      * 
@@ -460,4 +473,5 @@
     public void addAdapter(JobId jobId, DatasourceAdapter adapter) throws MetadataException, RemoteException;
 
     public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException;
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 35dae4d..b038e2b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.asterix.metadata.bootstrap;
 
 import java.io.File;
+import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -31,6 +32,7 @@
 import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
 import edu.uci.ics.asterix.metadata.IDatasetDetails;
+import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
@@ -64,7 +66,6 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
@@ -364,4 +365,51 @@
         return metadataNodeName;
     }
 
+    public static void startDDLRecovery() throws RemoteException, ACIDException, MetadataException {
+        //#. clean up any record which has pendingAdd/DelOp flag 
+        //   as traversing all records from DATAVERSE_DATASET to DATASET_DATASET, and then to INDEX_DATASET.
+        String dataverseName = null;
+        String datasetName = null;
+        String indexName = null;
+        MetadataTransactionContext mdTxnCtx = null;
+        
+        MetadataManager.INSTANCE.acquireWriteLatch();
+        
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+            List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+            for (Dataverse dataverse : dataverses) {
+                dataverseName = dataverse.getDataverseName();
+                if (dataverse.getPendingOp() != IMetadataEntity.PENDING_NO_OP) {
+                    //drop pending dataverse
+                    MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+                } else {
+                    List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
+                    for (Dataset dataset : datasets) {
+                        datasetName = dataset.getDatasetName();
+                        if (dataset.getPendingOp() != IMetadataEntity.PENDING_NO_OP) {
+                            //drop pending dataset
+                            MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
+                        } else {
+                            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
+                                    datasetName);
+                            for (Index index : indexes) {
+                                indexName = index.getIndexName();
+                                if (index.getPendingOp() != IMetadataEntity.PENDING_NO_OP) {
+                                    //drop pending index
+                                    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            throw new MetadataException(e);
+        } finally {
+            MetadataManager.INSTANCE.releaseWriteLatch();
+        }
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
index f9eaf5a..d23cdd0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
@@ -67,7 +67,7 @@
         ARecord dataverseRecord = recordSerDes.deserialize(in);
         return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
                 ((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
-                ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue());
+                ((AInt32) dataverseRecord.getValueByPos(3)).getIntegerValue());
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java
index 4d1b9a5..f4bb73a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java
@@ -5,16 +5,22 @@
 public class CheckpointObject implements Serializable, Comparable<CheckpointObject> {
 
     private static final long serialVersionUID = 1L;
-    
+ 
+    private final long checkpointLSN;
     private final long minMCTFirstLSN;
     private final int maxJobId;
     private final long timeStamp;
 
-    public CheckpointObject(long minMCTFirstLSN, int maxJobId, long timeStamp) {
+    public CheckpointObject(long checkpointLSN, long minMCTFirstLSN, int maxJobId, long timeStamp) {
+        this.checkpointLSN = checkpointLSN;
         this.minMCTFirstLSN = minMCTFirstLSN;
         this.maxJobId = maxJobId;
         this.timeStamp = timeStamp;
     }
+    
+    public long getCheckpointLSN() {
+        return checkpointLSN;
+    }
 
     public long getMinMCTFirstLSN() {
         return minMCTFirstLSN;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 8bb1acf..b16b2e3 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -53,6 +53,7 @@
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
@@ -107,9 +108,12 @@
             new ACIDException("Checkpoint file doesn't exist", e);
         }
 
-        //#. if minMCTFirstLSN is equal to -1,
+        //#. if minMCTFirstLSN is equal to -1 && 
+        //   checkpointLSN in the checkpoint file is equal to the lastLSN in the log file,
         //   then return healthy state. Otherwise, return corrupted.
-        if (checkpointObject.getMinMCTFirstLSN() == -1) {
+        LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
+        if (checkpointObject.getMinMCTFirstLSN() == -1
+                && checkpointObject.getCheckpointLSN() == logMgr.getCurrentLsn().get()) {
             state = SystemState.HEALTHY;
             return state;
         } else {
@@ -129,7 +133,8 @@
         ILogManager logManager = txnSubsystem.getLogManager();
         ILogRecordHelper logRecordHelper = logManager.getLogRecordHelper();
         ITransactionManager txnManager = txnSubsystem.getTransactionManager();
-        TransactionalResourceManagerRepository txnResourceRepository = txnSubsystem.getTransactionalResourceRepository();
+        TransactionalResourceManagerRepository txnResourceRepository = txnSubsystem
+                .getTransactionalResourceRepository();
 
         //winnerTxnTable is used to add pairs, <committed TxnId, the most recent commit LSN of the TxnId>
         Map<TxnId, Long> winnerTxnTable = new HashMap<TxnId, Long>();
@@ -244,6 +249,25 @@
                         index = indexLifecycleManager.getIndex(resourceId);
                         if (index == null) {
                             localResource = localResourceRepository.getResourceById(resourceId);
+
+                            /*******************************************************************
+                             * [Notice]
+                             * -> Issue
+                             * Delete index may cause a problem during redo.
+                             * The index operation to be redone couldn't be redone because the corresponding index
+                             * may not exist in NC due to the possible index drop DDL operation.
+                             * -> Approach
+                             * Avoid the problem during redo.
+                             * More specifically, the problem will be detected when the localResource of
+                             * the corresponding index is retrieved, which will end up with 'null' return from
+                             * localResourceRepository. If null is returned, then just go and process the next
+                             * log record.
+                             *******************************************************************/
+                            if (localResource == null) {
+                                continue;
+                            }
+                            /*******************************************************************/
+
                             localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
                             index = localResourceMetadata.createIndexInstance(appRuntimeContext,
                                     localResource.getResourceName(), localResource.getPartition());
@@ -283,6 +307,8 @@
                     throw new ACIDException("Unsupported LogType: " + logType);
             }
         }
+        
+        JobIdFactory.initJobId(maxJobId);
     }
 
     @Override
@@ -300,8 +326,8 @@
         //TODO
         //put the correct minMCTFirstLSN by getting from Zach. :)
         long minMCTFirstLSM = -1;
-        CheckpointObject checkpointObject = new CheckpointObject(minMCTFirstLSM, txnMgr.getMaxJobId(),
-                System.currentTimeMillis());
+        CheckpointObject checkpointObject = new CheckpointObject(logMgr.getCurrentLsn().get(), minMCTFirstLSM,
+                txnMgr.getMaxJobId(), System.currentTimeMillis());
 
         FileOutputStream fos = null;
         ObjectOutputStream oosToFos = null;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/JobIdFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/JobIdFactory.java
index da86199..cac4f8e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/JobIdFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/JobIdFactory.java
@@ -25,4 +25,8 @@
     public static JobId generateJobId() {
         return new JobId(Id.incrementAndGet());
     }
+    
+    public static void initJobId(int id) {
+        Id.set(id);
+    }
 }
\ No newline at end of file