From 3a901de1c29a6d35fc70402ef96ecc9f4eb8006a Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 27 Dec 2024 11:46:46 -0500 Subject: [PATCH 1/2] Remove aws v1 from infrastructure --- .github/workflows/README.md | 2 - ...mit_Java_Amazon-Web-Services_IO_Direct.yml | 145 ----------------- .../beam_PreCommit_Java_Kinesis_IO_Direct.yml | 152 ------------------ .../sdk/io/gcp/bigquery/StorageApiLoads.java | 35 ++-- sdks/java/testing/load-tests/build.gradle | 9 +- .../sdk/loadtests/SyntheticDataPublisher.java | 29 +++- sdks/java/testing/watermarks/build.gradle | 1 - 7 files changed, 45 insertions(+), 328 deletions(-) delete mode 100644 .github/workflows/beam_PreCommit_Java_Amazon-Web-Services_IO_Direct.yml delete mode 100644 .github/workflows/beam_PreCommit_Java_Kinesis_IO_Direct.yml diff --git a/.github/workflows/README.md b/.github/workflows/README.md index de85a99d7bc9..b9069c530e53 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -228,7 +228,6 @@ PreCommit Jobs run in a schedule and also get triggered in a PR if relevant sour | [ PreCommit Go ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Go.yml) | N/A |`Run Go PreCommit`| [![.github/workflows/beam_PreCommit_Go.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Go.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Go.yml?query=event%3Aschedule) | | [ PreCommit GoPortable ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_GoPortable.yml) | N/A |`Run GoPortable PreCommit`| [![.github/workflows/beam_PreCommit_GoPortable.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_GoPortable.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_GoPortable.yml?query=event%3Aschedule) | | [ PreCommit Java ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java.yml) | N/A |`Run Java PreCommit`| [![.github/workflows/beam_PreCommit_Java.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java.yml?query=event%3Aschedule) | -| [ PreCommit Java Amazon Web Services IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Amazon-Web-Services_IO_Direct.yml) | N/A |`Run Java_Amazon-Web-Services_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_Amazon-Web-Services_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Amazon-Web-Services_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Amazon-Web-Services_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java Amazon Web Services2 IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Amazon-Web-Services2_IO_Direct.yml) | N/A |`Run Java_Amazon-Web-Services2_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_Amazon-Web-Services2_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Amazon-Web-Services2_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Amazon-Web-Services2_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java Amqp IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Amqp_IO_Direct.yml) | N/A |`Run Java_Amqp_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_Amqp_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Amqp_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Amqp_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java Azure IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Azure_IO_Direct.yml) | N/A |`Run Java_Azure_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_Azure_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Azure_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Azure_IO_Direct.yml?query=event%3Aschedule) | @@ -252,7 +251,6 @@ PreCommit Jobs run in a schedule and also get triggered in a PR if relevant sour | [ PreCommit Java IOs Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_IOs_Direct.yml) | N/A |`Run Java_IOs_Direct PreCommit`| N/A | | [ PreCommit Java JDBC IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_JDBC_IO_Direct.yml) | N/A |`Run Java_JDBC_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_JDBC_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_JDBC_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_JDBC_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java Jms IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Jms_IO_Direct.yml) | N/A |`Run Java_Jms_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_Jms_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Jms_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Jms_IO_Direct.yml?query=event%3Aschedule) | -| [ PreCommit Java Kinesis IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kinesis_IO_Direct.yml) | N/A |`Run Java_Kinesis_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_Kinesis_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kinesis_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kinesis_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java Kudu IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kudu_IO_Direct.yml) | N/A |`Run Java_Kudu_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_Kudu_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kudu_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Kudu_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java MongoDb IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_MongoDb_IO_Direct.yml) | N/A |`Run Java_MongoDb_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_MongoDb_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_MongoDb_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_MongoDb_IO_Direct.yml?query=event%3Aschedule) | | [ PreCommit Java Mqtt IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Mqtt_IO_Direct.yml) | N/A |`Run Java_Mqtt_IO_Direct PreCommit`| [![.github/workflows/beam_PreCommit_Java_Mqtt_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Mqtt_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PreCommit_Java_Mqtt_IO_Direct.yml?query=event%3Aschedule) | diff --git a/.github/workflows/beam_PreCommit_Java_Amazon-Web-Services_IO_Direct.yml b/.github/workflows/beam_PreCommit_Java_Amazon-Web-Services_IO_Direct.yml deleted file mode 100644 index 9053bb730371..000000000000 --- a/.github/workflows/beam_PreCommit_Java_Amazon-Web-Services_IO_Direct.yml +++ /dev/null @@ -1,145 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name: PreCommit Java Amazon-Web-Services IO Direct - -on: - push: - tags: ['v*'] - branches: ['master', 'release-*'] - paths: - - "sdks/java/io/amazon-web-services/**" - - "sdks/java/io/common/**" - - "sdks/java/core/src/main/**" - - "build.gradle" - - "buildSrc/**" - - "gradle/**" - - "gradle.properties" - - "gradlew" - - "gradle.bat" - - "settings.gradle.kts" - - ".github/workflows/beam_PreCommit_Java_Amazon-Web-Services_IO_Direct.yml" - pull_request_target: - branches: ['master', 'release-*'] - paths: - - "sdks/java/io/amazon-web-services/**" - - "sdks/java/io/common/**" - - "sdks/java/core/src/main/**" - - 'release/trigger_all_tests.json' - - '.github/trigger_files/beam_PreCommit_Java_Amazon-Web-Services_IO_Direct.json' - - "build.gradle" - - "buildSrc/**" - - "gradle/**" - - "gradle.properties" - - "gradlew" - - "gradle.bat" - - "settings.gradle.kts" - issue_comment: - types: [created] - schedule: - - cron: '0 1/6 * * *' - workflow_dispatch: - -#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event -permissions: - actions: write - pull-requests: write - checks: write - contents: read - deployments: read - id-token: none - issues: write - discussions: read - packages: read - pages: read - repository-projects: read - security-events: read - statuses: read - -# This allows a subsequently queued workflow run to interrupt previous runs -concurrency: - group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' - cancel-in-progress: true - -env: - DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} - GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} - -jobs: - beam_PreCommit_Java_Amazon-Web-Services_IO_Direct: - name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) - strategy: - matrix: - job_name: ["beam_PreCommit_Java_Amazon-Web-Services_IO_Direct"] - job_phrase: ["Run Java_Amazon-Web-Services_IO_Direct PreCommit"] - timeout-minutes: 60 - if: | - github.event_name == 'push' || - github.event_name == 'pull_request_target' || - (github.event_name == 'schedule' && github.repository == 'apache/beam') || - github.event_name == 'workflow_dispatch' || - github.event.comment.body == 'Run Java_Amazon-Web-Services_IO_Direct PreCommit' - runs-on: [self-hosted, ubuntu-20.04, main] - steps: - - uses: actions/checkout@v4 - - name: Setup repository - uses: ./.github/actions/setup-action - with: - comment_phrase: ${{ matrix.job_phrase }} - github_token: ${{ secrets.GITHUB_TOKEN }} - github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) - - name: Setup environment - uses: ./.github/actions/setup-environment-action - - name: run Amazon-Web-Services IO build script - uses: ./.github/actions/gradle-command-self-hosted-action - with: - gradle-command: :sdks:java:io:amazon-web-services:build - arguments: | - -PdisableSpotlessCheck=true \ - -PdisableCheckStyle=true \ - - name: run Amazon-Web-Services IO IT script - uses: ./.github/actions/gradle-command-self-hosted-action - with: - gradle-command: :sdks:java:io:amazon-web-services:integrationTest - arguments: | - -PdisableSpotlessCheck=true \ - -PdisableCheckStyle=true \ - - name: Archive JUnit Test Results - uses: actions/upload-artifact@v4 - if: ${{ !success() }} - with: - name: JUnit Test Results - path: "**/build/reports/tests/" - - name: Publish JUnit Test Results - uses: EnricoMi/publish-unit-test-result-action@v2 - if: always() - with: - commit: '${{ env.prsha || env.GITHUB_SHA }}' - comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} - files: '**/build/test-results/**/*.xml' - large_files: true - - name: Archive SpotBugs Results - uses: actions/upload-artifact@v4 - if: always() - with: - name: SpotBugs Results - path: '**/build/reports/spotbugs/*.html' - - name: Publish SpotBugs Results - uses: jwgmeligmeyling/spotbugs-github-action@v1.2 - if: always() - with: - name: Publish SpotBugs - path: '**/build/reports/spotbugs/*.html' \ No newline at end of file diff --git a/.github/workflows/beam_PreCommit_Java_Kinesis_IO_Direct.yml b/.github/workflows/beam_PreCommit_Java_Kinesis_IO_Direct.yml deleted file mode 100644 index 785748e793e9..000000000000 --- a/.github/workflows/beam_PreCommit_Java_Kinesis_IO_Direct.yml +++ /dev/null @@ -1,152 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name: PreCommit Java Kinesis IO Direct - -on: - push: - tags: ['v*'] - branches: ['master', 'release-*'] - paths: - - "sdks/java/io/kinesis/**" - - "sdks/java/io/common/**" - - "sdks/java/core/src/main/**" - - "build.gradle" - - "buildSrc/**" - - "gradle/**" - - "gradle.properties" - - "gradlew" - - "gradle.bat" - - "settings.gradle.kts" - - ".github/workflows/beam_PreCommit_Java_Kinesis_IO_Direct.yml" - pull_request_target: - branches: ['master', 'release-*'] - paths: - - "sdks/java/io/kinesis/**" - - "sdks/java/io/common/**" - - "sdks/java/core/src/main/**" - - "build.gradle" - - "buildSrc/**" - - "gradle/**" - - "gradle.properties" - - "gradlew" - - "gradle.bat" - - "settings.gradle.kts" - - 'release/trigger_all_tests.json' - - '.github/trigger_files/beam_PreCommit_Java_Kinesis_IO_Direct.json' - issue_comment: - types: [created] - schedule: - - cron: '0 2/6 * * *' - workflow_dispatch: - -#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event -permissions: - actions: write - pull-requests: write - checks: write - contents: read - deployments: read - id-token: none - issues: write - discussions: read - packages: read - pages: read - repository-projects: read - security-events: read - statuses: read - -# This allows a subsequently queued workflow run to interrupt previous runs -concurrency: - group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' - cancel-in-progress: true - -env: - DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} - GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} - -jobs: - beam_PreCommit_Java_Kinesis_IO_Direct: - name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) - strategy: - matrix: - job_name: ["beam_PreCommit_Java_Kinesis_IO_Direct"] - job_phrase: ["Run Java_Kinesis_IO_Direct PreCommit"] - timeout-minutes: 60 - if: | - github.event_name == 'push' || - github.event_name == 'pull_request_target' || - (github.event_name == 'schedule' && github.repository == 'apache/beam') || - github.event_name == 'workflow_dispatch' || - github.event.comment.body == 'Run Java_Kinesis_IO_Direct PreCommit' - runs-on: [self-hosted, ubuntu-20.04, main] - steps: - - uses: actions/checkout@v4 - - name: Setup repository - uses: ./.github/actions/setup-action - with: - comment_phrase: ${{ matrix.job_phrase }} - github_token: ${{ secrets.GITHUB_TOKEN }} - github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) - - name: Setup environment - uses: ./.github/actions/setup-environment-action - - name: run Kinesis IO build script - uses: ./.github/actions/gradle-command-self-hosted-action - with: - gradle-command: :sdks:java:io:kinesis:build - arguments: | - -PdisableSpotlessCheck=true \ - -PdisableCheckStyle=true \ - - name: run Kinesis expansion service script - uses: ./.github/actions/gradle-command-self-hosted-action - with: - gradle-command: :sdks:java:io:kinesis:expansion-service:build - arguments: | - -PdisableSpotlessCheck=true \ - -PdisableCheckStyle=true \ - - name: run Kinesis IO IT script - uses: ./.github/actions/gradle-command-self-hosted-action - with: - gradle-command: :sdks:java:io:kinesis:integrationTest - arguments: | - -PdisableSpotlessCheck=true \ - -PdisableCheckStyle=true \ - - name: Archive JUnit Test Results - uses: actions/upload-artifact@v4 - if: ${{ !success() }} - with: - name: JUnit Test Results - path: "**/build/reports/tests/" - - name: Publish JUnit Test Results - uses: EnricoMi/publish-unit-test-result-action@v2 - if: always() - with: - commit: '${{ env.prsha || env.GITHUB_SHA }}' - comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} - files: '**/build/test-results/**/*.xml' - large_files: true - - name: Archive SpotBugs Results - uses: actions/upload-artifact@v4 - if: always() - with: - name: SpotBugs Results - path: '**/build/reports/spotbugs/*.html' - - name: Publish SpotBugs Results - uses: jwgmeligmeyling/spotbugs-github-action@v1.2 - if: always() - with: - name: Publish SpotBugs - path: '**/build/reports/spotbugs/*.html' \ No newline at end of file diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index 22e0f955abb5..fcf67a8062ac 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -373,24 +373,23 @@ public WriteResult expandUntriggered( } PCollectionTuple writeRecordsResult = - successfulConvertedRows - .apply( - "StorageApiWriteUnsharded", - new StorageApiWriteUnshardedRecords<>( - dynamicDestinations, - bqServices, - failedRowsTag, - successfulWrittenRowsTag, - successfulRowsPredicate, - BigQueryStorageApiInsertErrorCoder.of(), - TableRowJsonCoder.of(), - autoUpdateSchema, - ignoreUnknownValues, - createDisposition, - kmsKey, - usesCdc, - defaultMissingValueInterpretation, - bigLakeConfiguration)); + successfulConvertedRows.apply( + "StorageApiWriteUnsharded", + new StorageApiWriteUnshardedRecords<>( + dynamicDestinations, + bqServices, + failedRowsTag, + successfulWrittenRowsTag, + successfulRowsPredicate, + BigQueryStorageApiInsertErrorCoder.of(), + TableRowJsonCoder.of(), + autoUpdateSchema, + ignoreUnknownValues, + createDisposition, + kmsKey, + usesCdc, + defaultMissingValueInterpretation, + bigLakeConfiguration)); PCollection insertErrors = PCollectionList.of(convertMessagesResult.get(failedRowsTag)) diff --git a/sdks/java/testing/load-tests/build.gradle b/sdks/java/testing/load-tests/build.gradle index d1439bafb748..c74c7301db74 100644 --- a/sdks/java/testing/load-tests/build.gradle +++ b/sdks/java/testing/load-tests/build.gradle @@ -64,6 +64,10 @@ configurations { gradleRun } +def excludeNetty = { + exclude group: "io.netty", module: "*" // exclude more recent Netty version +} + dependencies { implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) @@ -73,8 +77,9 @@ dependencies { implementation project(":sdks:java:testing:test-utils") implementation project(":sdks:java:io:google-cloud-platform") implementation project(":sdks:java:io:kafka") - implementation project(":sdks:java:io:kinesis") - implementation library.java.aws_java_sdk_core + implementation project(":sdks:java:io:amazon-web-services2") + implementation library.java.aws_java_sdk2_auth, excludeNetty + implementation library.java.aws_java_sdk2_regions, excludeNetty implementation library.java.google_cloud_core implementation library.java.joda_time implementation library.java.vendored_guava_32_1_2_jre diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPublisher.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPublisher.java index 525582451bd9..94da25c3d874 100644 --- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPublisher.java +++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPublisher.java @@ -20,7 +20,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; -import com.amazonaws.regions.Regions; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; @@ -30,10 +29,11 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; +import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.io.kinesis.KinesisIO; import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource; import org.apache.beam.sdk.io.synthetic.SyntheticOptions; import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions; @@ -47,6 +47,9 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.kafka.common.serialization.StringSerializer; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; /** * Pipeline that generates synthetic data and publishes it in a PubSub or Kafka topic or in a @@ -180,17 +183,27 @@ private static void writeToKafka(PCollection> collection) { } private static void writeToKinesis(PCollection> collection) { + AwsBasicCredentials creds = + AwsBasicCredentials.create(options.getKinesisAwsKey(), options.getKinesisAwsSecret()); + StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); collection .apply("Map to byte array for Kinesis", MapElements.via(new MapKVToByteArray())) .apply( "Write to Kinesis", - KinesisIO.write() + KinesisIO.write() .withStreamName(options.getKinesisStreamName()) - .withPartitionKey(options.getKinesisPartitionKey()) - .withAWSClientsProvider( - options.getKinesisAwsKey(), - options.getKinesisAwsSecret(), - Regions.fromName(options.getKinesisAwsRegion()))); + .withPartitioner(p -> options.getKinesisPartitionKey()) + // .withPartitionKey(options.getKinesisPartitionKey()) + // .withAWSClientsProvider( + // options.getKinesisAwsKey(), + // options.getKinesisAwsSecret(), + // Regions.fromName(options.getKinesisAwsRegion()) + // )); + .withClientConfiguration( + ClientConfiguration.builder() + .credentialsProvider(provider) + .region(Region.of(options.getKinesisAwsRegion())) + .build())); } private static class MapKVToString extends SimpleFunction, String> { diff --git a/sdks/java/testing/watermarks/build.gradle b/sdks/java/testing/watermarks/build.gradle index c6c2a50279cc..ca774815467a 100644 --- a/sdks/java/testing/watermarks/build.gradle +++ b/sdks/java/testing/watermarks/build.gradle @@ -69,7 +69,6 @@ dependencies { runtimeOnly project(":sdks:java:testing:test-utils") runtimeOnly project(":sdks:java:io:google-cloud-platform") runtimeOnly project(":sdks:java:io:kafka") - runtimeOnly project(":sdks:java:io:kinesis") gradleRun project(project.path) gradleRun project(path: runnerDependency, configuration: runnerConfiguration) From 1961a53dacb42ca6aa40eaa8b871c210d33eb355 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Mon, 30 Dec 2024 08:47:43 -0500 Subject: [PATCH 2/2] Remove unused code --- .../apache/beam/sdk/loadtests/SyntheticDataPublisher.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPublisher.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPublisher.java index 94da25c3d874..3bd87480b8bc 100644 --- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPublisher.java +++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPublisher.java @@ -193,12 +193,6 @@ private static void writeToKinesis(PCollection> collection) { KinesisIO.write() .withStreamName(options.getKinesisStreamName()) .withPartitioner(p -> options.getKinesisPartitionKey()) - // .withPartitionKey(options.getKinesisPartitionKey()) - // .withAWSClientsProvider( - // options.getKinesisAwsKey(), - // options.getKinesisAwsSecret(), - // Regions.fromName(options.getKinesisAwsRegion()) - // )); .withClientConfiguration( ClientConfiguration.builder() .credentialsProvider(provider)