Small Cleanup
Change-Id: Ib527f2eee283faf75c04323b6961c31a6d739d07
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1145
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
index c8a9566..5df687a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
@@ -38,7 +38,6 @@
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.indexing.ExternalFile;
@@ -71,6 +70,7 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadata;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
@@ -90,6 +90,7 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
@@ -255,13 +256,15 @@
* @param files
* @param indexerDesc
* @return
- * @throws AsterixException
+ * @throws AlgebricksException
+ * @throws HyracksDataException
* @throws Exception
*/
private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint>
getExternalDataIndexingOperator(
AqlMetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset,
- List<ExternalFile> files, RecordDescriptor indexerDesc) throws AsterixException {
+ List<ExternalFile> files, RecordDescriptor indexerDesc)
+ throws HyracksDataException, AlgebricksException {
ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
Map<String, String> configuration = externalDatasetDetails.getProperties();
IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory(
@@ -273,7 +276,7 @@
public static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(
JobSpecification spec, AqlMetadataProvider metadataProvider, Dataset dataset, ARecordType itemType,
- RecordDescriptor indexerDesc, List<ExternalFile> files) throws AsterixException {
+ RecordDescriptor indexerDesc, List<ExternalFile> files) throws HyracksDataException, AlgebricksException {
if (files == null) {
files = MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index c7bed8d9..275825b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -22,7 +22,6 @@
import java.util.Map;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IDataFlowController;
@@ -45,6 +44,7 @@
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -75,7 +75,8 @@
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
+ throws HyracksDataException, AlgebricksException {
return dataSourceFactory.getPartitionConstraint();
}
@@ -89,7 +90,7 @@
.getApplicationContext().getApplicationObject();
try {
restoreExternalObjects(runtimeCtx.getLibraryManager());
- } catch (AsterixException e) {
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
if (isFeed) {
@@ -107,7 +108,8 @@
}
}
- private void restoreExternalObjects(ILibraryManager libraryManager) throws AsterixException {
+ private void restoreExternalObjects(ILibraryManager libraryManager)
+ throws HyracksDataException, AlgebricksException {
if (dataSourceFactory == null) {
dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(libraryManager, configuration);
// create and configure parser factory
@@ -126,7 +128,8 @@
}
@Override
- public void configure(ILibraryManager libraryManager, Map<String, String> configuration) throws AsterixException {
+ public void configure(ILibraryManager libraryManager, Map<String, String> configuration)
+ throws HyracksDataException, AlgebricksException {
this.configuration = configuration;
ExternalDataUtils.validateDataSourceParameters(configuration);
dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(libraryManager, configuration);
@@ -144,7 +147,7 @@
nullifyExternalObjects();
}
- private void configureFeedLogManager() throws AsterixException {
+ private void configureFeedLogManager() throws HyracksDataException, AlgebricksException {
this.isFeed = ExternalDataUtils.isFeed(configuration);
if (isFeed) {
feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index 01fcfc2..6b69d9c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -21,7 +21,6 @@
import java.io.Serializable;
import java.util.Map;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.api.ILookupReaderFactory;
import org.apache.asterix.external.api.ILookupRecordReader;
@@ -34,6 +33,7 @@
import org.apache.asterix.external.provider.LookupReaderFactoryProvider;
import org.apache.asterix.external.provider.ParserFactoryProvider;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
@@ -77,7 +77,8 @@
}
}
- public void configure(ILibraryManager libraryManager, Map<String, String> configuration) throws AsterixException {
+ public void configure(ILibraryManager libraryManager, Map<String, String> configuration)
+ throws HyracksDataException, AlgebricksException {
this.configuration = configuration;
readerFactory = LookupReaderFactoryProvider.getLookupReaderFactory(configuration);
dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(libraryManager,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 2d42ba9..3ea3bb1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -21,10 +21,10 @@
import java.io.Serializable;
import java.util.Map;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -52,8 +52,12 @@
* constraint can be expressed as a node IP address or a node controller id.
* In the former case, the IP address is translated to a node controller id
* running on the node with the given IP address.
+ *
+ * @throws AlgebricksException
+ * @throws HyracksDataException
*/
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException;
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
+ throws HyracksDataException, AlgebricksException;
/**
* Creates an instance of IDatasourceAdapter.
@@ -68,9 +72,11 @@
/**
* @param libraryManager
* @param configuration
- * @throws Exception
+ * @throws AlgebricksException
+ * @throws HyracksDataException
*/
- public void configure(ILibraryManager libraryManager, Map<String, String> configuration) throws AsterixException;
+ public void configure(ILibraryManager libraryManager, Map<String, String> configuration)
+ throws HyracksDataException, AlgebricksException;
public void setOutputType(ARecordType outputType);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index e4f21a6..6a237c6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -20,12 +20,16 @@
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.runtime.util.AsterixAppContextInfo;
import org.apache.asterix.runtime.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IExternalDataSourceFactory extends Serializable {
@@ -49,7 +53,8 @@
* @return
* @throws AsterixException
*/
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException;
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
+ throws AlgebricksException, HyracksDataException;
/**
* Configure the data parser factory. The passed map contains key value pairs from the
@@ -58,7 +63,7 @@
* @param configuration
* @throws AsterixException
*/
- public void configure(Map<String, String> configuration) throws AsterixException;
+ public void configure(Map<String, String> configuration) throws AlgebricksException, HyracksDataException;
/**
* Specify whether the external data source can be indexed
@@ -69,30 +74,42 @@
return false;
}
+ /**
+ * returns the passed partition constraints if not null, otherwise returns round robin absolute partition
+ * constraints that matches the count.
+ *
+ * @param constraints
+ * @param count
+ * @return
+ * @throws AlgebricksException
+ */
public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(
- AlgebricksAbsolutePartitionConstraint constraints, int count) {
+ AlgebricksAbsolutePartitionConstraint constraints, int count) throws AlgebricksException {
if (constraints == null) {
- ArrayList<String> locs = new ArrayList<String>();
- Map<String, String[]> stores = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getStores();
+ ArrayList<String> locs = new ArrayList<>();
+ Set<String> stores = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getStores().keySet();
+ if (stores.isEmpty()) {
+ throw new AlgebricksException("Configurations don't have any stores");
+ }
int i = 0;
- while (i < count) {
- for (String node : stores.keySet()) {
+ outer: while (i < count) {
+ Iterator<String> storeIt = stores.iterator();
+ while (storeIt.hasNext()) {
+ String node = storeIt.next();
int numIODevices = AsterixClusterProperties.INSTANCE.getIODevices(node).length;
for (int k = 0; k < numIODevices; k++) {
locs.add(node);
i++;
if (i == count) {
- break;
+ break outer;
}
}
- if (i == count) {
- break;
- }
+ }
+ if (i == 0) {
+ throw new AlgebricksException("All stores have 0 IO devices");
}
}
- String[] cluster = new String[locs.size()];
- cluster = locs.toArray(cluster);
- constraints = new AlgebricksAbsolutePartitionConstraint(cluster);
+ return new AlgebricksAbsolutePartitionConstraint(locs.toArray(new String[locs.size()]));
}
return constraints;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index f9eedd1..2ded3fb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -28,6 +28,7 @@
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -45,7 +46,7 @@
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
int count = urls.size();
clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, count);
return clusterLocations;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index f743a3f..4649559 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -20,13 +20,13 @@
import java.util.Map;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.provider.StreamRecordReaderProvider.Format;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -52,12 +52,13 @@
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
+ throws HyracksDataException, AlgebricksException {
return streamFactory.getPartitionConstraint();
}
@Override
- public void configure(Map<String, String> configuration) throws AsterixException {
+ public void configure(Map<String, String> configuration) throws HyracksDataException, AlgebricksException {
this.configuration = configuration;
streamFactory.configure(configuration);
format = StreamRecordReaderProvider.getReaderFormat(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 172b22b..73d1b39 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -31,11 +31,11 @@
import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import twitter4j.FilterQuery;
-import twitter4j.Status;
public class TwitterRecordReaderFactory implements IRecordReaderFactory<String> {
@@ -55,7 +55,7 @@
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, INTAKE_CARDINALITY);
return clusterLocations;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
index 1eb760e..8ab8ead 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
@@ -33,6 +33,7 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.http.impl.conn.SystemDefaultDnsResolver;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -44,7 +45,7 @@
private List<Pair<String, Integer>> sockets;
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, sockets.size());
return clusterLocations;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 4cc8e33..059de63 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -21,7 +21,6 @@
import java.util.List;
import java.util.Map;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
@@ -30,7 +29,9 @@
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* This class represents the entry point to all things adapters
@@ -39,7 +40,8 @@
// Adapters
public static IAdapterFactory getAdapterFactory(ILibraryManager libraryManager, String adapterName,
- Map<String, String> configuration, ARecordType itemType, ARecordType metaType) throws AsterixException {
+ Map<String, String> configuration, ARecordType itemType, ARecordType metaType)
+ throws HyracksDataException, AlgebricksException {
ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
adapterFactory.setOutputType(itemType);
@@ -51,7 +53,7 @@
// Indexing Adapters
public static IIndexingAdapterFactory getIndexingAdapterFactory(ILibraryManager libraryManager, String adapterName,
Map<String, String> configuration, ARecordType itemType, List<ExternalFile> snapshot, boolean indexingOp,
- ARecordType metaType) throws AsterixException {
+ ARecordType metaType) throws HyracksDataException, AlgebricksException {
ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
adapterFactory.setOutputType(itemType);
@@ -64,7 +66,8 @@
// Lookup Adapters
public static LookupAdapterFactory<?> getLookupAdapterFactory(ILibraryManager libraryManager,
Map<String, String> configuration, ARecordType recordType, int[] ridFields, boolean retainInput,
- boolean retainMissing, IMissingWriterFactory missingWriterFactory) throws AsterixException {
+ boolean retainMissing, IMissingWriterFactory missingWriterFactory)
+ throws HyracksDataException, AlgebricksException {
LookupAdapterFactory<?> adapterFactory = new LookupAdapterFactory<>(recordType, ridFields, retainInput,
retainMissing, missingWriterFactory);
adapterFactory.configure(libraryManager, configuration);
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
index 87c187a..3467411 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/RecordWithPKTestReaderFactory.java
@@ -25,6 +25,7 @@
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.record.RecordWithPK;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
public class RecordWithPKTestReaderFactory implements IRecordReaderFactory<RecordWithPK<char[]>> {
@@ -33,7 +34,7 @@
private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, 1);
return clusterLocations;
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index d45097a..20cb2bc 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -24,7 +24,6 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IDataSourceAdapter;
@@ -36,6 +35,7 @@
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -62,7 +62,7 @@
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
clusterLocations = IExternalDataSourceFactory.getPartitionConstraints(clusterLocations, 1);
return clusterLocations;
}
@@ -78,9 +78,11 @@
ADMDataParser parser;
ITupleForwarder forwarder;
ArrayTupleBuilder tb;
- IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) ((NodeControllerService) ctx
- .getJobletContext().getApplicationContext().getControllerService()).getApplicationContext()
- .getApplicationObject();
+ IAsterixPropertiesProvider propertiesProvider =
+ (IAsterixPropertiesProvider) ((NodeControllerService) ctx
+ .getJobletContext().getApplicationContext().getControllerService())
+ .getApplicationContext()
+ .getApplicationObject();
ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions()
.get(nodeId)[0];
parser = new ADMDataParser(outputType, true);
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
index a604315..c46b0be 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
@@ -19,6 +19,8 @@
package org.apache.asterix.lang.common.util;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import org.apache.asterix.builders.OrderedListBuilder;
@@ -39,6 +41,7 @@
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -68,7 +71,7 @@
parseLiteral((LiteralExpr) expr, serialized);
break;
case RECORD_CONSTRUCTOR_EXPRESSION:
- parseRecord((RecordConstructor) expr, serialized, true);
+ parseRecord((RecordConstructor) expr, serialized, true, Collections.emptyList());
break;
case LIST_CONSTRUCTOR_EXPRESSION:
parseList((ListConstructor) expr, serialized);
@@ -82,7 +85,8 @@
}
}
- public static void parseRecord(RecordConstructor recordValue, ArrayBackedValueStorage serialized, boolean tagged)
+ public static void parseRecord(RecordConstructor recordValue, ArrayBackedValueStorage serialized, boolean tagged,
+ List<Pair<String, String>> defaults)
throws HyracksDataException {
AMutableString fieldNameString = new AMutableString(null);
ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage();
@@ -91,30 +95,48 @@
recordBuilder.reset(ARecordType.FULLY_OPEN_RECORD_TYPE);
recordBuilder.init();
List<FieldBinding> fbList = recordValue.getFbList();
+ HashSet<String> fieldNames = new HashSet<>();
for (FieldBinding fb : fbList) {
fieldName.reset();
fieldValue.reset();
// get key
- Expression keyExpr = fb.getLeftExpr();
- if (keyExpr.getKind() != Expression.Kind.LITERAL_EXPRESSION) {
- throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.ERROR_PARSE_ERROR,
- "JSON key can only be of type %1$s", Expression.Kind.LITERAL_EXPRESSION);
+ fieldNameString.setValue(exprToStringLiteral(fb.getLeftExpr()).getStringValue());
+ if (!fieldNames.add(fieldNameString.getStringValue())) {
+ throw new HyracksDataException(
+ "Field " + fieldNameString.getStringValue() + " was specified multiple times");
}
- LiteralExpr keyLiteralExpr = (LiteralExpr) keyExpr;
- Literal keyLiteral = keyLiteralExpr.getValue();
- if (keyLiteral.getLiteralType() != Literal.Type.STRING) {
- throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.ERROR_PARSE_ERROR,
- "JSON key can only be of type %1$s", Literal.Type.STRING);
- }
- fieldNameString.setValue(keyLiteral.getStringValue());
stringSerde.serialize(fieldNameString, fieldName.getDataOutput());
// get value
parseExpression(fb.getRightExpr(), fieldValue);
recordBuilder.addField(fieldName, fieldValue);
}
+ // defaults
+ for (Pair<String, String> kv : defaults) {
+ if (!fieldNames.contains(kv.first)) {
+ fieldName.reset();
+ fieldValue.reset();
+ stringSerde.serialize(new AString(kv.first), fieldName.getDataOutput());
+ stringSerde.serialize(new AString(kv.second), fieldValue.getDataOutput());
+ recordBuilder.addField(fieldName, fieldValue);
+ }
+ }
recordBuilder.write(serialized.getDataOutput(), tagged);
}
+ public static Literal exprToStringLiteral(Expression expr) throws HyracksDataException {
+ if (expr.getKind() != Expression.Kind.LITERAL_EXPRESSION) {
+ throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.ERROR_PARSE_ERROR,
+ "Expected expression can only be of type %1$s", Expression.Kind.LITERAL_EXPRESSION);
+ }
+ LiteralExpr keyLiteralExpr = (LiteralExpr) expr;
+ Literal keyLiteral = keyLiteralExpr.getValue();
+ if (keyLiteral.getLiteralType() != Literal.Type.STRING) {
+ throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.ERROR_PARSE_ERROR,
+ "Expected Literal can only be of type %1$s", Literal.Type.STRING);
+ }
+ return keyLiteral;
+ }
+
private static void parseList(ListConstructor valueExpr, ArrayBackedValueStorage serialized)
throws HyracksDataException {
if (valueExpr.getType() != ListConstructor.Type.ORDERED_LIST_CONSTRUCTOR) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
index c69c89e..55c9d13 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
@@ -21,6 +21,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.List;
import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.RecordBuilder;
@@ -39,6 +40,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -210,6 +212,26 @@
confRecordBuilder.write(dataOutput, writeTypeTag);
}
+ @SuppressWarnings("unchecked")
+ public static void serializeSimpleSchemalessRecord(List<Pair<String, String>> record, DataOutput dataOutput,
+ boolean writeTypeTag)
+ throws HyracksDataException {
+ ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ASTRING);
+ RecordBuilder confRecordBuilder = new RecordBuilder();
+ confRecordBuilder.reset(ARecordType.FULLY_OPEN_RECORD_TYPE);
+ ArrayBackedValueStorage fieldNameBytes = new ArrayBackedValueStorage();
+ ArrayBackedValueStorage fieldValueBytes = new ArrayBackedValueStorage();
+ for (int i = 0; i < record.size(); i++) {
+ fieldValueBytes.reset();
+ fieldNameBytes.reset();
+ stringSerde.serialize(new AString(record.get(i).first), fieldNameBytes.getDataOutput());
+ stringSerde.serialize(new AString(record.get(i).second), fieldValueBytes.getDataOutput());
+ confRecordBuilder.addField(fieldNameBytes, fieldValueBytes);
+ }
+ confRecordBuilder.write(dataOutput, writeTypeTag);
+ }
+
private IAObject[] mergeFields(IAObject[] closedFields, IAObject[] openFields) {
IAObject[] fields = new IAObject[closedFields.length + openFields.length];
int i = 0;