Skip to content

Commit

Permalink
feat: add cancel method for streaming jobs (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiasMTS authored Oct 8, 2024
1 parent 88bd89b commit 5694efe
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions dbt/adapters/risingwave/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Dict, Optional

import psycopg2
from dbt.adapters.contracts.connection import Connection
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.postgres.connections import (
PostgresConnectionManager,
Expand Down Expand Up @@ -135,6 +136,55 @@ def open(cls, connection):
connection.handle.cursor().execute("SET RW_IMPLICIT_FLUSH TO true")
return connection

def cancel(self, connection: Connection):
# index here references the column order in processlist output:
# (id, user, host, database, time, info)
INFO_COL_INDEX, PID_COL_INDEX, pid = -1, 0, None

if not (connection_name := connection.name):
logger.debug("No connection name found")
return

if not (creds := connection.credentials):
logger.debug("No credentials found")
return

db, schema, table = (
creds.database,
creds.schema,
connection_name.split(".")[-1],
)
model_pattern = f'"{db}"."{schema}"."{table}"'

try:
_, cursor = self.add_query("SHOW PROCESSLIST")
if not (processlist := cursor.fetchall()):
logger.debug("No process list found")
return
pid = next(
filter(
lambda p: model_pattern in str(p[INFO_COL_INDEX]),
processlist,
)
)[PID_COL_INDEX]

except StopIteration:
logger.debug(
f"no model pattern ({model_pattern}) found in processlist for name: '{connection_name}'"
)
return
except psycopg2.InterfaceError as exc:
if "already closed" in str(exc) or "Session not found" in str(exc):
logger.debug(f"Connection '{connection_name}' already closed")
return

logger.debug(f"Cancelling query '{connection_name}' ({pid})")
try:
self.add_query(f"KILL {pid}")
except Exception as exc:
logger.debug(f"Error while cancelling query: {exc}")
raise

# Disable transactions.
def add_begin_query(self, *args, **kwargs):
pass
Expand Down

0 comments on commit 5694efe

Please sign in to comment.