Added Asterix operation callbacks and tracker. Still some work to be done before we can deploy them in actual query plans.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@816 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixAppRuntimeContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixAppRuntimeContext.java
index d25f9cc..b77fc99 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixAppRuntimeContext.java
@@ -13,11 +13,11 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushController;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateScheduler;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -44,7 +44,7 @@
private ILSMFlushController flushController;
private ILSMMergePolicy mergePolicy;
- private ILSMOperationTracker opTracker;
+ private ILSMOperationTrackerFactory opTrackerFactory;
private ILSMIOOperationScheduler lsmIOScheduler;
private ILocalResourceRepository localResourceRepository;
private ResourceIdFactory resourceIdFactory;
@@ -68,7 +68,7 @@
flushController = new FlushController();
lsmIOScheduler = ImmediateScheduler.INSTANCE;
mergePolicy = new ConstantMergePolicy(lsmIOScheduler, 3);
- opTracker = new ReferenceCountingOperationTracker();
+ opTrackerFactory = RefCountingOperationTrackerFactory.INSTANCE;
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
ioMgr);
@@ -147,8 +147,8 @@
return mergePolicy;
}
- public ILSMOperationTracker getLSMOperationTracker() {
- return opTracker;
+ public ILSMOperationTrackerFactory getLSMOperationTrackerFactory() {
+ return opTrackerFactory;
}
public ILSMIOOperationScheduler getLSMIOScheduler() {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java
index 49f59a0..a5684fc 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixRuntimeComponentsProvider.java
@@ -7,10 +7,12 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushControllerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ReferenceCountingOperationTracker;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -18,16 +20,15 @@
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
public enum AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
- ILSMIOOperationSchedulerProvider, ILSMFlushControllerProvider, ILSMOperationTrackerProvider,
- ILSMMergePolicyProvider {
+ ILSMIOOperationSchedulerProvider, ILSMFlushControllerProvider, ILSMMergePolicyProvider, ILSMOperationTrackerFactory {
INSTANCE;
@Override
- public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
- return ((AsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
- .getLSMOperationTracker();
+ public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
+ // TODO: Replace this with the IndexOperationTracker, once the other transactional components are in place.
+ return new ReferenceCountingOperationTracker(index);
}
-
+
@Override
public ILSMFlushController getFlushController(IHyracksTaskContext ctx) {
return ((AsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 301c335..7342a85 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -75,10 +75,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeRangeSearchCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
public class MetadataNode implements IMetadataNode {
private static final long serialVersionUID = 1L;
@@ -787,7 +784,7 @@
indexLifecycleManager.open(resourceID);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- ITreeIndexCursor rangeCursor = new LSMBTreeRangeSearchCursor();
+ ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor();
IBinaryComparator[] searchCmps = new IBinaryComparator[searchKey.getFieldCount()];
for (int i = 0; i < searchKey.getFieldCount(); i++) {
searchCmps[i] = comparatorFactories[i].createBinaryComparator();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 91e734d..26148bb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -301,7 +301,7 @@
metaDataFrameFactory);
LSMBTree lsmBtree = LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, file,
bufferCache, fileMapProvider, typeTraits, comparatorFactories, runtimeContext.getFlushController(),
- runtimeContext.getLSMMergePolicy(), runtimeContext.getLSMOperationTracker(),
+ runtimeContext.getLSMMergePolicy(), runtimeContext.getLSMOperationTrackerFactory(),
runtimeContext.getLSMIOScheduler());
long resourceID = -1;
if (create) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
index 58dd2d0..a907ee3 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -27,7 +27,8 @@
protected final IBinaryHashFunction[] primaryKeyHashFunctions;
protected final ILockManager lockManager;
protected final TransactionContext txnCtx;
-
+ protected int transactorLocalNumActiveOperations = 0;
+
public AbstractOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
IBinaryHashFunction[] primaryKeyHashFunctions, TransactionContext txnCtx, ILockManager lockManager) {
this.datasetId = datasetId;
@@ -41,11 +42,27 @@
IBinaryHashFunction[] primaryKeyHashFunctions) {
int h = 0;
for (int i = 0; i < primaryKeyFields.length; i++) {
- int entityFieldIdx = primaryKeyFields[i];
- int fh = primaryKeyHashFunctions[i].hash(tuple.getFieldData(entityFieldIdx),
- tuple.getFieldStart(entityFieldIdx), tuple.getFieldLength(entityFieldIdx));
+ int primaryKeyFieldIdx = primaryKeyFields[i];
+ int fh = primaryKeyHashFunctions[i].hash(tuple.getFieldData(primaryKeyFieldIdx),
+ tuple.getFieldStart(primaryKeyFieldIdx), tuple.getFieldLength(primaryKeyFieldIdx));
h = h * 31 + fh;
}
return h;
}
+
+ public TransactionContext getTransactionContext() {
+ return txnCtx;
+ }
+
+ public int getLocalNumActiveOperations() {
+ return transactorLocalNumActiveOperations;
+ }
+
+ public void incrementLocalNumActiveOperations() {
+ transactorLocalNumActiveOperations++;
+ }
+
+ public void decrementLocalNumActiveOperations() {
+ transactorLocalNumActiveOperations--;
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java
index ea82ba7..8d11d04 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallbackFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
new file mode 100644
index 0000000..1635191
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+public class IndexOperationTracker implements ILSMOperationTracker {
+
+ // Number of active operations on a ILSMIndex instance.
+ private int numActiveOperations = 0;
+ private final ILSMIndex index;
+ private final ILSMIndexAccessor accessor;
+ private final FlushOperationCallback FLUSHCALLBACK_INSTANCE = new FlushOperationCallback();
+
+ public IndexOperationTracker(ILSMIndex index) {
+ this.index = index;
+ accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ }
+
+ @Override
+ public synchronized void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ // Wait for pending flushes to complete.
+ // If flushFlag is set, then the flush is queued to occur by the last completing operation.
+ // This operation should wait for that flush to occur before proceeding.
+ if (index.getFlushController().getFlushStatus(index)) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ numActiveOperations++;
+
+ // Increment transactor-local active operations count.
+ AbstractOperationCallback opCallback = getOperationCallback(opCtx);
+ opCallback.incrementLocalNumActiveOperations();
+ }
+
+ @Override
+ public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ // Searches are immediately considered complete, because they should not prevent the execution of flushes.
+ IndexOperation op = opCtx.getOperation();
+ if (op == IndexOperation.SEARCH || op == IndexOperation.DISKORDERSCAN) {
+ completeOperation(opCtx);
+ }
+ }
+
+ @Override
+ public synchronized void completeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ numActiveOperations--;
+
+ // Decrement transactor-local active operations count.
+ AbstractOperationCallback opCallback = getOperationCallback(opCtx);
+ opCallback.decrementLocalNumActiveOperations();
+
+ // If we need a flush, and this is the last completing operation, then schedule the flush.
+ // Once the flush has completed notify all waiting operations.
+ if (index.getFlushController().getFlushStatus(index) && numActiveOperations == 0) {
+ index.getIOScheduler().scheduleOperation(accessor.createFlushOperation(FLUSHCALLBACK_INSTANCE));
+ }
+ }
+
+ private AbstractOperationCallback getOperationCallback(ILSMIndexOperationContext opCtx) {
+ IndexOperation op = opCtx.getOperation();
+ if (op == IndexOperation.SEARCH || op == IndexOperation.DISKORDERSCAN) {
+ return (AbstractOperationCallback) opCtx.getSearchOperationCallback();
+ } else {
+ return (AbstractOperationCallback) opCtx.getModificationCallback();
+ }
+ }
+
+ private class FlushOperationCallback implements ILSMIOOperationCallback {
+ @Override
+ public void callback() {
+ IndexOperationTracker.this.notifyAll();
+ }
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTrackerFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTrackerFactory.java
new file mode 100644
index 0000000..1cbc110
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTrackerFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+
+public class IndexOperationTrackerFactory implements ILSMOperationTrackerFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
+ return new IndexOperationTracker(index);
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
similarity index 88%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallback.java
rename to asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 6e17f3f..3c9f092 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -30,14 +30,18 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
-public class LSMBTreeModificationOperationCallback extends AbstractOperationCallback implements
+/**
+ * Assumes LSM-BTrees as primary indexes.
+ * Performs locking on primary keys, and also logs before/after images.
+ */
+public class PrimaryIndexModificationOperationCallback extends AbstractOperationCallback implements
IModificationOperationCallback {
protected final long resourceId;
protected final IndexOperation indexOp;
protected final TransactionSubsystem txnSubsystem;
-
- public LSMBTreeModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
+
+ public PrimaryIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
IBinaryHashFunction[] primaryKeyHashFunctions, TransactionContext txnCtx, ILockManager lockManager,
TransactionSubsystem txnSubsystem, long resourceId, IndexOperation indexOp) {
super(datasetId, primaryKeyFields, primaryKeyHashFunctions, txnCtx, lockManager);
@@ -66,7 +70,8 @@
oldOp = IndexOperation.DELETE;
}
try {
- logger.generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId, indexOp, after, oldOp, before);
+ logger.generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId, indexOp, after,
+ oldOp, before);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
similarity index 84%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallbackFactory.java
rename to asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 92f669c..2309724 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -28,13 +28,16 @@
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-public class LSMBTreeModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
+/**
+ * Assumes LSM-BTrees as primary indexes.
+ */
+public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
IModificationOperationCallbackFactory {
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
- public LSMBTreeModificationOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] primaryKeyFields,
+ public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] primaryKeyFields,
IBinaryHashFunction[] primaryKeyHashFunctions, ITransactionSubsystemProvider txnSubsystemProvider,
IndexOperation indexOp) {
super(jobId, datasetId, primaryKeyFields, primaryKeyHashFunctions, txnSubsystemProvider);
@@ -47,11 +50,10 @@
TransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
try {
TransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
- return new LSMBTreeModificationOperationCallback(datasetId, primaryKeyFields, primaryKeyHashFunctions,
+ return new PrimaryIndexModificationOperationCallback(datasetId, primaryKeyFields, primaryKeyHashFunctions,
txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, indexOp);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
}
-
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
similarity index 87%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SearchOperationCallback.java
rename to asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index bebe823..d055126 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -25,9 +25,12 @@
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
-public class SearchOperationCallback extends AbstractOperationCallback implements ISearchOperationCallback {
+/**
+ * Assumes LSM-BTrees as primary indexes. Implements try/locking and unlocking on primary keys.
+ */
+public class PrimaryIndexSearchOperationCallback extends AbstractOperationCallback implements ISearchOperationCallback {
- public SearchOperationCallback(DatasetId datasetId, int[] entityIdFields,
+ public PrimaryIndexSearchOperationCallback(DatasetId datasetId, int[] entityIdFields,
IBinaryHashFunction[] entityIdFieldHashFunctions, ILockManager lockManager, TransactionContext txnCtx) {
super(datasetId, entityIdFields, entityIdFieldHashFunctions, txnCtx, lockManager);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
similarity index 85%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SearchOperationCallbackFactory.java
rename to asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index 1cd6444..a124196 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SearchOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -27,12 +27,12 @@
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-public class SearchOperationCallbackFactory extends AbstractOperationCallbackFactory implements
+public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperationCallbackFactory implements
ISearchOperationCallbackFactory {
private static final long serialVersionUID = 1L;
- public SearchOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] entityIdFields,
+ public PrimaryIndexSearchOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] entityIdFields,
IBinaryHashFunction[] entityIdFieldHashFunctions, ITransactionSubsystemProvider txnSubsystemProvider) {
super(jobId, datasetId, entityIdFields, entityIdFieldHashFunctions, txnSubsystemProvider);
}
@@ -43,7 +43,7 @@
TransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
try {
TransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
- return new SearchOperationCallback(datasetId, primaryKeyFields, primaryKeyHashFunctions,
+ return new PrimaryIndexSearchOperationCallback(datasetId, primaryKeyFields, primaryKeyHashFunctions,
txnSubsystem.getLockManager(), txnCtx);
} catch (ACIDException e) {
throw new HyracksDataException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
similarity index 72%
copy from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallback.java
copy to asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index 6e17f3f..d2fa889 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -21,52 +21,48 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
-public class LSMBTreeModificationOperationCallback extends AbstractOperationCallback implements
+/**
+ * Secondary-index modifications do not require any locking.
+ * We assume that the modification of the corresponding primary index has already taken an appropriate lock.
+ * This callback performs logging of the before and/or after images for secondary indexes.
+ */
+public class SecondaryIndexModificationOperationCallback extends AbstractOperationCallback implements
IModificationOperationCallback {
protected final long resourceId;
protected final IndexOperation indexOp;
+ protected final IndexOperation oldOp;
protected final TransactionSubsystem txnSubsystem;
-
- public LSMBTreeModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
+
+ public SecondaryIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
IBinaryHashFunction[] primaryKeyHashFunctions, TransactionContext txnCtx, ILockManager lockManager,
TransactionSubsystem txnSubsystem, long resourceId, IndexOperation indexOp) {
super(datasetId, primaryKeyFields, primaryKeyHashFunctions, txnCtx, lockManager);
this.resourceId = resourceId;
this.indexOp = indexOp;
+ oldOp = (indexOp == IndexOperation.DELETE) ? IndexOperation.INSERT : IndexOperation.DELETE;
this.txnSubsystem = txnSubsystem;
}
@Override
public void before(ITupleReference tuple) throws HyracksDataException {
- int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields, primaryKeyHashFunctions);
- try {
- lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
- } catch (ACIDException e) {
- throw new HyracksDataException(e);
- }
+ // Do nothing.
}
@Override
public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
IndexLogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, ResourceType.LSM_BTREE);
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields, primaryKeyHashFunctions);
- LSMBTreeTupleReference lsmBTreeTuple = (LSMBTreeTupleReference) before;
- IndexOperation oldOp = IndexOperation.INSERT;
- if (lsmBTreeTuple.isAntimatter()) {
- oldOp = IndexOperation.DELETE;
- }
try {
- logger.generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId, indexOp, after, oldOp, before);
+ logger.generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId, indexOp, after,
+ oldOp, before);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
similarity index 82%
copy from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallbackFactory.java
copy to asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 92f669c..51ea0bf 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/LSMBTreeModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -28,13 +28,13 @@
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-public class LSMBTreeModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
+public class SecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
IModificationOperationCallbackFactory {
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
- public LSMBTreeModificationOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] primaryKeyFields,
+ public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, DatasetId datasetId, int[] primaryKeyFields,
IBinaryHashFunction[] primaryKeyHashFunctions, ITransactionSubsystemProvider txnSubsystemProvider,
IndexOperation indexOp) {
super(jobId, datasetId, primaryKeyFields, primaryKeyHashFunctions, txnSubsystemProvider);
@@ -47,11 +47,10 @@
TransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
try {
TransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
- return new LSMBTreeModificationOperationCallback(datasetId, primaryKeyFields, primaryKeyHashFunctions,
- txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, indexOp);
+ return new SecondaryIndexModificationOperationCallback(datasetId, primaryKeyFields,
+ primaryKeyHashFunctions, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, indexOp);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
}
-
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
new file mode 100644
index 0000000..67c9ba0
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+
+/**
+ * Secondary index searches perform no locking at all.
+ */
+public class SecondaryIndexSearchOperationCallback extends AbstractOperationCallback implements
+ ISearchOperationCallback {
+
+ public SecondaryIndexSearchOperationCallback() {
+ super(null, null, null, null, null);
+ }
+
+ @Override
+ public boolean proceed(ITupleReference tuple) throws HyracksDataException {
+ return true;
+ }
+
+ @Override
+ public void reconcile(ITupleReference tuple) throws HyracksDataException {
+ // Do nothing.
+ }
+
+ @Override
+ public void cancel(ITupleReference tuple) throws HyracksDataException {
+ // Do nothing.
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
new file mode 100644
index 0000000..7172e06
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+
+public class SecondaryIndexSearchOperationCallbackFactory implements ISearchOperationCallbackFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ return new SecondaryIndexSearchOperationCallback();
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 8c93477..d3b3b36 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -44,7 +44,7 @@
}
private static final long serialVersionUID = -6105616785783310111L;
- private TransactionSubsystem transactionProvider;
+ private TransactionSubsystem transactionSubsystem;
private LogicalLogLocator lastLogLocator;
private TransactionState txnState;
private long startWaitTime;
@@ -67,12 +67,12 @@
public TransactionContext(JobId jobId, TransactionSubsystem transactionProvider) throws ACIDException {
this.jobId = jobId;
- this.transactionProvider = transactionProvider;
+ this.transactionSubsystem = transactionProvider;
init();
}
private void init() throws ACIDException {
- lastLogLocator = LogUtil.getDummyLogicalLogLocator(transactionProvider.getLogManager());
+ lastLogLocator = LogUtil.getDummyLogicalLogLocator(transactionSubsystem.getLogManager());
txnState = TransactionState.ACTIVE;
startWaitTime = INVALID_TIME;
status = ACTIVE_STATUS;