diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index fcf5c53d11a0..06186b35858e 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -2921,8 +2921,14 @@ components: properties: filter: type: array + description: Additional row-level filter as Predicate entry strings. items: type: string + columnMasking: + type: object + description: Column masking rules as a map from column name to Transform entry JSON string. + additionalProperties: + type: string AlterDatabaseRequest: type: object properties: diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java index a6bf162d55a6..31c292b3786e 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java @@ -672,21 +672,19 @@ public void alterTable(Identifier identifier, List changes) { * * @param identifier database name and table name. 
* @param select select columns, null if select all - * @return additional filter for row level access control + * @return additional row-level filter and column masking * @throws NoSuchResourceException Exception thrown on HTTP 404 means the table not exists * @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for * this table */ - public List authTableQuery(Identifier identifier, @Nullable List select) { + public AuthTableQueryResponse authTableQuery( + Identifier identifier, @Nullable List select) { AuthTableQueryRequest request = new AuthTableQueryRequest(select); - AuthTableQueryResponse response = - client.post( - resourcePaths.authTable( - identifier.getDatabaseName(), identifier.getObjectName()), - request, - AuthTableQueryResponse.class, - restAuthFunction); - return response.filter(); + return client.post( + resourcePaths.authTable(identifier.getDatabaseName(), identifier.getObjectName()), + request, + AuthTableQueryResponse.class, + restAuthFunction); } /** diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java b/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java index 0f833b03302a..7410631aea0a 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java @@ -27,24 +27,38 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; +import java.util.Map; /** Response for auth table query. 
*/ @JsonIgnoreProperties(ignoreUnknown = true) public class AuthTableQueryResponse implements RESTResponse { private static final String FIELD_FILTER = "filter"; + private static final String FIELD_COLUMN_MASKING = "columnMasking"; @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty(FIELD_FILTER) private final List filter; + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_COLUMN_MASKING) + private final Map columnMasking; + @JsonCreator - public AuthTableQueryResponse(@JsonProperty(FIELD_FILTER) List filter) { + public AuthTableQueryResponse( + @JsonProperty(FIELD_FILTER) List filter, + @JsonProperty(FIELD_COLUMN_MASKING) Map columnMasking) { this.filter = filter; + this.columnMasking = columnMasking; } @JsonGetter(FIELD_FILTER) public List filter() { return filter; } + + @JsonGetter(FIELD_COLUMN_MASKING) + public Map columnMasking() { + return columnMasking; + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/DefaultValueTransform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/DefaultValueTransform.java new file mode 100644 index 000000000000..9ae8a3f3cc59 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/DefaultValueTransform.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.DefaultValueUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * A {@link Transform} which always returns the default value of the input field's {@link DataType}. + */ +public class DefaultValueTransform implements Transform { + + private static final long serialVersionUID = 1L; + + private final FieldRef fieldRef; + + public DefaultValueTransform(FieldRef fieldRef) { + this.fieldRef = Objects.requireNonNull(fieldRef, "fieldRef must not be null"); + } + + public FieldRef fieldRef() { + return fieldRef; + } + + @Override + public List inputs() { + return Collections.singletonList(fieldRef); + } + + @Override + public DataType outputType() { + return fieldRef.type(); + } + + @Override + public Object transform(InternalRow row) { + return DefaultValueUtils.defaultValue(fieldRef.type()); + } + + @Override + public Transform copyWithNewInputs(List inputs) { + List nonNullInputs = + Objects.requireNonNull(inputs, "DefaultValueTransform expects non-null inputs"); + checkArgument(nonNullInputs.size() == 1, "DefaultValueTransform expects 1 input"); + checkArgument( + nonNullInputs.get(0) instanceof FieldRef, + "DefaultValueTransform input must be FieldRef"); + return new DefaultValueTransform((FieldRef) nonNullInputs.get(0)); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultValueTransform that = (DefaultValueTransform) o; + return Objects.equals(fieldRef, that.fieldRef); + } + + @Override + public int hashCode() { + return Objects.hashCode(fieldRef); + } + + @Override + public String toString() { + return 
"DefaultValueTransform{" + "fieldRef=" + fieldRef + '}'; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/HashMaskTransform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/HashMaskTransform.java new file mode 100644 index 000000000000..f69d69e6ead5 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/HashMaskTransform.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.BinaryString; + +import org.apache.paimon.shade.guava30.com.google.common.hash.HashCode; + +import javax.annotation.Nullable; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Objects; + +import static org.apache.paimon.types.DataTypeFamily.CHARACTER_STRING; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * A {@link Transform} which masks a string column by hashing it. + * + *

Output is a hex string. Default algorithm is {@code SHA-256}. + */ +public class HashMaskTransform extends StringTransform { + + private static final long serialVersionUID = 1L; + + private final String algorithm; + + @Nullable private final BinaryString salt; + + private transient MessageDigest digest; + + public HashMaskTransform(FieldRef fieldRef) { + this(fieldRef, null, null); + } + + public HashMaskTransform( + FieldRef fieldRef, @Nullable String algorithm, @Nullable BinaryString salt) { + super(Arrays.asList(Objects.requireNonNull(fieldRef, "fieldRef must not be null"))); + checkArgument(fieldRef.type().is(CHARACTER_STRING), "fieldRef must be a string type"); + this.algorithm = resolveAlgorithm(algorithm); + this.salt = salt; + ensureDigest(); + } + + public FieldRef fieldRef() { + return (FieldRef) inputs().get(0); + } + + public String algorithm() { + return algorithm; + } + + @Nullable + public BinaryString salt() { + return salt; + } + + @Nullable + @Override + protected BinaryString transform(List inputs) { + BinaryString s = inputs.get(0); + if (s == null) { + return null; + } + + MessageDigest md = ensureDigest(); + md.reset(); + + if (salt != null) { + md.update(salt.toString().getBytes(StandardCharsets.UTF_8)); + } + md.update(s.toString().getBytes(StandardCharsets.UTF_8)); + + return BinaryString.fromString(HashCode.fromBytes(md.digest()).toString()); + } + + @Override + public Transform copyWithNewInputs(List inputs) { + List nonNullInputs = + Objects.requireNonNull(inputs, "HashMaskTransform expects non-null inputs"); + checkArgument(nonNullInputs.size() == 1, "HashMaskTransform expects 1 input"); + checkArgument( + nonNullInputs.get(0) instanceof FieldRef, + "HashMaskTransform input must be FieldRef"); + return new HashMaskTransform((FieldRef) nonNullInputs.get(0), algorithm, salt); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + HashMaskTransform that = 
(HashMaskTransform) o; + return Objects.equals(fieldRef(), that.fieldRef()) + && Objects.equals(algorithm, that.algorithm) + && Objects.equals(salt, that.salt); + } + + @Override + public int hashCode() { + return Objects.hash(fieldRef(), algorithm, salt); + } + + @Override + public String toString() { + return "HashMaskTransform{" + + "fieldRef=" + + fieldRef() + + ", algorithm='" + + algorithm + + '\'' + + ", salt=" + + salt + + '}'; + } + + private MessageDigest ensureDigest() { + if (digest == null) { + try { + digest = MessageDigest.getInstance(algorithm); + } catch (NoSuchAlgorithmException e) { + throw new IllegalArgumentException("Unsupported hash algorithm: " + algorithm, e); + } + } + return digest; + } + + private static String resolveAlgorithm(@Nullable String algorithm) { + if (algorithm == null || algorithm.trim().isEmpty()) { + return "SHA-256"; + } + String a = algorithm.trim(); + String normalized = a.replace("-", "").toLowerCase(Locale.ROOT); + switch (normalized) { + case "md5": + return "MD5"; + case "sha1": + return "SHA-1"; + case "sha256": + return "SHA-256"; + case "sha512": + return "SHA-512"; + default: + return a; + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java index 6be94495564c..9934850c438d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java @@ -56,10 +56,6 @@ public LeafPredicate( super(fieldTransform, function, literals); } - public LeafFunction function() { - return function; - } - public DataType type() { return fieldRef().type(); } @@ -80,10 +76,6 @@ public FieldRef fieldRef() { return ((FieldTransform) transform).fieldRef(); } - public List literals() { - return literals; - } - public LeafPredicate copyWithNewIndex(int fieldIndex) { return new LeafPredicate(function, type(), fieldIndex, 
fieldName(), literals); } diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/NullTransform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/NullTransform.java new file mode 100644 index 000000000000..9235a64cc47c --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/NullTransform.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.DataType; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * A {@link Transform} which always returns {@code null}. + * + *

This is useful for column masking when you want to always hide a field, regardless of its data + * type. + */ +public class NullTransform implements Transform { + + private static final long serialVersionUID = 1L; + + private final FieldRef fieldRef; + + public NullTransform(FieldRef fieldRef) { + this.fieldRef = Objects.requireNonNull(fieldRef, "fieldRef must not be null"); + } + + public FieldRef fieldRef() { + return fieldRef; + } + + @Override + public List inputs() { + return Collections.singletonList(fieldRef); + } + + @Override + public DataType outputType() { + return fieldRef.type(); + } + + @Override + public Object transform(InternalRow row) { + return null; + } + + @Override + public Transform copyWithNewInputs(List inputs) { + List nonNullInputs = + Objects.requireNonNull(inputs, "NullTransform expects non-null inputs"); + checkArgument(nonNullInputs.size() == 1, "NullTransform expects 1 input"); + checkArgument( + nonNullInputs.get(0) instanceof FieldRef, "NullTransform input must be FieldRef"); + return new NullTransform((FieldRef) nonNullInputs.get(0)); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + NullTransform that = (NullTransform) o; + return Objects.equals(fieldRef, that.fieldRef); + } + + @Override + public int hashCode() { + return Objects.hashCode(fieldRef); + } + + @Override + public String toString() { + return "NullTransform{" + "fieldRef=" + fieldRef + '}'; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PartialMaskTransform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PartialMaskTransform.java new file mode 100644 index 000000000000..95bb4902be56 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PartialMaskTransform.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.BinaryString; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.apache.paimon.types.DataTypeFamily.CHARACTER_STRING; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * A {@link Transform} which masks the middle part of a string by keeping the specified number of + * prefix and suffix characters. + * + *

If the string length is less than or equal to {@code prefixLen + suffixLen}, the output will + * be a fully masked string of the same length. + */ +public class PartialMaskTransform extends StringTransform { + + private static final long serialVersionUID = 1L; + + private final int prefixLen; + private final int suffixLen; + + public PartialMaskTransform( + FieldRef fieldRef, int prefixLen, int suffixLen, BinaryString mask) { + super( + Arrays.asList( + Objects.requireNonNull(fieldRef, "fieldRef must not be null"), + Objects.requireNonNull(mask, "mask must not be null"))); + checkArgument(fieldRef.type().is(CHARACTER_STRING), "fieldRef must be a string type"); + checkArgument(prefixLen >= 0, "prefixLen must be >= 0"); + checkArgument(suffixLen >= 0, "suffixLen must be >= 0"); + // "mask" is treated as a token repeated by character count. Empty mask would be invalid. + checkArgument(mask.numChars() > 0, "mask must not be empty"); + + this.prefixLen = prefixLen; + this.suffixLen = suffixLen; + } + + public FieldRef fieldRef() { + return (FieldRef) inputs().get(0); + } + + public int prefixLen() { + return prefixLen; + } + + public int suffixLen() { + return suffixLen; + } + + public BinaryString mask() { + return (BinaryString) inputs().get(1); + } + + @Nullable + @Override + protected BinaryString transform(List inputs) { + BinaryString s = inputs.get(0); + if (s == null) { + return null; + } + BinaryString mask = inputs.get(1); + checkArgument(mask != null, "mask must not be null"); + + int len = s.numChars(); + if (len <= 0) { + return s; + } + + if (prefixLen + suffixLen >= len) { + return repeat(mask, len); + } + + BinaryString prefix = prefixLen == 0 ? BinaryString.EMPTY_UTF8 : s.substring(0, prefixLen); + BinaryString suffix = + suffixLen == 0 ? BinaryString.EMPTY_UTF8 : s.substring(len - suffixLen, len); + int middleLen = len - prefixLen - suffixLen; + BinaryString middle = middleLen == 0 ? 
BinaryString.EMPTY_UTF8 : repeat(mask, middleLen); + return BinaryString.concat(prefix, middle, suffix); + } + + private static BinaryString repeat(BinaryString token, int times) { + if (times <= 0) { + return BinaryString.EMPTY_UTF8; + } + String t = token.toString(); + StringBuilder sb = new StringBuilder(t.length() * times); + for (int i = 0; i < times; i++) { + sb.append(t); + } + return BinaryString.fromString(sb.toString()); + } + + @Override + public Transform copyWithNewInputs(List inputs) { + List nonNullInputs = + Objects.requireNonNull(inputs, "PartialMaskTransform expects non-null inputs"); + checkArgument(nonNullInputs.size() == 2, "PartialMaskTransform expects 2 inputs"); + checkArgument( + nonNullInputs.get(0) instanceof FieldRef, + "PartialMaskTransform input must be FieldRef"); + checkArgument( + nonNullInputs.get(1) instanceof BinaryString, + "PartialMaskTransform mask input must be BinaryString"); + return new PartialMaskTransform( + (FieldRef) nonNullInputs.get(0), + prefixLen, + suffixLen, + (BinaryString) nonNullInputs.get(1)); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + PartialMaskTransform that = (PartialMaskTransform) o; + return prefixLen == that.prefixLen + && suffixLen == that.suffixLen + && Objects.equals(fieldRef(), that.fieldRef()) + && Objects.equals(mask(), that.mask()); + } + + @Override + public int hashCode() { + return Objects.hash(fieldRef(), prefixLen, suffixLen, mask()); + } + + @Override + public String toString() { + return "PartialMaskTransform{" + + "fieldRef=" + + fieldRef() + + ", prefixLen=" + + prefixLen + + ", suffixLen=" + + suffixLen + + ", mask=" + + mask() + + '}'; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/TransformPredicate.java b/paimon-common/src/main/java/org/apache/paimon/predicate/TransformPredicate.java index 04549584a579..43281226bb7a 100644 --- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/TransformPredicate.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TransformPredicate.java @@ -62,6 +62,14 @@ public Transform transform() { return transform; } + public LeafFunction function() { + return function; + } + + public List literals() { + return literals; + } + public TransformPredicate copyWithNewInputs(List newInputs) { return TransformPredicate.of(transform.copyWithNewInputs(newInputs), function, literals); } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/DefaultValueUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/DefaultValueUtils.java index b82f5f77e700..0f8521a3b608 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/DefaultValueUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/DefaultValueUtils.java @@ -21,14 +21,30 @@ import org.apache.paimon.casting.CastExecutor; import org.apache.paimon.casting.CastExecutors; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.GenericVariant; +import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.types.BinaryType; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; import javax.annotation.Nullable; +import java.math.BigDecimal; +import java.util.Collections; + /** Utils for default value. 
*/ public class DefaultValueUtils { + private static final Variant NULL_VARIANT = GenericVariant.fromJson("null"); + public static Object convertDefaultValue(DataType dataType, String defaultValueStr) { @SuppressWarnings("unchecked") CastExecutor resolve = @@ -58,4 +74,58 @@ public static void validateDefaultValue(DataType dataType, @Nullable String defa "Unsupported default value `" + defaultValueStr + "` for type " + dataType, e); } } + + /** Creates a default value object for the given {@link DataType}. */ + public static Object defaultValue(DataType dataType) { + switch (dataType.getTypeRoot()) { + case BOOLEAN: + return false; + case TINYINT: + return (byte) 0; + case SMALLINT: + return (short) 0; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return 0; + case BIGINT: + return 0L; + case FLOAT: + return 0.0f; + case DOUBLE: + return 0.0d; + case DECIMAL: + DecimalType decimalType = (DecimalType) dataType; + return Decimal.fromBigDecimal( + BigDecimal.ZERO, decimalType.getPrecision(), decimalType.getScale()); + case CHAR: + case VARCHAR: + return BinaryString.fromString(""); + case BINARY: + return new byte[((BinaryType) dataType).getLength()]; + case VARBINARY: + return new byte[0]; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return Timestamp.fromEpochMillis(0); + case ARRAY: + return new GenericArray(new Object[0]); + case MAP: + case MULTISET: + return new GenericMap(Collections.emptyMap()); + case ROW: + RowType rowType = (RowType) dataType; + GenericRow row = new GenericRow(rowType.getFieldCount()); + for (int i = 0; i < rowType.getFieldCount(); i++) { + row.setField(i, defaultValue(rowType.getTypeAt(i))); + } + return row; + case VARIANT: + return NULL_VARIANT; + case BLOB: + return Blob.fromData(new byte[0]); + default: + throw new UnsupportedOperationException("Unsupported type: " + dataType); + } + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/DefaultValueTransformTest.java 
b/paimon-common/src/test/java/org/apache/paimon/predicate/DefaultValueTransformTest.java new file mode 100644 index 000000000000..4f2c3ff83fbc --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/predicate/DefaultValueTransformTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.DefaultValueUtils; +import org.apache.paimon.utils.InternalRowUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collections; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +class DefaultValueTransformTest { + + @ParameterizedTest + @MethodSource("allTypes") + void testReturnDefaultValueForAllTypes(DataType type) { + DefaultValueTransform transform = new DefaultValueTransform(new FieldRef(0, "f0", type)); + assertThat(transform.outputType()).isEqualTo(type); + + Object expected = DefaultValueUtils.defaultValue(type); + Object actual = transform.transform(GenericRow.of(123)); + if (type.getTypeRoot() == DataTypeRoot.MULTISET) { + assertThat(actual).isInstanceOf(InternalMap.class); + assertThat(((InternalMap) actual).size()).isEqualTo(0); + } else { + assertThat(InternalRowUtils.equals(actual, expected, type)).isTrue(); + } + } + + @Test + void testCopyWithNewInputs() { + FieldRef ref0 = new FieldRef(0, "f0", DataTypes.INT()); + FieldRef ref3 = new FieldRef(3, "f0", DataTypes.INT()); + + DefaultValueTransform transform = new DefaultValueTransform(ref0); + Transform copied = transform.copyWithNewInputs(Collections.singletonList(ref3)); + + assertThat(copied).isEqualTo(new DefaultValueTransform(ref3)); + assertThat(copied.outputType()).isEqualTo(DataTypes.INT()); + assertThat(copied.transform(GenericRow.of((Object) null))).isEqualTo(0); + } + + private static Stream allTypes() { + return Stream.of( + // numeric + DataTypes.TINYINT(), + DataTypes.SMALLINT(), + DataTypes.INT(), + DataTypes.BIGINT(), + 
DataTypes.FLOAT(), + DataTypes.DOUBLE(), + DataTypes.DECIMAL(10, 2), + + // boolean + DataTypes.BOOLEAN(), + + // string + DataTypes.STRING(), + DataTypes.CHAR(3), + DataTypes.VARCHAR(20), + + // binary + DataTypes.BYTES(), + DataTypes.BINARY(8), + DataTypes.VARBINARY(12), + + // datetime + DataTypes.DATE(), + DataTypes.TIME(), + DataTypes.TIME(9), + DataTypes.TIMESTAMP(), + DataTypes.TIMESTAMP_MILLIS(), + DataTypes.TIMESTAMP(9), + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9), + DataTypes.TIMESTAMP_LTZ_MILLIS(), + + // complex + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.TIMESTAMP())), + DataTypes.MAP(DataTypes.VARCHAR(10), DataTypes.TIMESTAMP()), + DataTypes.MULTISET(DataTypes.STRING()), + DataTypes.ROW( + DataTypes.FIELD(0, "a", DataTypes.INT()), + DataTypes.FIELD(1, "b", DataTypes.STRING())), + + // special + DataTypes.VARIANT(), + DataTypes.BLOB(), + + // not-null variants (exercise nullability flag on type) + DataTypes.INT().copy(false), + DataTypes.STRING().copy(false), + DataTypes.ARRAY(DataTypes.INT()).copy(false)); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/HashMaskTransformTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/HashMaskTransformTest.java new file mode 100644 index 000000000000..1363d4a7b658 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/predicate/HashMaskTransformTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class HashMaskTransformTest { + + @Test + void testNull() { + HashMaskTransform transform = + new HashMaskTransform(new FieldRef(0, "f0", DataTypes.STRING())); + assertThat(transform.transform(GenericRow.of((Object) null))).isNull(); + } + + @Test + void testSha256HexLower() { + HashMaskTransform transform = + new HashMaskTransform(new FieldRef(0, "f0", DataTypes.STRING()), "sha256", null); + Object out = transform.transform(GenericRow.of(BinaryString.fromString("abcdef"))); + assertThat(out) + .isEqualTo( + BinaryString.fromString( + "bef57ec7f53a6d40beb640a780a639c83bc29ac8a9816f1fc6c5c6dcd93c4721")); + } + + @Test + void testSha256WithSalt() { + HashMaskTransform transform = + new HashMaskTransform( + new FieldRef(0, "f0", DataTypes.STRING()), + "SHA-256", + BinaryString.fromString("SALT_")); + Object out = transform.transform(GenericRow.of(BinaryString.fromString("abcdef"))); + // sha256("SALT_" + "abcdef") + assertThat(out) + .isEqualTo( + BinaryString.fromString( + "1aff6a6e4dc5a1bcf81b101216ae7cb85b32ec8c56e926be3e6d3ae211caf522")); + } + + @Test + void testIllegalAlgorithm() { + assertThatThrownBy( + () -> + new HashMaskTransform( + new FieldRef(0, "f0", DataTypes.STRING()), + "NO_SUCH_ALGO", + null)) + 
.isInstanceOf(IllegalArgumentException.class); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/NullTransformTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/NullTransformTest.java new file mode 100644 index 000000000000..9f0767896f29 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/predicate/NullTransformTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collections; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +class NullTransformTest { + + @ParameterizedTest + @MethodSource("allTypes") + void testAlwaysReturnNullForAllTypes(DataType type) { + NullTransform transform = new NullTransform(new FieldRef(0, "f0", type)); + assertThat(transform.outputType()).isEqualTo(type); + assertThat(transform.transform(GenericRow.of((Object) null))).isNull(); + assertThat(transform.transform(GenericRow.of(123))).isNull(); + } + + @Test + void testCopyWithNewInputs() { + FieldRef ref0 = new FieldRef(0, "f0", DataTypes.INT()); + FieldRef ref3 = new FieldRef(3, "f0", DataTypes.INT()); + + NullTransform transform = new NullTransform(ref0); + Transform copied = transform.copyWithNewInputs(Collections.singletonList(ref3)); + + assertThat(copied).isEqualTo(new NullTransform(ref3)); + assertThat(copied.outputType()).isEqualTo(DataTypes.INT()); + assertThat(copied.transform(GenericRow.of((Object) null))).isNull(); + } + + private static Stream allTypes() { + return Stream.of( + // numeric + DataTypes.TINYINT(), + DataTypes.SMALLINT(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.FLOAT(), + DataTypes.DOUBLE(), + DataTypes.DECIMAL(10, 2), + + // boolean + DataTypes.BOOLEAN(), + + // string + DataTypes.STRING(), + DataTypes.CHAR(3), + DataTypes.VARCHAR(20), + + // binary + DataTypes.BYTES(), + DataTypes.BINARY(8), + DataTypes.VARBINARY(12), + + // datetime + DataTypes.DATE(), + DataTypes.TIME(), + DataTypes.TIME(9), + DataTypes.TIMESTAMP(), + DataTypes.TIMESTAMP_MILLIS(), + DataTypes.TIMESTAMP(9), + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), + 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9), + DataTypes.TIMESTAMP_LTZ_MILLIS(), + + // complex + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.TIMESTAMP())), + DataTypes.MAP(DataTypes.VARCHAR(10), DataTypes.TIMESTAMP()), + DataTypes.MULTISET(DataTypes.STRING()), + DataTypes.ROW( + DataTypes.FIELD(0, "a", DataTypes.INT()), + DataTypes.FIELD(1, "b", DataTypes.STRING())), + + // special + DataTypes.VARIANT(), + DataTypes.BLOB(), + + // not-null variants (exercise nullability flag on type) + DataTypes.INT().copy(false), + DataTypes.STRING().copy(false), + DataTypes.ARRAY(DataTypes.INT()).copy(false)); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/PartialMaskTransformTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/PartialMaskTransformTest.java new file mode 100644 index 000000000000..4c0fd5bc7a5e --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/predicate/PartialMaskTransformTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class PartialMaskTransformTest { + + @Test + void testNull() { + PartialMaskTransform transform = + new PartialMaskTransform( + new FieldRef(0, "f0", DataTypes.STRING()), + 2, + 2, + BinaryString.fromString("*")); + assertThat(transform.transform(GenericRow.of((Object) null))).isNull(); + } + + @Test + void testNormal() { + PartialMaskTransform transform = + new PartialMaskTransform( + new FieldRef(0, "f0", DataTypes.STRING()), + 2, + 2, + BinaryString.fromString("*")); + Object out = transform.transform(GenericRow.of(BinaryString.fromString("abcdef"))); + assertThat(out).isEqualTo(BinaryString.fromString("ab**ef")); + } + + @Test + void testShorterThanPrefixPlusSuffix() { + PartialMaskTransform transform = + new PartialMaskTransform( + new FieldRef(0, "f0", DataTypes.STRING()), + 3, + 3, + BinaryString.fromString("*")); + Object out = transform.transform(GenericRow.of(BinaryString.fromString("abc"))); + assertThat(out).isEqualTo(BinaryString.fromString("***")); + } + + @Test + void testMultiCharMaskToken() { + PartialMaskTransform transform = + new PartialMaskTransform( + new FieldRef(0, "f0", DataTypes.STRING()), + 1, + 1, + BinaryString.fromString("xx")); + Object out = transform.transform(GenericRow.of(BinaryString.fromString("abcd"))); + assertThat(out).isEqualTo(BinaryString.fromString("axxxxd")); + } + + @Test + void testIllegalArgs() { + assertThatThrownBy( + () -> + new PartialMaskTransform( + new FieldRef(0, "f0", DataTypes.STRING()), + -1, + 0, + BinaryString.fromString("*"))) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + new PartialMaskTransform( + new FieldRef(0, "f0", 
DataTypes.STRING()), + 0, + 0, + BinaryString.fromString(""))) + .isInstanceOf(IllegalArgumentException.class); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 6c98d0fb2630..52902ce2bef6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -577,7 +577,8 @@ public boolean supportsVersionManagement() { } @Override - public List authTableQuery(Identifier identifier, List select) { + public TableQueryAuthResult authTableQuery( + Identifier identifier, @Nullable List select) { throw new UnsupportedOperationException(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 74e35dde3dac..0c111711ad24 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -1032,14 +1032,15 @@ void alterFunction( // ==================== Table Auth ========================== /** - * Auth table query select and get the filter for row level access control. + * Authorize a table query and get the filter for row level access control and the column + * masking rules.
* * @param identifier path of the table to alter partitions * @param select selected fields, null if select all - * @return additional filter for row level access control + * @return additional filter for row level access control and column masking * @throws TableNotExistException if the table does not exist */ - List authTableQuery(Identifier identifier, @Nullable List select) + TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List select) throws TableNotExistException; // ==================== Catalog Information ========================== diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 5e286191de6a..b92cd63d941f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -409,7 +409,7 @@ public PagedList listPartitionsPaged( } @Override - public List authTableQuery(Identifier identifier, @Nullable List select) + public TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List select) throws TableNotExistException { return wrapped.authTableQuery(identifier, select); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java b/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java new file mode 100644 index 000000000000..8bacf6e5c42e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.Transform; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; + +/** Auth result for table query, including row-level filter and optional column masking rules. */ +public class TableQueryAuthResult { + + @Nullable private final Predicate rowFilter; + private final Map columnMasking; + + public TableQueryAuthResult( + @Nullable Predicate rowFilter, Map columnMasking) { + this.rowFilter = rowFilter; + this.columnMasking = columnMasking == null ? 
Collections.emptyMap() : columnMasking; + } + + public static TableQueryAuthResult empty() { + return new TableQueryAuthResult(null, Collections.emptyMap()); + } + + @Nullable + public Predicate rowFilter() { + return rowFilter; + } + + public Map columnMasking() { + return columnMasking; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index d8254bb58fee..f29354d1ea35 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -30,6 +30,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; import org.apache.paimon.catalog.TableMetadata; +import org.apache.paimon.catalog.TableQueryAuthResult; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.function.Function; @@ -37,6 +38,10 @@ import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; +import org.apache.paimon.predicate.And; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.Transform; import org.apache.paimon.rest.exceptions.AlreadyExistsException; import org.apache.paimon.rest.exceptions.BadRequestException; import org.apache.paimon.rest.exceptions.ForbiddenException; @@ -59,12 +64,15 @@ import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.system.SystemTableLoader; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.PredicateJsonSerde; import org.apache.paimon.utils.SnapshotNotExistException; +import org.apache.paimon.utils.TransformJsonSerde; import org.apache.paimon.view.View; import org.apache.paimon.view.ViewChange; import org.apache.paimon.view.ViewImpl; import org.apache.paimon.view.ViewSchema; +import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.org.apache.commons.lang3.StringUtils; import javax.annotation.Nullable; @@ -79,6 +87,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BRANCH; @@ -524,11 +533,53 @@ public void alterTable( } @Override - public List authTableQuery(Identifier identifier, @Nullable List select) + public TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List select) throws TableNotExistException { checkNotSystemTable(identifier, "authTable"); try { - return api.authTableQuery(identifier, select); + org.apache.paimon.rest.responses.AuthTableQueryResponse response = + api.authTableQuery(identifier, select); + + List predicateJsons = response == null ? null : response.filter(); + Predicate rowFilter = null; + if (predicateJsons != null && !predicateJsons.isEmpty()) { + List predicates = new ArrayList<>(); + for (String json : predicateJsons) { + if (json == null || json.trim().isEmpty()) { + continue; + } + Predicate predicate = PredicateJsonSerde.parse(json); + if (predicate != null) { + predicates.add(predicate); + } + } + if (predicates.size() == 1) { + rowFilter = predicates.get(0); + } else if (!predicates.isEmpty()) { + rowFilter = new CompoundPredicate(And.INSTANCE, predicates); + } + } + + Map columnMasking = new TreeMap<>(); + Map maskingJsons = response == null ? 
null : response.columnMasking(); + if (maskingJsons != null && !maskingJsons.isEmpty()) { + for (Map.Entry e : maskingJsons.entrySet()) { + String column = e.getKey(); + String json = e.getValue(); + if (column == null + || column.trim().isEmpty() + || json == null + || json.trim().isEmpty()) { + continue; + } + Transform transform = TransformJsonSerde.parse(json); + if (transform != null) { + columnMasking.put(column, transform); + } + } + } + + return new TableQueryAuthResult(rowFilter, columnMasking); } catch (NoSuchResourceException e) { throw new TableNotExistException(identifier); } catch (ForbiddenException e) { @@ -539,6 +590,8 @@ public List authTableQuery(Identifier identifier, @Nullable List throw new UnsupportedOperationException(e.getMessage()); } catch (BadRequestException e) { throw new RuntimeException(new IllegalArgumentException(e.getMessage())); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java index 0f61f8fa0978..de196d0b6eea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java @@ -28,6 +28,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.RenamingSnapshotCommit; import org.apache.paimon.catalog.SnapshotCommit; +import org.apache.paimon.catalog.TableQueryAuthResult; import org.apache.paimon.operation.Lock; import org.apache.paimon.table.source.TableQueryAuth; import org.apache.paimon.tag.SnapshotLoaderImpl; @@ -37,9 +38,10 @@ import javax.annotation.Nullable; import java.io.Serializable; -import java.util.Collections; import java.util.Optional; +import static org.apache.paimon.utils.Preconditions.checkNotNull; + /** Catalog environment in table which contains log factory, metastore client factory. 
*/ public class CatalogEnvironment implements Serializable { @@ -154,10 +156,11 @@ public CatalogEnvironment copy(Identifier identifier) { public TableQueryAuth tableQueryAuth(CoreOptions options) { if (!options.queryAuthEnabled() || catalogLoader == null) { - return select -> Collections.emptyList(); + return select -> TableQueryAuthResult.empty(); } + final CatalogLoader loader = checkNotNull(catalogLoader); return select -> { - try (Catalog catalog = catalogLoader.load()) { + try (Catalog catalog = loader.load()) { return catalog.authTableQuery(identifier, select); } catch (Exception e) { throw new RuntimeException(e); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index dcadfad1ff3a..6e9a44e5558e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.TableQueryAuthResult; import org.apache.paimon.consumer.Consumer; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.data.BinaryRow; @@ -169,8 +170,12 @@ protected void authQuery() { if (!options.queryAuthEnabled()) { return; } - queryAuth.auth(readType == null ? null : readType.getFieldNames()); - // TODO add support for row level access control + TableQueryAuthResult authResult = + queryAuth.auth(readType == null ? null : readType.getFieldNames()); + Predicate rowFilter = authResult == null ? 
null : authResult.rowFilter(); + if (rowFilter != null) { + snapshotReader.withFilter(rowFilter); + } } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/MaskingTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/MaskingTableRead.java new file mode 100644 index 000000000000..cdeb14481e57 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/MaskingTableRead.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table.source; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.metrics.MetricRegistry; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.Transform; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.InternalRowUtils.get; + +/** + * A {@link TableRead} wrapper which applies column masking for each produced {@link InternalRow}. + */ +public class MaskingTableRead implements TableRead { + + private final TableRead wrapped; + private final RowType outputRowType; + private final Map masking; + private final MaskingApplier applier; + + public MaskingTableRead( + TableRead wrapped, RowType outputRowType, Map masking) { + this.wrapped = wrapped; + this.outputRowType = outputRowType; + this.masking = masking; + this.applier = new MaskingApplier(outputRowType, masking); + } + + @Override + public TableRead withMetricRegistry(MetricRegistry registry) { + return new MaskingTableRead(wrapped.withMetricRegistry(registry), outputRowType, masking); + } + + @Override + public TableRead executeFilter() { + return new MaskingTableRead(wrapped.executeFilter(), outputRowType, masking); + } + + @Override + public TableRead withIOManager(IOManager ioManager) { + return new MaskingTableRead(wrapped.withIOManager(ioManager), outputRowType, masking); + } + + @Override + public RecordReader createReader(Split split) throws IOException { + return new MaskingRecordReader(wrapped.createReader(split), applier); + } + + private static class MaskingRecordReader implements RecordReader { + + private final RecordReader wrapped; + private final 
MaskingApplier applier; + + private MaskingRecordReader(RecordReader wrapped, MaskingApplier applier) { + this.wrapped = wrapped; + this.applier = applier; + } + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + RecordIterator batch = wrapped.readBatch(); + if (batch == null) { + return null; + } + return batch.transform(applier::apply); + } + + @Override + public void close() throws IOException { + wrapped.close(); + } + } + + private static class MaskingApplier { + + private final RowType outputRowType; + private final Map remapped; + + private MaskingApplier(RowType outputRowType, Map masking) { + this.outputRowType = outputRowType; + this.remapped = remapToOutputRow(outputRowType, masking); + } + + private InternalRow apply(InternalRow row) { + if (remapped.isEmpty()) { + return row; + } + int arity = outputRowType.getFieldCount(); + GenericRow out = new GenericRow(row.getRowKind(), arity); + for (int i = 0; i < arity; i++) { + DataType type = outputRowType.getTypeAt(i); + out.setField(i, get(row, i, type)); + } + for (Map.Entry e : remapped.entrySet()) { + int targetIndex = e.getKey(); + Transform transform = e.getValue(); + Object masked = transform.transform(row); + out.setField(targetIndex, masked); + } + return out; + } + + private static Map remapToOutputRow( + RowType outputRowType, Map masking) { + Map out = new HashMap<>(); + if (masking == null || masking.isEmpty()) { + return out; + } + + for (Map.Entry e : masking.entrySet()) { + String targetColumn = e.getKey(); + Transform transform = e.getValue(); + if (targetColumn == null || transform == null) { + continue; + } + + int targetIndex = outputRowType.getFieldIndex(targetColumn); + if (targetIndex < 0) { + continue; + } + + List newInputs = new ArrayList<>(); + for (Object input : transform.inputs()) { + if (input instanceof FieldRef) { + FieldRef ref = (FieldRef) input; + int newIndex = outputRowType.getFieldIndex(ref.name()); + if (newIndex < 0) { + throw new 
IllegalArgumentException( + "Column masking refers to field '" + + ref.name() + + "' which is not present in output row type " + + outputRowType); + } + DataType type = outputRowType.getTypeAt(newIndex); + newInputs.add(new FieldRef(newIndex, ref.name(), type)); + } else { + newInputs.add(input); + } + } + out.put(targetIndex, transform.copyWithNewInputs(newInputs)); + } + return out; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java index c81dfd8e01dd..dcdaaf5d92a0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.source; import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.TableQueryAuthResult; import org.apache.paimon.data.variant.VariantAccessInfo; import org.apache.paimon.data.variant.VariantAccessInfoUtils; import org.apache.paimon.partition.PartitionPredicate; @@ -26,6 +27,7 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.predicate.TopN; import org.apache.paimon.predicate.VectorSearch; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.InnerTable; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Filter; @@ -243,7 +245,30 @@ private InnerTableScan configureScan(InnerTableScan scan) { @Override public TableRead newRead() { - InnerTableRead read = table.newRead().withFilter(filter); + Predicate readFilter = filter; + boolean executeFilter = false; + TableQueryAuthResult authResult = null; + if (table instanceof FileStoreTable) { + CoreOptions options = new CoreOptions(table.options()); + if (options.queryAuthEnabled()) { + TableQueryAuth queryAuth = + ((FileStoreTable) table).catalogEnvironment().tableQueryAuth(options); + authResult = queryAuth.auth(readType 
== null ? null : readType.getFieldNames()); + Predicate rowFilter = authResult == null ? null : authResult.rowFilter(); + if (rowFilter != null) { + readFilter = + readFilter == null + ? rowFilter + : PredicateBuilder.and(readFilter, rowFilter); + executeFilter = true; + } + } + } + + InnerTableRead read = table.newRead().withFilter(readFilter); + if (executeFilter) { + read.executeFilter(); + } if (readType != null) { read.withReadType(readType); } @@ -256,6 +281,11 @@ public TableRead newRead() { if (variantAccessInfo != null) { read.withVariantAccess(variantAccessInfo); } + if (authResult != null + && authResult.columnMasking() != null + && !authResult.columnMasking().isEmpty()) { + return new MaskingTableRead(read, readType(), authResult.columnMasking()); + } return read; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java index 96a0dfb3a591..b4e98576ad61 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java @@ -18,6 +18,8 @@ package org.apache.paimon.table.source; +import org.apache.paimon.catalog.TableQueryAuthResult; + import javax.annotation.Nullable; import java.util.List; @@ -25,5 +27,6 @@ /** Table query auth. */ public interface TableQueryAuth { - List auth(@Nullable List select); + @Nullable + TableQueryAuthResult auth(@Nullable List select); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PredicateJsonSerde.java b/paimon-core/src/main/java/org/apache/paimon/utils/PredicateJsonSerde.java new file mode 100644 index 000000000000..86794bec73cf --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/PredicateJsonSerde.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.predicate.And; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.Contains; +import org.apache.paimon.predicate.EndsWith; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FieldTransform; +import org.apache.paimon.predicate.GreaterOrEqual; +import org.apache.paimon.predicate.GreaterThan; +import org.apache.paimon.predicate.In; +import org.apache.paimon.predicate.IsNotNull; +import org.apache.paimon.predicate.IsNull; +import org.apache.paimon.predicate.LeafFunction; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LessOrEqual; +import org.apache.paimon.predicate.LessThan; +import org.apache.paimon.predicate.Like; +import org.apache.paimon.predicate.NotEqual; +import org.apache.paimon.predicate.NotIn; +import org.apache.paimon.predicate.Or; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.predicate.StartsWith; +import org.apache.paimon.predicate.Transform; +import org.apache.paimon.predicate.TransformPredicate; +import org.apache.paimon.rest.RESTApi; +import 
org.apache.paimon.types.DataType; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** SerDe for {@link Predicate} JSON entries. */ +public class PredicateJsonSerde { + + private static final ObjectMapper MAPPER = RESTApi.OBJECT_MAPPER; + + private static final String FIELD_PREDICATE = "predicate"; + + private static final String FIELD_TYPE = "type"; + private static final String TYPE_COMPOUND = "compound"; + private static final String TYPE_TRANSFORM = "transform"; + + private static final String FIELD_FUNCTION = "function"; + private static final String FIELD_CHILDREN = "children"; + + private static final String FIELD_TRANSFORM = "transform"; + private static final String FIELD_LITERALS = "literals"; + + private PredicateJsonSerde() {} + + @Nullable + public static Predicate parse(@Nullable String json) throws JsonProcessingException { + if (json == null || json.trim().isEmpty()) { + return null; + } + + JsonNode root = MAPPER.readTree(json); + JsonNode predicateNode = root.get(FIELD_PREDICATE); + if (predicateNode == null || predicateNode.isNull()) { + return null; + } + return parsePredicateNode(predicateNode); + } + + public static String toJsonString(Predicate predicate) { + return toJsonString(toJsonNode(predicate)); + } + + private static String toJsonString(JsonNode predicateObject) { + ObjectNode root = MAPPER.createObjectNode(); + root.set(FIELD_PREDICATE, predicateObject); + try { + return 
MAPPER.writeValueAsString(root); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize predicate entry json.", e); + } + } + + public static ObjectNode compoundPredicateNode( + String function, List children) { + ObjectNode predicate = MAPPER.createObjectNode(); + predicate.put(FIELD_TYPE, TYPE_COMPOUND); + predicate.put(FIELD_FUNCTION, function); + ArrayNode array = MAPPER.createArrayNode(); + if (children != null) { + for (JsonNode child : children) { + array.add(child); + } + } + predicate.set(FIELD_CHILDREN, array); + return predicate; + } + + private static JsonNode toJsonNode(Predicate predicate) { + if (predicate instanceof LeafPredicate) { + LeafPredicate leaf = (LeafPredicate) predicate; + Transform transform = + new FieldTransform(new FieldRef(leaf.index(), leaf.fieldName(), leaf.type())); + return transformPredicateJsonNode( + transform, leafFunctionName(leaf.function()), leaf.literals()); + } + + if (predicate instanceof CompoundPredicate) { + CompoundPredicate compound = (CompoundPredicate) predicate; + List children = + compound.children().stream() + .map(PredicateJsonSerde::toJsonNode) + .collect(Collectors.toList()); + String fn = compound.function().equals(Or.INSTANCE) ? 
"OR" : "AND"; + return compoundPredicateNode(fn, children); + } + + if (predicate instanceof TransformPredicate) { + TransformPredicate transformPredicate = (TransformPredicate) predicate; + return transformPredicateJsonNode( + transformPredicate.transform(), + leafFunctionName(transformPredicate.function()), + transformPredicate.literals()); + } + + throw new IllegalArgumentException( + "Unsupported predicate type: " + predicate.getClass().getName()); + } + + private static ObjectNode transformPredicateJsonNode( + Transform transform, String function, @Nullable List literals) { + ObjectNode predicate = MAPPER.createObjectNode(); + predicate.put(FIELD_TYPE, TYPE_TRANSFORM); + predicate.set(FIELD_TRANSFORM, TransformJsonSerde.toJsonNode(transform)); + predicate.put(FIELD_FUNCTION, function); + + ArrayNode lits = MAPPER.createArrayNode(); + if (literals != null) { + for (Object lit : literals) { + lits.add(MAPPER.valueToTree(lit)); + } + } + predicate.set(FIELD_LITERALS, lits); + return predicate; + } + + private static String leafFunctionName(LeafFunction function) { + if (function.equals(Equal.INSTANCE)) { + return "EQUAL"; + } else if (function.equals(NotEqual.INSTANCE)) { + return "NOT_EQUAL"; + } else if (function.equals(GreaterThan.INSTANCE)) { + return "GREATER_THAN"; + } else if (function.equals(GreaterOrEqual.INSTANCE)) { + return "GREATER_OR_EQUAL"; + } else if (function.equals(LessThan.INSTANCE)) { + return "LESS_THAN"; + } else if (function.equals(LessOrEqual.INSTANCE)) { + return "LESS_OR_EQUAL"; + } else if (function.equals(In.INSTANCE)) { + return "IN"; + } else if (function.equals(NotIn.INSTANCE)) { + return "NOT_IN"; + } else if (function.equals(IsNull.INSTANCE)) { + return "IS_NULL"; + } else if (function.equals(IsNotNull.INSTANCE)) { + return "IS_NOT_NULL"; + } else if (function.equals(StartsWith.INSTANCE)) { + return "STARTS_WITH"; + } else if (function.equals(EndsWith.INSTANCE)) { + return "ENDS_WITH"; + } else if 
(function.equals(Contains.INSTANCE)) { + return "CONTAINS"; + } else if (function.equals(Like.INSTANCE)) { + return "LIKE"; + } + + throw new IllegalArgumentException( + "Unsupported leaf function: " + function.getClass().getName()); + } + + private static Predicate parsePredicateNode(JsonNode node) { + String type = requiredText(node, FIELD_TYPE); + if (TYPE_COMPOUND.equals(type)) { + String fnText = requiredText(node, FIELD_FUNCTION); + CompoundPredicate.Function fn = "OR".equals(fnText) ? Or.INSTANCE : And.INSTANCE; + + JsonNode childrenNode = node.get(FIELD_CHILDREN); + List children = + childrenNode == null || childrenNode.isNull() + ? new ArrayList<>() + : toList(childrenNode).stream() + .map(PredicateJsonSerde::parsePredicateNode) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + return new CompoundPredicate(fn, children); + } + + if (TYPE_TRANSFORM.equals(type)) { + Transform transform = + TransformJsonSerde.parseTransformNode(required(node, FIELD_TRANSFORM)); + LeafFunction fn = parseLeafFunction(requiredText(node, FIELD_FUNCTION)); + + List literals = new ArrayList<>(); + JsonNode literalsNode = node.get(FIELD_LITERALS); + if (literalsNode instanceof ArrayNode) { + DataType literalType = transform.outputType(); + for (JsonNode lit : (ArrayNode) literalsNode) { + Object javaObj = MAPPER.convertValue(lit, Object.class); + literals.add(PredicateBuilder.convertJavaObject(literalType, javaObj)); + } + } + + return TransformPredicate.of(transform, fn, literals); + } + + throw new IllegalArgumentException("Unsupported predicate type: " + type); + } + + private static LeafFunction parseLeafFunction(String function) { + switch (function) { + case "EQUAL": + return Equal.INSTANCE; + case "NOT_EQUAL": + return NotEqual.INSTANCE; + case "GREATER_THAN": + return GreaterThan.INSTANCE; + case "GREATER_OR_EQUAL": + return GreaterOrEqual.INSTANCE; + case "LESS_THAN": + return LessThan.INSTANCE; + case "LESS_OR_EQUAL": + return LessOrEqual.INSTANCE; + case 
"IN": + return In.INSTANCE; + case "NOT_IN": + return NotIn.INSTANCE; + case "IS_NULL": + return IsNull.INSTANCE; + case "IS_NOT_NULL": + return IsNotNull.INSTANCE; + case "STARTS_WITH": + return StartsWith.INSTANCE; + case "ENDS_WITH": + return EndsWith.INSTANCE; + case "CONTAINS": + return Contains.INSTANCE; + case "LIKE": + return Like.INSTANCE; + default: + throw new IllegalArgumentException("Unsupported leaf function: " + function); + } + } + + private static JsonNode required(JsonNode node, String field) { + JsonNode v = node.get(field); + if (v == null || v.isNull()) { + throw new IllegalArgumentException("Missing required field: " + field); + } + return v; + } + + private static String requiredText(JsonNode node, String field) { + return required(node, field).asText(); + } + + private static List toList(JsonNode node) { + List list = new ArrayList<>(); + if (node instanceof ArrayNode) { + node.forEach(list::add); + } + return list; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TransformJsonSerde.java b/paimon-core/src/main/java/org/apache/paimon/utils/TransformJsonSerde.java new file mode 100644 index 000000000000..6bbfa3eb2db6 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TransformJsonSerde.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.predicate.CastTransform; +import org.apache.paimon.predicate.ConcatTransform; +import org.apache.paimon.predicate.ConcatWsTransform; +import org.apache.paimon.predicate.DefaultValueTransform; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FieldTransform; +import org.apache.paimon.predicate.HashMaskTransform; +import org.apache.paimon.predicate.NullTransform; +import org.apache.paimon.predicate.PartialMaskTransform; +import org.apache.paimon.predicate.Transform; +import org.apache.paimon.predicate.UpperTransform; +import org.apache.paimon.rest.RESTApi; +import org.apache.paimon.types.DataType; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +/** SerDe for {@link Transform} JSON entries. 
*/ +public class TransformJsonSerde { + + private static final ObjectMapper MAPPER = RESTApi.OBJECT_MAPPER; + + private static final String FIELD_TRANSFORM = "transform"; + private static final String FIELD_TYPE = "type"; + + private static final String TRANSFORM_TYPE_FIELD = "field"; + private static final String TRANSFORM_TYPE_UPPER = "upper"; + private static final String TRANSFORM_TYPE_CONCAT = "concat"; + private static final String TRANSFORM_TYPE_CONCAT_WS = "concat_ws"; + private static final String TRANSFORM_TYPE_CAST = "cast"; + private static final String TRANSFORM_TYPE_MASK = "mask"; + private static final String TRANSFORM_TYPE_HASH = "hash"; + private static final String TRANSFORM_TYPE_NULL = "null"; + private static final String TRANSFORM_TYPE_DEFAULT = "default"; + private static final String TRANSFORM_TYPE_LITERAL = "literal"; + + private static final String FIELD_INPUTS = "inputs"; + private static final String FIELD_VALUE = "value"; + private static final String FIELD_TO_DATA_TYPE = "toDataType"; + private static final String FIELD_FIELD = "field"; + private static final String FIELD_PREFIX_LEN = "prefixLen"; + private static final String FIELD_SUFFIX_LEN = "suffixLen"; + private static final String FIELD_MASK = "mask"; + private static final String FIELD_ALGORITHM = "algorithm"; + private static final String FIELD_SALT = "salt"; + private static final String FIELD_INDEX = "index"; + private static final String FIELD_NAME = "name"; + private static final String FIELD_DATA_TYPE = "dataType"; + + private TransformJsonSerde() {} + + @Nullable + public static Transform parse(@Nullable String json) throws JsonProcessingException { + if (json == null || json.trim().isEmpty()) { + return null; + } + + JsonNode root = MAPPER.readTree(json); + + if (root.hasNonNull(FIELD_TRANSFORM) && !root.has(FIELD_TYPE)) { + return parseTransformNode(required(root, FIELD_TRANSFORM)); + } + + if (root.hasNonNull(FIELD_TYPE)) { + return parseTransformNode(root); + } + + 
throw new IllegalArgumentException("Invalid transform json: missing required fields."); + } + + public static String toJsonString(Transform transform) { + ObjectNode root = MAPPER.createObjectNode(); + root.set(FIELD_TRANSFORM, toJsonNode(transform)); + try { + return MAPPER.writeValueAsString(root); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize transform json.", e); + } + } + + public static ObjectNode toJsonNode(Transform transform) { + if (transform instanceof FieldTransform) { + return fieldRefToJsonNode(((FieldTransform) transform).fieldRef()); + } + + if (transform instanceof UpperTransform) { + return stringTransformToJsonNode(TRANSFORM_TYPE_UPPER, transform.inputs()); + } + if (transform instanceof ConcatTransform) { + return stringTransformToJsonNode(TRANSFORM_TYPE_CONCAT, transform.inputs()); + } + if (transform instanceof ConcatWsTransform) { + return stringTransformToJsonNode(TRANSFORM_TYPE_CONCAT_WS, transform.inputs()); + } + + if (transform instanceof CastTransform) { + ObjectNode node = MAPPER.createObjectNode(); + node.put(FIELD_TYPE, TRANSFORM_TYPE_CAST); + FieldRef fieldRef = (FieldRef) transform.inputs().get(0); + node.set(FIELD_FIELD, fieldRefToJsonNode(fieldRef)); + node.set(FIELD_TO_DATA_TYPE, MAPPER.valueToTree(transform.outputType())); + return node; + } + + if (transform instanceof PartialMaskTransform) { + PartialMaskTransform maskTransform = (PartialMaskTransform) transform; + ObjectNode node = MAPPER.createObjectNode(); + node.put(FIELD_TYPE, TRANSFORM_TYPE_MASK); + node.set(FIELD_FIELD, fieldRefToJsonNode(maskTransform.fieldRef())); + node.put(FIELD_PREFIX_LEN, maskTransform.prefixLen()); + node.put(FIELD_SUFFIX_LEN, maskTransform.suffixLen()); + node.put(FIELD_MASK, maskTransform.mask().toString()); + return node; + } + + if (transform instanceof HashMaskTransform) { + HashMaskTransform hashTransform = (HashMaskTransform) transform; + ObjectNode node = MAPPER.createObjectNode(); + 
node.put(FIELD_TYPE, TRANSFORM_TYPE_HASH); + node.set(FIELD_FIELD, fieldRefToJsonNode(hashTransform.fieldRef())); + node.put(FIELD_ALGORITHM, hashTransform.algorithm()); + BinaryString salt = hashTransform.salt(); + if (salt != null) { + node.put(FIELD_SALT, salt.toString()); + } + return node; + } + + if (transform instanceof NullTransform) { + NullTransform nullTransform = (NullTransform) transform; + ObjectNode node = MAPPER.createObjectNode(); + node.put(FIELD_TYPE, TRANSFORM_TYPE_NULL); + node.set(FIELD_FIELD, fieldRefToJsonNode(nullTransform.fieldRef())); + return node; + } + if (transform instanceof DefaultValueTransform) { + DefaultValueTransform defaultValueTransform = (DefaultValueTransform) transform; + ObjectNode node = MAPPER.createObjectNode(); + node.put(FIELD_TYPE, TRANSFORM_TYPE_DEFAULT); + node.set(FIELD_FIELD, fieldRefToJsonNode(defaultValueTransform.fieldRef())); + return node; + } + + throw new IllegalArgumentException( + "Unsupported transform type: " + transform.getClass().getName()); + } + + public static Transform parseTransformNode(JsonNode node) { + String type = requiredText(node, FIELD_TYPE); + if (TRANSFORM_TYPE_FIELD.equals(type)) { + int index = required(node, FIELD_INDEX).asInt(); + String name = requiredText(node, FIELD_NAME); + DataType dataType = + MAPPER.convertValue(required(node, FIELD_DATA_TYPE), DataType.class); + return new FieldTransform(new FieldRef(index, name, dataType)); + } + if (TRANSFORM_TYPE_UPPER.equals(type) + || TRANSFORM_TYPE_CONCAT.equals(type) + || TRANSFORM_TYPE_CONCAT_WS.equals(type)) { + List inputs = parseTransformInputs(required(node, FIELD_INPUTS)); + if (TRANSFORM_TYPE_UPPER.equals(type)) { + return new UpperTransform(inputs); + } else if (TRANSFORM_TYPE_CONCAT_WS.equals(type)) { + return new ConcatWsTransform(inputs); + } else { + return new ConcatTransform(inputs); + } + } + if (TRANSFORM_TYPE_CAST.equals(type)) { + FieldRef fieldRef = parseFieldRef(required(node, FIELD_FIELD)); + DataType toType = + 
MAPPER.convertValue(required(node, FIELD_TO_DATA_TYPE), DataType.class); + return CastTransform.tryCreate(fieldRef, toType) + .orElseThrow( + () -> + new IllegalArgumentException( + "Unsupported CAST transform from " + + fieldRef.type() + + " to " + + toType)); + } + if (TRANSFORM_TYPE_MASK.equals(type)) { + FieldRef fieldRef = parseFieldRef(required(node, FIELD_FIELD)); + int prefixLen = optionalInt(node, FIELD_PREFIX_LEN, 0); + int suffixLen = optionalInt(node, FIELD_SUFFIX_LEN, 0); + String mask = optionalText(node, FIELD_MASK, "*"); + return new PartialMaskTransform( + fieldRef, prefixLen, suffixLen, BinaryString.fromString(mask)); + } + if (TRANSFORM_TYPE_HASH.equals(type)) { + FieldRef fieldRef = parseFieldRef(required(node, FIELD_FIELD)); + String algorithm = optionalText(node, FIELD_ALGORITHM, "SHA-256"); + String salt = optionalText(node, FIELD_SALT, null); + return new HashMaskTransform( + fieldRef, algorithm, salt == null ? null : BinaryString.fromString(salt)); + } + if (TRANSFORM_TYPE_NULL.equals(type)) { + FieldRef fieldRef = parseFieldRef(required(node, FIELD_FIELD)); + return new NullTransform(fieldRef); + } + if (TRANSFORM_TYPE_DEFAULT.equals(type)) { + FieldRef fieldRef = parseFieldRef(required(node, FIELD_FIELD)); + return new DefaultValueTransform(fieldRef); + } + throw new IllegalArgumentException("Unsupported transform type: " + type); + } + + private static ObjectNode fieldRefToJsonNode(FieldRef fieldRef) { + ObjectNode node = MAPPER.createObjectNode(); + node.put(FIELD_TYPE, TRANSFORM_TYPE_FIELD); + node.put(FIELD_INDEX, fieldRef.index()); + node.put(FIELD_NAME, fieldRef.name()); + node.set(FIELD_DATA_TYPE, MAPPER.valueToTree(fieldRef.type())); + return node; + } + + private static ObjectNode stringTransformToJsonNode(String type, List inputs) { + ObjectNode node = MAPPER.createObjectNode(); + node.put(FIELD_TYPE, type); + ArrayNode inputNodes = MAPPER.createArrayNode(); + for (Object input : inputs) { + if (input == null) { + 
inputNodes.addNull(); + } else if (input instanceof FieldRef) { + inputNodes.add(fieldRefToJsonNode((FieldRef) input)); + } else if (input instanceof BinaryString) { + ObjectNode literal = MAPPER.createObjectNode(); + literal.put(FIELD_TYPE, TRANSFORM_TYPE_LITERAL); + literal.put(FIELD_VALUE, ((BinaryString) input).toString()); + inputNodes.add(literal); + } else { + throw new IllegalArgumentException( + "Unsupported transform input type: " + input.getClass().getName()); + } + } + node.set(FIELD_INPUTS, inputNodes); + return node; + } + + private static List parseTransformInputs(JsonNode node) { + List inputs = new ArrayList<>(); + if (!(node instanceof ArrayNode)) { + throw new IllegalArgumentException("Transform inputs must be an array."); + } + for (JsonNode inputNode : (ArrayNode) node) { + if (inputNode == null || inputNode.isNull()) { + inputs.add(null); + continue; + } + if (inputNode.isTextual()) { + inputs.add(BinaryString.fromString(inputNode.asText())); + continue; + } + String type = requiredText(inputNode, FIELD_TYPE); + if (TRANSFORM_TYPE_FIELD.equals(type)) { + inputs.add(parseFieldRef(inputNode)); + } else if (TRANSFORM_TYPE_LITERAL.equals(type)) { + inputs.add(BinaryString.fromString(requiredText(inputNode, FIELD_VALUE))); + } else { + throw new IllegalArgumentException("Unsupported transform input type: " + type); + } + } + return inputs; + } + + private static FieldRef parseFieldRef(JsonNode node) { + int index = required(node, FIELD_INDEX).asInt(); + String name = requiredText(node, FIELD_NAME); + DataType dataType = MAPPER.convertValue(required(node, FIELD_DATA_TYPE), DataType.class); + return new FieldRef(index, name, dataType); + } + + private static JsonNode required(JsonNode node, String field) { + JsonNode v = node.get(field); + if (v == null || v.isNull()) { + throw new IllegalArgumentException("Missing required field: " + field); + } + return v; + } + + private static String requiredText(JsonNode node, String field) { + return 
required(node, field).asText(); + } + + private static int optionalInt(JsonNode node, String field, int defaultValue) { + JsonNode v = node.get(field); + if (v == null || v.isNull()) { + return defaultValue; + } + return v.asInt(); + } + + private static String optionalText(JsonNode node, String field, String defaultValue) { + JsonNode v = node.get(field); + if (v == null || v.isNull()) { + return defaultValue; + } + return v.asText(); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java index 150a8e28eb96..5dc6acc6e9d7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java @@ -25,8 +25,23 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.ConcatTransform; +import org.apache.paimon.predicate.ConcatWsTransform; +import org.apache.paimon.predicate.DefaultValueTransform; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FieldTransform; +import org.apache.paimon.predicate.GreaterThan; +import org.apache.paimon.predicate.HashMaskTransform; +import org.apache.paimon.predicate.NullTransform; +import org.apache.paimon.predicate.PartialMaskTransform; +import org.apache.paimon.predicate.Transform; +import org.apache.paimon.predicate.TransformPredicate; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.rest.auth.AuthProvider; import org.apache.paimon.rest.auth.AuthProviderEnum; import org.apache.paimon.rest.auth.BearTokenAuthProvider; @@ -37,7 +52,17 @@ 
import org.apache.paimon.rest.exceptions.NotAuthorizedException; import org.apache.paimon.rest.responses.ConfigResponse; import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.PredicateJsonSerde; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; @@ -47,14 +72,18 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import static org.apache.paimon.CoreOptions.QUERY_AUTH_ENABLED; import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX; import static org.apache.paimon.rest.RESTApi.HEADER_PREFIX; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -246,6 +275,311 @@ void testCreateFormatTableWhenEnableDataToken() throws Exception { catalog.dropTable(identifier, true); } + @Test + void testRowFilter() throws Exception { + Identifier identifier = Identifier.create("test_table_db", "auth_table_filter"); + catalog.createDatabase(identifier.getDatabaseName(), true); + catalog.createTable( + identifier, + new Schema( + Collections.singletonList(new DataField(0, "col1", DataTypes.INT())), + Collections.emptyList(), + Collections.emptyList(), + 
Collections.singletonMap(QUERY_AUTH_ENABLED.key(), "true"), + ""), + true); + + Table table = catalog.getTable(identifier); + batchWrite(table, Arrays.asList(1, 2, 3, 4)); + + // Only allow rows with col1 > 2 + TransformPredicate rowFilterPredicate = + TransformPredicate.of( + new FieldTransform(new FieldRef(0, "col1", DataTypes.INT())), + GreaterThan.INSTANCE, + Collections.singletonList(2)); + restCatalogServer.addTableFilter( + identifier, PredicateJsonSerde.toJsonString(rowFilterPredicate)); + + assertThat(batchRead(table)).containsExactly("+I[3]", "+I[4]"); + } + + @Test + void testColumnMaskingApplyOnRead() throws Exception { + Identifier identifier = Identifier.create("test_table_db", "auth_table_masking_apply"); + catalog.createDatabase(identifier.getDatabaseName(), true); + catalog.createTable( + identifier, + new Schema( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING()), + new DataField(1, "col2", DataTypes.INT())), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonMap(QUERY_AUTH_ENABLED.key(), "true"), + ""), + true); + + Table table = catalog.getTable(identifier); + + // write two rows: ("abcdef", 1), ("ghijkl", 2) + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + BatchTableWrite write = writeBuilder.newWrite(); + write.write(GenericRow.of(BinaryString.fromString("abcdef"), 1)); + write.write(GenericRow.of(BinaryString.fromString("ghijkl"), 2)); + List messages = write.prepareCommit(); + BatchTableCommit commit = writeBuilder.newCommit(); + commit.commit(messages); + write.close(); + commit.close(); + + { + // Mask col1 as constant "****" + Transform constantMaskTransform = + new ConcatTransform(Collections.singletonList(BinaryString.fromString("****"))); + restCatalogServer.addTableColumnMasking( + identifier, ImmutableMap.of("col1", constantMaskTransform)); + + ReadBuilder readBuilder = table.newReadBuilder(); + List splits = readBuilder.newScan().plan().splits(); + TableRead read = 
readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + + List actual = new ArrayList<>(); + List col2 = new ArrayList<>(); + reader.forEachRemaining( + row -> { + actual.add(row.getString(0).toString()); + col2.add(row.getInt(1)); + }); + assertThat(actual).containsExactly("****", "****"); + assertThat(col2).containsExactly(1, 2); + } + + { + // Mask col1 by keeping prefix/suffix and masking middle: ab**ef, gh**kl + Transform partialMaskTransform = + new PartialMaskTransform( + new FieldRef(0, "col1", DataTypes.STRING()), + 2, + 2, + BinaryString.fromString("*")); + restCatalogServer.addTableColumnMasking( + identifier, ImmutableMap.of("col1", partialMaskTransform)); + + ReadBuilder readBuilder = table.newReadBuilder(); + List splits = readBuilder.newScan().plan().splits(); + TableRead read = readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + + List actual = new ArrayList<>(); + List col2 = new ArrayList<>(); + reader.forEachRemaining( + row -> { + actual.add(row.getString(0).toString()); + col2.add(row.getInt(1)); + }); + assertThat(actual).containsExactly("ab**ef", "gh**kl"); + assertThat(col2).containsExactly(1, 2); + } + + { + // Mask col1 as "MASK_" + col1 + "_END" (ConcatTransform with FieldRef) + Transform concatMaskTransform = + new ConcatTransform( + Arrays.asList( + BinaryString.fromString("MASK_"), + new FieldRef(0, "col1", DataTypes.STRING()), + BinaryString.fromString("_END"))); + restCatalogServer.addTableColumnMasking( + identifier, ImmutableMap.of("col1", concatMaskTransform)); + + ReadBuilder readBuilder = table.newReadBuilder(); + List splits = readBuilder.newScan().plan().splits(); + TableRead read = readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + + List actual = new ArrayList<>(); + List col2 = new ArrayList<>(); + reader.forEachRemaining( + row -> { + actual.add(row.getString(0).toString()); + col2.add(row.getInt(1)); + }); + 
assertThat(actual).containsExactly("MASK_abcdef_END", "MASK_ghijkl_END"); + assertThat(col2).containsExactly(1, 2); + } + + { + // Mask col1 as "PFX" + sep + col1 + sep + "SFX" (ConcatWsTransform with FieldRef) + Transform concatWsMaskTransform = + new ConcatWsTransform( + Arrays.asList( + BinaryString.fromString("_"), + BinaryString.fromString("PFX"), + new FieldRef(0, "col1", DataTypes.STRING()), + BinaryString.fromString("SFX"))); + restCatalogServer.addTableColumnMasking( + identifier, ImmutableMap.of("col1", concatWsMaskTransform)); + + ReadBuilder readBuilder = table.newReadBuilder(); + List splits = readBuilder.newScan().plan().splits(); + TableRead read = readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + + List actual = new ArrayList<>(); + List col2 = new ArrayList<>(); + reader.forEachRemaining( + row -> { + actual.add(row.getString(0).toString()); + col2.add(row.getInt(1)); + }); + assertThat(actual).containsExactly("PFX_abcdef_SFX", "PFX_ghijkl_SFX"); + assertThat(col2).containsExactly(1, 2); + } + + { + // Mask col1 by hashing (SHA-256, hex lowercase) + Transform hashMaskTransform = + new HashMaskTransform( + new FieldRef(0, "col1", DataTypes.STRING()), "sha256", null); + restCatalogServer.addTableColumnMasking( + identifier, ImmutableMap.of("col1", hashMaskTransform)); + + ReadBuilder readBuilder = table.newReadBuilder(); + List splits = readBuilder.newScan().plan().splits(); + TableRead read = readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + + List actual = new ArrayList<>(); + List col2 = new ArrayList<>(); + reader.forEachRemaining( + row -> { + actual.add(row.getString(0).toString()); + col2.add(row.getInt(1)); + }); + assertThat(actual) + .containsExactly( + "bef57ec7f53a6d40beb640a780a639c83bc29ac8a9816f1fc6c5c6dcd93c4721", + "54f6ee81b58accbc57adbceb0f50264897626060071dc9e92f897e7b373deb93"); + assertThat(col2).containsExactly(1, 2); + } + + { + // Mask col1 as NULL for any type + Transform 
nullMaskTransform = + new NullTransform(new FieldRef(0, "col1", DataTypes.STRING())); + restCatalogServer.addTableColumnMasking( + identifier, ImmutableMap.of("col1", nullMaskTransform)); + + ReadBuilder readBuilder = table.newReadBuilder(); + List splits = readBuilder.newScan().plan().splits(); + TableRead read = readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + + List isNull = new ArrayList<>(); + List col2 = new ArrayList<>(); + reader.forEachRemaining( + row -> { + isNull.add(row.isNullAt(0)); + col2.add(row.getInt(1)); + }); + assertThat(isNull).containsExactly(true, true); + assertThat(col2).containsExactly(1, 2); + } + + { + // Mask columns as their type default values: col1 -> "", col2 -> 0 + Transform defaultValueMaskCol1 = + new DefaultValueTransform(new FieldRef(0, "col1", DataTypes.STRING())); + Transform defaultValueMaskCol2 = + new DefaultValueTransform(new FieldRef(1, "col2", DataTypes.INT())); + restCatalogServer.addTableColumnMasking( + identifier, + ImmutableMap.of("col1", defaultValueMaskCol1, "col2", defaultValueMaskCol2)); + + ReadBuilder readBuilder = table.newReadBuilder(); + List splits = readBuilder.newScan().plan().splits(); + TableRead read = readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + + List col1 = new ArrayList<>(); + List col2 = new ArrayList<>(); + List col1IsNull = new ArrayList<>(); + reader.forEachRemaining( + row -> { + col1IsNull.add(row.isNullAt(0)); + col1.add(row.getString(0).toString()); + col2.add(row.getInt(1)); + }); + assertThat(col1IsNull).containsExactly(false, false); + assertThat(col1).containsExactly("", ""); + assertThat(col2).containsExactly(0, 0); + } + } + + @Test + void testRowFilterWithColumnMasking() throws Exception { + Identifier identifier = Identifier.create("test_table_db", "auth_table_filter_masking"); + catalog.createDatabase(identifier.getDatabaseName(), true); + catalog.createTable( + identifier, + new Schema( + Arrays.asList( + new 
DataField(0, "col1", DataTypes.INT()), + new DataField(1, "col2", DataTypes.STRING())), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonMap(QUERY_AUTH_ENABLED.key(), "true"), + ""), + true); + + Table table = catalog.getTable(identifier); + + // write four rows: (1, "a"), (2, "b"), (3, "c"), (4, "d") + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + BatchTableWrite write = writeBuilder.newWrite(); + write.write(GenericRow.of(1, BinaryString.fromString("a"))); + write.write(GenericRow.of(2, BinaryString.fromString("b"))); + write.write(GenericRow.of(3, BinaryString.fromString("c"))); + write.write(GenericRow.of(4, BinaryString.fromString("d"))); + List messages = write.prepareCommit(); + BatchTableCommit commit = writeBuilder.newCommit(); + commit.commit(messages); + write.close(); + commit.close(); + + // Only allow rows with col1 > 2 + TransformPredicate rowFilterPredicate = + TransformPredicate.of( + new FieldTransform(new FieldRef(0, "col1", DataTypes.INT())), + GreaterThan.INSTANCE, + Collections.singletonList(2)); + restCatalogServer.addTableFilter( + identifier, PredicateJsonSerde.toJsonString(rowFilterPredicate)); + + // Mask col2 as "****" + Transform maskTransform = + new ConcatTransform(Collections.singletonList(BinaryString.fromString("****"))); + restCatalogServer.addTableColumnMasking(identifier, ImmutableMap.of("col2", maskTransform)); + + ReadBuilder readBuilder = table.newReadBuilder(); + List splits = readBuilder.newScan().plan().splits(); + TableRead read = readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + + List actual = new ArrayList<>(); + reader.forEachRemaining( + row -> + actual.add( + String.format( + "%s[%d, %s]", + row.getRowKind().shortString(), + row.getInt(0), + row.getString(1).toString()))); + assertThat(actual).containsExactly("+I[3, ****]", "+I[4, ****]"); + } + private void checkHeader(String headerName, String headerValue) { // Verify that the header were 
included in the requests List> receivedHeaders = restCatalogServer.getReceivedHeaders(); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 544b4903850c..b51e8c404aa7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -41,6 +41,7 @@ import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; import org.apache.paimon.partition.PartitionUtils; +import org.apache.paimon.predicate.Transform; import org.apache.paimon.rest.auth.AuthProvider; import org.apache.paimon.rest.auth.RESTAuthParameter; import org.apache.paimon.rest.requests.AlterDatabaseRequest; @@ -102,6 +103,7 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; import org.apache.paimon.utils.TimeUtils; +import org.apache.paimon.utils.TransformJsonSerde; import org.apache.paimon.view.View; import org.apache.paimon.view.ViewChange; import org.apache.paimon.view.ViewImpl; @@ -185,6 +187,8 @@ public class RESTCatalogServer { private final List noPermissionTables = new ArrayList<>(); private final Map functionStore = new HashMap<>(); private final Map> columnAuthHandler = new HashMap<>(); + private final Map> rowFilterAuthHandler = new HashMap<>(); + private final Map> columnMaskingAuthHandler = new HashMap<>(); public final ConfigResponse configResponse; public final String warehouse; @@ -266,6 +270,25 @@ public void addTableColumnAuth(Identifier identifier, List select) { columnAuthHandler.put(identifier.getFullName(), select); } + public void addTableFilter(Identifier identifier, String filter) { + rowFilterAuthHandler.put(identifier.getFullName(), Collections.singletonList(filter)); + } + + public void addTableColumnMasking(Identifier identifier, Map columnMasking) { + Map serialized = new HashMap<>(); + 
if (columnMasking != null) { + for (Map.Entry e : columnMasking.entrySet()) { + String column = e.getKey(); + Transform transform = e.getValue(); + if (column == null || transform == null) { + continue; + } + serialized.put(column, TransformJsonSerde.toJsonString(transform)); + } + } + columnMaskingAuthHandler.put(identifier.getFullName(), serialized); + } + public RESTToken getDataToken(Identifier identifier) { return DataTokenStore.getDataToken(warehouse, identifier.getFullName()); } @@ -829,7 +852,23 @@ private MockResponse authTable(Identifier identifier, String data) throws Except } }); } - AuthTableQueryResponse response = new AuthTableQueryResponse(Collections.emptyList()); + List predicates = rowFilterAuthHandler.get(identifier.getFullName()); + Map columnMasking = columnMaskingAuthHandler.get(identifier.getFullName()); + if (columnMasking != null) { + List select = requestBody.select(); + if (select == null) { + select = metadata.schema().fieldNames(); + } + Map filtered = new HashMap<>(); + for (String column : select) { + String mask = columnMasking.get(column); + if (mask != null) { + filtered.put(column, mask); + } + } + columnMasking = filtered; + } + AuthTableQueryResponse response = new AuthTableQueryResponse(predicates, columnMasking); return mockResponse(response, 200); } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/PredicateJsonSerdeTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/PredicateJsonSerdeTest.java new file mode 100644 index 000000000000..23e6d2e52d08 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/PredicateJsonSerdeTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.predicate.ConcatTransform; +import org.apache.paimon.predicate.DefaultValueTransform; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.IsNotNull; +import org.apache.paimon.predicate.NullTransform; +import org.apache.paimon.predicate.PartialMaskTransform; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.Transform; +import org.apache.paimon.predicate.TransformPredicate; +import org.apache.paimon.predicate.UpperTransform; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +class PredicateJsonSerdeTest { + + @Test + void testTransformToJsonAndParseTransform() throws Exception { + Transform transform = + new UpperTransform( + Collections.singletonList(new FieldRef(0, "col1", DataTypes.STRING()))); + + String json = TransformJsonSerde.toJsonString(transform); + Transform parsed = TransformJsonSerde.parse(json); + + assertThat(parsed).isNotNull(); + assertThat(TransformJsonSerde.toJsonString(parsed)).isEqualTo(json); + } + + @Test + void testMaskTransformToJsonAndParseTransform() throws Exception { + Transform transform = + new PartialMaskTransform( + new FieldRef(0, 
"col1", DataTypes.STRING()), + 2, + 2, + BinaryString.fromString("*")); + + String json = TransformJsonSerde.toJsonString(transform); + Transform parsed = TransformJsonSerde.parse(json); + + assertThat(parsed).isNotNull(); + assertThat(parsed).isInstanceOf(PartialMaskTransform.class); + assertThat(TransformJsonSerde.toJsonString(parsed)).isEqualTo(json); + } + + @Test + void testNullTransformToJsonAndParseTransform() throws Exception { + Transform transform = new NullTransform(new FieldRef(0, "col1", DataTypes.INT())); + + String json = TransformJsonSerde.toJsonString(transform); + Transform parsed = TransformJsonSerde.parse(json); + + assertThat(parsed).isNotNull(); + assertThat(parsed).isInstanceOf(NullTransform.class); + assertThat(TransformJsonSerde.toJsonString(parsed)).isEqualTo(json); + } + + @Test + void testDefaultValueTransformToJsonAndParseTransform() throws Exception { + Transform transform = new DefaultValueTransform(new FieldRef(0, "col1", DataTypes.INT())); + + String json = TransformJsonSerde.toJsonString(transform); + Transform parsed = TransformJsonSerde.parse(json); + + assertThat(parsed).isNotNull(); + assertThat(parsed).isInstanceOf(DefaultValueTransform.class); + assertThat(TransformJsonSerde.toJsonString(parsed)).isEqualTo(json); + } + + @Test + void testPredicateToJsonAndParsePredicate() throws Exception { + Transform transform = + new ConcatTransform( + Arrays.asList( + new FieldRef(1, "col2", DataTypes.STRING()), + BinaryString.fromString("****"))); + + TransformPredicate predicate = + TransformPredicate.of(transform, IsNotNull.INSTANCE, Collections.emptyList()); + String json = PredicateJsonSerde.toJsonString(predicate); + + Predicate parsed = PredicateJsonSerde.parse(json); + assertThat(parsed).isInstanceOf(TransformPredicate.class); + assertThat(PredicateJsonSerde.toJsonString(parsed)).isEqualTo(json); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java index d79aa713c9dd..491aea9ae044 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java @@ -19,7 +19,16 @@ package org.apache.paimon.flink; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.predicate.ConcatTransform; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FieldTransform; +import org.apache.paimon.predicate.GreaterThan; +import org.apache.paimon.predicate.Transform; +import org.apache.paimon.predicate.TransformPredicate; import org.apache.paimon.rest.RESTToken; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.PredicateJsonSerde; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; @@ -34,6 +43,7 @@ import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -81,6 +91,56 @@ public void testWriteAndRead() { .containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 22.0D)); } + @Test + public void testRowFilter() { + String rowFilterTable = "row_filter_table"; + batchSql( + String.format( + "CREATE TABLE %s.%s (col1 INT) WITH ('query-auth.enabled' = 'true')", + DATABASE_NAME, rowFilterTable)); + batchSql( + String.format( + "INSERT INTO %s.%s VALUES (1), (2), (3), (4)", + DATABASE_NAME, rowFilterTable)); + + // Only allow rows with col1 > 2 + TransformPredicate rowFilterPredicate = + TransformPredicate.of( + new FieldTransform(new FieldRef(0, "col1", DataTypes.INT())), + GreaterThan.INSTANCE, + Collections.singletonList(2)); + restCatalogServer.addTableFilter( + Identifier.create(DATABASE_NAME, 
rowFilterTable), + PredicateJsonSerde.toJsonString(rowFilterPredicate)); + + assertThat(batchSql(String.format("SELECT col1 FROM %s.%s", DATABASE_NAME, rowFilterTable))) + .containsExactlyInAnyOrder(Row.of(3), Row.of(4)); + } + + @Test + public void testColumnMasking() { + String maskingTable = "column_masking_table"; + batchSql( + String.format( + "CREATE TABLE %s.%s (id INT, secret STRING) WITH ('query-auth.enabled' = 'true')", + DATABASE_NAME, maskingTable)); + batchSql( + String.format( + "INSERT INTO %s.%s VALUES (1, 's1'), (2, 's2')", + DATABASE_NAME, maskingTable)); + + Transform maskTransform = + new ConcatTransform(Collections.singletonList(BinaryString.fromString("****"))); + restCatalogServer.addTableColumnMasking( + Identifier.create(DATABASE_NAME, maskingTable), + ImmutableMap.of("secret", maskTransform)); + + assertThat(batchSql(String.format("SELECT secret FROM %s.%s", DATABASE_NAME, maskingTable))) + .containsExactlyInAnyOrder(Row.of("****"), Row.of("****")); + assertThat(batchSql(String.format("SELECT id FROM %s.%s", DATABASE_NAME, maskingTable))) + .containsExactlyInAnyOrder(Row.of(1), Row.of(2)); + } + @Test public void testExpiredDataToken() { Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME); diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java index ee8978c68767..199a30189865 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java @@ -20,11 +20,18 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.function.Function; import org.apache.paimon.function.FunctionChange; import 
org.apache.paimon.function.FunctionDefinition; import org.apache.paimon.function.FunctionImpl; import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.predicate.ConcatTransform; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FieldTransform; +import org.apache.paimon.predicate.GreaterThan; +import org.apache.paimon.predicate.Transform; +import org.apache.paimon.predicate.TransformPredicate; import org.apache.paimon.rest.RESTCatalogInternalOptions; import org.apache.paimon.rest.RESTCatalogServer; import org.apache.paimon.rest.auth.AuthProvider; @@ -34,6 +41,7 @@ import org.apache.paimon.spark.catalog.WithPaimonCatalog; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.PredicateJsonSerde; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; @@ -48,6 +56,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -237,6 +246,46 @@ public void testMapFunction() throws Exception { cleanFunction(functionName); } + @Test + public void testRowFilter() { + spark.sql( + "CREATE TABLE t_row_filter (col1 INT) TBLPROPERTIES" + + " ('bucket'='1', 'bucket-key'='col1', 'file.format'='avro', 'query-auth.enabled'='true')"); + spark.sql("INSERT INTO t_row_filter VALUES (1), (2), (3), (4)"); + + // Only allow rows with col1 > 2 + TransformPredicate rowFilterPredicate = + TransformPredicate.of( + new FieldTransform(new FieldRef(0, "col1", DataTypes.INT())), + GreaterThan.INSTANCE, + Collections.singletonList(2)); + restCatalogServer.addTableFilter( + Identifier.create("db2", "t_row_filter"), + PredicateJsonSerde.toJsonString(rowFilterPredicate)); + + assertThat(spark.sql("SELECT col1 FROM t_row_filter").collectAsList().toString()) + .isEqualTo("[[3], [4]]"); + } + 
+ @Test + public void testColumnMasking() { + spark.sql( + "CREATE TABLE t_column_masking (id INT, secret STRING) TBLPROPERTIES" + + " ('bucket'='1', 'bucket-key'='id', 'file.format'='avro', 'query-auth.enabled'='true')"); + spark.sql("INSERT INTO t_column_masking VALUES (1, 's1'), (2, 's2')"); + + Transform maskTransform = + new ConcatTransform(Collections.singletonList(BinaryString.fromString("****"))); + restCatalogServer.addTableColumnMasking( + Identifier.create("db2", "t_column_masking"), + ImmutableMap.of("secret", maskTransform)); + + assertThat(spark.sql("SELECT secret FROM t_column_masking").collectAsList().toString()) + .isEqualTo("[[****], [****]]"); + assertThat(spark.sql("SELECT id FROM t_column_masking").collectAsList().toString()) + .isEqualTo("[[1], [2]]"); + } + private Catalog getPaimonCatalog() { CatalogManager catalogManager = spark.sessionState().catalogManager(); WithPaimonCatalog withPaimonCatalog = (WithPaimonCatalog) catalogManager.currentCatalog();