Skip to content

Commit

Permalink
Bring workflowlogging in line with pipeline logging
Browse files Browse the repository at this point in the history
  • Loading branch information
hansva committed Mar 13, 2024
1 parent 35b80b9 commit 4979a94
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 30 deletions.
6 changes: 3 additions & 3 deletions assemblies/plugins/tech/hopserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
<parent>
<groupId>org.apache.hop</groupId>
<artifactId>hop-assemblies-plugins-tech</artifactId>
<version>2.8.0-SNAPSHOT</version>
<version>2.9.0-SNAPSHOT</version>
</parent>

<artifactId>hop-assemblies-plugins-tech-hopserver</artifactId>
<version>2.8.0-SNAPSHOT</version>
<version>2.9.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Hop Assemblies Plugins Hop Server</name>
<description />
Expand All @@ -39,7 +39,7 @@
<dependency>
<groupId>org.apache.hop</groupId>
<artifactId>hop-plugins-tech-hopserver</artifactId>
<version>2.8.0-SNAPSHOT</version>
<version>2.9.0-SNAPSHOT</version>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion plugins/tech/hopserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

<groupId>org.apache.hop</groupId>
<artifactId>hop-plugins-tech-hopserver</artifactId>
<version>2.8.0-SNAPSHOT</version>
<version>2.9.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Hop Plugins Technology Google</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,12 @@ private void logPipeline(
rootNode.put("name", pipelineMeta.getName());
rootNode.put("description", pipelineMeta.getDescription());
rootNode.put("filename", pipelineMeta.getFilename());
rootNode.put("channelId", channel.getLogChannelId());
rootNode.put("workflowPipelineLogChannelId", channel.getLogChannelId());
rootNode.put("status", pipelineStatus);
rootNode.put("lastLoggingLineNr", lastNrInLogStore);
rootNode.put("log", HopLogStore.getAppender()
rootNode.put(
"log",
HopLogStore.getAppender()
.getBuffer(channel.getLogChannelId(), false, 0, lastNrInLogStore)
.toString());
if (parent != null) {
Expand Down Expand Up @@ -198,7 +200,8 @@ private void logPipeline(
transformNode.put("pluginId", transformMeta.getPluginId());
transformNode.put("copies", transformMeta.getCopies(pipeline));
transformNode.put(
"channelId", transformMetaDataCombi.transform.getLogChannel().getLogChannelId());
"actionTransformChannelId",
transformMetaDataCombi.transform.getLogChannel().getLogChannelId());
transformNode.put("copy", transformMetaDataCombi.copy);
transformNode.put("status", transformMetaDataCombi.transform.getStatus().getDescription());
transformNode.put("errors", transformMetaDataCombi.transform.getErrors());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.extension.ExtensionPoint;
import org.apache.hop.core.extension.IExtensionPoint;
Expand All @@ -33,6 +35,8 @@
import org.apache.hop.hopserver.util.HopServerUtils;
import org.apache.hop.workflow.ActionResult;
import org.apache.hop.workflow.WorkflowMeta;
import org.apache.hop.workflow.action.ActionMeta;
import org.apache.hop.workflow.action.IAction;
import org.apache.hop.workflow.engine.IWorkflowEngine;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
Expand Down Expand Up @@ -77,24 +81,44 @@ public void callExtensionPoint(

try {

logWorkflow(log, workflow, endpoint);
logWorkflow(log, workflow, endpoint, "Initializing");

// Periodic logging
final Timer timer = new Timer();
TimerTask timerTask =
new TimerTask() {
@Override
public void run() {
try {
logWorkflow(log, workflow, endpoint, "Running");
} catch (Exception e) {
throw new RuntimeException(
"Unable to do interval logging for Workflow Log object", e);
}
}
};
timer.schedule(timerTask, 5 * 1000L, 5 * 1000L);

workflow.addWorkflowFinishedListener(
workflowMetaIWorkflowEngine -> {
timer.cancel();
workflow.getExtensionDataMap().put(WORKFLOW_END_DATE, new Date());
logWorkflow(log, workflow, endpoint);
logWorkflow(log, workflow, endpoint, "Finished");
});

} catch (Exception e) {
// Let's not kill the workflow just yet, just log the error
// otherwise: throw new HopException(...)
//
log.logError("Error logging to Neo4j:", e);
log.logError("Error logging to Hop Server:", e);
}
}

private void logWorkflow(
final ILogChannel log, final IWorkflowEngine<WorkflowMeta> workflow, String endpoint)
final ILogChannel log,
final IWorkflowEngine<WorkflowMeta> workflow,
final String endpoint,
final String workflowStatus)
throws HopException {
final WorkflowMeta workflowMeta = workflow.getWorkflowMeta();
final ILogChannel channel = workflow.getLogChannel();
Expand All @@ -108,48 +132,100 @@ private void logWorkflow(
rootNode.put("executionId", HopServerUtils.getExecutionId());
rootNode.put("type", EXECUTION_TYPE_WORKFLOW);
rootNode.put("name", workflowMeta.getName());
rootNode.put("status", workflowStatus);
rootNode.put("description", workflowMeta.getDescription());
rootNode.put("filename", workflowMeta.getFilename());
rootNode.put("channelId", channel.getLogChannelId());
if(parent != null){
rootNode.put("workflowPipelineLogChannelId", channel.getLogChannelId());
if (parent != null) {
rootNode.put("parentChannelId", parent.getLogChannelId());
}
rootNode.put(
"startDate",
new SimpleDateFormat(DATE_FORMAT)
.format(workflow.getExtensionDataMap().get(WORKFLOW_START_DATE)));
"startDate",
new SimpleDateFormat(DATE_FORMAT)
.format(workflow.getExtensionDataMap().get(WORKFLOW_START_DATE)));
if (workflow.getExtensionDataMap().get(WORKFLOW_END_DATE) != null) {
rootNode.put(
"endDate",
new SimpleDateFormat(DATE_FORMAT)
.format(workflow.getExtensionDataMap().get(WORKFLOW_END_DATE)));
"endDate",
new SimpleDateFormat(DATE_FORMAT)
.format(workflow.getExtensionDataMap().get(WORKFLOW_END_DATE)));
}
rootNode.put("logDate", new SimpleDateFormat(DATE_FORMAT).format(new Date()));

// Action information
ArrayNode ActionNodes = mapper.createArrayNode();

// Get Active Action Meta
for (ActionMeta actionMeta : workflow.getActiveActions()) {
ObjectNode actionNode = mapper.createObjectNode();
IAction action = actionMeta.getAction();
String ActionLoggingText =
HopLogStore.getAppender()
.getBuffer(action.getLogChannel().getLogChannelId(), false)
.toString();
actionNode.put("type", EXECUTION_TYPE_ACTION);
actionNode.put("name", action.getName());
actionNode.put("pluginId", action.getPluginId());
actionNode.put("description", action.getDescription());
actionNode.put("actionTransformChannelId", action.getLogChannel().getLogChannelId());
actionNode.put("status", "Running");
actionNode.put("log", ActionLoggingText);
actionNode.put(
"startDate",
new SimpleDateFormat(DATE_FORMAT)
.format(workflow.getExtensionDataMap().get(WORKFLOW_START_DATE)));
if (workflow.getExtensionDataMap().get(WORKFLOW_END_DATE) != null) {
actionNode.put(
"endDate",
new SimpleDateFormat(DATE_FORMAT)
.format(workflow.getExtensionDataMap().get(WORKFLOW_END_DATE)));
}
actionNode.put("logDate", new SimpleDateFormat(DATE_FORMAT).format(new Date()));
ActionNodes.add(actionNode);
}

// Add finished Actions
for (ActionResult actionResult : workflow.getActionResults()) {
ObjectNode actionNode = mapper.createObjectNode();
// Find Action Meta
ActionMeta foundActionMeta =
workflowMeta.getActions().stream()
.filter(actionMeta -> actionResult.getActionName().equals(actionMeta.getName()))
.findFirst()
.orElse(null);
IAction action = foundActionMeta.getAction();

String ActionLoggingText =
HopLogStore.getAppender().getBuffer(actionResult.getLogChannelId(), false).toString();

actionNode.put("type", EXECUTION_TYPE_ACTION);
actionNode.put("name", actionResult.getActionName());
actionNode.put("description", actionResult.getComment());
//actionNode.put("pluginId", "");
actionNode.put("channelId", actionResult.getLogChannelId());
//actionNode.put("copy", "");
//actionNode.put("status", "");
actionNode.put("pluginId", action.getPluginId());
actionNode.put("description", action.getDescription());
actionNode.put("actionTransformChannelId", actionResult.getLogChannelId());
if (actionResult.getResult().getNrErrors() > 0) {
actionNode.put("status", "Stopped");
} else {
actionNode.put("status", "Finished");
}
actionNode.put("errors", actionResult.getResult().getNrErrors());
actionNode.put("linesRead", actionResult.getResult().getNrLinesRead());
actionNode.put("linesWritten", actionResult.getResult().getNrLinesWritten());
actionNode.put("linesInput", actionResult.getResult().getNrLinesInput());
actionNode.put("linesOutput", actionResult.getResult().getNrLinesOutput());
actionNode.put("linesRejected", actionResult.getResult().getNrLinesRejected());
actionNode.put("log", ActionLoggingText);
actionNode.put("logDate", new SimpleDateFormat(DATE_FORMAT).format(new Date()));

actionNode.put(
"startDate",
new SimpleDateFormat(DATE_FORMAT)
.format(workflow.getExtensionDataMap().get(WORKFLOW_START_DATE)));
if (workflow.getExtensionDataMap().get(WORKFLOW_END_DATE) != null) {
actionNode.put(
"endDate",
new SimpleDateFormat(DATE_FORMAT)
.format(workflow.getExtensionDataMap().get(WORKFLOW_END_DATE)));
}
actionNode.put(
"logDate", new SimpleDateFormat(DATE_FORMAT).format(actionResult.getLogDate()));
actionNode.put("durationMs", actionResult.getResult().getElapsedTimeMillis());
ActionNodes.add(actionNode);
}

Expand All @@ -166,16 +242,16 @@ private void logWorkflow(
// Execute and get the response.
HttpResponse response = httpclient.execute(httppost);

if (response.getStatusLine().getStatusCode() != 200) {
if (response.getStatusLine().getStatusCode() >= 400) {
log.logError(
"The server returned a non status 200 code : "
"The server returned an error status code : "
+ response.getStatusLine().getStatusCode()
+ " - "
+ response.getStatusLine().getReasonPhrase());
}

} catch (Exception e) {

log.logError("Unexpected error occurred: ", e);
}
}
}

0 comments on commit 4979a94

Please sign in to comment.