Docs 菜单
Docs 主页
/
Atlas
/ /

$externalFunction

$externalFunction

$externalFunction 阶段在特定的 AWS Lambda 资源中会触发进程。您向 AWS Lambda 进程发出的请求可以是同步的,也可以是异步的。

为了从 Atlas Stream Processing 管道中调用 AWS Lambda 资源,您的 AWS Lambda 必须部署在同一个部署 Atlas Stream Processing 的 AWS 区域。如要了解有关部署 AWS Lambda 资源的更多信息,请参阅 AWS 文档。

1

使用Amazon Web Services CLI或通过Amazon Web Services用户界面,创建Lambda函数。

注意

  • AWS Lambda 资源必须使用“AWS_IAM”身份验证类型进行创建,这要求您创建一个 AWS IAM 角色策略

  • 仅支持缓冲响应类型,不支持流媒体响应类型。

2

注意

此处描述的过程仅涵盖 Atlas 用户界面中的基本设置流程。如要了解更多信息,请参阅设置统一的 AWS 访问权限文档。

必需的访问权限

如要设置统一的 AWS 访问权限,您必须对项目拥有 Organization OwnerProject Owner 访问权限。

先决条件

注意

  • 您的 AWS IAM 策略必须包含 lambda:InvokeFunction 操作。

  • 您必须将占位符 ExternalIdResource 替换为您自己的值,这些值可通过统一的 AWS访问权限的配置过程获得。请注意,此示例中的 ExternalId 包含一个通配符,该通配符匹配名称以 function- 开头的任何 Lambda 函数。

3

接下来,您必须启用自管理的 AWS IAM 角色以执行您的 AWS Lambda 资源。

permission-policy.json

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": "arn:aws:lambda:us-east-1:257394458927:function:<function-name>"
}
]
}
  1. 导航到 Atlas 项目中的 AWS IAM 集成页面,然后点击 Authorize an AWS IAM role 按钮。

  2. 使用模态中显示的 role-trust-policy.json 创建一个新角色(或修改现有角色)。

  3. 一旦创建角色(或使用新的信任策略更新现有角色),请将角色的 ARN 粘贴到模态窗口中。

  4. 在 AWS 控制台中,转到 IAM > Roles,然后选择您的角色。

  5. permissions 标签页中,添加新的“内联权限”以允许此角色调用您的 lambda 函数。上面提供的示例 permission-policy.json 增加了运行任何名称为 <function-name> 的 lambda 的权限。

  6. 最后,导航到您的 Atlas Stream Processing 实例,添加一个新的 AWS Lambda connection,然后选择您在上一步中配置的 AWS IAM Role ARN

为了从您的 Atlas Stream Processing 管道向您的 AWS Lambda 资源发送请求,您必须首先将您的 AWS Lambda 资源作为连接添加到您的 Atlas Stream Processing 资源中。

您可以通过Atlas用户界面、 Atlas CLI或Atlas API将Amazon Web Services Lambda资源添加为连接,如以下示例所示。 您可以使用Amazon Web Services IAM 配置中的 arn 更新示例中的 roleArn 占位符。

curl --user "username:password" --digest \
--header "Content-Type: application/json" \
--header "Accept: application/vnd.atlas.2023-02-01+json" \
--include \
--data '{"name": "TestAWSLambdaConnection","type": "AWSLambda","aws": {"roleArn": "arn:aws:iam::<aws_account>:role/<role_name>"}}' \
--request POST "https://cloud.mongodb.com/api/atlas/v2/groups/<group_id>/streams/<tenant_name>/connections"

以下示例展示了最小请求所需的必填字段。

{ $externalFunction: {
connectionName: "myLambdaConnection",
functionName: "arn:aws:lambda:region:account-id:function:function-name",
as: "response",
}}

以下自定义示例除了上面说明的必填字段外,还指定错误处理、同步执行以及预处理的 Atlas Stream Processing 文档作为负载。

{ $externalFunction: {
connectionName: "myLambdaConnection",
functionName: "arn:aws:lambda:region:account-id:function:function-name",
execution: "sync"
as: "response",
onError: "fail",
payload: [{$replaceRoot: { newRoot: "$fullDocument.payloadToSend" } }, { $addFields: { sum: { $sum: "$randomArray" }}}, { $project: { success: 1, sum: 1 }}],
}}

注意

onError 字段定义了 API 级别错误的行为,适用于对您的 AWS Lambda 资源的同步和异步请求,以及同步请求的 AWS Lambda 函数错误。

$externalFunction 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

该标签标识请求发送到的连接注册表中的连接。

functionName

字符串

必需

完整的 AWS ARN 或要触发的 AWS Lambda 函数名称。

execution

枚举

Optional

参数,用于指定应同步调用还是异步调用Amazon Web Services Lambda函数。 接受的值为:

  • sync

  • async

默认为 sync,如果 Atlas 流处理管道中的剩余阶段需要 AWS Lambda 函数的输出,则此为必需项。

as

字符串

Optional

REST API 响应的对应字段的名称。

如果此终结点返回 0 个字节,该操作符则不会设置 as 字段。

onError

字符串

Optional

当此操作符遇到 HTTPS Status Code 或 Lambda 运行时相关故障时的行为。必须是以下值之一:

  • "dlq" :将受影响的文档传递到死信队列。

  • "ignore" :对受影响的文档不执行任何操作。

  • "fail" :在出错时终止流处理器。

onError 不会因 $externalFunction 操作符本身配置不当(例如,无效表达式)而触发错误。

默认值为 "dlq"

payload

阵列

Optional

自定义内部管道,允许您自定义发送到 API 终结点的请求正文。payload 支持以下表达式:

  • $project

  • $addFields

  • $replaceRoot

  • $set

$externalFunction 阶段向指定的Amazon Web Services Lambda资源发送请求。如果请求是同步请求,则在指定字段中返回响应,并且管道继续处理。如果请求是异步请求,管道会继续处理,而无需等待响应。

如果同步请求失败,则管道的错误处理行为由 onError字段决定。如果请求是异步请求,则 onError字段仅适用于Amazon Web Services API错误,而不适用于Amazon Web Services Lambda函数错误。 如果未指定 onError 字段,默认行为是将受影响的文档发送到 死信队列(DLQ)。

onError
行为

"dlq"

受影响的文档被发送到死信队列(DLQ)。

"ignore"

受影响的文档将被忽略。

"fail"

流处理器因出错而终止。

如果管道无法将文档转换为正确的JSON,或者配置的管道未创建有效的BSON对象作为最后阶段的产品,则无论 onError 设置如何,文档都会被发送到死信队列

由于 $externalFunction操作符本身配置不正确而引起的错误(例如无效表达式)不会触发onError 行为。 相反,流处理器会失败并显示错误消息。Atlas Stream Processing管道会重试可能由于暂时性错误而导致的失败请求。

后退

$merge

在此页面上