Python Client Examples
Real-world usage patterns and workflows for the cyber-trackr-live Python client.
Basic Operations
List and Filter Documents
python
from cyber_trackr_helper import CyberTrackrHelper
helper = CyberTrackrHelper()

# Retrieve the complete STIG listing
stigs = helper.list_stigs()
print(f"Total STIGs: {len(stigs)}")

# The same vendor name in several spellings (flexible matching)
for search_term in ("Red Hat", "RedHat", "red_hat"):
    results = helper.search_documents(search_term, doc_type='stig')
    print(f"'{search_term}' found: {len(results)}")  # All return 8
Fuzzy Search for User Input
python
# Free-form text, as a user who does not know the exact name might type it
user_input = "linux rhel"

# Ask the helper for close matches to that text
results = helper.fuzzy_search_documents(user_input, threshold=0.5)

print("Did you mean one of these?")
# Show at most the first ten candidates; the version list is not displayed
for name, score, _versions in results[:10]:
    print(f" {score:.0%} match: {name}")
Working with STIGs
Get Latest STIG Version
python
# Let the helper determine the newest version/release of the document
stig_name = "Red_Hat_Enterprise_Linux_9"
latest = helper.get_latest_version(stig_name)

if latest:
    print(f"Latest RHEL 9 STIG: v{latest.version}r{latest.release}")
    print(f"Released: {latest.released}")

    # Download the full document for exactly that version/release
    stig = helper.fetch_complete_stig(stig_name, latest.version, latest.release)
Focus on High-Severity Requirements
python
# Restrict the STIG to its critical (HIGH) requirements
high_reqs = helper.get_high_severity_requirements("Red_Hat_Enterprise_Linux_9", "1", "2")
print(f"HIGH severity requirements: {len(high_reqs)}")

# Print a short preview: the first five entries only
preview = list(high_reqs.items())[:5]
for v_id, req in preview:
    print(f"\n{v_id}: {req.get('title')}")
    print(f" Severity: {req.get('severity')}")
    print(f" Rule: {req.get('rule')}")
Get Severity Summary
python
# One call returns the requirements grouped under 'high', 'medium' and 'low'
summary = helper.get_requirements_summary_by_severity("Red_Hat_Enterprise_Linux_9", "1", "2")

print("Severity Distribution:")
print(f" HIGH: {len(summary['high'])} requirements")
print(f" MEDIUM: {len(summary['medium'])} requirements")
print(f" LOW: {len(summary['low'])} requirements")
# The total is taken over every group present in the summary
print(f" TOTAL: {sum(len(v) for v in summary.values())}")
Working with RMF Controls
Explore Control Families
python
# Two-letter codes of the control families
families = [
    'AC', 'AT', 'AU', 'CA', 'CM', 'CP', 'IA', 'IR', 'MA', 'MP',
    'PE', 'PL', 'PM', 'PS', 'PT', 'RA', 'SA', 'SC', 'SI', 'SR',
]

# Look up only the first five families to keep the example short
for family_code in families[:5]:
    info = helper.get_control_family_info(family_code, revision=5)
    print(f"{family_code}: {info['family_name']} ({info['control_count']} controls)")

# Output (first lines):
# AC: Access Control (25 controls)
# AT: Awareness and Training (6 controls)
# AU: Audit and Accountability (16 controls)
Compare Control Revisions
python
# Fetch the Rev 4 and Rev 5 views of one control side by side
comparison = helper.compare_control_revisions('AC-1')

print("AC-1 Comparison:")
print(f" Rev 4 Title: {comparison['rev4']['title']}")
print(f" Rev 5 Title: {comparison['rev5']['title']}")

# Baselines default to an empty list when a revision carries none
rev4_baseline, rev5_baseline = (
    comparison[revision].get('baseline', []) for revision in ('rev4', 'rev5')
)
print(f" Baseline change: {rev4_baseline} → {rev5_baseline}")
Working with CCIs
Search and Explore CCIs
python
# Search CCIs by keyword
password_ccis = helper.search_ccis('password')
print(f"Found {len(password_ccis)} password-related CCIs")

# Walk through the first three hits only
first_hits = list(password_ccis.items())[:3]
for cci_id, definition in first_hits:
    print(f"\n{cci_id}:")
    print(f" {definition[:100]}...")

    # A second lookup per CCI returns its details including mapped controls
    details = helper.get_cci_with_controls(cci_id)
    print(f" Maps to: {list(details['mapped_controls'].keys())}")
Get CCIs for a Control
python
# Reverse lookup: start from the RMF control and list its CCIs
ac1_ccis = helper.get_ccis_for_rmf_control('AC-1', revision=5)

print(f"AC-1 has {len(ac1_ccis)} mapped CCIs:")
for cci_id in ac1_ccis:
    print(f" - {cci_id}")
Cross-Referencing: STIG → CCI → RMF
Compliance Chain for One Requirement
python
# Trace one requirement (by V-ID) through its CCIs to the RMF controls
chain = helper.get_compliance_chain("Red_Hat_Enterprise_Linux_9", "1", "2", "V-257777")

# 1. The STIG requirement itself
print(f"STIG Requirement: {chain['stig_requirement']['title']}")
print(f"Severity: {chain['stig_requirement']['severity']}")
print()

# 2. The CCIs it references (definitions truncated to 50 characters)
print("Maps to CCIs:")
for cci in chain['ccis']:
    print(f" - {cci['id']}: {cci['definition'][:50]}...")
print()

# 3. The RMF controls at the end of the chain
print(f"Which map to RMF controls: {chain['rmf_controls']}")
Map Entire STIG to RMF
python
# Resolve every requirement of the STIG to its RMF controls
mapping = helper.map_stig_to_rmf(
    "Red_Hat_Enterprise_Linux_9", "1", "2",
    delay=0.2,  # Be respectful to API
)

# Invert the mapping: control -> list of requirement V-IDs that cite it
rmf_coverage = {}
for v_id, data in mapping.items():
    for control in data.get('rmf_controls', []):
        rmf_coverage.setdefault(control, []).append(v_id)

print("RMF Control Coverage:")
# Controls in sorted order, first ten only
for control, v_ids in sorted(rmf_coverage.items())[:10]:
    print(f" {control}: {len(v_ids)} requirements")
Production Workflows
Caching for Dashboard/Reports
python
# Helper configured for production use with aggressive caching
helper = CyberTrackrHelper(
    enable_cache=True,
    cache_ttl=7200,  # 2 hours
    cache_backend='redis',  # Shared cache
    rate_limit_per_second=10,
)

# After the first request these calls are expected to be served from cache
stigs = helper.list_stigs()  # Cached
ac_controls = helper.get_control_family('AC')  # Cached
password_ccis = helper.search_ccis('password')  # Cached

# Headline numbers for a dashboard
dashboard_data = dict(
    total_stigs=len(helper.list_stigs()),
    total_srgs=len(helper.list_srgs()),
    total_ccis=len(helper.search_ccis('')),  # All CCIs
    rmf_families=20,  # AC, AT, AU, etc.
)
Batch Processing with Progress
python
from tqdm import tqdm # pip install tqdm
# Fetch the latest release of every Red Hat STIG, showing a progress bar
red_hat_stigs = helper.search_documents("Red_Hat", doc_type='stig')

results = []
for name in tqdm(red_hat_stigs.keys(), desc="Processing Red Hat STIGs"):
    latest = helper.get_latest_version(name)
    if not latest:
        # No version could be determined for this document; skip it
        continue
    stig = helper.fetch_complete_stig(name, latest.version, latest.release, delay=0.2)
    results.append(stig)

print(f"Processed {len(results)} STIGs")
Export to Different Formats
python
import json
import csv
# Export STIG to JSON
stig = helper.fetch_complete_stig("Red_Hat_Enterprise_Linux_9", "1", "2")

# Open both files as UTF-8 explicitly. Without it, open() falls back to the
# platform's locale encoding, and writing requirement text that contains
# characters outside that encoding raises UnicodeEncodeError.
with open('rhel9_stig.json', 'w', encoding='utf-8') as f:
    json.dump(stig, f, indent=2)

# Export requirements to CSV
# newline='' lets the csv module control line endings itself
with open('rhel9_requirements.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow(['V-ID', 'Title', 'Severity', 'SV-ID'])
    for v_id, req in stig['requirements'].items():
        writer.writerow([
            v_id,
            req.get('title', ''),
            req.get('severity', ''),
            req.get('rule', ''),  # the 'rule' field fills the SV-ID column
        ])
Advanced Patterns
Async Batch Processing
python
import asyncio
from cyber_trackr_api_client import Client
from cyber_trackr_api_client.api.documents import get_document
async def fetch_multiple_stigs():
    """Request two RHEL STIG documents at once and return both results.

    Returns:
        The list produced by ``asyncio.gather``, in the same order as the
        requested documents (RHEL 9 first, then RHEL 8).
    """
    # (name, version, release) of each document to request
    wanted = (
        ("Red_Hat_Enterprise_Linux_9", "1", "2"),
        ("Red_Hat_Enterprise_Linux_8", "1", "14"),
    )
    async with Client(base_url="https://cyber.trackr.live/api") as client:
        # Create every request coroutine first so gather can run them together
        pending = [
            get_document.asyncio(
                name=name, version=version, release=release, client=client
            )
            for name, version, release in wanted
        ]
        return await asyncio.gather(*pending)


# Run async
stigs = asyncio.run(fetch_multiple_stigs())
print(f"Fetched {len(stigs)} STIGs concurrently")
Custom Error Handling
python
from cyber_trackr_api_client.errors import UnexpectedStatus
def safe_fetch_stig(name, version, release):
    """Fetch a complete STIG, reporting failures instead of propagating most of them.

    Returns:
        The fetched STIG, or None when the server answered with status 500
        or any other non-``UnexpectedStatus`` exception occurred (a message
        is printed in both cases).

    Raises:
        UnexpectedStatus: Re-raised for any status code other than 500.
    """
    try:
        return helper.fetch_complete_stig(name, version, release)
    except UnexpectedStatus as e:
        # Only a 500 is handled here; every other status goes back to the caller
        if e.status_code != 500:
            raise
        print(f"Server error (likely invalid params): {name}/{version}/{release}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None


# Use it
stig = safe_fetch_stig("Red_Hat_Enterprise_Linux_9", "1", "2")
if stig:
    print("Successfully fetched STIG")
Data Analysis
python
import pandas as pd
# Collect per-STIG severity counts for a handful of Red Hat STIGs
all_red_hat = helper.search_documents("Red_Hat", doc_type='stig')

data = []
for name in list(all_red_hat.keys())[:5]:  # First 5 for demo
    latest = helper.get_latest_version(name)
    if not latest:
        # Nothing to summarize when no version could be determined
        continue
    summary = helper.get_requirements_summary_by_severity(
        name, latest.version, latest.release
    )
    per_severity = {
        severity: len(summary[severity]) for severity in ('high', 'medium', 'low')
    }
    data.append({
        'name': name,
        'version': f"{latest.version}.{latest.release}",
        **per_severity,
        'total': sum(len(v) for v in summary.values()),
    })

# One row per STIG
df = pd.DataFrame(data)
print(df)

# Write the same table to an Excel workbook
df.to_excel('red_hat_stig_analysis.xlsx', index=False)
Integration Examples
InSpec Profile Generation (Concept)
python
def generate_inspec_profile(stig_data, output_dir):
    """
    Generate the skeleton of an InSpec profile from STIG data.

    This is a conceptual example - full implementation would
    require more STIG parsing logic. The profile metadata file
    (``inspec.yml``) is written; control generation is left as a
    placeholder.

    Args:
        stig_data: Mapping with 'id', 'title', 'description' and
            'requirements' keys. 'version' is optional and defaults
            to '1.0.0'.
        output_dir: Directory to write the profile into. It is created
            if it does not exist yet.

    Raises:
        KeyError: If 'id', 'title', 'description' or 'requirements' is
            missing from ``stig_data``.
    """
    import json
    import os

    os.makedirs(output_dir, exist_ok=True)

    # Create inspec.yml
    inspec_yml = {
        'name': stig_data['id'],
        'title': stig_data['title'],
        'version': stig_data.get('version', '1.0.0'),
        'summary': stig_data['description']
    }

    # Write the metadata so it is not discarded. Each value is emitted as a
    # JSON string literal, which is also a valid YAML double-quoted scalar,
    # so quotes and newlines are escaped without a third-party YAML library.
    metadata_path = os.path.join(output_dir, 'inspec.yml')
    with open(metadata_path, 'w', encoding='utf-8') as f:
        for key, value in inspec_yml.items():
            f.write(f"{key}: {json.dumps(str(value), ensure_ascii=False)}\n")

    # Create controls for each requirement
    for v_id, req in stig_data['requirements'].items():
        # Generate InSpec control file
        # control_content = ...
        pass
Compliance Dashboard Data
python
def build_dashboard_data():
    """Build data for compliance dashboard.

    Returns:
        A dict with three keys: 'statistics' (STIG/SRG totals and the
        number of Rev 5 controls in a few example families),
        'recent_stigs' and 'severity_summary'.
    """
    helper = CyberTrackrHelper(enable_cache=True, cache_ttl=3600)
    return {
        'statistics': {
            'total_stigs': len(helper.list_stigs()),
            'total_srgs': len(helper.list_srgs()),
            'rmf_rev5_controls': sum(
                len(helper.get_control_family(f, revision=5))
                for f in ['AC', 'AU', 'SC', 'SI']  # Example families
            )
        },
        'recent_stigs': get_recently_updated_stigs(helper),
        'severity_summary': get_severity_breakdown(helper)
    }


def get_severity_breakdown(helper, limit=5):
    """Total the HIGH/MEDIUM/LOW requirement counts over a few STIGs.

    Only the first ``limit`` STIGs returned by ``helper.list_stigs()`` are
    inspected (latest version of each), because every STIG costs an extra
    summary request.

    Args:
        helper: A CyberTrackrHelper instance.
        limit: Maximum number of STIGs to include.

    Returns:
        A dict with 'high', 'medium' and 'low' keys holding the summed
        requirement counts.
    """
    totals = {'high': 0, 'medium': 0, 'low': 0}
    for name in list(helper.list_stigs())[:limit]:
        latest = helper.get_latest_version(name)
        if not latest:
            # No version could be determined for this STIG; skip it
            continue
        summary = helper.get_requirements_summary_by_severity(
            name, latest.version, latest.release
        )
        for severity in totals:
            totals[severity] += len(summary[severity])
    return totals
def get_recently_updated_stigs(helper, limit=10):
    """Return the most recently dated STIG versions, newest first.

    Args:
        helper: Object whose ``list_stigs()`` returns a mapping of STIG name
            to a list of version entries.
        limit: Maximum number of (name, version) pairs to return.

    Returns:
        A list of ``(name, version)`` tuples ordered by ``version.date``
        descending. Entries with no ``date`` attribute, or an empty one,
        are left out.
    """
    # Keep only entries that carry a usable date
    dated_entries = [
        (stig_name, entry)
        for stig_name, entries in helper.list_stigs().items()
        for entry in entries
        if getattr(entry, 'date', None)
    ]
    newest_first = sorted(
        dated_entries, key=lambda pair: pair[1].date, reverse=True
    )
    return newest_first[:limit]
See Also
- Helper Methods Reference - Complete method documentation
- Optimization Guide - Performance best practices
- Ruby Examples - Similar patterns in Ruby