# 5. Sub-DAG and Orchestration

The CLD system operates through a two-tier architecture: Sub-DAGs that process data for specific time periods and provider/payer combinations, and an Orchestrator that manages and combines Sub-DAG outputs into the final production tables.

## Sub-DAG Architecture

### Core Licensable Data Sub-DAG

The `core_licensable_data_sub_dag` is the primary processing unit; it handles a specific sub-version of data (e.g., `"2025_04"`). Each Sub-DAG processes a complete dataset for a given time period through a structured pipeline.

### Sub-DAG Task Groups
**1. Rate Object Space (`rate_object_space`)**

```python
# Build foundational data structures
spines = [
    build_provider_spines(),  # Provider reference data
    build_payer_spines(),     # Payer/network mappings
    build_network_spines(),   # Network hierarchies
    build_code_spines(),      # Billing code references
]
build_rate_object_space()  # Cartesian product of valid combinations
```
**2. Raw Data Processing (`raw_data`)**

```python
# Process MRF data in parallel chunks
with TaskGroup("raw_payer_data"):
    payer_id_chunks = get_payer_payer_chunks(n_chunks=3)
    payer_chunks = build_payer_mrf.partial().expand(payer_id_chunk=payer_id_chunks)
    union_chunks = union_payer_mrf(payer_id_chunks)

with TaskGroup("raw_hospital_data"):
    provider_id_lists = get_hospital_chunks()
    hospital_chunks = build_hospital_mrf.partial().expand(provider_id_chunk=provider_id_lists)
    hospital_mrf = build_hospital_mrf_union(provider_id_lists)
```
**3. Analysis Tables (`analysis`)**

```python
# Advanced processing with chunked execution
with TaskGroup("imputations"):
    payer_id_chunks = get_ros_payer_chunks()
    imputations_chunks = get_imputations.partial().expand(payer_id_chunk=payer_id_chunks)
    imputations_union = get_imputations_union(payer_id_chunks)

with TaskGroup("accuracy_raw"):
    accuracy_chunks = build_accuracy_raw.partial().expand(payer_id_chunk=payer_id_chunks)
    accuracy_union = build_accuracy_raw_union(payer_id_chunks)
```
**4. Main Tables (`main`)**

```python
# Final canonicalization with whispers integration
with TaskGroup("main_all"):
    main_combined_chunks = build_combined_main.partial().expand(payer_id_chunk=payer_id_chunks)
    main_combined_union = build_combined_main_union(payer_id_chunks)

with TaskGroup("whispers"):
    whisper_tasks = [
        build_payer(),
        build_network(),
        build_provider(),
        build_code(),
        build_provider_payer(),
        build_provider_code(),
        build_payer_code(),
    ]
```
### Chunking Strategy

The Sub-DAG uses a two-level chunking scheme to handle large datasets efficiently: oversized payers are split into dedicated chunks, while small payers share chunks.

#### Payer-Based Chunking

```python
def assign_chunks(df, max_chunk_n, large_payer_max_chunk_n, max_payer_n):
    # Step 1: Identify large payers (more than max_payer_n records in total)
    payer_totals = df.groupby("payer_id")["n"].sum().reset_index()
    large_payers = set(payer_totals[payer_totals["n"] > max_payer_n]["payer_id"])

    # Step 2: Split each large payer into its own chunks of at most
    # large_payer_max_chunk_n records
    for payer_id in large_payers:
        create_payer_specific_chunks(payer_id, large_payer_max_chunk_n)

    # Step 3: Pack the remaining small payers together into shared chunks
    # of at most max_chunk_n records
    combine_small_payers_into_chunks(max_chunk_n)
```
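The helper calls above are pseudocode; end to end, the assignment logic can be sketched as a self-contained function. The greedy packing and the function's input shape (a mapping of payer id to record count) are illustrative assumptions, not the production implementation:

```python
def assign_chunks(payer_counts, max_chunk_n, large_payer_max_chunk_n, max_payer_n):
    """Illustrative chunk assignment: large payers get dedicated chunks,
    small payers are greedily packed into shared chunks."""
    chunks = []

    # Step 1: payers with more than max_payer_n records are "large"
    large = {p: n for p, n in payer_counts.items() if n > max_payer_n}

    # Step 2: split each large payer into dedicated chunks of at most
    # large_payer_max_chunk_n records
    for payer_id, n in sorted(large.items()):
        remaining = n
        while remaining > 0:
            take = min(remaining, large_payer_max_chunk_n)
            chunks.append([(payer_id, take)])
            remaining -= take

    # Step 3: pack small payers together, first-fit, up to max_chunk_n records
    current, current_n = [], 0
    for payer_id, n in sorted(payer_counts.items()):
        if payer_id in large:
            continue
        if current and current_n + n > max_chunk_n:
            chunks.append(current)
            current, current_n = [], 0
        current.append((payer_id, n))
        current_n += n
    if current:
        chunks.append(current)
    return chunks
```

For example, with `max_chunk_n=100`, `large_payer_max_chunk_n=100`, and `max_payer_n=100`, a payer with 250 records yields three dedicated chunks while payers with 40, 60, and 30 records are packed into two shared chunks.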
#### Provider-Based Chunking

```python
# Hospital data is chunked by provider to optimize parallel processing
provider_id_lists = get_hospital_chunks(n_chunks=5, state_chunk=True)

# Creates chunks like:
# chunk_0: ['4740', '6204', '1234']  # providers in chunk 0
# chunk_1: ['5678', '9012', '3456']  # providers in chunk 1
```
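`get_hospital_chunks` itself reads provider ids from reference tables; a minimal stand-in that produces the same shape of output is a round-robin split (the function name and splitting rule here are hypothetical, for illustration only):

```python
def split_provider_ids(provider_ids, n_chunks=5):
    """Round-robin split of provider ids into n_chunks lists."""
    chunks = [[] for _ in range(n_chunks)]
    for i, pid in enumerate(provider_ids):
        chunks[i % n_chunks].append(pid)
    return chunks
```

A round-robin split keeps chunk sizes within one element of each other, which keeps the parallel tasks roughly balanced when providers are of similar size.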
### Sub-DAG Configuration

Each Sub-DAG instance is configured with:

```python
DEFAULT_PARAMS = {
    "sql_loc": "/path/to/sql/files",
    "payer_schema": "tq_production.public_2025_04",   # Monthly payer data
    "hospital_schema": "glue.hospital_data",          # Hospital MRF data
    "sub_version": "2025_04",                         # Time period identifier
    "version": "v2_0",                                # CLD version
    "schema_name": "tq_dev.internal_dev_csong_cld_",  # Output schema
    "provider_types": [
        'Childrens Hospital',
        'Critical Access Hospital',
        'Short Term Acute Care Hospital',
        'Rehabilitation Hospital',
        'ASC',
        'Physician Group',
        'Laboratory',
    ],
}
```
## Orchestrator Architecture

### Core Licensable Data Orchestrator

The `core_licensable_data_orchestrator` manages multiple Sub-DAG outputs and creates the final production tables.

### Orchestrator Responsibilities

**1. Sub-Version Management**

```python
# Track multiple sub-versions for historical comparison
historic_sub_versions_dict = {
    "6m": "2024_10",   # 6 months ago
    "12m": "2024_04",  # 12 months ago
    "18m": "2023_10",  # 18 months ago
}

# Get columns for historical rate comparisons
historic_columns = get_historic_columns(historic_sub_versions_dict, prefix="lag_")
# Results in: ['lag_6m_canonical_rate', 'lag_6m_canonical_rate_score', ...]
```
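A plausible implementation of `get_historic_columns` is a prefix-and-offset cross product over a set of base columns; the `base_columns` default below is an assumption for illustration, inferred from the example output:

```python
def get_historic_columns(historic_sub_versions_dict, prefix="lag_",
                         base_columns=("canonical_rate", "canonical_rate_score")):
    # One lagged column per (offset, base column) pair,
    # e.g. "lag_6m_canonical_rate"
    return [
        f"{prefix}{offset}_{col}"
        for offset in historic_sub_versions_dict
        for col in base_columns
    ]
```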
**2. Cross-Sub-Version Rate Selection**

```sql
-- Example merge logic in merge_combined_all_joined_chunks.sql
SELECT
    roid,
    -- Select the best rate across all sub-versions
    CASE
        WHEN sub_version_2025_04.canonical_rate_score >= sub_version_2025_03.canonical_rate_score
            THEN sub_version_2025_04.canonical_rate
        ELSE sub_version_2025_03.canonical_rate
    END AS canonical_rate,
    -- Track which sub-version was selected
    CASE
        WHEN sub_version_2025_04.canonical_rate_score >= sub_version_2025_03.canonical_rate_score
            THEN '2025_04'
        ELSE '2025_03'
    END AS canonical_rate_subversion
FROM sub_version_2025_04
LEFT JOIN sub_version_2025_03 USING (roid)
```
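The same best-score selection can be expressed in plain Python over per-sub-version rows. This is a sketch that generalizes the two-way SQL `CASE` to any number of sub-versions; the input shape (nested dicts keyed by sub-version and `roid`) is an assumption for illustration:

```python
def select_canonical_rate(rows_by_subversion, roid):
    """Pick the highest-scoring rate for a roid across sub-versions,
    preferring the newest sub-version on score ties."""
    best = None
    # Sub-version labels like "2025_04" sort chronologically as strings,
    # so reverse order visits the newest first; strict ">" then means
    # ties keep the newer sub-version, matching the SQL's ">=".
    for sub_version in sorted(rows_by_subversion, reverse=True):
        row = rows_by_subversion[sub_version].get(roid)
        if row is None:
            continue
        if best is None or row["canonical_rate_score"] > best[1]["canonical_rate_score"]:
            best = (sub_version, row)
    if best is None:
        return None
    return {
        "roid": roid,
        "canonical_rate": best[1]["canonical_rate"],
        "canonical_rate_subversion": best[0],
    }
```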
**3. Production Table Creation**

The Orchestrator creates multiple production views.

#### Segments

```python
@task
def create_segment_medical():
    # Medical procedures segment
    run_query(file="views/segments/medical.sql")

@task
def create_segment_drugs():
    # Pharmaceutical products segment
    run_query(file="views/segments/drugs.sql")
```

#### Rollups

```python
@task
def create_rollup_provider():
    # Provider-level aggregations
    run_query(file="views/rollups/provider.sql")

@task
def create_rollup_payer():
    # Payer-level aggregations
    run_query(file="views/rollups/payer.sql")
```
### Orchestrator Task Flow

The Orchestrator follows this execution pattern:

**1. Chunk Processing**

```python
# Process each chunk independently
for chunk_id in chunk_range:
    merge_combined_all_joined_chunks(chunk_id)  # Merge sub-versions for this chunk

# Union all processed chunks
union_combined_all()  # Final prod_combined_all table
```

**2. Quality Assurance**

```python
# Run production-level QA tests
with TaskGroup("production_qa"):
    validate_canonical_rate_coverage()
    check_rate_reasonableness()
    verify_historical_consistency()
    test_benchmark_alignment()
```

**3. Final Table Creation**

```python
# Create the final production assets
create_prod_combined_all()     # Main rates table
create_prod_segment_medical()  # Medical procedures view
create_prod_segment_drugs()    # Drug products view
create_prod_rollup_provider()  # Provider aggregations
create_prod_rollup_payer()     # Payer aggregations
```
## Data Flow Architecture

### Sub-DAG to Orchestrator Flow

### Parallel Processing Benefits

**Sub-DAG Level:**

- Multiple payer chunks are processed simultaneously
- Provider data is processed in parallel by geographic region
- Raw data processing, transformations, imputations, and accuracy scoring run independently

**Orchestrator Level:**

- Multiple sub-versions can be processed concurrently
- Chunk-based merging enables memory-efficient processing of large datasets
- Segment and rollup creation is parallelized
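Because chunks are independent, the chunk-merge step parallelizes trivially. A minimal sketch using a thread pool, where `merge_chunk` is a hypothetical stand-in for the real `merge_combined_all_joined_chunks` task:

```python
from concurrent.futures import ThreadPoolExecutor

def merge_chunk(chunk_id):
    # Hypothetical stand-in: the real task merges sub-versions for one chunk
    return f"combined_all_chunk_{chunk_id}"

def merge_all_chunks(chunk_ids, max_workers=4):
    # Each chunk is independent, so a failure in one chunk does not
    # affect the others, and retries stay isolated per chunk
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(merge_chunk, chunk_ids))
```

In the actual pipeline this fan-out is handled by the workflow engine's task mapping rather than an in-process pool; the sketch only illustrates why chunk independence makes the step embarrassingly parallel.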
## Configuration Management

### Sub-DAG Configuration

```python
# Each Sub-DAG is configured for a specific time period and scope
SUB_DAG_CONFIG = {
    "core_licensable_data_sub_dag_2025_04": {
        "payer_schema": "tq_production.public_2025_04",
        "sub_version": "2025_04",
        "enabled_provider_types": ["Hospital", "ASC"],
        "chunking_strategy": "large_payer_separate",
    }
}
```

### Orchestrator Configuration

```python
# The Orchestrator manages multiple sub-versions and the final outputs
ORCHESTRATOR_CONFIG = {
    "sub_versions_to_merge": ["2025_04", "2025_03", "2025_02"],
    "production_schema": "tq_production.cld_v2_0",
    "quality_gates": ["rate_coverage", "benchmark_alignment", "historical_consistency"],
    "output_segments": ["medical", "drugs", "physician_groups"],
}
```
## Scalability and Performance

**Horizontal Scaling:**

- Sub-DAGs can run on separate compute resources
- Chunk processing enables parallel execution within each Sub-DAG
- The Orchestrator can merge outputs from distributed Sub-DAG runs

**Memory Management:**

- Large datasets are processed in memory-efficient chunks
- Temporary tables are cleaned up after processing
- Very large payer datasets are handled with a streaming approach

**Fault Tolerance:**

- Individual chunk failures don't affect other chunks
- Failed Sub-DAGs can be retried independently
- The Orchestrator can merge partial outputs while failed Sub-DAGs are reprocessed
This two-tier architecture enables CLD to process massive healthcare rate datasets efficiently while maintaining data quality and providing flexible output formats for different use cases.