Skip to content

Commit

Permalink
(fix) Parse the startingTimestamp at api level and not at service lev…
Browse files Browse the repository at this point in the history
…el (#200)
  • Loading branch information
agilelab-tmnd1991 authored Jan 29, 2024
1 parent 79c1eab commit ac9b809
Show file tree
Hide file tree
Showing 13 changed files with 86 additions and 70 deletions.
2 changes: 1 addition & 1 deletion server/app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ tasks.jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = BigDecimal.valueOf(0.76)
minimum = BigDecimal.valueOf(0.75)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,41 @@ public Response getTableMetadata(
String share,
String schema,
String table,
String startingTimestamp,
String startingTimestampStr,
String deltaSharingCapabilities) {
return wrapExceptions(
() -> optionalToNotFound(
deltaSharesService.getTableMetadata(share, schema, table, startingTimestamp),
m -> optionalToNotFound(
deltaSharesService.getTableVersion(share, schema, table, startingTimestamp),
v -> Response.ok(
tableResponseSerializer.serialize(DeltaMappers.toTableResponseMetadata(m)),
ndjsonMediaType)
.status(Response.Status.OK.getStatusCode())
.header(DELTA_TABLE_VERSION_HEADER, String.valueOf(v))
.header(
DELTA_SHARE_CAPABILITIES_HEADER,
getResponseFormatHeader(
DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities)))
.build())),
() -> {
var startingTimestamp = parseTimestamp(startingTimestampStr);
return optionalToNotFound(
deltaSharesService.getTableMetadata(share, schema, table, startingTimestamp),
m -> optionalToNotFound(
deltaSharesService.getTableVersion(share, schema, table, startingTimestamp),
v -> Response.ok(
tableResponseSerializer.serialize(
DeltaMappers.toTableResponseMetadata(m)),
ndjsonMediaType)
.status(Response.Status.OK.getStatusCode())
.header(DELTA_TABLE_VERSION_HEADER, String.valueOf(v))
.header(
DELTA_SHARE_CAPABILITIES_HEADER,
getResponseFormatHeader(
DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities)))
.build()));
},
exceptionToResponse);
}

@Override
public Response getTableVersion(
String share, String schema, String table, String startingTimestamp) {
String share, String schema, String table, String startingTimestampStr) {

return wrapExceptions(
() -> optionalToNotFound(
deltaSharesService.getTableVersion(share, schema, table, startingTimestamp),
t -> Response.ok().header(DELTA_TABLE_VERSION_HEADER, t).build()),
() -> {
var startingTimestamp = parseTimestamp(startingTimestampStr);
return optionalToNotFound(
deltaSharesService.getTableVersion(share, schema, table, startingTimestamp),
t -> Response.ok().header(DELTA_TABLE_VERSION_HEADER, t).build());
},
exceptionToResponse);
}

Expand Down
17 changes: 17 additions & 0 deletions server/app/src/main/java/io/whitefox/api/server/ApiUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
import io.whitefox.core.services.exceptions.AlreadyExists;
import io.whitefox.core.services.exceptions.NotFound;
import jakarta.ws.rs.core.Response;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
Expand All @@ -33,6 +37,12 @@ public interface ApiUtils extends DeltaHeaders {
.errorCode("NOT FOUND")
.message(ExceptionUtil.generateStackTrace(t)))
.build();
} else if (t instanceof DateTimeParseException) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(new CommonErrorResponse()
.errorCode("BAD REQUEST - timestamp provided is not formatted correctly")
.message(ExceptionUtil.generateStackTrace(t)))
.build();
} else {
return Response.status(Response.Status.BAD_GATEWAY)
.entity(new CommonErrorResponse()
Expand Down Expand Up @@ -79,4 +89,11 @@ default Principal getRequestPrincipal() {
default Principal resolvePrincipal(String s) {
return new Principal(s);
}

default Optional<Timestamp> parseTimestamp(String timestamp) {
return Optional.ofNullable(timestamp)
.map(ts -> new Timestamp(OffsetDateTime.parse(ts, DateTimeFormatter.ISO_OFFSET_DATE_TIME)
.toInstant()
.toEpochMilli()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public void getTableVersionBadTimestamp() {
"default",
"table1")
.then()
.statusCode(502);
.statusCode(Response.Status.BAD_REQUEST.getStatusCode());
}

@DisabledOnOs(OS.WINDOWS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import io.delta.standalone.Snapshot;
import io.whitefox.core.*;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -54,7 +52,7 @@ public static DeltaSharedTable of(SharedTable sharedTable) {
return of(sharedTable, TableSchemaConverter.INSTANCE, new HadoopConfigBuilder());
}

public Optional<Metadata> getMetadata(Optional<String> startingTimestamp) {
public Optional<Metadata> getMetadata(Optional<Timestamp> startingTimestamp) {
return getSnapshot(startingTimestamp).map(this::metadataFromSnapshot);
}

Expand All @@ -74,7 +72,7 @@ private Metadata metadataFromSnapshot(Snapshot snapshot) {
);
}

public Optional<Long> getTableVersion(Optional<String> startingTimestamp) {
public Optional<Long> getTableVersion(Optional<Timestamp> startingTimestamp) {
return getSnapshot(startingTimestamp).map(Snapshot::getVersion);
}

Expand Down Expand Up @@ -106,9 +104,8 @@ public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) {
snapshot.getVersion());
}

private Optional<Snapshot> getSnapshot(Optional<String> startingTimestamp) {
private Optional<Snapshot> getSnapshot(Optional<Timestamp> startingTimestamp) {
return startingTimestamp
.map(this::getTimestamp)
.map(Timestamp::getTime)
.map(this::getSnapshotForTimestampAsOf)
.orElse(Optional.of(getSnapshot()));
Expand All @@ -131,12 +128,6 @@ private String location() {
return location.replaceAll("/+$", "");
}

private Timestamp getTimestamp(String timestamp) {
return new Timestamp(OffsetDateTime.parse(timestamp, DateTimeFormatter.ISO_OFFSET_DATE_TIME)
.toInstant()
.toEpochMilli());
}

public static class DeltaShareTableFormat {
public static final String RESPONSE_FORMAT_PARQUET = "parquet";
public static final String RESPONSE_FORMAT_DELTA = "delta";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@
import io.whitefox.core.Schema;
import io.whitefox.core.Share;
import io.whitefox.core.SharedTable;
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;

public interface DeltaSharesService {

Optional<Long> getTableVersion(
String share, String schema, String table, String startingTimestamp);
String share, String schema, String table, Optional<Timestamp> startingTimestamp);

ContentAndToken<List<Share>> listShares(
Optional<ContentAndToken.Token> nextPageToken, Optional<Integer> maxResults);

Optional<Metadata> getTableMetadata(
String share, String schema, String table, String startingTimestamp);
String share, String schema, String table, Optional<Timestamp> startingTimestamp);

Optional<ContentAndToken<List<Schema>>> listSchemas(
String share, Optional<ContentAndToken.Token> nextPageToken, Optional<Integer> maxResults);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.whitefox.persistence.StorageManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.sql.Timestamp;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -33,13 +34,13 @@ public DeltaSharesServiceImpl(

@Override
public Optional<Long> getTableVersion(
String share, String schema, String table, String startingTimestamp) {
String share, String schema, String table, Optional<Timestamp> startingTimestamp) {
return storageManager
.getSharedTable(share, schema, table)
.map(t -> tableLoaderFactory
.newTableLoader(t.internalTable())
.loadTable(t)
.getTableVersion(Optional.ofNullable(startingTimestamp)))
.getTableVersion(startingTimestamp))
.orElse(Optional.empty());
}

Expand All @@ -60,11 +61,11 @@ public ContentAndToken<List<Share>> listShares(

@Override
public Optional<Metadata> getTableMetadata(
String share, String schema, String table, String startingTimestamp) {
String share, String schema, String table, Optional<Timestamp> startingTimestamp) {
return storageManager.getSharedTable(share, schema, table).flatMap(t -> tableLoaderFactory
.newTableLoader(t.internalTable())
.loadTable(t)
.getMetadata(Optional.ofNullable(startingTimestamp)));
.getMetadata(startingTimestamp));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import io.whitefox.core.ReadTableResultToBeSigned;
import io.whitefox.core.TableSchema;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
Expand Down Expand Up @@ -34,7 +32,7 @@ public static IcebergSharedTable of(Table icebergTable) {
return new IcebergSharedTable(icebergTable, new TableSchemaConverter());
}

public Optional<Metadata> getMetadata(Optional<String> startingTimestamp) {
public Optional<Metadata> getMetadata(Optional<Timestamp> startingTimestamp) {
return getSnapshot(startingTimestamp).map(this::getMetadataFromSnapshot);
}

Expand All @@ -56,9 +54,8 @@ private Metadata getMetadataFromSnapshot(Snapshot snapshot) {
);
}

private Optional<Snapshot> getSnapshot(Optional<String> startingTimestamp) {
private Optional<Snapshot> getSnapshot(Optional<Timestamp> startingTimestamp) {
return startingTimestamp
.map(this::getTimestamp)
.map(Timestamp::getTime)
.map(this::getSnapshotForTimestampAsOf)
.orElseGet(() -> Optional.ofNullable(icebergTable.currentSnapshot()));
Expand All @@ -73,14 +70,8 @@ private Optional<Snapshot> getSnapshotForTimestampAsOf(long l) {
}
}

private Timestamp getTimestamp(String timestamp) {
return new Timestamp(OffsetDateTime.parse(timestamp, DateTimeFormatter.ISO_OFFSET_DATE_TIME)
.toInstant()
.toEpochMilli());
}

@Override
public Optional<Long> getTableVersion(Optional<String> startingTimestamp) {
public Optional<Long> getTableVersion(Optional<Timestamp> startingTimestamp) {
return getSnapshot(startingTimestamp).map(Snapshot::sequenceNumber);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import io.whitefox.core.Metadata;
import io.whitefox.core.ReadTableRequest;
import io.whitefox.core.ReadTableResultToBeSigned;
import java.sql.Timestamp;
import java.util.Optional;

public interface InternalSharedTable {

Optional<Metadata> getMetadata(Optional<String> startingTimestamp);
Optional<Metadata> getMetadata(Optional<Timestamp> startingTimestamp);

Optional<Long> getTableVersion(Optional<String> startingTimestamp);
Optional<Long> getTableVersion(Optional<Timestamp> startingTimestamp);

ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ public void getDeltaTableMetadata() {
StorageManager storageManager = new InMemoryStorageManager(shares);
DeltaSharesService deltaSharesService =
new DeltaSharesServiceImpl(storageManager, 100, tableLoaderFactory, fileSignerFactory);
var tableMetadata = deltaSharesService.getTableMetadata("name", "default", "table1", null);
var tableMetadata =
deltaSharesService.getTableMetadata("name", "default", "table1", Optional.empty());
Assertions.assertTrue(tableMetadata.isPresent());
Assertions.assertEquals(
"56d48189-cdbc-44f2-9b0e-2bded4c79ed7", tableMetadata.get().id());
Expand All @@ -223,7 +224,8 @@ public void tableMetadataNotFound() {
StorageManager storageManager = new InMemoryStorageManager(shares);
DeltaSharesService deltaSharesService =
new DeltaSharesServiceImpl(storageManager, 100, tableLoaderFactory, fileSignerFactory);
var resultTable = deltaSharesService.getTableMetadata("name", "default", "tableNotFound", null);
var resultTable =
deltaSharesService.getTableMetadata("name", "default", "tableNotFound", Optional.empty());
Assertions.assertTrue(resultTable.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import static org.wildfly.common.Assert.assertTrue;

import io.whitefox.core.SharedTable;
import java.time.format.DateTimeParseException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -51,24 +50,15 @@ void getTableVersionNonExistingTable() throws ExecutionException, InterruptedExc
void getTableVersionWithTimestamp() throws ExecutionException, InterruptedException {
var PTable = new SharedTable("delta-table", "default", "share1", deltaTable("delta-table"));
var DTable = DeltaSharedTable.of(PTable);
var version = DTable.getTableVersion(Optional.of("2023-09-30T10:15:30+01:00"));
var version = DTable.getTableVersion(TestDateUtils.parseTimestamp("2023-09-30T10:15:30+01:00"));
assertEquals(Optional.empty(), version);
}

@Test
void getTableVersionWithFutureTimestamp() throws ExecutionException, InterruptedException {
var PTable = new SharedTable("delta-table", "default", "share1", deltaTable("delta-table"));
var DTable = DeltaSharedTable.of(PTable);
var version = DTable.getTableVersion(Optional.of("2024-10-20T10:15:30+01:00"));
var version = DTable.getTableVersion(TestDateUtils.parseTimestamp("2024-10-20T10:15:30+01:00"));
assertEquals(Optional.empty(), version);
}

@Test
void getTableVersionWithMalformedTimestamp() throws ExecutionException, InterruptedException {
var PTable = new SharedTable("delta-table", "default", "share1", deltaTable("delta-table"));
var DTable = DeltaSharedTable.of(PTable);
assertThrows(
DateTimeParseException.class,
() -> DTable.getTableVersion(Optional.of("221rfewdsad10:15:30+01:00")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void getTableMetadataWithTimestamp() {
"share1",
icebergTableWithHadoopCatalog("test_db", "icebergtable2"));
var DTable = icebergTableLoader.loadTable(PTable);
var metadata = DTable.getMetadata(Optional.of("2024-01-25T01:32:15+01:00"));
var metadata = DTable.getMetadata(TestDateUtils.parseTimestamp("2024-01-25T01:32:15+01:00"));
assertTrue(metadata.isPresent());
assertEquals("2174306913745765008", metadata.get().id());
}
Expand Down Expand Up @@ -71,7 +71,7 @@ void getTableVersionWithTimestamp() {
"share1",
icebergTableWithHadoopCatalog("test_db", "icebergtable2"));
var DTable = icebergTableLoader.loadTable(PTable);
var version = DTable.getTableVersion(Optional.of("2024-01-25T01:32:15+01:00"));
var version = DTable.getTableVersion(TestDateUtils.parseTimestamp("2024-01-25T01:32:15+01:00"));
assertTrue(version.isPresent());
assertEquals(1, version.get());
}
Expand All @@ -84,7 +84,7 @@ void getTableVersionWithTooOldTimestamp() {
"share1",
icebergTableWithHadoopCatalog("test_db", "icebergtable2"));
var DTable = icebergTableLoader.loadTable(PTable);
var version = DTable.getTableVersion(Optional.of("2024-01-24T01:32:15+01:00"));
var version = DTable.getTableVersion(TestDateUtils.parseTimestamp("2024-01-24T01:32:15+01:00"));
assertTrue(version.isEmpty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.whitefox.core.services;

import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;

public class TestDateUtils {
public static Optional<Timestamp> parseTimestamp(String timestamp) {
return Optional.ofNullable(timestamp)
.map(ts -> new Timestamp(OffsetDateTime.parse(ts, DateTimeFormatter.ISO_OFFSET_DATE_TIME)
.toInstant()
.toEpochMilli()));
}
}

0 comments on commit ac9b809

Please sign in to comment.