"""
Core Licensable Data: Sub DAG
"""
import datetime
import logging
import json
from pathlib import Path
from airflow import DAG
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
from airflow.providers.trino.hooks.trino import TrinoHook
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Identity metadata handed to the DAG constructor further down.
DAG_ID = "core_licensable_data_sub_dag"
OWNER = "data_science"

# Tags attached to the DAG (passed as `tags=`).
DAG_TAG = ["turquoise-health", "data-science", "cld", "sub-dag"]
# Optional overrides for the DAG params, read from the Airflow Variable
# "datascience/default_settings_<DAG_ID>" and JSON-deserialized. `{}` is the
# default passed to the lookup, so every `.get(...)` below uses its fallback
# when the Variable is absent.
# NOTE(review): this lookup runs at module import time — confirm that is
# acceptable for scheduler parse load.
DEFAULT_SETTINGS = Variable.get(
    key=f"datascience/default_settings_{DAG_ID}",
    default_var={},
    deserialize_json=True,
)
# Fallback value for every DAG param. Each entry is used only when
# DEFAULT_SETTINGS does not carry a key with the same name.
_PARAM_FALLBACKS = {
    "sql_loc": f"{Path(__file__).parent}/sql",
    "s3_trino_bucket": "turquoise-health-payer-export-main",
    "db_conn_name": "trino_default",
    "payer_schema": "tq_production.public_2025_09",
    "hospital_schema": "tq_production.hospital_data",
    "sub_version": "2025_09",
    "version": "vX_X",
    "schema_name": "tq_dev.internal_dev_csong_cld_",
    "cmv_codes": False,
    "provider_types": [
        'Hospital',
        'ASC',
        'Physician Group',
        'Laboratory',
        'Imaging Center',
    ],
    "historic_run": False,
    "lookback_run": False,
    "drug_dosage_standardization_month": "2025_07",
}

# Params passed to the DAG (`params=`): the value from DEFAULT_SETTINGS when
# present, otherwise the fallback above. Key order follows _PARAM_FALLBACKS.
DEFAULT_PARAMS = {
    name: DEFAULT_SETTINGS.get(name, fallback)
    for name, fallback in _PARAM_FALLBACKS.items()
}
# Fixed parameter set with "test" versions and the `tq_intermediate.cld_utils`
# schemas. It is not referenced anywhere else in this file.
# NOTE(review): presumably consumed by tests or manual runs — confirm usage.
TESTING_PARAMS = {
    "sql_loc": "/opt/airflow/dags/core_licensable_data_sub_dag/sql",
    "s3_trino_bucket": "turquoise-health-payer-export-main",
    "db_conn_name": "trino_default",
    "payer_schema": "tq_intermediate.cld_utils",
    "hospital_schema": "tq_intermediate.cld_utils",
    "sub_version": "test",
    "version": "test",
    "schema_name": "tq_dev.internal_dev_csong_cld_",
}
# Arguments passed to the DAG as `default_args`.
DEFAULT_ARGS = {
    "owner": OWNER,
    "depends_on_past": False,
    # Midnight on 2024-01-01 (naive datetime, no tzinfo attached).
    "start_date": datetime.datetime(year=2024, month=1, day=1),
    # NOTE(review): `provide_context` looks like a legacy Airflow 1.x flag —
    # confirm whether it is still needed.
    "provide_context": True,
}

# Connection ids. Neither name is referenced elsewhere in this file.
DB_CONN_NAME = "trino_default"
REDSHIFT_DB_CONN_NAME = "redshift_default"
# Core Licensable Data sub-DAG.
#
# Top-level flow (wired at the very end of this block):
#   start_dag >> wait_30_seconds >> rate_object_space >> raw_data
#             >> analysis >> main
#
# Every task created inside a `with TaskGroup(...)` block belongs to that
# group, so the nesting below determines group membership.
with DAG(
    dag_id=DAG_ID,
    description="Builds core licensable data",
    default_args=DEFAULT_ARGS,
    schedule_interval=None,
    catchup=False,
    tags=DAG_TAG,
    params=DEFAULT_PARAMS,
    max_active_runs=8,
    max_active_tasks=6,
) as dag:
    # NOTE(review): these project imports live inside the DAG context rather
    # than at the top of the file — confirm the reason before hoisting them.
    from data_science.cld.core_licensable_data_sub_dag.tasks import (
        reference,
        raw,
        custom,
        whispers,
        imputations,
        transformations,
        benchmarks,
        accuracy,
        main,
        utils
    )
    from data_science.cld.core_licensable_data_sub_dag import qa as v
    from data_science.cld.core_licensable_data_sub_dag.qa import (
        uniqueness as qau
    )

    start_dag = EmptyOperator(task_id="start_dag")
    # NOTE(review): the reason for the fixed 30 second delay is not visible
    # in this file.
    wait_30_seconds = BashOperator(
        task_id="wait_30_seconds",
        bash_command="sleep 30"
    )

    # ------------------------------------------------------------------
    # Stage 1: spines and the rate object space
    # ------------------------------------------------------------------
    with TaskGroup("rate_object_space", tooltip="Builds Rate Object Space using CLD-Spines") as build_ros_and_spines:
        spines = [
            reference.build_provider_spines(),
            reference.build_payer_spines(),
            reference.build_network_spines(),
            reference.build_code_spines(),
        ]
        build_asc_plausibility_task = reference.build_asc_plausibility()
        # Join point between the spine builds and their validations.
        wait_for_spines = EmptyOperator(task_id="wait_for_spines")

        with TaskGroup("spine_validations") as spine_validations:
            # Each entry maps a table name to the test factories to run on it.
            for table_name, tests in v.SPINE_VALIDATION_TESTS.items():
                for test in tests:
                    test(table_name)

        build_rate_object_space = reference.build_rate_object_space()

        with TaskGroup("rate_object_space_validations") as rate_object_space_validations:
            for test in v.RATE_OBJECT_SPACE_VALIDATION_TESTS:
                test('tmp_rate_object_space')

        (
            spines
            >> build_asc_plausibility_task
            >> wait_for_spines
            >> spine_validations
            >> build_rate_object_space
            >> rate_object_space_validations
        )

    # ------------------------------------------------------------------
    # Stage 2: raw payer / hospital data mapped onto the rate object space
    # ------------------------------------------------------------------
    with TaskGroup("raw_data", tooltip="Maps raw data to rate object space") as build_raw_tables:
        with TaskGroup("raw_payer_data") as raw_payer_data:
            payer_id_chunks = raw.get_payer_payer_chunks(n_chunks=6)
            # One mapped task instance per payer-id chunk.
            payer_chunks = raw.build_payer_mrf.partial().expand(payer_id_chunk=payer_id_chunks)
            pg_prebuild = raw.build_pg_rates_prebuild.partial().expand(payer_id_chunk=payer_id_chunks)
            payer_pg_chunks = raw.build_payer_pg_mrf.partial().expand(payer_id_chunk=payer_id_chunks)
            custom_pg_logic_chunks = custom.apply_custom_pg_logic.partial().expand(payer_id_chunk=payer_id_chunks)
            union_chunks = raw.union_payer_mrf(payer_id_chunks)

            with TaskGroup("raw_payer_data_validations") as raw_payer_data_validations:
                for test in v.RAW_PAYER_VALIDATION_TESTS:
                    test('tmp_raw_mrf_payer_rates')

            # Two parallel branches (payer MRF, and the PG prebuild -> PG MRF
            # -> custom PG logic chain) both feed the union.
            (
                payer_id_chunks
                >> (payer_chunks, (pg_prebuild >> payer_pg_chunks >> custom_pg_logic_chunks))
                >> union_chunks
                >> raw_payer_data_validations
            )

        with TaskGroup("raw_hospital_data") as raw_hospital_data:
            provider_id_lists = raw.get_hospital_chunks(n_chunks=3)
            pg_provider_id_lists = raw.get_hospital_chunks(n_chunks=1)
            ndc_derived_hcpcs = raw.ndc_derived_hcpcs()

            with TaskGroup("custom_plan_bridge") as custom_plan_bridge:
                hosp_prepare_plan_bridge_hmo_chunks = (
                    custom.hosp_prepare_plan_bridge_hmo.partial()
                    .expand(provider_id_chunk=provider_id_lists)
                )
                hosp_prepare_plan_bridge_ppo_chunks = (
                    custom.hosp_prepare_plan_bridge_ppo.partial()
                    .expand(provider_id_chunk=provider_id_lists)
                )
                hosp_build_custom_plan_bridge_chunks = (
                    custom.hosp_build_custom_plan_bridge.partial()
                    .expand(provider_id_chunk=provider_id_lists)
                )
                (hosp_prepare_plan_bridge_hmo_chunks, hosp_prepare_plan_bridge_ppo_chunks) >> hosp_build_custom_plan_bridge_chunks

            hosp_prepare_base_chunks = (
                raw.hosp_prepare_base.partial()
                .expand(provider_id_chunk=provider_id_lists)
            )
            hosp_prepare_base = raw.hosp_prepare_base_union(provider_id_lists)
            hosp_prepare_plan_bridge_combined_chunks = (
                raw.hosp_prepare_plan_bridge_combined.partial()
                .expand(provider_id_chunk=provider_id_lists)
            )
            hosp_build_plan_bridge_chunks = (
                raw.hosp_build_plan_bridge.partial()
                .expand(provider_id_chunk=provider_id_lists)
            )
            hosp_build_plan_bridge = raw.hosp_build_plan_bridge_union(provider_id_lists)
            pg_prepare_plan_bridge_combined_chunks = (
                raw.pg_prepare_plan_bridge_combined.partial()
                .expand(provider_id_chunk=pg_provider_id_lists)
            )
            pg_build_plan_bridge_chunks = (
                raw.pg_build_plan_bridge.partial()
                .expand(provider_id_chunk=pg_provider_id_lists)
            )
            pg_build_plan_bridge = raw.pg_build_plan_bridge_union(pg_provider_id_lists)
            hospital_providers_chunks = (
                raw.build_hospital_mrf.partial()
                .expand(provider_id_chunk=provider_id_lists)
            )
            hospital_pg_providers_chunks = (
                raw.build_hospital_pg_mrf.partial()
                .expand(provider_id_chunk=pg_provider_id_lists)
            )
            hospital_mrf = raw.build_hospital_mrf_union(provider_id_lists, pg_provider_id_lists)

            with TaskGroup("raw_hospital_data_validations") as raw_hospital_data_validations:
                for test in v.RAW_HOSPITAL_VALIDATION_TESTS:
                    test('tmp_raw_mrf_hospital_rates')

            # The tuple holds two independent chains (hospital and PG); each
            # `a >> b >> ...` evaluates to its last element, so
            # `provider_id_lists` and `hospital_mrf` attach to the chain tails
            # while the inner `>>` calls wire the chains themselves.
            (
                provider_id_lists
                >> (
                    ndc_derived_hcpcs
                    >> hosp_prepare_base_chunks
                    >> hosp_prepare_base
                    >> hosp_prepare_plan_bridge_combined_chunks
                    >> hosp_build_plan_bridge_chunks
                    >> custom_plan_bridge
                    >> hosp_build_plan_bridge
                    >> hospital_providers_chunks,
                    pg_prepare_plan_bridge_combined_chunks
                    >> pg_build_plan_bridge_chunks
                    >> pg_build_plan_bridge
                    >> hospital_pg_providers_chunks
                )
                >> hospital_mrf
                >> raw_hospital_data_validations
            )

        komodo_allowables = raw.build_komodo_allowables()

        with TaskGroup("gross_charges") as gross_charges_group:
            gc_provider_id_lists = raw.get_hospital_chunks(n_chunks=7, state_chunk=True)
            gross_charges = raw.build_gross_charges.partial().expand(provider_id_list=gc_provider_id_lists)
            gross_charges_union = raw.build_gross_charges_union(gc_provider_id_lists)
            gc_provider_id_lists >> gross_charges >> gross_charges_union

        combine_raw = raw.build_combined_raw()

        with TaskGroup("combined_raw_validations") as combined_raw_validations:
            for test in v.COMBINED_RAW_VALIDATION_TESTS:
                test('tmp_int_combined_raw')

        # Depend on the whole gross_charges group, not only on the mapped
        # build tasks, so combine_raw also waits for gross_charges_union.
        (
            raw_payer_data,
            raw_hospital_data,
            komodo_allowables,
            gross_charges_group,
        ) >> combine_raw >> combined_raw_validations

    # ------------------------------------------------------------------
    # Stage 3: benchmarks, transformations, accuracy and imputations
    # ------------------------------------------------------------------
    with TaskGroup("analysis", tooltip="benchmarks, imputations, transformations, accuracy") as build_analysis_tables:
        benchmarks_task = benchmarks.build_benchmarks()
        transformations_task = transformations.get_transformations()

        with TaskGroup("accuracy_raw", tooltip="evaluate accuracy of raw and transformed values") as accuracy_raw_in_chunks:
            # Each chunked group below creates its own chunking task.
            payer_id_chunks = utils.get_ros_payer_chunks()
            accuracy_chunks = (
                accuracy.build_accuracy_raw.partial().expand(payer_id_chunk=payer_id_chunks)
            )
            accuracy_union = accuracy.build_accuracy_raw_union(payer_id_chunks)
            accuracy_chunks >> accuracy_union

        with TaskGroup("imputations") as imputations_group:
            with TaskGroup("imputations_dependencies") as imputations_dependencies:
                imputations_long_rates_raw_columns = imputations.get_imputations_long_rates_raw_columns()
                imputations_long_rates_transformed_columns = imputations.get_imputations_long_rates_transformed_columns()
                imputations_long_rates_combined = imputations.get_imputations_long_rates_combined()
                imputations_long_rates_perc = imputations.get_imputations_long_rates_perc()
                imputations_rc_global = imputations.get_imputations_rc_global()
                imputations_rc_carveouts = imputations.get_imputations_rc_carveouts()
                (imputations_long_rates_raw_columns, imputations_long_rates_transformed_columns) >> imputations_long_rates_combined
                imputations_rc_global >> imputations_rc_carveouts

            with TaskGroup("imputations_in_chunks") as imputations_in_chunks:
                payer_id_chunks = utils.get_ros_payer_chunks()
                imputations_chunks = (
                    imputations.get_imputations.partial().expand(payer_id_chunk=payer_id_chunks)
                )
                imputations_union = imputations.get_imputations_union(payer_id_chunks)
                imputations_chunks >> imputations_union

            imputations_msdrg_base_rates = imputations.get_imputations_msdrg_base_rates()

            with TaskGroup("imputations_derived_in_chunks") as imputations_derived_in_chunks:
                payer_id_chunks = utils.get_ros_payer_chunks()
                imputations_derived_chunks = (
                    imputations.get_imputations_derived.partial().expand(payer_id_chunk=payer_id_chunks)
                )
                imputations_derived_union = imputations.get_imputations_derived_union(payer_id_chunks)
                imputations_derived_chunks >> imputations_derived_union

            imputations_cstm = imputations.get_imputations_cstm()
            imputations_aprdrg = imputations.get_imputations_aprdrg()

            # Two chains, evaluated for their side effects; both end at
            # imputations_aprdrg.
            (
                imputations_dependencies
                >> [
                    imputations_in_chunks,
                    imputations_msdrg_base_rates
                ]
                >> imputations_derived_in_chunks
                >> imputations_aprdrg,
                imputations_cstm >> imputations_aprdrg
            )

        # The tooltip's continuation lines stay at column 0 so the string
        # text is exactly what SOURCE shows.
        with TaskGroup(
            "combined_brit_and_accuracy",
            tooltip="""Add imputations, transformations, benchmarks to
`tmp_int_combined_raw` then evaluate accuracy all rates once again,
including imputations
"""
        ) as brit_combined:
            payer_id_chunks = utils.get_ros_payer_chunks()
            brit_chunks = (
                main.build_combined_main_brit.partial().expand(
                    payer_id_chunk=payer_id_chunks
                )
            )
            brit_union = (
                main.build_combined_main_brit_union(payer_id_chunks)
            )

            with TaskGroup("accuracy_brit_validations") as accuracy_brit_validations:
                for test in v.ACCURACY_BENCHMARK_VALIDATION_TESTS:
                    test('tmp_int_accuracy_brit')

            build_accuracy_brit_drugs_task = accuracy.build_accuracy_brit_drugs()
            build_accuracy_brit_labs_task = accuracy.build_accuracy_brit_labs()
            build_accuracy_brit_medical_task = accuracy.build_accuracy_brit_medical()
            build_accuracy_brit_physician_groups_task = accuracy.build_accuracy_brit_physician_groups()
            build_accuracy_brit_union_task = accuracy.build_accuracy_brit_union()

            (
                brit_chunks
                >> brit_union
                >> [build_accuracy_brit_drugs_task, build_accuracy_brit_labs_task, build_accuracy_brit_medical_task, build_accuracy_brit_physician_groups_task]
                >> build_accuracy_brit_union_task
                >> accuracy_brit_validations
            )

        (
            [
                benchmarks_task,
                transformations_task
            ]
            >> accuracy_raw_in_chunks
            >> imputations_group
            >> brit_combined
        )

    # ------------------------------------------------------------------
    # Stage 4: main tables, whispers, and final validations / cleanup
    # ------------------------------------------------------------------
    with TaskGroup("main") as build_main_tables:
        # Shared by both chunked sub-groups below.
        payer_id_chunks = utils.get_ros_payer_chunks()

        with TaskGroup("main_all") as build_main_all_tables:
            main_combined_chunks = (
                main.build_combined_main.partial().expand(payer_id_chunk=payer_id_chunks)
            )
            main_combined_create = main.create_combined_main_union()
            main_combined_insert = main.insert_combined_main_union(payer_id_chunks)
            main_combined_chunks >> main_combined_create >> main_combined_insert

        with TaskGroup("main_no_whisp_validations") as main_no_whisp_combined_validations:
            for test in v.MAIN_VALIDATION_TESTS:
                test('tmp_int_combined_no_whisp')

        with TaskGroup("whispers") as build_whispers_tables:
            build_whisper_references = [
                reference.build_policy_reporter(),
                reference.build_definitive_hospital_payer_mix(),
                reference.build_definitive_hospital_market_share(),
                reference.build_quality_cms(),
                reference.build_quality_leapfrog(),
                reference.build_utilizations()
            ]
            # Join points between the list-valued stages of this group.
            wait_for_whisper_references = EmptyOperator(task_id="wait_for_whisper_references")
            whisper_tasks = [
                whispers.build_payer(),
                whispers.build_network(),
                whispers.build_provider(),
                whispers.build_code(),
                whispers.build_provider_payer(),
                whispers.build_provider_code(),
                whispers.build_payer_code(),
                whispers.build_provider_payer_network(),
            ]
            wait_for_whispers = EmptyOperator(task_id="wait_for_whispers")

            with TaskGroup("whispers_validation") as whispers_validation:
                # Per table: one uniqueness check over `cols` and one
                # non-null check, each with a table-specific task_id.
                whispers_qa = []
                for table, cols in qau.WHISPERS_UNIQUENESS_TESTS.items():
                    whispers_qa.append(
                        qau
                        .uniqueness_check
                        .override(task_id=f"whispers_qa_{table}_unique")
                        (table=table, columns=cols)
                    )
                    whispers_qa.append(
                        qau
                        .nonnull_check
                        .override(task_id=f"whispers_qa_{table}_nonnull")
                        (table=table)
                    )

            (
                build_whisper_references
                >> wait_for_whisper_references
                >> whisper_tasks
                >> wait_for_whispers
                >> whispers_validation
            )

        with TaskGroup("main_add_whispers") as build_add_whispers_to_main_all:
            # These three names are rebound here; the "main_all" tasks were
            # already wired above, so the rebinding does not affect them.
            main_combined_chunks = (
                main.add_whispers_to_main_all.partial().expand(payer_id_chunk=payer_id_chunks)
            )
            main_combined_create = main.create_whispers_to_main_all_union()
            main_combined_insert = main.insert_whispers_to_main_all_union(payer_id_chunks)
            main_combined_chunks >> main_combined_create >> main_combined_insert

        with TaskGroup("main_all_validations") as main_combined_validations:
            for test in v.MAIN_VALIDATION_TESTS:
                test('tmp_int_combined')

        cleanup_main_with_whisper_chunk_tables = main.cleanup_whispers_to_main_all_union(payer_id_chunks)

        (
            build_main_all_tables
            >> main_no_whisp_combined_validations
            >> build_whispers_tables
            >> build_add_whispers_to_main_all
            >> main_combined_validations
            >> cleanup_main_with_whisper_chunk_tables
        )

    # Stage ordering for the whole DAG.
    (
        start_dag
        >> wait_30_seconds
        >> build_ros_and_spines
        >> build_raw_tables
        >> build_analysis_tables
        >> build_main_tables
    )