Credential dispenser

For most jobs, granting VTK access to your S3 bucket is a one-time setup: create an IAM role in your account, trust the VTK worker role, set role_arn on the job (Security → IAM). That gives every task in the job the same set of permissions for the lifetime of the job.

A credential dispenser is the alternative: an HTTPS endpoint you host that returns AWS credentials on demand. Each storage:get / storage:put task calls the endpoint and uses whatever it gets back. The worker auto-refreshes when the credentials are about to expire.

The endpoint can be anything that speaks HTTPS and returns JSON in the response contract below. Common implementations: a Lambda behind API Gateway that brokers STS sessions (the reference stack on this page does that); an existing internal credential-vending machine you already run; an API Gateway mock returning static keys; or any service that fronts your secret store.

Use a dispenser when you need any of:

  • Per-call scoping — the credentials returned for storage:get from s3://bucket/customer-a/in/ only grant read on that prefix; a later storage:put to s3://bucket/customer-a/out/ gets a separate, write-only set.

  • Central audit / rate-limiting — every credential request lands in your endpoint’s access logs; you can throttle, alert, or revoke without touching IAM.

  • Multi-tenant brokering — a single dispenser can hand out credentials for many downstream accounts based on attributes of the caller (organization, job ID, requested bucket).

  • Existing infrastructure — you already operate a credential-vending machine for other workloads and want VTK to use the same one.

If none of the above apply, prefer role_arn: it’s simpler and has one fewer moving part.

How it works

┌─────────────────┐
│  VTK worker     │
│  (storage:get)  │
└────────┬────────┘
         │  1. SigV4 GET /credentials
         ▼
┌─────────────────────┐
│  API Gateway        │
│  (AWS_IAM auth)     │
└────────┬────────────┘
         │  2. invoke
         ▼
┌─────────────────────┐
│  Lambda             │
│  (your code)        │
└────────┬────────────┘
         │  3. sts:AssumeRole
         ▼
┌─────────────────────┐
│  STS / target       │
│  IAM role           │
└────────┬────────────┘
         │  4. { AccessKeyId, SecretAccessKey, SessionToken, Expiration }
         │     bubbles back up to the worker through Lambda and API Gateway
         ▼
┌─────────────────┐
│  VTK worker     │
└────────┬────────┘
         │  5. PUT/GET on S3 using the temporary credentials
         ▼
       S3

  1. The worker signs the request to your API Gateway with SigV4 using its own IAM identity: the VTK worker role arn:aws:iam::873682911326:role/VTKWorker.

  2. API Gateway authorises the call (configured as AWS_IAM) and invokes your Lambda.

  3. Your Lambda code validates the caller and calls sts:AssumeRole on a role scoped to the specific bucket/prefix you want to expose.

  4. The Lambda returns the temporary credentials.

  5. The worker uses them to talk to S3, refreshing automatically when fewer than 15 minutes remain until expiry.
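To reason about how often your dispenser will be called, the refresh rule in step 5 can be modelled in a few lines. This is a sketch, not VTK’s worker code. The names needs_refresh, usable_window and REFRESH_MARGIN are hypothetical; only the 15-minute margin comes from this page.

```python
from datetime import datetime, timedelta, timezone

# Modelled on step 5: refresh once fewer than 15 minutes of validity remain.
REFRESH_MARGIN = timedelta(minutes=15)


def needs_refresh(expiration: datetime, now: datetime) -> bool:
    """True once the credentials are inside the refresh margin."""
    return expiration - now < REFRESH_MARGIN


def usable_window(issued_at: datetime, expiration: datetime) -> timedelta:
    """How long one set of credentials is used before a refresh is due.

    Credentials valid for 15 minutes or less are due for refresh
    immediately, so their usable window is zero.
    """
    return max(expiration - issued_at - REFRESH_MARGIN, timedelta(0))


if __name__ == "__main__":
    issued = datetime(2026, 5, 26, 22, 0, tzinfo=timezone.utc)
    expires = issued + timedelta(hours=1)
    print(usable_window(issued, expires))                          # 0:45:00
    print(needs_refresh(expires, issued + timedelta(minutes=50)))  # True
```

Under this model, one-hour credentials are used for about 45 minutes each. A storage:* task that runs for two hours would therefore call the dispenser roughly three times.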

Configuring storage:* to use a dispenser

Set http_credential_source on every storage:get / storage:put task. The location URL drops the {secret}@ prefix — credentials don’t come from a secret any more.

{
  "region": "aws:eu-west-1",
  "tasks": [
    {
      "tool": "storage:get",
      "parameters": {
        "location": "s3://acme-bucket/in/",
        "http_credential_source": "https://abc123.execute-api.eu-west-1.amazonaws.com/prod/credentials",
        "files": ["source.mov"]
      }
    },
    {
      "tool": "ffmpeg:cmd",
      "parameters": {
        "arguments": ["-y", "-i", "source.mov", "-c:v", "libx264", "out.mp4"]
      }
    },
    {
      "tool": "storage:put",
      "parameters": {
        "location": "s3://acme-bucket/out/{job_id}/",
        "http_credential_source": "https://abc123.execute-api.eu-west-1.amazonaws.com/prod/credentials",
        "files": ["out.mp4"]
      }
    }
  ]
}

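The two storage:* tasks call the dispenser independently, so your Lambda sees one request per task and can decide what to return for each. Because both tasks above use the same URL, the simplest way to tell the read apart from the write is to give each task its own URL, as in the per-job token pattern under Production pattern: per-job tokens below.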
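If you assemble job JSON in code rather than by hand, a small helper can make sure no storage:* task is missed. This is a sketch: with_dispenser is a hypothetical name, and it assumes the job layout shown above. Other tasks and every other parameter are left untouched.

```python
import copy
import json

STORAGE_TOOLS = {"storage:get", "storage:put"}


def with_dispenser(job: dict, url: str) -> dict:
    """Return a copy of `job` with http_credential_source set on every
    storage:get / storage:put task; other tasks are not modified."""
    job = copy.deepcopy(job)  # never mutate the caller's job definition
    for task in job.get("tasks", []):
        if task.get("tool") in STORAGE_TOOLS:
            task.setdefault("parameters", {})["http_credential_source"] = url
    return job


if __name__ == "__main__":
    job = {
        "region": "aws:eu-west-1",
        "tasks": [
            {"tool": "storage:get",
             "parameters": {"location": "s3://acme-bucket/in/", "files": ["source.mov"]}},
            {"tool": "ffmpeg:cmd",
             "parameters": {"arguments": ["-y", "-i", "source.mov", "-c:v", "libx264", "out.mp4"]}},
            {"tool": "storage:put",
             "parameters": {"location": "s3://acme-bucket/out/{job_id}/", "files": ["out.mp4"]}},
        ],
    }
    url = "https://abc123.execute-api.eu-west-1.amazonaws.com/prod/credentials"
    print(json.dumps(with_dispenser(job, url), indent=2))
```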

Response contract

The dispenser must return a JSON body with the same four fields, in the same shape, as the Credentials object in an STS AssumeRole response:

{
  "AccessKeyId":     "ASIA…",
  "SecretAccessKey": "…",
  "SessionToken":    "…",
  "Expiration":      "2026-05-26T23:00:00Z"
}

Expiration is ISO-8601 UTC. The worker refreshes when fewer than 15 minutes remain, so issue credentials for at least 30 minutes.
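Before pointing VTK at a new dispenser, you can check a response body against this contract locally. This is a minimal sketch; check_dispenser_response is a hypothetical name. It accepts both the Z suffix shown above and the +00:00 offset that boto3’s isoformat() produces in the reference Lambda below. Whether the VTK worker parses both forms is an assumption worth confirming.

```python
import json
from datetime import datetime, timedelta, timezone

REQUIRED_KEYS = ("AccessKeyId", "SecretAccessKey", "SessionToken", "Expiration")
MIN_LIFETIME = timedelta(minutes=30)  # the guidance above


def check_dispenser_response(body: str, now: datetime) -> datetime:
    """Validate a dispenser response body and return the parsed Expiration.

    Raises ValueError if a field is missing or empty, Expiration is not a
    UTC timestamp, or fewer than 30 minutes of validity remain.
    """
    data = json.loads(body)
    if not isinstance(data, dict):
        raise ValueError("body must be a JSON object")
    for key in REQUIRED_KEYS:
        if not isinstance(data.get(key), str) or not data[key]:
            raise ValueError(f"missing or empty {key}")
    raw = data["Expiration"]
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11,
    # so normalise it to an explicit +00:00 offset first.
    if raw.endswith("Z"):
        raw = raw[:-1] + "+00:00"
    expiration = datetime.fromisoformat(raw)  # raises ValueError if malformed
    if expiration.utcoffset() != timedelta(0):  # naive or non-UTC timestamps
        raise ValueError("Expiration must be UTC")
    if expiration - now < MIN_LIFETIME:
        raise ValueError("credentials expire in under 30 minutes")
    return expiration
```

For example, you can feed it the body returned by a test invocation of your Lambda, with `now=datetime.now(timezone.utc)`.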

Reference Lambda implementation

A minimal Python Lambda that brokers credentials by calling sts:AssumeRole on a role in your account. Deploy behind an API Gateway with AWS_IAM authorisation and a resource policy that allows only the VTK worker role(s) to invoke it.

# lambda_function.py — VTK credential dispenser
import os
import json
import boto3

STS = boto3.client("sts")

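TARGET_ROLE_ARN = os.environ["TARGET_ROLE_ARN"]      # role in your account; trusts this Lambda's execution role
# Keep this at or below 3600: calling AssumeRole with the Lambda's own
# (already assumed) credentials is role chaining, which STS caps at 1 hour.
DEFAULT_DURATION = int(os.environ.get("DURATION_SECONDS", "3600"))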


def lambda_handler(event, context):
    # API Gateway with AWS_IAM auth puts the caller's ARN here.
    caller = (
        event.get("requestContext", {})
             .get("identity", {})
             .get("userArn", "")
    )

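    # Defence in depth: only let the VTK worker role through. Pin the VTK
    # account ID so that a same-named role in another account cannot pass.
    if caller != "arn:aws:iam::873682911326:role/VTKWorker" and not caller.startswith(
        "arn:aws:sts::873682911326:assumed-role/VTKWorker/"
    ):
        return {"statusCode": 403, "body": json.dumps({"error": "forbidden"})}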

    # Optional: tighten the returned permissions per call.
    # Inspect event["path"] or query/headers to decide which prefix to allow,
    # then attach an inline session policy here.
    session_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],
                "Resource": "arn:aws:s3:::acme-bucket/*",
            },
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
                "Resource": "arn:aws:s3:::acme-bucket",
            },
        ],
    }

    resp = STS.assume_role(
        RoleArn=TARGET_ROLE_ARN,
        RoleSessionName=f"vtk-{context.aws_request_id[:30]}",
        DurationSeconds=DEFAULT_DURATION,
        Policy=json.dumps(session_policy),
    )
    creds = resp["Credentials"]

    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({
            "AccessKeyId":     creds["AccessKeyId"],
            "SecretAccessKey": creds["SecretAccessKey"],
            "SessionToken":    creds["SessionToken"],
            "Expiration":      creds["Expiration"].isoformat(),
        }),
    }

Lambda execution-role policy

The role the Lambda runs as needs only one permission besides standard CloudWatch logs — assuming the target role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam::123456789012:role/vtk-storage-target"
    }
  ]
}

Trust policy on the target role

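The role the Lambda assumes (vtk-storage-target above) trusts the Lambda’s execution role. It also needs its own permissions policy granting the S3 actions you intend to hand out (the CDK stack below grants s3:GetObject, s3:PutObject, s3:ListBucket and s3:GetBucketLocation on the bucket): a session policy passed by the Lambda can only narrow those permissions, never add to them. The trust policy: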

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::123456789012:role/vtk-dispenser-lambda" },
      "Action": "sts:AssumeRole"
    }
  ]
}

API Gateway resource policy

Only the VTK worker role may invoke the API.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::873682911326:role/VTKWorker"
      },
      "Action": "execute-api:Invoke",
      "Resource": "arn:aws:execute-api:eu-west-1:123456789012:abc123/*/GET/credentials"
    }
  ]
}

Set the method’s authorisation to AWS_IAM in API Gateway so the resource policy is enforced.
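If you manage these documents by hand rather than with CDK, generating them from a few inputs keeps the account IDs, role names and API ID consistent across all of them. This is a sketch under stated assumptions: manual_policies and its parameters are hypothetical, and the role names are the example names used above. It also emits a permissions policy for the target role that mirrors the S3Access statements in the CDK stack below.

```python
import json

VTK_WORKER_ROLE_ARN = "arn:aws:iam::873682911326:role/VTKWorker"


def manual_policies(account_id: str, region: str, api_id: str, bucket: str) -> dict:
    """Build the IAM documents for a hand-built dispenser (sketch)."""
    lambda_role = f"arn:aws:iam::{account_id}:role/vtk-dispenser-lambda"
    target_role = f"arn:aws:iam::{account_id}:role/vtk-storage-target"

    def doc(*statements: dict) -> dict:
        return {"Version": "2012-10-17", "Statement": list(statements)}

    return {
        # Identity policy on the Lambda's execution role.
        "lambda_execution_role": doc(
            {"Effect": "Allow", "Action": "sts:AssumeRole", "Resource": target_role}
        ),
        # Trust policy on the target role.
        "target_role_trust": doc(
            {"Effect": "Allow", "Principal": {"AWS": lambda_role}, "Action": "sts:AssumeRole"}
        ),
        # Permissions policy on the target role: the ceiling for any session policy.
        "target_role_permissions": doc(
            {"Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"],
             "Resource": f"arn:aws:s3:::{bucket}/*"},
            {"Effect": "Allow", "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
             "Resource": f"arn:aws:s3:::{bucket}"},
        ),
        # Resource policy on the API Gateway REST API.
        "api_resource_policy": doc(
            {"Effect": "Allow", "Principal": {"AWS": VTK_WORKER_ROLE_ARN},
             "Action": "execute-api:Invoke",
             "Resource": f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/GET/credentials"}
        ),
    }


if __name__ == "__main__":
    policies = manual_policies("123456789012", "eu-west-1", "abc123", "acme-bucket")
    for name, policy in policies.items():
        print(f"--- {name}\n{json.dumps(policy, indent=2)}")
```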

Deploying with AWS CDK

The following CDK v2 (Python) stack wires up everything above. It creates a Lambda running a simplified version of the handler from the previous section (without the per-call session policy), an IAM role the Lambda assumes to mint S3 credentials, and an API Gateway REST API restricted to the VTK worker role(s). Drop it into a CDK project, set target_bucket to your bucket name, and run cdk deploy.

# vtk_dispenser_stack.py
from aws_cdk import (
    Stack,
    CfnOutput,
    Duration,
    aws_iam as iam,
    aws_lambda as _lambda,
    aws_apigateway as apigw,
)
from constructs import Construct


# The role ARN(s) that VTK signs dispenser requests with. Only the production
# worker role is listed here; if you also submit jobs to the staging
# environment, add the staging worker role ARN to this list as well.
VTK_WORKER_ROLE_ARNS = [
    "arn:aws:iam::873682911326:role/VTKWorker",  # production
]


class VtkDispenserStack(Stack):
    def __init__(
        self,
        scope: Construct,
        construct_id: str,
        *,
        target_bucket: str,
        session_duration: Duration = Duration.hours(1),
        **kwargs,
    ) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # STS accepts 15 minutes and up, but AssumeRole called with Lambda
        # credentials is role chaining, which STS caps at 1 hour.
        if not 900 <= session_duration.to_seconds() <= 3600:
            raise ValueError("session_duration must be between 15 minutes and 1 hour")

        # Lambda execution role. It is created before the target role so the
        # target role can trust exactly this role instead of the whole account.
        dispenser_role = iam.Role(
            self,
            "DispenserFnRole",
            assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name(
                    "service-role/AWSLambdaBasicExecutionRole"
                )
            ],
        )

        # 1. Target role: scoped to the bucket. The Lambda assumes this and
        #    returns its session credentials to the VTK worker. Only the
        #    Lambda's execution role is trusted.
        target_role = iam.Role(
            self,
            "VtkStorageTargetRole",
            assumed_by=iam.ArnPrincipal(dispenser_role.role_arn),
            # IAM accepts 1-12 hours here; the role-chaining cap above makes
            # 1 hour the useful maximum.
            max_session_duration=Duration.hours(1),
            inline_policies={
                "S3Access": iam.PolicyDocument(
                    statements=[
                        iam.PolicyStatement(
                            actions=["s3:GetObject", "s3:PutObject"],
                            resources=[f"arn:aws:s3:::{target_bucket}/*"],
                        ),
                        iam.PolicyStatement(
                            actions=["s3:ListBucket", "s3:GetBucketLocation"],
                            resources=[f"arn:aws:s3:::{target_bucket}"],
                        ),
                    ]
                )
            },
        )

        # 2. Lambda handler: a simplified version of the code from the
        #    "Reference Lambda implementation" section above (no session
        #    policy), with the target role, duration and allowed callers
        #    plumbed through as environment variables.
        handler = _lambda.Function(
            self,
            "DispenserFn",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="index.lambda_handler",
            role=dispenser_role,
            timeout=Duration.seconds(10),
            environment={
                "TARGET_ROLE_ARN": target_role.role_arn,
                "DURATION_SECONDS": str(int(session_duration.to_seconds())),
                "ALLOWED_CALLER_ROLES": ",".join(VTK_WORKER_ROLE_ARNS),
            },
            code=_lambda.Code.from_inline(
                """
import os, json, boto3

STS = boto3.client("sts")
TARGET_ROLE_ARN = os.environ["TARGET_ROLE_ARN"]
DURATION = int(os.environ.get("DURATION_SECONDS", "3600"))
ALLOWED_ROLES = os.environ["ALLOWED_CALLER_ROLES"].split(",")
# arn:aws:iam::ACCT:role/NAME -> arn:aws:sts::ACCT:assumed-role/NAME/
SESSION_PREFIXES = tuple(
    r.replace(":iam::", ":sts::").replace(":role/", ":assumed-role/") + "/"
    for r in ALLOWED_ROLES
)


def lambda_handler(event, context):
    caller = (
        event.get("requestContext", {})
        .get("identity", {})
        .get("userArn")
        or ""
    )
    if caller not in ALLOWED_ROLES and not caller.startswith(SESSION_PREFIXES):
        return {"statusCode": 403, "body": json.dumps({"error": "forbidden"})}

    resp = STS.assume_role(
        RoleArn=TARGET_ROLE_ARN,
        RoleSessionName=f"vtk-{context.aws_request_id[:30]}",
        DurationSeconds=DURATION,
    )
    c = resp["Credentials"]
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({
            "AccessKeyId":     c["AccessKeyId"],
            "SecretAccessKey": c["SecretAccessKey"],
            "SessionToken":    c["SessionToken"],
            "Expiration":      c["Expiration"].isoformat(),
        }),
    }
"""
            ),
        )

        # Allow the Lambda to assume the target role. The trust side is the
        # target role's assumed_by principal above.
        handler.add_to_role_policy(
            iam.PolicyStatement(
                actions=["sts:AssumeRole"],
                resources=[target_role.role_arn],
            )
        )

        # 3. API Gateway REST API restricted to the VTK worker role(s).
        api = apigw.RestApi(
            self,
            "DispenserApi",
            rest_api_name="vtk-credential-dispenser",
            policy=iam.PolicyDocument(
                statements=[
                    iam.PolicyStatement(
                        effect=iam.Effect.ALLOW,
                        principals=[iam.ArnPrincipal(arn) for arn in VTK_WORKER_ROLE_ARNS],
                        actions=["execute-api:Invoke"],
                        resources=["execute-api:/*/GET/credentials"],
                    ),
                    iam.PolicyStatement(
                        effect=iam.Effect.DENY,
                        principals=[iam.AnyPrincipal()],
                        actions=["execute-api:Invoke"],
                        resources=["execute-api:/*/*/*"],
                        conditions={
                            "StringNotEquals": {
                                "aws:PrincipalArn": VTK_WORKER_ROLE_ARNS
                            }
                        },
                    ),
                ]
            ),
        )

        credentials = api.root.add_resource("credentials")
        credentials.add_method(
            "GET",
            apigw.LambdaIntegration(handler),
            authorization_type=apigw.AuthorizationType.IAM,
        )

        CfnOutput(
            self,
            "HttpCredentialSource",
            value=f"{api.url}credentials",
            description="Use as `http_credential_source` on storage:get / storage:put.",
        )

App entry point:

# app.py
import aws_cdk as cdk
from vtk_dispenser_stack import VtkDispenserStack

app = cdk.App()
VtkDispenserStack(
    app,
    "VtkDispenser",
    target_bucket="acme-bucket",
)
app.synth()

The same stack — plus a submit_job.py helper that submits a smoke-test job using the deployed dispenser — is checked in under docs/examples/credential-dispenser/ in this repo.

After cdk deploy, the stack outputs the URL to paste into your VTK job:

Outputs:
VtkDispenser.HttpCredentialSource = https://abc123.execute-api.eu-west-1.amazonaws.com/prod/credentials

If you need per-call scoping (different S3 prefixes for read vs. write, multi-tenant routing, …), replace the bare STS.assume_role(...) call in the inline Lambda code with the version from Reference Lambda implementation above, which passes a session policy (Policy=...) that you can build from the request.

Production pattern: per-job tokens

The reference stack above lets any VTK worker that signs its requests with the allow-listed role ARN fetch credentials for the whole bucket. That is fine for development but too coarse for production: a leaked worker identity, or a job deliberately submitted by an attacker who has obtained a VTK API access key, could read or overwrite anything in the bucket.

The pattern customers run in production is to extend the URL with an opaque per-job token and have the dispenser look the token up in a small datastore before minting credentials:

https://abc123.execute-api.eu-west-1.amazonaws.com/prod/credentials/{token}

Flow:

  1. Before submitting the VTK job, your orchestrator generates a random token (e.g. a UUID4) and writes a record like {token, bucket, allowed_prefix, allowed_actions, expires_at} to DynamoDB (or any KV store with a TTL).

  2. The job submission embeds the per-job URL — http_credential_source: ".../credentials/<token>" — into every storage:* task. Different storage steps can use different tokens, each scoped to exactly the prefix and action set that step needs.

  3. The dispenser Lambda extracts {token} from the path, looks the record up, fails fast (410 Gone / 403) if the token is unknown, expired, or has been used up, and otherwise calls sts:AssumeRole with a session policy built from the record so the returned credentials cannot escape the declared bucket/prefix/actions.

  4. After the job reaches a terminal state, the orchestrator deletes the token. Any later call with that URL gets refused even if the worker identity is still valid. DynamoDB TTL gives you a belt-and-suspenders auto-expiry even if your cleanup code never runs.
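On the orchestrator side, steps 1 and 4 might look like the sketch below. Assumptions:

  • a DynamoDB table keyed on token, with TTL enabled on expires_at;
  • attribute names that follow the record in step 1;
  • issue_token, revoke_token and DISPENSER_BASE are hypothetical names.

The table argument is anything with the put_item / delete_item methods of a boto3 DynamoDB Table, so the logic can be exercised without AWS. The sketch uses secrets.token_urlsafe instead of a UUID4. Either works, and both are safe in a URL path and in a RoleSessionName.

```python
import secrets
import time

DISPENSER_BASE = "https://abc123.execute-api.eu-west-1.amazonaws.com/prod/credentials"


def issue_token(table, bucket, allowed_prefix, allowed_actions, ttl_seconds=6 * 3600, now=None):
    """Step 1: store a scoped token record and return the per-step URL."""
    token = secrets.token_urlsafe(32)  # 256 random bits, URL-safe alphabet
    now = int(time.time()) if now is None else now
    table.put_item(Item={
        "token": token,
        "bucket": bucket,
        "allowed_prefix": allowed_prefix,
        "allowed_actions": list(allowed_actions),
        "expires_at": now + ttl_seconds,  # also the DynamoDB TTL attribute
    })
    return f"{DISPENSER_BASE}/{token}"


def revoke_token(table, url):
    """Step 4: delete the record once the job reaches a terminal state."""
    table.delete_item(Key={"token": url.rsplit("/", 1)[-1]})


class FakeTable:
    """In-memory stand-in exposing the two Table methods used above."""

    def __init__(self):
        self.items = {}

    def put_item(self, Item):
        self.items[Item["token"]] = Item

    def delete_item(self, Key):
        self.items.pop(Key["token"], None)


if __name__ == "__main__":
    table = FakeTable()
    get_url = issue_token(table, "acme-bucket", "customer-a/in/", ["s3:GetObject"])
    put_url = issue_token(table, "acme-bucket", "customer-a/out/", ["s3:PutObject"])
    print(get_url, put_url, sep="\n")
    revoke_token(table, get_url)
```

DynamoDB’s TTL deletion is not immediate, which is why the dispenser should still compare expires_at itself rather than rely on the record disappearing.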

A sketch of the Lambda body for this pattern:

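import json
import os
import time

import boto3

STS = boto3.client("sts")
TARGET_ROLE_ARN = os.environ["TARGET_ROLE_ARN"]
TABLE = boto3.resource("dynamodb").Table(os.environ["TOKEN_TABLE"])  # hypothetical env var
VTK_SESSION_PREFIX = "arn:aws:sts::873682911326:assumed-role/VTKWorker/"


def lambda_handler(event, context):
    caller = event["requestContext"]["identity"].get("userArn") or ""
    if not caller.startswith(VTK_SESSION_PREFIX):
        return {"statusCode": 403, "body": "forbidden"}

    token = event["pathParameters"]["token"]
    record = TABLE.get_item(Key={"token": token}).get("Item")
    if not record or record["expires_at"] < int(time.time()):
        return {"statusCode": 410, "body": "gone"}

    # build_policy is a helper you supply (hypothetical name): record fields -> session policy.
    session_policy = build_policy(record["bucket"], record["allowed_prefix"], record["allowed_actions"])
    resp = STS.assume_role(
        RoleArn=TARGET_ROLE_ARN,
        RoleSessionName=f"vtk-{token[:30]}",
        DurationSeconds=int(record.get("duration", 3600)),  # DynamoDB numbers come back as Decimal
        Policy=json.dumps(session_policy),
    )
    # Optional: mark the token "used" or decrement a counter here.
    c = resp["Credentials"]
    body = {k: c[k] for k in ("AccessKeyId", "SecretAccessKey", "SessionToken")}
    body["Expiration"] = c["Expiration"].isoformat()
    return {"statusCode": 200, "headers": {"Content-Type": "application/json"},
            "body": json.dumps(body)}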
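The sketch above calls a build_policy helper that this page does not define. One hypothetical version follows. It assumes the record stores the allowed actions as S3 object-level action names (for example ["s3:GetObject"]) and the prefix as a key prefix without a leading slash. It grants those actions on that prefix only, plus ListBucket restricted to the same prefix. The effective permissions are the intersection of this session policy and the target role’s own policy, so it can only narrow what the role allows.

```python
import json

# Object-level actions a token may request; anything else is dropped.
ALLOWED_OBJECT_ACTIONS = frozenset({"s3:GetObject", "s3:PutObject"})


def build_policy(bucket: str, prefix: str, actions) -> dict:
    """Session policy confining a credential to one bucket prefix."""
    prefix = prefix.lstrip("/")
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    object_actions = sorted(set(actions) & ALLOWED_OBJECT_ACTIONS)
    if not object_actions:
        raise ValueError("token record grants no permitted S3 actions")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Read and/or write objects under the prefix only.
                "Effect": "Allow",
                "Action": object_actions,
                "Resource": f"arn:aws:s3:::{bucket}/{prefix}*",
            },
            {   # List keys, but only under the same prefix.
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": f"arn:aws:s3:::{bucket}",
                "Condition": {"StringLike": {"s3:prefix": [f"{prefix}*"]}},
            },
            {   # Bucket-level lookup, as in the reference session policy.
                "Effect": "Allow",
                "Action": "s3:GetBucketLocation",
                "Resource": f"arn:aws:s3:::{bucket}",
            },
        ],
    }


if __name__ == "__main__":
    print(json.dumps(build_policy("acme-bucket", "customer-a/in", ["s3:GetObject"]), indent=2))
```

Rejecting records that ask for nothing permitted makes a malformed token fail at the dispenser, before the worker reaches S3.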

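On the API Gateway side, the resource changes from /credentials (the dev form in the CDK stack above) to /credentials/{token}. Update the resource policy to match: the Allow resource execute-api:/*/GET/credentials does not cover the new path, so change it to execute-api:/*/GET/credentials/*.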

credentials = api.root.add_resource("credentials")
credentials.add_resource("{token}").add_method(
    "GET",
    apigw.LambdaIntegration(handler),
    authorization_type=apigw.AuthorizationType.IAM,
)

Why this is a meaningful upgrade over the bucket-wide dispenser:

  • Least privilege per task. The credentials handed to a storage:get task can be read-only and limited to one source prefix; a sibling storage:put gets a separate write-only set on a different prefix.

  • Time-bound. Tokens expire whether or not your cleanup code runs.

  • Auditable. A 1:1 mapping between tokens and jobs makes “who fetched what” a primary-key lookup, not a log scan.

  • Revocable. Removing the token record stops further credential issuance immediately, without having to detach IAM policies or rotate the worker role.

Verifying

A submitted job that uses the dispenser leaves three useful traces:

  1. CloudWatch logs of your Lambda — one invocation per storage:* task, with the caller ARN visible. Use these to confirm that VTK is reaching the dispenser and that you’re scoping the session policy as intended.

  2. The task log in VTK (GET /o/{org}/jobs/{id}/tasks/{task_id}/log). On credential failure you’ll see the boto3 error verbatim (AccessDenied, ExpiredToken, InvalidClientTokenId), which usually points at the trust policy or the resource policy.

  3. S3 server access logs / CloudTrail data events — show the target role being used to access objects, with the RoleSessionName you set in the Lambda. Good for end-to-end auditing.
