[NO ISSUE][OTH] Simplify ResultJobRecord APIs
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Simplify ResultJobRecord APIs by allowing only a single ResultSetId
per ResultJobRecord (i.e. per job).
- Fail result partition registration when a job attempts to use
multiple ResultSetIds or inconsistent number of partitions and
log the inconsistency.
- Delete test ReplicateOperatorTest which duplicates
the test PushRuntimeTest#scanReplicateWrite.
Change-Id: I37816efc92ee9f5e66f29ce74dec4c6c5bd07c6f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3432
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index b3b0706..02762ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -21,12 +21,12 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class ResultJobRecord implements IResultStateRecord {
@@ -77,16 +77,14 @@
}
private static final long serialVersionUID = 1L;
-
+ private static final Logger LOGGER = LogManager.getLogger();
private final long timestamp;
private long jobStartTime;
private long jobEndTime;
private Status status;
-
+ private ResultSetId rsId;
private ResultSetMetaData resultSetMetaData;
- private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>();
-
public ResultJobRecord() {
this.timestamp = System.nanoTime();
this.status = new Status();
@@ -116,10 +114,6 @@
updateState(State.SUCCESS);
}
- public void fail(ResultSetId rsId, int partition) {
- getOrCreateDirectoryRecord(rsId, partition).fail();
- }
-
public void fail(List<Exception> exceptions) {
updateState(State.FAILED);
status.setExceptions(exceptions);
@@ -139,47 +133,40 @@
StringBuilder sb = new StringBuilder();
sb.append("{ \"status\": ").append(status.toString()).append(", ");
sb.append("\"timestamp\": ").append(timestamp).append(", ");
- sb.append("\"resultsets\": ").append(Arrays.toString(resultSetMetadataMap.entrySet().toArray())).append(" }");
+ sb.append("\"resultset\": ").append(resultSetMetaData).append(" }");
return sb.toString();
}
public synchronized void setResultSetMetaData(ResultSetId rsId, IResultMetadata metadata, int nPartitions)
throws HyracksDataException {
- ResultSetMetaData rsMd = resultSetMetadataMap.get(rsId);
- if (rsMd == null) {
- final ResultSetMetaData resultSetMetaData = new ResultSetMetaData(nPartitions, metadata);
- resultSetMetadataMap.put(rsId, resultSetMetaData);
- this.resultSetMetaData = resultSetMetaData;
- } else if (rsMd.getRecords().length != nPartitions) {
- throw HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, rsId.toString());
+ if (this.rsId == null) {
+ this.rsId = rsId;
+ this.resultSetMetaData = new ResultSetMetaData(nPartitions, metadata);
+ } else if (!this.rsId.equals(rsId) || resultSetMetaData.getRecords().length != nPartitions) {
+ logInconsistentMetadata(rsId, nPartitions);
+ throw HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, this.rsId.toString());
}
- //TODO(tillw) throwing a HyracksDataException here hangs the execution tests
}
- public ResultSetMetaData getResultSetMetaData(ResultSetId rsId) {
- return resultSetMetadataMap.get(rsId);
- }
-
- public synchronized ResultDirectoryRecord getOrCreateDirectoryRecord(ResultSetId rsId, int partition) {
- ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+ public synchronized ResultDirectoryRecord getOrCreateDirectoryRecord(int partition) {
+ ResultDirectoryRecord[] records = resultSetMetaData.getRecords();
if (records[partition] == null) {
records[partition] = new ResultDirectoryRecord();
}
return records[partition];
}
- public synchronized ResultDirectoryRecord getDirectoryRecord(ResultSetId rsId, int partition)
- throws HyracksDataException {
- ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+ public synchronized ResultDirectoryRecord getDirectoryRecord(int partition) throws HyracksDataException {
+ ResultDirectoryRecord[] records = resultSetMetaData.getRecords();
if (records[partition] == null) {
throw HyracksDataException.create(ErrorCode.RESULT_NO_RECORD, partition, rsId);
}
return records[partition];
}
- public synchronized void updateState(ResultSetId rsId) {
+ public synchronized void updateState() {
int successCount = 0;
- ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+ ResultDirectoryRecord[] records = resultSetMetaData.getRecords();
for (ResultDirectoryRecord record : records) {
if ((record != null) && (record.getStatus() == ResultDirectoryRecord.Status.SUCCESS)) {
successCount++;
@@ -193,4 +180,18 @@
public synchronized ResultSetMetaData getResultSetMetaData() {
return resultSetMetaData;
}
+
+ private void logInconsistentMetadata(ResultSetId rsId, int nPartitions) {
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.warn("inconsistent result metadata for result set {}", this.rsId);
+ if (!this.rsId.equals(rsId)) {
+ LOGGER.warn("inconsistent result set id. Current {}, new {}", this.rsId, rsId);
+ }
+ final int expectedPartitions = resultSetMetaData.getRecords().length;
+ if (expectedPartitions != nPartitions) {
+ LOGGER.warn("inconsistent result set number of partitions. Current {}, new {}", expectedPartitions,
+ nPartitions);
+ }
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index a2218e2..bfecc48 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -121,7 +121,7 @@
throws HyracksDataException {
ResultJobRecord djr = getNonNullResultJobRecord(jobId);
djr.setResultSetMetaData(rsId, metadata, nPartitions);
- ResultDirectoryRecord record = djr.getOrCreateDirectoryRecord(rsId, partition);
+ ResultDirectoryRecord record = djr.getOrCreateDirectoryRecord(partition);
record.setNetworkAddress(networkAddress);
record.setEmpty(emptyResult);
@@ -147,8 +147,8 @@
public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition)
throws HyracksDataException {
ResultJobRecord djr = getNonNullResultJobRecord(jobId);
- djr.getDirectoryRecord(rsId, partition).writeEOS();
- djr.updateState(rsId);
+ djr.getDirectoryRecord(partition).writeEOS();
+ djr.updateState();
notifyAll();
}
@@ -178,7 +178,7 @@
@Override
public synchronized IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws HyracksDataException {
- return getNonNullResultJobRecord(jobId).getResultSetMetaData(rsId).getMetadata();
+ return getNonNullResultJobRecord(jobId).getResultSetMetaData().getMetadata();
}
@Override
@@ -228,7 +228,6 @@
private ResultDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownRecords)
throws HyracksDataException {
ResultJobRecord djr = getNonNullResultJobRecord(jobId);
-
if (djr.getStatus().getState() == State.FAILED) {
List<Exception> caughtExceptions = djr.getStatus().getExceptions();
if (caughtExceptions != null && !caughtExceptions.isEmpty()) {
@@ -241,13 +240,11 @@
throw HyracksDataException.create(ErrorCode.RESULT_FAILURE_NO_EXCEPTION, rsId, jobId);
}
}
-
- final ResultSetMetaData resultSetMetaData = djr.getResultSetMetaData(rsId);
+ final ResultSetMetaData resultSetMetaData = djr.getResultSetMetaData();
if (resultSetMetaData == null) {
return null;
}
ResultDirectoryRecord[] records = resultSetMetaData.getRecords();
-
return Arrays.equals(records, knownRecords) ? null : records;
}
@@ -255,7 +252,7 @@
for (JobId jId : getJobIds()) {
pw.print(jId.toString());
pw.print(" - ");
- pw.println(String.valueOf(getResultJobRecord(jId)));
+ pw.println(getResultJobRecord(jId));
}
pw.flush();
return pw;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
deleted file mode 100644
index 0d4d0e8..0000000
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.tests.integration;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.io.ManagedFileSplit;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.api.result.ResultSetId;
-import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
-import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ReplicateOperatorTest extends AbstractIntegrationTest {
-
- public void compareFiles(String fileNameA, String fileNameB) throws IOException {
- BufferedReader fileA = new BufferedReader(new FileReader(fileNameA));
- BufferedReader fileB = new BufferedReader(new FileReader(fileNameB));
-
- String lineA, lineB;
- while ((lineA = fileA.readLine()) != null) {
- lineB = fileB.readLine();
- Assert.assertEquals(lineA, lineB);
- }
- Assert.assertNull(fileB.readLine());
- fileA.close();
- fileB.close();
- }
-
- @Test
- public void test() throws Exception {
- final int outputArity = 2;
-
- JobSpecification spec = new JobSpecification();
-
- String inputFileName = "data" + File.separator + "nc1" + File.separator + "words.txt";
- File[] outputFile = new File[outputArity];
- for (int i = 0; i < outputArity; i++) {
- outputFile[i] = File.createTempFile("replicateop", null);
- outputFile[i].deleteOnExit();
- }
-
- FileSplit[] inputSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, inputFileName) };
-
- String[] locations = new String[] { NC1_ID };
-
- DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory(
- new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000');
- RecordDescriptor stringRec =
- new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), });
-
- FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec,
- new ConstantFileSplitProvider(inputSplits), stringParser, stringRec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations);
-
- ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp, locations);
-
- IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
- for (int i = 0; i < outputArity; i++) {
- ResultSetId rsId = new ResultSetId(i);
- spec.addResultSetId(rsId);
-
- outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, null, false,
- ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
- }
-
- spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0);
- for (int i = 0; i < outputArity; i++) {
- spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0);
- }
-
- for (int i = 0; i < outputArity; i++) {
- spec.addRoot(outputOp[i]);
- }
- String[] expectedResultsFileNames = new String[outputArity];
- for (int i = 0; i < outputArity; i++) {
- expectedResultsFileNames[i] = "data" + File.separator + "device0" + File.separator + inputFileName;
- }
- runTestAndCompareResults(spec, expectedResultsFileNames);
- }
-}