This change covers four points:
1. move the generation of UUIDs out of parsers
2. fix a bug in the metadata lock when performing load operations
3. make the external data scan operator a leaf operator again
4. make sure the startup call doesn't return until the servers are ready to receive requests
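For point 2, the load path switches from the global metadata read latch to dataset-level locks. A minimal sketch of the locking pattern, assuming the MetadataLockManager API used in the diff below:

    // Hedged sketch of the begin/finally-end pattern from handleLoadStatement.
    String fullDatasetName = dataverseName + "." + datasetName;
    MetadataLockManager.INSTANCE.modifyDatasetBegin(dataverseName, fullDatasetName);
    try {
        // compile and run the load job while holding the dataset modify lock
    } finally {
        MetadataLockManager.INSTANCE.modifyDatasetEnd(dataverseName, fullDatasetName);
    }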
Merged master into branch for the second time.
The following commits from your working branch will be included:
commit da929c990249fe7f0699417fd7f7ea34908355af
Author: Abdullah Alamoudi <bamousaa@gmail.com>
Date: Fri Jan 16 11:51:14 2015 +0300
Moved generation of UUIDs out of parsers
Change-Id: I32437493cf48c79417f4280e9d8a85cfb1559b62
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/205
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Reviewed-by: Taewoo Kim <wangsaeu@gmail.com>
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
index 59f8d22..f96ccdf 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.optimizer.rules;
import java.util.ArrayList;
@@ -26,6 +40,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
@@ -45,6 +60,10 @@
// match: [insert to internal dataset with autogenerated id] - assign - project
// produce: insert - assign - assign* - project
+ //
+ // OR [insert to internal dataset with autogenerated id] - assign - [datasource scan]
+ // produce: insert - assign - assign* - datasource scan
+
AbstractLogicalOperator currentOp = (AbstractLogicalOperator) opRef.getValue();
if (currentOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE) {
return false;
@@ -54,6 +73,12 @@
if (insertOp.getOperation() != Kind.INSERT) {
return false;
}
+
+ DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
+ boolean autogenerated = ((InternalDatasetDetails) dds.getDataset().getDatasetDetails()).isAutogenerated();
+ if (!autogenerated) {
+ return false;
+ }
if (((AqlDataSource) insertOp.getDataSource()).getDatasourceType() != AqlDataSourceType.INTERNAL_DATASET) {
return false;
@@ -64,22 +89,21 @@
return false;
}
AssignOperator assignOp = (AssignOperator) parentOp;
+ LogicalVariable inputRecord;
+ // FIXME: this will not work for internal datasets with filters, since the pattern becomes [project-assign-assign-insert]
AbstractLogicalOperator grandparentOp = (AbstractLogicalOperator) parentOp.getInputs().get(0).getValue();
- if (grandparentOp.getOperatorTag() != LogicalOperatorTag.PROJECT) {
- return false;
- }
- ProjectOperator projectOp = (ProjectOperator) grandparentOp;
-
- DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
- boolean autogenerated = ((InternalDatasetDetails) dds.getDataset().getDatasetDetails()).isAutogenerated();
- if (!autogenerated) {
- return false;
+ if (grandparentOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+ ProjectOperator projectOp = (ProjectOperator) grandparentOp;
+ inputRecord = projectOp.getVariables().get(0);
+ } else if (grandparentOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+ DataSourceScanOperator dssOp = (DataSourceScanOperator) grandparentOp;
+ inputRecord = dssOp.getVariables().get(0);
+ } else {
+ return false;
}
String pkFieldName = ((InternalDatasetDetails) dds.getDataset().getDatasetDetails()).getPrimaryKey().get(0);
-
- LogicalVariable inputRecord = projectOp.getVariables().get(0);
ILogicalExpression rec0 = new VariableReferenceExpression(inputRecord);
ILogicalExpression rec1 = createPrimaryKeyRecordExpression(pkFieldName);
ILogicalExpression mergedRec = createRecordMergeFunction(rec0, rec1);
@@ -87,7 +111,7 @@
LogicalVariable v = context.newVar();
AssignOperator newAssign = new AssignOperator(v, new MutableObject<ILogicalExpression>(nonNullMergedRec));
- newAssign.getInputs().add(new MutableObject<ILogicalOperator>(projectOp));
+ newAssign.getInputs().add(new MutableObject<ILogicalOperator>(grandparentOp));
assignOp.getInputs().set(0, new MutableObject<ILogicalOperator>(newAssign));
VariableUtilities.substituteVariables(assignOp, inputRecord, v, context);
VariableUtilities.substituteVariables(insertOp, inputRecord, v, context);
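Net effect of the rule change above: the operator below the assign may now be either a project (query insert) or a datasource scan (load pipeline), and the generated key is injected by merging a single-field key record into the input record (see createPrimaryKeyRecordExpression above), so parsers no longer need to know about autogenerated keys. A condensed sketch of the dispatch, assuming the operator APIs from the diff:

    // Pick the input record variable from whichever pattern matched.
    LogicalVariable inputRecord;
    if (grandparentOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
        inputRecord = ((ProjectOperator) grandparentOp).getVariables().get(0);
    } else if (grandparentOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
        inputRecord = ((DataSourceScanOperator) grandparentOp).getVariables().get(0);
    } else {
        return false; // pattern not matched; the rule does not fire
    }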
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
index 06b203c..cf0d074 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
@@ -102,7 +102,7 @@
DataSourceScanOperator dataSourceScanOp = (DataSourceScanOperator) descendantOp;
AqlDataSource ds = (AqlDataSource) dataSourceScanOp.getDataSource();
if (ds.getDatasourceType() != AqlDataSourceType.FEED
- && ds.getDatasourceType() != AqlDataSourceType.ADAPTED_LOADABLE) {
+ && ds.getDatasourceType() != AqlDataSourceType.LOADABLE) {
if (((DatasetDataSource) ds).getDataset().getDatasetName().compareTo(insertDatasetName) == 0) {
return true;
}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java
index 8239eca..a0d268a 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java
@@ -301,7 +301,7 @@
if (e1.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
IDataSource<AqlSourceId> dataSource = (IDataSource<AqlSourceId>) scan.getDataSource();
AqlDataSourceType dsType = ((AqlDataSource) dataSource).getDatasourceType();
- if (dsType == AqlDataSourceType.FEED || dsType == AqlDataSourceType.ADAPTED_LOADABLE) {
+ if (dsType == AqlDataSourceType.FEED || dsType == AqlDataSourceType.LOADABLE) {
return false;
}
AqlSourceId asid = dataSource.getId();
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
index fd52b31..e81d083 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
@@ -95,7 +95,7 @@
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.declared.AdaptedLoadableDataSource;
+import edu.uci.ics.asterix.metadata.declared.LoadableDataSource;
import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
@@ -110,7 +110,6 @@
import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.AsterixFunctionInfo;
-import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
@@ -154,6 +153,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
@@ -161,6 +161,8 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
@@ -201,6 +203,7 @@
CompiledLoadFromFileStatement clffs = (CompiledLoadFromFileStatement) stmt;
Dataset dataset = metadataProvider.findDataset(clffs.getDataverseName(), clffs.getDatasetName());
if (dataset == null) {
+ // This should never happen since we check for this in AqlTranslator
throw new AlgebricksException("Unable to load dataset " + clffs.getDatasetName()
+ " since it does not exist");
}
@@ -208,35 +211,51 @@
DatasetDataSource targetDatasource = validateDatasetInfo(metadataProvider, stmt.getDataverseName(),
stmt.getDatasetName());
List<String> partitionKeys = DatasetUtils.getPartitioningKeys(targetDatasource.getDataset());
- AdaptedLoadableDataSource lds;
+
+ LoadableDataSource lds;
try {
- lds = new AdaptedLoadableDataSource(dataset, itemType, clffs.getAdapter(), clffs.getProperties(),
- clffs.alreadySorted());
+ lds = new LoadableDataSource(dataset, itemType, clffs.getAdapter(), clffs.getProperties());
} catch (IOException e) {
throw new AlgebricksException(e);
}
+
+ // etsOp is a dummy input operator used to keep the compiler happy. It
+ // could be removed, but that would require fixing many rewrite rules
+ // that assume a datasource scan operator always has an input.
ILogicalOperator etsOp = new EmptyTupleSourceOperator();
- List<LogicalVariable> loadVars = new ArrayList<>();
- // Add a logical variable for PK.
- for (int i = 0; i < partitionKeys.size(); ++i) {
- loadVars.add(context.newVar());
- }
+
// Add a logical variable for the record.
- loadVars.add(context.newVar());
- DataSourceScanOperator dssOp = new DataSourceScanOperator(loadVars, lds);
- dssOp.getInputs().add(new MutableObject<ILogicalOperator>(etsOp));
+ List<LogicalVariable> payloadVars = new ArrayList<LogicalVariable>();
+ payloadVars.add(context.newVar());
- ILogicalExpression payloadExpr = new VariableReferenceExpression(loadVars.get(loadVars.size() - 1));
+ // Create a scan operator and make the empty tuple source its input
+ DataSourceScanOperator dssOp = new DataSourceScanOperator(payloadVars, lds);
+ dssOp.getInputs().add(new MutableObject<ILogicalOperator>(etsOp));
+ ILogicalExpression payloadExpr = new VariableReferenceExpression(payloadVars.get(0));
Mutable<ILogicalExpression> payloadRef = new MutableObject<ILogicalExpression>(payloadExpr);
- // Set the expression for PK
- List<Mutable<ILogicalExpression>> pkRefs = new ArrayList<>();
- for (int i = 0; i < loadVars.size() - 1; ++i) {
- ILogicalExpression pkExpr = new VariableReferenceExpression(loadVars.get(i));
- Mutable<ILogicalExpression> pkRef = new MutableObject<ILogicalExpression>(pkExpr);
- pkRefs.add(pkRef);
+
+ // Creating the assign to extract the PK out of the record
+ ArrayList<LogicalVariable> pkVars = new ArrayList<LogicalVariable>();
+ ArrayList<Mutable<ILogicalExpression>> pkExprs = new ArrayList<Mutable<ILogicalExpression>>();
+ List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<Mutable<ILogicalExpression>>();
+ LogicalVariable payloadVar = payloadVars.get(0);
+ for (String keyFieldName : partitionKeys) {
+ prepareVarAndExpression(keyFieldName, payloadVar, pkVars, pkExprs, varRefsForLoading);
}
- // Set the Filter variable if it was specified
+ AssignOperator assign = new AssignOperator(pkVars, pkExprs);
+ assign.getInputs().add(new MutableObject<ILogicalOperator>(dssOp));
+
+ // If the input is pre-sorted, we set the ordering property explicitly in the assign
+ if (clffs.alreadySorted()) {
+ List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+ for (int i = 0; i < pkVars.size(); ++i) {
+ orderColumns.add(new OrderColumn(pkVars.get(i), OrderKind.ASC));
+ }
+ assign.setExplicitOrderingProperty(new LocalOrderProperty(orderColumns));
+ }
+
String additionalFilteringField = DatasetUtils.getFilterField(targetDatasource.getDataset());
List<LogicalVariable> additionalFilteringVars = null;
List<Mutable<ILogicalExpression>> additionalFilteringAssignExpressions = null;
@@ -247,22 +266,22 @@
additionalFilteringAssignExpressions = new ArrayList<Mutable<ILogicalExpression>>();
additionalFilteringExpressions = new ArrayList<Mutable<ILogicalExpression>>();
- prepareVarAndExpression(additionalFilteringField, loadVars.get(loadVars.size() - 1), additionalFilteringVars,
+ prepareVarAndExpression(additionalFilteringField, payloadVar, additionalFilteringVars,
additionalFilteringAssignExpressions, additionalFilteringExpressions);
additionalFilteringAssign = new AssignOperator(additionalFilteringVars,
additionalFilteringAssignExpressions);
}
- InsertDeleteOperator insertOp = new InsertDeleteOperator(targetDatasource, payloadRef, pkRefs,
+ InsertDeleteOperator insertOp = new InsertDeleteOperator(targetDatasource, payloadRef, varRefsForLoading,
InsertDeleteOperator.Kind.INSERT, true);
insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
if (additionalFilteringAssign != null) {
- additionalFilteringAssign.getInputs().add(new MutableObject<ILogicalOperator>(dssOp));
+ additionalFilteringAssign.getInputs().add(new MutableObject<ILogicalOperator>(assign));
insertOp.getInputs().add(new MutableObject<ILogicalOperator>(additionalFilteringAssign));
} else {
- insertOp.getInputs().add(new MutableObject<ILogicalOperator>(dssOp));
+ insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
}
ILogicalOperator leafOperator = new SinkOperator();
@@ -299,7 +318,10 @@
topOp.getAnnotations().put("output-record-type", outputRecordType);
}
} else {
- /** add the collection-to-sequence right before the final project, because dataset only accept non-collection records */
+ /**
+ * add the collection-to-sequence right before the final project,
+ * because dataset only accept non-collection records
+ */
LogicalVariable seqVar = context.newVar();
@SuppressWarnings("unchecked")
/** This assign adds a marker function collection-to-sequence: if the input is a singleton collection, unnest it; otherwise do nothing. */
@@ -338,7 +360,6 @@
additionalFilteringAssign = new AssignOperator(additionalFilteringVars,
additionalFilteringAssignExpressions);
-
}
AssignOperator assign = new AssignOperator(vars, exprs);
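With these translator changes, the compiled load pipeline takes the shape (read right-to-left, matching the updated optimizer test plans later in this change):

    INSERT_DELETE <- [ASSIGN (filter field)] <- ASSIGN (extract PKs from payload) <- DATASOURCE_SCAN (payload only) <- EMPTY_TUPLE_SOURCE

When the load statement declares the input already sorted, the PK-extracting assign carries an explicit ascending LocalOrderProperty over the key variables, which lets the optimizer skip the sort in front of the bulk load.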
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 9884486..d8a71b8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -721,7 +721,6 @@
String datasetName = stmtCreateIndex.getDatasetName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1657,13 +1656,14 @@
private void handleLoadStatement(AqlMetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc)
throws Exception {
+ LoadStatement loadStmt = (LoadStatement) stmt;
+ String dataverseName = getActiveDataverseName(loadStmt.getDataverseName());
+ String datasetName = loadStmt.getDatasetName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireReadLatch();
+ MetadataLockManager.INSTANCE.modifyDatasetBegin(dataverseName, dataverseName + "." + datasetName);
try {
- LoadStatement loadStmt = (LoadStatement) stmt;
- String dataverseName = getActiveDataverseName(loadStmt.getDataverseName());
CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
.getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
loadStmt.dataIsAlreadySorted());
@@ -1680,7 +1680,7 @@
}
throw e;
} finally {
- releaseReadLatch();
+ MetadataLockManager.INSTANCE.modifyDatasetEnd(dataverseName, dataverseName + "." + datasetName);
}
}
@@ -2399,13 +2399,14 @@
private void handleRunStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws AsterixException, Exception {
RunStatement runStmt = (RunStatement) stmt;
- switch(runStmt.getSystem()) {
+ switch (runStmt.getSystem()) {
case "pregel":
case "pregelix":
handlePregelixStatement(metadataProvider, runStmt, hcc);
break;
default:
- throw new AlgebricksException("The system \""+runStmt.getSystem()+"\" specified in your run statement is not supported.");
+ throw new AlgebricksException("The system \"" + runStmt.getSystem()
+ + "\" specified in your run statement is not supported.");
}
}
@@ -2588,8 +2589,7 @@
}
throw e;
} finally {
- MetadataLockManager.INSTANCE
- .pregelixEnd(dataverseNameFrom, datasetNameFrom, datasetNameTo);
+ MetadataLockManager.INSTANCE.pregelixEnd(dataverseNameFrom, datasetNameFrom, datasetNameTo);
}
}
@@ -2657,14 +2657,6 @@
return getActiveDataverseName(dataverse != null ? dataverse.getValue() : null);
}
- private void acquireReadLatch() {
- MetadataManager.INSTANCE.acquireReadLatch();
- }
-
- private void releaseReadLatch() {
- MetadataManager.INSTANCE.releaseReadLatch();
- }
-
private void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) {
try {
if (IS_DEBUG_MODE) {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/ExternalIndexingOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/ExternalIndexingOperations.java
index 707b7de..da12f14 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/ExternalIndexingOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/ExternalIndexingOperations.java
@@ -165,7 +165,7 @@
FileStatus[] fileStatuses = fs.listStatus(new Path(aPath));
for (int i = 0; i < fileStatuses.length; i++) {
int nextFileNumber = files.size();
- if (fileStatuses[i].isDir()) {
+ if (fileStatuses[i].isDirectory()) {
listSubFiles(dataset, fs, fileStatuses[i], files);
} else {
files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(),
@@ -196,7 +196,7 @@
FileStatus[] fileStatuses = srcFs.listStatus(path);
for (int i = 0; i < fileStatuses.length; i++) {
int nextFileNumber = files.size();
- if (fileStatuses[i].isDir()) {
+ if (fileStatuses[i].isDirectory()) {
listSubFiles(dataset, srcFs, fileStatuses[i], files);
} else {
files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
@@ -267,7 +267,7 @@
HDFSIndexingAdapterFactory adapterFactory = new HDFSIndexingAdapterFactory();
adapterFactory.setFiles(files);
adapterFactory.configure(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties(),
- (ARecordType) itemType, false, null);
+ (ARecordType) itemType);
return new Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint>(
new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory),
adapterFactory.getPartitionConstraint());
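The isDir() calls above were replaced with the non-deprecated FileStatus.isDirectory(). The recursive listing pattern, sketched roughly (toExternalFile is a hypothetical helper standing in for the inline ExternalFile construction):

    // Hedged sketch of the recursive HDFS listing in listSubFiles.
    for (FileStatus status : fs.listStatus(path)) {
        if (status.isDirectory()) {
            listSubFiles(dataset, fs, status, files);   // recurse into subdirectory
        } else {
            files.add(toExternalFile(dataset, status)); // leaf: record the file
        }
    }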
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java
index 5459a80..9b799d5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeOperationsHelper.java
@@ -37,9 +37,7 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
@@ -122,9 +120,6 @@
* and transaction load on dataset refresh
*/
- RecordDescriptor[] rDescs = new RecordDescriptor[] { new RecordDescriptor(new ISerializerDeserializer[] {}) };
- AlgebricksMetaOperatorDescriptor etsOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
- new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
// Create external indexing scan operator
ExternalDataScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec);
@@ -165,10 +160,6 @@
spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
root = metaOp;
}
- AlgebricksPartitionConstraintHelper
- .setPartitionConstraintInJobSpec(spec, etsOp, primaryPartitionConstraint);
- // Connect the operators.
- spec.connect(new OneToOneConnectorDescriptor(spec), etsOp, 0, primaryScanOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
if (anySecondaryKeyIsNullable) {
spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeOperationsHelper.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeOperationsHelper.java
index 1f48509..409371a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeOperationsHelper.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeOperationsHelper.java
@@ -50,7 +50,6 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -281,9 +280,6 @@
* In case of external data, this method is used to build loading jobs for both initial load on index creation
* and transaction load on dataset refresh
*/
- RecordDescriptor[] rDescs = new RecordDescriptor[] { new RecordDescriptor(new ISerializerDeserializer[] {}) };
- AlgebricksMetaOperatorDescriptor etsOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
- new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
// Create external indexing scan operator
ExternalDataScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec);
// Assign op.
@@ -327,10 +323,7 @@
spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
root = metaOp;
}
- AlgebricksPartitionConstraintHelper
- .setPartitionConstraintInJobSpec(spec, etsOp, primaryPartitionConstraint);
- // Connect the operators.
- spec.connect(new OneToOneConnectorDescriptor(spec), etsOp, 0, primaryScanOp, 0);
+
spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
if (anySecondaryKeyIsNullable) {
spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index af7e511..1320629 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -20,6 +20,7 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.component.AbstractLifeCycle;
import edu.uci.ics.asterix.api.http.servlet.APIServlet;
import edu.uci.ics.asterix.api.http.servlet.AQLAPIServlet;
@@ -87,6 +88,10 @@
setupFeedServer(externalProperties);
feedServer.start();
+
+ waitUntilServerStart(webServer);
+ waitUntilServerStart(jsonAPIServer);
+ waitUntilServerStart(feedServer);
AsterixGlobalRecoveryManager.INSTANCE = new AsterixGlobalRecoveryManager(
(HyracksConnection) getNewHyracksClientConnection());
@@ -95,6 +100,15 @@
ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
}
+ private void waitUntilServerStart(AbstractLifeCycle webServer) throws Exception {
+ while (!webServer.isStarted()) {
+ if (webServer.isFailed()) {
+ throw new Exception("Server failed to start");
+ }
+ Thread.sleep(1000);
+ }
+ }
+
@Override
public void stop() throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
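Polling with a sleep is simple and matches the hunk above; an event-driven alternative (hypothetical, not part of this patch) would register a lifecycle listener before calling start() and block on a latch:

    // Hypothetical sketch using Jetty's LifeCycle.Listener; imports assumed:
    // java.util.concurrent.CountDownLatch, java.util.concurrent.TimeUnit,
    // org.eclipse.jetty.util.component.LifeCycle.
    final CountDownLatch started = new CountDownLatch(1);
    webServer.addLifeCycleListener(new AbstractLifeCycle.AbstractLifeCycleListener() {
        @Override
        public void lifeCycleStarted(LifeCycle event) {
            started.countDown(); // fires once the server is fully started
        }
    });
    webServer.start();
    if (!started.await(30, TimeUnit.SECONDS)) {
        throw new Exception("Server failed to start within 30 seconds");
    }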
diff --git a/asterix-app/src/test/resources/optimizerts/results/consolidate-selects-complex.plan b/asterix-app/src/test/resources/optimizerts/results/consolidate-selects-complex.plan
index f8bd22d..3abde90 100644
--- a/asterix-app/src/test/resources/optimizerts/results/consolidate-selects-complex.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/consolidate-selects-complex.plan
@@ -3,10 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BULKLOAD |PARTITIONED|
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$1(ASC)] HASH:[$$1] |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$2(ASC)] HASH:[$$2] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive-open_01.plan b/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive-open_01.plan
index 9d2d60b..3fbd339 100644
--- a/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive-open_01.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive-open_01.plan
@@ -3,10 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BULKLOAD |PARTITIONED|
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$1(ASC)] HASH:[$$1] |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$2(ASC)] HASH:[$$2] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive-open_02.plan b/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive-open_02.plan
index 8cad02f..e3baddf 100644
--- a/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive-open_02.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive-open_02.plan
@@ -3,10 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BULKLOAD |PARTITIONED|
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$1(ASC)] HASH:[$$1] |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$2(ASC)] HASH:[$$2] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive_01.plan b/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive_01.plan
index 9d2d60b..3fbd339 100644
--- a/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive_01.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive_01.plan
@@ -3,10 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BULKLOAD |PARTITIONED|
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$1(ASC)] HASH:[$$1] |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$2(ASC)] HASH:[$$2] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive_02.plan b/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive_02.plan
index 8cad02f..e3baddf 100644
--- a/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive_02.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/orders-index-search-conjunctive_02.plan
@@ -3,10 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BULKLOAD |PARTITIONED|
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$1(ASC)] HASH:[$$1] |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$2(ASC)] HASH:[$$2] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/q01_pricing_summary_report_nt.plan b/asterix-app/src/test/resources/optimizerts/results/q01_pricing_summary_report_nt.plan
index 97c7b80..fb35c0c 100644
--- a/asterix-app/src/test/resources/optimizerts/results/q01_pricing_summary_report_nt.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/q01_pricing_summary_report_nt.plan
@@ -3,10 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BULKLOAD |PARTITIONED|
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$1(ASC), $$2(ASC)] HASH:[$$1, $$2] |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$2(ASC), $$3(ASC)] HASH:[$$2, $$3] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/rtree-secondary-index-open.plan b/asterix-app/src/test/resources/optimizerts/results/rtree-secondary-index-open.plan
index d86e4f5..f77dc35 100644
--- a/asterix-app/src/test/resources/optimizerts/results/rtree-secondary-index-open.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/rtree-secondary-index-open.plan
@@ -3,10 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BULKLOAD |PARTITIONED|
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$1(ASC)] HASH:[$$1] |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$2(ASC)] HASH:[$$2] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/rtree-secondary-index.plan b/asterix-app/src/test/resources/optimizerts/results/rtree-secondary-index.plan
index d86e4f5..f77dc35 100644
--- a/asterix-app/src/test/resources/optimizerts/results/rtree-secondary-index.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/rtree-secondary-index.plan
@@ -3,10 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BULKLOAD |PARTITIONED|
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$1(ASC)] HASH:[$$1] |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$2(ASC)] HASH:[$$2] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan b/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
index 83a29cc..2d3d8fc 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
@@ -3,10 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BULKLOAD |PARTITIONED|
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$1(ASC)] HASH:[$$1] |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$2(ASC)] HASH:[$$2] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- COMMIT |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-secondary-index.plan b/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-secondary-index.plan
index 25b65de..1534f6c 100644
--- a/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-secondary-index.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/skip-index/skip-rtree-secondary-index.plan
@@ -3,10 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- BULKLOAD |PARTITIONED|
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$1(ASC)] HASH:[$$1] |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$2(ASC)] HASH:[$$2] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
-- DISTRIBUTE_RESULT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index e69c903..dbf3fd7 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -53,7 +53,6 @@
/**
* A factory class for creating an instance of HDFSAdapter
*/
-@SuppressWarnings("deprecation")
public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
private static final long serialVersionUID = 1L;
@@ -151,7 +150,7 @@
}
@Override
- public void configure(Map<String, String> configuration, ARecordType outputType, boolean isPKAutoGenerated, List<String> primaryKeys) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
if (!initialized) {
hdfsScheduler = initializeHDFSScheduler();
initialized = true;
@@ -214,9 +213,9 @@
if (specifiedFormat == null) {
throw new IllegalArgumentException(" Unspecified data format");
} else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
- parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, false, false, -1, null);
+ parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, false);
} else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
- parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, false, false, -1, null);
+ parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, false);
} else if (FORMAT_BINARY.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
parserFactory = new HDFSObjectTupleParserFactory((ARecordType) atype, this, configuration);
} else {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
index a11cc22..1d11041 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
@@ -44,7 +44,6 @@
import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
-@SuppressWarnings("deprecation")
public class HDFSIndexingAdapterFactory extends HDFSAdapterFactory {
private static final long serialVersionUID = 1L;
@@ -111,8 +110,7 @@
}
@Override
- public void configure(Map<String, String> configuration, ARecordType outputType, boolean isPKAutoGenerated,
- List<String> primaryKeys) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
if (!initialized) {
hdfsScheduler = initializeHDFSScheduler();
initialized = true;
@@ -136,14 +134,13 @@
char delimiter = StreamBasedAdapterFactory.getDelimiter(configuration);
char quote = StreamBasedAdapterFactory.getQuote(configuration, delimiter);
- boolean hasHeader = StreamBasedAdapterFactory.getHasHeader(configuration);
parserFactory = new HDFSIndexingParserFactory((ARecordType) atype,
configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT),
- configuration.get(KEY_FORMAT), delimiter, quote, hasHeader,
+ configuration.get(KEY_FORMAT), delimiter, quote,
configuration.get(HDFSAdapterFactory.KEY_PARSER));
}
-
+
/**
* A static function that creates and return delimited text data parser
*
@@ -153,8 +150,7 @@
* (the delimiter value)
* @return
*/
- @SuppressWarnings("null")
- public static DelimitedDataParser getDelimitedDataParser(ARecordType recordType, char delimiter, char quote, boolean hasHeader) {
+ public static DelimitedDataParser getDelimitedDataParser(ARecordType recordType, char delimiter, char quote) {
int n = recordType.getFieldTypes().length;
IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
for (int i = 0; i < n; i++) {
@@ -177,7 +173,7 @@
}
fieldParserFactories[i] = vpf;
}
- return new DelimitedDataParser(recordType, fieldParserFactories, delimiter, quote, hasHeader, false, -1, null);
+ return new DelimitedDataParser(recordType, fieldParserFactories, delimiter, quote, false);
}
public static AlgebricksPartitionConstraint getClusterLocations() {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 1db713d..ab59241 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -72,10 +72,10 @@
}
@Override
- public void configure(Map<String, String> configuration, ARecordType outputType, boolean isPKAutoGenerated, List<String> primaryKeys) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
if (!configured) {
populateConfiguration(configuration);
- hdfsAdapterFactory.configure(configuration, outputType, isPKAutoGenerated, primaryKeys);
+ hdfsAdapterFactory.configure(configuration, outputType);
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 2da3803..110a965 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -50,12 +50,10 @@
private IAType sourceDatatype;
private FileSplit[] fileSplits;
- private boolean isPKAutoGenerated = false;
- private int primaryKeyPosition = -1;
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(fileSplits, parserFactory, sourceDatatype, ctx, isPKAutoGenerated, primaryKeyPosition);
+ NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(fileSplits, parserFactory, sourceDatatype, ctx);
return fsAdapter;
}
@@ -75,46 +73,12 @@
}
@Override
- public void configure(Map<String, String> configuration, ARecordType outputType, boolean isPKAutoGenerated, List<String> primaryKeys) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
String[] splits = ((String) configuration.get(KEY_PATH)).split(",");
- this.isPKAutoGenerated = isPKAutoGenerated;
-
- // Configuration for a Dataset that has an auto-generated PK
- ARecordType sourceRecordType = null;
- IAType sourceDatatype = null;
-
- if (isPKAutoGenerated) {
- // Currently, support for a single field Primary Key only
- String primaryKey = primaryKeys.get(0);
-
- String typeName = outputType.getTypeName();
- String[] targetFieldNames = outputType.getFieldNames();
- String[] sourceFieldNames = new String[targetFieldNames.length - 1];
- IAType[] targetFieldTypes = outputType.getFieldTypes();
- IAType[] sourceFieldTypes = new IAType[targetFieldTypes.length - 1];
- boolean isOpen = outputType.isOpen();
-
- int idx = 0;
- for (int i = 0; i < outputType.getFieldNames().length; i++) {
- if (!targetFieldNames[i].equals(primaryKey)) {
- sourceFieldNames[idx] = targetFieldNames[i];
- sourceFieldTypes[idx] = targetFieldTypes[i];
- idx++;
- } else {
- this.primaryKeyPosition = i;
- }
- }
- sourceRecordType = new ARecordType(typeName, sourceFieldNames,
- sourceFieldTypes, isOpen);
-
- sourceDatatype = (IAType) sourceRecordType;
- } else {
- sourceDatatype = (IAType) outputType;
- }
-
+ IAType sourceDatatype = (IAType) outputType;
configureFileSplits(splits);
- configureFormat(sourceDatatype, isPKAutoGenerated, primaryKeyPosition, outputType);
+ configureFormat(sourceDatatype);
}
@@ -183,7 +147,7 @@
}
@Override
- public void setFiles(List<ExternalFile> files) throws AlgebricksException{
+ public void setFiles(List<ExternalFile> files) throws AlgebricksException {
throw new AlgebricksException("can't set files for this Adapter");
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
index c3a3746..0f340bc 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
@@ -81,9 +81,7 @@
typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
}
- // Used when a data set has an auto-generated PK field
- protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType, boolean conditionalPush,
- boolean isPKAutoGenerated, int primaryKeyPosition, ARecordType origSourceDataTypeForAutoGeneratedPK)
+ protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
throws AsterixException {
int n = recordType.getFieldTypes().length;
IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
@@ -113,26 +111,23 @@
boolean hasHeader = getHasHeader(configuration);
return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, fieldParserFactories, delimiter,
- quote, hasHeader, configuration) : new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories,
- delimiter, quote, hasHeader, isPKAutoGenerated, primaryKeyPosition, origSourceDataTypeForAutoGeneratedPK);
+ quote, hasHeader, configuration) : new NtDelimitedDataTupleParserFactory(recordType,
+ fieldParserFactories, delimiter, quote, hasHeader);
}
- protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType, boolean conditionalPush,
- boolean isPKAutoGenerated, int primaryKeyPosition, ARecordType origSourceDataTypeForAutoGeneratedPK)
+ protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
throws AsterixException {
try {
return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, configuration)
- : new AdmSchemafullRecordParserFactory(recordType, isPKAutoGenerated, primaryKeyPosition,
- origSourceDataTypeForAutoGeneratedPK);
+ : new AdmSchemafullRecordParserFactory(recordType);
} catch (Exception e) {
throw new AsterixException(e);
}
}
- protected void configureFormat(IAType sourceDatatype, boolean isPKAutoGenerated, int primaryKeyPosition,
- IAType origSourceDataTypeForAutoGeneratedPK) throws Exception {
- String propValue = configuration.get(BATCH_SIZE);
+ protected void configureFormat(IAType sourceDatatype) throws Exception {
+ String propValue = (String) configuration.get(BATCH_SIZE);
int batchSize = propValue != null ? Integer.parseInt(propValue) : -1;
propValue = configuration.get(BATCH_INTERVAL);
long batchInterval = propValue != null ? Long.parseLong(propValue) : -1;
@@ -144,11 +139,9 @@
if (specifiedFormat == null) {
throw new IllegalArgumentException(" Unspecified data format");
} else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
- parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush,
- isPKAutoGenerated, primaryKeyPosition, (ARecordType) origSourceDataTypeForAutoGeneratedPK);
- } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
- parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush,
- isPKAutoGenerated, primaryKeyPosition, (ARecordType) origSourceDataTypeForAutoGeneratedPK);
+ parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
+ } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
+ parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
} else {
throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
}
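After this simplification, configuring a stream-based adapter needs only the property map and the output record type. A hedged usage sketch (property keys and values assumed to match the factory constants such as KEY_PATH and KEY_FORMAT):

    // Hypothetical caller-side usage of the two-argument configure().
    Map<String, String> conf = new HashMap<String, String>();
    conf.put("path", "nc1://data/tpch/lineitem.tbl"); // assumed key/value shapes
    conf.put("format", "delimited-text");
    conf.put("delimiter", "|");
    adapterFactory.configure(conf, recordType);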
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index d3c5716..c2a8a95 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -37,24 +37,19 @@
protected final ITupleParser tupleParser;
protected final IAType sourceDatatype;
protected IHyracksTaskContext ctx;
- protected boolean isPKAutoGenerated;
- protected int primaryKeyPosition;
- public FileSystemBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx,
- boolean isPKAutoGenerated, int primaryKeyPosition)
+ public FileSystemBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx)
throws HyracksDataException {
this.tupleParser = parserFactory.createTupleParser(ctx);
this.sourceDatatype = sourceDatatype;
this.ctx = ctx;
- this.isPKAutoGenerated = isPKAutoGenerated;
- this.primaryKeyPosition = primaryKeyPosition;
}
@Override
public void start(int partition, IFrameWriter writer) throws Exception {
InputStream in = getInputStream(partition);
if (tupleParser instanceof AbstractTupleParser) {
- ((AbstractTupleParser)tupleParser).setFilename(getFilename(partition));
+ ((AbstractTupleParser) tupleParser).setFilename(getFilename(partition));
}
tupleParser.parse(in, writer);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index e0628be..9f4b97c 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -37,7 +37,7 @@
/**
* Provides functionality for fetching external data stored in an HDFS instance.
*/
-@SuppressWarnings({ "deprecation" })
+
public class HDFSAdapter extends FileSystemBasedAdapter {
private static final long serialVersionUID = 1L;
@@ -53,7 +53,7 @@
public HDFSAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
String nodeName, ITupleParserFactory parserFactory, IHyracksTaskContext ctx,
Map<String, String> configuration, List<ExternalFile> files) throws HyracksDataException {
- super(parserFactory, atype, ctx, false, -1);
+ super(parserFactory, atype, ctx);
this.readSchedule = readSchedule;
this.executed = executed;
this.inputSplits = inputSplits;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
index 2864efd..9af1dd7 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
@@ -31,8 +31,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-@SuppressWarnings("deprecation")
-public class HDFSIndexingAdapter extends FileSystemBasedAdapter{
+public class HDFSIndexingAdapter extends FileSystemBasedAdapter {
private static final long serialVersionUID = 1L;
private transient String[] readSchedule;
@@ -48,9 +47,9 @@
public HDFSIndexingAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits,
JobConf conf, AlgebricksPartitionConstraint clusterLocations, List<ExternalFile> files,
- ITupleParserFactory parserFactory, IHyracksTaskContext ctx, String nodeName,
- String inputFormat, String format) throws IOException {
- super(parserFactory, atype, ctx, false, -1);
+ ITupleParserFactory parserFactory, IHyracksTaskContext ctx, String nodeName, String inputFormat,
+ String format) throws IOException {
+ super(parserFactory, atype, ctx);
this.nodeName = nodeName;
this.readSchedule = readSchedule;
this.executed = executed;
@@ -63,11 +62,12 @@
@Override
public InputStream getInputStream(int partition) throws IOException {
- if(inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC)){
+ if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
return new RCFileDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
- } else if(format.equals(HDFSAdapterFactory.FORMAT_ADM) || format.equals(HDFSAdapterFactory.FORMAT_DELIMITED_TEXT)){
- return new TextualDataReader(inputSplits,readSchedule,nodeName,conf,executed,files);
- } else{
+ } else if (format.equals(HDFSAdapterFactory.FORMAT_ADM)
+ || format.equals(HDFSAdapterFactory.FORMAT_DELIMITED_TEXT)) {
+ return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
+ } else {
return new GenericFileAwareRecordReader(inputSplits, readSchedule, nodeName, conf, executed, files);
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 7f7a48d..27d9bb8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -33,7 +33,7 @@
public HiveAdapter(IAType atype, HDFSAdapter hdfsAdapter, ITupleParserFactory parserFactory, IHyracksTaskContext ctx)
throws HyracksDataException {
- super(parserFactory, atype, ctx, false, -1);
+ super(parserFactory, atype, ctx);
this.hdfsAdapter = hdfsAdapter;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index d98c0ff..78554f2 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -41,13 +41,7 @@
public NCFileSystemAdapter(FileSplit[] fileSplits, ITupleParserFactory parserFactory, IAType atype,
IHyracksTaskContext ctx) throws HyracksDataException {
- super(parserFactory, atype, ctx, false, -1);
- this.fileSplits = fileSplits;
- }
-
- public NCFileSystemAdapter(FileSplit[] fileSplits, ITupleParserFactory parserFactory, IAType atype,
- IHyracksTaskContext ctx, boolean isPKAutoGenerated, int primaryKeyPosition) throws HyracksDataException {
- super(parserFactory, atype, ctx, isPKAutoGenerated, primaryKeyPosition);
+ super(parserFactory, atype, ctx);
this.fileSplits = fileSplits;
}
@@ -68,10 +62,9 @@
public void start(int partition, IFrameWriter writer) throws Exception {
InputStream in = getInputStream(partition);
if (tupleParser instanceof AbstractTupleParser) {
- ((AbstractTupleParser)tupleParser).setFilename(getFilename(partition));
+ ((AbstractTupleParser) tupleParser).setFilename(getFilename(partition));
}
tupleParser.parse(in, writer);
-
}
@Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
index 6b2174f..e2ef9fd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
@@ -45,8 +45,6 @@
private final char delimiter;
// quote in case of delimited text
private final char quote;
- // whether delimited text file has a header (which should be ignored)
- private final boolean hasHeader;
// parser class name in case of binary format
private final String parserClassName;
// the expected data type
@@ -57,13 +55,12 @@
private Map<String, String> arguments;
public HDFSIndexingParserFactory(ARecordType atype, String inputFormat, String format, char delimiter,
- char quote, boolean hasHeader, String parserClassName) {
+ char quote, String parserClassName) {
this.inputFormat = inputFormat;
this.format = format;
this.parserClassName = parserClassName;
this.delimiter = delimiter;
this.quote = quote;
- this.hasHeader = hasHeader;
this.atype = atype;
}
@@ -94,7 +91,7 @@
} else if (format.equalsIgnoreCase(StreamBasedAdapterFactory.FORMAT_DELIMITED_TEXT)) {
// choice 3 with delimited data parser
DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser(atype,
- delimiter, quote, hasHeader);
+ delimiter, quote);
return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
index 13090626..c6176fc 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
@@ -42,7 +42,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-@SuppressWarnings("deprecation")
public class HDFSLookupAdapter implements IControlledAdapter, Serializable {
private static final long serialVersionUID = 1L;
@@ -100,10 +99,9 @@
// create a delimited text parser
char delimiter = StreamBasedAdapterFactory.getDelimiter(configuration);
char quote = StreamBasedAdapterFactory.getQuote(configuration, delimiter);
- boolean hasHeader = StreamBasedAdapterFactory.getHasHeader(configuration);
DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser((ARecordType) atype,
- delimiter, quote, hasHeader);
+ delimiter, quote);
if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
// Text input format
TextFileLookupInputStream in = new TextFileLookupInputStream(fileIndexAccessor, jobConf);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextualFullScanDataReader.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextualFullScanDataReader.java
index ee1e3ee..76c9a5e 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextualFullScanDataReader.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/input/TextualFullScanDataReader.java
@@ -28,9 +28,8 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-@SuppressWarnings("deprecation")
-public class TextualFullScanDataReader extends InputStream{
-
+public class TextualFullScanDataReader extends InputStream {
+
private RecordReader<Object, Text> reader;
private Object key;
private Text value;
@@ -43,12 +42,13 @@
private String[] readSchedule;
private String nodeName;
private JobConf conf;
-
- public TextualFullScanDataReader(boolean executed[], InputSplit[] inputSplits, String[] readSchedule, String nodeName, JobConf conf){
+
+ public TextualFullScanDataReader(boolean executed[], InputSplit[] inputSplits, String[] readSchedule,
+ String nodeName, JobConf conf) {
this.executed = executed;
this.inputSplits = inputSplits;
this.readSchedule = readSchedule;
- this.nodeName = nodeName;
+ this.nodeName = nodeName;
this.conf = conf;
}
@@ -89,8 +89,7 @@
}
@Override
- public int read(byte[] buffer, int offset, int len)
- throws IOException {
+ public int read(byte[] buffer, int offset, int len) throws IOException {
if (reader == null) {
if (!moveToNext()) {
// nothing to read
@@ -104,8 +103,7 @@
if (sizeOfNextTuple > len) {
return 0;
}
- System.arraycopy(pendingValue.getBytes(), 0, buffer,
- offset + numBytes, pendingValue.getLength());
+ System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
numBytes += pendingValue.getLength() + 1;
pendingValue = null;
@@ -134,8 +132,7 @@
pendingValue = value;
break;
} else {
- System.arraycopy(value.getBytes(), 0, buffer,
- offset + numBytes, value.getLength());
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
buffer[offset + numBytes + value.getLength()] = (byte) EOL;
numBytes += sizeOfNextTuple;
}
@@ -145,28 +142,20 @@
@Override
public int read() throws IOException {
- throw new NotImplementedException(
- "Use read(byte[], int, int");
+ throw new NotImplementedException("Use read(byte[], int, int");
}
@SuppressWarnings("rawtypes")
- private RecordReader getRecordReader(int splitIndex)
- throws IOException {
+ private RecordReader getRecordReader(int splitIndex) throws IOException {
if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
- SequenceFileInputFormat format = (SequenceFileInputFormat) conf
- .getInputFormat();
- RecordReader reader = format
- .getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex],
- conf, getReporter());
+ SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex],
+ conf, getReporter());
return reader;
} else {
- TextInputFormat format = (TextInputFormat) conf
- .getInputFormat();
- RecordReader reader = format
- .getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex],
- conf, getReporter());
+ TextInputFormat format = (TextInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex],
+ conf, getReporter());
return reader;
}
}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
index c722b6e..b042e9c 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -31,6 +31,11 @@
public class TestTypedAdapterFactory implements ITypedAdapterFactory {
+ /**
+ * Generated serial version UID.
+ */
+ private static final long serialVersionUID = 1L;
+
public static final String NAME = "test_typed_adapter";
private static ARecordType adapterOutputType = initOutputType();
@@ -73,8 +78,7 @@
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- ITupleParserFactory tupleParserFactory = new AdmSchemafullRecordParserFactory(adapterOutputType, false, -1,
- null);
+ ITupleParserFactory tupleParserFactory = new AdmSchemafullRecordParserFactory(adapterOutputType);
return new TestTypedAdapter(tupleParserFactory, adapterOutputType, ctx, configuration);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AdaptedLoadableDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AdaptedLoadableDataSource.java
deleted file mode 100644
index 92f70b1..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AdaptedLoadableDataSource.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package edu.uci.ics.asterix.metadata.declared;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
-
-public class AdaptedLoadableDataSource extends AqlDataSource {
-
- private final INodeDomain domain;
-
- private final IAType[] schemaTypes;
-
- private final Dataset targetDataset;
-
- private final List<String> partitioningKeys;
-
- private final String adapter;
-
- private final Map<String, String> adapterProperties;
-
- private final boolean alreadySorted;
-
- public AdaptedLoadableDataSource(Dataset targetDataset, IAType itemType, String adapter,
- Map<String, String> properties, boolean alreadySorted) throws AlgebricksException, IOException {
- super(new AqlSourceId("loadable_dv", "loadable_ds"), "loadable_dv", "loadable_source",
- AqlDataSourceType.ADAPTED_LOADABLE);
- this.targetDataset = targetDataset;
- this.adapter = adapter;
- this.adapterProperties = properties;
- this.alreadySorted = alreadySorted;
- partitioningKeys = DatasetUtils.getPartitioningKeys(targetDataset);
- domain = new DefaultNodeGroupDomain(DatasetUtils.getNodegroupName(targetDataset));
- ARecordType recType = (ARecordType) itemType;
- schemaTypes = new IAType[partitioningKeys.size() + 1];
- for (int i = 0; i < partitioningKeys.size(); ++i) {
- schemaTypes[i] = recType.getFieldType(partitioningKeys.get(i));
- }
- schemaTypes[schemaTypes.length - 1] = itemType;
- }
-
- @Override
- public IAType[] getSchemaTypes() {
- return schemaTypes;
- }
-
- @Override
- public INodeDomain getDomain() {
- return domain;
- }
-
- @Override
- public void computeLocalStructuralProperties(List<ILocalStructuralProperty> localProps,
- List<LogicalVariable> variables) {
- if (alreadySorted) {
- List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
- for (int i = 0; i < partitioningKeys.size(); ++i) {
- orderColumns.add(new OrderColumn(variables.get(i), OrderKind.ASC));
- }
- localProps.add(new LocalOrderProperty(orderColumns));
- }
- }
-
- public List<String> getPartitioningKeys() {
- return partitioningKeys;
- }
-
- public String getAdapter() {
- return adapter;
- }
-
- public Map<String, String> getAdapterProperties() {
- return adapterProperties;
- }
-
- public IAType getTargetDatasetType() {
- return schemaTypes[schemaTypes.length - 1];
- }
-
- public Dataset getTargetDataset() {
- return targetDataset;
- }
-
-}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index f7d27d7..ef23135 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -54,7 +54,7 @@
INTERNAL_DATASET,
EXTERNAL_DATASET,
FEED,
- ADAPTED_LOADABLE
+ LOADABLE
}
public AqlDataSource(AqlSourceId id, String datasourceDataverse, String datasourceName,
@@ -128,7 +128,7 @@
List<ILocalStructuralProperty> propsLocal;
int n;
switch (ds.getDatasourceType()) {
- case ADAPTED_LOADABLE:
+ case LOADABLE:
case EXTERNAL_DATASET:
pp = new RandomPartitioningProperty(domain);
propsLocal = new ArrayList<ILocalStructuralProperty>();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 53b99ae..4336e5d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -292,8 +292,7 @@
}
IAType type = findType(dataverse, outputRecordType);
if (!(type instanceof ARecordType)) {
- throw new AlgebricksException
- ("Type " + outputRecordType + " is not a record type!");
+ throw new AlgebricksException("Type " + outputRecordType + " is not a record type!");
}
return (ARecordType) type;
}
@@ -321,13 +320,16 @@
try {
switch (((AqlDataSource) dataSource).getDatasourceType()) {
case FEED: {
+ // ingesting data from a feed
return buildFeedIntakeRuntime(jobSpec, dataSource);
}
case INTERNAL_DATASET: {
+ // querying an internal dataset
return buildInternalDatasetScan(jobSpec, scanVariables, minFilterVars, maxFilterVars, opSchema,
typeEnv, dataSource, context, implConfig);
}
case EXTERNAL_DATASET: {
+ // querying an external dataset
Dataset dataset = ((DatasetDataSource) dataSource).getDataset();
String itemTypeName = dataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
@@ -338,18 +340,20 @@
return buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
NonTaggedDataFormat.INSTANCE);
}
- case ADAPTED_LOADABLE: {
- AdaptedLoadableDataSource alds = (AdaptedLoadableDataSource) dataSource;
-
+ case LOADABLE: {
+ // This is a load into dataset operation
+ LoadableDataSource alds = (LoadableDataSource) dataSource;
List<String> partitioningKeys = alds.getPartitioningKeys();
boolean isPKAutoGenerated = ((InternalDatasetDetails) alds.getTargetDataset().getDatasetDetails())
.isAutogenerated();
-
- IAdapterFactory wrappedAdapterFactory = getConfiguredAdapterFactory(alds.getTargetDataset(),
- alds.getAdapter(), alds.getAdapterProperties(), alds.getTargetDatasetType(),
- isPKAutoGenerated, partitioningKeys);
+ ARecordType itemType = (ARecordType) alds.getLoadedType();
+ int pkIndex = 0;
+ IAdapterFactory adapterFactory = getConfiguredAdapterFactory(alds.getTargetDataset(),
+ alds.getAdapter(), alds.getAdapterProperties(), itemType, isPKAutoGenerated,
+ partitioningKeys);
RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
- return buildAdaptedLoadableDatasetScan(jobSpec, alds, wrappedAdapterFactory, rDesc);
+ return buildLoadableDatasetScan(jobSpec, alds, adapterFactory, rDesc, isPKAutoGenerated,
+ partitioningKeys, itemType, pkIndex);
}
default: {
throw new IllegalArgumentException();
@@ -361,32 +365,19 @@
}
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildAdaptedLoadableDatasetScan(
- JobSpecification jobSpec, AdaptedLoadableDataSource alds, IAdapterFactory wrappedAdapterFactory,
- RecordDescriptor rDesc) throws AlgebricksException {
- if (!(wrappedAdapterFactory.getSupportedOperations().equals(SupportedOperation.READ) || wrappedAdapterFactory
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(JobSpecification jobSpec,
+ LoadableDataSource alds, IAdapterFactory adapterFactory, RecordDescriptor rDesc, boolean isPKAutoGenerated,
+ List<String> primaryKeys, ARecordType recType, int pkIndex) throws AlgebricksException {
+ if (!(adapterFactory.getSupportedOperations().equals(SupportedOperation.READ) || adapterFactory
.getSupportedOperations().equals(SupportedOperation.READ_WRITE))) {
throw new AlgebricksException(" External dataset adapter does not support read operation");
}
- ISerializerDeserializer recSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider().getSerializerDeserializer(
- alds.getTargetDatasetType());
- ARecordType recType = (ARecordType) alds.getTargetDatasetType();
- int[] extractFields = new int[rDesc.getFieldCount() - 1];
- for (int i = 0; i < extractFields.length; ++i) {
- try {
- extractFields[i] = recType.findFieldPosition(alds.getPartitioningKeys().get(i));
- } catch (IOException e) {
- throw new AlgebricksException(e);
- }
- }
- IAdapterFactory fieldExtractingAdapterFactory = new FieldExtractingAdapterFactory(wrappedAdapterFactory,
- new RecordDescriptor(new ISerializerDeserializer[] { recSerde }), rDesc, extractFields,
- (ARecordType) alds.getTargetDatasetType());
+
ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, rDesc,
- fieldExtractingAdapterFactory);
+ adapterFactory);
AlgebricksPartitionConstraint constraint;
try {
- constraint = fieldExtractingAdapterFactory.getPartitionConstraint();
+ constraint = adapterFactory.getPartitionConstraint();
} catch (Exception e) {
throw new AlgebricksException(e);
}
@@ -476,8 +467,7 @@
switch (adapterFactory.getAdapterType()) {
case GENERIC:
- ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) itemType,
- isPKAutoGenerated, primaryKeys);
+ ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) itemType);
break;
case TYPED:
((ITypedAdapterFactory) adapterFactory).configure(configuration);
@@ -748,24 +738,6 @@
}
}
- /* BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
- typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
- lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
- compactionInfo.second, isSecondary ? new SecondaryIndexOperationTrackerProvider(
- dataset.getDatasetId()) : new PrimaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), rtcProvider, LSMBTreeIOOperationCallbackFactory.INSTANCE,
- storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary), retainInput,
- retainNull, context.getNullWriterFactory(), searchCallbackFactory);
-
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
-
- } catch (MetadataException me) {
- throw new AlgebricksException(me);
- }
- }*/
-
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, boolean retainNull, Dataset dataset, String indexName,
@@ -962,11 +934,16 @@
switch (((AqlDataSource) dataSource).getDatasourceType()) {
case INTERNAL_DATASET:
case EXTERNAL_DATASET:
- result = false;
+ result = ((DatasetDataSource) dataSource).getDataset().getDatasetType() == DatasetType.EXTERNAL;
break;
case FEED:
result = true;
break;
+ case LOADABLE:
+ result = true;
+ break;
+ default:
+ break;
}
return result;
}
@@ -1299,7 +1276,8 @@
// Number of Keys that needs to be propagated
int numKeys = inputSchema.getSize();
- // Get the rest of Logical Variables that are not (PK or SK) and each variable's positions.
+ // Get the rest of Logical Variables that are not (PK or SK) and each
+ // variable's positions.
// These variables will be propagated through TokenizeOperator.
List<LogicalVariable> otherKeys = new ArrayList<LogicalVariable>();
if (inputSchema.getSize() > 0) {
@@ -1330,8 +1308,10 @@
}
// For tokenization, sorting and loading.
- // One token (+ optional partitioning field) + primary keys + secondary keys + other variables
- // secondary keys and other variables will be just passed to the IndexInsertDelete Operator.
+ // One token (+ optional partitioning field) + primary keys + secondary
+ // keys + other variables
+ // secondary keys and other variables will be just passed to the
+ // IndexInsertDelete Operator.
int numTokenKeyPairFields = (!isPartitioned) ? 1 + numKeys : 2 + numKeys;
// generate field permutations for the input
@@ -1384,7 +1364,8 @@
ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
- // Find the key type of the secondary key. If it's a derived type, return the derived type.
+ // Find the key type of the secondary key. If it's a derived type,
+ // return the derived type.
// e.g. UNORDERED LIST -> return UNORDERED LIST type
IAType secondaryKeyType = null;
Pair<IAType, Boolean> keyPairType = Index.getNonNullableKeyFieldType(secondaryKeyExprs.get(0).toString(),
@@ -1416,7 +1397,8 @@
ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
- // The order of the output record: propagated variables (including PK and SK), token, and number of token.
+ // The order of the output record: propagated variables (including
+ // PK and SK), token, and number of token.
// #1. propagate all input variables
for (int k = 0; k < recordDesc.getFieldCount(); k++) {
tokenKeyPairFields[k] = recordDesc.getFields()[k];
@@ -1640,7 +1622,8 @@
if (primaryKeys.size() > 1) {
throw new AlgebricksException("Cannot create inverted index on dataset with composite primary key.");
}
- // The size of secondaryKeys can be two if it receives input from its TokenizeOperator- [token, number of token]
+ // The size of secondaryKeys can be two if it receives input from its
+ // TokenizeOperator- [token, number of token]
if (secondaryKeys.size() > 1 && !isPartitioned) {
throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
} else if (secondaryKeys.size() > 2 && isPartitioned) {
@@ -1653,7 +1636,8 @@
}
// For tokenization, sorting and loading.
- // One token (+ optional partitioning field) + primary keys: [token, number of token, PK]
+ // One token (+ optional partitioning field) + primary keys: [token,
+ // number of token, PK]
int numKeys = primaryKeys.size() + secondaryKeys.size();
int numTokenKeyPairFields = (!isPartitioned) ? 1 + primaryKeys.size() : 2 + primaryKeys.size();
int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
@@ -2141,11 +2125,11 @@
try {
type = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverse, typeName);
} catch (MetadataException e) {
- throw new AlgebricksException("Metadata exception while looking up type '" + typeName +
- "' in dataverse '" + dataverse + "'", e);
+ throw new AlgebricksException("Metadata exception while looking up type '" + typeName + "' in dataverse '"
+ + dataverse + "'", e);
}
if (type == null) {
- throw new AlgebricksException("Type name '" + typeName + "' unknown in dataverse '" + dataverse +"'");
+ throw new AlgebricksException("Type name '" + typeName + "' unknown in dataverse '" + dataverse + "'");
}
return type.getDatatype();
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java
new file mode 100644
index 0000000..4419438
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.declared;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class LoadableDataSource extends AqlDataSource {
+
+ private final INodeDomain domain;
+ private final IAType[] schemaTypes;
+ private final Dataset targetDataset;
+ private final List<String> partitioningKeys;
+ private final String adapter;
+ private final Map<String, String> adapterProperties;
+ private final boolean isPKAutoGenerated;
+
+ public boolean isPKAutoGenerated() {
+ return isPKAutoGenerated;
+ }
+
+ public LoadableDataSource(Dataset targetDataset, IAType itemType, String adapter, Map<String, String> properties)
+ throws AlgebricksException, IOException {
+ super(new AqlSourceId("loadable_dv", "loadable_ds"), "loadable_dv", "loadable_source",
+ AqlDataSourceType.LOADABLE);
+ this.targetDataset = targetDataset;
+ this.adapter = adapter;
+ this.adapterProperties = properties;
+ partitioningKeys = DatasetUtils.getPartitioningKeys(targetDataset);
+ domain = new DefaultNodeGroupDomain(DatasetUtils.getNodegroupName(targetDataset));
+ ARecordType recType = (ARecordType) itemType;
+ isPKAutoGenerated = ((InternalDatasetDetails) targetDataset.getDatasetDetails()).isAutogenerated();
+ if (isPKAutoGenerated) {
+ // Since the key is auto-generated, we need to use another
+ // record type which has all fields except the PK
+ String[] fieldNames = new String[recType.getFieldNames().length - 1];
+ IAType[] fieldTypes = new IAType[recType.getFieldTypes().length - 1];
+ int i = 0;
+ int j = 0;
+ while (i < fieldNames.length) {
+ if (!((ARecordType) itemType).getFieldNames()[j].equals(partitioningKeys.get(0))) {
+ fieldNames[i] = ((ARecordType) itemType).getFieldNames()[j];
+ fieldTypes[i] = ((ARecordType) itemType).getFieldTypes()[j];
+ i++;
+ } else {
+ }
+ j++;
+ }
+ try {
+ itemType = new ARecordType(recType.getTypeName(), fieldNames, fieldTypes, recType.isOpen());
+ } catch (AsterixException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ schemaTypes = new IAType[] { itemType };
+ }
+
+ @Override
+ public IAType[] getSchemaTypes() {
+ return schemaTypes;
+ }
+
+ @Override
+ public INodeDomain getDomain() {
+ return domain;
+ }
+
+ @Override
+ public void computeLocalStructuralProperties(List<ILocalStructuralProperty> localProps,
+ List<LogicalVariable> variables) {
+ }
+
+ public List<String> getPartitioningKeys() {
+ return partitioningKeys;
+ }
+
+ public String getAdapter() {
+ return adapter;
+ }
+
+ public Map<String, String> getAdapterProperties() {
+ return adapterProperties;
+ }
+
+ public IAType getLoadedType() {
+ return schemaTypes[schemaTypes.length - 1];
+ }
+
+ public Dataset getTargetDataset() {
+ return targetDataset;
+ }
+}
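Note: in the constructor above, when the PK is auto-generated the type handed to parsers is the target type minus the PK field, since input records must not contain it. A minimal, self-contained sketch of that filtering in plain Java (field names are hypothetical, for illustration only):

    import java.util.Arrays;

    class PKStripDemo {
        public static void main(String[] args) {
            String[] fieldNames = { "id", "name", "age" }; // "id" is the auto-generated PK
            String pk = "id";
            String[] loadedFields = Arrays.stream(fieldNames)
                    .filter(f -> !f.equals(pk))
                    .toArray(String[]::new);
            System.out.println(Arrays.toString(loadedFields)); // prints [name, age]
        }
    }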
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java
new file mode 100644
index 0000000..0938ccc
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.declared;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.om.base.AMutableUUID;
+import edu.uci.ics.asterix.om.base.AUUID;
+import edu.uci.ics.asterix.om.pointables.ARecordPointable;
+import edu.uci.ics.asterix.om.pointables.PointableAllocator;
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class PKGeneratingAdapter implements IDatasourceAdapter {
+
+ private static final long serialVersionUID = 1L;
+ private final RecordDescriptor inRecDesc;
+ private final RecordDescriptor outRecDesc;
+ private final IDatasourceAdapter wrappedAdapter;
+ private final PKGeneratingPushRuntime pkRuntime;
+ private final int pkIndex;
+
+ public PKGeneratingAdapter(IHyracksTaskContext ctx, RecordDescriptor inRecDesc, RecordDescriptor outRecDesc,
+ ARecordType inRecType, ARecordType outRecType, IDatasourceAdapter wrappedAdapter, int pkIndex) {
+ this.inRecDesc = inRecDesc;
+ this.outRecDesc = outRecDesc;
+ this.wrappedAdapter = wrappedAdapter;
+ this.pkRuntime = new PKGeneratingPushRuntime(ctx, inRecType, outRecType);
+ this.pkIndex = pkIndex;
+ }
+
+ @Override
+ public void start(int partition, IFrameWriter writer) throws Exception {
+ pkRuntime.setInputRecordDescriptor(0, inRecDesc);
+ pkRuntime.setFrameWriter(0, writer, outRecDesc);
+ pkRuntime.open();
+ try {
+ wrappedAdapter.start(partition, pkRuntime);
+ } catch (Throwable t) {
+ pkRuntime.fail();
+ throw t;
+ } finally {
+ pkRuntime.close();
+ }
+ }
+
+ private class PKGeneratingPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+ private final IHyracksTaskContext ctx;
+ private final ARecordType outRecType;
+ private final ArrayTupleBuilder tb;
+ private final AMutableUUID aUUID = new AMutableUUID(0, 0);
+ private final byte AUUIDTag = ATypeTag.UUID.serialize();
+ private final byte[] serializedUUID = new byte[16];
+ private final PointableAllocator pa = new PointableAllocator();
+ private final ARecordPointable recordPointable;
+ private final IAType[] outClosedTypes;
+
+ private final RecordBuilder recBuilder;
+
+ public PKGeneratingPushRuntime(IHyracksTaskContext ctx, ARecordType inRecType, ARecordType outRecType) {
+ this.ctx = ctx;
+ this.outRecType = outRecType;
+ this.tb = new ArrayTupleBuilder(2);
+ this.recBuilder = new RecordBuilder();
+ this.recordPointable = (ARecordPointable) pa.allocateRecordValue(inRecType);
+ this.outClosedTypes = outRecType.getFieldTypes();
+ }
+
+ /*
+ * This method is written at a low level instead of using pre-existing libraries
+ * because it is called once per record, and to avoid size validation.
+ */
+ private void serializeUUID(AUUID aUUID, byte[] serializedUUID) {
+ long v = aUUID.getLeastSignificantBits();
+ serializedUUID[0] = (byte) (v >>> 56);
+ serializedUUID[1] = (byte) (v >>> 48);
+ serializedUUID[2] = (byte) (v >>> 40);
+ serializedUUID[3] = (byte) (v >>> 32);
+ serializedUUID[4] = (byte) (v >>> 24);
+ serializedUUID[5] = (byte) (v >>> 16);
+ serializedUUID[6] = (byte) (v >>> 8);
+ serializedUUID[7] = (byte) (v >>> 0);
+ v = aUUID.getMostSignificantBits();
+ serializedUUID[8] = (byte) (v >>> 56);
+ serializedUUID[9] = (byte) (v >>> 48);
+ serializedUUID[10] = (byte) (v >>> 40);
+ serializedUUID[11] = (byte) (v >>> 32);
+ serializedUUID[12] = (byte) (v >>> 24);
+ serializedUUID[13] = (byte) (v >>> 16);
+ serializedUUID[14] = (byte) (v >>> 8);
+ serializedUUID[15] = (byte) (v >>> 0);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ initAccessAppendRef(ctx);
+ recBuilder.reset(outRecType);
+ recBuilder.init();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ tAccess.reset(buffer);
+ for (int i = 0; i < tAccess.getTupleCount(); ++i) {
+ tb.reset();
+ tRef.reset(tAccess, i);
+
+ // We need to do the following:
+ // 1. generate a UUID
+ // 2. fill in the first field with the UUID
+ aUUID.nextUUID();
+ tb.getDataOutput().writeByte(AUUIDTag);
+ serializeUUID(aUUID, serializedUUID);
+ tb.getDataOutput().write(serializedUUID);
+ tb.addFieldEndOffset();
+ // 3. fill in the second field with the record after adding to it the UUID
+ recordPointable.set(tRef.getFieldData(0), tRef.getFieldStart(0), tRef.getFieldLength(0));
+ // Start by closed fields
+ int inIndex = 0;
+ for (int f = 0; f < outClosedTypes.length; f++) {
+ if (f == pkIndex) {
+ recBuilder.addField(f, serializedUUID);
+ } else {
+ recBuilder.addField(f, recordPointable.getFieldValues().get(inIndex));
+ inIndex++;
+ }
+ }
+
+ // Add open fields
+ if (outRecType.isOpen()) {
+ List<IVisitablePointable> fp = recordPointable.getFieldNames();
+ if (fp.size() >= outClosedTypes.length) {
+ int index = outClosedTypes.length - 1;
+ while (index < fp.size()) {
+ recBuilder.addField(fp.get(index), recordPointable.getFieldValues().get(index));
+ index++;
+ }
+ }
+ }
+ //write the record
+ recBuilder.write(tb.getDataOutput(), true);
+ tb.addFieldEndOffset();
+ appendToFrameFromTupleBuilder(tb);
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException("Error in the auto id generation and merge of the record", e);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ flushIfNotFailed();
+ }
+ }
+
+}
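Note: the shift cascade in serializeUUID() above writes the least-significant long big-endian into bytes 0-7 and the most-significant long into bytes 8-15. For reference, a byte-for-byte equivalent sketch using java.nio (not part of this change):

    import java.nio.ByteBuffer;

    class UUIDBytesSketch {
        // Equivalent to serializeUUID(): bytes 0-7 = LSB, bytes 8-15 = MSB, big-endian.
        static byte[] serialize(long lsb, long msb) {
            return ByteBuffer.allocate(16).putLong(lsb).putLong(msb).array();
        }
    }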
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java
new file mode 100644
index 0000000..e371b2b
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.declared;
+
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ *
+ * @author alamouda
+ * A wrapper adapter factory used when loading data into a dataset with an auto-generated UUID primary key.
+ * The factory creates adapters that generate UUIDs and merge them into the parsed records.
+ */
+public class PKGeneratingAdapterFactory implements IAdapterFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final IAdapterFactory wrappedAdapterFactory;
+ private final RecordDescriptor inRecDesc;
+ private final RecordDescriptor outRecDesc;
+ private final ARecordType inRecType;
+ private final ARecordType outRecType;
+ private final int pkIndex;
+
+ public PKGeneratingAdapterFactory(IAdapterFactory wrappedAdapterFactory, RecordDescriptor inRecDesc,
+ RecordDescriptor outRecDesc, ARecordType inRecType, ARecordType outRecType, int pkIndex) {
+ this.wrappedAdapterFactory = wrappedAdapterFactory;
+ this.inRecDesc = inRecDesc;
+ this.outRecDesc = outRecDesc;
+ this.inRecType = inRecType;
+ this.outRecType = outRecType;
+ this.pkIndex = pkIndex;
+ }
+
+ @Override
+ public SupportedOperation getSupportedOperations() {
+ return wrappedAdapterFactory.getSupportedOperations();
+ }
+
+ @Override
+ public String getName() {
+ return "PKGeneratingAdapter[ " + wrappedAdapterFactory.getName() + " ]";
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ return wrappedAdapterFactory.getAdapterType();
+ }
+
+ @Override
+ public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+ return wrappedAdapterFactory.getPartitionConstraint();
+ }
+
+ @Override
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+ IDatasourceAdapter wrappedAdapter = wrappedAdapterFactory.createAdapter(ctx, partition);
+ return new PKGeneratingAdapter(ctx, inRecDesc, outRecDesc, inRecType, outRecType, wrappedAdapter, pkIndex);
+ }
+}
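Note: a sketch of how this wrapper composes with a configured base factory; the variable names are illustrative, not the exact call site in AqlMetadataProvider:

    // Illustrative wiring only -- all parameters here are assumptions:
    IAdapterFactory base = getConfiguredAdapterFactory(dataset, adapterName, properties,
            inRecType, isPKAutoGenerated, partitioningKeys); // reads the raw records
    IAdapterFactory wrapped = new PKGeneratingAdapterFactory(base, inRecDesc, outRecDesc,
            inRecType, outRecType, pkIndex);
    // Downstream operators then receive [UUID, record-with-UUID] tuples.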
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
index 1114cc9..28abd75 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
@@ -47,8 +47,7 @@
dataParser = new ADMDataParser();
break;
case DELIMITED_DATA:
- dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter, quote, hasHeader,
- false, -1, null);
+ dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter, quote, hasHeader);
break;
}
return new ConditionalPushTupleParser(ctx, recordType, dataParser, configuration);
@@ -102,7 +101,7 @@
public ConditionalPushTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
Map<String, String> configuration) throws HyracksDataException {
- super(ctx, recType, false, -1, null);
+ super(ctx, recType);
this.dataParser = dataParser;
String propValue = (String) configuration.get(BATCH_SIZE);
batchSize = propValue != null ? Integer.parseInt(propValue) : Integer.MAX_VALUE;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
index 953adcf..06de1e0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
-import java.nio.ByteBuffer;
-
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -23,7 +21,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
/*
* A single activity operator that provides the functionality of scanning data using an
@@ -34,10 +32,11 @@
private static final long serialVersionUID = 1L;
private IAdapterFactory adapterFactory;
+
public ExternalDataScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
IAdapterFactory dataSourceAdapterFactory) {
- super(spec, 1, 1);
+ super(spec, 0, 1);
recordDescriptors[0] = rDesc;
this.adapterFactory = dataSourceAdapterFactory;
}
@@ -47,32 +46,20 @@
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
- public void open() throws HyracksDataException {
+ public void initialize() throws HyracksDataException {
writer.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
IDatasourceAdapter adapter = null;
try {
adapter = adapterFactory.createAdapter(ctx, partition);
adapter.start(partition, writer);
} catch (Exception e) {
throw new HyracksDataException("exception during reading from external data source", e);
+ } finally {
+ writer.close();
}
-
- }
-
- @Override
- public void fail() throws HyracksDataException {
- }
-
- @Override
- public void close() throws HyracksDataException {
- writer.close();
}
};
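Note: with super(spec, 0, 1) the scan is again a true source (leaf) operator: it implements initialize() instead of the open/nextFrame/close protocol, and jobs wire nothing into it. Hypothetical wiring for illustration (downstreamOp is assumed):

    ExternalDataScanOperatorDescriptor scan =
            new ExternalDataScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
    // Only an outgoing edge exists; the operator no longer has an input arity.
    jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), scan, 0, downstreamOp, 0);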
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 8598db3..2b05989 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -174,7 +174,7 @@
+ IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
}
((IGenericAdapterFactory) adapterFactory).configure(adapterConfiguration,
- (ARecordType) adapterOutputType, false, null);
+ (ARecordType) adapterOutputType);
((IGenericAdapterFactory) adapterFactory).createAdapter(ctx, partition);
}
break;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index e0f1870..c7b905d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -293,13 +293,14 @@
"You must specify the datatype associated with the incoming data. Datatype is specified by the "
+ IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
}
- Datatype datatype = MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
- feed.getDataverseName(), outputTypeName);
+ Datatype datatype = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, feed.getDataverseName(),
+ outputTypeName);
if (datatype == null) {
- throw new Exception("no datatype \"" + outputTypeName + "\" in dataverse \"" + feed.getDataverseName() + "\"");
+ throw new Exception("no datatype \"" + outputTypeName + "\" in dataverse \""
+ + feed.getDataverseName() + "\"");
}
adapterOutputType = (ARecordType) datatype.getDatatype();
- ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType, false, null);
+ ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
break;
default:
throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
index 089b01b..47aa8d9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
@@ -25,7 +25,7 @@
public static final String KEY_TYPE_NAME = "type-name";
- public void configure(Map<String, String> configuration, ARecordType outputType, boolean isPKAutoGenerated, List<String> primaryKeys) throws Exception;
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception;
public void setFiles(List<ExternalFile> files) throws AlgebricksException;
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/builders/RecordBuilder.java b/asterix-om/src/main/java/edu/uci/ics/asterix/builders/RecordBuilder.java
index 520b577..845d201 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/builders/RecordBuilder.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/builders/RecordBuilder.java
@@ -155,6 +155,16 @@
nullBitMap[id / 8] |= (byte) (1 << (7 - (id % 8)));
}
}
+
+ public void addField(int id, byte[] value) {
+ closedPartOffsets[id] = closedPartOutputStream.size();
+ // We assume the tag is not included (closed field)
+ closedPartOutputStream.write(value, 0, value.length);
+ numberOfClosedFields++;
+ if (isNullable && value[0] != SER_NULL_TYPE_TAG) {
+ nullBitMap[id / 8] |= (byte) (1 << (7 - (id % 8)));
+ }
+ }
@Override
public void addField(IValueReference name, IValueReference value) throws AsterixException {
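Note: the new overload appends pre-serialized value bytes for a closed field by position; PKGeneratingPushRuntime above uses it to splice in the 16 UUID bytes. Illustrative call, names as in that runtime:

    // serializedUUID holds the 16 value bytes produced by serializeUUID(), with no type tag:
    recBuilder.addField(pkIndex, serializedUUID);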
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/serde/AUUIDSerializerDeserializer.java b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/serde/AUUIDSerializerDeserializer.java
index eaba0ae..d77473a 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/serde/AUUIDSerializerDeserializer.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/serde/AUUIDSerializerDeserializer.java
@@ -34,6 +34,7 @@
public AUUID deserialize(DataInput in) throws HyracksDataException {
long msb = Integer64SerializerDeserializer.INSTANCE.deserialize(in);
long lsb = Integer64SerializerDeserializer.INSTANCE.deserialize(in);
+ // A new AUUID object is allocated on each deserialization; this may be worth revisiting.
return new AUUID(msb, lsb);
}
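Note: as the new comment says, deserialize() allocates a fresh AUUID per call. One possible alternative, sketched here and purely hypothetical (the mutator name is an assumption, not an existing API), would reuse a mutable target:

    // Hypothetical variant, not part of this change:
    public void deserialize(DataInput in, AMutableUUID target) throws HyracksDataException {
        long msb = Integer64SerializerDeserializer.INSTANCE.deserialize(in);
        long lsb = Integer64SerializerDeserializer.INSTANCE.deserialize(in);
        target.setValue(msb, lsb); // assumed mutator on AMutableUUID
    }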
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/BuiltinType.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/BuiltinType.java
index ca13f39..00e0dd8 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/BuiltinType.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/BuiltinType.java
@@ -738,9 +738,10 @@
}
};
- // AUUID_STRING is used when converting between the string representation of
- // UUID and corresponding a UUID instance
+ // AUUID_STRING is used when converting between the string representation of
+ // a UUID and the corresponding AUUID instance
public static final BuiltinType AUUID_STRING = new LowerCaseConstructorType() {
+ private static final long serialVersionUID = 1L;
@Override
public ATypeTag getTypeTag() {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index fb9c4d3..cc1573a 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -983,10 +983,9 @@
}
fieldParserFactories[i] = vpf;
}
- return new NtDelimitedDataTupleParserFactory(recType, fieldParserFactories, delimiter, quote, hasHeader,
- false, -1, null);
+ return new NtDelimitedDataTupleParserFactory(recType, fieldParserFactories, delimiter, quote, hasHeader);
} else {
- return new AdmSchemafullRecordParserFactory(recType, false, -1, null);
+ return new AdmSchemafullRecordParserFactory(recType);
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/ADMDataParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/ADMDataParser.java
index 31a1319..7e6ff04 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/ADMDataParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/ADMDataParser.java
@@ -67,11 +67,6 @@
private String mismatchErrorMessage = "Mismatch Type, expecting a value of type ";
private String mismatchErrorMessage2 = " got a value of type ";
- private boolean isPKAutoGenerated;
- private int primaryKeyPosition;
- private ARecordType origRecTypeForAutoGeneratedPK;
- private byte AUUIDTag = ATypeTag.UUID.serialize();
-
static class ParseException extends AsterixException {
private static final long serialVersionUID = 1L;
private String filename;
@@ -118,16 +113,11 @@
}
public ADMDataParser() {
- this(null, false, -1, null);
+ this(null);
}
- // Constructor for dealing with auto-generated PK
- public ADMDataParser(String filename, boolean isPKAutoGenerated, int primaryKeyPosition,
- ARecordType origRecTypeForAutoGeneratedPK) {
+ public ADMDataParser(String filename) {
this.filename = filename;
- this.isPKAutoGenerated = isPKAutoGenerated;
- this.primaryKeyPosition = primaryKeyPosition;
- this.origRecTypeForAutoGeneratedPK = origRecTypeForAutoGeneratedPK;
}
@Override
@@ -146,13 +136,7 @@
@Override
public void initialize(InputStream in, ARecordType recordType, boolean datasetRec) throws AsterixException {
- // Use original record type variable - origRecTypeForAutoGeneratedPK if we have auto-generated PK.
- // The recordType variable does not contain field information about auto-generated PK.
- if (!isPKAutoGenerated) {
- this.recordType = recordType;
- } else {
- this.recordType = origRecTypeForAutoGeneratedPK;
- }
+ this.recordType = recordType;
this.datasetRec = datasetRec;
try {
admLexer = new AdmLexer(new java.io.InputStreamReader(in));
@@ -647,33 +631,10 @@
} while (inRecord);
if (recType != null) {
- nullableFieldId = checkNullConstraints(recType, nulls, 0);
- boolean insertedAutoGeneratedPK = false;
-
- // This is a typical situation for a dataset with an auto-generated PK
- // where nullableFieldId equals primaryKey field.
- // In this case, we create a random ID and assign it as a PK
- if (isPKAutoGenerated && nullableFieldId == primaryKeyPosition) {
- fieldValueBuffer.reset();
- aUUID.nextUUID();
- fieldValueBuffer.getDataOutput().writeByte(AUUIDTag);
- Integer64SerializerDeserializer.INSTANCE.serialize(aUUID.getMostSignificantBits(),
- fieldValueBuffer.getDataOutput());
- Integer64SerializerDeserializer.INSTANCE.serialize(aUUID.getLeastSignificantBits(),
- fieldValueBuffer.getDataOutput());
- recBuilder.addField(primaryKeyPosition, fieldValueBuffer);
- insertedAutoGeneratedPK = true;
- nulls.set(nullableFieldId);
- // Check from the primary key field position to make sure no other field is missing
- nullableFieldId = checkNullConstraints(recType, nulls, nullableFieldId);
- }
+ nullableFieldId = checkNullConstraints(recType, nulls);
if (nullableFieldId != -1) {
throw new ParseException("Field: " + recType.getFieldNames()[nullableFieldId] + " can not be null");
}
- if (isPKAutoGenerated && !insertedAutoGeneratedPK) {
- throw new ParseException("Auto-generated PK Field: " + recType.getFieldNames()[primaryKeyPosition]
- + " should not exist in the ADM file.");
- }
}
recBuilder.write(out, true);
returnRecordBuilder(recBuilder);
@@ -681,9 +642,9 @@
returnTempBuffer(fieldValueBuffer);
}
- private int checkNullConstraints(ARecordType recType, BitSet nulls, int startingPosition) {
+ private int checkNullConstraints(ARecordType recType, BitSet nulls) {
boolean isNull = false;
- for (int i = startingPosition; i < recType.getFieldTypes().length; i++) {
+ for (int i = 0; i < recType.getFieldTypes().length; i++) {
if (nulls.get(i) == false) {
IAType type = recType.getFieldTypes()[i];
if (type.getTypeTag() != ATypeTag.NULL && type.getTypeTag() != ATypeTag.UNION) {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractDataParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractDataParser.java
index 402b639..c5621d6 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractDataParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractDataParser.java
@@ -84,7 +84,7 @@
protected AMutableDouble aDouble = new AMutableDouble(0);
protected AMutableFloat aFloat = new AMutableFloat(0);
protected AMutableString aString = new AMutableString("");
- protected AMutableBinary aBinary = new AMutableBinary(new byte[] { }, 0, 0);
+ protected AMutableBinary aBinary = new AMutableBinary(new byte[] {}, 0, 0);
protected AMutableString aStringFieldName = new AMutableString("");
protected AMutableUUID aUUID = new AMutableUUID(0, 0);
// For temporal and spatial data types
@@ -131,8 +131,9 @@
@SuppressWarnings("unchecked")
protected ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ANULL);
- // For UUID, we assume that the format is the string representation of UUID
- // (xxxxxxxx-xxxx-xxxx-xxxxxxxxxxxx) when parsing the data.
+
+ // For UUID, we assume that the format is the string representation of a UUID
+ // (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) when parsing the data.
@SuppressWarnings("unchecked")
protected ISerializerDeserializer<AUUID> uuidSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.AUUID_STRING);
@@ -333,17 +334,16 @@
protected void parseHexBinaryString(char[] input, int start, int length, DataOutput out)
throws HyracksDataException {
- byte[] newBytes = ByteArrayHexParserFactory
- .extractPointableArrayFromHexString(input, start, length, aBinary.getBytes());
+ byte[] newBytes = ByteArrayHexParserFactory.extractPointableArrayFromHexString(input, start, length,
+ aBinary.getBytes());
aBinary.setValue(newBytes, 0, newBytes.length);
binarySerde.serialize(aBinary, out);
}
protected void parseBase64BinaryString(char[] input, int start, int length, DataOutput out)
throws HyracksDataException {
- byte[] newBytes = ByteArrayBase64ParserFactory
- .extractPointableArrayFromBase64String(input, start, length,
- aBinary.getBytes(), base64ParserQuadruplet);
+ byte[] newBytes = ByteArrayBase64ParserFactory.extractPointableArrayFromBase64String(input, start, length,
+ aBinary.getBytes(), base64ParserQuadruplet);
aBinary.setValue(newBytes, 0, newBytes.length);
binarySerde.serialize(aBinary, out);
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
index e535cf4..79f30cb 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
@@ -47,19 +47,12 @@
protected final ARecordType recType;
protected final IHyracksTaskContext ctx;
protected String filename;
- protected boolean isPKAutoGenerated;
- protected int primaryKeyPosition;
- protected final ARecordType origRecTypeForAutoGeneratedPK;
- public AbstractTupleParser(IHyracksTaskContext ctx, ARecordType recType,
- boolean isPKAutoGenerated, int primaryKeyPosition, ARecordType origRecTypeForAutoGeneratedPK) throws HyracksDataException {
+ public AbstractTupleParser(IHyracksTaskContext ctx, ARecordType recType) throws HyracksDataException {
appender = new FrameTupleAppender(ctx.getFrameSize());
frame = ctx.allocateFrame();
this.recType = recType;
this.ctx = ctx;
- this.isPKAutoGenerated = isPKAutoGenerated;
- this.primaryKeyPosition = primaryKeyPosition;
- this.origRecTypeForAutoGeneratedPK = origRecTypeForAutoGeneratedPK;
}
public void setFilename(String filename) {
@@ -94,6 +87,7 @@
protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmSchemafullRecordParserFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmSchemafullRecordParserFactory.java
index 149fd0a..2ce0e61 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmSchemafullRecordParserFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmSchemafullRecordParserFactory.java
@@ -27,25 +27,15 @@
public class AdmSchemafullRecordParserFactory implements ITupleParserFactory {
private static final long serialVersionUID = 1L;
-
protected ARecordType recType;
- // To deal with an auto-generated PK field
- protected boolean isPKAutoGenerated;
- protected int primaryKeyPosition;
- protected ARecordType origRecordTypeForAutoGeneratedPK;
-
- public AdmSchemafullRecordParserFactory(ARecordType recType, boolean isPKAutoGenerated, int primaryKeyPosition,
- ARecordType origRecordTypeForAutoGeneratedPK) {
+ public AdmSchemafullRecordParserFactory(ARecordType recType) {
this.recType = recType;
- this.isPKAutoGenerated = isPKAutoGenerated;
- this.primaryKeyPosition = primaryKeyPosition;
- this.origRecordTypeForAutoGeneratedPK = origRecordTypeForAutoGeneratedPK;
}
@Override
public ITupleParser createTupleParser(final IHyracksTaskContext ctx) throws HyracksDataException {
- return new AdmTupleParser(ctx, recType, isPKAutoGenerated, primaryKeyPosition, origRecordTypeForAutoGeneratedPK);
+ return new AdmTupleParser(ctx, recType);
}
}
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmTupleParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmTupleParser.java
index 074a2ca..8aab2db 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmTupleParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AdmTupleParser.java
@@ -24,14 +24,13 @@
*/
public class AdmTupleParser extends AbstractTupleParser {
- public AdmTupleParser(IHyracksTaskContext ctx, ARecordType recType, boolean isPKAutoGenerated,
- int primaryKeyPosition, ARecordType origRecordTypeForAutoGeneratedPK) throws HyracksDataException {
- super(ctx, recType, isPKAutoGenerated, primaryKeyPosition, origRecordTypeForAutoGeneratedPK);
+ public AdmTupleParser(IHyracksTaskContext ctx, ARecordType recType) throws HyracksDataException {
+ super(ctx, recType);
}
@Override
public IDataParser getDataParser() {
- return new ADMDataParser(filename, isPKAutoGenerated, primaryKeyPosition, origRecTypeForAutoGeneratedPK);
+ return new ADMDataParser(filename);
}
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataParser.java
index 41cc7cf..0dd9014 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataParser.java
@@ -31,7 +31,6 @@
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParser;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.std.file.FieldCursorForDelimitedDataParser;
@@ -43,7 +42,6 @@
protected final char quote;
protected final boolean hasHeader;
protected final ARecordType recordType;
-
private IARecordBuilder recBuilder;
private ArrayBackedValueStorage fieldValueBuffer;
private DataOutput fieldValueBufferOutput;
@@ -52,44 +50,22 @@
private byte[] fieldTypeTags;
private int[] fldIds;
private ArrayBackedValueStorage[] nameBuffers;
- private byte AUUIDTag = ATypeTag.UUID.serialize();
-
- // Variables used to set a UUID for the auto-generated PK field
- private final boolean isPKAutoGenerated;
- private final int primaryKeyPosition;
- private final ARecordType origRecordTypeForAutoGeneratedPK;
-
private boolean areAllNullFields;
private int fieldCount;
public DelimitedDataParser(ARecordType recordType, IValueParserFactory[] valueParserFactories, char fieldDelimter,
char quote, boolean hasHeader) {
- this(recordType, valueParserFactories, fieldDelimter, quote, hasHeader, false, -1, null);
- }
-
- public DelimitedDataParser(ARecordType recordType, IValueParserFactory[] valueParserFactories, char fieldDelimter,
- char quote, boolean hasHeader, boolean isPKAutoGenerated, int primaryKeyPosition,
- ARecordType origRecordTypeForAutoGeneratedPK) {
this.recordType = recordType;
this.valueParserFactories = valueParserFactories;
this.fieldDelimiter = fieldDelimter;
this.quote = quote;
this.hasHeader = hasHeader;
- this.isPKAutoGenerated = isPKAutoGenerated;
- this.primaryKeyPosition = primaryKeyPosition;
- this.origRecordTypeForAutoGeneratedPK = origRecordTypeForAutoGeneratedPK;
}
@Override
public void initialize(InputStream in, ARecordType recordType, boolean datasetRec) throws AsterixException,
IOException {
- ARecordType recordTypeToApply = null;
- if (isPKAutoGenerated)
- recordTypeToApply = origRecordTypeForAutoGeneratedPK;
- else
- recordTypeToApply = recordType;
-
valueParsers = new IValueParser[valueParserFactories.length];
for (int i = 0; i < valueParserFactories.length; ++i) {
valueParsers[i] = valueParserFactories[i].createValueParser();
@@ -97,11 +73,8 @@
fieldValueBuffer = new ArrayBackedValueStorage();
fieldValueBufferOutput = fieldValueBuffer.getDataOutput();
-
- // If PK is auto-generated, then we need to use the recordType that
- // includes PK, since recordType variable does not include PK field.
recBuilder = new RecordBuilder();
- recBuilder.reset(recordTypeToApply);
+ recBuilder.reset(recordType);
recBuilder.init();
int n = recordType.getFieldNames().length;
@@ -138,18 +111,11 @@
while (cursor.nextField(fieldCount));
}
while (cursor.nextRecord()) {
- // If PK is auto-generated, then we need to use the recordType that
- // includes PK, since recordType variable does not include PK field.
- if (isPKAutoGenerated)
- recBuilder.reset(origRecordTypeForAutoGeneratedPK);
- else
- recBuilder.reset(recordType);
-
+ recBuilder.reset(recordType);
recBuilder.init();
areAllNullFields = true;
-
+
fieldCount = 0;
-
for (int i = 0; i < valueParsers.length; ++i) {
if (!cursor.nextField(fieldCount)) {
break;
@@ -185,29 +151,7 @@
} else {
recBuilder.addField(fldIds[i], fieldValueBuffer);
}
-
fieldCount++;
-
- }
-
- // Should not have any more fields now.
- // We parsed all fields except an auto-generated PK at this point.
- // We now create a new UUID and assign it as a PK.
- if (isPKAutoGenerated && fieldCount == origRecordTypeForAutoGeneratedPK.getFieldTypes().length - 1) {
- fieldValueBuffer.reset();
- aUUID.nextUUID();
- fieldValueBufferOutput.writeByte(AUUIDTag);
- Integer64SerializerDeserializer.INSTANCE.serialize(aUUID.getMostSignificantBits(),
- fieldValueBufferOutput);
- Integer64SerializerDeserializer.INSTANCE.serialize(aUUID.getLeastSignificantBits(),
- fieldValueBufferOutput);
- recBuilder.addField(primaryKeyPosition, fieldValueBuffer);
- areAllNullFields = false;
- } else if (isPKAutoGenerated && fieldCount >= origRecordTypeForAutoGeneratedPK.getFieldTypes().length) {
- // If we have all fields in the file including auto-generated PK,
- // throw an exception
- throw new AsterixException("At line: " + cursor.lineCount
- + " - check number of fields. Auto-generated PK field should not exist in the input data.");
}
if (!areAllNullFields) {
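
Note: the block removed above is the heart of this change for delimited data. The parser no longer appends a freshly generated UUID as the auto-generated primary key; that responsibility moves into the compiled plan via IntroduceAutogenerateIDRule. For reference, the removed code wrote the field as a one-byte type tag followed by the two 64-bit halves of the UUID. A sketch of that layout with a placeholder tag value (the real tag comes from ATypeTag.UUID.serialize()):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.UUID;

    public class UuidFieldLayoutSketch {
        // Placeholder value; AsterixDB's actual tag is ATypeTag.UUID.serialize().
        private static final byte HYPOTHETICAL_UUID_TAG = 42;

        static byte[] serialize(UUID uuid) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeByte(HYPOTHETICAL_UUID_TAG);           // 1-byte type tag
            out.writeLong(uuid.getMostSignificantBits());   // 8 bytes
            out.writeLong(uuid.getLeastSignificantBits());  // 8 bytes
            return bos.toByteArray();                       // 17 bytes total
        }

        public static void main(String[] args) throws IOException {
            System.out.println(serialize(UUID.randomUUID()).length + " bytes");
        }
    }
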
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java
index 798b510..42e336d 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/DelimitedDataTupleParser.java
@@ -28,13 +28,9 @@
private final DelimitedDataParser dataParser;
public DelimitedDataTupleParser(IHyracksTaskContext ctx, ARecordType recType,
- IValueParserFactory[] valueParserFactories,
- char fieldDelimter, char quote, boolean hasHeader,
- boolean isPKAutoGenerated, int primaryKeyPosition,
- ARecordType origRecordTypeForAutoGeneratedPK) throws HyracksDataException {
- super(ctx, recType, isPKAutoGenerated, primaryKeyPosition, origRecordTypeForAutoGeneratedPK);
- dataParser = new DelimitedDataParser(recType, valueParserFactories, fieldDelimter, quote, hasHeader, isPKAutoGenerated,
- primaryKeyPosition, origRecordTypeForAutoGeneratedPK);
+ IValueParserFactory[] valueParserFactories, char fieldDelimter, char quote, boolean hasHeader) throws HyracksDataException {
+ super(ctx, recType);
+ dataParser = new DelimitedDataParser(recType, valueParserFactories, fieldDelimter, quote, hasHeader);
}
@Override
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java
index 137067d..0e85fd2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/NtDelimitedDataTupleParserFactory.java
@@ -34,32 +34,19 @@
protected final char quote;
// whether delimited text file has a header (which should be ignored)
protected final boolean hasHeader;
- // To deal with an auto-generated PK
- protected final boolean isPKAutoGenerated;
- protected final int primaryKeyPosition;
- protected final ARecordType origRecordTypeForAutoGeneratedPK;
public NtDelimitedDataTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
- char fieldDelimiter, char quote, boolean hasHeader) {
- this(recordType, valueParserFactories, fieldDelimiter, quote, hasHeader, false, -1, null);
- }
-
- public NtDelimitedDataTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
- char fieldDelimiter, char quote, boolean hasHeader, boolean isPKAutoGenerated, int primaryKeyposition,
- ARecordType origRecordTypeForAutoGeneratedPK) {
+ char fieldDelimiter, char quote, boolean hasHeader) {
this.recordType = recordType;
this.valueParserFactories = valueParserFactories;
this.fieldDelimiter = fieldDelimiter;
this.quote = quote;
this.hasHeader = hasHeader;
- this.isPKAutoGenerated = isPKAutoGenerated;
- this.primaryKeyPosition = primaryKeyposition;
- this.origRecordTypeForAutoGeneratedPK = origRecordTypeForAutoGeneratedPK;
}
@Override
public ITupleParser createTupleParser(final IHyracksTaskContext ctx) throws HyracksDataException {
return new DelimitedDataTupleParser(ctx, recordType, valueParserFactories, fieldDelimiter, quote,
- hasHeader, isPKAutoGenerated, primaryKeyPosition, origRecordTypeForAutoGeneratedPK);
+ hasHeader);
}
}
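
Note: with the PK-related fields gone, the delimited-data factory is back to a plain serializable descriptor: it carries only the record type, value-parser factories, delimiter, quote, and header flag, and builds the actual parser per task. A minimal sketch of that idiom with hypothetical types (the real interfaces are ITupleParserFactory and IHyracksTaskContext):

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.regex.Pattern;

    // Hypothetical stand-in for the serializable-factory idiom: the factory
    // ships plain configuration to cluster nodes and creates a parser per task.
    interface LineParser { void parse(String line); }

    class DelimitedFactorySketch implements Serializable {
        private static final long serialVersionUID = 1L;
        private final char delimiter;
        private final boolean hasHeader; // carried configuration; unused in this sketch

        DelimitedFactorySketch(char delimiter, boolean hasHeader) {
            this.delimiter = delimiter;
            this.hasHeader = hasHeader;
        }

        LineParser createParser() {
            return line -> System.out.println(
                    Arrays.toString(line.split(Pattern.quote(String.valueOf(delimiter)))));
        }
    }
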
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
index 71ff53a..17d989b 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -79,10 +79,10 @@
}
@Override
- public void configure(Map<String, String> configuration, ARecordType outputType, boolean isPKAutoGenerated, List<String> primaryKeys) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
outputType = (ARecordType) outputType;
- this.configureFormat(outputType, false, -1, null);
+ this.configureFormat(outputType);
this.configureSockets(configuration);
}
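
Note: the same signature narrowing repeats across the adapter factories below: configure() drops the isPKAutoGenerated flag and primaryKeys list, since parsers no longer need them. (Incidentally, the self-assignment outputType = (ARecordType) outputType in the hunk above is a leftover no-op in the source.) Side by side, the contract change looks like this, with ARecordType reduced to a stub for illustration:

    import java.util.List;
    import java.util.Map;

    interface ARecordTypeStub {}

    // Before: adapter factories had to know about the auto-generated PK.
    interface AdapterFactoryBefore {
        void configure(Map<String, String> configuration, ARecordTypeStub outputType,
                boolean isPKAutoGenerated, List<String> primaryKeys) throws Exception;
    }

    // After: UUID generation lives in the compiled plan, not in the parser.
    interface AdapterFactoryAfter {
        void configure(Map<String, String> configuration, ARecordTypeStub outputType)
                throws Exception;
    }
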
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 60cf924..40342cd 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -38,7 +38,7 @@
public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration,
FileSystemBasedAdapter coreAdapter, String format, ITupleParserFactory parserFactory,
IHyracksTaskContext ctx) throws Exception {
- super(parserFactory, atype, ctx, false, -1);
+ super(parserFactory, atype, ctx);
this.coreAdapter = coreAdapter;
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 8f62320..f282d3b 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -102,8 +102,7 @@
}
@Override
- public void configure(Map<String, String> configuration, ARecordType recordType, boolean isPKAutoGenerated,
- List<String> primaryKeys) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType recordType) throws Exception {
this.configuration = configuration;
checkRequiredArgs(configuration);
String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
@@ -117,7 +116,7 @@
}
format = configuration.get(KEY_FORMAT);
adapterFactory = (IGenericAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
- adapterFactory.configure(configuration, recordType, isPKAutoGenerated, primaryKeys);
+ adapterFactory.configure(configuration, recordType);
atype = (ARecordType) recordType;
configureFormat();
@@ -210,8 +209,7 @@
break;
case DELIMITED_DATA:
dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter,
- quote, hasHeader, false, -1,
- null);
+ quote, hasHeader);
break;
}
return new RateControlledTupleParser(ctx, recordType, dataParser, configuration);
@@ -230,7 +228,7 @@
public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
Map<String, String> configuration) throws HyracksDataException {
- super(ctx, recType, false, -1, null);
+ super(ctx, recType);
this.dataParser = dataParser;
String propValue = configuration.get(INTER_TUPLE_INTERVAL);
if (propValue != null) {
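
Note: RateControlledTupleParser keeps its throttling behavior: it reads an optional inter-tuple interval from the adapter configuration and paces tuple emission accordingly. A sketch of the same pacing idea; the constant name INTER_TUPLE_INTERVAL comes from the diff, while its string value and everything else here are assumptions:

    import java.util.Map;

    class ThrottledEmitterSketch {
        // Key name is hypothetical; the diff only shows the constant's name.
        private static final String INTER_TUPLE_INTERVAL = "inter-tuple-interval";
        private final long intervalMs;

        ThrottledEmitterSketch(Map<String, String> configuration) {
            String propValue = configuration.get(INTER_TUPLE_INTERVAL);
            intervalMs = (propValue != null) ? Long.parseLong(propValue) : 0L;
        }

        void emit(Runnable writeTuple) throws InterruptedException {
            writeTuple.run();
            if (intervalMs > 0) {
                Thread.sleep(intervalMs); // pace the feed between tuples
            }
        }
    }
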
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapterFactory.java
index 2827610..24a89dd 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SocketClientAdapterFactory.java
@@ -104,6 +104,6 @@
}
fileSplits = fileSplitsValue.trim().split(",");
genericSocketAdapterFactory = new GenericSocketFeedAdapterFactory();
- genericSocketAdapterFactory.configure(configuration, outputType, false, null);
+ genericSocketAdapterFactory.configure(configuration, outputType);
}
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 51fe45f..f2f730f 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -73,7 +73,7 @@
public void configure(Map<String, String> configuration) throws Exception {
configuration.put(KEY_FORMAT, FORMAT_ADM);
this.configuration = configuration;
- this.configureFormat(initOutputType(), false, -1, null);
+ this.configureFormat(initOutputType());
}
@Override