AWS代付、代实名
阿里云国际 | 腾讯云国际

使用CloudWatch收集ECS Task自定义Prometheus指标

AWS账单代付阅读(13)

亚马逊AWS官方博客

使用CloudWatch收集ECS Task自定义Prometheus指标

概述

在当今云原生应用程序的快速发展中,容器化已成为现代应用部署的标准方式。随着企业将越来越多的工作负载迁移到容器环境,尤其是在 Amazon ECS (Elastic Container Service) 这样的容器编排平台上,有效监控这些分布式系统的能力变得至关重要。企业在采用容器技术的同时,面临着前所未有的可观测性挑战,比如如何收集JVM/Nginx 等 Prometheus特定指标。在AWS 有两种收集Prometheus 指标的方式,一种是通过ADOT(

AWS Distro for OpenTelemetry ),ADOT支持service 的方式运行在ECS 集群,这种方式不支持收集应用内部的指标比如JVM,具体详见链接1;ADOT 支持另外一种SideCar 的部署方式,此方式支持收集JVM 等应用指标,但是部署方式对应用有一定的侵入性。另外一种方案是通过Amazon CloudWatch Agent来收集应用内部的指标,CloudWatch Agent 支持service 的方式运行,能与Container Insights和其他AWS 生态系统深度集成,可以实现: 全面掌握容器健康状况:从基础设施到应用层面的端到端可见性,包括 CPU、内存利用率、网络流量等基础指标,以及应用特定的自定义指标。 简化多服务架构监控:在微服务架构中,一个应用可能由数十甚至数百个服务组成,每个服务都需要单独监控。通过 Prometheus 集成,可以统一收集和分析这些数据。 实现主动式问题排查:通过设置基于 Prometheus 指标的告警,运维团队可以在问题影响用户体验之前发现并解决它们。

预构建某些工作负载的** **dashboards

对于EKS ,EC2 和ECS 都要预构建好的指标和dashboard,比如以下的ECS dashboard:

  • AWS App Mesh (服务网格)
  • Java/JMX Java/JMX应用
  • NGINX(Nginx应用)
  • NGINX Plus (Nginx Plus应用)

但是通常客户有自定义指标的需求,比如jvm_gc_collection 等指标,这时候就需要配置CloudWatch agent 的自定义指标收集,接下来我们将通过一个例子来演示配置过程。

工作原理

Amazon ECS Container Insights 集成 Prometheus 监控,通过 CloudWatch Agent 自动发现和采集容器化应用的 Prometheus 指标。

  • 应用侧需要通过prometheus 各种exporter 来把指标主动暴露出来,比如接下来我们将使用的JMX_exporter
  • 我们安装好CloudWatch agent service,并授予适当的CloudWatch agent ECS task role使其实现ECS cluster的自动服务发现
  • 准备好‘PROMETHEUS_CONFIG_CONTENT’和‘CloudWatch_CONFIG_CONTENT’配置,用于指标抓取和过滤
  • CloudWatch agent把抓取过来的EMF 格式存储在CloudWatch logs,然后再转换成prometheus 的metric 展示在CloudWatch 面板

Container Insights 支持 Prometheus 指标的以下启动类型和网络模式组合:

|Amazon ECS launch type

|Network modes supported

|EC2 (Linux)

|bridge, host, and awsvpc

|Fargate

|awsvpc

实施过程

通过** **Cloudformation ** **安装** **CloudWatch agent

设置环境变量

export AWS_PROFILE=your_aws_config_profile_eg_default

export AWS_DEFAULT_REGION=your_aws_region_eg_ap-southeast-1

export ECS_CLUSTER_NAME=your_ec2_ecs_cluster_name

export ECS_LAUNCH_TYPE=FARGATE

export CREATE_IAM_ROLES=True

export ECS_CLUSTER_SECURITY_GROUP=your_security_group_eg_sg-xxxxxxxxxx

export ECS_CLUSTER_SUBNET=your_subnet_eg_subnet-xxxxxxxxxx

export ECS_TASK_ROLE_NAME=ecsTaskExecutionRole

export ECS_EXECUTION_ROLE_NAME=ecsExecutionRole

下载模板

curl -O https://raw.githubuserconten[已去除短链接]m/aws-samples/amazon-cloudwatch-container-insights/latest/ecs-task-definition-templates/deployment-mode/replica-service/cwagent-prometheus/cloudformation-quickstart/cwagent-ecs-prometheus-metric-for-awsvpc.yaml

创建 CloudFormation 堆栈

aws cloudformation create-stack \

–stack-name CWAgent-Prometheus-ECS-${ECS_CLUSTER_NAME}-${ECS_LAUNCH_TYPE}-awsvpc \

–template-body file://cwagent-ecs-prometheus-metric-for-awsvpc.yaml \

–parameters \

ParameterKey=ECSClusterName,ParameterValue=${ECS_CLUSTER_NAME} \

ParameterKey=CreateIAMRoles,ParameterValue=${CREATE_IAM_ROLES} \

ParameterKey=ECSLaunchType,ParameterValue=${ECS_LAUNCH_TYPE} \

ParameterKey=SecurityGroupID,ParameterValue=${ECS_CLUSTER_SECURITY_GROUP} \

ParameterKey=SubnetID,ParameterValue=${ECS_CLUSTER_SUBNET} \

ParameterKey=TaskRoleName,ParameterValue=${ECS_TASK_ROLE_NAME} \

ParameterKey=ExecutionRoleName,ParameterValue=${ECS_EXECUTION_ROLE_NAME} \

–capabilities CAPABILITY_NAMED_IAM \

–region ${AWS_DEFAULT_REGION}

部署完后会创建两个Role

CloudWatch agent** **自动发现** **ECS cluster

Prometheus 社区有很多自动发现的方法,详见, 但是对于ECS cluster,我们需要CloudWatch agent 定期访问ECS cluster 的控制平面API 实现Task 的自动发现,他支持3种服务发现的模式:

  • Container docker label-based 自动发现
  • ECS task definition ARN 正则表达式自动发现
  • ECS service name 正则表达式自动发现

可以同时配置3种模式,CloudWatch Agent 可以开启de-duplicates 做指标的去重

去重机制 : 基于 {private_ip}:{port}/{metrics_path}

在控制台Systems Manager > Parameter Store 创建

prometheus-config 的参数:

global:

scrape_interval: 1m

scrape_timeout: 10s

scrape_configs:

  • job_name: cwagent-ecs-file-sd-config //名字需要跟cwagent-config mapping

sample_limit: 10000

file_sd_configs:

  • files: [ “/tmp/cwagent_ecs_auto_sd.yaml” ] //路径需要跟cwagent-config mapping
  • 配置CloudWatch Agent yaml

在控制台Systems Manager > Parameter Store 创建 cwagent-config 的参数:

{

“agent”: {

“debug”: true

},

“logs”: {

“metrics_collected”: {

“prometheus”: {

“cluster_name”: “testecs”, //必须要有,用于过滤

“log_group_name”: “/aws/ecs/containerinsights/testecs/prometheus”,//必须要有,用于过滤

“prometheus_config_path”: “env:PROMETHEUS_CONFIG_CONTENT”,

“ecs_service_discovery”: {

“sd_frequency”: “1m”,

“sd_result_file”: “/tmp/cwagent_ecs_auto_sd.yaml”, //名字跟上面的files匹配

“task_definition_list”: [

{

“sd_task_definition_arn_pattern”: “.*:task-definition/*” //使用task definition ARN 的正则匹配,可以根据业务要求做task 的过滤

}

]

},

“emf_processor”: {

“metric_declaration_dedup”: true, //启用自动去重

“metric_declaration”: [

{

“source_labels”: [“job”], //这个job 就是job_name

“label_matcher”: “cwagent-ecs-file-sd-config”, //名字需要跟job_name 匹配

“dimensions”: [[“ClusterName”,”TaskDefinitionFamily”]], //不同指标dimension不同

“metric_selectors”: [ //这里添加自定义指标,名字需要跟jmx_exporter暴露的名字一致

“^jvm_threads_current$”,

“^jvm_threads_peak$”,

“^jvm_gc_collection_seconds_count$”,

“^jvm_gc_collection_seconds_sum$”,

“^jvm_classes_currently_loaded$”,

“^jvm_memory_max_bytes$”,

“^jvm_memory_used_bytes$”

]

},

{

“source_labels”: [“job”],

“label_matcher”: “cwagent-ecs-file-sd-config”,

“dimensions”: [[“ClusterName”,”TaskDefinitionFamily”,”pool”]],

“metric_selectors”: [

“^jvm_memory_pool_max_bytes$”,

“^jvm_memory_pool_used_bytes$”,

“^jvm_memory_pool_committed_bytes$”,

]

}

]

}

}

},

“force_flush_interval”: 5

}

}

准备测试用的Java 应用

从Github repo下载相关文件,Docker file 如下, 其中9404 端口用于prometheus 指标暴露:

使用OpenJDK 17作为基础镜像

FROM openjdk:17-jre-slim

LABEL description=”Simple Java Web Application with JMX Exporter for ECS”

创建应用目录

RUN mkdir -p /opt/app

设置工作目录

WORKDIR /opt/app

复制JMX Exporter jar文件

COPY jmx_prometheus_javaagent-0.20.0.jar /opt/app/

复制Web应用程序jar文件(使用80端口版本)

COPY simple-web-app.jar /opt/app/

复制JMX配置文件

COPY config.yaml /opt/app/

安装curl用于健康检查

RUN apt-get update && \

apt-get install -y curl && \

rm -rf /var/lib/apt/lists/* && \

apt-get clean

创建非root用户

RUN groupadd -r appuser && useradd -r -g appuser appuser

RUN chown -R appuser:appuser /opt/app

USER appuser

暴露端口

EXPOSE 80 9404

设置健康检查

HEALTHCHECK –interval=30s –timeout=10s –start-period=40s –retries=3 \

CMD curl -f http://localhost/health || exit 1

启动命令

CMD [“java”, “-javaagent:jmx_prometheus_javaagent-0.20.0.jar=9404:config.yaml”, “-jar”, “simple-web-app.jar”]

其中config.yaml ,当前是暴露所有的指标(pattern: “.*”),如果需要做指标rename 和 过滤,可以适当修改

jmx_prometheus_javaagent-0.20.0.jar 可以从prometheus 官网下载,不同版本可能指标名字会不一样

镜像推送到** **AWS ECR

创建** **Task definition ** **并运行** **Task

{

“taskDefinitionArn”: “arn:aws:ecs:ap-southeast-1:your-id:task-definition/simple-web-app-task:3”,

“containerDefinitions”: [

{

“name”: “jmxsample-container”,

“image”: “your-id.dkr.ecr.ap-southeast-1.amazonaws.com/simple-web-app”,

“cpu”: 0,

“portMappings”: [

{

“containerPort”: 80,

“hostPort”: 80,

“protocol”: “tcp”

},

{

“containerPort”: 9404,

“hostPort”: 9404,

“protocol”: “tcp”

}

],

“essential”: true,

“environment”: [],

“mountPoints”: [],

“volumesFrom”: [],

“dockerLabels”: {

“Java_EMF_Metrics”: “true”

},

“logConfiguration”: {

“logDriver”: “awslogs”,

“options”: {

“awslogs-group”: “/ecs/simple-web-app”,

“awslogs-region”: “ap-southeast-1”,

“awslogs-stream-prefix”: “ecs”

}

},

“systemControls”: []

}

],

“family”: “simple-web-app-task”,

“executionRoleArn”: “arn:aws:iam::your-id:role/ecsTaskExecutionRole”,

“networkMode”: “awsvpc”,

“revision”: 3,

“volumes”: [],

“status”: “ACTIVE”,

“requiresAttributes”: [

{

“name”: “com.amazonaws.ecs.capability.logging-driver.awslogs”

},

{

“name”: “ecs.capability.execution-role-awslogs”

},

{

“name”: “com.amazonaws.ecs.capability.ecr-auth”

},

{

“name”: “com.amazonaws.ecs.capability.docker-remote-api.1.19”

},

{

“name”: “ecs.capability.execution-role-ecr-pull”

},

{

“name”: “com.amazonaws.ecs.capability.docker-remote-api.1.18”

},

{

“name”: “ecs.capability.task-eni”

}

],

“placementConstraints”: [],

“compatibilities”: [

“EC2”,

“FARGATE”

],

“requiresCompatibilities”: [

“FARGATE”

],

“cpu”: “256”,

“memory”: “512”,

“registeredAt”: “2025-07-15T12:38:39.844Z”,

“registeredBy”: “arn:aws:iam::your-id:user/iamuser”,

“tags”: []

}

Task 安全组需要放开80 和 9404端口

运行Task

等Task 运行后,在控制台找到Task 的私网IP

测试指标是否能暴露

CloudWatch ** **验证** **metric ** **查看

可以在控制台看到有新的ECS/ContainerInsights/Prometheus namespace

点击进去不同的维度,可以看到指标已经收集上来

费用

自定义指标的费用,详情参考官网链接

总结

本文讲述了在AWS ECS 收集prometheus 指标的方法和原理,AWS Container Insights 可以收集预定义的部分prometheus指标,当需要收集自定义指标的时候需要手动配置CloudWatch agent,本文通过一个简单的Java Web 应用演示如何通过jmx exporter 收集JMX 自定义指标。基于此方案, 合作伙伴成功帮助制造业头部客户‘雅马哈发动机’在Amazon ECS 上实现了prometheus 指标监控方案的优化。

参考链接:

https://aws-otel.github.io/docs/getting-started/collector/sidecar-vs-service

https://github.com/aws-samples/prometheus-for-ecs

https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/ContainerInsights-Prometheus-install-ECS.html

https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/ContainerInsights-Prometheus-Setup-configure-ECS.html

https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/ContainerInsights-Prometheus-metrics.html#ContainerInsights-Prometheus-metrics-jmx

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。


在Amazon Kinesis Video Streams WebRTC中增加H265编解码支持

AWS账单代付阅读(13)

亚马逊AWS官方博客

在Amazon Kinesis Video Streams WebRTC中增加H265编解码支持

1.概述

本文主要介绍Amazon Kinesis Video Streams WebRTC是什么,H.265编解码器相比H.264的技术优势,在Amazon Kinesis Video Streams WebRTC中集成H.265编解码器的完整技术方案,详细实现步骤,以及生产环境部署的注意事项。

通过本文,您将学会如何利用H.265编解码器在保持视频质量的同时显著降`低带宽消耗,提升Amazon Kinesis Video Streams WebRTC应用的整体性能表现。

2.背景介绍

2.1 Amazon Kinesis Video Streams WebRTC简介

Amazon Kinesis Video Streams 提供符合标准的 WebRTC 实现作为完全托管的功能。您可以使用 Amazon Kinesis Video Streams with WebRTC 安全地进行媒体的实时流式传输,或在任何摄像头 IoT 设备与符合 WebRTC 的移动或 Web 播放器之间执行双向音频或视频交互。借助这项全面托管的功能,您不必构建、运营或扩展任何与 WebRTC 相关的云基础设施(例如信令或媒体中继服务器)便能安全地在应用程序和设备间流式传输媒体。

2.1.1 Amazon KVS WebRTC** **的拓扑结构

2.1.2 Amazon KVS WebRTC** **的架构图

2.2 Amazon Kinesis Video Streams WebRTC** **支持的编解码器** **音频

  • G.711 A-Law
  • G.711 U-Law
  • Opus
  • 视频

  • H.264
  • VP8
  • 2.3 H.265** **编解码器技术优势

    2.4 H265** **对比H264的优势

  • 压缩率更高,相同清晰度的前提下,码率只有一半
  • 对网络带宽要求低,弱网更顺畅
  • 相同码率的前提下,清晰度更高
  • Relay时DTO费用更低
  • 能支持更大像素的IPC产品
  • 2.5 H265** **支持情况

  • iOS从iPhone7开始就已经支持H265硬件编解码
  • Android从2016年机型开始大多支持H265硬件编解码
  • 安防芯片方案,几乎都支持H265硬件编码
  • 2.6 ** **为什么在KVS WebRTC中需要H.265支持

  • 对网络带宽要求低,弱网更顺畅
  • 相同码率的前提下,清晰度更高
  • Relay时DTO费用更低
  • 能支持更大像素的IPC产品
  • 3. 技术实现方案

    3.1 ** **整体架构设计

  • 修改设备端SDK
  • 修改客户端NDK
  • 无需修改服务端KVS
  • 3.2 ** **关键技术实现点

3.2.1 SDP 中添加H265编解码格式 a=rtpmap:127 H265/90000 SDP(Session Description Protocol)中的”a=rtpmap”用于描述RTP(Real-time Transport Protocol)流中的编解码器参数。具体来说,它指定了RTP负载中使用的编解码器以及它们的时钟速率。在”a=rtpmap:127 H265/90000″中,”127″是RTP负载类型(Payload Type),用于标识RTP负载中使用的媒体类型。”H265″表示使用的编解码器为H.265(也称为HEVC),”90000″是该编解码器的时钟速率,单位是Hz。 设备端代码修改:amazon-kinesis-video-streams-webrtc-sdk-c/src/source/PeerConnection/SessionDescription.c iOS 客户端修改:webrtc/sdk/objc/components/video_codec/RTCDefaultVideoDecoderFactory.m Android 客户端修改:webrtc/sdk/android/src/java/org/webrtc/MediaCodecVideoDecoderFactory.java

3.2.2 RTP 载荷的封包与解包 参照文档:H265的Rtp封包与解包,参照H265的Rtp规范rfc7798,链接:https://www.rfc-editor.org/rfc/rfc7798.html 特别注意:H265的NAL包会大于MTU 1200,所以需要对NAL进行分包,否则会异常,分包策略,rfc7798的文档中有介绍,格式如下图 设备端代码实现:amazon-kinesis-video-streams-webrtc-sdk-c/src/source/Rtp/Codecs/RtpH265Payloader.c iOS 和Android的NDK实现:webrtc/modules/video_coding/h265_vps_sps_pps_tracker.cc

3.2.3 实际数据的H265编解码 设备端编码:芯片自带H265的硬编码,绝大多数方案商已经缴纳专利费,芯片有相应SDK提供接口获取H265帧 iOS 客户端解码:webrtc/sdk/objc/components/video_codec/RTCVideoDecoderFactoryH265.m Android 客户端:webrtc/sdk/android/src/java/org/webrtc/HardwareVideoDecoder.java

4. 具体实现步骤

4.1 ** **环境准备

修改涉及到设备端SDK的修改和客户端NDK的修改,需要具备代码编辑和编译环境,其中设备端使用Linux进行编辑和编译,Android也使用Linux进行编译,iOS则使用Mac环境进行编译

4.2 ** **设备端实现

4.2.1 H.265 RTP** **载荷处理器

  • src/source/PeerConnection/Rtp/Codecs/RtpH265Payloader.h – 头文件定义
  • src/source/PeerConnection/Rtp/Codecs/RtpH265Payloader.c – 实现文件

STATUS createPayloadFromNaluH265(UINT32 mtu, PBYTE nalu, UINT32 naluLength, PPayloadArray pPayloadArray, PUINT32 filledLength,

PUINT32 filledSubLenSize)

{

ENTERS();

STATUS retStatus = STATUS_SUCCESS;

PBYTE pPayload = NULL;

UINT8 naluType = 0;

UINT32 maxPayloadSize = 0;

UINT32 curPayloadSize = 0;

UINT32 remainingNaluLength = naluLength;

UINT32 payloadLength = 0;

UINT32 payloadSubLenSize = 0;

PBYTE pCurPtrInNalu = NULL;

BOOL sizeCalculationOnly = (pPayloadArray == NULL);

CHK(nalu != NULL && filledLength != NULL && filledSubLenSize != NULL, STATUS_NULL_ARG);

CHK(sizeCalculationOnly || (pPayloadArray->payloadSubLength != NULL && pPayloadArray->payloadBuffer != NULL), STATUS_NULL_ARG);

CHK(mtu > H265_FU_HEADER_SIZE, STATUS_RTP_INPUT_MTU_TOO_SMALL);

naluType = (nalu[0] & 0x7E) >> 1; // 6 bits after forbidden zero bit 0x7E([已去除电话])

if (!sizeCalculationOnly) {

pPayload = pPayloadArray->payloadBuffer;

}

if (naluLength <= mtu) {

// Single NALU https://www.rfc-editor.org/rfc/rfc7798.html#section-4.4.1

payloadLength += naluLength;

payloadSubLenSize++;

if (!sizeCalculationOnly) {

CHK(payloadSubLenSize <= pPayloadArray->maxPayloadSubLenSize && payloadLength <= pPayloadArray->maxPayloadLength,

STATUS_BUFFER_TOO_SMALL);

MEMCPY(pPayload, nalu, naluLength);

pPayloadArray->payloadSubLength[payloadSubLenSize – 1] = naluLength;

pPayload += pPayloadArray->payloadSubLength[payloadSubLenSize – 1];

}

} else {

// Fragmentation units: https://www.rfc-editor.org/rfc/rfc7798.html#section-4.4.3

maxPayloadSize = mtu – H265_FU_HEADER_SIZE;

// According to the RFC, the first octet is skipped due to redundant information

remainingNaluLength -= 2;

pCurPtrInNalu = nalu + 2;

while (remainingNaluLength != 0) {

curPayloadSize = MIN(maxPayloadSize, remainingNaluLength);

payloadSubLenSize++;

payloadLength += H265_FU_HEADER_SIZE + curPayloadSize;

if (!sizeCalculationOnly) {

CHK(payloadSubLenSize <= pPayloadArray->maxPayloadSubLenSize && payloadLength <= pPayloadArray->maxPayloadLength,

STATUS_BUFFER_TOO_SMALL);

pPayload[0] = (H265_FU_TYPE_ID << 1) | (nalu[0] & 0x81) | (nalu[0] & 0x1); // H265_FU_TYPE_ID indicator is 49

pPayload[1] = nalu[1] & 0xff;

pPayload[2] = naluType & 0x3f;

if (remainingNaluLength == naluLength – 2) {

pPayload[2] |= (1 << 7); // Set for starting bit

} else if (remainingNaluLength == curPayloadSize) {

pPayload[2] |= (1 << 6); // Set for ending bit

}

MEMCPY(pPayload + H265_FU_HEADER_SIZE, pCurPtrInNalu, curPayloadSize);

pPayloadArray->payloadSubLength[payloadSubLenSize – 1] = H265_FU_HEADER_SIZE + curPayloadSize;

pPayload += pPayloadArray->payloadSubLength[payloadSubLenSize – 1];

}

pCurPtrInNalu += curPayloadSize;

remainingNaluLength -= curPayloadSize;

}

}

CleanUp:

if (STATUS_FAILED(retStatus) && sizeCalculationOnly) {

payloadLength = 0;

payloadSubLenSize = 0;

}

if (filledLength != NULL && filledSubLenSize != NULL) {

*filledLength = payloadLength;

*filledSubLenSize = payloadSubLenSize;

}

LEAVES();

return retStatus;

}

  • 遵循RFC 7798标准,支持分片单元(FU)处理
  • 关键常量:H265_FU_HEADER_SIZE (3), H265_FU_TYPE_ID (49)
  • 4.2.2 ** **核心处理函数

  • createPayloadForH265 – 创建H.265载荷
  • getNextNaluLengthH265 – 获取下一个NALU长度
  • createPayloadFromNaluH265 – 从NALU创建载荷
  • depayH265FromRtpPayload – 从RTP载荷中解包H.265
  • 4.2.3 ** **编解码器集成

  • 编解码器枚举:RTC_CODEC_H265
  • 默认载荷类型:DEFAULT_PAYLOAD_H265 (127)
  • RTX重传支持:RTC_RTX_CODEC_H265
  • 4.2.4 ** **会话描述协议处理

  • src/source/PeerConnection/SessionDescription.c 中的H.265 SDP处理
  • SDP值配置:”H265/90000″ 用于rtpmap
  • 格式参数:DEFAULT_H265_FMTP 中的profile参数
  • 4.2.5 ** **样本帧文件

  • samples/h265SampleFrames/ 目录包含
  • 1,500个H.265样本帧文件

  • 文件大小范围:329字节到11,840字节
  • 涵盖I帧(关键帧)和P帧(预测帧)的完整测试集
  • 4.2.6 ** **数据包处理

  • src/source/PeerConnection/Rtp.c 中的H.265载荷创建
  • 时间戳处理和RTP数据包封装
  • 支持大于MTU的NALU单元的分片处理
  • 4.2.7 ** **样本程序支持

  • kvsWebrtcClientMaster – 支持H.265视频编解码器参数
  • kvsWebrtcClientMasterGstSample – GStreamer H.265集成
  • kvsWebrtcClientViewer – H.265接收和解码

这些组件共同构成了完整的H.265 WebRTC传输管道,从编码、RTP封装、SDP协商到最终的网络传输,提供了全面的H.265视频编解码器支持。

4.3 ** **客户端实现

4.3.1 ** **核心配置

  • 构建开关: rtc_use_h265 在 build_overrides/build.gni 中定义
  • 平台支持: 默认在 Windows、iOS、Android 平台启用H.265
  • 编译宏: 使用 DISABLE_H265 宏来控制H.265功能的编译
  • 4.3.2 H.265** **解析和处理模块 (common_video/h265/)

  • h265_common.h/cc – H.265通用定义和NALU处理
  • h265_bitstream_parser.h/cc – H.265比特流解析器
  • h265_sps_parser.h/cc – 序列参数集(SPS)解析
  • h265_pps_parser.h/cc – 图像参数集(PPS)解析
  • h265_vps_parser.h/cc – 视频参数集(VPS)解析
  • 4.3.3 RTP** **打包和解包 (modules/rtp_rtcp/source/)

  • rtp_format_h265.h/cc – H.265 RTP打包器
  • video_rtp_depacketizer_h265.h/cc – H.265 RTP解包器
  • h265_sps_parser.h/cc – RTP层的SPS解析
  • 4.3.4 ** **视频编解码接口 (modules/video_coding/)

  • h265_vps_sps_pps_tracker.h/cc – H.265参数集跟踪器
  • h265_globals.h – H.265全局定义和数据结构
  • 支持H.265的编解码器接口集成
  • 4.3.5 API** **层支持 (api/video_codecs/)

  • video_codec_type.h – 定义 kVideoCodecH265 枚举
  • video_codec.h/cc – H.265编解码器配置
  • video_encoder.h/cc – H.265编码器接口
  • 4.3.6 ** **平台特定实现

  • iOS/macOS: sdk/objc/components/video_codec/ 下的H.265硬件编解码器
  • RTCVideoEncoderH265.h/mm
  • RTCVideoDecoderH265.h/mm
  • RTCVideoEncoderFactoryH265.h/m
  • RTCVideoDecoderFactoryH265.h/m
  • Android: sdk/android/src/jni/ 下的JNI包装器
  • 4.3.7 ** **媒体常量定义 (media/base/)

  • media_constants.h/cc – H.265编解码器名称和FMTP参数定义
  • 4.3.8 ** **关键特性

  • NALU类型支持: 支持VPS、SPS、PPS、IDR、CRA等NALU类型
  • RTP打包模式: 支持单NALU、聚合包(AP)、分片单元(FU)
  • 参数集管理: 自动跟踪和管理VPS/SPS/PPS参数集
  • 硬件加速: 在支持的平台上使用硬件编解码器

这个WebRTC工程提供了完整的H.265/HEVC支持,包括比特流解析、RTP传输、编解码器接口和平台特定的硬件加速实现。

5. 测试与验证

5.1 ** **设备端测试

5.1.1 ** **下载仓库

5.1.2 ** **编译

5.1.3 ** **配置

5.1.4 ** **执行

5.2 ** **客户端测试

5.2.1 ** **下载仓库

5.2.2 ** **配置

{

“Version”: “1.0”,

“CredentialsProvider”: {

“CognitoIdentity”: {

“Default”: {

“PoolId”: “us-west-2:[已去除电话]-89ab-cdef-[已去除电话]89abcdef”,

“Region”: “us-west-2”

}

}

},

“IdentityManager”: {

“Default”: {}

},

“CognitoUserPool”: {

“Default”: {

“AppClientSecret”: “abcdefghijklmnopqrstuvwxyz[已去除电话]abcdefghijklmno”,

“AppClientId”: “[已去除电话]abcdefghijklmnop”,

“PoolId”: “us-west-2_qRsTuVwXy”,

“Region”: “us-west-2”

}

}

}

5.2.3 ** **编译

5.2.4 ** **执行** **Offer** **信息

  • Answer信息
  • 实时画面

    6. 总结

本文介绍了如何在Amazon Kinesis Video Streams WebRTC中成功集成H.265编解码器支持。通过详细的技术方案设计、完整的实现步骤和全面的测试验证,我们展示了H.265在实时视频通信场景下的显著优势。


在Amazon Kinesis Video Streams WebRTC中增加AAC编解码支持

AWS账单代付阅读(10)

亚马逊AWS官方博客

在Amazon Kinesis Video Streams WebRTC中增加AAC编解码支持

1.概述

本文主要介绍Amazon Kinesis Video Streams WebRTC是什么,AAC编解码器相比现有音频编解码器的技术优势,在Amazon KVS WebRTC中集成AAC编解码器的完整技术方案,详细实现步骤,性能对比测试结果,以及生产环境部署的注意事项。

通过本文,您将学会如何利用AAC编解码器在保持音频质量的同时显著降低带宽消耗,提升Amazon Kinesis Video Streams WebRTC应用的整体音频性能表现。

2.背景介绍

2.1 Amazon Kinesis Video Streams WebRTC** **简介

Amazon Kinesis Video Streams 提供符合标准的 WebRTC 实现作为完全托管的功能。您可以使用 Amazon Kinesis Video Streams with WebRTC 安全地进行媒体的实时流式传输,或在任何摄像头 IoT 设备与符合 WebRTC 的移动或 Web 播放器之间执行双向音频或视频交互。借助这项全面托管的功能,您不必构建、运营或扩展任何与 WebRTC 相关的云基础设施(例如信令或媒体中继服务器)便能安全地在应用程序和设备间流式传输媒体。

2.1.1 Amazon KVS WebRTC** **的拓扑结构

2.1.2 Amazon KVS WebRTC** **的架构图

2.2 Amazon Kinesis Video Streams WebRTC** **支持的编解码器** **音频

  • G.711 A-Law
  • G.711 U-Law
  • Opus
  • 视频

  • H.264
  • VP8
  • 2.3 AAC** **编解码器技术优势

AAC(Advanced Audio Coding)是一种高效的音频编解码器,相比传统的音频编解码器具有以下优势:

更高的压缩效率:相同音质下,AAC比MP3节省约30%的带宽 更好的音质:在低码率下仍能保持较高的音频质量 广泛的设备支持:几乎所有现代设备都支持AAC解码 低延迟:适合实时音频传输场景 多声道支持:支持立体声和多声道音频

2.4 AAC** **支持情况

AAC音频格式支持非常广泛,在各端的支持情况如下

iOS:从iOS 3.0开始原生支持AAC硬件编解码 Android:从Android 3.0(API Level 11)开始支持AAC编解码 IoT 设备:大多数音频芯片方案都支持AAC硬件编码

2.5 ** **为什么在KVS WebRTC中需要AAC支持

KVS WebRTC中已经有了很多的音频格式支持,需要AAC支持的原因如下

降低带宽消耗:在保持音质的前提下减少网络带宽使用 提升用户体验:更好的音频质量和更低的延迟 降低成本:减少数据传输费用 设备兼容性:更好地支持移动设备和IoT设备

3.

技术实现方案

3.1 ** **整体架构设计

  • 修改设备端SDK
  • 修改客户端NDK
  • 无需修改服务端KVS
  • 3.2 ** **关键技术实现点

3.2.1 SDP 中添加AAC编解码格式 设备端代码修改:amazon-kinesis-video-streams-webrtc-sdk-c/src/source/PeerConnection/SessionDescription.c iOS 客户端修改:webrtc/sdk/objc/components/audio_codec/RTCDefaultAudioDecoderFactory.m Android 客户端修改:webrtc/sdk/android/src/java/org/webrtc/MediaCodecAudioDecoderFactory.java

3.2.2 RTP** **载荷的封包与解包

AAC的RTP封包遵循RFC 3640标准,主要特点:

  • 支持多个AAC帧在单个RTP包中传输
  • 使用AU-headers来描述每个音频帧
  • 支持分片传输大的AAC帧

参照文档:AAC的RTP封包与解包,参照RFC 3640标准

链接:https://www.rfc-editor.org/rfc/rfc3640.html

设备端代码实现

  • amazon-kinesis-video-streams-webrtc-sdk-c/src/source/Rtp/Codecs/RtpAacPayloader.c

iOS 和Android的NDK实现

  • webrtc/modules/audio_coding/aac_config_parser.cc

3.2.3 实际数据的AAC编解码 设备端编码:大多数音频芯片都支持AAC硬件编码,通过相应SDK接口获取AAC音频帧 iOS 客户端解码

  • webrtc/sdk/objc/components/audio_codec/RTCAudioDecoderAAC.m

Android 客户端解码

  • webrtc/sdk/android/src/java/org/webrtc/HardwareAudioDecoder.java
  • 4.具体实现步骤

    4.1 ** **环境准备

修改涉及到设备端SDK的修改和客户端NDK的修改,需要具备代码编辑和编译环境:

  • 设备端使用Linux进行编辑和编译
  • Android使用Linux进行编译
  • iOS使用Mac环境进行编译
  • 4.2 ** **设备端实现

4.2.1 AAC RTP 载荷处理器 核心文件

  • src/source/PeerConnection/Rtp/Codecs/RtpAacPayloader.h – 头文件定义
  • src/source/PeerConnection/Rtp/Codecs/RtpAacPayloader.c – 实现文件
  • define LOG_CLASS “RtpAacPayloader”

    include “../../Include_i.h”

STATUS createPayloadForAac(UINT32 mtu, PBYTE aacFrame, UINT32 aacFrameLength, PBYTE payloadBuffer, PUINT32 pPayloadLength, PUINT32 pPayloadSubLength,

PUINT32 pPayloadSubLenSize)

{

UNUSED_PARAM(mtu);

ENTERS();

STATUS retStatus = STATUS_SUCCESS;

UINT32 payloadLength = 0;

UINT32 payloadSubLenSize = 0;

BOOL sizeCalculationOnly = (payloadBuffer == NULL);

CHK(aacFrame != NULL && pPayloadSubLenSize != NULL && pPayloadLength != NULL && (sizeCalculationOnly || pPayloadSubLength != NULL),

STATUS_NULL_ARG);

payloadLength = aacFrameLength;

payloadSubLenSize = 1;

// Only return size if given buffer is NULL

CHK(!sizeCalculationOnly, retStatus);

CHK(payloadLength <= *pPayloadLength && payloadSubLenSize <= *pPayloadSubLenSize, STATUS_BUFFER_TOO_SMALL);

MEMCPY(payloadBuffer, aacFrame, aacFrameLength);

pPayloadSubLength[0] = aacFrameLength;

CleanUp:

if (STATUS_FAILED(retStatus) && sizeCalculationOnly) {

payloadLength = 0;

payloadSubLenSize = 0;

}

if (pPayloadSubLenSize != NULL && pPayloadLength != NULL) {

*pPayloadLength = payloadLength;

*pPayloadSubLenSize = payloadSubLenSize;

}

LEAVES();

return retStatus;

}

STATUS depayAacFromRtpPayload(PBYTE pRawPacket, UINT32 packetLength, PBYTE pAacData, PUINT32 pAacLength, PBOOL pIsStart)

{

ENTERS();

STATUS retStatus = STATUS_SUCCESS;

UINT32 aacLength = 0;

BOOL sizeCalculationOnly = (pAacData == NULL);

CHK(pRawPacket != NULL && pAacLength != NULL, STATUS_NULL_ARG);

CHK(packetLength > 0, retStatus);

aacLength = packetLength;

CHK(!sizeCalculationOnly, retStatus);

CHK(aacLength <= *pAacLength, STATUS_BUFFER_TOO_SMALL);

MEMCPY(pAacData, pRawPacket, aacLength);

CleanUp:

if (STATUS_FAILED(retStatus) && sizeCalculationOnly) {

aacLength = 0;

}

if (pAacLength != NULL) {

*pAacLength = aacLength;

}

if (pIsStart != NULL) {

*pIsStart = TRUE;

}

LEAVES();

return retStatus;

}

4.2.2 ** **核心处理函数

  • createPayloadForAac – 创建AAC载荷
  • getNextAacFrameLength – 获取下一个AAC帧长度
  • createPayloadFromAacFrame – 从AAC帧创建载荷
  • depayAacFromRtpPayload – 从RTP载荷中解包AAC
  • 4.2.3 ** **编解码器集成

  • 编解码器枚举:RTC_CODEC_AAC
  • 默认载荷类型:DEFAULT_PAYLOAD_AAC (97)
  • SDP配置:”AAC/16000/1″ 用于rtpmap
  • 4.2.4 ** **核心枚举和常量定义

在 src/include/com/amazonaws/kinesis/video/webrtcclient/Include.h 中:

  • 在 RTC_CODEC 枚举中添加了 RTC_CODEC_AAC = 8
  • 这使得AAC成为SDK支持的第8种编解码器
  • 4.2.5 SDP** **协商相关配置

在 src/source/PeerConnection/SessionDescription.h 中:

  • 定义AAC的SDP描述值:#define AAC_VALUE “AAC/16000”
  • 设置AAC的默认payload类型:#define DEFAULT_PAYLOAD_AAC (UINT64) 96
  • 配置AAC的时钟频率:#define AAC_CLOCKRATE (UINT64) 16000
  • 定义AAC的格式参数:#define DEFAULT_AAC_FMTP “profile-level-id=1; mode=AAC-hbr; config=F8F1; SizeLength=13; IndexLength=3; IndexDeltaLength=3”
  • 4.2.6 SDP** **协商处理逻辑

在 src/source/PeerConnection/SessionDescription.c 中:

  • 在codec表初始化时添加AAC支持:hashTableUpsert(codecTable, RTC_CODEC_AAC, DEFAULT_PAYLOAD_AAC)
  • 在SDP解析时识别AAC编解码器:检查AAC_VALUE并设置rtcCodec = RTC_CODEC_AAC
  • 在生成SDP offer/answer时包含AAC的rtpmap和fmtp属性
  • 在音频编解码器判断逻辑中包含AAC:isAudioCodec = (codec == RTC_CODEC_MULAW || codec == RTC_CODEC_ALAW || codec == RTC_CODEC_OPUS || codec == RTC_CODEC_AAC)
  • 4.2.7 RTP** **打包和解包实现

创建专门的AAC RTP处理文件: src/source/Rtp/Codecs/RtpAacPayloader.h:

  • 定义AAC RTP打包函数:createPayloadForAac()
  • 定义AAC RTP解包函数:depayAacFromRtpPayload()

src/source/Rtp/Codecs/RtpAacPayloader.c:

  • 实现了AAC RTP打包逻辑(直接复制AAC帧数据)
  • 实现了AAC RTP解包逻辑(直接提取AAC数据)
  • 4.2.8 RTP** **处理集成

在 src/source/PeerConnection/Rtp.c 中:

  • 在发送路径中添加AAC处理:

case RTC_CODEC_AAC:

rtpPayloadFunc = createPayloadForAac;

rtpTimestamp = CONVERT_TIMESTAMP_TO_RTP(AAC_CLOCKRATE, pFrame->presentationTs);

break;

在 src/source/PeerConnection/PeerConnection.c 中:

  • 在接收路径中添加AAC处理:

case RTC_CODEC_AAC:

depayFunc = depayAacFromRtpPayload;

clockRate = AAC_CLOCKRATE;

break;

  • 在编解码器表中注册AAC:hashTablePut(pKvsPeerConnection->pCodecTable, rtcCodec, DEFAULT_PAYLOAD_AAC)
  • 4.2.9 ** **示例应用程序支持

在 samples/Samples.h 中:

  • 定义AAC样本帧数量:#define NUMBER_OF_AAC_FRAME_FILES 206
  • 定义AAC编解码器名称:#define AUDIO_CODEC_NAME_AAC “aac”
  • 定义AAC帧持续时间:#define SAMPLE_AUDIO_AAC_FRAME_DURATION (64 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND)

在 samples/kvsWebRTCClientMaster.c 中:

  • 添加命令行参数解析支持AAC
  • 配置AAC的rolling buffer参数(32kbps比特率)
  • 实现AAC样本帧的读取和发送逻辑
  • 设置正确的帧间隔时间(64ms)

在 samples/kvsWebRTCClientViewer.c 中:

  • 添加AAC接收支持
  • 4.2.10 ** **样本数据文件

创建 samples/aacSampleFrames/ 目录:

  • 包含206个AAC样本帧文件(sample-001.aac 到 sample-206.aac)
  • 每个文件包含预编码的AAC音频数据
  • 4.3 ** **客户端实现

    4.3.1 ** **核心配置

  • 构建开关:rtc_use_aac 在 build_overrides/build.gni 中定义
  • 平台支持:默认在 Windows、iOS、Android 平台启用AAC
  • 编译宏:使用 DISABLE_AAC 宏来控制AAC功能的编译

4.3.2 AAC 解析和处理模块 common_audio/aac/ 目录下的文件

  • aac_common.h/cc – AAC通用定义和帧处理
  • aac_config_parser.h/cc – AAC配置解析器
  • aac_decoder_impl.h/cc – AAC解码器实现

4.3.3 RTP 打包和解包 modules/rtp_rtcp/source/ 目录下的文件

  • rtp_format_aac.h/cc – AAC RTP打包器
  • audio_rtp_depacketizer_aac.h/cc – AAC RTP解包器

4.3.4 平台特定实现 iOS/macOS

  • sdk/objc/components/audio_codec/RTCAudioEncoderAAC.h/mm
  • sdk/objc/components/audio_codec/RTCAudioDecoderAAC.h/mm

Android

  • sdk/android/src/jni/audio_codec_aac.cc
  • 4.3.5 ** **核心 AAC 编解码器实现

新增的第三方 AAC 库文件:

  • modules/third_party/aac/aac_enc_dec.h – AAC 编解码器头文件
  • modules/third_party/aac/aac_encode.c – AAC 编码器实现
  • modules/third_party/aac/aac_decode.c – AAC 解码器实现

这些文件实现了 AAC 编解码算法,支持 16kHz 采样率。

4.3.6 WebRTC ** **音频编码模块集成

新增的音频编码模块文件:

  • modules/audio_coding/codecs/aac/aac_interface.h – AAC 接口定义
  • modules/audio_coding/codecs/aac/aac_interface.cc – AAC 接口实现
  • modules/audio_coding/codecs/aac/audio_encoder_aac.h/cc – AAC 编码器封装
  • modules/audio_coding/codecs/aac/audio_decoder_aac.h/cc – AAC 解码器封装
  • modules/audio_coding/codecs/aac/audio_coder_aac_common.h/cc – AAC 通用功能
  • 4.3.7 API ** **层面的支持

新增的 API 文件:

  • api/audio_codecs/aac/audio_encoder_aac.h/cc – AAC 编码器 API

AudioEncoder::EncodedInfo AudioEncoderAACImpl::EncodeImpl(

uint32_t rtp_timestamp,

rtc::ArrayView audio,

rtc::Buffer* encoded) {

// 缓存音频数据直到有足够的帧

if (num_10ms_frames_buffered_ == 0)

first_timestamp_in_buffer_ = rtp_timestamp;

// 将多声道音频数据分离到各个声道缓冲区

const size_t start = kSampleRateHz / 100 * num_10ms_frames_buffered_;

for (size_t i = 0; i < kSampleRateHz / 100; ++i)

for (size_t j = 0; j < num_channels_; ++j)

encoders_[j].speech_buffer[start + i] = audio[i * num_channels_ + j];

// 检查是否有足够的帧进行编码

if (++num_10ms_frames_buffered_ < num_10ms_frames_per_packet_) {

return EncodedInfo();

}

// 对每个声道分别进行AAC编码

const size_t samples_per_channel = SamplesPerChannel();

size_t bytes_encoded = 0;

for (size_t i = 0; i < num_channels_; ++i) {

bytes_encoded = WebRtcAAC_Encode(

encoders_[i].encoder,

encoders_[i].speech_buffer.get(),

samples_per_channel,

encoders_[i].encoded_buffer.data());

if (bytes_encoded <= 0) {

return EncodedInfo();

}

}

// 构建编码结果

EncodedInfo info;

info.encoded_bytes = bytes_encoded;

info.encoded_timestamp = first_timestamp_in_buffer_;

info.payload_type = payload_type_;

info.encoder_type = CodecType::kAac;

return info;

}

  • api/audio_codecs/aac/audio_decoder_aac.h/cc – AAC 解码器 API

int AudioDecoderAACImpl::DecodeInternal(const uint8_t* encoded,

size_t encoded_len,

int sample_rate_hz,

int16_t* decoded,

SpeechType* speech_type) {

RTC_DCHECK_EQ(SampleRateHz(), sample_rate_hz);

int16_t temp_type = 1; // 默认为语音类型

// 调用底层AAC解码接口

size_t ret = WebRtcAAC_Decode(dec_state_, encoded, encoded_len,

decoded, &temp_type);

*speech_type = ConvertSpeechType(temp_type);

return static_cast(ret);

}

// 数据包时长计算

int AudioDecoderAACImpl::PacketDuration(const uint8_t* encoded,

size_t encoded_len) const {

// AAC: 每个样本每声道占用1/2字节

return static_cast(2 * encoded_len / Channels());

}

  • api/audio_codecs/aac/audio_encoder_aac_config.h – AAC 编码器配置
  • 4.3.8 ** **构建系统配置

BUILD.gn 文件修改:

  • 在 modules/audio_coding/BUILD.gn 中添加了 AAC 相关的构建目标
  • 在 api/audio_codecs/BUILD.gn 中集成 AAC 编解码器工厂
  • 在 webrtc.gni 中添加了 rtc_include_aac = true 配置选项
  • 4.3.9 ** **编解码器工厂集成

工厂类修改:

  • builtin_audio_encoder_factory.cc – 添加 AAC 编码器到内置工厂
  • builtin_audio_decoder_factory.cc – 添加 AAC 解码器到内置工厂
  • 在 AudioEncoder::CodecType 枚举中添加了 kAac = 7
  • 4.3.10. Android ** **平台支持

Android 特定修改:

  • WebRtcAudioManager.java – 强制设置采样率为 16000Hz
  • audio_record_jni.cc 和 audio_track_jni.cc – 改进音频设备初始化逻辑
  • 4.3.11 iOS ** **平台支持

iOS 特定修改:

  • RTCRtpCodecParameters.mm – 添加了 kRTCAacCodecName 常量
  • 4.3.12 ** **技术特性

AAC 实现特点:

  • 支持 16kHz 采样率
  • 支持单声道和立体声
  • 帧长度为 20ms (可配置为 10-60ms)
  • 比特率约为 28kbps (单声道)
  • 5.测试与验证

    5.1 ** **设备端测试

    5.1.1 ** **下载和编译

    5.1.2 ** **配置环境变量

export AWS_KVS_LOG_LEVEL=1

export AWS_KVS_CACERT_PATH=../certs/cert.pem

export AWS_DEFAULT_REGION=cn-north-1

export AWS_ACCESS_KEY_ID=

export AWS_SECRET_ACCESS_KEY=

export AWS_IOT_CORE_THING_NAME=demo-channel

export AWS_ENABLE_FILE_LOGGING=TRUE

5.1.3 ** **执行测试

5.2 ** **客户端测试

5.2.1 ** **下载仓库

5.2.2 Android** **客户端配置

在awsconfiguration.json中配置:

{

“Version”: “1.0”,

“CredentialsProvider”: {

“CognitoIdentity”: {

“Default”: {

“PoolId”: “us-west-2:[已去除电话]-89ab-cdef-[已去除电话]89abcdef”,

“Region”: “us-west-2”

}

}

},

“IdentityManager”: {

“Default”: {}

},

“CognitoUserPool”: {

“Default”: {

“AppClientSecret”: “abcdefghijklmnopqrstuvwxyz[已去除电话]abcdefghijklmno”,

“AppClientId”: “[已去除电话]abcdefghijklmnop”,

“PoolId”: “us-west-2_qRsTuVwXy”,

“Region”: “us-west-2”

}

}

}

5.2.2 ** **编译

5.3 ** **测试结果

5.3.1 offer** **信息如下

5.3.2 Answer** **信息如下

5.3.3 实时画面 测试结果对比

  • G.711 vs AAC:相同音质下,AAC带宽消耗减少约50%
  • Opus vs AAC:AAC在低码率下音质更稳定
  • 延迟对比:AAC编解码延迟与Opus相当,优于G.711
  • 6.总结

本文介绍了如何在Amazon Kinesis Video Streams WebRTC中成功集成AAC编解码器支持。通过详细的技术方案设计、完整的实现步骤和全面的测试验证,我们展示了AAC在实时音频通信场景下的显著优势。

AAC编解码器的集成为Amazon KVS WebRTC带来了更好的音频质量、更低的带宽消耗和更广泛的设备兼容性,特别适合移动设备和IoT设备的音频传输场景。

附录** **参考资料

  • Kinesis Video Streams with WebRTC 开发人员指南
  • RTP Payload Format for Transport of MPEG-4 Elementary Streams
  • 代码仓库** ***前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。


在Android和iOS平台编译Amazon Kinesis Video Streams WebRTC的NDK

AWS账单代付阅读(10)

1.概述

本文详细介绍如何为Android和iOS平台编译Amazon Kinesis Video Streams WebRTC NDK,以及如何将编译好的NDK与设备端的Amazon KVS WebRTC SDK配合使用。通过本文,您将学会完整的编译流程、配置方法和集成步骤,实现端到端的WebRTC音视频通信解决方案。

2.背景介绍

2.1 Amazon Kinesis Video Streams WebRTC** **简介

Amazon Kinesis Video Streams 提供符合标准的 WebRTC 实现作为完全托管的功能。您可以使用 Amazon Kinesis Video Streams with WebRTC 安全地进行媒体的实时流式传输,或在任何摄像头 IoT 设备与符合 WebRTC 的移动或 Web 播放器之间执行双向音频或视频交互。借助这项全面托管的功能,您不必构建、运营或扩展任何与 WebRTC 相关的云基础设施(例如信令或媒体中继服务器)便能安全地在应用程序和设备间流式传输媒体。

2.1.1 Amazon KVS WebRTC** **的拓扑结构

2.2 WebRTC NDK** **的作用

WebRTC NDK(Native Development Kit)为移动应用提供了原生的WebRTC功能支持,主要优势包括:

性能优化:原生代码执行效率更高,特别适合音视频处理

硬件加速:直接访问设备的硬件编解码器

低延迟:减少Java/Objective-C与原生代码之间的调用开销

跨平台一致性:Android和iOS使用相同的核心WebRTC代码

2.3 ** **架构概览

3.环境准备

3.1 ** **系统要求

3.1.1 Linux 环境(推荐Ubuntu 22.04+ EC2 instance)

  • 用于编译Android NDK
  • 推荐xlarge,100GB可用磁盘空间
  • Python 2.7+

3.1.2 macOS 环境(macOS 13.0+ EC2 instance)

  • 用于编译iOS框架
  • Xcode 15.0+
  • 推荐metal,100GB可用磁盘空间
  • Python 3.8+
  • 3.2 ** **必要工具安装

    3.2.1 Linux** **环境配置

    3.2.1.1 ** **创建编译实例

    3.2.1.2 ** **安装依赖包

Shell

在Android和iOS平台编译Amazon Kinesis Video Streams WebRTC的NDK

sudo apt update && sudo apt upgrade -y

安装基础开发工具

sudo apt install -y gcc g++ libssl-dev lbzip2

安装Python

sudo apt install -y python2.7-dev python3-dev

Set the default Python command to Python 2.7

sudo update-alternatives –install /usr/bin/python python /usr/bin/python2.7 1

sudo update-alternatives –config python

3.2.2 macOS** **环境配置

3.2.2.1 ** **创建专属主机

3.2.2.2 ** **在专属主机上运行mac实例

3.2.2.3 ** **安装依赖包

Shell

安装必要工具

brew install python3 git wget curl

安装python

wget –no-check-certificate https://www.python.org/ftp/python/2.7.18/python-2.7.18-macosx10.9.pkg

sudo installer -pkg python-2.7.18-macosx10.9.pkg -target /

Test if Python 2 is installed, and the following interface appears to prove that the installation is correct

ec2-user@ip-172-31-24-147 ~ % python

Python 2.7.18 (v2.7.18:8d21aa21f2, Apr 19 2020, 20:48:48) [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.57)] on darwin Type “help”, “copyright”, “credits” or “license” for more information.

>>>

安装Xcode命令行工具

xcode-select –install

4.WebRTC** **源码获取与配置

4.1 ** **获取depot_tools

Shell

创建编译目录

mkdir ~/

下载depot_tools

cd ~/

git clone https://chromium.googlesource.com/chromium/tools/depot_tools.git

添加到PATH

export PATH=$PATH:~/depot_tools

for Linux

echo ‘export PATH=$PATH:~/depot_tools’ >> ~/.bashrc

for macOS

echo ‘export PATH=$PATH:~/depot_tools’ >> ~/.zshrc

source ~/.bashrc

4.2 ** **下载WebRTC源码

Shell

工作目录

mkdir ~/projects/webrtc-ndk-build/

cd ~/projects/webrtc-ndk-build/

获取WebRTC源码

git clone –recursive https://github.com/hongliwo/amazon-kvs-webrtc-ndk-c.git src

cd src

5.Android NDK** **编译

5.1 ** **编译配置

5.1.1 ** **创建同步文件

Shell

创建文件 .gclient

vim .gclient

.gclient内容如下

solutions = [

{

“managed”: False,

“name”: “src”,

“url”: “https://github.com/hongliwo/amazon-kvs-webrtc-ndk-c.git”,

“custom_deps”: {},

“deps_file”: “DEPS”,

“safesync_url”: “”,

},

]

target_os = [“android”]

5.1.2 ** **执行同步

Shell

执行同步命令

gclient sync

将会有如下打印

Syncing projects: 3% ( 9/234) src/base

[0:01:57] Still working on:

[0:01:57] src/examples/androidtests/third_party/gradle

[0:01:57] src/testing

[0:01:57] src/third_party

[0:01:57] src/tools

[0:02:08] Still working on:

[0:02:08] src/examples/androidtests/third_party/gradle

[0:02:08] src/testing

[0:02:08] src/third_party

[0:02:08] src/tools

5.1.3 ** **下载aac代码

Shell

cd ~/projects/webrtc-ndk-build/src/third_party/

git clone https://github.com/hongliwo/aac-for-webrtc.git aac

5.2 ** **执行编译

Shell

指定目录

cd ~/projects/webrtc-ndk-build/src

编译

./sdk/build_android_libs_all.sh

可能的报错

error: undefined symbol: ff_sdp_write_media

>>> referenced by movenc.c:3534 (../../third_party/ffmpeg/libavformat/movenc.c:3534)

解决办法,打开如下文件

third_party/ffmpeg/libavformat/movenc.c

屏蔽这两行

ff_sdp_write_media(buf, sizeof(buf), ctx->streams[0], track->src_track,

NULL, NULL, 0, 0, ctx);

5.3 ** **生成文件

Shell

生成好的文件如下

tree out/dist/

out/dist/

├── debug

│ ├── arm64-v8a

│ │ └── libjingle_peerconnection_so.so

│ ├── armeabi-v7a

│ │ └── libjingle_peerconnection_so.so

│ └── libwebrtc.jar

└── release

├── arm64-v8a

│ └── libjingle_peerconnection_so.so

├── armeabi-v7a

│ └── libjingle_peerconnection_so.so

└── libwebrtc.jar

6 directories, 6 files

6.iOS** **框架编译

6.1 ** **切换到macOS环境

6.1.1 ** **创建同步文件

Shell

创建文件 .gclient

vim .gclient

.gclient内容如下

solutions = [

{

“managed”: False,

“name”: “src”,

“url”: “https://github.com/hongliwo/amazon-kvs-webrtc-ndk-c.git”,

“custom_deps”: {},

“deps_file”: “DEPS”,

“safesync_url”: “”,

},

]

target_os = [“ios”]

6.1.2 ** **执行同步

Shell

执行同步命令

gclient sync

将会有如下打印

Syncing projects: 3% ( 9/234) src/base

[0:01:57] Still working on:

[0:01:57] src/examples/androidtests/third_party/gradle

[0:01:57] src/testing

[0:01:57] src/third_party

[0:01:57] src/tools

[0:02:08] Still working on:

[0:02:08] src/examples/androidtests/third_party/gradle

[0:02:08] src/testing

[0:02:08] src/third_party

[0:02:08] src/tools

ec2-user/projects/webrtc-ndk-build’

–no_auth is deprecated, this flag has no effect.

Hook ‘download_from_google_storage –directory –recursive –num_threads=10 –no_auth –quiet –bucket chromium-webrtc-resources src/resources’ took 158.92 secs

Hook ‘vpython3 src/testing/generate_location_tags.py –out src/testing/location_tags.json’ took 11.37 secs

Running hooks: 100% (29/29), done.

6.1.3 ** **下载aac代码

Shell

cd ~/projects/webrtc-ndk-build/src/third_party/

git clone https://github.com/hongliwo/aac-for-webrtc.git aac

6.3 ** **执行iOS编译

Shell

指定目录

cd ~/projects/webrtc-ndk-build/src

编译

./sdk/build_ios_libs_all.sh

可能的报错

../../third_party/abseil-cpp/absl/base/config.h:303:27: error: ‘__ENVIRONMENT_OS_VERSION_MIN_REQUIRED__’ is not defined, evaluates to 0 [-Werror,-Wundef] !(TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0)

修复:如下三处地方加入如下的定义

./sdk/::::third_party/abseil-cpp/absl/base/config.h中定义

./sdk/objc/components/renderer/opengl/RTCDisplayLinkTimer.h

./sdk/objc/components/capturer/RTCCameraVideoCapturer.h

ifdef __IPHONE_OS_VERSION_MIN_REQUIRED

undef __IPHONE_OS_VERSION_MIN_REQUIRED

define __IPHONE_OS_VERSION_MIN_REQUIRED 120000

endif

6.4 ** **生成通用iOS框架

Shell

生成的内容如下

tree out/dist

out/dist

├── debug

│ └── WebRTC.framework

│ ├── Headers

│ │ └── WebRTC.h

│ ├── Info.plist

│ ├── Modules

│ │ └── module.modulemap

│ └── WebRTC

└── release

└── WebRTC.framework

├── Headers

│ └── WebRTC.h

├── Info.plist

├── Modules

│ └── module.modulemap

└── WebRTC

9 directories, 204 files

7.与设备端SDK集成

7.1 ** **设备端配置

与amazon-kvs-webrtc-sdk-c搭配使用:

Shell

下载设备端SDK

git clone –recursive https://github.com/hongliwo/amazon-kvs-webrtc-sdk-c.git

编译设备端SDK

cd amazon-kvs-webrtc-sdk-c

mkdir build && cd build

cmake .. -DBUILD_SAMPLE=ON

make -j$(nproc)

7.2 Android** **集成示例

7.2.1 ** **下载Android Sample

Shell

git clone https://github.com/awslabs/amazon-kinesis-video-streams-webrtc-sdk-android.git

7.2.2 ** **创建Android项目集成:

将生成的NDK拷贝到Sample的如下位置

Shell

tree app/libs

app/libs

├── arm64-v8a

│ └── libjingle_peerconnection_so.so

├── armeabi-v7a

│ └── libjingle_peerconnection_so.so

├── instructions.txt

└── libwebrtc.jar

相应的build.gradle如下

Plain Text

plugins {

id ‘com.android.application’

id ‘jacoco’

}

android {

compileSdk 33

defaultConfig {

namespace(“com.amazonaws.kinesisvideo.demoapp”)

applicationId “com.amazonaws.kinesisvideo.webrtc_sdk_android”

minSdk 29

targetSdk 33

versionCode 1

versionName “1.1.0”

testInstrumentationRunner “androidx.test.runner.AndroidJUnitRunner”

sourceSets {

main {

jniLibs.srcDirs = [‘libs’]

}

}

ndk {

abiFilters ‘armeabi-v7a’, ‘arm64-v8a’

}

}

buildTypes {

release {

minifyEnabled false

proguardFiles getDefaultProguardFile(‘proguard-android-optimize.txt’), ‘proguard-rules.pro’

}

}

compileOptions {

sourceCompatibility JavaVersion.VERSION_1_8

targetCompatibility JavaVersion.VERSION_1_8

}

}

dependencies {

implementation fileTree(dir: ‘libs’, include: [‘*.jar’, ‘*.aar’])

def aws_version = ‘2.75.0’

implementation(“com.amazonaws:aws-android-sdk-kinesisvideo:$aws_version@aar”) { transitive = true }

implementation(“com.amazonaws:aws-android-sdk-kinesisvideo-signaling:$aws_version@aar”) { transitive = true }

implementation(“com.amazonaws:aws-android-sdk-kinesisvideo-webrtcstorage:$aws_version@aar”) { transitive = true }

implementation(“com.amazonaws:aws-android-sdk-mobile-client:$aws_version@aar”) { transitive = true }

implementation(“com.amazonaws:aws-android-sdk-auth-userpools:$aws_version@aar”) { transitive = true }

implementation(“com.amazonaws:aws-android-sdk-auth-ui:$aws_version@aar”) { transitive = true }

implementation ‘org.awaitility:awaitility:4.2.0’

implementation ‘org.json:json:[已去除电话]’

implementation ‘com.google.guava:guava:28.1-android’

implementation ‘com.google.code.gson:gson:2.10.1’

implementation ‘org.apache.commons:commons-lang3:3.9’

implementation(“com.squareup.okhttp3:okhttp:4.12.0”)

//implementation ‘org.webrtc:google-webrtc:1.0.+’

implementation ‘androidx.appcompat:appcompat:1.6.1’

implementation ‘com.google.android.material:material:1.9.0’

implementation ‘androidx.constraintlayout:constraintlayout:2.1.4’

testImplementation ‘junit:junit:4.13.2’

androidTestImplementation ‘androidx.test.ext:junit:1.1.5’

androidTestImplementation ‘androidx.test.espresso:espresso-core:3.5.1’

}

7.3 iOS** **集成示例

7.3.1 ** **替换如下Framework

位置如下:

Shell

tree Swift/Pods/GoogleWebRTC/Frameworks

Swift/Pods/GoogleWebRTC/Frameworks

├── frameworks

│ └── WebRTC.framework

│ ├── Headers

│ │ └── WebRTC.h

│ ├── Info.plist

│ ├── Modules

│ │ └── module.modulemap

│ └── WebRTC

8.测试与验证

8.1 ** **端到端测试流程

8.1.1 ** **设备端启动:

Shell

cd amazon-kvs-webrtc-sdk-c/build

export AWS_DEFAULT_REGION=us-west-2

export AWS_ACCESS_KEY_ID=your_access_key

export AWS_SECRET_ACCESS_KEY=your_secret_key

./samples/kvsWebrtcClientMaster test-channel 0 aac h265

8.1.2 ** **移动端连接

  • Android:运行集成了WebRTC NDK的Android应用
  • iOS:运行集成了WebRTC框架的iOS应用
  • 8.2 ** **验证功能:

  • 音频双向通信
  • 视频双向传输
  • 编解码器协商(H.264/H.265, Opus/AAC)
  • 9.总结

本文详细介绍了Amazon KVS WebRTC NDK的完整编译和使用流程。通过系统的环境配置、源码编译、平台集成和测试验证,您可以成功构建支持H.264/H.265视频编解码和Opus/AAC音频编解码的移动端WebRTC解决方案。

关键要点:

  • 正确配置编译环境和依赖工具
  • 启用所需的编解码器支持
  • 遵循平台特定的集成最佳实践
  • 进行全面的功能和性能测试

这套解决方案为IoT设备与移动端之间的实时音视频通信提供了高性能、低延迟的技术基础。

附录

参考资料

代码仓库

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。

本篇作者


发布 AWS IoT ExpressLink 技术规范 v1.3

AWS账单代付阅读(21)

发布 AWS IoT ExpressLink 技术规范 v1.3

发布于: 2025年8月28日

今天,AWS IoT ExpressLink 宣布发布技术规范 v1.3。它是一款连接软件,可为 AWS 合作伙伴开发和提供的一系列硬件模块提供支持。更新后的规范为设计和开发物联网(IoT)设备的硬件制造商提供了蓝牙低功耗(BLE)通信的新功能,并提供了一组新命令,使主处理器能够控制模块的 I/O 引脚。

使用蓝牙低功耗(BLE)扩展功能集,AWS IoT ExpressLink 可以更轻松地广播设备的存在状态及其功能,并与本地个人局域网(PAN)内的其他设备进行安全配对。此外,全新的 I/O 控制命令支持主处理器使用由 AWS IoT ExpressLink 提供支持的模块作为 I/O 扩展器,从而提供灵活的数字和模拟输入/输出能力。这些功能组合在一起,可以简化开发流程,并有助于降低边缘设备通过无线方式从低功耗传感器收集数据并与 AWS 云通信的成本。AWS 合作伙伴(如 Espressif 和 u-blox)已将新规范用于其符合 Wi-Fi 和 BLE 条件的模块,开发人员可以随时使用这些模块构建互联产品。

有关更多信息,请参阅程序员指南并浏览 AWS 合作伙伴设备目录,了解 AWS 合作伙伴提供的合格模块。


Amazon EBS 为 AWS Local Zones 推出快照副本功能

AWS账单代付阅读(24)

Amazon EBS 为 AWS Local Zones 推出快照副本功能

发布于: 2025年8月28日

高性能块存储服务 Amazon Elastic Block Store(Amazon EBS)宣布全面推出面向 AWS Local Zones 的快照副本功能。这项新功能可确保将您的 EBS 快照复制到 AWS 区域或 AWS Local Zones,从而帮助您满足业务与合规性要求。

快照副本功能可复制 EBS 卷在某个时间点的本地快照,并将其存储在区域内的 Amazon S3 存储桶或另一个本地区域中。客户使用 EBS 快照备份 EBS 卷,并将其复制到多个 AWS 区域和本地区域,以实现灾难恢复、数据迁移及合规性目的。

Amazon EBS 快照副本功能可在支持本地快照的本地区域中使用,用户可通过 AWS 管理控制台、AWS 命令行界面(CLI)和 AWS SDK 进行操作。要了解更多信息,请参阅关于快照副本的技术文档。


相得益彰:Mem0 记忆框架与亚马逊云科技的企业级 AI 实践

AWS账单代付阅读(22)

亚马逊AWS官方博客

相得益彰:Mem0 记忆框架与亚马逊云科技的企业级 AI 实践

想象一下这样的场景:

个人助手场景:当你的 AI 助手能够记住你三个月前提到”不喜欢吃辣”这个细节,在今天推荐餐厅时自动筛选掉川菜和湘菜;或者它记得你上周提到的重要会议,主动在会前提醒 你准备相关材料。

企业服务场景:客户致电咨询时,AI 客服立即调取该客户的完整交互历史——从半年前 的产品咨询,到上个月的技术支持,再到最近的投诉处理,为客户提供连贯一致的个性化 服务体验,而不是每次都要重新解释问题背景。

这就是记忆增强型 AI 的魅力所在——它让 AI 从”健忘的工具”升级为”贴心的伙伴”。

本文将深入探讨如何通过 Mem0 智能记忆框架与亚马逊云科技服务生态的深度集成,构建 真正的生产级记忆增强型 AI 系统。我们不仅会分析记忆机制的技术原理,更会展示如何 利用 Aurora Serverless、Bedrock、Neptune 等云原生服务,实现从概念验证到企业级部 署的完整解决方案。

记忆对于 Agentic AI 应用的重要性

记忆缺失:当前 AI 系统的根本局限

传统的大语言模型受限于固定的上下文窗口,就像患有”失忆症”的助手,无法在跨会话的长期对话中保持一致性和连贯性。每次对话都是全新的开始,用户不得不反复解释自己的偏好和背景。

这种记忆缺失会引发典型问题:遗忘用户偏好、重复询问已知信息、甚至推翻之前确认的事实。例如,在饮食推荐场景中,用户明确表示不喜欢某类食物,但系统在后续交互中仍会推荐相同类型的食物,这一常见情况从根本上损害了用户体验并破坏了用户信任。

为了解决记忆问题,业界尝试通过扩大上下文窗口来缓解,但这种方法存在根本性局限:1)持续的人机交互必然导致对话历史超出任何固定窗口限制。2)人类能够动态整合新信息并修正过时认知,而LLM只能通过完全重置来更新知识。3)真实对话缺乏主题连贯性——用户可能从讨论食物偏好跳转到工作任务,再回到晚餐选择,迫使系统在大量无关信息中寻找相关线索。4)更关键的是,仅仅提供更长上下文无法保证有效信息检索,因为注意力机制对远距离token的处理能力会显著退化,导致真正有价值的信息被噪声淹没。

人类记忆的启示

人类记忆是智能的根基,它不仅定义了我们的身份,更是明智决策、持续学习和建立深度关系的基础。人类能够回忆过往互动,推断他人偏好,并构建与交往对象不断演化的心理模型。理想的AI记忆框架应当模拟这种动态、适应性的记忆机制,实现从简单存储到智能管理的根本转变。

记忆增强型Agent的突破

真正的解决方案在于构建具备记忆能力的智能体。这类Agent能够突破传统LLM的局限,在交互环境中实现质的飞跃。

核心能力提升体现在四个方面:

  • 个性化预测能力。通过记住用户历史行为和偏好,Agent能够更准确地预测客户需求,提供真正个性化的服务体验。
  • 经验学习能力。系统能够从过往错误中学习,建立经验库以避免重复失误,并将在特定任务中获得的知识有效泛化到其他相关场景。
  • 因果推理能力。记忆功能使Agent能够理解行动与结果之间的因果关系,基于深层逻辑进行决策优化,在动态变化的环境中实现更有效的适应。
  • 长期规划能力。跨会话的记忆连续性确保了多轮对话中的推理一致性,使Agent从简单的被动响应升级为具备长期规划能力的智能系统。

在医疗诊断、个性化教育、企业客户支持等对历史信息依赖性强的关键领域,这种记忆增强能力的价值更加凸显。

Mem0框架:构建生产级记忆增强型AI Agent

Mem0框架概述

Mem0 是一个专为现代 AI Agent 设计的智能记忆层,它使AI Agent能够记住过去的交互、存储重要的用户偏好和事实背景,并从成功和失败中学习。通过与Mem0集成,Agent从无状态转变为有状态,能够保持上下文、回忆重要信息,并随着时间推移表现得更加智能。

与传统的简单数据存储不同,Mem0专注于智能记忆管理,支持多种记忆类型:工作记忆(临时对话上下文)、事实记忆(客观信息)、情景记忆(具体事件)和语义记忆(概念知识),并提供智能的LLM提取、过滤和衰减机制,有效降低计算成本。系统不仅支持多模态(文本和图像),还提供Graph记忆功能来连接跨会话的见解和实体。作为一个既可以使用托管服务也可以自建的解决方案,Mem0让AI Agent能够建立持久的记忆,从而提供更个性化和适应性更强的服务。

技术架构设计

Mem0采用分层设计理念,通过六个核心模块的协同工作,实现智能记忆管理的完整闭环。

分层架构的设计优势在于各层职责清晰、可独立优化。核心记忆层作为控制中枢,统筹记忆的CRUD操作;大语言模型层提供智能决策能力,负责信息提取和更新策略生成;嵌入模型层将自然语言转换为机器可理解的向量表示;向量存储层基于语义相似性实现高效检索;图存储层构建实体关系网络,支持复杂推理;持久化存储层确保元数据和操作历史的可靠保存。

这种架构设计既保证了系统的模块化和可扩展性,又通过层间的紧密协作实现了智能记忆管理的核心目标。

核心技术创新

Mem0的技术创新体现在四个关键方面,这些创新共同构成了其智能记忆管理的核心竞争力。

双LLM架构是其最重要的创新之一。系统通过两次不同的LLM调用实现复杂的分工:第一次专注于信息提取,第二次专门处理决策过程。这种关注点分离不仅提高了准确性和可靠性,还允许对各阶段进行专门优化,避免了单一模型处理复杂任务时的性能瓶颈。

上下文感知处理确保了记忆系统的一致性。Mem0不是孤立处理信息,而是在现有记忆的上下文中分析新数据,防止碎片化并维护相关信息间的逻辑关系。

智能去重机制通过向量相似性搜索与基于LLM判断的结合,创建了强大的去重能力,既防止了冗余信息存储,又保持了记忆质量和系统效率。

冲突解决能力使系统能够智能处理矛盾信息,在用户偏好和环境随时间演变时维护记忆准确性,这是传统存储系统难以实现的高级功能。

记忆操作机制

Mem0的核心价值在于智能化的记忆管理。系统封装了记忆添加、检索、更新和删除四大核心操作,其中添加和检索是关键环节。

智能记忆添加流程

Mem0对于记忆的添加流程如下图所示。(图片来源:Mem0官方文档)

Mem0的记忆添加过程体现了从自然语言到结构化知识的智能转换,分为三个关键阶段:

  • 信息提取阶段:系统通过LLM和专门提示词进行深度语义分析,从”我和John在星巴克讨论AI项目”中提取工作关系、场所偏好、兴趣领域等多维信息。
  • 上下文检索阶段:融合向量相似性搜索、图谱关系检索和上下文关联分析,在严格用户隔离的前提下,确保新信息与历史记忆的有机关联。
  • 智能决策阶段:通过第二次LLM调用,智能决定ADD、UPDATE、DELETE或NONE操作,维护记忆库的质量和一致性。
  • 高效记忆检索流程

Mem0对于记忆的检索流程如下图所示。(图片来源:Mem0官方文档)

检索流程采用查询预处理、并行化搜索、结果整合三阶段设计,通过向量语义搜索和图谱关系搜索的并行执行,实现高效准确的记忆检索。

通过这种精心设计的双流程架构,Mem0为构建生产级记忆增强型AI Agent提供了坚实的技术基础。

Mem0与Agent框架的集成

在实际应用中,开发者可以通过两种方式将Mem0集成到Agent系统中:一是在环境变量配置依赖信息后,直接调用Mem0的接口函数(如添加、查找、更新记忆等);二是将Mem0封装成工具传入Agent框架,由Agent根据处理逻辑自主调用相应方法。

集成方式选择

直接接口调用方式:开发者可以在配置好环境变量和依赖信息后,直接使用Mem0提供的接口函数。这种方式给予开发者完全的控制权,可以精确地决定何时添加记忆、何时检索记忆、如何更新记忆内容。适合对记忆管理有特定需求或需要与现有系统深度集成的场景。

工具封装方式:将Mem0封装成工具传入Agent框架,让Agent根据自身的处理逻辑自主决定何时调用相应的记忆方法。这种方式更加智能化,Agent可以根据对话上下文和用户需求自动判断是否需要存储新记忆或检索历史记忆,减少了人工干预的需要。

框架选择策略

使用支持Mem0的Agent框架:对于希望快速构建记忆增强型Agent的开发者,可以选择支持Mem0的现有Agent框架。这些框架通常内置了专门的记忆管理工具集,封装了完整的Mem0能力,具有开箱即用、集成最佳实践、适合快速原型开发等优势。

自定义Agent框架集成:对于需要更高灵活性和定制化的场景,开发者可以在自己的Agent框架中直接集成Mem0。通过配置化的方式,可以直接指定相应的LLM、Embedding模型以及向量数据库等组件,创建定制化的Mem0实例。

基础集成示例

以下是直接接口调用的基本示例:

from mem0 import Memory

config = {

“llm”: {

“provider”: “aws_bedrock”,

“config”: {

“model”: “us.anthropic.claude-3-7-sonnet-[已去除电话]-v1:0”,

“temperature”: 0.2,

“max_tokens”: 2000,

}

},

“embedder”: {

“provider”: “aws_bedrock”,

“config”: {

“model”: “amazon.titan-embed-text-v2:0”

}

},

“vector_store”: {

“provider”: “pgvector”,

“config”: {

“user”: “$DB_USER”,

“password”: “$DB_PASSWORD”,

“host”: “$DB_HOST”,

“port”: “$DB_PORT”,

“embedding_model_dims”: “1024”,

“collection_name”: “mem0_memories”,

}

},

“graph_store”: {

“provider”: “neptune”,

“config”: {

“endpoint”: “neptune-graph://gXXXXXX”

}

}

}

创建 Memory 实例并使用

memory = Memory.from_config(config)

memory.add(“I love sci-fi movies”, user_id=”user123″)

relevant_memories = memory.search(“movie preferences”, user_id=”user123″)

通过灵活的集成策略,开发者可以根据具体需求选择最适合的实现方式。然而,要实现企业级的生产部署,仅仅有Mem0框架是不够的,还需要充分利用云服务提供商的基础设施优势。

接下来,我们将深入探讨如何将Mem0与亚马逊云科技的服务生态进行深度集成,利用Aurora Serverless、Bedrock、Neptune等云原生服务,构建真正的生产级记忆增强型AI系统。

亚马逊云科技与Mem0的深度集成

亚马逊云科技为Mem0提供了完整的服务生态支持,开发者可以根据具体需求选择最适合的亚马逊云科技服务组合。

如上图所示,完整的解决方案采用分层架构设计,从上到下包含四个核心层次:

  • 用户应用层:StrandsAgent、LangGraph等开发框架,提供多样化的Agent构建方式
  • Mem0智能记忆层:核心记忆管理引擎,协调信息提取、智能决策、记忆检索和生命周期管理。
  • 亚马逊云科技托管服务层:Amazon Bedrock提供AI能力,Aurora Serverless提供向量存储,Neptune Analytics提供图存储
  • 安全与管理层:IAM、KMS、VPC等服务确保企业级安全保障

这种云原生架构的核心优势在于弹性伸缩、按需付费和全托管服务,让开发者专注于业务逻辑而非基础设施管理。下面我们按照数据流向,深入探讨各层的技术特点和集成优势。

AI能力层:Amazon Bedrock

作为整个架构的”智能大脑”,Amazon Bedrock为Mem0的双LLM架构提供强大的AI推理能力,承担信息提取、智能决策和语义理解的核心任务。

丰富的模型生态选择

Amazon Bedrock作为托管基础模型服务,为Mem0提供了业界丰富的模型选择,包括Anthropic的Claude系列、Amazon的Nova系列、Meta的Llama系列、Cohere的Command系列,以及AI21 Labs的Jurassic系列等。

这种多模型支持使得开发者可以根据具体的应用场景、性能要求和成本预算选择最适合的模型组合:

  • LLM模型选择:Amazon Nova系列作为亚马逊云科技自研的多模态大模型,在文本、图像等多模态理解和生成方面表现优异;Claude系列在复杂推理任务中表现卓越;Llama和Command系列则为成本敏感的应用提供高性价比选择。
  • 嵌入模型选择:Titan Embed系列提供高质量的文本嵌入能力,支持长文本处理和多模态记忆功能;Cohere嵌入模型则在多语言场景中表现优异
  • 企业级安全与合规

Bedrock提供企业级的安全保障,所有模型调用都在亚马逊云科技安全边界内进行,数据传输采用TLS加密,静态数据通过KMS加密。与IAM深度集成,支持细粒度的访问控制和审计日志,满足金融、医疗等行业的合规要求。

统一API与智能调度

Bedrock的统一API设计简化了多模型管理,支持无服务器部署和按需付费。

Bedrock多模型配置示例 – 针对Mem0优化的生产配置

bedrock_config = {

“llm”: {

“provider”: “aws_bedrock”,

“config”: {

“model”: “us.anthropic.claude-3-7-sonnet-[已去除电话]-v1:0”,# 信息提取和决策

“temperature”: 0.1,

“max_tokens”: 4000,

“top_p”: 0.9

}

},

“embedder”: {

“provider”: “aws_bedrock”,

“config”: {

“model”: “amazon.titan-embed-text-v2:0”, # 向量化记忆内容

“normalize”: True,

“dimensions”: 1024

}

}

}

通过Bedrock丰富的模型生态和统一的API管理,Mem0能够实现真正智能的记忆管理,从简单的信息存储升级为具备理解、推理和决策能力的认知系统。

存储层:Aurora Serverless + Neptune Analytics

作为Mem0记忆系统的数据基础,Aurora Serverless和Neptune Analytics构成了完整的存储解决方案:Aurora负责向量化记忆的高效存储和检索,Neptune负责实体关系的图谱管理和推理,两者协同工作为记忆增强型AI提供强大的数据支撑。

向量存储:Amazon Aurora Serverless for PostgreSQL

Aurora Serverless for PostgreSQL凭借pgvector插件的原生向量支持、标准SQL接口和活跃社区生态,为Mem0提供了理想的向量存储基础。

核心技术优势

  • 原生向量能力:pgvector插件支持欧几里得距离、余弦相似度和内积计算等核心向量操作,完美匹配记忆检索需求
  • 弹性伸缩架构:Aurora Serverless能够根据负载自动调整计算资源,支持从0到256TiB的存储自动扩展,实现真正的按需付费
  • 企业级可靠性:AZ+1级别的可用性保证,即使在一个可用区和一个额外故障发生时仍能对外提供服务,确保珍贵记忆数据的安全
  • 全球化部署能力:Aurora Global Database为需要全球部署的记忆增强型Agent应用提供关键支持。主区域的记忆数据可以在1秒内复制到最多10个次要区域,确保全球用户都能快速访问及时更新的记忆。
  • Aurora Serverless配置示例

aurora_config = {

“vector_store”: {

“provider”: “pgvector”,

“config”: {

“host”: “aurora-serverless-cluster.cluster-xxx.us-east-1.rds.amazonaws.com”,

“port”: 5432,

“database”: “mem0_db”,

“user”: “mem0_user”,

“password”: “${AWS_SECRET_MANAGER_PASSWORD}”,

“embedding_model_dims”: 1024,

“collection_name”: “mem0_memories”,

“distance_metric”: “cosine”

}

}

}

图存储:Amazon Neptune Analytics

Neptune Analytics作为Serverless图分析服务,与Mem0实现深度集成,为记忆增强型AI提供强大的图存储和关系推理能力。

核心技术优势

  • Serverless弹性架构:根据查询复杂度和数据量自动调整计算资源,采用智能内存管理策略,支持大规模图遍历操作和并行处理
  • 深度集成优势:与Mem0原生集成,系统能够自动从对话中提取实体和关系,构建知识图谱,通过多跳图推理显著提升AI响应质量
  • 混合检索能力:支持在图遍历中集成向量相似性搜索,为记忆系统提供强大的关联分析能力
  • 企业级安全保障:与IAM深度集成提供细粒度访问控制,支持身份基础策略和临时凭证,API调用采用TLS 1.2+加密传输。
  • Neptune Analytics配置示例

neptune_config = {

“graph_store”: {

“provider”: “neptune”,

“config”: {

“endpoint”: “neptune-graph://g-XXXXXX”,

“query_timeout”: 30000,

“max_results”: 1000,

“retry_config”: {

“max_attempts”: 3,

“backoff_multiplier”: 2

}

}

}

}

存储层协同优势

通过Aurora和Neptune的协同工作,Mem0实现了完整的记忆数据管理:向量存储处理语义相似性检索,图存储处理实体关系推理,两者结合为AI Agent提供了既能理解语义又能推理关系的强大记忆能力。这种双存储架构不仅提高了检索精度,更重要的是为复杂的记忆场景提供了多维度的数据支撑。

应用开发层 – StrandsAgent + LangGraph

在完整的存储基础之上,开发者可以通过多种框架构建记忆增强型Agent应用。StrandsAgent和LangGraph作为两种主流的开发框架,为不同的开发需求提供了灵活的集成方案。

StrandsAgent:工具化记忆集成

StrandsAgent作为亚马逊云科技提供的Agent开发框架,原生集成了Mem0记忆管理功能。为了更好地支持不同客户的需求,我们对mem0_memory工具进行了两个增强:

  • 向量数据库扩展:原始框架主要支持OpenSearch作为向量存储后端,我们新增了对Aurora PostgreSQL的原生支持,充分利用pgvector插件和Serverless架构的优势
  • 多模型提供商支持:除了Amazon Bedrock外,还支持OpenAI兼容接口,比如DeepSeek等模型,通过STRANDS_MODEL_PROVIDER环境变量实现灵活切换,满足不同地区的部署需求,使得该方案不仅可在亚马逊云科技海外区域部署,同样也适用于北京和宁夏区域。
  • 核心改动:完整的增强实现

class Mem0ServiceClient:

新增:Aurora PostgreSQL + Bedrock配置

PG_CONFIG_BEDROCK = {

“embedder”: {

“provider”: “aws_bedrock”,

“config”: {

“model”: os.environ.get(“EMBEDDING_MODEL”, “amazon.titan-embed-text-v2:0”)

}

},

“llm”: {

“provider”: “aws_bedrock”,

“config”: {

“model”: os.environ.get(“LLM_MODEL”, “us.anthropic.claude-3-7-sonnet-[已去除电话]-v1:0”),

“temperature”: 0.1,

“max_tokens”: 2000,

},

},

“vector_store”: {

“provider”: “pgvector”,

“config”: {

“collection_name”: “mem0_memories”,

“host”: os.environ.get(“POSTGRESQL_HOST”),

“port”: os.environ.get(“POSTGRESQL_PORT”),

“user”: os.environ.get(“POSTGRESQL_USER”),

“password”: os.environ.get(“POSTGRESQL_PASSWORD”),

“embedding_model_dims”: “1024”,

}

}

}

新增:OpenAI兼容 + PostgreSQL配置

PG_CONFIG_OPENAI = {

“llm”: {

“provider”: “openai”,

“config”: {

“model”: os.environ.get(“LLM_MODEL”),

“api_key”: os.environ.get(“OPENAI_API_KEY”),

“temperature”: 0.1,

“max_tokens”: 2000,

}

},

“embedder”: {

“provider”: “openai”,

“config”: {

“model”: os.environ.get(“EMBEDDING_MODEL”),

“api_key”: os.environ.get(“OPENAI_API_KEY”),

}

},

“vector_store”: {

“provider”: “pgvector”,

“config”: {

“collection_name”: “mem0_memories”,

“host”: os.environ.get(“POSTGRESQL_HOST”),

“port”: os.environ.get(“POSTGRESQL_PORT”),

“user”: os.environ.get(“POSTGRESQL_USER”),

“dbname”: os.environ.get(“DB_NAME”),

“password”: os.environ.get(“POSTGRESQL_PASSWORD”),

“embedding_model_dims”: “1024”,

}

}

def _initialize_client(self, config: Optional[Dict] = None) -> Any:

“””增强的客户端初始化逻辑”””

if os.environ.get(“MEM0_API_KEY”):

return MemoryClient()

新增:PostgreSQL后端支持

if os.environ.get(“POSTGRESQL_HOST”):

logger.info(“Using PostgreSQL backend with Aurora Serverless”)

return self._initialize_postgresql_client(config)

if os.environ.get(“OPENSEARCH_HOST”):

logger.info(“Using OpenSearch backend”)

return self._initialize_opensearch_client(config)

return self._initialize_faiss_client(config)

def _initialize_postgresql_client(self, config: Optional[Dict] = None) -> Mem0Memory:

“””新增:根据模型提供商选择相应配置”””

pg_config = (self.PG_CONFIG_BEDROCK

if os.environ.get(‘STRANDS_MODEL_PROVIDER’) == “bedrock”

else self.PG_CONFIG_OPENAI)

merged_config = self._merge_config(pg_config.copy())

return Mem0Memory.from_config(config_dict=merged_config)

使用方式

增强后的工具使用方式保持不变,但现在支持更多的部署选项:

import os

from strands import Agent

from strands_tools import mem0_memory

场景1:Aurora PostgreSQL + Amazon Bedrock上的模型

os.environ[“STRANDS_MODEL_PROVIDER”] = “bedrock”

os.environ[“LLM_MODEL”] = “us.anthropic.claude-3-7-sonnet-[已去除电话]-v1:0”

os.environ[“EMBEDDING_MODEL”] = “amazon.titan-embed-text-v2:0”

场景2:Aurora PostgreSQL + OpenAI兼容的模型

os.environ[“STRANDS_MODEL_PROVIDER”] = “openai”

os.environ[“LLM_MODEL”] = ” deepseek-ai/DeepSeek-R1/wvvkzheh2i”

os.environ[“EMBEDDING_MODEL”] = “Pro/BAAI/bge-m3”

os.environ[“OPENAI_API_KEY”] = “your-api-key”

PostgreSQL配置(通用)

os.environ[“POSTGRESQL_HOST”] = “aurora-cluster.cluster-xxx.us-east-1.rds.amazonaws.com”

os.environ[“POSTGRESQL_USER”] = “mem0_user”

os.environ[“POSTGRESQL_PASSWORD”] = “${AWS_SECRET_MANAGER_PASSWORD}”

os.environ[“DB_NAME”] = “mem0_db”

创建Agent,Agent后续会根据tool的定义,按需调用记忆增加、检索等接口

agent = Agent(tools=[mem0_memory])

技术优势

这些增强带来的核心价值:

  • 存储优化:Aurora PostgreSQL的Serverless架构和成本效益
  • 全球部署:支持不同地区的模型提供商和合规需求
  • 开发友好:保持API兼容性,通过环境变量实现灵活配置
  • 生态整合:与亚马逊云科技服务深度集成,简化运维管理
  • LangGraph:状态驱动的记忆工作流

LangGraph作为专门用于构建有状态、多智能体协作的编程框架,与Mem0的结合为构建具备长短期记忆的智能Agent提供了强大支撑。

核心集成优势

  • 状态管理:LangGraph的状态管理机制与Mem0的记忆系统完美结合,实现跨会话的记忆连续性
  • 工作流编排:通过节点和边的设计,可以精确控制记忆的检索、处理和存储时机
  • 弹性架构集成:与Aurora Serverless深度集成,实现云原生的弹性记忆管理
  • LangGraph工作流示例 – 记忆驱动的对话流程

from langgraph.graph import StateGraph, START, END

定义记忆增强的工作流

workflow = StateGraph(State)

workflow.add_node(“memory_retrieval”, memory_search_node)

workflow.add_node(“agent_processing”, memory_enhanced_chatbot)

workflow.add_node(“memory_storage”, memory_save_node)

记忆驱动的状态转换

workflow.add_edge(START, “memory_retrieval”)

workflow.add_edge(“memory_retrieval”, “agent_processing”)

workflow.add_edge(“agent_processing”, “memory_storage”)

workflow.add_edge(“memory_storage”, END)

编译并运行

app = workflow.compile()

result = app.invoke({“query”: “推荐一部电影”, “user_id”: “alice”})

基于项目实际实现的最小化节点函数示例

async def memory_enhanced_chatbot(state: AgentState) -> AgentState:

“””记忆增强的聊天机器人节点 – 基于项目中的响应生成节点”””

获取用户输入和记忆上下文

user_input = state.get(“user_input”, “”)

recalled_memories = state.get(“recalled_memories”, [])

生成记忆增强的响应

response = await generate_response_with_memory(

user_input=user_input,

memories=recalled_memories,

user_id=state.get(“user_id”, “default”)

)

更新状态

state[“final_response”] = response

state[“processing_complete”] = True

return state

async def memory_search_node(state: AgentState) -> AgentState:

“””记忆搜索节点 – 基于项目中的召回模型节点”””

user_input = state.get(“user_input”, “”)

user_id = state.get(“user_id”, “default”)

使用Mem0搜索相关记忆

mem0 = get_mem0_instance() # 获取Mem0实例

search_results = mem0.search(user_input, user_id=user_id)

处理搜索结果

if isinstance(search_results, dict) and “results” in search_results:

memories = search_results[“results”]

else:

memories = search_results or []

更新状态

state[“recalled_memories”] = memories

state[“memory_search_complete”] = True

logger.info(f”Retrieved {len(memories)} memories for user {user_id}”)

return state

async def memory_save_node(state: AgentState) -> AgentState:

“””记忆保存节点 – 基于项目中的最终化节点”””

user_input = state.get(“user_input”, “”)

final_response = state.get(“final_response”, “”)

user_id = state.get(“user_id”, “default”)

保存用户输入到记忆

mem0 = get_mem0_instance()

构建记忆内容和元数据

memory_content = f”User said: {user_input}”

metadata = {

“type”: “user_input”,

“timestamp”: datetime.now().isoformat(),

“conversation_id”: state.get(“conversation_id”),

“response_generated”: bool(final_response)

}

添加到记忆系统

result = mem0.add(memory_content, user_id=user_id, metadata=metadata)

更新状态

state[“memory_saved”] = True

state[“memory_save_result”] = result

logger.info(f”Memory saved for user {user_id}”)

return state

辅助函数

async def generate_response_with_memory(user_input: str, memories: List[Dict], user_id: str) -> str:

“””基于记忆生成响应的辅助函数”””

构建包含记忆上下文的提示

memory_context = “”

if memories:

memory_texts = [mem.get(“memory”, “”) for mem in memories[:3]] # 取前3个最相关的记忆

memory_context = f”相关记忆: {‘; ‘.join(memory_texts)}”

这里可以调用LLM生成响应

在实际项目中,这会调用Amazon Bedrock或其他LLM服务

response = f”基于记忆回复: {memory_context}\n用户问题: {user_input}”

return response

def get_mem0_instance():

“””获取Mem0实例的辅助函数”””

这里返回配置好的Mem0实例

在实际项目中,这会从配置中初始化Mem0

from mem0 import Memory

config = {

“version”: “v1.1”,

“llm”: {

“provider”: “aws_bedrock”,

“config”: {

“model”: “us.anthropic.claude-3-7-sonnet-[已去除电话]-v1:0”,

“max_tokens”: 1000,

“temperature”: 0.7

}

},

“vector_store”: {

“provider”: “pgvector”,

“config”: {

“host”: “aurora-serverless-cluster.cluster-xxx.us-east-1.rds.amazonaws.com”,

“port”: 5432,

“database”: “mem0_db”,

“user”: “mem0_user”,

“password”: “${AWS_SECRET_MANAGER_PASSWORD}”,

“embedding_model_dims”: 1024,

“collection_name”: “mem0_memories”,

“distance_metric”: “cosine”

}

}

}

return Memory.from_config(config)

进一步思考:分层记忆管理

基于认知科学的启发,我们可以基于Mem0实现分层记忆管理系统,模拟人类记忆的工作原理。系统将记忆按重要性分为工作记忆、短期记忆、长期记忆和核心记忆四个层级,实现智能的记忆资源管理。

分层记忆设计理念

  • 工作记忆层:存储当前对话的临时信息,具有最高访问速度但最短保存时间
  • 短期记忆层:保存近期的用户偏好和行为模式,为个性化服务提供基础数据
  • 长期记忆层:存储用户的稳定特征和重要历史信息,提供深度的用户理解能力
  • 核心记忆层:保存用户的身份信息和最关键特征,几乎永久保存
  • 分层记忆配置示例

memory_config = {

“importance_thresholds”: {

MemoryType.WORKING: 1.0, # 工作记忆重要性阈值

MemoryType.SHORT_TERM: 3.0, # 短期记忆重要性阈值

MemoryType.LONG_TERM: 5.0, # 长期记忆重要性阈值

MemoryType.CORE: 8.0 # 核心记忆重要性阈值

},

“decay_rates”: {

MemoryType.WORKING: 0.8, # 高衰减率:快速遗忘临时信息

MemoryType.SHORT_TERM: 0.3, # 中等衰减率:逐步淡化近期信息

MemoryType.LONG_TERM: 0.05, # 低衰减率:缓慢衰减长期信息

MemoryType.CORE: 0.01 # 极低衰减率:几乎永久保存

},

“max_age_hours”: {

MemoryType.WORKING: 24, # 工作记忆最大存活时间:1天

MemoryType.SHORT_TERM: 168, # 短期记忆最大存活时间:1周

MemoryType.LONG_TERM: 8760, # 长期记忆最大存活时间:1年

MemoryType.CORE: float(‘inf’) # 核心记忆:永久保存

},

“enable_automatic_promotion”: True, # 启用自动提升

“maintenance_interval_hours”: 6 # 维护间隔

}

记忆提升工作流程

智能提升机制通过以下步骤自动管理记忆层级:

  • 初始评估阶段:新记忆通过LLM进行重要性评分,低于1.0阈值的信息被直接丢弃,确保只有有价值的信息进入记忆系统
  • 工作记忆阶段:符合条件的记忆进入工作记忆层,系统开始监控访问频次和强化次数,为后续提升决策收集数据
  • 短期记忆提升:当访问次数达到3次且被强化2次以上,同时存在时间超过1小时时,系统自动将记忆提升至短期记忆层
  • 长期记忆提升:在短期记忆层停留24小时以上,且访问次数达到7次、重要性评分超过5.0时,记忆被提升至长期记忆层
  • 核心记忆提升:长期记忆中的高频访问内容(15次以上)且重要性评分达到8.0时,提升至核心记忆层,获得几乎永久的保存
  • 自然衰减机制:各层记忆根据不同的衰减率自然降低活跃度,长时间未访问的记忆会被自动清理,释放存储资源

整个提升过程完全自动化,无需人工干预。系统会根据访问频次、重要性评分、强化次数和时间因素综合判断,确保重要信息得到适当的保存层级,同时让临时信息自然衰减。

开发框架选择指南

  • 选择StrandsAgent:工具化集成方式,Agent自动决定记忆操作时机,适合快速开发和标准化场景
  • 选择LangGraph:工作流编排方式,开发者精确控制记忆处理流程,适合复杂业务逻辑和定制化需求
  • 直接API集成:完全自主控制,适合现有系统集成和特殊定制需求

通过这些开发框架和高级特性的支持,开发者可以根据具体需求选择最适合的集成方式,快速构建生产级的记忆增强型Agent应用。

监控管理层:可视化仪表板

为了有效管理和监控记忆系统,我们开发了基于Streamlit的可视化仪表板,展示如何在生产环境中操作和维护Mem0与PostgreSQL的集成。

功能特性

  • 实时监控:记忆总数、新增趋势、类型分布等关键指标
  • 数据库监控:Aurora PostgreSQL连接状态、性能指标和错误率
  • 智能搜索:支持语义搜索和多维度筛选分析
  • 开发支持:详细日志记录和错误处理机制
  • 核心技术实现

仪表板采用渐进式的数据获取策略,确保在不同数据状态和网络条件下都能正常工作:

核心数据获取实现 – 从Mem0读取记忆数据

@st.cache_data(ttl=30)

def get_user_memories(user_id: str) -> List[Dict[Any, Any]]:

“””初始化Mem0 对象”””

mem0 = init_mem0()

if not mem0 or not user_id.strip():

return []

try:

方法1:从Mem0检索记忆

search_result = mem0.search(“user”, user_id=user_id)

if isinstance(search_result, dict) and “results” in search_result:

memories = search_result[“results”]

else:

memories = search_result

if memories:

processed_memories = []

for memory in memories:

if isinstance(memory, dict):

processed_memories.append(memory)

elif hasattr(memory, ‘__dict__’):

processed_memories.append(memory.__dict__)

else:

标准化记忆对象

memory_dict = {

‘id’: getattr(memory, ‘id’, str(uuid.uuid4())),

‘memory’: str(memory),

‘user_id’: user_id,

‘created_at’: getattr(memory, ‘created_at’, datetime.now().isoformat()),

‘metadata’: getattr(memory, ‘metadata’, {})

}

processed_memories.append(memory_dict)

return processed_memories

方法2: 回退到全量数据获取

memories = mem0.get_all(user_id=user_id)

if isinstance(memories, dict) and ‘results’ in memories:

memories_list = memories[‘results’]

elif isinstance(memories, list):

memories_list = memories

else:

return []

processed_memories = []

for memory in memories_list:

if isinstance(memory, dict):

processed_memories.append(memory)

elif hasattr(memory, ‘__dict__’):

processed_memories.append(memory.__dict__)

return processed_memories

except Exception as e:

st.error(f”获取记忆数据时发生错误: {str(e)}”)

return []

*可视化界面展示*

如上图所示,仪表板提供了直观的记忆系统监控界面,包括记忆统计、搜索功能、数据筛选和详细的记忆内容展示。

业务价值体现

通过统一的监控界面,开发团队能够快速诊断问题、优化记忆策略、提升开发效率,为构建生产级记忆增强型AI系统提供完整的运维支撑。

架构总结

通过亚马逊云科技与Mem0的深度集成,我们构建了一个企业级记忆增强型AI解决方案:

  • 技术完整性:覆盖了从大模型调用、向量存储、图存储到应用开发的完整技术栈
  • 部署灵活性:支持多种模型提供商和存储后端,适应不同地区和场景的部署需求
  • 开发便利性:提供了工具调用和Agent集成的方式,满足不同复杂度的应用需求
  • 运维支撑:通过可视化仪表板和企业级安全特性,为生产环境提供必要的监控和管理能力

这种集成方案为开发者提供了构建记忆增强型AI应用的技术基础和实践参考。

Mem0使用效果示例

为了展示Mem0框架的智能记忆管理能力,我们通过一个企业内部知识分享的多轮对话场景,演示系统如何处理记忆演进和复杂关系推理。

第一次对话:建立基础关系

用户

“我和Bob是同一个项目组的,他是我们的技术负责人,在推荐系统方面很有经验。”

助手+系统响应

您好!我看到您提供了关于您的团队信息,我会将这些内容存储到记忆中,以便在未来的对话中更好地为您提供帮助。

Using Tool: mem0_memory

Tool Result (success)

感谢您分享关于您项目组的信息。我已记录下您与Bob在同一个项目组工作的情况,以及Bob作为技术负责人在推荐系统方面的丰富经验。这些信息对我们今后的交流很有帮助。

如果您有任何关于推荐系统或者与您项目相关的问题,或者需要其他方面的帮助,请随时告诉我。我会根据您提供的背景信息,尽可能地提供相关和个性化的支持。

向量数据库存储

第二次对话:扩展关系网络

用户

“Bob推荐我去找Carol学习机器学习,她是AI团队的专家,在深度学习方面特别厉害。”

助手+系统响应

我看到您提到Bob推荐您去找Carol学习机器学习,并且Carol是AI团队的专家,尤其在深度学习方面很厉害。让我先检查一下我是否有关于您的任何现有信息,以便为您提供更加个性化的帮助。

Using Tool: mem0_memory

Tool Result (success)

我看到您之前和Bob一起工作,他是项目的技术负责人,在推荐系统方面拥有丰富的经验。现在让我来为您存储关于Carol的新信息:

Using Tool: mem0_memory

Tool Result (success)

Using Tool: mem0_memory

Tool Result (success)

感谢您分享这些信息!我已经记录下了关于Carol的专业知识以及Bob的推荐。

根据我们之前的交流,我了解到您和Bob是同事,他是项目的技术负责人,在推荐系统方面有丰富经验。现在他推荐您向Carol学习机器学习,这是个很好的建议!

Carol作为AI团队的专家,特别是在深度学习领域有专长,确实是学习机器学习的理想人选。如果您有兴趣深入学习这个领域,找Carol指导是非常合适的。

您计划如何联系Carol呢?或者您对机器学习有什么特定的学习目标或问题想先了解一下?我很乐意帮您准备一些与Carol交流的关键问题或者提供一些机器学习的基础资源。

向量数据库存储

第三次对话:深化专业关联

用户

“Carol提到她和David合作过一个很成功的电商推荐系统项目,用的是深度学习技术。”

助手+系统响应

我注意到您提到了关于Carol和David合作的电商推荐系统项目的信息。为了更好地理解和保存这条重要信息,我需要先检查一下是否已经有关于Carol的记录,然后将这条新信息存储起来。

Using Tool: mem0_memory

Tool Result (success)

现在我看到了一些关于Carol的现有信息,我将存储这条新的关于Carol和David合作项目的信息:

Using Tool: mem0_memory

Tool Result (success)

感谢您分享这条信息。我已经记录了Carol和David合作开发的电商推荐系统项目的细节。

根据我已有的记忆,我了解到:

  • Carol是AI团队的专家,特别擅长深度学习领域
  • 现在我知道她与David合作开发了一个成功的电商推荐系统项目,应用了深度学习技术
  • 此前Bob曾推荐向Carol学习,特别是在机器学习方面
  • 这个项目信息很有价值。如果您想了解更多关于Carol的专业知识或者这个推荐系统项目的细节,或者您有任何其他信息想要分享,请随时告诉我。
  • 向量数据库存储

    第四次对话:查询,记忆存储价值体现

    用户

“我想找一个推荐系统方面的专家,最好是通过可信渠道了解到的,能给我一些实战经验。”

助手+系统响应

您好!我理解您正在寻找推荐系统方面的专家,希望获得一些实战经验。为了更好地为您服务,我想先了解一下您之前是否与我们讨论过相关话题或有任何推荐系统方面的偏好。

让我先查询一下您的历史记录,看看我们是否有相关的信息:

Using Tool: mem0_memory

Tool Result (success)

根据我查询到的信息,我发现您之前可能已经与一些推荐系统方面的专家有所联系或了解。基于我们的记录,我可以为您提供以下推荐:

  • Bob – 他是推荐系统方面的专家,并且是某项目的技术负责人。根据记录,您曾与Bob在同一个项目组中工作,他在推荐系统领域有丰富的经验。这是一个通过您的工作关系建立的可信渠道。
  • Carol – 她是一位AI团队的专家,特别在深度学习方面非常厉害。有趣的是,正是Bob之前推荐您向Carol学习。Carol与另一位同事David曾合作开发了一个成功的电商推荐系统项目,使用了深度学习技术。

鉴于您希望通过可信渠道了解到的专家,且需要实战经验,我建议您可以:

  • 优先联系Bob,他既是您的同事,也是推荐系统领域的专家,可以直接分享他的实战经验。
  • 请Bob引荐您认识Carol,她在深度学习推荐系统方面有成功的电商项目经验,特别是如果您对深度学习在推荐系统中的应用感兴趣。
  • 也可以请Carol介绍她与David合作的电商推荐系统项目的实战经验和关键技术点。

您希望我为您提供更多关于这些专家的详细信息,或者您对其他方面的推荐系统专家有特定要求吗?

无记忆系统的对比

记忆清除

登陆到向量数据库,将会话历史删除。

用户

“我想找一个推荐系统方面的专家,最好是通过可信渠道了解到的,能给我一些实战经验。”

助手+系统响应

您好!我很乐意帮您寻找推荐系统方面的专家。为了更好地为您提供个性化服务,我需要先了解一下您之前是否已经与我讨论过相关的需求或提供过一些信息。让我先查询一下您的历史记录。

Using Tool: mem0_memory

Tool Result (success)

看起来我们之前没有存储关于您的信息。为了能够更好地帮助您,我想了解一些更具体的信息,这样我可以将其保存下来,以便今后为您提供更个性化的服务。

关于您寻找推荐系统专家的需求,我想了解几点:

  • 您对推荐系统的哪些方面特别感兴趣?(例如:电商推荐、内容推荐、算法实现等)
  • 您希望这位专家能提供什么类型的实战经验?(例如:系统搭建、算法优化、A/B测试等)
  • 您计划通过什么方式与专家交流?(例如:咨询、合作、指导等)

我会将您提供的这些信息保存下来,以便今后能够更精准地为您服务。同时,我可以根据您的需求,推荐一些可以寻找推荐系统专家的可信渠道,如专业社区、行业会议、学术机构等。

核心价值对比

通过上述多轮对话演示,我们可以清晰地看到记忆系统与传统无记忆系统的核心差异:

|对比维度

|有记忆系统

|无记忆系统

|专家推荐

|基于已知关系推荐Bob、Carol、David

|无法提供具体推荐,需要用户自行寻找

|关系分析

|提供完整的关系链:Bob→Carol→David

|缺乏人际关系和背景关联信息

|上下文理解

|记住用户背景和历史交互内容

|每次对话都需要重新了解需求

|交互效率

|一次查询获得完整解决方案

|需要多轮问答收集基础信息

总结和展望

Mem0框架标志着Agentic AI应用从无状态向有状态的重要转变。通过双LLM架构和智能记忆管理机制,解决了传统AI系统的记忆缺失问题,实现了真正的个性化和上下文连续性。

Mem0框架与亚马逊云科技的深度集成真正体现了”相得益彰”的协同效应——Mem0提供智能记忆管理核心能力,亚马逊云科技提供企业级基础设施支撑。这种结合实现了能力倍增,推动记忆增强型AI从概念验证走向生产实践。该方案不仅可在亚马逊云科技海外区域部署,同样也适用于北京和宁夏区域。

记忆增强型Agent在企业客户服务、个性化教育、知识管理等领域展现出巨大潜力。随着多模态AI技术发展,未来的记忆系统将支持更丰富的数据类型和更强的自主学习能力。记忆增强型Agentic AI将推动人工智能从工具向伙伴的根本性转变,为用户创造更加智能、个性化的数字体验。

*前述特定亚马逊云科技生成式人工智能相关的服务目前在亚马逊云科技海外区域可用。亚马逊云科技中国区域相关云服务由西云数据和光环新网运营,具体信息以中国区域官网为准。


AWS HealthOmics 现支持为 Nextflow 工作流设置任务级别超时控制

AWS账单代付阅读(25)

AWS HealthOmics 现支持为 Nextflow 工作流设置任务级别超时控制

发布于: 2025年8月28日

AWS HealthOmics 推出了对 Nextflow 时间指令的支持,客户可通过该指令设置任务级别的超时控制,以限制特定任务的运行时长。通过此次发布,客户现在可以为其 Nextflow 工作流任务设置精细控制,当特定任务耗时超出预期时,它们将自动取消运行。AWS HealthOmics 是一项符合 HIPAA 标准的服务,能够通过完全托管的生物数据存储和工作流程,帮助医疗保健与生命科学客户加速取得科学突破。

现在,以下提供 AWS HealthOmics 的所有区域都支持 Nextflow 时间指令:美国东部(弗吉尼亚州北部)、美国西部(俄勒冈州)、欧洲地区(法兰克福、爱尔兰、伦敦)、亚太地区(新加坡)和以色列(特拉维夫)。

要了解有关工作流和 Nextflow 时间指令支持的更多信息,请参阅 AWS HealthOmics 文档。


Amazon Q 开发者版现支持 MCP 管理员控制

AWS账单代付阅读(24)

Amazon Q 开发者版现支持 MCP 管理员控制

发布于: 2025年8月28日

现在,管理员可以在 AWS 管理控制台中直接管理安装在 Amazon Q 开发者版客户端应用程序中的模型上下文协议(MCP)服务器。这为组织提供了安全有效地管理外部资源所需的精细控制。

此次发布后,管理员能够为组织内所有 Q 开发者版客户端启用或禁用 MCP 功能。这意味着如果管理员禁用该功能,用户将无法添加任何 MCP 服务器,也无法初始化先前定义的服务器。Q 开发者版会在每次会话开始时检查并应用管理员设置,并在客户端运行期间每 24 小时执行一次此操作。

Amazon Q 开发者版的 MCP 管理员控制支持涵盖 Amazon Q 开发者版命令行界面(CLI)及所有 Amazon Q 开发者版插件:VSCode、JetBrains、Visual Studio 和 Eclipse。要了解更多信息,请参阅文档。


AWS免费套餐到期后如何省钱:2025年最全面的成本优化毕业指南

AWS账单代付阅读(33)

引言:AWS免费套餐到期的”毕业焦虑”

对于许多AWS用户来说,12个月免费套餐的结束意味着真正的挑战开始。突然间,原本免费的服务开始产生账单,月度成本可能从$0飙升至$100甚至更多。本文作为您的AWS”毕业指南”,将帮助您在AWS免费套餐到期后如何省钱,实现平稳过渡到付费阶段。

AWS免费套餐到期影响全面分析

免费套餐到期后的成本变化

服务类型 免费套餐额度 到期后费用 典型使用成本/月
EC2 (t2.micro) 750小时/月 $0.0116/小时 $8.35
RDS (db.t2.micro) 750小时/月 $0.017/小时 $12.24
S3存储 5GB $0.023/GB $2.30 (100GB)
EBS存储 30GB $0.10/GB $10 (100GB)
数据传输 15GB出站 $0.09/GB $9 (100GB)
Lambda 100万请求 $0.20/百万请求 $2-5
CloudWatch 10个指标 $0.30/指标 $3-10

预计总成本:$50-100/月(基础使用)

AWS免费套餐到期后如何省钱:心理准备

免费套餐到期三个阶段:
1. 震惊期(第1个月):"怎么突然这么贵?"
2. 适应期(第2-3个月):"原来可以这样优化"
3. 成熟期(第4个月后):"成本在控制中"

立即执行:AWS免费套餐到期后紧急省钱措施

第一步:24小时内必做清单

优先级 行动项 预期节省 操作难度
紧急 停止所有非生产实例 30-50% 简单
紧急 删除未使用的资源 20-30% 简单
设置预算警报 防止超支 简单
降级开发环境配置 20-40% 中等
实施自动关机策略 30-65% 中等

紧急资源清理脚本

import boto3
from datetime import datetime, timedelta

class PostFreeTierOptimizer:
    """AWS免费套餐到期后如何省钱 - 紧急优化器"""

    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.rds = boto3.client('rds')
        self.s3 = boto3.client('s3')

    def emergency_cleanup(self):
        """紧急清理未使用资源"""

        total_savings = 0

        # 1. 停止闲置EC2实例
        instances = self.ec2.describe_instances(
            Filters=[
                {'Name': 'instance-state-name', 'Values': ['running']}
            ]
        )

        for reservation in instances['Reservations']:
            for instance in reservation['Instances']:
                # 检查CPU利用率(需要CloudWatch)
                if self.is_instance_idle(instance['InstanceId']):
                    self.ec2.stop_instances(InstanceIds=[instance['InstanceId']])
                    print(f"Stopped idle instance: {instance['InstanceId']}")
                    total_savings += 8.35  # t2.micro月度成本

        # 2. 删除未挂载的EBS卷
        volumes = self.ec2.describe_volumes(
            Filters=[
                {'Name': 'status', 'Values': ['available']}
            ]
        )

        for volume in volumes['Volumes']:
            self.ec2.delete_volume(VolumeId=volume['VolumeId'])
            print(f"Deleted unattached volume: {volume['VolumeId']}")
            total_savings += volume['Size'] * 0.10

        # 3. 清理旧快照
        snapshots = self.ec2.describe_snapshots(OwnerIds=['self'])
        cutoff_date = datetime.now() - timedelta(days=30)

        for snapshot in snapshots['Snapshots']:
            if snapshot['StartTime'].replace(tzinfo=None) < cutoff_date:
                self.ec2.delete_snapshot(SnapshotId=snapshot['SnapshotId'])
                print(f"Deleted old snapshot: {snapshot['SnapshotId']}")
                total_savings += 0.05 * snapshot.get('VolumeSize', 0)

        # 4. 释放未使用的弹性IP
        addresses = self.ec2.describe_addresses()

        for address in addresses['Addresses']:
            if 'InstanceId' not in address:
                self.ec2.release_address(AllocationId=address['AllocationId'])
                print(f"Released unused EIP: {address['PublicIp']}")
                total_savings += 3.60

        print(f"n紧急清理完成!预计月度节省: ${total_savings:.2f}")
        return total_savings

    def is_instance_idle(self, instance_id, threshold=5.0):
        """检查实例是否闲置(CPU < 5%)"""
        cw = boto3.client('cloudwatch')

        response = cw.get_metric_statistics(
            Namespace='AWS/EC2',
            MetricName='CPUUtilization',
            Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
            StartTime=datetime.now() - timedelta(hours=1),
            EndTime=datetime.now(),
            Period=3600,
            Statistics=['Average']
        )

        if response['Datapoints']:
            avg_cpu = response['Datapoints'][0]['Average']
            return avg_cpu < threshold
        return False

# 执行紧急优化
optimizer = PostFreeTierOptimizer()
optimizer.emergency_cleanup()

AWS免费套餐到期后的长期省钱策略

策略一:智能实例管理

实例优化决策矩阵:

当前实例类型 使用模式 推荐方案 预期节省
t2.micro 24/7 持续低负载 t4g.micro (ARM) 20%
t2.micro 24/7 间歇高负载 t3.micro + 积分 15%
t2.small+ 开发环境 定时开关 65%
任意类型 可中断任务 Spot实例 70-90%
多个小实例 微服务 ECS Fargate 30-40%

自动化实例管理脚本:

def setup_instance_scheduler():
    """AWS免费套餐到期后如何省钱 - 实例调度器"""

    import json

    # Lambda函数代码
    lambda_code = '''
import boto3
from datetime import datetime

def lambda_handler(event, context):
    ec2 = boto3.client('ec2')
    current_hour = datetime.now().hour

    # 工作时间: 9 AM - 6 PM
    if 9 <= current_hour < 18:
        # 启动开发/测试实例
        instances = ec2.describe_instances(
            Filters=[
                {'Name': 'tag:Environment', 'Values': ['Development', 'Testing']},
                {'Name': 'instance-state-name', 'Values': ['stopped']}
            ]
        )

        instance_ids = []
        for r in instances['Reservations']:
            for i in r['Instances']:
                instance_ids.append(i['InstanceId'])

        if instance_ids:
            ec2.start_instances(InstanceIds=instance_ids)
            print(f"Started {len(instance_ids)} instances")
    else:
        # 停止开发/测试实例
        instances = ec2.describe_instances(
            Filters=[
                {'Name': 'tag:Environment', 'Values': ['Development', 'Testing']},
                {'Name': 'instance-state-name', 'Values': ['running']}
            ]
        )

        instance_ids = []
        for r in instances['Reservations']:
            for i in r['Instances']:
                instance_ids.append(i['InstanceId'])

        if instance_ids:
            ec2.stop_instances(InstanceIds=instance_ids)
            print(f"Stopped {len(instance_ids)} instances")

    return {'statusCode': 200}
    '''

    # 创建EventBridge规则(每小时运行)
    events = boto3.client('events')

    rule_response = events.put_rule(
        Name='InstanceScheduler',
        ScheduleExpression='rate(1 hour)',
        State='ENABLED',
        Description='AWS免费套餐到期后省钱 - 自动开关实例'
    )

    print("实例调度器设置完成")
    print("预计节省: 65% 的开发/测试环境成本")

策略二:存储优化方案

S3智能分层配置:

def optimize_s3_storage():
    """AWS免费套餐到期后如何省钱 - S3存储优化"""

    s3 = boto3.client('s3')

    # 获取所有桶
    buckets = s3.list_buckets()

    for bucket in buckets['Buckets']:
        bucket_name = bucket['Name']

        # 配置智能分层
        lifecycle_policy = {
            'Rules': [
                {
                    'Id': 'IntelligentTiering',
                    'Status': 'Enabled',
                    'Transitions': [
                        {
                            'Days': 0,
                            'StorageClass': 'INTELLIGENT_TIERING'
                        }
                    ]
                },
                {
                    'Id': 'ArchiveOldData',
                    'Status': 'Enabled',
                    'Transitions': [
                        {
                            'Days': 90,
                            'StorageClass': 'GLACIER_IR'
                        },
                        {
                            'Days': 180,
                            'StorageClass': 'DEEP_ARCHIVE'
                        }
                    ]
                },
                {
                    'Id': 'DeleteOldData',
                    'Status': 'Enabled',
                    'Expiration': {
                        'Days': 365
                    },
                    'NoncurrentVersionExpiration': {
                        'NoncurrentDays': 30
                    }
                }
            ]
        }

        try:
            s3.put_bucket_lifecycle_configuration(
                Bucket=bucket_name,
                LifecycleConfiguration=lifecycle_policy
            )
            print(f"优化 {bucket_name} 存储策略")
        except Exception as e:
            print(f"Error optimizing {bucket_name}: {e}")

    print("nS3存储优化完成")
    print("预计节省: 40-60% 的存储成本")

策略三:数据库优化

RDS成本优化方案:

优化方法 适用场景 实施步骤 节省比例
Aurora Serverless v2 不规则负载 迁移到Serverless 40-80%
降级实例规格 过度配置 监控后降级 30-50%
单AZ部署 开发环境 禁用Multi-AZ 50%
预留实例 稳定负载 购买1年RI 30-40%
定时停止 非生产环境 夜间/周末停止 60-70%
def optimize_rds_costs():
    """AWS免费套餐到期后如何省钱 - RDS优化"""

    rds = boto3.client('rds')

    # 获取所有数据库实例
    db_instances = rds.describe_db_instances()

    recommendations = []
    total_savings = 0

    for db in db_instances['DBInstances']:
        db_id = db['DBInstanceIdentifier']
        instance_class = db['DBInstanceClass']
        multi_az = db['MultiAZ']
        engine = db['Engine']

        # 分析并提供建议
        if 'prod' not in db_id.lower() and multi_az:
            recommendations.append({
                'DB': db_id,
                'Action': 'Disable Multi-AZ',
                'Savings': '$12/month'
            })
            total_savings += 12

        if instance_class == 'db.t2.micro' and engine in ['mysql', 'postgres']:
            recommendations.append({
                'DB': db_id,
                'Action': 'Consider Aurora Serverless',
                'Savings': '$8/month'
            })
            total_savings += 8

        # 检查是否可以降级
        if 'dev' in db_id.lower() or 'test' in db_id.lower():
            if instance_class != 'db.t2.micro':
                recommendations.append({
                    'DB': db_id,
                    'Action': f'Downgrade from {instance_class} to db.t3.micro',
                    'Savings': '$20/month'
                })
                total_savings += 20

    # 打印建议
    print("RDS优化建议:")
    for rec in recommendations:
        print(f"- {rec['DB']}: {rec['Action']} (节省 {rec['Savings']})")

    print(f"n总计月度节省: ${total_savings}")

    return recommendations

AWS免费套餐到期后的成本监控体系

建立三层监控体系

def setup_cost_monitoring():
    """AWS免费套餐到期后如何省钱 - 成本监控"""

    cw = boto3.client('cloudwatch')
    sns = boto3.client('sns')

    # 创建SNS主题
    topic = sns.create_topic(Name='AWSCostAlerts')
    topic_arn = topic['TopicArn']

    # 订阅邮件通知
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='email',
        Endpoint='your-email@example.com'
    )

    # 层级1:每日支出警报(超过$5)
    cw.put_metric_alarm(
        AlarmName='DailySpendAlert',
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=1,
        MetricName='EstimatedCharges',
        Namespace='AWS/Billing',
        Period=86400,
        Statistic='Maximum',
        Threshold=5.0,
        ActionsEnabled=True,
        AlarmActions=[topic_arn],
        AlarmDescription='Alert when daily spend exceeds $5',
        Dimensions=[
            {'Name': 'Currency', 'Value': 'USD'}
        ]
    )

    # 层级2:月度预算警报(80%时)
    budgets = boto3.client('budgets')

    budgets.create_budget(
        AccountId=boto3.client('sts').get_caller_identity()['Account'],
        Budget={
            'BudgetName': 'MonthlyBudget',
            'BudgetLimit': {
                'Amount': '50',
                'Unit': 'USD'
            },
            'TimeUnit': 'MONTHLY',
            'BudgetType': 'COST'
        },
        NotificationsWithSubscribers=[
            {
                'Notification': {
                    'NotificationType': 'ACTUAL',
                    'ComparisonOperator': 'GREATER_THAN',
                    'Threshold': 80,
                    'ThresholdType': 'PERCENTAGE'
                },
                'Subscribers': [
                    {
                        'SubscriptionType': 'EMAIL',
                        'Address': 'your-email@example.com'
                    }
                ]
            }
        ]
    )

    # 层级3:异常检测
    ce = boto3.client('ce')

    ce.create_anomaly_detector(
        AnomalyDetector={
            'DimensionValues': ['SERVICE'],
            'MonitorDimension': 'SERVICE'
        }
    )

    print("成本监控体系建立完成")
    print("- 每日支出警报: >$5")
    print("- 月度预算警报: >80% ($40)")
    print("- 异常检测: 自动识别异常支出")

AWS免费套餐到期后的架构转型

向Serverless架构迁移

传统架构 vs Serverless成本对比:

组件 传统架构 月成本 Serverless替代 月成本 节省
Web服务器 EC2 t2.small $16.79 Lambda + API Gateway $5 70%
数据库 RDS t2.micro $12.41 DynamoDB On-Demand $3 76%
队列 EC2 + RabbitMQ $8.35 SQS $0.50 94%
缓存 ElastiCache $12.40 DynamoDB DAX $2 84%
文件存储 EBS 100GB $10 S3 $2.30 77%
总计 $59.95 $12.80 79%

Serverless迁移实施指南

def migrate_to_serverless():
    """AWS免费套餐到期后如何省钱 - Serverless迁移"""

    # Lambda函数示例(替代EC2 Web服务器)
    lambda_function = '''
import json
import boto3

def lambda_handler(event, context):
    # 处理HTTP请求
    method = event['httpMethod']
    path = event['path']

    if method == 'GET' and path == '/users':
        # 从DynamoDB获取数据
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('Users')

        response = table.scan()

        return {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json'
            },
            'body': json.dumps(response['Items'])
        }

    return {
        'statusCode': 404,
        'body': json.dumps({'error': 'Not found'})
    }
    '''

    # API Gateway配置
    api_config = {
        'swagger': '2.0',
        'info': {
            'title': 'Serverless API',
            'version': '1.0.0'
        },
        'paths': {
            '/users': {
                'get': {
                    'x-amazon-apigateway-integration': {
                        'uri': 'arn:aws:apigateway:region:lambda:path/functions/arn/invocations',
                        'passthroughBehavior': 'when_no_match',
                        'httpMethod': 'POST',
                        'type': 'aws_proxy'
                    }
                }
            }
        }
    }

    # DynamoDB表配置(按需付费)
    dynamodb_config = {
        'TableName': 'Users',
        'BillingMode': 'PAY_PER_REQUEST',  # 按需付费
        'AttributeDefinitions': [
            {'AttributeName': 'userId', 'AttributeType': 'S'}
        ],
        'KeySchema': [
            {'AttributeName': 'userId', 'KeyType': 'HASH'}
        ]
    }

    print("Serverless架构配置完成")
    print("预计成本: $12.80/月 (vs 传统架构 $59.95/月)")
    print("节省: 79% 或 $47.15/月")

购买折扣计划:最大化长期节省

AWS折扣选项对比

折扣类型 承诺期限 平均折扣 适用服务 灵活性
Savings Plans 1年或3年 30-72% EC2/Lambda/Fargate
Reserved Instances 1年或3年 30-75% EC2/RDS/ElastiCache
Spot Instances 70-90% EC2
预付积分 5-10% 所有服务

折扣计划决策框架

def calculate_discount_options():
    """AWS免费套餐到期后如何省钱 - 折扣计算器"""

    # 假设当前月度支出
    current_monthly_spend = {
        'EC2': 30,
        'RDS': 20,
        'Lambda': 10,
        'Other': 10
    }

    total_monthly = sum(current_monthly_spend.values())

    # 计算不同折扣方案的节省
    options = []

    # 选项1:Compute Savings Plan (1年,全预付)
    compute_sp_discount = 0.30  # 30%折扣
    compute_spend = current_monthly_spend['EC2'] + current_monthly_spend['Lambda']
    compute_savings = compute_spend * compute_sp_discount * 12

    options.append({
        'Plan': 'Compute Savings Plan (1年)',
        'Upfront': compute_spend * 12 * (1 - compute_sp_discount),
        'Annual Savings': compute_savings,
        'ROI': f'{compute_sp_discount * 100:.0f}%'
    })

    # 选项2:EC2 Reserved Instance (1年,无预付)
    ec2_ri_discount = 0.40  # 40%折扣
    ec2_savings = current_monthly_spend['EC2'] * ec2_ri_discount * 12

    options.append({
        'Plan': 'EC2 Reserved Instance (1年)',
        'Upfront': 0,
        'Annual Savings': ec2_savings,
        'ROI': f'{ec2_ri_discount * 100:.0f}%'
    })

    # 选项3:混合策略
    hybrid_savings = (
        current_monthly_spend['EC2'] * 0.40 +  # RI for EC2
        current_monthly_spend['Lambda'] * 0.30 +  # SP for Lambda
        current_monthly_spend['RDS'] * 0.35  # RI for RDS
    ) * 12

    options.append({
        'Plan': '混合策略',
        'Upfront': 'Variable',
        'Annual Savings': hybrid_savings,
        'ROI': f'{(hybrid_savings / (total_monthly * 12)) * 100:.0f}%'
    })

    # 打印分析结果
    print("=" * 60)
    print("AWS免费套餐到期后省钱方案分析")
    print("=" * 60)
    print(f"当前月度支出: ${total_monthly}")
    print(f"年度支出(无优化): ${total_monthly * 12}")
    print("n折扣方案对比:")

    for option in options:
        print(f"n{option['Plan']}:")
        print(f"  预付: ${option['Upfront']}")
        print(f"  年度节省: ${option['Annual Savings']:.2f}")
        print(f"  投资回报率: {option['ROI']}")

    return options

实战案例:从$200到$50的优化之旅

真实案例分析

初创公司优化案例:

背景:
- SaaS产品,10,000活跃用户
- AWS免费套餐刚到期
- 首月账单:$198

优化前架构:
- 2个 t2.small EC2 (Web服务器): $33.58
- 1个 t2.small EC2 (应用服务器): $16.79
- RDS t2.small (MySQL): $24.82
- 100GB EBS: $10
- 200GB S3: $4.60
- 数据传输 500GB: $45
- ElastiCache t2.micro: $12.41
- 2个 ELB: $36
- 其他: $14.80
总计: $198/月

优化措施实施:
优化步骤 具体操作 成本变化 累计节省
Week 1 停止开发环境夜间运行 -$25 $25
Week 2 合并Web服务器,使用ALB -$20 $45
Week 3 RDS改为Aurora Serverless -$15 $60
Week 4 S3智能分层 -$2 $62
Month 2 迁移静态资源到CloudFront -$30 $92
Month 2 实施Spot实例 -$18 $110
Month 3 购买Savings Plan -$38 $148

最终结果:$50/月(节省75%)

工具和自动化脚本集合

一键优化脚本

class AWSGraduationOptimizer:
    """AWS免费套餐到期后如何省钱 - 综合优化器"""

    def __init__(self):
        self.session = boto3.Session()
        self.account_id = self.session.client('sts').get_caller_identity()['Account']

    def run_full_optimization(self):
        """运行完整优化流程"""

        print("=" * 60)
        print("AWS免费套餐毕业优化程序")
        print("=" * 60)

        steps = [
            ("分析当前成本", self.analyze_costs),
            ("清理未使用资源", self.cleanup_unused),
            ("优化实例类型", self.optimize_instances),
            ("配置自动调度", self.setup_scheduling),
            ("优化存储", self.optimize_storage),
            ("设置监控告警", self.setup_monitoring),
            ("生成优化报告", self.generate_report)
        ]

        total_savings = 0

        for step_name, step_function in steps:
            print(f"n执行: {step_name}")
            print("-" * 40)

            try:
                savings = step_function()
                total_savings += savings
                print(f"✓ 完成 - 节省: ${savings:.2f}/月")
            except Exception as e:
                print(f"✗ 错误: {e}")

        print("n" + "=" * 60)
        print(f"优化完成!")
        print(f"预计月度节省: ${total_savings:.2f}")
        print(f"年度节省: ${total_savings * 12:.2f}")
        print("=" * 60)

        return total_savings

    def analyze_costs(self):
        """分析成本构成"""
        ce = self.session.client('ce')

        response = ce.get_cost_and_usage(
            TimePeriod={
                'Start': (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d'),
                'End': datetime.now().strftime('%Y-%m-%d')
            },
            Granularity='MONTHLY',
            Metrics=['UnblendedCost'],
            GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}]
        )

        print("过去30天成本分析:")
        for result in response['ResultsByTime']:
            for group in result['Groups']:
                service = group['Keys'][0]
                cost = float(group['Metrics']['UnblendedCost']['Amount'])
                if cost > 0.01:
                    print(f"  {service}: ${cost:.2f}")

        return 0  # 分析不产生节省

    def cleanup_unused(self):
        """清理未使用资源"""
        # 实现清理逻辑
        return 25.0  # 返回预计节省

    def optimize_instances(self):
        """优化实例配置"""
        # 实现优化逻辑
        return 30.0

    def setup_scheduling(self):
        """设置自动调度"""
        # 实现调度逻辑
        return 40.0

    def optimize_storage(self):
        """优化存储配置"""
        # 实现存储优化
        return 15.0

    def setup_monitoring(self):
        """设置成本监控"""
        # 实现监控设置
        return 0

    def generate_report(self):
        """生成优化报告"""
        report = f"""
AWS免费套餐毕业优化报告
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

优化建议优先级:
1. 立即执行:清理未使用资源
2. 本周完成:实例类型优化和调度设置
3. 本月完成:存储优化和折扣计划评估
4. 持续执行:成本监控和定期审查

下一步行动:
- 每周审查成本报告
- 每月优化一次资源配置
- 每季度评估折扣计划
        """

        with open('optimization_report.txt', 'w') as f:
            f.write(report)

        print("报告已生成: optimization_report.txt")
        return 0

# 运行优化器
if __name__ == "__main__":
    optimizer = AWSGraduationOptimizer()
    total_savings = optimizer.run_full_optimization()

总结:AWS免费套餐到期后的省钱智慧

核心省钱原则

原则 具体做法 预期效果
只为使用付费 Serverless优先、按需计费 节省60-80%
自动化一切 调度、监控、优化全自动 节省40-60%
持续优化 每周审查、每月调整 持续改进
合理投资 评估并购买折扣计划 长期节省30-70%

毕业后的成长路径

月度预算进化路径:
第1个月: $100+ (初始冲击)
第2个月: $70 (基础优化)
第3个月: $50 (深度优化)
第6个月: $40 (折扣计划)
第12个月: $30 (成熟优化)

最终建议

AWS免费套餐到期后如何省钱的关键在于:

  1. 心态调整:从”免费”到”优化”的思维转变
  2. 快速行动:第一时间清理和优化
  3. 系统方法:建立成本优化体系
  4. 持续改进:定期审查和调整
  5. 专业支持:必要时寻求专业帮助

记住,AWS免费套餐的结束不是结束,而是您云计算旅程真正专业化的开始。通过本指南的策略和工具,您完全可以将成本控制在合理范围内,同时享受AWS强大的云服务能力。


AWS代付、代充值免实名

联系我们阿里云国际免实名