Moved to filescanners that do not need object construction
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@75 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
index 996d7c3..09d8840 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.dataflow.common.comm.io;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -69,4 +70,21 @@
serDeser.serialize(instance, dos);
fEndOffsets[nextField++] = baaos.size();
}
+
+ public void addField(byte[] bytes, int start, int length) throws HyracksDataException {
+ try {
+ dos.write(bytes, start, length);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ fEndOffsets[nextField++] = baaos.size();
+ }
+
+ public DataOutput getDataOutput() {
+ return dos;
+ }
+
+ public void addFieldEndOffset() {
+ fEndOffsets[nextField++] = baaos.size();
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
index 06255c1..ed34fcb 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
@@ -16,6 +16,9 @@
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
public class FrameUtils {
public static void copy(ByteBuffer srcFrame, ByteBuffer destFrame) {
makeReadable(srcFrame);
@@ -27,4 +30,12 @@
frame.position(0);
frame.limit(frame.capacity());
}
+
+ public static void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ writer.nextFrame(buffer);
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/StringBinaryComparatorFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/UTF8StringBinaryComparatorFactory.java
similarity index 88%
rename from hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/StringBinaryComparatorFactory.java
rename to hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/UTF8StringBinaryComparatorFactory.java
index 8ab015d..439859d 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/StringBinaryComparatorFactory.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/UTF8StringBinaryComparatorFactory.java
@@ -18,12 +18,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.util.StringUtils;
-public class StringBinaryComparatorFactory implements IBinaryComparatorFactory {
+public class UTF8StringBinaryComparatorFactory implements IBinaryComparatorFactory {
private static final long serialVersionUID = 1L;
- public static final StringBinaryComparatorFactory INSTANCE = new StringBinaryComparatorFactory();
+ public static final UTF8StringBinaryComparatorFactory INSTANCE = new UTF8StringBinaryComparatorFactory();
- private StringBinaryComparatorFactory() {
+ private UTF8StringBinaryComparatorFactory() {
}
@Override
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/StringBinaryHashFunctionFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/UTF8StringBinaryHashFunctionFactory.java
similarity index 86%
rename from hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/StringBinaryHashFunctionFactory.java
rename to hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/UTF8StringBinaryHashFunctionFactory.java
index 3c358fc..98634b6 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/StringBinaryHashFunctionFactory.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/UTF8StringBinaryHashFunctionFactory.java
@@ -18,12 +18,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.dataflow.common.data.util.StringUtils;
-public class StringBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
- public static final StringBinaryHashFunctionFactory INSTANCE = new StringBinaryHashFunctionFactory();
+public class UTF8StringBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+ public static final UTF8StringBinaryHashFunctionFactory INSTANCE = new UTF8StringBinaryHashFunctionFactory();
private static final long serialVersionUID = 1L;
- private StringBinaryHashFunctionFactory() {
+ private UTF8StringBinaryHashFunctionFactory() {
}
@Override
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/StringSerializerDeserializer.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/UTF8StringSerializerDeserializer.java
similarity index 85%
rename from hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/StringSerializerDeserializer.java
rename to hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/UTF8StringSerializerDeserializer.java
index bc6fd4a..0fa887f 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/StringSerializerDeserializer.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/UTF8StringSerializerDeserializer.java
@@ -21,12 +21,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-public class StringSerializerDeserializer implements ISerializerDeserializer<String> {
- public static final StringSerializerDeserializer INSTANCE = new StringSerializerDeserializer();
+public class UTF8StringSerializerDeserializer implements ISerializerDeserializer<String> {
+ public static final UTF8StringSerializerDeserializer INSTANCE = new UTF8StringSerializerDeserializer();
private static final long serialVersionUID = 1L;
- private StringSerializerDeserializer() {
+ private UTF8StringSerializerDeserializer() {
}
@Override
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/DoubleParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/DoubleParserFactory.java
new file mode 100644
index 0000000..6363c18
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/DoubleParserFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class DoubleParserFactory implements IValueParserFactory {
+ public static final IValueParserFactory INSTANCE = new DoubleParserFactory();
+
+ private static final long serialVersionUID = 1L;
+
+ private DoubleParserFactory() {
+ }
+
+ @Override
+ public IValueParser createValueParser() {
+ return new IValueParser() {
+ @Override
+ public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+ String s = String.valueOf(buffer, start, length);
+ try {
+ out.writeDouble(Double.parseDouble(s));
+ } catch (NumberFormatException e) {
+ throw new HyracksDataException(e);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/FloatParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/FloatParserFactory.java
new file mode 100644
index 0000000..d20c98e
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/FloatParserFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FloatParserFactory implements IValueParserFactory {
+ public static final IValueParserFactory INSTANCE = new FloatParserFactory();
+
+ private static final long serialVersionUID = 1L;
+
+ private FloatParserFactory() {
+ }
+
+ @Override
+ public IValueParser createValueParser() {
+ return new IValueParser() {
+ @Override
+ public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+ String s = String.valueOf(buffer, start, length);
+ try {
+ out.writeFloat(Float.parseFloat(s));
+ } catch (NumberFormatException e) {
+ throw new HyracksDataException(e);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParser.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParser.java
new file mode 100644
index 0000000..ac07ca2
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParser.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IValueParser {
+ void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParserFactory.java
new file mode 100644
index 0000000..0d8725a
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParserFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.Serializable;
+
+public interface IValueParserFactory extends Serializable {
+ public IValueParser createValueParser();
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java
new file mode 100644
index 0000000..3a71afa
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class IntegerParserFactory implements IValueParserFactory {
+ public static final IValueParserFactory INSTANCE = new IntegerParserFactory();
+
+ private static final long serialVersionUID = 1L;
+
+ private IntegerParserFactory() {
+ }
+
+ @Override
+ public IValueParser createValueParser() {
+ return new IValueParser() {
+ @Override
+ public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+ int n = 0;
+ int sign = 1;
+ int i = 0;
+ boolean pre = true;
+ for (; pre && i < length; ++i) {
+ char ch = buffer[i + start];
+ switch (ch) {
+ case ' ':
+ case '\t':
+ case '\n':
+ case '\r':
+ case '\f':
+ break;
+
+ case '-':
+ sign = -1;
+
+ case '0':
+ case '1':
+ case '2':
+ case '3':
+ case '4':
+ case '5':
+ case '6':
+ case '7':
+ case '8':
+ case '9':
+ pre = false;
+ break;
+
+ default:
+ throw new HyracksDataException("Encountered " + ch);
+ }
+ }
+ boolean post = false;
+ for (; !post && i < length; ++i) {
+ char ch = buffer[i + start];
+ switch (ch) {
+ case '0':
+ case '1':
+ case '2':
+ case '3':
+ case '4':
+ case '5':
+ case '6':
+ case '7':
+ case '8':
+ case '9':
+ n = n * 10 + (ch - '0');
+ break;
+ }
+ }
+
+ for (; i < length; ++i) {
+ char ch = buffer[i + start];
+ switch (ch) {
+ case ' ':
+ case '\t':
+ case '\n':
+ case '\r':
+ case '\f':
+ break;
+
+ default:
+ throw new HyracksDataException("Encountered " + ch);
+ }
+ }
+
+ try {
+ out.writeInt(n * sign);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
new file mode 100644
index 0000000..927d21c
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class LongParserFactory implements IValueParserFactory {
+ public static final IValueParserFactory INSTANCE = new LongParserFactory();
+
+ private static final long serialVersionUID = 1L;
+
+ private LongParserFactory() {
+ }
+
+ @Override
+ public IValueParser createValueParser() {
+ return new IValueParser() {
+ @Override
+ public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+ long n = 0;
+ int sign = 1;
+ int i = 0;
+ boolean pre = true;
+ for (; pre && i < length; ++i) {
+ char ch = buffer[i + start];
+ switch (ch) {
+ case ' ':
+ case '\t':
+ case '\n':
+ case '\r':
+ case '\f':
+ break;
+
+ case '-':
+ sign = -1;
+
+ case '0':
+ case '1':
+ case '2':
+ case '3':
+ case '4':
+ case '5':
+ case '6':
+ case '7':
+ case '8':
+ case '9':
+ pre = false;
+ break;
+
+ default:
+ throw new HyracksDataException("Encountered " + ch);
+ }
+ }
+ boolean post = false;
+ for (; !post && i < length; ++i) {
+ char ch = buffer[i + start];
+ switch (ch) {
+ case '0':
+ case '1':
+ case '2':
+ case '3':
+ case '4':
+ case '5':
+ case '6':
+ case '7':
+ case '8':
+ case '9':
+ n = n * 10 + (ch - '0');
+ break;
+ }
+ }
+
+ for (; i < length; ++i) {
+ char ch = buffer[i + start];
+ switch (ch) {
+ case ' ':
+ case '\t':
+ case '\n':
+ case '\r':
+ case '\f':
+ break;
+
+ default:
+ throw new HyracksDataException("Encountered " + ch);
+ }
+ }
+
+ try {
+ out.writeLong(n * sign);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/UTF8StringParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/UTF8StringParserFactory.java
new file mode 100644
index 0000000..8ddddac
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/UTF8StringParserFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class UTF8StringParserFactory implements IValueParserFactory {
+ public static final IValueParserFactory INSTANCE = new UTF8StringParserFactory();
+
+ private static final long serialVersionUID = 1L;
+
+ private UTF8StringParserFactory() {
+ }
+
+ @Override
+ public IValueParser createValueParser() {
+ return new IValueParser() {
+ private byte[] utf8;
+
+ @Override
+ public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+ int utflen = 0;
+ for (int i = 0; i < length; i++) {
+ char ch = buffer[i + start];
+ if ((ch >= 0x0001) && (ch <= 0x007F)) {
+ utflen++;
+ } else if (ch > 0x07ff) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+
+ if (utf8 == null || utf8.length < utflen + 2) {
+ utf8 = new byte[utflen + 2];
+ }
+
+ int count = 0;
+ utf8[count++] = (byte) ((utflen >>> 8) & 0xff);
+ utf8[count++] = (byte) ((utflen >>> 0) & 0xff);
+
+ int i = 0;
+ for (i = 0; i < length; i++) {
+ char ch = buffer[i + start];
+ if (!((ch >= 0x0001) && (ch <= 0x007F)))
+ break;
+ utf8[count++] = (byte) ch;
+ }
+
+ for (; i < length; i++) {
+ char ch = buffer[i + start];
+ if ((ch >= 0x0001) && (ch <= 0x007F)) {
+ utf8[count++] = (byte) ch;
+ } else if (ch > 0x07FF) {
+ utf8[count++] = (byte) (0xE0 | ((ch >> 12) & 0x0F));
+ utf8[count++] = (byte) (0x80 | ((ch >> 6) & 0x3F));
+ utf8[count++] = (byte) (0x80 | ((ch >> 0) & 0x3F));
+ } else {
+ utf8[count++] = (byte) (0xC0 | ((ch >> 6) & 0x1F));
+ utf8[count++] = (byte) (0x80 | ((ch >> 0) & 0x3F));
+ }
+ }
+ try {
+ out.write(utf8, 0, utflen + 2);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java
new file mode 100644
index 0000000..cfe1b65
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public abstract class AbstractUnaryOutputOperatorNodePushable implements IOperatorNodePushable {
+ protected IFrameWriter writer;
+ protected RecordDescriptor recordDesc;
+
+ @Override
+ public final void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ this.writer = writer;
+ this.recordDesc = recordDesc;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
new file mode 100644
index 0000000..bebf7d4
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractUnaryOutputSourceOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable {
+ @Override
+ public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
similarity index 88%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileScanOperatorDescriptor.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
index 43d821c..cea61f1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
@@ -28,12 +28,12 @@
import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
-public abstract class AbstractFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public abstract class AbstractDeserializedFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
protected FileSplit[] splits;
- public AbstractFileScanOperatorDescriptor(JobSpecification spec, FileSplit[] splits,
+ public AbstractDeserializedFileScanOperatorDescriptor(JobSpecification spec, FileSplit[] splits,
RecordDescriptor recordDescriptor) {
super(spec, 0, 1);
recordDescriptors[0] = recordDescriptor;
@@ -44,11 +44,11 @@
protected abstract void configure() throws Exception;
- protected class FileScanOperator implements IOpenableDataWriterOperator {
+ protected class DeserializedFileScanOperator implements IOpenableDataWriterOperator {
private IOpenableDataWriter<Object[]> writer;
private int index;
- FileScanOperator(int index) {
+ DeserializedFileScanOperator(int index) {
this.index = index;
}
@@ -102,6 +102,6 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), null);
+ return new DeserializedOperatorNodePushable(ctx, new DeserializedFileScanOperator(partition), null);
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/CSVFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/CSVFileScanOperatorDescriptor.java
deleted file mode 100644
index 54724a4..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/CSVFileScanOperatorDescriptor.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.file;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class CSVFileScanOperatorDescriptor extends
- AbstractFileScanOperatorDescriptor {
- private static class CSVRecordReaderImpl implements IRecordReader {
- private final BufferedReader in;
- private final char separator;
- private final String quotes;
-
- CSVRecordReaderImpl(File file, RecordDescriptor desc, char separator,
- String quotes) throws Exception {
- in = new BufferedReader(new InputStreamReader(new FileInputStream(
- file)));
- this.separator = separator;
- this.quotes = quotes;
- }
-
- @Override
- public void close() {
- try {
- in.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public boolean read(Object[] record) throws Exception {
- String line = in.readLine();
- if (line == null) {
- return false;
- }
- int fid = 0;
- char[] chars = line.toCharArray();
- int i = 0;
- boolean insideQuote = false;
- char quoteChar = 0;
- int partStart = 0;
- boolean skipNext = false;
- while (fid < record.length && i < chars.length) {
- char c = chars[i];
- if (!skipNext) {
- if (quotes.indexOf(c) >= 0) {
- if (insideQuote) {
- if (quoteChar == c) {
- insideQuote = false;
- }
- } else {
- insideQuote = true;
- quoteChar = c;
- }
- } else if (c == separator) {
- if (!insideQuote) {
- record[fid++] = String.valueOf(chars, partStart, i
- - partStart);
- partStart = i + 1;
- }
- } else if (c == '\\') {
- skipNext = true;
- }
- } else {
- skipNext = false;
- }
- ++i;
- }
- if (fid < record.length) {
- record[fid] = String.valueOf(chars, partStart, i - partStart);
- }
- return true;
- }
- }
-
- private static final long serialVersionUID = 1L;
-
- private final char separator;
- private final String quotes;
-
- public CSVFileScanOperatorDescriptor(JobSpecification spec,
- FileSplit[] splits, RecordDescriptor recordDescriptor) {
- this(spec, splits, recordDescriptor, ',', "'\"");
- }
-
- public CSVFileScanOperatorDescriptor(JobSpecification spec,
- FileSplit[] splits, RecordDescriptor recordDescriptor,
- char separator, String quotes) {
- super(spec, splits, recordDescriptor);
- this.separator = separator;
- this.quotes = quotes;
- }
-
- @Override
- protected IRecordReader createRecordReader(File file, RecordDescriptor desc)
- throws Exception {
- return new CSVRecordReaderImpl(file, desc, separator, quotes);
- }
-
- @Override
- protected void configure() throws Exception {
- // currently a no-op, but is meant to initialize , if required before it is asked
- // to create a record reader
- // this is executed at the node and is useful for operators that could not be
- // initialized from the client completely, because of lack of information specific
- // to the node where the operator gets executed.
-
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ConstantFileSplitProvider.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ConstantFileSplitProvider.java
new file mode 100644
index 0000000..21f5e3f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ConstantFileSplitProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+public class ConstantFileSplitProvider implements IFileSplitProvider {
+ private static final long serialVersionUID = 1L;
+ private final FileSplit[] splits;
+
+ public ConstantFileSplitProvider(FileSplit[] splits) {
+ this.splits = splits;
+ }
+
+ @Override
+ public FileSplit[] getFileSplits() {
+ return splits;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
new file mode 100644
index 0000000..716f587
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParser;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
+ private static final long serialVersionUID = 1L;
+ private IValueParserFactory[] valueParserFactories;
+ private char fieldDelimiter;
+
+ public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter) {
+ this.valueParserFactories = fieldParserFactories;
+ this.fieldDelimiter = fieldDelimiter;
+ }
+
+ @Override
+ public ITupleParser createTupleParser(final IHyracksContext ctx) {
+ return new ITupleParser() {
+ @Override
+ public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+ try {
+ IValueParser[] valueParsers = new IValueParser[valueParserFactories.length];
+ for (int i = 0; i < valueParserFactories.length; ++i) {
+ valueParsers[i] = valueParserFactories[i].createValueParser();
+ }
+ ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+ FrameTupleAppender appender = new FrameTupleAppender(ctx);
+ appender.reset(frame, true);
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(valueParsers.length);
+ DataOutput dos = tb.getDataOutput();
+
+ FieldCursor cursor = new FieldCursor(new InputStreamReader(in));
+ while (cursor.nextRecord()) {
+ tb.reset();
+ for (int i = 0; i < valueParsers.length; ++i) {
+ if (!cursor.nextField()) {
+ break;
+ }
+ valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
+ tb.addFieldEndOffset();
+ }
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(frame, writer);
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+
+ private enum State {
+ INIT,
+ IN_RECORD,
+ EOR,
+ CR,
+ EOF
+ }
+
+ private class FieldCursor {
+ private static final int INITIAL_BUFFER_SIZE = 4096;
+ private static final int INCREMENT = 4096;
+
+ private final Reader in;
+
+ private char[] buffer;
+ private int start;
+ private int end;
+ private State state;
+
+ private int fStart;
+ private int fEnd;
+
+ public FieldCursor(Reader in) {
+ this.in = in;
+ buffer = new char[INITIAL_BUFFER_SIZE];
+ start = 0;
+ end = 0;
+ state = State.INIT;
+ }
+
+ public boolean nextRecord() throws IOException {
+ while (true) {
+ switch (state) {
+ case INIT:
+ boolean eof = !readMore();
+ if (eof) {
+ state = State.EOF;
+ return false;
+ } else {
+ state = State.IN_RECORD;
+ return true;
+ }
+
+ case IN_RECORD:
+ int p = start;
+ while (true) {
+ if (p >= end) {
+ int s = start;
+ eof = !readMore();
+ if (eof) {
+ state = State.EOF;
+ return start < end;
+ }
+ p -= (s - start);
+ }
+ char ch = buffer[p];
+ if (ch == '\n') {
+ start = p + 1;
+ state = State.EOR;
+ break;
+ } else if (ch == '\r') {
+ start = p + 1;
+ state = State.CR;
+ break;
+ }
+ ++p;
+ }
+ break;
+
+ case CR:
+ if (start >= end) {
+ eof = !readMore();
+ if (eof) {
+ state = State.EOF;
+ return false;
+ }
+ }
+ char ch = buffer[start];
+ if (ch == '\n') {
+ ++start;
+ state = State.EOR;
+ } else {
+ state = State.IN_RECORD;
+ return true;
+ }
+
+ case EOR:
+ if (start >= end) {
+ eof = !readMore();
+ if (eof) {
+ state = State.EOF;
+ return false;
+ }
+ }
+ state = State.IN_RECORD;
+ return start < end;
+
+ case EOF:
+ return false;
+ }
+ }
+ }
+
+ public boolean nextField() throws IOException {
+ switch (state) {
+ case INIT:
+ case EOR:
+ case EOF:
+ case CR:
+ return false;
+
+ case IN_RECORD:
+ boolean eof;
+ int p = start;
+ while (true) {
+ if (p >= end) {
+ int s = start;
+ eof = !readMore();
+ if (eof) {
+ state = State.EOF;
+ return true;
+ }
+ p -= (s - start);
+ }
+ char ch = buffer[p];
+ if (ch == fieldDelimiter) {
+ fStart = start;
+ fEnd = p;
+ start = p + 1;
+ return true;
+ } else if (ch == '\n') {
+ fStart = start;
+ fEnd = p;
+ start = p + 1;
+ state = State.EOR;
+ return true;
+ } else if (ch == '\r') {
+ fStart = start;
+ fEnd = p;
+ start = p + 1;
+ state = State.CR;
+ return true;
+ }
+ ++p;
+ }
+ }
+ throw new IllegalStateException();
+ }
+
+ private boolean readMore() throws IOException {
+ if (start > 0) {
+ System.arraycopy(buffer, start, buffer, 0, end - start);
+ }
+ end -= start;
+ start = 0;
+
+ if (end == buffer.length) {
+ buffer = Arrays.copyOf(buffer, buffer.length + INCREMENT);
+ }
+
+ int n = in.read(buffer, end, buffer.length - end);
+ if (n < 0) {
+ return false;
+ }
+ end += n;
+ return true;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
new file mode 100644
index 0000000..efc63d3
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class FileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private final IFileSplitProvider fileSplitProvider;
+
+ private final ITupleParserFactory tupleParserFactory;
+
+ public FileScanOperatorDescriptor(JobSpecification spec, IFileSplitProvider fileSplitProvider,
+ ITupleParserFactory tupleParserFactory, RecordDescriptor rDesc) {
+ super(spec, 0, 1);
+ this.fileSplitProvider = fileSplitProvider;
+ this.tupleParserFactory = tupleParserFactory;
+ recordDescriptors[0] = rDesc;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ final FileSplit split = fileSplitProvider.getFileSplits()[partition];
+ final ITupleParser tp = tupleParserFactory.createTupleParser(ctx);
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void open() throws HyracksDataException {
+ File f = split.getLocalFile();
+ writer.open();
+ try {
+ InputStream in;
+ try {
+ in = new FileInputStream(f);
+ } catch (FileNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ tp.parse(in, writer);
+ } finally {
+ writer.close();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IFileSplitProvider.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IFileSplitProvider.java
new file mode 100644
index 0000000..90f65ab
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IFileSplitProvider.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.Serializable;
+
+public interface IFileSplitProvider extends Serializable {
+ public FileSplit[] getFileSplits();
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParser.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParser.java
new file mode 100644
index 0000000..5014e41
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParser.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.InputStream;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITupleParser {
+ public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParserFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParserFactory.java
new file mode 100644
index 0000000..708742b
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParserFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+
+public interface ITupleParserFactory extends Serializable {
+ public ITupleParser createTupleParser(IHyracksContext ctx);
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/LineFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/LineFileScanOperatorDescriptor.java
deleted file mode 100644
index 19c4ba9..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/LineFileScanOperatorDescriptor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.file;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class LineFileScanOperatorDescriptor extends
- AbstractFileScanOperatorDescriptor {
- private static class LineReaderImpl extends RecordReader {
- private File file;
-
- LineReaderImpl(File file) throws Exception {
- super(new Object[]{file});
- this.file = file;
- }
-
- @Override
- public InputStream createInputStream(Object[] args) throws Exception{
- this.file = (File)args[0];
- return new FileInputStream(file) ;
- }
- }
-
- private static final long serialVersionUID = 1L;
-
- public LineFileScanOperatorDescriptor(JobSpecification spec,
- FileSplit[] splits, RecordDescriptor recordDescriptor) {
- super(spec, splits, recordDescriptor);
- }
-
- @Override
- protected IRecordReader createRecordReader(File file, RecordDescriptor desc)
- throws Exception {
- return new LineReaderImpl(file);
- }
-
- @Override
- protected void configure() throws Exception {
- // currently a no-op, but is meant to initialize , if required before it is asked
- // to create a record reader
- // this is executed at the node and is useful for operators that could not be
- // initialized from the client completely, because of lack of information specific
- // to the node where the operator gets executed.
-
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
index 9dd223e..aa6635d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-public class RecordFileScanOperatorDescriptor extends AbstractFileScanOperatorDescriptor {
+public class RecordFileScanOperatorDescriptor extends AbstractDeserializedFileScanOperatorDescriptor {
private static final long serialVersionUID = 1L;
public RecordFileScanOperatorDescriptor(JobSpecification spec, FileSplit[] splits, RecordDescriptor recordDescriptor) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordReader.java
deleted file mode 100644
index 2064cc2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordReader.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.file;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-public abstract class RecordReader implements IRecordReader {
-
- private final BufferedReader bufferedReader;
- private InputStream inputStream;
-
- @Override
- public void close() {
- try {
- bufferedReader.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public boolean read(Object[] record) throws Exception {
- String line = bufferedReader.readLine();
- if (line == null) {
- return false;
- }
- record[0] = line;
- return true;
- }
-
- public abstract InputStream createInputStream(Object[] args) throws Exception;
-
- public RecordReader(Object[] args) throws Exception{
- try{
- bufferedReader = new BufferedReader(new InputStreamReader(createInputStream(args)));
- }catch(Exception e){
- e.printStackTrace();
- throw e;
- }
- }
-
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 253e2c6..b2a9f39 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -105,7 +105,7 @@
private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
buffer.position(0);
buffer.limit(buffer.capacity());
- writer.nextFrame(outBuffer);
+ writer.nextFrame(buffer);
buffer.position(0);
buffer.limit(buffer.capacity());
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
new file mode 100644
index 0000000..3b7b02c
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public NullSinkOperatorDescriptor(JobSpecification spec) {
+ super(spec, 1, 0);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new IOperatorNodePushable() {
+ @Override
+ public void open() throws HyracksDataException {
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ }
+
+ @Override
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index ffcf92f..6ddc8ec 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -35,7 +35,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializingDataReader;
import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
public class SerializationDeserializationTest {
private static final String DBLP_FILE = "data/dblp.txt";
@@ -129,7 +129,7 @@
@Test
public void serdeser01() throws Exception {
RecordDescriptor rDes = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
LineProcessor processor = new LineProcessor() {
@Override
public void process(String line, IDataWriter<Object[]> writer) throws Exception {
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index 5c07a16..c06bd40 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -29,17 +29,22 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.StringBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.aggregators.SumStringGroupAggregator;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.CSVFileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
@@ -50,85 +55,60 @@
public void countOfCountsSingleNC() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/words.txt"))
- };
- RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE
- });
+ FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
+ IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+ RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
- PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
+ ','), desc);
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
- InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
- 0
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }, desc);
- PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
+ PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
- });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
- 0
- }, new IComparatorFactory[] {
- StringComparatorFactory.INSTANCE
- }, new SumStringGroupAggregator(0), desc2);
- PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+ new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
+ PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
group.setPartitionConstraint(groupPartitionConstraint);
- InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
- 1
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }, desc2);
- PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
+ PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
- PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
- 1
- }, new IComparatorFactory[] {
- StringComparatorFactory.INSTANCE
- }, new SumStringGroupAggregator(1), desc2);
- PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+ new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
+ PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
group2.setPartitionConstraint(groupPartitionConstraint2);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 0
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn1, csvScanner, 0, sorter, 0);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, sorter, 0, group, 0);
IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 1
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn3, group, 0, sorter2, 0);
IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -145,91 +125,62 @@
public void countOfCountsMultiNC() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/words.txt"))
- };
- RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE
- });
+ FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
+ IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+ RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
- PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
+ ','), desc);
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
- InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
- 0
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }, desc);
+ InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID),
- new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID),
- new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
- });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
- 0
- }, new IComparatorFactory[] {
- StringComparatorFactory.INSTANCE
- }, new SumStringGroupAggregator(0), desc2);
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+ new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID),
- new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID),
- new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
group.setPartitionConstraint(groupPartitionConstraint);
- InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
- 1
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }, desc2);
+ InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
- PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
- 1
- }, new IComparatorFactory[] {
- StringComparatorFactory.INSTANCE
- }, new SumStringGroupAggregator(1), desc2);
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+ new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
group2.setPartitionConstraint(groupPartitionConstraint2);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 0
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn1, csvScanner, 0, sorter, 0);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, sorter, 0, group, 0);
IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 1
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn3, group, 0, sorter2, 0);
IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -246,91 +197,62 @@
public void countOfCountsExternalSortMultiNC() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/words.txt"))
- };
- RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE
- });
+ FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
+ IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+ RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
- PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
+ ','), desc);
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
- 0
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }, desc);
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 0 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID),
- new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID),
- new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
- });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
- 0
- }, new IComparatorFactory[] {
- StringComparatorFactory.INSTANCE
- }, new SumStringGroupAggregator(0), desc2);
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+ new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID),
- new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID),
- new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
group.setPartitionConstraint(groupPartitionConstraint);
- ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
- 1
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }, desc2);
+ ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 1 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
- PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
- 1
- }, new IComparatorFactory[] {
- StringComparatorFactory.INSTANCE
- }, new SumStringGroupAggregator(1), desc2);
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+ new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
group2.setPartitionConstraint(groupPartitionConstraint2);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 0
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn1, csvScanner, 0, sorter, 0);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, sorter, 0, group, 0);
IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] {
- 1
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }));
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(conn3, group, 0, sorter2, 0);
IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
index 9b4ce83..691aea8 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
@@ -23,13 +23,22 @@
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.CSVFileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
public class ScanPrintTest extends AbstractIntegrationTest {
@@ -37,23 +46,24 @@
public void scanPrint01() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] {
- new FileSplit(NC2_ID, new File("data/words.txt")), new FileSplit(NC1_ID, new File("data/words.txt"))
- };
- RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE
- });
+ IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit(NC2_ID, new File("data/words.txt")), new FileSplit(NC1_ID, new File("data/words.txt")) });
- CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
+ RecordDescriptor desc = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec,
+ splitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+ desc);
PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
- });
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
- });
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
@@ -62,4 +72,41 @@
spec.addRoot(printer);
runTest(spec);
}
+
+ @Test
+ public void scanPrint02() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ printer.setPartitionConstraint(printerPartitionConstraint);
+
+ IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(conn1, ordScanner, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 891fede..49070d7 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -27,14 +27,19 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.StringBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.CSVFileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
@@ -44,56 +49,43 @@
JobSpecification spec = new JobSpecification();
FileSplit[] ordersSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
- };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE
- });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
- 1
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }, ordersDesc);
+ InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
PartitionConstraint sortersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
- });
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
sorter.setPartitionConstraint(sortersPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID)
- });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
spec.connect(new MToNHashPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
- new int[] {
- 1
- }, new IBinaryHashFunctionFactory[] {
- StringBinaryHashFunctionFactory.INSTANCE
- }), new int[] {
- 1
- }, new IBinaryComparatorFactory[] {
- StringBinaryComparatorFactory.INSTANCE
- }), sorter, 0, printer, 0);
+ new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new int[] { 1 }, new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }),
+ sorter, 0, printer, 0);
runTest(spec);
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 96e102a..5ebbf62 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -24,27 +24,36 @@
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.StringBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.CSVFileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
+ private static final boolean DEBUG = true;
+
/*
* TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
*/
@@ -54,51 +63,62 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
- new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+ new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
join.setPartitionConstraint(joinPartitionConstraint);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
@@ -121,52 +141,63 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 4, 10, 200, 1.2,
new int[] { 1 }, new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
join.setPartitionConstraint(joinPartitionConstraint);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
@@ -189,52 +220,63 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
new int[] { 1 }, new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
join.setPartitionConstraint(joinPartitionConstraint);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
@@ -259,65 +301,76 @@
FileSplit[] custSplits = new FileSplit[] {
new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] {
new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
- new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+ new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
join.setPartitionConstraint(joinPartitionConstraint);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -334,66 +387,77 @@
FileSplit[] custSplits = new FileSplit[] {
new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] {
new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
new int[] { 1 }, new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
join.setPartitionConstraint(joinPartitionConstraint);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -410,66 +474,77 @@
FileSplit[] custSplits = new FileSplit[] {
new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] {
new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
new int[] { 1 }, new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
join.setPartitionConstraint(joinPartitionConstraint);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -486,63 +561,74 @@
FileSplit[] custSplits = new FileSplit[] {
new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] {
new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
- new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+ new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
join.setPartitionConstraint(new PartitionCountConstraint(2));
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -559,41 +645,51 @@
FileSplit[] custSplits = new FileSplit[] {
new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = new FileSplit[] {
new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
custScanner.setPartitionConstraint(custPartitionConstraint);
@@ -607,25 +703,26 @@
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
- new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+ new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
join.setPartitionConstraint(joinPartitionConstraint);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
IConnectorDescriptor custPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(custPartConn, custScanner, 0, custMat, 0);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 2eafd76..d90f384 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -17,15 +17,20 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.CSVFileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
@@ -62,51 +67,62 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = createCustomerFileSplits();
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
FileSplit[] ordersSplits = createOrdersFileSplits();
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
- CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
ordScanner.setPartitionConstraint(createRRPartitionConstraint(2));
- CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
custScanner.setPartitionConstraint(createRRPartitionConstraint(2));
InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
- new int[] { 1 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 6000000);
+ new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc,
+ 6000000);
join.setPartitionConstraint(new PartitionCountConstraint(4));
RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(spec, new int[] { 6 },
new FieldHashPartitionComputerFactory(new int[] { 6 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }),
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
new CountAccumulatingAggregatorFactory(), groupResultDesc, 16);
gby.setPartitionConstraint(new PartitionCountConstraint(4));
@@ -115,17 +131,17 @@
IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(ordJoinConn, ordScanner, 0, join, 1);
IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(custJoinConn, custScanner, 0, join, 0);
IConnectorDescriptor joinGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 6 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
spec.connect(joinGroupConn, join, 0, gby, 0);
IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);