Skip to content

Commit

Permalink
feat: manually mirror eino's code from bytedance
Browse files Browse the repository at this point in the history
  • Loading branch information
BytePender committed Dec 6, 2024
1 parent 72e2843 commit a4aaa08
Show file tree
Hide file tree
Showing 155 changed files with 24,554 additions and 14 deletions.
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,17 @@
# .github
# Eino

English | [中文](README.zh_CN.md)

## Overview


## Security

If you discover a potential security issue in this project, or think you may
have discovered a security issue, we ask that you notify Bytedance Security via our [security center](https://security.bytedance.com/src) or [vulnerability reporting email]([email protected]).

Please do **not** create a public GitHub issue.

## License

This project is licensed under the [Apache-2.0 License](LICENSE.txt).
54 changes: 54 additions & 0 deletions README.zh_CN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Eino

[English](README.md) | 中文


## 概括

Eino['aino] (近似音: i know) 旨在提供 Golang 语言的 AI 应用开发框架。 Eino 参考了开源社区中诸多优秀的 AI 应用开发框架,例如 LangChain、LangGraph、LlamaIndex 等,提供了更符合 Golang 编程习惯的 AI 应用开发框架。

Eino 提供了丰富的辅助AI应用开发的原子组件、集成组件、组件编排、切面扩展等能力,可以帮助开发者更加简单便捷地开发出架构清晰、易维护、高可用的AI应用。

## 框架特点

- **丰富的组件**

将多场景普遍使用的能力,抽象成可独立使用、可编排使用的组件,开箱即用。例如 ChatModel、PromptTemplate、Retriever、Loader 等。

组件又可细分为:功能不可细拆的原子组件、由一到多中组件以某种范式组合而成的集成组件。

- **易用的图编排**

将各组件实例,作为图的节点,以图的点边关系连接,以边的方向逐步执行节点并传输数据流,将AI应用的逻辑以图的方式进行编排和执行。

图编排可极大简化 **并行****异步** 逻辑的开发,并优化其代码结构

- **完善的流处理**

根据输入、输出是否为流式,可划分成 4 种交互模式。 图编排可根据上下游节点的输入、输出是否是流,自动进行 流 和 非流 的转换,极大地方便开发者对AI应用提供流的能力

| 函数名 | 模式说明 |
|-----------|-----------|
| Invoke | 输入是非流、输出是非流 |
| Stream | 输入是非流、输出是流 |
| Collect | 输入是流、输出是非流 |
| Transform | 输入是流、输出是流 |

- **高扩展性的切面**

图编排为图、节点的执行前后提供切面的注入、执行机制。开发者可基于此机制,在不侵入主流程的前提下,灵活地设计和注入自己的切面能力。例如 Trace、埋点、日志等


## 详细文档

// TODO:链接用户手册等文档

## 安全

如果你在该项目中发现潜在的安全问题,或你认为可能发现了安全问题,请通过我们的[安全中心](https://security.bytedance.com/src)[漏洞报告邮箱]([email protected])通知字节跳动安全团队。

**不要**创建公开的 GitHub Issue。

## 开源许可证

本项目依据 [Apache-2.0 许可证](LICENSE.txt) 授权。
234 changes: 234 additions & 0 deletions callbacks/aspect_inject.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package callbacks

import (
"context"

"github.com/cloudwego/eino/schema"
)

// Fast inject callback input / output aspect for component developer
// e.g.
//
// func (t *testchatmodel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (resp *schema.Message, err error) {
// defer func() {
// if err != nil {
// callbacks.OnEnd(ctx, err)
// }
// }()
//
// ctx = callbacks.OnStart(ctx, &model.CallbackInput{
// Messages: input,
// Tools: nil,
// Extra: nil,
// })
//
// // do smt
//
// ctx = callbacks.OnEnd(ctx, &model.CallbackOutput{
// Message: resp,
// Extra: nil,
// })
//
// return resp, nil
// }
//
// OnStart invokes the OnStart logic for the particular context, ensuring that all registered
// handlers are executed in reverse order (compared to add order) when a process begins.
func OnStart(ctx context.Context, input CallbackInput) context.Context {
mgr, ok := managerFromCtx(ctx)
if !ok {
return ctx
}

for i := len(mgr.handlers) - 1; i >= 0; i-- {
handler := mgr.handlers[i]
timingChecker, ok := handler.(TimingChecker)
if !ok || timingChecker.Needed(ctx, mgr.runInfo, TimingOnStart) {
ctx = handler.OnStart(ctx, mgr.runInfo, input)
}
}

return ctx
}

// OnEnd invokes the OnEnd logic of the particular context, allowing for proper cleanup
// and finalization when a process ends.
// handlers are executed in normal order (compared to add order).
func OnEnd(ctx context.Context, output CallbackOutput) context.Context {
mgr, ok := managerFromCtx(ctx)
if !ok {
return ctx
}

for i := 0; i < len(mgr.handlers); i++ {
handler := mgr.handlers[i]
timingChecker, ok := handler.(TimingChecker)
if !ok || timingChecker.Needed(ctx, mgr.runInfo, TimingOnEnd) {
ctx = handler.OnEnd(ctx, mgr.runInfo, output)
}
}

return ctx
}

// OnStartWithStreamInput invokes the OnStartWithStreamInput logic of the particular context, ensuring that
// every input stream should be closed properly in handler.
// handlers are executed in reverse order (compared to add order).
func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) (
nextCtx context.Context, newStreamReader *schema.StreamReader[T]) {

mgr, ok := managerFromCtx(ctx)
if !ok {
return ctx, input
}

if len(mgr.handlers) == 0 {
return ctx, input
}

var neededHandlers []Handler
for i := range mgr.handlers {
h := mgr.handlers[i]
timingChecker, ok := h.(TimingChecker)
if !ok || timingChecker.Needed(ctx, mgr.runInfo, TimingOnStartWithStreamInput) {
neededHandlers = append(neededHandlers, h)
}
}

if len(neededHandlers) == 0 {
return ctx, input
}

cp := input.Copy(len(neededHandlers) + 1)
for i := len(neededHandlers) - 1; i >= 0; i-- {
h := neededHandlers[i]
ctx = h.OnStartWithStreamInput(ctx, mgr.runInfo, schema.StreamReaderWithConvert(cp[i], func(src T) (CallbackInput, error) {
return src, nil
}))
}

return ctx, cp[len(cp)-1]
}

// OnEndWithStreamOutput invokes the OnEndWithStreamOutput logic of the particular, ensuring that
// every input stream should be closed properly in handler.
// handlers are executed in normal order (compared to add order).
func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) (
nextCtx context.Context, newStreamReader *schema.StreamReader[T]) {

mgr, ok := managerFromCtx(ctx)
if !ok {
return ctx, output
}

if len(mgr.handlers) == 0 {
return ctx, output
}

var neededHandlers []Handler
for i := range mgr.handlers {
h := mgr.handlers[i]
timingChecker, ok := h.(TimingChecker)
if !ok || timingChecker.Needed(ctx, mgr.runInfo, TimingOnEndWithStreamOutput) {
neededHandlers = append(neededHandlers, h)
}
}

if len(neededHandlers) == 0 {
return ctx, output
}

cp := output.Copy(len(neededHandlers) + 1)
for i := 0; i < len(neededHandlers); i++ {
h := neededHandlers[i]
ctx = h.OnEndWithStreamOutput(ctx, mgr.runInfo, schema.StreamReaderWithConvert(cp[i], func(src T) (CallbackOutput, error) {
return src, nil
}))
}

return ctx, cp[len(cp)-1]
}

// OnError invokes the OnError logic of the particular, notice that error in stream will not represent here.
// handlers are executed in normal order (compared to add order).
func OnError(ctx context.Context, err error) context.Context {
mgr, ok := managerFromCtx(ctx)
if !ok {
return ctx
}

for i := 0; i < len(mgr.handlers); i++ {
handler := mgr.handlers[i]
timingChecker, ok := handler.(TimingChecker)
if !ok || timingChecker.Needed(ctx, mgr.runInfo, TimingOnError) {
ctx = handler.OnError(ctx, mgr.runInfo, err)
}
}

return ctx
}

// SwitchRunInfo updates the RunInfo in the context if a previous RunInfo already exists for that context.
func SwitchRunInfo(ctx context.Context, info *RunInfo) context.Context {
cbm, ok := managerFromCtx(ctx)
if !ok {
return ctx
}

return ctxWithManager(ctx, cbm.withRunInfo(info))
}

// InitCallbacks initializes a new context with the provided RunInfo and handlers.
// If successful, it returns a new context containing RunInfo and handlers; otherwise, it returns a context with a nil manager.
func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context {
mgr, ok := newManager(info, handlers...)
if ok {
return ctxWithManager(ctx, mgr)
}

return ctxWithManager(ctx, nil)
}

// Needed checks if any callback handlers exist in this context.
func Needed(ctx context.Context) bool {
_, cbmOK := managerFromCtx(ctx)
return cbmOK
}

// NeededForTiming checks if any callback handlers exist in this context that are needed for this specific timing.
func NeededForTiming(ctx context.Context, timing CallbackTiming) bool {
mgr, ok := managerFromCtx(ctx)
if !ok {
return false
}

if len(mgr.handlers) == 0 {
return false
}

for i := 0; i < len(mgr.handlers); i++ {
handler := mgr.handlers[i]
timingChecker, ok := handler.(TimingChecker)
if !ok || timingChecker.Needed(ctx, mgr.runInfo, timing) {
return true
}
}

return false
}
Loading

0 comments on commit a4aaa08

Please sign in to comment.