MappingEngine Design Document¶
- For practical guide: See MappingEngine User Guide
- For system overview: See Data Flow Pipeline
This design document describes the technical architecture. The system implements the config-driven, loop-driven approach specified here.
Overview¶
The MappingEngine is a configuration-driven transformation engine that maps CIS Benchmark data to various XCCDF export styles (DISA STIG-compatible, CIS Native, etc.). It reads YAML configuration files that define field mappings, transformations, and composite field assembly.
December 2025 Refactor: Fully generic structure handlers enable adding new compliance frameworks (PCI-DSS, ISO 27001, HIPAA) via YAML configuration only - zero code changes required.
Purpose¶
- Read YAML configurations that define how to map our Pydantic data models to XCCDF structures
- Apply transformations (strip_html, html_to_markdown, composite field assembly)
- Generate structures generically using 3 core handlers (ident, metadata, profiles)
- Support multiple export styles without changing code (just swap YAML config)
- Be extensible for new frameworks via YAML only
Generic Structure Handlers (Dec 2025)¶
1. ident_from_list - Generic Ident Generation¶
Purpose: Generate <ident> elements from any list
Works For:
- CCIs (DoD)
- CIS Controls (v7, v8)
- MITRE ATT&CK (techniques, tactics, mitigations)
- PCI-DSS requirements
- ISO 27001 controls
- HIPAA requirements
- Any compliance framework index
Complete Config Spec:
field_name:
  target_element: "ident"
  structure: "ident_from_list"
  source_field: "cis_controls" # Or "mitre_mapping.techniques", etc.
  ident_spec:
    system_template: "https://org.com/framework/v{item.version}"
    value_template: "{item.id}" # Or just "{item}" for simple lists
    attributes: # Optional - for namespace attributes
      - name: "controlURI"
        template: "https://org.com/controls/{item.id}"
        namespace_prefix: "fw{item.version}"
Template Variables:
- {item} - For simple string lists
- {item.field} - For object lists (item.version, item.control, etc.)
- {group_key} - When used with grouping
Output Example:
<ident system="https://www.cisecurity.org/controls/v8">8:3.14</ident>
<ident system="https://attack.mitre.org/techniques">T1565</ident>
<ident system="https://www.pcisecuritystandards.org/pci_dss/v4.0">Requirement-1.2.1</ident>
Implementation: MappingEngine.generate_idents_from_config()
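The template expansion described above can be sketched with plain `str.format`, which already understands both `{item}` and `{item.field}`. This is a minimal illustration, not the body of `generate_idents_from_config()`: the `Ident` dataclass and the `render_idents` helper are hypothetical names, and wrapping dict items in `SimpleNamespace` is an assumption about how object lists might arrive.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Iterable, List


@dataclass
class Ident:
    """Hypothetical stand-in for an XCCDF <ident> element."""
    system: str
    value: str


def render_idents(items: Iterable[Any], system_template: str, value_template: str) -> List[Ident]:
    """Expand both templates once per item.

    str.format resolves "{item}" for plain strings and "{item.field}" through
    attribute access, so dict items are wrapped in SimpleNamespace first.
    """
    idents = []
    for item in items:
        if isinstance(item, dict):
            item = SimpleNamespace(**item)
        idents.append(
            Ident(
                system=system_template.format(item=item),
                value=value_template.format(item=item),
            )
        )
    return idents


# Object list: templates reach into each item's fields.
cis = render_idents(
    [{"version": 8, "id": "8:3.14"}],
    "https://www.cisecurity.org/controls/v{item.version}",
    "{item.id}",
)
# Simple string list: the item itself is the value.
techniques = render_idents(["T1565"], "https://attack.mitre.org/techniques", "{item}")
```

The same function serves both list shapes because the difference lives entirely in the templates, which is what lets a new framework be added through configuration alone.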
2. metadata_from_config - Generic Nested XML¶
Purpose: Generate ANY nested XML structure from YAML specification
Works For:
- CIS Controls metadata (official nested structure)
- PCI-DSS requirement hierarchies
- ISO 27001 control families
- HIPAA safeguard categories
- Any hierarchical compliance data
Complete Config Spec:
field_name:
  target_element: "metadata"
  structure: "metadata_from_config"
  source_field: "cis_controls"
  requires_post_processing: true # lxml elements injected after xsdata serialization
  metadata_spec:
    root_element: "cis_controls"
    namespace: "http://cisecurity.org/controls"
    namespace_prefix: "controls"
    allow_empty: true # Generate empty element if no data
    # Grouping (optional)
    group_by: "item.version" # Group items by this field
    group_element:
      element: "framework"
      attributes:
        urn: "urn:cisecurity.org:controls:{group_key}"
    item_element:
      element: "safeguard"
      attributes:
        title: "{item.title}"
        urn: "urn:cisecurity.org:controls:{item.version}:{item.control}"
      children:
        - element: "implementation_groups"
          attributes:
            ig1: "{item.ig1}"
            ig2: "{item.ig2}"
            ig3: "{item.ig3}"
        - element: "asset_type"
          content: "Unknown" # Static content
        - element: "security_function"
          content: "Protect"
Key Features:
- Grouping: group_by field groups items (e.g., by version)
- Nesting: Unlimited depth via recursive children
- Attributes: Template-based with variable substitution
- Content: Static or template-based
- Empty handling: Optional empty element generation
- Type preservation: Booleans converted to lowercase ("true"/"false")
Output Example:
<metadata>
<controls:cis_controls>
<controls:framework urn="urn:cisecurity.org:controls:8">
<controls:safeguard title="Log Sensitive Data Access" urn="urn:cisecurity.org:controls:8:3.14">
<controls:implementation_groups ig1="false" ig2="false" ig3="true"/>
<controls:asset_type>Unknown</controls:asset_type>
<controls:security_function>Protect</controls:security_function>
</controls:safeguard>
</controls:framework>
</controls:cis_controls>
</metadata>
Implementation:
- MappingEngine.generate_metadata_from_config()
- MappingEngine._build_config_item()
- MappingEngine._build_config_child()
- XCCDFUnifiedExporter._inject_metadata_from_config()
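The grouping, nesting, and boolean handling listed above can be illustrated with the standard library. The sketch below is hard-coded to the CIS Controls shape for brevity rather than driven by a `metadata_spec`, and it uses `xml.etree.ElementTree` instead of lxml so that it runs without dependencies. `build_cis_controls` and `xml_text` are hypothetical helper names, not engine methods.

```python
import xml.etree.ElementTree as ET
from collections import defaultdict
from typing import Any, Dict, List

NS = "http://cisecurity.org/controls"


def xml_text(value: Any) -> str:
    """Render a value for XML; booleans become lowercase 'true'/'false'."""
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def build_cis_controls(controls: List[Dict[str, Any]]) -> ET.Element:
    """Group controls by version, then emit framework > safeguard > children."""
    ET.register_namespace("controls", NS)
    root = ET.Element(f"{{{NS}}}cis_controls")

    # group_by: "item.version"
    groups: Dict[Any, List[Dict[str, Any]]] = defaultdict(list)
    for control in controls:
        groups[control["version"]].append(control)

    for version, members in groups.items():
        framework = ET.SubElement(
            root,
            f"{{{NS}}}framework",
            {"urn": f"urn:cisecurity.org:controls:{version}"},
        )
        for c in members:
            safeguard = ET.SubElement(
                framework,
                f"{{{NS}}}safeguard",
                {
                    "title": c["title"],
                    "urn": f"urn:cisecurity.org:controls:{c['version']}:{c['control']}",
                },
            )
            ET.SubElement(
                safeguard,
                f"{{{NS}}}implementation_groups",
                {key: xml_text(c[key]) for key in ("ig1", "ig2", "ig3")},
            )
    return root


element = build_cis_controls([
    {"version": 8, "control": "3.14", "title": "Log Sensitive Data Access",
     "ig1": False, "ig2": False, "ig3": True},
])
xml_string = ET.tostring(element, encoding="unicode")
```

A config-driven version would replace the hard-coded element names and attribute dictionaries with values read from `group_element`, `item_element`, and `children`, recursing once per child level.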
3. generate_profiles_from_rules - Profile Generation¶
Purpose: Generate Benchmark-level <Profile> elements from recommendation.profiles field
Works For:
- CIS Levels (Level 1/2/3 × Server/Workstation)
- DISA MAC levels (MAC-1/2/3 × Classified/Sensitive/Public)
- PCI-DSS SAQ types (SAQ A/B/C/D)
- Custom applicability hierarchies
Complete Config Spec:
benchmark:
  profiles:
    generate_from_rules: true
    profile_mappings:
      - match: "Level 1 - Server" # String to match in rec.profiles
        id: "level-1-server"
        title: "Level 1 - Server"
        description: "CIS Level 1 for server environments"
      - match: "Level 2 - Server"
        id: "level-2-server"
        title: "Level 2 - Server"
        description: "CIS Level 2 for server environments"
How It Works:
1. Scans all recommendations
2. For each profile mapping, finds recommendations where profile_mapping.match is in recommendation.profiles
3. Creates Profile element with select list of matching rule IDs
4. Adds to Benchmark (not Rules)
Output Example:
<Profile id="level-1-server">
<title>Level 1 - Server</title>
<description>CIS Level 1 for server environments</description>
<select idref="xccdf_cis_rule_6_1_1" selected="true"/>
<select idref="xccdf_cis_rule_6_1_2" selected="true"/>
<!-- ... ~250 more rules -->
</Profile>
Implementation: MappingEngine.generate_profiles_from_rules()
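The four steps under "How It Works" amount to one membership test per mapping and recommendation. The following is a rough sketch under simplifying assumptions: recommendations are plain dicts with hypothetical `rule_id` and `profiles` keys, and `ProfileSketch` stands in for the real XCCDF Profile type.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ProfileSketch:
    """Hypothetical stand-in for an XCCDF <Profile> element."""
    id: str
    title: str
    description: str
    selected_rule_ids: List[str] = field(default_factory=list)


def build_profiles(profile_mappings: List[Dict[str, str]],
                   recommendations: List[Dict[str, Any]]) -> List[ProfileSketch]:
    """Create one profile per mapping and select every rule whose profiles contain `match`."""
    profiles = []
    for mapping in profile_mappings:
        profile = ProfileSketch(mapping["id"], mapping["title"], mapping["description"])
        profile.selected_rule_ids = [
            rec["rule_id"]
            for rec in recommendations
            if mapping["match"] in rec["profiles"]
        ]
        profiles.append(profile)
    return profiles


profile_mappings = [
    {"match": "Level 1 - Server", "id": "level-1-server",
     "title": "Level 1 - Server", "description": "CIS Level 1 for server environments"},
    {"match": "Level 2 - Server", "id": "level-2-server",
     "title": "Level 2 - Server", "description": "CIS Level 2 for server environments"},
]
recommendations = [
    {"rule_id": "xccdf_cis_rule_6_1_1", "profiles": ["Level 1 - Server", "Level 2 - Server"]},
    {"rule_id": "xccdf_cis_rule_6_1_2", "profiles": ["Level 2 - Server"]},
]
profiles = build_profiles(profile_mappings, recommendations)
```

Because the profiles are collected separately from the rules, they can be attached to the Benchmark afterwards, which matches step 4.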
Architecture¶
High-Level Data Flow¶
flowchart TD
subgraph Input["Input"]
PYDANTIC["Benchmark Data"]
end
subgraph Processing["MappingEngine Processing"]
CONFIG["Load Configuration"]
MAPPER["Map Fields"]
TRANSFORM["Apply Transforms"]
VARS["Substitute Variables"]
XML["Build XML Structure"]
CONFIG --> MAPPER
MAPPER --> TRANSFORM
TRANSFORM --> VARS
VARS --> XML
end
subgraph Output["Output"]
XCCDF["XCCDF Output"]
end
PYDANTIC --> CONFIG
XML --> XCCDF
style Input fill:#E8F4F8,stroke:#0066CC
style Processing fill:#fff4e1,stroke:#ff9800
style Output fill:#FFF4E6,stroke:#CC6600
Component Relationships¶
Main Processing Pipeline:
flowchart LR
CONFIG["ConfigLoader<br/>Load YAML<br/>Validate schema"]
MAPPER["FieldMapper<br/>Map source to target<br/>Resolve paths"]
PIPELINE["TransformPipeline<br/>Apply transforms"]
VARS["VariableSubstituter<br/>Resolve variables"]
BUILDER["XMLStructureBuilder<br/>Build XCCDF"]
CONFIG --> MAPPER
MAPPER --> PIPELINE
PIPELINE --> VARS
VARS --> BUILDER
style CONFIG fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style MAPPER fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style PIPELINE fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style VARS fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
style BUILDER fill:#fff9c4,stroke:#f57f17,stroke-width:2px
Available Transformers:
- StripHTML - Remove HTML tags
- Markdown - Convert HTML to Markdown
- Composite - Combine multiple fields
- Custom - User-defined transformations
Field Mapping Strategies:
flowchart TD
ABC["Base Strategy Pattern"]
ABC --> SIMPLE["Simple Mapping<br/>Direct field copy"]
ABC --> COMPOSITE["Composite Mapping<br/>Combine multiple fields"]
ABC --> EMBEDDED["Embedded XML<br/>Nested structure"]
ABC --> CCI["CCI Lookup<br/>DoD compliance"]
style ABC fill:#e8f5e9,stroke:#388e3c,stroke-width:3px
style SIMPLE fill:#e3f2fd,stroke:#1976d2
style COMPOSITE fill:#fff3e0,stroke:#f57c00
style EMBEDDED fill:#f3e5f5,stroke:#7b1fa2
style CCI fill:#ffebee,stroke:#c62828
Class Structure¶
1. Core: MappingEngine¶
class MappingEngine:
"""Configuration-driven mapping engine for transforming Pydantic models to XCCDF.
Responsibilities:
- Load YAML configuration files
- Coordinate mapping process
- Apply transformations
- Build XCCDF structures
Usage:
engine = MappingEngine.from_config('disa_style.yaml')
xccdf_benchmark = engine.map_benchmark(pydantic_benchmark)
"""
def __init__(self, config: MappingConfig):
"""Initialize engine with loaded configuration."""
self.config = config
self.field_mapper = FieldMapper(config.field_mappings)
self.transform_registry = TransformRegistry()
self.variable_substituter = VariableSubstituter()
@classmethod
def from_config(cls, config_path: str) -> 'MappingEngine':
"""Factory method: Load config and create engine."""
config = ConfigLoader.load(config_path)
return cls(config)
def map_benchmark(self, benchmark: Benchmark) -> XCCDFBenchmark:
"""Map Pydantic Benchmark to XCCDF Benchmark.
Args:
benchmark: Validated Pydantic Benchmark model
Returns:
xsdata XCCDF Benchmark ready for XML serialization
"""
def map_rule(self, recommendation: Recommendation, context: MappingContext) -> Rule:
"""Map single Recommendation to XCCDF Rule.
Args:
recommendation: Pydantic Recommendation model
context: Mapping context (variables for substitution)
Returns:
xsdata XCCDF Rule
"""
2. Configuration: ConfigLoader and MappingConfig¶
@dataclass
class MappingConfig:
"""Parsed YAML configuration.
Structure matches YAML file:
- metadata: Style info
- benchmark: Benchmark-level mappings
- rule_defaults: Default rule attributes
- rule_id: ID generation template
- field_mappings: Core field mapping definitions
- transformations: Transform function definitions
- validation: Validation rules
"""
style_name: str
description: str
xccdf_version: str
benchmark_mappings: Dict[str, FieldMapping]
rule_defaults: Dict[str, Any]
rule_id_template: str
field_mappings: Dict[str, FieldMapping]
transformations: Dict[str, TransformDef]
cci_lookup: Optional[CCILookupConfig] = None
validation: Optional[ValidationConfig] = None
class ConfigLoader:
"""Load and validate YAML configuration files."""
@staticmethod
def load(config_path: str) -> MappingConfig:
"""Load YAML and parse into MappingConfig.
Validates:
- YAML syntax
- Required sections present
- Field mapping structure
- Transform definitions valid
Raises:
ConfigValidationError: If config is invalid
"""
@staticmethod
def validate_config(config_dict: dict) -> None:
"""Validate configuration structure."""
3. Field Mapping: FieldMapper¶
@dataclass
class FieldMapping:
"""Definition of how to map one field.
Types of mappings:
1. Simple: source_field -> target_element (with optional transform)
2. Composite: multiple sources -> one target (VulnDiscussion)
3. Embedded XML: nested XML tags within XCCDF element
4. Lookup-based: CCI lookup, NIST deduplication
5. Template-based: Variable substitution
"""
target_element: str
source_field: Optional[str] = None
transform: Optional[str] = None
structure: Optional[str] = None # 'embedded_xml_tags', 'nested', 'dublin_core'
multiple: bool = False
# For composite/embedded structures
components: Optional[List[ComponentMapping]] = None
# For attributes
attributes: Optional[Dict[str, str]] = None
# For static content
content: Optional[str] = None
# For conditional mapping
optional: bool = False
condition: Optional[str] = None
@dataclass
class ComponentMapping:
"""Component of a composite or embedded XML mapping."""
tag: Optional[str] = None # For embedded XML
element: Optional[str] = None # For nested elements
sources: Optional[List[SourceDef]] = None
content: Optional[str] = None
combine: Optional[str] = None # 'join', 'concat', etc.
separator: str = "\n\n"
attributes: Optional[Dict[str, str]] = None
optional: bool = False
@dataclass
class SourceDef:
"""Source field with transformation."""
field: str
transform: Optional[str] = None
separator: str = ""
class FieldMapper:
"""Maps fields from source models to target structures.
Responsibilities:
- Resolve source field paths (nested attributes)
- Apply field-level transformations
- Handle composite field assembly
- Build nested/embedded structures
"""
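def __init__(self, field_mappings: Dict[str, FieldMapping]):
self.mappings = field_mappings
# Collaborators used by map_field() and build_embedded_xml_structure()
self.transform_pipeline = TransformPipeline(TransformRegistry())
self.variable_substituter = VariableSubstituter()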
def map_field(self,
mapping: FieldMapping,
source_obj: Any,
context: MappingContext) -> Any:
"""Map a single field from source to target.
Handles:
- Simple field extraction
- Composite field assembly
- Embedded XML structure
- Variable substitution in templates
Returns:
Mapped value ready for XCCDF model
"""
def resolve_source_value(self, source_field: str, source_obj: Any) -> Any:
"""Resolve potentially nested field path.
Examples:
'title' -> source_obj.title
'mitre_mapping.techniques' -> source_obj.mitre_mapping.techniques
'cis_controls[0].control' -> source_obj.cis_controls[0].control
"""
def build_composite_field(self,
components: List[ComponentMapping],
source_obj: Any,
context: MappingContext) -> str:
"""Build composite field from multiple sources.
Example: VulnDiscussion = description + rationale
"""
def build_embedded_xml_structure(self,
components: List[ComponentMapping],
source_obj: Any,
context: MappingContext) -> str:
"""Build embedded XML tags within text content.
Example: <description>
<VulnDiscussion>...</VulnDiscussion>
<FalsePositives/>
<Mitigations>...</Mitigations>
</description>
"""
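The path forms listed in the `resolve_source_value` docstring (plain name, dotted path, indexed list) can be handled by one small walker. This is a sketch only: `resolve_path` is a hypothetical name, and returning `None` for a missing step is an assumption, since the design could equally raise `FieldMappingError` at that point.

```python
import re
from types import SimpleNamespace
from typing import Any

# One path segment: a name optionally followed by a list index, e.g. "cis_controls[0]".
_SEGMENT = re.compile(r"^(\w+)(?:\[(\d+)\])?$")


def resolve_path(obj: Any, path: str) -> Any:
    """Walk a dotted path, returning None as soon as any step is missing."""
    current = obj
    for segment in path.split("."):
        match = _SEGMENT.match(segment)
        if match is None:
            raise ValueError(f"Unsupported path segment: {segment!r}")
        name, index = match.groups()
        current = getattr(current, name, None)
        if current is None:
            return None
        if index is not None:
            if int(index) >= len(current):
                return None
            current = current[int(index)]
    return current


# Stand-in for a Recommendation model.
rec = SimpleNamespace(
    title="Ensure logging",
    mitre_mapping=SimpleNamespace(techniques=["T1565"]),
    cis_controls=[SimpleNamespace(control="3.14")],
)
first_control = resolve_path(rec, "cis_controls[0].control")
```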
4. Transformation: TransformPipeline and Transformers¶
class TransformRegistry:
"""Registry of available transformations.
Built-in transformations:
- none: Pass through unchanged
- strip_html: Remove all HTML tags
- strip_html_keep_code: Strip HTML but preserve code blocks
- html_to_markdown: Convert HTML to Markdown
Extensible: Register custom transforms
"""
def __init__(self):
self._transforms: Dict[str, Callable] = {}
self._register_builtin_transforms()
def register(self, name: str, transform_func: Callable):
"""Register custom transformation."""
def get_transform(self, name: str) -> Callable:
"""Get transformation function by name."""
def apply(self, transform_name: str, value: Any, **kwargs) -> Any:
"""Apply transformation to value."""
class TransformPipeline:
"""Apply series of transformations to field values."""
def __init__(self, registry: TransformRegistry):
self.registry = registry
def apply_transform(self, transform_name: str, value: Any) -> Any:
"""Apply single transformation."""
def apply_chain(self, transforms: List[str], value: Any) -> Any:
"""Apply chain of transformations in order."""
# Built-in transformers
class HTMLCleaner:
"""HTML transformation utilities."""
@staticmethod
def strip_html(html: Optional[str]) -> str:
"""Remove all HTML tags, return plain text."""
@staticmethod
def strip_html_keep_code(html: Optional[str]) -> str:
"""Strip HTML but preserve code blocks and lists."""
@staticmethod
def html_to_markdown(html: Optional[str]) -> str:
"""Convert HTML to Markdown format."""
class CompositeTransformer:
"""Build composite fields from multiple sources."""
@staticmethod
def join_with_separator(values: List[str], separator: str = "\n\n") -> str:
"""Join non-empty values with separator."""
@staticmethod
def concat(values: List[str]) -> str:
"""Concatenate values with no separator."""
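To make the registry contract concrete, here is a minimal sketch of name-based registration and lookup. `SketchRegistry` is a hypothetical class, and the regex-based `strip_html` is a deliberately naive stand-in for `HTMLCleaner.strip_html`; a production cleaner would more likely rely on a real HTML parser.

```python
import re
from html import unescape
from typing import Any, Callable, Dict, Optional


def strip_html(html: Optional[str]) -> str:
    """Naive tag stripper: drop anything that looks like a tag, then decode entities."""
    if not html:
        return ""
    text = re.sub(r"<[^>]+>", "", html)
    return unescape(text).strip()


class SketchRegistry:
    """Name -> callable lookup with a built-in pass-through transform."""

    def __init__(self) -> None:
        self._transforms: Dict[str, Callable[[Any], Any]] = {"none": lambda value: value}

    def register(self, name: str, func: Callable[[Any], Any]) -> None:
        self._transforms[name] = func

    def apply(self, name: str, value: Any) -> Any:
        if name not in self._transforms:
            raise LookupError(f"Unknown transform: {name!r}")
        return self._transforms[name](value)


registry = SketchRegistry()
registry.register("strip_html", strip_html)
plain = registry.apply("strip_html", "<p>This is <strong>text</strong></p>")
```

Keeping transforms behind string names is what allows a YAML file to select them without the engine importing anything by hand.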
5. Variable Substitution: VariableSubstituter¶
class VariableSubstituter:
"""Substitute variables in templates with actual values.
Variables:
- {ref}: Original ref (e.g., "3.1.1")
- {ref_normalized}: Normalized ref (e.g., "3_1_1")
- {platform}: Benchmark platform
- {control.version}: CIS Control version
- {control.control}: CIS Control ID
- {nist_control_id}: NIST control ID
Used in:
- ID templates: "xccdf_cis_{platform}_rule_{ref_normalized}"
- Attributes: fixref="F-{ref_normalized}"
- Content: "{control.title}"
"""
def substitute(self, template: str, context: MappingContext) -> str:
"""Substitute all variables in template.
Args:
template: String with {variable} placeholders
context: Mapping context with variable values
Returns:
String with variables replaced
"""
def normalize_ref(self, ref: str) -> str:
"""Convert ref to normalized form: 3.1.1 -> 3_1_1"""
return ref.replace('.', '_')
def extract_platform(self, title: str) -> str:
"""Extract platform from benchmark title."""
@dataclass
class MappingContext:
"""Context for variable substitution and conditional logic.
Contains:
- Current recommendation being mapped
- Benchmark-level metadata
- Generated IDs and references
- Loop iteration variables (for multiple elements)
"""
recommendation: Recommendation
benchmark: Benchmark
platform: str
ref_normalized: str
# For iterating over collections
current_control: Optional[CISControl] = None
current_nist: Optional[str] = None
current_technique: Optional[str] = None
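The substitution rules (flat variables such as `{platform}`, dotted access such as `{control.version}`, and an error for unknown names) can be sketched with a single regular expression. This illustration takes a plain mapping instead of a `MappingContext` and raises `LookupError` where the design names `VariableSubstitutionError`; both simplifications are assumptions made to keep it self-contained.

```python
import re
from types import SimpleNamespace
from typing import Any, Mapping

# Matches {name} and {name.attr.attr}.
_PLACEHOLDER = re.compile(r"\{([A-Za-z_][\w.]*)\}")


def substitute(template: str, variables: Mapping[str, Any]) -> str:
    """Replace every placeholder, failing loudly on unknown variables."""

    def lookup(match: "re.Match[str]") -> str:
        path = match.group(1)
        head, *attrs = path.split(".")
        if head not in variables:
            raise LookupError(f"Variable '{path}' not found in template: {template}")
        value = variables[head]
        for attr in attrs:
            try:
                value = getattr(value, attr)
            except AttributeError:
                raise LookupError(f"Variable '{path}' not found in template: {template}") from None
        return str(value)

    return _PLACEHOLDER.sub(lookup, template)


rule_id = substitute(
    "xccdf_cis_{platform}_rule_{ref_normalized}",
    {"platform": "eks", "ref_normalized": "3_1_1"},
)
version = substitute("{control.version}", {"control": SimpleNamespace(version=8, control="4.8")})
```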
6. Strategy Pattern: MappingStrategy¶
class MappingStrategy(ABC):
"""Abstract base for field mapping strategies.
Different strategies for different field types:
- SimpleFieldStrategy: Direct field mapping
- CompositeFieldStrategy: Multiple sources -> one target
- EmbeddedXMLStrategy: Nested XML tags
- CCILookupStrategy: CCI/NIST deduplication logic
- NestedElementStrategy: Nested XCCDF elements
"""
@abstractmethod
def apply(self,
mapping: FieldMapping,
source_obj: Any,
context: MappingContext) -> Any:
"""Apply this mapping strategy."""
class SimpleFieldStrategy(MappingStrategy):
"""Simple field: source -> target with optional transform."""
def apply(self, mapping: FieldMapping, source_obj: Any, context: MappingContext) -> Any:
# Get source value
value = getattr(source_obj, mapping.source_field)
# Apply transformation
if mapping.transform:
value = transform_registry.apply(mapping.transform, value)
# Substitute variables
if isinstance(value, str) and '{' in value:
value = variable_substituter.substitute(value, context)
return value
class CompositeFieldStrategy(MappingStrategy):
    """Composite field: multiple sources joined together."""
    def apply(self, mapping: FieldMapping, source_obj: Any, context: MappingContext) -> Any:
        # FieldMapping carries no sources/separator of its own; both live on its components
        parts = []
        for component in mapping.components or []:
            values = []
            for source_def in component.sources or []:
                value = getattr(source_obj, source_def.field, None)
                # Apply transform to each source
                if value and source_def.transform:
                    value = transform_registry.apply(source_def.transform, value)
                if value:
                    values.append(value)
            # Join this component's values with its separator
            if values:
                parts.append(component.separator.join(values))
        return "\n\n".join(parts)
class EmbeddedXMLStrategy(MappingStrategy):
"""Embedded XML: Build XML tags within XCCDF description element."""
def apply(self, mapping: FieldMapping, source_obj: Any, context: MappingContext) -> str:
# Build embedded XML structure
xml_parts = []
for component in mapping.components:
tag = component.tag
if component.content:
# Static content
xml_parts.append(f"<{tag}>{component.content}</{tag}>")
elif component.sources:
# Dynamic content from sources
values = []
for source_def in component.sources:
value = getattr(source_obj, source_def.field, None)
if value and source_def.transform:
value = transform_registry.apply(source_def.transform, value)
if value:
values.append(value)
if values or not component.optional:
content = component.separator.join(values)
xml_parts.append(f"<{tag}>{content}</{tag}>")
return "\n".join(xml_parts)
class CCILookupStrategy(MappingStrategy):
"""CCI lookup with NIST deduplication."""
def __init__(self, cci_service: CCILookupService):
self.cci_service = cci_service
def apply(self, mapping: FieldMapping, source_obj: Any, context: MappingContext) -> List[str]:
# Get CIS control IDs
cis_control_ids = [c.control for c in source_obj.cis_controls]
# Get cited NIST controls
cited_nist = source_obj.nist_controls
# Deduplicate: Get CCIs and extra NIST controls
ccis, extra_nist = self.cci_service.deduplicate_nist_controls(
cis_control_ids, cited_nist
)
return ccis
7. Error Handling: MappingError¶
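class MappingError(Exception):
    """Base exception for mapping errors."""
class ConfigValidationError(MappingError):
    """YAML configuration is invalid."""
    def __init__(self, message: str,
                 config_path: Optional[str] = None,
                 validation_errors: Optional[List[str]] = None):
        # Attributes read by the error-handling example
        self.config_path = config_path
        self.validation_errors = validation_errors or []
        super().__init__(message)
class FieldMappingError(MappingError):
    """Error mapping specific field."""
    def __init__(self, field_name: str, reason: str):
        self.field_name = field_name
        super().__init__(f"Failed to map field '{field_name}': {reason}")
class TransformError(MappingError):
    """Error applying transformation."""
    def __init__(self, transform_name: str, reason: str, input_value: Any = None):
        self.transform_name = transform_name
        self.input_value = input_value  # Value that failed to transform, if known
        super().__init__(f"Transform '{transform_name}' failed: {reason}")
class VariableSubstitutionError(MappingError):
    """Variable not found in context."""
    def __init__(self, variable: str, template: str):
        self.variable = variable
        self.template = template  # Kept so handlers can report the offending template
        super().__init__(f"Variable '{variable}' not found in template: {template}")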
Key Method Signatures¶
MappingEngine.map_benchmark()¶
def map_benchmark(self, benchmark: Benchmark) -> XCCDFBenchmark:
"""
High-level algorithm:
1. Create mapping context with benchmark metadata
- Extract platform from title
- Set up variable substitution context
2. Map benchmark-level fields
- id (using id_template)
- title
- description
- version
- status
3. Map each recommendation to XCCDF Rule
- Create rule-specific context (ref, ref_normalized)
- Apply rule_defaults (severity, weight)
- Map all fields per field_mappings config
- Handle composite fields (VulnDiscussion)
- Handle embedded XML (description tags)
- Handle CCI/NIST deduplication
- Handle metadata namespace
4. Assemble XCCDF Benchmark
- Create xsdata Benchmark object
- Add all rules
- Return schema-compliant XCCDF
Returns:
XCCDFBenchmark: xsdata model ready for XML serialization
Raises:
MappingError: If mapping fails for any field
ValidationError: If result doesn't validate
"""
# 1. Setup context
context = MappingContext(
benchmark=benchmark,
platform=self.variable_substituter.extract_platform(benchmark.title),
recommendation=None, # Set per-rule
ref_normalized="" # Set per-rule
)
# 2. Map benchmark
benchmark_id = self.variable_substituter.substitute(
self.config.benchmark_mappings['id_template'],
context
)
xccdf_benchmark = XCCDFBenchmark(
id=benchmark_id,
status=[Status(value="draft")],
title=[TextType(value=benchmark.title)],
description=[HtmlTextWithSubType(content=[benchmark.title])],
version=VersionType(value=benchmark.version)
)
# 3. Map rules
rules = []
for rec in benchmark.recommendations:
context.recommendation = rec
context.ref_normalized = self.variable_substituter.normalize_ref(rec.ref)
rule = self.map_rule(rec, context)
rules.append(rule)
xccdf_benchmark.rule = rules
# 4. Return
return xccdf_benchmark
MappingEngine.map_rule()¶
def map_rule(self, recommendation: Recommendation, context: MappingContext) -> Rule:
"""
Algorithm:
1. Generate rule ID from template
2. Apply rule defaults (severity, weight, selected)
3. For each field in field_mappings:
a. Determine mapping strategy (simple/composite/embedded/lookup)
b. Apply strategy to map field
c. Apply transformations
d. Substitute variables
e. Build XCCDF element
4. Assemble Rule object
5. Validate and return
Returns:
Rule: xsdata XCCDF Rule
"""
# 1. Generate ID
rule_id = self.variable_substituter.substitute(
self.config.rule_id_template,
context
)
# 2. Create Rule with defaults
rule = Rule(
id=rule_id,
severity=self.config.rule_defaults.get('severity', 'medium'),
weight=self.config.rule_defaults.get('weight', '10.0'),
selected=self.config.rule_defaults.get('selected', True)
)
# 3. Map each field
for field_name, mapping in self.config.field_mappings.items():
try:
# Choose strategy
strategy = self._get_strategy_for_mapping(mapping)
# Apply strategy
value = strategy.apply(mapping, recommendation, context)
# Set on rule
self._set_rule_attribute(rule, mapping.target_element, value)
except Exception as e:
raise FieldMappingError(field_name, str(e)) from e  # Keep the original traceback
# 4. Return
return rule
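`map_rule()` relies on `_get_strategy_for_mapping()`, whose body is not specified here. One plausible shape is a registry keyed by the mapping's `structure`, with a fallback based on whether components are present. Everything in this sketch is hypothetical: `MappingSketch`, `StrategyRegistry`, the stub strategy classes, and the fallback order are assumptions, not the engine's actual dispatch.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class MappingSketch:
    """Only the two FieldMapping attributes that drive strategy selection."""
    structure: Optional[str] = None
    components: Optional[List[object]] = None


class StrategyRegistry:
    """Maps a structure name to a zero-argument strategy factory."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], object]] = {}

    def register(self, structure: str, factory: Callable[[], object]) -> None:
        self._factories[structure] = factory

    def resolve(self, mapping: MappingSketch) -> object:
        # An explicit structure wins; otherwise fall back on the mapping's shape.
        if mapping.structure is not None:
            if mapping.structure not in self._factories:
                raise LookupError(f"No strategy registered for {mapping.structure!r}")
            return self._factories[mapping.structure]()
        key = "composite" if mapping.components else "simple"
        return self._factories[key]()


class SimpleStub:
    """Placeholder for SimpleFieldStrategy."""


class CompositeStub:
    """Placeholder for CompositeFieldStrategy."""


class EmbeddedStub:
    """Placeholder for EmbeddedXMLStrategy."""


registry = StrategyRegistry()
registry.register("simple", SimpleStub)
registry.register("composite", CompositeStub)
registry.register("embedded_xml_tags", EmbeddedStub)

chosen = registry.resolve(MappingSketch(structure="embedded_xml_tags"))
```

With this shape, supporting a new structure means registering one more factory, and the per-field loop in `map_rule()` stays unchanged.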
FieldMapper.map_field()¶
def map_field(self, mapping: FieldMapping, source_obj: Any, context: MappingContext) -> Any:
"""
Algorithm:
1. Determine field type/structure
- Simple field
- Composite field
- Embedded XML
- Nested elements
- Lookup-based
2. Route to appropriate builder
3. Apply transformations
4. Substitute variables
5. Return mapped value
"""
# 1. Check structure type
if mapping.structure == 'embedded_xml_tags':
return self.build_embedded_xml_structure(
mapping.components,
source_obj,
context
)
elif mapping.structure == 'nested':
return self.build_nested_structure(
mapping.children,
source_obj,
context
)
elif mapping.source_logic:
# Special logic (CCI lookup, etc.)
return self._apply_source_logic(mapping, source_obj, context)
elif mapping.components:
# Composite field
return self.build_composite_field(
mapping.components,
source_obj,
context
)
else:
# Simple field
value = self.resolve_source_value(mapping.source_field, source_obj)
# Transform
if mapping.transform:
value = self.transform_pipeline.apply_transform(
mapping.transform,
value
)
# Variable substitution
if isinstance(value, str):
value = self.variable_substituter.substitute(value, context)
return value
FieldMapper.build_embedded_xml_structure()¶
def build_embedded_xml_structure(self,
components: List[ComponentMapping],
source_obj: Any,
context: MappingContext) -> str:
"""
Build embedded XML tags within XCCDF description element.
Algorithm:
1. For each component:
a. Extract tag name
b. Get content (static or from sources)
c. Apply transformations
d. Build XML tag
e. Handle optional tags (skip if no content)
2. Join all tags
3. Return as single string
Example output:
<VulnDiscussion>Combined description and rationale</VulnDiscussion>
<FalsePositives></FalsePositives>
<Mitigations>Additional info content</Mitigations>
<IAControls></IAControls>
"""
xml_parts = []
for component in components:
tag = component.tag
# Static content
if component.content is not None:
xml_parts.append(f"<{tag}>{component.content}</{tag}>")
continue
# Dynamic content from sources
if component.sources:
values = []
for source_def in component.sources:
value = self.resolve_source_value(source_def.field, source_obj)
# Transform
if source_def.transform:
value = self.transform_pipeline.apply_transform(
source_def.transform,
value
)
if value:
values.append(value)
# Join values
if values:
content = component.separator.join(values)  # Separator belongs to the component, not the last source
xml_parts.append(f"<{tag}>{content}</{tag}>")
elif not component.optional:
# Required but empty
xml_parts.append(f"<{tag}></{tag}>")
return "\n".join(xml_parts)
Configuration Processing¶
How YAML Config is Processed¶
# Example: Processing disa_style.yaml
# 1. Load YAML
with open('disa_style.yaml') as f:
config_dict = yaml.safe_load(f)
# 2. Parse into structured objects
config = MappingConfig(
style_name=config_dict['metadata']['style_name'],
benchmark_mappings={
'id': FieldMapping(
target_element='id',
source_field=None, # Generated from template
template=config_dict['benchmark']['id_template']
),
'title': FieldMapping(
target_element='title',
source_field='title',
transform='none'
)
},
field_mappings={
'description': FieldMapping(
target_element='description',
structure='embedded_xml_tags',
components=[
ComponentMapping(
tag='VulnDiscussion',
sources=[
SourceDef(field='description', transform='strip_html'),
SourceDef(field='rationale', transform='strip_html')
],
separator='\n\n'
),
ComponentMapping(
tag='FalsePositives',
content=''
),
# ... more components
]
),
'fixtext': FieldMapping(
target_element='fixtext',
source_field='remediation',
transform='strip_html_keep_code',
attributes={
'fixref': 'F-{ref_normalized}'
}
),
# ... more mappings
},
transformations={
'strip_html': TransformDef(
description='Remove all HTML tags',
function='HTMLCleaner.strip_html'
),
# ... more transforms
}
)
# 3. Create engine with config
engine = MappingEngine(config)
# 4. Use engine to map
xccdf_benchmark = engine.map_benchmark(pydantic_benchmark)
Transformation Pipeline¶
How Transformations are Applied¶
# Example: strip_html transformation
# 1. YAML defines transform
transformations:
strip_html:
description: "Remove all HTML tags, return plain text"
function: "HTMLCleaner.strip_html"
# 2. Transform registered in registry
transform_registry.register('strip_html', HTMLCleaner.strip_html)
# 3. Field mapping specifies transform
field_mappings:
description:
source_field: "description"
transform: "strip_html"
# 4. During mapping, transform is applied
value = recommendation.description # "<p>This is <strong>text</strong></p>"
transformed = transform_registry.apply('strip_html', value)
# Result: "This is text"
Transform Chain Example¶
# Apply multiple transforms in sequence
transform_pipeline.apply_chain(
['strip_html', 'html_to_markdown'],
value
)
# Equivalent to:
value = strip_html(value)
value = html_to_markdown(value)
Variable Substitution¶
How Variables are Resolved¶
# Example: Rule ID generation
# 1. YAML template
rule_id:
template: "xccdf_cis_{platform}_rule_{ref_normalized}"
# 2. Context has variables
context = MappingContext(
benchmark=benchmark,
recommendation=rec,
platform="eks", # Extracted from title
ref_normalized="3_1_1" # From rec.ref "3.1.1"
)
# 3. Substitute variables
rule_id = variable_substituter.substitute(
"xccdf_cis_{platform}_rule_{ref_normalized}",
context
)
# Result: "xccdf_cis_eks_rule_3_1_1"
Nested Variable Access¶
# YAML
attributes:
version: "{control.version}"
id: "{control.control}"
# Code
context.current_control = CISControl(version=8, control="4.8", ...)
attribute_value = variable_substituter.substitute(
"{control.version}",
context
)
# Result: "8"
Error Handling Strategy¶
Validation Points¶
Config Load Time
- YAML syntax valid
- Required sections present
- Field mappings well-formed
- Transform functions exist
Mapping Time
- Source fields exist
- Transformations succeed
- Variables can be resolved
- Required fields populated
Output Time
- XCCDF validates against schema
- All required elements present
- IDs unique and valid
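The config-load checks can be sketched as a function that collects every problem rather than stopping at the first one. The list of required sections and the `find_config_problems` name are assumptions for illustration; the real `ConfigLoader.validate_config()` may check more and would presumably raise `ConfigValidationError` with the collected messages.

```python
from typing import Any, Dict, List, Set

# Assumed minimum set of top-level sections.
REQUIRED_SECTIONS = ("metadata", "benchmark", "rule_id", "field_mappings")


def find_config_problems(config: Dict[str, Any], known_transforms: Set[str]) -> List[str]:
    """Return a human-readable message for each structural problem found."""
    problems: List[str] = []
    for section in REQUIRED_SECTIONS:
        if section not in config:
            problems.append(f"missing required section '{section}'")
    for name, mapping in (config.get("field_mappings") or {}).items():
        if "target_element" not in mapping:
            problems.append(f"field '{name}' has no target_element")
        transform = mapping.get("transform")
        if transform is not None and transform not in known_transforms:
            problems.append(f"field '{name}' uses unknown transform '{transform}'")
    return problems


sample = {
    "metadata": {"style_name": "cis_native"},
    "field_mappings": {
        "description": {"target_element": "description", "transform": "html_to_markdown"},
        "title": {"source_field": "title", "transform": "uppercase"},
    },
}
problems = find_config_problems(sample, {"none", "strip_html", "html_to_markdown"})
```

Reporting all problems in one pass tends to suit hand-edited YAML, where several mistakes are often introduced together.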
Error Handling Example¶
try:
# Load config
config = ConfigLoader.load('disa_style.yaml')
# Create engine
engine = MappingEngine(config)
# Map benchmark
xccdf_benchmark = engine.map_benchmark(pydantic_benchmark)
# Serialize to XML
xml_output = xccdf_benchmark.to_xml()
except ConfigValidationError as e:
logger.error(f"Invalid configuration: {e}")
logger.error(f"File: {e.config_path}")
logger.error(f"Issue: {e.validation_errors}")
except FieldMappingError as e:
logger.error(f"Failed to map field: {e.field_name}")
logger.error(f"Reason: {e}")
logger.error(f"Recommendation: {context.recommendation.ref}")
except TransformError as e:
logger.error(f"Transform '{e.transform_name}' failed")
logger.error(f"Input value: {e.input_value}")
logger.error(f"Error: {e}")
except VariableSubstitutionError as e:
logger.error(f"Variable '{e.variable}' not found")
logger.error(f"Template: {e.template}")
logger.error(f"Available variables: {context.list_variables()}")
except ValidationError as e:
logger.error(f"Output XCCDF validation failed")
logger.error(f"Schema error: {e}")
Extensibility¶
Adding New Transformations¶
# 1. Define transformation function
def uppercase_transform(value: str) -> str:
"""Convert text to uppercase."""
return value.upper() if value else ""
# 2. Register with registry
transform_registry.register('uppercase', uppercase_transform)
# 3. Use in YAML config
field_mappings:
title:
source_field: "title"
transform: "uppercase"
# No code changes needed in MappingEngine!
Adding New Field Structures¶
# 1. Create new strategy
class DublinCoreStrategy(MappingStrategy):
"""Build Dublin Core metadata elements."""
def apply(self, mapping: FieldMapping, source_obj: Any, context: MappingContext):
# Build DC elements
return dc_elements
# 2. Register strategy
strategy_registry.register('dublin_core', DublinCoreStrategy)
# 3. Use in YAML
field_mappings:
reference:
target_element: "reference"
structure: "dublin_core"
dc_elements:
- element: "dc:identifier"
content: "{nist_control_id}"
# Engine automatically uses DublinCoreStrategy!
Adding New Mapping Styles¶
# Just create new YAML file!
# cis_native_style.yaml
metadata:
style_name: "cis_native"
description: "CIS native XCCDF format"
xccdf_version: "1.2"
field_mappings:
# Different mappings than DISA style
description:
target_element: "description"
source_field: "description"
transform: "html_to_markdown" # Preserve formatting
# Use it
engine = MappingEngine.from_config('cis_native_style.yaml')
xccdf = engine.map_benchmark(benchmark)
Testing Strategy¶
Unit Tests¶
def test_simple_field_mapping():
    """Test basic field mapping with transformation."""
    mapping = FieldMapping(
        target_element='title',
        source_field='title',
        transform='strip_html'
    )
    rec = Recommendation(title="<p>Test</p>")
    context = MappingContext(recommendation=rec, ...)
    strategy = SimpleFieldStrategy()
    result = strategy.apply(mapping, rec, context)
    assert result == "Test"


def test_composite_field_mapping():
    """Test composite field (VulnDiscussion)."""
    mapping = FieldMapping(
        structure='composite',
        components=[
            ComponentMapping(
                sources=[
                    SourceDef(field='description', transform='strip_html'),
                    SourceDef(field='rationale', transform='strip_html')
                ],
                separator='\n\n'
            )
        ]
    )
    rec = Recommendation(
        description="<p>Desc</p>",
        rationale="<p>Rationale</p>"
    )
    # Build the context before use, as in test_simple_field_mapping
    context = MappingContext(recommendation=rec, ...)
    strategy = CompositeFieldStrategy()
    result = strategy.apply(mapping, rec, context)
    assert result == "Desc\n\nRationale"


def test_variable_substitution():
    """Test variable substitution in templates."""
    context = MappingContext(
        ref_normalized="3_1_1",
        platform="eks"
    )
    substituter = VariableSubstituter()
    result = substituter.substitute(
        "xccdf_cis_{platform}_rule_{ref_normalized}",
        context
    )
    assert result == "xccdf_cis_eks_rule_3_1_1"
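The substitution exercised by the last test can be sketched with a small regex-based helper. This is an illustration only: the standalone `substitute` function, the use of `KeyError` for a missing variable, and the `normalize_ref` rule (dots replaced by underscores) are assumptions, not the engine's `VariableSubstituter`.

```python
import re
from typing import Mapping

# Matches placeholders such as {platform} or {ref_normalized}.
_PLACEHOLDER = re.compile(r"\{(\w+)\}")


def substitute(template: str, variables: Mapping[str, str]) -> str:
    """Replace each {name} placeholder with its value from `variables`."""

    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"Variable '{name}' not found in template '{template}'")
        return str(variables[name])

    return _PLACEHOLDER.sub(replace, template)


def normalize_ref(ref: str) -> str:
    """Assumed normalization: '3.1.1' becomes '3_1_1'."""
    return ref.replace(".", "_")


variables = {"platform": "eks", "ref_normalized": normalize_ref("3.1.1")}
print(substitute("xccdf_cis_{platform}_rule_{ref_normalized}", variables))
```

Failing on an unknown placeholder, instead of leaving it in the output, keeps a misspelled variable from producing a malformed rule ID.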
Integration Tests¶
def test_full_benchmark_mapping():
    """Test complete benchmark mapping using real config."""
    # Load config
    engine = MappingEngine.from_config('disa_style.yaml')

    # Load test benchmark
    benchmark = Benchmark.from_json_file('tests/fixtures/test_benchmark.json')

    # Map to XCCDF
    xccdf_benchmark = engine.map_benchmark(benchmark)

    # Validate structure
    assert xccdf_benchmark.id.startswith('xccdf_cis_')
    assert len(xccdf_benchmark.rule) == len(benchmark.recommendations)

    # Validate first rule
    rule = xccdf_benchmark.rule[0]
    assert rule.id == "xccdf_cis_eks_rule_3_1_1"
    assert rule.title
    assert rule.description

    # Validate embedded XML in description
    desc_content = rule.description[0].content[0]
    assert '<VulnDiscussion>' in desc_content
    assert '<FalsePositives>' in desc_content
Config Validation Tests¶
import pytest


def test_config_validation_missing_required():
    """Test config validation catches missing required sections."""
    with pytest.raises(ConfigValidationError) as exc:
        ConfigLoader.load('tests/configs/invalid_missing_field_mappings.yaml')
    # Assert outside the `with` block so the check runs after the raise
    assert 'field_mappings' in str(exc.value)


def test_config_validation_invalid_transform():
    """Test config validation catches undefined transforms."""
    with pytest.raises(ConfigValidationError) as exc:
        ConfigLoader.load('tests/configs/invalid_transform.yaml')
    assert 'unknown_transform' in str(exc.value)
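The two checks these tests expect, a missing `field_mappings` section and an undefined transform, can be sketched against an already-parsed config dictionary. The `validate_config` function, its signature, and the error message wording are assumptions for illustration; the real `ConfigLoader.load` works from a YAML file path.

```python
from typing import Any, Dict, List, Set


class ConfigValidationError(ValueError):
    """Raised when a mapping config is structurally invalid."""


def validate_config(config: Dict[str, Any], known_transforms: Set[str]) -> None:
    """Collect every problem first, then raise once with the full list."""
    problems: List[str] = []

    if "field_mappings" not in config:
        problems.append("missing required section 'field_mappings'")

    # Check each field's transform name against the registered transforms.
    for field_name, mapping in (config.get("field_mappings") or {}).items():
        transform = mapping.get("transform")
        if transform is not None and transform not in known_transforms:
            problems.append(
                f"field '{field_name}' uses undefined transform '{transform}'"
            )

    if problems:
        raise ConfigValidationError("; ".join(problems))


known = {"strip_html", "html_to_markdown"}
bad_config = {
    "metadata": {"style_name": "example"},
    "field_mappings": {"title": {"source_field": "title", "transform": "unknown_transform"}},
}
try:
    validate_config(bad_config, known)
except ConfigValidationError as error:
    print(error)
```

Reporting all problems in one exception lets a config author fix a file in a single pass instead of rerunning after each error.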
Performance Considerations¶
Caching¶
class MappingEngine:
    """Engine with caching for expensive operations."""

    def __init__(self, config: MappingConfig):
        self.config = config
        self._platform_cache = {}
        self._cci_cache = {}

    def _extract_platform(self, title: str) -> str:
        """Extract platform with caching."""
        if title not in self._platform_cache:
            self._platform_cache[title] = self._do_extract_platform(title)
        return self._platform_cache[title]
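The dictionary cache above could also be expressed with the standard library's `functools.lru_cache`, provided platform extraction is a pure function of the title. The sketch below is an alternative, not the engine's implementation: the `extract_platform` body (a keyword scan over a small list) and the `KNOWN_PLATFORMS` values are hypothetical stand-ins.

```python
from functools import lru_cache

# Hypothetical platform keywords; the real extraction logic is not shown here.
KNOWN_PLATFORMS = ("eks", "ubuntu", "windows")


@lru_cache(maxsize=None)
def extract_platform(title: str) -> str:
    """Return the first known platform keyword found in the title."""
    lowered = title.lower()
    for platform in KNOWN_PLATFORMS:
        if platform in lowered:
            return platform
    return "unknown"


# The second call with the same title is served from the cache.
extract_platform("CIS Amazon EKS Benchmark")
extract_platform("CIS Amazon EKS Benchmark")
info = extract_platform.cache_info()
print(info.hits, info.misses)
```

A module-level function is used here because `lru_cache` on an instance method keys on `self` and keeps the instance alive for as long as the cache does; the explicit per-instance dictionary avoids that.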
Batch Processing¶
def map_benchmark(self, benchmark: Benchmark) -> XCCDFBenchmark:
    """Map with batch processing for expensive operations."""
    # Pre-fetch all CCIs in batch (avoid N+1 queries)
    all_cis_controls = self._collect_all_cis_controls(benchmark)
    cci_batch = self.cci_service.batch_lookup(all_cis_controls)

    # Map rules with cached CCIs
    for rec in benchmark.recommendations:
        # Use cached CCI results
        ...
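The batch pattern above can be illustrated end to end with a stand-in service that counts its calls. Everything in this sketch is hypothetical: `FakeCCIService`, the `map_ccis` helper, the dictionary shape of a recommendation, and the control and CCI identifiers are sample values, not real mappings.

```python
from typing import Any, Dict, Iterable, List, Set


class FakeCCIService:
    """Stand-in lookup service that counts how often it is called."""

    def __init__(self, table: Dict[str, List[str]]) -> None:
        self._table = table
        self.calls = 0

    def batch_lookup(self, control_ids: Iterable[str]) -> Dict[str, List[str]]:
        self.calls += 1
        return {cid: self._table.get(cid, []) for cid in control_ids}


def map_ccis(recommendations: List[Dict[str, Any]], service: FakeCCIService) -> Dict[str, List[str]]:
    # Collect the unique control IDs across all recommendations.
    all_controls: Set[str] = set()
    for rec in recommendations:
        all_controls.update(rec["cis_controls"])

    # One lookup for the whole benchmark instead of one per recommendation.
    cci_by_control = service.batch_lookup(sorted(all_controls))

    result: Dict[str, List[str]] = {}
    for rec in recommendations:
        ccis: List[str] = []
        for control_id in rec["cis_controls"]:
            for cci in cci_by_control[control_id]:
                if cci not in ccis:  # de-duplicate, keep first-seen order
                    ccis.append(cci)
        result[rec["ref"]] = ccis
    return result


service = FakeCCIService({"4.1": ["CCI-000001"], "5.2": ["CCI-000001", "CCI-000002"]})
recs = [
    {"ref": "3.1.1", "cis_controls": ["4.1", "5.2"]},
    {"ref": "3.1.2", "cis_controls": ["5.2"]},
]
print(map_ccis(recs, service), service.calls)
```

With N recommendations, the per-rule approach would issue N lookups; here `service.calls` stays at one regardless of benchmark size.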
Directory Structure¶
cis_bench/
├── exporters/
│   ├── configs/
│   │   ├── disa_style.yaml
│   │   ├── cis_native_style.yaml
│   │   └── custom_style.yaml
│   │
│   ├── mapping/
│   │   ├── __init__.py
│   │   ├── engine.py          # MappingEngine
│   │   ├── config.py          # ConfigLoader, MappingConfig
│   │   ├── field_mapper.py    # FieldMapper
│   │   ├── transforms.py      # TransformRegistry, built-in transforms
│   │   ├── variables.py       # VariableSubstituter, MappingContext
│   │   ├── strategies.py      # Mapping strategies
│   │   └── errors.py          # Exception classes
│   │
│   └── xccdf_exporter.py      # Uses MappingEngine
│
└── utils/
    ├── html_parser.py         # HTMLCleaner (used by transforms)
    └── cci_lookup.py          # CCILookupService (used by strategies)
Summary¶
The MappingEngine is a configuration-driven system that:
- Loads YAML configs defining field mappings and transformations
- Applies transformations via a pluggable registry (strip_html, html_to_markdown, etc.)
- Handles complex structures (embedded XML, composite fields, nested elements)
- Supports variable substitution ({ref}, {ref_normalized}, etc.)
- Uses strategy pattern for different field mapping types
- Is extensible: add transforms and strategies by registering them, and add styles via YAML, without changing MappingEngine code
- Provides clear error handling at config, mapping, and validation stages
Benefits¶
- Separation of Concerns: Mapping logic separate from XCCDF serialization
- Maintainability: Change mappings via YAML, not code
- Testability: Each component tested independently
- Extensibility: New transforms and strategies via registration; new styles via YAML
- Clarity: Explicit configuration shows what maps to what
- Flexibility: Support multiple XCCDF styles (DISA, CIS, custom)