Add a JSON REST API for external connectors that use existing AsterixDB datasets.
Change-Id: I674110b26262fbbd93030b252113e153ff4580ef
Reviewed-on: https://asterix-gerrit.ics.uci.edu/288
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
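
The servlet is registered at "/connector" on the JSON API server. A sketch
of the request/response exchange, assuming a default local instance (host
and port below are illustrative, not part of this change):

    GET http://localhost:19002/connector?dataverseName=Metadata&datasetName=Dataset

    {"splits": [{"ip": "127.0.0.1", "path": ".../Metadata/Dataset_idx_Dataset"},
                {"ip": "127.0.0.2", "path": ".../Metadata/Dataset_idx_Dataset"}]}

On bad input, the returned object carries an "error" field instead of "splits".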
diff --git a/asterix-app/pom.xml b/asterix-app/pom.xml
index fe11486..2db0974 100644
--- a/asterix-app/pom.xml
+++ b/asterix-app/pom.xml
@@ -212,6 +212,16 @@
<version>0.8.7-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.10.19</version>
+ </dependency>
+ <dependency>
+ <groupId>com.e-movimento.tinytools</groupId>
+ <artifactId>privilegedaccessor</artifactId>
+ <version>1.2.2</version>
+ </dependency>
</dependencies>
</project>
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java
new file mode 100644
index 0000000..c5fb76b
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.api.http.servlet;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Map;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+
+/**
+ * The REST API that takes a dataverse name and a dataset name as input
+ * and returns an array of file splits (IP, file-path) of the dataset in JSON.
+ * It is mostly used by external runtimes, e.g., Pregelix or IMRU, to pull data
+ * in parallel from existing AsterixDB datasets.
+ *
+ * @author yingyi
+ */
+public class ConnectorAPIServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
+ private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ response.setContentType("application/json");
+ response.setCharacterEncoding("utf-8");
+ PrintWriter out = response.getWriter();
+ try {
+ JSONObject jsonResponse = new JSONObject();
+ String dataverseName = request.getParameter("dataverseName");
+ String datasetName = request.getParameter("datasetName");
+ if (dataverseName == null || datasetName == null) {
+ jsonResponse.put("error", "Parameter dataverseName or datasetName is null,");
+ out.write(jsonResponse.toString());
+ out.flush();
+ return;
+ }
+ ServletContext context = getServletContext();
+
+ IHyracksClientConnection hcc = null;
+ synchronized (context) {
+ hcc = (IHyracksClientConnection) context.getAttribute(HYRACKS_CONNECTION_ATTR);
+ }
+
+ // Metadata transaction begins.
+ MetadataManager.INSTANCE.init();
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+ // Retrieves file splits of the dataset.
+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(null);
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ jsonResponse.put("error", "Dataset " + datasetName + " does not exist in " + "dataverse "
+ + dataverseName);
+ out.write(jsonResponse.toString());
+ out.flush();
+ return;
+ }
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ FileSplit[] fileSplits = metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName,
+ datasetName, temp);
+
+ // Constructs the returned JSON object.
+ formResponseObject(jsonResponse, fileSplits, hcc.getNodeControllerInfos());
+ // Metadata transaction commits.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ // Writes file splits.
+ out.write(jsonResponse.toString());
+ out.flush();
+ } catch (Exception e) {
+ e.printStackTrace();
+ out.println(e.getMessage());
+ e.printStackTrace(out);
+ out.flush();
+ }
+ }
+
+ private void formResponseObject(JSONObject jsonResponse, FileSplit[] fileSplits,
+ Map<String, NodeControllerInfo> nodeMap) throws Exception {
+ JSONArray partitions = new JSONArray();
+ // Generates file partitions.
+ for (FileSplit split : fileSplits) {
+ String ipAddress = nodeMap.get(split.getNodeName()).getNetworkAddress().getAddress().toString();
+ String path = split.getLocalFile().getFile().getAbsolutePath();
+ FilePartition partition = new FilePartition(ipAddress, path);
+ partitions.put(partition.toJSONObject());
+ }
+ // Generates the response object which contains the splits.
+ jsonResponse.put("splits", partititons);
+ }
+}
+
+class FilePartition {
+ private final String ipAddress;
+ private final String path;
+
+ public FilePartition(String ipAddress, String path) {
+ this.ipAddress = ipAddress;
+ this.path = path;
+ }
+
+ public String getIPAddress() {
+ return ipAddress;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public String toString() {
+ return ipAddress + ":" + path;
+ }
+
+ public JSONObject toJSONObject() throws JSONException {
+ JSONObject partition = new JSONObject();
+ partition.put("ip", ipAddress);
+ partition.put("path", path);
+ return partition;
+ }
+}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 1320629..9fa9a76 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -24,6 +24,7 @@
import edu.uci.ics.asterix.api.http.servlet.APIServlet;
import edu.uci.ics.asterix.api.http.servlet.AQLAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.ConnectorAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.DDLAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.FeedDashboardServlet;
import edu.uci.ics.asterix.api.http.servlet.FeedDataProviderServlet;
@@ -76,7 +77,7 @@
MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
AsterixAppContextInfo.getInstance().getCCApplicationContext()
- .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
+ .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
setupWebServer(externalProperties);
@@ -88,7 +89,7 @@
setupFeedServer(externalProperties);
feedServer.start();
-
+
waitUntilServerStart(webServer);
waitUntilServerStart(jsonAPIServer);
waitUntilServerStart(feedServer);
@@ -100,9 +101,9 @@
ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
}
- private void waitUntilServerStart(AbstractLifeCycle webServer) throws Exception{
- while(!webServer.isStarted()){
- if(webServer.isFailed()){
+ private void waitUntilServerStart(AbstractLifeCycle webServer) throws Exception {
+ while (!webServer.isStarted()) {
+ if (webServer.isFailed()) {
throw new Exception("Server failed to start");
}
wait(1000);
@@ -156,6 +157,7 @@
context.addServlet(new ServletHolder(new UpdateAPIServlet()), "/update");
context.addServlet(new ServletHolder(new DDLAPIServlet()), "/ddl");
context.addServlet(new ServletHolder(new AQLAPIServlet()), "/aql");
+ context.addServlet(new ServletHolder(new ConnectorAPIServlet()), "/connector");
}
private void setupFeedServer(AsterixExternalProperties externalProperties) throws Exception {
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServletTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServletTest.java
new file mode 100644
index 0000000..bf651ad
--- /dev/null
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServletTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.api.http.servlet;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import junit.extensions.PA;
+import junit.framework.Assert;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.junit.Test;
+
+import edu.uci.ics.asterix.test.runtime.ExecutionTest;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+
+public class ConnectorAPIServletTest {
+
+ @Test
+ public void testGet() throws Exception {
+ // Starts the test AsterixDB cluster.
+ ExecutionTest.setUp();
+
+ // Configures a test connector API servlet.
+ ConnectorAPIServlet servlet = spy(new ConnectorAPIServlet());
+ ServletConfig mockServletConfig = mock(ServletConfig.class);
+ servlet.init(mockServletConfig);
+ Map<String, NodeControllerInfo> nodeMap = new HashMap<String, NodeControllerInfo>();
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ PrintWriter outputWriter = new PrintWriter(outputStream);
+
+ // Creates mocks.
+ ServletContext mockContext = mock(ServletContext.class);
+ IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
+ NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class);
+ NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class);
+ HttpServletRequest mockRequest = mock(HttpServletRequest.class);
+ HttpServletResponse mockResponse = mock(HttpServletResponse.class);
+
+ // Sets up mock returns.
+ when(servlet.getServletContext()).thenReturn(mockContext);
+ when(mockRequest.getParameter("dataverseName")).thenReturn("Metadata");
+ when(mockRequest.getParameter("datasetName")).thenReturn("Dataset");
+ when(mockResponse.getWriter()).thenReturn(outputWriter);
+ when(mockContext.getAttribute(anyString())).thenReturn(mockHcc);
+ when(mockHcc.getNodeControllerInfos()).thenReturn(nodeMap);
+ when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099));
+ when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099));
+
+ // Calls ConnectorAPIServlet.doGet.
+ nodeMap.put("nc1", mockInfo1);
+ nodeMap.put("nc2", mockInfo2);
+ servlet.doGet(mockRequest, mockResponse);
+
+ // Constructs the actual response.
+ JSONTokener tokener = new JSONTokener(new InputStreamReader(
+ new ByteArrayInputStream(outputStream.toByteArray())));
+ JSONObject actualResponse = new JSONObject(tokener);
+
+ // Checks the correctness of results.
+ JSONArray splits = actualResponse.getJSONArray("splits");
+ String path = ((JSONObject) splits.get(0)).getString("path");
+ Assert.assertTrue(path.endsWith("Metadata/Dataset_idx_Dataset"));
+
+ // Tears down the AsterixDB cluster.
+ ExecutionTest.tearDown();
+ }
+
+ @Test
+ public void testFormResponseObject() throws JSONException {
+ ConnectorAPIServlet servlet = new ConnectorAPIServlet();
+ JSONObject actualResponse = new JSONObject();
+ FileSplit[] splits = new FileSplit[2];
+ splits[0] = new FileSplit("nc1", "foo1");
+ splits[1] = new FileSplit("nc2", "foo2");
+ Map<String, NodeControllerInfo> nodeMap = new HashMap<String, NodeControllerInfo>();
+ NodeControllerInfo mockInfo1 = mock(NodeControllerInfo.class);
+ NodeControllerInfo mockInfo2 = mock(NodeControllerInfo.class);
+
+ // Sets up mock returns.
+ when(mockInfo1.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.1", 3099));
+ when(mockInfo2.getNetworkAddress()).thenReturn(new NetworkAddress("127.0.0.2", 3099));
+
+ // Calls ConnectorAPIServlet.formResponseObject.
+ nodeMap.put("nc1", mockInfo1);
+ nodeMap.put("nc2", mockInfo2);
+ PA.invokeMethod(servlet,
+ "formResponseObject(org.json.JSONObject, edu.uci.ics.hyracks.dataflow.std.file.FileSplit[], "
+ + "java.util.Map)", actualResponse, splits, nodeMap);
+
+ // Constructs expected response.
+ JSONObject expectedResponse = new JSONObject();
+ JSONArray splitsArray = new JSONArray();
+ JSONObject element1 = new JSONObject();
+ element1.put("ip", "127.0.0.1");
+ element1.put("path", splits[0].getLocalFile().getFile().getAbsolutePath());
+ JSONObject element2 = new JSONObject();
+ element2.put("ip", "127.0.0.2");
+ element2.put("path", splits[1].getLocalFile().getFile().getAbsolutePath());
+ splitsArray.put(element1);
+ splitsArray.put(element2);
+ expectedResponse.put("splits", splitsArray);
+
+ // Checks results.
+ Assert.assertEquals(expectedResponse.toString(), actualResponse.toString());
+ }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 66a932e..5055cea 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -97,8 +97,6 @@
import edu.uci.ics.asterix.runtime.formats.FormatUtils;
import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
@@ -106,6 +104,8 @@
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -491,7 +491,7 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
- throws AlgebricksException {
+ throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Can only scan datasets of records.");
}
@@ -568,14 +568,14 @@
case INTERNAL:
feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
- .getFeedConnectionId().getDatasetName()), adapterFactory, adapterOutputType,
+ .getFeedConnectionId().getDatasetName()), adapterFactory, adapterOutputType,
feedDesc, feedPolicy.getProperties());
break;
case EXTERNAL:
String libraryName = feedDataSource.getFeed().getAdapterName().split("#")[0];
feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feedDataSource.getFeedConnectionId(),
libraryName, adapterFactory.getClass().getName(), feedDataSource.getFeed()
- .getAdapterConfiguration(), adapterOutputType, feedDesc, feedPolicy.getProperties());
+ .getAdapterConfiguration(), adapterOutputType, feedDesc, feedPolicy.getProperties());
break;
}
if (LOGGER.isLoggable(Level.INFO)) {
@@ -599,7 +599,7 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
JobSpecification jobSpec, String dataverse, String feedName, String dataset, FeedActivity feedActivity)
- throws AlgebricksException {
+ throws AlgebricksException {
List<String> feedLocations = new ArrayList<String>();
String[] ingestLocs = feedActivity.getFeedActivityDetails().get(FeedActivityDetails.INGEST_LOCATIONS)
.split(",");
@@ -722,12 +722,12 @@
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
compactionInfo.second, isSecondary ? new SecondaryIndexOperationTrackerProvider(
dataset.getDatasetId()) : new PrimaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), rtcProvider,
- LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields, !temp), retainInput, retainNull,
- context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
- maxFilterFieldIndexes);
+ dataset.getDatasetId()), rtcProvider,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, filterTypeTraits,
+ filterCmpFactories, btreeFields, filterFields, !temp), retainInput, retainNull,
+ context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
+ maxFilterFieldIndexes);
} else {
// External dataset <- use the btree with buddy btree->
// Be Careful of Key Start Index ?
@@ -735,9 +735,9 @@
ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory(
compactionInfo.first, compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, getStorageProperties()
+ LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, getStorageProperties()
.getBloomFilterFalsePositiveRate(), buddyBreeFields,
- ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+ ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
@@ -839,12 +839,12 @@
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
- nestedKeyType.getTypeTag(), comparatorFactories.length),
- storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
- filterTypeTraits, filterCmpFactories, filterFields, !temp), retainInput, retainNull,
- context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
- maxFilterFieldIndexes);
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
+ nestedKeyType.getTypeTag(), comparatorFactories.length),
+ storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
+ filterTypeTraits, filterCmpFactories, filterFields, !temp), retainInput, retainNull,
+ context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
+ maxFilterFieldIndexes);
} else {
// External Dataset
@@ -1039,9 +1039,9 @@
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields, !temp));
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
+ filterCmpFactories, btreeFields, filterFields, !temp));
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
} catch (MetadataException me) {
@@ -1053,7 +1053,7 @@
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
- throws AlgebricksException {
+ throws AlgebricksException {
String datasetName = dataSource.getId().getDatasetName();
Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
@@ -1116,30 +1116,30 @@
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(
jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
- : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
- txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+ : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+ txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
- dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- datasetId), compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties.getBloomFilterFalsePositiveRate(),
- true, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, !temp);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
- } else {
- op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
- fieldPermutation, indexOp, idfh, null, modificationCallbackFactory, true, indexName);
- }
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
+ dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+ datasetId), compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties.getBloomFilterFalsePositiveRate(),
+ true, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, !temp);
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
+ } else {
+ op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+ fieldPermutation, indexOp, idfh, null, modificationCallbackFactory, true, indexName);
+ }
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
@@ -1151,7 +1151,7 @@
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
- throws AlgebricksException {
+ throws AlgebricksException {
return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, typeEnv, keys, payload,
additionalNonKeyFields, recordDesc, context, spec, bulkload);
}
@@ -1279,7 +1279,7 @@
IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload)
- throws AlgebricksException {
+ throws AlgebricksException {
// Sanity checks.
if (primaryKeys.size() > 1) {
@@ -1469,7 +1469,7 @@
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ throws AlgebricksException {
return getIndexInsertOrDeleteRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
false);
@@ -1477,7 +1477,7 @@
private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
- throws AlgebricksException {
+ throws AlgebricksException {
// No filtering condition.
if (filterExpr == null) {
return null;
@@ -1591,37 +1591,37 @@
IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
ResourceType.LSM_BTREE) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
- dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- datasetId), compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
- btreeFields, filterFields, !temp);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- comparatorFactories, bloomFilterKeyFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
- } else {
- op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
- fieldPermutation, indexOp, new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first,
- compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields, !temp), filterFactory,
- modificationCallbackFactory, false, indexName);
- }
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
+ dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+ datasetId), compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, !temp);
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
+ } else {
+ op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+ fieldPermutation, indexOp, new LSMBTreeDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first,
+ compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
+ filterCmpFactories, btreeFields, filterFields, !temp), filterFactory,
+ modificationCallbackFactory, false, indexName);
+ }
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
} catch (MetadataException e) {
throw new AlgebricksException(e);
} catch (IOException e) {
@@ -1719,10 +1719,11 @@
// SecondaryKeys.size() can be two if it comes from the bulkload.
// In this case, [token, number of token] are the secondaryKeys.
- if (!isPartitioned || secondaryKeys.size() > 1)
+ if (!isPartitioned || secondaryKeys.size() > 1) {
numTokenFields = secondaryKeys.size();
- else if (isPartitioned && secondaryKeys.size() == 1)
+ } else if (isPartitioned && secondaryKeys.size() == 1) {
numTokenFields = secondaryKeys.size() + 1;
+ }
ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
@@ -1791,46 +1792,46 @@
IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
ResourceType.LSM_INVERTED_INDEX) : new SecondaryIndexModificationOperationCallbackFactory(jobId,
- datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_INVERTED_INDEX);
+ datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_INVERTED_INDEX);
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
- dataset, mdTxnCtx);
- IIndexDataflowHelperFactory indexDataFlowFactory;
- if (!isPartitioned) {
- indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- datasetId), compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
- filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
- invertedIndexFieldsForNonBulkLoadOps, !temp);
- } else {
- indexDataFlowFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
- compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
- filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
- invertedIndexFieldsForNonBulkLoadOps, !temp);
- }
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false,
- numElementsHint, false, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
- appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory);
- } else {
- op = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), splitsAndConstraint.first,
- appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
- indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName);
- }
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
+ dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory indexDataFlowFactory;
+ if (!isPartitioned) {
+ indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+ datasetId), compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+ filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+ invertedIndexFieldsForNonBulkLoadOps, !temp);
+ } else {
+ indexDataFlowFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+ compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
+ filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
+ invertedIndexFieldsForNonBulkLoadOps, !temp);
+ }
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false,
+ numElementsHint, false, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+ appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+ invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory);
+ } else {
+ op = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+ appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+ invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
+ indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName);
+ }
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
} catch (MetadataException e) {
throw new AlgebricksException(e);
} catch (IOException e) {
@@ -1939,41 +1940,41 @@
IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
ResourceType.LSM_RTREE) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
- dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = new LSMRTreeDataflowHelperFactory(valueProviderFactories,
- RTreePolicyType.RTREE, primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
- proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
- storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
- filterCmpFactories, filterFields, !temp);
- IOperatorDescriptor op;
- if (bulkload) {
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
- primaryComparatorFactories, btreeFields, fieldPermutation,
- GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
- } else {
- op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, indexOp,
- new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
- primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset
- .getDatasetId()), compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
- nestedKeyType.getTypeTag(), comparatorFactories.length), storageProperties
- .getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
- filterCmpFactories, filterFields, !temp), filterFactory,
- modificationCallbackFactory, false, indexName);
- }
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
+ dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory idfh = new LSMRTreeDataflowHelperFactory(valueProviderFactories,
+ RTreePolicyType.RTREE, primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(
+ dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+ proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
+ storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
+ filterCmpFactories, filterFields, !temp);
+ IOperatorDescriptor op;
+ if (bulkload) {
+ long numElementsHint = getCardinalityPerPartitionHint(dataset);
+ op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+ appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+ primaryComparatorFactories, btreeFields, fieldPermutation,
+ GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
+ } else {
+ op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, indexOp,
+ new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+ primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset
+ .getDatasetId()), compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
+ nestedKeyType.getTypeTag(), comparatorFactories.length), storageProperties
+ .getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
+ filterCmpFactories, filterFields, !temp), filterFactory,
+ modificationCallbackFactory, false, indexName);
+ }
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
} catch (MetadataException | IOException e) {
throw new AlgebricksException(e);
}
@@ -1997,7 +1998,7 @@
* Calculate an estimate size of the bloom filter. Note that this is an
* estimation which assumes that the data is going to be uniformly
* distributed across all partitions.
- *
+ *
* @param dataset
* @return Number of elements that will be used to create a bloom filter per
* dataset per partition
@@ -2072,7 +2073,7 @@
return splits.toArray(new FileSplit[] {});
}
- private FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
+ public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
String targetIdxName, boolean temp) throws AlgebricksException {
try {
File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
@@ -2216,7 +2217,7 @@
/**
* Add HDFS scheduler and the cluster location constraint into the scheduler
- *
+ *
* @param properties
* the original dataset properties
* @return a new map containing the original dataset properties and the
@@ -2232,7 +2233,7 @@
/**
* Adapt the original properties to a string-object map
- *
+ *
* @param properties
* the original properties
* @return the new stirng-object map
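
For reference, a minimal client sketch (not part of this change) showing how
an external runtime might consume the /connector response. The host and port
are assumptions for a local instance; the response shape ("splits" ->
[{"ip", "path"}]) follows formResponseObject above, and the org.json calls
mirror those used in ConnectorAPIServletTest:

    import java.io.InputStreamReader;
    import java.net.URL;

    import org.json.JSONArray;
    import org.json.JSONObject;
    import org.json.JSONTokener;

    public class ConnectorClientExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical host/port; substitute the actual CC address.
            URL url = new URL("http://localhost:19002/connector"
                    + "?dataverseName=Metadata&datasetName=Dataset");
            JSONObject response = new JSONObject(new JSONTokener(new InputStreamReader(
                    url.openStream())));
            // Each element names the node IP and the local file path of one
            // partition, so an external reader can pull partitions in parallel.
            JSONArray splits = response.getJSONArray("splits");
            for (int i = 0; i < splits.length(); i++) {
                JSONObject split = splits.getJSONObject(i);
                System.out.println(split.getString("ip") + ":" + split.getString("path"));
            }
        }
    }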