最佳实践
本指南总结了使用 use-notify 的最佳实践,帮助您构建可靠、高效、易维护的通知系统。
通知设计原则
1. 合理的通知频率
python
import time
from datetime import datetime, timedelta
from use_notify import useNotify, set_default_notify_instance
class RateLimitedNotify:
"""限制通知频率的包装器"""
def __init__(self, notify_instance, min_interval=300): # 默认5分钟间隔
self.notify = notify_instance
self.min_interval = min_interval
self.last_sent = {}
def should_send(self, key):
"""检查是否应该发送通知"""
now = datetime.now()
if key not in self.last_sent:
return True
time_diff = (now - self.last_sent[key]).total_seconds()
return time_diff >= self.min_interval
def send_if_needed(self, title, content, key=None):
"""根据频率限制发送通知"""
if key is None:
key = f"{title}:{content}"
if self.should_send(key):
self.notify.publish(title=title, content=content)
self.last_sent[key] = datetime.now()
return True
return False
# 使用示例
notify = useNotify()
rate_limited_notify = RateLimitedNotify(notify, min_interval=600) # 10分钟间隔
# 在监控中使用
def check_system_health():
cpu_usage = get_cpu_usage()
if cpu_usage > 80:
rate_limited_notify.send_if_needed(
title="系统警报",
content=f"CPU使用率过高: {cpu_usage}%",
key="high_cpu_usage" # 使用固定key避免重复通知
)
2. 智能通知条件
python
from use_notify import notify
# ✅ 推荐:只在重要情况下通知
@notify(condition=lambda result: result.get('error_count', 0) > 0)
def process_data(data):
errors = []
for item in data:
try:
process_item(item)
except Exception as e:
errors.append(str(e))
return {
'processed': len(data) - len(errors),
'error_count': len(errors),
'errors': errors
}
# ✅ 推荐:根据结果严重程度决定是否通知
@notify(
condition=lambda result: result.get('severity') in ['error', 'critical'],
title=lambda result: f"[{result.get('severity', 'info').upper()}] 系统检查"
)
def system_check():
issues = check_system_issues()
severity = 'info'
if any(issue['level'] == 'critical' for issue in issues):
severity = 'critical'
elif any(issue['level'] == 'error' for issue in issues):
severity = 'error'
elif any(issue['level'] == 'warning' for issue in issues):
severity = 'warning'
return {
'severity': severity,
'issues': issues,
'timestamp': datetime.now().isoformat()
}
# ❌ 避免:过于频繁的通知
@notify() # 每次调用都通知
def log_user_action(action):
return f"用户执行了: {action}"
3. 有意义的通知内容
python
# ✅ 推荐:提供详细且有用的信息
@notify(
title="数据备份完成",
content_template="备份了 {processed_files} 个文件,总大小 {total_size},耗时 {duration}秒。\n备份路径: {backup_path}"
)
def backup_data():
start_time = time.time()
files = get_files_to_backup()
backup_path = create_backup(files)
return {
'processed_files': len(files),
'total_size': format_file_size(sum(get_file_size(f) for f in files)),
'duration': round(time.time() - start_time, 2),
'backup_path': backup_path
}
# ✅ 推荐:包含上下文信息
@notify(
title=lambda result: f"API调用{'成功' if result['success'] else '失败'}",
content_template="""API: {api_name}
状态: {status_code}
响应时间: {response_time}ms
{error_info}"""
)
def call_external_api(api_name, url):
start_time = time.time()
try:
response = requests.get(url, timeout=30)
response_time = round((time.time() - start_time) * 1000, 2)
return {
'success': response.status_code == 200,
'api_name': api_name,
'status_code': response.status_code,
'response_time': response_time,
'error_info': '' if response.status_code == 200 else f'错误: {response.text}'
}
except Exception as e:
response_time = round((time.time() - start_time) * 1000, 2)
return {
'success': False,
'api_name': api_name,
'status_code': 'N/A',
'response_time': response_time,
'error_info': f'异常: {str(e)}'
}
性能优化
1. 异步通知
python
import asyncio
from use_notify import notify
# ✅ 推荐:对于I/O密集型任务使用异步
@notify(title="异步数据处理完成")
async def process_large_dataset(dataset):
"""异步处理大数据集"""
results = []
# 并发处理多个数据块
tasks = []
for chunk in split_dataset(dataset, chunk_size=1000):
task = asyncio.create_task(process_chunk_async(chunk))
tasks.append(task)
chunk_results = await asyncio.gather(*tasks)
for result in chunk_results:
results.extend(result)
return {
'processed_count': len(results),
'success_count': sum(1 for r in results if r['success']),
'error_count': sum(1 for r in results if not r['success'])
}
# ✅ 推荐:批量处理通知
class BatchNotify:
"""批量通知处理器"""
def __init__(self, notify_instance, batch_size=10, flush_interval=60):
self.notify = notify_instance
self.batch_size = batch_size
self.flush_interval = flush_interval
self.pending_notifications = []
self.last_flush = time.time()
def add_notification(self, title, content):
"""添加通知到批次"""
self.pending_notifications.append({
'title': title,
'content': content,
'timestamp': datetime.now()
})
# 检查是否需要刷新
if (len(self.pending_notifications) >= self.batch_size or
time.time() - self.last_flush >= self.flush_interval):
self.flush()
def flush(self):
"""发送批量通知"""
if not self.pending_notifications:
return
# 合并通知内容
summary_title = f"批量通知 ({len(self.pending_notifications)} 条)"
summary_content = "\n\n".join([
f"[{notif['timestamp'].strftime('%H:%M:%S')}] {notif['title']}\n{notif['content']}"
for notif in self.pending_notifications
])
self.notify.publish(title=summary_title, content=summary_content)
self.pending_notifications.clear()
self.last_flush = time.time()
# 使用批量通知
batch_notify = BatchNotify(notify)
def log_event(event_type, details):
batch_notify.add_notification(
title=f"事件: {event_type}",
content=details
)
2. 连接池和重用
python
from use_notify import useNotify, useNotifyChannel
import threading
class OptimizedNotifyManager:
"""优化的通知管理器"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.notify = useNotify()
self._setup_channels()
self._initialized = True
def _setup_channels(self):
"""设置优化的通知渠道"""
# 使用连接池的邮件配置
email_config = {
"smtp_server": os.getenv("SMTP_SERVER"),
"smtp_port": int(os.getenv("SMTP_PORT", "587")),
"username": os.getenv("EMAIL_USERNAME"),
"password": os.getenv("EMAIL_PASSWORD"),
"to_emails": os.getenv("EMAIL_RECIPIENTS", "").split(","),
"pool_size": 5, # 连接池大小
"max_retries": 3 # 最大重试次数
}
self.notify.add(
useNotifyChannel.Email(email_config),
useNotifyChannel.Bark({"token": os.getenv("BARK_TOKEN")}),
useNotifyChannel.Ding({"token": os.getenv("DING_TOKEN")})
)
def send_notification(self, title, content, priority='normal'):
"""发送通知"""
try:
if priority == 'high':
# 高优先级通知使用所有渠道
self.notify.publish(title=title, content=content)
else:
# 普通通知只使用轻量级渠道
bark_notify = useNotify()
bark_notify.add(useNotifyChannel.Bark({"token": os.getenv("BARK_TOKEN")}))
bark_notify.publish(title=title, content=content)
except Exception as e:
print(f"通知发送失败: {e}")
# 使用单例模式的通知管理器
notify_manager = OptimizedNotifyManager()
错误处理和可靠性
1. 优雅的错误处理
python
from use_notify import notify
import logging
# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ✅ 推荐:完善的错误处理
@notify(
title="任务执行结果",
condition=lambda result: result is not None, # 只在有结果时通知
content_template="""任务: {task_name}
状态: {status}
{details}
执行时间: {execution_time}秒"""
)
def robust_task(task_name, *args, **kwargs):
"""健壮的任务执行"""
start_time = time.time()
try:
# 执行实际任务
result = execute_task(task_name, *args, **kwargs)
execution_time = round(time.time() - start_time, 2)
return {
'task_name': task_name,
'status': '成功',
'details': f'处理了 {result.get("count", 0)} 项数据',
'execution_time': execution_time,
'result': result
}
except Exception as e:
execution_time = round(time.time() - start_time, 2)
# 记录错误日志
logger.error(f"任务 {task_name} 执行失败: {e}", exc_info=True)
return {
'task_name': task_name,
'status': '失败',
'details': f'错误: {str(e)}',
'execution_time': execution_time,
'error': str(e)
}
# ✅ 推荐:重试机制
class RetryableNotify:
"""支持重试的通知"""
def __init__(self, notify_instance, max_retries=3, retry_delay=1):
self.notify = notify_instance
self.max_retries = max_retries
self.retry_delay = retry_delay
def send_with_retry(self, title, content):
"""带重试的发送通知"""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
self.notify.publish(title=title, content=content)
if attempt > 0:
logger.info(f"通知在第 {attempt + 1} 次尝试后发送成功")
return True
except Exception as e:
last_exception = e
logger.warning(f"通知发送失败 (尝试 {attempt + 1}/{self.max_retries + 1}): {e}")
if attempt < self.max_retries:
time.sleep(self.retry_delay * (2 ** attempt)) # 指数退避
logger.error(f"通知发送最终失败: {last_exception}")
return False
# 使用重试通知
retryable_notify = RetryableNotify(notify)
@notify(
title="关键任务完成",
notify_instance=retryable_notify
)
def critical_task():
return "关键任务执行完成"
2. 降级策略
python
class FallbackNotify:
"""支持降级的通知系统"""
def __init__(self):
self.primary_notify = self._create_primary_notify()
self.fallback_notify = self._create_fallback_notify()
self.emergency_notify = self._create_emergency_notify()
def _create_primary_notify(self):
"""创建主要通知渠道"""
notify = useNotify()
notify.add(
useNotifyChannel.Ding({"token": os.getenv("DING_TOKEN")}),
useNotifyChannel.Email({
"smtp_server": "smtp.company.com",
"smtp_port": 587,
"username": os.getenv("EMAIL_USERNAME"),
"password": os.getenv("EMAIL_PASSWORD"),
"to_emails": [os.getenv("EMAIL_RECIPIENT")]
})
)
return notify
def _create_fallback_notify(self):
"""创建备用通知渠道"""
notify = useNotify()
notify.add(useNotifyChannel.Bark({"token": os.getenv("BARK_TOKEN")}))
return notify
def _create_emergency_notify(self):
"""创建紧急通知渠道(本地日志)"""
class LogChannel:
def send(self, title, content, **kwargs):
logger.critical(f"[紧急通知] {title}: {content}")
return True
async def send_async(self, title, content, **kwargs):
return self.send(title, content, **kwargs)
notify = useNotify()
notify.add(LogChannel())
return notify
def send_notification(self, title, content, level='normal'):
"""发送通知,支持降级"""
notifiers = [self.primary_notify, self.fallback_notify, self.emergency_notify]
for i, notifier in enumerate(notifiers):
try:
notifier.publish(title=title, content=content)
if i > 0:
logger.warning(f"使用了第 {i + 1} 级通知渠道")
return True
except Exception as e:
logger.error(f"第 {i + 1} 级通知渠道失败: {e}")
continue
logger.critical("所有通知渠道都失败了")
return False
# 使用降级通知
fallback_notify = FallbackNotify()
监控和调试
1. 通知统计
python
from collections import defaultdict, Counter
from datetime import datetime, timedelta
class NotifyStats:
"""通知统计"""
def __init__(self):
self.stats = {
'total_sent': 0,
'total_failed': 0,
'by_channel': defaultdict(lambda: {'sent': 0, 'failed': 0}),
'by_hour': defaultdict(int),
'recent_notifications': [],
'error_log': []
}
def record_sent(self, channel_name, title, content):
"""记录发送成功"""
self.stats['total_sent'] += 1
self.stats['by_channel'][channel_name]['sent'] += 1
hour = datetime.now().hour
self.stats['by_hour'][hour] += 1
self.stats['recent_notifications'].append({
'timestamp': datetime.now(),
'channel': channel_name,
'title': title,
'status': 'sent'
})
# 只保留最近100条记录
if len(self.stats['recent_notifications']) > 100:
self.stats['recent_notifications'] = self.stats['recent_notifications'][-100:]
def record_failed(self, channel_name, title, error):
"""记录发送失败"""
self.stats['total_failed'] += 1
self.stats['by_channel'][channel_name]['failed'] += 1
self.stats['error_log'].append({
'timestamp': datetime.now(),
'channel': channel_name,
'title': title,
'error': str(error)
})
# 只保留最近50条错误记录
if len(self.stats['error_log']) > 50:
self.stats['error_log'] = self.stats['error_log'][-50:]
def get_summary(self):
"""获取统计摘要"""
total = self.stats['total_sent'] + self.stats['total_failed']
success_rate = (self.stats['total_sent'] / total * 100) if total > 0 else 0
return {
'total_notifications': total,
'success_rate': round(success_rate, 2),
'total_sent': self.stats['total_sent'],
'total_failed': self.stats['total_failed'],
'by_channel': dict(self.stats['by_channel']),
'peak_hour': max(self.stats['by_hour'].items(), key=lambda x: x[1])[0] if self.stats['by_hour'] else None
}
# 集成统计功能的通知包装器
class MonitoredNotify:
"""带监控的通知"""
def __init__(self, notify_instance):
self.notify = notify_instance
self.stats = NotifyStats()
def publish(self, title, content, **kwargs):
"""发送通知并记录统计"""
try:
result = self.notify.publish(title=title, content=content, **kwargs)
self.stats.record_sent('mixed', title, content)
return result
except Exception as e:
self.stats.record_failed('mixed', title, e)
raise
def get_stats(self):
"""获取统计信息"""
return self.stats.get_summary()
# 使用监控通知
monitored_notify = MonitoredNotify(notify)
# 定期报告统计信息
@notify(
title="通知系统统计报告",
content_template="""统计周期: 过去24小时
总通知数: {total_notifications}
成功率: {success_rate}%
发送成功: {total_sent}
发送失败: {total_failed}
高峰时段: {peak_hour}:00"""
)
def generate_stats_report():
return monitored_notify.get_stats()
2. 调试模式
python
import os
from use_notify import useNotify, useNotifyChannel
class DebugNotify:
"""调试模式通知"""
def __init__(self, debug=None):
self.debug = debug if debug is not None else os.getenv('DEBUG', 'false').lower() == 'true'
self.notify = self._create_notify()
def _create_notify(self):
"""根据调试模式创建通知实例"""
notify = useNotify()
if self.debug:
# 调试模式:使用控制台输出
class DebugChannel:
def send(self, title, content, **kwargs):
print(f"\n[DEBUG NOTIFICATION]")
print(f"Title: {title}")
print(f"Content: {content}")
print(f"Kwargs: {kwargs}")
print(f"Timestamp: {datetime.now()}")
print("-" * 50)
return True
async def send_async(self, title, content, **kwargs):
return self.send(title, content, **kwargs)
notify.add(DebugChannel())
else:
# 生产模式:使用实际通知渠道
if os.getenv("BARK_TOKEN"):
notify.add(useNotifyChannel.Bark({"token": os.getenv("BARK_TOKEN")}))
if os.getenv("DING_TOKEN"):
notify.add(useNotifyChannel.Ding({"token": os.getenv("DING_TOKEN")}))
return notify
def publish(self, title, content, **kwargs):
"""发送通知"""
if self.debug:
print(f"[DEBUG] 准备发送通知: {title}")
return self.notify.publish(title=title, content=content, **kwargs)
# 使用调试通知
debug_notify = DebugNotify()
# 在装饰器中使用
from use_notify import set_default_notify_instance
set_default_notify_instance(debug_notify)
@notify(title="调试任务")
def debug_task():
return "任务执行完成"
安全最佳实践
1. 敏感信息处理
python
import re
from use_notify import notify
class SecureNotify:
"""安全的通知处理"""
# 敏感信息模式
SENSITIVE_PATTERNS = [
r'password[\s]*[:=][\s]*[\S]+',
r'token[\s]*[:=][\s]*[\S]+',
r'key[\s]*[:=][\s]*[\S]+',
r'secret[\s]*[:=][\s]*[\S]+',
r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', # 信用卡号
r'\b\d{3}-\d{2}-\d{4}\b', # SSN
]
@classmethod
def sanitize_content(cls, content):
"""清理敏感信息"""
sanitized = str(content)
for pattern in cls.SENSITIVE_PATTERNS:
sanitized = re.sub(pattern, '[REDACTED]', sanitized, flags=re.IGNORECASE)
return sanitized
@classmethod
def safe_notify(cls, title, content, **kwargs):
"""安全的通知发送"""
safe_title = cls.sanitize_content(title)
safe_content = cls.sanitize_content(content)
# 记录原始内容到安全日志(如果需要)
logger.info(f"发送通知: {safe_title}")
return notify.publish(title=safe_title, content=safe_content, **kwargs)
# ✅ 推荐:使用安全通知
@notify(
title="用户操作日志",
content_template=lambda result: SecureNotify.sanitize_content(
f"用户 {result['user']} 执行了操作: {result['action']}"
)
)
def log_user_action(user, action, details):
return {
'user': user,
'action': action,
'details': details,
'timestamp': datetime.now().isoformat()
}
2. 访问控制
python
from functools import wraps
from use_notify import notify
def require_permission(permission):
"""权限检查装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 检查用户权限
current_user = get_current_user()
if not has_permission(current_user, permission):
raise PermissionError(f"用户 {current_user} 没有 {permission} 权限")
return func(*args, **kwargs)
return wrapper
return decorator
# ✅ 推荐:结合权限控制使用通知
@require_permission('admin')
@notify(
title="管理员操作",
content_template="管理员 {user} 执行了 {operation}"
)
def admin_operation(operation):
user = get_current_user()
execute_admin_operation(operation)
return {
'user': user,
'operation': operation,
'timestamp': datetime.now().isoformat()
}
测试策略
1. 单元测试
python
import unittest
from unittest.mock import Mock, patch
from use_notify import useNotify, notify
class TestNotifications(unittest.TestCase):
"""通知功能测试"""
def setUp(self):
"""测试设置"""
self.mock_channel = Mock()
self.mock_channel.send.return_value = True
self.mock_channel.send_async.return_value = True
self.notify = useNotify()
self.notify.add(self.mock_channel)
def test_basic_notification(self):
"""测试基本通知功能"""
self.notify.publish(title="测试", content="测试内容")
self.mock_channel.send.assert_called_once_with(
title="测试",
content="测试内容"
)
def test_decorator_notification(self):
"""测试装饰器通知"""
@notify(
notify_instance=self.notify,
title="装饰器测试",
content_template="结果: {result}"
)
def test_function():
return "成功"
result = test_function()
self.assertEqual(result, "成功")
self.mock_channel.send.assert_called_once()
# 检查调用参数
call_args = self.mock_channel.send.call_args
self.assertEqual(call_args[1]['title'], "装饰器测试")
self.assertIn("成功", call_args[1]['content'])
def test_conditional_notification(self):
"""测试条件通知"""
@notify(
notify_instance=self.notify,
condition=lambda result: result > 10,
title="条件测试"
)
def conditional_function(value):
return value
# 不满足条件,不应该发送通知
conditional_function(5)
self.mock_channel.send.assert_not_called()
# 满足条件,应该发送通知
conditional_function(15)
self.mock_channel.send.assert_called_once()
@patch('use_notify.channels.requests.post')
def test_channel_failure_handling(self, mock_post):
"""测试渠道失败处理"""
# 模拟网络错误
mock_post.side_effect = Exception("网络错误")
with self.assertLogs() as log:
try:
self.notify.publish(title="测试", content="测试内容")
except Exception:
pass
# 验证错误被正确记录
self.assertTrue(any("网络错误" in record.message for record in log.records))
class TestNotificationIntegration(unittest.TestCase):
"""通知集成测试"""
def test_end_to_end_workflow(self):
"""端到端工作流测试"""
# 创建测试通知实例
test_notify = useNotify()
# 添加测试渠道
test_channel = Mock()
test_channel.send.return_value = True
test_notify.add(test_channel)
# 模拟完整的业务流程
@notify(
notify_instance=test_notify,
title="业务流程完成",
condition=lambda result: result['success'],
content_template="处理了 {count} 项数据,耗时 {duration} 秒"
)
def business_process(data):
start_time = time.time()
# 模拟数据处理
processed_count = len(data)
time.sleep(0.1) # 模拟处理时间
duration = round(time.time() - start_time, 2)
return {
'success': True,
'count': processed_count,
'duration': duration
}
# 执行业务流程
test_data = [1, 2, 3, 4, 5]
result = business_process(test_data)
# 验证结果
self.assertTrue(result['success'])
self.assertEqual(result['count'], 5)
# 验证通知被发送
test_channel.send.assert_called_once()
# 验证通知内容
call_args = test_channel.send.call_args
self.assertEqual(call_args[1]['title'], "业务流程完成")
self.assertIn("5", call_args[1]['content']) # 包含处理数量
if __name__ == '__main__':
unittest.main()
2. 性能测试
python
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from use_notify import useNotify, notify
def performance_test():
"""性能测试"""
# 创建测试通知实例
test_notify = useNotify()
# 添加快速测试渠道
class FastTestChannel:
def __init__(self):
self.call_count = 0
def send(self, title, content, **kwargs):
self.call_count += 1
time.sleep(0.001) # 模拟1ms延迟
return True
async def send_async(self, title, content, **kwargs):
self.call_count += 1
await asyncio.sleep(0.001) # 模拟1ms异步延迟
return True
test_channel = FastTestChannel()
test_notify.add(test_channel)
# 测试同步性能
print("测试同步通知性能...")
start_time = time.time()
for i in range(100):
test_notify.publish(title=f"测试 {i}", content=f"内容 {i}")
sync_duration = time.time() - start_time
print(f"同步发送100条通知耗时: {sync_duration:.2f}秒")
print(f"平均每条通知耗时: {sync_duration/100*1000:.2f}ms")
# 测试异步性能
async def async_performance_test():
print("\n测试异步通知性能...")
start_time = time.time()
tasks = []
for i in range(100):
task = test_notify.publish_async(title=f"异步测试 {i}", content=f"异步内容 {i}")
tasks.append(task)
await asyncio.gather(*tasks)
async_duration = time.time() - start_time
print(f"异步发送100条通知耗时: {async_duration:.2f}秒")
print(f"平均每条通知耗时: {async_duration/100*1000:.2f}ms")
print(f"性能提升: {sync_duration/async_duration:.2f}x")
asyncio.run(async_performance_test())
print(f"\n总调用次数: {test_channel.call_count}")
if __name__ == '__main__':
performance_test()
通过遵循这些最佳实践,您可以构建一个可靠、高效、易维护的通知系统。记住要根据具体的业务需求和环境特点来调整这些实践,确保通知系统能够真正为您的应用程序增值。