@loreleitrimberger loreleitrimberger commented Nov 18, 2025

Description:

This PR adds hashing for incremental updates to rpt.account_balances_download, rpt.object_class_program_activity_download, and rpt.award_financial_download, using an implementation similar to the one for transaction search (#4497).
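
For readers unfamiliar with the approach, here is a minimal sketch of hash-based incremental updating. It is not the code in this PR: it uses plain Python and hashlib rather than Spark and xxhash64, and the names row_hash and diff_by_hash are hypothetical. The idea is that each row gets a hash of its column values, so comparing the hashes already in a table against freshly computed ones shows which rows to insert and which to delete.

```python
import hashlib


def row_hash(row: dict, columns: list) -> str:
    """Hash the values of the given columns, in order, into one hex key."""
    # repr() keeps None distinct from the string "None";
    # the unit separator \x1f keeps adjacent fields from running together.
    payload = "\x1f".join(repr(row.get(col)) for col in columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def diff_by_hash(existing: list, incoming: list, columns: list):
    """Return (rows_to_insert, rows_to_delete) by comparing row hashes."""
    existing_by_hash = {row_hash(r, columns): r for r in existing}
    incoming_by_hash = {row_hash(r, columns): r for r in incoming}
    to_insert = [r for h, r in incoming_by_hash.items() if h not in existing_by_hash]
    to_delete = [r for h, r in existing_by_hash.items() if h not in incoming_by_hash]
    return to_insert, to_delete


# Made-up rows: account B changes its amount between loads.
columns = ["account", "amount"]
existing = [{"account": "A", "amount": 10}, {"account": "B", "amount": 20}]
incoming = [{"account": "A", "amount": 10}, {"account": "B", "amount": 25}]

to_insert, to_delete = diff_by_hash(existing, incoming, columns)
print(to_insert)  # [{'account': 'B', 'amount': 25}]
print(to_delete)  # [{'account': 'B', 'amount': 20}]
```

Unchanged rows hash to the same key in both loads, so only the changed row is touched.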

Technical Details:

I converted rpt.object_class_program_activity_download and rpt.award_financial_download to dataframes, since they were going to change anyway and this way all three files follow the same pattern.

Enabling change data feed is done directly on the tables rather than through an API change. In databricks-qat, I ran the following for each table:
ALTER TABLE {{table}} SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
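
As a convenience, the statement can be generated for the three tables named in the description. This is a sketch in plain Python that only builds the SQL strings; how they get executed (a notebook cell, a SQL editor, or something else) is left open, and the helper name enable_cdf_statement is hypothetical.

```python
# Tables named in the PR description.
TABLES = [
    "rpt.account_balances_download",
    "rpt.object_class_program_activity_download",
    "rpt.award_financial_download",
]


def enable_cdf_statement(table: str) -> str:
    """Build the ALTER TABLE statement that turns on change data feed."""
    return f"ALTER TABLE {table} SET TBLPROPERTIES (delta.enableChangeDataFeed = true);"


for table in TABLES:
    print(enable_cdf_statement(table))
```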

Requirements for PR Merge:

  1. Unit & integration tests updated
  2. N/A API documentation updated (examples listed below)
    1. API Contracts
    2. API UI
    3. Comments
  3. N/A Data validation completed (examples listed below)
    1. Does this work well with the current frontend? Or is the frontend aware of a needed change?
    2. Is performance impacted in the changes (e.g., API, pipeline, downloads, etc.)?
    3. Is the expected data returned with the expected format?
  4. N/A Appropriate Operations ticket(s) created
  5. Jira Ticket(s)
    1. DEV-13939

Explain N/A in above checklist:

@zachflanders-frb zachflanders-frb left a comment

Can we add one or more tests for the load commands to ensure that they produce the expected tables? There are some patterns in this file that we may be able to use.

.join(fa, on=taa.federal_account_id == fa.id, how="leftouter")
.join(ta, on=fa.parent_toptier_agency_id == ta.toptier_agency_id, how="leftouter")
.withColumn("submission_period", fy_quarter_period())
.withColumn("merge_hash_key", sf.xxhash64("*"))

The hash column should be added after the select statement. Otherwise this column will not be included in the returned dataframe. Also, we want only the columns in the select to be inputs to the hashing algorithm.
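
To illustrate why the hash inputs should be limited to the selected columns, here is a small sketch in plain Python. It swaps Spark's xxhash64 for hashlib so it runs without a cluster, and the row contents and the helper name hash_columns are made up for the example. A key built from every column changes whenever an unselected helper column (such as a join key) changes, while a key built from the selected columns alone stays stable.

```python
import hashlib


def hash_columns(row: dict, columns: list) -> str:
    """Hash only the named columns of a row, in the given order."""
    payload = "\x1f".join(repr(row[col]) for col in columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Columns that end up in the download (hypothetical names).
selected = ["agency_name", "obligations"]

# Same visible data; only an internal join key differs between the two loads.
before = {"agency_name": "Treasury", "obligations": 100, "internal_join_id": 1}
after = {"agency_name": "Treasury", "obligations": 100, "internal_join_id": 2}

all_columns = sorted(before)

# Hashing every column flags the row as changed even though the output is identical.
print(hash_columns(before, all_columns) == hash_columns(after, all_columns))  # False

# Hashing only the selected columns keeps the key stable.
print(hash_columns(before, selected) == hash_columns(after, selected))  # True
```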

ts.national_interest_action.alias("national_interest_action_code"),
ts.national_interest_desc.alias("national_interest_action"),
sa.reporting_agency_name.alias("reporting_agency_name"),
sf.when(

The logic to create this column is in the function fy_quarter_period in the account_balances_download file. We could move it to a utils file or similar and use it here and anywhere else we need to determine whether we're dealing with a quarter or a period.

.alias("prime_award_summary_recipient_cd_current"),
sf.coalesce(
ts.legal_entity_zip4,
sf.concat(ts.recipient_location_zip5.StringType(), ts.legal_entity_zip_last4.StringType()),

I think you need to use cast here:

Suggested change:
- sf.concat(ts.recipient_location_zip5.StringType(), ts.legal_entity_zip_last4.StringType()),
+ sf.concat(ts.recipient_location_zip5.cast(StringType()), ts.legal_entity_zip_last4.cast(StringType())),

.join(defc, on=defc.code == fabpaoc.disaster_emergency_fund_code, how="left")
.join(cgac_aid, on=taa.agency_id == cgac_aid.cgac_code, how="left")
.join(cgac_ata, on=cgac_ata.cgac_code == taa.allocation_transfer_agency_id, how="left")
.withColumn("merge_hash_key", sf.xxhash64("*"))

Same as above, this will need to be after the select.

.join(tta, on=tta.toptier_agency_id == taa.funding_toptier_agency_id, how="left")
.join(cgac_aid, on=cgac_aid.code == taa.agency_id, how="left")
.join(cgac_ata, on=cgac_ata.cgac_code == taa.allocation_transfer_agency_id, how="left")
.withColumn("merge_hash_key", sf.xxhash64("*"))

This will have to be after the select.

sa.reporting_fiscal_quarter,
sa.reporting_fiscal_year,
sa.quarter_format_flag,
sf.when(

Same comment as above: we can reuse the code from account_balances_download.

ts.national_interest_action.alias("national_interest_action_code"),
ts.national_interest_desc.alias("national_interest_action"),
sa.reporting_agency_name.alias("reporting_agency_name"),
sf.when(

The .withColumn("submission_period", fy_quarter_period()) line adds this to the dataframe, but you will also need to add sf.col("submission_period"), to the select statement in order to have the column present in the result.
