Skip to content

Python Client Examples

Real-world usage patterns and workflows for the cyber-trackr-live Python client.

Basic Operations

List and Filter Documents

python
from cyber_trackr_helper import CyberTrackrHelper

# Later examples reuse this helper instance.
helper = CyberTrackrHelper()

# Fetch the STIG listing and report how many entries it holds
stigs = helper.list_stigs()
print(f"Total STIGs: {len(stigs)}")

# Try several spellings of the same vendor name (flexible matching)
spellings = ("Red Hat", "RedHat", "red_hat")
for search_term in spellings:
    results = helper.search_documents(search_term, doc_type='stig')
    print(f"'{search_term}' found: {len(results)}")  # All return 8

Fuzzy Search for User Input

python
# Free-form text as a user might type it, not an exact document name
user_input = "linux rhel"

# Ask for fuzzy matches at a 0.5 threshold
results = helper.fuzzy_search_documents(user_input, threshold=0.5)

# Offer at most the first ten candidates as suggestions
print("Did you mean one of these?")
suggestions = results[:10]
for name, score, versions in suggestions:
    print(f"  {score:.0%} match: {name}")

Working with STIGs

Get Latest STIG Version

python
# Look up the newest version rather than hard-coding version/release
stig_name = "Red_Hat_Enterprise_Linux_9"
latest = helper.get_latest_version(stig_name)

# A falsy result means there is nothing to fetch
if latest:
    print(f"Latest RHEL 9 STIG: v{latest.version}r{latest.release}")
    print(f"Released: {latest.released}")

    # Download the full STIG for exactly that version/release
    stig = helper.fetch_complete_stig(stig_name, latest.version, latest.release)

Focus on High-Severity Requirements

python
# Restrict the STIG to its HIGH-severity requirements
stig_ref = ("Red_Hat_Enterprise_Linux_9", "1", "2")
high_reqs = helper.get_high_severity_requirements(*stig_ref)

print(f"HIGH severity requirements: {len(high_reqs)}")

# Print a short sample: at most the first five entries
sample = list(high_reqs.items())[:5]
for v_id, req in sample:
    print(f"\n{v_id}: {req.get('title')}")
    print(f"  Severity: {req.get('severity')}")
    print(f"  Rule: {req.get('rule')}")

Get Severity Summary

python
# Group one STIG's requirements by severity
stig_ref = ("Red_Hat_Enterprise_Linux_9", "1", "2")
summary = helper.get_requirements_summary_by_severity(*stig_ref)

# Report the size of each severity bucket, then the grand total
print("Severity Distribution:")
print(f"  HIGH:   {len(summary['high'])} requirements")
print(f"  MEDIUM: {len(summary['medium'])} requirements")
print(f"  LOW:    {len(summary['low'])} requirements")
print(f"  TOTAL:  {sum(len(v) for v in summary.values())}")

Working with RMF Controls

Explore Control Families

python
# Two-letter codes of the control families used in these examples
families = [
    'AC', 'AT', 'AU', 'CA', 'CM', 'CP', 'IA', 'IR', 'MA', 'MP',
    'PE', 'PL', 'PM', 'PS', 'PT', 'RA', 'SA', 'SC', 'SI', 'SR',
]

# Only the first five families are queried to keep the demo short
demo_families = families[:5]
for family_code in demo_families:
    info = helper.get_control_family_info(family_code, revision=5)
    print(f"{family_code}: {info['family_name']} ({info['control_count']} controls)")

# Output (truncated - one line is printed per queried family):
# AC: Access Control (25 controls)
# AT: Awareness and Training (6 controls)
# AU: Audit and Accountability (16 controls)
# ...

Compare Control Revisions

python
# See what changed between Rev 4 and Rev 5
comparison = helper.compare_control_revisions('AC-1')

print("AC-1 Comparison:")
print(f"  Rev 4 Title: {comparison['rev4']['title']}")
print(f"  Rev 5 Title: {comparison['rev5']['title']}")

# Check baseline differences; a missing 'baseline' key is treated as empty
rev4_baseline = comparison['rev4'].get('baseline', [])
rev5_baseline = comparison['rev5'].get('baseline', [])
# Separate the two values so the before/after baselines are readable
# instead of being printed back-to-back.
print(f"  Baseline change: {rev4_baseline} → {rev5_baseline}")

Working with CCIs

Search and Explore CCIs

python
# Search CCIs for the keyword "password"
password_ccis = helper.search_ccis('password')
print(f"Found {len(password_ccis)} password-related CCIs")

# Inspect at most the first three hits
first_hits = list(password_ccis.items())[:3]
for cci_id, definition in first_hits:
    print(f"\n{cci_id}:")
    # Show only the first 100 characters of the definition
    print(f"  {definition[:100]}...")

    # Look up the full CCI record, including its control mapping
    details = helper.get_cci_with_controls(cci_id)
    print(f"  Maps to: {list(details['mapped_controls'].keys())}")

Get CCIs for a Control

python
# Reverse lookup: which CCIs are mapped to control AC-1 (revision 5)?
control_id = 'AC-1'
ac1_ccis = helper.get_ccis_for_rmf_control(control_id, revision=5)

# Print the count followed by one bullet per CCI
print(f"AC-1 has {len(ac1_ccis)} mapped CCIs:")
for cci_id in ac1_ccis:
    print(f"  - {cci_id}")

Cross-Referencing: STIG → CCI → RMF

Compliance Chain for One Requirement

python
# Trace a single STIG requirement through its CCIs to RMF controls
stig_ref = ("Red_Hat_Enterprise_Linux_9", "1", "2")
chain = helper.get_compliance_chain(*stig_ref, "V-257777")

# 1) The STIG requirement itself
print(f"STIG Requirement: {chain['stig_requirement']['title']}")
print(f"Severity: {chain['stig_requirement']['severity']}")
print()

# 2) The CCIs it maps to (definition cut to 50 characters)
print("Maps to CCIs:")
for cci in chain['ccis']:
    print(f"  - {cci['id']}: {cci['definition'][:50]}...")
print()

# 3) The RMF controls at the end of the chain
print(f"Which map to RMF controls: {chain['rmf_controls']}")

Map Entire STIG to RMF

python
# Resolve RMF controls for every requirement of one STIG
mapping = helper.map_stig_to_rmf(
    "Red_Hat_Enterprise_Linux_9", "1", "2",
    delay=0.2  # Be respectful to API
)

# Invert the mapping: RMF control -> list of requirement IDs that cite it
rmf_coverage = {}
for v_id, data in mapping.items():
    for control in data.get('rmf_controls', []):
        rmf_coverage.setdefault(control, []).append(v_id)

# Report the first ten controls in sorted order
print("RMF Control Coverage:")
top_controls = sorted(rmf_coverage.items())[:10]
for control, v_ids in top_controls:
    print(f"  {control}: {len(v_ids)} requirements")

Production Workflows

Caching for Dashboard/Reports

python
# Helper configured for production use: caching on, shared backend,
# and a request rate limit
helper = CyberTrackrHelper(
    enable_cache=True,
    cache_ttl=7200,  # 2 hours
    cache_backend='redis',  # Shared cache
    rate_limit_per_second=10,
)

# Warm-up calls: these will be fast after the first request
stigs = helper.list_stigs()  # Cached
ac_controls = helper.get_control_family('AC')  # Cached
password_ccis = helper.search_ccis('password')  # Cached

# Collect the dashboard counters one at a time, then assemble them
stig_count = len(helper.list_stigs())
srg_count = len(helper.list_srgs())
cci_count = len(helper.search_ccis(''))  # All CCIs
dashboard_data = {
    'total_stigs': stig_count,
    'total_srgs': srg_count,
    'total_ccis': cci_count,
    'rmf_families': 20,  # AC, AT, AU, etc.
}

Batch Processing with Progress

python
from tqdm import tqdm  # pip install tqdm

# Fetch the latest version of every Red Hat STIG, showing a progress bar
red_hat_stigs = helper.search_documents("Red_Hat", doc_type='stig')

results = []
progress = tqdm(red_hat_stigs.keys(), desc="Processing Red Hat STIGs")
for name in progress:
    latest = helper.get_latest_version(name)
    if not latest:
        # No version found for this document - skip it
        continue

    stig = helper.fetch_complete_stig(
        name, latest.version, latest.release,
        delay=0.2
    )
    results.append(stig)

print(f"Processed {len(results)} STIGs")

Export to Different Formats

python
import json
import csv

# Export STIG to JSON
stig = helper.fetch_complete_stig("Red_Hat_Enterprise_Linux_9", "1", "2")

# Pin the encoding so the output does not depend on the platform default
with open('rhel9_stig.json', 'w', encoding='utf-8') as f:
    json.dump(stig, f, indent=2)

# Export requirements to CSV
# newline='' lets the csv module control line endings itself; the explicit
# UTF-8 encoding avoids UnicodeEncodeError on platforms whose default
# encoding cannot represent every character in a requirement title.
with open('rhel9_requirements.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow(['V-ID', 'Title', 'Severity', 'SV-ID'])

    for v_id, req in stig['requirements'].items():
        writer.writerow([
            v_id,
            req.get('title', ''),
            req.get('severity', ''),
            # NOTE(review): the 'rule' field is written under the SV-ID
            # header - presumably it holds the SV identifier; confirm.
            req.get('rule', '')
        ])

Advanced Patterns

Async Batch Processing

python
import asyncio
from cyber_trackr_api_client import Client
from cyber_trackr_api_client.api.documents import get_document

async def fetch_multiple_stigs():
    """Request two RHEL STIG documents concurrently and return the results.

    One request is created per (name, version, release) entry and all of
    them are awaited together with ``asyncio.gather``, so the returned list
    follows the order of the entries below.
    """
    targets = [
        ("Red_Hat_Enterprise_Linux_9", "1", "2"),
        ("Red_Hat_Enterprise_Linux_8", "1", "14"),
    ]

    async with Client(base_url="https://cyber.trackr.live/api") as client:
        pending = [
            get_document.asyncio(
                name=name,
                version=version,
                release=release,
                client=client,
            )
            for name, version, release in targets
        ]
        return await asyncio.gather(*pending)

# Drive the coroutine from synchronous code
stigs = asyncio.run(fetch_multiple_stigs())
print(f"Fetched {len(stigs)} STIGs concurrently")

Custom Error Handling

python
from cyber_trackr_api_client.errors import UnexpectedStatus

def safe_fetch_stig(name, version, release):
    """Fetch a complete STIG, returning None instead of raising on failure.

    An ``UnexpectedStatus`` with status code 500 is reported and turned into
    ``None``; any other ``UnexpectedStatus`` is re-raised. Every other
    exception is printed and also turned into ``None``.
    """
    try:
        stig_data = helper.fetch_complete_stig(name, version, release)
    except UnexpectedStatus as e:
        if e.status_code != 500:
            # Only the 500 case is handled here; let the caller see the rest
            raise
        print(f"Server error (likely invalid params): {name}/{version}/{release}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None
    return stig_data

# Use it
stig = safe_fetch_stig("Red_Hat_Enterprise_Linux_9", "1", "2")
if stig:
    print("Successfully fetched STIG")

Data Analysis

python
import pandas as pd

# Build one row of severity counts per Red Hat STIG
all_red_hat = helper.search_documents("Red_Hat", doc_type='stig')

data = []
demo_names = list(all_red_hat.keys())[:5]  # First 5 for demo
for name in demo_names:
    latest = helper.get_latest_version(name)
    if not latest:
        # Nothing to summarise for this document
        continue

    summary = helper.get_requirements_summary_by_severity(
        name, latest.version, latest.release
    )
    row = {
        'name': name,
        'version': f"{latest.version}.{latest.release}",
        'high': len(summary['high']),
        'medium': len(summary['medium']),
        'low': len(summary['low']),
        'total': sum(len(v) for v in summary.values()),
    }
    data.append(row)

# Tabulate the rows
df = pd.DataFrame(data)
print(df)

# Write the table to an Excel workbook, without the index column
df.to_excel('red_hat_stig_analysis.xlsx', index=False)

Integration Examples

InSpec Profile Generation (Concept)

python
def generate_inspec_profile(stig_data, output_dir):
    """
    Generate the skeleton of an InSpec profile from STIG data.

    Creates ``output_dir`` if needed and writes an ``inspec.yml`` metadata
    file into it. Control generation is still a placeholder - this is a
    conceptual example and a full implementation would require more STIG
    parsing logic.

    Args:
        stig_data: Mapping with ``id``, ``title``, ``description`` and
            ``requirements`` keys. ``version`` is optional and defaults to
            ``'1.0.0'``. ``requirements`` must itself be a mapping.
        output_dir: Directory that receives the profile files.

    Returns:
        None.

    Raises:
        KeyError: If ``id``, ``title``, ``description`` or ``requirements``
            is missing from ``stig_data``.
    """
    import json
    import os

    os.makedirs(output_dir, exist_ok=True)

    # Create inspec.yml
    inspec_yml = {
        'name': stig_data['id'],
        'title': stig_data['title'],
        'version': stig_data.get('version', '1.0.0'),
        'summary': stig_data['description']
    }

    # Previously this metadata was built and then discarded. JSON scalars are
    # valid YAML, so json.dumps() yields safely quoted values without
    # requiring a YAML library.
    metadata_path = os.path.join(output_dir, 'inspec.yml')
    with open(metadata_path, 'w', encoding='utf-8') as f:
        for key, value in inspec_yml.items():
            f.write(f"{key}: {json.dumps(value)}\n")

    # Create controls for each requirement
    for v_id, req in stig_data['requirements'].items():
        # Generate InSpec control file
        # control_content = ...
        pass

Compliance Dashboard Data

python
def build_dashboard_data():
    """Assemble the dictionary that feeds a compliance dashboard.

    Returns a dict with three entries: ``statistics`` (STIG/SRG counts and a
    Rev 5 control count summed over a few example families),
    ``recent_stigs`` and ``severity_summary``.

    NOTE(review): ``get_severity_breakdown`` is not shown among these
    examples; it must be provided elsewhere.
    """
    helper = CyberTrackrHelper(enable_cache=True, cache_ttl=3600)

    stig_total = len(helper.list_stigs())
    srg_total = len(helper.list_srgs())

    example_families = ['AC', 'AU', 'SC', 'SI']  # Example families
    rev5_control_total = sum(
        len(helper.get_control_family(f, revision=5))
        for f in example_families
    )

    statistics = {
        'total_stigs': stig_total,
        'total_srgs': srg_total,
        'rmf_rev5_controls': rev5_control_total,
    }
    recent = get_recently_updated_stigs(helper)
    severity = get_severity_breakdown(helper)

    return {
        'statistics': statistics,
        'recent_stigs': recent,
        'severity_summary': severity,
    }

def get_recently_updated_stigs(helper, limit=10):
    """Return up to ``limit`` ``(name, version)`` pairs, newest date first.

    Every version entry returned by ``helper.list_stigs()`` is considered;
    entries without a truthy ``date`` attribute are ignored.

    NOTE(review): other examples on this page read ``released`` from a
    version object - confirm which attribute the entries actually expose.
    """
    candidates = [
        (stig_name, entry)
        for stig_name, entries in helper.list_stigs().items()
        for entry in entries
        if getattr(entry, 'date', None)
    ]

    # Sort by date (if available), most recent first
    newest_first = sorted(candidates, key=lambda pair: pair[1].date, reverse=True)
    return newest_first[:limit]

See Also

Released under the Apache-2.0 License.