
Policy Reporter

Methodology

Policy Reporter data are incorporated into CLD through whispers. This allows us to identify the key payers in each geography based on covered lives.

CBSA Crosswalk

location: hive.cld_utils.ref_policy_reporter_cbsa_xwalk
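
A minimal sketch of reading the published table (assuming a Trino connection created as in the script below):

import pandas as pd
from tqds.sql import db

# Columns as defined in the CREATE TABLE statement at the bottom of this page.
trino_conn = db.TrinoClient().conn
xwalk = pd.read_sql(
    """
    SELECT state, cbsa, cbsa_description,
           policy_reporter_cbsa, policy_reporter_fuzzy_score
    FROM hive.cld_utils.ref_policy_reporter_cbsa_xwalk
    """,
    con=trino_conn,
)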

Policy Reporter provides only a CBSA name, not a CBSA code. Most of the names match what we have in provider_demographics exactly, but there are exceptions, e.g. "Phoenix-Mesa-Chandler" vs. "Phoenix-Mesa-Scottsdale" and "Denver-Aurora-Centennial" vs. "Denver-Aurora-Lakewood". The crosswalk below therefore fuzzy-matches the names, merges on state, and excludes a short list of known bad matches.
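
As a quick illustration (not part of the pipeline), the snippet below prints fuzzywuzzy similarity scores for the two near-miss pairs above, using the same token_sort_ratio scorer as the crosswalk build:

from fuzzywuzzy import fuzz

# Similarity scores (0-100) for the near-miss CBSA name pairs above.
print(fuzz.token_sort_ratio("Phoenix-Mesa-Chandler", "Phoenix-Mesa-Scottsdale"))
print(fuzz.token_sort_ratio("Denver-Aurora-Centennial", "Denver-Aurora-Lakewood"))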

See Python Code
# %%
import pandas as pd
from tqds.sql import db, sql
import tqdm
import os
from fuzzywuzzy import process, fuzz
from tqds.aws import utils as tu

trino_conn = db.TrinoClient().conn
query = sql.Query()


def fuzzy_match_cbsa(row, choices, scorer=fuzz.partial_ratio, threshold=80):
    match, score = process.extractOne(
        row['policy_reporter_cbsa'],
        choices,
        scorer=scorer
    )
    return match if score >= threshold else None

def get_policy_reporter_cbsa(trino_conn):
    query = """
        SELECT
            cbsa as match_key_pr,
            state_short as state,
            sum(covered_lives) as covered_lives
        FROM redshift.reference.policy_reporter_county
        WHERE cbsa != 'Unassigned'
            AND state_short NOT IN (
                'AA', 'AE', 'AP', 'AS', 'GU', 'MH', 'MP', 'NA', 'PW', 'VI', 'FM'
            )
        GROUP BY 1, 2
    """
    return pd.read_sql(query, con=trino_conn)

def get_provider_demographics_cbsa(trino_conn):
    query = """
        SELECT DISTINCT
            cbsa,
            cbsa_description,
            state,
            split_part(cbsa_description, ',', 1) as match_key_pd
        FROM redshift.reference.provider_demographics
        WHERE cbsa IS NOT NULL
            AND state IS NOT NULL
            AND state NOT IN (
                'AA', 'AE', 'AP', 'AS', 'GU', 'MH', 'MP', 'NA', 'PW', 'VI', 'FM'
            )
    """
    return pd.read_sql(query, con=trino_conn)

def get_matches(pr_cbsa, pd_cbsa):
    match_key_pd = pd_cbsa['match_key_pd'].unique()
    matches = []
    for _, row in tqdm.tqdm(pr_cbsa.iterrows(), total=pr_cbsa.shape[0]):
        match, score = process.extractOne(
            row['match_key_pr'],
            match_key_pd,
            scorer=fuzz.token_sort_ratio
        )
        matches.append({"match": match, "score": score})
    return matches

def exclude_exceptions(df, exceptions):
    """
    Exclude rows from the DataFrame that match any of the exception criteria.
    Each exception is a dictionary with keys "match_key_pd" and "match_key_pr".
    """
    for exception in exceptions:
        df = df[
            ~(
                (df['match_key_pd'] == exception["match_key_pd"]) &
                (df['match_key_pr'] == exception["match_key_pr"])
            )
        ]
    return df

def build_cbsa_xwalk(trino_conn, exceptions=None):
    """
    Build a CBSA crosswalk DataFrame by:
    - Retrieving the Policy Reporter and Provider Demographics datasets.
    - Performing fuzzy matching between their CBSA keys.
    - Merging the datasets on state and matched description.
    - Excluding rows that match specified exceptions.

    Parameters:
    - exceptions: A list of exception dictionaries to exclude from the final DataFrame.

    Returns:
    - Merged DataFrame (CBSA crosswalk) with exceptions excluded.
    """
    df_pr = get_policy_reporter_cbsa(trino_conn)
    df_pd = get_provider_demographics_cbsa(trino_conn)

    # fuzzy matching
    matches = get_matches(df_pr, df_pd)
    df_pr['fuzzy_matched_description'] = [m['match'] for m in matches]
    df_pr['policy_reporter_fuzzy_score'] = [m['score'] for m in matches]

    # merge on state and matched description
    df_merged = pd.merge(
        df_pd,
        df_pr,
        left_on=['state', 'match_key_pd'],
        right_on=['state', 'fuzzy_matched_description'],
        how='inner'
    )

    # exclude exceptions
    if exceptions:
        df_merged = exclude_exceptions(df_merged, exceptions)

    df_merged = df_merged[[
        'state',
        'cbsa',
        'cbsa_description',
        'match_key_pr',
        'policy_reporter_fuzzy_score'
    ]]

    return df_merged.rename(columns={"match_key_pr": "policy_reporter_cbsa"})
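
# The cells below materialize the crosswalk: build the DataFrame, write it to
# CSV, stage the CSV on S3, load it into a temporary Hive table via Trino, and
# publish the final table with the fuzzy score cast to INTEGER before dropping
# the temporary table and the local CSV.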

# %%
VERSION = "v0_0"

exceptions = [
    {
        "match_key_pd": "Ashland",
        "match_key_pr": "Ashtabula"
    },
    {
        "match_key_pd": "Colorado Springs",
        "match_key_pr": "Glenwood Springs"
    }
]

df = build_cbsa_xwalk(trino_conn, exceptions=exceptions)


# %%
df.to_csv("policy_reporter_cbsa_xwalk.csv", index=False)
pd.read_csv("policy_reporter_cbsa_xwalk.csv")

query.execute_query(f"""
    DROP TABLE IF EXISTS hive.cld_{VERSION}.ref_policy_reporter_cbsa_xwalk_tmp
""", con=trino_conn)

query.execute_query(f"""
    DROP TABLE IF EXISTS hive.cld_{VERSION}.ref_policy_reporter_cbsa_xwalk
""", con=trino_conn)

tu.push_to_aws_s3(
    local_loc="policy_reporter_cbsa_xwalk.csv",
    dest_loc=f"s3://turquoise-health-payer-export-main/cld/{VERSION}/policy_reporter_cbsa_xwalk/policy_reporter_cbsa_xwalk.csv",
    capture_output=True
)

tu.s3_to_trino(
    headers=df.columns,
    tablename=f"hive.cld_{VERSION}.ref_policy_reporter_cbsa_xwalk_tmp",
    s3_loc=f"s3://turquoise-health-payer-export-main/cld/{VERSION}/policy_reporter_cbsa_xwalk/",
    query=query,
    trino_conn=trino_conn
)

query.execute_query(f"""
    CREATE TABLE hive.cld_{VERSION}.ref_policy_reporter_cbsa_xwalk AS
    SELECT
        state,
        cbsa,
        cbsa_description,
        policy_reporter_cbsa,
        CAST(policy_reporter_fuzzy_score AS INTEGER) as policy_reporter_fuzzy_score
    FROM hive.cld_{VERSION}.ref_policy_reporter_cbsa_xwalk_tmp
""", con=trino_conn)

query.execute_query(f"""
    DROP TABLE hive.cld_{VERSION}.ref_policy_reporter_cbsa_xwalk_tmp
""", con=trino_conn)

os.remove("policy_reporter_cbsa_xwalk.csv")
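
To find candidates for the exceptions list, one option is to review the lowest-scoring matches in the crosswalk; a minimal sketch (the threshold of 90 is illustrative, not part of the pipeline):

# Low fuzzy scores are the most likely mismatches; review them manually and
# add genuine mismatches to the `exceptions` list above.
suspect = df[df["policy_reporter_fuzzy_score"] < 90]
print(suspect.sort_values("policy_reporter_fuzzy_score").head(20))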
