Commit 22e027a1 by 管志勇

Initial commit

parents
### IntelliJ IDEA ###
.idea/**
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/
###
sync_log_common.log
sync_state_common.txt
import requests
import pymysql
import sys
from datetime import datetime, timedelta
import logging
import argparse
import re
import os
import time
# Logging setup: append every run's output to sync_log_common.log
# located next to this script.
log_path = os.path.join(os.path.dirname(__file__), 'sync_log_common.log')
logging.basicConfig(
    filename=log_path,
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Drop-in replacement for print() that also records to the log file.
def log_message(message):
    """Echo *message* to stdout and append it to the log file."""
    print(message)
    logging.info(message)
# Configuration (adjust for the target environment)
DIFY_API_BASE_URL = 'http://192.168.141.145/v1/datasets'
DATASET_ID = 'ab6b26f3-87ad-449d-99eb-55e9bc4f15b1'
# NOTE(review): credentials are hardcoded; consider loading API_KEY and the
# DB password from environment variables or a config file.
API_KEY = 'dataset-jPCfnGYrO3Jyyj9isGMjiyBe'
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': '123456',
    'db': 'knowledge_base',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor,
}
SEGMENT_LENGTH = 1000  # segment size (characters)
SYNC_STATE_FILE = os.path.join(os.path.dirname(__file__), 'sync_state_common.txt')
DOCUMENT_READY_TIMEOUT = 10  # document-ready timeout (seconds)
POLLING_INTERVAL = 1  # polling interval (seconds)
# Sync-state (timestamp) management
def read_sync_state():
    """Return the last sync watermark from SYNC_STATE_FILE.

    When the file is missing, empty, or unparseable, seed the state with a
    timestamp ~30 years in the past (which forces a full sync) and return it.
    """
    try:
        with open(SYNC_STATE_FILE, 'r') as state_file:
            raw = state_file.read().strip()
        if not raw:
            raise ValueError("触发全量同步初始化")
        return datetime.strptime(raw, '%Y-%m-%d %H:%M:%S')
    except (FileNotFoundError, ValueError):
        fallback = datetime.now() - timedelta(days=365 * 30)
        with open(SYNC_STATE_FILE, 'w') as state_file:
            state_file.write(fallback.strftime('%Y-%m-%d %H:%M:%S'))
        return fallback
def save_sync_state(timestamp):
    """Atomically persist *timestamp* as the new sync watermark.

    The original implementation overwrote the file in place despite the
    "atomic" comment; a crash mid-write could leave a truncated state file.
    Writing to a temp file and os.replace()-ing it makes the update atomic
    on POSIX and Windows alike.
    """
    tmp_path = SYNC_STATE_FILE + '.tmp'
    with open(tmp_path, 'w') as f:
        f.write(timestamp.strftime('%Y-%m-%d %H:%M:%S'))
    os.replace(tmp_path, SYNC_STATE_FILE)
# Core data-sync module
def fetch_records(sync_type, last_sync):
    """Fetch api_knowledge rows to sync.

    'full' mode returns every audited, published row with non-null name and
    content; any other mode restricts to rows created or updated after
    *last_sync*, ordered by creation time.
    """
    connection = pymysql.connect(**DB_CONFIG)
    try:
        with connection.cursor() as cursor:
            sql = """
            SELECT id, alarm_name, solution_html, update_datetime, is_delete
            FROM api_knowledge
            WHERE is_audit=1 AND is_publish=1 and alarm_name is not null and solution_html is not null
            """
            if sync_type == 'full':
                log_message(f"{sql}")
                cursor.execute(sql)
            else:
                sql += " AND (update_datetime > %s or create_datetime > %s) ORDER BY create_datetime ASC"
                log_message(f"{sql}")
                # Parameterized query: last_sync is bound, never interpolated.
                cursor.execute(sql, (last_sync, last_sync))
            return cursor.fetchall()
    finally:
        connection.close()
def clean_solution_html(solution_html):
    """Strip all HTML tags except ``<img ...>`` from *solution_html*.

    Closing tags (including ``</img>``) are removed; only opening ``img``
    tags survive, preserving embedded screenshots for the knowledge base.

    Robustness fix: the original raised TypeError on None input; None/empty
    now yields '' (callers' SQL filters nulls, but defensive anyway).
    """
    if not solution_html:
        return ''
    # Negative lookahead keeps any tag whose name starts exactly with 'img'.
    return re.sub(r'<(?!img\b)[^>]*>', '', solution_html)
def split_content(alarm_name, content, segment_length=None):
    """Split *content* into chunks of at most *segment_length* characters,
    prefixing every chunk with *alarm_name* and a newline.

    Args:
        alarm_name: heading prepended to each chunk; None is treated as "".
        content: text to split.
        segment_length: maximum characters per chunk; defaults to the
            module-level SEGMENT_LENGTH (generalized from the previous
            hard-coded global so callers/tests can override it).

    Returns:
        List of formatted, non-blank segments; [] for empty content.
    """
    if segment_length is None:
        segment_length = SEGMENT_LENGTH
    if not alarm_name:
        alarm_name = ""
    # range stepping replaces the original duplicated last-chunk branch.
    segments = [
        f"{alarm_name}\n{content[start:start + segment_length]}"
        for start in range(0, len(content), segment_length)
    ]
    # Drop chunks that are blank after formatting (whitespace-only content).
    return [s for s in segments if s.strip()]
def get_document_id_by_name(alarm_name):
    """Return the id of the Dify document whose name equals *alarm_name*.

    Returns None when no exact match is found.
    Raises requests.HTTPError on a non-2xx response.
    """
    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json'
    }
    url = f"{DIFY_API_BASE_URL}/{DATASET_ID}/documents"
    # Fix: pass the keyword via `params` so it is URL-encoded; the previous
    # f-string interpolation produced a broken URL for names containing
    # spaces, '&', '#', or non-ASCII characters.
    response = requests.get(url, headers=headers, params={'keyword': alarm_name})
    response.raise_for_status()
    data = response.json()
    # The keyword search is fuzzy, so require an exact name match.
    # NOTE(review): only the first page of results is inspected — confirm
    # the listing endpoint's pagination cannot hide an exact match.
    for doc in data.get('data', []):
        if doc.get('name') == alarm_name:
            return doc.get('id')
    return None
def create_document(alarm_name):
    """Create an empty Dify document named *alarm_name* and return its id.

    Raises requests.HTTPError on a non-2xx response.
    """
    body = {
        "name": alarm_name,
        "text": '',
        "indexing_technique": "high_quality",
        "process_rule": {"mode": "automatic"},
    }
    response = requests.post(
        f"{DIFY_API_BASE_URL}/{DATASET_ID}/document/create-by-text",
        headers={
            'Authorization': f'Bearer {API_KEY}',
            'Content-Type': 'application/json'
        },
        json=body,
    )
    response.raise_for_status()
    return response.json()['document']['id']
def is_document_ready(document_id, segments, alarm_name):
    """Probe document readiness by POSTing the first non-blank segment.

    Returns (True, created_segment_id) when the segments endpoint accepts
    the probe, (False, None) when the document is not indexed yet (404) or
    there is nothing to send. Other HTTP errors propagate.
    """
    candidates = [s for s in segments if s.strip()]
    if not candidates:
        log_message(f"No valid segments to check readiness for document {document_id}")
        return False, None
    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json'
    }
    url = f"{DIFY_API_BASE_URL}/{DATASET_ID}/documents/{document_id}/segments"
    probe_body = {
        "segments": [{"content": candidates[0], "keywords": [alarm_name]}]
    }
    try:
        response = requests.post(url, headers=headers, json=probe_body)
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        # A 404 here means Dify has not finished creating the document.
        if err.response.status_code == 404:
            log_message(f"Document {document_id} not ready yet - 404 error")
            return False, None
        raise
    segment_id = response.json()['data'][0]['id']
    log_message(f"Created test segment ID: {segment_id} for document {document_id}")
    return True, segment_id
def wait_for_document_ready(document_id, segments, alarm_name):
    """Poll until the document accepts segments or the timeout elapses.

    Returns (True, test_segment_id) once a probe segment is accepted, or
    (False, None) after DOCUMENT_READY_TIMEOUT seconds. Probe errors are
    logged and treated as "not ready yet".
    """
    deadline = time.time() + DOCUMENT_READY_TIMEOUT
    log_message(f"Waiting for document {document_id} to be ready...")
    while time.time() < deadline:
        try:
            ready, segment_id = is_document_ready(document_id, segments, alarm_name)
            if ready:
                log_message(f"Document {document_id} is ready, test segment ID: {segment_id}")
                return True, segment_id
        except Exception as probe_error:
            log_message(f"Temporary error while checking document readiness: {str(probe_error)}")
        log_message(f"Document not ready yet, retrying in {POLLING_INTERVAL} seconds...")
        time.sleep(POLLING_INTERVAL)
    log_message(f"Document {document_id} did not become ready within timeout")
    return False, None
def create_segments(document_id, segments, alarm_name):
    """Bulk-create the document's segments once it is ready.

    The first valid segment is consumed by the readiness probe inside
    wait_for_document_ready(), so only segments[1:] are created here.
    Raises Exception when the document never becomes ready.
    """
    usable = [s for s in segments if s.strip()]
    if not usable:
        log_message(f"No valid segments to create for document {document_id}")
        return {"message": "No valid segments provided"}
    # Block until the probe segment is accepted (it doubles as segment #1).
    ready, _probe_segment_id = wait_for_document_ready(document_id, usable, alarm_name)
    if not ready:
        raise Exception(f"Document {document_id} not ready after waiting")
    remaining = usable[1:]
    if not remaining:
        log_message(f"Only 1 segment available, no new segments to create for document {document_id}")
        return {"message": "No new segments to create"}
    response = requests.post(
        f"{DIFY_API_BASE_URL}/{DATASET_ID}/documents/{document_id}/segments",
        headers={
            'Authorization': f'Bearer {API_KEY}',
            'Content-Type': 'application/json'
        },
        json={"segments": [{"content": s, "keywords": [alarm_name]} for s in remaining]},
    )
    response.raise_for_status()
    log_message(f"Created {len(remaining)} segments for document {document_id}")
    return response.json()
def delete_document(document_id):
    """Delete the Dify document *document_id*; return the parsed response.

    Raises requests.HTTPError on a non-2xx response.
    """
    auth_headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json'
    }
    response = requests.delete(
        f"{DIFY_API_BASE_URL}/{DATASET_ID}/documents/{document_id}",
        headers=auth_headers,
    )
    response.raise_for_status()
    log_message(f"Deleted document with ID: {document_id}")
    # NOTE(review): assumes the DELETE endpoint returns a JSON body — an
    # empty 204 response would make .json() raise; confirm against the API.
    return response.json()
def sync_record(record):
    """Mirror one api_knowledge row into Dify.

    Returns one of:
      "deleted"   - row flagged is_delete and its document was removed
      "not_found" - row flagged is_delete but no matching document existed
      "skipped"   - no usable content after cleaning/segmenting
      "recreated" - document (re)built from scratch with fresh segments
    """
    alarm_name = record['alarm_name']
    log_message(f"Processing record: {alarm_name}")
    segments = split_content(alarm_name, clean_solution_html(record['solution_html']))
    existing_id = get_document_id_by_name(alarm_name)
    # Source row was deleted: remove the mirrored document if present.
    if record['is_delete'] == 1:
        if not existing_id:
            log_message(f"Document not found for deletion: {alarm_name}")
            return "not_found"
        delete_document(existing_id)
        log_message(f"Deleted document: {alarm_name}")
        return "deleted"
    # Insert/update: replace any existing document wholesale.
    if existing_id:
        delete_document(existing_id)
        log_message(f"Deleted existing document: {alarm_name}, ID: {existing_id}")
    valid_segments = [s for s in segments if s.strip()]
    if not valid_segments:
        log_message(f"Skipping document creation for '{alarm_name}' - no valid content")
        return "skipped"
    dify_id = create_document(alarm_name)
    create_segments(dify_id, valid_segments, alarm_name)
    log_message(f"Created new document: {alarm_name}, ID: {dify_id} with {len(valid_segments) - 1} segments")
    return "recreated"
# Terminal visualisation helpers
def print_progress(idx, total, prefix=""):
    """Render an in-place terminal progress bar (carriage return, no newline)."""
    bar_length = 40
    done = int(bar_length * idx / total)
    bar = '#' * done + '-' * (bar_length - done)
    sys.stdout.write(f"\r{prefix} |{bar}| {idx}/{total} ({idx / total:.0%}) ")
    sys.stdout.flush()
def main():
    """Entry point: pick sync mode, fetch rows, sync each, persist watermark.

    Mode selection: an explicit --sync-type wins; otherwise a watermark
    older than 2010 (the bootstrap sentinel) implies a full sync.
    """
    parser = argparse.ArgumentParser(description='Sync data with Dify API')
    parser.add_argument('--sync-type', choices=['full', 'incremental'], default=None,
                        help='指定同步类型:全量或增量')
    args = parser.parse_args()
    last_sync = read_sync_state()
    if args.sync_type:
        sync_type = args.sync_type
    else:
        sync_type = 'full' if last_sync.year < 2010 else 'incremental'
    log_message(f"{sync_type.upper()} SYNC MODE | Base time: {last_sync}")
    # BUG FIX: snapshot the watermark BEFORE fetching. The old code saved
    # datetime.now() taken AFTER the whole run, so rows updated while the
    # sync was in flight fell before the stored watermark and were silently
    # skipped by the next incremental run.
    sync_start_time = datetime.now()
    records = fetch_records(sync_type, last_sync)
    log_message(f"{sync_type.upper()} SYNC MODE | Records: {len(records)}")
    if not records:
        log_message("NO INCREMENTAL DATA NEED TO SYNC")
        return
    total = len(records)
    max_update_datetime = last_sync
    error_log = []
    deleted_count = 0
    recreated_count = 0
    skipped_count = 0
    for idx, record in enumerate(records, 1):
        try:
            result = sync_record(record)
            if result == "deleted":
                deleted_count += 1
            elif result == "recreated":
                recreated_count += 1
            elif result == "skipped":
                skipped_count += 1
            # Track the newest row timestamp actually processed (logged below).
            current_update = record['update_datetime']
            if current_update and current_update > max_update_datetime:
                max_update_datetime = current_update
            print_progress(idx, total, prefix="SYNCING")
        except Exception as e:
            # Best-effort per-row handling: log and continue with the rest.
            error_log.append(f"ID:{record['alarm_name']} ERROR:{str(e)}")
            log_message(f"ERROR: {record['alarm_name'][:15]}... - {str(e)}")
    log_message("\n" + "=" * 50)
    save_sync_state(sync_start_time)
    log_message(
        f"SYNC COMPLETED | SUCCESS: {deleted_count + recreated_count} (Deleted: {deleted_count}, Recreated: {recreated_count}, Skipped: {skipped_count}) | FAILURE: {len(error_log)}")
    log_message(f"LATEST TIMESTAMP: {max_update_datetime}")


if __name__ == "__main__":
    main()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment