Skip to main content
Version: Canary - 2.3 🚧

Combine Raw Data

Schema

-- 🪟💲🌟
-- run_id: {{ run_id }}
-- task_name: {{ task_name }}
-- Table: {{ schema_name }}.tmp_int_combined_raw_{{ sub_version }}
-- Subversion: {{ sub_version }}


CREATE OR REPLACE TABLE {{ schema_name }}.tmp_int_combined_raw_{{ sub_version }}
WITH (
    PARTITIONING = ARRAY['payer_id']
)
AS
WITH

-- Template variables (Jinja). They drive the generated CTEs, select-list columns and joins.
-- payer_types: values of payer negotiated_type; one payer CTE is generated per value.
{%- set payer_types = ['percentage', 'negotiated', 'fee schedule', 'per diem', 'derived'] %}
-- Hospital contract_methodology values; None stands for rows whose methodology IS NULL.
-- The dollar, percentage and allowed-amount families all iterate over the same values.
{%- set contract_methodologies = ['case rate', 'percent of total billed charges', 'fee schedule', 'other', 'per diem', None] %}
{%- set hospital_dollar_methods = contract_methodologies %}
{%- set hospital_percentage_methods = contract_methodologies %}
{%- set hospital_allowed_amount_methods = contract_methodologies %}

-- Gross-charge and cash column names, taken from cld_params
{% set gross_charges = cld_params.GROSS_CHARGE_COLS + cld_params.GROSS_CHARGE_STATS %}
{% set cash_charges = cld_params.CASH_COLS %}


-- One CTE per payer negotiated type, named payer_<type> with spaces replaced by underscores.
-- Each CTE keeps the payer MRF rate rows whose negotiated_type equals that type.
{%- for payer_type in payer_types %}
{%- set payer_cte = 'payer_' ~ (payer_type | replace(' ', '_')) %}
{{ payer_cte }} AS (
    SELECT
        roid,
        payer_id,
        core_rates_id,
        original_billing_codes,
        original_billing_code_type,
        rate
    FROM {{ schema_name }}.tmp_raw_mrf_payer_rates_{{ sub_version }}
    WHERE negotiated_type = '{{ payer_type }}'
),
{%- endfor %}

-- Hospital rate CTEs: one per (rate family, contract methodology) pair, named
-- hospital_<methodology>_<family>; a NULL methodology is named null_methodology.
-- Families are generated in the order dollar, percentage, allowed_amount, and each CTE
-- keeps only rows where that family's rate column is > 0.
{%- set hospital_rate_families = [
    ('dollar', 'negotiated_dollar', hospital_dollar_methods),
    ('percentage', 'negotiated_percentage', hospital_percentage_methods),
    ('allowed_amount', 'estimated_allowed_amount', hospital_allowed_amount_methods)
] %}
{%- for family, rate_column, methods in hospital_rate_families %}
{%- for method in methods %}
{%- set method_slug = (method if method else 'null_methodology') | replace(' ', '_') %}
hospital_{{ method_slug }}_{{ family }} AS (
    SELECT
        roid,
        payer_id,
        hospital_rates_id,
        original_billing_codes,
        original_billing_code_type,
        {{ rate_column }}
    FROM {{ schema_name }}.tmp_raw_mrf_hospital_rates_{{ sub_version }}
    WHERE contract_methodology {{ 'IS NULL' if method is none else "= '" + method + "'" }}
        AND {{ rate_column }} > 0
),
{%- endfor %}
{%- endfor %}

-- Approximate median (50th percentile) discounted cash rate per
-- provider_id / billing_code_type / billing_code.
-- Only rates in the inclusive range [0.1, 50000000] take part in the percentile.
hospital_discounted_cash_rate AS (
    SELECT
        hr.provider_id,
        hr.billing_code_type,
        hr.billing_code,
        approx_percentile(hr.discounted_cash_rate, 0.5) AS discounted_cash_rate
    FROM {{ hospital_schema }}.hospital_rates AS hr
    WHERE hr.discounted_cash_rate BETWEEN 0.1 AND 50000000
    GROUP BY
        hr.provider_id,
        hr.billing_code_type,
        hr.billing_code
)

-- Combine all raw data into a single table.
-- Starts from the rate object space (r) and LEFT JOINs every payer / hospital CTE on
-- (roid, payer_id), adding one group of columns per payer type and per hospital methodology.
SELECT
    r.roid,
    r.network_id,
    r.provider_id,
    r.bill_type,
    r.billing_code,
    r.billing_code_type,
    r.facility,
    r.provider_type,
    r.is_plausible,
    cs.is_surg_code,
    cs.is_drug_code,
    cs.is_lab_code,

    -- Cash columns: kept only when inside the cash outlier bounds, NULL otherwise
    {% for cash_col in cash_charges %}
    CASE
        WHEN hdcr.{{ cash_col }} BETWEEN co.lower_bound AND co.upper_bound
            THEN hdcr.{{ cash_col }}
    END AS {{ cash_col }},
    {% endfor %}

    -- Gross charge columns, passed through unchanged
    {% for gross_col in gross_charges %}
    gc.{{ gross_col }},
    {% endfor %}

    -- Payer rates: the "untransformed" variants are populated only when the rate has no
    -- original_billing_codes; their original-code columns are therefore typed NULLs.
    {%- for payer_type in payer_types %}
    {%- set p = 'p' ~ loop.index %}
    {%- set payer_slug = payer_type | replace(' ', '_') %}
    {{ p }}.rate AS payer_{{ payer_slug }}_rate,
    {{ p }}.core_rates_id AS payer_{{ payer_slug }}_rate_id,
    {{ p }}.original_billing_codes AS payer_{{ payer_slug }}_rate_original_billing_codes,
    {{ p }}.original_billing_code_type AS payer_{{ payer_slug }}_rate_original_billing_code_type,
    CASE
        WHEN {{ p }}.original_billing_codes IS NULL THEN {{ p }}.rate
    END AS payer_{{ payer_slug }}_untransformed_rate,
    CASE
        WHEN {{ p }}.original_billing_codes IS NULL THEN {{ p }}.core_rates_id
    END AS payer_{{ payer_slug }}_untransformed_rate_id,
    CAST(NULL AS array(varchar)) AS payer_{{ payer_slug }}_untransformed_rate_original_billing_codes,
    CAST(NULL AS varchar) AS payer_{{ payer_slug }}_untransformed_rate_original_billing_code_type,
    {%- endfor %}

    -- Hospital negotiated dollar, one column group per contract methodology
    {%- for method in hospital_dollar_methods %}
    {%- set hd = 'h' ~ loop.index %}
    {%- set method_slug = (method if method else 'null_methodology') | replace(' ', '_') %}
    {{ hd }}.negotiated_dollar AS hospital_{{ method_slug }}_dollar,
    {{ hd }}.hospital_rates_id AS hospital_{{ method_slug }}_dollar_id,
    {{ hd }}.original_billing_codes AS hospital_{{ method_slug }}_dollar_original_billing_codes,
    {{ hd }}.original_billing_code_type AS hospital_{{ method_slug }}_dollar_original_billing_code_type,
    CASE
        WHEN {{ hd }}.original_billing_codes IS NULL THEN {{ hd }}.negotiated_dollar
    END AS hospital_{{ method_slug }}_untransformed_dollar,
    CASE
        WHEN {{ hd }}.original_billing_codes IS NULL THEN {{ hd }}.hospital_rates_id
    END AS hospital_{{ method_slug }}_untransformed_dollar_id,
    CAST(NULL AS array(varchar)) AS hospital_{{ method_slug }}_untransformed_dollar_original_billing_codes,
    CAST(NULL AS varchar) AS hospital_{{ method_slug }}_untransformed_dollar_original_billing_code_type,
    {%- endfor %}

    -- Hospital negotiated percentage; aliases continue the h<N> numbering after the dollar
    -- joins. No "untransformed" variants are produced for this family.
    {%- for method in hospital_percentage_methods %}
    {%- set hp = 'h' ~ (loop.index + (hospital_dollar_methods | length)) %}
    {%- set method_slug = (method if method else 'null_methodology') | replace(' ', '_') %}
    {{ hp }}.negotiated_percentage AS hospital_{{ method_slug }}_percentage,
    {{ hp }}.hospital_rates_id AS hospital_{{ method_slug }}_percentage_id,
    {{ hp }}.original_billing_codes AS hospital_{{ method_slug }}_percentage_original_billing_codes,
    {{ hp }}.original_billing_code_type AS hospital_{{ method_slug }}_percentage_original_billing_code_type,
    {%- endfor %}

    -- Hospital estimated allowed amount, one column group per contract methodology
    {%- for method in hospital_allowed_amount_methods %}
    {%- set ha = 'haa' ~ loop.index %}
    {%- set method_slug = (method if method else 'null_methodology') | replace(' ', '_') %}
    {{ ha }}.estimated_allowed_amount AS hospital_{{ method_slug }}_allowed_amount,
    {{ ha }}.hospital_rates_id AS hospital_{{ method_slug }}_allowed_amount_id,
    {{ ha }}.original_billing_codes AS hospital_{{ method_slug }}_allowed_amount_original_billing_codes,
    {{ ha }}.original_billing_code_type AS hospital_{{ method_slug }}_allowed_amount_original_billing_code_type,
    CASE
        WHEN {{ ha }}.original_billing_codes IS NULL THEN {{ ha }}.estimated_allowed_amount
    END AS hospital_{{ method_slug }}_untransformed_allowed_amount,
    CASE
        WHEN {{ ha }}.original_billing_codes IS NULL THEN {{ ha }}.hospital_rates_id
    END AS hospital_{{ method_slug }}_untransformed_allowed_amount_id,
    CAST(NULL AS array(varchar)) AS hospital_{{ method_slug }}_untransformed_allowed_amount_original_billing_codes,
    CAST(NULL AS varchar) AS hospital_{{ method_slug }}_untransformed_allowed_amount_original_billing_code_type,
    {%- endfor %}

    -- payer_id is the partitioning column of the target table
    r.payer_id

FROM {{ schema_name }}.tmp_rate_object_space_{{ sub_version }} AS r

{%- for payer_type in payer_types %}
{%- set p = 'p' ~ loop.index %}
LEFT JOIN payer_{{ payer_type | replace(' ', '_') }} AS {{ p }}
    ON r.roid = {{ p }}.roid
    AND r.payer_id = {{ p }}.payer_id
{%- endfor %}

{%- for method in hospital_dollar_methods %}
{%- set hd = 'h' ~ loop.index %}
LEFT JOIN hospital_{{ (method if method else 'null_methodology') | replace(' ', '_') }}_dollar AS {{ hd }}
    ON r.roid = {{ hd }}.roid
    AND r.payer_id = {{ hd }}.payer_id
{%- endfor %}

{%- for method in hospital_percentage_methods %}
{%- set hp = 'h' ~ (loop.index + (hospital_dollar_methods | length)) %}
LEFT JOIN hospital_{{ (method if method else 'null_methodology') | replace(' ', '_') }}_percentage AS {{ hp }}
    ON r.roid = {{ hp }}.roid
    AND r.payer_id = {{ hp }}.payer_id
{%- endfor %}

{%- for method in hospital_allowed_amount_methods %}
{%- set ha = 'haa' ~ loop.index %}
LEFT JOIN hospital_{{ (method if method else 'null_methodology') | replace(' ', '_') }}_allowed_amount AS {{ ha }}
    ON r.roid = {{ ha }}.roid
    AND r.payer_id = {{ ha }}.payer_id
{%- endfor %}

LEFT JOIN {{ schema_name }}.tmp_raw_gross_charges_{{ sub_version }} AS gc
    ON r.roid = gc.roid
    AND r.payer_id = gc.payer_id
-- Code spine: facility is matched NULL-safely (NULL matches NULL)
LEFT JOIN {{ schema_name }}.tmp_ref_code_spine_{{ sub_version }} AS cs
    ON r.bill_type = cs.bill_type
    AND r.billing_code_type = cs.billing_code_type
    AND r.billing_code = cs.billing_code
    AND r.facility IS NOT DISTINCT FROM cs.facility
-- Cash rates are attached only when r.facility is true or NULL
LEFT JOIN hospital_discounted_cash_rate AS hdcr
    ON r.provider_id = hdcr.provider_id
    AND r.billing_code_type = hdcr.billing_code_type
    AND r.billing_code = hdcr.billing_code
    AND COALESCE(r.facility, true) = true
-- Cash outlier bounds, averaged to one row per bill_type / billing_code_type / billing_code
LEFT JOIN (
    SELECT
        bill_type,
        billing_code_type,
        billing_code,
        avg(lower_bound) AS lower_bound,
        avg(upper_bound) AS upper_bound
    FROM {{ cld_params.Tables.CASH_OUTLIER_TABLE.value }}
    GROUP BY
        bill_type,
        billing_code_type,
        billing_code
) AS co
    ON r.billing_code_type = co.billing_code_type
    AND r.billing_code = co.billing_code
    AND r.bill_type = co.bill_type

Hospital MRF Components​

Build PG (Physician Group)

-- 🪟💲🌟
-- run_id: {{ run_id }}
-- task_name: {{ task_name }}
-- Table: {{ schema_name }}.tmp_raw_mrf_hospital_physician_group_rates_{{n_chunk}}_{{ sub_version }}
-- Subversion: {{ sub_version }}
-- n_chunk: {{ n_chunk }}

-- common_columns: ordered list of the columns shared by the rate CTEs in this template.
-- The order is significant: it fixes the column order of every branch of the UNION ALL in
-- `rates`. Several loops place commas with loop.last, so the final entry must stay a column
-- that none of the exclusion lists remove.
{% set common_columns = [
    'payer_id', 'network_id', 'provider_id', 'hospital_rates_id',
    'bill_type', 'billing_code', 'apr_drg_formatted_billing_code', 'apc_formatted_billing_code',
    'billing_code_type', 'description', 'revenue_code', 'billing_code_modifiers',
    'billing_class', 'drug_billing_class', 'ndc',
    'drug_unit_of_measurement', 'drug_type_of_measurement', 'setting',
    'negotiated_dollar', 'negotiated_percentage', 'estimated_allowed_amount', 'negotiated_rate',
    'min_standard_charge', 'max_standard_charge', 'gross_charge', 'discounted_cash_rate',
    'negotiated_algorithm', 'contract_methodology',
    'additional_generic_notes', 'additional_payer_notes',
    'file_id', 'ingested_on', 'loaded_on',
    'plan_name', 'raw_payer_name', 'payer_class_name'
] %}

CREATE OR REPLACE TABLE {{ schema_name }}.tmp_raw_mrf_hospital_physician_group_rates_{{n_chunk}}_{{ sub_version }}
WITH (
    PARTITIONING = ARRAY['payer_id']
)
AS
WITH

-- Physician group -> hospital pairs.
-- A hospital is reached either through the group's associated facilities (s) or through a
-- TIN shared with the group (s2). A pair is kept only when the hospital's health system
-- equals a health system associated with the group, so groups without an associated
-- health system drop out despite the LEFT JOINs.
-- NOTE(review): h is joined on hco.provider_id, which itself equals pg.provider_id, so the
-- hospital lookup is keyed by the physician group's provider_id - confirm this is intended.
pg_to_many_hco_for_singly_affiliatd_hs AS (
    SELECT DISTINCT
        pg.provider_id,
        pg.group_pac_id,
        COALESCE(s.provider_id, s2.provider_id) AS hospital_provider_id,
        COALESCE(s.provider_healthsystem_id, s2.provider_healthsystem_id) AS provider_healthsystem_id
    FROM {{ cld_params.Tables.SPINES_PROVIDER_PHYSICIAN_GROUPS.value }} AS pg
    LEFT JOIN {{ cld_params.Tables.SPINES_PROVIDER_PHYSICIAN_GROUPS_ASSOCIATED_FACILITIES.value }} AS hco
        ON hco.provider_id = pg.provider_id
    LEFT JOIN {{ cld_params.Tables.SPINES_PROVIDER_PHYSICIAN_GROUPS_ASSOCIATED_TINS.value }} AS tin
        ON tin.provider_id = pg.provider_id
    LEFT JOIN {{ cld_params.Tables.SPINES_PROVIDER_HOSPITAL_ADDITIONAL_NPIS.value }} AS h
        ON hco.provider_id = h.provider_id
    LEFT JOIN {{ cld_params.Tables.SPINES_PROVIDER_PHYSICIAN_GROUPS_ASSOCIATED_HS.value }} AS hs
        ON hs.provider_id = pg.provider_id
    LEFT JOIN {{ cld_params.Tables.SPINES_PROVIDER_HOSPITAL.value }} AS s
        ON h.provider_id = s.provider_id
    LEFT JOIN {{ cld_params.Tables.SPINES_PROVIDER_HOSPITAL.value }} AS s2
        ON tin.tin = s2.tin
    WHERE hs.health_system_id = COALESCE(s.provider_healthsystem_id, s2.provider_healthsystem_id)
),

------------------------------------
-- Spines and Rate Object Space
------------------------------------
-- Plan bridge rows; provider_id is renamed to pb_provider_id so it cannot be confused with
-- the provider_id columns of the tables it is later joined to.
plan_bridge AS (
    SELECT
        pb.id,
        pb.payer_id,
        pb.provider_id AS pb_provider_id,
        pb.network_name
    FROM {{ schema_name }}.tmp_ref_pg_plan_bridge_{{ sub_version }} AS pb
),

-- Networks with the payer_id taken from the payer spine; payer_id is NULL for a network
-- whose payer is not present in the payer spine.
network_spine AS (
    SELECT
        p.payer_id,
        n.network_id,
        n.network_name
    FROM {{ schema_name }}.tmp_ref_network_spine_{{ sub_version }} AS n
    LEFT JOIN {{ schema_name }}.tmp_ref_payer_spine_{{ sub_version }} AS p
        ON n.payer_id = p.payer_id
),

-- APC -> HCPCS pairs from the OPPS addendum B table: latest effective rows only, APC '0' excluded
apc_xwalk AS (
    SELECT DISTINCT
        opps.apc,
        opps.hcpcs
    FROM {{ cld_params.Tables.OPPS_ADDENDUM_B.value }} AS opps
    WHERE opps.is_latest_start_effective_date = true
        AND opps.apc != '0'
),

-- Distinct EAPG -> HCPCS pairs
eapg_xwalk AS (
    SELECT DISTINCT
        x.eapg,
        x.hcpcs
    FROM {{ cld_params.Tables.HCPCS_EAPG_CROSSWALK_TABLE.value }} AS x
),

-- APR-DRG <-> MS-DRG crosswalk, passed through as-is; the DRG CTEs read its apr_drg and
-- ms_drg columns.
aprdrg_xwalk AS (
    SELECT *
    FROM {{ cld_params.Tables.APR_DRG_XWALK_TABLE.value }}
),

-- Rate object space restricted to physician groups; used to keep only in-scope rates
rate_object_space AS (
    SELECT
        ros.roid,
        ros.payer_id,
        ros.network_id,
        ros.provider_id,
        ros.bill_type,
        ros.billing_code,
        ros.billing_code_type
    FROM {{ schema_name }}.tmp_rate_object_space_{{ sub_version }} AS ros
    WHERE ros.provider_type IN ('Physician Group')
),

-- Billing codes that the code spine flags as drug codes
drug_codes AS (
    SELECT cs.billing_code
    FROM {{ schema_name }}.tmp_ref_code_spine_{{ sub_version }} AS cs
    WHERE cs.is_drug_code = True
),

------------------------------------
-- Hospital Rates Preprocessing
------------------------------------

-- Hospital rates for the requested providers, re-keyed to the physician group and enriched
-- with derived columns:
--   * hospital_rates_id / payer_id / network_id / provider_id come from the joined tables
--     (provider_id is the physician group's id, not the hospital's)
--   * billing_code_type / billing_code are normalised; NDC-only rows become HCPCS rows that
--     use the HCPCS code derived from the NDC
--   * apr_drg_formatted_billing_code / apc_formatted_billing_code are formatted join keys
--   * bill_type, drug_billing_class and negotiated_rate are derived
-- Inside this SELECT, unqualified billing_code / billing_code_type refer to the raw
-- hospital_rates columns, not to the aliases defined in the same select list.
hospital_rates_with_new_columns_added AS (
    SELECT
        {% set excluded_columns = [
            'hospital_rates_id', 'payer_id', 'network_id', 'provider_id', 'billing_code', 'billing_code_type',
            'apr_drg_formatted_billing_code', 'apc_formatted_billing_code',
            'negotiated_rate', 'bill_type', 'drug_billing_class',
            'ingested_on', 'loaded_on'
        ] %}

        -- pass-through columns: every common column that is not derived below
        {% for column in common_columns %}
        {% if column not in excluded_columns %}
        {{ column }},
        {% endif %}
        {% endfor %}

        rates.id AS hospital_rates_id,
        pb.payer_id,
        ns.network_id,
        pg.provider_id,
        CASE
            WHEN billing_code_type = 'DRG' THEN 'MS-DRG'
            WHEN billing_code_type IS NULL AND ndc IS NOT NULL THEN 'HCPCS'
            ELSE billing_code_type
        END AS billing_code_type,
        CASE
            WHEN billing_code_type = 'MS-DRG' THEN lpad(billing_code, 3, '0')
            ELSE COALESCE(billing_code, ndc_h.derived_hcpcs_code)
        END AS billing_code,

        -- first '-'-separated part left-padded to 4 characters, plus '-<second part>' when
        -- the second part is all digits
        LPAD(SPLIT_PART(billing_code, '-', 1), 4, '0') ||
        CASE
            WHEN REGEXP_LIKE(SPLIT_PART(billing_code, '-', 2), '^[0-9]+$')
                THEN '-' || SPLIT_PART(billing_code, '-', 2)
            ELSE ''
        END AS apr_drg_formatted_billing_code,

        -- 'APC-' removed; a numeric remainder is cast to an integer and left-padded to 4
        CASE
            WHEN REGEXP_LIKE(replace(billing_code, 'APC-', ''), '^[0-9.]+$')
                THEN LPAD(CAST(CAST(replace(billing_code, 'APC-', '') AS DECIMAL(8,0)) AS VARCHAR), 4, '0')
            ELSE NULL
        END AS apc_formatted_billing_code,

        -- The WHERE clause below keeps only billing_class = 'Professional', so the
        -- "!= 'Professional'" branch cannot match in this template.
        CASE
            WHEN billing_code_type IN ('MS-DRG', 'APR-DRG', 'DRG')
                THEN 'Inpatient'
            WHEN billing_code_type IS NULL AND ndc IS NOT NULL
                THEN 'Outpatient'
            WHEN COALESCE(billing_class, '') != 'Professional' AND billing_code_type IN ('CPT', 'HCPCS', 'APC', 'EAPG')
                THEN 'Outpatient'
            WHEN billing_code IN (SELECT billing_code FROM drug_codes)
                THEN 'Outpatient'
            WHEN billing_class = 'Professional' AND billing_code_type IN ('CPT', 'HCPCS', 'APC', 'EAPG')
                THEN 'Professional'
        END AS bill_type,
        CASE
            WHEN billing_code IN (SELECT billing_code FROM drug_codes)
                THEN COALESCE(billing_class, 'Outpatient')
            ELSE NULL
        END AS drug_billing_class,
        -- First available of dollar, percentage, in-range allowed amount.
        -- The allowed-amount branch tests the range itself. It previously tested
        -- "(range condition) IS NOT NULL", which is true for every non-NULL amount and so
        -- did not apply the 0 < amount < 10000000 bounds used by the WHERE clause.
        CASE
            WHEN negotiated_dollar IS NOT NULL THEN negotiated_dollar
            WHEN negotiated_percentage IS NOT NULL THEN negotiated_percentage
            WHEN estimated_allowed_amount > 0 AND estimated_allowed_amount < 10000000
                THEN estimated_allowed_amount
            ELSE NULL
        END AS negotiated_rate,

        CAST(ingested_on AS TIMESTAMP) AS ingested_on,
        CAST(loaded_on AS TIMESTAMP) AS loaded_on

    FROM {{ hospital_schema }}.hospital_rates rates
    JOIN plan_bridge pb
        ON rates.id = pb.id
        AND rates.provider_id = pb.pb_provider_id
        AND rates.payer_id = CAST(pb.payer_id AS INT)
    JOIN network_spine ns
        ON pb.payer_id = ns.payer_id
        AND pb.network_name = ns.network_name
    JOIN pg_to_many_hco_for_singly_affiliatd_hs pg
        ON rates.provider_id = pg.hospital_provider_id
    LEFT JOIN {{ schema_name }}.tmp_ndc_derived_hcpcs_{{ sub_version }} ndc_h
        ON rates.ndc = ndc_h.hospital_ndc
    -- at least one usable rate value
    WHERE (
            negotiated_dollar > 0
            OR negotiated_percentage > 0
            OR (estimated_allowed_amount > 0 AND estimated_allowed_amount < 10000000)
        )
        -- setting + billing code type filter: NDC-only rows, or HCPCS / APC / EAPG rows
        AND (
            (
                billing_code IS NULL
                AND ndc IS NOT NULL
            )
            OR (
                billing_code_type IN ('HCPCS', 'APC', 'EAPG')
            )
        )
        AND billing_class = 'Professional'
        AND rates.provider_id IN ({{ provider_ids }})
),


-- Rows already coded as HCPCS, MS-DRG or APR-DRG pass through untouched
hcpcs_msdrg AS (
    SELECT
        {{ common_columns | join(', ') }}
    FROM hospital_rates_with_new_columns_added
    WHERE billing_code_type IN ('MS-DRG', 'APR-DRG', 'HCPCS')
),

-- MS-DRG -> APR-DRG: rates of all MS-DRGs that map to the same APR-DRG are collapsed into
-- one row per payer / provider / network / APR-DRG / contract methodology.
-- Dollar, percentage and allowed amount are averaged; the contributing rate ids are joined
-- into a comma-separated string and the source MS-DRG codes are kept as an array.
-- Every other common column takes an arbitrary value from the group (ANY_VALUE).
-- NOTE(review): negotiated_rate is one of those arbitrary values, not an average - confirm.
ms_drg_to_apr_drg AS (
    {% set exclude_apr_drg_cte_cols = [
        'payer_id', 'provider_id', 'network_id', 'billing_code', 'billing_code_type',
        'contract_methodology', 'negotiated_dollar', 'negotiated_percentage',
        'estimated_allowed_amount', 'hospital_rates_id'
    ] %}
    SELECT
        r.payer_id,
        r.provider_id,
        r.network_id,
        dc.apr_drg AS billing_code,
        'APR-DRG' AS billing_code_type,
        r.contract_methodology,
        avg(r.negotiated_dollar) AS negotiated_dollar,
        avg(r.negotiated_percentage) AS negotiated_percentage,
        avg(r.estimated_allowed_amount) AS estimated_allowed_amount,
        array_join(array_agg(r.hospital_rates_id), ',') AS hospital_rates_id,
        array_distinct(array_agg(r.billing_code)) AS original_billing_codes,
        'MS-DRG' AS original_billing_code_type,
        {% for column in common_columns if column not in exclude_apr_drg_cte_cols %}
        ANY_VALUE(r.{{ column }}) AS {{ column }}{{ ',' if not loop.last }}
        {% endfor %}
    FROM hospital_rates_with_new_columns_added AS r
    INNER JOIN aprdrg_xwalk AS dc
        ON r.billing_code = dc.ms_drg
    WHERE r.billing_code_type = 'MS-DRG'
    GROUP BY
        r.payer_id,
        r.provider_id,
        r.network_id,
        dc.apr_drg,
        r.contract_methodology
),

-- APR-DRG -> MS-DRG: the mirror image of the CTE above. Rows are matched to the crosswalk
-- through apr_drg_formatted_billing_code and collapsed to one row per MS-DRG.
apr_drg_to_ms_drg AS (
    {% set exclude_apr_drg_cte_cols = [
        'payer_id', 'provider_id', 'network_id', 'billing_code', 'billing_code_type',
        'contract_methodology', 'negotiated_dollar', 'negotiated_percentage',
        'estimated_allowed_amount', 'hospital_rates_id'
    ] %}
    SELECT
        r.payer_id,
        r.provider_id,
        r.network_id,
        dc.ms_drg AS billing_code,
        'MS-DRG' AS billing_code_type,
        r.contract_methodology,
        avg(r.negotiated_dollar) AS negotiated_dollar,
        avg(r.negotiated_percentage) AS negotiated_percentage,
        avg(r.estimated_allowed_amount) AS estimated_allowed_amount,
        array_join(array_agg(r.hospital_rates_id), ',') AS hospital_rates_id,
        array_distinct(array_agg(r.billing_code)) AS original_billing_codes,
        'APR-DRG' AS original_billing_code_type,
        {% for column in common_columns if column not in exclude_apr_drg_cte_cols %}
        ANY_VALUE(r.{{ column }}) AS {{ column }}{{ ',' if not loop.last }}
        {% endfor %}
    FROM hospital_rates_with_new_columns_added AS r
    INNER JOIN aprdrg_xwalk AS dc
        ON r.apr_drg_formatted_billing_code = dc.apr_drg
    WHERE r.billing_code_type = 'APR-DRG'
    GROUP BY
        r.payer_id,
        r.provider_id,
        r.network_id,
        dc.ms_drg,
        r.contract_methodology
),


-- APC -> HCPCS (one-to-many): every HCPCS code mapped to an APC inherits that APC's rate row.
-- The APC code as published is kept in original_billing_codes.
apc AS (
    {% set exclude_apc_cte_cols = ['billing_code', 'billing_code_type'] %}
    SELECT
        xwalk.hcpcs AS billing_code,
        'HCPCS' AS billing_code_type,
        ARRAY[r.billing_code] AS original_billing_codes,
        'APC' AS original_billing_code_type,
        {% for column in common_columns if column not in exclude_apc_cte_cols %}
        r.{{ column }}{{ ',' if not loop.last }}
        {% endfor %}
    FROM hospital_rates_with_new_columns_added AS r
    INNER JOIN apc_xwalk AS xwalk
        ON r.apc_formatted_billing_code = xwalk.apc
    WHERE r.billing_code_type = 'APC'
),

-- EAPG -> HCPCS (one-to-many): every HCPCS code mapped to an EAPG inherits that EAPG's rate row.
eapg AS (
    {% set exclude_eapg_cte_cols = ['billing_code', 'billing_code_type'] %}
    SELECT
        xwalk.hcpcs AS billing_code,
        'HCPCS' AS billing_code_type,
        ARRAY[r.billing_code] AS original_billing_codes,
        'EAPG' AS original_billing_code_type,
        {% for column in common_columns if column not in exclude_eapg_cte_cols %}
        r.{{ column }}{{ ',' if not loop.last }}
        {% endfor %}
    FROM hospital_rates_with_new_columns_added AS r
    INNER JOIN eapg_xwalk AS xwalk
        ON r.billing_code = xwalk.eapg
    WHERE r.billing_code_type = 'EAPG'
),

-- NDC: rows carrying an NDC, emitted as HCPCS rows with the NDC kept in original_billing_codes.
-- NOTE(review): the filter is "ndc IS NOT NULL", so rows that had both a billing code and an
-- NDC are included as well, not only NDC-only rows - confirm this is intended.
ndc AS (
    {% set exclude_ndc_cte_cols = ['billing_code', 'billing_code_type'] %}
    SELECT
        r.billing_code,
        'HCPCS' AS billing_code_type,
        ARRAY[r.ndc] AS original_billing_codes,
        'NDC' AS original_billing_code_type,
        {% for column in common_columns if column not in exclude_ndc_cte_cols %}
        r.{{ column }}{{ ',' if not loop.last }}
        {% endfor %}
    FROM hospital_rates_with_new_columns_added AS r
    WHERE r.ndc IS NOT NULL
),

-- Union of the pass-through rows and every crosswalked source.
-- Pass-through rows (hcpcs_msdrg) carry NULL original_billing_codes / original_billing_code_type;
-- every other branch reports the codes it was derived from.
rates AS (
    SELECT
        {{ common_columns | join(', ') }},
        NULL AS original_billing_codes,
        NULL AS original_billing_code_type
    FROM hcpcs_msdrg
    {%- for transformed_cte in ['ms_drg_to_apr_drg', 'apr_drg_to_ms_drg', 'apc', 'eapg', 'ndc'] %}
    UNION ALL
    SELECT
        {{ common_columns | join(', ') }},
        original_billing_codes,
        original_billing_code_type
    FROM {{ transformed_cte }}
    {%- endfor %}
),

-- Keep only rates that belong to the physician-group rate object space and attach their roid.
-- The MRF history join adds the source file name and version for traceability.
-- NOTE(review): the history table is addressed with a hard-coded catalog/schema, while the
-- rates themselves are read from {{ '{{ hospital_schema }}' }} - confirm they should differ.
rates_filtered AS (
    SELECT
        ros.roid,
        {% for column in common_columns %}
        rates.{{ column }},
        {% endfor %}
        rates.original_billing_codes,
        rates.original_billing_code_type,
        mrf.filename,
        mrf.version
    FROM rates
    INNER JOIN rate_object_space AS ros
        ON rates.payer_id = ros.payer_id
        AND rates.network_id = ros.network_id
        AND rates.provider_id = ros.provider_id
        AND rates.bill_type = ros.bill_type
        AND rates.billing_code = ros.billing_code
        AND rates.billing_code_type = ros.billing_code_type
    LEFT JOIN tq_production.hospital_data.hospital_mrf_history AS mrf
        ON rates.file_id = mrf.id
),

------------------------------------
-- Deduplicate Rates
------------------------------------

-- Ranks the candidate rates of each payer / network / provider / bill type / code /
-- contract methodology; the output keeps rank = 1.
-- Ordering, most preferred first:
--   1. drug_billing_class ascending
--   2. rows with neither billing_code_modifiers nor revenue_code, then rows missing one of
--      them, then rows with both
--   3. highest negotiated_rate
--   4. tie-breakers, so that the same row wins on every run: rows that were not derived
--      from another code (NULL original_billing_code_type) first, then the lowest
--      hospital_rates_id. Without these, ROW_NUMBER picks arbitrarily among tied rows.
rates_ranked AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY
                payer_id,
                network_id,
                provider_id,
                bill_type,
                billing_code,
                billing_code_type,
                contract_methodology
            ORDER BY
                drug_billing_class ASC,
                CASE
                    WHEN (billing_code_modifiers IS NULL AND revenue_code IS NULL) THEN -1
                    WHEN billing_code_modifiers IS NULL THEN 0
                    WHEN revenue_code IS NULL THEN 0
                    ELSE 1
                END ASC,
                negotiated_rate DESC,
                original_billing_code_type ASC NULLS FIRST,
                hospital_rates_id ASC
        ) AS rank
    FROM rates_filtered
)

------------------------------------
-- Output
------------------------------------
-- One row per deduplicated rate (rank = 1).
-- negotiated_percentage is rescaled: values below 1 are multiplied by 100, values above
-- 1000 are divided by 100, everything else is kept as-is.
-- estimated_allowed_amount of 10000000 or more is replaced by NULL.
SELECT
    rr.hospital_rates_id,
    rr.roid,
    rr.provider_id,
    rr.network_id,
    rr.bill_type,
    rr.billing_code,
    rr.billing_code_type,
    rr.original_billing_codes,
    rr.original_billing_code_type,
    rr.description,
    rr.revenue_code,
    rr.billing_code_modifiers,
    rr.billing_class,
    rr.ndc,
    rr.drug_unit_of_measurement,
    rr.drug_type_of_measurement,
    rr.setting,
    rr.negotiated_dollar,
    CASE
        WHEN rr.negotiated_percentage < 1 THEN rr.negotiated_percentage * 100
        WHEN rr.negotiated_percentage > 1000 THEN rr.negotiated_percentage / 100
        ELSE rr.negotiated_percentage
    END AS negotiated_percentage,
    CASE
        WHEN rr.estimated_allowed_amount >= 10000000 THEN NULL
        ELSE rr.estimated_allowed_amount
    END AS estimated_allowed_amount,
    rr.negotiated_rate,
    rr.min_standard_charge,
    rr.max_standard_charge,
    rr.gross_charge,
    rr.discounted_cash_rate,
    rr.negotiated_algorithm,
    rr.contract_methodology,
    rr.additional_generic_notes,
    rr.additional_payer_notes,
    rr.file_id,
    rr.ingested_on,
    rr.loaded_on,
    rr.filename,
    rr.version,
    rr.plan_name,
    rr.raw_payer_name,
    rr.payer_id,
    rr.payer_class_name
FROM rates_ranked AS rr
WHERE rr.rank = 1

Union​

-- 🪟💲🌟
-- run_id: {{ run_id }}
-- task_name: {{ task_name }}
-- Table: {{ schema_name }}.tmp_raw_mrf_hospital_rates_{{ sub_version }}
-- Subversion: {{ sub_version }}


CREATE OR REPLACE TABLE {{ schema_name }}.tmp_raw_mrf_hospital_rates_{{ sub_version }}
WITH (
    PARTITIONING = ARRAY['payer_id']
)
AS
-- Stacks every hospital chunk table (n_chunks) and every physician-group chunk table
-- (pg_n_chunks) into one table.
-- UNION ALL is emitted only between two SELECTs, so an empty pg_n_chunks no longer leaves a
-- dangling UNION ALL and an empty n_chunks no longer produces a leading one. At least one
-- of the two lists must be non-empty.
-- NOTE(review): SELECT * with UNION ALL matches columns by position, so every chunk table
-- must expose the same columns in the same order - confirm against the chunk builders.
{% for n_chunk in n_chunks %}
SELECT * FROM {{ schema_name }}.tmp_raw_mrf_hospital_rates_{{ n_chunk }}_{{ sub_version }}
{% if not loop.last or pg_n_chunks %}UNION ALL{% endif %}
{% endfor %}
{% for n_chunk in pg_n_chunks %}
SELECT * FROM {{ schema_name }}.tmp_raw_mrf_hospital_physician_group_rates_{{ n_chunk }}_{{ sub_version }}
{% if not loop.last %}UNION ALL{% endif %}
{% endfor %}

NDC Derived HCPCS​

-- 🪟💲🌟
-- run_id: {{ run_id }}
-- task_name: {{ task_name }}
-- Table: {{ schema_name }}.tmp_ndc_derived_hcpcs_{{ sub_version }}
-- Subversion: {{ sub_version }}
--
-- Purpose: Preprocess NDC normalization to derive HCPCS codes from Medispan
-- This processes ALL NDCs (no filtering on billing_code) to create a reusable lookup table
-- Downstream tasks will selectively use these mappings based on their needs

CREATE OR REPLACE TABLE {{ schema_name }}.tmp_ndc_derived_hcpcs_{{ sub_version }}
AS
WITH

------------------------------------
-- NDC Normalization for HCPCS Crosswalking
-- Processes ALL NDCs from hospital_rates
------------------------------------
-- Every distinct non-NULL NDC string exactly as it appears in hospital_rates
ndc_variants AS (
    SELECT DISTINCT hr.ndc
    FROM {{ hospital_schema }}.hospital_rates AS hr
    WHERE hr.ndc IS NOT NULL
),

-- Four lookup candidates per hospital NDC (hospital_ndc keeps the original string):
--   1. the NDC unchanged
--   2. '0' prepended, hyphens removed
--   3. hyphens removed, then characters 1-9, a '0', and character 10
--   4. hyphens removed, then characters 1-5, a '0', and characters 6-10
-- Candidates 2-4 each insert one zero at a different position; presumably this pads
-- 10-digit NDCs to the 11-digit form - confirm against the Medispan key format.
ndc_normalized AS (
    SELECT
        v.ndc AS hospital_ndc,
        candidate.ndc AS ndc
    FROM (
        SELECT
            ndc,
            REPLACE(ndc, '-', '') AS digits
        FROM ndc_variants
    ) AS v
    CROSS JOIN UNNEST(ARRAY[
        v.ndc,
        '0' || v.digits,
        SUBSTRING(v.digits, 1, 5) || SUBSTRING(v.digits, 6, 4) || '0' || SUBSTRING(v.digits, 10, 1),
        SUBSTRING(v.digits, 1, 5) || '0' || SUBSTRING(v.digits, 6, 3) || SUBSTRING(v.digits, 9, 2)
    ]) AS candidate (ndc)
),

-- Latest Medispan record (by price_effective_date) for every Medispan NDC that equals one
-- of the lookup candidates; carries the HCPCS code, strength and strength unit.
medispan_latest AS (
    SELECT
        ranked.ndc,
        ranked.hcpc,
        ranked.strength,
        ranked.strength_unit
    FROM (
        SELECT
            m.ndc,
            m.hcpc,
            m.strength,
            m.strength_unit,
            ROW_NUMBER() OVER (
                PARTITION BY m.ndc
                ORDER BY m.price_effective_date DESC
            ) AS rn
        FROM {{ cld_params.Tables.MEDISPAN_TABLE.value }} AS m
        WHERE m.ndc IN (SELECT n.ndc FROM ndc_normalized AS n)
    ) AS ranked
    WHERE ranked.rn = 1
),

-- One row per hospital NDC with the HCPCS code, strength and strength unit from Medispan.
-- A hospital NDC can match Medispan through more than one lookup candidate. Exactly one
-- matched Medispan row is chosen, and all three attributes come from that same row:
--   * rows that have an HCPCS code are preferred
--   * ties are broken by the lowest Medispan NDC, so the result is the same on every run
-- Previously each attribute was picked independently with ANY_VALUE, which could combine
-- values from different Medispan rows and vary between runs.
-- Hospital NDCs without any Medispan match are kept with NULL attributes.
ndc_hcpcs_derived AS (
    SELECT
        matched.hospital_ndc,
        matched.derived_hcpcs_code,
        matched.strength,
        matched.strength_unit
    FROM (
        SELECT
            n.hospital_ndc,
            m.hcpc AS derived_hcpcs_code,
            m.strength,
            m.strength_unit,
            ROW_NUMBER() OVER (
                PARTITION BY n.hospital_ndc
                ORDER BY
                    CASE WHEN m.hcpc IS NOT NULL THEN 0 ELSE 1 END ASC,
                    m.ndc ASC NULLS LAST
            ) AS rn
        FROM ndc_normalized AS n
        LEFT JOIN medispan_latest AS m
            ON m.ndc = n.ndc
    ) AS matched
    WHERE matched.rn = 1
)

SELECT
    hospital_ndc,
    derived_hcpcs_code,
    strength,
    strength_unit
FROM ndc_hcpcs_derived