From c6a019a81e8a34ff26f3dc8d1b05e69a968fd829 Mon Sep 17 00:00:00 2001 From: morvanzhou Date: Wed, 28 Aug 2024 22:08:44 +0800 Subject: [PATCH] feat(app): fix cpu bug --- .../core/ai/llm/knowledge/system_extend.md | 4 +- src/retk/core/files/upload.py | 13 +- src/retk/core/scheduler/tasks/extend_node.py | 119 +++++++++++------- src/retk/core/statistic.py | 4 +- src/retk/models/client.py | 2 +- 5 files changed, 86 insertions(+), 56 deletions(-) diff --git a/src/retk/core/ai/llm/knowledge/system_extend.md b/src/retk/core/ai/llm/knowledge/system_extend.md index 42cb8e9..53a7283 100644 --- a/src/retk/core/ai/llm/knowledge/system_extend.md +++ b/src/retk/core/ai/llm/knowledge/system_extend.md @@ -10,8 +10,8 @@ 请注意: 1. 你返回的结果必须为下面案例展示的 JSON 格式。 -2. 在每次生成 JSON 结果的时候,生成的 value 请遵循我的主语言,比如我展示的信息使用 English,那么就生成 English - 内容,若主要使用中文,那么就生成中文内容。 +2. 返回的结果中使用的语言请与我展示信息中的语言保持一致。比如展示内容为中文,则返回结果的 value 也为中文,如果展示内容为英文,则返回结果的 + value 也为英文。 # 案例 1: diff --git a/src/retk/core/files/upload.py b/src/retk/core/files/upload.py index 3e93ef1..60a4c1b 100644 --- a/src/retk/core/files/upload.py +++ b/src/retk/core/files/upload.py @@ -1,5 +1,6 @@ import datetime import io +import ssl from typing import List, Tuple, Optional import httpx @@ -141,8 +142,13 @@ async def vditor_upload(au: AuthedUser, files: List[UploadFile]) -> dict: # pylint: disable=line-too-long -async def fetch_image_vditor(au: AuthedUser, url: str, count=0, referer: str = "", user_agent: str = "") -> Tuple[ - str, const.CodeEnum]: +async def fetch_image_vditor( + au: AuthedUser, + url: str, + count=0, + referer: str = "", + user_agent: str = "" +) -> Tuple[str, const.CodeEnum]: if count > 2: logger.debug(f"too many 30X code, failed to get {url}") return "", const.CodeEnum.FILE_OPEN_ERROR @@ -169,7 +175,8 @@ async def fetch_image_vditor(au: AuthedUser, url: str, count=0, referer: str = " RuntimeError, httpx.ConnectError, httpx.ReadTimeout, - httpx.HTTPError + httpx.HTTPError, + ssl.SSLError, ) as e: logger.debug(f"failed to get 
{url}: {e}") return "", const.CodeEnum.FILE_OPEN_ERROR diff --git a/src/retk/core/scheduler/tasks/extend_node.py b/src/retk/core/scheduler/tasks/extend_node.py index 83bf37c..ac9fed5 100644 --- a/src/retk/core/scheduler/tasks/extend_node.py +++ b/src/retk/core/scheduler/tasks/extend_node.py @@ -21,6 +21,58 @@ def deliver_unscheduled_extend_nodes(): return res +async def get_cases(db, batch: List[NodeExtendQueue]) -> List[knowledge.ExtendCase]: + cases: List[knowledge.ExtendCase] = [] + + nid2item = {item["nid"]: item for item in batch} + + nodes = await db[CollNameEnum.nodes.value].find({"id": {"$in": list(nid2item.keys())}}).to_list(None) + for node in nodes: + if node is None: + continue + item = nid2item[node["id"]] + cases.append( + knowledge.ExtendCase( + _id=item["_id"], + uid=item["uid"], + nid=item["nid"], + summary_service=item["summaryService"], + summary_model=item["summaryModel"], + extend_service=item["extendService"], + extend_model=item["extendModel"], + md=node["md"], + ) + ) + return cases + + +async def update_extended_nodes(db, case: knowledge.ExtendCase): + ext = ExtendedNode( + uid=case.uid, + sourceNid=case.nid, + sourceMd=case.md, + extendMd=case.extend_md, + extendSearchTerms=case.extend_search_terms, + ) + if config.is_local_db(): + doc = await db[CollNameEnum.llm_extended_node.value].find_one( + {"uid": case.uid, "sourceNid": case.nid} + ) + if doc is None: + await db[CollNameEnum.llm_extended_node.value].insert_one(ext) + else: + await db[CollNameEnum.llm_extended_node.value].update_one( + {"uid": case.uid, "sourceNid": case.nid}, + {"$set": ext} + ) + else: + await db[CollNameEnum.llm_extended_node.value].update_one( + {"uid": case.uid, "sourceNid": case.nid}, + {"$set": ext}, + upsert=True + ) + + async def async_deliver_unscheduled_extend_nodes() -> str: _, db = init_mongo(connection_timeout=5) batch_size = 40 @@ -30,28 +82,12 @@ async def async_deliver_unscheduled_extend_nodes() -> str: while True: done_id_list = [] batch: 
List[NodeExtendQueue] = await db[CollNameEnum.llm_extend_node_queue.value].find().limit( - batch_size).to_list(None) + batch_size + ).to_list(None) if len(batch) == 0: break - cases: List[knowledge.ExtendCase] = [] req_id = "".join([str(random.randint(0, 9)) for _ in range(10)]) - - for item in batch: - node = await db[CollNameEnum.nodes.value].find_one({"id": item["nid"]}) - if node is None: - continue - cases.append( - knowledge.ExtendCase( - _id=item["_id"], - uid=item["uid"], - nid=item["nid"], - summary_service=item["summaryService"], - summary_model=item["summaryModel"], - extend_service=item["extendService"], - extend_model=item["extendModel"], - md=node["md"], - ) - ) + cases = await get_cases(db, batch) s0 = time.perf_counter() cases = await knowledge.batch_summary( @@ -72,46 +108,33 @@ async def async_deliver_unscheduled_extend_nodes() -> str: uid=case.uid, type_=const.user_behavior_types.UserBehaviorTypeEnum.LLM_KNOWLEDGE_RESPONSE, remark=json.dumps( - {"md": case.stripped_md, "summary": case.summary, "extend": case.extend_md}, + { + "md": case.stripped_md, + "summary": case.summary, + "summaryService": case.summary_service, + "summaryModel": case.summary_model, + "extend": case.extend_md, + "extendService": case.extend_service, + "extendModel": case.extend_model, + }, ensure_ascii=False ), ) done_id_list.append(case._id) if case.summary_code != const.CodeEnum.OK or case.extend_code != const.CodeEnum.OK: continue - ext = ExtendedNode( - uid=case.uid, - sourceNid=case.nid, - sourceMd=case.md, - extendMd=case.extend_md, - extendSearchTerms=case.extend_search_terms, - ) - if config.is_local_db(): - doc = await db[CollNameEnum.llm_extended_node.value].find_one( - {"uid": case.uid, "sourceNid": case.nid} - ) - if doc is None: - await db[CollNameEnum.llm_extended_node.value].insert_one(ext) - else: - await db[CollNameEnum.llm_extended_node.value].update_one( - {"uid": case.uid, "sourceNid": case.nid}, - {"$set": ext} - ) - else: - await 
db[CollNameEnum.llm_extended_node.value].update_one( - {"uid": case.uid, "sourceNid": case.nid}, - {"$set": ext}, - upsert=True - ) + + await update_extended_nodes(db, case) total_summary_time += s1 - s0 total_extend_time += e1 - e0 - if len(done_id_list) > 0: - res = await db[CollNameEnum.llm_extend_node_queue.value].delete_many({"_id": {"$in": done_id_list}}) - total_success_count += res.deleted_count + total_success_count += len(done_id_list) + await db[CollNameEnum.llm_extend_node_queue.value].delete_many( + {"_id": {"$in": [b["_id"] for b in batch]}} + ) - if total_success_count > 0: + if len(done_id_list) > 0: logger.info( f"llm extend knowledge task: " f"avg_summary_time: {total_summary_time / total_success_count:.2f}s, " diff --git a/src/retk/core/statistic.py b/src/retk/core/statistic.py index bf79c46..be906b7 100644 --- a/src/retk/core/statistic.py +++ b/src/retk/core/statistic.py @@ -13,7 +13,7 @@ async def __write_new(path: Path, lock: asyncio.Lock): - async with aiofiles.open(path, "w") as f: + async with aiofiles.open(path, "w", encoding="utf-8") as f: async with lock: await f.write("") @@ -38,7 +38,7 @@ async def add_user_behavior( current_log_file.rename(backup_file) await __write_new(current_log_file, lock) - async with aiofiles.open(current_log_file, "a") as f: + async with aiofiles.open(current_log_file, "a", encoding="utf-8") as f: record = { "time": time_now.strftime('%Y-%m-%d %H:%M:%S'), "uid": uid, diff --git a/src/retk/models/client.py b/src/retk/models/client.py index 50fbfe5..77e6e7b 100644 --- a/src/retk/models/client.py +++ b/src/retk/models/client.py @@ -23,7 +23,7 @@ pass -def init_mongo(connection_timeout: int): +def init_mongo(connection_timeout: int) -> Union["AsyncIOMotorClient", MongitaClientDisk]: conf = config.get_settings() if config.is_local_db(): if not conf.RETHINK_LOCAL_STORAGE_PATH.exists():