WebhockTransfer/app/db.py
auto-bot b11c39f3bf feat: add async retry mechanism with exponential backoff
- Add app/utils/retry.py with configurable async retry decorator
- Update DeliveryLog model to track attempt_count and latency_seconds
- Apply @http_retry to engine._exec_forward and _exec_notify methods
- Update save_logs to record retry metadata
- Add comprehensive unit tests for retry functionality
- Support configuration via environment variables (RETRY_*)

This improves reliability for downstream HTTP calls by automatically
retrying transient failures with exponential backoff and jitter.
2025-12-24 11:04:41 +08:00

105 lines
4.5 KiB
Python

from sqlalchemy import create_engine, Column, Integer, String, JSON, Boolean, Table, ForeignKey, DateTime, Text
from sqlalchemy.orm import declarative_base, sessionmaker, relationship, backref
import os
from datetime import datetime
Base = declarative_base()
class Target(Base):
__tablename__ = 'targets'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True, index=True)
url = Column(String)
timeout_ms = Column(Integer, default=5000)
class NotificationChannel(Base):
__tablename__ = 'notification_channels'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
channel_type = Column(String) # feishu, wecom
webhook_url = Column(String)
class MessageTemplate(Base):
__tablename__ = 'message_templates'
id = Column(Integer, primary_key=True)
name = Column(String, unique=True) # 方便识别,如 "收款成功通知"
template_content = Column(Text) # "收到{amt}元"
class WebhookEndpoint(Base):
__tablename__ = 'webhook_endpoints'
id = Column(Integer, primary_key=True)
namespace = Column(String, unique=True, index=True)
description = Column(String, nullable=True)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
rules = relationship("ProcessingRule", back_populates="endpoint", cascade="all, delete-orphan")
class ProcessingRule(Base):
__tablename__ = 'processing_rules'
id = Column(Integer, primary_key=True)
endpoint_id = Column(Integer, ForeignKey('webhook_endpoints.id'))
endpoint = relationship("WebhookEndpoint", back_populates="rules")
# Tree structure support
parent_rule_id = Column(Integer, ForeignKey('processing_rules.id'), nullable=True)
children = relationship("ProcessingRule", backref=backref('parent', remote_side=[id]), cascade="all, delete-orphan")
priority = Column(Integer, default=0) # Higher executes first (if we want ordering)
match_field = Column(String) # e.g. "trans_order_info.remark" or "event_define_no"
operator = Column(String, default="eq") # eq, neq, contains, startswith, regex
match_value = Column(String) # e.g. "imcgcd03" or "pay.success"
actions = relationship("RuleAction", back_populates="rule", cascade="all, delete-orphan")
class RuleAction(Base):
__tablename__ = 'rule_actions'
id = Column(Integer, primary_key=True)
rule_id = Column(Integer, ForeignKey('processing_rules.id'))
rule = relationship("ProcessingRule", back_populates="actions")
action_type = Column(String) # "forward" or "notify"
# Forward params
target_id = Column(Integer, ForeignKey('targets.id'), nullable=True)
target = relationship("Target")
# Notify params
channel_id = Column(Integer, ForeignKey('notification_channels.id'), nullable=True)
channel = relationship("NotificationChannel")
template_id = Column(Integer, ForeignKey('message_templates.id'), nullable=True)
template = relationship("MessageTemplate")
# Extra params for templating (e.g. {"pay_method": "微信"})
template_vars = Column(JSON, nullable=True)
class RequestLog(Base):
__tablename__ = 'request_logs'
id = Column(Integer, primary_key=True)
namespace = Column(String, index=True)
remark = Column(String, nullable=True) # 保留用于快速筛选,可选
event_no = Column(String, nullable=True) # 保留用于快速筛选,可选
raw_body = Column(JSON)
received_at = Column(DateTime, default=datetime.utcnow)
status = Column(String) # success, error
delivery_logs = relationship("DeliveryLog", back_populates="request_log", cascade="all, delete-orphan")
class DeliveryLog(Base):
__tablename__ = 'delivery_logs'
id = Column(Integer, primary_key=True)
request_id = Column(Integer, ForeignKey('request_logs.id'))
target_name = Column(String)
type = Column(String) # relay, notify
status = Column(String) # success, failed
response_summary = Column(Text, nullable=True)
attempt_count = Column(Integer, default=1) # Number of attempts made
latency_seconds = Column(Float, nullable=True) # Total latency for all attempts
created_at = Column(DateTime, default=datetime.utcnow)
request_log = relationship("RequestLog", back_populates="delivery_logs")
DB_PATH = os.getenv("DB_PATH", "sqlite:///./config/data.db")
engine = create_engine(DB_PATH, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def init_db():
Base.metadata.create_all(bind=engine)