"""
Core Licensable Data: Orchestrator DAG
"""
import datetime
import logging
import json
from pathlib import Path
from airflow import DAG
from airflow.models import Variable
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
logger = logging.getLogger(__name__)

DAG_ID = "core_licensable_data_orchestrator"
# DAG triggered by this orchestrator's TriggerDagRunOperator tasks.
SUB_DAG_ID = "core_licensable_data_sub_dag"
OWNER = "data_science"
DAG_TAG = [
    "turquoise-health",
    "data-science",
    "cld",
    "orchestrator",
]

# Optional JSON overrides for DEFAULT_PARAMS, stored in an Airflow Variable.
# NOTE: this call executes whenever the module is imported (i.e. on DAG-file
# parse). `or {}` keeps the hard-coded defaults usable if the Variable exists
# but deserializes to JSON null (which would otherwise break `.get` below).
DEFAULT_SETTINGS = Variable.get(
    key=f"datascience/default_settings_{DAG_ID}", default_var={}, deserialize_json=True
) or {}

# Default location of the SQL files: the "sql" folder next to this file.
_DEFAULT_SQL_LOC = f"{Path(__file__).parent}/sql"

# DAG-level params. Each entry uses the matching key from DEFAULT_SETTINGS when
# present and otherwise falls back to the hard-coded default shown here.
DEFAULT_PARAMS = {
    "orchestrator_sql_loc": DEFAULT_SETTINGS.get("orchestrator_sql_loc", _DEFAULT_SQL_LOC),
    "sql_loc": DEFAULT_SETTINGS.get("sql_loc", _DEFAULT_SQL_LOC),
    "s3_trino_bucket": DEFAULT_SETTINGS.get("s3_trino_bucket", "turquoise-health-payer-export-main"),
    "db_conn_name": DEFAULT_SETTINGS.get("db_conn_name", "trino_default"),
    "payer_schemas_list": DEFAULT_SETTINGS.get(
        "payer_schemas_list",
        [
            "tq_production.public_2025_09",
            "tq_production.public_2025_08",
            "tq_production.public_2025_07",
            "tq_production.public_2025_06",
            "tq_production.public_2025_02",
            "tq_production.public_2024_09",
        ],
    ),
    "hospital_schemas_list": DEFAULT_SETTINGS.get(
        "hospital_schemas_list",
        [
            "tq_production.hospital_data",
            "tq_production.hospital_historical_2025_08",
            "tq_production.hospital_historical_2025_07",
            "tq_production.hospital_historical_2025_06",
            "tq_production.hospital_historical_2025_02",
            "tq_production.hospital_historical_2024_09",
        ],
    ),
    "sub_version_list": DEFAULT_SETTINGS.get(
        "sub_version_list",
        [
            "2025_09",
            "2025_08",
            "2025_07",
            "2025_06",
        ],
    ),
    "historic_sub_version_dict": DEFAULT_SETTINGS.get(
        "historic_sub_version_dict",
        {
            "6m": "2025_02",
            "12m": "2024_09",
        },
    ),
    "version": DEFAULT_SETTINGS.get("version", "vX_X"),
    # Reads its own settings key. It previously read "version", which made the
    # comparison version identical to the current version whenever "version"
    # was overridden.
    "release_note_compare_version": DEFAULT_SETTINGS.get("release_note_compare_version", None),
    "schema_name": DEFAULT_SETTINGS.get("schema_name", "tq_dev.internal_dev_csong_cld_"),
    "provider_types": DEFAULT_SETTINGS.get(
        "provider_types",
        [
            'Hospital',
            'ASC',
            'Physician Group',
            'Laboratory',
            'Imaging Center',
        ],
    ),
    "run_historic_cmv_subdags": DEFAULT_SETTINGS.get("run_historic_cmv_subdags", False),
}

# Reduced parameter set that points at small test schemas. The DAG below is
# built with DEFAULT_PARAMS, not TEST_PARAMS.
# NOTE(review): this set lacks several DEFAULT_PARAMS keys (e.g.
# "historic_sub_version_dict", "provider_types"). Confirm callers tolerate that.
TEST_PARAMS = {
    "orchestrator_sql_loc": "/opt/airflow/dags/core_licensable_data_orchestrator/sql",
    "sql_loc": "/opt/airflow/dags/core_licensable_data_orchestrator/sql",
    "s3_trino_bucket": "turquoise-health-payer-export-main",
    "db_conn_name": "trino_default",
    "payer_schemas_list": ["hive.cld_utils"],
    "hospital_schemas_list": ["hive.cld_utils"],
    "sub_version_list": ["test"],
    "version": "v0_1",
    "schema_name": "tq_dev.internal_dev_csong_cld_",
}

DEFAULT_ARGS = {
    "owner": OWNER,
    "depends_on_past": False,
    "start_date": datetime.datetime(2024, 1, 1, 0, 0),
}

# Same value as the hard-coded "db_conn_name" default above.
DB_CONN_NAME = "trino_default"
with DAG(
    dag_id=DAG_ID,
    description="Builds core licensable data",
    default_args=DEFAULT_ARGS,
    schedule_interval=None,
    catchup=False,
    tags=DAG_TAG,
    params=DEFAULT_PARAMS,
    max_active_runs=1,
    max_active_tasks=12,
) as dag:
    # Project task/QA modules are imported inside the DAG context.
    from data_science.cld.core_licensable_data_orchestrator import tasks, views
    from data_science.cld.core_licensable_data_sub_dag import qa as v
    from data_science.cld.core_licensable_data_sub_dag.qa import (
        uniqueness as qau,
        rate_types as qart,
        prod as qap,
    )

    # --- Sub-DAG fan-out -----------------------------------------------------
    months_list = tasks.get_sub_versions_list()
    trigger_config_list = tasks.build_trigger_config(months_list)

    with TaskGroup("run_sub_dags") as run_sub_dags:
        sleep_1_minutes = tasks.sleep_1_minutes()
        # Dynamically mapped: one trigger per conf returned by
        # build_trigger_config. Each one blocks until its sub-DAG run completes.
        trigger_sub_dags = TriggerDagRunOperator.partial(
            task_id="trigger_core_licensable_data_sub_dag",
            trigger_dag_id=SUB_DAG_ID,
            wait_for_completion=True,
        ).expand(
            conf=trigger_config_list
        )
        sleep_1_minutes >> trigger_sub_dags

    # Confs for the CMV-only sub-DAG runs. This task has no upstream, so it may
    # run as soon as the DAG run starts.
    cmv_trigger_config_list = tasks.build_trigger_config_cmv()

    with TaskGroup("run_cmv_only_sub_dags") as run_cmv_only_sub_dags:
        trigger_cmv_only_sub_dags = TriggerDagRunOperator.partial(
            task_id="trigger_core_licensable_data_sub_dag",
            trigger_dag_id=SUB_DAG_ID,
            wait_for_completion=True,
        ).expand(
            conf=cmv_trigger_config_list
        )

    # --- Supporting tables ---------------------------------------------------
    with TaskGroup("traceability_tables") as traceability_tables:
        build_traceability_to_raw_data = tasks.build_traceability_to_raw_data()
        traceability_formulas = tasks.build_traceability_formulas_table()

    with TaskGroup("rate_object_space") as rate_object_space:
        tasks.build_merged_rate_object_space()

    # --- Prod tables: chunked build, union, QA, then chunk cleanup -----------
    with TaskGroup("prod_tables") as prod_tables:
        chunks = tasks.get_payer_ids(
            large_payer_max_chunk_n=25_000_000,
            max_payer_n=25_000_000,
            n_chunks=35_000_000,
            table_name="tmp_prod_chunks_abridged",
        )
        # One mapped task instance per chunk returned by get_payer_ids.
        main_chunks = tasks.build_merged_combined_chunks.partial().expand(
            n_chunk=chunks
        )

        with TaskGroup("qa_abridged") as qa_abridged:
            for test in v.ORCHESTRATOR_ABRIDGED_TESTS:
                test('prod_combined_abridged')
        combine_abridged_joined_chunks = tasks.build_merged_combined_abridged_joined_chunks.partial().expand(
            n_chunk=chunks
        )
        combine_abridged = tasks.build_union_combined_abridged(
            chunks
        )

        with TaskGroup("qa_all") as qa_all:
            for test in v.ORCHESTRATOR_COMBINED_ALL_TESTS:
                test('prod_combined_all')
        combine_all_joined_chunks = tasks.build_merged_combined_all_joined_chunks.partial().expand(
            n_chunk=chunks
        )
        combine_all = tasks.build_union_combined_all(
            chunks
        )

        cleanup_chunks = tasks.cleanup_prod_abridged_and_all_chunks(chunks)

        # Two parallel branches (abridged / all) off the merged chunks. Chunk
        # cleanup waits for both QA groups.
        main_chunks >> combine_abridged_joined_chunks >> combine_abridged >> qa_abridged
        main_chunks >> combine_all_joined_chunks >> combine_all >> qa_all
        qa_abridged >> cleanup_chunks
        qa_all >> cleanup_chunks

    # --- Rollup views and their validation -----------------------------------
    with TaskGroup("prod_views") as prod_views:
        with TaskGroup("prod_rollups") as prod_rollups:
            views.create_rollup_provider()
            views.create_rollup_payer()
            views.create_rollup_network()
            views.create_rollup_provider_network()
            views.create_rollup_code()
            views.create_rollup_cbsa_code()

        with TaskGroup("rollups_validation") as rollups_validation:
            rollups_qa = []
            # A uniqueness check and a non-null check per rollup table.
            for table, cols in qau.ROLLUP_UNIQUENESS_TESTS.items():
                rollups_qa.append(
                    qau.uniqueness_check.override(
                        task_id=f"rollups_qa_{table}_unique"
                    )(table=table, columns=cols)
                )
                rollups_qa.append(
                    qau.nonnull_check.override(
                        task_id=f"rollups_qa_{table}_nonnull"
                    )(table=table)
                )

        prod_rollups >> rollups_validation

    with TaskGroup("release_notes") as release_notes:
        tasks.build_release_notes()

    # --- Checks on the final prod tables --------------------------------------
    with TaskGroup("prod_combined_checks") as prod_combined_checks:
        with TaskGroup("prod_table_misc_tests") as prod_table_misc_tests:
            qap.non_outlier_median_canonical_rate(table="prod_combined_abridged")
            qap.non_outlier_median_canonical_percentage_of_state_avg_medicare(table="prod_combined_abridged")
            qap.no_negative_rates(table="prod_combined_abridged")
            qap.no_rates_gtr_20m(table="prod_combined_abridged")
            qap.non_outlier_coverage_gtr_30_pct(table="prod_combined_abridged")
            qap.provider_type_coverage(table="prod_combined_abridged")

        with TaskGroup("traceability") as traceability:
            qap.prof_rates_have_best_payer_hospital_rate(table="prod_combined_abridged")
            qap.ip_op_have_best_payer_hospital_rate(table="prod_combined_abridged")
            qap.best_payer_hospital_score_completeness(table="prod_combined_abridged")

        with TaskGroup("consistency") as consistency:
            qap.version_comparison_test(table="prod_combined_abridged")

        with TaskGroup("analyze_report_simulation_checks") as analyze_report_simulation_checks:
            for config in qap.ANALYZE_REPRODUCE_CONFIGS:
                qap.analyze_report_reproduction.override(
                    task_id=f"analyze_report_reproduction_{config['org']}"
                )(
                    table="prod_combined_abridged",
                    config=config,
                )

        with TaskGroup("rate_type_checks") as rate_type_checks:
            # Task ids use the text after the last ": " in each rate type name.
            with TaskGroup("raw_rate_types") as raw_rate_types:
                raw_rate_type_tests = []
                for rate_type in qart.RAW_HOSPITAL_RATE_TYPES:
                    raw_rate_type_tests.append(
                        qart.raw_hospital_rate_type.override(
                            task_id=f"raw_hospital_{rate_type.split(': ')[-1]}"
                        )(table="prod_combined_all", rate_type=rate_type)
                    )
                for rate_type in qart.RAW_HOSPITAL_ALLOWED_AMOUNT_RATE_TYPES:
                    raw_rate_type_tests.append(
                        qart.raw_hospital_allowed_amount_rate_type.override(
                            task_id=f"raw_hospital_aa_{rate_type.split(': ')[-1]}"
                        )(table="prod_combined_all", rate_type=rate_type)
                    )
                for rate_type in qart.RAW_PAYER_RATE_TYPES:
                    raw_rate_type_tests.append(
                        qart.raw_payer_rate_type.override(
                            task_id=f"raw_payer_{rate_type.split(': ')[-1]}"
                        )(table="prod_combined_all", rate_type=rate_type)
                    )

            with TaskGroup("transform_rate_types") as transform_rate_types:
                # Kept in its own list; these used to be appended to raw_rate_type_tests.
                transform_rate_type_tests = []
                for rate_type in qart.TRANSFORMED_HOSPITAL_RATE_TYPES:
                    transform_rate_type_tests.append(
                        qart.transform_hospital_rate_type.override(
                            task_id=f"transform_hospital_{rate_type.split(': ')[-1]}"
                        )(table="prod_combined_all", rate_type=rate_type)
                    )
                for rate_type in qart.TRANSFORMED_PAYER_RATE_TYPES:
                    transform_rate_type_tests.append(
                        qart.transform_payer_rate_type.override(
                            task_id=f"transform_payer_{rate_type.split(': ')[-1]}"
                        )(table="prod_combined_all", rate_type=rate_type)
                    )

            with TaskGroup("impute_rate_types") as impute_rate_types:
                impute_rate_type_tests = []
                for rate_type, rate_type_config in qart.IMPUTATION_RATE_TYPES.items():
                    impute_rate_type_tests.append(
                        qart.impute_rate_type.override(
                            task_id=f"impute_{rate_type.split(': ')[-1]}"
                        )(table="prod_combined_all", rate_type=rate_type, rate_type_config=rate_type_config)
                    )

            # No explicit upstream inside this group, so it runs alongside
            # raw_rate_types once rate_type_checks is reached.
            canonical_method_params_and_formula = qap.canonical_method_params_formula(
                table="prod_combined_abridged"
            )

            raw_rate_types >> transform_rate_types >> impute_rate_types

        (
            prod_table_misc_tests
            >> traceability
            >> consistency
            >> analyze_report_simulation_checks
            >> rate_type_checks
        )

    create_prod_clear_rates_external = tasks.build_prod_clear_rates_external()

    # Fire-and-forget trigger of the downstream DAG, passing the templated
    # CLD version and the first configured sub-version.
    trigger_recommendation_features = TriggerDagRunOperator(
        task_id="trigger_recommendation_features",
        trigger_dag_id="recommendation_features",
        wait_for_completion=False,
        conf={
            "cld_schema_version": "{{ params.version }}",
            "schema_month": "{{ params.sub_version_list[0] }}",
        },
    )

    # Top-level ordering of the whole pipeline.
    (
        trigger_config_list
        >> run_sub_dags
        >> [
            traceability_tables,
            rate_object_space,
        ]
        >> prod_tables
        >> [
            prod_views,
            run_cmv_only_sub_dags,
        ]
        >> release_notes
        >> prod_combined_checks
        >> create_prod_clear_rates_external
        >> trigger_recommendation_features
    )