定义
$externalFunction
阶段在特定的 AWS Lambda 资源中会触发进程。您向 AWS Lambda 进程发出的请求可以是同步的,也可以是异步的。
创建 AWS Lambda 并使用统一的 AWS 访问进行身份验证
为了从 Atlas Stream Processing 管道中调用 AWS Lambda 资源,您的 AWS Lambda 必须部署在同一个部署 Atlas Stream Processing 的 AWS 区域。如要了解有关部署 AWS Lambda 资源的更多信息,请参阅 AWS 文档。
创建Amazon Web Services Lambda函数。
使用Amazon Web Services CLI或通过Amazon Web Services用户界面,创建Lambda函数。
配置统一的 AWS 访问权限。
注意
此处描述的过程仅涵盖 Atlas 用户界面中的基本设置流程。如要了解更多信息,请参阅设置统一的 AWS 访问权限文档。
必需的访问权限
如要设置统一的 AWS 访问权限,您必须对项目拥有 Organization Owner
或 Project Owner
访问权限。
先决条件
一个 Atlas 帐户。
注意
您的 AWS IAM 策略必须包含
lambda:InvokeFunction
操作。您必须将占位符
ExternalId
和Resource
替换为您自己的值,这些值可通过统一的 AWS访问权限的配置过程获得。请注意,此示例中的ExternalId
包含一个通配符,该通配符匹配名称以function-
开头的任何 Lambda 函数。
在 Atlas 用户界面中为现有角色添加信任关系
接下来,您必须启用自管理的 AWS IAM 角色以执行您的 AWS Lambda 资源。
permission-policy.json
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": "arn:aws:lambda:us-east-1:257394458927:function:<function-name>" } ] }
导航到 Atlas 项目中的 AWS IAM 集成页面,然后点击 Authorize an AWS IAM role 按钮。
使用模态中显示的
role-trust-policy.json
创建一个新角色(或修改现有角色)。一旦创建角色(或使用新的信任策略更新现有角色),请将角色的 ARN 粘贴到模态窗口中。
在 AWS 控制台中,转到 IAM > Roles,然后选择您的角色。
在 permissions 标签页中,添加新的“内联权限”以允许此角色调用您的 lambda 函数。上面提供的示例
permission-policy.json
增加了运行任何名称为<function-name>
的 lambda 的权限。最后,导航到您的 Atlas Stream Processing 实例,添加一个新的 AWS Lambda connection,然后选择您在上一步中配置的 AWS IAM Role ARN。
将您的 Atlas Stream Processing 实例连接到您的 AWS Lambda 函数
为了从您的 Atlas Stream Processing 管道向您的 AWS Lambda 资源发送请求,您必须首先将您的 AWS Lambda 资源作为连接添加到您的 Atlas Stream Processing 资源中。
您可以通过Atlas用户界面、 Atlas CLI或Atlas API将Amazon Web Services Lambda资源添加为连接,如以下示例所示。 您可以使用Amazon Web Services IAM 配置中的 arn
更新示例中的 roleArn
占位符。
curl --user "username:password" --digest \ --header "Content-Type: application/json" \ --header "Accept: application/vnd.atlas.2023-02-01+json" \ --include \ --data '{"name": "TestAWSLambdaConnection","type": "AWSLambda","aws": {"roleArn": "arn:aws:iam::<aws_account>:role/<role_name>"}}' \ --request POST "https://cloud.mongodb.com/api/atlas/v2/groups/<group_id>/streams/<tenant_name>/connections"
语法
最小化请求
以下示例展示了最小请求所需的必填字段。
{ $externalFunction: { connectionName: "myLambdaConnection", functionName: "arn:aws:lambda:region:account-id:function:function-name", as: "response", }}
自定义请求
以下自定义示例除了上面说明的必填字段外,还指定错误处理、同步执行以及预处理的 Atlas Stream Processing 文档作为负载。
{ $externalFunction: { connectionName: "myLambdaConnection", functionName: "arn:aws:lambda:region:account-id:function:function-name", execution: "sync" as: "response", onError: "fail", payload: [{$replaceRoot: { newRoot: "$fullDocument.payloadToSend" } }, { $addFields: { sum: { $sum: "$randomArray" }}}, { $project: { success: 1, sum: 1 }}], }}
注意
onError
字段定义了 API 级别错误的行为,适用于对您的 AWS Lambda 资源的同步和异步请求,以及同步请求的 AWS Lambda 函数错误。
$externalFunction
阶段采用包含以下字段的文档:
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
| 字符串 | 必需 | 该标签标识请求发送到的连接注册表中的连接。 |
| 字符串 | 必需 | 完整的 AWS ARN 或要触发的 AWS Lambda 函数名称。 |
| 枚举 | Optional | 参数,用于指定应同步调用还是异步调用Amazon Web Services Lambda函数。 接受的值为:
默认为 |
| 字符串 | Optional | REST API 响应的对应字段的名称。 如果此终结点返回 0 个字节,该操作符则不会设置 |
| 字符串 | Optional | 当此操作符遇到
默认值为 |
| 阵列 | Optional | 自定义内部管道,允许您自定义发送到 API 终结点的请求正文。
|
行为
$externalFunction
阶段向指定的Amazon Web Services Lambda资源发送请求。如果请求是同步请求,则在指定字段中返回响应,并且管道继续处理。如果请求是异步请求,管道会继续处理,而无需等待响应。
如果同步请求失败,则管道的错误处理行为由 onError
字段决定。如果请求是异步请求,则 onError
字段仅适用于Amazon Web Services API错误,而不适用于Amazon Web Services Lambda函数错误。 如果未指定 onError
字段,默认行为是将受影响的文档发送到 死信队列(DLQ)。
onError 值 | 行为 |
---|---|
| 受影响的文档被发送到死信队列(DLQ)。 |
| 受影响的文档将被忽略。 |
| 流处理器因出错而终止。 |
如果管道无法将文档转换为正确的JSON,或者配置的管道未创建有效的BSON对象作为最后阶段的产品,则无论 onError
设置如何,文档都会被发送到死信队列。
由于 $externalFunction
操作符本身配置不正确而引起的错误(例如无效表达式)不会触发onError
行为。 相反,流处理器会失败并显示错误消息。Atlas Stream Processing管道会重试可能由于暂时性错误而导致的失败请求。