AWS代付、代实名
阿里云国际 | 腾讯云国际

Amazon SES 电商邮件运营进阶:大规模营销活动和邮件高可用

AWS账单代付阅读(0)

概述

双十一、黑五、618…每个电商大促都是对邮件系统的极限挑战。日常 1 万封的发送量,在大促期间可能暴涨到 100 万封。如何确保邮件系统在流量峰值时稳定运行?如何在成本可控的前提下保证高送达率?

本文将分享电商企业在大规模邮件营销中的实战经验,帮助你构建高可用、可扩展的邮件发送架构。

💡

前置阅读:建议先阅读 *《从零到一搭建发送平台》*了解 SES 基础

大规模场景的三大挑战

挑战 1:流量峰值管理

典型场景:双十一预热邮件

日常发送量:10,000 封/天

大促前 3 天:500,000 封/天(50x 增长)

大促当天:1,000,000 封/天(100x 增长)

问题: – ❌ 配额不足,邮件发送失败 – ❌ 发送速率限制,邮件延迟 – ❌ 突然的流量增长导致 ISP 限流

挑战 2:送达率保障

数据对比

|场景

|送达率

|影响

|日常运营(共享 IP)

|95-97%

|可接受

|大促期间(共享 IP)

|85-90%

|

损失 10 ** **万+ ** **用户触达

|大促期间(专用 IP + 预热)

|98-99%

|最优

1% ** **送达率差异** ** = 10,000 ** **封邮件** ** = ** **潜在数十万营收

挑战 3:成本控制

成本构成分析

|项目

|日常成本

|大促成本

|优化空间

|邮件发送费用

|$1/天

|$100/天

|通过批量 API 降低 20%

|专用 IP

|$0

|$24.95/月

|大促后释放

|数据传输

|$0.5/天

|$10/天

|附件外链化降低 50%

|

总计 |

$1.5/ |

$110/ |

可优化 30%

容量规划:提前 1 个月准备

配额计算公式

Amazon SES 电商邮件运营进阶:大规模营销活动和邮件高可用

def calculate_quota(user_count, email_per_user, campaign_days):

“””

user_count: 目标用户数

email_per_user: 每用户邮件数(预热+正式)

campaign_days: 活动天数

“””

total_emails = user_count * email_per_user

daily_quota = total_emails / campaign_days * 1.2 # 20% 缓冲

计算所需发送速率(假设集中在 8 小时发送)

sending_hours = 8

required_rate = daily_quota / (sending_hours * 3600)

return {

‘daily_quota’: int(daily_quota),

‘sending_rate’: int(required_rate) + 1

}

示例:双十一活动

quota = calculate_quota(

user_count=1_000_000, # 100 万用户

email_per_user=3, # 预热 2 封 + 正式 1 封

campaign_days=5 # 5 天活动期

)

print(f”所需配额:{quota[‘daily_quota’]:,} 封/天”)

print(f”所需速率:{quota[‘sending_rate’]} 封/秒”)

输出

所需配额:720,000 封/天

所需速率:25 封/秒

配额申请时间表

|时间节点

|行动

|说明

|

大促前 6 |评估发送量需求

|基于历史数据和业务预测

|

大促前 5 |提交配额申请

|通过 AWS Support 提交

|

大促前 4 |申请专用 IP

|开始 IP 预热流程

|

大促前 1 |压力测试

|验证系统承载能力

|

大促期间 |实时监控

|关注关键指标

📚

参考文档: *Managing your Amazon SES sending quotas*

专用 IP 预热:选择合适的方式

为什么需要预热?

ISP 对新 IP 地址持谨慎态度。如果突然从新 IP 发送大量邮件,会被视为可疑行为,导致: – 邮件被限流(throttling) – 邮件进入垃圾箱 – IP 被加入黑名单

大促场景推荐:标准自动预热

对于大促场景,推荐使用

标准自动预热(Standard Auto Warm-up ,原因:

进度可追踪:通过预热百分比精确控制流量

灵活调整:可以根据大促时间表加速或减速

Public IP 分流:前期利用 AWS Public IP 池分担流量

成本可控:只在大促期间使用,之后可以释放

大促准备时间表

|时间节点

|预热百分比

|每日发送量

|操作

|

大促前 4 |0% → 25%

|500 → 50,000

|申请 DIP,开始预热

|

大促前 3 |25% → 50%

|50,000 → 200,000

|监控退信率和投诉率

|

大促前 2 |50% → 75%

|200,000 → 500,000

|逐步增加发送量

|

大促前 1 |75% → 100%

|500,000 → 目标量

|达到生产水平,压力测试

|

大促期间 |100%

|目标量

|全力发送

|

大促后 1 |–

|–

|评估是否保留 DIP

标准预热计划(详细版)

|周次

|每日发送量

|累计发送量

|重点关注

|

第 1 |500 → 2,000

|7,000

|选择高质量用户

|

第 2 |5,000 → 20,000

|87,500

|监控退信率 < 2%

|

第 3 |50,000 → 100,000

|525,000

|监控投诉率 < 0.05%

|

第 4 |200,000 → 目标量

|1,050,000+

|达到生产水平

标准自动预热监控脚本

import boto3

from datetime import datetime, timedelta

class StandardAutoWarmupManager:

“””标准自动预热管理器”””

def __init__(self, dedicated_ip):

self.ses_client = boto3.client(‘sesv2’)

self.dedicated_ip = dedicated_ip

def get_warmup_status(self):

“””获取预热状态和百分比”””

response = self.ses_client.get_dedicated_ip(Ip=self.dedicated_ip)

ip_info = response[‘DedicatedIp’]

return {

‘ip’: ip_info[‘Ip’],

‘warmup_status’: ip_info[‘WarmupStatus’],

‘warmup_percentage’: ip_info.get(‘WarmupPercentage’, 0),

‘pool_name’: ip_info.get(‘PoolName’, ‘default’)

}

def calculate_sending_capacity(self, target_volume):

“””根据预热百分比计算当前可发送量”””

status = self.get_warmup_status()

percentage = status[‘warmup_percentage’]

当前可通过 DIP 发送的量

dip_capacity = int(target_volume * (percentage / 100))

剩余流量会通过 Public IP 发送

public_ip_volume = target_volume – dip_capacity

return {

‘warmup_percentage’: percentage,

‘dip_capacity’: dip_capacity,

‘public_ip_volume’: public_ip_volume,

‘total_volume’: target_volume

}

def adjust_warmup_percentage(self, new_percentage):

“””手动调整预热百分比(高级功能)”””

try:

self.ses_client.put_dedicated_ip_warmup_attributes(

Ip=self.dedicated_ip,

WarmupPercentage=new_percentage

)

print(f”✅ 预热百分比已调整为 {new_percentage}%”)

return True

except Exception as e:

print(f”❌ 调整失败:{e}”)

return False

def monitor_warmup_progress(self):

“””监控预热进度”””

status = self.get_warmup_status()

print(f”IP: {status[‘ip’]}”)

print(f”预热状态: {status[‘warmup_status’]}”)

print(f”预热进度: {status[‘warmup_percentage’]}%”)

根据进度给出建议

percentage = status[‘warmup_percentage’]

if percentage < 25:

print(“📊 建议:当前处于预热初期,保持低发送量”)

elif percentage < 50:

print(“📊 建议:可以逐步增加发送量”)

elif percentage < 75:

print(“📊 建议:接近预热完成,可以加大发送量”)

else:

print(“📊 建议:预热接近完成,可以全力发送”)

return status

使用示例

manager = StandardAutoWarmupManager(‘192.0.2.1’)

监控预热进度

status = manager.monitor_warmup_progress()

计算当前可发送量(目标 100 万封)

capacity = manager.calculate_sending_capacity(1_000_000)

print(f”\n当前可通过 DIP 发送:{capacity[‘dip_capacity’]:,} 封”)

print(f”通过 Public IP 发送:{capacity[‘public_ip_volume’]:,} 封”)

如果需要加速预热(谨慎使用)

manager.adjust_warmup_percentage(60)

预热进度追踪示例

*# * *每天检查预热进度*

|Day 1

|预热进度: 5%

|DIP 发送 500 封

|Public IP 发送 9,500 封

|Day 7

|预热进度: 25%

|DIP 发送 50,000 封

|Public IP 发送 150,000 封

|Day 14

|预热进度: 50% →

|DIP 发送 250,000 封

|Public IP 发送 250,000 封

|Day 21

|预热进度: 75%

|DIP 发送 600,000 封

|Public IP 发送 200,000 封

|Day 28

|预热进度: 100%

|DIP 发送 1,000,000 封

|Public IP 发送 0 封

托管 IP 池:长期弹性扩容方案

对于发送量波动大、需要长期使用 DIP 的企业,

托管 IP 池( Managed IP Pool 是最简单的方式。

适用场景: – ✅ 发送量波动大(日常 10 万,大促 100 万) – ✅ 需要长期使用 DIP – ✅ 不想关心 IP 预热和扩容细节

核心优势: – 🤖 全自动:AWS 自动申请、预热、扩容、释放 IP – 📈 弹性扩容:根据发送量自动增减 IP 数量 – 💰 成本优化:只为实际使用的 IP 付费 – 🎯 最省心:无需人工干预

配置示例

创建托管 IP 池

aws sesv2 create-dedicated-ip-pool \

–pool-name ecommerce-managed-pool \

–scaling-mode MANAGED

将 Configuration Set 关联到托管池

aws sesv2 put-configuration-set-delivery-options \

–configuration-set-name marketing-campaigns \

–sending-pool-name ecommerce-managed-pool

自动扩容示例

时间线:

├─ 第 1 个月:日发送 10 万封

│ └─ AWS 分配:1 个 DIP

├─ 第 2 个月:日发送增长到 50 万封

│ └─ AWS 自动:申请并预热第 2 个 DIP

├─ 第 3 个月(大促):日发送 100 万封

│ └─ AWS 自动:申请并预热第 3、4 个 DIP

└─ 第 4 个月:日发送降回 30 万封

└─ AWS 自动:释放多余的 DIP,保留 2 个

成本对比

|方案

|日常成本

|大促成本

|灵活性

|

固定 4 个 DIP |$99.8/月

|$99.8/月

|❌ 日常浪费

|

托管 IP |$24.95/月

|$99.8/月

|✅ 按需付费

|

节省 |

$74.85/ |$0

|–

预热期间的最佳实践

1.选择高质量用户

def select_warmup_users(all_users, warmup_percentage):

“””根据预热百分比选择发送用户”””

预热初期(< 25%):只发送给最活跃用户

if warmup_percentage < 25:

return [u for u in all_users if u[‘engagement_score’] > 80]

预热中期(25-50%):活跃 + 中等活跃用户

elif warmup_percentage < 50:

return [u for u in all_users if u[‘engagement_score’] > 50]

预热后期(50-75%):大部分用户

elif warmup_percentage < 75:

return [u for u in all_users if u[‘engagement_score’] > 30]

预热完成(> 75%):所有用户

else:

return all_users

2.实时监控关键指标

def monitor_warmup_health(dedicated_ip):

“””监控预热健康度”””

cloudwatch = boto3.client(‘cloudwatch’)

获取过去 1 小时的指标

metrics = cloudwatch.get_metric_statistics(

Namespace=’AWS/SES’,

MetricName=’Reputation.BounceRate’,

Dimensions=[{‘Name’: ‘DedicatedIp’, ‘Value’: dedicated_ip}],

StartTime=datetime.now() – timedelta(hours=1),

EndTime=datetime.now(),

Period=3600,

Statistics=[‘Average’]

)

bounce_rate = metrics[‘Datapoints’][0][‘Average’] if metrics[‘Datapoints’] else 0

告警阈值

if bounce_rate > 0.05: # 5%

send_alert(f”⚠️ 退信率过高:{bounce_rate:.2%}”)

return False

return True

📚

参考文档: *Warming up dedicated IP addresses*

高可用架构:多区域部署

架构设计

故障转移实现

class MultiRegionSender:

def __init__(self):

self.regions = [

{‘name’: ‘us-east-1’, ‘priority’: 1, ‘client’: boto3.client(‘ses’, region_name=’us-east-1′)},

{‘name’: ‘us-west-2’, ‘priority’: 2, ‘client’: boto3.client(‘ses’, region_name=’us-west-2′)}

]

self.current_region = self.regions[0]

def send_email_with_failover(self, email_data, max_retries=2):

“””带故障转移的邮件发送”””

for region in self.regions:

try:

response = region[‘client’].send_email(**email_data)

print(f”✅ 发送成功 via {region[‘name’]}”)

return response

except Exception as e:

print(f”❌ {region[‘name’]} 发送失败: {e}”)

如果不是最后一个区域,尝试下一个

if region != self.regions[-1]:

print(f”🔄 切换到 {self.regions[self.regions.index(region) + 1][‘name’]}”)

continue

else:

所有区域都失败,记录到死信队列

self.send_to_dlq(email_data)

raise

def send_to_dlq(self, email_data):

“””发送到死信队列,稍后重试”””

sqs = boto3.client(‘sqs’)

sqs.send_message(

QueueUrl=’https://sqs.us-east-1.amazonaws.com/[已去除电话]/email-dlq’,

MessageBody=json.dumps(email_data)

)

健康检查

def check_ses_health(region):

“””检查 SES 服务健康状态”””

ses_client = boto3.client(‘ses’, region_name=region)

cloudwatch = boto3.client(‘cloudwatch’, region_name=region)

try:

检查账户状态

account_info = ses_client.get_account()

检查发送配额

quota = account_info[‘SendQuota’]

usage_percent = (quota[‘SentLast24Hours’] / quota[‘Max24HourSend’]) * 100

检查最近的错误率

metrics = cloudwatch.get_metric_statistics(

Namespace=’AWS/SES’,

MetricName=’Send’,

StartTime=datetime.now() – timedelta(minutes=5),

EndTime=datetime.now(),

Period=300,

Statistics=[‘Sum’]

)

return {

‘healthy’: usage_percent < 90, # 使用率 < 90%

‘usage_percent’: usage_percent,

‘quota_remaining’: quota[‘Max24HourSend’] – quota[‘SentLast24Hours’]

}

except Exception as e:

return {‘healthy’: False, ‘error’: str(e)}

性能优化:批量发送与并发控制

使用 SendBulkTemplatedEmail API

相比逐封发送,批量 API 可以: – ✅ 减少 API 调用次数 50 倍 – ✅ 降低网络开销 60% – ✅ 提升发送速度 3-5 倍

def send_bulk_campaign(recipients, template_name):

“””批量发送模板化邮件”””

ses_client = boto3.client(‘sesv2’)

准备批量目标(最多 50 个/批次)

bulk_destinations = []

for recipient in recipients:

bulk_destinations.append({

‘Destination’: {

‘ToAddresses’: [recipient[’email’]]

},

‘ReplacementTemplateData’: json.dumps({

‘name’: recipient[‘name’],

‘product’: recipient[‘recommended_product’],

‘discount’: recipient[‘discount_code’]

})

})

分批发送

batch_size = 50

for i in range(0, len(bulk_destinations), batch_size):

batch = bulk_destinations[i:i + batch_size]

try:

response = ses_client.send_bulk_email(

FromEmailAddress='[已去除邮箱]’,

DefaultContent={

‘Template’: {

‘TemplateName’: template_name,

‘TemplateData’: json.dumps({‘default’: ‘value’})

}

},

BulkEmailEntries=batch,

ConfigurationSetName=’marketing-campaign’

)

print(f”✅ 批次 {i//batch_size + 1} 发送成功:{len(batch)} 封”)

except Exception as e:

print(f”❌ 批次 {i//batch_size + 1} 发送失败:{e}”)

并发控制与速率限制

import asyncio

import aioboto3

from asyncio import Semaphore

class RateLimitedSender:

def __init__(self, max_rate_per_second=50):

self.max_rate = max_rate_per_second

self.semaphore = Semaphore(max_rate_per_second)

self.sent_count = 0

self.start_time = None

async def send_with_rate_limit(self, email_data):

“””带速率限制的发送”””

async with self.semaphore:

if self.start_time is None:

self.start_time = asyncio.get_event_loop().time()

检查是否需要等待

elapsed = asyncio.get_event_loop().time() – self.start_time

if elapsed < 1.0 and self.sent_count >= self.max_rate:

await asyncio.sleep(1.0 – elapsed)

self.sent_count = 0

self.start_time = asyncio.get_event_loop().time()

发送邮件

async with aioboto3.Session().client(‘ses’, region_name=’us-east-1′) as ses:

response = await ses.send_email(**email_data)

self.sent_count += 1

return response

使用示例

async def send_campaign(recipients):

sender = RateLimitedSender(max_rate_per_second=50)

tasks = [sender.send_with_rate_limit(r) for r in recipients]

results = await asyncio.gather(*tasks, return_exceptions=True)

return results

连接池管理

from botocore.config import Config

from botocore.config import Config

配置连接池

config = Config(

max_pool_connections=100, # 最大连接数

retries={‘max_attempts’: 3, ‘mode’: ‘adaptive’}

)

ses_client = boto3.client(‘ses’, config=config)

📚

参考文档: *Sending bulk email with Amazon SES*

监控告警:实时掌控发送状态

关键监控指标

CloudWatch 告警配置

CRITICAL_ALARMS = {

‘bounce_rate’: {

‘threshold’: 0.05, # 5%

‘evaluation_periods’: 1,

‘action’: ‘pause_sending’

},

‘complaint_rate’: {

‘threshold’: 0.001, # 0.1%

‘evaluation_periods’: 1,

‘action’: ‘pause_sending’

},

‘send_error_rate’: {

‘threshold’: 0.10, # 10%

‘evaluation_periods’: 2,

‘action’: ‘switch_region’

},

‘quota_usage’: {

‘threshold’: 0.90, # 90%

‘evaluation_periods’: 1,

‘action’: ‘request_increase’

}

}

使用 VDM 实时监控大促活动

VDM 在大促场景的价值

大促期间,通过 VDM Dashboard 实时监控关键指标:

监控方式 1** **:VDM Dashboard ** **实时查看

  • 打开 VDM Dashboard
  • 选择时间范围:最近 1 小时
  • 关注关键指标:
  • 送达率 > 95%
  • 退信率 < 5%
  • 投诉率 < 0.1%
  • 查看 ISP 数据:
  • Gmail 收件箱率
  • Outlook 收件箱率
  • Yahoo 收件箱率
  • 监控方式 2** **:CloudWatch ** **告警

import boto3

cloudwatch = boto3.client(‘cloudwatch’)

创建送达率告警

cloudwatch.put_metric_alarm(

AlarmName=’ses-low-delivery-rate’,

ComparisonOperator=’LessThanThreshold’,

EvaluationPeriods=1,

MetricName=’Reputation.DeliveryRate’,

Namespace=’AWS/SES’,

Period=300, # 5 分钟

Statistic=’Average’,

Threshold=0.95, # 95%

ActionsEnabled=True,

AlarmActions=[

‘arn:aws:sns:us-east-1:[已去除电话]:ses-alerts’

],

AlarmDescription=’SES 送达率低于 95%’

)

创建退信率告警

cloudwatch.put_metric_alarm(

AlarmName=’ses-high-bounce-rate’,

ComparisonOperator=’GreaterThanThreshold’,

EvaluationPeriods=1,

MetricName=’Reputation.BounceRate’,

Namespace=’AWS/SES’,

Period=300,

Statistic=’Average’,

Threshold=0.05, # 5%

ActionsEnabled=True,

AlarmActions=[

‘arn:aws:sns:us-east-1:[已去除电话]:ses-alerts’

],

AlarmDescription=’SES 退信率超过 5%’

)

监控方式** ** 3** **:定期导出数据分析

每小时导出一次 VDM 数据

aws sesv2 create-export-job \

–export-data-source ‘{

“MetricsDataSource”: {

“Namespace”: “VDM”,

“Dimensions”: {

“CONFIGURATION_SET”: [“black-friday-campaign”]

},

“Metrics”: [

{“Name”: “SEND”},

{“Name”: “DELIVERY”},

{“Name”: “BOUNCE”},

{“Name”: “COMPLAINT”}

],

“StartDate”: “2024-11-24T00:00:00Z”,

“EndDate”: “2024-11-24T23:59:59Z”

}

}’ \

–export-destination ‘{

“DataFormat”: “CSV”,

“S3Url”: “s3://campaign-metrics/black-friday/”

}’

VDM Dashboard 大促监控要点

实时送达率:目标 > 98%

ISP 分布:关注 Gmail、Outlook、Yahoo 的表现

打开率趋势:对比历史活动,评估主题行效果

点击率 (UV):评估落地页和 CTA 效果

地域分析:不同地区的参与度差异

自动化告警响应

def lambda_handler(event, context):

“””CloudWatch 告警触发的自动响应”””

alarm_name = event[‘detail’][‘alarmName’]

alarm_state = event[‘detail’][‘state’][‘value’]

if alarm_state == ‘ALARM’:

if ‘bounce_rate’ in alarm_name or ‘complaint_rate’ in alarm_name:

暂停发送,等待人工介入

pause_all_campaigns()

send_urgent_notification(“🚨 发送已暂停:声誉指标异常”)

elif ‘send_error_rate’ in alarm_name:

切换到备用区域

switch_to_backup_region()

send_notification(“🔄 已切换到备用区域”)

elif ‘quota_usage’ in alarm_name:

自动申请配额提升

request_quota_increase()

send_notification(“📈 已提交配额提升申请”)

def pause_all_campaigns():

“””暂停所有活动发送”””

停止 SQS 消费

标记活动状态为暂停

pass

def switch_to_backup_region():

“””切换到备用区域”””

更新 Route 53 记录

重定向流量到备用区域

pass

成本优化策略

1. 合理使用专用 IP

策略:按需租用,大促后释放

def manage_dedicated_ip_lifecycle(campaign_schedule):

“””管理专用 IP 生命周期”””

ses_client = boto3.client(‘sesv2’)

大促前 4 周申请

if days_until_campaign(campaign_schedule) == 28:

response = ses_client.request_dedicated_ips(

PoolName=’campaign-pool’,

WarmupEnabled=True

)

print(f”✅ 已申请专用 IP:{response[‘DedicatedIps’]}”)

大促结束后 1 周释放

elif days_after_campaign(campaign_schedule) == 7:

释放专用 IP

for ip in get_campaign_ips():

ses_client.delete_dedicated_ip_pool(PoolName=’campaign-pool’)

print(“✅ 已释放专用 IP,节省成本”)

成本节省:$24.95/月 × 10 个月 = $249.5/年

2. 附件外链化

问题:大附件增加数据传输成本

def optimize_email_with_s3(email_content, attachments):

“””将附件上传到 S3,邮件中使用链接”””

s3_client = boto3.client(‘s3’)

attachment_links = []

for attachment in attachments:

上传到 S3

key = f”attachments/{uuid.uuid4()}/{attachment[‘filename’]}”

s3_client.upload_fileobj(

attachment[‘file’],

’email-attachments-bucket’,

key

)

生成预签名 URL(7 天有效)

url = s3_client.generate_presigned_url(

‘get_object’,

Params={‘Bucket’: ’email-attachments-bucket’, ‘Key’: key},

ExpiresIn=604800 # 7 days

)

attachment_links.append({

‘name’: attachment[‘filename’],

‘url’: url

})

在邮件中插入下载链接

email_html = email_content + “

附件下载:

for link in attachment_links:

email_html += f”{link[‘name’]}”

email_html += “”

return email_html

成本节省:数据传输成本降低 50%

3. 区域选择优化

|区域

|发送成本

|数据传输成本

|总成本(100万封)

|us-east-1

|$100

|$50

|$150

|ap-southeast-1

|$100

|$80

|$180

|

节省 |–

|

$30 |

20%

总结与下一步

通过本文,你已经掌握了: – ✅ 大规模场景的容量规划方法 – ✅ 专用 IP 预热的 4 周计划 – ✅ 高可用架构的多区域部署 – ✅ 性能优化和成本控制策略 – ✅ 实时监控和自动化告警

下一步行动

评估业务规模:计算大促期间的发送量需求

制定预热计划:提前 4 周申请专用 IP

搭建监控体系:配置 CloudWatch 告警

压力测试:大促前 1 周验证系统承载能力

系列文章

相关资源

📚

官方文档: – *Managing sending quotas* – *Dedicated IP warming* – *Monitoring your sending activity*

💡

最佳实践: – *High-volume email sending* – *Email deliverability best practices*

本篇作者

|

AWS 架构师中心: 云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用

|


快时尚电商行业智能体设计思路与应用实践(五)借助 AgentCore Runtime 与 Bedrock 模型平台,轻松实现 Claude Agent SDK 的生产级部署

AWS账单代付阅读(0)

序言

在智能体的开发实践中,一个常见现象是,在本地运行流畅的智能体,部署到生产环境后却频繁暴露出工程层面的不确定性问题,如执行时长不足、会话状态不稳定、算力资源分配困难、模型访问方式不统一,以及可观测性体系欠缺等。这类问题往往并非源于智能体自身的逻辑缺陷,而是由运行环境与模型平台之间的差异所引发。

随着快时尚电商企业对于自主式智能体的研发需求与日俱增,Claude Agent SDK 逐渐成为了智能体工具箱的重要组成部分,为构建具备多步推理与工具调用能力的自主式智能体提供了良好的开发体验,而 Amazon Bedrock 作为主流大模型的托管平台,为 Nova、Claude、DeepSeek、Qwen、Mistral 等系列模型提供了统一、安全且可控的推理服务。若将二者结合,并依托 AgentCore Runtime 环境,开发者便可在保留本地开发流畅度的同时,让智能体以较低的改造成本适应生产环境对稳定性、隔离性与可观测性的要求。

本文旨在通过一条务实的技术路径,介绍如何基于 AgentCore Runtime 与 Bedrock 模型平台,搭建能够直接承载 Claude Agent SDK 智能体的生产级运行环境。内容将涵盖本地开发、模型调用、环境配置、部署流程及常见问题排查等关键环节,以期帮助开发者更顺畅地将自主式智能体应用于实际业务场景。

希望本文能为正在或计划实施智能体生产落地的团队提供一份可行的技术参考,助力智能体部署过程更加可控、结果更可预期。

Amazon Bedrock AgentCore Runtime

在本地让智能体运行成功后,下一步是将其部署到一个稳定、可扩展、可观测的生产环境中。

Amazon Bedrock AgentCore 正是为此设计的运行底座,它为智能体应用提供了统一的开发模型、工具集与托管执行环境。其中, AgentCore Runtime 是一个专为智能体工作负载构建的无服务器执行环境。

与传统容器不同,AgentCore Runtime 采用基于

microVM 的隔离方式,为智能体运行提供安全、可靠、高效、并行且可预测的执行环境。其核心能力包括:

每次调用拥有独立的 microVM 环境,确保不同用户、任务和智能体之间完全隔离;

单次执行最长可达 8 小时,适合需要长时间推理、分析或外部工具调用的智能体;

框架无关的运行方式:无论你使用 Strands Agents、Claude Agent SDK、LangGraph、CrewAI 或自定义框架,均可通过 AgentCore Runtime 运行,只需提供符合规范的入口脚本。

AgentCore SDK 目前支持 Python、TypeScript等,使智能体能够在无服务器环境中运行,而无需自行构建或维护底层基础设施,从而有效降低部署与运维的复杂度。

Claude Agent SDK

Claude Agent SDK 是智能体开发引擎与模块化基础设施,其定位既非简单的提示词封装工具,也非低代码拖拽平台,而是一个用于构建生产就绪具备自主探索与执行能力的智能体应用的完整方案和技术底座。

它提供了包括上下文管理(记忆/会话)、工具调用、任务执行、权限与安全以及状态管理在内的核心支持,帮助开发者系统化地搭建稳定、可扩展的智能体应用。

该 SDK 同时提供 Python 和 TypeScript 版本,适用于后端服务、脚本自动化、Web 应用等多种场景。

借助 Claude Agent SDK,开发者可以大幅减少在“底层架构组件(如状态机、工作流引擎、上下文持久化、工具调用链路等)”上的实现负担,从而更专注于业务逻辑与智能体行为设计(如规划、决策、工具使用策略),能够更快速地从原型阶段推进到生产部署。

对于需要长上下文支持、复杂交互、多工具调用、严格安全权限控制或高稳定性部署的应用场景,例如代码生成与执行、文档分析、自动化流程、报告生成、RAG 与长文档理解等,Claude Agent SDK 提供了非常适宜的基础设施支持。

简而言之,Claude Agent SDK 的核心价值在于它是一个代码优先、面向生产、功能全面(涵盖工具调用、记忆、安全与可扩展性) 的智能体开发基础框架,帮助团队高效构建、迭代并交付可靠的企业级智能体应用。

Bedrock 模型平台

Bedrock 模型平台是 AWS 提供的全托管生成式 AI 服务,能够统一访问多家领先模型,并内置了企业级的安全、治理与扩展能力。

它让开发者无需自建基础设施即可灵活调用 Nova、Claude、DeepSeek、Qwen、Mistral 等前沿大模型,并支持针对私有数据的定制化微调与检索增强(RAG)。

Claude Agent SDK 结合 Bedrock 模型平台使用时,能直接依托其高可用、合规且全球部署的推理基础设施,确保智能体服务的稳定与低延迟。

Claude Agent SDK 可通过 Bedrock 无缝切换或组合不同模型(如 Nova、Claude、DeepSeek、Qwen、Mistral 等),从而根据任务场景优化成本与效果,提升智能体策略的灵活性。

借助 Bedrock 内置的监控、审计与权限控制,使用 Claude Agent SDK 开发的智能体可自然符合企业安全规范,实现工具调用与数据访问的可追踪、可管控。

该组合大幅降低了生产级智能体的运维门槛,让开发者更专注于智能体的行为逻辑,而将模型部署、扩缩容与合规保障交由平台托管。

Global CRIS vs GEO CRIS

使用Global CRIS(例如 global.anthropic.claude-sonnet-4-[已去除电话]-v1:0),可以实现在任意支持 Bedrock 的 AWS 区域调用同一模型,无需为不同区域维护多个模型 ID,简化了全球部署的管理复杂度。

该特性提升了 AI 推理服务的全球可扩展性和弹性,当一个区域出现服务中断或延迟时,请求可以路由至其他可用区域,保障了应用程序的高可用性。

它降低了在全球多个区域部署和维护 AI 应用的成本与运营负担,开发者只需使用一个统一的模型标识,即可构建具备跨区域容灾和低延迟推理能力的全球化应用架构。

|

|特性

|Global CRIS (全球跨区域推理)

|GEO CRIS (特定地域跨区域推理配置)

|1

|路由范围

|全球所有AWS区域

|特定的地理范围内(如美国、欧洲等)

|2

|配置前缀

|global.anthropic.claude…

|地理前缀(如 us., eu., ap.)+ anthropic.claude…

|3

|主要优势

|容量最大、弹性最强、吞吐量最高

|在特定地理边界内提供容灾和流量分发

|4

|适用场景

|对全球可用性、处理峰值流量有极高要求的应用

|需要遵守数据驻留法规、将流量限制在大洲内的应用

|5

|使用成本

|通常比单区域或地理CRIS更具成本效益

|根据具体区域定价

它们是如何工作的?

无论 Global CRIS 还是 GEO CRIS,都属于AWS的“跨区域推理”功能。当你从某个“源区域”发出请求时,Amazon Bedrock会自动根据你选择的配置文件,将请求路由到其“目标区域”列表中的一个最佳区域进行处理,以此提升性能、可用性和吞吐量。

如何选择使用哪一种?

选择哪种方式主要取决于具体需求:

选择Global CRIS:如果应用需要最高的全球可用性、应对突发流量峰值的能力,并且数据可以在全球范围内路由。

选择GEO CRIS:如果业务有严格的数据驻留或合规要求,必须将数据处理限制在某个地理边界内(如欧盟境内),同时仍需要在该区域内具备高可用性。

总而言之,你可以将 GEO CRIS 视为在一个大洲或国家内进行负载均衡和容灾,而 Global CRIS 则是在全球范围内进行资源调配和优化,本文的操作示例将选用Global CRIS。

操作示例

1.架构说明

┌─────────────────────────────────────────────────────┐

│ 用户请求

│ User Request

└────────────────┬────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐

│ AgentCore Runtime

│ (托管基础设施、自动扩缩容、监控)

└────────────────┬────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐

│ BedrockAgentCoreApp实例

│ • 接收HTTP/MCP请求

│ • 管理会话状态

│ • 处理内存(STM)

└────────────────┬────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐

│ main函数入口

│ (@app.entrypoint装饰的异步入口点)

│ • 解析payload参数

│ • 调度示例函数执行

│ • 处理异常和返回结果

└────────────────┬────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐

│ 函数调度

└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐

│ Claude Agent SDK query函数

│ • 构造Claude请求参数

│ • 处理异步消息流

│ • 管理工具调用和结果

└────────────────┬────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐

│ Claude Agent SDK 处理层

│ • AssistantMessage: 助手响应

│ • ResultMessage: 结果和成本

│ • ToolMessage: 工具调用结果

└────────────────┬────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐

│ Bedrock Claude 模型

│ (Sonnet 4.5推理,Global CRIS)

└────────────────┬────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐

│ 工具执行层

│ • Read工具:文件读取

│ • Write工具:文件写入

│ • 其他自定义工具

└────────────────┬────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐

│ 响应处理

│ • 提取TextBlock内容

│ • 格式化输出

│ • 计算和显示成本

└────────────────┬────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐

│ 返回BedrockAgentCoreApp

│ • 异步函数执行完成

│ • 控制权返回给装饰器

│ • 生成HTTP/MCP响应

└────────────────┬────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐

│ 最终响应

│ • Bedrock AgentCore封装

│ • 返回给用户

└─────────────────────────────────────────────────────┘

2.本地运行

安装 Claude Agent SDK,并同时安装 Claude Code。

  • claude-agent-sdk
  • claude-code
  • 验证 Claude Code 是否正常工作

Claude Agent SDK 依赖 Claude Code,支持订阅和 API 模式。下面将进行 Claude Code 与 Bedrock 集成,关键环境变量如下:

set AWS_BEARER_TOKEN_BEDROCK=

set CLAUDE_CODE_USE_BEDROCK=1

set AWS_REGION=

set ANTHROPIC_MODEL=global.anthropic.claude-sonnet-4-[已去除电话]-v1:0

set ANTHROPIC_SMALL_FAST_MODEL=global.anthropic.claude-haiku-4-[已去除电话]-v1:0

export AWS_BEARER_TOKEN_BEDROCK=

export CLAUDE_CODE_USE_BEDROCK=1

export AWS_REGION=

export ANTHROPIC_MODEL=global.anthropic.claude-sonnet-4-[已去除电话]-v1:0

export ANTHROPIC_SMALL_FAST_MODEL=global.anthropic.claude-haiku-4-[已去除电话]-v1:0

可以通过

Claude Code CLI 的

/model 命令验证模型配置。

> /model

────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────

Select model

Switch between Claude models. Applies to this session and future Claude Code sessions. For other/previous model names, specify with –model.

1. Default (recommended) Use the default model (currently Sonnet 4.5) · $3/$15 per Mtok

2. Opus Opus 4.5 · Most capable for complex work · $5/$25 per Mtok

3. Haiku Haiku 4.5 · Fastest for quick answers · $1/$5 per Mtok

> 4. global.anthropic.claude-sonnet-4-[已去除电话]-v1:0 Custom model √

运行本地示例代码

使用 Claude Agent SDK 运行后会输出 Claude 的回答结果。

claude_agent_sdk_on_local_machine.py

快时尚电商行业智能体设计思路与应用实践(五)借助 AgentCore Runtime 与 Bedrock 模型平台,轻松实现 Claude Agent SDK 的生产级部署

import anyio

from claude_agent_sdk import (

AssistantMessage,

ClaudeAgentOptions,

ResultMessage,

TextBlock,

query,

)

用来缓存 “示例二” 助理的输出内容

system_prompt_output = “”

async def demo_basic_query():

print(“✨ 【示例一:基础问答】”)

print(“准备向智能体提一个经典勾股定理问题…\n”)

async for message in query(prompt=”勾如果是8,股如果是15,那么弦是多少?”):

if isinstance(message, AssistantMessage):

for block in message.content:

if isinstance(block, TextBlock):

print(f”✨ 助理回答:{block.text}”)

print(“\n—\n”)

async def demo_system_prompt():

global system_prompt_output

print(“✨ 【示例二:系统提示与角色设定】”)

print(“让助理以“简单解释型”角色来说明一个概念…\n”)

options = ClaudeAgentOptions(

system_prompt=”你是一位擅长把复杂概念说得浅显易懂的老师,请用一句话回答。”,

max_turns=1,

)

async for message in query(

prompt=”请用一句话解释什么是勾股定理。”,

options=options,

):

if isinstance(message, AssistantMessage):

for block in message.content:

if isinstance(block, TextBlock):

print(f”✨ 助理回答:{block.text}”)

保存内容,用于写入 科普.txt

system_prompt_output = block.text

print(“\n—\n”)

async def demo_tool_usage():

print(“✨ 【示例三:工具调用】”)

print(“让助理使用工具创建科普文件…\n”)

options = ClaudeAgentOptions(

allowed_tools=[“Read”, “Write”],

system_prompt=”你是一位贴心的文件小助手。”,

)

直接创建科普.txt

global system_prompt_output

write_science_file_prompt = f”创建一个名为 科普.txt 的文件,内容为:'{system_prompt_output}'”

print(“✨ 正在将示例二的输出写入 科普.txt…\n”)

async for message in query(

prompt=write_science_file_prompt,

options=options,

):

if isinstance(message, AssistantMessage):

for block in message.content:

if isinstance(block, TextBlock):

print(f”✨ 助理消息:{block.text}”)

elif isinstance(message, ResultMessage) and message.total_cost_usd > 0:

print(f”\n✨ 本次调用费用:${message.total_cost_usd:.4f}”)

print(“\n—\n”)

async def main():

print(“✨ Claude Agent SDK 示例开始运行!\n”)

await demo_basic_query()

await demo_system_prompt()

await demo_tool_usage()

print(“✨ 所有示例已执行完毕!”)

if __name__ == “__main__”:

anyio.run(main)

运行命令:

python claude_agent_sdk_on_local_machine.py

运行结果:

✨ Claude Agent SDK 示例开始运行!

✨ 【示例一:基础问答】

准备向智能体提一个经典勾股定理问题…

✨ 助理回答:这是一个勾股定理的问题。

已知:

  • 勾(直角三角形的一条直角边)= 8
  • 股(直角三角形的另一条直角边)= 15
  • 弦(斜边)= ?

根据勾股定理:弦² = 勾² + 股²

计算:

弦² = 8² + 15²

弦² = 64 + 225

弦² = 289

弦 = √289 = 17

所以,弦是 17

✨ 【示例二:系统提示与角色设定】

让助理以“简单解释型”角色来说明一个概念…

✨ 助理回答:勾股定理就是直角三角形两条直角边的平方和等于斜边的平方,用公式表示就是 a² + b² = c²。

✨ 【示例三:工具调用】

让助理使用工具创建科普文件…

✨ 正在将示例二的输出写入 科普.txt…

✨ 助理消息:我来帮你创建这个文件。

✨ 助理消息:已成功创建文件 科普.txt,内容为关于勾股定理的科普说明!文件已保存到 /tmp/科普.txt

✨ 本次调用费用:$0.0196

✨ 所有示例已执行完毕!

2.本地包装

接下来,使用的 AgentCore Python SDK 为代码添加 AgentCore Runtime 包装,使其可以部署到 AgentCore Runtime,只需

添加几行代码,即可让现有智能体支持 AgentCore Runtime。

claude_agent_sdk_on_agentcore_runtime.py

!/usr/bin/env python3

import anyio

from bedrock_agentcore import BedrockAgentCoreApp

from claude_agent_sdk import (

AssistantMessage,

ClaudeAgentOptions,

ResultMessage,

TextBlock,

query,

)

创建 AgentCore 应用实例

app = BedrockAgentCoreApp()

用来缓存 “示例二” 助理的输出内容

system_prompt_output = “”

async def demo_basic_query():

print(“✨ 【示例一:基础问答】”)

print(“准备向智能体提一个经典勾股定理问题…\n”)

async for message in query(prompt=”勾如果是8,股如果是15,那么弦是多少?”):

if isinstance(message, AssistantMessage):

for block in message.content:

if isinstance(block, TextBlock):

print(f”✨ 助理回答:{block.text}”)

print(“\n—\n”)

async def demo_system_prompt():

global system_prompt_output

print(“✨ 【示例二:系统提示与角色设定】”)

print(“让助理以“简单解释型”角色来说明一个概念…\n”)

options = ClaudeAgentOptions(

system_prompt=”你是一位擅长把复杂概念说得浅显易懂的老师,请用一句话回答。”,

max_turns=1,

)

async for message in query(

prompt=”请用一句话解释什么是勾股定理。”,

options=options,

):

if isinstance(message, AssistantMessage):

for block in message.content:

if isinstance(block, TextBlock):

print(f”✨ 助理回答:{block.text}”)

system_prompt_output = block.text

print(“\n—\n”)

async def demo_tool_usage():

print(“✨ 【示例三:工具调用】”)

print(“让助理使用工具创建科普文件…\n”)

options = ClaudeAgentOptions(

allowed_tools=[“Read”, “Write”],

system_prompt=”你是一位贴心的文件小助手。”,

)

global system_prompt_output

write_science_file_prompt = (

f”创建一个名为 科普.txt 的文件,内容为:'{system_prompt_output}'”

)

print(“✨ 正在将示例二的输出写入 科普.txt…\n”)

async for message in query(

prompt=write_science_file_prompt,

options=options,

):

if isinstance(message, AssistantMessage):

for block in message.content:

if isinstance(block, TextBlock):

print(f”✨ 助理消息:{block.text}”)

elif isinstance(message, ResultMessage) and message.total_cost_usd > 0:

print(f”\n✨ 本次调用费用:${message.total_cost_usd:.4f}”)

print(“\n—\n”)

使用 AgentCore 的 entrypoint

@app.entrypoint

async def main(payload: dict=None):

print(“✨ Claude Agent SDK 示例开始运行!\n”)

await demo_basic_query()

await demo_system_prompt()

await demo_tool_usage()

print(“✨ 所有示例已执行完毕!”)

AgentCore Runtime 入口

if __name__ == “__main__”:

app.run()

代码包装增加了以下内容,可以看出,几乎不需要修改原有代码,就可以将 Claude Agent SDK 应用包装成 AgentCore Runtime 应用:

from bedrock_agentcore import BedrockAgentCoreApp

app = BedrockAgentCoreApp()

@app.entrypoint

async def main(payload: dict=None):

运行命令:

python claude_agent_sdk_on_agentcore_runtime.py

此时服务将在本地 8080 端口启动,可通过以下命令进行测试:

curl -X POST http://localhost:8080/invocations -H “Content-Type:application/json” -d “{}”

运行结果:

✨ Claude Agent SDK 示例开始运行!

✨ 【示例一:基础问答】

准备向智能体提一个经典勾股定理问题…

✨ 助理回答:这是一个关于勾股定理的问题。

根据勾股定理:勾² + 股² = 弦²

已知:

  • 勾 = 8
  • 股 = 15

计算:

  • 8² + 15² = 弦²
  • 64 + 225 = 弦²
  • 289 = 弦²
  • 弦 = √289 = **17**

所以,弦是17

这是一个经典的勾股数组合(8, 15, 17)。

✨ 【示例二:系统提示与角色设定】

让助理以“简单解释型”角色来说明一个概念…

✨ 助理回答:勾股定理是指在直角三角形中,两条直角边的平方和等于斜边的平方(即 a² + b² = c²)。

✨ 【示例三:工具调用】

让助理使用工具创建科普文件…

✨ 正在将示例二的输出写入 科普.txt…

✨ 助理消息:我来帮你创建这个文件。

✨ 助理消息:由于这是一个新文件,让我直接创建它:

✨ 助理消息:我需要在当前允许的工作目录中创建文件。让我在当前工作目录创建这个文件:

✨ 助理消息:完成了!我已经在当前工作目录(“)中成功创建了 科普.txt 文件,内容为:

勾股定理是指在直角三角形中,两条直角边的平方和等于斜边的平方(即 a² + b² = c²)。

文件已经创建好了!📝

✨ 本次调用费用:$0.0368

✨ 所有示例已执行完毕!

{“timestamp”: “2025-12-07T17:43:33.983Z”, “level”: “INFO”, “message”: “Invocation completed successfully (38.683s)”, “logger”: “bedrock_agentcore.app”, “requestId”: “f68d3ed5-387b-432e-8810-cdc66ef0c79f”}

3.云端托管

AgentCore Starter Toolkit 是一个命令行工具,可显著简化部署流程。

前置条件

  • Python 3.10 或更高版本
  • 已配置 AWS 凭证
  • 已安装 Amazon Bedrock AgentCore SDK
  • IAM 权限

部署智能体需要两组 IAM 权限:

开发者角色:允许创建与管理 AgentCore 资源;

执行角色:为运行时智能体授予必要权限。

设置

auto_create_execution_role=True 时,Toolkit 会自动创建执行角色,但开发者角色仍需手动配置。

开始部署

安装 Toolkit:

pip install bedrock-agentcore-starter-toolkit

agentcore –help

创建依赖文件

requirements.txt:

bedrock-agentcore==1.0.4

claude-agent-sdk==0.1.3

配置 AgentCore

运行命令

agentcore configure -e claude_agent_sdk_on_agentcore_runtime.py

-e (或

-entrypoint) 参数用于指定您智能体的入口点文件,即包含您智能体核心代码的 Python 文件。

此命令将为智能体创建用于部署到 AWS 的配置。

除非有特定需求,否则请接受所有默认值。

Configuring Bedrock AgentCore…

✓ Using file: claude_agent_sdk_on_agentcore_runtime.py

🏷️ Inferred agent name: claude_agent_sdk_on_agentcore_runtime

Press Enter to use this name, or type a different one (alphanumeric without ‘-‘)

Agent name [claude_agent_sdk_on_agentcore_runtime]:

✓ Using agent name: claude_agent_sdk_on_agentcore_runtime

🔍 Detected dependency file: requirements.txt

Press Enter to use this file, or type a different path (use Tab for autocomplete):

Path or Press Enter to use detected dependency file: requirements.txt

✓ Using requirements file: requirements.txt

🚀 Deployment Configuration

Warning: Direct Code Deploy deployment unavailable (zip utility not found). Falling back to Container deployment.

Select deployment type:

1. Container – Docker-based deployment

✓ Deployment type: Container

🔐 Execution Role

Press Enter to auto-create execution role, or provide execution role ARN/name to use existing

Execution role ARN/name (or press Enter to auto-create):

✓ Will auto-create execution role

🏗️ ECR Repository

Press Enter to auto-create ECR repository, or provide ECR Repository URI to use existing

ECR Repository URI (or press Enter to auto-create):

✓ Will auto-create ECR repository

🔐 Authorization Configuration

By default, Bedrock AgentCore uses IAM authorization.

Configure OAuth authorizer instead? (yes/no) [no]:

✓ Using default IAM authorization

🔒 Request Header Allowlist

Configure which request headers are allowed to pass through to your agent.

Common headers: Authorization, X-Amzn-Bedrock-AgentCore-Runtime-Custom-*

Configure request header allowlist? (yes/no) [no]:

✓ Using default request header configuration

Configuring BedrockAgentCore agent: claude_agent_sdk_on_agentcore_runtime

💡 No container engine found (Docker/Finch/Podman not installed)

✓ Default deployment uses CodeBuild (no container engine needed), For local builds, install Docker, Finch, or Podman

Memory Configuration

Tip: Use –disable-memory flag to skip memory entirely

✅ MemoryManager initialized for region: us-east-1

No existing memory resources found in your account

Options:

• Press Enter to create new memory

• Type ‘s’ to skip memory setup

Your choice:

✓ Short-term memory will be enabled (default)

• Stores conversations within sessions

• Provides immediate context recall

Optional: Long-term memory

• Extracts user preferences across sessions

• Remembers facts and patterns

• Creates session summaries

• Note: Takes 120-180 seconds to process

Enable long-term memory? (yes/no) [no]: no

✓ Using short-term memory only

Will create new memory with mode: STM_ONLY

Memory configuration: Short-term memory only

Network mode: PUBLIC

Generated Dockerfile: .bedrock_agentcore\claude_agent_sdk_on_agentcore_runtime\Dockerfile

Setting ‘claude_agent_sdk_on_agentcore_runtime’ as default agent

┌────────────────────────────────── Configuration Success ─────────────────────────────────────────────┐

│ Agent Details │

│ Agent Name: claude_agent_sdk_on_agentcore_runtime │

│ Deployment: container │

│ Region: us-east-1 │

│ Account: │

│ │

│ Configuration │

│ Execution Role: Auto-create │

│ ECR Repository: Auto-create │

│ Network Mode: Public │

│ ECR Repository: Auto-create │

│ Authorization: IAM (default) │

│ │

│ │

│ Memory: Short-term memory (30-day retention) │

│ │

│ │

│ Config saved to: .bedrock_agentcore.yaml │

│ │

│ Next Steps: │

│ agentcore launch │

└──────────────────────────────────────────────────────────────────────────────────────────────────────┘

配置后将生成

.bedrock_agentcore.yaml 文件。

修改 Dockerfile 以安装 Claude Code

.bedrock_agentcore/ 目录中,修改Dockerfile:

FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim

WORKDIR /app

所有环境变量集中在一个层中

ENV UV_SYSTEM_PYTHON=1 \

UV_COMPILE_BYTECODE=1 \

UV_NO_PROGRESS=1 \

PYTHONUNBUFFERED=1 \

DOCKER_CONTAINER=1 \

AWS_REGION=us-east-1 \

AWS_DEFAULT_REGION=us-east-1

安装 Node.js 和 npm (用于 CLAUDE CODE)

RUN apt-get update && apt-get install -y \

nodejs \

npm \

&& rm -rf /var/lib/apt/lists/*

全局安装 Claude Code CLI (用于 CLAUDE CODE)

RUN npm install -g @anthropic-ai/claude-code

COPY requirements.txt requirements.txt

从 requirements 文件安装依赖

RUN uv pip install -r requirements.txt

安装 AWS OpenTelemetry 发行版

RUN uv pip install aws-opentelemetry-distro>=0.10.1

设置环境变量以标识在 Docker 容器中运行,用于主机绑定逻辑

ENV DOCKER_CONTAINER=1

创建非 root 用户

RUN useradd -m -u 1000 bedrock_agentcore

USER bedrock_agentcore

暴露端口

EXPOSE 9000

EXPOSE 8000

EXPOSE 8080

复制整个项目 (遵循 .dockerignore 规则)

COPY . .

启动应用(带 OpenTelemetry 监控)

CMD [“opentelemetry-instrument”, “python”, “-m”, “claude_agent_sdk_on_agentcore_runtime”]

这样,部署时会自动安装 Claude Code CLI 以及其他所需组件。

启动部署

运行命令:

agentcore launch -a claude_agent_sdk_on_agentcore_runtime \

–env CLAUDE_CODE_USE_BEDROCK=1 \

–env AWS_BEARER_TOKEN_BEDROCK= \

–env DEBUG=true \

–env ANTHROPIC_MODEL=global.anthropic.claude-sonnet-4-[已去除电话]-v1:0 \

–env ANTHROPIC_SMALL_FAST_MODEL=global.anthropic.claude-haiku-4-[已去除电话]-v1:0

部署完成后,检查AgentCore Runtime的环境变量是否正确:

检查相关组件是否均已创建:

  • AgentCore Runtime
  • Short-Term Memory
  • ECR Repository
  • IAM Roles and Policies

进行测试:

可以在 CloudWatch 日志中查看调用详情,点击“Logs”:

点击“Start tailing”

可以通过“Live Tail”辅助调试

经验总结

  • 大多数无服务器平台只允许运行几分钟,而 AgentCore Runtime 允许运行 长达 8 小时! 这意味着智能体可以执行深度研究、长时间工作流或大规模数据处理任务,而无需担心超时。
  • 只需几行代码:导入、装饰、运行,无需重写智能体或调整架构,非常灵活。
  • 本地环境和生产运行有所不同,生产环境中同时有多个用户请求时,microVM 隔离意味着每次调用都有独立环境, 没有共享状态,没有内存泄漏,没有相互干扰。
  • 结语

通过本文介绍的方案,“本地应用开发”与“生产环境部署”之间的鸿沟得以显著缩小。部署过程变得简单,仅需少量代码调整与几条命令,即可将 Claude Agent SDK 智能体迁移至 AgentCore Runtime 上运行。AgentCore 的包装模式尊重原有代码结构,不强制开发者更换框架或改写业务逻辑,极大降低了生产化成本。

对于真正需要长时间推理、多步骤任务执行与工具链协作的智能体来说,会话隔离能力与最长 8 小时的执行窗口并非附加价值,而是支持“自主式探索型智能体”顺利运行的关键基础。依托 Bedrock 模型平台提供的统一、稳定、安全的模型调用体系,企业即可在一个可控、可观测、可扩展的环境中,让智能体真正走向生产级落地。

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者

|

AWS 架构师中心: 云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用

|


行业筑基 iData · GenAI驱动的企业数据架构重塑

AWS账单代付阅读(0)

亚马逊AWS官方博客

行业筑基 iData · GenAI驱动的企业数据架构重塑

1. 引言:企业智能化落地的数据困境

在数字化转型的浪潮中,企业智能化已成为提升核心竞争力的关键。然而,根据多项调研数据,大多数企业在这条路上举步维艰。73%的高管无法从遗留的、脱节的供应链、资产和运营中获得可操作的数据驱动洞察;35%-65%的受访者指出工具不兼容是阻碍智能化的关键问题。

企业数据整合之所以困难,根本原因在于:

跨系统与跨部门协作的巨大沟通成本

当一个业务问题需要多个系统提供数据支持时,往往涉及多个部门的多名人员。从提出需求、沟通理解、数据获取、格式转换到最终分析,这一过程可能需要几天甚至几周时间。这些沟通与协调的时间成本,往往吞噬了AI本该带来的效率提升

系统间整合临时性与片面性

许多企业的系统整合是临时性的,缺乏完整视角。大量数据被存储在各个独立系统中,彼此之间有限互联或完全没有连接,导致数据流通受阻。一个简单的业务问题可能需要横跨多个系统才能获得完整答案,这使得数据的价值难以充分释放。

生成式AI的出现让业务部门(LoB)开始重新思考如何解决业务问题。产品经理需要结合销售部门的客户反馈和技术团队的开发进度来优化产品路线图;供应链经理需要同时访问采购记录、物流数据和财务部门的付款状态来解决交付延期问题;市场团队希望整合客服部门的用户投诉与研发团队的缺陷跟踪系统来改进宣传策略;人力资源部门需要分析跨越财务、业务和管理层的多维度数据来优化人才配置。这些跨系统、跨部门的数据需求,亟需一种更高效的数据架构来支撑。

制造、汽车、零售、游戏、媒体广告、金融服务、医疗健康七大行业虽业务场景各异,却共同面临”** **数据孤岛”** **这一核心瓶颈——** **多部门、多系统、多格式的数据割裂严重制约数据驱动决策。** 数据异构性体现在格式(SQL/Excel/文本/影像/日志/IoT流)、更新频率(毫秒级实时至周级批量)、存储位置(分散于各业务系统)三个维度。金融与医疗还叠加严格的合规与隐私要求,进一步提升整合难度。传统分析依赖人工经验和固定报表,难以支撑灵活按需的跨源洞察需求。当前Text2SQL等AI技术仅解决单一结构化数据查询,对跨数据源融合分析仍存在明显能力缺口。 **打通数据孤岛、实现异构数据智能整合,已成为企业数字化转型的关键突破口,具备语义理解、跨源融合与合规保障能力的AI** **数据平台将成为核心解法。

|维度

|汽车与制造

|零售

|游戏

|媒体与广告

|金融服务

|医疗健康

|1

|

核心痛点 |研产销服数据孤岛,跨部门全流程优化困难,车联网海量数据利用率低

|异构数据整合难,营销效果评估低效,库存与需求匹配滞后

|分析需求繁复,跨源洞察能力不足,策划迭代依赖数据平台

|内容与广告效果优化难,创意匹配慢,用户行为理解碎片化

|风控数据分散,客户视图不完整,合规监测滞后,反欺诈响应慢

|患者数据分散于多系统,跨院互通难,隐私合规要求高

|2

|

涉及部门 |研发、生产、质检、工程、采购、销售、售后、车联网运营

|销售、营销、仓储物流、客户服务

|策划、运营、营销、客服

|内容编辑、广告投放、用户运营、创意设计、数据分析

|风控、合规、零售业务、对公业务、客服、营销、IT

|临床科室、检验科、影像科、药剂科、医保、科研

|3

|

数据来源 |PLM/MES/QMS(生产质检)、SCM(供应链)、DMS(经销商)、TSP(车联网)、售后系统

|交易系统、仓储管理系统、CRM系统、广告平台

|玩家行为日志、游戏内经济系统、营销平台、客服系统

|CMS(内容管理)、广告投放平台、用户行为系统、社交媒体、版权系统

|核心业务系统、交易系统、CRM、征信系统、外部数据源、监管报送系统

|HIS(医院信息)、LIS(检验)、PACS(影像)、EMR(电子病历)、医保系统

|4

|

数据格式 |CAD/CAE设计文件+生产结构化数据+车联网IoT流+非结构化(客诉、工单)

|SQL、Excel、文本多样

|实时流+批量日志+结构化表

|文本、视频、图片、用户日志、广告效果数据

|结构化交易数据+非结构化(合同、录音、影像、舆情)

|结构化(检验报告、处方)+非结构化(病历文本、医学影像)

|5

|

更新频率 |设计数据低频+生产实时+车联网毫秒级+售后日级

|小时级/日级/周级不一

|近实时+批量+不定期

|内容发布不定期+广告效果实时/日级+用户行为实时

|交易实时+风控实时+日终批量+监管周期性

|门诊实时+检验日级+影像批量+科研周期性

|6

|

关键场景 |产品质量追溯、设备预防维护、工艺优化、供应链协同、用户驾驶行为分析、OTA策略优化

|ROI分析、精准营销、库存优化、客户全生命周期管理

|留存/付费优化、关卡难度调整、虚拟经济平衡

|内容智能推荐、广告效果归因、用户画像构建、创意A/B测试、版权追踪

|实时风控、反欺诈、客户360画像、智能营销、合规监测、信贷审批

|临床决策支持、药物相互作用预警、疾病风险预测、科研队列分析、医保控费

|7

|

整合价值 |缩短研发周期、提升整车质量、降低售后成本、优化用户体验

|提高营销ROI、降低库存成本、提升转化率

|提升LTV、优化游戏体验、缩短迭代周期

|提升内容分发效率、优化广告ROI、增强用户粘性、加速创意迭代

|降低风险损失、提升客户体验、满足监管要求、提高审批效率

|提升诊疗效率、减少误诊漏诊、优化医疗资源配置、加速临床科研

|8

|

AI 应用现状 |单点应用起步,车联网数据利用率低,研产销服打通困难

|Text2SQL起步,跨源分析不足

|Text2SQL有应用,海量原始数据分析存在瓶颈

|推荐算法相对成熟,跨平台数据整合与创意智能化不足

|风控模型较成熟,但数据孤岛严重,跨条线客户视图缺失

|隐私合规限制大,跨系统整合困难,AI辅助诊断仍在探索

|9

|

特殊挑战 |供应链上下游协同、车规级数据安全

|促销峰值弹性、全渠道统一

|玩家隐私、游戏平衡性

|版权合规、内容审核、多平台分发

|

强监管合规(反洗钱、数据本地化)、金融级安全 |

强隐私保护(HIPAA/个保法)、医学知识专业性

2. 企业数据架构的演进历程

企业数据架构经历了三个关键发展阶段:

第一代:数据驱动阶段(分散的数据孤岛)

早期企业数据架构特点是系统独立、数据分散。ERP、CRM、业务系统各自为政,数据存储在各自的数据库中,形成众多”数据孤岛”。这一阶段的企业主要关注数据的存储和基本运营,缺乏整体视角。

第二代:数据洞察驱动阶段(集中式数据平台)

随着数据量增长和分析需求提升,企业开始建立集中式数据平台,如数据仓库和数据湖。通过ETL工具从各业务系统抽取数据,集中存储并进行分析。这一阶段的企业追求”单一数据真相源”(Single Source of Truth),通过BI和高级分析工具提取数据洞察。

第三代:业务与创新驱动阶段(去中心化领域知识)

随着GenAI等技术发展,企业数据架构正向去中心化领域知识模式演进。这一模式保留了集中式数据平台的优势,同时在业务领域层面构建知识系统,整合供应链、研发、制造、可持续性等领域的专业知识,以支持更高效的业务决策和创新。

这种演进使GenAI能够获取企业上下文,提供真正可执行的洞察,从而实现数字主线(Digital Thread)—— 一种能够连接产品开发过程中传统上孤立的元素,并在整个生命周期中提供资产集成视图的通信架构。

3. GenAI驱动下的数据架构重塑

业务数据链条的重要性

打通业务数据链条是企业实现业务创新的关键。传统企业中,数据常被局限在设计、制造、运营等不同环节中,缺乏有效连接。这种断裂导致企业无法看到产品全生命周期的完整视图,从而限制了数据价值的发挥。

然而,建立这种链条面临多重挑战:

  • 数据孤岛:不同系统的数据格式、结构各异
  • 人员孤岛:不同部门人员使用不同系统,缺乏共同语言
  • 业务孤岛:各业务领域有其独特术语和流程,难以统一理解
  • 业务领域知识系统构建与数据产品目录

在现代企业数据架构中,业务领域知识系统是连接传统数据平台与终端用户的桥梁。这一系统不是简单堆叠现有数据,而是按业务领域(如财务、客户、供应链等)组织数据,并通过”数据产品”的概念使数据更易被消费。

数据产品目录的关键作用

数据产品目录是整个架构中的核心环节,它存储了对应的业务领域知识,使数据能够被有效组织和发现。单个数据系统可能存储海量数据,但业务用户往往只需其中一小部分。数据产品目录通过以下方式解决这一问题:

领域知识映射:将业务术语与底层数据建立映射关系 语义层建设:提供业务友好的数据定义和计算逻辑 元数据管理:记录数据来源、处理方式和质量指标 数据依赖关系:明确不同数据产品之间的关联

这使得AI系统能够感知可用的数据产品,并根据需要调用适当的数据,而无需了解底层数据的复杂结构。

从传统知识图谱到Agentic AI数据系统

Agentic AI将传统知识图谱的理念提升到整个数据系统层面。传统知识图谱通常是静态的,主要解决数据关联问题;而基于Agentic的数据系统则是动态的,能够主动理解业务问题,协调各系统获取数据,并生成有价值的洞察。

在这一模式下,不同业务领域会有专门的Agent负责处理该领域内的数据请求。例如,当产品经理需要了解”客户对产品的使用体验和新产品成本与收益”时,系统会自动协调产品专家Agent、财务专家Agent、客服专家Agent等多个角色,从SCM、OMS、CRM等系统获取所需数据,最终生成完整报告,整个过程无需人工协调多个部门。

这种Agent协作模式实现了真正的”按需即时取数”,其工作方式高度拟人化,就像在一个高效团队中各司其职的专家协同工作。以产品经理的这个请求为例,完整流程是这样展开的:

需求识别与分解:产品经理向产品专家Agent提出综合性问题,该Agent会像一位经验丰富的产品总监一样,将这个复杂问题分解为多个专业子问题。 多Agent 协同调度

  • 产品专家Agent首先向客服专家Agent请求”是否有客户反馈相关数据”
  • 同时向财务专家Agent询问”新产品成本和收益数据”

二级Agent 精准取数

  • 客服专家Agent进一步细分需求,分别向VOC专家Agent请求”新产品用户反馈汇总”,向售后专家Agent获取”新产品用户问题数据”,这些Agent则从CRM系统中精准提取相关数据
  • 财务专家Agent向供应链专家Agent请求”新产品物流成本数据”
  • 供应链专家Agent从SCM系统获取”新产品物料成本”和”新产品配送成本”
  • 销售专家Agent则向OMS专家Agent请求”新产品订单数据”,并直接获取销售报表数据

数据聚合与转化:各专业Agent不只是简单传递原始数据,而是像真实的业务专家一样,对数据进行清洗、分析和解读,将技术指标转化为业务洞察。 智能报告生成:产品专家Agent最终收到所有反馈后,综合各方面信息,生成一份既包含客户使用体验分析,也包含成本收益评估的完整报告,就像一位资深产品总监提交的专业分析报告。

整个过程中,每个Agent都精准理解自身职责范围内的数据需求,既不会过度索取造成系统负担,也不会遗漏关键信息。这种拟人化的数据获取方式不仅提高了效率,更重要的是保持了业务语境的连贯性,使最终生成的分析报告既有数据支撑,又具备业务洞察,真正实现了”数据即服务”的理念。

4. 现代企业数据架构实现路径

建设现代企业数据架构是一个渐进过程,可分为三个阶段:首先是智能外围阶段,通过非侵入式AI数据助手连接现有系统,由数据Agent统筹数据获取、内容管理与协同功能,以知识库为核心实现语义理解与跨系统整合;其次是主动数据架构阶段,实现需求驱动的数据发现与自动化管道构建,将一次性查询转化为持久化数据产品,并能智能设计数据模型以满足变化的业务需求;最终发展到数据即服务阶段,形成自治型数据产品生态系统,实现数据产品间自动协作与自我优化,构建全面覆盖业务领域的智能数据服务体系。

阶段一:智能外围 – 保留核心系统的AI数据助手

智能外围架构建立了原始数据系统与数据Agent之间的桥梁:

原始系统层:包括企业现有的数据API接口、原始数据系统以及下游数据API Agent 智能层:由数据Agent统筹,包含三个核心功能模块:

  • 数据获取工具(Function Caller):负责调用API、执行查询和提取数据
  • 数据内容管理(RAG):处理数据检索、增强和上下文理解
  • 下游协同(Collaborator):协调多个Agent之间的数据交换和任务分工

转换机制

  • MCP Server:管理API调用与数据获取工具之间的通信
  • Knowledge Base:连接数据缓存与内容管理,提供知识支持
  • Multi-Agen:实现下游数据与协同模块的交互
  • 相对传统系统的突破性创新**。传统企业数据系统(左侧)存在”程序化实现”和”人工介入”的割裂状态,例如在ERP、CRM和LOB系统形成独立孤岛,数据难以自动化流通。而智能外围架构(右侧)通过数据系统Agent统一协调,实现了各业务领域Agent(如ERP Agent、CRM Agent、LOB Agent)的智能连接。每个业务领域Agent都配备专属数据获取工具和内容管理系统,通过上下行数据流实现整体协同。 **知识库的核心价值:

在这一架构中,

数据内容管理知识库是整个系统的关键基础设施,发挥着不可替代的作用: 语义连接:知识库充当业务语言与系统数据之间的”翻译官”,使AI能够理解业务术语并映射到相应数据源 上下文保存:保存历史交互和业务背景,确保AI理解查询意图的连续性 跨系统推理:支持AI根据已有知识推断出可能需要查询的其他系统,实现主动数据关联 数据解释:提供对原始数据的业务解释框架,帮助AI生成有业务价值的分析结果 权限与规则:存储数据访问规则和使用约束,确保AI在合规前提下操作数据

这一阶段采取非侵入式方法,保留现有数据系统不变,通过外部AI系统扩展功能:

  • 智能数据采集与整合:Agentic AI自动收集、清洗和关联来自不同源的数据
  • 主动洞察生成:AI持续监控数据流,自动识别模式和趋势,生成业务洞察
  • 知识库构建:建立基础的业务领域知识库,支持跨系统查询

下面展示了一个案例:

该Demo构建了一个COO视角下的企业数据概览,包括三个关键部门对应的数据系统:销售部门、供应链部门和客服部门。界面清晰地展示了整体架构中的主控与分支关系:

主控面板:展示了COO Agent(Orchestrator)的功能区域,作为整个数据协作的枢纽,负责跨部门任务的分发与协调。该Agent由两个核心部分组成: 知识库(KB :包含企业宏观战略、部门协作规则、数据治理政策及业务流程图谱 MCP 工具集:提供系统级别的数据调用接口,支持跨部门数据整合与分析 部门区域:展示了各业务部门的专属Agent,包括: 销售Agent:连接CRM系统和订单管理系统 供应链Agent:连接库存管理系统和物流跟踪系统 客服Agent:连接客户反馈系统和工单管理系统

每个Agent(无论是orchestrator,还是部门Agent),均由知识库(包括该领域的业务规则、术语解释、历史决策记录和数据模式)和数据系统(MCP Tools)或者子数据Agent构成。这种结构确保了每个Agent不仅能够获取数据,还能理解和解释数据的业务含义。

通过这种架构设计,COO可以直观地监控整个企业数据生态,并在保留原有系统独立性的同时,实现跨部门的数据协同和智能决策支持。知识库在这一架构中发挥着关键作用,它使得不同部门的数据能够在业务语义层面实现连接,为企业提供全局视角的数据洞察。

当我们询问一个问题包括:

XXX产品的销量如何?业绩达标了吗?(是什么?)

为什么没有达标(为什么?)

怎么改进(怎么办?)

分析流程:每个步骤都由特定Agent负责,并通过知识库获取必要的业务上下文:

  • 问题接收与解析:理解用户提问意图
  • 相关数据收集:自动从多个系统提取数据
  • 根因分析:推断问题产生的可能原因
  • 解决方案生成:提出针对性建议
  • 阶段二:主动数据架构 – AI赋能的数据系统演进

在阶段一基础上,开始构建更为主动的数据架构:

  • 需求驱动的数据发现:AI代理根据业务问题自主寻找相关数据源并评估其价值
  • 智能数据模型构建:自动设计和实现新的数据集和架构以满足不断变化的需求
  • 自适应数据管道:AI系统持续优化数据流程,消除瓶颈并提高整体系统效率
  • 数据产品目录初步建立:开始正式构建数据产品目录,支持跨领域数据访问

需求驱动下持久化数据管道的构建过程:从右侧数据消费者出发,业务用户通过GenAI应用界面提出数据需求,触发中右部分的GenAI存储与应用层响应。在这一层,Agent计划系统智能分析请求、制定数据获取方案并协调执行;生成式AI模型(LLM)提供语义理解能力;数据获取与分析模块规划具体执行路径。这些智能组件不仅满足当前请求,更会建立起持久化数据管道 – 将一次性请求转化为可重复使用的数据流程。请求传递至中部数据处理中枢,这里的”Data治理”模块确保数据质量与一致性,而底层的”数据产品(领域知识管理)”则将临时数据需求转化为持久化数据产品,定义标准化的数据结构、处理逻辑和访问接口。系统根据需要调用批处理或实时处理工具,建立从左侧数据生产者(ERP、CRM、LOB等系统)到最终用户的端到端数据管道。这种方式使每个业务请求不只是一次性查询,而是促成可持续利用的数据资产的形成,实现了从”重复建设”到”积累沉淀”的数据能力提升模式。

还是以上面的数据为例,现在COO提出了一个新的业务/数据领域构建请求:请帮我生成新的数据系统用于风险管理监测

分析流程 需求收集与理解

  • 起始于COO输入请求:”请帮我生成数据架构设计方案给管理层测”
  • COO Agent通过主动提问方式明确关注点:”风险管理相关运营指标?”
  • 系统识别出关键需求指标,包括”生产环境数据质量与业务流程相关指标”和”用户体验度量指标分析”

数据源智能发现与能力评估

  • COO Agent自动识别并调用相关业务Agent:销售Agent、客服Agent和供应链Agent
  • 各业务Agent主动报告其可提供的数据接口:
  • 销售Agent:销售量、市场量、利润等核心指标
  • 客服Agent:用户满意度、投诉量等体验指标
  • 供应链Agent:生产量、库存量、交付指标等

智能数据管道设计

  • 基于需求自动规划数据存储方案:
  • 为非结构化用户行为数据选择NoSQL存储
  • 为结构化业务数据规划关系型数据库
  • 为历史分析数据设计数据仓库平台
  • ETL流程智能设计:
  • 设计数据转换规则并考虑跨系统关联需求
  • 规划批处理作业自动调度
  • 设计数据处理监控机制
  • 自动纳入安全与合规考量:
  • 数据加密方案
  • 基于角色的访问控制策略
  • 备份与恢复计划设计
  • 数据合规要求分析

综合架构方案生成

  • COO Agent生成完整的数据架构设计方案,包含:
  • 核心风险管理指标框架与数据源映射
  • 三大支撑系统设计:智能风险预警、客户体验监测、质量追溯系统
  • 系统预期效益分析:提升异常识别能力、改善客户体验、降低质量问题

当确认新的数据领域构建后,便会形成新的风险管理数据领域

阶段三:数据即服务 – 自治型智能数据产品

最终阶段是建立完全自治的数据生态系统:

  • 智能协作网络:数据产品之间建立自动化协作机制,形成自组织的数据生态系统
  • 持续价值创造:数据产品能自主进行分析、洞察生成和结果整合,无需人工干预
  • 自主数据产品生态:每个数据集作为独立产品运行,具备自我管理和优化能力
  • 完整的数据产品目录:覆盖所有业务领域的数据产品,支持复杂业务场景
  • 5. 结语

GenAI驱动下的企业数据架构重塑不仅是技术升级,更是企业思维方式的革新。通过打通业务数据链条,构建业务领域知识系统,企业能够:

  • 显著降低跨部门数据获取与分析的时间成本
  • 提升数据驱动决策的准确性与时效性
  • 释放员工创新潜能,将精力从繁琐的数据收集转向高价值分析

在这一转型过程中,数据产品目录的建设尤为关键,它将企业专业知识与数据资产高效连接,支持AI系统”理解”企业数据的业务含义。

成功的企业数据架构重塑需要技术与业务的紧密协作,以及自上而下的战略支持。企业应根据自身数字化成熟度,选择适当的起点,循序渐进地建设现代企业数据架构,使GenAI真正成为业务创新的强大引擎。

人类和技术的结合始终是推动企业向前发展的核心。GenAI不是要取代人的价值,而是通过重塑数据架构,释放人更多的创造力,共同开启企业智能化的新篇章。

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者

|

AWS 架构师中心: 云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用


DPDK技术深度解析:AWS上的Kernel Bypass网络优化基础

AWS账单代付阅读(0)

亚马逊AWS官方博客

DPDK技术深度解析:AWS上的Kernel Bypass网络优化基础

概述

在高频交易、实时游戏、视频流处理等对延迟极其敏感的应用场景中,传统的Linux网络栈往往成为性能瓶颈。本文将深入介绍DPDK (Data Plane Development Kit) 技术原理,以及如何在AWS环境中实现kernel bypass网络优化。

什么是Kernel Bypass?

传统网络栈的性能问题

先说说传统Linux网络栈为什么慢。当一个数据包到达网卡时,会触发硬件中断,CPU停下手头的活去处理这个中断。内核的网络协议栈接管后,要做一堆事情:解析协议头、路由查找、防火墙规则检查等等。处理完之后,数据还得从内核空间拷贝到用户空间,应用程序才能拿到。

这整个流程里,性能损耗主要来自几个地方:

首先是内存拷贝。数据包从网卡DMA到内核缓冲区,然后又要拷贝到应用程序的缓冲区。每次拷贝都要消耗CPU周期和内存带宽。

其次是上下文切换。内核态和用户态之间的切换本身就有开销,要保存和恢复寄存器状态、刷新TLB。如果数据包很小、频率很高,这个开销就会变得很明显。

还有中断处理的延迟。硬件中断虽然优先级高,但调度本身需要时间。在高负载下,中断风暴会让CPU疲于应付,真正的业务逻辑反而得不到执行。

对于普通应用来说,这些开销可以接受。但如果你在做高频交易,或者处理实时视频流,每多一微秒的延迟都可能是问题。

Kernel Bypass怎么解决这些问题

Kernel bypass的思路很直接:既然内核是瓶颈,那就绕过它。让应用程序直接操作网卡硬件,把数据包的接收和发送都放到用户空间来做。

这样做有几个好处。零拷贝是最直观的——数据包从网卡DMA到用户空间的内存,中间不经过内核,省掉了一次拷贝。轮询模式替代中断,应用程序主动去网卡上取数据包,而不是被动等中断通知。虽然轮询会占用CPU,但延迟更可控。

另外还可以做批量处理。一次从网卡取多个数据包,分摊掉一些固定开销。再配合CPU亲和性绑定,把网络处理线程固定在某个核心上,避免线程迁移导致的缓存失效。

当然,这不是没有代价的。轮询模式意味着要独占CPU核心,资源利用率会下降。而且绕过内核后,很多内核提供的功能(比如防火墙、流量控制)都得自己实现。所以kernel bypass更适合那种”我知道我在做什么”的场景,而不是通用解决方案。

DPDK技术深度解析

DPDK是什么

DPDK (Data Plane Development Kit) 最初由Intel主导开发并开源,目前由Linux基金会下的DPDK项目维护的用户态网络处理框架。简单说,它提供了在用户空间直接操作网卡的能力,把kernel bypass的理念落地成了可以用的代码库。

架构上看,DPDK分几层:最底层是PMD (Poll Mode Driver),这是用户态的网卡驱动,负责直接和硬件打交道。往上是DPDK的核心库,提供内存管理、队列操作、数据包处理等基础功能。最上层是你的应用程序,通过DPDK的API来收发数据包。

整个数据路径都在用户空间,不经过内核。这就是DPDK能做到低延迟的根本原因。

几个关键技术点

轮询模式驱动 (PMD)

PMD是DPDK的核心。传统驱动靠中断通知,PMD则是不停地轮询网卡的收发队列(RX/TX rings),看有没有新数据包。这听起来很浪费CPU,但在高吞吐场景下反而更高效——中断本身的开销、上下文切换的开销都省了。而且延迟更稳定,不会因为中断调度的不确定性而产生抖动。

PMD直接操作网卡的收发队列,数据包通过DMA写到用户空间的内存里,应用程序直接读取,真正做到了零拷贝。

内存管理

DPDK对内存管理做了很多优化。首先是用HugePages(大页内存),Linux默认的页大小是4KB,HugePages可以用2MB甚至1GB的页。这样能大幅减少TLB (Translation Lookaside Buffer) miss,因为同样大小的内存需要的页表项更少。

其次是内存池机制。DPDK启动时会预分配一大块内存,后续的数据包缓冲区都从这个池子里取,避免运行时的动态分配。动态分配不仅慢,还可能触发系统调用,破坏了用户态处理的初衷。

还有NUMA感知。在多路CPU的服务器上,每个CPU有自己的本地内存。DPDK会尽量让网卡、CPU、内存都在同一个NUMA节点上,避免跨节点访问带来的延迟。

CPU亲和性

DPDK通常会把网络处理线程绑定到特定的CPU核心上。这样做有两个好处:一是避免线程在不同核心间迁移,导致L1/L2缓存失效;二是可以配合CPU隔离(isolcpus),让这些核心专门用于网络处理,不被其他进程打扰。

在高性能场景下,DPDK还会用无锁数据结构(比如无锁环形队列)来避免多线程竞争。锁的开销在微秒级延迟的场景下是不可接受的。

AWS环境中的DPDK部署

1. 选择并启动合适的EC2实例

推荐实例类型

C7i系列:最新Intel处理器,优化的网络性能 C7a系列:AMD EPYC处理器,高性价比选择 C8g系列:ARM Graviton处理器,能效优化

实例规格建议

固定带宽实例(推荐生产环境)

c7a.8xlarge # 32 vCPU, 64GB RAM, 12.5Gbps网络

c7a.16xlarge # 64 vCPU, 128GB RAM, 25Gbps网络

c7a.32xlarge # 128 vCPU, 256GB RAM, 50Gbps网络

本篇文章中,以 c7a.8xlarge 实例为例,演示DPDK在Amazon linux 2023 上的部署以及测试流程。

步骤1:创建EC2实例

首先创建一台新的EC2,网络设置里指定特定子网(固定AZ),同时创建一个新的安全组。

步骤2:配置安全组规则

创建完成后,在新创建的安全组中添加一条新的入站规则,允许所有流量,允许源设置为安全组自身(方便后续测试)。

步骤3:创建并附加ENI网络接口

控制台中启动EC2完成后,再在控制台新建一个网络接口(ENI),注意子网选择和已启动的EC2一致,同时选择相同的安全组。此时实例上应该有两个网络接口,一个用于正常网络连接,另一个用于DPDK。

2. 系统环境准备

Amazon Linux 2023环境安装

切换好root权限

sudo -i

安装开发工具组和git

dnf groupinstall “development tools” -y

dnf install git -y

安装NUMA相关依赖

dnf install numactl numactl-devel -y

安装Python3和现代构建工具

dnf install python3-pip -y

pip3 install meson ninja pyelftools

更新环境变量

echo ‘export PATH=”/usr/local/bin:$PATH”‘ >> /etc/profile

source /etc/profile

内存优化及CPU隔离

编辑GRUB配置

vim /etc/default/grub

在 GRUB_CMDLINE_LINUX_DEFAULT 中追加以下参数(保持你现有的其他参数不变)

注意:c7a系列使用AMD处理器,使用idle=poll;Intel处理器使用intel_idle.max_cstate=0

例(AMD处理器 – c7a系列):

GRUB_CMDLINE_LINUX_DEFAULT=”… default_hugepagesz=1G hugepagesz=1G hugepages=4 isolcpus=1-3 nohz_full=1-3 rcu_nocbs=1-3 idle=poll …”

例(Intel处理器 – c7i系列):

GRUB_CMDLINE_LINUX_DEFAULT=”… default_hugepagesz=1G hugepagesz=1G hugepages=4 isolcpus=1-3 nohz_full=1-3 rcu_nocbs=1-3 intel_idle.max_cstate=0 …”

更新GRUB配置

grub2-mkconfig -o /boot/grub2/grub.cfg

reboot

重启之后执行

sudo -i

mkdir -p /mnt/huge_1gb

echo “nodev /mnt/huge_1gb hugetlbfs pagesize=1GB 0 0” | tee -a /etc/fstab

mount -a

检查是否生效

mount | grep huge

cat /proc/meminfo | grep Huge

3. DPDK编译安装部署

使用现代构建系统

下载DPDK 23.11.3 LTS版本

wget https://fast.dpdk.org/rel/dpdk-23.11.3.tar.xz

tar xJf dpdk-23.11.3.tar.xz

cd dpdk-stable-23.11.3/

使用meson构建系统(推荐)

meson setup build # x86_64架构

ARM架构使用:meson setup build -Dplatform=generic

编译和安装

cd build

ninja

ninja install

ldconfig

VFIO 驱动安装

推荐:使用VFIO驱动(现代化选择)

切换到home目录

cd ~

获取Amazon优化的VFIO驱动

git clone https://github.com/amzn/amzn-drivers.git

cd amzn-drivers/userspace/dpdk/enav2-vfio-patch

./get-vfio-with-wc.sh

加载VFIO模块

modprobe vfio

modprobe vfio-pci

echo 1 > /sys/module/vfio/parameters/enable_unsafe_noiommu_mode

网卡绑定与配置

切换到DPDK目录

cd ~/dpdk-stable-23.11.3

查看网卡状态,找到第二块网卡的接口名和PCI地址

usertools/dpdk-devbind.py –status

示例输出:

Network devices using kernel driver

===================================

0000:27:00.0 ‘Elastic Network Adapter (ENA) ec20’ if=enp39s0 drv=ena unused= *Active*

0000:28:00.0 ‘Elastic Network Adapter (ENA) ec20’ if=enp40s0 drv=ena unused=

关闭要绑定的网卡(替换为你的接口名)

ifconfig enp40s0 down

绑定到VFIO驱动(替换为你的PCI地址)

usertools/dpdk-devbind.py –bind=vfio-pci 28:00.0

再次查看状态,确认绑定成功

usertools/dpdk-devbind.py –status

验证绑定结果(替换为你的PCI地址)

lspci -v -s 28:00.0

4. DPDK基础测试与验证

环境变量配置

设置开发环境变量

export PKG_CONFIG_PATH=/usr/local/lib64/pkgconfig:$PKG_CONFIG_PATH

export LD_LIBRARY_PATH=/usr/local/lib64:$LD_LIBRARY_PATH

HelloWorld示例测试

安装libatomic库

dnf install libatomic -y

切换到DPDK示例目录

cd ~/dpdk-stable-23.11.3/examples/helloworld

编译示例程序

make

运行HelloWorld示例

./build/helloworld

预期输出:

EAL: Detected XX CPU cores

hello from core 0

hello from core 1

性能基准测试

切换到DPDK构建目录

cd ~/dpdk-stable-23.11.3/build

使用testpmd自带的icmp转发模式测试

./app/dpdk-testpmd -l 0-1 -n 4 –huge-dir=/mnt/huge_1gb –proc-type=primary — –interactive –forward-mode=icmpecho

在testpmd交互界面中,输入start启动转发

testpmd> start

预期输出:

icmpecho packet forwarding – ports=1 – cores=1 – streams=1

此时不要关闭,等待测试

测试完成后,输入stop和quit退出

testpmd> stop

testpmd> quit

为了验证DPDK的性能优势,我们需要对比普通网卡和DPDK管理网卡的延迟差异。

同一可用区启动另一台EC2实例作为测试客户端,绑定上与待测试EC2相同的安全组(方便通信)。

在测试客户端安装必要工具

sudo dnf install iputils fping python3-pip -y

sudo pip3 install numpy

注意:在运行延迟测试前,需要先在DPDK服务器上启动testpmd的icmp转发模式(参考上一节),确保DPDK网卡能够响应ping请求。

将下面延迟测试脚本和延迟分析代码,粘贴进测试客户端实例,同时添加执行权限。

*延迟测试脚本*

!/bin/bash

延迟测试脚本 latency_test.sh

使用方法:./latency_test.sh

配置IP地址(请根据实际情况修改)

NORMAL_IP=”172.31.37.195″ # DPDK服务器的普通网卡IP(第一块网卡)

DPDK_IP=”172.31.39.205″ # DPDK服务器的DPDK网卡IP(第二块网卡)

TEST_COUNT=100

生成时间戳用于文件命名

TIMESTAMP=$(date +%Y%m%d_%H%M%S)

清理旧的测试结果

rm -f normal_full.txt dpdk_full.txt normal_stats.txt dpdk_stats.txt

echo “开始延迟测试对比 – $TIMESTAMP”

echo “测试次数: $TEST_COUNT”

echo “”

测试普通网卡延迟(实时显示并保存)

echo “==========================================”

echo “测试普通网卡延迟 ($NORMAL_IP):”

echo “==========================================”

ping -c $TEST_COUNT $NORMAL_IP | tee normal_full.txt

grep “min/avg/max” normal_full.txt > normal_stats.txt

echo “”

echo “==========================================”

echo “测试DPDK网卡延迟 ($DPDK_IP):”

echo “==========================================”

ping -c $TEST_COUNT $DPDK_IP | tee dpdk_full.txt

grep “min/avg/max” dpdk_full.txt > dpdk_stats.txt

echo “”

echo “==========================================”

echo “生成统计分析报告…”

echo “==========================================”

生成详细统计报告

python3 analyze_latency.py

可选:保存历史记录

mkdir -p latency_history

cp normal_full.txt latency_history/normal_${TIMESTAMP}.txt

cp dpdk_full.txt latency_history/dpdk_${TIMESTAMP}.txt

echo “”

echo “历史记录已保存到 latency_history/ 目录”

*延迟分析脚本*

analyze_latency.py

import re

import numpy as np

def parse_ping_latencies(filename):

“””解析ping输出中的每个RTT值”””

latencies = []

with open(filename, ‘r’) as f:

for line in f:

匹配 “time=0.123 ms” 格式

match = re.search(r’time=([\d.]+) ms’, line)

if match:

latencies.append(float(match.group(1)))

return latencies

def calculate_stats(latencies):

“””计算详细统计信息,包括P90和P99″””

if not latencies:

return None

arr = np.array(latencies)

return {

‘min’: np.min(arr),

‘avg’: np.mean(arr),

‘max’: np.max(arr),

‘p50’: np.percentile(arr, 50),

‘p90’: np.percentile(arr, 90),

‘p99’: np.percentile(arr, 99),

‘stddev’: np.std(arr)

}

解析原始延迟数据

print(“正在解析延迟数据…”)

normal_latencies = parse_ping_latencies(‘normal_full.txt’)

dpdk_latencies = parse_ping_latencies(‘dpdk_full.txt’)

计算统计信息

normal_stats = calculate_stats(normal_latencies)

dpdk_stats = calculate_stats(dpdk_latencies)

输出结果

print(“\n延迟测试对比结果:”)

print(“=” * 70)

print(f”{‘指标’:<10} {'普通网卡(ms)':<18} {'DPDK网卡(ms)':<18} {'改善率':<12}")

print(“-” * 70)

if normal_stats and dpdk_stats:

metrics = [‘min’, ‘avg’, ‘p50’, ‘p90’, ‘p99’, ‘max’, ‘stddev’]

for metric in metrics:

normal_val = normal_stats[metric]

dpdk_val = dpdk_stats[metric]

improvement = ((normal_val – dpdk_val) / normal_val) * 100

print(f”{metric.upper():<10} {normal_val:<18.3f} {dpdk_val:<18.3f} {improvement:>9.1f}%”)

print(f”\n样本数量: 普通网卡={len(normal_latencies)}, DPDK网卡={len(dpdk_latencies)}”)

else:

print(“错误:无法解析延迟数据”)

预期测试结果

延迟测试对比结果:

==========================================================

指标 普通网卡(ms) DPDK网卡(ms) 改善率

———————————————————-

MIN 0.159 0.107 32.7%

AVG 0.186 0.140 24.4%

P50 0.186 0.140 24.7%

P90 0.196 0.149 24.0%

P99 0.207 0.161 22.1%

MAX 0.277 0.161 41.9%

STDDEV 0.012 0.008 38.3%

测试结论

从测试数据来看,DPDK的优化效果很明显。平均延迟从0.186ms降到0.140ms,降了24%左右。最大延迟从0.277ms降到0.161ms,标准差也从0.012降到0.008。 DPDK不仅能大幅降低延迟,还能显著提升延迟波动。

大实例NUMA优化策略

NUMA架构基础

NUMA(非统一内存访问)架构中,每个CPU有自己的本地内存。访问本地内存快(~100ns),访问远程内存慢(~200-300ns)。正确的NUMA配置可以避免2-3倍的内存访问延迟,在高负载场景下性能提升可达20-50%。

单NUMA环境(c7a.8xlarge、c7a.16xlarge等):本文前面的配置步骤已经足够,无需额外优化。 多NUMA环境(c7a.32xlarge及以上):如果你使用更大的实例,需要按以下步骤调整配置。

步骤1:查看NUMA拓扑和网卡位置

查看完整的NUMA拓扑

numactl –hardware

示例输出(c7a.32xlarge,2个NUMA节点,128 vCPU):

available: 2 nodes (0-1)

node 0 cpus: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63

node 0 size: 126165 MB

node 0 free: 125549 MB

node 1 cpus: 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127

node 1 size: 126285 MB

node 1 free: 125542 MB

node distances:

node 0 1

0: 10 32

1: 32 10

lscpu | grep NUMA

示例输出

NUMA node(s): 2

NUMA node0 CPU(s): 0-63

NUMA node1 CPU(s): 64-127

确认DPDK网卡在哪个NUMA节点(替换为你的PCI地址)

cat /sys/bus/pci/devices/0000:60:00.0/numa_node

输出:1(表示网卡在NUMA节点1)

步骤2:调整GRUB配置(需要重启)

根据步骤1的结果,调整GRUB配置。假设网卡在NUMA节点1(CPU 64-127),需要:

  • 增加HugePages数量(系统会自动平均分配到各NUMA节点)
  • 隔离网卡所在NUMA节点的部分CPU核心
  • 编辑GRUB配置

vim /etc/default/grub

修改 GRUB_CMDLINE_LINUX_DEFAULT,调整以下参数:

– hugepages=4 改为 hugepages=8(每个节点4GB,足够DPDK使用)

– isolcpus=1-3 改为 isolcpus=65-71(隔离NUMA1的8个核心用于DPDK,保留核心64给系统)

示例(AMD处理器 – c7a系列):

GRUB_CMDLINE_LINUX_DEFAULT=”… default_hugepagesz=1G hugepagesz=1G hugepages=8 isolcpus=65-71 nohz_full=65-71 rcu_nocbs=65-71 idle=poll …”

更新GRUB配置并重启

grub2-mkconfig -o /boot/grub2/grub.cfg

reboot

步骤3:重启后验证配置

注意:重启后需要重新执行以下操作(参考前面章节的具体命令):

  • 加载VFIO模块
  • 将网卡绑定到VFIO驱动
  • 设置环境变量(PKG_CONFIG_PATH和LD_LIBRARY_PATH)

验证HugePages配置:

验证HugePages已自动分配到各NUMA节点

cat /sys/devices/system/node/node*/hugepages/hugepages-[已去除电话]kB/nr_hugepages

输出示例(8个大页平均分配):

node0: 4

node1: 4

步骤4:运行DPDK应用时绑定NUMA节点

运行DPDK应用时,必须确保CPU、内存、网卡都在同一NUMA节点:

切换到DPDK目录

cd ~/dpdk-stable-23.11.3/build

使用numactl绑定到网卡所在的NUMA节点(假设是节点1)

numactl –cpunodebind=1 –membind=1 \

./app/dpdk-testpmd -l 65-67 -n 4 \

–socket-mem=0,2048 \

–huge-dir=/mnt/huge_1gb \

–proc-type=primary \

— –interactive –forward-mode=icmpecho

关键参数说明:

numactl –cpunodebind=1 : 绑定到NUMA节点1的CPU

numactl –membind=1 : 内存只从NUMA节点1分配

–socket-mem=0,2048 : NUMA0分配0,NUMA1分配2GB(避免跨节点访问)

-l 65-67 : 使用CPU核心65-67(NUMA1上的隔离核心,测试环境3个核心足够)

在testpmd交互界面启动

testpmd> start

步骤5:验证NUMA优化效果

在DPDK应用运行时,打开另一个终端验证:

查看进程的NUMA内存分配(确认内存都在节点1,节点0为0或很少)

numastat -p $(pidof dpdk-testpmd)

预期输出示例(c7a.32xlarge,2个NUMA节点,网卡在节点1):

Per-node process memory usage (in MBs) for PID xxx (dpdk-testpmd)

Node 0 Node 1 Total

—— —— —–

Huge [已去除电话]

Heap 0 0.64 0.64

Stack 0 0.13 0.13

Private 18.77 134.93 153.70

——- —— —— —–

Total 18.77 2183.70 2202.48

NUMA优化核心原则:

网卡在哪个NUMA节点,就把所有资源(CPU、内存、进程)都绑定到那个节点,避免任何跨NUMA节点的访问。这是多NUMA环境下获得最佳性能的关键。

总结

DPDK技术为AWS云环境中的高性能网络应用提供了强大的基础设施。通过深入理解kernel bypass原理、合理配置系统环境、选择适当的硬件和驱动,可以在云环境中实现接近裸机的网络性能。

本文介绍的DPDK基础知识和配置方法为后续的F-Stack等高级网络栈应用奠定了坚实基础。在下一篇文章中,我们将详细介绍如何基于DPDK构建和部署F-Stack高性能网络应用。

参考资源

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者

|

AWS 架构师中心: 云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用


Amazon S3 Storage Lens 存储统计管理工具增加了性能指标,支持数十亿个前缀,并可导出到 S3 表

AWS账单代付阅读(0)

亚马逊AWS官方博客

Amazon S3 Storage Lens 存储统计管理工具增加了性能指标,支持数十亿个前缀,并可导出到 S3 表

今天,我们隆重宣布推出 Amazon S3 Storage Lens 存储统计管理工具的三项新功能,让您可以更深入地了解存储性能和使用模式。通过添加性能指标、支持分析数十亿个前缀以及直接导出至 Amazon S3 表类数据存储服务,您拥有了所需工具来优化应用程序性能、降低成本,并就 Amazon S3 存储策略作出数据驱动型决策。

新的性能指标类别

S3 Storage Lens 存储统计管理工具现在包括八个新的性能指标类别,可帮助识别和解决整个组织的性能限制。这些性能指标类别在组织、账户、存储桶和前缀级别提供。例如,该服务可帮助您识别存储桶或前缀中可能降低应用程序性能的小型对象。通过批处理小型对象以使用 Amazon S3 Express One Zone 存储类别来处理更高性能的小型对象工作负载,可以缓解这种情况。

要访问新的性能指标,您需要在新建 Storage Lens 存储统计管理工具控制面板或编辑现有配置时,启用 S3 Storage Lens 存储统计管理工具高级层级中的性能指标。

|

指标类别 |

详细信息 |

使用案例 |

缓解措施

|读取请求大小

|按天划分的读取请求大小(GET)分布

|识别包含会降低性能的小型读取请求模式的数据集

|小型请求:批量处理小型对象或使用 Amazon S3 Express One Zone 存储类来处理高性能小型对象工作负载

|写入请求大小

|按天划分的写入请求大小(PUT、POST、COPY 和 UploadPart)分布

|识别包含会降低性能的小型写入请求模式的数据集

|大型请求:并行处理请求、使用 MPU 或使用 AWS CRT

|存储空间大小

|对象大小分布

|识别包含会降低性能的小型对象的数据集

|小型对象大小:考虑捆绑小型对象

|并发 PUT 503 错误

|由于对同一对象进行并发 PUT 操作而导致的 503 错误数量

|识别采用会降低性能的并发 PUT 节流的前缀

|对于单一写入器,可修改重试行为或使用 Amazon S3 Express One Zone 存储类。对于多个写入器,可使用共识机制或 Amazon S3 Express One Zone 存储类

|跨区域数据传输

|在区域内跨区域传输的字节数和发送的请求数量

|发现因跨区域数据访问而导致的潜在性能下降和成本增加风险

|将计算与数据放在同一 AWS 区域内

|访问的唯一对象

|每天访问的唯一对象的数量或百分比

|识别经常访问一小部分对象的数据集。可以将这些数据集移至更高性能的存储层级以获得更佳性能

|请考虑将有效数据移至 Amazon S3 Express One Zone 存储类或其他缓存解决方案

|FirstByteLatency(现有 Amazon CloudWatch 指标)

|第一个字节延迟指标的每日平均值

|从收到完整请求到开始返回响应的每日每个请求的平均时间

|TotalRequestLatency(现有 Amazon CloudWatch 指标)

|总请求延迟的每日平均值

|从收到第一个字节到发送最后一个字节的每个请求所经过的每日平均时间

**工作原理

在 Amazon S3 控制台上,我选择 创建 Storage Lens 存储统计管理工具控制面板来新建控制面板。您也可以编辑现有的控制面板配置。然后,我配置一般设置,例如提供 控制面板名称状态和可选 标签。 然后,选择 下一步**。

然后,我选择

包括所有区域和包含所有存储桶,并指定要包含的区域和存储桶来定义控制面板的范围。

我在 Storage Lens 存储统计管理工具控制面板配置中选择加入

高级层级,选择 性能指标,然后选择 下一步

然后,我选择

前缀聚合作为额外的指标聚合,接着将其余信息保留为默认值,然后再选择 下一步

我选择

默认指标报告,然后选择 通用存储桶作为存储桶类型,然后选择我的 AWS 账户中的 Amazon S3 存储桶作为 目标存储桶。我将其余信息保留为默认值,然后选择 下一步

启用后,我将直接在 Storage Lens 存储统计管理工具控制台控制面板中收到每日性能指标。还可以选择将 CSV 或 Parquet 格式的报告导出至账户中的任何存储桶或发布到 Amazon CloudWatch。性能指标每日汇总和发布,并将在以下多个级别提供:组织、账户、存储桶和前缀。在此下拉菜单中,我为

指标选择了“并发百分比 PUT 503 错误”,为 日期范围选择了“过去 30 天”,为 前 N 个存储桶选择了“10”。

并发 PUT 503 错误计数指标跟踪对同一对象同时执行 PUT 操作所生成的 503 错误的数量。节流错误可降低应用程序性能。对于单个写入器,修改重试行为或使用更高性能的存储层级,例如 Amazon S3 Express One Zone 存储类来缓解并发 PUT 503 错误。对于多个写入器的情境,使用共识机制来避免并发 PUT 503 错误,或使用性能更高的存储层级,例如 Amazon S3 Express One Zone 存储类。

对 S3 存储桶中的所有前缀进行全面分析

S3 Storage Lens 存储统计管理工具现在支持通过新的

扩展前缀指标报告分析 S3 存储桶中的所有前缀。此功能消除了先前限制分析的限制,即仅限于满足 1% 大小阈值和最大深度为 10 个级别的前缀。现在,无论大小或深度如何,您都可以跟踪每个存储桶多达数十亿个前缀,从而以最精细的前缀级别进行分析。

扩展前缀指标报告包括所有现有的 S3 Storage Lens 存储统计管理工具指标类别:存储使用情况、活动指标(请求和传输的字节数)、数据保护指标和详细的状态代码指标。

如何开始使用?

我按照

工作原理部分中概述的相同步骤来创建或更新 Storage Lens 存储统计管理工具控制面板。在控制台的步骤 4 中,选择导出选项时,可以选择新的 扩展前缀指标报告。此后,我可以将扩展前缀指标报告以 CSV 或 Parquet 格式导出至我账户中的任何通用存储桶,以便高效查询我的 Storage Lens 存储统计管理工具数据。

**注意事项

此增强功能解决了组织需要对整个前缀结构进行精细了解的情境问题。例如,您可以识别分段上传不完整的前缀以降低成本,跟踪整个前缀结构在加密和复制要求方面的合规性,并在最精细级别检测性能问题。 将 S3 Storage Lens 存储统计管理工具指标导出至 S3 表类数据存储服务

S3 Storage Lens 存储统计管理工具指标现在可以自动导出至 S3 表类数据存储服务,这是 AWS 上的一项完全托管式功能,内置 Apache Iceberg 支持。利用这种集成,每天可自动向 AWS 托管的 S3 表类数据存储服务交付指标,便于立即查询,无需额外的处理基础设施。 **如何开始使用?

首先在控制台上按照步骤 5 中概述的流程进行操作,在控制台上选择导出目标。这次,我选择

扩展前缀指标报告。除了通用存储桶外,我还选择 表存储桶

新的 Storage Lens 存储统计管理工具指标将导出至 AWS 托管式存储桶

aws-s3 中的新表中。

我选择

expanded_prefixes_activity_metrics 表来查看扩展前缀报告的 API 使用量指标。

我可以在 Amazon S3 控制台上预览该表,也可以使用 Amazon Athena 来查询该表。

**注意事项

**S3 表类数据存储服务与 S3 Storage Lens 存储统计管理工具的集成,简化了使用熟悉的 SQL 工具和 AWS 分析服务(例如 Amazon Athena、Amazon QuickSight、Amazon EMR 和 Amazon Redshift)的指标分析流程,且无需数据管道。这些指标会自动整理以实现最佳查询,并提供满足您需求的自定义保留和加密选项。

这种集成支持跨账户和跨区域分析、自定义控制面板创建以及与其他 AWS 服务的数据关联。例如,您可以将 Storage Lens 存储统计管理工具指标与 S3 Metadata 相结合,以分析前缀级别的活动模式,并识别前缀中包含冷数据的对象,这些对象符合转换到低成本存储层级的条件。

对于代理式人工智能工作流程,您可以使用自然语言,通过 S3 表类数据存储服务 MCP 服务器查询 S3 表类数据存储服务中的 S3 Storage Lens 存储统计管理工具指标。代理可以提出诸如“上个月哪个存储桶增长最快?”之类的问题或者“按存储类别向我显示存储成本”,并从您的可观测性数据中获得即时洞察。

**现已推出

**所有三项增强功能均已在目前提供 S3 Storage Lens 存储统计管理工具的所有 AWS 区域(中国区域和 AWS GovCloud(美国)除外)推出。

这些功能包含在 Amazon S3 Storage Lens 存储统计管理工具高级层级中,除标准高级层级定价外,不收取额外费用。对于 S3 表类数据存储服务导出,您只需为 S3 表类数据存储服务的存储、维护和查询付费。导出功能本身不收取额外费用。

要了解有关 Amazon S3 Storage Lens 存储统计管理工具性能指标、对数十亿个前缀的支持以及导出至 S3 表类数据存储服务的更多信息,请参阅《Amazon S3 User Guide》。有关定价的详细信息,请访问 Amazon S3 定价页面。


使用Amazon Quick Suite定制成本分析智能体

AWS账单代付阅读(8)

亚马逊AWS官方博客

使用Amazon Quick Suite定制成本分析智能体

概述

在云计算时代,成本管理已成为企业优化云资源使用的关键环节。AWS提供了强大的成本分析工具,而Amazon Quick Suite凭借其内置的生成式AI能力,让成本分析变得更加智能和直观。本文将详细介绍如何使用Quick Suite构建一个智能化的AWS成本分析系统,让您能够通过自然语言提问的方式快速获取成本洞察。

什么是** **Amazon Quick Suite** **?

Amazon Quick Suite是AWS推出的综合性、生成式AI驱动的商业智能平台,是QuickSight的新一代替代产品。它将传统的商业智能能力与现代AI技术深度整合,无需机器学习专业知识即可使用。Quick Suite包含五个集成能力:

Amazon Quick Sight:数据可视化核心组件 Amazon Quick Flows:工作流自动化 Amazon Quick Automate:流程优化 Amazon Quick Index:数据发现 Amazon Quick Research:综合分析

📚

参考文档:What is Amazon Quick Suite?

Quick Suite** **内置** **AI** **能力的优势

Quick Suite将生成式AI能力作为产品的核心功能内置其中,提供:

自然语言查询:使用日常语言提问,无需编写SQL或复杂查询 智能洞察生成:自动分析数据并提供可操作的建议 上下文理解:在对话中保持上下文,支持连续提问 可视化建议:自动推荐最适合的图表类型 AI 驱动的数据探索:通过对话式交互发现数据洞察

📚

参考文档:Simplify AWS Cost Data Analysis with Amazon Q in QuickSight

架构概览

关键组件说明AWS Cost & Usage Report (CUR):提供详细的成本和使用数据 Amazon S3:存储导出的成本数据 SPICE 引擎:Quick Suite的内存计算引擎,提供超快查询性能 内置 AI 引擎:Quick Suite集成的生成式AI能力,支持自然语言交互

📚

参考文档:AWS Cost & Usage Report Features

配置** **Quick Suite

前提条件

在开始之前,您需要:

  • 一个AWS账户
  • Quick Suite Enterprise Edition订阅
  • 适当的IAM权限
  • 步骤** **1** **:启用** **Quick Suite** **账户

首先需要注册Quick Suite账户。Quick Suite提供两种订阅方案,根据您的需求选择合适的版本。

定价说明

Quick Suite采用三部分定价模式:

用户订阅费用专业版:$20/用户/月

  • 包含访问聊天代理、空间、Quick Sight、Quick Research和Quick Flows
  • 包含2个座席小时(Flows)+ 2个研究座席小时(Research)

企业版:$40/用户/月

  • 包含专业版所有功能
  • 额外包含创建仪表板、使用Quick Automate等高级功能
  • 包含4个座席小时(Automate/Flows)+ 4个研究座席小时(Research)

Quick Index 存储费用

  • 前50MB免费(约5000份文档)
  • 超出部分:$1/MB/月

基础设施费用

  • $250/月/账户(固定费用)

免费试用:新客户可享受30天免费试用,最多25名用户,免基础设施费用。

📚

参考文档:Amazon Quick Suite Pricing

步骤** **2** **:配置用户权限

Quick Suite提供两种订阅类型,每种订阅提供不同的功能权限:

订阅选择建议: – 专业版:适合需要查看数据、使用AI分析但不需要创建仪表板的用户 – 企业版:适合需要创建和管理仪表板、数据集和自动化的高级用户

在Quick Suite控制台中确认用户权限配置:

确认用户权限

步骤** **3** **:启用** **SPICE** **容量

SPICE(Super-fast, Parallel, In-memory Calculation Engine)是Quick Suite的内存计算引擎,为数据分析提供极快的查询性能。

SPICE 的优势: – 超快查询响应时间 – 支持高达20亿行数据 – 自动数据压缩 – 并行处理能力

在Quick Suite配置中启用SPICE容量:

启用SPICE容量

SPICE 定价: – 按GB计费 – 每个账户有10GB免费额度 – 超出部分按使用量付费

📚

参考文档:Amazon QuickSight supports 2B row SPICE dataset

准备数据集

步骤** **1** **:配置** **AWS Cost & Usage Report

AWS Cost & Usage Report (CUR)是成本分析的数据源,它提供最详细的成本和使用信息。

CUR 的特性: – 支持小时级、日级、月级粒度 – 包含资源级别的详细信息 – 支持成本分摊数据 – 可导出为CSV或Parquet格式 最佳实践配置启用小时级粒度:获取更详细的成本数据 包含资源 ID:追踪到具体资源 自动刷新:确保数据及时更新 使用GZIP 格式:兼容Quick Suite Dataset

数据流程时序图

📚

参考文档:Configure detailed information sources

步骤** **2** **:创建数据源

在Quick Suite中创建数据源,连接到存储在S3中的成本数据。

数据导出设置

配置数据导出类型和目标位置:

数据导出设置

AWS Data Exports支持三种导出类型:

数据导出类型

Standard Data Export:标准数据导出,灵活自定义 Cost and Usage Dashboard:预构建的成本和使用仪表板 Legacy Data Export:传统格式支持 推荐选择:同时选择Cost and Usage Dashboard类型和Standard Data Export类型,前者自带丰富的预制仪表板,方便监控成本大盘,后者的支持更丰富的数据字段,包含最关键的成本标签字段,方便构建分析Topic。

Manifest** **文件位置

Manifest文件是连接S3数据和Quick Suite的关键,它描述了数据文件的位置和结构:

Manifest S3位置

Manifest 文件的作用: – 指向实际的数据文件 – 定义数据架构 – 支持增量更新 – 处理分区数据

📚

参考文档:Understanding export delivery

配置新数据源

在Quick Suite中创建新的数据源:

创建数据源

配置数据源连接参数:

新数据源配置

关键配置项: – Bucket:存储CUR数据的S3存储桶名称 – Manifest File Key:Manifest.json文件的S3路径 – Role ARN:Quick Suite访问S3的IAM角色(可选,用于覆盖账户级角色)

步骤** **3** **:创建数据集

从配置好的数据源创建数据集:

从数据源创建数据集

数据集配置建议选择导入模式SPICE:将数据导入内存,查询速度最快(推荐) Direct Query:直接查询源数据,适合超大数据集 配置刷新计划

  • Enterprise Edition支持小时级刷新
  • 根据成本数据更新频率设置

数据准备

  • 添加计算字段
  • 配置数据类型
  • 设置过滤器
  • 步骤** **4** **:验证数据集

创建完成后,可以在Quick Suite仪表板中查看数据:

Quick Suite仪表板

定制成本分析主题

Quick Suite通过”主题(Topics)“功能让AI理解您的数据。主题是数据集的自然语言友好层,让内置AI能够准确理解和回答问题。

什么是主题?

主题是Quick Suite中的一个核心概念,它: – 将数据集转换为可查询的知识库 – 定义字段的同义词和语义 – 配置命名实体和关系 – 优化自然语言查询的准确性

步骤** **1** **:创建新主题

在Quick Suite中为成本数据集创建主题:

新建主题

主题创建要点: – 名称:使用描述性名称,如”AWS成本分析” – 描述:说明主题涵盖的数据范围 – 数据集选择:选择之前创建的成本数据集

步骤** **2** **:选择数据字段

选择要包含在主题中的数据字段:

选择LineItem数据字段

字段选择策略必选字段

  • 时间维度(日期、月份)
  • 成本字段(未混合成本、摊销成本)
  • 服务名称
  • 账户ID
  • 区域

可选字段

  • 资源ID
  • 使用类型
  • 操作
  • 标签

字段优化

  • 为每个字段启用”Include”选项
  • 添加业务相关的同义词
  • 设置默认聚合方式
  • 步骤** **3** **:添加资源标签和别名

为字段添加同义词和别名,让Quick Suite的AI更好地理解您的业务术语:

添加资源标签和别名

同义词配置示例

|字段名

|同义词

|line_item_unblended_cost

|成本、费用、花费、支出

|product_servicename

|服务、AWS服务、产品

|line_item_usage_account_id

|账户、账号、Account

|product_region

|区域、地区、Region

|line_item_usage_type

|使用类型、用量类型

命名实体配置

命名实体帮助AI识别特定的值,例如: – 服务名称:EC2、S3、Lambda、RDS等 – 区域:us-east-1、ap-southeast-1等 – 账户别名:生产账户、开发账户等

步骤** **4** **:构建验证问题

创建一组验证问题来测试主题配置:

构建验证问题

推荐的验证问题类型基础查询

  • “上个月的总成本是多少?”
  • “本月到目前为止花费了多少?”

服务分析

  • “哪个服务的成本最高?”
  • “EC2的月度成本趋势如何?”

账户分析

  • “各账户的成本分布是什么?”
  • “生产账户和开发账户的成本对比?”

区域分析

  • “哪个区域的成本增长最快?”
  • “us-east-1和ap-southeast-1的成本对比?”

趋势分析

  • “过去6个月的成本趋势?”
  • “为什么上周成本突然增加?”
  • 步骤** **5** **:测试** **AI** **生成的问题

Quick Suite的AI会自动生成建议问题,测试这些问题以验证配置:

尝试AI生成的问题进行测试

AI 生成问题的优势: – 发现您可能没想到的查询角度 – 验证主题配置的完整性 – 为用户提供问题模板

步骤** **6** **:查看定制化问题结果

测试定制化问题并查看结果:

定制化问题的结果

结果验证要点: – ✅ 答案是否准确 – ✅ 可视化类型是否合适 – ✅ 数据是否完整 – ✅ 响应时间是否可接受

如果结果不理想,可以: 1. 调整字段同义词 2. 添加更多命名实体 3. 优化字段聚合设置 4. 标记问题为”已验证”以改进AI学习

高级使用场景

场景 1 :发现成本驱动因素 问题示例: – “哪些使用类型导致了Amazon服务成本最高?” – “S3存储成本的主要组成部分是什么?”

Quick Suite的AI会: 1. 分析成本数据 2. 按使用类型分组 3. 生成可视化图表 4. 提供优化建议

场景 2 EC2 实例成本分析 问题示例: – “按购买选项(按需、预留、Spot)分析EC2实例详情” – “哪种购买选项最具成本效益?”

Quick Suite的AI会: 1. 区分不同购买选项 2. 计算每种选项的成本 3. 比较成本效益 4. 建议优化策略

场景 3 :成本波动调查 问题示例: – “为什么上周成本增加了30%?” – “哪个服务导致了成本激增?”

Quick Suite的AI会: 1. 对比不同时间段 2. 识别异常变化 3. 定位具体服务或资源 4. 提供根因分析

场景 4 :创建执行摘要 问题示例: – “生成本月成本执行摘要” – “创建成本优化数据故事”

Quick Suite的AI会: 1. 汇总关键指标 2. 识别趋势和模式 3. 生成可分享的报告 4. 提供行动建议

📚

参考文档:Simplify AWS Cost Data Analysis with Amazon Q

总结

通过本文的详细指导,您已经学会了如何:

  • 选择合适的Quick Suite订阅并配置内置AI功能
  • 设置AWS Cost & Usage Report数据导出
  • 创建和优化数据集
  • 定制成本分析主题
  • 使用自然语言进行智能成本分析
  • 相关资源

📚

官方文档**: – Amazon Quick Suite Documentation – AWS Cost Management User Guide – Quick Suite Features ***前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者

|

AWS 架构师中心: 云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用


使用 Kiro 规范驱动开发加速数据质量建设

AWS账单代付阅读(7)

亚马逊AWS官方博客

使用 Kiro 规范驱动开发加速数据质量建设

业务背景

无论是传统行业企业的数据运营分析,互联网企业的数据行为分析,再到 AI 时代的领域数据上下文知识传递,精确的参数知识微调,数据质量一直都是极为重要的一环。在与企业客户的交流中逐渐发现,数据质量已从“技术小问题”升级为业务危机。近年来,即便企业在大数据和工具上投入巨大,脏数据、重复数据和过期数据仍广泛存在,直接威胁到 AI 项目、客户运营和报表的可靠性。

数据管道的质量受数据本身、基础设施、生命周期管理、开发部署和处理流程等多维因素影响,其中错误数据类型、清洗阶段问题和兼容性问题尤为常见,导致管道不稳定、数据不可用。数据从需求到落地阶段的语义表达不一致,实际数据管道中的重复和不一致记录、缺乏前瞻性的清洗与治理、依赖人工排查、以及在合规和监管报送上因格式不一致而出错,这些问题同时带来时间浪费和修复成本增加。糟糕的数据质量会显著拉低生产力、增加成本、拖慢决策与创新节奏;越来越多企业开始建立数据质量指标,但度量和落地仍不统一,需要更系统化的治理与持续监测方案。从业务方视角,需求已经从“偶尔清洗一次”转向“可观测、可度量、可追责”的持续数据质量管理:包括自动监控关键表、快速定位问题源头、量化质量分数,以及与数据产品和 AI 应用的 SLA 挂钩。

技术背景

数据质量的管理,与传统软件开发的需求-设计-实施 -监控的逻辑一致,规范驱动开发与数据质量设计高度契合。在数据需求产生阶段即需要对质量有一定的认知从而在后续流程中形成系统性的监控。

回顾软件开发的历史,1992 年,Bertrand Meyer 在 IEEE Computer 上发表《Applying “Design by Contract”》,系统提出“设计合约”思想,用前置条件、后置条件和不变量来精确定义软件组件的行为,被视为规范驱动开发在面向对象范式中的起点之一。1980–2000 年代,Clarke、Emerson、Sifakis 等人发展并[已去除[已去除推广语]语]模型检查,将系统行为用 Kripke 结构和时序逻辑形式化建模,并用自动算法验证实现是否满足给定规格,其工作在 2007 年获得 ACM 图灵奖。2010 年代,随着 REST API 和微服务兴起,OpenAPI 等规范成为事实标准,推动“spec-first API development”。将 API 规范作为单一事实源,通过工具生成服务器代码、客户端 SDK、文档和契约测试,并在 CI/CD 中校验规范变更的兼容性,形成“规范 → 实现/测试 → 网关策略”的工程闭环。 2020–2023 年,随着大模型和代码生成工具出现,AI 开始参与编码与测试生成,但“纯提示驱动(prompt-based)”开发暴露出难以复现、行为飘移和缺乏治理的痛点。2024–2025 年,业界开始明确提出“spec-driven development with AI”的模式,将大模型从“自由写代码”转向“在规范约束下生成实现和测试”。与此同时,关于数据管道质量和数据质量管理的研究指出:要保证复杂数据系统可控,需要明确定义可检查的数据契约和不变量,使质量规则能够被自动执行和回归验证。

Amazon Kiro 是一款面向“规范驱动开发”的 agentic AI IDE:它把多轮对话和零散需求收敛成结构化规范(如 requirements、design、tasks 等文档),然后由多个 AI 代理在这些规范约束下规划、编写和重构代码,让“写代码”变成“围绕规范运营的软件生产线,而非随意聊天写脚本”。 通过 Steering 文件和 MCP 集成把团队规范、外部 API、数据库和项目系统统一接入同一工作空间,以实现从单人原型到多人协作的一致开发体验。在此基础上,2025 亚马逊云科技 re:Invent 发布的 property-based testing 能力进一步把“规范”变成可执行的正确性度量,Kiro 会从规范中自动提取“对任意输入都应成立的性质”,生成大规模随机测试用例来验证代码是否始终满足这些属性,从而不再只依赖少量示例用例。

本文将围绕规范驱动开发理念,在典型数仓场景下,利用 Kiro AI IDE 使用 Redshift MCP 进行数据采样及血缘探索,自动生成Amazon Glue Data Quality 规则及脚本,并且通过 Glue MCP 部署质量任务,生成质量报告,旨在帮助数据工程团队快速落地质量管理。

方案架构

  • 使用 Kiro 的 Spec-Driven 开发,编写 spec 说明文档;
  • 添加必要前置信息,如 Redshift 集群及表范围、Glue connection 等,Kiro 首先通过 Redshift MCP 自动探索表格式及通过物化试图DDL 推断表逻辑,自动生成 Glue Data Quality 规则;
  • Kiro 将自动生成设计及任务清单文档,并自我检查每个任务输出内容,也可经过人为验证后执行任务;
  • Kiro 通过 Glue MCP生成 data quality 任务作业,运行后获得Json 格式的质量报告存储到 S3 存储,通过集成报表持续监控数据质量。
  • 实践过程

  • 前置条件

以典型的建立在Redshift的数仓为例,有 ODS、DWD、ADS三层数仓模型,并且通过物化试图实现分层逻辑。数据源是来自于 RDS Mysql 。Redshift 与 RDS 都分别在 Glue 创建了 Connections。

  • Kiro 工程介绍(参考工程见 Github)

.kiro/、

settings/、

mcp.json等,是 Kiro 在项目里的配置与 MCP 集成入口,本项目已经把 Redshift、Glue 等外部系统通过 MCP 暴露给 IDE 使用。

specs/redshift-data-quality/目录下有

design.md、

requirements.md、

tasks.md等,典型的 Kiro spec-driven 结构:需求、设计、任务按文档拆开管理。

sample-job/中的 Python 脚本,如

rds-redshift-row-count-diff.py、

redshift-table-base-check.py,是根据这些 spec 生成或演进的 Glue/Redshift 作业样例。

  • 示例配置文件及需求说明

配置文件:

需求清单

  • DQ Job Generator:为数据质量校验生成可执行 Glue Spark 作业的系统。
  • DQDL:由 AWS Glue Data Quality 的 EvaluateDataQuality 变换使用的数据质量定义语言。
  • EvaluateDataQuality:在 DataFrame 上应用 DQDL 规则的 AWS Glue 变换。
  • Single-Table Check:对单个表字段进行的数据质量校验(完整性、唯一性、取值范围等)。
  • Cross-Table Reconciliation:对关联表之间的一致性进行校验(行数、聚合值等)。
  • Cross-Source Comparison:对 RDS 源表与 Redshift ODS 表之间的数据同步情况进行校验。
  • Materialized View:在 Redshift 中存储查询结果并用于定义数据血缘的物化视图。
  • Data Layer:数据仓库中的分层结构(ODS/DWD/DWS/ADS 等)。
  • MCP Tool:用于与 AWS 服务交互的 Model Context Protocol 工具。
  • Configuration File:由用户提供的、用于描述数仓结构的 markdown 配置文件。
  • 示例设计文档解析
  • Configuration Parser(配置解析器)
  • 解析用户提供的 markdown 配置文件。
  • 从中抽取连接信息(Redshift / RDS 等)、需要校验的表及血缘关系。
  • 解析 RDS 源与 Redshift 之间的对账/比对规则,为后续跨源核对做输入。
  • Redshift Analyzer(Redshift 分析器,经 MCP 调用)
  • 通过 MCP 工具调用 Redshift,使用

execute_query查询系统表或视图,获取目标表的元数据(列名、类型、分布键、分区等)。

  • 通过

SHOW VIEW等命令抽取物化视图的 DDL,用来理解每张 ODS/DWD 表背后的计算逻辑和数据血缘。

  • 解析这些 SQL,构建表之间的血缘关系,为单表校验、跨表对账和跨源对比提供上下游信息。
  • 通过 MCP 工具调用 Redshift,使用
  • Glue Job Script Generator(Glue 作业脚本生成器)

  • 根据上一步得到的表结构和血缘信息,生成单表数据质量检查作业脚本(完整性、唯一性、范围等)。
  • 生成跨表对账脚本(如事实表与汇总表的行数或聚合值比对)。
  • 生成跨源对比脚本(RDS 源表与 Redshift ODS 表的同步一致性检查)。
  • 在脚本中内联 DQDL 规则集( Glue Data Quality 的规则定义),并尽量沿用既有的 sample job 模式,便于复用和维护。
  • Glue Connection Manager(Glue 连接管理器,经 MCP 调用)

  • 创建或校验到 Redshift 的 Glue connection,确保 Glue 作业可以访问 Redshift。
  • 创建或校验到 RDS 的 Glue connection,用于跨源比较时访问源库。
  • 对这些连接执行连通性测试,提前发现权限或网络问题,而不是等作业运行时才失败。
  • Glue Job Deployer(Glue 作业部署器,经 MCP 调用)

  • 将生成好的作业脚本上传到 S3,作为 Glue 作业的代码位置。
  • 基于配置创建 Glue 作业(包括 IAM 角色、超时时间、资源配置、参数等)。
  • 将作业与前面创建好的 Glue connections 关联起来,保证运行环境就绪。
  • 上报部署状态(成功/失败、作业名称、版本等),为后续编排和监控提供依据。
  • 示例任务清单示例
  • Glue data quality 任务解析

部分生成规则说明

总结与展望

通过使用 Kiro 的 Spec-Driven 开发,将需求、设计和任务固化为结构化的 spec 文档,再结合 Redshift 与 Glue Data Quality 的能力,把数据质量规则从“零散脚本”升级为“可维护的规格资产和自动化流水线”。这种方式一方面利用 Kiro 的上下文管理能力,根据 Redshift 集群信息、表范围、Glue 连接等前置信息自动探索表结构和物化视图 DDL,从而半自动生成 Glue Data Quality 规则;另一方面则通过设计文档与任务清单让人类在关键节点参与审查,形成 AI 与工程团队协作的闭环。结合 Glue 作业执行与质量报告集成报表,可以将数据质量监控从“事后排查”转向“持续观测与预警”,为数据产品和下游 AI 应用提供更稳定的基础。后续将业务规则和指标血缘显式写入 spec,让 Kiro 在生成规则时能覆盖跨表关系和关键 KPI 的不变量,而不仅限于单表校验;引入property-based testing 的思路,对生成的规则和脚本自动构造大量测试场景,验证“在各种数据分布下规则仍然合理”,减少调试时间与误报率。

生成实践建议,从小范围试点开始:选 1–2 个关键业务域(如订单、支付或核心用户行为),约束在有限的 Redshift schema 和 Glue connection 范围内实践 spec-driven 规则生成和执行,把流程打通后再扩展到更多表和域。规格模板标准化:为数据质量场景设计统一的 spec 模板(域/表/字段、质量维度、阈值策略、预期分布、上游下游关系),在 Kiro 中复用模板,避免每次从零开始描述需求。引入多层校验与快照:对 Kiro 生成的规则和脚本,建立“快照对比 + 小样本回归 + 影子运行”的三层校验,将大部分问题拦在开发和预发布阶段,而不是上线后靠人工排错。把运行数据反哺规格:定期分析 Glue Data Quality 报告中的异常模式、误报情况和长期无异常的规则,将这些信息回写到 spec 和生成器配置中,例如调整阈值、补充前置条件、废弃低价值规则,让规格和生成逻辑随业务与数据分布演进,而不是“一次性写完不再更新”。工具链与团队配合:在团队层面明确“规格是单一事实源”的共识,把 spec 文档纳入代码评审和发布流程,要求新增或修改规则必须先改 spec 再生成实现,同时为数据工程师提供 Kiro 与 Glue 的操作指南和最佳实践示例,降低技术负担和使用门槛。

推荐阅读

Kiro and the future of AI spec-driven software development

Spec-driven development with AI: Get started with a new open source toolkit

Stop Chatting, Start Specifying: Spec-Driven Design with Kiro IDE

Bertrand Meyer, “Applying ‘Design by Contract’”, IEEE Computer, 1992

Amazon Glue Data Quality Best Practices

Why Spec-Driven Development Will Define the Next Generation of Elite Engineers

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者

|

AWS 架构师中心: 云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用


全新 Amazon Route 53 Global Resolver:统一、安全的全球 DNS 解析服务(预览版)

AWS账单代付阅读(7)

亚马逊AWS官方博客

全新 Amazon Route 53 Global Resolver:统一、安全的全球 DNS 解析服务(预览版)

亚马逊云科技最近推出了一款填补架构空白的新产品:Amazon Route 53 Global Resolver。它的定位是“统一、安全、可全球访问的 DNS 解析端点”,这标志着 AWS 在 DNS 解析服务链路上提供了统一的全球边缘入口。您可以通过官方博客和开发者指南了解这款产品的基础功能和使用方法。

对于许多关注基础设施架构的读者而言,Global Resolver 带来的可能性是令人兴奋的。然而,不是所有场景都需要一个全球性的解析器。Global Resolver 的真正意义,在于它为特定需求提供了精准的解决方案。

因此我们撰写了这篇补充文章,旨在:

场景甄别:明确 Global Resolver 究竟适用于哪些场景,哪些用户能从中获得显著价值。 落地实践:分享在实际配置,测试,和使用过程中的一些注意事项。

接下来,我们将从两个最本质且需求迥异的场景出发,来解析 Global Resolver 的定位和价值,这份区分也将有助于您判断它是否是您当前架构的理想选择。

Global Resolver 的应用场景

面向企业网络(To Business):混合云与全球骨干网的 DNS 架构统一

对于跨国企业或需要处理大量混合云流量的组织来说,Global Resolver 的核心价值在于帮助它们在复杂的多网络环境下构建一条稳健、安全且策略统一的 DNS 链路。

随着企业 IT 资产逐渐分散在本地数据中心、企业网络与多个 AWS 云区域之间,DNS 已成为架构中最容易产生碎片化和潜在脆弱的环节。本地网络有自身的解析规则,云上 VPC 拥有私有 Hosted zone,它们既独立又需要在某些关键点互通。同时,企业必须维持严格的内部访问控制、合规监测,以及对关键业务域名的安全策略过滤。

Global Resolver 利用 AWS 的全球 Anycast 网络,让企业用户在任意地理位置,都能以完全一致的方式访问所有 DNS 命名空间——无论是公共互联网域名,还是 AWS 上的 Public 与 Private Hosted Zones。它通过集成 Route 53 DNS Firewall 等安全服务,提供了统一的过滤和安全策略执行点。此外,由于 DNS 流量需经过公网传输,DNS over HTTPS (DoH) 和 DNS over TLS (DoT) 协议还可以保护企业的敏感应用的 DNS 请求不会被窃听与篡改,确保最后一公里的安全。

简而言之,如果您的组织正在构建一个跨区域、多网络的混合云基础设施,并希望将 DNS 升级为一个统一、高度可管理且具有全球一致性体验的基础设施层,那么 Global Resolver 是高度契合您的需求的。

面向终端应用(To Client):规避 DNS 污染,确保全球一致的解析体验

对于 iOS、Android 或桌面端的终端应用开发者而言,Global Resolver 的意义与企业侧完全不同。终端应用开发者无需担心企业级的策略控制或私有命名空间,他们真正关注的是:应用能否在任何国家、任何网络环境下,稳定且不受干扰地解析到准确无误的 IP 地址?

在移动互联场景中,终端用户所连接的本地 ISP 或公共网络 DNS 有可能是不可信的环境,它们可能导致解析延迟高,甚至存在 DNS 污染或劫持的风险。因此,越来越多的移动应用选择主动采用 DoH/DoT,与可信赖的安全解析端点直接通信,从而绕过本地不安全的 DNS 环境。

Global Resolver 天然具备 Anycast 能力,并支持带 Token 的访问控制,为终端应用提供了一个安全、低延迟的 DoH/DoT 目标。它不仅能确保请求和响应的加密安全,还能避免共享公共 DNS 出口带来的性能波动。当应用需要分发敏感资源、或依赖域名在全球各地都提供统一且快速的响应逻辑时,Global Resolver 提供了一个理想的解决方案,助力应用开发者实现全球用户体验的标准化和最优化。

部署与使用 Global Resolver 的注意事项

在将 Global Resolver 引入生产环境之前,您需要了解若干设计与操作层面的细节,以便在架构决策和实际操作中避免常见误区。

To Business 场景注意事项

Global Resolver 是对公网可达的 Anycast 服务:私有地址不可作为 access source

Global Resolver 的解析入口位于 AWS 的 Anycast 公网边缘节点,因此它只能看到来自客户端的“公网出口地址”。这带来两个重要的架构影响。其一,当您在 Global Resolver 上配置 access source(用以限制允许访问的客户端 IP)时,不能使用 RFC1918 私有地址段(例如 10/8、172.16/12、192.168/16)。Global Resolver 只会看到客户端的公网 IP(通常是 NAT 网关、IGW 或出口设备的地址),因此 access source 必须以这些公网地址为准。其二,对于位于 VPC 内或企业内网的实例(例如 EC2、内网主机),若要访问 Global Resolver,它们必须走公网出口:VPC 中的资源需要通过 Internet Gateway(IGW)或 NAT 出口;企业内部的客户端也需要通过 NAT/公网出口,而不能仅依赖私有 VPN、Direct Connect 或专线将 DNS 请求“直接”送到 Global Resolver。

如果您需要一个“私有解析器”并通过私有链路直接访问,应该使用 Route 53 VPC Resolver inbound endpoints 和 outbound endpoints,而不是 Global Resolver。

私有 Hosted Zone 会覆盖同名的公共解析

如果某个 Private hosted zone 被关联到 Global Resolver,则相同域名的 Public hosted zone 就不会再通过 Global Resolver 返回结果了。这个工作逻辑和 VPC Resolver 完全一样。

例如,您有一个 public hosted zone

example.com,包含公网域名记录 public.example.com,也有一个 private hosted zone example.com,包含私网域名记录 private.example.com。只要 private hosted zone 被关联在 Global Resolver 上,那么所有经过 Global Resolver 的查询都只能得到 private.example.com 的结果,而不会再得到 public.example.com 的结果。

因此,在设计内部和公网 DNS 时,更稳妥的方式往往是让两者名字明确区分开,比如内部使用

example.internalcorp.example 或其它专门的内部域名,而不是和公网域名完全同名。这样可以避免意外的解析覆盖,也让 Global Resolver 的行为更加清晰、可控。

To Client 场景注意事项

仅需 Access token 鉴权

在面向终端设备或客户端应用(iOS/Android/桌面客户端)的场景中,Global Resolver 常被用作一个可信的远端解析端点,以规避本地 DNS 劫持或污染。对于这类使用者,Global Resolver 提供的 Token 机制本身就是访问控制手段:只要客户端在请求中携带有效 Token,解析请求就会被接受,因而并不需要再额外配置 access source 规则。但是,请注意 access token 的最长有效期是 365 天。

实践中的一个建议是:

To-B(企业/混合云)与 To-C(客户端/应用)分别建立独立的 Global Resolver 实例。企业实例可启用更严格的 access source、DNS Firewall 与私有 Hosted Zone 关联;客户端实例可只依赖 Token 作为权限边界,简化配置并避免企业策略误用到面向互联网的客户端上。将两类流量分开管理能最大限度降低权限错配或策略冲突的风险。

DoH/DoT 的测试与协议细节:遵从 RFC8484 和 RFC7858 的约定

Global Resolver 遵循 RFC 8484 (DNS over HTTPS) 和 RFC 7858 (DNS over TLS)。在测试和调试时,应当按照各协议的行为规范来构造查询。

DoH (GET)

使用 cURL 进行 GET 测试时,dns= 参数必须是 DNS 查询报文的

base64url 编码(对原始 DNS wire-format 进行 base64 编码,并使用 URL-safe 字符集)。格式示例为:

https:///dns-query?token=&dns=。

DoH (POST)

POST 请求的 body 必须是原始的 DNS 二进制报文,Content-Type 为

application/dns-message。

DoT ** **携带 Access token ** **的方式:通过 EDNS0 ** **选项传递

在 DoT 的场景里,由于没有 HTTP Header 这样的元数据载体,Token 必须被封装在 DNS 报文内部。具体而言,Token 是通过 EDNS0 扩展字段传递的。EDNS0 的设计初衷即为 DNS 协议提供扩展“容器”,因此携带 Token 这类自定义鉴权信息是符合协议设计的。

客户端在构造 DNS 查询时,在 OPT RR 中插入一个自定义的 EDNS0 选项 (Option Code:

0xffa0)。该选项的 Data 字段即为 Token 的字符串数据。Global Resolver 收到查询后,它会从 EDNS0 的指定字段中取出 Token,与自身的策略进行比对。匹配则继续解析,不匹配则拒绝。

关于测试工具的说明

虽然

dig 或

kdig 等工具理论上支持通过

+ednsopt 发送自定义 Hex 数据,但需要手动将 Token 转码并构造 Payload,过程较为繁琐且容易出错;而

dog、

doggo 等工具在 DoT 模式下通常尚未提供自定义 EDNS0 选项的接口。

因此,若要验证带 Token 的 DoT 解析,建议编写简单的 Python/Go 脚本进行测试。而对于常规的连通性或 ECS (EDNS Client Subnet) 验证,依然可以使用这些工具通过 DoH 协议快速完成,如下所示 (Resolver 域名,token,和 IP 地址已模糊处理):

总结

Global Resolver 的出现为跨网络、多环境的 DNS 架构提供了一个更干净、更统一的解决方案。它让企业能够在全球范围内,以一致的安全方式暴露内部解析能力,同时也让移动端与桌面端应用在任何网络环境下都能获得可靠、不被污染的解析结果。对混合云用户来说,它补上了原本跨公网访问私有域名这一点上的空白;而对面向全球用户的应用来说,它提供了一个稳定的、可控的、安全的 DoH/DoT 入口。

最后,如果您的系统涉及到移动端或桌面端的客户端开发,那么 Access token 的管理就成了一个必须关注的部分。应用无法依赖系统级设置来调用 Global Resolver,因此 Token 需要由应用自行安全地保存与使用。无论采用 Keychain、Keystore 或等效的加密存储方式,都应确保 Token 不会以明文暴露。而且要牢记:Token 的有效期最长只有 365 天。这意味着 Token 需要被纳入您应用版本迭代的计划中,无论是通过应用更新周期自动刷新,还是通过后端下发机制滚动替换,都需要有明确的生命周期管理方案。

*Global Resolver * *的可用区域不包括北京和宁夏。对于在亚马逊云科技中国区域开展业务的读者,我们推荐您了解 Static BGP (S-BGP) * *服务。它是专为中国区域设计的一种成本优化型数据传输服务。通过这项服务,我们致力于帮助客户在保证网络性能的同时显著降低数据传输成本。 S-BGP * *可为符合条件的客户提供高达 20%* *~70% * *的数据传输费用节省。详细的内容可以* *点击这里* *。*

本篇作者

|

AWS 架构师中心: 云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用


基于视频理解的智能视频剪辑指南

AWS账单代付阅读(7)

为什么需要智能剪辑

媒体娱乐、广告营销、零售电商等行业的客户拥有大量的视频资源,如网剧、纪录片、访谈和创意素材等。为满足不同观众群体和分发渠道的需求,需要对这些视频资源进行大量的编辑和再创作。对于中长视频,需要对内容总结、剪辑、翻译、提取精彩片段等;对于短视频,需要增加特效、变换、重新配音等。要完成这些工作,通过传统的视频工具链将耗费大量的人力和时间。

随着多模态模型的不断发展,对于视频理解的能力不断增强,通过多模态模型与多种专业模型的配合,能够多维度的高效率的理解视频内容,包括视频中音轨,视频中字幕,视频中的画面,视频中分片;这也使得通过深入的视频理解,有明确目标的视频二次加工成为可能。

本文介绍一套利用生成式AI多模态模型用于视频理解从而进行视频编辑的指南,通过多模态大模型对视频内容进行深入理解,在此基础上提供了一套智能剪辑的功能集合,用于简化视频编辑工作流,提高视频创作效率,从而为观众提供更友好和丰富多样的视觉体验。

本文将介绍:

  • 智能剪辑的多种客户场景
  • 方案指南的设计原理
  • 指南主要功能介绍和优势
  • 代码示例以及API使用介绍
  • 实际应用场景的介绍
  • 智能剪辑的多种客户场景

在传统影视行业,流媒体行业,广告行业,娱乐行业,零售电商行业,体育行业等,只要涉及视频领域的创作就离不开视频剪辑,视频剪辑的工作是一个相对劳动密集型工作,需要大量人工观看视频,理解视频,视频打点,视频编辑,视频特效等劳动,其中一些环节是可以利用AI驱动的工作流进行辅助,从而提高效率。

|业务场景

|客户类型

|业务特点

|影视剪辑和二创

|流媒体公司,影视版权公司

|客户具有版权电影,电视剧等进行自动分段,高光时刻,进行多区域和多平台分发

|体育比赛精彩时刻

|体育媒体,体育运营

|对于各类型体育赛事中的比赛,进行精彩时刻的提取,比如足球进球,比赛逆转,绝杀,精彩展现回放等

|短剧投放视频

|短剧内容运营企业

|大量的短剧资源,需要制作一些精彩时刻,前情提要,进行多渠道投放,吸引付费用户

|科教片和纪录片的分段和总结

|传统媒体,流媒体公司

|知识类视频,需要进行专业的理解,翻译,分段,在合适的频道进行播放

|直播快速分段和提取

|娱乐行业,零售电商

|娱乐直播,电商直播或者其他类型的视频直播,进行快速的分割,并提取出其中重要信息。

各类客户面对的痛点

  • 利用传统工作流,需要人力成本高, 效率低, 难以满足运营需求
  • 需要专业人才,成本高,周期长,、
  • 视频剪辑和制作缺乏自动化工具
  • 准确性和标准化,很难进行精确控制
  • 内容一致性, 等一致性问题
  • 需要多部门流程,缺乏统一的自动化工作流管理

这对上述的业务场景和普遍的痛点,我们利用生成式AI多模态大模型,Agentic AI,媒体服务,打造了一套指南,目的在于利用AI自动化视频内容的剪辑的工作流,辅助人工,大幅提高工作效率,解决客户的痛点。

方案指南设计原理和对比

从工作流的角度,我们对于AI智能剪辑的思路,是模拟专业团队的工作流对视频内容进行处理,从而产出最终用户所预期的视频内容。这对这个工作流,我们可以大致可以分为一下几个关键环节:

  • 视频理解:包括视频画面,音频,字幕在时间纬度上综合理解
  • 视频内容的抽取:包括分段,精彩时刻抽取,特定场景抽取、特定情节抽取等工作
  • 视频组合:根据理解和抽取的结果,利用媒体处理工具或者服务,对于视频进行重新合成以及配音,字幕,转场特效等工作

对于每个工作流的节点有不同的方法实现,我们针对工作流中,每种可能方式进行对比测试,得到了相对能适合大多数场景的技术组合。

视频理解方式

视频理解方式从基础形式上来说分为两种:分段理解综合理解和整段理解

整段理解的形式

是把整个视频输入给多模态模型,由多模态模型直接给出时间维度的理解

分段理解的形式

先把视频内容按照一定规则进行分割,成子单元,每个子单元分别进行理解再进行整体理解

这两种方式个有优缺点

对于整段理解来说

  • 优点:操作比较简单并且灵活适配各种视频模式
  • 缺点:视频长度变长后,由于目前VLM model取样逐渐稀疏,时间准确度会下降,

下面是针对一组不同时长的,足球标赛视频,标记出射门,进球时刻,用一个多模态大模型的测试结果

使用如下提示词:

请分析如下视频,长度为 {video-length}, 请标记射门,进球的时刻

|

|5分钟

|15分钟

|29分钟

|45分钟

|视频正确信息

|1:35 射门并进球

|射门1:35 ,5:20,10:31

进球:1:35

|射门1:35 ,5:20,10:31,16:01,24:32

进球:1:35,24:32

|N/A

|时间偏差

|0s

|0-3s

|3-5s

|N/A

|是否有遗漏

|无

|无

|有

|N/A

可以看到随着视频长度的增加,时间精度会下降,甚至有遗漏的情况,另外模型不支持超长视频输入

对于分段理解来说

  • 优点:时间准确度较高,并且分段越细致,时间准确度越高
  • 缺点:需要多个模块组合,工程化比较复杂

*结论1* *:作为视频剪辑的方案来说,为了视频剪辑的效果,时间精确度是基础,另外对于不同的客户场景,长度一小时以上的视频编辑是比较普遍的需求;所以我们选择分段理解来保持时间的精度并能处理较长的视频。*

如何分段

我们选择分段理解作为我们理解方式,那么如何分段就是一个需要面对的技术问题,分段的方式是否合适决定了

  • 时间的准确度
  • 最小的视频处理短圆
  • 视频理解的精度和深度:

分段的基本方式包括

  • 按照镜头顺序分割
  • 按照音频转场分割
  • 按照时间平均分割

下面的图显示了对于一个视频来说,不同分段的特性

特征总结

  • 平均分割虽然简单但是会把镜头和对话截断
  • 镜头分割法可以覆盖整个视频,但是可能会截断对话,适合体育比赛,纪录片大部分信息在画面的视频
  • 按照音频对话分割可能覆盖不了全部视频,但不会截断对话,适合短剧等以对话驱动情节的视频

*结论2* *:我们不采用平均分割,采用镜头分割和语音对话分割的综合理解,来适应不同的视频场景*

视频的抽取方式

对于智能视频剪辑来说,基于视频理解的视频抽取是核心的步骤,这里有两个主流方式

通过向量化搜索的方式

通过大模型推理的方式

这两种方式,第一种是利用向量化(视频或者转录后文字等多种方式向量化)与搜索引擎相结合,查找结果;第二种是利用大模型对剧情全局理解,再对结构化数据进行进行推理,从而产生结果。

这两种结果各有优点

场景1 精确动作 “查找进球镜头” 每个分片大概2-3分钟

|

|10个分片包含5个进球镜头

|50个分片包含20个进球镜头

|100个分片包含20个进球镜头

|200个分片包含100个进球镜头

|方法1 遗漏和选错总数

|0

|1

|2

|2

|方法2 遗漏和选错总数

|0

|2

|2

|6

场景2 按照情节查找 “男主幽默的对话”,这里会有些主观判断

|

|5组对话

|20组对话

|50组对话

|方法1 遗漏和选错总数

|有1-2个

|有5个左右

|会出现较多误判

|方法2 遗漏和选错总数

|0

|0

|2

对于精确动作片段的查找,方法1和方法2在时长2小时内是基本相似的,而且错误率较低;当视频数量过多时,方法2会有遗漏

对于按照情节查找,方法1错误率较高,方法2可以保持一定正确率

综合测试结果和系统分析,方法1 更适合标签化的媒体资源系统,方案2 更适合影视剧单片按照视频情节剪辑

*结论3 * *模拟内容剪辑工作流,理解视频情节后进行视频剪辑,我们更倾向于 “* *利用大模型对剧情全局理解,再对结构化数据进行进行推理,从而产生结果”*

方案指南的流程设计

根据如上的结论1,结论2 和结论3,我们进行如下设计:

  • 通过语音转场和视频镜头转场进行视频的拆分和提取,
  • 再通过多模态模型进行综合理解,
  • 然后再利用大语言模型按照用户需求和影视内容情节进行查找,得到内容片段组合
  • 逻辑架构

    方案指南的技术架构和主要功能

    技术架构

根据逻辑架构进行了技术架构设计,并以此进行了指南的开发

本方案提供了一个综合的视频处理系统架构,包括前端应用程序和后端的多个视频处理与分析工作流模块。

  • 视频上传到S3存储桶后,通过API Gateway和Lambda服务处理客户端请求,以及协调各个工作流。
  • 音频转录模块通过Amazon SageMaker、S3、Lambda等服务实现音频的提取,并将结果存储到DynamoDB中。支持导出srt格式的字幕文件。
  • 视频理解模块通过AI服务Rekognition和Bedrock上的多模态大模型,以及SNS、SQS、Lambda和DynamoDB等多种服务,实现对视频帧活视频片段的理解、分析和处理。
  • 视频摘要生成、视频字幕翻译、视频拆分、高光时刻等模块,充分利用了Bedrock上的大模型,然后把结果存储到DynamoDB中,生成的视频直接存储到S3。
  • 方案指南的功能列表

  • 视频理解:把音频转录为文字;基于视频片段抽帧或视频片段生成对镜头的描述
  • 视频摘要:基于音频转录的文字或镜头描述,通过大语言模型进行总结/摘要
  • 字幕翻译:将视频字幕文件从一种语言翻译为其他语言
  • 视频剪辑:通过大语言模型理解音频转录文字或镜头的文字描述,根据用户需求拆分成小的视频片段
  • 高光时刻:通过对视频的理解,生成精彩视频的片段
  • 配音、字幕烧录:在生成高光时刻时,可使用自定义文字进行配音(旁白),并把字幕烧录到视频中
  • 方案指南有如下优点

  • 用户可以根据实际需求调整提示词和调用顺序,达到不同产出效果
  • 不绑定任何模型,用户可以根据实际需求配置模型
  • 在安全方面,操作都在客户账号下,保证媒体内容资源的安全,视频数据不会用于公共基础模型训练。
  • 本方案提供了完整工具链对接方式和API。
  • 本方案采用Serverless架构,无需运营人员考虑系统的并发和容量问题。
  • 指南主要功能介绍和代码示例以及API使用介绍

    多种方式的视频信息提取提取

本方案指南提供多种视频拆分和信息提取的方式,功能如下图所示

*示例代码*

多种模式的视频信息提取

a)利用语音

音频识别部分

model = whisper.load_model(

model_type, download_root=”/opt/ml/code/models/”, in_memory=True, device=”cuda”

)

result = model.transcribe(source, **decode_options)

基于视频理解的智能视频剪辑指南

audio = whisper.load_audio(source)

mel = whisper.log_mel_spectrogram(audio, model.dims.n_mels, padding=whisper.audio.N_SAMPLES).to(model.device)

mel_segment = whisper.pad_or_trim(mel, whisper.audio.N_FRAMES)

_, probs = model.detect_language(mel_segment)

lang = max(probs, key=probs.get)

在进行自动语音识别(Automatic Speech Recognition, ASR)时,偶尔会出现幻觉,为了减轻这种现象,需要引入语音活动检测(Voice Activity Detection, VAD)。对于不同的语言,使用擅长的VAD。

from funasr import AutoModel as FunasrModel

from silero_vad import get_speech_timestamps, load_silero_vad, read_audio

if lang == “zh”:

vad_model = FunasrModel(

model=”/opt/ml/code/models/vad_model”, model_revision=”v2.0.4″

)

res = vad_model.generate(input=source)

speech_timestamps = []

for row in res[0][“value”]:

speech_timestamps.append({“start”: row[0] / 1000, “end”: row[1] / 1000})

else:

vad_model = load_silero_vad()

wav = read_audio(source)

speech_timestamps = get_speech_timestamps(

wav,

vad_model,

return_seconds=True,

threshold=0.2, # Return speech timestamps in seconds (default is samples)

)

b)利用片段抽取视频

在默剧或者纪录片中,仅采用ASR对语音识别不够,需要基于视频帧或视频片段进行理解。当采用这种方式时,先使用亚马逊云科技的Rekognition服务对视频进行逻辑切分,即把视频根据镜头切分为若干个片段(segment)。然后基于片段进行抽帧,把同一个片段的图片发送到大语言模型进行理解。对于支持视频模态的大模型,则可以直接把一个视频片段直接发送给大语言模型进行理解。在这里为保证抽帧涵盖整个视频片段,减少遗漏,我们设计了抽帧逻辑。

def get_sample_points(start, end, sample_count=12):

step = (end – start) / (sample_count + 1)

center = (start + end) / 2

sample_points = []

for i in range(1, sample_count + 1):

point = start + i * step

if point == center:

rounded_point = int(round(point))

elif point < center:

rounded_point = int(math.ceil(point))

else:

rounded_point = int(math.floor(point))

sample_points.append(rounded_point)

return list(set(sample_points))

parameter_list = []

index = 0

for shot in shots:

start_time = shot[“StartTimestampMillis”] / 1000

duration = (shot[“EndTimestampMillis”] – shot[“StartTimestampMillis”]) / 1000

video_bytes = process_segment(temp_file_path, start_time, duration)

上传LLM payload到S3,而不是原始视频

video_key = f”transcribe_video/{project_id}/{shot[‘StartFrameNumber’]}.json”

s3_client.put_object(

Bucket=bucket_name,

Key=video_key,

Body=json.dumps(build_llm_payload(get_content_messages(video_bytes, “video”), language)),

)

parameter_list.append(

{

“index”: index,

“start_time”: str(round(shot[“StartTimestampMillis”] / 1000, 2)),

“end_time”: str(round(shot[“EndTimestampMillis”] / 1000, 2)),

“next_start_time”: str(round(shot[“next_start_time”] / 1000, 2)),

“previous_end_time”: str(round(shot[“previous_end_time”] / 1000, 2)),

“parameter_key”: video_key,

}

)

index += 1

视频摘要功能示例

本方案指南,能够通过视频分段和内容提取得到原始的视频内容信息,通过大语言模型可以进行视频内容的摘要

*提示词示例*

You are a professional video editor, skilled in analyzing video shot sequences and extracting their core content. I will provide you with a series of video shot descriptions in chronological order. Your task is to create a summary describing what the video is about.

Note that this summary needs to be completed in the same language as the shot description. If the shot description is in English, then the summary you create must also be in English.

Please carefully read the following shot descriptions:

${shot_str}

Please follow the steps below:

1. Read through the sequence of shot descriptions carefully to ensure understanding of the overall content and the connections between shots.

2. Extract the core scenes and key information from the lens sequence.

3. Organize these core scenarios and key information into a coherent summary. The summary content needs to be structured.

4. Please use the ${language} language for output.

Please only output the summary content, do not add any additional explanations or comments.

智能切片功能示例

本方案指南,能够智能识别长视频中的关键内容点,将冗长的视频精准切分为多个主题鲜明的短视频片段。

用户可通过本方案页面上传视频文件到s3,也可以直接指定s3上现有的视频文件。然后即可对视频进行理解。常规视频可使用基于音频的理解。对于记录片或者默剧,可使用基于视频帧/视频片段的方式。

接下来即可进行分段处理,用户可以设置希望分段的最大值和最小值,也可以把最大值和最小值都设置为0,让大语言模型根据内容进行自由分段。提交分段设置后,大语言模型根据内容进行分割,并把结果展示给用户。

用户可对开始、结束时间进行调整,也可以手动增加、删除分段。确认后,点击切割按钮对视频文件进行实际切割,方案会把最终生成的视频存放到S3上。

无论是分段还是其他需要视频内容提取的操作都是用大语言模型配合动态提示词来完成

*示例代码*

“You are a professional text editor, skilled in segmenting long texts and summarizing topics.

Please carefully read the following sentences and segment the content according to the given rules.

I will edit the video based on your segmentation results.\n\nNote that these segments and summaries need to be completed in the same language as the text.

If the text is in English, then the segments and summaries you create must also be in English

.\n\nsentence:\n\n${data}\n\n\n\nPlease follow the following guidelines:\n\n1. Combine consecutive, thematically related sentences into one concise summary of the topic.\n2.

Ensure the topic is concise, clear, and engaging.\n3. Output the summarized topics and corresponding sentence number ranges in order, using a hyphen to connect the starting and ending sentence numbers (e.g., 1-10).\n4. Sentences within each paragraph must be continuous, and cross-sentence segmentation is not allowed.\n5. All sentences must be included in paragraphs.\n6. Please output in the original language.\n7. Do not use any single or double quotes in the summary.\n\nPlease summarize into a minimum of 1 and a maximum of 4 paragraphs\n\nPlease strictly follow the following JSON format to output your segmented results:\n\n[\n{\”start\”: , \”end\”: , \”seg\”: , \”summary\”: \”\”}\n{\”start\”: , \”end\”: , \”seg\”: , \”summary\”: \”\”}\n…\n]\nNote:\n\n- The paragraph number (seg) must be a consecutive integer.\nPlease ensure that all content from sentence 1 to sentence 658 is included in your segment.\n- Only output the content in JSON format, no other explanations are needed.\n\nPlease remember that high-quality segmentation and summarization are crucial for the subsequent video editing work. We look forward to your professional performance!

def lambda_handler(event, context):

if event[“segment”][“payload”].get(“error”):

raise Exception(event[“segment”][“payload”].get(“error”))

transcribe_job_id = event[“transcribe_job_id”]

job_id = event[“job_id”]

segment = event[“segment”][“payload”].get(“body”)

if segment.startswith(“

json") and segment.endswith("

“):

segment = segment[7:-3]

try:

segment_dict = json.loads(segment)

except json.decoder.JSONDecodeError:

logger.info(segment)

segment_dict = parse_json(segment)

sentences = get_all_sentences(transcribe_job_id)

i = 0

with segment_table.batch_writer() as batch:

for item in segment_dict:

segment = {}

segment[“start_time”] = sentences[item[“start”] – 1][“start_time”]

segment[“end_time”] = sentences[item[“end”] – 1][“end_time”]

segment[“summary”] = item[“summary”]

segment[“job_id”] = job_id

segment[“index”] = i

i = i + 1

batch.put_item(Item=segment)

高光时刻功能示例

在当今信息爆炸的时代,视频内容日益丰富,但用户时间有限。为了解决这一矛盾,通过本方案能够智能识别并提取视频中最具价值、最吸引人的片段,为用户提供精华内容体验。

在流程上,系统先对原视频进行理解,并对内容进行文字摘要。然后根据场景,比如产品发布会、短剧,传入不同的提示词模板,即可得到不同场景下的高光时刻集锦,用户可对大语言模型挑选出来的高光时刻片段进行调整,可以勾选其他片段,也可以删除现有片段。确认后,点击生成按钮,即可合成最终的高光时刻视频。

在提取高光时刻片段时,有时我们需要加入旁白,而不采用原音。本方案提供此支持,并支持多语言旁白和字幕。

*示例代码*

在提取高光时刻场景,用户除了希望提取精彩片段,还希望精彩片段在一个时间范围内,以便[已去除[已去除推广语]语]。在利用LLM进行提取精彩片段时,即使明确告知LLM每个片段时长以及需要提取时长,也经常出现一次不能达到目标情况。可以多次迭代以便LLM选出合适内容。

Determine which prompt to use based on the current step

if current_highlight_step == 0:

Initial selection – ask for complete highlight selection

task_prompt = INITIAL_TASK_PROMPT

format_prompt = INITIAL_FORMAT_PROMPT.format(second_limit=second_limit)

elif highlight_second > current_highlight_duration:

Need to add more shots

diff_time = round(highlight_second – current_highlight_duration, 2)

task_prompt = INCREMENTAL_TASK_PROMPT

format_prompt = INCREMENTAL_FORMAT_PROMPT.format(

current_highlight_duration=current_highlight_duration,

diff_time=diff_time,

current_highlight_shots=current_highlight_shots,

)

else:

Need to remove some shots

diff_time = round(current_highlight_duration – highlight_second, 2)

task_prompt = DECREMENTAL_TASK_PROMPT

format_prompt = DECREMENTAL_FORMAT_PROMPT.format(

current_highlight_duration=current_highlight_duration,

diff_time=diff_time,

current_highlight_shots=current_highlight_shots,

)

提供完整的API

本方案提指南供了OpenAPI格式的API,供第三方调用。完整API文档在部署完成后即可浏览。

下图为方案部分API,,用户在API页面展开浏览和测试每一个方法,每个方法的使用具有详细的解释,每个参数也有对应schema说明。

在API鉴权方面有三种模式:

  • prod,生产模式,默认值,调用API时需要指定API key和OIDC(cognito)的jwt token
  • api,API模式,调用API时仅需API key。此模式主要用于和第三方系统集成。
  • dev,开发模式,调用API时不进行鉴权,可以在OpenAPI文档上调用和测试API。禁止在生产环境中使用开发模式。
  • 实际应用场景的介绍

在实际客户应用中,根据客户实际情况,配置方案指南,调试提示词,并给出每个客户最佳使用方法是实际应用的难点。

我们需要充分了解:

  • 客户素材库中视频资源格式(如 MP4、MOV、AVI 等)与分辨率(从 480P 到 4K 不等)
  • 不同类别的输入视频形式:比如长视频(如数小时的访谈、赛事录像)、无声视频(如默剧、产品演示片段)的复杂情况,需通过分类处理策略适配多样化素材。
  • 客户需要产出视频的形式:比如高光时刻,视频总结,视频分段,视频摘要
  • 客户需要的工作流:比如自动化的批量产生视频浓缩、和原有的成熟工作流相结合

下面是一些典型情况的举例:

  • 对于有音频的视频,采用 ASR 语音识别技术(如 Whisper 模型)提取音频语义,转化为文字信息作为剪辑依据;
  • 对于无音频或音频无效的视频,以及含硬字幕的素材,通过抽帧工具按合理频率提取画面帧,再借助 OCR 技术提取字幕或画面文字,补充内容理解维度。
  • 而对于需深度画面分析的场景(如识别无声视频中的动作逻辑、复杂场景细节),则直接调用支持视频输入的大语言模型,将视频片段作为输入,让模型生成结构化的画面描述,为后续智能剪辑提供精准的内容参考,确保各类素材都能被有效解析。

下面是实际的应用案例:

案例1 演出和发布会视频自动剪辑

每年的re:Invent云计算峰会无疑是科技行业的年度盛事,作为全球最具影响力的云计算技术峰会之一,它总能带来令人目不暇接的新产品发布、技术突破与战略方向。然而,随之而来的是海量视频内容和庞大的信息流,如何从中提炼出真正的精彩时刻,一直是市场团队面临的巨大挑战。

往年,市场部团队需要投入大量人力进行视频内容的手工筛选和精彩片段的提取工作。这一过程不仅耗时耗力,还面临着主观判断可能导致的关键信息遗漏。团队成员需要观看数百小时的演讲、工作坊和技术演示,从中识别并剪辑出最具价值的内容片段,这种方式效率低下且难以保证质量一致性。

在2024年的re:Invent后,市场团队引入了本方案,彻底改变了这一工作流程。该方案利用先进的视频内容分析技术,能够自动识别演讲中的关键产品发布、重要技术公告、观众热烈反应以及演讲者的精彩表现等重要时刻。实施这一方案后,市场团队处理re:Invent效率大幅提升。方案完成初步筛选后,市场团队只需进行最后的审核和微调。

更重要的是,这一方案不仅减轻了工作量,还提高了内容质量。实现更全面的覆盖,确保不会遗漏任何关键时刻;更快速的内容发布,提升市场反应速度。

客户要求:

  • 客户快速的通过现场录制得到1K视频,大小在10G左右,
  • 输入视频为:长度2-3小时,每天有30-40个视频, 视频中的有效部分一直都有演讲声音,为英语,无字幕
  • 每个视频为转换为输出的浓缩视频,可以投放到社交媒体的短视频,每个3-4分钟
  • 客户需要的工作流:自动化的批量产生
  • 设计的最佳应用方法:

客户通过API调用的方式实现自动化工作流

  • 调用

Project方法,自动化同时建立5-10个highlightmoment 的project

  • 获得project_id后,通过调用

TrancribeJob方法同时建立audio方式的视频内容提取

  • 音频信息提取成功后通过调用

SummaryJob方法,并发建立摘要任务

  • 摘要成功后,通过并行调用

HighlightJob方法设置length参数为3分钟

  • 通过调用

Highlight方法得到最终产出视频

推荐的highlight moment提示词为:

你是一名专业的视频剪辑师。这是你需要剪辑的视频介绍:

${summary_str}

以下是一系列按时间顺序排列的视频镜头,包括用标记的对话内容和标记的镜头时长(秒)。

${shot_str}

你的任务是${task_prompt}

剪辑出高光时刻,高光时刻只需要包括新产品科技发布, 精彩的示例

严格按照原片的时间顺序排列这些不调整先后顺序

请注意,一个高光时刻可能跨越多个连续镜头。为保持对话的连贯性,你可以将这些相关镜头都包含在高光视频中。

为了连续性,在选择高光时刻时请尽可能包括多个连续的镜头。对话内容不要太突兀。尽量保持连贯性。如果一个镜头小于5秒,且前后的镜头都没有被选为高光时刻,那么它也不该被选中。

每个高光时刻包含的若干个连续镜头中的对话应该是完整的,不应该突然跳转到另一个高光时刻。

请剔除片头曲和片尾曲 ${format_prompt}

请删除头尾空白部分

案例2 影视视频的多国语言二次创作

中国头部的传媒公司,具有大量多语种的记录片资源,需要进行剪辑,二次创作,全球多个社交媒体平台进行分发。

客户面临着如下问题:

  • 需要大量人力,而且周期长,难以满足运营节奏
  • 对于视频切条的时间准确度无法把握
  • 多语种需要专业人才,成本高,周期长,而且一个片子里可能存在不同语言
  • 需要多个部门配合
  • 客户实际要求:

  • 片源形态大多数为记录片,2K视频,大小在10G左右,
  • 输入视频为:长度1小时,每天有10个视频, 视频中有多国语言的解说
  • 每个视频进行分段,每个5-10分钟
  • 需要相对完整工具链和工作流
  • 设计的最佳应用方法:

客户基于视频理解的视频编辑指南资产为技术底座,和其现有的工作流结合

  • 媒体资源库内容批量上传的S3
  • 按照步骤进行批量的音频和视频转录
  • 按照缺省方式进行视频的理解
  • 按照提示词设计的方式进行视频分段,大概8-10段
  • 从S3中下载分段后的结果
  • 推荐的分段提示词为:

你是一位专业的文本编辑,擅长分割长文本并概括主题。请仔细阅读以下句子,并根据给定规则进行分段。

我将根据你的分段结果编辑视频。

注意:这些分段和概括需要用与文本相同的语言完成。

如果文本是英语,则你创建的分段和概括也必须使用英语。

请遵循以下指南:

1. 将连续的主题相关句子合并为一个简洁的主题概述。

2. 确保主题简洁、清晰且引人入胜。

3. 输出概括的主题和对应的句子编号范围,使用连字符连接起始和结束句子号(例如 1-10)。

4. 每个段落内的句子必须连续,跨句子分段不允许。

5. 所有句子必须包含在段落中。

6. 请以原始语言输出。

7. 不要在概括中使用任何单引号或双引号。

请概括为最小1个和最大4个段落。请严格遵循以下JSON

{\”start\”: , \”end\”: , \”seg\”: , \”summary\”: \”\”}\n{\”start\”: , \”end\”: , \”seg\”: , \”summary\”: \”\”}\n…\n]\n

注意:

  • 段落号(seg)必须是连续整数。请确保从句子1到句子658的所有内容都包含在你的分段中。
  • 仅输出JSON格式内容,不需要其他说明。
  • *前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

    本篇作者

|

AWS 架构师中心: 云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用

|


从X86到AWS Graviton4:合合信息图像识别应用的性能突破之旅

AWS账单代付阅读(7)

亚马逊AWS官方博客

从X86到AWS Graviton4:合合信息图像识别应用的性能突破之旅

摘要

本文分享了合合信息将其图像识别应用从x86架构迁移到AWS Graviton ARM架构完整实践过程。通过精心的规划和实施,该项目不仅实现了3倍的性能提升,还在业务量翻倍的情况下将实例数量减少了61%,每台实例处理能力提升至之前的491%,整体成本下降为之前的30.1% 显著优化了总拥有成本(TCO)。

一、项目背景:计算密集型应用的成本挑战

1.1系统现状

合合信息为全球数亿用户及众多行业的企业客户提供服务,专注于多模态大模型文本智能技术,高质量数据建设等核心技术,这些都是典型的计算密集型应用。随着业务量的快速增长,原有的 C5 实例集群在应对高峰期请求时,面临着三大挑战:

  • 高计算需求:图像识别算法需要大量CPU资源进行特征提取和模型推理
  • 性能扩展瓶颈:在业务高峰期,需要启动大量的 C5 实例才能满足海量的并发请求,这不仅导致了资源管理的复杂性增加,也带来了潜在的性能抖动。
  • 成本压力:庞大的实例数量直接导致了高昂的计算成本,如何在保证服务质量的前提下,有效控制成本,是合合信息持续关注的重点。
  • 1.2迁移前的架构

  • 实例配置:近百台EC2 C5实例组成的业务集群
  • 处理能力:集群每秒计处理500 MB图片处理请求
  • 平均负载:每台实例需要大量CPU资源处理56 MB/s的图像信息,峰值CPU使用率达到90%
  • 主要痛点:实例数量多、管理复杂、成本高昂
  • 二、前期规划:技术与商业价值的双重考量

2025年亚马逊云科技Graviton4实例在中国区发布,对比原X86架构的虚拟机提供了更高性价比。在性能的提升同时还能实现成本优化,无疑将为企业带来更大的价值。合合信息对这几款实例的成本效益进行分析:

  • 测试目标:对比 x86 架构与不同代际 Graviton 处理器的 CPU 性能。
  • 测试区域:cn-northwest-1 (中国 – 宁夏区域)
  • 测试工具:本次测试采用质数计算作为基准测试工具,该测试在计算密集型任务上具有很高的代表性,等效于 sysbench cpu –cpu-max-prime=20000 –threads=4 命令。
  • 测试环境:所有实例均使用 Amazon Linux 2 操作系统,并统一配置4个线程,以充分利用所有 vCPU 资源。
  • 2.1 Graviton 与x86的性能对比

|实例类型

|CPU架构

|执行时间 (秒)

|每秒事件数

|性能比较(相较于 c5.xlarge)

|1

|c5.xlarge

|X86

|8.45

|267.7

|Baseline (100%)

|2

|c6g.xlarge

|Graviton2

|7.12

|317.7

|+18.7% faster

|3

|c7g.xlarge

|Graviton3

|6.23

|363.1

|+35.6% faster

|4

|c8g.xlarge

|Graviton4

|5.89

|384

|+43.5% faster

从上图的“成本比较”可以看出,所有C6g、C7g、C8g Graviton 实例的性能都高于C5 X86实例。

2.2 性能价格比 (Performance/Price Ratio)

为了更全面地衡量实例的价值,我们引入“性能价格比”这一指标(每秒事物数/按需价格),即每花费一元人民币所能获得的性能。

|实例类型

|按需价格 (CNY/小时)

|成本节约 (相较于 c5.xlarge)

|每秒事件数

|性能价格比

|性能价格比提升

|1

|c5.xlarge

|CNY 0.986

|基准

|267.7

|271.4

|–

|2

|c6g.xlarge

|CNY 0.783

|20.60%

|317.7

|405.6

|49.40%

|3

|c7g.xlarge

|CNY 0.783

|20.60%

|363.1

|463.7

|70.80%

|4

|c8g.xlarge

|CNY 0.783

|20.60%

|384

|490.3

|80.60%

从上表性价比的比较中可以看出:

  • C8g 的价值最大化:xlarge 展现了最高的性能价格比,其性价比高出 C5.xlarge 80.6%,这意味着用户每花费一分钱,都能从 C8g 实例上获得最多的计算性能。
  • Graviton 实例的普遍成本优势:对比X86实例所有 Graviton 实例均提供了约 20% 的成本节约,同时性能却远超 x86 实例。这使得迁移到 Graviton 成为一项极具吸引力的成本优化策略。
  • 2.3 测试结果总结:

  • Graviton4 性能领跑:xlarge (Graviton4) 实例以 5.89 秒的执行时间拔得头筹,相较于基准的 C5.xlarge (x86) 实例,性能提升了 43.5%。这充分证明了 Graviton4 处理器在纯计算能力上的巨大优势。每一代 Graviton 处理器都带来了显著的性能提升。C6g 相较于 C5 提升 18.7%,C7g 提升 35.6%,而 C8g 则达到了 43.5% 的提升,显示了 AWS 在自研芯片上持续且成功的投入。
  • C8g 的价值最大化:在测试中xlarge 都展现了最高的性价比,其性价比高出 C5.xlarge 80.6%;这意味着每花费一分钱都能从 C8g 实例上获得最多的计算性能。
  • 在使用sysbench进行初步后,通过真实业务场景继续进行了相关功能与性能测试,得出对EC2虚拟机更有价值的参数数据。
  • 三、中期实施:代码适配与问题攻坚

    3.1实施过程

被迁移程序主要通过Lua/C语言实现,应用从x86迁移到ARM64架构需要对原C语言库进行重新编译和优化,整个过程包括以下八项目工作:

1)准备 Graviton 环境

  • 启动 Graviton EC2 实例(如xlarge)
  • 安装编译工具链和 Lua 环境

2)扫描 C 库依赖

  • 列出所有 Lua应用调用所有C库模块
  • 识别需要重新编译的C扩展模块
  • 检查是否使用X86特定SIMD 指令(SSE/AVX)

3)编译选项优化

  • 更新 Makefile,添加 Arm64 优化参数
  • 编译时启用-02或-03优化级别

4)重新编译 C 库

5)更新容器镜像(Dockerfile)

6)更新CI/CD 多架构支持

7)测试和性能评估

8)生产环境灰度发布

3.2 问题解决

在项目实施中整体比较顺利,但在测试过程中遇到以下两个问题,迁移团队进行分析与解决:

问题1:字符集兼容性问题

  • 问题说明:

迁移到 Graviton 后测试过程中发现在x86下的部分代码输出出现了乱码问题:经过分析其原因是X86 通常是 UTF-16BE(Big Endian)格式,而现代 ARM(包括 Graviton)通常是UTF-16LE格式,存在字节序(Endianness)差异:

  • UTF-16BE(Big Endian):高位字节在前,例如:字符 “A” (U+0041) 存储为 00 41
  • UTF-16LE(Little Endian):低位字节在前,例如:字符 “A” (U+0041) 存储为 41 00

X86 和 ARM 架构的字节序差异导致 UTF-16 数据解析错误。

  • 解决策略:

确认原因后,通过在 Lua 代码中明确指定字符编码转换 set_iconv $d $s from=utf-16be to=utf-8,确保正确处理 Big Endian 格式的 UTF-16 数据,问题得到解决。

问题2:迁移后发生Coredump问题

  • 问题说明:

迁移到 Graviton 后测试发现在x86下的部分可以运行的代码在Graviton中出现Coredump问题:分析原因发现原有程序在x86下运行有时会出现可以忽略的报错,而Graviton环境无法忽略产生Coredump。

深入研究后其触发是因为,ARM 架构对内存访问的对齐要求比 x86 更严格。x86 处理器允许未对齐的内存访问(如从奇数地址读取 4 字节整数),仅产生性能损失。而 ARM 遇到未对齐访问会触发 SIGBUS 信号导致程序崩溃。此外,ARM 对某些未定义行为(如空指针解引用、数组越界)的检测更敏感,在 x86 上可运行的代码,在 Graviton 上会直接出现 Coredump。

  • 解决策略:

确认原因后针对该问题使用短期+长期的综合方案解决:

  • 短期:在 C 代码中添加signal_handler信号处理,使用捕获信号防止崩溃,同时通过/proc/cpu/alignment 内核参数放宽内存对齐检查解决。
  • 长期:在后续将检查 C 代码中的内存对齐问题进行最终解决。
  • 四、后期部署:稳妥的灰度上线策略

测试完成后,为了确保切换过程中的业务稳定性和连续性,设计了三阶段的灰度发布方案。通过渐进式的策略有助于控制风险,发布过程的真实流量也分步验证了新架构的稳定性和性能表现。同时为保障灰度发布过程的万无一失,也建立了完善的监控与风险应对机制:

  • 监控关键指标:实时追踪应用响应时间、错误率、系统资源利用率(CPU、内存)以及成本变化趋势。
  • 风险控制措施:配置了基于关键指标的实时告警系统,并部署了自动回滚机制。一旦监控到异常,系统能够迅速将流量切回 x86 集群,确保问题得到快速遏制。
  • 阶段一:小流量验证 (5% 流量)

  • 部署 5 台 C8g 实例集群,切换少量线上生产流量。
  • 核心任务:监控关键性能指标(应用响应时间、错误率、资源利用率),验证核心功能的完整性与正确性。
  • 阶段二:扩大验证 (30% 流量)

  • 生产集群逐步扩展至 15 台 C8g 实例。
  • 核心任务:在新旧集群间进行性能数据对比分析,并收集小范围用户反馈,确保体验一致。
  • 阶段三:全量切换 (100% 流量)

  • 根据核心业务与性能数据,将所有 30+ 台 C8g 实例部署上线,此时已经可以承载原近百台的C5集群的业务流量。
  • 平滑地将全部流量切换至新集群,并正式下线原有的 C5 实例。
  • 核心任务:进入持续监控和长期优化阶段。
  • 五、成果总结:超预期的性能突破

实际性能提升结果:满足同样的业务需求,升级前近百台C5虚拟机集群可处理500MB/秒的请求,峰值5.56MB/秒/台,升级后仅用30+台C8g 虚拟机集群就可处理1.15GB/秒的请求, 峰值32.86MB/秒/台 。

集群节点数容量变化:

集群网络承载容量变化:

从 X86 到 AWS Graviton 的迁移,不仅是一次技术架构的升级,更是一次驱动业务向前发展的战略决策。通过精心的规划、严谨的实施和持续的优化,我们最终实现了 3 倍的性能提升、61% 的实例缩减和 491% 的单实例效率增长,同时显著优化了总体拥有成本。

这个案例有力地证明,合合信息拥抱像 AWS Graviton 这样的新兴 ARM 架构不仅在技术上完全可行,更能为企业带来超乎想象的商业回报,为数字化转型注入强劲的创新动力。

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者

|

AWS 架构师中心: 云端创新的引领者

探索 AWS 架构师中心,获取经实战验证的最佳实践与架构指南,助您高效构建安全、可靠的云上应用


AWS代付、代充值免实名

联系我们阿里云国际免实名