Skip to content

Commit

Permalink
[INLONG-1023][Doc] Optimize Manager plugins documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 committed Sep 29, 2024
1 parent 9c4b3bd commit debac93
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 51 deletions.
Binary file modified docs/development/extension_manager/img/inlong_plugin.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/development/extension_manager/img/plugin.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
79 changes: 55 additions & 24 deletions docs/development/extension_manager/inlong_manager_plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,63 +21,94 @@ This article is aimed at InLong-Manager plugin developers, trying to explain the

![](img/inlong_plugin.png)

- When Inlong Manager is deployed, plugins must be located under installation directory, then **Manager Process** will find the plugin jar and install the plugin automatically.
As shown in the figure, plugins mainly serve the workflows in InLong. Each task in the workflow corresponds to a listener queue, such as Init Mq corresponding to QueueOperateListener, Init Sink corresponding to SinkOperateListener, Init Sort corresponding to SortOperateListener, and Init Source corresponding to SourceOperateListener.

![](img/plugin_location.png)
When developers need to add a task to the workflow, they can add a Listener through the plugin and register the Listener to the task.

- As a developer, you can confirm your plugin be loaded successfully by searching logs below:
Below is an example of adding a TestListener process for the Init Sort task, mainly adding three files: TestListener, TestProcessPlugin, and plugin.yaml.

![](img/plugin_log.png)
![](img/plugin.png)

## Reference Demo
```java
@Slf4j
public class TestListener implements SortOperateListener {

- For helping all Inlong developers. We hava provide **manager-plugins** in Inlong Manager Module, which provide **FlinkSortProcessPlugin** as an example; or you can create **ProcessPlugin** as below;
@Override
public TaskEvent event() {
return TaskEvent.COMPLETE;
}

@Override
public boolean accept(WorkflowContext workflowContext) {
return true;
}

@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
log.info("Success execute test stream listener");
return ListenerResult.success();
}
}
```
TestListener implements SortOperateListener and overrides the listen method. When the execution reaches TestListener, it will print a line of log.

```java
public class EmptyProcessPlugin implements ProcessPlugin {
@Slf4j
public class TestProcessPlugin implements ProcessPlugin {

@Override
public Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
return new LinkedHashMap<>();
public List<SourceOperateListener> createSourceOperateListeners() {
return new LinkedList<>();
}

@Override
public Map<QueueOperateListener, EventSelector> createQueueOperateListeners() {
public List<SortOperateListener> createSortOperateListeners() {
List<SortOperateListener> listeners = new LinkedList<>();
listeners.add(new TestListener());
return listeners;
}

@Override
public Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
return new LinkedHashMap<>();
}

@Override
public Map<SortOperateListener, EventSelector> createSortOperateListeners() {
return ProcessPlugin.super.createSortOperateListeners();
public Map<QueueOperateListener, EventSelector> createQueueOperateListeners() {
return new LinkedHashMap<>();
}

@Override
public Map<SinkOperateListener, EventSelector> createSinkOperateListeners() {
return ProcessPlugin.super.createSinkOperateListeners();
return new LinkedHashMap<>();
}

}
```
TestProcessPlugin implements ProcessPlugin and overrides the createSortOperateListeners method. When the plugin is loaded, the Manager will load TestListener into the SortOperateListener queue. When the workflow executes to Init Sort, TestListener will be executed.

- **DataSourceOperateListener**,**QueueOperateListener**,**SortOperateListener**,**SinkOperateListener** are child_classes extended from **TaskEventListener**. Then **EventSelector** decides whether the listener should be triggered.

```java
public interface EventSelector {

boolean accept(WorkflowContext context);

}
```

- After developing you plugin, you should prepare plugin definition file in **Yaml**, and put it under resources/META-INF.

```yaml
name: example
name: test
description: example for manager plugin
javaVersion: 1.8
pluginClass: org.apache.inlong.manager.plugin.EmptyProcessPlugin
pluginClass: org.apache.inlong.manager.plugin.TestProcessPlugin
```
- When Inlong Manager is deployed, plugins must be located under installation directory, then **Manager Process** will find the plugin jar and install the plugin automatically.
![](img/plugin_location.png)
- As a developer, you can confirm your plugin be loaded successfully by searching logs below:
![](img/plugin_log.png)
- In this way, after executing the workflow, the following log will be printed, indicating that the plugin has been successfully executed.
![](img/workflow_plugin.png)
- To develop available Listeners , you can refer to the native Listeners in `org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory`

## Last but not Least
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -19,65 +19,96 @@ sidebar_position: 3

- Inlong Manager 插件机制如下图所示:

![](img/inlong_plugin.png)
![](img/inlong_plugin.png)

- 如图所示,插件需要被放置在安装路径之下, 当 Inlong Manager 进程启动时,会自动寻找插件编译的 jar 包并加载其中的代码
如图所示,插件主要服务于 InLong 中的工作流,工作流中的 task 对应一个 listener 队列,例如 Init Mq 对应 QueueOperateListener、Init Sink 对应 SinkOperateListener、Init Sort 对应 SortOperateListener、Init Source 对应 SourceOperateListener

![](img/plugin_location.png)

- 作为开发人员,当你看到下图所示的日志时,可以确认插件已经加载成功了:
当开发人员需要为工作流增加一个 task 时,就可以通过插件的方式添加 Listener,并将 Listener 注册到 task 中。

![](img/plugin_log.png)

## 参考 Demo
下面以为 Init Sort 这个 Task 添加一个 TestListener 流程为例,主要添加三个文件,TestListener、TestProcessPlugin、plugin.yaml。

- 为方便开发人员理解. 我们在 Inlong Manager 目录下增加了 ***manager-plugins** , 开发人员可参考 **FlinkSortProcessPlugin** 进行自己的插件开发; 或者按照如下的案例开发插件;
![](img/plugin.png)

```java
public class EmptyProcessPlugin implements ProcessPlugin {
@Slf4j
public class TestListener implements SortOperateListener {

@Override
public Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
return new LinkedHashMap<>();
public TaskEvent event() {
return TaskEvent.COMPLETE;
}

@Override
public Map<QueueOperateListener, EventSelector> createQueueOperateListeners() {
return new LinkedHashMap<>();
public boolean accept(WorkflowContext workflowContext) {
return true;
}

@Override
public Map<SortOperateListener, EventSelector> createSortOperateListeners() {
return ProcessPlugin.super.createSortOperateListeners();
public ListenerResult listen(WorkflowContext context) throws Exception {
log.info("Success execute test stream listener");
return ListenerResult.success();
}
}
```
TestListener 实现了 SortOperateListener,并重写了 listen 方法,当执行到 TestListener 时,将会打印一行日志。

```java
@Slf4j
public class TestProcessPlugin implements ProcessPlugin {

@Override
public Map<SinkOperateListener, EventSelector> createSinkOperateListeners() {
return ProcessPlugin.super.createSinkOperateListeners();
public List<SourceOperateListener> createSourceOperateListeners() {
return new LinkedList<>();
}

}
```
@Override
public List<SortOperateListener> createSortOperateListeners() {
List<SortOperateListener> listeners = new LinkedList<>();
listeners.add(new TestListener());
return listeners;
}

- **DataSourceOperateListener**,**QueueOperateListener**,**SortOperateListener**,**SinkOperateListener****TaskEventListener** 的子类, 分别负责源数据端,消息队列,sort 函数,目标数据端的初始化工作。 与 Listener 绑定的**EventSelector**决定该 Listener 是否在运行时被激活。
@Override
public Map<DataSourceOperateListener, EventSelector> createSourceOperateListeners() {
return new LinkedHashMap<>();
}

```java
public interface EventSelector {
@Override
public Map<QueueOperateListener, EventSelector> createQueueOperateListeners() {
return new LinkedHashMap<>();
}

boolean accept(WorkflowContext context);
@Override
public Map<SinkOperateListener, EventSelector> createSinkOperateListeners() {
return new LinkedHashMap<>();
}

}
```
TestProcessPlugin 实现了 ProcessPlugin,并重写了 createSortOperateListeners 方法,当插件被加载时,Manager 就会将 TestListener 加载到 SortOperateListener 的队列中,当工作流执行到 Init Sort 时,TestListener 就会被执行。

- 完成插件的开发工作后, 你需要编写对应的**Yaml**格式的插件定义文件, 将其放置在工程目录 resources/META-INF 下。

```yaml
name: example
name: test
description: example for manager plugin
javaVersion: 1.8
pluginClass: org.apache.inlong.manager.plugin.EmptyProcessPlugin
pluginClass: org.apache.inlong.manager.plugin.TestProcessPlugin
```
- 如果你不确定怎样开发一个可用的 Listener ,请参考`org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory`中原生 Listener 的逻辑。
- 如图所示,当插件开放完成后,插件需要被放置在安装路径之下, 当 Inlong Manager 进程启动时,会自动寻找插件编译的 jar 包并加载其中的代码。
![](img/plugin_location.png)
- 作为开发人员,当你看到下图所示的日志时,可以确认插件已经加载成功了:
![](img/plugin_log.png)
- 这样当执行工作流后就会打印以下日志,此时表示插件已经成功执行。
![](img/workflow_plugin.png)
- 如果你仍然不确定怎样开发一个可用的 Listener ,请参考`org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory`中原生 Listener 的逻辑。

## 写在最后

Expand Down

0 comments on commit debac93

Please sign in to comment.