Skip to content

Commit ead54db

Browse files
committed
优化不健康实例检查逻辑
1 parent 254cc60 commit ead54db

File tree

5 files changed

+17
-3
lines changed

5 files changed

+17
-3
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ gateway等网关插件的高扩展性
99
### 通过docker运行
1010

1111
```bash
12-
docker run anjia0532/discovery-syncer-python:v2.6.9
12+
docker run anjia0532/discovery-syncer-python:v2.6.12
1313
```
1414

1515
特别的,`-c ` 支持配置远端http[s]的地址,比如读取静态资源的,比如读取nacos的

app/handler/discovery.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
from fastapi import Response
77
from fastapi.params import Path, Body, Query
88

9+
from core.database import db
910
from . import RESP_OK
10-
from app.model.syncer_model import Registration, RegistrationType, RegistrationStatus
11+
from app.model.syncer_model import Registration, RegistrationType, RegistrationStatus, DiscoveryInstance
1112
from app.service.discovery.discovery import Discovery
1213
from app.service.gateway.gateway import Gateway
1314
from core.lib.logger import for_handler
@@ -70,6 +71,9 @@ def discovery(discovery_name: Annotated[str, Path(title="discovery_name", descri
7071
raise Exception(
7172
f"最少存活实例数{alive_num}不满足,总实例数(含之前已下线数量){len(discovery_instances)},要下线实例数{len(down_hosts)},剩余在线实例数{len(alive_hosts)}")
7273
discovery_client.modify_registration(registration, instances=instances)
74+
DiscoveryInstance({}).delete_by_instances(
75+
[f"{instance.ip}:{instance.port}" for instance in instances if not instance.enabled],
76+
db.get_sqla_helper()[1])
7377
except Exception as e:
7478
logger.error(f"主动下线上线注册中心的服务失败,discovery_name {discovery_name},registration {registration}",
7579
exc_info=e)

app/model/syncer_model.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import httpx
99
from db_libs.sqla_lib import SqlaReflectHelper
10+
from funboost import funboost_current_task
1011
from httpx import TimeoutException
1112
from pydantic import BaseModel, Field
1213
from pydantic import model_validator, AliasChoices
@@ -136,8 +137,12 @@ def health_check(self, healthcheck: dict, sqla_helper: SqlaReflectHelper):
136137
self.set_counts(params, sqla_helper)
137138
if not success:
138139
instances = self.get_target_service_all_instance(0, sqla_helper)
140+
fct = funboost_current_task()
141+
msg_time = None
142+
if fct:
143+
msg_time = fct.function_result_status.publish_time
139144
logger.warning(
140-
f"健康检查 {self.target_id} {self.service} {schema}{self.instance}{healthcheck.get('uri')} , 实例状态: {json.dumps([d.to_dict_item() for d in instances])}")
145+
f"健康检查 {self.target_id} {self.service} {schema}{self.instance}{healthcheck.get('uri')} ,msg_time: {msg_time}, 实例状态: {json.dumps([d.to_dict_item() for d in instances])}")
141146
if params['status'] != self.status and healthcheck.get("alert", {}).get("url"):
142147
instances = self.get_target_service_all_instance(0, sqla_helper)
143148
keyfunc = lambda item: item['status']

app/tasks/common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ class FunboostCommonConfig(BoosterParams):
1919
is_push_to_dlx_queue_when_retry_max_times: bool = True
2020
# 任务过滤的失效期,为0则永久性过滤任务。例如设置过滤过期时间是1800秒 , 30分钟前发布过1 + 2 的任务,现在仍然执行,如果是30分钟以内发布过这个任务,则不执行1 + 2
2121
task_filtering_expire_seconds: int = 0
22+
# 消息过期时间,为0永不过期,为10则代表,10秒之前发布的任务如果现在才轮到消费则丢弃任务。
23+
msg_expire_senconds: int = 120
2224
# # 是否对函数入参进行过滤去重.
2325
do_task_filtering: bool = False
2426
# 运行时候,是否记录从消息队列获取出来的消息内容

app/tasks/task_syncer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ def syncer(target: dict):
128128
healthcheck = target.get("healthcheck", {})
129129
if healthcheck:
130130
try:
131+
# syncer 同步任务,只管存或更新
131132
DiscoveryInstance({"target_id": target.get('id'), "service": service.name}).save_or_update(
132133
discovery_instances, sqla_helper)
133134
# 拿到 discovery_instances 和 health_check 里的 unhealthy 比较,将 discovery 的下掉,保留 >= min-hosts
@@ -150,6 +151,8 @@ def syncer(target: dict):
150151
# 下线 unhealthy 实例
151152
registration = Registration(service_name=service.name, ext_data=target.get("config", {}))
152153
discovery_client.modify_registration(registration, unhealthy_instances)
154+
# 删除无效实例
155+
DiscoveryInstance({}).delete_by_instances([d.instance for d in unhealthy], sqla_helper)
153156
except Exception as e:
154157
logger.warning(f"健康检查下线实例失败, {target.get('id', None)} , {service.name}", exc_info=e)
155158

0 commit comments

Comments
 (0)