Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 3969 - Rename ingest job status store to tracker #3975

Draft
wants to merge 28 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
76296a3
Remove IngestJob from IngestJobAddedFilesEvent
patchwork01 Dec 18, 2024
abf3313
Remove IngestJob from IngestJobFailedEvent
patchwork01 Dec 18, 2024
6d2b12a
Remove IngestJob from IngestJobFinishedEvent
patchwork01 Dec 18, 2024
9821322
Remove IngestJob from IngestJobStartedEvent
patchwork01 Dec 18, 2024
ab7c552
Remove IngestJob from IngestJobValidatedEvent
patchwork01 Dec 18, 2024
aaefe36
Move events to core module except IngestJobValidatedEvent
patchwork01 Dec 18, 2024
d1284cd
Move method only used in tests to test
patchwork01 Dec 18, 2024
4e940a4
Move IngestJobValidatedEvent to core module
patchwork01 Dec 18, 2024
42776f9
Remove IngestJob from IngestJobAcceptedStatus
patchwork01 Dec 18, 2024
8ebbfc3
Remove IngestJob from IngestJobRejectedStatus
patchwork01 Dec 18, 2024
7c56409
Remove IngestJob from IngestJobStartedStatus
patchwork01 Dec 18, 2024
8193468
Move ingest job status updates to core module
patchwork01 Dec 18, 2024
6a18853
Move ingest job tracker to core module
patchwork01 Dec 18, 2024
be1a4b8
Create IngestJobStatusFromJobTestData
patchwork01 Dec 18, 2024
d4ffd1b
Rename test helper
patchwork01 Dec 18, 2024
5c14c25
Move ingestAcceptedStatus to IngestJobStatusFromJobTestData
patchwork01 Dec 18, 2024
b71aec1
Move ingestStartedStatus to IngestJobStatusFromJobTestData
patchwork01 Dec 18, 2024
7d2e268
Remove IngestJob from ingestStartedStatus helper
patchwork01 Dec 18, 2024
7f3f94d
Add validatedIngestStartedStatus to IngestJobStatusFromJobTestData
patchwork01 Dec 18, 2024
8f6d452
Remove IngestJob from ingestFinishedStatus
patchwork01 Dec 18, 2024
0991ee8
Remove IngestJob from ingestFinishedStatusUncommitted
patchwork01 Dec 18, 2024
2f6b1f3
Fix Javadoc
patchwork01 Dec 18, 2024
5b3c18b
Remove trailing spaces in Javadoc
patchwork01 Dec 19, 2024
ad316f8
Start removing IngestJob from InMemoryIngestJobStatusStoreTest
patchwork01 Dec 19, 2024
d2ad98b
Removing IngestJob from InMemoryIngestJobStatusStoreTest
patchwork01 Dec 19, 2024
3372da3
Removing IngestJob from InMemoryIngestJobStatusStoreTest
patchwork01 Dec 19, 2024
31b5a1b
Fix constant declaration in IngestJobEventTestData
patchwork01 Dec 19, 2024
97195a7
Adjust bug comment
patchwork01 Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@
import sleeper.core.statestore.StateStoreProvider;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3Uploader;
import sleeper.core.table.TableStatus;
import sleeper.core.tracker.ingest.job.IngestJobStatusStore;
import sleeper.core.util.LoggedDuration;
import sleeper.ingest.core.job.commit.IngestAddFilesCommitRequest;
import sleeper.ingest.core.job.commit.IngestAddFilesCommitRequestSerDe;
import sleeper.ingest.core.job.status.IngestJobStatusStore;
import sleeper.ingest.status.store.job.IngestJobStatusStoreFactory;
import sleeper.parquet.utils.HadoopConfigurationProvider;
import sleeper.statestore.StateStoreFactory;
Expand All @@ -64,9 +64,6 @@
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_IMPORT_BUCKET;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL;
import static sleeper.core.properties.table.TableProperty.BULK_IMPORT_FILES_COMMIT_ASYNC;
import static sleeper.ingest.core.job.status.IngestJobFailedEvent.ingestJobFailed;
import static sleeper.ingest.core.job.status.IngestJobFinishedEvent.ingestJobFinished;
import static sleeper.ingest.core.job.status.IngestJobStartedEvent.validatedIngestJobStarted;

/**
* Executes a Spark job that reads input Parquet files and writes to a Sleeper table. This takes a
Expand Down Expand Up @@ -103,14 +100,16 @@ public void run(BulkImportJob job, String jobRunId, String taskId) throws IOExce
Instant startTime = getTime.get();
LOGGER.info("Received bulk import job with id {} at time {}", job.getId(), startTime);
LOGGER.info("Job is for table {}: {}", table, job);
statusStore.jobStarted(validatedIngestJobStarted(job.toIngestJob(), startTime)
statusStore.jobStarted(job.toIngestJob()
.startedAfterValidationEventBuilder(startTime)
.jobRunId(jobRunId).taskId(taskId).build());

BulkImportJobOutput output;
try {
output = sessionRunner.run(job);
} catch (RuntimeException e) {
statusStore.jobFailed(ingestJobFailed(job.toIngestJob(), new ProcessRunTime(startTime, getTime.get()))
statusStore.jobFailed(job.toIngestJob()
.failedEventBuilder(new ProcessRunTime(startTime, getTime.get()))
.jobRunId(jobRunId).taskId(taskId).failure(e).build());
throw e;
}
Expand All @@ -133,7 +132,8 @@ public void run(BulkImportJob job, String jobRunId, String taskId) throws IOExce
LOGGER.info("Added {} files to statestore for job {} in table {}", output.numFiles(), job.getId(), table);
}
} catch (RuntimeException e) {
statusStore.jobFailed(ingestJobFailed(job.toIngestJob(), new ProcessRunTime(startTime, getTime.get()))
statusStore.jobFailed(job.toIngestJob()
.failedEventBuilder(new ProcessRunTime(startTime, getTime.get()))
.jobRunId(jobRunId).taskId(taskId).failure(e).build());
throw new RuntimeException("Failed to add files to state store. Ensure this service account has write access. Files may need to "
+ "be re-imported for clients to access data", e);
Expand All @@ -146,8 +146,8 @@ public void run(BulkImportJob job, String jobRunId, String taskId) throws IOExce
double rate = numRecords / (double) duration.getSeconds();
LOGGER.info("Bulk import job {} took {} (rate of {} per second)", job.getId(), duration, rate);

statusStore.jobFinished(ingestJobFinished(job.toIngestJob(),
new RecordsProcessedSummary(new RecordsProcessed(numRecords, numRecords), startTime, finishTime))
statusStore.jobFinished(job.toIngestJob()
.finishedEventBuilder(new RecordsProcessedSummary(new RecordsProcessed(numRecords, numRecords), startTime, finishTime))
.jobRunId(jobRunId).taskId(taskId)
.fileReferencesAddedByJob(output.fileReferences())
.committedBySeparateFileUpdates(asyncCommit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@
import sleeper.core.statestore.StateStoreProvider;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3SerDe;
import sleeper.core.tracker.ingest.job.IngestJobStatusStore;
import sleeper.ingest.core.job.IngestJob;
import sleeper.ingest.core.job.commit.IngestAddFilesCommitRequest;
import sleeper.ingest.core.job.commit.IngestAddFilesCommitRequestSerDe;
import sleeper.ingest.core.job.status.InMemoryIngestJobStatusStore;
import sleeper.ingest.core.job.status.IngestJobStatusStore;
import sleeper.parquet.record.ParquetRecordReader;
import sleeper.parquet.record.ParquetRecordWriterFactory;
import sleeper.statestore.StateStoreFactory;
Expand Down Expand Up @@ -118,12 +118,11 @@
import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties;
import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties;
import static sleeper.core.record.process.RecordsProcessedSummaryTestHelper.summary;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestAcceptedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestAcceptedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestJobStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.validatedIngestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatusUncommitted;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.jobStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.validatedIngestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted;
import static sleeper.parquet.utils.HadoopConfigurationLocalStackUtils.getHadoopConfiguration;

@Testcontainers
Expand Down Expand Up @@ -220,11 +219,11 @@ record = reader.read();
assertThat(readRecords).isEqualTo(expectedRecords);
IngestJob ingestJob = job.toIngestJob();
assertThat(statusStore.getAllJobs(tableProperties.get(TABLE_ID)))
.containsExactly(jobStatus(ingestJob, ProcessRun.builder()
.containsExactly(ingestJobStatus(ingestJob, ProcessRun.builder()
.taskId(taskId)
.startedStatus(ingestAcceptedStatus(ingestJob, validationTime))
.statusUpdate(validatedIngestStartedStatus(ingestJob, startTime))
.finishedStatus(ingestFinishedStatus(ingestJob,
.finishedStatus(ingestFinishedStatus(
summary(startTime, endTime, 200, 200), 1))
.build()));
}
Expand Down Expand Up @@ -269,12 +268,12 @@ record = reader.read();
assertThat(readRecords).isEqualTo(expectedRecords);
IngestJob ingestJob = job.toIngestJob();
assertThat(statusStore.getAllJobs(tableProperties.get(TABLE_ID)))
.containsExactly(jobStatus(ingestJob, ProcessRun.builder()
.containsExactly(ingestJobStatus(ingestJob, ProcessRun.builder()
.taskId(taskId)
.startedStatus(ingestAcceptedStatus(ingestJob, validationTime))
.statusUpdate(validatedIngestStartedStatus(ingestJob, startTime))
.finishedStatus(ingestFinishedStatus(ingestJob,
summary(startTime, endTime, 100, 100), 1))
.finishedStatus(ingestFinishedStatus(summary(startTime, endTime, 100, 100),
1))
.build()));
}

Expand Down Expand Up @@ -311,12 +310,12 @@ void shouldImportDataMultiplePartitions(BulkImportJobRunner runner) throws IOExc
tuple(100L, rightPartition));
IngestJob ingestJob = job.toIngestJob();
assertThat(statusStore.getAllJobs(tableProperties.get(TABLE_ID)))
.containsExactly(jobStatus(ingestJob, ProcessRun.builder()
.containsExactly(ingestJobStatus(ingestJob, ProcessRun.builder()
.taskId(taskId)
.startedStatus(ingestAcceptedStatus(ingestJob, validationTime))
.statusUpdate(validatedIngestStartedStatus(ingestJob, startTime))
.finishedStatus(ingestFinishedStatus(ingestJob,
summary(startTime, endTime, 200, 200), 2))
.finishedStatus(ingestFinishedStatus(summary(startTime, endTime, 200, 200),
2))
.build()));
}

Expand Down Expand Up @@ -382,12 +381,12 @@ record = reader.read();
}
IngestJob ingestJob = job.toIngestJob();
assertThat(statusStore.getAllJobs(tableProperties.get(TABLE_ID)))
.containsExactly(jobStatus(ingestJob, ProcessRun.builder()
.containsExactly(ingestJobStatus(ingestJob, ProcessRun.builder()
.taskId(taskId)
.startedStatus(ingestAcceptedStatus(ingestJob, validationTime))
.statusUpdate(validatedIngestStartedStatus(ingestJob, startTime))
.finishedStatus(ingestFinishedStatus(ingestJob,
summary(startTime, endTime, 100000, 100000), 50))
.finishedStatus(ingestFinishedStatus(summary(startTime, endTime, 100000, 100000),
50))
.build()));
}

Expand Down Expand Up @@ -419,12 +418,12 @@ void shouldNotThrowExceptionIfProvidedWithDirectoryWhichContainsParquetAndNonPar
.containsExactly(tuple(200L, expectedPartitionId, records));
IngestJob ingestJob = job.toIngestJob();
assertThat(statusStore.getAllJobs(tableProperties.get(TABLE_ID)))
.containsExactly(jobStatus(ingestJob, ProcessRun.builder()
.containsExactly(ingestJobStatus(ingestJob, ProcessRun.builder()
.taskId(taskId)
.startedStatus(ingestAcceptedStatus(ingestJob, validationTime))
.statusUpdate(validatedIngestStartedStatus(ingestJob, startTime))
.finishedStatus(ingestFinishedStatus(ingestJob,
summary(startTime, endTime, 200, 200), 1))
.finishedStatus(ingestFinishedStatus(summary(startTime, endTime, 200, 200),
1))
.build()));
}

Expand Down Expand Up @@ -470,12 +469,12 @@ record = reader.read();
assertThat(readRecords).isEqualTo(expectedRecords);
IngestJob ingestJob = job.toIngestJob();
assertThat(statusStore.getAllJobs(tableProperties.get(TABLE_ID)))
.containsExactly(jobStatus(ingestJob, ProcessRun.builder()
.containsExactly(ingestJobStatus(ingestJob, ProcessRun.builder()
.taskId(taskId)
.startedStatus(ingestAcceptedStatus(ingestJob, validationTime))
.statusUpdate(validatedIngestStartedStatus(ingestJob, startTime))
.finishedStatus(ingestFinishedStatus(ingestJob,
summary(startTime, endTime, 200, 200), 1))
.finishedStatus(ingestFinishedStatus(summary(startTime, endTime, 200, 200),
1))
.build()));
}

Expand Down Expand Up @@ -514,11 +513,11 @@ void shouldImportDataWithAsynchronousCommitOfNewFiles() throws Exception {
.containsExactly(tuple(200L, "root", expectedRecords));
});
assertThat(statusStore.getAllJobs(tableProperties.get(TABLE_ID)))
.containsExactly(jobStatus(ingestJob, ProcessRun.builder()
.containsExactly(ingestJobStatus(ingestJob, ProcessRun.builder()
.taskId(taskId)
.startedStatus(ingestAcceptedStatus(ingestJob, validationTime))
.statusUpdate(validatedIngestStartedStatus(ingestJob, startTime))
.finishedStatus(ingestFinishedStatusUncommitted(ingestJob,
.finishedStatus(ingestFinishedStatusUncommitted(
summary(startTime, endTime, 200, 200), 1))
.build()));
}
Expand Down Expand Up @@ -704,7 +703,7 @@ private void runJob(BulkImportJobRunner runner, InstanceProperties properties, B
}

private void runJob(BulkImportJobRunner runner, InstanceProperties properties, BulkImportJob job, Supplier<Instant> timeSupplier) throws IOException {
statusStore.jobValidated(ingestJobAccepted(job.toIngestJob(), validationTime).jobRunId(jobRunId).build());
statusStore.jobValidated(job.toIngestJob().acceptedEventBuilder(validationTime).jobRunId(jobRunId).build());
TablePropertiesProvider tablePropertiesProvider = S3TableProperties.createProvider(instanceProperties, s3Client, dynamoDBClient);
StateStoreProvider stateStoreProvider = StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoDBClient, conf);
AddFilesAsynchronously addFilesAsync = BulkImportJobDriver.submitFilesToCommitQueue(sqsClient, s3Client, instanceProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.testutils.FixedStateStoreProvider;
import sleeper.core.statestore.testutils.StateStoreTestHelper;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobStatusStore;
import sleeper.ingest.core.job.IngestJob;
import sleeper.ingest.core.job.commit.IngestAddFilesCommitRequest;
import sleeper.ingest.core.job.status.InMemoryIngestJobStatusStore;
import sleeper.ingest.core.job.status.IngestJobStatus;
import sleeper.ingest.core.job.status.IngestJobStatusStore;

import java.time.Instant;
import java.util.ArrayList;
Expand All @@ -55,13 +55,12 @@
import static sleeper.core.record.process.RecordsProcessedSummaryTestHelper.summary;
import static sleeper.core.schema.SchemaTestHelper.schemaWithKey;
import static sleeper.core.statestore.FileReferenceTestData.defaultFileOnRootPartitionWithRecords;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestAcceptedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestJobStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.validatedIngestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.acceptedRunWhichFailed;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestAcceptedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatusUncommitted;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.jobStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.validatedIngestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted;

class BulkImportJobDriverTest {
private final InstanceProperties instanceProperties = createTestInstanceProperties();
Expand All @@ -88,11 +87,11 @@ void shouldReportJobFinished() throws Exception {
// Then
IngestJob ingestJob = job.toIngestJob();
assertThat(allJobsReported())
.containsExactly(jobStatus(ingestJob, ProcessRun.builder()
.containsExactly(ingestJobStatus(ingestJob, ProcessRun.builder()
.taskId("test-task")
.startedStatus(ingestAcceptedStatus(ingestJob, validationTime))
.statusUpdate(validatedIngestStartedStatus(ingestJob, startTime))
.finishedStatus(ingestFinishedStatus(ingestJob,
.finishedStatus(ingestFinishedStatus(
summary(startTime, finishTime, 100, 100), 1))
.build()));
assertThat(stateStore.getFileReferences())
Expand Down Expand Up @@ -120,7 +119,7 @@ void shouldReportJobFailed() throws Exception {

// Then
assertThat(allJobsReported())
.containsExactly(jobStatus(job.toIngestJob(), acceptedRunWhichFailed(
.containsExactly(ingestJobStatus(job.toIngestJob(), acceptedRunWhichFailed(
job.toIngestJob(), "test-task", validationTime,
new ProcessRunTime(startTime, finishTime),
List.of("Failed running job", "Some cause", "Root cause"))));
Expand Down Expand Up @@ -149,7 +148,7 @@ void shouldReportJobFinishedWithNoRecordsWhenStateStoreUpdateFailed() throws Exc

// Then
assertThat(allJobsReported())
.containsExactly(jobStatus(job.toIngestJob(), acceptedRunWhichFailed(
.containsExactly(ingestJobStatus(job.toIngestJob(), acceptedRunWhichFailed(
job.toIngestJob(), "test-task", validationTime,
new ProcessRunTime(startTime, finishTime),
List.of("Failed updating files"))));
Expand Down Expand Up @@ -179,7 +178,7 @@ void shouldReportJobFinishedWithNoRecordsWhenStateStoreUpdateHadUnexpectedFailur

// Then
assertThat(allJobsReported())
.containsExactly(jobStatus(job.toIngestJob(), acceptedRunWhichFailed(
.containsExactly(ingestJobStatus(job.toIngestJob(), acceptedRunWhichFailed(
job.toIngestJob(), "test-task", validationTime,
new ProcessRunTime(startTime, finishTime),
List.of("Failed updating files"))));
Expand Down Expand Up @@ -208,11 +207,11 @@ void shouldCommitNewFilesAsynchronouslyWhenConfigured() throws Exception {
// Then
IngestJob ingestJob = job.toIngestJob();
assertThat(allJobsReported())
.containsExactly(jobStatus(ingestJob, ProcessRun.builder()
.containsExactly(ingestJobStatus(ingestJob, ProcessRun.builder()
.taskId("test-task")
.startedStatus(ingestAcceptedStatus(ingestJob, validationTime))
.statusUpdate(validatedIngestStartedStatus(ingestJob, startTime))
.finishedStatus(ingestFinishedStatusUncommitted(ingestJob,
.finishedStatus(ingestFinishedStatusUncommitted(
summary(startTime, finishTime, 300, 300), 2))
.build()));
assertThat(stateStore.getFileReferences()).isEmpty();
Expand All @@ -227,7 +226,7 @@ void shouldCommitNewFilesAsynchronouslyWhenConfigured() throws Exception {
private void runJob(
BulkImportJob job, String jobRunId, String taskId, Instant validationTime,
BulkImportJobDriver driver) throws Exception {
statusStore.jobValidated(ingestJobAccepted(job.toIngestJob(), validationTime).jobRunId(jobRunId).build());
statusStore.jobValidated(job.toIngestJob().acceptedEventBuilder(validationTime).jobRunId(jobRunId).build());
driver.run(job, jobRunId, taskId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import sleeper.core.properties.PropertiesReloader;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.tracker.ingest.job.IngestJobStatusStore;
import sleeper.ingest.core.job.IngestJobMessageHandler;
import sleeper.ingest.core.job.status.IngestJobStatusStore;
import sleeper.ingest.status.store.job.IngestJobStatusStoreFactory;
import sleeper.parquet.utils.HadoopConfigurationProvider;
import sleeper.parquet.utils.HadoopPathUtils;
Expand Down
Loading
Loading