feat(app): fix cpu bug
MorvanZhou committed Aug 28, 2024
1 parent 4b9aa43 commit c6a019a
Showing 5 changed files with 86 additions and 56 deletions.
4 changes: 2 additions & 2 deletions src/retk/core/ai/llm/knowledge/system_extend.md
@@ -10,8 +10,8 @@
 Please note:
 
 1. The result you return must be in the JSON format shown in the examples below.
-2. Each time you generate a JSON result, the generated values should follow my primary language; for example, if the information I present is in English, generate English content, and if it is mainly in Chinese, generate Chinese content.
+2. The language used in the returned result should match the language of the information I present. For example, if the presented content is in Chinese, the values in the returned result should also be in Chinese; if the presented content is in English, the values should also be in English.
 
 # Example 1:

13 changes: 10 additions & 3 deletions src/retk/core/files/upload.py
@@ -1,5 +1,6 @@
 import datetime
 import io
+import ssl
 from typing import List, Tuple, Optional
 
 import httpx
@@ -141,8 +142,13 @@ async def vditor_upload(au: AuthedUser, files: List[UploadFile]) -> dict:
 
 
 # pylint: disable=line-too-long
-async def fetch_image_vditor(au: AuthedUser, url: str, count=0, referer: str = "", user_agent: str = "") -> Tuple[
-    str, const.CodeEnum]:
+async def fetch_image_vditor(
+        au: AuthedUser,
+        url: str,
+        count=0,
+        referer: str = "",
+        user_agent: str = ""
+) -> Tuple[str, const.CodeEnum]:
     if count > 2:
         logger.debug(f"too many 30X code, failed to get {url}")
         return "", const.CodeEnum.FILE_OPEN_ERROR
@@ -169,7 +175,8 @@ async def fetch_image_vditor(au: AuthedUser, url: str, count=0, referer: str = "
             RuntimeError,
             httpx.ConnectError,
             httpx.ReadTimeout,
-            httpx.HTTPError
+            httpx.HTTPError,
+            ssl.SSLError,
     ) as e:
         logger.debug(f"failed to get {url}: {e}")
         return "", const.CodeEnum.FILE_OPEN_ERROR
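
For context on the new `ssl.SSLError` entry: `ssl.SSLError` derives from `OSError`, not from `httpx.HTTPError`, so an SSL handshake or certificate failure that escapes httpx's own error wrapping would previously have crashed the fetch instead of returning `FILE_OPEN_ERROR`. A minimal standalone sketch of the pattern (the URL and helper name are made up):

```python
import asyncio
import ssl

import httpx


async def fetch_bytes(url: str) -> bytes:
    # ssl.SSLError is not part of the httpx.HTTPError hierarchy, so it
    # needs its own entry in the except tuple to be handled gracefully.
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            return resp.content
    except (httpx.HTTPError, ssl.SSLError) as e:
        print(f"failed to get {url}: {e}")
        return b""


if __name__ == "__main__":
    asyncio.run(fetch_bytes("https://example.com/image.png"))
```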
119 changes: 71 additions & 48 deletions src/retk/core/scheduler/tasks/extend_node.py
@@ -21,6 +21,58 @@ def deliver_unscheduled_extend_nodes():
     return res
 
 
+async def get_cases(db, batch: List[NodeExtendQueue]) -> List[knowledge.ExtendCase]:
+    cases: List[knowledge.ExtendCase] = []
+
+    nid2item = {item["nid"]: item for item in batch}
+
+    nodes = await db[CollNameEnum.nodes.value].find({"id": {"$in": list(nid2item.keys())}}).to_list(None)
+    for node in nodes:
+        if node is None:
+            continue
+        item = nid2item[node["id"]]
+        cases.append(
+            knowledge.ExtendCase(
+                _id=item["_id"],
+                uid=item["uid"],
+                nid=item["nid"],
+                summary_service=item["summaryService"],
+                summary_model=item["summaryModel"],
+                extend_service=item["extendService"],
+                extend_model=item["extendModel"],
+                md=node["md"],
+            )
+        )
+    return cases
+
+
+async def update_extended_nodes(db, case: knowledge.ExtendCase):
+    ext = ExtendedNode(
+        uid=case.uid,
+        sourceNid=case.nid,
+        sourceMd=case.md,
+        extendMd=case.extend_md,
+        extendSearchTerms=case.extend_search_terms,
+    )
+    if config.is_local_db():
+        doc = await db[CollNameEnum.llm_extended_node.value].find_one(
+            {"uid": case.uid, "sourceNid": case.nid}
+        )
+        if doc is None:
+            await db[CollNameEnum.llm_extended_node.value].insert_one(ext)
+        else:
+            await db[CollNameEnum.llm_extended_node.value].update_one(
+                {"uid": case.uid, "sourceNid": case.nid},
+                {"$set": ext}
+            )
+    else:
+        await db[CollNameEnum.llm_extended_node.value].update_one(
+            {"uid": case.uid, "sourceNid": case.nid},
+            {"$set": ext},
+            upsert=True
+        )
+
+
 async def async_deliver_unscheduled_extend_nodes() -> str:
     _, db = init_mongo(connection_timeout=5)
     batch_size = 40
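
A note on the two helpers extracted above: `get_cases` replaces the old one-`find_one`-per-item loop with a single `$in` query, so each batch costs one database round-trip instead of up to forty, and `update_extended_nodes` only passes `upsert=True` on the non-local path, presumably because the local Mongita backend does not implement upserts; the local branch emulates one with `find_one` plus `insert_one`/`update_one`. A self-contained sketch of that emulation (hypothetical names; note the two-step variant is not atomic, which is tolerable for a single-process local store):

```python
async def upsert(coll, filter_: dict, doc: dict, supports_upsert: bool) -> None:
    # Real MongoDB: one atomic round-trip does it all.
    if supports_upsert:
        await coll.update_one(filter_, {"$set": doc}, upsert=True)
        return
    # Fallback for backends without upsert support: check, then write.
    # Not atomic -- a concurrent writer could slip between the two calls.
    if await coll.find_one(filter_) is None:
        await coll.insert_one(doc)
    else:
        await coll.update_one(filter_, {"$set": doc})
```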
@@ -30,28 +82,12 @@ async def async_deliver_unscheduled_extend_nodes() -> str:
     while True:
         done_id_list = []
         batch: List[NodeExtendQueue] = await db[CollNameEnum.llm_extend_node_queue.value].find().limit(
-            batch_size).to_list(None)
+            batch_size
+        ).to_list(None)
         if len(batch) == 0:
             break
-        cases: List[knowledge.ExtendCase] = []
         req_id = "".join([str(random.randint(0, 9)) for _ in range(10)])
-
-        for item in batch:
-            node = await db[CollNameEnum.nodes.value].find_one({"id": item["nid"]})
-            if node is None:
-                continue
-            cases.append(
-                knowledge.ExtendCase(
-                    _id=item["_id"],
-                    uid=item["uid"],
-                    nid=item["nid"],
-                    summary_service=item["summaryService"],
-                    summary_model=item["summaryModel"],
-                    extend_service=item["extendService"],
-                    extend_model=item["extendModel"],
-                    md=node["md"],
-                )
-            )
+        cases = await get_cases(db, batch)
 
         s0 = time.perf_counter()
         cases = await knowledge.batch_summary(
@@ -72,46 +108,33 @@ async def async_deliver_unscheduled_extend_nodes() -> str:
                 uid=case.uid,
                 type_=const.user_behavior_types.UserBehaviorTypeEnum.LLM_KNOWLEDGE_RESPONSE,
                 remark=json.dumps(
-                    {"md": case.stripped_md, "summary": case.summary, "extend": case.extend_md},
+                    {
+                        "md": case.stripped_md,
+                        "summary": case.summary,
+                        "summaryService": case.summary_service,
+                        "summaryModel": case.summary_model,
+                        "extend": case.extend_md,
+                        "extendService": case.extend_service,
+                        "extendModel": case.extend_model,
+                    },
                     ensure_ascii=False
                 ),
             )
             done_id_list.append(case._id)
             if case.summary_code != const.CodeEnum.OK or case.extend_code != const.CodeEnum.OK:
                 continue
-            ext = ExtendedNode(
-                uid=case.uid,
-                sourceNid=case.nid,
-                sourceMd=case.md,
-                extendMd=case.extend_md,
-                extendSearchTerms=case.extend_search_terms,
-            )
-            if config.is_local_db():
-                doc = await db[CollNameEnum.llm_extended_node.value].find_one(
-                    {"uid": case.uid, "sourceNid": case.nid}
-                )
-                if doc is None:
-                    await db[CollNameEnum.llm_extended_node.value].insert_one(ext)
-                else:
-                    await db[CollNameEnum.llm_extended_node.value].update_one(
-                        {"uid": case.uid, "sourceNid": case.nid},
-                        {"$set": ext}
-                    )
-            else:
-                await db[CollNameEnum.llm_extended_node.value].update_one(
-                    {"uid": case.uid, "sourceNid": case.nid},
-                    {"$set": ext},
-                    upsert=True
-                )
+            await update_extended_nodes(db, case)
 
         total_summary_time += s1 - s0
         total_extend_time += e1 - e0
 
-        if len(done_id_list) > 0:
-            res = await db[CollNameEnum.llm_extend_node_queue.value].delete_many({"_id": {"$in": done_id_list}})
-            total_success_count += res.deleted_count
+        # remove the batch
+        await db[CollNameEnum.llm_extend_node_queue.value].delete_many(
+            {"_id": {"$in": [b["_id"] for b in batch]}}
+        )
 
-    if total_success_count > 0:
+    if len(done_id_list) > 0:
         logger.info(
             f"llm extend knowledge task: "
             f"avg_summary_time: {total_summary_time / total_success_count:.2f}s, "
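
The changed cleanup at the end of the loop is plausibly the "cpu bug" of the commit title: previously only the ids in `done_id_list` were deleted, so a queue entry whose source node no longer existed never produced a case, was never removed, and the unbounded `while True` kept re-fetching the same batch at full speed. Deleting every `_id` in the fetched batch guarantees the loop drains the queue and terminates. A compressed sketch of the old failure mode (`queue` and `process` are hypothetical):

```python
from typing import Any, Dict, List


async def drain_queue(queue, process) -> None:
    while True:
        batch: List[Dict[str, Any]] = await queue.find().limit(40).to_list(None)
        if len(batch) == 0:
            break
        done_ids = [b["_id"] for b in batch if await process(b)]
        # Old cleanup: only successful ids were removed. One permanently
        # failing item means the same batch is re-fetched forever (busy loop).
        await queue.delete_many({"_id": {"$in": done_ids}})
        # The fix deletes the whole batch instead, so progress is guaranteed:
        # await queue.delete_many({"_id": {"$in": [b["_id"] for b in batch]}})
```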
4 changes: 2 additions & 2 deletions src/retk/core/statistic.py
@@ -13,7 +13,7 @@
 
 
 async def __write_new(path: Path, lock: asyncio.Lock):
-    async with aiofiles.open(path, "w") as f:
+    async with aiofiles.open(path, "w", encoding="utf-8") as f:
         async with lock:
             await f.write("")

@@ -38,7 +38,7 @@ async def add_user_behavior(
         current_log_file.rename(backup_file)
         await __write_new(current_log_file, lock)
 
-    async with aiofiles.open(current_log_file, "a") as f:
+    async with aiofiles.open(current_log_file, "a", encoding="utf-8") as f:
         record = {
             "time": time_now.strftime('%Y-%m-%d %H:%M:%S'),
             "uid": uid,
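
The `encoding="utf-8"` additions matter because `aiofiles.open` mirrors the built-in `open`, whose default text encoding comes from the locale (e.g. cp1252 or gbk on some Windows setups); appending behavior records that contain non-ASCII text could then raise `UnicodeEncodeError`. A small standalone illustration (the file name is hypothetical):

```python
import asyncio

import aiofiles


async def main() -> None:
    # Without encoding="utf-8", the platform locale decides the encoding,
    # and non-ASCII content can fail to encode on some systems.
    async with aiofiles.open("behavior.log", "a", encoding="utf-8") as f:
        await f.write('{"remark": "非 ASCII 内容"}\n')


asyncio.run(main())
```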
2 changes: 1 addition & 1 deletion src/retk/models/client.py
@@ -23,7 +23,7 @@
     pass
 
 
-def init_mongo(connection_timeout: int):
+def init_mongo(connection_timeout: int) -> Union["AsyncIOMotorClient", MongitaClientDisk]:
     conf = config.get_settings()
     if config.is_local_db():
         if not conf.RETHINK_LOCAL_STORAGE_PATH.exists():
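
On the new return annotation: the quoted `"AsyncIOMotorClient"` suggests the motor client is imported only for type checking, so motor stays an optional dependency when running against the local Mongita store. A plausible arrangement (the import block of client.py is collapsed above, so this is an assumption):

```python
from typing import TYPE_CHECKING, Union

from mongita import MongitaClientDisk

if TYPE_CHECKING:
    # Imported only for the annotation; motor need not be installed
    # when the app uses the local Mongita backend.
    from motor.motor_asyncio import AsyncIOMotorClient


def init_mongo(connection_timeout: int) -> Union["AsyncIOMotorClient", MongitaClientDisk]:
    ...
```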
