
5. Sub-DAG and Orchestration

The CLD system operates through a two-tier architecture: Sub-DAGs that process data for specific time periods and provider/payer combinations, and an Orchestrator that manages and combines Sub-DAG outputs into final production tables.

Sub-DAG Architecture

Core Licensable Data Sub-DAG

The core_licensable_data_sub_dag is the primary processing unit that handles a specific sub-version of data (e.g., "2025_04"). Each Sub-DAG processes a complete dataset for a given time period through a structured pipeline:

Sub-DAG Task Groups

1. Rate Object Space (rate_object_space)

```python
# Build foundational data structures
spines = [
    build_provider_spines(),  # Provider reference data
    build_payer_spines(),     # Payer/network mappings
    build_network_spines(),   # Network hierarchies
    build_code_spines(),      # Billing code references
]
build_rate_object_space()  # Cartesian product of valid combinations
```
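As a rough illustration of what the rate object space step produces, the sketch below crosses spine entries into candidate rate objects keyed by a composite `roid`. The function signature and field names here are hypothetical simplifications; the real task builds the space in SQL and applies validity filtering.

```python
from itertools import product

def build_rate_object_space(providers, payers, codes):
    """Illustrative sketch only: cross spine entries into candidate rate
    objects keyed by a composite 'roid'. The production task also filters
    to valid provider/payer/code combinations."""
    space = []
    for provider_id, payer_id, code in product(providers, payers, codes):
        space.append({
            "roid": f"{provider_id}|{payer_id}|{code}",
            "provider_id": provider_id,
            "payer_id": payer_id,
            "code": code,
        })
    return space
```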

2. Raw Data Processing (raw_data)

```python
# Process MRF data in parallel chunks
with TaskGroup("raw_payer_data"):
    payer_id_chunks = get_payer_payer_chunks(n_chunks=3)
    payer_chunks = build_payer_mrf.partial().expand(payer_id_chunk=payer_id_chunks)
    union_chunks = union_payer_mrf(payer_id_chunks)

with TaskGroup("raw_hospital_data"):
    provider_id_lists = get_hospital_chunks()
    hospital_chunks = build_hospital_mrf.partial().expand(provider_id_chunk=provider_id_lists)
    hospital_mrf = build_hospital_mrf_union(provider_id_lists)
```

3. Analysis Tables (analysis)

```python
# Advanced processing with chunked execution
with TaskGroup("imputations"):
    payer_id_chunks = get_ros_payer_chunks()
    imputations_chunks = get_imputations.partial().expand(payer_id_chunk=payer_id_chunks)
    imputations_union = get_imputations_union(payer_id_chunks)

with TaskGroup("accuracy_raw"):
    accuracy_chunks = build_accuracy_raw.partial().expand(payer_id_chunk=payer_id_chunks)
    accuracy_union = build_accuracy_raw_union(payer_id_chunks)
```

4. Main Tables (main)

```python
# Final canonicalization with whispers integration
with TaskGroup("main_all"):
    main_combined_chunks = build_combined_main.partial().expand(payer_id_chunk=payer_id_chunks)
    main_combined_union = build_combined_main_union(payer_id_chunks)

with TaskGroup("whispers"):
    whisper_tasks = [
        build_payer(),
        build_network(),
        build_provider(),
        build_code(),
        build_provider_payer(),
        build_provider_code(),
        build_payer_code(),
    ]
```

Chunking Strategy

The Sub-DAG chunks work by payer and by provider so that large datasets can be processed in parallel:

Payer-Based Chunking

```python
def assign_chunks(df, max_chunk_n, large_payer_max_chunk_n, max_payer_n):
    # Step 1: Identify large payers (>max_payer_n records)
    payer_totals = df.groupby("payer_id")["n"].sum().reset_index()
    large_payers = set(payer_totals[payer_totals["n"] > max_payer_n]["payer_id"])

    # Step 2: Process large payers individually with smaller chunks
    for payer_id in large_payers:
        # Split large payer into chunks of large_payer_max_chunk_n
        create_payer_specific_chunks(payer_id, large_payer_max_chunk_n)

    # Step 3: Process small payers together in larger chunks
    combine_small_payers_into_chunks(max_chunk_n)
```
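The helper names in the snippet above (`create_payer_specific_chunks`, `combine_small_payers_into_chunks`) are illustrative, but the strategy itself can be sketched end to end in plain Python. The version below is a simplified, runnable approximation that works on a `payer_id -> record count` mapping instead of a DataFrame:

```python
def assign_chunks(payer_totals, max_chunk_n, large_payer_max_chunk_n, max_payer_n):
    """Simplified sketch of the chunking strategy above.

    payer_totals: dict mapping payer_id -> record count.
    Large payers (> max_payer_n records) get dedicated chunks sized so each
    covers at most large_payer_max_chunk_n records; small payers are packed
    together into shared chunks of at most max_chunk_n records.
    """
    chunks = []
    small_chunk, small_n = [], 0
    for payer_id, n in sorted(payer_totals.items()):
        if n > max_payer_n:
            # One dedicated chunk per large_payer_max_chunk_n-sized slice
            for _ in range(-(-n // large_payer_max_chunk_n)):  # ceil division
                chunks.append([payer_id])
        else:
            if small_n + n > max_chunk_n and small_chunk:
                chunks.append(small_chunk)
                small_chunk, small_n = [], 0
            small_chunk.append(payer_id)
            small_n += n
    if small_chunk:
        chunks.append(small_chunk)
    return chunks
```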

Provider-Based Chunking

```python
# Hospital data chunked by provider to optimize parallel processing
provider_id_lists = get_hospital_chunks(n_chunks=5, state_chunk=True)

# Creates chunks like:
# chunk_0: ['4740', '6204', '1234']  # providers in chunk 0
# chunk_1: ['5678', '9012', '3456']  # providers in chunk 1
```
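A minimal sketch of how provider ids might be split into parallel chunks, ignoring the `state_chunk` grouping and assuming a simple round-robin assignment (both assumptions; the real helper's logic is not shown here):

```python
def get_hospital_chunks(provider_ids, n_chunks):
    """Round-robin split of provider ids into n_chunks lists, so each
    chunk can be mapped to a parallel build_hospital_mrf task."""
    chunks = [[] for _ in range(n_chunks)]
    for i, pid in enumerate(provider_ids):
        chunks[i % n_chunks].append(pid)
    return chunks
```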

Sub-DAG Configuration

Each Sub-DAG instance is configured with:

```python
DEFAULT_PARAMS = {
    "sql_loc": "/path/to/sql/files",
    "payer_schema": "tq_production.public_2025_04",   # Monthly payer data
    "hospital_schema": "glue.hospital_data",          # Hospital MRF data
    "sub_version": "2025_04",                         # Time period identifier
    "version": "v2_0",                                # CLD version
    "schema_name": "tq_dev.internal_dev_csong_cld_",  # Output schema
    "provider_types": [
        'Childrens Hospital',
        'Critical Access Hospital',
        'Short Term Acute Care Hospital',
        'Rehabilitation Hospital',
        'ASC',
        'Physician Group',
        'Laboratory',
    ],
}
```

Orchestrator Architecture

Core Licensable Data Orchestrator

The core_licensable_data_orchestrator manages multiple Sub-DAG outputs and creates final production tables:

Orchestrator Responsibilities

1. Sub-Version Management

```python
# Track multiple sub-versions for historical comparison
historic_sub_versions_dict = {
    "6m": "2024_10",   # 6 months ago
    "12m": "2024_04",  # 12 months ago
    "18m": "2023_10",  # 18 months ago
}

# Get columns for historical rate comparisons
historic_columns = get_historic_columns(historic_sub_versions_dict, prefix="lag_")
# Results in: ['lag_6m_canonical_rate', 'lag_6m_canonical_rate_score', ...]
```
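Assuming `get_historic_columns` simply derives lagged column names from the sub-version map, it might look like the sketch below; the `base_columns` default is an assumption inferred from the example output above:

```python
def get_historic_columns(historic_sub_versions_dict, prefix="lag_",
                         base_columns=("canonical_rate", "canonical_rate_score")):
    """Sketch: build lagged column names like 'lag_6m_canonical_rate'
    for each lag key in the sub-version map."""
    return [
        f"{prefix}{lag}_{col}"
        for lag in historic_sub_versions_dict
        for col in base_columns
    ]
```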

2. Cross-Sub-Version Rate Selection

```sql
-- Example merge logic in merge_combined_all_joined_chunks.sql
SELECT
    roid,
    -- Select best rate across all sub-versions
    CASE
        WHEN sub_version_2025_04.canonical_rate_score >= sub_version_2025_03.canonical_rate_score
            THEN sub_version_2025_04.canonical_rate
        ELSE sub_version_2025_03.canonical_rate
    END AS canonical_rate,
    -- Track which sub-version was selected
    CASE
        WHEN sub_version_2025_04.canonical_rate_score >= sub_version_2025_03.canonical_rate_score
            THEN '2025_04'
        ELSE '2025_03'
    END AS canonical_rate_subversion
FROM sub_version_2025_04
LEFT JOIN sub_version_2025_03 USING (roid)
```
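The same selection logic, expressed as a runnable Python sketch for a single roid. Here `candidates` maps each sub-version to its row dict; like the SQL's `>=`, ties go to the sub-version listed first (the newest):

```python
def select_canonical_rate(candidates):
    """Pick the canonical rate with the highest canonical_rate_score
    across sub-versions, and record which sub-version supplied it."""
    best_sv = max(
        candidates,
        key=lambda sv: candidates[sv].get("canonical_rate_score", float("-inf")),
    )
    return {
        "canonical_rate": candidates[best_sv]["canonical_rate"],
        "canonical_rate_subversion": best_sv,
    }
```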

3. Production Table Creation

The Orchestrator creates multiple production views:

Segments

```python
@task
def create_segment_medical():
    # Medical procedures segment
    run_query(file="views/segments/medical.sql")

@task
def create_segment_drugs():
    # Pharmaceutical products segment
    run_query(file="views/segments/drugs.sql")
```

Rollups

```python
@task
def create_rollup_provider():
    # Provider-level aggregations
    run_query(file="views/rollups/provider.sql")

@task
def create_rollup_payer():
    # Payer-level aggregations
    run_query(file="views/rollups/payer.sql")
```

Orchestrator Task Flow

The Orchestrator follows this execution pattern:

1. Chunk Processing

```python
# Process each chunk independently
for chunk_id in chunk_range:
    merge_combined_all_joined_chunks(chunk_id)  # Merge sub-versions for chunk

# Union all processed chunks
union_combined_all()  # Final prod_combined_all table
```

2. Quality Assurance

```python
# Run production-level QA tests
with TaskGroup("production_qa"):
    validate_canonical_rate_coverage()
    check_rate_reasonableness()
    verify_historical_consistency()
    test_benchmark_alignment()
```
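The QA gates above are task names only; as an assumed example of what a gate like `validate_canonical_rate_coverage` might check, the sketch below computes the share of rate objects carrying a canonical rate and fails below a threshold (both the row shape and the threshold are hypothetical):

```python
def validate_canonical_rate_coverage(rows, min_coverage=0.95):
    """Hypothetical coverage gate: the fraction of rate objects with a
    non-null canonical_rate must meet min_coverage, else raise."""
    covered = sum(1 for r in rows if r.get("canonical_rate") is not None)
    coverage = covered / len(rows) if rows else 0.0
    if coverage < min_coverage:
        raise ValueError(
            f"canonical_rate coverage {coverage:.1%} below {min_coverage:.1%}"
        )
    return coverage
```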

3. Final Table Creation

```python
# Create final production assets
create_prod_combined_all()       # Main rates table
create_prod_segment_medical()    # Medical procedures view
create_prod_segment_drugs()      # Drug products view
create_prod_rollup_provider()    # Provider aggregations
create_prod_rollup_payer()       # Payer aggregations
```

Data Flow Architecture

Sub-DAG to Orchestrator Flow

Each Sub-DAG writes its per-sub-version outputs (raw data, analysis tables, and main tables), which the Orchestrator then merges across sub-versions and publishes as the final production tables, segments, and rollups.

Parallel Processing Benefits

Sub-DAG Level:

  • Multiple payer chunks processed simultaneously
  • Provider data processed in parallel by geographic region
  • Independent processing of raw data, transformations, imputations, and accuracy scoring

Orchestrator Level:

  • Multiple sub-versions can be processed concurrently
  • Chunk-based merging enables memory-efficient processing of large datasets
  • Segment and rollup creation parallelized

Configuration Management

Sub-DAG Configuration

```python
# Each sub-DAG configured for specific time period and scope
SUB_DAG_CONFIG = {
    "core_licensable_data_sub_dag_2025_04": {
        "payer_schema": "tq_production.public_2025_04",
        "sub_version": "2025_04",
        "enabled_provider_types": ["Hospital", "ASC"],
        "chunking_strategy": "large_payer_separate",
    }
}
```

Orchestrator Configuration

```python
# Orchestrator manages multiple sub-versions and final outputs
ORCHESTRATOR_CONFIG = {
    "sub_versions_to_merge": ["2025_04", "2025_03", "2025_02"],
    "production_schema": "tq_production.cld_v2_0",
    "quality_gates": ["rate_coverage", "benchmark_alignment", "historical_consistency"],
    "output_segments": ["medical", "drugs", "physician_groups"],
}
```

Scalability and Performance

Horizontal Scaling:

  • Sub-DAGs can run on separate compute resources
  • Chunk processing enables parallel execution within Sub-DAGs
  • Orchestrator can merge outputs from distributed Sub-DAG execution

Memory Management:

  • Large datasets processed in memory-efficient chunks
  • Temporary tables cleaned up after processing
  • Streaming approach for very large payer datasets

Fault Tolerance:

  • Individual chunk failures don't affect other chunks
  • Sub-DAG failures can be retried independently
  • Orchestrator can merge partial outputs while failed Sub-DAGs are reprocessed

This two-tier architecture enables CLD to process massive healthcare rate datasets efficiently while maintaining data quality and providing flexible output formats for different use cases.