Change DataflowHelperFactory not to require Task Context

Change-Id: I9dcd95dbefca131c4bbdb43306f00f6f8ea60800
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1758
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 23de847..5932aff 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -18,13 +18,24 @@
  */
 package org.apache.asterix.messaging;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INcResponse;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -35,6 +46,10 @@
 
     private static final Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
     private final ClusterControllerService ccs;
+    private final Map<Long, MutablePair<MutableInt, MutablePair<ResponseState, Object>>> handles =
+            new ConcurrentHashMap<>();
+    private static final AtomicLong REQUEST_ID_GENERATOR = new AtomicLong(0);
+    private static final Object UNINITIALIZED = new Object();
 
     public CCMessageBroker(ClusterControllerService ccs) {
         this.ccs = ccs;
@@ -56,4 +71,75 @@
         NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
         state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
     }
+
+    public long newRequestId() {
+        return REQUEST_ID_GENERATOR.incrementAndGet();
+    }
+
+    @Override
+    public Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests,
+            long timeout) throws Exception {
+        MutableInt numRequired = new MutableInt(0);
+        MutablePair<MutableInt, MutablePair<ResponseState, Object>> pair =
+                MutablePair.of(numRequired, MutablePair.of(ResponseState.UNINITIALIZED, UNINITIALIZED));
+        pair.getKey().setValue(ncs.size());
+        handles.put(reqId, pair);
+        try {
+            synchronized (pair) {
+                for (int i = 0; i < ncs.size(); i++) {
+                    String nc = ncs.get(i);
+                    INcAddressedMessage message = requests.get(i);
+                    sendApplicationMessageToNC(message, nc);
+                }
+                long time = System.currentTimeMillis();
+                while (pair.getLeft().getValue() > 0) {
+                    try {
+                        pair.wait(timeout);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw HyracksDataException.create(e);
+                    }
+                    if (System.currentTimeMillis() - time > timeout && pair.getLeft().getValue() > 0) {
+                        throw new RuntimeDataException(ErrorCode.NC_REQUEST_TIMEOUT, timeout / 1000.0);
+                    }
+                }
+            }
+            MutablePair<ResponseState, Object> right = pair.getRight();
+            switch (right.getKey()) {
+                case FAILURE:
+                    throw HyracksDataException.create((Exception) right.getValue());
+                case SUCCESS:
+                    return right.getRight();
+                default:
+                    throw new RuntimeDataException(ErrorCode.COMPILATION_ILLEGAL_STATE, String.valueOf(right.getKey()));
+            }
+        } finally {
+            handles.remove(reqId);
+        }
+    }
+
+    @Override
+    public void respond(Long reqId, INcResponse response) {
+        Pair<MutableInt, MutablePair<ResponseState, Object>> pair = handles.get(reqId);
+        if (pair != null) {
+            synchronized (pair) {
+                try {
+                    MutablePair<ResponseState, Object> result = pair.getValue();
+                    switch (result.getKey()) {
+                        case SUCCESS:
+                        case UNINITIALIZED:
+                            response.setResult(result);
+                            break;
+                        default:
+                            break;
+                    }
+                } finally {
+                    // Decrement the response counter
+                    MutableInt remainingResponses = pair.getKey();
+                    remainingResponses.setValue(remainingResponses.getValue() - 1);
+                    pair.notifyAll();
+                }
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 7d4b41d..8c1ce4e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -447,7 +447,8 @@
 
     public IndexDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(PrimaryIndexInfo primaryIndexInfo,
             IStorageComponentProvider storageComponentProvider) throws AlgebricksException {
-        return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), primaryIndexInfo.fileSplitProvider);
+        return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+                primaryIndexInfo.fileSplitProvider);
     }
 
     public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
@@ -459,6 +460,6 @@
                 mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
                 storageComponentProvider);
         return getPrimaryIndexDataflowHelperFactory(primaryIndexInfo, storageComponentProvider)
-                .create(createTestContext(true), PARTITION);
+                .create(createTestContext(true).getJobletContext().getServiceContext(), PARTITION);
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 912ac37..785135b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -63,6 +63,8 @@
     public static final int TYPE_CONVERT_INTEGER_TARGET = 20;
     public static final int TYPE_CONVERT_OUT_OF_BOUND = 21;
     public static final int FIELD_SHOULD_BE_TYPED = 22;
+    public static final int NC_REQUEST_TIMEOUT = 23;
+
     public static final int INSTANTIATION_ERROR = 100;
 
     // Compilation errors
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index b2fde52..69c0ca0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -18,9 +18,16 @@
  */
 package org.apache.asterix.common.messaging.api;
 
+import java.util.List;
+
 import org.apache.hyracks.api.messages.IMessageBroker;
 
 public interface ICCMessageBroker extends IMessageBroker {
+    public enum ResponseState {
+        UNINITIALIZED,
+        SUCCESS,
+        FAILURE
+    }
 
     /**
      * Sends the passed message to the specified {@code nodeId}
@@ -29,5 +36,24 @@
      * @param nodeId
      * @throws Exception
      */
-    public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
+    void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
+
+    /**
+     * Sends the passed requests to all NCs and wait for the response
+     *
+     * @param ncs
+     * @param requests
+     * @param timeout
+     * @throws Exception
+     */
+    Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests,
+            long timeout) throws Exception;
+
+    /**
+     * respond to a sync request
+     *
+     * @param reqId
+     * @param response
+     */
+    void respond(Long reqId, INcResponse response);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INcResponse.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INcResponse.java
new file mode 100644
index 0000000..e3c3d2b
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INcResponse.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.asterix.common.messaging.api.ICCMessageBroker.ResponseState;
+import org.apache.commons.lang3.tuple.MutablePair;
+
+@FunctionalInterface
+public interface INcResponse {
+    /**
+     * Sets the response in the result mutable place holder
+     * adjust the response state as needed
+     *
+     * @param result
+     */
+    void setResult(MutablePair<ResponseState, Object> result);
+
+}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 9efb6b8..c118c36 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -56,6 +56,8 @@
 20 = Can't convert integer types. The target type should be one of %1$s.
 21 = Source value %1$s is out of range that %2$s can hold - %2$s.MAX_VALUE: %3$s, %2$s.MIN_VALUE: %4$s
 22 = The accessed field is untyped, but should be typed
+23 = %1$ss passed before getting back the responses from NCs
+
 100 = Unable to instantiate class %1$s
 
 # Compile-time check errors
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
index 95debe3..8d83b9a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
@@ -63,8 +63,8 @@
                 try {
                     // perform operation on btrees
                     for (int i = 0; i < treeIndexesDataflowHelperFactories.size(); i++) {
-                        IIndexDataflowHelper indexHelper =
-                                treeIndexesDataflowHelperFactories.get(i).create(ctx, partition);
+                        IIndexDataflowHelper indexHelper = treeIndexesDataflowHelperFactories.get(i)
+                                .create(ctx.getJobletContext().getServiceContext(), partition);
                         performOpOnIndex(indexHelper, ctx);
                     }
                 } catch (Exception e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
index 09a3c47..79dc396 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
@@ -66,7 +66,8 @@
             @Override
             public void initialize() throws HyracksDataException {
                 IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, partition);
-                IIndexDataflowHelper indexHelper = dataflowHelperFactory.create(ctx, partition);
+                IIndexDataflowHelper indexHelper =
+                        dataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
                 FileIndexTupleTranslator filesTupleTranslator = new FileIndexTupleTranslator();
                 // Build the index
                 indexBuilder.build();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
index 94ef285..4bc2867 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
@@ -62,7 +62,8 @@
         return new AbstractOperatorNodePushable() {
             @Override
             public void initialize() throws HyracksDataException {
-                final IIndexDataflowHelper indexHelper = dataflowHelperFactory.create(ctx, partition);
+                final IIndexDataflowHelper indexHelper =
+                        dataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
                 FileIndexTupleTranslator filesTupleTranslator = new FileIndexTupleTranslator();
                 // Open and get
                 indexHelper.open();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index 20744bc..6299982 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -61,7 +61,8 @@
             throws HyracksDataException {
         // Create a file index accessor to be used for files lookup operations
         final ExternalFileIndexAccessor snapshotAccessor = new ExternalFileIndexAccessor(
-                dataflowHelperFactory.create(ctx, partition), searchOpCallbackFactory, version);
+                dataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition),
+                searchOpCallbackFactory, version);
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             // The adapter that uses the file index along with the coming tuples to access files in HDFS
             private LookupAdapter<?> adapter;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index 5348744..19b8a68 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -23,9 +23,11 @@
 import java.util.List;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameTupleAppender;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -301,11 +303,15 @@
 
     private IHyracksTaskContext[] mockIHyracksTaskContext() throws HyracksDataException {
         IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class);
+        IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class);
+        INCServiceContext serviceCtx = Mockito.mock(INCServiceContext.class);
         Mockito.when(ctx.allocateFrame()).thenReturn(mockByteBuffer());
         Mockito.when(ctx.allocateFrame(Mockito.anyInt())).thenReturn(mockByteBuffer());
         Mockito.when(ctx.getInitialFrameSize()).thenReturn(BUFFER_SIZE);
         Mockito.when(ctx.reallocateFrame(Mockito.any(), Mockito.anyInt(), Mockito.anyBoolean()))
                 .thenReturn(mockByteBuffer());
+        Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
+        Mockito.when(jobletCtx.getServiceContext()).thenReturn(serviceCtx);
         return new IHyracksTaskContext[] { ctx };
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
index 77d45f8..12065d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
@@ -21,11 +21,11 @@
 
 import java.io.Serializable;
 
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 
 @FunctionalInterface
 public interface IIndexDataflowHelperFactory extends Serializable {
-    IIndexDataflowHelper create(final IHyracksTaskContext ctx, int partition) throws HyracksDataException;
+    IIndexDataflowHelper create(final INCServiceContext ctx, int partition) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 82fedb0..5fc07ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -48,7 +48,7 @@
             IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, boolean verifyInput,
             long numElementsHint, boolean checkIfEmptyIndex, RecordDescriptor recDesc) throws HyracksDataException {
         this.ctx = ctx;
-        this.indexHelper = indexDataflowHelperFactory.create(ctx, partition);
+        this.indexHelper = indexDataflowHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
         this.fillFactor = fillFactor;
         this.verifyInput = verifyInput;
         this.numElementsHint = numElementsHint;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java
index dd47154..4c811bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelperFactory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.storage.am.common.dataflow;
 
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.FileSplit;
@@ -38,9 +38,9 @@
     }
 
     @Override
-    public IIndexDataflowHelper create(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+    public IIndexDataflowHelper create(INCServiceContext ctx, int partition) throws HyracksDataException {
         FileSplit fileSplit = fileSplitProvider.getFileSplits()[partition];
         FileReference resourceRef = fileSplit.getFileReference(ctx.getIoManager());
-        return new IndexDataflowHelper(ctx.getJobletContext().getServiceContext(), storageMgr, resourceRef);
+        return new IndexDataflowHelper(ctx, storageMgr, resourceRef);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index f6073a4..fce31ca 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -31,7 +31,7 @@
 
     public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, IHyracksTaskContext ctx,
             int partition) throws HyracksDataException {
-        this.indexHelper = indexHelperFactory.create(ctx, partition);
+        this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index d41acdf..e80a837 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -63,7 +63,7 @@
             IndexOperation op, IModificationOperationCallbackFactory modOpCallbackFactory,
             ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
         this.ctx = ctx;
-        this.indexHelper = indexHelperFactory.create(ctx, partition);
+        this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
         this.modOpCallbackFactory = modOpCallbackFactory;
         this.tupleFilterFactory = tupleFilterFactory;
         this.inputRecDesc = inputRecDesc;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 0352cea..b358f07 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -86,7 +86,7 @@
             ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter)
             throws HyracksDataException {
         this.ctx = ctx;
-        this.indexHelper = indexHelperFactory.create(ctx, partition);
+        this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
         this.retainInput = retainInput;
         this.retainMissing = retainMissing;
         this.appendIndexFilter = appendIndexFilter;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 6075c3d..bc7cb85 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -48,7 +48,7 @@
             IIndexDataflowHelperFactory indexHelperFactory, ISearchOperationCallbackFactory searchCallbackFactory)
             throws HyracksDataException {
         this.ctx = ctx;
-        this.treeIndexHelper = indexHelperFactory.create(ctx, partition);
+        this.treeIndexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
         this.searchCallbackFactory = searchCallbackFactory;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index 0210145..c00cecb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -49,7 +49,7 @@
             IIndexDataflowHelperFactory indexHelperFactory, IStorageManager storageManager)
             throws HyracksDataException {
         this.ctx = ctx;
-        this.treeIndexHelper = indexHelperFactory.create(ctx, partition);
+        this.treeIndexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
         this.storageManager = storageManager;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
index 5e4bc7d..5ff3308 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
@@ -35,7 +35,7 @@
 
     public LSMIndexCompactOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory) throws HyracksDataException {
-        this.indexHelper = indexHelperFactory.create(ctx, partition);
+        this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
     }
 
     @Override