Skip to content

Commit

Permalink
added iceberg table to share
Browse files Browse the repository at this point in the history
  • Loading branch information
duhizjame committed Jan 29, 2024
1 parent dc4c4ed commit d2878e6
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ public class ITDeltaSharingClient implements DatasetComparer, ScalaUtils {

private final StorageManagerInitializer storageManagerInitializer;
private final String deltaTablePath;

private final String icebergTablePath;

/**
 * Builds the integration-test client: wires up the storage manager and resolves
 * the delta-sharing paths for both the Delta and Iceberg test tables from the
 * bundled MrFoxProfile.json profile.
 */
public ITDeltaSharingClient() {
  // Both table paths are derived from the same profile file, so look it up once.
  var mrFoxProfile = getClass().getClassLoader().getResource("MrFoxProfile.json");
  this.storageManagerInitializer = new StorageManagerInitializer();
  this.deltaTablePath = TablePath.getDeltaTablePath(mrFoxProfile);
  this.icebergTablePath = TablePath.getIcebergTablePath(mrFoxProfile);
}

@BeforeAll
Expand All @@ -37,10 +39,10 @@ static void initStorageManager() {
}

@Test
void showS3Table1withQueryTableApi() {
void showS3IcebergTable1withQueryTableApi() {
var spark = TestSparkSession.newSparkSession();
storageManagerInitializer.createS3DeltaTable();
var ds = spark.read().format("deltaSharing").load(deltaTablePath);
storageManagerInitializer.createIcebergTableWithGlueMetastore();
var ds = spark.read().format("deltaSharing").load(icebergTablePath);
var expectedSchema = new StructType(new StructField[] {
new StructField("id", DataType.fromDDL("long"), true, new Metadata(emptyScalaMap()))
});
Expand All @@ -61,23 +63,23 @@ void showS3Table1withQueryTableApi() {
}

@Test
void showS3IcebergTableWithQueryTableApi() {
void showS3Table1withQueryTableApi() {
var spark = TestSparkSession.newSparkSession();
registerAnIcebergTable();
storageManagerInitializer.createS3DeltaTable();
var ds = spark.read().format("deltaSharing").load(deltaTablePath);
var expectedSchema = new StructType(new StructField[] {
new StructField("id", DataType.fromDDL("long"), true, new Metadata(emptyScalaMap()))
new StructField("id", DataType.fromDDL("long"), true, new Metadata(emptyScalaMap()))
});
var expectedData = spark
.createDataFrame(
List.of(
new MrFoxDeltaTableSchema(0),
new MrFoxDeltaTableSchema(3),
new MrFoxDeltaTableSchema(2),
new MrFoxDeltaTableSchema(1),
new MrFoxDeltaTableSchema(4)),
MrFoxDeltaTableSchema.class)
.toDF();
.createDataFrame(
List.of(
new MrFoxDeltaTableSchema(0),
new MrFoxDeltaTableSchema(3),
new MrFoxDeltaTableSchema(2),
new MrFoxDeltaTableSchema(1),
new MrFoxDeltaTableSchema(4)),
MrFoxDeltaTableSchema.class)
.toDF();

assertEquals(expectedSchema, ds.schema());
assertEquals(5, ds.count());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,14 @@ public TableInfo createIcebergTableWithGlueMetastore() {
var provider = ApiUtils.recoverConflictLazy(
() -> providerV1Api.addProvider(providerRequest),
() -> providerV1Api.getProvider(providerRequest.getName()));
var schemaRequest = createSchemaRequest(TableFormat.iceberg);
var shareRequest = createShareRequest();
ignoreConflict(() -> schemaV1Api.createSchema(shareRequest.getName(), schemaRequest));
var createTableRequest = createIcebergTableRequest();
ignoreConflict(() -> schemaV1Api.addTableToSchema(
shareRequest.getName(),
schemaRequest,
addTableToSchemaRequest(providerRequest.getName(), createTableRequest.getName())));
return ApiUtils.recoverConflictLazy(
() -> tableV1Api.createTableInProvider(provider.getName(), createTableRequest),
() -> tableV1Api.describeTableInProvider(provider.getName(), createTableRequest.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

public class TablePath {

/**
 * Builds the delta-sharing coordinate string for the test Iceberg table:
 * {@code <profile-url>#<share>.<schema>.<table>}.
 *
 * @param resource URL of the sharing profile file (rendered via its string form)
 * @return the profile URL suffixed with the fixed s3share/s3schemaiceberg/s3IcebergTable1 coordinates
 */
public static String getIcebergTablePath(URL resource) {
  return resource + "#" + String.join(".", "s3share", "s3schemaiceberg", "s3IcebergTable1");
}

/**
 * Builds the delta-sharing coordinate string for the test Delta table:
 * {@code <profile-url>#<share>.<schema>.<table>}.
 *
 * @param resource URL of the sharing profile file (rendered via its string form)
 * @return the profile URL suffixed with the fixed s3share/s3schemadelta/s3Table1 coordinates
 */
public static String getDeltaTablePath(URL resource) {
  return resource + "#" + String.join(".", "s3share", "s3schemadelta", "s3Table1");
}
Expand Down

0 comments on commit d2878e6

Please sign in to comment.