From c3932bc85265cc27af422676fa3d8bb85f02b76c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=96=E7=95=8C=E8=A7=82=E5=AF=9F=E6=97=A5=E5=BF=97?= Date: Mon, 9 Jun 2025 17:03:26 +0800 Subject: [PATCH] fix: decrypt improvement --- src/grpc/manager.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/grpc/manager.py b/src/grpc/manager.py index 8500ed1..e29ad7d 100644 --- a/src/grpc/manager.py +++ b/src/grpc/manager.py @@ -4,9 +4,11 @@ from asyncio import AbstractEventLoop from queue import SimpleQueue from typing import Awaitable, Callable, Type -from creart import AbstractCreator, CreateTargetInfo, exists_module +from async_lru import alru_cache +from creart import AbstractCreator, CreateTargetInfo, exists_module, it from grpc.aio import insecure_channel, Channel -from tenacity import retry_if_exception_type, retry, wait_random_exponential, stop_after_attempt +from tenacity import retry_if_exception_type, retry, wait_random_exponential, stop_after_attempt, \ + retry_if_not_exception_message, before_sleep_log from src.grpc.manager_pb2 import * from src.grpc.manager_pb2_grpc import WrapperManagerServiceStub, google_dot_protobuf_dot_empty__pb2 @@ -40,6 +42,7 @@ class WrapperManager: self._stub = WrapperManagerServiceStub(self._channel) return self + @alru_cache async def status(self) -> StatusData: resp: StatusReply = await self._stub.Status(google_dot_protobuf_dot_empty__pb2.Empty) if resp.header.code != 0: @@ -72,10 +75,7 @@ class WrapperManager: async def _decrypt_request_generator(self): while True: - item = await self._decrypt_queue.get() - if item is None: - continue - yield item + yield await self._decrypt_queue.get() async def decrypt_init(self, on_success: Callable[[str, str, bytes, int], Awaitable[None]], on_failure: Callable[[str, str, bytes, int], Awaitable[None]]): @@ -84,22 +84,22 @@ class WrapperManager: reply: DecryptReply match reply.header.code: case -1: - await on_failure(reply.data.adam_id, reply.data.key, reply.data.sample, reply.data.sample_index) + it(AbstractEventLoop).create_task(on_failure(reply.data.adam_id, reply.data.key, reply.data.sample, reply.data.sample_index)) case 0: - await on_success(reply.data.adam_id, reply.data.key, reply.data.sample, reply.data.sample_index) + it(AbstractEventLoop).create_task(on_success(reply.data.adam_id, reply.data.key, reply.data.sample, reply.data.sample_index)) - @retry(retry=retry_if_exception_type(WrapperManagerException), + @retry(retry=((retry_if_exception_type(WrapperManagerException)) & (retry_if_not_exception_message('no available instance'))), wait=wait_random_exponential(multiplier=1, max=60), - stop=stop_after_attempt(32)) + stop=stop_after_attempt(32), before_sleep=before_sleep_log(it(GlobalLogger).logger, logging.WARN)) async def m3u8(self, adam_id: str) -> str: resp: M3U8Reply = await self._stub.M3U8(M3U8Request(data=M3U8DataRequest(adam_id=adam_id))) if resp.header.code != 0: raise WrapperManagerException(resp.header.msg) return resp.data.m3u8 - @retry(retry=retry_if_exception_type(WrapperManagerException), + @retry(retry=((retry_if_exception_type(WrapperManagerException)) & (retry_if_not_exception_message('no available instance'))), wait=wait_random_exponential(multiplier=1, max=60), - stop=stop_after_attempt(32)) + stop=stop_after_attempt(32), before_sleep=before_sleep_log(it(GlobalLogger).logger, logging.WARN)) async def lyrics(self, adam_id: str, language: str, region: str) -> str: resp: LyricsReply = await self._stub.Lyrics(LyricsRequest( data=LyricsDataRequest(adam_id=adam_id, language=language, region=region)))