diff --git a/assemblies/plugins/tech/aws/pom.xml b/assemblies/plugins/tech/aws/pom.xml
index af216dfde89..ef7d42a58d8 100644
--- a/assemblies/plugins/tech/aws/pom.xml
+++ b/assemblies/plugins/tech/aws/pom.xml
@@ -36,6 +36,7 @@
1.12.279
1.12.279
+    <redshift.jdbc.version>2.1.0.19</redshift.jdbc.version>
1.10.19
@@ -72,5 +73,10 @@
<version>${aws-java-sdk-s3.version}</version>
<scope>compile</scope>
+    <dependency>
+      <groupId>com.amazon.redshift</groupId>
+      <artifactId>redshift-jdbc42</artifactId>
+      <version>${redshift.jdbc.version}</version>
+    </dependency>
\ No newline at end of file
diff --git a/assemblies/plugins/tech/aws/src/assembly/assembly.xml b/assemblies/plugins/tech/aws/src/assembly/assembly.xml
index 186c2eb7f8d..864a65d3fd7 100644
--- a/assemblies/plugins/tech/aws/src/assembly/assembly.xml
+++ b/assemblies/plugins/tech/aws/src/assembly/assembly.xml
@@ -57,6 +57,7 @@
joda-time:joda-time
com.amazonaws:aws-java-sdk-s3
com.amazonaws:aws-java-sdk-kms
+ com.amazon.redshift:redshift-jdbc42:jar
diff --git a/docs/hop-user-manual/modules/ROOT/assets/images/transforms/icons/redshift.svg b/docs/hop-user-manual/modules/ROOT/assets/images/transforms/icons/redshift.svg
new file mode 100644
index 00000000000..87f05da8eae
--- /dev/null
+++ b/docs/hop-user-manual/modules/ROOT/assets/images/transforms/icons/redshift.svg
@@ -0,0 +1,9 @@
+
diff --git a/docs/hop-user-manual/modules/ROOT/nav.adoc b/docs/hop-user-manual/modules/ROOT/nav.adoc
index e3a12444f40..c13a2e4f25d 100644
--- a/docs/hop-user-manual/modules/ROOT/nav.adoc
+++ b/docs/hop-user-manual/modules/ROOT/nav.adoc
@@ -207,6 +207,7 @@ under the License.
*** xref:pipeline/transforms/processfiles.adoc[Process files]
*** xref:pipeline/transforms/propertyinput.adoc[Properties file Input]
*** xref:pipeline/transforms/propertyoutput.adoc[Properties file Output]
+*** xref:pipeline/transforms/redshift-bulkloader.adoc[Redshift Bulk Loader]
*** xref:pipeline/transforms/regexeval.adoc[Regex Evaluation]
*** xref:pipeline/transforms/replacestring.adoc[Replace in String]
*** xref:pipeline/transforms/reservoirsampling.adoc[Reservoir Sampling]
diff --git a/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/postgresbulkloader.adoc b/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/postgresbulkloader.adoc
index 3d14113bba7..eccc89f520a 100644
--- a/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/postgresbulkloader.adoc
+++ b/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/postgresbulkloader.adoc
@@ -40,7 +40,7 @@ TIP: replace boolean fields in your pipeline stream by string fields with "Y" or
!===
|===
-IMPORTANT: The PostgreSQL Bulk Loader is linked to the database type. It will fetch the JDBC driver from the hop/plugins/databases/postgresql/lib folder. +
+IMPORTANT: The PostgreSQL Bulk Loader is linked to the database type. It will fetch the JDBC driver from the hop/lib/jdbc folder. +
+
The only valid location for the JDBC driver for this transform is the main hop/lib/jdbc folder. It will not work in combination with the SHARED_JDBC_FOLDER variable.
diff --git a/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/redshift-bulkloader.adoc b/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/redshift-bulkloader.adoc
new file mode 100644
index 00000000000..8d3795fbdcc
--- /dev/null
+++ b/docs/hop-user-manual/modules/ROOT/pages/pipeline/transforms/redshift-bulkloader.adoc
@@ -0,0 +1,86 @@
+////
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+////
+:documentationPath: /pipeline/transforms/
+:language: en_US
+:description: The Redshift Bulk Loader transform loads data from Apache Hop to AWS Redshift using the COPY command.
+
+= image:transforms/icons/redshift.svg[Redshift Bulk Loader transform Icon, role="image-doc-icon"] Redshift Bulk Loader
+
+[%noheader,cols="3a,1a", role="table-no-borders" ]
+|===
+|
+== Description
+
+The Redshift Bulk Loader transform loads data from Apache Hop to AWS Redshift using the https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html[`COPY`^] command.
+
+TIP: make sure your target Redshift table has a layout that is compatible with Parquet data types, e.g. use `int8` instead of `int4` data types.
+
+|
+== Supported Engines
+[%noheader,cols="2,1a",frame=none, role="table-supported-engines"]
+!===
+!Hop Engine! image:check_mark.svg[Supported, 24]
+!Spark! image:question_mark.svg[Maybe Supported, 24]
+!Flink! image:question_mark.svg[Maybe Supported, 24]
+!Dataflow! image:question_mark.svg[Maybe Supported, 24]
+!===
+|===
+
+IMPORTANT: The Redshift Bulk Loader is linked to the database type. It will fetch the JDBC driver from the hop/lib/jdbc folder.
+
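+Under the hood, the transform stages the incoming rows and then issues a single `COPY` statement over the JDBC connection. A minimal sketch of the kind of statement it generates for a CSV load with key-based credentials (table, bucket and key values are placeholders):
+
+[source,sql]
+----
+COPY public.sales (id, amount, created_at)
+FROM 's3://my-bucket/staging/sales.csv'
+delimiter ','
+CREDENTIALS 'aws_access_key_id=<access-key-id>;aws_secret_access_key=<secret-access-key>';
+----
+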
+== General Options
+
+[options="header"]
+|===
+|Option|Description
+|Transform name|Name of the transform.
+|Connection|Name of the database connection on which the target table resides.
+|Target schema|The name of the target schema to write data to.
+|Target table|The name of the target table to write data to.
+|AWS Authentication a|Choose which authentication method to use with the `COPY` command. Supported options are `AWS Credentials` and `IAM Role`. +
+
+* Check the https://docs.aws.amazon.com/redshift/latest/dg/copy-usage_notes-access-permissions.html#copy-usage_notes-access-key-based[Key-based access control] docs for more information on the `Credentials` option.
+* Check the https://docs.aws.amazon.com/redshift/latest/dg/copy-usage_notes-access-permissions.html#copy-usage_notes-access-role-based[IAM Role] docs for more information on the `IAM Role` option.
+
+|Use AWS system variables|(`Credentials` only) Pick up the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` values from your operating system's environment variables.
+|AWS_ACCESS_KEY_ID|(if `Credentials` is selected and `Use AWS system variables` is unchecked) Specify a value or variable for your `AWS_ACCESS_KEY_ID`.
+|AWS_SECRET_ACCESS_KEY|(if `Credentials` is selected and `Use AWS system variables` is unchecked) Specify a value or variable for your `AWS_SECRET_ACCESS_KEY`.
+|IAM Role|(if `IAM Role` is selected) Specify the IAM Role to use, in the syntax `arn:aws:iam::<account-id>:role/<role-name>`.
+|Truncate table|Truncate the target table before loading data.
+|Truncate on first row|Truncate the target table before loading data, but only when a first data row is received. The table will not be truncated when the pipeline runs an empty stream (0 rows).
+|Specify database fields|Specify the mapping between stream fields and database fields.
+|===
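+
+With the `IAM Role` option, the generated `COPY` statement uses an `iam_role` clause instead of `CREDENTIALS`, along these lines (the role ARN is a placeholder):
+
+[source,sql]
+----
+COPY public.sales (id, amount, created_at)
+FROM 's3://my-bucket/staging/sales.csv'
+delimiter ','
+iam_role 'arn:aws:iam::<account-id>:role/<role-name>';
+----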
+
+== Main Options
+
+[options="header"]
+|===
+|Option|Description
+|Stream to S3 CSV|Write the current pipeline stream to a CSV file in an S3 bucket before performing the `COPY` load.
+|Load from existing file|Do not stream the contents of the current pipeline, but perform the `COPY` load from a pre-existing file in S3. Supported formats are `CSV` (comma delimited) and `Parquet`.
+|Copy into Redshift from existing file|Path to the file in S3 to `COPY` load the data from.
+|===
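+
+When loading from an existing Parquet file, the transform omits the column list and delimiter and appends `FORMAT AS PARQUET` to the generated statement, roughly (paths and role are placeholders):
+
+[source,sql]
+----
+COPY public.sales
+FROM 's3://my-bucket/staging/sales.parquet'
+iam_role 'arn:aws:iam::<account-id>:role/<role-name>' FORMAT AS PARQUET;
+----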
+
+== Database fields
+
+Map the current stream fields to the Redshift table's columns.
+
+== Metadata Injection Support
+
+All fields of this transform support metadata injection.
+You can use this transform with Metadata Injection to pass metadata to your pipeline at runtime.
diff --git a/plugins/tech/aws/pom.xml b/plugins/tech/aws/pom.xml
index 594b867304a..f3836dc1578 100644
--- a/plugins/tech/aws/pom.xml
+++ b/plugins/tech/aws/pom.xml
@@ -36,6 +36,7 @@
2.0.9
1.12.279
1.12.279
+    <redshift.jdbc.version>2.1.0.19</redshift.jdbc.version>
@@ -75,6 +76,11 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
+    <dependency>
+      <groupId>com.amazon.redshift</groupId>
+      <artifactId>redshift-jdbc42</artifactId>
+      <version>${redshift.jdbc.version}</version>
+    </dependency>
diff --git a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java
new file mode 100644
index 00000000000..c55448c8e06
--- /dev/null
+++ b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java
@@ -0,0 +1,799 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.redshift.bulkloader;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hop.core.Const;
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.exception.HopDatabaseException;
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.core.exception.HopTransformException;
+import org.apache.hop.core.exception.HopValueException;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.core.row.RowMeta;
+import org.apache.hop.core.row.value.ValueMetaBigNumber;
+import org.apache.hop.core.row.value.ValueMetaDate;
+import org.apache.hop.core.row.value.ValueMetaString;
+import org.apache.hop.core.util.Utils;
+import org.apache.hop.core.vfs.HopVfs;
+import org.apache.hop.i18n.BaseMessages;
+import org.apache.hop.pipeline.Pipeline;
+import org.apache.hop.pipeline.PipelineMeta;
+import org.apache.hop.pipeline.transform.BaseTransform;
+import org.apache.hop.pipeline.transform.TransformMeta;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class RedshiftBulkLoader
+    extends BaseTransform<RedshiftBulkLoaderMeta, RedshiftBulkLoaderData> {
+  private static final Class<?> PKG =
+      RedshiftBulkLoader.class; // for i18n purposes, needed by Translator2!!
+
+ private static final SimpleDateFormat SIMPLE_DATE_FORMAT =
+ new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ private FileOutputStream exceptionLog;
+ private FileOutputStream rejectedLog;
+
+ public RedshiftBulkLoader(
+ TransformMeta transformMeta,
+ RedshiftBulkLoaderMeta meta,
+ RedshiftBulkLoaderData data,
+ int copyNr,
+ PipelineMeta pipelineMeta,
+ Pipeline pipeline) {
+ super(transformMeta, meta, data, copyNr, pipelineMeta, pipeline);
+ }
+
+ @Override
+ public boolean init() {
+
+ if (super.init()) {
+ try {
+ // Validating that the connection has been defined.
+ verifyDatabaseConnection();
+ data.databaseMeta = this.getPipelineMeta().findDatabase(meta.getConnection(), variables);
+
+        if (meta.isStreamToS3Csv()) {
+ // get the file output stream to write to S3
+ data.writer = HopVfs.getOutputStream(meta.getCopyFromFilename(), false);
+ }
+
+ data.db = new Database(this, this, data.databaseMeta);
+ data.db.connect();
+
+ if (log.isBasic()) {
+ logBasic(BaseMessages.getString(PKG, "RedshiftBulkLoader.Connection.Connected", data.db.getDatabaseMeta()));
+ }
+ initBinaryDataFields();
+
+ data.db.setAutoCommit(false);
+
+ return true;
+ } catch (HopException e) {
+ logError("An error occurred initializing this transform: " + e.getMessage());
+ stopAll();
+ setErrors(1);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean processRow() throws HopException {
+
+ Object[] r = getRow(); // this also waits for a previous transform to be finished.
+
+ if (r == null) { // no more input to be expected...
+ if (first && meta.isTruncateTable() && !meta.isOnlyWhenHaveRows()) {
+ truncateTable();
+ }
+
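+      // All rows have been staged at this point: close the staging file, then load it into
+      // Redshift with a single COPY statement on the existing JDBC connection.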
+ try {
+ data.close();
+ closeFile();
+ String copyStmt = buildCopyStatementSqlString();
+ Connection conn = data.db.getConnection();
+ Statement stmt = conn.createStatement();
+ stmt.executeUpdate(copyStmt);
+ conn.commit();
+ stmt.close();
+ conn.close();
+      } catch (SQLException sqle) {
+ throw new HopDatabaseException("Error executing COPY statements", sqle);
+ } catch (IOException ioe) {
+ throw new HopTransformException("Error releasing resources", ioe);
+ }
+ return false;
+ }
+
+ if (first && meta.isStreamToS3Csv()) {
+
+ first = false;
+
+ if (meta.isTruncateTable()) {
+ truncateTable();
+ }
+
+ data.outputRowMeta = getInputRowMeta().clone();
+ meta.getFields(data.outputRowMeta, getTransformName(), null, null, this, metadataProvider);
+
+      if (!meta.specifyFields()) {
+
+ // Just take the whole input row
+ data.insertRowMeta = getInputRowMeta().clone();
+ data.selectedRowFieldIndices = new int[data.insertRowMeta.size()];
+
+ data.fieldnrs = new HashMap<>();
+        try {
+          getDbFields();
+        } catch (HopException e) {
+ logError("Error getting database fields", e);
+ setErrors(1);
+ stopAll();
+ setOutputDone(); // signal end to receiver(s)
+ return false;
+ }
+
+ for (int i = 0; i < meta.getFields().size(); i++) {
+ int streamFieldLocation =
+ data.outputRowMeta.indexOfValue(
+ meta.getFields().get(i).getStreamField());
+ if (streamFieldLocation < 0) {
+ throw new HopTransformException(
+ "Field ["
+ + meta.getFields().get(i).getStreamField()
+ + "] couldn't be found in the input stream!");
+ }
+
+ int dbFieldLocation = -1;
+ for (int e = 0; e < data.dbFields.size(); e++) {
+ String[] field = data.dbFields.get(e);
+ if (field[0].equalsIgnoreCase(
+ meta.getFields().get(i).getDatabaseField())) {
+ dbFieldLocation = e;
+ break;
+ }
+ }
+ if (dbFieldLocation < 0) {
+ throw new HopException(
+ "Field ["
+ + meta.getFields().get(i).getDatabaseField()
+ + "] couldn't be found in the table!");
+ }
+
+ data.fieldnrs.put(
+ meta.getFields().get(i).getDatabaseField().toUpperCase(),
+ streamFieldLocation);
+ }
+
+ } else {
+
+ int numberOfInsertFields = meta.getFields().size();
+ data.insertRowMeta = new RowMeta();
+
+ // Cache the position of the selected fields in the row array
+ data.selectedRowFieldIndices = new int[numberOfInsertFields];
+ for (int insertFieldIdx = 0; insertFieldIdx < numberOfInsertFields; insertFieldIdx++) {
+ RedshiftBulkLoaderField vbf = meta.getFields().get(insertFieldIdx);
+ String inputFieldName = vbf.getStreamField();
+ int inputFieldIdx = getInputRowMeta().indexOfValue(inputFieldName);
+ if (inputFieldIdx < 0) {
+ throw new HopTransformException(
+ BaseMessages.getString(
+ PKG,
+ "RedshiftBulkLoader.Exception.FieldRequired",
+ inputFieldName)); //$NON-NLS-1$
+ }
+ data.selectedRowFieldIndices[insertFieldIdx] = inputFieldIdx;
+
+ String insertFieldName = vbf.getDatabaseField();
+ IValueMeta inputValueMeta = getInputRowMeta().getValueMeta(inputFieldIdx);
+ if (inputValueMeta == null) {
+ throw new HopTransformException(
+ BaseMessages.getString(
+ PKG,
+ "RedshiftBulkLoader.Exception.FailedToFindField",
+ vbf.getStreamField())); // $NON-NLS-1$
+ }
+ IValueMeta insertValueMeta = inputValueMeta.clone();
+ insertValueMeta.setName(insertFieldName);
+ data.insertRowMeta.addValueMeta(insertValueMeta);
+ }
+ }
+ }
+
+    if (meta.isStreamToS3Csv()) {
+ writeRowToFile(data.outputRowMeta, r);
+ putRow(data.outputRowMeta, r);
+ }
+
+ return true;
+ }
+
+ /**
+ * Closes a file so that its file handle is no longer open
+ *
+ * @return true if we successfully closed the file
+ */
+ private boolean closeFile() {
+ boolean returnValue = false;
+
+ try {
+ if (data.writer != null) {
+ data.writer.flush();
+ data.writer.close();
+ }
+ data.writer = null;
+ if (log.isDebug()) {
+ logDebug("Closing normal file ...");
+ }
+
+ returnValue = true;
+ } catch (Exception e) {
+ logError("Exception trying to close file: " + e.toString());
+ setErrors(1);
+ returnValue = false;
+ }
+ return returnValue;
+ }
+
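+  /**
+   * Assembles the Redshift COPY statement from the transform settings: the resolved schema and
+   * table name, a column list and comma delimiter for CSV data, the configured authentication
+   * clause (iam_role or CREDENTIALS) and, when loading an existing Parquet file, FORMAT AS
+   * PARQUET.
+   */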
+ private String buildCopyStatementSqlString() {
+ final DatabaseMeta databaseMeta = data.db.getDatabaseMeta();
+
+ StringBuilder sb = new StringBuilder(150);
+ sb.append("COPY ");
+
+ sb.append(
+ databaseMeta.getQuotedSchemaTableCombination(
+ variables,
+ data.db.resolve(meta.getSchemaName()),
+ data.db.resolve(meta.getTableName())));
+
+    if (meta.isStreamToS3Csv() || "CSV".equals(meta.getLoadFromExistingFileFormat())) {
+      sb.append(" (");
+      final IRowMeta fields = data.insertRowMeta;
+      for (int i = 0; i < fields.size(); i++) {
+        if (i > 0) {
+          sb.append(", ");
+        }
+        sb.append(fields.getValueMeta(i).getName());
+      }
+      sb.append(")");
+    }
+
+    sb.append(" FROM '" + resolve(meta.getCopyFromFilename()) + "'");
+    if (meta.isStreamToS3Csv() || "CSV".equals(meta.getLoadFromExistingFileFormat())) {
+      sb.append(" delimiter ','");
+    }
+    if (meta.isUseAwsIamRole()) {
+      sb.append(" iam_role '" + meta.getAwsIamRole() + "'");
+    } else if (meta.isUseCredentials()) {
+      String awsAccessKeyId;
+      String awsSecretAccessKey;
+      if (meta.isUseSystemEnvVars()) {
+        awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");
+        awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");
+      } else {
+        awsAccessKeyId = resolve(meta.getAwsAccessKeyId());
+        awsSecretAccessKey = resolve(meta.getAwsSecretAccessKey());
+      }
+      sb.append(
+          " CREDENTIALS 'aws_access_key_id="
+              + awsAccessKeyId
+              + ";aws_secret_access_key="
+              + awsSecretAccessKey
+              + "'");
+    }
+    // Use "Parquet".equals(...) to avoid a NullPointerException when no existing-file format is set.
+    if ("Parquet".equals(meta.getLoadFromExistingFileFormat())) {
+      sb.append(" FORMAT AS PARQUET;");
+    }
+
+    logDebug("copy stmt: " + sb);
+
+    return sb.toString();
+  }
+
+ private Object[] writeToOutputStream(Object[] r) throws HopException, IOException {
+ assert (r != null);
+
+ Object[] insertRowData = r;
+ Object[] outputRowData = r;
+
+ if (meta.specifyFields()) {
+ insertRowData = new Object[data.selectedRowFieldIndices.length];
+ for (int idx = 0; idx < data.selectedRowFieldIndices.length; idx++) {
+ insertRowData[idx] = r[data.selectedRowFieldIndices[idx]];
+ }
+ }
+
+ return outputRowData;
+ }
+
+  /**
+   * Fetches the field names and field types of the target table from the database and caches them
+   * in {@code data.dbFields}.
+   *
+   * @throws HopException
+   */
+  private void getDbFields() throws HopException {
+    data.dbFields = new ArrayList<>();
+
+    IRowMeta rowMeta;
+    if (!StringUtils.isEmpty(resolve(meta.getSchemaName()))) {
+      rowMeta = data.db.getTableFields(meta.getSchemaName() + "." + meta.getTableName());
+    } else {
+      rowMeta = data.db.getTableFields(meta.getTableName());
+    }
+    try {
+      if (rowMeta.size() == 0) {
+        throw new HopException("No fields found in table");
+      }
+
+      for (int i = 0; i < rowMeta.size(); i++) {
+        String[] field = new String[2];
+        field[0] = rowMeta.getValueMeta(i).getName().toUpperCase();
+        field[1] = rowMeta.getValueMeta(i).getTypeDesc().toUpperCase();
+        data.dbFields.add(field);
+      }
+ } catch (Exception ex) {
+ throw new HopException("Error getting database fields", ex);
+ }
+ }
+
+
+ protected void verifyDatabaseConnection() throws HopException {
+ // Confirming Database Connection is defined.
+ if (meta.getConnection() == null) {
+ throw new HopException(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.Error.NoConnection"));
+ }
+ }
+
+ /**
+ * Initialize the binary values of delimiters, enclosures, and escape characters
+ *
+ * @throws HopException
+ */
+ private void initBinaryDataFields() throws HopException {
+ try {
+ data.binarySeparator =
+ resolve(RedshiftBulkLoaderMeta.CSV_DELIMITER).getBytes(StandardCharsets.UTF_8);
+ data.binaryEnclosure =
+ resolve(RedshiftBulkLoaderMeta.ENCLOSURE).getBytes(StandardCharsets.UTF_8);
+ data.binaryNewline =
+ RedshiftBulkLoaderMeta.CSV_RECORD_DELIMITER.getBytes(StandardCharsets.UTF_8);
+ data.escapeCharacters =
+ RedshiftBulkLoaderMeta.CSV_ESCAPE_CHAR.getBytes(StandardCharsets.UTF_8);
+
+ data.binaryNullValue = "".getBytes(StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ throw new HopException("Unexpected error while encoding binary fields", e);
+ }
+ }
+
+ /**
+ * Writes an individual row of data to a temp file
+ *
+ * @param rowMeta The metadata about the row
+ * @param row The input row
+ * @throws HopTransformException
+ */
+ private void writeRowToFile(IRowMeta rowMeta, Object[] row) throws HopTransformException {
+ try {
+
+      if (meta.isStreamToS3Csv() && !meta.isSpecifyFields()) {
+ /*
+ * Write all values in stream to text file.
+ */
+ for (int i = 0; i < rowMeta.size(); i++) {
+ if (i > 0 && data.binarySeparator.length > 0) {
+ data.writer.write(data.binarySeparator);
+ }
+ IValueMeta v = rowMeta.getValueMeta(i);
+ Object valueData = row[i];
+
+ // no special null value default was specified since no fields are specified at all
+ // As such, we pass null
+ //
+ writeField(v, valueData, null);
+ }
+ data.writer.write(data.binaryNewline);
+ } else if (meta.isStreamToS3Csv()) {
+ /*
+ * Only write the fields specified!
+ */
+ for (int i = 0; i < data.dbFields.size(); i++) {
+ if (data.dbFields.get(i) != null) {
+ if (i > 0 && data.binarySeparator.length > 0) {
+ data.writer.write(data.binarySeparator);
+ }
+
+ String[] field = data.dbFields.get(i);
+ IValueMeta v;
+
+ if (field[1].toUpperCase().startsWith("TIMESTAMP")) {
+ v = new ValueMetaDate();
+ v.setConversionMask("yyyy-MM-dd HH:mm:ss.SSS");
+ } else if (field[1].toUpperCase().startsWith("DATE")) {
+ v = new ValueMetaDate();
+ v.setConversionMask("yyyy-MM-dd");
+ } else if (field[1].toUpperCase().startsWith("TIME")) {
+ v = new ValueMetaDate();
+ v.setConversionMask("HH:mm:ss.SSS");
+ } else if (field[1].toUpperCase().startsWith("NUMBER")
+ || field[1].toUpperCase().startsWith("FLOAT")) {
+ v = new ValueMetaBigNumber();
+ } else {
+ v = new ValueMetaString();
+ v.setLength(-1);
+ }
+
+ int fieldIndex = -1;
+ if (data.fieldnrs.get(data.dbFields.get(i)[0]) != null) {
+ fieldIndex = data.fieldnrs.get(data.dbFields.get(i)[0]);
+ }
+ Object valueData = null;
+ if (fieldIndex >= 0) {
+ valueData = v.convertData(rowMeta.getValueMeta(fieldIndex), row[fieldIndex]);
+ } else if (meta.isErrorColumnMismatch()) {
+ throw new HopException(
+ "Error column mismatch: Database field "
+ + data.dbFields.get(i)[0]
+ + " not found on stream.");
+ }
+ writeField(v, valueData, data.binaryNullValue);
+ }
+ }
+ data.writer.write(data.binaryNewline);
+ } else {
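+        // Fallback: writes a single "json" stream field as-is. This branch is not reachable as
+        // long as writeRowToFile is only invoked in CSV streaming mode (see processRow).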
+ int jsonField = data.fieldnrs.get("json");
+ data.writer.write(
+ data.outputRowMeta.getString(row, jsonField).getBytes(StandardCharsets.UTF_8));
+ data.writer.write(data.binaryNewline);
+ }
+ } catch (Exception e) {
+ throw new HopTransformException("Error writing line", e);
+ }
+ }
+
+ /**
+ * Writes an individual field to the temp file.
+ *
+ * @param v The metadata about the column
+ * @param valueData The data for the column
+ * @param nullString The bytes to put in the temp file if the value is null
+ * @throws HopTransformException
+ */
+ private void writeField(IValueMeta v, Object valueData, byte[] nullString)
+ throws HopTransformException {
+ try {
+ byte[] str;
+
+ // First check whether or not we have a null string set
+ // These values should be set when a null value passes
+ //
+ if (nullString != null && v.isNull(valueData)) {
+ str = nullString;
+ } else {
+ str = formatField(v, valueData);
+ }
+
+ if (str != null && str.length > 0) {
+        List<Integer> enclosures = null;
+ boolean writeEnclosures = false;
+
+ if (v.isString()) {
+ if (containsSeparatorOrEnclosure(
+ str, data.binarySeparator, data.binaryEnclosure, data.escapeCharacters)) {
+ writeEnclosures = true;
+ }
+ }
+
+ if (writeEnclosures) {
+ data.writer.write(data.binaryEnclosure);
+ enclosures = getEnclosurePositions(str);
+ }
+
+ if (enclosures == null) {
+ data.writer.write(str);
+        } else {
+          // Write the string in chunks, inserting the escape character in front of every
+          // enclosure or escape byte found in the data.
+          int from = 0;
+          for (Integer enclosure : enclosures) {
+            int position = enclosure;
+            data.writer.write(str, from, position - from);
+            data.writer.write(data.escapeCharacters); // escape the enclosure character
+            from = position;
+          }
+          if (from < str.length) {
+ data.writer.write(str, from, str.length - from);
+ }
+ }
+
+ if (writeEnclosures) {
+ data.writer.write(data.binaryEnclosure);
+ }
+ }
+ } catch (Exception e) {
+ throw new HopTransformException("Error writing field content to file", e);
+ }
+ }
+
+ /**
+ * Takes an input field and converts it to bytes to be stored in the temp file.
+ *
+ * @param v The metadata about the column
+ * @param valueData The column data
+ * @return The bytes for the value
+ * @throws HopValueException
+ */
+ private byte[] formatField(IValueMeta v, Object valueData) throws HopValueException {
+ if (v.isString()) {
+ if (v.isStorageBinaryString()
+ && v.getTrimType() == IValueMeta.TRIM_TYPE_NONE
+ && v.getLength() < 0
+ && StringUtils.isEmpty(v.getStringEncoding())) {
+ return (byte[]) valueData;
+ } else {
+ String svalue = (valueData instanceof String) ? (String) valueData : v.getString(valueData);
+
+ // trim or cut to size if needed.
+ //
+ return convertStringToBinaryString(v, Const.trimToType(svalue, v.getTrimType()));
+ }
+ } else {
+ return v.getBinaryString(valueData);
+ }
+ }
+
+ /**
+ * Converts an input string to the bytes for the string
+ *
+ * @param v The metadata about the column
+ * @param string The column data
+ * @return The bytes for the value
+ * @throws HopValueException
+ */
+ private byte[] convertStringToBinaryString(IValueMeta v, String string) {
+ int length = v.getLength();
+
+ if (string == null) {
+ return new byte[] {};
+ }
+
+ if (length > -1 && length < string.length()) {
+ // we need to truncate
+ String tmp = string.substring(0, length);
+ return tmp.getBytes(StandardCharsets.UTF_8);
+
+ } else {
+ byte[] text;
+ text = string.getBytes(StandardCharsets.UTF_8);
+
+ if (length > string.length()) {
+ // we need to pad this
+
+ int size = 0;
+ byte[] filler;
+ filler = " ".getBytes(StandardCharsets.UTF_8);
+ size = text.length + filler.length * (length - string.length());
+
+ byte[] bytes = new byte[size];
+ System.arraycopy(text, 0, bytes, 0, text.length);
+ if (filler.length == 1) {
+ java.util.Arrays.fill(bytes, text.length, size, filler[0]);
+ } else {
+ int currIndex = text.length;
+ for (int i = 0; i < (length - string.length()); i++) {
+ for (byte aFiller : filler) {
+ bytes[currIndex++] = aFiller;
+ }
+ }
+ }
+ return bytes;
+ } else {
+ // do not need to pad or truncate
+ return text;
+ }
+ }
+ }
+
+ /**
+ * Check if a string contains separators or enclosures. Can be used to determine if the string
+ * needs enclosures around it or not.
+ *
+ * @param source The string to check
+ * @param separator The separator character(s)
+ * @param enclosure The enclosure character(s)
+ * @param escape The escape character(s)
+ * @return True if the string contains separators or enclosures
+ */
+ @SuppressWarnings("Duplicates")
+ private boolean containsSeparatorOrEnclosure(
+ byte[] source, byte[] separator, byte[] enclosure, byte[] escape) {
+ boolean result = false;
+
+ boolean enclosureExists = enclosure != null && enclosure.length > 0;
+ boolean separatorExists = separator != null && separator.length > 0;
+ boolean escapeExists = escape != null && escape.length > 0;
+
+ // Skip entire test if neither separator nor enclosure exist
+ if (separatorExists || enclosureExists || escapeExists) {
+
+ // Search for the first occurrence of the separator or enclosure
+ for (int index = 0; !result && index < source.length; index++) {
+ if (enclosureExists && source[index] == enclosure[0]) {
+
+ // Potential match found, make sure there are enough bytes to support a full match
+ if (index + enclosure.length <= source.length) {
+ // First byte of enclosure found
+ result = true; // Assume match
+ for (int i = 1; i < enclosure.length; i++) {
+ if (source[index + i] != enclosure[i]) {
+ // Enclosure match is proven false
+ result = false;
+ break;
+ }
+ }
+ }
+
+ } else if (separatorExists && source[index] == separator[0]) {
+
+ // Potential match found, make sure there are enough bytes to support a full match
+ if (index + separator.length <= source.length) {
+ // First byte of separator found
+ result = true; // Assume match
+ for (int i = 1; i < separator.length; i++) {
+ if (source[index + i] != separator[i]) {
+ // Separator match is proven false
+ result = false;
+ break;
+ }
+ }
+ }
+
+ } else if (escapeExists && source[index] == escape[0]) {
+
+ // Potential match found, make sure there are enough bytes to support a full match
+ if (index + escape.length <= source.length) {
+            // First byte of escape found
+ result = true; // Assume match
+ for (int i = 1; i < escape.length; i++) {
+ if (source[index + i] != escape[i]) {
+                // Escape match is proven false
+ result = false;
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Gets the positions of any double quotes or backslashes in the string
+ *
+ * @param str The string to check
+ * @return The positions within the string of double quotes and backslashes.
+ */
+  private List<Integer> getEnclosurePositions(byte[] str) {
+    List<Integer> positions = null;
+    for (int i = 0, len = str.length; i < len; i++) {
+      // verify if an enclosure starts at position i, staying within the array bounds
+      //
+      boolean found = data.binaryEnclosure.length > 0 && i + data.binaryEnclosure.length <= len;
+      for (int x = 0; found && x < data.binaryEnclosure.length; x++) {
+        if (str[i + x] != data.binaryEnclosure[x]) {
+          found = false;
+        }
+      }
+
+      if (!found) {
+        // otherwise, verify if an escape sequence starts at position i
+        found = data.escapeCharacters.length > 0 && i + data.escapeCharacters.length <= len;
+        for (int x = 0; found && x < data.escapeCharacters.length; x++) {
+          if (str[i + x] != data.escapeCharacters[x]) {
+            found = false;
+          }
+        }
+      }
+
+      if (found) {
+        if (positions == null) {
+          positions = new ArrayList<>();
+        }
+        positions.add(i);
+      }
+    }
+    return positions;
+  }
+
+ @Override
+ public void stopRunning() throws HopException {
+ setStopped(true);
+ if (data.workerThread != null) {
+ synchronized (data.workerThread) {
+ if (data.workerThread.isAlive() && !data.workerThread.isInterrupted()) {
+ try {
+ data.workerThread.interrupt();
+ data.workerThread.join();
+          } catch (InterruptedException e) { // Checkstyle:OFF:
+          }
+          // Checkstyle:ON:
+ }
+ }
+ }
+
+ super.stopRunning();
+ }
+
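+  /**
+   * Truncates the target table. Only the first transform copy (or a copy with a partition ID)
+   * performs the truncate, so parallel copies don't truncate the same table multiple times.
+   */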
+ void truncateTable() throws HopDatabaseException {
+ if (meta.isTruncateTable() && ((getCopy() == 0) || !Utils.isEmpty(getPartitionId()))) {
+ data.db.truncateTable(resolve(meta.getSchemaName()), resolve(meta.getTableName()));
+ }
+ }
+
+ @Override
+ public void dispose() {
+
+ setOutputDone();
+
+ try {
+ if (getErrors() > 0) {
+ data.db.rollback();
+ }
+ } catch (HopDatabaseException e) {
+ logError("Unexpected error rolling back the database connection.", e);
+ }
+
+ if (data.workerThread != null) {
+ try {
+ data.workerThread.join();
+      } catch (InterruptedException e) { // Checkstyle:OFF:
+      }
+      // Checkstyle:ON:
+ }
+
+ if (data.db != null) {
+ data.db.disconnect();
+ }
+ super.dispose();
+ }
+}
diff --git a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderData.java b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderData.java
new file mode 100644
index 00000000000..f95ded3646c
--- /dev/null
+++ b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderData.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.redshift.bulkloader;
+
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.pipeline.transform.BaseTransformData;
+import org.apache.hop.pipeline.transform.ITransformData;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+public class RedshiftBulkLoaderData extends BaseTransformData implements ITransformData {
+ protected Database db;
+ protected DatabaseMeta databaseMeta;
+
+ protected int[] selectedRowFieldIndices;
+
+ protected IRowMeta outputRowMeta;
+ protected IRowMeta insertRowMeta;
+
+  // A list of table fields mapped to their data type. String[0] is the field name, String[1] is
+  // the Redshift data type.
+  public ArrayList<String[]> dbFields;
+
+ // Maps table fields to the location of the corresponding field on the input stream.
+  public Map<String, Integer> fieldnrs;
+
+ protected OutputStream writer;
+ // Byte arrays for constant characters put into output files.
+ public byte[] binarySeparator;
+ public byte[] binaryEnclosure;
+ public byte[] escapeCharacters;
+ public byte[] binaryNewline;
+ public byte[] binaryNullValue;
+
+ protected PipedInputStream pipedInputStream;
+
+ protected volatile Thread workerThread;
+
+ public RedshiftBulkLoaderData() {
+ super();
+
+ db = null;
+ }
+
+ public IRowMeta getInsertRowMeta() {
+ return insertRowMeta;
+ }
+
+ public void close() throws IOException {
+ }
+}
diff --git a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderDialog.java b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderDialog.java
new file mode 100644
index 00000000000..d667f1f9d98
--- /dev/null
+++ b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderDialog.java
@@ -0,0 +1,1245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.redshift.bulkloader;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hop.core.Const;
+import org.apache.hop.core.DbCache;
+import org.apache.hop.core.Props;
+import org.apache.hop.core.SourceToTargetMapping;
+import org.apache.hop.core.SqlStatement;
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.core.exception.HopTransformException;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.core.row.RowMeta;
+import org.apache.hop.core.util.StringUtil;
+import org.apache.hop.core.util.Utils;
+import org.apache.hop.core.variables.IVariables;
+import org.apache.hop.i18n.BaseMessages;
+import org.apache.hop.pipeline.PipelineMeta;
+import org.apache.hop.pipeline.transform.BaseTransformMeta;
+import org.apache.hop.pipeline.transform.ITransformDialog;
+import org.apache.hop.pipeline.transform.ITransformMeta;
+import org.apache.hop.pipeline.transform.TransformMeta;
+import org.apache.hop.ui.core.PropsUi;
+import org.apache.hop.ui.core.database.dialog.DatabaseExplorerDialog;
+import org.apache.hop.ui.core.database.dialog.SqlEditor;
+import org.apache.hop.ui.core.dialog.BaseDialog;
+import org.apache.hop.ui.core.dialog.EnterMappingDialog;
+import org.apache.hop.ui.core.dialog.ErrorDialog;
+import org.apache.hop.ui.core.widget.ColumnInfo;
+import org.apache.hop.ui.core.widget.ComboVar;
+import org.apache.hop.ui.core.widget.MetaSelectionLine;
+import org.apache.hop.ui.core.widget.TableView;
+import org.apache.hop.ui.core.widget.TextVar;
+import org.apache.hop.ui.pipeline.transform.BaseTransformDialog;
+import org.eclipse.swt.SWT;
+import org.eclipse.swt.custom.CTabFolder;
+import org.eclipse.swt.custom.CTabItem;
+import org.eclipse.swt.events.FocusAdapter;
+import org.eclipse.swt.events.FocusEvent;
+import org.eclipse.swt.events.FocusListener;
+import org.eclipse.swt.events.ModifyListener;
+import org.eclipse.swt.events.SelectionAdapter;
+import org.eclipse.swt.events.SelectionEvent;
+import org.eclipse.swt.layout.FormAttachment;
+import org.eclipse.swt.layout.FormData;
+import org.eclipse.swt.layout.FormLayout;
+import org.eclipse.swt.widgets.Button;
+import org.eclipse.swt.widgets.Composite;
+import org.eclipse.swt.widgets.Control;
+import org.eclipse.swt.widgets.Event;
+import org.eclipse.swt.widgets.Label;
+import org.eclipse.swt.widgets.Listener;
+import org.eclipse.swt.widgets.MessageBox;
+import org.eclipse.swt.widgets.Shell;
+import org.eclipse.swt.widgets.TableItem;
+import org.eclipse.swt.widgets.Text;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class RedshiftBulkLoaderDialog extends BaseTransformDialog implements ITransformDialog {
+
+  private static final Class<?> PKG =
+      RedshiftBulkLoaderMeta.class; // for i18n purposes, needed by Translator2!!
+
+  private MetaSelectionLine<DatabaseMeta> wConnection;
+
+ private Button wTruncate;
+
+ private Button wOnlyWhenHaveRows;
+
+ private TextVar wSchema;
+
+ private TextVar wTable;
+
+ private Button wStreamToS3Csv;
+
+ private ComboVar wLoadFromExistingFileFormat;
+
+ private TextVar wCopyFromFilename;
+
+ private Button wSpecifyFields;
+
+ private TableView wFields;
+
+ private Button wGetFields;
+
+ private Button wDoMapping;
+
+ private RedshiftBulkLoaderMeta input;
+
+  private Map<String, Integer> inputFields;
+
+ private ColumnInfo[] ciFields;
+
+ private static final String AWS_CREDENTIALS = "Credentials";
+ private static final String AWS_IAM_ROLE = "IAM Role";
+ private String[] awsAuthOptions = new String[]{AWS_CREDENTIALS, AWS_IAM_ROLE};
+
+ private Label wlAwsAuthentication;
+ private ComboVar wAwsAuthentication;
+ private Label wlUseSystemVars;
+ private Button wUseSystemVars;
+ private Label wlAccessKeyId;
+ private TextVar wAccessKeyId;
+ private Label wlSecretAccessKey;
+ private TextVar wSecretAccessKey;
+ private Label wlAwsIamRole;
+ private TextVar wAwsIamRole;
+
+
+ /** List of ColumnInfo that should have the field names of the selected database table */
+  private List<ColumnInfo> tableFieldColumns = new ArrayList<>();
+
+ /** Constructor. */
+ public RedshiftBulkLoaderDialog(
+ Shell parent, IVariables variables, Object in, PipelineMeta pipelineMeta, String sname) {
+ super(parent, variables, (BaseTransformMeta) in, pipelineMeta, sname);
+ input = (RedshiftBulkLoaderMeta) in;
+ inputFields = new HashMap<>();
+ }
+
+ /** Open the dialog. */
+ public String open() {
+ FormData fdDoMapping;
+ FormData fdGetFields;
+ Label wlFields;
+ FormData fdSpecifyFields;
+ Label wlSpecifyFields;
+ FormData fdbTable;
+ FormData fdlTable;
+ Button wbTable;
+ FormData fdSchema;
+ FormData fdlSchema;
+ Label wlSchema;
+ FormData fdMainComp;
+ CTabItem wFieldsTab;
+ CTabItem wMainTab;
+ FormData fdTabFolder;
+ CTabFolder wTabFolder;
+ Label wlTruncate;
+ Shell parent = getParent();
+
+ shell = new Shell(parent, SWT.DIALOG_TRIM | SWT.RESIZE | SWT.MAX | SWT.MIN);
+ PropsUi.setLook(shell);
+ setShellImage(shell, input);
+
+ ModifyListener lsMod = e -> input.setChanged();
+ FocusListener lsFocusLost =
+ new FocusAdapter() {
+ @Override
+ public void focusLost(FocusEvent arg0) {
+ setTableFieldCombo();
+ }
+ };
+ backupChanged = input.hasChanged();
+
+ int middle = props.getMiddlePct();
+ int margin = Const.MARGIN;
+
+ FormLayout formLayout = new FormLayout();
+ formLayout.marginWidth = Const.FORM_MARGIN;
+ formLayout.marginHeight = Const.FORM_MARGIN;
+
+ shell.setLayout(formLayout);
+ shell.setText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.DialogTitle"));
+
+ // TransformName line
+ wlTransformName = new Label(shell, SWT.RIGHT);
+ wlTransformName.setText(BaseMessages.getString("System.Label.TransformName"));
+ PropsUi.setLook(wlTransformName);
+ fdlTransformName = new FormData();
+ fdlTransformName.left = new FormAttachment(0, 0);
+ fdlTransformName.right = new FormAttachment(middle, -margin);
+ fdlTransformName.top = new FormAttachment(0, margin*2);
+ wlTransformName.setLayoutData(fdlTransformName);
+ wTransformName = new Text(shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
+ wTransformName.setText(transformName);
+ PropsUi.setLook(wTransformName);
+ wTransformName.addModifyListener(lsMod);
+ fdTransformName = new FormData();
+ fdTransformName.left = new FormAttachment(middle, 0);
+ fdTransformName.top = new FormAttachment(0, margin*2);
+ fdTransformName.right = new FormAttachment(100, 0);
+ wTransformName.setLayoutData(fdTransformName);
+
+ // Connection line
+ DatabaseMeta dbm = pipelineMeta.findDatabase(input.getConnection(), variables);
+ wConnection = addConnectionLine(shell, wTransformName, input.getDatabaseMeta(), null);
+ if (input.getDatabaseMeta() == null && pipelineMeta.nrDatabases() == 1) {
+ wConnection.select(0);
+ }
+ wConnection.addModifyListener(lsMod);
+ wConnection.addModifyListener(event -> setFlags());
+
+ // Schema line...
+ wlSchema = new Label(shell, SWT.RIGHT);
+ wlSchema.setText(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.TargetSchema.Label")); // $NON-NLS-1$
+ PropsUi.setLook(wlSchema);
+ fdlSchema = new FormData();
+ fdlSchema.left = new FormAttachment(0, 0);
+ fdlSchema.right = new FormAttachment(middle, -margin);
+ fdlSchema.top = new FormAttachment(wConnection, margin * 2);
+ wlSchema.setLayoutData(fdlSchema);
+
+ wSchema = new TextVar(variables, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
+ PropsUi.setLook(wSchema);
+ wSchema.addModifyListener(lsMod);
+ wSchema.addFocusListener(lsFocusLost);
+ fdSchema = new FormData();
+ fdSchema.left = new FormAttachment(middle, 0);
+ fdSchema.top = new FormAttachment(wConnection, margin * 2);
+ fdSchema.right = new FormAttachment(100, 0);
+ wSchema.setLayoutData(fdSchema);
+
+ // Table line...
+ Label wlTable = new Label(shell, SWT.RIGHT);
+ wlTable.setText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.TargetTable.Label"));
+ PropsUi.setLook(wlTable);
+ fdlTable = new FormData();
+ fdlTable.left = new FormAttachment(0, 0);
+ fdlTable.right = new FormAttachment(middle, -margin);
+ fdlTable.top = new FormAttachment(wSchema, margin);
+ wlTable.setLayoutData(fdlTable);
+
+ wbTable = new Button(shell, SWT.PUSH | SWT.CENTER);
+ PropsUi.setLook(wbTable);
+ wbTable.setText(BaseMessages.getString("System.Button.Browse"));
+ fdbTable = new FormData();
+ fdbTable.right = new FormAttachment(100, 0);
+ fdbTable.top = new FormAttachment(wSchema, margin);
+ wbTable.setLayoutData(fdbTable);
+
+ wTable = new TextVar(variables, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
+ PropsUi.setLook(wTable);
+ wTable.addModifyListener(lsMod);
+ wTable.addFocusListener(lsFocusLost);
+ FormData fdTable = new FormData();
+ fdTable.top = new FormAttachment(wSchema, margin);
+ fdTable.left = new FormAttachment(middle, 0);
+ fdTable.right = new FormAttachment(wbTable, -margin);
+ wTable.setLayoutData(fdTable);
+ Control lastControl = wTable;
+
+ SelectionAdapter lsSelMod =
+ new SelectionAdapter() {
+ @Override
+ public void widgetSelected(SelectionEvent arg0) {
+ input.setChanged();
+ }
+ };
+
+ wlAwsAuthentication = new Label(shell, SWT.RIGHT);
+ wlAwsAuthentication.setText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.Authenticate.Options.Label"));
+ PropsUi.setLook(wlAwsAuthentication);
+ FormData fdlAwsAuthentication = new FormData();
+ fdlAwsAuthentication.top = new FormAttachment(lastControl, margin);
+ fdlAwsAuthentication.left = new FormAttachment(0, 0);
+ fdlAwsAuthentication.right = new FormAttachment(middle, -margin);
+ wlAwsAuthentication.setLayoutData(fdlAwsAuthentication);
+ wAwsAuthentication = new ComboVar(variables, shell, SWT.BORDER|SWT.READ_ONLY);
+ wAwsAuthentication.setItems(awsAuthOptions);
+ wAwsAuthentication.setText(awsAuthOptions[0]);
+ PropsUi.setLook(wAwsAuthentication);
+ FormData fdAwsAuthentication = new FormData();
+ fdAwsAuthentication.top = new FormAttachment(lastControl, margin);
+ fdAwsAuthentication.left = new FormAttachment(middle, 0);
+ fdAwsAuthentication.right = new FormAttachment(100, 0);
+ wAwsAuthentication.setLayoutData(fdAwsAuthentication);
+ lastControl = wlAwsAuthentication;
+
+ wlUseSystemVars = new Label(shell, SWT.RIGHT);
+ wlUseSystemVars.setText("Use AWS system variables");
+ wlUseSystemVars.setToolTipText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.Authenticate.UseSystemVars.Tooltip"));
+ PropsUi.setLook(wlUseSystemVars);
+ FormData fdlUseSystemVars = new FormData();
+ fdlUseSystemVars.top = new FormAttachment(lastControl, margin);
+ fdlUseSystemVars.left = new FormAttachment(0, 0);
+ fdlUseSystemVars.right = new FormAttachment(middle, -margin);
+ wlUseSystemVars.setLayoutData(fdlUseSystemVars);
+ wUseSystemVars = new Button(shell, SWT.CHECK);
+ wUseSystemVars.setSelection(true);
+ PropsUi.setLook(wUseSystemVars);
+ FormData fdUseSystemVars = new FormData();
+ fdUseSystemVars.top = new FormAttachment(lastControl, margin*3);
+ fdUseSystemVars.left = new FormAttachment(middle, 0);
+ fdUseSystemVars.right = new FormAttachment(100, 0);
+ wUseSystemVars.setLayoutData(fdUseSystemVars);
+ lastControl = wlUseSystemVars;
+
+ wUseSystemVars.addSelectionListener(new SelectionAdapter() {
+ @Override
+ public void widgetSelected(SelectionEvent e) {
+ toggleKeysSelection();
+ }
+ });
+
+ wlAccessKeyId = new Label(shell, SWT.RIGHT);
+ wlAccessKeyId.setText("AWS_ACCESS_KEY_ID");
+ PropsUi.setLook(wlAccessKeyId);
+ FormData fdlAccessKeyId = new FormData();
+ fdlAccessKeyId.top = new FormAttachment(lastControl, margin);
+ fdlAccessKeyId.left = new FormAttachment(0, 0);
+ fdlAccessKeyId.right = new FormAttachment(middle, -margin);
+ wlAccessKeyId.setLayoutData(fdlAccessKeyId);
+ wAccessKeyId = new TextVar(variables, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
+ PropsUi.setLook(wAccessKeyId);
+ FormData fdUseAccessKeyId = new FormData();
+ fdUseAccessKeyId.top = new FormAttachment(lastControl, margin);
+ fdUseAccessKeyId.left = new FormAttachment(middle, 0);
+ fdUseAccessKeyId.right = new FormAttachment(100, 0);
+ wAccessKeyId.setLayoutData(fdUseAccessKeyId);
+ lastControl = wAccessKeyId;
+
+ wlSecretAccessKey = new Label(shell, SWT.RIGHT);
+ wlSecretAccessKey.setText("AWS_SECRET_ACCESS_KEY");
+ PropsUi.setLook(wlSecretAccessKey);
+ FormData fdlSecretAccessKey = new FormData();
+ fdlSecretAccessKey.top = new FormAttachment(lastControl,margin);
+ fdlSecretAccessKey.left = new FormAttachment(0, 0);
+ fdlSecretAccessKey.right = new FormAttachment(middle, -margin);
+ wlSecretAccessKey.setLayoutData(fdlSecretAccessKey);
+ wSecretAccessKey = new TextVar(variables, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
+ PropsUi.setLook(wSecretAccessKey);
+ FormData fdSecretAccessKey = new FormData();
+ fdSecretAccessKey.top = new FormAttachment(lastControl, margin);
+ fdSecretAccessKey.left = new FormAttachment(middle,0);
+ fdSecretAccessKey.right = new FormAttachment(100, 0);
+ wSecretAccessKey.setLayoutData(fdSecretAccessKey);
+ lastControl = wSecretAccessKey;
+
+ // Start with system variables enabled and AWS keys disabled by default
+ wlAccessKeyId.setEnabled(false);
+ wAccessKeyId.setEnabled(false);
+ wlSecretAccessKey.setEnabled(false);
+ wSecretAccessKey.setEnabled(false);
+
+
+ wlAwsIamRole = new Label(shell, SWT.RIGHT);
+ wlAwsIamRole.setText("IAM Role");
+ PropsUi.setLook(wlAwsIamRole);
+ FormData fdlIamRole = new FormData();
+ fdlIamRole.top = new FormAttachment(lastControl, margin);
+ fdlIamRole.left = new FormAttachment(0, 0);
+ fdlIamRole.right = new FormAttachment(middle, -margin);
+ wlAwsIamRole.setLayoutData(fdlIamRole);
+ wAwsIamRole = new TextVar(variables, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
+    wAwsIamRole.getTextWidget().setMessage("arn:aws:iam::<account-id>:role/<role-name>");
+ PropsUi.setLook(wAwsIamRole);
+ FormData fdIamRole = new FormData();
+ fdIamRole.top = new FormAttachment(lastControl, margin);
+ fdIamRole.left = new FormAttachment(middle, 0);
+ fdIamRole.right = new FormAttachment(100, 0);
+ wAwsIamRole.setLayoutData(fdIamRole);
+ lastControl = wlAwsIamRole;
+ // Credentials are enabled by default.
+ wlAwsIamRole.setEnabled(false);
+ wAwsIamRole.setEnabled(false);
+
+ wAwsAuthentication.addSelectionListener(new SelectionAdapter() {
+ @Override
+ public void widgetSelected(SelectionEvent e) {
+ toggleAuthSelection();
+ }
+ });
+
+ // Truncate table
+ wlTruncate = new Label(shell, SWT.RIGHT);
+ wlTruncate.setText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.TruncateTable.Label"));
+ PropsUi.setLook(wlTruncate);
+ FormData fdlTruncate = new FormData();
+ fdlTruncate.top = new FormAttachment(lastControl, margin);
+ fdlTruncate.left = new FormAttachment(0, 0);
+ fdlTruncate.right = new FormAttachment(middle, -margin);
+ wlTruncate.setLayoutData(fdlTruncate);
+ wTruncate = new Button(shell, SWT.CHECK);
+ PropsUi.setLook(wTruncate);
+ FormData fdTruncate = new FormData();
+ fdTruncate.top = new FormAttachment(lastControl, margin*3);
+ fdTruncate.left = new FormAttachment(middle, 0);
+ fdTruncate.right = new FormAttachment(100, 0);
+ wTruncate.setLayoutData(fdTruncate);
+ SelectionAdapter lsTruncMod =
+ new SelectionAdapter() {
+ @Override
+ public void widgetSelected(SelectionEvent arg0) {
+ input.setChanged();
+ }
+ };
+ wTruncate.addSelectionListener(lsTruncMod);
+ wTruncate.addSelectionListener(
+ new SelectionAdapter() {
+ @Override
+ public void widgetSelected(SelectionEvent e) {
+ setFlags();
+ }
+ });
+ lastControl = wlTruncate;
+
+ // Truncate only when have rows
+ Label wlOnlyWhenHaveRows = new Label(shell, SWT.RIGHT);
+ wlOnlyWhenHaveRows.setText(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.OnlyWhenHaveRows.Label"));
+ PropsUi.setLook(wlOnlyWhenHaveRows);
+ FormData fdlOnlyWhenHaveRows = new FormData();
+ fdlOnlyWhenHaveRows.top = new FormAttachment(lastControl, margin);
+ fdlOnlyWhenHaveRows.left = new FormAttachment(0, 0);
+ fdlOnlyWhenHaveRows.right = new FormAttachment(middle, -margin);
+ wlOnlyWhenHaveRows.setLayoutData(fdlOnlyWhenHaveRows);
+ wOnlyWhenHaveRows = new Button(shell, SWT.CHECK);
+ wOnlyWhenHaveRows.setToolTipText(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.OnlyWhenHaveRows.Tooltip"));
+ PropsUi.setLook(wOnlyWhenHaveRows);
+ FormData fdTruncateWhenHaveRows = new FormData();
+ fdTruncateWhenHaveRows.top = new FormAttachment(lastControl, margin*3);
+ fdTruncateWhenHaveRows.left = new FormAttachment(middle, 0);
+ fdTruncateWhenHaveRows.right = new FormAttachment(100, 0);
+ wOnlyWhenHaveRows.setLayoutData(fdTruncateWhenHaveRows);
+ wOnlyWhenHaveRows.addSelectionListener(lsSelMod);
+ lastControl = wlOnlyWhenHaveRows;
+
+ // Specify fields
+ wlSpecifyFields = new Label(shell, SWT.RIGHT);
+ wlSpecifyFields.setText(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.SpecifyFields.Label"));
+ PropsUi.setLook(wlSpecifyFields);
+ FormData fdlSpecifyFields = new FormData();
+ fdlSpecifyFields.top = new FormAttachment(lastControl, margin);
+ fdlSpecifyFields.left = new FormAttachment(0, 0);
+ fdlSpecifyFields.right = new FormAttachment(middle, -margin);
+ wlSpecifyFields.setLayoutData(fdlSpecifyFields);
+ wSpecifyFields = new Button(shell, SWT.CHECK);
+ PropsUi.setLook(wSpecifyFields);
+ fdSpecifyFields = new FormData();
+ fdSpecifyFields.top = new FormAttachment(lastControl, margin*3);
+ fdSpecifyFields.left = new FormAttachment(middle, 0);
+ fdSpecifyFields.right = new FormAttachment(100, 0);
+ wSpecifyFields.setLayoutData(fdSpecifyFields);
+ wSpecifyFields.addSelectionListener(lsSelMod);
+ lastControl = wlSpecifyFields;
+
+ // If the flag is off, gray out the fields tab e.g.
+ wSpecifyFields.addSelectionListener(
+ new SelectionAdapter() {
+ @Override
+ public void widgetSelected(SelectionEvent arg0) {
+ setFlags();
+ }
+ });
+
+ wTabFolder = new CTabFolder(shell, SWT.BORDER);
+ PropsUi.setLook(wTabFolder, Props.WIDGET_STYLE_TAB);
+
+ // ////////////////////////
+ // START OF KEY TAB ///
+ // /
+ wMainTab = new CTabItem(wTabFolder, SWT.NONE);
+ wMainTab.setText(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.MainTab.CTabItem")); // $NON-NLS-1$
+
+ FormLayout mainLayout = new FormLayout();
+ mainLayout.marginWidth = 3;
+ mainLayout.marginHeight = 3;
+
+ Composite wMainComp = new Composite(wTabFolder, SWT.NONE);
+ PropsUi.setLook(wMainComp);
+ wMainComp.setLayout(mainLayout);
+
+ fdMainComp = new FormData();
+ fdMainComp.left = new FormAttachment(0, 0);
+ fdMainComp.top = new FormAttachment(0, 0);
+ fdMainComp.right = new FormAttachment(100, 0);
+ fdMainComp.bottom = new FormAttachment(100, 0);
+ wMainComp.setLayoutData(fdMainComp);
+
+ Label wlStreamToS3Csv = new Label(wMainComp, SWT.RIGHT);
+ wlStreamToS3Csv.setText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.StreamCsvToS3.Label"));
+ wlStreamToS3Csv.setToolTipText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.StreamCsvToS3.ToolTip"));
+ PropsUi.setLook(wlStreamToS3Csv);
+ FormData fdlStreamToS3Csv = new FormData();
+ fdlStreamToS3Csv.top = new FormAttachment(0, margin*2);
+ fdlStreamToS3Csv.left = new FormAttachment(0, 0);
+ fdlStreamToS3Csv.right = new FormAttachment(middle, -margin);
+ wlStreamToS3Csv.setLayoutData(fdlStreamToS3Csv);
+
+ wStreamToS3Csv = new Button(wMainComp, SWT.CHECK);
+ wStreamToS3Csv.setToolTipText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.StreamCsvToS3.ToolTip"));
+ PropsUi.setLook(wStreamToS3Csv);
+ FormData fdStreamToS3Csv = new FormData();
+ fdStreamToS3Csv.top = new FormAttachment(0, margin*4);
+ fdStreamToS3Csv.left = new FormAttachment(middle, 0);
+ fdStreamToS3Csv.right = new FormAttachment(100, 0);
+ wStreamToS3Csv.setLayoutData(fdStreamToS3Csv);
+ wStreamToS3Csv.setSelection(true);
+ lastControl = wlStreamToS3Csv;
+
+ wStreamToS3Csv.addSelectionListener(
+ new SelectionAdapter() {
+ @Override
+ public void widgetSelected(SelectionEvent e) {
+ if(wStreamToS3Csv.getSelection()){
+ wLoadFromExistingFileFormat.setText("");
+ }
+ wLoadFromExistingFileFormat.setEnabled(!wStreamToS3Csv.getSelection());
+ }
+ }
+ );
+
+ Label wlLoadFromExistingFile = new Label(wMainComp, SWT.RIGHT);
+ wlLoadFromExistingFile.setText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.LoadFromExistingFile.Label"));
+ wlLoadFromExistingFile.setToolTipText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.LoadFromExistingFile.Tooltip"));
+ PropsUi.setLook(wlLoadFromExistingFile);
+ FormData fdlLoadFromExistingFile = new FormData();
+ fdlLoadFromExistingFile.top = new FormAttachment(lastControl, margin*2);
+ fdlLoadFromExistingFile.left = new FormAttachment(0, 0);
+ fdlLoadFromExistingFile.right = new FormAttachment(middle, -margin);
+ wlLoadFromExistingFile.setLayoutData(fdlLoadFromExistingFile);
+
+ wLoadFromExistingFileFormat = new ComboVar(variables, wMainComp, SWT.SINGLE | SWT.READ_ONLY | SWT.BORDER);
+ FormData fdLoadFromExistingFile = new FormData();
+ fdLoadFromExistingFile.top = new FormAttachment(lastControl, margin*2);
+ fdLoadFromExistingFile.left = new FormAttachment(middle, 0);
+ fdLoadFromExistingFile.right = new FormAttachment(100, 0);
+ wLoadFromExistingFileFormat.setLayoutData(fdLoadFromExistingFile);
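+    // File formats supported when copying from an existing file.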
+ String[] fileFormats = {"CSV", "Parquet"};
+ wLoadFromExistingFileFormat.setItems(fileFormats);
+ lastControl = wLoadFromExistingFileFormat;
+
+ Label wlCopyFromFile = new Label(wMainComp, SWT.RIGHT);
+ wlCopyFromFile.setText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.CopyFromFile.Label"));
+ PropsUi.setLook(wlCopyFromFile);
+ FormData fdlCopyFromFile = new FormData();
+ fdlCopyFromFile.top = new FormAttachment(lastControl, margin*2);
+ fdlCopyFromFile.left = new FormAttachment(0, 0);
+ fdlCopyFromFile.right = new FormAttachment(middle, -margin);
+ wlCopyFromFile.setLayoutData(fdlCopyFromFile);
+
+ Button wbCopyFromFile = new Button(wMainComp, SWT.PUSH | SWT.CENTER);
+ PropsUi.setLook(wbCopyFromFile);
+ wbCopyFromFile.setText(BaseMessages.getString("System.Button.Browse"));
+ FormData fdbCopyFromFile = new FormData();
+ fdbCopyFromFile.top = new FormAttachment(lastControl, margin*2);
+ fdbCopyFromFile.right = new FormAttachment(100, 0);
+ wbCopyFromFile.setLayoutData(fdbCopyFromFile);
+
+ wCopyFromFilename = new TextVar(variables, wMainComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
+ PropsUi.setLook(wCopyFromFilename);
+ wCopyFromFilename.addModifyListener(lsMod);
+ wCopyFromFilename.addFocusListener(lsFocusLost);
+ FormData fdCopyFromFile = new FormData();
+ fdCopyFromFile.top = new FormAttachment(lastControl, margin*2);
+ fdCopyFromFile.left = new FormAttachment(middle, 0);
+ fdCopyFromFile.right = new FormAttachment(wbCopyFromFile, -margin);
+ wCopyFromFilename.setLayoutData(fdCopyFromFile);
+ lastControl = wCopyFromFilename;
+
+ wMainComp.layout();
+ wMainTab.setControl(wMainComp);
+
+ //
+ // Fields tab...
+ //
+ wFieldsTab = new CTabItem(wTabFolder, SWT.NONE);
+ wFieldsTab.setText(
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderDialog.FieldsTab.CTabItem.Title")); // $NON-NLS-1$
+
+ Composite wFieldsComp = new Composite(wTabFolder, SWT.NONE);
+ PropsUi.setLook(wFieldsComp);
+
+ FormLayout fieldsCompLayout = new FormLayout();
+ fieldsCompLayout.marginWidth = Const.FORM_MARGIN;
+ fieldsCompLayout.marginHeight = Const.FORM_MARGIN;
+ wFieldsComp.setLayout(fieldsCompLayout);
+
+ // The fields table
+ wlFields = new Label(wFieldsComp, SWT.NONE);
+ wlFields.setText(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.InsertFields.Label")); // $NON-NLS-1$
+ PropsUi.setLook(wlFields);
+ FormData fdlUpIns = new FormData();
+ fdlUpIns.left = new FormAttachment(0, 0);
+ fdlUpIns.top = new FormAttachment(0, margin);
+ wlFields.setLayoutData(fdlUpIns);
+
+ int tableCols = 2;
+ int upInsRows =
+        (input.getFields() != null && !input.getFields().isEmpty()
+ ? input.getFields().size()
+ : 1);
+
+ ciFields = new ColumnInfo[tableCols];
+ ciFields[0] =
+ new ColumnInfo(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.ColumnInfo.TableField"),
+ ColumnInfo.COLUMN_TYPE_CCOMBO,
+ new String[] {""},
+ false); //$NON-NLS-1$
+ ciFields[1] =
+ new ColumnInfo(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.ColumnInfo.StreamField"),
+ ColumnInfo.COLUMN_TYPE_CCOMBO,
+ new String[] {""},
+ false); //$NON-NLS-1$
+ tableFieldColumns.add(ciFields[0]);
+ wFields =
+ new TableView(
+ variables,
+ wFieldsComp,
+ SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI | SWT.V_SCROLL | SWT.H_SCROLL,
+ ciFields,
+ upInsRows,
+ lsMod,
+ props);
+
+ wGetFields = new Button(wFieldsComp, SWT.PUSH);
+ wGetFields.setText(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.GetFields.Button")); // $NON-NLS-1$
+ fdGetFields = new FormData();
+ fdGetFields.top = new FormAttachment(wlFields, margin);
+ fdGetFields.right = new FormAttachment(100, 0);
+ wGetFields.setLayoutData(fdGetFields);
+
+ wDoMapping = new Button(wFieldsComp, SWT.PUSH);
+ wDoMapping.setText(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.DoMapping.Button")); // $NON-NLS-1$
+ fdDoMapping = new FormData();
+ fdDoMapping.top = new FormAttachment(wGetFields, margin);
+ fdDoMapping.right = new FormAttachment(100, 0);
+ wDoMapping.setLayoutData(fdDoMapping);
+
+ wDoMapping.addListener(
+ SWT.Selection,
+ new Listener() {
+ public void handleEvent(Event arg0) {
+ generateMappings();
+ }
+ });
+
+ FormData fdFields = new FormData();
+ fdFields.left = new FormAttachment(0, 0);
+ fdFields.top = new FormAttachment(wlFields, margin);
+ fdFields.right = new FormAttachment(wDoMapping, -margin);
+ fdFields.bottom = new FormAttachment(100, -2 * margin);
+ wFields.setLayoutData(fdFields);
+
+ FormData fdFieldsComp = new FormData();
+ fdFieldsComp.left = new FormAttachment(0, 0);
+ fdFieldsComp.top = new FormAttachment(0, 0);
+ fdFieldsComp.right = new FormAttachment(100, 0);
+ fdFieldsComp.bottom = new FormAttachment(100, 0);
+ wFieldsComp.setLayoutData(fdFieldsComp);
+
+ wFieldsComp.layout();
+ wFieldsTab.setControl(wFieldsComp);
+
+ //
+ // Search the fields in the background
+ //
+
+ final Runnable runnable =
+ new Runnable() {
+ public void run() {
+ TransformMeta transformMeta = pipelineMeta.findTransform(transformName);
+ if (transformMeta != null) {
+ try {
+ IRowMeta row = pipelineMeta.getPrevTransformFields(variables, transformMeta);
+
+ // Remember these fields...
+ for (int i = 0; i < row.size(); i++) {
+ inputFields.put(row.getValueMeta(i).getName(), Integer.valueOf(i));
+ }
+
+ setComboBoxes();
+ } catch (HopException e) {
+ log.logError(
+ toString(), BaseMessages.getString("System.Dialog.GetFieldsFailed.Message"));
+ }
+ }
+ }
+ };
+ new Thread(runnable).start();
+
+ // Some buttons
+ wOk = new Button(shell, SWT.PUSH);
+ wOk.setText(BaseMessages.getString("System.Button.OK"));
+ wCreate = new Button(shell, SWT.PUSH);
+ wCreate.setText(BaseMessages.getString("System.Button.SQL"));
+ wCancel = new Button(shell, SWT.PUSH);
+ wCancel.setText(BaseMessages.getString("System.Button.Cancel"));
+
+ setButtonPositions(new Button[] {wOk, wCancel, wCreate}, margin, null);
+
+ fdTabFolder = new FormData();
+ fdTabFolder.left = new FormAttachment(0, 0);
+ fdTabFolder.top = new FormAttachment(wlSpecifyFields, margin);
+ fdTabFolder.right = new FormAttachment(100, 0);
+ fdTabFolder.bottom = new FormAttachment(wOk, -2 * margin);
+ wTabFolder.setLayoutData(fdTabFolder);
+ wTabFolder.setSelection(0);
+
+ // Add listeners
+ wOk.addListener(SWT.Selection, c -> ok());
+ wCancel.addListener(SWT.Selection, c -> cancel());
+ wCreate.addListener(SWT.Selection, c -> sql());
+ wGetFields.addListener(SWT.Selection, c -> get());
+
+ // Set the shell size, based upon previous time...
+ setSize();
+
+ getData();
+ setTableFieldCombo();
+ input.setChanged(backupChanged);
+
+ BaseDialog.defaultShellHandling(shell, c -> ok(), c -> cancel());
+ return transformName;
+ }
+
+ /**
+   * Reads the fields from the previous transforms and from the target table, then opens an
+   * EnterMappingDialog with this information. After the user has completed the mapping, the
+   * result is written back into the fields table.
+ */
+ private void generateMappings() {
+
+ // Determine the source and target fields...
+ //
+ IRowMeta sourceFields;
+ IRowMeta targetFields;
+
+ try {
+ sourceFields = pipelineMeta.getPrevTransformFields(variables, transformMeta);
+ } catch (HopException e) {
+ new ErrorDialog(
+ shell,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderDialog.DoMapping.UnableToFindSourceFields.Title"),
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderDialog.DoMapping.UnableToFindSourceFields.Message"),
+ e);
+ return;
+ }
+
+ // refresh data
+ input.setTablename(variables.resolve(wTable.getText()));
+ ITransformMeta transformMetaInterface = transformMeta.getTransform();
+ try {
+ targetFields = transformMetaInterface.getRequiredFields(variables);
+ } catch (HopException e) {
+ new ErrorDialog(
+ shell,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderDialog.DoMapping.UnableToFindTargetFields.Title"),
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderDialog.DoMapping.UnableToFindTargetFields.Message"),
+ e);
+ return;
+ }
+
+ String[] inputNames = new String[sourceFields.size()];
+ for (int i = 0; i < sourceFields.size(); i++) {
+ IValueMeta value = sourceFields.getValueMeta(i);
+ inputNames[i] = value.getName();
+ }
+
+ // Create the existing mapping list...
+ //
+    List<SourceToTargetMapping> mappings = new ArrayList<>();
+    StringBuilder missingSourceFields = new StringBuilder();
+    StringBuilder missingTargetFields = new StringBuilder();
+
+ int nrFields = wFields.nrNonEmpty();
+ for (int i = 0; i < nrFields; i++) {
+ TableItem item = wFields.getNonEmpty(i);
+ String source = item.getText(2);
+ String target = item.getText(1);
+
+ int sourceIndex = sourceFields.indexOfValue(source);
+ if (sourceIndex < 0) {
+ missingSourceFields.append(Const.CR + " " + source + " --> " + target);
+ }
+ int targetIndex = targetFields.indexOfValue(target);
+ if (targetIndex < 0) {
+ missingTargetFields.append(Const.CR + " " + source + " --> " + target);
+ }
+ if (sourceIndex < 0 || targetIndex < 0) {
+ continue;
+ }
+
+ SourceToTargetMapping mapping = new SourceToTargetMapping(sourceIndex, targetIndex);
+ mappings.add(mapping);
+ }
+
+ // show a confirm dialog if some missing field was found
+ //
+ if (missingSourceFields.length() > 0 || missingTargetFields.length() > 0) {
+
+ String message = "";
+ if (missingSourceFields.length() > 0) {
+ message +=
+ BaseMessages.getString(
+ PKG,
+ "RedshiftBulkLoaderDialog.DoMapping.SomeSourceFieldsNotFound",
+ missingSourceFields.toString())
+ + Const.CR;
+ }
+ if (missingTargetFields.length() > 0) {
+ message +=
+ BaseMessages.getString(
+ PKG,
+ "RedshiftBulkLoaderDialog.DoMapping.SomeTargetFieldsNotFound",
+                missingTargetFields.toString())
+ + Const.CR;
+ }
+ message += Const.CR;
+ message +=
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderDialog.DoMapping.SomeFieldsNotFoundContinue")
+ + Const.CR;
+ int answer =
+ BaseDialog.openMessageBox(
+ shell,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderDialog.DoMapping.SomeFieldsNotFoundTitle"),
+ message,
+ SWT.ICON_QUESTION | SWT.YES | SWT.NO);
+ boolean goOn = (answer & SWT.YES) != 0;
+ if (!goOn) {
+ return;
+ }
+ }
+ EnterMappingDialog d =
+ new EnterMappingDialog(
+ RedshiftBulkLoaderDialog.this.shell,
+ sourceFields.getFieldNames(),
+ targetFields.getFieldNames(),
+ mappings);
+ mappings = d.open();
+
+ // mappings == null if the user pressed cancel
+ //
+ if (mappings != null) {
+ // Clear and re-populate!
+ //
+ wFields.table.removeAll();
+ wFields.table.setItemCount(mappings.size());
+ for (int i = 0; i < mappings.size(); i++) {
+        SourceToTargetMapping mapping = mappings.get(i);
+ TableItem item = wFields.table.getItem(i);
+ item.setText(2, sourceFields.getValueMeta(mapping.getSourcePosition()).getName());
+ item.setText(1, targetFields.getValueMeta(mapping.getTargetPosition()).getName());
+ }
+ wFields.setRowNums();
+ wFields.optWidth(true);
+ }
+ }
+
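+  /**
+   * Asynchronously fills the table-field combo boxes with the columns of the selected target
+   * table. Errors while connecting are ignored; the combos simply stay empty.
+   */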
+ private void setTableFieldCombo() {
+ Runnable fieldLoader =
+ () -> {
+ // clear
+ for (int i = 0; i < tableFieldColumns.size(); i++) {
+ ColumnInfo colInfo = (ColumnInfo) tableFieldColumns.get(i);
+ colInfo.setComboValues(new String[] {});
+ }
+ if (!StringUtil.isEmpty(wTable.getText())) {
+ DatabaseMeta databaseMeta = pipelineMeta.findDatabase(wConnection.getText(), variables);
+ if (databaseMeta != null) {
+ try (Database db = new Database(loggingObject, variables, databaseMeta)) {
+ db.connect();
+
+ String schemaTable =
+ databaseMeta.getQuotedSchemaTableCombination(
+ variables,
+ variables.resolve(wSchema.getText()),
+ variables.resolve(wTable.getText()));
+ IRowMeta r = db.getTableFields(schemaTable);
+ if (null != r) {
+ String[] fieldNames = r.getFieldNames();
+ if (null != fieldNames) {
+ for (int i = 0; i < tableFieldColumns.size(); i++) {
+ ColumnInfo colInfo = (ColumnInfo) tableFieldColumns.get(i);
+ colInfo.setComboValues(fieldNames);
+ }
+ }
+ }
+ } catch (Exception e) {
+ for (int i = 0; i < tableFieldColumns.size(); i++) {
+ ColumnInfo colInfo = (ColumnInfo) tableFieldColumns.get(i);
+ colInfo.setComboValues(new String[] {});
+ }
+ // ignore any errors here. drop downs will not be
+ // filled, but no problem for the user
+ }
+ }
+ }
+ };
+ shell.getDisplay().asyncExec(fieldLoader);
+ }
+
+ protected void setComboBoxes() {
+ // Something was changed in the row.
+ //
+    final Map<String, Integer> fields = new HashMap<>();
+
+    // Add the currentMeta fields...
+    fields.putAll(inputFields);
+
+    Set<String> keySet = fields.keySet();
+    List<String> entries = new ArrayList<>(keySet);
+
+    String[] fieldNames = entries.toArray(new String[0]);
+
+ if (PropsUi.getInstance().isSortFieldByName()) {
+ Const.sortStrings(fieldNames);
+ }
+ ciFields[1].setComboValues(fieldNames);
+ }
+
+ public void setFlags() {
+ boolean specifyFields = wSpecifyFields.getSelection();
+ wFields.setEnabled(specifyFields);
+ wGetFields.setEnabled(specifyFields);
+ wDoMapping.setEnabled(specifyFields);
+ }
+
+ /** Copy information from the meta-data input to the dialog fields. */
+ public void getData() {
+ if(!StringUtils.isEmpty(input.getConnection())) {
+ wConnection.setText(input.getConnection());
+ }
+ if(!StringUtils.isEmpty(input.getSchemaName())) {
+ wSchema.setText(input.getSchemaName());
+ }
+ if(!StringUtils.isEmpty(input.getTableName())) {
+ wTable.setText(input.getTableName());
+ }
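+    // Restore the AWS authentication settings: either access key credentials
+    // (optionally taken from the system environment variables) or an IAM role.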
+ if(input.isUseCredentials()){
+ wAwsAuthentication.setText(awsAuthOptions[0]);
+ wUseSystemVars.setSelection(input.isUseSystemEnvVars());
+ if(!input.isUseSystemEnvVars()){
+ if(!StringUtil.isEmpty(input.getAwsAccessKeyId())){
+ wAccessKeyId.setText(input.getAwsAccessKeyId());
+ }
+ if(!StringUtils.isEmpty(input.getAwsSecretAccessKey())){
+          wSecretAccessKey.setText(input.getAwsSecretAccessKey());
+ }
+ }
+ }else if(input.isUseAwsIamRole()){
+ wAwsAuthentication.setText(awsAuthOptions[1]);
+ if(!StringUtils.isEmpty(input.getAwsIamRole())){
+ wAwsIamRole.setText(input.getAwsIamRole());
+ }
+ }
+
+ wStreamToS3Csv.setSelection(input.isStreamToS3Csv());
+ if(!StringUtils.isEmpty(input.getLoadFromExistingFileFormat())){
+ wLoadFromExistingFileFormat.setText(input.getLoadFromExistingFileFormat());
+ }
+ if(!StringUtils.isEmpty(input.getCopyFromFilename())){
+ wCopyFromFilename.setText(input.getCopyFromFilename());
+ }
+
+ wTruncate.setSelection(input.isTruncateTable());
+ wOnlyWhenHaveRows.setSelection(input.isOnlyWhenHaveRows());
+
+ wSpecifyFields.setSelection(input.specifyFields());
+
+ for (int i = 0; i < input.getFields().size(); i++) {
+ RedshiftBulkLoaderField vbf = input.getFields().get(i);
+ TableItem item = wFields.table.getItem(i);
+ if (vbf.getDatabaseField() != null) {
+ item.setText(1, vbf.getDatabaseField());
+ }
+ if (vbf.getStreamField() != null) {
+ item.setText(2, vbf.getStreamField());
+ }
+ }
+
+ setFlags();
+
+ wTransformName.selectAll();
+ }
+
+ private void cancel() {
+ transformName = null;
+ input.setChanged(backupChanged);
+ dispose();
+ }
+
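+  /** Copies the dialog settings into the given transform metadata. */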
+ private void getInfo(RedshiftBulkLoaderMeta info) {
+ if(!StringUtils.isEmpty(wConnection.getText())){
+ info.setConnection(wConnection.getText());
+ }
+ if(!StringUtils.isEmpty(wSchema.getText())){
+ info.setSchemaName(wSchema.getText());
+ }
+ if(!StringUtils.isEmpty(wTable.getText())){
+ info.setTablename(wTable.getText());
+ }
+ if(wAwsAuthentication.getText().equals(AWS_CREDENTIALS)){
+ info.setUseCredentials(true);
+ info.setUseAwsIamRole(false);
+ if(wUseSystemVars.getSelection()){
+ info.setUseSystemEnvVars(true);
+ }else{
+ info.setUseSystemEnvVars(false);
+ if(!StringUtils.isEmpty(wAccessKeyId.getText())){
+ info.setAwsAccessKeyId(wAccessKeyId.getText());
+ }
+ if(!StringUtil.isEmpty(wSecretAccessKey.getText())){
+ info.setAwsSecretAccessKey(wSecretAccessKey.getText());
+ }
+ }
+ }else if(wAwsAuthentication.getText().equals(AWS_IAM_ROLE)){
+ info.setUseCredentials(false);
+ info.setUseAwsIamRole(true);
+ if(!StringUtils.isEmpty(wAwsIamRole.getText())){
+ info.setAwsIamRole(wAwsIamRole.getText());
+ }
+ }
+ info.setTruncateTable(wTruncate.getSelection());
+ info.setOnlyWhenHaveRows(wOnlyWhenHaveRows.getSelection());
+ info.setStreamToS3Csv(wStreamToS3Csv.getSelection());
+ if(!StringUtils.isEmpty(wLoadFromExistingFileFormat.getText())){
+ info.setLoadFromExistingFileFormat(wLoadFromExistingFileFormat.getText());
+ }
+ if(!StringUtils.isEmpty(wCopyFromFilename.getText())){
+ info.setCopyFromFilename(wCopyFromFilename.getText());
+ }
+
+ info.setSpecifyFields(wSpecifyFields.getSelection());
+
+ int nrRows = wFields.nrNonEmpty();
+ info.getFields().clear();
+
+ for (int i = 0; i < nrRows; i++) {
+ TableItem item = wFields.getNonEmpty(i);
+ RedshiftBulkLoaderField vbf =
+ new RedshiftBulkLoaderField(
+ Const.NVL(item.getText(1), ""), Const.NVL(item.getText(2), ""));
+ info.getFields().add(vbf);
+ }
+ }
+
+ private void ok() {
+ if (StringUtil.isEmpty(wTransformName.getText())) {
+ return;
+ }
+
+ transformName = wTransformName.getText(); // return value
+
+ getInfo(input);
+
+ if (Utils.isEmpty(input.getConnection())) {
+ MessageBox mb = new MessageBox(shell, SWT.OK | SWT.ICON_ERROR);
+ mb.setMessage(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.ConnectionError.DialogMessage"));
+ mb.setText(BaseMessages.getString("System.Dialog.Error.Title"));
+ mb.open();
+ return;
+ }
+
+ dispose();
+ }
+
+ private void getTableName() {
+
+ String connectionName = wConnection.getText();
+ if (StringUtil.isEmpty(connectionName)) {
+ return;
+ }
+ DatabaseMeta databaseMeta = pipelineMeta.findDatabase(connectionName, variables);
+
+ if (databaseMeta != null) {
+ log.logDebug(
+ toString(),
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderDialog.Log.LookingAtConnection", databaseMeta.toString()));
+
+ DatabaseExplorerDialog std =
+ new DatabaseExplorerDialog(
+ shell, SWT.NONE, variables, databaseMeta, pipelineMeta.getDatabases());
+ std.setSelectedSchemaAndTable(wSchema.getText(), wTable.getText());
+ if (std.open()) {
+ wSchema.setText(Const.NVL(std.getSchemaName(), ""));
+ wTable.setText(Const.NVL(std.getTableName(), ""));
+ }
+ } else {
+ MessageBox mb = new MessageBox(shell, SWT.OK | SWT.ICON_ERROR);
+ mb.setMessage(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.ConnectionError2.DialogMessage"));
+ mb.setText(BaseMessages.getString("System.Dialog.Error.Title"));
+ mb.open();
+ }
+ }
+
+ /** Fill up the fields table with the incoming fields. */
+ private void get() {
+ try {
+ IRowMeta r = pipelineMeta.getPrevTransformFields(variables, transformName);
+ if (r != null && !r.isEmpty()) {
+ BaseTransformDialog.getFieldsFromPrevious(
+ r, wFields, 1, new int[] {1, 2}, new int[] {}, -1, -1, null);
+ }
+ } catch (HopException ke) {
+ new ErrorDialog(
+ shell,
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.FailedToGetFields.DialogTitle"),
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.FailedToGetFields.DialogMessage"),
+ ke); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ }
+
+ // Generate code for create table...
+ // Conversions done by Database
+ //
+ private void sql() {
+ try {
+ RedshiftBulkLoaderMeta info = new RedshiftBulkLoaderMeta();
+ DatabaseMeta databaseMeta = pipelineMeta.findDatabase(wConnection.getText(), variables);
+
+ getInfo(info);
+ IRowMeta prev = pipelineMeta.getPrevTransformFields(variables, transformName);
+ TransformMeta transformMeta = pipelineMeta.findTransform(transformName);
+
+ if (info.specifyFields()) {
+ // Only use the fields that were specified.
+ IRowMeta prevNew = new RowMeta();
+
+ for (int i = 0; i < info.getFields().size(); i++) {
+ RedshiftBulkLoaderField vbf = info.getFields().get(i);
+ IValueMeta insValue = prev.searchValueMeta(vbf.getStreamField());
+ if (insValue != null) {
+ IValueMeta insertValue = insValue.clone();
+ insertValue.setName(vbf.getDatabaseField());
+ prevNew.addValueMeta(insertValue);
+ } else {
+ throw new HopTransformException(
+ BaseMessages.getString(
+ PKG,
+ "RedshiftBulkLoaderDialog.FailedToFindField.Message",
+ vbf.getStreamField())); // $NON-NLS-1$
+ }
+ }
+ prev = prevNew;
+ }
+
+ SqlStatement sql =
+ info.getSqlStatements(variables, pipelineMeta, transformMeta, prev, metadataProvider);
+ if (!sql.hasError()) {
+ if (sql.hasSql()) {
+ SqlEditor sqledit =
+ new SqlEditor(
+ shell, SWT.NONE, variables, databaseMeta, DbCache.getInstance(), sql.getSql());
+ sqledit.open();
+ } else {
+ MessageBox mb = new MessageBox(shell, SWT.OK | SWT.ICON_INFORMATION);
+ mb.setMessage(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.NoSQL.DialogMessage"));
+ mb.setText(BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.NoSQL.DialogTitle"));
+ mb.open();
+ }
+ } else {
+ MessageBox mb = new MessageBox(shell, SWT.OK | SWT.ICON_ERROR);
+ mb.setMessage(sql.getError());
+ mb.setText(BaseMessages.getString("System.Dialog.Error.Title"));
+ mb.open();
+ }
+ } catch (HopException ke) {
+ new ErrorDialog(
+ shell,
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.BuildSQLError.DialogTitle"),
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.BuildSQLError.DialogMessage"),
+ ke);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getName();
+ }
+
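+  /** Enables the widgets that match the selected AWS authentication method. */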
+ public void toggleAuthSelection(){
+ if(wAwsAuthentication.getText().equals("Credentials")){
+ wlUseSystemVars.setEnabled(true);
+ wUseSystemVars.setEnabled(true);
+ wlAccessKeyId.setEnabled(true);
+ wAccessKeyId.setEnabled(true);
+ wlSecretAccessKey.setEnabled(true);
+ wSecretAccessKey.setEnabled(true);
+
+ wlAwsIamRole.setEnabled(false);
+ wAwsIamRole.setEnabled(false);
+ }
+ if(wAwsAuthentication.getText().equals("IAM Role")){
+ wlUseSystemVars.setEnabled(false);
+ wUseSystemVars.setEnabled(false);
+ wlAccessKeyId.setEnabled(false);
+ wAccessKeyId.setEnabled(false);
+ wlSecretAccessKey.setEnabled(false);
+ wSecretAccessKey.setEnabled(false);
+
+ wlAwsIamRole.setEnabled(true);
+ wAwsIamRole.setEnabled(true);
+ }
+ }
+
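+  /** Disables the explicit key fields when the AWS system environment variables are used. */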
+ public void toggleKeysSelection(){
+ if(wUseSystemVars.getSelection()){
+ wlAccessKeyId.setEnabled(false);
+ wAccessKeyId.setEnabled(false);
+ wlSecretAccessKey.setEnabled(false);
+ wSecretAccessKey.setEnabled(false);
+ }else{
+ wlAccessKeyId.setEnabled(true);
+ wAccessKeyId.setEnabled(true);
+ wlSecretAccessKey.setEnabled(true);
+ wSecretAccessKey.setEnabled(true);
+ }
+ }
+}
diff --git a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderField.java b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderField.java
new file mode 100644
index 00000000000..656fe859a55
--- /dev/null
+++ b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderField.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.redshift.bulkloader;
+
+import org.apache.hop.metadata.api.HopMetadataProperty;
+
+import java.util.Objects;
+
+public class RedshiftBulkLoaderField {
+
+ public RedshiftBulkLoaderField(){
+
+ }
+
+ public RedshiftBulkLoaderField(String fieldDatabase, String fieldStream){
+ this.databaseField = fieldDatabase;
+ this.streamField = fieldStream;
+ }
+
+ @HopMetadataProperty(
+ key = "stream_name",
+ injectionKey = "STREAM_FIELDNAME",
+ injectionKeyDescription = "RedshiftBulkLoader.Inject.FIELDSTREAM"
+ )
+ private String streamField;
+
+ @HopMetadataProperty(
+ key = "column_name",
+ injectionKey = "DATABASE_FIELDNAME",
+ injectionKeyDescription = "RedshiftBulkLoader.Inject.FIELDDATABASE"
+ )
+ private String databaseField;
+
+ public String getStreamField(){
+ return streamField;
+ }
+
+ public void setStreamField(String streamField){
+ this.streamField = streamField;
+ }
+
+ public String getDatabaseField(){
+ return databaseField;
+ }
+
+ public void setDatabaseField(String databaseField){
+ this.databaseField = databaseField;
+ }
+
+ @Override
+ public boolean equals(Object o){
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    RedshiftBulkLoaderField that = (RedshiftBulkLoaderField) o;
+    return Objects.equals(streamField, that.streamField)
+        && Objects.equals(databaseField, that.databaseField);
+ }
+
+ @Override
+ public int hashCode(){
+ return Objects.hash(streamField, databaseField);
+ }
+
+}
diff --git a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderMeta.java b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderMeta.java
new file mode 100644
index 00000000000..5c8503c2b14
--- /dev/null
+++ b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderMeta.java
@@ -0,0 +1,951 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.redshift.bulkloader;
+
+import org.apache.hop.core.CheckResult;
+import org.apache.hop.core.Const;
+import org.apache.hop.core.ICheckResult;
+import org.apache.hop.core.IProvidesModelerMeta;
+import org.apache.hop.core.SqlStatement;
+import org.apache.hop.core.annotations.Transform;
+import org.apache.hop.core.database.Database;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.exception.HopDatabaseException;
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.core.exception.HopTransformException;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.core.row.RowMeta;
+import org.apache.hop.core.util.StringUtil;
+import org.apache.hop.core.variables.IVariables;
+import org.apache.hop.i18n.BaseMessages;
+import org.apache.hop.metadata.api.HopMetadataProperty;
+import org.apache.hop.metadata.api.IHopMetadataProvider;
+import org.apache.hop.pipeline.DatabaseImpact;
+import org.apache.hop.pipeline.PipelineMeta;
+import org.apache.hop.pipeline.transform.BaseTransformMeta;
+import org.apache.hop.pipeline.transform.ITransformData;
+import org.apache.hop.pipeline.transform.TransformMeta;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Transform(
+ id = "RedshiftBulkLoader",
+ image = "redshift.svg",
+ name = "i18n::BaseTransform.TypeLongDesc.RedshiftBulkLoaderMessage",
+ description = "i18n::BaseTransform.TypeTooltipDesc.RedshiftBulkLoaderMessage",
+ categoryDescription = "i18n:org.apache.hop.pipeline.transform:BaseTransform.Category.Bulk",
+ documentationUrl =
+        "https://hop.apache.org/manual/latest/pipeline/transforms/redshift-bulkloader.html",
+ isIncludeJdbcDrivers = true,
+ classLoaderGroup = "redshift")
+public class RedshiftBulkLoaderMeta
+    extends BaseTransformMeta<RedshiftBulkLoader, RedshiftBulkLoaderData>
+    implements IProvidesModelerMeta {
+  private static final Class<?> PKG = RedshiftBulkLoaderMeta.class;
+
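+  // Defaults used when staging the incoming stream as a CSV file before running COPY.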
+ public static final String CSV_DELIMITER = ",";
+ public static final String CSV_RECORD_DELIMITER = "\n";
+ public static final String CSV_ESCAPE_CHAR = "\\";
+ public static final String ENCLOSURE = "\"";
+
+ @HopMetadataProperty(
+ key = "connection",
+ injectionKey = "CONNECTIONNAME",
+ injectionKeyDescription = "RedshiftBulkLoader.Injection.CONNECTIONNAME")
+ private String connection;
+
+ @HopMetadataProperty(
+ key = "schema",
+ injectionKey = "SCHEMANAME",
+ injectionKeyDescription = "RedshiftBulkLoader.Injection.SCHEMANAME")
+ private String schemaName;
+
+ @HopMetadataProperty(
+ key = "table",
+ injectionKey = "TABLENAME",
+ injectionKeyDescription = "RedshiftBulkLoader.Injection.TABLENAME")
+ private String tablename;
+
+ @HopMetadataProperty(
+ key = "use_credentials",
+ injectionKey = "USE_CREDENTIALS",
+ injectionKeyDescription = ""
+ )
+ private boolean useCredentials;
+
+ @HopMetadataProperty(
+ key = "use_system_env_vars",
+ injectionKey = "USE_SYSTEM_ENV_VARS",
+ injectionKeyDescription = ""
+ )
+ private boolean useSystemEnvVars;
+
+ @HopMetadataProperty(
+ key = "aws_access_key_id",
+ injectionKey = "AWS_ACCESS_KEY_ID",
+ injectionKeyDescription = ""
+ )
+ private String awsAccessKeyId;
+
+ @HopMetadataProperty(
+ key = "aws_secret_access_key",
+ injectionKey = "AWS_SECRET_ACCESS_KEY",
+ injectionKeyDescription = ""
+ )
+ private String awsSecretAccessKey;
+
+ @HopMetadataProperty(
+ key = "use_aws_iam_role",
+ injectionKey = "USE_AWS_IAM_ROLE",
+ injectionKeyDescription = ""
+ )
+ private boolean useAwsIamRole;
+
+ @HopMetadataProperty(
+ key = "aws_iam_role",
+ injectionKey = "AWS_IAM_ROLE",
+ injectionKeyDescription = ""
+ )
+ private String awsIamRole;
+
+ @HopMetadataProperty(
+ key = "truncate",
+ injectionKey = "TRUNCATE_TABLE",
+ injectionKeyDescription = "RedshiftBulkLoader.Injection.TruncateTable.Field")
+ private boolean truncateTable;
+
+ @HopMetadataProperty(
+ key = "only_when_have_rows",
+ injectionKey = "ONLY_WHEN_HAVE_ROWS",
+ injectionKeyDescription = "RedshiftBulkLoader.Inject.OnlyWhenHaveRows.Field")
+ private boolean onlyWhenHaveRows;
+
+ @HopMetadataProperty(
+ key = "stream_to_s3",
+ injectionKey = "STREAM_TO_S3",
+ injectionKeyDescription = ""
+ )
+ private boolean streamToS3Csv;
+
+ /** CSV: Trim whitespace */
+ @HopMetadataProperty(key = "trim_whitespace", injectionKeyDescription = "")
+ private boolean trimWhitespace;
+
+ /** CSV: Convert column value to null if */
+ @HopMetadataProperty(key = "null_if", injectionKeyDescription = "")
+ private String nullIf;
+
+ /**
+ * CSV: Should the load fail if the column count in the row does not match the column count in the
+ * table
+ */
+ @HopMetadataProperty(key = "error_column_mismatch", injectionKeyDescription = "")
+ private boolean errorColumnMismatch;
+
+ /** JSON: Strip nulls from JSON */
+ @HopMetadataProperty(key = "strip_null", injectionKeyDescription = "")
+ private boolean stripNull;
+
+
+ @HopMetadataProperty(
+ key = "load_from_existing_file",
+ injectionKey = "LOAD_FROM_EXISTING_FILE",
+ injectionKeyDescription = ""
+ )
+ private String loadFromExistingFileFormat;
+
+ /** Do we explicitly select the fields to update in the database */
+ @HopMetadataProperty(key = "specify_fields", injectionKeyDescription = "")
+ private boolean specifyFields;
+
+ @HopMetadataProperty(
+ key = "load_from_filename",
+ injectionKey = "LOAD_FROM_FILENAME",
+ injectionKeyDescription = ""
+ )
+ private String copyFromFilename;
+
+ @HopMetadataProperty(
+ groupKey = "fields",
+ key = "field",
+ injectionGroupKey = "FIELDS",
+ injectionGroupDescription = "RedshiftBulkLoader.Injection.FIELDS",
+ injectionKey = "FIELDSTREAM",
+ injectionKeyDescription = "RedshiftBulkLoader.Injection.FIELDSTREAM")
+ /** Fields containing the values in the input stream to insert */
+  private List<RedshiftBulkLoaderField> fields;
+
+ @HopMetadataProperty(
+ groupKey = "fields",
+ key = "field",
+ injectionGroupKey = "FIELDS",
+ injectionGroupDescription = "RedshiftBulkLoader.Injection.FIELDS",
+ injectionKey = "FIELDDATABASE",
+ injectionKeyDescription = "RedshiftBulkLoader.Injection.FIELDDATABASE")
+ /** Fields in the table to insert */
+ private String[] fieldDatabase;
+
+ public RedshiftBulkLoaderMeta() {
+ super(); // allocate BaseTransformMeta
+
+ fields = new ArrayList<>();
+ }
+
+  @Override
+  public Object clone() {
+ return super.clone();
+ }
+
+ /**
+ * @return returns the database connection name
+ */
+ public String getConnection() {
+ return connection;
+ }
+
+ /**
+ * sets the database connection name
+ *
+ * @param connection the database connection name to set
+ */
+ public void setConnection(String connection) {
+ this.connection = connection;
+ }
+
+ /**
+ * @return Returns the database.
+ */
+ public DatabaseMeta getDatabaseMeta() {
+ return null;
+ }
+
+ /**
+ * @deprecated use {@link #getTableName()}
+ */
+ public String getTablename() {
+ return getTableName();
+ }
+
+ /**
+ * @return Returns the tablename.
+ */
+ public String getTableName() {
+ return tablename;
+ }
+
+ /**
+ * @param tablename The tablename to set.
+ */
+ public void setTablename(String tablename) {
+ this.tablename = tablename;
+ }
+
+ /**
+ * @return Returns the truncate table flag.
+ */
+ public boolean isTruncateTable() {
+ return truncateTable;
+ }
+
+ /**
+ * @param truncateTable The truncate table flag to set.
+ */
+ public void setTruncateTable(boolean truncateTable) {
+ this.truncateTable = truncateTable;
+ }
+
+ /**
+ * @return Returns the onlyWhenHaveRows flag.
+ */
+ public boolean isOnlyWhenHaveRows() {
+ return onlyWhenHaveRows;
+ }
+
+ /**
+ * @param onlyWhenHaveRows The onlyWhenHaveRows to set.
+ */
+ public void setOnlyWhenHaveRows(boolean onlyWhenHaveRows) {
+ this.onlyWhenHaveRows = onlyWhenHaveRows;
+ }
+
+ /**
+ * @param specifyFields The specify fields flag to set.
+ */
+ public void setSpecifyFields(boolean specifyFields) {
+ this.specifyFields = specifyFields;
+ }
+
+ public boolean isStreamToS3Csv() {
+ return streamToS3Csv;
+ }
+
+ public void setStreamToS3Csv(boolean streamToS3Csv) {
+ this.streamToS3Csv = streamToS3Csv;
+ }
+
+ /**
+ * CSV:
+ *
+ * @return Should whitespace in the fields be trimmed
+ */
+ public boolean isTrimWhitespace() {
+ return trimWhitespace;
+ }
+
+ /**
+   * CSV: Set whether the whitespace in the fields should be trimmed
+ *
+ * @param trimWhitespace true/false
+ */
+ public void setTrimWhitespace(boolean trimWhitespace) {
+ this.trimWhitespace = trimWhitespace;
+ }
+
+ /**
+ * CSV:
+ *
+ * @return Comma delimited list of strings to convert to Null
+ */
+ public String getNullIf() {
+ return nullIf;
+ }
+
+ /**
+ * CSV: Set the string constants to convert to Null
+ *
+ * @param nullIf Comma delimited list of constants
+ */
+ public void setNullIf(String nullIf) {
+ this.nullIf = nullIf;
+ }
+
+ /**
+ * CSV:
+ *
+ * @return Should the load error if the number of columns in the table and in the CSV do not match
+ */
+ public boolean isErrorColumnMismatch() {
+ return errorColumnMismatch;
+ }
+
+ /**
+ * CSV: Set if the load should error if the number of columns in the table and in the CSV do not
+ * match
+ *
+ * @param errorColumnMismatch true/false
+ */
+ public void setErrorColumnMismatch(boolean errorColumnMismatch) {
+ this.errorColumnMismatch = errorColumnMismatch;
+ }
+
+ /**
+ * JSON:
+ *
+ * @return Should null values be stripped out of the JSON
+ */
+ public boolean isStripNull() {
+ return stripNull;
+ }
+
+ /**
+ * JSON: Set if null values should be stripped out of the JSON
+ *
+ * @param stripNull true/false
+ */
+ public void setStripNull(boolean stripNull) {
+ this.stripNull = stripNull;
+ }
+
+
+ public String getLoadFromExistingFileFormat() {
+ return loadFromExistingFileFormat;
+ }
+
+ public void setLoadFromExistingFileFormat(String loadFromExistingFileFormat) {
+ this.loadFromExistingFileFormat = loadFromExistingFileFormat;
+ }
+
+ public String getCopyFromFilename() {
+ return copyFromFilename;
+ }
+
+ public void setCopyFromFilename(String copyFromFilename) {
+ this.copyFromFilename = copyFromFilename;
+ }
+
+  public List<RedshiftBulkLoaderField> getFields() {
+ return fields;
+ }
+
+  public void setFields(List<RedshiftBulkLoaderField> fields) {
+ this.fields = fields;
+ }
+
+ /**
+ * @return Returns the specify fields flag.
+ */
+ public boolean specifyFields() {
+ return specifyFields;
+ }
+
+ public boolean isSpecifyFields() {
+ return specifyFields;
+ }
+
+ public boolean isUseCredentials() {
+ return useCredentials;
+ }
+
+ public void setUseCredentials(boolean useCredentials) {
+ this.useCredentials = useCredentials;
+ }
+
+ public String getAwsAccessKeyId() {
+ return awsAccessKeyId;
+ }
+
+ public void setAwsAccessKeyId(String awsAccessKeyId) {
+ this.awsAccessKeyId = awsAccessKeyId;
+ }
+
+ public String getAwsSecretAccessKey() {
+ return awsSecretAccessKey;
+ }
+
+ public void setAwsSecretAccessKey(String awsSecretAccessKey) {
+ this.awsSecretAccessKey = awsSecretAccessKey;
+ }
+
+ public boolean isUseAwsIamRole() {
+ return useAwsIamRole;
+ }
+
+ public void setUseAwsIamRole(boolean useAwsIamRole) {
+ this.useAwsIamRole = useAwsIamRole;
+ }
+
+ public String getAwsIamRole() {
+ return awsIamRole;
+ }
+
+ public void setAwsIamRole(String awsIamRole) {
+ this.awsIamRole = awsIamRole;
+ }
+
+ public boolean isUseSystemEnvVars() {
+ return useSystemEnvVars;
+ }
+
+ public void setUseSystemEnvVars(boolean useSystemEnvVars) {
+ this.useSystemEnvVars = useSystemEnvVars;
+ }
+
+ public void setDefault() {
+ tablename = "";
+
+ // To be compatible with pre-v3.2 (SB)
+ specifyFields = false;
+ }
+
+ @Override
+ public void check(
+      List<ICheckResult> remarks,
+ PipelineMeta pipelineMeta,
+ TransformMeta transformMeta,
+ IRowMeta prev,
+ String[] input,
+ String[] output,
+ IRowMeta info,
+ IVariables variables,
+ IHopMetadataProvider metadataProvider) {
+
+ Database db = null;
+
+ try {
+
+ DatabaseMeta databaseMeta =
+ metadataProvider.getSerializer(DatabaseMeta.class).load(variables.resolve(connection));
+
+ if (databaseMeta != null) {
+ CheckResult cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_OK,
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.CheckResult.ConnectionExists"),
+ transformMeta);
+ remarks.add(cr);
+
+ db = new Database(loggingObject, variables, databaseMeta);
+
+ try {
+ db.connect();
+
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_OK,
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.CheckResult.ConnectionOk"),
+ transformMeta);
+ remarks.add(cr);
+
+ if (!StringUtil.isEmpty(tablename)) {
+ String schemaTable =
+ databaseMeta.getQuotedSchemaTableCombination(
+ variables, db.resolve(schemaName), db.resolve(tablename));
+ // Check if this table exists...
+ String realSchemaName = db.resolve(schemaName);
+ String realTableName = db.resolve(tablename);
+ if (db.checkTableExists(realSchemaName, realTableName)) {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_OK,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderMeta.CheckResult.TableAccessible", schemaTable),
+ transformMeta);
+ remarks.add(cr);
+
+ IRowMeta r = db.getTableFields(schemaTable);
+ if (r != null) {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_OK,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderMeta.CheckResult.TableOk", schemaTable),
+ transformMeta);
+ remarks.add(cr);
+
+ String error_message = "";
+ boolean error_found = false;
+ // OK, we have the table fields.
+ // Now see what we can find as previous transform...
+ if (prev != null && prev.size() > 0) {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_OK,
+ BaseMessages.getString(
+ PKG,
+ "RedshiftBulkLoaderMeta.CheckResult.FieldsReceived",
+ "" + prev.size()),
+ transformMeta);
+ remarks.add(cr);
+
+ if (!specifyFields()) {
+ // Starting from prev...
+ for (int i = 0; i < prev.size(); i++) {
+ IValueMeta pv = prev.getValueMeta(i);
+ int idx = r.indexOfValue(pv.getName());
+ if (idx < 0) {
+ error_message +=
+ "\t\t" + pv.getName() + " (" + pv.getTypeDesc() + ")" + Const.CR;
+ error_found = true;
+ }
+ }
+ if (error_found) {
+ error_message =
+ BaseMessages.getString(
+ PKG,
+ "RedshiftBulkLoaderMeta.CheckResult.FieldsNotFoundInOutput",
+ error_message);
+
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_ERROR, error_message, transformMeta);
+ remarks.add(cr);
+ } else {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_OK,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderMeta.CheckResult.AllFieldsFoundInOutput"),
+ transformMeta);
+ remarks.add(cr);
+ }
+ } else {
+ // Specifying the column names explicitly
+                  for (int i = 0; i < fields.size(); i++) {
+                    int idx = r.indexOfValue(fields.get(i).getDatabaseField());
+                    if (idx < 0) {
+                      error_message += "\t\t" + fields.get(i).getDatabaseField() + Const.CR;
+ error_found = true;
+ }
+ }
+ if (error_found) {
+ error_message =
+ BaseMessages.getString(
+ PKG,
+ "RedshiftBulkLoaderMeta.CheckResult.FieldsSpecifiedNotInTable",
+ error_message);
+
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_ERROR, error_message, transformMeta);
+ remarks.add(cr);
+ } else {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_OK,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderMeta.CheckResult.AllFieldsFoundInOutput"),
+ transformMeta);
+ remarks.add(cr);
+ }
+ }
+
+ error_message = "";
+ if (!specifyFields()) {
+ // Starting from table fields in r...
+                  for (int i = 0; i < r.size(); i++) {
+ IValueMeta rv = r.getValueMeta(i);
+ int idx = prev.indexOfValue(rv.getName());
+ if (idx < 0) {
+ error_message +=
+ "\t\t" + rv.getName() + " (" + rv.getTypeDesc() + ")" + Const.CR;
+ error_found = true;
+ }
+ }
+ if (error_found) {
+ error_message =
+ BaseMessages.getString(
+ PKG,
+ "RedshiftBulkLoaderMeta.CheckResult.FieldsNotFound",
+ error_message);
+
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_WARNING, error_message, transformMeta);
+ remarks.add(cr);
+ } else {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_OK,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderMeta.CheckResult.AllFieldsFound"),
+ transformMeta);
+ remarks.add(cr);
+ }
+ } else {
+ // Specifying the column names explicitly
+ for (int i = 0; i < fields.size(); i++) {
+ RedshiftBulkLoaderField vbf = fields.get(i);
+ int idx = prev.indexOfValue(vbf.getStreamField());
+ if (idx < 0) {
+ error_message += "\t\t" + vbf.getStreamField() + Const.CR;
+ error_found = true;
+ }
+ }
+ if (error_found) {
+ error_message =
+ BaseMessages.getString(
+ PKG,
+ "RedshiftBulkLoaderMeta.CheckResult.FieldsSpecifiedNotFound",
+ error_message);
+
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_ERROR, error_message, transformMeta);
+ remarks.add(cr);
+ } else {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_OK,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderMeta.CheckResult.AllFieldsFound"),
+ transformMeta);
+ remarks.add(cr);
+ }
+ }
+ } else {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_ERROR,
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.CheckResult.NoFields"),
+ transformMeta);
+ remarks.add(cr);
+ }
+ } else {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_ERROR,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderMeta.CheckResult.TableNotAccessible"),
+ transformMeta);
+ remarks.add(cr);
+ }
+ } else {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_ERROR,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderMeta.CheckResult.TableError", schemaTable),
+ transformMeta);
+ remarks.add(cr);
+ }
+ } else {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_ERROR,
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.CheckResult.NoTableName"),
+ transformMeta);
+ remarks.add(cr);
+ }
+ } catch (HopException e) {
+ cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_ERROR,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderMeta.CheckResult.UndefinedError", e.getMessage()),
+ transformMeta);
+ remarks.add(cr);
+ } finally {
+ db.disconnect();
+ }
+ } else {
+ CheckResult cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_ERROR,
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.CheckResult.NoConnection"),
+ transformMeta);
+ remarks.add(cr);
+ }
+ } catch (HopException e) {
+ CheckResult cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_ERROR,
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderMeta.CheckResult.UndefinedError", e.getMessage()),
+ transformMeta);
+ remarks.add(cr);
+ }
+
+ // See if we have input streams leading to this transform!
+ if (input.length > 0) {
+ CheckResult cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_OK,
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.CheckResult.ExpectedInputOk"),
+ transformMeta);
+ remarks.add(cr);
+ } else {
+ CheckResult cr =
+ new CheckResult(
+ ICheckResult.TYPE_RESULT_ERROR,
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.CheckResult.ExpectedInputError"),
+ transformMeta);
+ remarks.add(cr);
+ }
+ }
+
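+  /** Registers a database WRITE impact entry for every field arriving from previous transforms. */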
+ public void analyseImpact(
+ IVariables variables,
+      List<DatabaseImpact> impact,
+ PipelineMeta pipelineMeta,
+ TransformMeta transformMeta,
+ IRowMeta prev,
+ String[] input,
+ String[] output,
+ IRowMeta info,
+ IHopMetadataProvider metadataProvider)
+ throws HopTransformException {
+
+ try {
+ DatabaseMeta databaseMeta =
+ metadataProvider.getSerializer(DatabaseMeta.class).load(variables.resolve(connection));
+
+ // The values that are entering this transform are in "prev":
+ if (prev != null) {
+ for (int i = 0; i < prev.size(); i++) {
+ IValueMeta v = prev.getValueMeta(i);
+ DatabaseImpact ii =
+ new DatabaseImpact(
+ DatabaseImpact.TYPE_IMPACT_WRITE,
+ pipelineMeta.getName(),
+ transformMeta.getName(),
+ databaseMeta.getDatabaseName(),
+ tablename,
+ v.getName(),
+ v.getName(),
+ v != null ? v.getOrigin() : "?",
+ "",
+ "Type = " + v.toStringMeta());
+ impact.add(ii);
+ }
+ }
+ } catch (HopException e) {
+ throw new HopTransformException(
+ "Unable to get databaseMeta for connection: " + Const.CR + variables.resolve(connection));
+ }
+ }
+
+ public SqlStatement getSqlStatements(
+ IVariables variables,
+ PipelineMeta pipelineMeta,
+ TransformMeta transformMeta,
+ IRowMeta prev,
+ IHopMetadataProvider metadataProvider) {
+
+ DatabaseMeta databaseMeta = pipelineMeta.findDatabase(connection, variables);
+
+ SqlStatement retval =
+ new SqlStatement(transformMeta.getName(), databaseMeta, null); // default: nothing to do!
+
+ if (databaseMeta != null) {
+ if (prev != null && prev.size() > 0) {
+ if (!StringUtil.isEmpty(tablename)) {
+ Database db = new Database(loggingObject, variables, databaseMeta);
+ try {
+ db.connect();
+
+ String schemaTable =
+ databaseMeta.getQuotedSchemaTableCombination(variables, schemaName, tablename);
+ String cr_table = db.getDDL(schemaTable, prev);
+
+ // Empty string means: nothing to do: set it to null...
+ if (cr_table == null || cr_table.length() == 0) {
+ cr_table = null;
+ }
+
+ retval.setSql(cr_table);
+ } catch (HopDatabaseException dbe) {
+ retval.setError(
+ BaseMessages.getString(
+ PKG, "RedshiftBulkLoaderMeta.Error.ErrorConnecting", dbe.getMessage()));
+ } finally {
+ db.disconnect();
+ }
+ } else {
+ retval.setError(BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.Error.NoTable"));
+ }
+ } else {
+ retval.setError(BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.Error.NoInput"));
+ }
+ } else {
+ retval.setError(BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.Error.NoConnection"));
+ }
+
+ return retval;
+ }
+
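+  /** Looks up the physical columns of the target table, used for SQL generation and field mapping. */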
+ public IRowMeta getRequiredFields(IVariables variables) throws HopException {
+ String realTableName = variables.resolve(tablename);
+ String realSchemaName = variables.resolve(schemaName);
+
+ DatabaseMeta databaseMeta =
+ getParentTransformMeta().getParentPipelineMeta().findDatabase(connection, variables);
+
+ if (databaseMeta != null) {
+ Database db = new Database(loggingObject, variables, databaseMeta);
+ try {
+ db.connect();
+
+ if (!StringUtil.isEmpty(realTableName)) {
+ String schemaTable =
+ databaseMeta.getQuotedSchemaTableCombination(
+ variables, realSchemaName, realTableName);
+
+ // Check if this table exists...
+ if (db.checkTableExists(realSchemaName, realTableName)) {
+ return db.getTableFields(schemaTable);
+ } else {
+ throw new HopException(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.Exception.TableNotFound"));
+ }
+ } else {
+ throw new HopException(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.Exception.TableNotSpecified"));
+ }
+ } catch (Exception e) {
+ throw new HopException(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.Exception.ErrorGettingFields"), e);
+ } finally {
+ db.disconnect();
+ }
+ } else {
+ throw new HopException(
+ BaseMessages.getString(PKG, "RedshiftBulkLoaderMeta.Exception.ConnectionNotDefined"));
+ }
+ }
+
+ /**
+ * @return Fields containing the fieldnames in the database insert.
+ */
+ public String[] getFieldDatabase() {
+ return fieldDatabase;
+ }
+
+ /**
+ * @param fieldDatabase The fields containing the names of the fields to insert.
+ */
+ public void setFieldDatabase(String[] fieldDatabase) {
+ this.fieldDatabase = fieldDatabase;
+ }
+
+ /**
+ * @return the schemaName
+ */
+ public String getSchemaName() {
+ return schemaName;
+ }
+
+ /**
+ * @param schemaName the schemaName to set
+ */
+ public void setSchemaName(String schemaName) {
+ this.schemaName = schemaName;
+ }
+
+ public boolean supportsErrorHandling() {
+ return true;
+ }
+
+ @Override
+ public String getMissingDatabaseConnectionInformationMessage() {
+ // use default message
+ return null;
+ }
+
+ @Override
+ public RowMeta getRowMeta(IVariables variables, ITransformData transformData) {
+ return (RowMeta) ((RedshiftBulkLoaderData) transformData).getInsertRowMeta();
+ }
+
+ @Override
+  public List<String> getDatabaseFields() {
+    List<String> items = Collections.emptyList();
+ if (specifyFields()) {
+ items = new ArrayList<>();
+ for (RedshiftBulkLoaderField vbf : fields) {
+ items.add(vbf.getDatabaseField());
+ }
+ }
+ return items;
+ }
+
+ @Override
+  public List<String> getStreamFields() {
+    List<String> items = Collections.emptyList();
+ if (specifyFields()) {
+ items = new ArrayList<>();
+ for (RedshiftBulkLoaderField vbf : fields) {
+ items.add(vbf.getStreamField());
+ }
+ }
+ return items;
+ }
+}
diff --git a/plugins/tech/aws/src/main/resources/org/apache/hop/pipeline/transforms/redshift/bulkloader/messages/messages_en_US.properties b/plugins/tech/aws/src/main/resources/org/apache/hop/pipeline/transforms/redshift/bulkloader/messages/messages_en_US.properties
new file mode 100644
index 00000000000..9170c32d1bd
--- /dev/null
+++ b/plugins/tech/aws/src/main/resources/org/apache/hop/pipeline/transforms/redshift/bulkloader/messages/messages_en_US.properties
@@ -0,0 +1,124 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BaseTransform.TypeLongDesc.RedshiftBulkLoaderMessage=Redshift bulk loader
+BaseTransform.TypeTooltipDesc.RedshiftBulkLoaderMessage=Bulk load data into a Redshift database table
+RedshiftBulkLoaderDialog.StreamCsvToS3.Label=Stream to S3 CSV
+RedshiftBulkLoaderDialog.StreamCsvToS3.Tooltip=Writes the current pipeline stream to a file in an S3 bucket before copying into Redshift.
+RedshiftBulkLoaderDialog.LoadFromExistingFile.Label=Load from existing file
+RedshiftBulkLoaderDialog.LoadFromExistingFile.Tooltip=Copy data into Redshift table from an existing file.
+RedshiftBulkLoaderDialog.CopyFromFile.Label=Copy into Redshift from existing file
+RedshiftBulkLoader.Connection.Connected=Connected to database {0}
+RedshiftBulkLoaderDialog.Authenticate.Options.Label=AWS authentication
+RedshiftBulkLoaderDialog.Authenticate.UseSystemVars.Label=Use AWS system variables
+RedshiftBulkLoaderDialog.Authenticate.UseSystemVars.Tooltip=Specify whether to use the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment (operating system) variables, or to provide values for this transform only
+
+RedshiftBulkLoader.Exception.FailedToFindField=Could not find field {0} in stream
+RedshiftBulkLoader.Exception.FieldRequired=Field [{0}] is required and couldn''t be found\!
+RedshiftBulkLoader.Exception.RowRejected=Row Rejected\: {0}
+RedshiftBulkLoader.Exception.ClosingLogError=Unable to close Log Files
+RedshiftBulkLoaderDialog.BuildSQLError.DialogMessage=Unable to build the SQL statement because of an error
+RedshiftBulkLoaderDialog.BuildSQLError.DialogTitle=Couldn''t build SQL
+RedshiftBulkLoaderDialog.ColumnInfo.StreamField=Stream field
+RedshiftBulkLoaderDialog.ColumnInfo.TableField=Table field
+RedshiftBulkLoaderDialog.ConnectionError.DialogMessage=Please select a valid connection\!
+RedshiftBulkLoaderDialog.ConnectionError2.DialogMessage=Please select a valid database connection first\!
+RedshiftBulkLoaderDialog.DialogTitle=Redshift bulk loader
+RedshiftBulkLoaderDialog.DoMapping.Button=Enter field mapping
+RedshiftBulkLoaderDialog.DoMapping.SomeFieldsNotFoundContinue=Certain fields could not be found in the existing mapping, do you want to continue?
+RedshiftBulkLoaderDialog.DoMapping.SomeFieldsNotFoundTitle=Certain referenced fields were not found\!
+RedshiftBulkLoaderDialog.DoMapping.SomeSourceFieldsNotFound=These source fields were not found\: {0}
+RedshiftBulkLoaderDialog.DoMapping.SomeTargetFieldsNotFound=These target fields were not found\: {0}
+RedshiftBulkLoaderDialog.DoMapping.UnableToFindSourceFields.Message=It was not possible to retrieve the source fields for this transform because of an error\:
+RedshiftBulkLoaderDialog.DoMapping.UnableToFindSourceFields.Title=Error getting source fields
+RedshiftBulkLoaderDialog.DoMapping.UnableToFindTargetFields.Message=It was not possible to retrieve the target fields for this transform because of an error\:
+RedshiftBulkLoaderDialog.DoMapping.UnableToFindTargetFields.Title=Error getting target fields
+RedshiftBulkLoaderDialog.FailedToFindField.Message=Could not find field {0} in stream
+RedshiftBulkLoaderDialog.FailedToGetFields.DialogMessage=Unable to get fields from previous transforms because of an error
+RedshiftBulkLoaderDialog.FailedToGetFields.DialogTitle=Get fields failed
+RedshiftBulkLoaderDialog.FieldsTab.CTabItem.Title=Database fields
+RedshiftBulkLoaderDialog.GetFields.Button=\ &Get fields
+RedshiftBulkLoaderDialog.AbortOnError.Label=Abort on error
+RedshiftBulkLoaderDialog.AbortOnError.Tooltip=If a record is rejected, the statement will be aborted and no data will be loaded. If this option is not enabled, rejected records will be logged but will not stop the bulk load.
+RedshiftBulkLoaderDialog.InsertDirect.Label=Insert directly to ROS
+RedshiftBulkLoaderDialog.InsertDirect.Tooltip=If enabled, the statement is a COPY DIRECT statement and Redshift will insert the data directly to the ROS (Read Optimized Storage). Otherwise, the data will be inserted to the WOS (Write Optimized Storage) (e.g. a "trickle load")
+RedshiftBulkLoaderDialog.Delimiter.Label=Delimiter character
+RedshiftBulkLoaderDialog.Delimiter.Tooltip=Specifies the single-character column delimiter used during the load. Default is the tab character (\\t). Be sure to use a character that is not found in any field of the records because Redshift does not use field quoting.
+RedshiftBulkLoaderDialog.NullString.Label=Null string
+RedshiftBulkLoaderDialog.NullString.Tooltip=Specifies the multi-character string that represents a NULL value. Case insensitive. Default is the string \\N
+RedshiftBulkLoaderDialog.RecordTerminator.Label=Record terminator string
+RedshiftBulkLoaderDialog.RecordTerminator.Tooltip=Specifies the multi-character string that indicates the end of a record. Default is Linefeed (\\n).
+RedshiftBulkLoaderDialog.ExceptionsLogFile.Label=Exceptions log file
+RedshiftBulkLoaderDialog.ExceptionsLogFile.Tooltip=Specifies the filename or absolute path in which to write messages indicating the input line number and reason for each rejected record. The default pathname is: catalog-dir/CopyErrorLog/STDIN-copy-from-exceptions
+RedshiftBulkLoaderDialog.RejectedDataLogFile.Label=Rejected data log file
+RedshiftBulkLoaderDialog.RejectedDataLogFile.Tooltip=Specifies the filename or absolute pathname in which to write rejected rows. This file can then be edited to resolve problems and reloaded. The default pathname is: catalog-dir/CopyErrorLog/STDIN-copy-from-rejected-data
+RedshiftBulkLoaderDialog.StreamName.Label=Stream name
+RedshiftBulkLoaderDialog.StreamName.Tooltip=Specifies the name of the stream being loaded. This name appears in the vt_load_streams virtual table. Default is PipelineName.transformName
+RedshiftBulkLoaderDialog.InsertFields.Label=Fields to insert\:
+RedshiftBulkLoaderDialog.Log.LookingAtConnection=Looking at connection\: {0}
+RedshiftBulkLoaderDialog.MainTab.CTabItem=Main options
+RedshiftBulkLoaderDialog.NoSQL.DialogMessage=No SQL needs to be executed to make this transform function properly.
+RedshiftBulkLoaderDialog.NoSQL.DialogTitle=OK
+RedshiftBulkLoaderDialog.SpecifyFields.Label=Specify database fields
+RedshiftBulkLoaderDialog.TransformName.Label=Transform name
+RedshiftBulkLoaderDialog.TargetSchema.Label=Target schema
+RedshiftBulkLoaderDialog.TargetTable.Label=Target table
+RedshiftBulkLoaderMeta.CheckResult.AllFieldsFound=All fields in the table are found in the input stream, coming from previous transforms
+RedshiftBulkLoaderMeta.CheckResult.AllFieldsFoundInOutput=All fields, coming from previous transforms, are found in the output table
+RedshiftBulkLoaderMeta.CheckResult.ConnectionExists=Connection exists
+RedshiftBulkLoaderMeta.CheckResult.ConnectionOk=Connection to database OK
+RedshiftBulkLoaderMeta.CheckResult.ExpectedInputError=No input received from other transforms\!
+RedshiftBulkLoaderMeta.CheckResult.ExpectedInputOk=Transform is receiving info from other transforms.
+RedshiftBulkLoaderMeta.CheckResult.FieldsNotFound=Fields in table, not found in input stream\:\n\n{0}
+RedshiftBulkLoaderMeta.CheckResult.FieldsNotFoundInOutput=Fields in input stream, not found in output table\:\n\n{0}
+RedshiftBulkLoaderMeta.CheckResult.FieldsReceived=Transform is connected to previous one, receiving {0} fields
+RedshiftBulkLoaderMeta.CheckResult.FieldsSpecifiedNotFound=Specified fields not found in input stream\:\n\n{0}
+RedshiftBulkLoaderMeta.CheckResult.FieldsSpecifiedNotInTable=Specified table fields not found in output table\:\n\n{0}
+RedshiftBulkLoaderMeta.CheckResult.NoConnection=Please select or create a connection to use
+RedshiftBulkLoaderMeta.CheckResult.NoFields=Couldn''t find fields from previous transforms, check the hops...\!
+RedshiftBulkLoaderMeta.CheckResult.NoTableName=No table name was entered in this transform.
+RedshiftBulkLoaderMeta.CheckResult.TableAccessible=Table [{0}] exists and is accessible
+RedshiftBulkLoaderMeta.CheckResult.TableError=Table [{0}] doesn''t exist or can''t be read on this database connection.
+RedshiftBulkLoaderMeta.CheckResult.TableNotAccessible=Couldn''t read the table info, please check the table name & permissions.
+RedshiftBulkLoaderMeta.CheckResult.TableOk=Table [{0}] is readable and we got the fields from it.
+RedshiftBulkLoaderMeta.CheckResult.UndefinedError=An error occurred\: {0}
+RedshiftBulkLoaderMeta.Error.ErrorConnecting=I was unable to connect to the database to verify the status of the table\: {0}
+RedshiftBulkLoaderMeta.Error.NoConnection=There is no connection defined in this transform.
+RedshiftBulkLoaderMeta.Error.NoInput=Not receiving any fields from previous transforms. Check the previous transforms for errors & the connecting hops.
+RedshiftBulkLoaderMeta.Error.NoTable=No table is defined on this connection.
+RedshiftBulkLoaderMeta.Exception.ConnectionNotDefined=Unable to determine the required fields because the database connection wasn''t defined.
+RedshiftBulkLoaderMeta.Exception.ErrorGettingFields=Unable to determine the required fields.
+RedshiftBulkLoaderMeta.Exception.TableNotFound=Unable to determine the required fields because the specified database table couldn''t be found.
+RedshiftBulkLoaderMeta.Exception.TableNotSpecified=Unable to determine the required fields because the database table name wasn''t specified.
+RedshiftBulkLoader.Injection.CONNECTIONNAME=The name of the database connection to get table names from.
+RedshiftBulkLoader.Injection.FIELDS=Fields
+RedshiftBulkLoader.Injection.SCHEMANAME=The name of the database schema to use.
+RedshiftBulkLoader.Injection.TABLENAME=The name of the table to insert records into.
+RedshiftBulkLoader.Injection.MAIN_OPTIONS=Main Options
+RedshiftBulkLoader.Injection.DIRECT=Set this option to insert data into the Read Optimized Store with a COPY DIRECT statement.
+RedshiftBulkLoader.Injection.ABORTONERROR=Set this option to abort and rollback data loading upon an error.
+RedshiftBulkLoader.Injection.EXCEPTIONSFILENAME=The optional filename to write messages about rejected records.
+RedshiftBulkLoader.Injection.REJECTEDDATAFILENAME=The optional filename to write the rejected rows of data.
+RedshiftBulkLoader.Injection.STREAMNAME=The optional name of the stream, which appears in the vt_load_streams table.
+RedshiftBulkLoader.Injection.DATABASE_FIELDS=Database Fields
+RedshiftBulkLoader.Injection.FIELDSTREAM=The source field names containing the values to insert.
+RedshiftBulkLoader.Injection.FIELDDATABASE=The target field names to insert into the Redshift table.
+RedshiftBulkLoaderDialog.TruncateTable.Label=Truncate table
+RedshiftBulkLoaderDialog.OnlyWhenHaveRows.Label=Truncate on first row
+RedshiftBulkLoader.Injection.TruncateTable.Field=Truncate table
+RedshiftBulkLoader.Injection.OnlyWhenHaveRows.Field=Truncate on first row
\ No newline at end of file
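Editor's note on the bundle above: these keys are resolved at runtime through Hop's i18n lookup rather than read directly. A minimal sketch of that lookup, assuming the standard org.apache.hop.i18n.BaseMessages API and a PKG marker class (the class name below is a plausible assumption, not confirmed by this diff; the bundle lives next to that class's package):

    // Sketch only: PKG is whichever class sits in the package that holds
    // the messages_en_US.properties bundle above (assumed Meta class here).
    private static final Class<?> PKG = RedshiftBulkLoaderMeta.class;

    // Plain key lookup:
    String title = BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.DialogTitle");

    // Keys containing {0} placeholders take varargs that MessageFormat
    // substitutes, which is why literal apostrophes are doubled ('') in
    // those values:
    String missing =
        BaseMessages.getString(PKG, "RedshiftBulkLoaderDialog.FailedToFindField.Message", fieldName);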
diff --git a/plugins/tech/aws/src/main/resources/redshift.svg b/plugins/tech/aws/src/main/resources/redshift.svg
new file mode 100644
index 00000000000..87f05da8eae
--- /dev/null
+++ b/plugins/tech/aws/src/main/resources/redshift.svg
@@ -0,0 +1,9 @@
+
diff --git a/plugins/transforms/redshift/pom.xml b/plugins/transforms/redshift/pom.xml
new file mode 100644
index 00000000000..5a1bbe46601
--- /dev/null
+++ b/plugins/transforms/redshift/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.hop</groupId>
+    <artifactId>hop-plugins-transforms</artifactId>
+    <version>2.7.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>hop-transform-redshift-bulkloader</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Hop Plugins Transforms Redshift Bulk Loader</name>
+
+  <properties>
+    <redshift.jdbc.version>2.1.0.19</redshift.jdbc.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.amazon.redshift</groupId>
+      <artifactId>redshift-jdbc42</artifactId>
+      <version>${redshift.jdbc.version}</version>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
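Editor's note on the module pom: with this file in place, a quick way to confirm that the pinned driver resolves through the ${redshift.jdbc.version} property is Maven's dependency tree (a usage sketch assuming a standard checkout; the module path is taken from the diff):

    mvn -pl plugins/transforms/redshift dependency:tree -Dincludes=com.amazon.redshift

The output should list com.amazon.redshift:redshift-jdbc42:jar:2.1.0.19, matching the version property declared above and in the aws assembly.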