Finish merge of master into genomix branch
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 7c16be0..e292a34 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -139,7 +139,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("open(" + pid + " by " + taId);
}
- fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString().replace(':', '_'));
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
size = 0;
@@ -177,4 +177,4 @@
manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
}
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 4ae43eb..002aa3b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -29,26 +29,20 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
public class PreclusteredGroupWriter implements IFrameWriter {
-
- private final static int INT_SIZE = 4;
-
private final int[] groupFields;
private final IBinaryComparator[] comparators;
private final IAggregatorDescriptor aggregator;
private final AggregateState aggregateState;
private final IFrameWriter writer;
+ private final ByteBuffer copyFrame;
private final FrameTupleAccessor inFrameAccessor;
+ private final FrameTupleAccessor copyFrameAccessor;
private final ByteBuffer outFrame;
private final FrameTupleAppender appender;
private final ArrayTupleBuilder tupleBuilder;
- private final RecordDescriptor outRecordDesc;
-
- private byte[] groupResultCache;
- private ByteBuffer groupResultCacheBuffer;
- private FrameTupleAccessor groupResultCacheAccessor;
- private FrameTupleAppender groupResultCacheAppender;
+ private boolean first;
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
@@ -58,9 +52,10 @@
this.aggregator = aggregator;
this.aggregateState = aggregator.createAggregateStates();
this.writer = writer;
- this.outRecordDesc = outRecordDesc;
-
+ copyFrame = ctx.allocateFrame();
inFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+ copyFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDesc);
+ copyFrameAccessor.reset(copyFrame);
outFrame = ctx.allocateFrame();
appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -72,6 +67,7 @@
@Override
public void open() throws HyracksDataException {
writer.open();
+ first = true;
}
@Override
@@ -79,45 +75,40 @@
inFrameAccessor.reset(buffer);
int nTuples = inFrameAccessor.getTupleCount();
for (int i = 0; i < nTuples; ++i) {
+ if (first) {
- if (groupResultCache != null && groupResultCacheAccessor.getTupleCount() > 0) {
- groupResultCacheAccessor.reset(ByteBuffer.wrap(groupResultCache));
- if (sameGroup(inFrameAccessor, i, groupResultCacheAccessor, 0)) {
- // find match: do aggregation
- aggregator.aggregate(inFrameAccessor, i, groupResultCacheAccessor, 0, aggregateState);
- continue;
- } else {
- // write the cached group into the final output
- writeOutput(groupResultCacheAccessor, 0);
+ tupleBuilder.reset();
+ for (int j = 0; j < groupFields.length; j++) {
+ tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
}
+ aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
+
+ first = false;
+
+ } else {
+ if (i == 0) {
+ switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
+ } else {
+ switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+ }
+
}
+ }
+ FrameUtils.copy(buffer, copyFrame);
+ }
+
+ private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
+ FrameTupleAccessor currTupleAccessor, int currTupleIndex) throws HyracksDataException {
+ if (!sameGroup(prevTupleAccessor, prevTupleIndex, currTupleAccessor, currTupleIndex)) {
+ writeOutput(prevTupleAccessor, prevTupleIndex);
tupleBuilder.reset();
-
for (int j = 0; j < groupFields.length; j++) {
- tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
+ tupleBuilder.addField(currTupleAccessor, currTupleIndex, groupFields[j]);
}
-
- aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
-
- // enlarge the cache buffer if necessary
- int requiredSize = tupleBuilder.getSize() + tupleBuilder.getFieldEndOffsets().length * INT_SIZE + 2
- * INT_SIZE;
-
- if (groupResultCache == null || groupResultCache.length < requiredSize) {
- groupResultCache = new byte[requiredSize];
- groupResultCacheAppender = new FrameTupleAppender(groupResultCache.length);
- groupResultCacheBuffer = ByteBuffer.wrap(groupResultCache);
- groupResultCacheAccessor = new FrameTupleAccessor(groupResultCache.length, outRecordDesc);
- }
-
- groupResultCacheAppender.reset(groupResultCacheBuffer, true);
- if (!groupResultCacheAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new HyracksDataException("The partial result is too large to be initialized in a frame.");
- }
-
- groupResultCacheAccessor.reset(groupResultCacheBuffer);
+ aggregator.init(tupleBuilder, currTupleAccessor, currTupleIndex, aggregateState);
+ } else {
+ aggregator.aggregate(currTupleAccessor, currTupleIndex, null, 0, aggregateState);
}
}
@@ -126,7 +117,7 @@
tupleBuilder.reset();
for (int j = 0; j < groupFields.length; j++) {
- tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, j);
+ tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
}
aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
@@ -147,8 +138,8 @@
int fIdx = groupFields[i];
int s1 = a1.getTupleStartOffset(t1Idx) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1Idx, fIdx);
int l1 = a1.getFieldLength(t1Idx, fIdx);
- int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, i);
- int l2 = a2.getFieldLength(t2Idx, i);
+ int s2 = a2.getTupleStartOffset(t2Idx) + a2.getFieldSlotsLength() + a2.getFieldStartOffset(t2Idx, fIdx);
+ int l2 = a2.getFieldLength(t2Idx, fIdx);
if (comparators[i].compare(a1.getBuffer().array(), s1, l1, a2.getBuffer().array(), s2, l2) != 0) {
return false;
}
@@ -163,8 +154,8 @@
@Override
public void close() throws HyracksDataException {
- if (groupResultCache != null && groupResultCacheAccessor.getTupleCount() > 0) {
- writeOutput(groupResultCacheAccessor, 0);
+ if (!first) {
+ writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(outFrame, writer);
}
@@ -172,4 +163,4 @@
aggregateState.close();
writer.close();
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 088c5ee..4f9f6ec 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -48,10 +48,10 @@
@Override
public void writeData(Object[] data) throws HyracksDataException {
for (int i = 0; i < data.length; ++i) {
- // System.err.print(StringSerializationUtils.toString(data[i]));
- // System.err.print(", ");
+ System.err.print(StringSerializationUtils.toString(data[i]));
+ System.err.print(", ");
}
- //System.err.println();
+ System.err.println();
}
@Override
@@ -66,4 +66,4 @@
return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(),
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/pom.xml
index d166e45..ba30424 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatserver/pom.xml
@@ -125,9 +125,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
- <fork>true</fork>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-examples/text-example/textserver/pom.xml b/hyracks/hyracks-examples/text-example/textserver/pom.xml
index f67d444..b12f823 100644
--- a/hyracks/hyracks-examples/text-example/textserver/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textserver/pom.xml
@@ -124,9 +124,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
- <fork>true</fork>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
<plugin>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index e97f26f..032d50d 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -66,18 +66,6 @@
</filesets>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.2</version>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 300dce4..2d4064b 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -136,7 +136,6 @@
terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId())
|| IterationUtils.readForceTerminationState(job.getConfiguration(), jobGen.getJobId());
i++;
-
} while (!terminate);
start = System.currentTimeMillis();
diff --git a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index e8ab2ed..4c7f91d 100644
--- a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -102,7 +102,7 @@
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
- PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+ PregelixHyracksIntegrationUtil.init();
FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index 91aa0d7..b4e17b6 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -76,7 +76,7 @@
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
- PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+ PregelixHyracksIntegrationUtil.init();
LOGGER.info("Hyracks mini-cluster started");
startHDFS();
FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index 7757f86..ea89bb9 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -77,7 +77,7 @@
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
- PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
+ PregelixHyracksIntegrationUtil.init();
LOGGER.info("Hyracks mini-cluster started");
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));