Fix Number of LocalFS Readers
This change makes the number of readers in localfs
adapter always equals to one. This is a temporary
change and is done to avoid sporadic failures for
decorrelation with unique id.
Change-Id: Id948bfd2c1a4e79863378ed98fb27f662fffba93
Reviewed-on: https://asterix-gerrit.ics.uci.edu/782
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 08fce87..712ffbe 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -22,12 +22,12 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
@@ -38,7 +38,6 @@
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FileSystemWatcher;
import org.apache.asterix.external.util.NodeResolverFactory;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -128,19 +127,10 @@
}
private void configurePartitionConstraint() throws AsterixException {
- Map<String, ClusterPartition[]> partitions = AsterixAppContextInfo.getInstance().getMetadataProperties()
- .getNodePartitions();
- List<String> locs = new ArrayList<>();
+ Set<String> locs = new TreeSet<>();
for (int i = 0; i < inputFileSplits.length; i++) {
String location = inputFileSplits[i].getNodeName();
- if (!locs.contains(location)) {
- int numOfPartitions = partitions.get(location).length;
- int j = 0;
- while (j < numOfPartitions) {
- locs.add(location);
- j++;
- }
- }
+ locs.add(location);
}
constraints = new AlgebricksAbsolutePartitionConstraint(locs.toArray(new String[locs.size()]));
}