Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh committed Dec 17, 2024
1 parent 977ad70 commit 0ad00f6
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
var factory = JdbcUtils.getDialectFactory(jdbcUrl);
this.config = config;
try {
conn = JdbcUtils.getConnection(config.getJdbcUrl());
conn =
JdbcUtils.getConnection(
config.getJdbcUrl(), config.getUser(), config.getPassword());
// Table schema has been validated before, so we get the PK from it directly
this.pkColumnNames = tableSchema.getPrimaryKeys();
// column name -> java.sql.Types
Expand Down Expand Up @@ -179,7 +181,11 @@ public boolean write(Iterable<SinkRow> rows) {
conn.close();

// create a new connection if the current connection is invalid
conn = JdbcUtils.getConnection(config.getJdbcUrl());
conn =
JdbcUtils.getConnection(
config.getJdbcUrl(),
config.getUser(),
config.getPassword());
// reset the flag since we will retry to prepare the batch again
updateFlag = false;
jdbcStatements = new JdbcStatements(conn, config.getQueryTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,9 @@ public void validate(
Set<String> jdbcPks = new HashSet<>();
Set<String> jdbcTableNames = new HashSet<>();

java.util.Properties info = new java.util.Properties();
if (config.getUser() != null) {
info.put("user", config.getUser());
}
if (config.getPassword() != null) {
info.put("password", config.getPassword());
}

try (Connection conn = DriverManager.getConnection(jdbcUrl, info);
try (Connection conn =
DriverManager.getConnection(
jdbcUrl, config.getUser(), config.getPassword());
ResultSet tableNamesResultSet =
conn.getMetaData().getTables(null, schemaName, "%", null);
ResultSet columnResultSet =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public static Optional<JdbcDialectFactory> getDialectFactory(String jdbcUrl) {
}

/** The connection returned by this method is *not* autoCommit */
public static Connection getConnection(String jdbcUrl) throws SQLException {
public static Connection getConnection(String jdbcUrl, String user, String password)
throws SQLException {
var props = new Properties();
// enable TCP keep alive to avoid connection closed by server
// both MySQL and PG support this property
Expand All @@ -55,6 +56,12 @@ public static Connection getConnection(String jdbcUrl) throws SQLException {
int socketTimeout = isPg ? SOCKET_TIMEOUT : SOCKET_TIMEOUT * 1000;
props.setProperty("connectTimeout", String.valueOf(connectTimeout));
props.setProperty("socketTimeout", String.valueOf(socketTimeout));
if (user != null) {
props.put("user", user);
}
if (password != null) {
props.put("password", password);
}

var conn = DriverManager.getConnection(jdbcUrl, props);
// disable auto commit can improve performance
Expand Down

0 comments on commit 0ad00f6

Please sign in to comment.