Skip to main content
Version: Canary - 2.3 🚧

Merge Sub-DAGs

Schema

  • sub_versions_list is a list of sub_versions, e.g. ['2024_11', '2024_10', '2024_09', '2024_08']

1. prepare chunks​

Get best sub_version for each roid.

Get Payer IDs​

SELECT 
payer_id,
network_id,
provider_type,
bill_type,
substring(billing_code,1,1) as prefix,
count(*) as n
FROM {{ schema_name }}.prod_rate_object_space
GROUP BY 1, 2, 3, 4, 5

Merge Combined Chunks​

-- 🪟💲🌟
-- run_id: {{ run_id }}
-- task_name: {{ task_name }}
-- Table: {{ schema_name }}.prod_combined_base_{{ n_chunk}}
-- Chunk: {{ n_chunk }}

{% set identifiers = [
'roid', 'payer_id', 'network_id', 'provider_id', 'bill_type',
'billing_code', 'billing_code_type', 'facility'
] %}

-- SPINES
{% set payer_spine_cols = cld_params.PAYER_SPINE_COLS %}
{% set network_spine_cols = cld_params.NETWORK_SPINE_COLS %}
{% set prov_spine_cols = cld_params.PROV_SPINE_COLS %}
{% set code_spine_cols = cld_params.CODE_SPINE_COLS %}


-- CHARGES + ALLOWED AMOUNTS
{% set gross_charges = cld_params.GROSS_CHARGE_COLS + cld_params.GROSS_CHARGE_STATS %}
{% set cash_charges = cld_params.CASH_COLS %}

-- BENCHMARKS
{% set benchmark_cols = cld_params.BENCHMARK_COLS %}

-- UTILIZATIONS
{% set utilization_cols = cld_params.UTILIZATION_COLS %}

-- OUTLIER BOUNDS
{% set outlier_bounds = cld_params.OUTLIER_BOUNDS %}

-- settings: dags/core_licensable_data_sub_dag/utils/params.py
-- WHISPERS
{% set payer_whispers = cld_params.PAYER_WHISPERS %}
{% set network_whispers = cld_params.NETWORK_WHISPERS %}
{% set provider_whispers = cld_params.PROVIDER_WHISPERS %}
{% set code_whispers = cld_params.CODE_WHISPERS %}
{% set provider_code_whispers = cld_params.PROVIDER_CODE_WHISPERS %}
{% set provider_payer_whispers = cld_params.PROVIDER_PAYER_WHISPERS %}
{% set provider_payer_network_whispers = cld_params.PROVIDER_PAYER_NETWORK_WHISPERS %}
{% set payer_code_whispers = cld_params.PAYER_CODE_WHISPERS %}

-- ====================
-- COMBINE
-- ====================
{% set spines = prov_spine_cols + payer_spine_cols + network_spine_cols + code_spine_cols %}
{% set whispers = payer_whispers + provider_whispers + code_whispers + provider_code_whispers + provider_payer_whispers + provider_payer_network_whispers + payer_code_whispers %}

{% set base_columns = identifiers + spines +
gross_charges + cash_charges +
benchmark_cols + outlier_bounds
+ utilization_cols + whispers
+ ['payer_network_name']
%}

{% set abridged_columns = cld_params.views.ABRIDGED_COLUMNS.keys() %}

{% set sub_versions = sub_versions_list | sort(reverse=True) %}

{% macro render_array(field, alias) -%}
ARRAY[
{% if field in ('canonical_rate_score','best_payer_rate_score','best_hospital_rate_score') %}
0
-- always populate medicare_rate and medicare_pricing_type from the latest subversion if best score is 0
{% elif field in ('medicare_rate', 'medicare_pricing_type') %}
v{{ sub_versions[0] }}.{{ field }}
{%- else -%}
NULL
{%- endif -%},
{%- for sub_version in sub_versions -%}
{%- if field == 'sub_version' -%}
'{{ sub_version }}'
{%- elif field in ('canonical_rate_score', 'best_payer_rate_score', 'best_hospital_rate_score') -%}
COALESCE(v{{ sub_version }}.{{ field }}, 0)
{%- else -%}
v{{ sub_version }}.{{ field }}
{%- endif -%}{% if not loop.last %},{% endif %}
{%- endfor -%}
] as {{ alias }}
{%- endmacro %}


{% macro render_historic_subversion_columns(fields, version_map) -%}
{%- set items = [] -%}
{%- for label, sub_version in version_map.items() -%}
{%- for field in fields -%}
{%- set _ = items.append("v%s.%s AS lag_%s_%s" % (sub_version, field, label, field)) -%}
{%- endfor -%}
{%- endfor -%}
{{ items | join(', ') }}
{%- endmacro %}


CREATE OR REPLACE TABLE {{ schema_name }}.prod_combined_base_{{ n_chunk}}

WITH (
PARTITIONING = ARRAY['payer_id']
)
AS
WITH

main_abridged AS (
SELECT
{% for col in identifiers + ['provider_type'] %}
v{{ sub_versions[0] }}.{{ col }},
{% endfor %}

-- subversions
{{ render_array('sub_version', 'canonical_rate_subversion_array') }},

-- scores
{{ render_array('canonical_rate_score', 'canonical_rate_score_array') }},
{{ render_array('canonical_rate_id', 'canonical_rate_id_array') }},
{{ render_array('canonical_rate_source', 'canonical_rate_source_array') }},
{{ render_array('canonical_rate_type', 'canonical_rate_type_array') }},
{{ render_array('canonical_method_params', 'canonical_method_params_array') }},

-- medicare_rate
{{ render_array('medicare_rate', 'medicare_rate_array') }},
{{ render_array('medicare_pricing_type', 'medicare_pricing_type_array') }},

-- best payer
{{ render_array('best_payer_rate_score', 'best_payer_rate_score_array') }},
{{ render_array('best_payer_rate', 'best_payer_rate_array') }},
{{ render_array('best_payer_rate_type', 'best_payer_rate_type_array') }},

-- best hospital
{{ render_array('best_hospital_rate_score', 'best_hospital_rate_score_array') }},
{{ render_array('best_hospital_rate', 'best_hospital_rate_array') }},
{{ render_array('best_hospital_rate_type', 'best_hospital_rate_type_array') }},

-- historic fields
{{ render_historic_subversion_columns([
'canonical_rate',
'canonical_rate_type',
'canonical_method_params',
'canonical_rate_source',
'canonical_rate_percent_of_medicare',
'canonical_rate_percent_of_state_avg_medicare',
'canonical_rate_score',
'canonical_gross_charge',
'canonical_gross_charge_type',
'canonical_gross_charge_source',
], historic_sub_versions_dict) }}

FROM {{ schema_name }}.tmp_int_combined_{{ sub_versions[0] }} v{{ sub_versions[0] }}
JOIN {{ schema_name }}.tmp_prod_chunks_abridged chunks
ON chunks.chunk_id = '{{ n_chunk }}'
AND chunks.payer_id = v{{ sub_versions[0] }}.payer_id
AND chunks.network_id = v{{ sub_versions[0] }}.network_id
AND chunks.provider_type = v{{ sub_versions[0] }}.provider_type
AND chunks.bill_type = v{{ sub_versions[0] }}.bill_type
AND chunks.prefix = SUBSTRING(v{{ sub_versions[0] }}.billing_code, 1, 1)
{% for sub_version in sub_versions[1:] %}
LEFT JOIN {{ schema_name }}.tmp_int_combined_{{ sub_version }} v{{ sub_version }}
ON v{{ sub_versions[0] }}.roid = v{{ sub_version }}.roid
AND v{{ sub_versions[0] }}.payer_id = v{{ sub_version }}.payer_id
AND v{{ sub_versions[0] }}.network_id = v{{ sub_version }}.network_id
AND v{{ sub_versions[0] }}.provider_type = v{{ sub_version }}.provider_type
AND v{{ sub_versions[0] }}.bill_type = v{{ sub_version }}.bill_type
{% endfor %}
-- historic subversions
{% for sub_version in historic_sub_versions_dict.values() %}
LEFT JOIN {{ schema_name }}.tmp_int_combined_{{ sub_version }} v{{ sub_version }}
ON v{{ sub_versions[0] }}.roid = v{{ sub_version }}.roid
AND v{{ sub_versions[0] }}.payer_id = v{{ sub_version }}.payer_id
AND v{{ sub_versions[0] }}.network_id = v{{ sub_version }}.network_id
AND v{{ sub_versions[0] }}.provider_type = v{{ sub_version }}.provider_type
AND v{{ sub_versions[0] }}.bill_type = v{{ sub_version }}.bill_type
{% endfor %}
WHERE
v{{ sub_versions[0] }}.payer_id IN (
SELECT DISTINCT payer_id
FROM {{ schema_name }}.tmp_prod_chunks_abridged
WHERE chunk_id = '{{ n_chunk }}'
)
),
canonical_selections AS (
SELECT

{% for col in identifiers + ['provider_type'] %}
main.{{ col }},
{% endfor %}

ARRAY_MAX(main.canonical_rate_score_array) as canonical_rate_score,

main.canonical_rate_subversion_array[
ARRAY_POSITION(main.canonical_rate_score_array, ARRAY_MAX(main.canonical_rate_score_array))
] as canonical_rate_subversion,

main.canonical_rate_id_array[
ARRAY_POSITION(main.canonical_rate_score_array, ARRAY_MAX(main.canonical_rate_score_array))
] as canonical_rate_id,

main.canonical_rate_source_array[
ARRAY_POSITION(main.canonical_rate_score_array, ARRAY_MAX(main.canonical_rate_score_array))
] as canonical_rate_source,

main.canonical_rate_type_array[
ARRAY_POSITION(main.canonical_rate_score_array, ARRAY_MAX(main.canonical_rate_score_array))
] as canonical_rate_type,

main.canonical_method_params_array[
ARRAY_POSITION(main.canonical_rate_score_array, ARRAY_MAX(main.canonical_rate_score_array))
] as canonical_method_params,

-- medicare_rate
main.medicare_rate_array[
ARRAY_POSITION(main.canonical_rate_score_array, ARRAY_MAX(main.canonical_rate_score_array))
] as medicare_rate,

main.medicare_pricing_type_array[
ARRAY_POSITION(main.canonical_rate_score_array, ARRAY_MAX(main.canonical_rate_score_array))
] as medicare_pricing_type,

-- best payer rate

main.best_payer_rate_array[
ARRAY_POSITION(main.best_payer_rate_score_array, ARRAY_MAX(main.best_payer_rate_score_array))
] as best_payer_rate,

main.best_payer_rate_type_array[
ARRAY_POSITION(main.best_payer_rate_score_array, ARRAY_MAX(main.best_payer_rate_score_array))
] as best_payer_rate_type,

ARRAY_MAX(main.best_payer_rate_score_array) as best_payer_rate_score,

main.canonical_rate_subversion_array[
ARRAY_POSITION(main.best_payer_rate_score_array, ARRAY_MAX(main.best_payer_rate_score_array))
] as best_payer_rate_subversion,

-- best hospital rate
main.best_hospital_rate_array[
ARRAY_POSITION(main.best_hospital_rate_score_array, ARRAY_MAX(main.best_hospital_rate_score_array))
] as best_hospital_rate,

main.best_hospital_rate_type_array[
ARRAY_POSITION(main.best_hospital_rate_score_array, ARRAY_MAX(main.best_hospital_rate_score_array))
] as best_hospital_rate_type,

ARRAY_MAX(main.best_hospital_rate_score_array) as best_hospital_rate_score,

main.canonical_rate_subversion_array[
ARRAY_POSITION(main.best_hospital_rate_score_array, ARRAY_MAX(main.best_hospital_rate_score_array))
] as best_hospital_rate_subversion,

{{ historic_fields| join(', main.') }}

FROM main_abridged as main
)
SELECT
{% for col in identifiers + ['provider_type'] %}
cs.{{ col }},
{% endfor %}
cs.canonical_rate_source,
cs.canonical_rate_type,
cs.canonical_rate_score,
cs.canonical_rate_id,
cs.canonical_rate_subversion,
cs.canonical_method_params,
-- special handling for anesthesia canonical method formula because it is payer-specific
CASE
WHEN cs.canonical_method_params LIKE '%anesthesia%'
THEN JSON_EXTRACT_SCALAR(cs.canonical_method_params, '$.anesthesia_conversion_method')
ELSE f.canonical_method_formula
END AS canonical_method_formula,
cs.medicare_rate,
cs.medicare_pricing_type,
cs.best_payer_rate,
cs.best_payer_rate_type,
cs.best_payer_rate_score,
cs.best_payer_rate_subversion,
cs.best_hospital_rate,
cs.best_hospital_rate_type,
cs.best_hospital_rate_score,
cs.best_hospital_rate_subversion,

{{ historic_fields | join(', cs.') }},

trace.traceability_id,
trace.sub_version as traceability_subversion,
trace.payer_file_hash,
trace.payer_location_data_source_name,
{% if sub_versions_list[0] == 'test' %}
CAST(TRY_CAST(trace.payer_last_updated_on AS TIMESTAMP) AS DATE) as payer_last_updated_on,
trace.hospital_file_id,
CAST(TRY_CAST(trace.hospital_ingested_on AS TIMESTAMP) AS DATE) as hospital_ingested_on,
{% else %}
COALESCE(
CAST(TRY_CAST(trace.payer_last_updated_on AS TIMESTAMP) AS DATE),
CAST(
replace(cs.canonical_rate_subversion, '_', '-') || '-01' AS DATE
)
) as payer_last_updated_on,
trace.hospital_file_id,
COALESCE(
CAST(TRY_CAST(trace.hospital_ingested_on AS TIMESTAMP) AS DATE),
CAST(
replace(cs.canonical_rate_subversion, '_', '-') || '-01' AS DATE
)
) as hospital_ingested_on,
{% endif %}
trace.hospital_loaded_on,
trace.hospital_filename,
trace.hospital_version,
trace.raw_id as trace_raw_id
FROM canonical_selections cs
LEFT JOIN {{ schema_name }}.prod_traceability_to_raw_data trace
ON cs.roid = trace.roid
AND cs.canonical_rate_id = trace.raw_id
AND cs.canonical_rate_subversion = trace.sub_version
AND cs.payer_id = trace.payer_id
AND cs.canonical_rate_source = trace.source
AND trace.rn = 1
AND trace.payer_id IN (
SELECT DISTINCT payer_id
FROM {{ schema_name }}.tmp_prod_chunks_abridged
WHERE chunk_id = '{{ n_chunk }}'
)
LEFT JOIN {{ schema_name }}.prod_traceability_formulas f
ON cs.canonical_rate_type = f.canonical_rate_type


2. prod_combined_abridged​

-- 🪟💲🌟
-- run_id: {{ run_id }}
-- task_name: {{ task_name }}
-- Table: {{ schema_name }}.prod_combined_abridged_joined_{{ n_chunk}}
-- Chunk: {{ n_chunk }}

{% set pre_loaded_columns = [
'roid',
'payer_id',
'network_id',
'provider_id',
'bill_type',
'billing_code',
'billing_code_type',
'facility',
'provider_type',

'canonical_rate_source',
'canonical_rate_type',
'canonical_rate_score',
'canonical_rate_subversion',
'medicare_rate',
'medicare_pricing_type',
'best_payer_rate',
'best_payer_rate_score',
'best_payer_rate_type',
'best_payer_rate_subversion',
'best_hospital_rate',
'best_hospital_rate_score',
'best_hospital_rate_type',
'best_hospital_rate_subversion',
'payer_location_data_source_name',
'payer_file_hash',
'payer_last_updated_on',
'hospital_filename',
'hospital_ingested_on',
'canonical_method_formula'
] %}

{% set best_payer_hospital_cols = [
'best_payer_rate',
'best_payer_rate_score',
'best_payer_rate_type',
'best_payer_rate_subversion',
'best_hospital_rate',
'best_hospital_rate_score',
'best_hospital_rate_type',
'best_hospital_rate_subversion'
] %}

{% set rate_cols = [
'medicare_rate',
'state_avg_medicare_rate',
'national_avg_medicare_rate',
'cbsa_avg_medicare_rate',
'asc_avg_medicare_rate',
'lab_avg_medicare_rate',
'discounted_cash_rate',
'canonical_gross_charge',
'canonical_rate',
'canonical_rate_percent_of_medicare',
'canonical_rate_percent_of_cbsa_avg_medicare',
'canonical_rate_percent_of_state_avg_medicare',
'canonical_rate_percent_of_national_avg_medicare',
'canonical_rate_percent_of_asc_medicare',
'canonical_rate_percent_of_lab_medicare',
'canonical_rate_percent_of_list',
'best_payer_rate',
'best_hospital_rate',
'lag_6m_canonical_rate',
'lag_12m_canonical_rate'
] %}

{% set ns = namespace(abridged_exclusive=[]) %}
{% for col in cld_params.views.ABRIDGED_COLUMNS.keys() %}
{% if col not in pre_loaded_columns and not col.startswith('lag_') %}
{% set _ = ns.abridged_exclusive.append(col) %}
{% endif %}
{% endfor %}

{% set sub_versions = sub_versions_list | sort(reverse=True) %}

CREATE OR REPLACE TABLE {{ schema_name }}.prod_combined_abridged_joined_{{ n_chunk }}
WITH (
PARTITIONING = ARRAY['payer_id']
)
AS
WITH

{% for sub_version in sub_versions %}
joined_{{ sub_version }} AS (
SELECT
{% for col in pre_loaded_columns %}
{% if col in best_payer_hospital_cols %}
CASE
WHEN df.canonical_rate_score = {{ cld_params.VALIDATED_SCORE }}
THEN
{% if col in rate_cols %}
ROUND(v{{ sub_version }}.{{ col }}, 2)
{% elif 'subversion' in col %}
'{{ sub_version }}'
{% elif 'score' in col %}
{{ cld_params.VALIDATED_SCORE }}
{% else %}
v{{ sub_version }}.{{ col }}
{% endif %}
ELSE df.{{ col }}
END AS {{ col }},
{% else %}
df.{{ col }},
{% endif %}
{% endfor %}
{% for col in ns.abridged_exclusive %}
{% if col in rate_cols %}
ROUND(v{{ sub_version }}.{{ col }}, 2) AS {{ col }},
{% else %}
v{{ sub_version }}.{{ col }},
{% endif %}
{% endfor %}
{% for col in historic_fields %}
{% if col in rate_cols %}
ROUND(df.{{ col }}, 2) AS {{ col }},
{% else %}
df.{{ col }},
{% endif %}
{% endfor %}

{% macro delta_expression(label, rate_col, sub_version) %}
v{{ sub_version }}.{{ rate_col }} - df.lag_{{ label }}_{{ rate_col }} AS lag_{{ label }}_{{ rate_col }}_delta
{% endmacro %}

{% set deltas = [] %}
{% for label in historic_sub_versions_dict.keys() %}
{% for rate_col in ['canonical_rate', 'canonical_rate_percent_of_medicare', 'canonical_rate_percent_of_state_avg_medicare'] %}
{% set _ = deltas.append(delta_expression(label, rate_col, sub_version)) %}
{% endfor %}
{% endfor %}
{{ deltas | join(',\n') }}

FROM {{ schema_name }}.prod_combined_base_{{ n_chunk }} df
LEFT JOIN {{ schema_name }}.tmp_int_combined_{{ sub_version }} v{{ sub_version }}
ON df.roid = v{{ sub_version }}.roid
AND df.payer_id = v{{ sub_version }}.payer_id
AND df.network_id = v{{ sub_version }}.network_id
AND df.provider_type = v{{ sub_version }}.provider_type
AND df.bill_type = v{{ sub_version }}.bill_type
WHERE df.canonical_rate_subversion = '{{ sub_version }}'
),
{% endfor %}

-- NULL subversions
joined_null AS (
SELECT
{% for col in pre_loaded_columns %}
{% if col in rate_cols %}
ROUND(df.{{ col }}, 2) AS {{ col }},
{% else %}
df.{{ col }},
{% endif %}
{% endfor %}
{% for col in ns.abridged_exclusive %}
NULL AS {{ col }},
{% endfor %}

{% for col in historic_fields %}
NULL AS {{ col }},
{% endfor %}

{% set delta_names = [] %}
{% for label in historic_sub_versions_dict.keys() %}
{% for rate_col in ['canonical_rate', 'canonical_rate_percent_of_medicare', 'canonical_rate_percent_of_state_avg_medicare'] %}
{% set _ = delta_names.append('lag_' ~ label ~ '_' ~ rate_col ~ '_delta') %}
{% endfor %}
{% endfor %}
NULL AS {{ delta_names | join(', NULL AS ') }}
FROM {{ schema_name }}.prod_combined_base_{{ n_chunk }} df
WHERE df.canonical_rate_subversion IS NULL
)

-- UNION ALL
{% for sub_version in sub_versions %}
SELECT
{% for col in cld_params.views.ABRIDGED_COLUMNS.keys() %}
{{ col }}{% if not loop.last %}, {% endif %}
{% endfor %}
FROM joined_{{ sub_version }}
UNION ALL
{% endfor %}
SELECT
{% for col in cld_params.views.ABRIDGED_COLUMNS.keys() %}
{{ col }}{% if not loop.last %}, {% endif %}
{% endfor %}
FROM joined_null

union abridged:​

-- 🪟💲🌟
-- run_id: {{ run_id }}
-- task_name: {{ task_name }}
-- Table: {{ schema_name }}.prod_combined_abridged

CREATE OR REPLACE TABLE {{ schema_name }}.prod_combined_abridged
WITH (
PARTITIONING = ARRAY['payer_id']
)
AS
{% for n_chunk in n_chunks %}
SELECT * FROM {{ schema_name }}.prod_combined_abridged_joined_{{ n_chunk }}
{% if not loop.last %} UNION ALL {% endif %}
{% endfor %}

3. prod_combined_all​

for each payer_id chunk:​

-- 🪟💲🌟
-- run_id: {{ run_id }}
-- task_name: {{ task_name }}
-- Table: {{ schema_name }}.prod_combined_all_joined_{{ n_chunk }}
-- Chunk: {{ n_chunk }}

{% set pre_loaded_columns = [
'roid',
'payer_id',
'network_id',
'provider_id',
'bill_type',
'billing_code',
'billing_code_type',
'facility',
'provider_type',

'canonical_rate_source',
'canonical_rate_type',
'canonical_rate_score',
'canonical_rate_subversion',
'medicare_rate',
'medicare_pricing_type',
'best_payer_rate',
'best_payer_rate_score',
'best_payer_rate_type',
'best_hospital_rate',
'best_hospital_rate_score',
'best_hospital_rate_type',
'payer_location_data_source_name',
'payer_file_hash',
'payer_last_updated_on',
'hospital_filename',
'hospital_ingested_on',
'canonical_method_formula',
'trace_raw_id'
] %}

{% set canonical_columns = [
'canonical_rate',
'canonical_rate_original_billing_codes',
'canonical_rate_original_billing_code_type',
'canonical_rate_category',
'canonical_contract_methodology',
'canonical_method_params',
'canonical_gross_charge',
'canonical_gross_charge_type',
'canonical_gross_charge_source',
'canonical_rate_gross_charge_original_billing_codes',
'canonical_rate_gross_charge_original_billing_code_type',
'canonical_rate_validation_method',
'canonical_rate_percent_of_medicare',
'canonical_rate_percent_of_cbsa_avg_medicare',
'canonical_rate_percent_of_state_avg_medicare',
'canonical_rate_percent_of_national_avg_medicare',
'canonical_rate_percent_of_asc_medicare',
'canonical_rate_percent_of_lab_medicare',
'canonical_rate_percent_of_list',
'canonical_rate_id',
] %}

{% set identifiers = [
'roid', 'payer_id', 'network_id', 'provider_id', 'bill_type',
'billing_code', 'billing_code_type', 'facility'
] %}

-- SPINES
{% set payer_spine_cols = cld_params.PAYER_SPINE_COLS %}
{% set network_spine_cols = cld_params.NETWORK_SPINE_COLS %}
{% set prov_spine_cols = cld_params.PROV_SPINE_COLS %}
{% set code_spine_cols = cld_params.CODE_SPINE_COLS %}


-- CHARGES + ALLOWED AMOUNTS
{% set gross_charges = cld_params.GROSS_CHARGE_COLS + cld_params.GROSS_CHARGE_STATS %}
{% set cash_charges = cld_params.CASH_COLS %}

-- RATE SOURCES
{% set imputation_cols = cld_params.IMPUTATION_COLS %}
{% set rate_cols = cld_params.INPATIENT_COLS_FULL %}
-- print rate hierarchy
{% for col in rate_cols %}
-- {{ col }}
{% endfor %}
{% set original_nontransformed_cols = cld_params.ORIGINAL_NONTRANSFORMED_COLS %}

-- BENCHMARKS
{% set benchmark_cols = cld_params.BENCHMARK_COLS %}

-- UTILIZATIONS
{% set utilization_cols = cld_params.UTILIZATION_COLS %}

-- OUTLIER BOUNDS
{% set outlier_bounds = cld_params.OUTLIER_BOUNDS %}

-- TRANSFORMATION STATS
{% set transformation_cols = cld_params.TRANSFORMATION_COLS %}

-- IMPUTATION STATS
{% set imputation_stats = cld_params.IMPUTATION_STATS_COLS %}


-- settings: dags/core_licensable_data_sub_dag/utils/params.py
-- WHISPERS
{% set payer_whispers = cld_params.PAYER_WHISPERS %}
{% set network_whispers = cld_params.NETWORK_WHISPERS %}
{% set provider_whispers = cld_params.PROVIDER_WHISPERS %}
{% set code_whispers = cld_params.CODE_WHISPERS %}
{% set provider_code_whispers = cld_params.PROVIDER_CODE_WHISPERS %}
{% set provider_payer_whispers = cld_params.PROVIDER_PAYER_WHISPERS %}
{% set provider_payer_network_whispers = cld_params.PROVIDER_PAYER_NETWORK_WHISPERS %}
{% set payer_code_whispers = cld_params.PAYER_CODE_WHISPERS %}

-- ====================
-- COMBINE
-- ====================
{% set spines = prov_spine_cols + payer_spine_cols + network_spine_cols + code_spine_cols %}
{% set whispers = payer_whispers + network_whispers + provider_whispers
+ code_whispers + provider_code_whispers + provider_payer_whispers + provider_payer_network_whispers + payer_code_whispers
%}

{% set base_columns = identifiers + spines +
gross_charges +
cash_charges +
original_nontransformed_cols +
utilization_cols +
imputation_stats +
transformation_cols +
rate_cols +
benchmark_cols +
outlier_bounds +
whispers +
['payer_network_name', 'generic_search'] +
canonical_columns
%}
{% set base_columns_exclusive = base_columns | reject("in", pre_loaded_columns) | list %}
{% set sub_versions = sub_versions_list | sort(reverse=True) %}

CREATE OR REPLACE TABLE {{ schema_name }}.prod_combined_all_joined_{{ n_chunk }}
WITH (
PARTITIONING = ARRAY['payer_id']
)
AS
WITH

{% for sub_version in sub_versions %}
joined_{{ sub_version }} AS (
SELECT
{% for col in pre_loaded_columns %}
df.{{ col }},
{% endfor %}
{% for col in base_columns_exclusive %}
v{{ sub_version }}.{{ col }},
{% endfor %}
{% for col in historic_fields %}
df.{{ col }},
{% endfor %}

{% macro delta_expression(label, rate_col, sub_version) %}
v{{ sub_version }}.{{ rate_col }} - df.lag_{{ label }}_{{ rate_col }} AS lag_{{ label }}_{{ rate_col }}_delta
{% endmacro %}

{% set deltas = [] %}
{% for label in historic_sub_versions_dict.keys() %}
{% for rate_col in ['canonical_rate', 'canonical_rate_percent_of_medicare', 'canonical_rate_percent_of_state_avg_medicare'] %}
{% set _ = deltas.append(delta_expression(label, rate_col, sub_version)) %}
{% endfor %}
{% endfor %}

{{ deltas | join(',\n') }}

FROM {{ schema_name }}.prod_combined_base_{{ n_chunk }} df
LEFT JOIN {{ schema_name }}.tmp_int_combined_{{ sub_version }} v{{ sub_version }}
ON df.roid = v{{ sub_version }}.roid
AND df.payer_id = v{{ sub_version }}.payer_id
AND df.network_id = v{{ sub_version }}.network_id
AND df.provider_type = v{{ sub_version }}.provider_type
AND df.bill_type = v{{ sub_version }}.bill_type
WHERE df.canonical_rate_subversion = '{{ sub_version }}'
),
{% endfor %}

-- NULL subversions
joined_null AS (
SELECT
{% for col in pre_loaded_columns %}
df.{{ col }},
{% endfor %}
{% for col in base_columns_exclusive %}
NULL AS {{ col }},
{% endfor %}

{% for col in historic_fields %}
NULL AS {{ col }},
{% endfor %}

{% set delta_names = [] %}
{% for label in historic_sub_versions_dict.keys() %}
{% for rate_col in ['canonical_rate', 'canonical_rate_percent_of_medicare', 'canonical_rate_percent_of_state_avg_medicare'] %}
{% set _ = delta_names.append('lag_' ~ label ~ '_' ~ rate_col ~ '_delta') %}
{% endfor %}
{% endfor %}
NULL AS {{ delta_names | join(', NULL AS ') }}

FROM {{ schema_name }}.prod_combined_base_{{ n_chunk }} df
WHERE df.canonical_rate_subversion IS NULL
)

-- UNION ALL
{% for sub_version in sub_versions %}
SELECT
{% for col in pre_loaded_columns %}
{{ col }},
{% endfor %}
{% for col in base_columns_exclusive %}
{{ col }},
{% endfor %}

{% for col in historic_fields %}
{{ col }},
{% endfor %}

{{ delta_names | join(', ') }}
FROM joined_{{ sub_version }}
UNION ALL
{% endfor %}
SELECT
{% for col in pre_loaded_columns %}
{{ col }},
{% endfor %}
{% for col in base_columns_exclusive %}
{{ col }},
{% endfor %}

{% for col in historic_fields %}
{{ col }},
{% endfor %}

{{ delta_names | join(', ') }}
FROM joined_null

union chunks:​

-- 🪟💲🌟
-- run_id: {{ run_id }}
-- task_name: {{ task_name }}
-- Table: {{ schema_name }}.prod_combined_all

CREATE OR REPLACE TABLE {{ schema_name }}.prod_combined_all
WITH (
PARTITIONING = ARRAY['payer_id']
)
AS
{% for n_chunk in n_chunks %}
SELECT * FROM {{ schema_name }}.prod_combined_all_joined_{{ n_chunk }}
{% if not loop.last %} UNION ALL {% endif %}
{% endfor %}

4. prod_rate_object_space​

{% set sub_versions = sub_versions_list | sort(reverse=True) %}

CREATE OR REPLACE TABLE {{ schema_name }}.prod_rate_object_space AS
SELECT
roid,
payer_id,
network_id,
provider_id,
provider_type,
bill_type,
billing_code,
billing_code_type,
facility
FROM {{ schema_name }}.tmp_rate_object_space_{{ sub_versions[0] }}