Terraform Reference - DCS Level 1: Data Labeling
Overview
This document provides the Terraform configuration to build the DCS Level 1 architecture. You can use this as-is or as a reference for manual AWS Console setup.
Prerequisites
- AWS Account with admin access
- Terraform >= 1.5
- AWS CLI configured with credentials
- A domain name (optional, for API Gateway custom domain)
Directory structure
terraform/
main.tf # Provider and backend config
variables.tf # Input variables
s3.tf # Data bucket and audit bucket
lambda-authorizer.tf # Authorization Lambda
lambda-labeler.tf # Auto-labeling Lambda
api-gateway.tf # API Gateway configuration
iam.tf # IAM roles for simulated users
cloudtrail.tf # Audit logging
outputs.tf # Outputs (API URL, bucket name, etc.)
lambda/
authorizer/
index.py # Authorization logic
labeler/
index.py # Auto-labeling logic
Core Terraform configuration
main.tf
terraform {
required_version = ">= 1.5"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
}
provider "aws" {
region = var.aws_region
default_tags {
tags = {
Project = "dcs-level-1"
Environment = "demo"
ManagedBy = "terraform"
}
}
}
variables.tf
variable "aws_region" {
description = "AWS region to deploy into"
type = string
default = "eu-west-2" # London - appropriate for UK defence context
}
variable "project_name" {
description = "Project name used for resource naming"
type = string
default = "dcs-level-1"
}
variable "classification_levels" {
description = "Valid classification levels (ordered lowest to highest)"
type = list(string)
default = ["UNCLASSIFIED", "OFFICIAL", "SECRET", "TOP-SECRET"]
}
variable "valid_nationalities" {
description = "Valid nationality codes for releasability"
type = list(string)
default = ["GBR", "USA", "POL"]
}
s3.tf - Data bucket with tagging enforcement
resource "aws_s3_bucket" "data" {
bucket = "${var.project_name}-data-${data.aws_caller_identity.current.account_id}"
}
resource "aws_s3_bucket_versioning" "data" {
bucket = aws_s3_bucket.data.id
versioning_configuration {
status = "Enabled"
}
}
resource "aws_s3_bucket_server_side_encryption_configuration" "data" {
bucket = aws_s3_bucket.data.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "aws:kms"
}
}
}
# Require DCS tags on all uploaded objects
resource "aws_s3_bucket_policy" "require_tags" {
bucket = aws_s3_bucket.data.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "RequireDCSTags"
Effect = "Deny"
Principal = "*"
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.data.arn}/*"
Condition = {
"Null" = {
"s3:RequestObjectTag/dcs:classification" = "true"
}
}
},
{
Sid = "RequireReleasabilityTag"
Effect = "Deny"
Principal = "*"
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.data.arn}/*"
Condition = {
"Null" = {
"s3:RequestObjectTag/dcs:releasable-to" = "true"
}
}
}
]
})
}
# Separate bucket for audit logs
resource "aws_s3_bucket" "audit" {
bucket = "${var.project_name}-audit-${data.aws_caller_identity.current.account_id}"
}
resource "aws_s3_bucket_versioning" "audit" {
bucket = aws_s3_bucket.audit.id
versioning_configuration {
status = "Enabled"
}
}
# Block public access on both buckets
resource "aws_s3_bucket_public_access_block" "data" {
bucket = aws_s3_bucket.data.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
resource "aws_s3_bucket_public_access_block" "audit" {
bucket = aws_s3_bucket.audit.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
data "aws_caller_identity" "current" {}
iam.tf - Simulated coalition users
# UK user with SECRET clearance
resource "aws_iam_user" "uk_secret" {
name = "${var.project_name}-user-gbr-secret"
tags = {
"dcs:clearance" = "SECRET"
"dcs:nationality" = "GBR"
"dcs:saps" = "WALL"
"dcs:organisation" = "UK-MOD"
}
}
# Polish user with NATO SECRET clearance
resource "aws_iam_user" "pol_secret" {
name = "${var.project_name}-user-pol-ns"
tags = {
"dcs:clearance" = "NATO-SECRET"
"dcs:nationality" = "POL"
"dcs:saps" = ""
"dcs:organisation" = "PL-MON"
}
}
# US user with IL-6 clearance
resource "aws_iam_user" "us_il6" {
name = "${var.project_name}-user-usa-il6"
tags = {
"dcs:clearance" = "IL-6"
"dcs:nationality" = "USA"
"dcs:saps" = "WALL"
"dcs:organisation" = "US-DOD"
}
}
# Unclassified contractor (should be denied SECRET data)
resource "aws_iam_user" "contractor" {
name = "${var.project_name}-user-contractor"
tags = {
"dcs:clearance" = "UNCLASSIFIED"
"dcs:nationality" = "GBR"
"dcs:saps" = ""
"dcs:organisation" = "CONTRACTOR"
}
}
# All users get basic API Gateway invoke permissions
resource "aws_iam_policy" "api_invoke" {
name = "${var.project_name}-api-invoke"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = "execute-api:Invoke"
Resource = "${aws_api_gateway_rest_api.dcs.execution_arn}/*"
}
]
})
}
resource "aws_iam_user_policy_attachment" "uk_api" {
user = aws_iam_user.uk_secret.name
policy_arn = aws_iam_policy.api_invoke.arn
}
resource "aws_iam_user_policy_attachment" "pol_api" {
user = aws_iam_user.pol_secret.name
policy_arn = aws_iam_policy.api_invoke.arn
}
resource "aws_iam_user_policy_attachment" "us_api" {
user = aws_iam_user.us_il6.name
policy_arn = aws_iam_policy.api_invoke.arn
}
resource "aws_iam_user_policy_attachment" "contractor_api" {
user = aws_iam_user.contractor.name
policy_arn = aws_iam_policy.api_invoke.arn
}
lambda-authorizer.tf
resource "aws_lambda_function" "authorizer" {
function_name = "${var.project_name}-authorizer"
runtime = "python3.12"
handler = "index.handler"
filename = data.archive_file.authorizer.output_path
role = aws_iam_role.authorizer_role.arn
timeout = 10
memory_size = 256
environment {
variables = {
DATA_BUCKET = aws_s3_bucket.data.id
CLASSIFICATION_LEVELS = jsonencode(var.classification_levels)
}
}
}
data "archive_file" "authorizer" {
type = "zip"
source_dir = "${path.module}/lambda/authorizer"
output_path = "${path.module}/lambda/authorizer.zip"
}
resource "aws_iam_role" "authorizer_role" {
name = "${var.project_name}-authorizer-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = { Service = "lambda.amazonaws.com" }
}]
})
}
# Authorizer needs to read S3 object tags and IAM user tags
resource "aws_iam_role_policy" "authorizer_policy" {
name = "${var.project_name}-authorizer-policy"
role = aws_iam_role.authorizer_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = ["s3:GetObjectTagging"]
Resource = "${aws_s3_bucket.data.arn}/*"
},
{
Effect = "Allow"
Action = ["iam:GetUser", "iam:ListUserTags"]
Resource = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:user/${var.project_name}-*"
},
{
Effect = "Allow"
Action = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"]
Resource = "arn:aws:logs:*:*:*"
}
]
})
}
cloudtrail.tf
resource "aws_cloudtrail" "dcs_audit" {
name = "${var.project_name}-audit-trail"
s3_bucket_name = aws_s3_bucket.audit.id
include_global_service_events = true
is_multi_region_trail = false
enable_logging = true
event_selector {
read_write_type = "All"
include_management_events = true
data_resource {
type = "AWS::S3::Object"
values = ["${aws_s3_bucket.data.arn}/"]
}
}
}
Lambda authorizer code
lambda/authorizer/index.py
import json
import os
import boto3
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client('s3')
iam = boto3.client('iam')
DATA_BUCKET = os.environ['DATA_BUCKET']
CLASSIFICATION_LEVELS = json.loads(os.environ['CLASSIFICATION_LEVELS'])
def get_classification_level(classification):
"""Convert classification string to numeric level for comparison."""
# Map various national classifications to a common level
mapping = {
'UNCLASSIFIED': 0,
'OFFICIAL': 1,
'NATO-RESTRICTED': 1,
'SECRET': 2,
'NATO-SECRET': 2,
'IL-5': 2,
'IL-6': 2,
'TOP-SECRET': 3,
'COSMIC-TOP-SECRET': 3,
'IL-7': 3,
}
return mapping.get(classification.upper(), -1)
def get_user_attributes(username):
"""Get DCS attributes from IAM user tags."""
response = iam.list_user_tags(UserName=username)
attrs = {}
for tag in response['Tags']:
if tag['Key'].startswith('dcs:'):
attrs[tag['Key']] = tag['Value']
return attrs
def get_object_labels(object_key):
"""Get DCS labels from S3 object tags."""
response = s3.get_object_tagging(Bucket=DATA_BUCKET, Key=object_key)
labels = {}
for tag in response['TagSet']:
if tag['Key'].startswith('dcs:'):
labels[tag['Key']] = tag['Value']
return labels
def evaluate_access(user_attrs, object_labels):
"""Evaluate whether user attributes satisfy object label requirements."""
reasons = []
# Check classification level
user_level = get_classification_level(user_attrs.get('dcs:clearance', 'UNCLASSIFIED'))
object_level = get_classification_level(object_labels.get('dcs:classification', 'TOP-SECRET'))
if user_level < object_level:
reasons.append(
f"Clearance insufficient: user has {user_attrs.get('dcs:clearance')} "
f"(level {user_level}), object requires "
f"{object_labels.get('dcs:classification')} (level {object_level})"
)
# Check nationality / releasability
user_nationality = user_attrs.get('dcs:nationality', '')
releasable_to = [
n.strip() for n in object_labels.get('dcs:releasable-to', '').split(',')
]
if releasable_to != ['ALL'] and user_nationality not in releasable_to:
reasons.append(
f"Nationality {user_nationality} not in releasable-to list {releasable_to}"
)
# Check SAP requirements
required_sap = object_labels.get('dcs:sap', 'NONE')
user_saps = [s.strip() for s in user_attrs.get('dcs:saps', '').split(',') if s.strip()]
if required_sap != 'NONE' and required_sap not in user_saps:
reasons.append(
f"Missing required SAP: {required_sap}. User SAPs: {user_saps}"
)
return len(reasons) == 0, reasons
def handler(event, context):
"""Lambda authorizer for DCS Level 1 access control."""
# Extract user identity and requested object
username = event.get('requestContext', {}).get('identity', {}).get('user', 'unknown')
object_key = event.get('pathParameters', {}).get('objectKey', '')
logger.info(f"Access request: user={username}, object={object_key}")
try:
user_attrs = get_user_attributes(username)
object_labels = get_object_labels(object_key)
authorized, reasons = evaluate_access(user_attrs, object_labels)
# Log the full decision context (this is the DCS audit trail)
decision_log = {
'user': username,
'object': object_key,
'user_attributes': user_attrs,
'object_labels': object_labels,
'authorized': authorized,
'reasons': reasons,
}
logger.info(f"DCS_ACCESS_DECISION: {json.dumps(decision_log)}")
if authorized:
return {
'statusCode': 200,
'body': json.dumps({
'authorized': True,
'object': object_key,
'classification': object_labels.get('dcs:classification'),
})
}
else:
return {
'statusCode': 403,
'body': json.dumps({
'authorized': False,
'reasons': reasons,
})
}
except Exception as e:
logger.error(f"Authorization error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': 'Authorization service error'})
}
Lambda auto-labeler code
lambda/labeler/index.py
import json
import re
import boto3
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client('s3')
# Classification indicators (simplified for demonstration)
CLASSIFICATION_PATTERNS = {
'TOP-SECRET': [
r'\bTOP\s*SECRET\b', r'\bCOSMIC\s*TOP\s*SECRET\b', r'\bTS/SCI\b'
],
'SECRET': [
r'\bSECRET\b', r'\bNATO\s*SECRET\b', r'\bUK\s*EYES\s*ONLY\b',
r'\bGRID\s+\d{8}\b', # Grid references indicate at least SECRET
],
'OFFICIAL': [
r'\bOFFICIAL\b', r'\bNATO\s*RESTRICTED\b'
],
}
SAP_PATTERNS = {
'WALL': [r'\bWALL\b', r'\bOPERATION\s+WALL\b'],
}
def analyze_content(content):
"""Analyze text content and determine appropriate DCS labels."""
classification = 'UNCLASSIFIED'
sap = 'NONE'
# Check classification (highest match wins)
for level in ['TOP-SECRET', 'SECRET', 'OFFICIAL']:
for pattern in CLASSIFICATION_PATTERNS[level]:
if re.search(pattern, content, re.IGNORECASE):
classification = level
break
if classification != 'UNCLASSIFIED':
break
# Check SAP indicators
for sap_name, patterns in SAP_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, content, re.IGNORECASE):
sap = sap_name
break
return classification, sap
def handler(event, context):
"""Auto-label new S3 objects based on content analysis."""
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
logger.info(f"Auto-labeling: bucket={bucket}, key={key}")
try:
# Read object content (first 10KB for analysis)
response = s3.get_object(Bucket=bucket, Key=key, Range='bytes=0-10240')
content = response['Body'].read().decode('utf-8', errors='ignore')
classification, sap = analyze_content(content)
# Apply DCS tags
from datetime import datetime
tags = {
'dcs:classification': classification,
'dcs:sap': sap,
'dcs:labeled-by': 'auto-labeler',
'dcs:labeled-at': datetime.utcnow().isoformat(),
}
# Get existing tags to preserve manually-set ones
existing = s3.get_object_tagging(Bucket=bucket, Key=key)
existing_tags = {t['Key']: t['Value'] for t in existing['TagSet']}
# Only set auto-labels if not already manually labeled
if 'dcs:classification' not in existing_tags:
tag_set = [{'Key': k, 'Value': v} for k, v in {**existing_tags, **tags}.items()]
s3.put_object_tagging(
Bucket=bucket,
Key=key,
Tagging={'TagSet': tag_set}
)
logger.info(f"DCS_AUTO_LABEL: key={key}, classification={classification}, sap={sap}")
else:
logger.info(f"DCS_SKIP_LABEL: key={key}, already labeled manually")
except Exception as e:
logger.error(f"Auto-labeling error for {key}: {str(e)}")
# Fail secure: if we can't analyze, label as highest
try:
s3.put_object_tagging(
Bucket=bucket,
Key=key,
Tagging={'TagSet': [
{'Key': 'dcs:classification', 'Value': 'TOP-SECRET'},
{'Key': 'dcs:sap', 'Value': 'NONE'},
{'Key': 'dcs:labeled-by', 'Value': 'auto-labeler-failsafe'},
]}
)
except Exception:
pass
Manual setup instructions
If you prefer to build this by hand in the AWS Console instead of using Terraform:
- Create S3 data bucket with versioning enabled, SSE-KMS encryption
- Create S3 audit bucket for CloudTrail
- Create IAM users with DCS tags (clearance, nationality, saps)
- Create Lambda authorizer function with the Python code above
- Create Lambda auto-labeler function with S3 trigger
- Create API Gateway REST API with Lambda proxy integration
- Enable CloudTrail with S3 data event logging
- Upload test objects to S3 with DCS tags
- Test access as different IAM users
See the interactive guide (guide/index.html) for step-by-step walkthrough.