[DEV-13929] add hash key for incremental updates #4547
base: qat
Conversation
Can we add tests for the load commands to ensure that they produce the expected tables? There are some patterns in this file that we may be able to use.
    .join(fa, on=taa.federal_account_id == fa.id, how="leftouter")
    .join(ta, on=fa.parent_toptier_agency_id == ta.toptier_agency_id, how="leftouter")
    .withColumn("submission_period", fy_quarter_period())
    .withColumn("merge_hash_key", sf.xxhash64("*"))
The hash column should be added after the select statement. Otherwise this column will not be included in the returned dataframe. Also, we just want to include the columns in the select as inputs to the hashing algorithm.
    ts.national_interest_action.alias("national_interest_action_code"),
    ts.national_interest_desc.alias("national_interest_action"),
    sa.reporting_agency_name.alias("reporting_agency_name"),
    sf.when(
The logic to create this column is in the function fy_quarter_period in the account_balances_download file. We could move it to a utils file or similar and use it here and anywhere else we need to determine whether we're dealing with a quarter or a period.
    .alias("prime_award_summary_recipient_cd_current"),
    sf.coalesce(
    ts.legal_entity_zip4,
    sf.concat(ts.recipient_location_zip5.StringType(), ts.legal_entity_zip_last4.StringType()),
I think you need to use cast here:
    - sf.concat(ts.recipient_location_zip5.StringType(), ts.legal_entity_zip_last4.StringType()),
    + sf.concat(ts.recipient_location_zip5.cast(StringType()), ts.legal_entity_zip_last4.cast(StringType())),
    .join(defc, on=defc.code == fabpaoc.disaster_emergency_fund_code, how="left")
    .join(cgac_aid, on=taa.agency_id == cgac_aid.cgac_code, how="left")
    .join(cgac_ata, on=cgac_ata.cgac_code == taa.allocation_transfer_agency_id, how="left")
    .withColumn("merge_hash_key", sf.xxhash64("*"))
Same as above, this will need to be after the select.
    .join(tta, on=tta.toptier_agency_id == taa.funding_toptier_agency_id, how="left")
    .join(cgac_aid, on=cgac_aid.code == taa.agency_id, how="left")
    .join(cgac_ata, on=cgac_ata.cgac_code == taa.allocation_transfer_agency_id, how="left")
    .withColumn("merge_hash_key", sf.xxhash64("*"))
This will have to be after the select.
    sa.reporting_fiscal_quarter,
    sa.reporting_fiscal_year,
    sa.quarter_format_flag,
    sf.when(
Same comment as above: this can reuse the code from account_balances_download.
    ts.national_interest_action.alias("national_interest_action_code"),
    ts.national_interest_desc.alias("national_interest_action"),
    sa.reporting_agency_name.alias("reporting_agency_name"),
    sf.when(
The .withColumn("submission_period", fy_quarter_period()) line adds this to the dataframe, but you will also need to add sf.col("submission_period"), to the select statement in order to have the column present in the result.
Description:
This adds hashing for incremental updates to rpt.account_balances_download, rpt.object_class_program_activity_download, and rpt.award_financial_download, using an implementation similar to the one for transaction search (#4497).
Technical Details:
I updated rpt.object_class_program_activity_download and rpt.award_financial_download to be built with DataFrames, since they were going to be changed anyway and this way all three files follow the same pattern.
Enabling change data feed is done directly on the tables rather than through an API change. Within databricks-qat, I ran the following for each table:
    ALTER TABLE {{table}} SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
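The per-table change data feed statement from the technical details could be scripted rather than typed three times. This is a sketch only: the table names come from the PR description, while `enable_change_data_feed` and its `run_sql` parameter are hypothetical (in a Databricks notebook, `spark.sql` would be a natural callable to pass).

```python
from typing import Callable, Iterable, List, Optional

# Tables named in the PR description.
TABLES = [
    "rpt.account_balances_download",
    "rpt.object_class_program_activity_download",
    "rpt.award_financial_download",
]


def enable_change_data_feed(
    tables: Iterable[str],
    run_sql: Optional[Callable[[str], object]] = None,
) -> List[str]:
    """Build one ALTER TABLE statement per table and optionally execute it."""
    statements = [
        f"ALTER TABLE {table} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
        for table in tables
    ]
    if run_sql is not None:
        for statement in statements:
            run_sql(statement)
    return statements
```

Called without `run_sql`, the function only returns the statements, which makes it easy to review them before anything touches a table.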