diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
index 0992e51d2375..386f7dd221b9 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
@@ -42,6 +42,7 @@
 import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider;
+import io.trino.plugin.iceberg.functions.tables.IcebergTablesFunctionProvider;
 import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure;
 import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure;
 import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure;
@@ -135,7 +136,9 @@ public void configure(Binder binder)
         tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON);
         tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON);
 
-        newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
+        Multibinder<ConnectorTableFunction> tableFunctions = newSetBinder(binder, ConnectorTableFunction.class);
+        tableFunctions.addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
+        tableFunctions.addBinding().toProvider(IcebergTablesFunctionProvider.class).in(Scopes.SINGLETON);
 
         binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON);
         binder.bind(TableChangesFunctionProcessorProviderFactory.class).in(Scopes.SINGLETON);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
index f23983a4a2c3..0cb4d8d849f3 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
@@ -21,6 +21,7 @@
 import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesSplitSource;
+import io.trino.plugin.iceberg.functions.tables.IcebergTablesFunction.IcebergTables;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorSplitSource;
@@ -157,6 +158,9 @@ public ConnectorSplitSource getSplits(
                     .toSnapshot(functionHandle.endSnapshotId()));
             return new ClassLoaderSafeConnectorSplitSource(tableChangesSplitSource, IcebergSplitManager.class.getClassLoader());
         }
+        if (function instanceof IcebergTables icebergTables) {
+            return new ClassLoaderSafeConnectorSplitSource(new FixedSplitSource(icebergTables), IcebergSplitManager.class.getClassLoader());
+        }
         throw new IllegalStateException("Unknown table function: " + function);
     }
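A note on the wiring above: `IcebergTables` is both the `ConnectorTableFunctionHandle` produced at analysis time and the `ConnectorSplit` consumed at execution time, which is why `getSplits` can hand the handle itself to a `FixedSplitSource`. A minimal sketch of that handle-as-split pattern (the record name and field are illustrative, not part of this change):

```java
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;

import java.util.List;

// Hypothetical handle-as-split: the analysis result is embedded in the handle,
// so no separate split class or split enumeration step is needed. This mirrors
// how the IcebergTables record below implements both interfaces.
record StaticResultHandle(List<String> rows)
        implements ConnectorTableFunctionHandle, ConnectorSplit {}
```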
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java
index 07326a42bcef..b23b543a3cbf 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java
@@ -14,12 +14,20 @@
 package io.trino.plugin.iceberg.functions;
 
 import com.google.inject.Inject;
+import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProvider;
 import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProviderFactory;
+import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionSplitProcessor;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
 import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
+import io.trino.plugin.iceberg.functions.tables.IcebergTablesFunction;
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplit;
 import io.trino.spi.function.FunctionProvider;
 import io.trino.spi.function.table.ConnectorTableFunctionHandle;
+import io.trino.spi.function.table.TableFunctionProcessorProvider;
 import io.trino.spi.function.table.TableFunctionProcessorProviderFactory;
+import io.trino.spi.function.table.TableFunctionSplitProcessor;
 
 import static java.util.Objects.requireNonNull;
 
@@ -40,6 +48,28 @@ public TableFunctionProcessorProviderFactory getTableFunctionProcessorProviderFa
         if (functionHandle instanceof TableChangesFunctionHandle) {
             return new ClassLoaderSafeTableFunctionProcessorProviderFactory(tableChangesFunctionProcessorProviderFactory, getClass().getClassLoader());
         }
+        if (functionHandle instanceof IcebergTablesFunction.IcebergTables) {
+            ClassLoader classLoader = getClass().getClassLoader();
+            return new TableFunctionProcessorProviderFactory()
+            {
+                @Override
+                public TableFunctionProcessorProvider createTableFunctionProcessorProvider()
+                {
+                    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+                        return new ClassLoaderSafeTableFunctionProcessorProvider(new TableFunctionProcessorProvider()
+                        {
+                            @Override
+                            public TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, ConnectorTableFunctionHandle handle, ConnectorSplit split)
+                            {
+                                return new ClassLoaderSafeTableFunctionSplitProcessor(
+                                        new IcebergTablesFunction.IcebergTablesProcessor(((IcebergTablesFunction.IcebergTables) split).tables()),
+                                        getClass().getClassLoader());
+                            }
+                        }, classLoader);
+                    }
+                }
+            };
+        }
         throw new UnsupportedOperationException("Unsupported function: " + functionHandle);
     }
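Every object handed back to the engine here is wrapped so that calls into plugin code run with the plugin's context classloader installed. A simplified, hypothetical analogue of the `ClassLoaderSafe*` wrappers used above (the class name is illustrative):

```java
import io.trino.spi.classloader.ThreadContextClassLoader;

// Delegating wrapper in miniature: ThreadContextClassLoader swaps the thread's
// context classloader for the duration of the call and restores it on close().
final class ClassLoaderSafeRunnable
        implements Runnable
{
    private final Runnable delegate;
    private final ClassLoader classLoader;

    ClassLoaderSafeRunnable(Runnable delegate, ClassLoader classLoader)
    {
        this.delegate = delegate;
        this.classLoader = classLoader;
    }

    @Override
    public void run()
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
            delegate.run();
        }
    }
}
```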
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tables/IcebergTablesFunction.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tables/IcebergTablesFunction.java
new file mode 100644
index 000000000000..345c527a6ecf
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tables/IcebergTablesFunction.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.functions.tables;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
+import io.trino.spi.Page;
+import io.trino.spi.block.VariableWidthBlockBuilder;
+import io.trino.spi.connector.ConnectorAccessControl;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.function.table.AbstractConnectorTableFunction;
+import io.trino.spi.function.table.Argument;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
+import io.trino.spi.function.table.Descriptor;
+import io.trino.spi.function.table.ScalarArgument;
+import io.trino.spi.function.table.ScalarArgumentSpecification;
+import io.trino.spi.function.table.TableFunctionAnalysis;
+import io.trino.spi.function.table.TableFunctionProcessorState;
+import io.trino.spi.function.table.TableFunctionSplitProcessor;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static io.trino.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
+import static io.trino.spi.function.table.TableFunctionProcessorState.Finished.FINISHED;
+import static io.trino.spi.function.table.TableFunctionProcessorState.Processed.produced;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.util.Objects.requireNonNull;
+
+public class IcebergTablesFunction
+        extends AbstractConnectorTableFunction
+{
+    private static final String FUNCTION_NAME = "iceberg_tables";
+    private static final String SCHEMA_NAME_VAR_NAME = "SCHEMA_NAME";
+
+    private final TrinoCatalogFactory trinoCatalogFactory;
+
+    public IcebergTablesFunction(TrinoCatalogFactory trinoCatalogFactory)
+    {
+        super(
+                "system",
+                FUNCTION_NAME,
+                ImmutableList.of(
+                        ScalarArgumentSpecification.builder()
+                                .name(SCHEMA_NAME_VAR_NAME)
+                                .type(VARCHAR)
+                                .defaultValue(null)
+                                .build()),
+                GENERIC_TABLE);
+        this.trinoCatalogFactory = requireNonNull(trinoCatalogFactory, "trinoCatalogFactory is null");
+    }
+
+    @Override
+    public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map<String, Argument> arguments, ConnectorAccessControl accessControl)
+    {
+        ScalarArgument argument = (ScalarArgument) getOnlyElement(arguments.values());
+        Optional<String> schemaFilter = Optional.ofNullable(((Slice) argument.getValue())).map(Slice::toStringUtf8);
+
+        TrinoCatalog catalog = trinoCatalogFactory.create(session.getIdentity());
+        List<SchemaTableName> tables = catalog.listIcebergTables(session, schemaFilter);
+        Set<SchemaTableName> filtered = accessControl.filterTables(null, ImmutableSet.copyOf(tables));
+        return TableFunctionAnalysis.builder()
+                .returnedType(new Descriptor(ImmutableList.of(
+                        new Descriptor.Field("table_schema", Optional.of(VARCHAR)),
+                        new Descriptor.Field("table_name", Optional.of(VARCHAR)))))
+                .handle(new IcebergTables(filtered))
+                .build();
+    }
+
+    public record IcebergTables(Collection<SchemaTableName> tables)
+            implements ConnectorTableFunctionHandle, ConnectorSplit
+    {
+        public IcebergTables
+        {
+            requireNonNull(tables, "tables is null");
+        }
+    }
+
+    public static class IcebergTablesProcessor
+            implements TableFunctionSplitProcessor
+    {
+        private final Collection<SchemaTableName> tables;
+        private boolean finished;
+
+        public IcebergTablesProcessor(Collection<SchemaTableName> tables)
+        {
+            this.tables = requireNonNull(tables, "tables is null");
+        }
+
+        @Override
+        public TableFunctionProcessorState process()
+        {
+            if (finished) {
+                return FINISHED;
+            }
+
+            VariableWidthBlockBuilder schema = VARCHAR.createBlockBuilder(null, tables.size());
+            VariableWidthBlockBuilder tableName = VARCHAR.createBlockBuilder(null, tables.size());
+            for (SchemaTableName table : tables) {
+                schema.writeEntry(Slices.utf8Slice(table.getSchemaName()));
+                tableName.writeEntry(Slices.utf8Slice(table.getTableName()));
+            }
+            finished = true;
+            return produced(new Page(tables.size(), schema.build(), tableName.build()));
+        }
+    }
+}
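The processor emits the entire listing as a single `Page` and reports `FINISHED` on the next `process()` call. A standalone sketch of the same block-building idiom, using only calls that appear in the file above (class and method names here are illustrative):

```java
import io.airlift.slice.Slices;
import io.trino.spi.Page;
import io.trino.spi.block.VariableWidthBlockBuilder;

import static io.trino.spi.type.VarcharType.VARCHAR;

// Builds a one-row, two-VARCHAR-column page the same way
// IcebergTablesProcessor builds its full table listing.
final class PageSketch
{
    private PageSketch() {}

    static Page singleRowPage(String schemaName, String tableName)
    {
        VariableWidthBlockBuilder schemas = VARCHAR.createBlockBuilder(null, 1);
        VariableWidthBlockBuilder tables = VARCHAR.createBlockBuilder(null, 1);
        schemas.writeEntry(Slices.utf8Slice(schemaName));
        tables.writeEntry(Slices.utf8Slice(tableName));
        return new Page(1, schemas.build(), tables.build());
    }
}
```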
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tables/IcebergTablesFunctionProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tables/IcebergTablesFunctionProvider.java
new file mode 100644
index 000000000000..a0d281423f04
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tables/IcebergTablesFunctionProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.functions.tables;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorTableFunction;
+import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
+import io.trino.spi.function.table.ConnectorTableFunction;
+
+import static java.util.Objects.requireNonNull;
+
+public class IcebergTablesFunctionProvider
+        implements Provider<ConnectorTableFunction>
+{
+    private final TrinoCatalogFactory trinoCatalogFactory;
+
+    @Inject
+    public IcebergTablesFunctionProvider(TrinoCatalogFactory trinoCatalogFactory)
+    {
+        this.trinoCatalogFactory = requireNonNull(trinoCatalogFactory, "trinoCatalogFactory is null");
+    }
+
+    @Override
+    public ConnectorTableFunction get()
+    {
+        return new ClassLoaderSafeConnectorTableFunction(
+                new IcebergTablesFunction(trinoCatalogFactory),
+                getClass().getClassLoader());
+    }
+}
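With the provider registered in `IcebergModule`, the function is callable from any client. A hypothetical end-to-end invocation over JDBC; the server address, credentials, catalog, and schema names are assumptions for illustration, not part of this change:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Lists Iceberg tables in one schema via the new function, assuming a Trino
// server on localhost:8080 with an "iceberg" catalog and a "tpch" schema.
public final class IcebergTablesExample
{
    public static void main(String[] args)
            throws Exception
    {
        try (Connection connection = DriverManager.getConnection("jdbc:trino://localhost:8080", "admin", null);
                Statement statement = connection.createStatement();
                ResultSet results = statement.executeQuery(
                        "SELECT * FROM TABLE(iceberg.system.iceberg_tables(SCHEMA_NAME => 'tpch'))")) {
            while (results.next()) {
                // Column names match the Descriptor declared in analyze()
                System.out.println(results.getString("table_schema") + "." + results.getString("table_name"));
            }
        }
    }
}
```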
= "VALUES " + getQueryRunner() + .execute("SELECT table_schema, table_name FROM iceberg.information_schema.tables WHERE table_schema='%s'".formatted(firstSchema)) + .getMaterializedRows().stream() + .map(row -> "('%s', '%s')".formatted(row.getField(0), row.getField(1))) + .collect(joining(", ")); + String bothSchemasTablesValues = firstSchemaTablesValues + ", ('%s', '%s')".formatted(secondSchema, secondSchemaTable.getName()); + assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables(SCHEMA_NAME => '%s'))".formatted(firstSchema), firstSchemaTablesValues); + assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables(null)) WHERE table_schema = '%s'".formatted(firstSchema), firstSchemaTablesValues); + assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables()) WHERE table_schema in ('%s', '%s')".formatted(firstSchema, secondSchema), bothSchemasTablesValues); + assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables(null)) WHERE table_schema in ('%s', '%s')".formatted(firstSchema, secondSchema), bothSchemasTablesValues); + } + finally { + assertQuerySucceeds("DROP SCHEMA " + firstSchema); + assertQuerySucceeds("DROP SCHEMA " + secondSchema); + } + } + + protected AutoCloseable createAdditionalTables(String schema) + { + return () -> {}; + } + private long getMostRecentSnapshotId(String tableName) { return (long) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", tableName)) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java index 3144bd56bb02..91474fa6fa8c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java @@ -13,10 +13,14 @@ */ package io.trino.plugin.iceberg; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.minio.messages.Event; import io.trino.Session; +import io.trino.metastore.Column; import io.trino.metastore.HiveMetastore; +import io.trino.metastore.HiveType; +import io.trino.metastore.Table; import io.trino.plugin.hive.containers.Hive3MinioDataLake; import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore; import io.trino.testing.QueryRunner; @@ -28,18 +32,25 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.metastore.PrincipalPrivileges.NO_PRIVILEGES; +import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE; import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; +import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore; +import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY; import static io.trino.testing.containers.Minio.MINIO_REGION; import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY; import static java.lang.String.format; import static java.util.Locale.ENGLISH; 
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java
index 3144bd56bb02..91474fa6fa8c 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java
@@ -13,10 +13,14 @@
  */
 package io.trino.plugin.iceberg;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.minio.messages.Event;
 import io.trino.Session;
+import io.trino.metastore.Column;
 import io.trino.metastore.HiveMetastore;
+import io.trino.metastore.HiveType;
+import io.trino.metastore.Table;
 import io.trino.plugin.hive.containers.Hive3MinioDataLake;
 import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore;
 import io.trino.testing.QueryRunner;
@@ -28,18 +32,25 @@
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static io.trino.metastore.PrincipalPrivileges.NO_PRIVILEGES;
+import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
 import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
+import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore;
+import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT;
 import static io.trino.testing.TestingNames.randomNameSuffix;
 import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
 import static io.trino.testing.containers.Minio.MINIO_REGION;
 import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
 import static java.lang.String.format;
 import static java.util.Locale.ENGLISH;
+import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
+import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
@@ -233,6 +244,25 @@ public void testPathContainsSpecialCharacter()
         assertUpdate("DROP TABLE " + tableName);
     }
 
+    @Override
+    protected AutoCloseable createAdditionalTables(String schema)
+    {
+        HiveMetastore metastore = getHiveMetastore(getQueryRunner());
+        // simulate iceberg table created by spark with lowercase table type
+        Table lowerCaseTableType = io.trino.metastore.Table.builder()
+                .setDatabaseName(schema)
+                .setTableName("lowercase_type_" + randomNameSuffix())
+                .setOwner(Optional.empty())
+                .setDataColumns(ImmutableList.of(new Column("id", HiveType.HIVE_STRING, Optional.empty(), ImmutableMap.of())))
+                .setTableType(EXTERNAL_TABLE.name())
+                .withStorage(storage -> storage.setStorageFormat(ICEBERG_METASTORE_STORAGE_FORMAT))
+                .setParameter("EXTERNAL", "TRUE")
+                .setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toLowerCase(ENGLISH))
+                .build();
+        metastore.createTable(lowerCaseTableType, NO_PRIVILEGES);
+        return () -> metastore.dropTable(lowerCaseTableType.getDatabaseName(), lowerCaseTableType.getTableName(), true);
+    }
+
     private String onMetastore(@Language("SQL") String sql)
     {
         return hiveMinioDataLake.getHiveHadoop().runOnMetastore(sql);
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseSharedMetastoreTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseSharedMetastoreTest.java
index d059623c8294..b755bcea9859 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseSharedMetastoreTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseSharedMetastoreTest.java
@@ -152,6 +152,13 @@ public void testShowSchemas()
         assertThat(showCreateIcebergWithRedirectionsSchema).isEqualTo(getExpectedIcebergCreateSchema("iceberg_with_redirections"));
     }
 
+    @Test
+    public void testIcebergTablesFunction()
+    {
+        assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables(SCHEMA_NAME => '%s'))".formatted(tpchSchema), "VALUES ('%s', 'nation')".formatted(tpchSchema));
+        assertQuery("SELECT * FROM TABLE(iceberg_with_redirections.system.iceberg_tables(SCHEMA_NAME => '%s'))".formatted(tpchSchema), "VALUES ('%s', 'nation')".formatted(tpchSchema));
+    }
+
     @Test
     public void testTimeTravelWithRedirection()
             throws InterruptedException
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestSharedHiveThriftMetastore.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestSharedHiveThriftMetastore.java
new file mode 100644
index 000000000000..6228db8ed7c6
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestSharedHiveThriftMetastore.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.trino.Session;
+import io.trino.plugin.hive.TestingHivePlugin;
+import io.trino.plugin.hive.containers.Hive3MinioDataLake;
+import io.trino.plugin.hive.containers.HiveMinioDataLake;
+import io.trino.plugin.tpch.TpchPlugin;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+import io.trino.tpch.TpchTable;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.parallel.Execution;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
+import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
+import static io.trino.testing.QueryAssertions.copyTpchTables;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
+import static io.trino.testing.containers.Minio.MINIO_REGION;
+import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
+import static java.lang.String.format;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+@TestInstance(PER_CLASS)
+@Execution(CONCURRENT)
+public class TestSharedHiveThriftMetastore
+        extends BaseSharedMetastoreTest
+{
+    private static final String HIVE_CATALOG = "hive";
+    private String bucketName;
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        bucketName = "test-iceberg-shared-metastore" + randomNameSuffix();
+        HiveMinioDataLake hiveMinioDataLake = closeAfterClass(new Hive3MinioDataLake(bucketName));
+        hiveMinioDataLake.start();
+
+        Session icebergSession = testSessionBuilder()
+                .setCatalog(ICEBERG_CATALOG)
+                .setSchema(tpchSchema)
+                .build();
+        Session hiveSession = testSessionBuilder()
+                .setCatalog(HIVE_CATALOG)
+                .setSchema(tpchSchema)
+                .build();
+
+        QueryRunner queryRunner = DistributedQueryRunner.builder(icebergSession).build();
+
+        queryRunner.installPlugin(new TpchPlugin());
+        queryRunner.createCatalog("tpch", "tpch");
+
+        Path dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data");
+        dataDirectory.toFile().deleteOnExit();
+
+        queryRunner.installPlugin(new IcebergPlugin());
+        queryRunner.createCatalog(
+                ICEBERG_CATALOG,
+                "iceberg",
+                ImmutableMap.<String, String>builder()
+                        .put("iceberg.catalog.type", "HIVE_METASTORE")
+                        .put("hive.metastore.uri", hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint().toString())
+                        .put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
+                        .put("fs.hadoop.enabled", "false")
+                        .put("fs.native-s3.enabled", "true")
+                        .put("s3.aws-access-key", MINIO_ACCESS_KEY)
+                        .put("s3.aws-secret-key", MINIO_SECRET_KEY)
+                        .put("s3.region", MINIO_REGION)
+                        .put("s3.endpoint", hiveMinioDataLake.getMinio().getMinioAddress())
+                        .put("s3.path-style-access", "true")
+                        .put("s3.streaming.part-size", "5MB") // minimize memory usage
+                        .put("s3.max-connections", "2") // verify no leaks
+                        .put("iceberg.register-table-procedure.enabled", "true")
+                        .put("iceberg.writer-sort-buffer-size", "1MB")
+                        .buildOrThrow());
+        queryRunner.createCatalog(
+                "iceberg_with_redirections",
+                "iceberg",
+                ImmutableMap.<String, String>builder()
+                        .put("iceberg.catalog.type", "HIVE_METASTORE")
.put("hive.metastore.uri", hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint().toString()) + .put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout + .put("fs.hadoop.enabled", "false") + .put("fs.native-s3.enabled", "true") + .put("s3.aws-access-key", MINIO_ACCESS_KEY) + .put("s3.aws-secret-key", MINIO_SECRET_KEY) + .put("s3.region", MINIO_REGION) + .put("s3.endpoint", hiveMinioDataLake.getMinio().getMinioAddress()) + .put("s3.path-style-access", "true") + .put("s3.streaming.part-size", "5MB") // minimize memory usage + .put("s3.max-connections", "2") // verify no leaks + .put("iceberg.register-table-procedure.enabled", "true") + .put("iceberg.writer-sort-buffer-size", "1MB") + .put("iceberg.hive-catalog-name", "hive") + .buildOrThrow()); + + queryRunner.installPlugin(new TestingHivePlugin(dataDirectory)); + Map hiveProperties = ImmutableMap.builder() + .put("hive.metastore", "thrift") + .put("hive.metastore.uri", hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint().toString()) + .put("fs.hadoop.enabled", "false") + .put("fs.native-s3.enabled", "true") + .put("s3.aws-access-key", MINIO_ACCESS_KEY) + .put("s3.aws-secret-key", MINIO_SECRET_KEY) + .put("s3.region", MINIO_REGION) + .put("s3.endpoint", hiveMinioDataLake.getMinio().getMinioAddress()) + .put("s3.path-style-access", "true") + .put("s3.streaming.part-size", "5MB") + .put("hive.max-partitions-per-scan", "1000") + .put("hive.max-partitions-for-eager-load", "1000") + .put("hive.security", "allow-all") + .buildOrThrow(); + queryRunner.createCatalog(HIVE_CATALOG, "hive", hiveProperties); + queryRunner.createCatalog( + "hive_with_redirections", + "hive", + ImmutableMap.builder() + .putAll(hiveProperties).put("hive.iceberg-catalog-name", "iceberg") + .buildOrThrow()); + + queryRunner.execute("CREATE SCHEMA " + tpchSchema + " WITH (location = 's3://" + bucketName + "/" + tpchSchema + "')"); + copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, icebergSession, ImmutableList.of(TpchTable.NATION)); + copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, hiveSession, ImmutableList.of(TpchTable.REGION)); + queryRunner.execute("CREATE SCHEMA " + testSchema + " WITH (location = 's3://" + bucketName + "/" + testSchema + "')"); + + return queryRunner; + } + + @AfterAll + public void cleanup() + { + assertQuerySucceeds("DROP TABLE IF EXISTS hive." + tpchSchema + ".region"); + assertQuerySucceeds("DROP TABLE IF EXISTS iceberg." + tpchSchema + ".nation"); + assertQuerySucceeds("DROP SCHEMA IF EXISTS hive." + tpchSchema); + assertQuerySucceeds("DROP SCHEMA IF EXISTS hive." 
+        assertQuerySucceeds("DROP SCHEMA IF EXISTS hive." + testSchema);
+    }
+
+    @Override
+    protected String getExpectedHiveCreateSchema(String catalogName)
+    {
+        return """
+                CREATE SCHEMA %s.%s
+                WITH (
+                   location = 's3://%s/%s'
+                )"""
+                .formatted(catalogName, tpchSchema, bucketName, tpchSchema);
+    }
+
+    @Override
+    protected String getExpectedIcebergCreateSchema(String catalogName)
+    {
+        String expectedIcebergCreateSchema = "CREATE SCHEMA %s.%s\n" +
+                "AUTHORIZATION USER user\n" +
+                "WITH (\n" +
+                "   location = 's3://%s/%s'\n" +
+                ")";
+        return format(expectedIcebergCreateSchema, catalogName, tpchSchema, bucketName, tpchSchema);
+    }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java
index a6cdc1793d2b..01f87a4b3d73 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java
@@ -95,7 +95,7 @@ protected String getMetadataLocation(String tableName)
     @Override
     protected String schemaPath()
     {
-        return format("%s/%s", warehouseLocation, getSession().getSchema());
+        return format("%s/%s", warehouseLocation, getSession().getSchema().orElseThrow());
     }
 
     @Override
@@ -470,7 +470,7 @@ public void testDropTableWithMissingDataFile()
     public void testDropTableWithNonExistentTableLocation()
     {
         assertThatThrownBy(super::testDropTableWithNonExistentTableLocation)
-                .hasMessageContaining("Access Denied");
+                .hasStackTraceContaining("Access Denied");
     }
 
     @Test
@@ -520,4 +520,12 @@ public void testTruncateTable()
         assertThatThrownBy(super::testTruncateTable)
                 .hasMessageContaining("Access Denied");
     }
+
+    @Test
+    @Override
+    public void testIcebergTablesFunction()
+    {
+        assertThatThrownBy(super::testIcebergTablesFunction)
+                .hasStackTraceContaining("Access Denied");
+    }
 }