add R-Tree support for external indexes and external index optimizations
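
This adds secondary R-Tree indexes over external (HDFS/Hive) datasets and an
optional optimization for external indexes:

- SecondaryRTreeCreator gains setExternalSecondaryRecDescAndComparators() and an
  external-dataset branch in buildLoadingJobSpec() that scans the external data,
  assigns the MBR keys, hash-partitions on the RID fields, and bulk-loads the
  LSM R-Tree.
- RTreeAccessMethod now feeds R-Tree search results for external datasets into an
  ExternalDataAccessByRIDOperator instead of a primary-index unnest-map.
- When AqlMetadataProvider.isOptimizeExternalIndexes() is on, the RID carries an
  integer _file-number instead of the _file-name string; the dataset's file list
  is registered in the metadata at index-build time
  (IndexOperations.addExternalDatasetFilesToMetadata), and the indexing and
  access-by-RID adapters are handed the corresponding file maps.
- Inverted indexes on external datasets are rejected (composite RID fields).

A minimal sketch of the DDL this targets (dataset and field names are
illustrative):

    create index sample_rtree on ExternalDS(location) type rtree;

where ExternalDS is an external dataset declared with the hdfs (or hive) adapter.
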
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
index b8125aa..8af4ac1 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
@@ -21,6 +21,7 @@
import org.apache.commons.lang3.mutable.MutableObject;
import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Index;
@@ -44,6 +45,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataAccessByRIDOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -196,10 +198,19 @@
UnnestMapOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(dataset, recordType,
chosenIndex, assignSearchKeys, jobGenParams, context, false, retainInput);
// Generate the rest of the upstream plan which feeds the search results into the primary index.
- UnnestMapOperator primaryIndexUnnestOp = AccessMethodUtils.createPrimaryIndexUnnestMap(dataSourceScan, dataset,
- recordType, secondaryIndexUnnestOp, context, true, retainInput, false);
+ if(dataset.getDatasetType() == DatasetType.EXTERNAL)
+ {
+ ExternalDataAccessByRIDOperator externalDataAccessOp = AccessMethodUtils.createExternalDataAccessByRIDUnnestMap(dataSourceScan, dataset,
+ recordType, secondaryIndexUnnestOp, context, chosenIndex);
+ return externalDataAccessOp;
+ }
+ else
+ {
+ UnnestMapOperator primaryIndexUnnestOp = AccessMethodUtils.createPrimaryIndexUnnestMap(dataSourceScan, dataset,
+ recordType, secondaryIndexUnnestOp, context, true, retainInput, false);
- return primaryIndexUnnestOp;
+ return primaryIndexUnnestOp;
+ }
}
@Override
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 2198d67..855ee19 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -590,6 +590,18 @@
//#. create the index artifact in NC.
runJob(hcc, spec, true);
+ //if external data and optimization is turned on, load file names
+ if(ds.getDatasetType() == DatasetType.EXTERNAL && AqlMetadataProvider.isOptimizeExternalIndexes())
+ {
+ //load the file names into external files index
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ IndexOperations.addExternalDatasetFilesToMetadata(metadataProvider, ds);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ }
+
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -654,6 +666,8 @@
throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
+ "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
}
+
+ //if external dataset, remove external files from metadata
}
throw e;
} finally {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index 7bd6c69..e2767cd 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -14,13 +14,17 @@
*/
package edu.uci.ics.asterix.file;
+import java.util.ArrayList;
+
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
@@ -53,6 +57,17 @@
metadataProvider, physicalOptimizationConfig);
return secondaryIndexCreator.buildLoadingJobSpec();
}
+
+ public static void addExternalDatasetFilesToMetadata(AqlMetadataProvider metadataProvider,
+ Dataset dataset) throws AlgebricksException, MetadataException{
+ //get the file list
+ ArrayList<ExternalFile> files = metadataProvider.getExternalDatasetFiles(dataset);
+ //add files to metadata
+ for(int i=0; i < files.size(); i++)
+ {
+ MetadataManager.INSTANCE.addExternalFile(metadataProvider.getMetadataTxnContext(), files.get(i));
+ }
+ }
public static JobSpecification buildDropSecondaryIndexJobSpec(CompiledIndexDropStatement indexDropStmt,
AqlMetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 0d1741a..e55ae5a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -28,6 +28,7 @@
import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory;
import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
+import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -171,7 +172,7 @@
//get adapter name
String adapter = edsd.getAdapter();
//if not an hdfs adapter, throw an exception
- if(!adapter.equals(HDFSAdapterFactory.HDFS_ADAPTER_NAME) && !adapter.equals(HiveAdapterFactory.HDFS_ADAPTER_NAME))
+ if(!adapter.equals(HDFSAdapterFactory.HDFS_ADAPTER_NAME) && !adapter.equals(HiveAdapter.class.getName()))
{
throw new AsterixException("Cannot index an external dataset with adapter type(" + adapter + ").");
}
@@ -236,10 +237,17 @@
}
}
- //second add RID fields (File name and byte location)
- fieldsNames[numSecondaryKeys] = "_file-name";
- fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
-
+ //second add RID fields (File name or number and byte location)
+ if(AqlMetadataProvider.isOptimizeExternalIndexes())
+ {
+ fieldsNames[numSecondaryKeys] = "_file-number";
+ fieldsTypes[numSecondaryKeys] = BuiltinType.AINT32;
+ }
+ else
+ {
+ fieldsNames[numSecondaryKeys] = "_file-name";
+ fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
+ }
fieldsNames[numSecondaryKeys+1] = "_byte-location";
fieldsTypes[numSecondaryKeys+1] = BuiltinType.AINT64;
@@ -258,21 +266,26 @@
String[] fieldsNames = new String[externalItemType.getFieldNames().length+numPrimaryKeys];
IAType[] fieldsTypes = new IAType[externalItemType.getFieldTypes().length+numPrimaryKeys];
- //add RID fields names
- fieldsNames[0] = "_file-name";
+ //add RID fields names and types
+ if(AqlMetadataProvider.isOptimizeExternalIndexes())
+ {
+ fieldsNames[0] = "_file-number";
+ fieldsTypes[0] = BuiltinType.AINT32;
+ }
+ else
+ {
+ fieldsNames[0] = "_file-name";
+ fieldsTypes[0] = BuiltinType.ASTRING;
+ }
fieldsNames[1] = "_byte-location";
-
- //add RID types
- fieldsTypes[0] = BuiltinType.ASTRING;
fieldsTypes[1] = BuiltinType.AINT64;
-
-
if(numPrimaryKeys == 3)
{
//add the row number for rc files
fieldsNames[2] = "_row-number";
fieldsTypes[2] = BuiltinType.AINT32;
}
+
//add the original fields names and types
for(int i=0; i < externalItemType.getFieldNames().length; i++)
{
@@ -291,7 +304,14 @@
primaryBloomFilterKeyFields = new int[numPrimaryKeys];
ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
- primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.ASTRING, true);
+ if(AqlMetadataProvider.isOptimizeExternalIndexes())
+ {
+ primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true);
+ }
+ else
+ {
+ primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.ASTRING, true);
+ }
primaryComparatorFactories[1] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT64, true);
primaryBloomFilterKeyFields[0]=0;
@@ -396,8 +416,16 @@
secondaryBloomFilterKeyFields[i] = i;
}
- secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
- itemType, "_file-name", 0);
+ if(AqlMetadataProvider.isOptimizeExternalIndexes())
+ {
+ secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+ itemType, "_file-number", 0);
+ }
+ else
+ {
+ secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+ itemType, "_file-name", 0);
+ }
secondaryFieldAccessEvalFactories[numSecondaryKeys+1] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
itemType, "_byte-location", 0);
if(numPrimaryKeys == 3)
@@ -438,7 +466,7 @@
}
protected Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(JobSpecification spec) throws Exception {
- Pair<ExternalDataIndexingOperatorDescriptor,AlgebricksPartitionConstraint> indexingOpAndConstraints = metadataProvider.buildExternalDataIndexingRuntime(spec, itemType, (ExternalDatasetDetails)dataset.getDatasetDetails(), NonTaggedDataFormat.INSTANCE);
+ Pair<ExternalDataIndexingOperatorDescriptor,AlgebricksPartitionConstraint> indexingOpAndConstraints = metadataProvider.buildExternalDataIndexingRuntime(spec, itemType, dataset, NonTaggedDataFormat.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
indexingOpAndConstraints.second);
return indexingOpAndConstraints;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 17590c5..70124e8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -294,4 +294,10 @@
storageProperties.getBloomFilterFalsePositiveRate());
}
}
+
+ @Override
+ protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+ AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+ throw new AsterixException("Cannot create an inverted index on an external dataset due to composite RID fields.");
+ }
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 17632aa..cc8eb1b 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -18,34 +18,45 @@
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
+import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
+import edu.uci.ics.asterix.external.util.ExternalIndexHashPartitionComputerFactory;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
@@ -148,8 +159,121 @@
}
@Override
+ protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+ AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+ secondaryKeyFields = createIndexStmt.getKeyFields();
+ if (numSecondaryKeys != 1) {
+ throw new AsterixException(
+ "Cannot use "
+ + numSecondaryKeys
+ + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+ }
+ Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
+ IAType spatialType = spatialTypePair.first;
+ anySecondaryKeyIsNullable = spatialTypePair.second;
+ if (spatialType == null) {
+ throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+ }
+ int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+ numNestedSecondaryKeyFields = numDimensions * 2;
+ secondaryFieldAccessEvalFactories = metadataProvider.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0),
+ numPrimaryKeys, numDimensions);
+ secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+ valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+ + numNestedSecondaryKeyFields];
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+ IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+ keyType = nestedKeyType.getTypeTag();
+ for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+ ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(nestedKeyType);
+ secondaryRecFields[i] = keySerde;
+ secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ nestedKeyType, true);
+ secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+ valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+ }
+
+ // Add serializers and comparators for primary index fields.
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+ secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+ }
+ secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+ }
+
+ @Override
public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> RIDScanOpAndConstraints;
+ AlgebricksMetaOperatorDescriptor asterixAssignOp;
+ try
+ {
+ //create external indexing scan operator
+ RIDScanOpAndConstraints = createExternalIndexingOp(spec);
+ //create assign operator
+ asterixAssignOp = createExternalAssignOp(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
+ RIDScanOpAndConstraints.second);
+ }
+ catch(Exception e)
+ {
+ throw new AsterixException("Failed to create external index scanning and loading job");
+ }
+
+ // If any of the secondary fields are nullable, then add a select op that filters nulls.
+ AlgebricksMetaOperatorDescriptor selectOp = null;
+ if (anySecondaryKeyIsNullable) {
+ selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys,RIDScanOpAndConstraints.second);
+ }
+
+ // Create secondary RTree bulk load op.
+ AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
+ spec,
+ numNestedSecondaryKeyFields,
+ new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+ primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
+ AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+ keyType, secondaryComparatorFactories.length), storageProperties
+ .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+ // Connect the operators.
+ // Create a hash partitioning connector
+ ExternalDatasetDetails edsd = (ExternalDatasetDetails)dataset.getDatasetDetails();
+ IBinaryHashFunctionFactory[] hashFactories = null;
+ if(edsd.getProperties().get(HDFSAdapterFactory.KEY_INPUT_FORMAT).trim().equals(HDFSAdapterFactory.INPUT_FORMAT_RC))
+ {
+ hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset, NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
+ }
+ else
+ {
+ hashFactories = DatasetUtils.computeExternalDataKeysBinaryHashFunFactories(dataset, NonTaggedDataFormat.INSTANCE.getBinaryHashFunctionFactoryProvider());
+ }
+ //select partitioning keys (always the first 2 after secondary keys)
+ int[] keys = new int[2];
+ keys[0] = numSecondaryKeys;
+ keys[1] = numSecondaryKeys + 1;
+
+ IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
+ new ExternalIndexHashPartitionComputerFactory(keys, hashFactories));
+ spec.connect(new OneToOneConnectorDescriptor(spec), RIDScanOpAndConstraints.first, 0, asterixAssignOp, 0);
+ if (anySecondaryKeyIsNullable) {
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+ spec.connect(hashConn, selectOp, 0, secondaryBulkLoadOp, 0);
+ } else {
+ spec.connect(hashConn, asterixAssignOp, 0, secondaryBulkLoadOp, 0);
+ }
+ spec.addRoot(secondaryBulkLoadOp);
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
+ }
+ else
+ {
// Create dummy key provider for feeding the primary index scan.
AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
@@ -193,5 +317,6 @@
spec.addRoot(secondaryBulkLoadOp);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
+ }
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index ccab0f4..7e63e84 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -116,7 +116,7 @@
@Override
public IControlledAdapter createAccessByRIDAdapter(
- Map<String, Object> configuration, IAType atype) throws Exception {
+ Map<String, Object> configuration, IAType atype, HashMap<Integer, String> files) throws Exception {
Configuration conf = configureHadoopConnection(configuration);
clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
@@ -129,19 +129,19 @@
char delimeter = 0x01;
configuration.put(KEY_FORMAT, FORMAT_DELIMITED_TEXT);
configuration.put(KEY_DELIMITER, Character.toString(delimeter));
- ridRecordDesc = getRIDRecDesc(true);
+ ridRecordDesc = getRIDRecDesc(true, files != null);
}
else
{
- ridRecordDesc = getRIDRecDesc(false);
+ ridRecordDesc = getRIDRecDesc(false, files != null);
}
- HDFSAccessByRIDAdapter adapter = new HDFSAccessByRIDAdapter(atype, ((String)configuration.get(KEY_INPUT_FORMAT)), clusterLocations,ridRecordDesc, conf);
+ HDFSAccessByRIDAdapter adapter = new HDFSAccessByRIDAdapter(atype, ((String)configuration.get(KEY_INPUT_FORMAT)), clusterLocations,ridRecordDesc, conf, files);
adapter.configure(configuration);
return adapter;
}
@Override
- public IDatasourceAdapter createIndexingAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+ public IDatasourceAdapter createIndexingAdapter(Map<String, Object> configuration, IAType atype, Map<String,Integer> files) throws Exception {
if (!setup) {
/** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
configureJobConf(configuration);
@@ -170,7 +170,7 @@
configuration.put(KEY_FORMAT, FORMAT_DELIMITED_TEXT);
configuration.put(KEY_DELIMITER, Character.toString(delimeter));
}
- HDFSIndexingAdapter hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
+ HDFSIndexingAdapter hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations, files);
hdfsIndexingAdapter.configure(configuration);
return hdfsIndexingAdapter;
}
@@ -199,7 +199,7 @@
return conf;
}
- public static RecordDescriptor getRIDRecDesc(boolean isRCFile){
+ public static RecordDescriptor getRIDRecDesc(boolean isRCFile, boolean optimize){
int numOfPrimaryKeys = 2;
if(isRCFile)
{
@@ -208,8 +208,16 @@
@SuppressWarnings("rawtypes")
ISerializerDeserializer[] serde = new ISerializerDeserializer[numOfPrimaryKeys];
ITypeTraits[] tt = new ITypeTraits[numOfPrimaryKeys];
- serde[0] = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(BuiltinType.ASTRING);
- tt[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ASTRING);
+ if(optimize)
+ {
+ serde[0] = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(BuiltinType.AINT32);
+ tt[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.AINT32);
+ }
+ else
+ {
+ serde[0] = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(BuiltinType.ASTRING);
+ tt[0] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ASTRING);
+ }
serde[1] = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(BuiltinType.AINT64);
tt[1] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.AINT64);
if(isRCFile)
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index e4a1570..64c8153 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -86,7 +86,7 @@
@Override
- public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+ public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype, HashMap<Integer, String> files) throws Exception {
Configuration conf = HDFSAdapterFactory.configureHadoopConnection(configuration);
clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
//Create RID record desc
@@ -98,20 +98,20 @@
char delimeter = 0x01;
configuration.put(KEY_FORMAT, FORMAT_DELIMITED_TEXT);
configuration.put(KEY_DELIMITER, Character.toString(delimeter));
- ridRecordDesc = HDFSAdapterFactory.getRIDRecDesc(true);
+ ridRecordDesc = HDFSAdapterFactory.getRIDRecDesc(true, files != null);
}
else
{
- ridRecordDesc = HDFSAdapterFactory.getRIDRecDesc(false);
+ ridRecordDesc = HDFSAdapterFactory.getRIDRecDesc(false, files != null);
}
- HDFSAccessByRIDAdapter adapter = new HDFSAccessByRIDAdapter(atype, ((String)configuration.get(KEY_INPUT_FORMAT)), clusterLocations,ridRecordDesc, conf);
+ HDFSAccessByRIDAdapter adapter = new HDFSAccessByRIDAdapter(atype, ((String)configuration.get(KEY_INPUT_FORMAT)), clusterLocations,ridRecordDesc, conf, files);
adapter.configure(configuration);
return adapter;
}
@Override
public IDatasourceAdapter createIndexingAdapter(
- Map<String, Object> configuration, IAType atype) throws Exception {
+ Map<String, Object> configuration, IAType atype, Map<String,Integer> files) throws Exception {
if (!setup) {
/** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
configureJobConf(configuration);
@@ -133,7 +133,7 @@
}
JobConf conf = confFactory.getConf();
InputSplit[] inputSplits = inputSplitsFactory.getSplits();
- HiveIndexingAdapter hiveIndexingAdapter = new HiveIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
+ HiveIndexingAdapter hiveIndexingAdapter = new HiveIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations, files);
//If input format is rcfile, configure parser expected format to delimeted text with 0x01 (default ) as delimiter
if(((String)configuration.get(KEY_INPUT_FORMAT)).equals(INPUT_FORMAT_RC))
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
index 75d972e..f046f88 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
+import java.util.HashMap;
import java.util.Map;
import edu.uci.ics.asterix.external.dataset.adapter.IControlledAdapter;
@@ -52,7 +53,7 @@
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
- public IDatasourceAdapter createIndexingAdapter(Map<String, Object> configuration, IAType atype) throws Exception;
+ public IDatasourceAdapter createIndexingAdapter(Map<String, Object> configuration, IAType atype, Map<String,Integer> files) throws Exception;
/**
* Creates an instance of IDatasourceAdapter that is used to read records using their RIDs.
@@ -65,5 +66,5 @@
* @return An instance of IControlledAdapter.
* @throws Exception
*/
- public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype) throws Exception;
+ public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype, HashMap<Integer, String> files) throws Exception;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 3dc6cc8..4fae7e7 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
+import java.util.HashMap;
import java.util.Map;
import edu.uci.ics.asterix.external.dataset.adapter.IControlledAdapter;
@@ -45,14 +46,14 @@
@Override
public IDatasourceAdapter createIndexingAdapter(
- Map<String, Object> configuration, IAType atype) throws Exception {
+ Map<String, Object> configuration, IAType atype, Map<String,Integer> files) throws Exception {
throw new NotImplementedException("Indexing Adapter is not implemented for NC FileSystem Data");
}
@Override
- public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+ public IControlledAdapter createAccessByRIDAdapter(Map<String, Object> configuration, IAType atype, HashMap<Integer, String> files) throws Exception {
throw new NotImplementedException("Access by RID Adapter is not implemented for NC FileSystem Data");
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataAccessByRIDOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataAccessByRIDOperatorDescriptor.java
index 36968ef..aa91a56 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataAccessByRIDOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataAccessByRIDOperatorDescriptor.java
@@ -1,6 +1,7 @@
package edu.uci.ics.asterix.external.data.operator;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.Map;
import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
@@ -25,15 +26,17 @@
private final IAType atype;
private IGenericDatasetAdapterFactory datasourceAdapterFactory;
private IControlledAdapter adapter;
+ private final HashMap<Integer, String> files;
public ExternalDataAccessByRIDOperatorDescriptor(
IOperatorDescriptorRegistry spec, Map<String, Object> arguments, IAType atype,
- RecordDescriptor outRecDesc,IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
+ RecordDescriptor outRecDesc,IGenericDatasetAdapterFactory dataSourceAdapterFactory, HashMap<Integer, String> files) {
super(spec, 1, 1);
this.atype = atype;
this.adapterConfiguration = arguments;
this.datasourceAdapterFactory = dataSourceAdapterFactory;
this.recordDescriptors[0] = outRecDesc;
+ this.files = files;
}
@Override
@@ -45,7 +48,7 @@
public void open() throws HyracksDataException {
//create the access by index adapter
try {
- adapter = datasourceAdapterFactory.createAccessByRIDAdapter(adapterConfiguration, atype);
+ adapter = datasourceAdapterFactory.createAccessByRIDAdapter(adapterConfiguration, atype, files);
adapter.initialize(ctx);
} catch (Exception e) {
throw new HyracksDataException("error during creation of external read by RID adapter", e);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataIndexingOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataIndexingOperatorDescriptor.java
index 436c7cc..9ff1f06 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataIndexingOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataIndexingOperatorDescriptor.java
@@ -24,16 +24,18 @@
private static final long serialVersionUID = 1L;
private final Map<String, Object> adapterConfiguration;
+ private final Map<String,Integer> files;
private final IAType atype;
private IGenericDatasetAdapterFactory datasourceAdapterFactory;
public ExternalDataIndexingOperatorDescriptor(JobSpecification spec, Map<String, Object> arguments, IAType atype,
- RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
+ RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory, Map<String,Integer> files) {
super(spec, 0, 1);
recordDescriptors[0] = rDesc;
this.adapterConfiguration = arguments;
this.atype = atype;
this.datasourceAdapterFactory = dataSourceAdapterFactory;
+ this.files = files;
}
@Override
@@ -48,7 +50,7 @@
IDatasourceAdapter adapter = null;
try {
adapter = ((IGenericDatasetAdapterFactory) datasourceAdapterFactory).createIndexingAdapter(
- adapterConfiguration, atype);
+ adapterConfiguration, atype, files);
adapter.initialize(ctx);
adapter.start(partition, writer);
} catch (Exception e) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAccessByRIDAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAccessByRIDAdapter.java
index fd846f2..86a060c 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAccessByRIDAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAccessByRIDAdapter.java
@@ -18,6 +18,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
@@ -31,17 +32,13 @@
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFile.Reader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.log4j.Logger;
-
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.om.base.AInt32;
import edu.uci.ics.asterix.om.base.AInt64;
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.operators.file.ControlledADMTupleParserFactory;
import edu.uci.ics.asterix.runtime.operators.file.ControlledDelimitedDataTupleParserFactory;
@@ -51,12 +48,8 @@
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
-import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
-import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -70,29 +63,26 @@
public class HDFSAccessByRIDAdapter extends FileSystemBasedAdapter implements IControlledAdapter{
private static final long serialVersionUID = 1L;
- private transient AlgebricksPartitionConstraint clusterLocations;
private boolean newFrame;
private transient ByteBuffer frameBuffer;
private String inputFormat;
private Configuration conf;
private transient FileSystem fs;
private RecordDescriptor inRecDesc;
+ private final HashMap<Integer, String> files;
- public HDFSAccessByRIDAdapter(IAType atype, String inputFormat, AlgebricksPartitionConstraint clusterLocations, RecordDescriptor inRecDesc, Configuration conf) {
+ public HDFSAccessByRIDAdapter(IAType atype, String inputFormat, AlgebricksPartitionConstraint clusterLocations, RecordDescriptor inRecDesc, Configuration conf, HashMap<Integer,String> files) {
super(atype);
- this.clusterLocations = clusterLocations;
this.inputFormat = inputFormat;
this.conf = conf;
this.inRecDesc = inRecDesc;
+ this.files = files;
}
@Override
public void configure(Map<String, Object> arguments) throws Exception {
this.configuration = arguments;
fs = FileSystem.get(conf);
- //set up the parser factory here for now -> when everything works, make it professional
- //The one below doesn't work for this one
- //configureFormat();
String specifiedFormat = (String) configuration.get(KEY_FORMAT);
if (specifiedFormat == null) {
throw new IllegalArgumentException(" Unspecified data format");
@@ -147,8 +137,6 @@
return AdapterType.READ;
}
- //modefy the initialize function
- //add the initial set up for the adapter
@Override
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
@@ -157,144 +145,108 @@
((ControlledTupleParser)parser).initialize(getInputStream(0));
}
- @SuppressWarnings("unchecked")
@Override
public InputStream getInputStream(int partition) throws IOException {
- //different input stream implementation based on the input format
- if(inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC))
- {
- return new InputStream() {
- private RCFile.Reader reader;
- private int rowDifference;
- private String lastFileName = "";
- private String newFileName;
- private long lastByteLocation = 0;
- private long newByteLocation = 0;
- private int lastRowNumber = 0;
- private int newRowNumber = 0;
- private LongWritable key;
- private BytesRefArrayWritable value;
- private int EOL = "\n".getBytes()[0];
- private byte delimiter = 0x01;
- private boolean pendingValue = false;
- private int currentTupleIdx;
- private int numberOfTuplesInCurrentFrame;
- private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
- private ByteBufferInputStream bbis = new ByteBufferInputStream();
- private DataInputStream dis = new DataInputStream(bbis);
+ //if the files map is null the index is not optimized, so return the regular input stream; otherwise return the optimized one
+ if(files == null)
+ {
- @Override
- public void close()
- {
- if (reader != null)
+ //different input stream implementation based on the input format
+ if(inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC))
+ {
+ return new InputStream() {
+ private RCFile.Reader reader;
+ private int rowDifference;
+ private String lastFileName = "";
+ private String newFileName;
+ private long lastByteLocation = 0;
+ private long newByteLocation = 0;
+ private int lastRowNumber = 0;
+ private int newRowNumber = 0;
+ private LongWritable key;
+ private BytesRefArrayWritable value;
+ private int EOL = "\n".getBytes()[0];
+ private byte delimiter = 0x01;
+ private boolean pendingValue = false;
+ private int currentTupleIdx;
+ private int numberOfTuplesInCurrentFrame;
+ private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+ private ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private DataInputStream dis = new DataInputStream(bbis);
+
+ @Override
+ public void close()
{
- reader.close();
- }
- try {
- super.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- if(newFrame)
- {
- //first time called with this frame
- //reset frame buffer
- tupleAccessor.reset(frameBuffer);
- //get number of tuples in frame
- numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
- //set tuple index to first tuple
- currentTupleIdx = 0;
- //set new frame to false
- newFrame = false;
- pendingValue = false;
- }
-
- //check and see if there is a pending value
- //Double check this
- int numBytes = 0;
- if (pendingValue) {
- //last value didn't fit into buffer
- int sizeOfNextTuple = getTupleSize(value) + 1;
- if(sizeOfNextTuple > len)
+ if (reader != null)
{
- return 0;
+ reader.close();
}
- copyCurrentTuple(buffer, offset + numBytes);
- buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
- numBytes += sizeOfNextTuple;
- //set pending to false
- pendingValue = false;
- //move to next tuple
- currentTupleIdx++;
+ try {
+ super.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
- //No pending value or done with pending value
- //check if there are more tuples in the frame
- while(currentTupleIdx < numberOfTuplesInCurrentFrame)
- {
- //get 3 things from the current tuple in the frame(File name, byte location and row number)
- //get the fileName
- bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
- newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
- //check if it is a new file
- if(!lastFileName.equals(newFileName))//stringBuilder.toString()))
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if(newFrame)
{
- //new file
- lastFileName = newFileName;
- //close old file
- if(reader != null)
- {
- reader.close();
- }
- //open new file
- reader = new Reader(fs, new Path(lastFileName), conf);
- //read and save byte location
- bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
- lastByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
- //seek
- reader.seek(lastByteLocation);
- //read and save rowNumber
- bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
- lastRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
- //loop until row
- for(int i=0; i < lastRowNumber; i++)
- {
- //this loop perform a single I/O and move to the next record in the block which is already in memory
- //if no more records in the current block, it perform another I/O and get the next block
- //<this should never happen here>
- reader.next(key);
- }
- //read record
- reader.getCurrentRow(value);
- //copy it to the buffer if there is enough space
+ //first time called with this frame
+ //reset frame buffer
+ tupleAccessor.reset(frameBuffer);
+ //get number of tuples in frame
+ numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+ //set tuple index to first tuple
+ currentTupleIdx = 0;
+ //set new frame to false
+ newFrame = false;
+ pendingValue = false;
+ }
+
+ //check and see if there is a pending value
+ //Double check this
+ int numBytes = 0;
+ if (pendingValue) {
+ //last value didn't fit into buffer
int sizeOfNextTuple = getTupleSize(value) + 1;
- if(sizeOfNextTuple + numBytes > len)
+ if(sizeOfNextTuple > len)
{
- //mark waiting value
- pendingValue = true;
- return numBytes;
+ return 0;
}
copyCurrentTuple(buffer, offset + numBytes);
buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
numBytes += sizeOfNextTuple;
+ //set pending to false
+ pendingValue = false;
+ //move to next tuple
+ currentTupleIdx++;
}
- else
- {
- //same file
- //get the byte location
- bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
- newByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
- //check if same block
- if(lastByteLocation != newByteLocation)
+ //No pending value or done with pending value
+ //check if there are more tuples in the frame
+ while(currentTupleIdx < numberOfTuplesInCurrentFrame)
+ {
+ //get 3 things from the current tuple in the frame(File name, byte location and row number)
+ //get the fileName
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+ newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
+ //check if it is a new file
+ if(!lastFileName.equals(newFileName))//stringBuilder.toString()))
{
- //new block
- lastByteLocation = newByteLocation;
+ //new file
+ lastFileName = newFileName;
+ //close old file
+ if(reader != null)
+ {
+ reader.close();
+ }
+ //open new file
+ reader = new Reader(fs, new Path(lastFileName), conf);
+ //read and save byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ lastByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
//seek
reader.seek(lastByteLocation);
//read and save rowNumber
@@ -303,6 +255,9 @@
//loop until row
for(int i=0; i < lastRowNumber; i++)
{
+ //this loop performs a single I/O and moves to the next record in the block, which is already in memory
+ //if there are no more records in the current block, it performs another I/O and gets the next block
+ //<this should never happen here>
reader.next(key);
}
//read record
@@ -321,25 +276,497 @@
}
else
{
- //same block
- //get the row number
- bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
- newRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
+ //same file
+ //get the byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ newByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
- //calculate row difference
- rowDifference = newRowNumber - lastRowNumber;
-
- //update last row number
- lastRowNumber = newRowNumber;
-
- //move to the new row
- for(int i=0; i < rowDifference; i++)
+ //check if same block
+ if(lastByteLocation != newByteLocation)
{
+ //new block
+ lastByteLocation = newByteLocation;
+ //seek
+ reader.seek(lastByteLocation);
+ //read and save rowNumber
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
+ lastRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
+ //loop until row
+ for(int i=0; i < lastRowNumber; i++)
+ {
+ reader.next(key);
+ }
+ //read record
+ reader.getCurrentRow(value);
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = getTupleSize(value) + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = true;
+ return numBytes;
+ }
+ copyCurrentTuple(buffer, offset + numBytes);
+ buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ else
+ {
+ //same block
+ //get the row number
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
+ newRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
+
+ //calculate row difference
+ rowDifference = newRowNumber - lastRowNumber;
+
+ //update last row number
+ lastRowNumber = newRowNumber;
+
+ //move to the new row
+ for(int i=0; i < rowDifference; i++)
+ {
+ reader.next(key);
+ }
+ //read record
+ reader.getCurrentRow(value);
+
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = getTupleSize(value) + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = true;
+ return numBytes;
+ }
+ copyCurrentTuple(buffer, offset + numBytes);
+ buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ }
+ //move to next tuple
+ currentTupleIdx++;
+ }
+ //no more tuples in frame
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+
+ private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
+ int rcOffset = 0;
+ for(int i=0; i< value.size(); i++)
+ {
+ System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
+ rcOffset += value.get(i).getLength() + 1;
+ buffer[rcOffset - 1] = delimiter;
+ }
+ }
+
+ private int getTupleSize(BytesRefArrayWritable value2) {
+ int size=0;
+ //loop over the rc columns and add their lengths
+ for(int i=0; i< value.size(); i++)
+ {
+ size += value.get(i).getLength();
+ }
+ //add the delimiter bytes (one per field boundary)
+ size += value.size() -1;
+ return size;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new NotImplementedException("Use read(byte[], int, int)");
+ }
+ };
+ }
+ else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT))
+ {
+ return new InputStream() {
+ private FSDataInputStream reader;
+ private String lastFileName = "";
+ private String newFileName;
+ private int EOL = "\n".getBytes()[0];
+ private int currentTupleIdx;
+ private int numberOfTuplesInCurrentFrame;
+ private long byteLocation;
+ private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+ private String value;
+ private String pendingValue = null;
+ private ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private DataInputStream dis = new DataInputStream(bbis);
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if(newFrame)
+ {
+ //first time called with this frame
+ //reset frame buffer
+ tupleAccessor.reset(frameBuffer);
+ //get number of tuples in frame
+ numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+ //set tuple index to first tuple
+ currentTupleIdx = 0;
+ //set new frame to false
+ newFrame = false;
+ }
+
+ //check and see if there is a pending value
+ int numBytes = 0;
+ if (pendingValue != null) {
+ //last value didn't fit into buffer
+ int sizeOfNextTuple = pendingValue.length() + 1;
+ if(sizeOfNextTuple > len)
+ {
+ return 0;
+ }
+ //there is enough space
+ System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.length());
+ buffer[offset + numBytes + pendingValue.length()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ //set pending to false
+ pendingValue = null;
+ //move to next tuple
+ currentTupleIdx++;
+ }
+
+ //No pending value or done with pending value
+ //check if there are more tuples in the frame
+ while(currentTupleIdx < numberOfTuplesInCurrentFrame)
+ {
+ //get the fileName
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+ newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
+ //check if it is a new file
+ if(!lastFileName.equals(newFileName))
+ {
+ //new file
+ lastFileName = newFileName;
+ //close old file
+ if(reader != null)
+ {
+ reader.close();
+ }
+ //open new file
+ reader = fs.open(new Path(lastFileName));
+ //read byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+ //seek
+ reader.seek(byteLocation);
+ //read record
+ value = reader.readLine();
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = value.length() + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = value;
+ return numBytes;
+ }
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
+ buffer[offset + numBytes + value.length()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ else
+ {
+ //same file, just seek and read
+ //read byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+ //seek
+ reader.seek(byteLocation);
+ //read record
+ value = reader.readLine();
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = value.length() + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = value;
+ return numBytes;
+ }
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
+ buffer[offset + numBytes + value.length()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ currentTupleIdx++;
+ }
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new NotImplementedException("Use read(byte[], int, int)");
+ }
+
+ @Override
+ public void close(){
+ try {
+ if (reader != null)
+ {
+ reader.close();
+ }
+ super.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ };
+ }
+ else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))
+ {
+ return new InputStream() {
+ private SequenceFile.Reader reader;
+ private Writable key;
+ private Text value;
+ private String lastFileName = "";
+ private String newFileName;
+ private long byteLocation;
+ private int EOL = "\n".getBytes()[0];
+ private int currentTupleIdx;
+ private int numberOfTuplesInCurrentFrame;
+ private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+ private Text pendingValue = null;
+ private ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private DataInputStream dis = new DataInputStream(bbis);
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+
+ if(newFrame)
+ {
+ //first time called with this frame
+ //reset frame buffer
+ tupleAccessor.reset(frameBuffer);
+ //get number of tuples in frame
+ numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+ //set tuple index to first tuple
+ currentTupleIdx = 0;
+ //set new frame to false
+ newFrame = false;
+ }
+
+ //check and see if there is a pending value
+ //Double check this
+ int numBytes = 0;
+ if (pendingValue != null) {
+ //last value didn't fit into buffer
+ int sizeOfNextTuple = pendingValue.getLength() + 1;
+ if(sizeOfNextTuple > len)
+ {
+ return 0;
+ }
+ //there is enough space
+ System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+ buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ //set pending to false
+ pendingValue = null;
+ //move to next tuple
+ currentTupleIdx++;
+ }
+
+ //No pending value or done with pending value
+ //check if there are more tuples in the frame
+ while(currentTupleIdx < numberOfTuplesInCurrentFrame)
+ {
+                            //get the fileName
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+ newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
+ //check if it is a new file
+ if(!lastFileName.equals(newFileName))
+ {
+ //new file
+ lastFileName = newFileName;
+ //close old file
+ if(reader != null)
+ {
+ reader.close();
+ }
+ //open new file
+ reader = new SequenceFile.Reader(fs,new Path(lastFileName),conf);
+ //read byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+ //seek
+ reader.seek(byteLocation);
+ //read record
+ reader.next(key, value);
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = value.getLength() + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = value;
+ return numBytes;
+ }
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+ buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ else
+ {
+ //same file, just seek and read
+ //read byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+ //seek
+ reader.seek(byteLocation);
+ //read record
+ reader.next(key, value);
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = value.getLength() + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = value;
+ return numBytes;
+ }
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+ buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ currentTupleIdx++;
+ }
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+                            throw new NotImplementedException("Use read(byte[], int, int)");
+ }
+
+ @Override
+ public void close(){
+ try {
+ if (reader != null)
+ {
+ reader.close();
+ }
+ super.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ }
+            //unknown format
+ throw new IOException("Unknown input format");
+ }
+ else
+ {
+ //optimized
+ //different input stream implementation based on the input format
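+            //Optimized path: RIDs carry a compact file number (looked up in files to get the HDFS path)
+            //instead of the full file name; for RC files they also carry the row number within the block.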
+ if(inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC))
+ {
+ return new InputStream() {
+ private RCFile.Reader reader;
+ private int rowDifference;
+ private int lastFileNumber = -1;
+ private int newFileNumber = 0;
+ private long lastByteLocation = 0;
+ private long newByteLocation = 0;
+ private int lastRowNumber = 0;
+ private int newRowNumber = 0;
+ private LongWritable key;
+ private BytesRefArrayWritable value;
+ private int EOL = "\n".getBytes()[0];
+ private byte delimiter = 0x01;
+ private boolean pendingValue = false;
+ private int currentTupleIdx;
+ private int numberOfTuplesInCurrentFrame;
+ private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+ private ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private DataInputStream dis = new DataInputStream(bbis);
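+                        //lastFileNumber/lastByteLocation/lastRowNumber track the reader position so that
+                        //consecutive RIDs pointing into the same file, block, or row range can reuse the
+                        //open reader instead of reopening and reseeking; delimiter (0x01) separates the
+                        //RC columns when a row is flattened into the output buffer.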
+
+ @Override
+ public void close()
+ {
+ if (reader != null)
+ {
+ reader.close();
+ }
+ try {
+ super.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if(newFrame)
+ {
+ //first time called with this frame
+ //reset frame buffer
+ tupleAccessor.reset(frameBuffer);
+ //get number of tuples in frame
+ numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+ //set tuple index to first tuple
+ currentTupleIdx = 0;
+ //set new frame to false
+ newFrame = false;
+ pendingValue = false;
+ }
+
+ //check and see if there is a pending value
+ //Double check this
+ int numBytes = 0;
+ if (pendingValue) {
+ //last value didn't fit into buffer
+ int sizeOfNextTuple = getTupleSize(value) + 1;
+ if(sizeOfNextTuple > len)
+ {
+ return 0;
+ }
+ copyCurrentTuple(buffer, offset + numBytes);
+ buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ //set pending to false
+ pendingValue = false;
+ //move to next tuple
+ currentTupleIdx++;
+ }
+
+ //No pending value or done with pending value
+ //check if there are more tuples in the frame
+ while(currentTupleIdx < numberOfTuplesInCurrentFrame)
+ {
+ //get 3 things from the current tuple in the frame(File name, byte location and row number)
+ //get the fileName
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+ newFileNumber = ((AInt32) inRecDesc.getFields()[0].deserialize(dis)).getIntegerValue();
+ //check if it is a new file
+ if(lastFileNumber != newFileNumber)
+ {
+ //new file
+ lastFileNumber = newFileNumber;
+ //close old file
+ if(reader != null)
+ {
+ reader.close();
+ }
+ //open new file
+ reader = new Reader(fs, new Path(files.get(newFileNumber)), conf);
+ //read and save byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ lastByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+ //seek
+ reader.seek(lastByteLocation);
+ //read and save rowNumber
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
+ lastRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
+ //loop until row
+ for(int i=0; i < lastRowNumber; i++)
+ {
+                                        //this loop performs a single I/O and moves to the next record in the block, which is already in memory
+                                        //if there are no more records in the current block, it performs another I/O to get the next block
+                                        //<this should never happen here>
reader.next(key);
}
//read record
reader.getCurrentRow(value);
-
//copy it to the buffer if there is enough space
int sizeOfNextTuple = getTupleSize(value) + 1;
if(sizeOfNextTuple + numBytes > len)
@@ -352,316 +779,387 @@
buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
numBytes += sizeOfNextTuple;
}
- }
- //move to next tuple
- currentTupleIdx++;
- }
- //no more tuples in frame
- return (numBytes == 0) ? -1 : numBytes;
- }
+ else
+ {
+ //same file
+ //get the byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ newByteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
- private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
- int rcOffset = 0;
- for(int i=0; i< value.size(); i++)
- {
- System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
- rcOffset += value.get(i).getLength() + 1;
- buffer[rcOffset - 1] = delimiter;
- }
- }
+ //check if same block
+ if(lastByteLocation != newByteLocation)
+ {
+ //new block
+ lastByteLocation = newByteLocation;
+ //seek
+ reader.seek(lastByteLocation);
+ //read and save rowNumber
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
+ lastRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
+ //loop until row
+ for(int i=0; i < lastRowNumber; i++)
+ {
+ reader.next(key);
+ }
+ //read record
+ reader.getCurrentRow(value);
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = getTupleSize(value) + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = true;
+ return numBytes;
+ }
+ copyCurrentTuple(buffer, offset + numBytes);
+ buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ else
+ {
+ //same block
+ //get the row number
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 2));
+ newRowNumber = ((AInt32)(inRecDesc.getFields()[2].deserialize(dis))).getIntegerValue();
- private int getTupleSize(BytesRefArrayWritable value2) {
- int size=0;
- //loop over rc column and add lengths
- for(int i=0; i< value.size(); i++)
- {
- size += value.get(i).getLength();
- }
- //add delimeters bytes sizes
- size += value.size() -1;
- return size;
- }
+ //calculate row difference
+ rowDifference = newRowNumber - lastRowNumber;
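+                                                //RIDs within the same block are assumed to arrive in non-decreasing
+                                                //row order, so the reader only advances forward by the difference
+                                                //instead of reseeking the block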
- @Override
- public int read() throws IOException {
- throw new NotImplementedException("Use read(byte[], int, int");
- }
- };
- }
- else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT))
- {
- return new InputStream() {
- private FSDataInputStream reader;
- private String lastFileName = "";
- private String newFileName;
- private int EOL = "\n".getBytes()[0];
- private int currentTupleIdx;
- private int numberOfTuplesInCurrentFrame;
- private long byteLocation;
- private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
- private String value;
- private String pendingValue = null;
- private ByteBufferInputStream bbis = new ByteBufferInputStream();
- private DataInputStream dis = new DataInputStream(bbis);
+ //update last row number
+ lastRowNumber = newRowNumber;
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- if(newFrame)
- {
- //first time called with this frame
- //reset frame buffer
- tupleAccessor.reset(frameBuffer);
- //get number of tuples in frame
- numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
- //set tuple index to first tuple
- currentTupleIdx = 0;
- //set new frame to false
- newFrame = false;
+ //move to the new row
+ for(int i=0; i < rowDifference; i++)
+ {
+ reader.next(key);
+ }
+ //read record
+ reader.getCurrentRow(value);
+
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = getTupleSize(value) + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = true;
+ return numBytes;
+ }
+ copyCurrentTuple(buffer, offset + numBytes);
+ buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ }
+ //move to next tuple
+ currentTupleIdx++;
+ }
+ //no more tuples in frame
+ return (numBytes == 0) ? -1 : numBytes;
}
- //check and see if there is a pending value
- int numBytes = 0;
- if (pendingValue != null) {
- //last value didn't fit into buffer
- int sizeOfNextTuple = pendingValue.length() + 1;
- if(sizeOfNextTuple > len)
+ private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
+ int rcOffset = 0;
+ for(int i=0; i< value.size(); i++)
{
- return 0;
+ System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
+ rcOffset += value.get(i).getLength() + 1;
+ buffer[rcOffset - 1] = delimiter;
}
- //there is enough space
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.length());
- buffer[offset + numBytes + pendingValue.length()] = (byte) EOL;
- numBytes += sizeOfNextTuple;
- //set pending to false
- pendingValue = null;
- //move to next tuple
- currentTupleIdx++;
}
-
- //No pending value or done with pending value
- //check if there are more tuples in the frame
- while(currentTupleIdx < numberOfTuplesInCurrentFrame)
- {
- //get the fileName
- bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
- newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
- //check if it is a new file
- if(!lastFileName.equals(newFileName))
+
+ private int getTupleSize(BytesRefArrayWritable value2) {
+ int size=0;
+ //loop over rc column and add lengths
+ for(int i=0; i< value.size(); i++)
{
- //new file
- lastFileName = newFileName;
- //close old file
- if(reader != null)
+ size += value.get(i).getLength();
+ }
+                        //add the sizes of the delimiter bytes
+ size += value.size() -1;
+ return size;
+ }
+
+ @Override
+ public int read() throws IOException {
+                        throw new NotImplementedException("Use read(byte[], int, int)");
+ }
+ };
+ }
+ else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT))
+ {
+ return new InputStream() {
+ private FSDataInputStream reader;
+ private int lastFileNumber = -1;
+ private int newFileNumber = 0;
+ private int EOL = "\n".getBytes()[0];
+ private int currentTupleIdx;
+ private int numberOfTuplesInCurrentFrame;
+ private long byteLocation;
+ private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+ private String value;
+ private String pendingValue = null;
+ private ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private DataInputStream dis = new DataInputStream(bbis);
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if(newFrame)
+ {
+ //first time called with this frame
+ //reset frame buffer
+ tupleAccessor.reset(frameBuffer);
+ //get number of tuples in frame
+ numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+ //set tuple index to first tuple
+ currentTupleIdx = 0;
+ //set new frame to false
+ newFrame = false;
+ }
+
+ //check and see if there is a pending value
+ int numBytes = 0;
+ if (pendingValue != null) {
+ //last value didn't fit into buffer
+ int sizeOfNextTuple = pendingValue.length() + 1;
+ if(sizeOfNextTuple > len)
+ {
+ return 0;
+ }
+ //there is enough space
+ System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.length());
+ buffer[offset + numBytes + pendingValue.length()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ //set pending to false
+ pendingValue = null;
+ //move to next tuple
+ currentTupleIdx++;
+ }
+
+ //No pending value or done with pending value
+ //check if there are more tuples in the frame
+ while(currentTupleIdx < numberOfTuplesInCurrentFrame)
+ {
+ //get the file number
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+ newFileNumber = ((AInt32) inRecDesc.getFields()[0].deserialize(dis)).getIntegerValue();
+ //check if it is a new file
+ if(lastFileNumber != newFileNumber)
+ {
+ //new file
+ lastFileNumber = newFileNumber;
+ //close old file
+ if(reader != null)
+ {
+ reader.close();
+ }
+
+ //open new file
+ reader = fs.open(new Path(files.get(newFileNumber)));
+ //read byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+ //seek
+ reader.seek(byteLocation);
+ //read record
+ value = reader.readLine();
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = value.length() + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = value;
+ return numBytes;
+ }
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
+ buffer[offset + numBytes + value.length()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ else
+ {
+ //same file, just seek and read
+ //read byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+ //seek
+ reader.seek(byteLocation);
+ //read record
+ value = reader.readLine();
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = value.length() + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = value;
+ return numBytes;
+ }
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
+ buffer[offset + numBytes + value.length()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ currentTupleIdx++;
+ }
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+                            throw new NotImplementedException("Use read(byte[], int, int)");
+ }
+
+ @Override
+ public void close(){
+ try {
+ if (reader != null)
{
reader.close();
}
- //open new file
- reader = fs.open(new Path(lastFileName));
- //read byte location
- bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
- byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
- //seek
- reader.seek(byteLocation);
- //read record
- value = reader.readLine();
- //copy it to the buffer if there is enough space
- int sizeOfNextTuple = value.length() + 1;
- if(sizeOfNextTuple + numBytes > len)
+ super.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ };
+ }
+ else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))
+ {
+ return new InputStream() {
+ private SequenceFile.Reader reader;
+ private Writable key;
+ private Text value;
+ private int lastFileNumber = -1;
+ private int newFileNumber = 0;
+ private long byteLocation;
+ private int EOL = "\n".getBytes()[0];
+ private int currentTupleIdx;
+ private int numberOfTuplesInCurrentFrame;
+ private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
+ private Text pendingValue = null;
+ private ByteBufferInputStream bbis = new ByteBufferInputStream();
+ private DataInputStream dis = new DataInputStream(bbis);
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+
+ if(newFrame)
+ {
+ //first time called with this frame
+ //reset frame buffer
+ tupleAccessor.reset(frameBuffer);
+ //get number of tuples in frame
+ numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
+ //set tuple index to first tuple
+ currentTupleIdx = 0;
+ //set new frame to false
+ newFrame = false;
+ }
+
+ //check and see if there is a pending value
+ //Double check this
+ int numBytes = 0;
+ if (pendingValue != null) {
+ //last value didn't fit into buffer
+ int sizeOfNextTuple = pendingValue.getLength() + 1;
+ if(sizeOfNextTuple > len)
{
- //mark waiting value
- pendingValue = value;
- return numBytes;
+ return 0;
}
- System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
- buffer[offset + numBytes + value.length()] = (byte) EOL;
+ //there is enough space
+ System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+ buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
numBytes += sizeOfNextTuple;
+ //set pending to false
+ pendingValue = null;
+ //move to next tuple
+ currentTupleIdx++;
}
- else
+
+ //No pending value or done with pending value
+ //check if there are more tuples in the frame
+ while(currentTupleIdx < numberOfTuplesInCurrentFrame)
{
- //same file, just seek and read
- //read byte location
- bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
- byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
- //seek
- reader.seek(byteLocation);
- //read record
- value = reader.readLine();
- //copy it to the buffer if there is enough space
- int sizeOfNextTuple = value.length() + 1;
- if(sizeOfNextTuple + numBytes > len)
+                            //get the file number
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
+ newFileNumber = ((AInt32) inRecDesc.getFields()[0].deserialize(dis)).getIntegerValue();
+ //check if it is a new file
+ if(lastFileNumber != newFileNumber)
{
- //mark waiting value
- pendingValue = value;
- return numBytes;
+ //new file
+ lastFileNumber = newFileNumber;
+ //close old file
+ if(reader != null)
+ {
+ reader.close();
+ }
+ //open new file
+ reader = new SequenceFile.Reader(fs,new Path(files.get(newFileNumber)),conf);
+ //read byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+ //seek
+ reader.seek(byteLocation);
+ //read record
+ reader.next(key, value);
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = value.getLength() + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = value;
+ return numBytes;
+ }
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+ buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
}
- System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.length());
- buffer[offset + numBytes + value.length()] = (byte) EOL;
- numBytes += sizeOfNextTuple;
+ else
+ {
+ //same file, just seek and read
+ //read byte location
+ bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
+ byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
+ //seek
+ reader.seek(byteLocation);
+ //read record
+ reader.next(key, value);
+ //copy it to the buffer if there is enough space
+ int sizeOfNextTuple = value.getLength() + 1;
+ if(sizeOfNextTuple + numBytes > len)
+ {
+ //mark waiting value
+ pendingValue = value;
+ return numBytes;
+ }
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+ buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ currentTupleIdx++;
}
- currentTupleIdx++;
- }
- return (numBytes == 0) ? -1 : numBytes;
- }
-
- @Override
- public int read() throws IOException {
- throw new NotImplementedException("Use read(byte[], int, int");
- }
-
- @Override
- public void close(){
- try {
- if (reader != null)
- {
- reader.close();
- }
- super.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- };
- }
- else if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))
- {
- return new InputStream() {
- private SequenceFile.Reader reader;
- private Writable key;
- private Text value;
- private String lastFileName = "";
- private String newFileName;
- private long byteLocation;
- private int EOL = "\n".getBytes()[0];
- private int currentTupleIdx;
- private int numberOfTuplesInCurrentFrame;
- private IFrameTupleAccessor tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),inRecDesc);
- private Text pendingValue = null;
- private ByteBufferInputStream bbis = new ByteBufferInputStream();
- private DataInputStream dis = new DataInputStream(bbis);
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
-
- if(newFrame)
- {
- //first time called with this frame
- //reset frame buffer
- tupleAccessor.reset(frameBuffer);
- //get number of tuples in frame
- numberOfTuplesInCurrentFrame = tupleAccessor.getTupleCount();
- //set tuple index to first tuple
- currentTupleIdx = 0;
- //set new frame to false
- newFrame = false;
+ return (numBytes == 0) ? -1 : numBytes;
}
- //check and see if there is a pending value
- //Double check this
- int numBytes = 0;
- if (pendingValue != null) {
- //last value didn't fit into buffer
- int sizeOfNextTuple = pendingValue.getLength() + 1;
- if(sizeOfNextTuple > len)
- {
- return 0;
- }
- //there is enough space
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
- buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
- numBytes += sizeOfNextTuple;
- //set pending to false
- pendingValue = null;
- //move to next tuple
- currentTupleIdx++;
+ @Override
+ public int read() throws IOException {
+                            throw new NotImplementedException("Use read(byte[], int, int)");
}
- //No pending value or done with pending value
- //check if there are more tuples in the frame
- while(currentTupleIdx < numberOfTuplesInCurrentFrame)
- {
- //get the fileName]
- bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 0));
- newFileName = ((AString) inRecDesc.getFields()[0].deserialize(dis)).getStringValue();
- //check if it is a new file
- if(!lastFileName.equals(newFileName))
- {
- //new file
- lastFileName = newFileName;
- //close old file
- if(reader != null)
+ @Override
+ public void close(){
+ try {
+ if (reader != null)
{
reader.close();
}
- //open new file
- reader = new SequenceFile.Reader(fs,new Path(lastFileName),conf);
- //read byte location
- bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
- byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
- //seek
- reader.seek(byteLocation);
- //read record
- reader.next(key, value);
- //copy it to the buffer if there is enough space
- int sizeOfNextTuple = value.getLength() + 1;
- if(sizeOfNextTuple + numBytes > len)
- {
- //mark waiting value
- pendingValue = value;
- return numBytes;
- }
- System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
- buffer[offset + numBytes + value.getLength()] = (byte) EOL;
- numBytes += sizeOfNextTuple;
+ super.close();
+ } catch (IOException e) {
+ e.printStackTrace();
}
- else
- {
- //same file, just seek and read
- //read byte location
- bbis.setByteBuffer(frameBuffer, tupleAccessor.getTupleStartOffset(currentTupleIdx) + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(currentTupleIdx, 1));
- byteLocation = ((AInt64) inRecDesc.getFields()[1].deserialize(dis)).getLongValue();
- //seek
- reader.seek(byteLocation);
- //read record
- reader.next(key, value);
- //copy it to the buffer if there is enough space
- int sizeOfNextTuple = value.getLength() + 1;
- if(sizeOfNextTuple + numBytes > len)
- {
- //mark waiting value
- pendingValue = value;
- return numBytes;
- }
- System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
- buffer[offset + numBytes + value.getLength()] = (byte) EOL;
- numBytes += sizeOfNextTuple;
- }
- currentTupleIdx++;
}
- return (numBytes == 0) ? -1 : numBytes;
- }
-
- @Override
- public int read() throws IOException {
- throw new NotImplementedException("Use read(byte[], int, int");
- }
-
- @Override
- public void close(){
- try {
- if (reader != null)
- {
- reader.close();
- }
- super.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- };
+ };
+ }
+            //unknown format
+ throw new IOException("Unknown input format");
}
- //unknow format
- throw new IOException("Unknown input format");
}
@Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
index 240621b..59b39c5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
@@ -16,12 +16,11 @@
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.log4j.Logger;
-
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
/**
@@ -34,34 +33,33 @@
public class HDFSIndexingAdapter extends FileSystemBasedAdapter {
private static final long serialVersionUID = 1L;
- public static final Logger LOGGER = Logger.getLogger(HDFSIndexingAdapter.class.getName());
private transient String[] readSchedule;
private transient boolean executed[];
private transient InputSplit[] inputSplits;
private transient JobConf conf;
private transient AlgebricksPartitionConstraint clusterLocations;
-
+ private final Map<String,Integer> files;
private transient String nodeName;
-
public static final byte[] fileNameFieldNameWithRecOpeningBraces = "{\"_file-name\":\"".getBytes();
+ public static final byte[] fileNameFieldClosingQuotation = "\"".getBytes();
+ public static final byte[] fileNumberFieldNameWithRecOpeningBraces = "{\"_file-number\":".getBytes();
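+    //ADM field fragments used to prepend the RID (_file-name or _file-number and _byte-location)
+    //to each record when the external data is in ADM format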
public static final byte[] bytelocationFieldName = ",\"_byte-location\":".getBytes();
public static final byte[] bytelocationValueEnd = "i64,".getBytes();
public HDFSIndexingAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
- AlgebricksPartitionConstraint clusterLocations) {
+ AlgebricksPartitionConstraint clusterLocations, Map<String,Integer> files) {
super(atype);
this.readSchedule = readSchedule;
this.executed = executed;
this.inputSplits = inputSplits;
this.conf = conf;
this.clusterLocations = clusterLocations;
+ this.files = files;
}
@Override
public void configure(Map<String, Object> arguments) throws Exception {
- LOGGER.info("Configuring the adapter, why does it disappear");
this.configuration = arguments;
- LOGGER.info("Configuring format");
configureFormat();
}
@@ -115,147 +113,84 @@
@Override
public InputStream getInputStream(int partition) throws IOException {
- LOGGER.info("Creating the input stream in node: "+ nodeName);
- //LOGGER.info("Number of input splits found = "+ inputSplits.length);
- if(conf.getInputFormat() instanceof RCFileInputFormat)
+ if(files == null)
{
- //indexing rc input format
- return new InputStream() {
+ if(conf.getInputFormat() instanceof RCFileInputFormat)
+ {
+ //indexing rc input format
+ return new InputStream() {
- private RecordReader<LongWritable, BytesRefArrayWritable> reader;
- private LongWritable key;
- private BytesRefArrayWritable value;
- private boolean hasMore = false;
- private int EOL = "\n".getBytes()[0];
- private byte delimiter = 0x01;
- private boolean pendingValue = false;
- private int currentSplitIndex = 0;
- private byte[] fileName;
- private byte[] byteLocation;
- private byte[] rowNumberBytes;
- private long blockByteLocation;
- private long NextblockByteLocation;
- private int rowNumber;
-
- @SuppressWarnings("unchecked")
- private boolean moveToNext() throws IOException {
- for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
- /**
- * read all the partitions scheduled to the current node
- */
- if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ private RecordReader<LongWritable, BytesRefArrayWritable> reader;
+ private LongWritable key;
+ private BytesRefArrayWritable value;
+ private boolean hasMore = false;
+ private int EOL = "\n".getBytes()[0];
+ private byte delimiter = 0x01;
+ private boolean pendingValue = false;
+ private int currentSplitIndex = 0;
+ private byte[] fileName;
+ private byte[] byteLocation;
+ private byte[] rowNumberBytes;
+ private long blockByteLocation;
+ private long NextblockByteLocation;
+ private int rowNumber;
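+                    //Each emitted line is: <file name> delimiter <block byte offset> delimiter
+                    //<row number within the block> delimiter <flattened RC columns> EOL, i.e. the
+                    //record prefixed with the RID fields needed for indexing.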
+
+ @SuppressWarnings("unchecked")
+ private boolean moveToNext() throws IOException {
+ for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
/**
- * pick an unread split to read
- * synchronize among simultaneous partitions in the same machine
+ * read all the partitions scheduled to the current node
*/
- synchronized (executed) {
- if (executed[currentSplitIndex] == false) {
- executed[currentSplitIndex] = true;
- } else {
- continue;
+ if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (executed[currentSplitIndex] == false) {
+ executed[currentSplitIndex] = true;
+ } else {
+ continue;
+ }
}
- }
- /**
- * read the split
- */
- reader = getRecordReader(currentSplitIndex);
- key = reader.createKey();
- value = reader.createValue();
- fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
- blockByteLocation = reader.getPos();
- pendingValue = reader.next(key, value);
- NextblockByteLocation = reader.getPos();
- rowNumber = 1;
- byteLocation = String.valueOf(blockByteLocation).getBytes("UTF-8");
- rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- if (reader == null) {
- if (!moveToNext()) {
- //nothing to read
- return -1;
- }
- }
-
- int numBytes = 0;
- if (pendingValue) {
- //last value didn't fit into buffer
- // 1 for EOL
- int sizeOfNextTuple = getTupleSize(value) + 1;
- //fileName.length + byteLocation.length + rowNumberBytes.length;
-
- //copy filename
- System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
- buffer[offset + numBytes + fileName.length] = delimiter;
- numBytes += fileName.length + 1;
-
- //copy byte location
- System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
- buffer[offset + numBytes + byteLocation.length] = delimiter;
- numBytes += byteLocation.length + 1;
-
- //copy row number
- System.arraycopy(rowNumberBytes, 0, buffer, offset + numBytes, rowNumberBytes.length);
- buffer[offset + numBytes + rowNumberBytes.length] = delimiter;
- numBytes += rowNumberBytes.length + 1;
-
- copyCurrentTuple(buffer, offset + numBytes);
- buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
- numBytes += sizeOfNextTuple;
- //set pending to false
- pendingValue = false;
- }
-
- while (numBytes < len) {
- hasMore = reader.next(key, value);
- if (!hasMore) {
- while (moveToNext()) {
- hasMore = reader.next(key, value);
- if (hasMore) {
- //move to the next non-empty split
- break;
- }
+ /**
+ * read the split
+ */
+ reader = getRecordReader(currentSplitIndex);
+ key = reader.createKey();
+ value = reader.createValue();
+ fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
+ blockByteLocation = reader.getPos();
+ pendingValue = reader.next(key, value);
+ NextblockByteLocation = reader.getPos();
+ rowNumber = 1;
+ byteLocation = String.valueOf(blockByteLocation).getBytes("UTF-8");
+ rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+ return true;
}
}
- if (!hasMore) {
- return (numBytes == 0) ? -1 : numBytes;
+ return false;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if (reader == null) {
+ if (!moveToNext()) {
+ //nothing to read
+ return -1;
+ }
}
- //check if moved to next block
- blockByteLocation = reader.getPos();
- if(blockByteLocation != NextblockByteLocation)
- {
- //moved to a new block, reset stuff
- //row number
- rowNumber = 1;
- rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+ int numBytes = 0;
+ if (pendingValue) {
+ //last value didn't fit into buffer
+ // 1 for EOL
+ int sizeOfNextTuple = getTupleSize(value) + 1;
+ if (numBytes + sizeOfNextTuple + rowNumberBytes.length + byteLocation.length + fileName.length + 3 > len) {
+ return 0;
+ }
- //block location
- byteLocation = String.valueOf(NextblockByteLocation).getBytes("UTF-8");
- NextblockByteLocation = blockByteLocation;
- }
- else
- {
- rowNumber += 1;
- rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
- }
-
- int sizeOfNextTuple = getTupleSize(value) + 1;
- if (numBytes + sizeOfNextTuple + rowNumberBytes.length + byteLocation.length + fileName.length + 3 > len) {
- // cannot add tuple to current buffer
- // but the reader has moved pass the fetched tuple
- // we need to store this for a subsequent read call.
- // and return this then.
- pendingValue = true;
- break;
- } else {
//copy filename
System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
buffer[offset + numBytes + fileName.length] = delimiter;
@@ -274,140 +209,14 @@
copyCurrentTuple(buffer, offset + numBytes);
buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
numBytes += sizeOfNextTuple;
- }
- }
- return numBytes;
- }
-
- private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
- int rcOffset = 0;
- for(int i=0; i< value.size(); i++)
- {
- System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
- rcOffset += value.get(i).getLength() + 1;
- buffer[rcOffset - 1] = delimiter;
- }
- }
-
- private int getTupleSize(BytesRefArrayWritable value2) {
- int size=0;
- //loop over rc column and add lengths
- for(int i=0; i< value.size(); i++)
- {
- size += value.get(i).getLength();
- }
- //add delimeters bytes sizes
- size += value.size() -1;
- return size;
- }
-
- @Override
- public int read() throws IOException {
- throw new NotImplementedException("Use read(byte[], int, int");
- }
-
- private RecordReader getRecordReader(int slitIndex) throws IOException {
- RCFileInputFormat format = (RCFileInputFormat) conf.getInputFormat();
- RecordReader reader = format.getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
- return reader;
- }
-
- };
- }
- else
- {
- //get content format
- if(configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT))
- {
- LOGGER.info("Creating the indexing input stream with delimiter = "+ configuration.get(KEY_DELIMITER));
- //reading data and RIDs for delimited text
- return new InputStream() {
-
- private RecordReader<Object, Text> reader;
- private Object key;
- private Text value;
- private boolean hasMore = false;
- private int EOL = "\n".getBytes()[0];
- private Text pendingValue = null;
- private int currentSplitIndex = 0;
- private byte[] fileName;
- private byte[] byteLocation;
- private byte delimiter = ((String)configuration.get(KEY_DELIMITER)).getBytes()[0];
-
- @SuppressWarnings("unchecked")
- private boolean moveToNext() throws IOException {
- for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
- /**
- * read all the partitions scheduled to the current node
- */
- if (readSchedule[currentSplitIndex].equals(nodeName)) {
- /**
- * pick an unread split to read
- * synchronize among simultaneous partitions in the same machine
- */
- synchronized (executed) {
- if (executed[currentSplitIndex] == false) {
- executed[currentSplitIndex] = true;
- } else {
- continue;
- }
- }
-
- /**
- * read the split
- */
- reader = getRecordReader(currentSplitIndex);
- key = reader.createKey();
- value = (Text) reader.createValue();
- fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- if (reader == null) {
- if (!moveToNext()) {
- //nothing to read
- return -1;
- }
- }
-
- int numBytes = 0;
- if (pendingValue != null) {
- int sizeOfNextTuple = pendingValue.getLength() + 1;
- if (numBytes + sizeOfNextTuple +byteLocation.length + fileName.length + 2> len)
- {
- return numBytes;
- }
- //copy filename
- System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
- buffer[offset + numBytes + fileName.length] = delimiter;
- numBytes += fileName.length + 1;
-
- //copy byte location
- System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
- buffer[offset + numBytes + byteLocation.length] = delimiter;
- numBytes += byteLocation.length + 1;
-
- //copy actual value
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
- buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
- numBytes += pendingValue.getLength() + 1;
- pendingValue = null;
+ //set pending to false
+ pendingValue = false;
}
while (numBytes < len) {
- //get reader position before you actually read
- byteLocation = String.valueOf(reader.getPos()).getBytes();
hasMore = reader.next(key, value);
if (!hasMore) {
while (moveToNext()) {
- //get reader position before you actually read
- byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
hasMore = reader.next(key, value);
if (hasMore) {
//move to the next non-empty split
@@ -418,145 +227,198 @@
if (!hasMore) {
return (numBytes == 0) ? -1 : numBytes;
}
- int sizeOfNextTuple = value.getLength() + 1;
- if (numBytes + sizeOfNextTuple +byteLocation.length + fileName.length + 2> len) {
+
+ //check if moved to next block
+ blockByteLocation = reader.getPos();
+ if(blockByteLocation != NextblockByteLocation)
+ {
+ //moved to a new block, reset stuff
+ //row number
+ rowNumber = 1;
+ rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+
+ //block location
+ byteLocation = String.valueOf(NextblockByteLocation).getBytes("UTF-8");
+ NextblockByteLocation = blockByteLocation;
+ }
+ else
+ {
+ rowNumber += 1;
+ rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+ }
+
+ int sizeOfNextTuple = getTupleSize(value) + 1;
+ if (numBytes + sizeOfNextTuple + rowNumberBytes.length + byteLocation.length + fileName.length + 3 > len) {
// cannot add tuple to current buffer
// but the reader has moved pass the fetched tuple
// we need to store this for a subsequent read call.
// and return this then.
- pendingValue = value;
+ pendingValue = true;
break;
} else {
//copy filename
System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
buffer[offset + numBytes + fileName.length] = delimiter;
numBytes += fileName.length + 1;
-
+
//copy byte location
System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
buffer[offset + numBytes + byteLocation.length] = delimiter;
numBytes += byteLocation.length + 1;
- //Copy actual value
- System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
- buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+ //copy row number
+ System.arraycopy(rowNumberBytes, 0, buffer, offset + numBytes, rowNumberBytes.length);
+ buffer[offset + numBytes + rowNumberBytes.length] = delimiter;
+ numBytes += rowNumberBytes.length + 1;
+
+ copyCurrentTuple(buffer, offset + numBytes);
+ buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
numBytes += sizeOfNextTuple;
}
}
return numBytes;
}
+ private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
+ int rcOffset = 0;
+ for(int i=0; i< value.size(); i++)
+ {
+ System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
+ rcOffset += value.get(i).getLength() + 1;
+ buffer[rcOffset - 1] = delimiter;
+ }
+ }
+
+ private int getTupleSize(BytesRefArrayWritable value2) {
+ int size=0;
+ //loop over rc column and add lengths
+ for(int i=0; i< value.size(); i++)
+ {
+ size += value.get(i).getLength();
+ }
+                        //add the sizes of the delimiter bytes
+ size += value.size() -1;
+ return size;
+ }
+
@Override
public int read() throws IOException {
throw new NotImplementedException("Use read(byte[], int, int");
}
private RecordReader getRecordReader(int slitIndex) throws IOException {
- if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
- SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
- RecordReader reader = format.getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
- return reader;
- } else {
- TextInputFormat format = (TextInputFormat) conf.getInputFormat();
- RecordReader reader = format.getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
- return reader;
- }
+ RCFileInputFormat format = (RCFileInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
}
};
}
- else if((configuration.get(KEY_FORMAT).equals(FORMAT_ADM)))
+ else
{
- //reading data and RIDs for adm formatted data
- return new InputStream() {
+ //get content format
+ if(configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT))
+ {
+ //reading data and RIDs for delimited text
+ return new InputStream() {
- private RecordReader<Object, Text> reader;
- private Object key;
- private Text value;
- private boolean hasMore = false;
- private int EOL = "\n".getBytes()[0];
- private Text pendingValue = null;
- private int currentSplitIndex = 0;
- private byte[] fileName;
- private byte[] byteLocation;
+ private RecordReader<Object, Text> reader;
+ private Object key;
+ private Text value;
+ private boolean hasMore = false;
+ private int EOL = "\n".getBytes()[0];
+ private Text pendingValue = null;
+ private int currentSplitIndex = 0;
+ private byte[] fileName;
+ private byte[] byteLocation;
+ private byte delimiter = ((String)configuration.get(KEY_DELIMITER)).getBytes()[0];
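+                        //Each emitted line is: <file name> delimiter <byte location> delimiter
+                        //<original delimited record> EOL, so the parser sees the RID as two extra
+                        //leading fields.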
- @SuppressWarnings("unchecked")
- private boolean moveToNext() throws IOException {
- for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
- /**
- * read all the partitions scheduled to the current node
- */
- if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ @SuppressWarnings("unchecked")
+ private boolean moveToNext() throws IOException {
+ for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
/**
- * pick an unread split to read
- * synchronize among simultaneous partitions in the same machine
+ * read all the partitions scheduled to the current node
*/
- synchronized (executed) {
- if (executed[currentSplitIndex] == false) {
- executed[currentSplitIndex] = true;
- } else {
- continue;
+ if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (executed[currentSplitIndex] == false) {
+ executed[currentSplitIndex] = true;
+ } else {
+ continue;
+ }
}
- }
- /**
- * read the split
- */
- reader = getRecordReader(currentSplitIndex);
- key = reader.createKey();
- value = (Text) reader.createValue();
- fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- if (reader == null) {
- if (!moveToNext()) {
- //nothing to read
- return -1;
- }
- }
-
- int numBytes = 0;
- if (pendingValue != null) {
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
- buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
- numBytes += pendingValue.getLength() + 1;
- pendingValue = null;
- }
-
- while (numBytes < len) {
- //get reader position before you actually read
- byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
- hasMore = reader.next(key, value);
- if (!hasMore) {
- while (moveToNext()) {
- //get reader position before you actually read
- byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
- hasMore = reader.next(key, value);
- if (hasMore) {
- //move to the next non-empty split
- break;
- }
+ /**
+ * read the split
+ */
+ reader = getRecordReader(currentSplitIndex);
+ key = reader.createKey();
+ value = (Text) reader.createValue();
+ fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
+ return true;
}
}
- if (!hasMore) {
- return (numBytes == 0) ? -1 : numBytes;
+ return false;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if (reader == null) {
+ if (!moveToNext()) {
+ //nothing to read
+ return -1;
+ }
}
- //get the index of the first field name
- int firstFieldLocation = value.find("\"");
- int admValueSize = value.getLength();
- if(firstFieldLocation >= 0)
- {
- int sizeOfNextTuple = value.getLength() - firstFieldLocation + 1;
- int sizeOfNextTupleAndRID = fileNameFieldNameWithRecOpeningBraces.length + fileName.length + bytelocationFieldName.length + byteLocation.length + bytelocationValueEnd.length + sizeOfNextTuple;
- if (numBytes + sizeOfNextTupleAndRID > len) {
+
+ int numBytes = 0;
+ if (pendingValue != null) {
+ int sizeOfNextTuple = pendingValue.getLength() + 1;
+                            if (numBytes + sizeOfNextTuple + byteLocation.length + fileName.length + 2 > len)
+ {
+ return numBytes;
+ }
+ //copy filename
+ System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
+ buffer[offset + numBytes + fileName.length] = delimiter;
+ numBytes += fileName.length + 1;
+
+ //copy byte location
+ System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+ buffer[offset + numBytes + byteLocation.length] = delimiter;
+ numBytes += byteLocation.length + 1;
+
+ //copy actual value
+ System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+ buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+ numBytes += pendingValue.getLength() + 1;
+ pendingValue = null;
+ }
+
+ while (numBytes < len) {
+ //get reader position before you actually read
+                            byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+ hasMore = reader.next(key, value);
+ if (!hasMore) {
+ while (moveToNext()) {
+ //get reader position before you actually read
+ byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+ hasMore = reader.next(key, value);
+ if (hasMore) {
+ //move to the next non-empty split
+ break;
+ }
+ }
+ }
+ if (!hasMore) {
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+ int sizeOfNextTuple = value.getLength() + 1;
+                            if (numBytes + sizeOfNextTuple + byteLocation.length + fileName.length + 2 > len) {
// cannot add tuple to current buffer
// but the reader has moved pass the fetched tuple
// we need to store this for a subsequent read call.
@@ -564,58 +426,779 @@
pendingValue = value;
break;
} else {
- //copy fileNameFieldNameWithRecOpeningBraces
- System.arraycopy(fileNameFieldNameWithRecOpeningBraces, 0, buffer, offset + numBytes,fileNameFieldNameWithRecOpeningBraces.length);
- numBytes += fileNameFieldNameWithRecOpeningBraces.length;
- //copy fileName
- System.arraycopy(fileName, 0, buffer, offset + numBytes,fileName.length);
- numBytes += fileName.length;
- //copy bytelocationFieldName
- System.arraycopy(bytelocationFieldName, 0, buffer, offset + numBytes,bytelocationFieldName.length);
- numBytes += bytelocationFieldName.length;
- //copy byte location value
- System.arraycopy(byteLocation, 0, buffer, offset + numBytes,byteLocation.length);
- numBytes += byteLocation.length;
- //copy byte location field end
- System.arraycopy(bytelocationValueEnd, 0, buffer, offset + numBytes,bytelocationValueEnd.length);
- numBytes += bytelocationValueEnd.length;
- //copy the actual adm instance
- System.arraycopy(value.getBytes(), firstFieldLocation+1, buffer, offset + numBytes,admValueSize - firstFieldLocation - 1);
- buffer[offset + numBytes + admValueSize - firstFieldLocation] = (byte) EOL;
- numBytes += admValueSize - firstFieldLocation;
+ //copy filename
+ System.arraycopy(fileName, 0, buffer, offset + numBytes, fileName.length);
+ buffer[offset + numBytes + fileName.length] = delimiter;
+ numBytes += fileName.length + 1;
+
+ //copy byte location
+ System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+ buffer[offset + numBytes + byteLocation.length] = delimiter;
+ numBytes += byteLocation.length + 1;
+
+ //Copy actual value
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+ buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
}
}
+ return numBytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+                        throw new NotImplementedException("Use read(byte[], int, int)");
+ }
+
+ private RecordReader getRecordReader(int slitIndex) throws IOException {
+ if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+ SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
+ } else {
+ TextInputFormat format = (TextInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
+ }
+ }
+
+ };
+ }
+ else if((configuration.get(KEY_FORMAT).equals(FORMAT_ADM)))
+ {
+ //reading data and RIDs for adm formatted data
+ return new InputStream() {
+
+ private RecordReader<Object, Text> reader;
+ private Object key;
+ private Text value;
+ private boolean hasMore = false;
+ private int EOL = "\n".getBytes()[0];
+ private Text pendingValue = null;
+ private int currentSplitIndex = 0;
+ private byte[] fileName;
+ private byte[] byteLocation;
+
+ @SuppressWarnings("unchecked")
+ private boolean moveToNext() throws IOException {
+ for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (executed[currentSplitIndex] == false) {
+ executed[currentSplitIndex] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ reader = getRecordReader(currentSplitIndex);
+ key = reader.createKey();
+ value = (Text) reader.createValue();
+ fileName = ((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath().getBytes();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if (reader == null) {
+ if (!moveToNext()) {
+ //nothing to read
+ return -1;
+ }
+ }
+
+ int numBytes = 0;
+ if (pendingValue != null) {
+ int firstFieldLocation = value.find("\"");
+ int admValueSize = value.getLength();
+ if(firstFieldLocation >= 0)
+ {
+ int sizeOfNextTuple = value.getLength() - firstFieldLocation + 1;
+ int sizeOfNextTupleAndRID = fileNameFieldNameWithRecOpeningBraces.length + fileName.length + fileNameFieldClosingQuotation.length + bytelocationFieldName.length + byteLocation.length + bytelocationValueEnd.length + sizeOfNextTuple;
+ if (numBytes + sizeOfNextTupleAndRID > len) {
+ // still cannot add tuple to current buffer
+                                    // return 0 so the parser will double the buffer size.
+ return 0;
+ } else {
+ //copy fileNameFieldNameWithRecOpeningBraces
+ System.arraycopy(fileNameFieldNameWithRecOpeningBraces, 0, buffer, offset + numBytes,fileNameFieldNameWithRecOpeningBraces.length);
+ numBytes += fileNameFieldNameWithRecOpeningBraces.length;
+ //copy fileName
+ System.arraycopy(fileName, 0, buffer, offset + numBytes,fileName.length);
+ numBytes += fileName.length;
+ //copy fileName closing quotation
+ System.arraycopy(fileNameFieldClosingQuotation, 0, buffer, offset + numBytes,fileNameFieldClosingQuotation.length);
+ numBytes += fileNameFieldClosingQuotation.length;
+ //copy bytelocationFieldName
+ System.arraycopy(bytelocationFieldName, 0, buffer, offset + numBytes,bytelocationFieldName.length);
+ numBytes += bytelocationFieldName.length;
+ //copy byte location value
+ System.arraycopy(byteLocation, 0, buffer, offset + numBytes,byteLocation.length);
+ numBytes += byteLocation.length;
+ //copy byte location field end
+ System.arraycopy(bytelocationValueEnd, 0, buffer, offset + numBytes,bytelocationValueEnd.length);
+ numBytes += bytelocationValueEnd.length;
+ //copy the actual adm instance
+ System.arraycopy(value.getBytes(), firstFieldLocation, buffer, offset + numBytes,admValueSize - firstFieldLocation);
+ buffer[offset + numBytes + admValueSize - firstFieldLocation] = (byte) EOL;
+ numBytes += admValueSize - firstFieldLocation +1;
+ }
+ }
+ pendingValue = null;
+ }
+
+ while (numBytes < len) {
+ //get reader position before you actually read
+ byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+ hasMore = reader.next(key, value);
+ if (!hasMore) {
+ while (moveToNext()) {
+ //get reader position before you actually read
+ byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+ hasMore = reader.next(key, value);
+ if (hasMore) {
+ //move to the next non-empty split
+ break;
+ }
+ }
+ }
+ if (!hasMore) {
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+ //get the index of the first field name
+ int firstFieldLocation = value.find("\"");
+ int admValueSize = value.getLength();
+ if(firstFieldLocation >= 0)
+ {
+ int sizeOfNextTuple = value.getLength() - firstFieldLocation + 1;
+ int sizeOfNextTupleAndRID = fileNameFieldNameWithRecOpeningBraces.length + fileName.length + fileNameFieldClosingQuotation.length + bytelocationFieldName.length + byteLocation.length + bytelocationValueEnd.length + sizeOfNextTuple;
+ if (numBytes + sizeOfNextTupleAndRID > len) {
+ // cannot add tuple to current buffer
+ // but the reader has already moved past the fetched tuple,
+ // so store it and return it in a subsequent read call.
+ pendingValue = value;
+ break;
+ } else {
+ //copy fileNameFieldNameWithRecOpeningBraces
+ System.arraycopy(fileNameFieldNameWithRecOpeningBraces, 0, buffer, offset + numBytes,fileNameFieldNameWithRecOpeningBraces.length);
+ numBytes += fileNameFieldNameWithRecOpeningBraces.length;
+ //copy fileName
+ System.arraycopy(fileName, 0, buffer, offset + numBytes,fileName.length);
+ numBytes += fileName.length;
+ //copy fileName closing quotation
+ System.arraycopy(fileNameFieldClosingQuotation, 0, buffer, offset + numBytes,fileNameFieldClosingQuotation.length);
+ numBytes += fileNameFieldClosingQuotation.length;
+ //copy bytelocationFieldName
+ System.arraycopy(bytelocationFieldName, 0, buffer, offset + numBytes,bytelocationFieldName.length);
+ numBytes += bytelocationFieldName.length;
+ //copy byte location value
+ System.arraycopy(byteLocation, 0, buffer, offset + numBytes,byteLocation.length);
+ numBytes += byteLocation.length;
+ //copy byte location field end
+ System.arraycopy(bytelocationValueEnd, 0, buffer, offset + numBytes,bytelocationValueEnd.length);
+ numBytes += bytelocationValueEnd.length;
+ //copy the actual adm instance
+ System.arraycopy(value.getBytes(), firstFieldLocation, buffer, offset + numBytes,admValueSize - firstFieldLocation);
+ buffer[offset + numBytes + admValueSize - firstFieldLocation] = (byte) EOL;
+ numBytes += admValueSize - firstFieldLocation +1;
+ }
+ }
+ }
+ return numBytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new NotImplementedException("Use read(byte[], int, int)");
+ }
+
+ private RecordReader getRecordReader(int slitIndex) throws IOException {
+ if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+ SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
+ } else {
+ TextInputFormat format = (TextInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
+ }
+ }
+
+ };
+ }
+ else
+ {
+ throw new IOException("Can't index " +configuration.get(KEY_FORMAT)+" input");
+ }
+ }
+ }
+ else
+ {
+ if(conf.getInputFormat() instanceof RCFileInputFormat)
+ {
+ //indexing rc input format
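+ //for RCFile input the RID is (file number, block byte offset, row number within the block),
+ //written as delimited text in front of the flattened row columns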
+ return new InputStream() {
+
+ private RecordReader<LongWritable, BytesRefArrayWritable> reader;
+ private LongWritable key;
+ private BytesRefArrayWritable value;
+ private boolean hasMore = false;
+ private int EOL = "\n".getBytes()[0];
+ private byte delimiter = 0x01;
+ private boolean pendingValue = false;
+ private int currentSplitIndex = 0;
+ private byte[] fileNumber;
+ private byte[] byteLocation;
+ private byte[] rowNumberBytes;
+ private Integer file;
+ private long blockByteLocation;
+ private long NextblockByteLocation;
+ private int rowNumber;
+
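+ //moveToNext claims the next unread split for this node, looks up its file number in the
+ //files map, and eagerly reads the first row so the block offset and row number are known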
+ @SuppressWarnings("unchecked")
+ private boolean moveToNext() throws IOException {
+ for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (executed[currentSplitIndex] == false) {
+ executed[currentSplitIndex] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ reader = getRecordReader(currentSplitIndex);
+ key = reader.createKey();
+ value = reader.createValue();
+ //getting the file number
+ file = files.get(((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath());
+ if(file == null)
+ {
+ throw new HyracksException("a file was not found in the map while indexing");
+ }
+ fileNumber = String.valueOf(file).getBytes("UTF-8");
+ blockByteLocation = reader.getPos();
+ pendingValue = reader.next(key, value);
+ NextblockByteLocation = reader.getPos();
+ rowNumber = 1;
+ byteLocation = String.valueOf(blockByteLocation).getBytes("UTF-8");
+ rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if (reader == null) {
+ if (!moveToNext()) {
+ //nothing to read
+ return -1;
+ }
+ }
+
+ int numBytes = 0;
+ if (pendingValue) {
+ //last value didn't fit into buffer
+ // 1 for EOL
+ int sizeOfNextTuple = getTupleSize(value) + 1;
+ if (numBytes + sizeOfNextTuple + rowNumberBytes.length + byteLocation.length + fileNumber.length + 3 > len) {
+ return 0;
+ }
+ //copy file number
+ System.arraycopy(fileNumber, 0, buffer, offset + numBytes, fileNumber.length);
+ buffer[offset + numBytes + fileNumber.length] = delimiter;
+ numBytes += fileNumber.length + 1;
+
+ //copy byte location
+ System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+ buffer[offset + numBytes + byteLocation.length] = delimiter;
+ numBytes += byteLocation.length + 1;
+
+ //copy row number
+ System.arraycopy(rowNumberBytes, 0, buffer, offset + numBytes, rowNumberBytes.length);
+ buffer[offset + numBytes + rowNumberBytes.length] = delimiter;
+ numBytes += rowNumberBytes.length + 1;
+
+ copyCurrentTuple(buffer, offset + numBytes);
+ buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ //set pending to false
+ pendingValue = false;
+ }
+
+ while (numBytes < len) {
+ hasMore = reader.next(key, value);
+ if (!hasMore) {
+ while (moveToNext()) {
+ hasMore = reader.next(key, value);
+ if (hasMore) {
+ //move to the next non-empty split
+ break;
+ }
+ }
+ }
+ if (!hasMore) {
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+
+ //check if moved to next block
+ blockByteLocation = reader.getPos();
+ if(blockByteLocation != NextblockByteLocation)
+ {
+ //moved to a new block, reset stuff
+ //row number
+ rowNumber = 1;
+ rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+
+ //block location
+ byteLocation = String.valueOf(NextblockByteLocation).getBytes("UTF-8");
+ NextblockByteLocation = blockByteLocation;
+ }
+ else
+ {
+ rowNumber += 1;
+ rowNumberBytes = String.valueOf(rowNumber).getBytes("UTF-8");
+ }
+
+ int sizeOfNextTuple = getTupleSize(value) + 1;
+ if (numBytes + sizeOfNextTuple + rowNumberBytes.length + byteLocation.length + fileNumber.length + 3 > len) {
+ // cannot add tuple to current buffer
+ // but the reader has already moved past the fetched tuple,
+ // so store it and return it in a subsequent read call.
+ pendingValue = true;
+ break;
+ } else {
+ //copy file number
+ System.arraycopy(fileNumber, 0, buffer, offset + numBytes, fileNumber.length);
+ buffer[offset + numBytes + fileNumber.length] = delimiter;
+ numBytes += fileNumber.length + 1;
+
+ //copy byte location
+ System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+ buffer[offset + numBytes + byteLocation.length] = delimiter;
+ numBytes += byteLocation.length + 1;
+
+ //copy row number
+ System.arraycopy(rowNumberBytes, 0, buffer, offset + numBytes, rowNumberBytes.length);
+ buffer[offset + numBytes + rowNumberBytes.length] = delimiter;
+ numBytes += rowNumberBytes.length + 1;
+
+ copyCurrentTuple(buffer, offset + numBytes);
+ buffer[offset + numBytes + sizeOfNextTuple - 1] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
}
return numBytes;
}
+ private void copyCurrentTuple(byte[] buffer, int offset) throws IOException {
+ int rcOffset = 0;
+ for(int i=0; i< value.size(); i++)
+ {
+ System.arraycopy(value.get(i).getData(), value.get(i).getStart(), buffer, offset + rcOffset, value.get(i).getLength());
+ rcOffset += value.get(i).getLength() + 1;
+ buffer[offset + rcOffset - 1] = delimiter;
+ }
+ }
+
+ private int getTupleSize(BytesRefArrayWritable value2) {
+ int size=0;
+ //loop over rc columns and add their lengths
+ for(int i=0; i< value2.size(); i++)
+ {
+ size += value2.get(i).getLength();
+ }
+ //add the delimiter bytes (one between each pair of columns)
+ size += value2.size() -1;
+ return size;
+ }
+
@Override
public int read() throws IOException {
throw new NotImplementedException("Use read(byte[], int, int)");
}
private RecordReader getRecordReader(int slitIndex) throws IOException {
- if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
- SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
- RecordReader reader = format.getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
- return reader;
- } else {
- TextInputFormat format = (TextInputFormat) conf.getInputFormat();
- RecordReader reader = format.getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
- return reader;
- }
+ RCFileInputFormat format = (RCFileInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
}
};
}
else
{
- throw new IOException("Can't index " +configuration.get(KEY_FORMAT)+" input");
+ //get content format
+ if(configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT))
+ {
+ //reading data and RIDs for delimited text
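+ //each output line is: file number, delimiter, byte offset of the record, delimiter,
+ //then the original delimited-text tuple terminated by EOL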
+ return new InputStream() {
+
+ private RecordReader<Object, Text> reader;
+ private Object key;
+ private Text value;
+ private boolean hasMore = false;
+ private int EOL = "\n".getBytes()[0];
+ private Text pendingValue = null;
+ private int currentSplitIndex = 0;
+ private Integer file;
+ private byte[] fileNumber;
+ private byte[] byteLocation;
+ private byte delimiter = ((String)configuration.get(KEY_DELIMITER)).getBytes()[0];
+
+ @SuppressWarnings("unchecked")
+ private boolean moveToNext() throws IOException {
+ for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (executed[currentSplitIndex] == false) {
+ executed[currentSplitIndex] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ reader = getRecordReader(currentSplitIndex);
+ key = reader.createKey();
+ value = (Text) reader.createValue();
+ file = files.get(((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath());
+ if(file == null)
+ {
+ throw new HyracksException("The file:"+((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath()+" was not found in the map while indexing");
+ }
+ fileNumber = String.valueOf(file).getBytes("UTF-8");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if (reader == null) {
+ if (!moveToNext()) {
+ //nothing to read
+ return -1;
+ }
+ }
+
+ int numBytes = 0;
+ if (pendingValue != null) {
+ int sizeOfNextTuple = pendingValue.getLength() + 1;
+ if (numBytes + sizeOfNextTuple + byteLocation.length + fileNumber.length + 2 > len)
+ {
+ return numBytes;
+ }
+ //copy file number
+ System.arraycopy(fileNumber, 0, buffer, offset + numBytes, fileNumber.length);
+ buffer[offset + numBytes + fileNumber.length] = delimiter;
+ numBytes += fileNumber.length + 1;
+
+ //copy byte location
+ System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+ buffer[offset + numBytes + byteLocation.length] = delimiter;
+ numBytes += byteLocation.length + 1;
+
+ //copy actual value
+ System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+ buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+ numBytes += pendingValue.getLength() + 1;
+ pendingValue = null;
+ }
+
+ while (numBytes < len) {
+ //get reader position before you actually read
+ byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+ hasMore = reader.next(key, value);
+ if (!hasMore) {
+ while (moveToNext()) {
+ //get reader position before you actually read
+ byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+ hasMore = reader.next(key, value);
+ if (hasMore) {
+ //move to the next non-empty split
+ break;
+ }
+ }
+ }
+ if (!hasMore) {
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+ int sizeOfNextTuple = value.getLength() + 1;
+ if (numBytes + sizeOfNextTuple + byteLocation.length + fileNumber.length + 2 > len) {
+ // cannot add tuple to current buffer
+ // but the reader has already moved past the fetched tuple,
+ // so store it and return it in a subsequent read call.
+ pendingValue = value;
+ break;
+ } else {
+ //copy file number
+ System.arraycopy(fileNumber, 0, buffer, offset + numBytes, fileNumber.length);
+ buffer[offset + numBytes + fileNumber.length] = delimiter;
+ numBytes += fileNumber.length + 1;
+
+ //copy byte location
+ System.arraycopy(byteLocation, 0, buffer, offset + numBytes, byteLocation.length);
+ buffer[offset + numBytes + byteLocation.length] = delimiter;
+ numBytes += byteLocation.length + 1;
+
+ //Copy actual value
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+ buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ }
+ return numBytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new NotImplementedException("Use read(byte[], int, int)");
+ }
+
+ private RecordReader getRecordReader(int slitIndex) throws IOException {
+ if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+ SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
+ } else {
+ TextInputFormat format = (TextInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
+ }
+ }
+
+ };
+ }
+ else if((configuration.get(KEY_FORMAT).equals(FORMAT_ADM)))
+ {
+ //reading data and RIDs for adm formatted data
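+ //optimized variant: the RID carries the metadata-assigned file number (from the files map)
+ //instead of the full file name, keeping the synthetic fields small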
+ return new InputStream() {
+
+ private RecordReader<Object, Text> reader;
+ private Object key;
+ private Text value;
+ private boolean hasMore = false;
+ private int EOL = "\n".getBytes()[0];
+ private Text pendingValue = null;
+ private int currentSplitIndex = 0;
+ private Integer file;
+ private byte[] fileNumber;
+ private byte[] byteLocation;
+
+ @SuppressWarnings("unchecked")
+ private boolean moveToNext() throws IOException {
+ for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (executed[currentSplitIndex] == false) {
+ executed[currentSplitIndex] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ reader = getRecordReader(currentSplitIndex);
+ key = reader.createKey();
+ value = (Text) reader.createValue();
+ file = files.get(((FileSplit)(inputSplits[currentSplitIndex])).getPath().toUri().getPath());
+ if(file == null)
+ {
+ throw new HyracksException("a file was not found in the map while indexing");
+ }
+ fileNumber = String.valueOf(file).getBytes("UTF-8");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if (reader == null) {
+ if (!moveToNext()) {
+ //nothing to read
+ return -1;
+ }
+ }
+
+ int numBytes = 0;
+ if (pendingValue != null) {
+ int firstFieldLocation = value.find("\"");
+ int admValueSize = value.getLength();
+ if(firstFieldLocation >= 0)
+ {
+ int sizeOfNextTuple = value.getLength() - firstFieldLocation + 1;
+ int sizeOfNextTupleAndRID = fileNumberFieldNameWithRecOpeningBraces.length + fileNumber.length + bytelocationFieldName.length + byteLocation.length + bytelocationValueEnd.length + sizeOfNextTuple;
+ if (numBytes + sizeOfNextTupleAndRID > len) {
+ // still cannot add tuple to current buffer
+ // return 0 so the parser will double the buffer size.
+ return 0;
+ } else {
+ //copy fileNumberFieldNameWithRecOpeningBraces
+ System.arraycopy(fileNumberFieldNameWithRecOpeningBraces, 0, buffer, offset + numBytes,fileNumberFieldNameWithRecOpeningBraces.length);
+ numBytes += fileNumberFieldNameWithRecOpeningBraces.length;
+ //copy file Number
+ System.arraycopy(fileNumber, 0, buffer, offset + numBytes,fileNumber.length);
+ numBytes += fileNumber.length;
+ //copy bytelocationFieldName
+ System.arraycopy(bytelocationFieldName, 0, buffer, offset + numBytes,bytelocationFieldName.length);
+ numBytes += bytelocationFieldName.length;
+ //copy byte location value
+ System.arraycopy(byteLocation, 0, buffer, offset + numBytes,byteLocation.length);
+ numBytes += byteLocation.length;
+ //copy byte location field end
+ System.arraycopy(bytelocationValueEnd, 0, buffer, offset + numBytes,bytelocationValueEnd.length);
+ numBytes += bytelocationValueEnd.length;
+ //copy the actual adm instance
+ System.arraycopy(value.getBytes(), firstFieldLocation, buffer, offset + numBytes,admValueSize - firstFieldLocation);
+ buffer[offset + numBytes + admValueSize - firstFieldLocation] = (byte) EOL;
+ numBytes += admValueSize - firstFieldLocation +1;
+ }
+ }
+ pendingValue = null;
+ }
+
+ while (numBytes < len) {
+ //get reader position before you actually read
+ byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+ hasMore = reader.next(key, value);
+ if (!hasMore) {
+ while (moveToNext()) {
+ //get reader position before you actually read
+ byteLocation = String.valueOf(reader.getPos()).getBytes("UTF-8");
+ hasMore = reader.next(key, value);
+ if (hasMore) {
+ //move to the next non-empty split
+ break;
+ }
+ }
+ }
+ if (!hasMore) {
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+ //get the index of the first field name
+ int firstFieldLocation = value.find("\"");
+ int admValueSize = value.getLength();
+ if(firstFieldLocation >= 0)
+ {
+ int sizeOfNextTuple = value.getLength() - firstFieldLocation + 1;
+ int sizeOfNextTupleAndRID = fileNumberFieldNameWithRecOpeningBraces.length + fileNumber.length + bytelocationFieldName.length + byteLocation.length + bytelocationValueEnd.length + sizeOfNextTuple;
+ if (numBytes + sizeOfNextTupleAndRID > len) {
+ // cannot add tuple to current buffer
+ // but the reader has already moved past the fetched tuple,
+ // so store it and return it in a subsequent read call.
+ pendingValue = value;
+ break;
+ } else {
+ //copy fileNumberFieldNameWithRecOpeningBraces
+ System.arraycopy(fileNumberFieldNameWithRecOpeningBraces, 0, buffer, offset + numBytes,fileNumberFieldNameWithRecOpeningBraces.length);
+ numBytes += fileNumberFieldNameWithRecOpeningBraces.length;
+ //copy fileNumber
+ System.arraycopy(fileNumber, 0, buffer, offset + numBytes,fileNumber.length);
+ numBytes += fileNumber.length;
+ //copy bytelocationFieldName
+ System.arraycopy(bytelocationFieldName, 0, buffer, offset + numBytes,bytelocationFieldName.length);
+ numBytes += bytelocationFieldName.length;
+ //copy byte location value
+ System.arraycopy(byteLocation, 0, buffer, offset + numBytes,byteLocation.length);
+ numBytes += byteLocation.length;
+ //copy byte location field end
+ System.arraycopy(bytelocationValueEnd, 0, buffer, offset + numBytes,bytelocationValueEnd.length);
+ numBytes += bytelocationValueEnd.length;
+ //copy the actual adm instance
+ System.arraycopy(value.getBytes(), firstFieldLocation, buffer, offset + numBytes,admValueSize - firstFieldLocation);
+ buffer[offset + numBytes + admValueSize - firstFieldLocation] = (byte) EOL;
+ numBytes += admValueSize - firstFieldLocation +1;
+ }
+ }
+ }
+ return numBytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new NotImplementedException("Use read(byte[], int, int)");
+ }
+
+ private RecordReader getRecordReader(int slitIndex) throws IOException {
+ if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+ SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
+ } else {
+ TextInputFormat format = (TextInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
+ }
+ }
+
+ };
+ }
+ else
+ {
+ throw new IOException("Can't index " +configuration.get(KEY_FORMAT)+" input");
+ }
}
}
-
}
@Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveIndexingAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveIndexingAdapter.java
index 552e5ab..178b106 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveIndexingAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveIndexingAdapter.java
@@ -28,8 +28,8 @@
private HDFSIndexingAdapter hdfsIndexingAdapter;
public HiveIndexingAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
- AlgebricksPartitionConstraint clusterLocations) {
- this.hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
+ AlgebricksPartitionConstraint clusterLocations, Map<String,Integer> files) {
+ this.hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations, files);
this.atype = atype;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/ExternalDataFilesMetadataProvider.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/ExternalDataFilesMetadataProvider.java
new file mode 100644
index 0000000..47550a4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/ExternalDataFilesMetadataProvider.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.asterix.external.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.AbstractDatasourceAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
+
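+/**
+ * Lists the HDFS files backing an external dataset by walking every path in the adapter's
+ * comma-separated KEY_PATH configuration; used when collecting file metadata for external indexes.
+ */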
+public class ExternalDataFilesMetadataProvider {
+ public static ArrayList<FileStatus> getHDFSFileStatus(AbstractDatasourceAdapter adapter) throws IOException
+ {
+ ArrayList<FileStatus> files = new ArrayList<FileStatus>();
+ //Configure hadoop connection
+ Configuration conf = HDFSAdapterFactory.configureHadoopConnection(adapter.getConfiguration());
+ FileSystem fs = FileSystem.get(conf);
+ //get the list of paths from the adapter
+ StringTokenizer tokenizer = new StringTokenizer(((String)adapter.getConfiguration().get(HDFSAdapter.KEY_PATH)),",");
+ Path inputPath = null;
+ FileStatus[] fileStatuses;
+ while(tokenizer.hasMoreTokens())
+ {
+ inputPath = new Path(tokenizer.nextToken().trim());
+ fileStatuses = fs.listStatus(inputPath);
+ for(int i=0; i < fileStatuses.length; i++)
+ {
+ files.add(fileStatuses[i]);
+ }
+ }
+ return files;
+ }
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 8f0eedb..2de5d78 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -31,6 +31,7 @@
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.Node;
@@ -268,6 +269,39 @@
}
return dataset;
}
+
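+ //external-file operations: thin wrappers that delegate to the metadata node and
+ //rewrap RemoteException as MetadataException, like the other entity methods above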
+ @Override
+ public List<ExternalFile> getDatasetExternalFiles(
+ MetadataTransactionContext mdTxnCtx, Dataset dataset)
+ throws MetadataException {
+ List<ExternalFile> externalFiles;
+ try {
+ externalFiles = metadataNode.getExternalDatasetFiles(mdTxnCtx.getJobId(), dataset);
+ } catch (RemoteException e) {
+ throw new MetadataException(e);
+ }
+ return externalFiles;
+ }
+
+ @Override
+ public void addExternalFile(MetadataTransactionContext mdTxnCtx,
+ ExternalFile externalFile) throws MetadataException {
+ try {
+ metadataNode.addExternalDatasetFile(mdTxnCtx.getJobId(), externalFile);
+ } catch (RemoteException e) {
+ throw new MetadataException(e);
+ }
+ }
+
+ @Override
+ public void dropExternalFile(MetadataTransactionContext mdTxnCtx,
+ ExternalFile externalFile) throws MetadataException {
+ try {
+ metadataNode.dropExternalFile(mdTxnCtx.getJobId(), externalFile.getDataverseName(), externalFile.getDatasetName(), externalFile.getFileNumber());
+ } catch (RemoteException e) {
+ throw new MetadataException(e);
+ }
+ }
@Override
public List<Index> getDatasetIndexes(MetadataTransactionContext ctx, String dataverseName, String datasetName)
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 14a0695..d9fed5b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -43,6 +43,7 @@
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -52,6 +53,7 @@
import edu.uci.ics.asterix.metadata.entitytupletranslators.DatasourceAdapterTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.DatatypeTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.DataverseTupleTranslator;
+import edu.uci.ics.asterix.metadata.entitytupletranslators.ExternalFileTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.FunctionTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.IndexTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeGroupTupleTranslator;
@@ -61,6 +63,7 @@
import edu.uci.ics.asterix.metadata.valueextractors.MetadataEntityValueExtractor;
import edu.uci.ics.asterix.metadata.valueextractors.NestedDatatypeNameValueExtractor;
import edu.uci.ics.asterix.metadata.valueextractors.TupleCopyValueExtractor;
+import edu.uci.ics.asterix.om.base.AInt32;
import edu.uci.ics.asterix.om.base.AMutableString;
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.types.BuiltinType;
@@ -1153,6 +1156,113 @@
throw new MetadataException(e);
}
}
+
+ @Override
+ public void addExternalDatasetFile(JobId jobId, ExternalFile externalFile)
+ throws MetadataException, RemoteException {
+ try {
+ // Insert into the 'externalFiles' dataset.
+ ExternalFileTupleTranslator tupleReaderWriter = new ExternalFileTupleTranslator(true);
+ ITupleReference externalFileTuple = tupleReaderWriter.getTupleFromMetadataEntity(externalFile);
+ insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, externalFileTuple);
+ } catch (TreeIndexDuplicateKeyException e) {
+ throw new MetadataException("An externalFile with this number " + externalFile.getFileNumber()
+ + " already exists in dataset '" + externalFile.getDatasetName() + "' in dataverse '"+externalFile.getDataverseName()+"'.", e);
+ } catch (Exception e) {
+ throw new MetadataException(e);
+ }
+ }
+
+ @Override
+ public List<ExternalFile> getExternalDatasetFiles(JobId jobId,
+ Dataset dataset) throws MetadataException, RemoteException {
+ try {
+ ITupleReference searchKey = createTuple(dataset.getDataverseName(),dataset.getDatasetName());
+ ExternalFileTupleTranslator tupleReaderWriter = new ExternalFileTupleTranslator(false);
+ IValueExtractor<ExternalFile> valueExtractor = new MetadataEntityValueExtractor<ExternalFile>(
+ tupleReaderWriter);
+ List<ExternalFile> results = new ArrayList<ExternalFile>();
+ searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey, valueExtractor, results);
+ return results;
+ } catch (Exception e) {
+ throw new MetadataException(e);
+ }
+ }
+
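+ //builds the (DataverseName, DatasetName, FileNumber) key tuple used to probe the
+ //ExternalFile metadata index when looking up or deleting a single file entry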
+ @SuppressWarnings("unchecked")
+ public ITupleReference createExternalFileSearchTuple(String dataverseName, String datasetName, int fileNumber) throws HyracksDataException {
+ ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ASTRING);
+ ISerializerDeserializer<AInt32> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+
+ AMutableString aString = new AMutableString("");
+ ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(3);
+
+ //dataverse field
+ aString.setValue(dataverseName);
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ //dataset field
+ aString.setValue(datasetName);
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ //file number field
+ intSerde.serialize(new AInt32(fileNumber), tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ ArrayTupleReference tuple = new ArrayTupleReference();
+ tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ return tuple;
+ }
+
+ public ExternalFile getExternalDatasetFile(JobId jobId,String dataverseName, String datasetName,
+ int fileNumber) throws MetadataException, RemoteException {
+ try {
+ //create the search key
+ ITupleReference searchKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber);
+ ExternalFileTupleTranslator tupleReaderWriter = new ExternalFileTupleTranslator(false);
+ IValueExtractor<ExternalFile> valueExtractor = new MetadataEntityValueExtractor<ExternalFile>(
+ tupleReaderWriter);
+ List<ExternalFile> results = new ArrayList<ExternalFile>();
+ searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey, valueExtractor, results);
+ return results.isEmpty() ? null : results.get(0);
+ } catch (Exception e) {
+ throw new MetadataException(e);
+ }
+ }
+
+ @Override
+ public void dropExternalFile(JobId jobId, String dataverseName,
+ String datasetName, int fileNumber) throws MetadataException,
+ RemoteException {
+ ExternalFile externalFile;
+ try {
+ externalFile = getExternalDatasetFile(jobId, dataverseName, datasetName,fileNumber);
+ } catch (Exception e) {
+ throw new MetadataException(e);
+ }
+ if (externalFile == null) {
+ throw new MetadataException("Cannot drop external file because it doesn't exist.");
+ }
+ try {
+ // Delete entry from the 'ExternalFile' dataset.
+ ITupleReference searchKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber);
+ // Searches the index for the tuple to be deleted. Acquires an S
+ // lock on the 'ExternalFile' dataset.
+ ITupleReference datasetTuple = getTupleToBeDeleted(jobId,
+ MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey);
+ deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, datasetTuple);
+
+ } catch (TreeIndexException e) {
+ throw new MetadataException("Couldn't drop externalFile.", e);
+ } catch (Exception e) {
+ throw new MetadataException(e);
+ }
+ }
+
@Override
public int getMostRecentDatasetId() throws MetadataException, RemoteException {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 22c5e46..53f72dd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -26,6 +26,7 @@
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.Node;
@@ -439,6 +440,36 @@
*/
public List<Function> getDataverseFunctions(MetadataTransactionContext ctx, String dataverseName)
throws MetadataException;
+
+ /**
+ * @param mdTxnCtx
+ * MetadataTransactionContext of an active metadata transaction.
+ * @param externalFile
+ * An instance of type ExternalFile that represents the external file being
+ * added
+ * @throws MetadataException
+ */
+ public void addExternalFile(MetadataTransactionContext mdTxnCtx, ExternalFile externalFile) throws MetadataException;
+
+ /**
+ * @param mdTxnCtx
+ * MetadataTransactionContext of an active metadata transaction.
+ * @param dataset
+ * An instance of type Dataset that represents the "external" dataset
+ * @return A list of external files belonging to the dataset
+ * @throws MetadataException
+ */
+ public List<ExternalFile> getDatasetExternalFiles(MetadataTransactionContext mdTxnCtx, Dataset dataset) throws MetadataException;
+
+ /**
+ * @param mdTxnCtx
+ * MetadataTransactionContext of an active metadata transaction.
+ * @param externalFile
+ * An instance of type ExternalFile that represents the external file being
+ * dropped
+ * @throws MetadataException
+ */
+ public void dropExternalFile(MetadataTransactionContext mdTxnCtx, ExternalFile externalFile) throws MetadataException;
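+
+ //typical flow (sketch): addExternalFile registers files when an external index is built,
+ //getDatasetExternalFiles lists them when compiling jobs over the index, and dropExternalFile
+ //removes them when the corresponding index or dataset goes away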
public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
@@ -453,4 +484,5 @@
public void releaseReadLatch();
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index d1e63e1..206ef8a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -28,6 +28,7 @@
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.Node;
@@ -471,6 +472,45 @@
* @throws RemoteException
*/
public void addAdapter(JobId jobId, DatasourceAdapter adapter) throws MetadataException, RemoteException;
+
+ /**
+ * @param jobId
+ * A globally unique id for an active metadata transaction.
+ * @param externalFile
+ * An object representing the external file entity
+ * @throws MetadataException
+ * for example, if the file already exists.
+ * @throws RemoteException
+ */
+ public void addExternalDatasetFile(JobId jobId, ExternalFile externalFile) throws MetadataException, RemoteException;
+
+ /**
+ * @param jobId
+ * A globally unique id for an active metadata transaction.
+ * @param dataset
+ * The dataset that the files belong to.
+ * @throws MetadataException
+ * @throws RemoteException
+ */
+ public List<ExternalFile> getExternalDatasetFiles(JobId jobId, Dataset dataset) throws MetadataException, RemoteException;
+
+ /**
+ * Deletes an external file, acquiring local locks on behalf of the given
+ * transaction id.
+ *
+ * @param jobId
+ * A globally unique id for an active metadata transaction.
+ * @param dataverseName
+ * Dataverse associated with the external dataset that owns the file to be deleted.
+ * @param datasetName
+ * Name of dataset owning the file to be deleted.
+ * @param fileNumber
+ * the id number for the file to be deleted
+ * @throws RemoteException
+ */
+ public void dropExternalFile(JobId jobId, String dataverseName, String datasetName, int fileNumber) throws MetadataException,
+ RemoteException;
public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index aa976f8..9d795f5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -118,7 +118,7 @@
MetadataPrimaryIndexes.DATASET_DATASET, MetadataPrimaryIndexes.DATATYPE_DATASET,
MetadataPrimaryIndexes.INDEX_DATASET, MetadataPrimaryIndexes.NODE_DATASET,
MetadataPrimaryIndexes.NODEGROUP_DATASET, MetadataPrimaryIndexes.FUNCTION_DATASET,
- MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET };
+ MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET };
secondaryIndexes = new IMetadataIndex[] { MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX,
MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX,
MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX };
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 8bdd92b..100ec40 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -41,9 +41,11 @@
public static final int NODEGROUP_DATASET_ID = 6;
public static final int FUNCTION_DATASET_ID = 7;
public static final int DATASOURCE_ADAPTER_DATASET_ID = 8;
+ public static final int EXTERNAL_FILE_DATASET_ID = 9;
public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
public static IMetadataIndex DATASOURCE_ADAPTER_DATASET;
+ public static IMetadataIndex EXTERNAL_FILE_DATASET;
/**
* Create all metadata primary index descriptors. MetadataRecordTypes must
@@ -92,5 +94,11 @@
BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName", "Name" }, 0,
MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, DATASOURCE_ADAPTER_DATASET_ID, true, new int[] { 0,
1 });
+
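+ // primary metadata index for external files, keyed on (DataverseName, DatasetName, FileNumber)
+ // over ExternalFileRecordType records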
+ EXTERNAL_FILE_DATASET = new MetadataIndex("ExternalFile", null, 4, new IAType[] {
+ BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, new String[] { "DataverseName", "DatasetName",
+ "FileNumber"}, 0,
+ MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE, EXTERNAL_FILE_DATASET_ID, true, new int[] { 0,
+ 1, 2 });
}
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index d3eee76..11f9c91 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -47,6 +47,7 @@
public static ARecordType NODEGROUP_RECORDTYPE;
public static ARecordType FUNCTION_RECORDTYPE;
public static ARecordType DATASOURCE_ADAPTER_RECORDTYPE;
+ public static ARecordType EXTERNAL_FILE_RECORDTYPE;
/**
* Create all metadata record types.
@@ -76,6 +77,7 @@
NODEGROUP_RECORDTYPE = createNodeGroupRecordType();
FUNCTION_RECORDTYPE = createFunctionRecordType();
DATASOURCE_ADAPTER_RECORDTYPE = createDatasourceAdapterRecordType();
+ EXTERNAL_FILE_RECORDTYPE = createExternalFileRecordType();
} catch (AsterixException e) {
throw new MetadataException(e);
}
@@ -357,5 +359,19 @@
BuiltinType.ASTRING };
return new ARecordType("DatasourceAdapterRecordType", fieldNames, fieldTypes, true);
}
+
+ public static final int EXTERNAL_FILE_ARECORD_DATAVERSENAME_FIELD_INDEX = 0;
+ public static final int EXTERNAL_FILE_ARECORD_DATASET_NAME_FIELD_INDEX = 1;
+ public static final int EXTERNAL_FILE_ARECORD_FILE_NUMBER_FIELD_INDEX = 2;
+ public static final int EXTERNAL_FILE_ARECORD_FILE_NAME_FIELD_INDEX = 3;
+ public static final int EXTERNAL_FILE_ARECORD_FILE_SIZE_FIELD_INDEX = 4;
+ public static final int EXTERNAL_FILE_ARECORD_FILE_MOD_DATE_FIELD_INDEX = 5;
+
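+ //record layout persisted per external file:
+ //(DataverseName: string, DatasetName: string, FileNumber: int32,
+ // FileName: string, FileSize: int64, FileModDate: datetime)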
+ private static ARecordType createExternalFileRecordType() throws AsterixException {
+ String[] fieldNames = { "DataverseName", "DatasetName", "FileNumber", "FileName", "FileSize", "FileModDate"};
+ IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32, BuiltinType.ASTRING, BuiltinType.AINT64,
+ BuiltinType.ADATETIME};
+ return new ARecordType("ExternalFileRecordType", fieldNames, fieldTypes, true);
+ }
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 98320ce..d37bf8f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -18,11 +18,14 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
+import org.apache.hadoop.fs.FileStatus;
+
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
@@ -46,10 +49,12 @@
import edu.uci.ics.asterix.external.data.operator.ExternalDataScanOperatorDescriptor;
import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
import edu.uci.ics.asterix.external.data.operator.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.external.dataset.adapter.AbstractDatasourceAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
+import edu.uci.ics.asterix.external.util.ExternalDataFilesMetadataProvider;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
@@ -63,6 +68,7 @@
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -158,8 +164,9 @@
private boolean asyncResults;
private ResultSetId resultSetId;
private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
+ private static boolean optimizeExternalIndexes = false;
- private final Dataverse defaultDataverse;
+ private final Dataverse defaultDataverse;
private JobId jobId;
private final AsterixStorageProperties storageProperties;
@@ -167,22 +174,6 @@
private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
private static Scheduler hdfsScheduler;
- public String getPropertyValue(String propertyName) {
- return config.get(propertyName);
- }
-
- public void setConfig(Map<String, String> config) {
- this.config = config;
- }
-
- public Map<String, String[]> getAllStores() {
- return stores;
- }
-
- public Map<String, String> getConfig() {
- return config;
- }
-
public AqlMetadataProvider(Dataverse defaultDataverse) {
this.defaultDataverse = defaultDataverse;
this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
@@ -262,6 +253,30 @@
public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
return resultSerializerFactoryProvider;
}
+
+ public String getPropertyValue(String propertyName) {
+ return config.get(propertyName);
+ }
+
+ public void setConfig(Map<String, String> config) {
+ this.config = config;
+ }
+
+ public Map<String, String[]> getAllStores() {
+ return stores;
+ }
+
+ public Map<String, String> getConfig() {
+ return config;
+ }
+
+ public static boolean isOptimizeExternalIndexes() {
+ return optimizeExternalIndexes;
+ }
+
+ public static void setOptimizeExternalIndexes(boolean optimizeExternalIndexes) {
+ AqlMetadataProvider.optimizeExternalIndexes = optimizeExternalIndexes;
+ }
@Override
public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
@@ -403,13 +418,93 @@
@SuppressWarnings("rawtypes")
public Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataIndexingRuntime(
- JobSpecification jobSpec, IAType itemType, ExternalDatasetDetails datasetDetails, IDataFormat format)
+ JobSpecification jobSpec, IAType itemType, Dataset dataset, IDataFormat format)
throws AlgebricksException {
IGenericDatasetAdapterFactory adapterFactory;
IDatasourceAdapter adapter;
String adapterName;
DatasourceAdapter adapterEntity;
String adapterFactoryClassname;
+ ExternalDatasetDetails datasetDetails = null;
+ try {
+ datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ adapterName = datasetDetails.getAdapter();
+ adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+ adapterName);
+ if (adapterEntity != null) {
+ adapterFactoryClassname = adapterEntity.getClassname();
+ adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ } else {
+ adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
+ if (adapterFactoryClassname == null) {
+ throw new AlgebricksException(" Unknown adapter :" + adapterName);
+ }
+ adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ }
+
+ adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createIndexingAdapter(
+ wrapProperties(datasetDetails.getProperties()), itemType, null);
+ } catch (AlgebricksException ae) {
+ throw ae;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Unable to create adapter " + e);
+ }
+ if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
+ IDatasourceAdapter.AdapterType.READ_WRITE))) {
+ throw new AlgebricksException("external dataset adapter does not support read operation");
+ }
+ ARecordType rt = (ARecordType) itemType;
+ ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+ RecordDescriptor indexerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+ ExternalDataIndexingOperatorDescriptor dataIndexScanner = null;
+ List<ExternalFile> files = null;
+ HashMap<String, Integer> filesNumbers = null;
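+ //when external index optimization is on, map each file path to its metadata-assigned
+ //file number so the indexing operator can emit compact numeric RIDs instead of full paths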
+ if(optimizeExternalIndexes)
+ {
+ try {
+ files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Unable to get list of external files from metadata " + e);
+ }
+
+ filesNumbers = new HashMap<String,Integer>();
+ for(int i=0; i< files.size(); i++)
+ {
+ filesNumbers.put(files.get(i).getFileName(), files.get(i).getFileNumber());
+ }
+
+ dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
+ wrapPropertiesEmpty(datasetDetails.getProperties()), rt, indexerDesc, adapterFactory,filesNumbers);
+ }
+ else
+ {
+ dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
+ wrapPropertiesEmpty(datasetDetails.getProperties()), rt, indexerDesc, adapterFactory,filesNumbers);
+ }
+ AlgebricksPartitionConstraint constraint;
+ try {
+ constraint = adapter.getPartitionConstraint();
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ return new Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint>(dataIndexScanner, constraint);
+ }
+
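+ //lists the HDFS files behind an external dataset and wraps each one as an ExternalFile
+ //entity, assigning file numbers by position in the listing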
+ public ArrayList<ExternalFile> getExternalDatasetFiles(Dataset dataset) throws AlgebricksException
+ {
+ ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
+ if(dataset.getDatasetType() != DatasetType.EXTERNAL)
+ {
+ throw new AlgebricksException("Can only get external dataset files");
+ }
+ ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails)dataset.getDatasetDetails();
+ IGenericDatasetAdapterFactory adapterFactory;
+ IDatasourceAdapter adapter;
+ String adapterName;
+ DatasourceAdapter adapterEntity;
+ String adapterFactoryClassname;
try {
adapterName = datasetDetails.getAdapter();
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
@@ -425,30 +520,28 @@
adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
}
- adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createIndexingAdapter(
- wrapProperties(datasetDetails.getProperties()), itemType);
- } catch (AlgebricksException ae) {
- throw ae;
- } catch (Exception e) {
+ adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+ wrapProperties(datasetDetails.getProperties()), null);
+ }
+ catch (Exception e) {
e.printStackTrace();
throw new AlgebricksException("Unable to create adapter " + e);
}
- if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
- IDatasourceAdapter.AdapterType.READ_WRITE))) {
- throw new AlgebricksException("external dataset adapter does not support read operation");
- }
- ARecordType rt = (ARecordType) itemType;
- ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
- RecordDescriptor recordDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
- ExternalDataIndexingOperatorDescriptor dataIndexScanner = new ExternalDataIndexingOperatorDescriptor(jobSpec,
- wrapPropertiesEmpty(datasetDetails.getProperties()), rt, recordDesc, adapterFactory);
- AlgebricksPartitionConstraint constraint;
+
try {
- constraint = adapter.getPartitionConstraint();
- } catch (Exception e) {
- throw new AlgebricksException(e);
+ ArrayList<FileStatus> fileStatuses = ExternalDataFilesMetadataProvider.getHDFSFileStatus((AbstractDatasourceAdapter) adapter);
+ for(int i=0; i<fileStatuses.size(); i++)
+ {
+ files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), new Date(fileStatuses.get(i).getModificationTime()),
+ fileStatuses.get(i).getLen(),
+ fileStatuses.get(i).getPath().toUri().getPath(),
+ i));
+ }
+ return files;
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Unable to get list of HDFS files " + e);
}
- return new Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint>(dataIndexScanner, constraint);
}
@SuppressWarnings("rawtypes")
@@ -503,10 +596,33 @@
IDataFormat format = NonTaggedDataFormat.INSTANCE;
ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
RecordDescriptor outRecDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
- ExternalDataAccessByRIDOperatorDescriptor dataAccessor = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec, wrapPropertiesEmpty(datasetDetails.getProperties()),
- itemType, outRecDesc, adapterFactory);
+
+ ExternalDataAccessByRIDOperatorDescriptor dataAccessOperator = null;
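+ //in the optimized path the secondary index stores file numbers, so build the reverse map
+ //(file number -> file name) that the access-by-RID operator needs to reopen the source files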
+ if(optimizeExternalIndexes)
+ {
+ //create the hashmap
+ List<ExternalFile> files=null;
+ try {
+ files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Couldn't get file names for access by optimized RIDs",e);
+ }
+ HashMap<Integer, String> filesMapping = new HashMap<Integer, String>();
+ for(int i=0; i < files.size(); i++)
+ {
+ filesMapping.put(files.get(i).getFileNumber(), files.get(i).getFileName());
+ }
+ dataAccessOperator = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec, wrapPropertiesEmpty(datasetDetails.getProperties()),
+ itemType, outRecDesc, adapterFactory, filesMapping);
+ }
+ else
+ {
+ dataAccessOperator = new ExternalDataAccessByRIDOperatorDescriptor(jobSpec, wrapPropertiesEmpty(datasetDetails.getProperties()),
+ itemType, outRecDesc, adapterFactory, null);
+ }
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = splitProviderAndPartitionConstraintsForExternalDataset(dataset.getDataverseName(),dataset.getDatasetName(),secondaryIndex.getIndexName());
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataAccessor, splitsAndConstraints.second);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataAccessOperator, splitsAndConstraints.second);
}
@SuppressWarnings("rawtypes")
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/ExternalFile.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/ExternalFile.java
new file mode 100644
index 0000000..0128783
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/ExternalFile.java
@@ -0,0 +1,110 @@
+package edu.uci.ics.asterix.metadata.entities;
+
+import java.util.Date;
+
+import edu.uci.ics.asterix.metadata.MetadataCache;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+
+public class ExternalFile implements IMetadataEntity{
+
+ /**
+ * Metadata entity representing a single file of an external dataset.
+ * It is intended for use with indexes built over external data.
+ */
+ private static final long serialVersionUID = 1L;
+
+ private String dataverseName;
+ private String datasetName;
+ private Date lastModefiedTime;
+ private long size;
+ private String fileName;
+ private int fileNumber;
+
+
+ public ExternalFile(String dataverseName, String datasetName, Date lastModefiedTime, long size, String fileName,
+ int fileNumber) {
+ this.dataverseName = dataverseName;
+ this.datasetName = datasetName;
+ this.lastModefiedTime = lastModefiedTime;
+ this.size = size;
+ this.fileName = fileName;
+ this.fileNumber = fileNumber;
+ }
+
+ public String getDataverseName() {
+ return dataverseName;
+ }
+
+ public void setDataverseName(String dataverseName) {
+ this.dataverseName = dataverseName;
+ }
+
+ public String getDatasetName() {
+ return datasetName;
+ }
+
+ public void setDatasetName(String datasetName) {
+ this.datasetName = datasetName;
+ }
+ public Date getLastModefiedTime() {
+ return lastModefiedTime;
+ }
+
+ public void setLastModefiedTime(Date lastModefiedTime) {
+ this.lastModefiedTime = lastModefiedTime;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public void setSize(long size) {
+ this.size = size;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ public int getFileNumber() {
+ return fileNumber;
+ }
+
+ public void setFileNumber(int fileNumber) {
+ this.fileNumber = fileNumber;
+ }
+
+ @Override
+ public Object addToCache(MetadataCache cache) {
+ //return cache.addExternalFileIfNotExists(this);
+ return null;
+ }
+
+ @Override
+ public Object dropFromCache(MetadataCache cache) {
+ //cache.dropExternalFile(this);
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj == null)
+ return false;
+ if (obj == this)
+ return true;
+ if (!(obj instanceof ExternalFile))
+ return false;
+ ExternalFile anotherFile = (ExternalFile) obj;
+ if(fileNumber != anotherFile.fileNumber)
+ return false;
+ if(!fileName.equals(anotherFile.fileName))
+ return false;
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ //keep the contract consistent with equals(): same file number and file name, same hash
+ return 31 * fileNumber + fileName.hashCode();
+ }
+
+}
\ No newline at end of file
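Illustrative sketch (not part of the patch): how consecutive file numbers could be assigned to an external dataset's files, which is the mapping the optimization relies on. This assumes a local java.io.File listing standing in for the HDFS enumeration the patch performs elsewhere; the ExternalFileLister class and listDatasetFiles helper are hypothetical.

    import java.io.File;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;

    import edu.uci.ics.asterix.metadata.entities.ExternalFile;

    public class ExternalFileLister {
        // Hypothetical helper: enumerate a dataset directory and assign consecutive file numbers.
        public static List<ExternalFile> listDatasetFiles(String dataverseName, String datasetName, File directory) {
            List<ExternalFile> files = new ArrayList<ExternalFile>();
            File[] entries = directory.listFiles();
            if (entries == null) {
                return files;
            }
            int fileNumber = 0;
            for (File f : entries) {
                // optimized indexes store this number in the RID instead of the full file name
                files.add(new ExternalFile(dataverseName, datasetName, new Date(f.lastModified()), f.length(),
                        f.getAbsolutePath(), fileNumber++));
            }
            return files;
        }
    }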
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/ExternalFileTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/ExternalFileTupleTranslator.java
new file mode 100644
index 0000000..6837c72
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/ExternalFileTupleTranslator.java
@@ -0,0 +1,158 @@
+package edu.uci.ics.asterix.metadata.entitytupletranslators;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Date;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
+import edu.uci.ics.asterix.metadata.entities.ExternalFile;
+import edu.uci.ics.asterix.om.base.ADateTime;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AMutableDateTime;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.ARecord;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class ExternalFileTupleTranslator extends AbstractTupleTranslator<ExternalFile>{
+
+ // Field indexes of serialized ExternalFile in a tuple.
+ // First key field.
+ public static final int EXTERNAL_FILE_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
+ // Second key field.
+ public static final int EXTERNAL_FILE_DATASETNAME_TUPLE_FIELD_INDEX = 1;
+ // Third key field
+ public static final int EXTERNAL_FILE_NUMBER_TUPLE_FIELD_INDEX = 2;
+ // Payload field containing serialized ExternalFile.
+ public static final int EXTERNAL_FILE_PAYLOAD_TUPLE_FIELD_INDEX = 3;
+
+ protected AMutableInt32 aInt32 = new AMutableInt32(0);
+ protected AMutableDateTime aDateTime = new AMutableDateTime(0);
+ protected AMutableInt64 aInt64 = new AMutableInt64(0);
+
+ @SuppressWarnings("unchecked")
+ protected ISerializerDeserializer<AInt32> intSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+ @SuppressWarnings("unchecked")
+ protected ISerializerDeserializer<ADateTime> dateTimeSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADATETIME);
+ @SuppressWarnings("unchecked")
+ protected ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE);
+
+ public ExternalFileTupleTranslator(boolean getTuple) {
+ super(getTuple, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET.getFieldCount());
+ }
+
+ @Override
+ public ExternalFile getMetadataEntytiFromTuple(ITupleReference tuple)
+ throws MetadataException, IOException {
+ byte[] serRecord = tuple.getFieldData(EXTERNAL_FILE_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordStartOffset = tuple.getFieldStart(EXTERNAL_FILE_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordLength = tuple.getFieldLength(EXTERNAL_FILE_PAYLOAD_TUPLE_FIELD_INDEX);
+ ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+ DataInput in = new DataInputStream(stream);
+ ARecord externalFileRecord = (ARecord) recordSerDes.deserialize(in);
+ return createExternalFileFromARecord(externalFileRecord);
+ }
+
+ private ExternalFile createExternalFileFromARecord(ARecord externalFileRecord) {
+ String dataverseName = ((AString) externalFileRecord
+ .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_DATAVERSENAME_FIELD_INDEX))
+ .getStringValue();
+ String datasetName = ((AString) externalFileRecord
+ .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_DATASET_NAME_FIELD_INDEX)).getStringValue();
+ String fileName = ((AString) externalFileRecord
+ .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_NAME_FIELD_INDEX)).getStringValue();
+ int fileNumber = ((AInt32) externalFileRecord
+ .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_NUMBER_FIELD_INDEX)).getIntegerValue();
+ Date lastModifiedDate = new Date(((ADateTime) externalFileRecord
+ .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_MOD_DATE_FIELD_INDEX)).getChrononTime());
+ long fileSize = ((AInt64) externalFileRecord
+ .getValueByPos(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_SIZE_FIELD_INDEX)).getLongValue();
+
+ return new ExternalFile(dataverseName, datasetName, lastModifiedDate, fileSize, fileName, fileNumber);
+ }
+
+ @Override
+ public ITupleReference getTupleFromMetadataEntity(
+ ExternalFile externalFile) throws MetadataException, IOException {
+ // write the key in the first 3 fields of the tuple
+ tupleBuilder.reset();
+ //dataverse name
+ aString.setValue(externalFile.getDataverseName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+ //dataset name
+ aString.setValue(externalFile.getDatasetName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+ //file number
+ aInt32.setValue(externalFile.getFileNumber());
+ intSerde.serialize(aInt32, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ // write the pay-load in the fourth field of the tuple
+ recordBuilder.reset(MetadataRecordTypes.EXTERNAL_FILE_RECORDTYPE);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(externalFile.getDataverseName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_DATAVERSENAME_FIELD_INDEX, fieldValue);
+
+ // write field 1
+ fieldValue.reset();
+ aString.setValue(externalFile.getDatasetName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_DATASET_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 2
+ fieldValue.reset();
+ aInt32.setValue(externalFile.getFileNumber());
+ intSerde.serialize(aInt32, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_NUMBER_FIELD_INDEX, fieldValue);
+
+ // write field 3
+ fieldValue.reset();
+ aString.setValue(externalFile.getFileName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 4
+ fieldValue.reset();
+ aInt64.setValue(externalFile.getSize());
+ longSerde.serialize(aInt64, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_SIZE_FIELD_INDEX, fieldValue);
+
+ //write field 5
+ fieldValue.reset();
+ aDateTime.setValue(externalFile.getLastModefiedTime().getTime());
+ dateTimeSerde.serialize(aDateTime, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.EXTERNAL_FILE_ARECORD_FILE_MOD_DATE_FIELD_INDEX, fieldValue);
+
+ // write record
+ try {
+ recordBuilder.write(tupleBuilder.getDataOutput(), true);
+ } catch (AsterixException e) {
+ throw new MetadataException(e);
+ }
+ tupleBuilder.addFieldEndOffset();
+
+ tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ return tuple;
+ }
+}
\ No newline at end of file
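Illustrative sketch (not part of the patch): round-tripping an ExternalFile through the translator, mirroring how records are written to and read back from the ExternalFile metadata dataset. The example values are made up, and constructing the translator assumes the metadata record types have already been bootstrapped; it is not a standalone program you can run against an empty JVM.

    import java.util.Date;

    import edu.uci.ics.asterix.metadata.entities.ExternalFile;
    import edu.uci.ics.asterix.metadata.entitytupletranslators.ExternalFileTupleTranslator;
    import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;

    public class ExternalFileTranslationExample {
        public static void main(String[] args) throws Exception {
            ExternalFileTupleTranslator translator = new ExternalFileTupleTranslator(true);
            ExternalFile original = new ExternalFile("ExternalDV", "RCDataset", new Date(), 1024L, "part-00000", 0);
            // entity -> tuple, as done when inserting into the ExternalFile metadata dataset
            ITupleReference tuple = translator.getTupleFromMetadataEntity(original);
            // tuple -> entity, as done when reading the metadata dataset back
            ExternalFile restored = translator.getMetadataEntytiFromTuple(tuple);
            System.out.println(original.equals(restored)); // expected: true
        }
    }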
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java
index 534c183..95d26d9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetUtils.java
@@ -20,6 +20,7 @@
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -96,18 +97,32 @@
ExternalDatasetDetails edd = (ExternalDatasetDetails) dataset.getDatasetDetails();
if (edd.getProperties().get(KEY_INPUT_FORMAT).trim().equals(INPUT_FORMAT_RC))
{
- //RID: <String(File name), Int64(Block byte location), Int32(row number)>
+ //RID: <String(File name) OR Int32(File number), Int64(Block byte location), Int32(row number)>
IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[3];
- bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.ASTRING);
+ if(AqlMetadataProvider.isOptimizeExternalIndexes())
+ {
+ bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.AINT32);
+ }
+ else
+ {
+ bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.ASTRING);
+ }
bhffs[1] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.AINT64);
bhffs[2] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.AINT32);
return bhffs;
}
else
{
- //RID: <String(File name), Int64(Record byte location)>
+ //RID: <String(File name) OR Int32(File number), Int64(Record byte location)>
IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[2];
- bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.ASTRING);
+ if(AqlMetadataProvider.isOptimizeExternalIndexes())
+ {
+ bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.AINT32);
+ }
+ else
+ {
+ bhffs[0] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.ASTRING);
+ }
bhffs[1] = hashFunProvider.getBinaryHashFunctionFactory(BuiltinType.AINT64);
return bhffs;
}
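Illustrative sketch (not part of the patch): the RID layouts implied by the hash-function selection above, for RC files versus other input formats, with and without the optimization. The ExternalRIDLayout class and ridFieldTypes helper are hypothetical; the field types themselves follow the comments in the hunk.

    import edu.uci.ics.asterix.om.types.BuiltinType;
    import edu.uci.ics.asterix.om.types.IAType;

    public class ExternalRIDLayout {
        // RC files: <file id, block byte location, row number>; other formats: <file id, record byte location>
        public static IAType[] ridFieldTypes(boolean rcInputFormat, boolean optimizeExternalIndexes) {
            IAType fileId = optimizeExternalIndexes ? BuiltinType.AINT32 : BuiltinType.ASTRING;
            if (rcInputFormat) {
                return new IAType[] { fileId, BuiltinType.AINT64, BuiltinType.AINT32 };
            }
            return new IAType[] { fileId, BuiltinType.AINT64 };
        }
    }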
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 23d4ef4..c576cf8 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.asterix.tools.external.data;
+import java.util.HashMap;
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -72,7 +73,7 @@
@Override
public IDatasourceAdapter createIndexingAdapter(
- Map<String, Object> configuration, IAType atype) throws Exception {
+ Map<String, Object> configuration, IAType atype, Map<String,Integer> files) throws Exception {
throw new NotImplementedException("Rate Controlled Indexing Adapter is not implemented for feeds");
}
@@ -93,7 +94,7 @@
@Override
public IControlledAdapter createAccessByRIDAdapter(
- Map<String, Object> configuration, IAType atype) throws Exception {
+ Map<String, Object> configuration, IAType atype, HashMap<Integer,String> files) throws Exception {
throw new NotImplementedException("Rate Controlled Access by RID Adapter is not implemented for feeds");
}