Change DataflowHelperFactory not to require Task Context
Change-Id: I9dcd95dbefca131c4bbdb43306f00f6f8ea60800
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1758
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 23de847..5932aff 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -18,13 +18,24 @@
*/
package org.apache.asterix.messaging;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INcResponse;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -35,6 +46,10 @@
private static final Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
private final ClusterControllerService ccs;
+ private final Map<Long, MutablePair<MutableInt, MutablePair<ResponseState, Object>>> handles =
+ new ConcurrentHashMap<>();
+ private static final AtomicLong REQUEST_ID_GENERATOR = new AtomicLong(0);
+ private static final Object UNINITIALIZED = new Object();
public CCMessageBroker(ClusterControllerService ccs) {
this.ccs = ccs;
@@ -56,4 +71,75 @@
NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
}
+
+ public long newRequestId() {
+ return REQUEST_ID_GENERATOR.incrementAndGet();
+ }
+
+ @Override
+ public Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests,
+ long timeout) throws Exception {
+ MutableInt numRequired = new MutableInt(0);
+ MutablePair<MutableInt, MutablePair<ResponseState, Object>> pair =
+ MutablePair.of(numRequired, MutablePair.of(ResponseState.UNINITIALIZED, UNINITIALIZED));
+ pair.getKey().setValue(ncs.size());
+ handles.put(reqId, pair);
+ try {
+ synchronized (pair) {
+ for (int i = 0; i < ncs.size(); i++) {
+ String nc = ncs.get(i);
+ INcAddressedMessage message = requests.get(i);
+ sendApplicationMessageToNC(message, nc);
+ }
+ long time = System.currentTimeMillis();
+ while (pair.getLeft().getValue() > 0) {
+ try {
+ pair.wait(timeout);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ if (System.currentTimeMillis() - time > timeout && pair.getLeft().getValue() > 0) {
+ throw new RuntimeDataException(ErrorCode.NC_REQUEST_TIMEOUT, timeout / 1000.0);
+ }
+ }
+ }
+ MutablePair<ResponseState, Object> right = pair.getRight();
+ switch (right.getKey()) {
+ case FAILURE:
+ throw HyracksDataException.create((Exception) right.getValue());
+ case SUCCESS:
+ return right.getRight();
+ default:
+ throw new RuntimeDataException(ErrorCode.COMPILATION_ILLEGAL_STATE, String.valueOf(right.getKey()));
+ }
+ } finally {
+ handles.remove(reqId);
+ }
+ }
+
+ @Override
+ public void respond(Long reqId, INcResponse response) {
+ Pair<MutableInt, MutablePair<ResponseState, Object>> pair = handles.get(reqId);
+ if (pair != null) {
+ synchronized (pair) {
+ try {
+ MutablePair<ResponseState, Object> result = pair.getValue();
+ switch (result.getKey()) {
+ case SUCCESS:
+ case UNINITIALIZED:
+ response.setResult(result);
+ break;
+ default:
+ break;
+ }
+ } finally {
+ // Decrement the response counter
+ MutableInt remainingResponses = pair.getKey();
+ remainingResponses.setValue(remainingResponses.getValue() - 1);
+ pair.notifyAll();
+ }
+ }
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 7d4b41d..8c1ce4e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -447,7 +447,8 @@
public IndexDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(PrimaryIndexInfo primaryIndexInfo,
IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
- return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider);
+ return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+ primaryIndexInfo.fileSplitProvider);
}
public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
@@ -459,6 +460,6 @@
mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
storageComponentProvider);
return getPrimaryIndexDataflowHelperFactory(primaryIndexInfo, storageComponentProvider)
- .create(createTestContext(true), PARTITION);
+ .create(createTestContext(true).getJobletContext().getServiceContext(), PARTITION);
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 912ac37..785135b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -63,6 +63,8 @@
public static final int TYPE_CONVERT_INTEGER_TARGET = 20;
public static final int TYPE_CONVERT_OUT_OF_BOUND = 21;
public static final int FIELD_SHOULD_BE_TYPED = 22;
+ public static final int NC_REQUEST_TIMEOUT = 23;
+
public static final int INSTANTIATION_ERROR = 100;
// Compilation errors
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index b2fde52..69c0ca0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -18,9 +18,16 @@
*/
package org.apache.asterix.common.messaging.api;
+import java.util.List;
+
import org.apache.hyracks.api.messages.IMessageBroker;
public interface ICCMessageBroker extends IMessageBroker {
+ public enum ResponseState {
+ UNINITIALIZED,
+ SUCCESS,
+ FAILURE
+ }
/**
* Sends the passed message to the specified {@code nodeId}
@@ -29,5 +36,24 @@
* @param nodeId
* @throws Exception
*/
- public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
+ void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
+
+ /**
+ * Sends the passed requests to all NCs and wait for the response
+ *
+ * @param ncs
+ * @param requests
+ * @param timeout
+ * @throws Exception
+ */
+ Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests,
+ long timeout) throws Exception;
+
+ /**
+ * respond to a sync request
+ *
+ * @param reqId
+ * @param response
+ */
+ void respond(Long reqId, INcResponse response);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INcResponse.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INcResponse.java
new file mode 100644
index 0000000..e3c3d2b
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INcResponse.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.asterix.common.messaging.api.ICCMessageBroker.ResponseState;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+@FunctionalInterface
+public interface INcResponse {
+ /**
+ * Sets the response in the result mutable place holder
+ * adjust the response state as needed
+ *
+ * @param result
+ */
+ void setResult(MutablePair<ResponseState, Object> result);
+
+}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 9efb6b8..c118c36 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -56,6 +56,8 @@
20 = Can't convert integer types. The target type should be one of %1$s.
21 = Source value %1$s is out of range that %2$s can hold - %2$s.MAX_VALUE: %3$s, %2$s.MIN_VALUE: %4$s
22 = The accessed field is untyped, but should be typed
+23 = %1$ss passed before getting back the responses from NCs
+
100 = Unable to instantiate class %1$s
# Compile-time check errors
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
index 95debe3..8d83b9a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
@@ -63,8 +63,8 @@
try {
// perform operation on btrees
for (int i = 0; i < treeIndexesDataflowHelperFactories.size(); i++) {
- IIndexDataflowHelper indexHelper =
- treeIndexesDataflowHelperFactories.get(i).create(ctx, partition);
+ IIndexDataflowHelper indexHelper = treeIndexesDataflowHelperFactories.get(i)
+ .create(ctx.getJobletContext().getServiceContext(), partition);
performOpOnIndex(indexHelper, ctx);
}
} catch (Exception e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
index 09a3c47..79dc396 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
@@ -66,7 +66,8 @@
@Override
public void initialize() throws HyracksDataException {
IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, partition);
- IIndexDataflowHelper indexHelper = dataflowHelperFactory.create(ctx, partition);
+ IIndexDataflowHelper indexHelper =
+ dataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
FileIndexTupleTranslator filesTupleTranslator = new FileIndexTupleTranslator();
// Build the index
indexBuilder.build();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
index 94ef285..4bc2867 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
@@ -62,7 +62,8 @@
return new AbstractOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- final IIndexDataflowHelper indexHelper = dataflowHelperFactory.create(ctx, partition);
+ final IIndexDataflowHelper indexHelper =
+ dataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
FileIndexTupleTranslator filesTupleTranslator = new FileIndexTupleTranslator();
// Open and get
indexHelper.open();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index 20744bc..6299982 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -61,7 +61,8 @@
throws HyracksDataException {
// Create a file index accessor to be used for files lookup operations
final ExternalFileIndexAccessor snapshotAccessor = new ExternalFileIndexAccessor(
- dataflowHelperFactory.create(ctx, partition), searchOpCallbackFactory, version);
+ dataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition),
+ searchOpCallbackFactory, version);
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
// The adapter that uses the file index along with the coming tuples to access files in HDFS
private LookupAdapter<?> adapter;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index 5348744..19b8a68 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -23,9 +23,11 @@
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameTupleAppender;
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -301,11 +303,15 @@
private IHyracksTaskContext[] mockIHyracksTaskContext() throws HyracksDataException {
IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class);
+ IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class);
+ INCServiceContext serviceCtx = Mockito.mock(INCServiceContext.class);
Mockito.when(ctx.allocateFrame()).thenReturn(mockByteBuffer());
Mockito.when(ctx.allocateFrame(Mockito.anyInt())).thenReturn(mockByteBuffer());
Mockito.when(ctx.getInitialFrameSize()).thenReturn(BUFFER_SIZE);
Mockito.when(ctx.reallocateFrame(Mockito.any(), Mockito.anyInt(), Mockito.anyBoolean()))
.thenReturn(mockByteBuffer());
+ Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
+ Mockito.when(jobletCtx.getServiceContext()).thenReturn(serviceCtx);
return new IHyracksTaskContext[] { ctx };
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
index 77d45f8..12065d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
@@ -21,11 +21,11 @@
import java.io.Serializable;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
@FunctionalInterface
public interface IIndexDataflowHelperFactory extends Serializable {
- IIndexDataflowHelper create(final IHyracksTaskContext ctx, int partition) throws HyracksDataException;
+ IIndexDataflowHelper create(final INCServiceContext ctx, int partition) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 82fedb0..5fc07ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -48,7 +48,7 @@
IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput,
long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor recDesc) throws HyracksDataException {
this.ctx = ctx;
- this.indexHelper = indexDataflowHelperFactory.create(ctx, partition);
+ this.indexHelper = indexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
this.fillFactor = fillFactor;
this.verifyInput = verifyInput;
this.numElementsHint = numElementsHint;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java
index dd47154..4c811bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.storage.am.common.dataflow;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.FileSplit;
@@ -38,9 +38,9 @@
}
@Override
- public IIndexDataflowHelper create(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+ public IIndexDataflowHelper create(INCServiceContext ctx, int partition) throws HyracksDataException {
FileSplit fileSplit = fileSplitProvider.getFileSplits()[partition];
FileReference resourceRef = fileSplit.getFileReference(ctx.getIoManager());
- return new IndexDataflowHelper(ctx.getJobletContext().getServiceContext(), storageMgr, resourceRef);
+ return new IndexDataflowHelper(ctx, storageMgr, resourceRef);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index f6073a4..fce31ca 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -31,7 +31,7 @@
public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, IHyracksTaskContext ctx,
int partition) throws HyracksDataException {
- this.indexHelper = indexHelperFactory.create(ctx, partition);
+ this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index d41acdf..e80a837 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -63,7 +63,7 @@
IndexOperation op, IModificationOperationCallbackFactory modOpCallbackFactory,
ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
this.ctx = ctx;
- this.indexHelper = indexHelperFactory.create(ctx, partition);
+ this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
this.modOpCallbackFactory = modOpCallbackFactory;
this.tupleFilterFactory = tupleFilterFactory;
this.inputRecDesc = inputRecDesc;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 0352cea..b358f07 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -86,7 +86,7 @@
ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter)
throws HyracksDataException {
this.ctx = ctx;
- this.indexHelper = indexHelperFactory.create(ctx, partition);
+ this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
this.retainInput = retainInput;
this.retainMissing = retainMissing;
this.appendIndexFilter = appendIndexFilter;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 6075c3d..bc7cb85 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -48,7 +48,7 @@
IIndexDataflowHelperFactory indexHelperFactory, ISearchOperationCallbackFactory searchCallbackFactory)
throws HyracksDataException {
this.ctx = ctx;
- this.treeIndexHelper = indexHelperFactory.create(ctx, partition);
+ this.treeIndexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
this.searchCallbackFactory = searchCallbackFactory;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index 0210145..c00cecb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -49,7 +49,7 @@
IIndexDataflowHelperFactory indexHelperFactory, IStorageManager storageManager)
throws HyracksDataException {
this.ctx = ctx;
- this.treeIndexHelper = indexHelperFactory.create(ctx, partition);
+ this.treeIndexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
this.storageManager = storageManager;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
index 5e4bc7d..5ff3308 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
@@ -35,7 +35,7 @@
public LSMIndexCompactOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory) throws HyracksDataException {
- this.indexHelper = indexHelperFactory.create(ctx, partition);
+ this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
}
@Override