db_operations

func wait_for_schema(dataset, expected_columns, timeout_seconds=SCHEMA_POLL_TIMEOUT_SECONDS, interval_seconds=SCHEMA_POLL_INTERVAL_SECONDS) → None
Wait for the Foundry schema to become visible with the expected columns.
Polls the dataset until its schema matches the expected columns or the timeout elapses.
param dataset (Any): Foundry dataset object with a query_foundry_sql method
param expected_columns (list[str]): Column names that should be visible
param timeout_seconds (int, default SCHEMA_POLL_TIMEOUT_SECONDS): Maximum time to wait, in seconds
param interval_seconds (int, default SCHEMA_POLL_INTERVAL_SECONDS): Polling interval, in seconds
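The polling behaviour described for wait_for_schema can be sketched as follows. This is a minimal illustration, not the module's implementation: `poll_until_columns_visible` and its `get_columns` callable are hypothetical names, the numeric defaults are placeholders for the module's constants, and raising `TimeoutError` on expiry is an assumption (the source only says the function returns None). The sketch also treats "matches" as "every expected column is present", which is an assumption as well.

```python
import time
from typing import Callable, Iterable


def poll_until_columns_visible(
    get_columns: Callable[[], Iterable[str]],  # hypothetical: returns the currently visible column names
    expected_columns: list[str],
    timeout_seconds: float = 5.0,   # illustrative value, not the module's constant
    interval_seconds: float = 0.5,  # illustrative value, not the module's constant
) -> None:
    """Block until every expected column is visible, or raise TimeoutError."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        visible = set(get_columns())
        missing = [name for name in expected_columns if name not in visible]
        if not missing:
            return
        # Assumption: the sketch signals a timeout with an exception.
        if time.monotonic() >= deadline:
            raise TimeoutError(f"columns still missing after {timeout_seconds}s: {missing}")
        time.sleep(interval_seconds)
```

In real use, `get_columns` would wrap whatever schema lookup the dataset object offers; `time.monotonic` is used so the deadline is unaffected by wall-clock adjustments.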
Returns
None

func dedupe_records(records, key_fields) → list[dict[str, Any]]
Deduplicate records by key fields, keeping the last occurrence.
param records (list[dict[str, Any]]): Record dicts to deduplicate
param key_fields (list[str]): Field names to use as the deduplication key
Returns
list: Deduplicated records (last occurrence wins)
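A minimal sketch of "last occurrence wins" deduplication, assuming the key is the tuple of the key-field values. `dedupe_keep_last` is a hypothetical name, and the output order (each survivor at the position of its last occurrence) is an assumption; the source does not specify ordering.

```python
from typing import Any


def dedupe_keep_last(records: list[dict[str, Any]], key_fields: list[str]) -> list[dict[str, Any]]:
    """Keep one record per key tuple; the last occurrence wins."""
    latest: dict[tuple, dict[str, Any]] = {}
    for record in records:
        key = tuple(record.get(field) for field in key_fields)
        # Drop any earlier entry so the survivor sits at its last position.
        latest.pop(key, None)
        latest[key] = record
    return list(latest.values())


rows = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 1, "v": "c"}]
print(dedupe_keep_last(rows, ["id"]))
# [{'id': 2, 'v': 'b'}, {'id': 1, 'v': 'c'}]
```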

func dedupe_dataframe(df, key_fields, table_name) → pd.DataFrame
Deduplicate DataFrame rows by key fields, keeping the last occurrence.
param df (pd.DataFrame): DataFrame to deduplicate
param key_fields (list[str]): Column names to use as the deduplication key
param table_name (str): Table name for logging
Returns
pandas.DataFrame: Deduplicated DataFrame

func drop_placeholder(df) → pd.DataFrame
Remove placeholder rows from a DataFrame.
param df (pd.DataFrame): DataFrame that may contain placeholder rows
Returns
pandas.DataFrame: DataFrame with placeholder rows removed

func foundry_merge_upload(dm, records, key_fields, table_name, force, diff) → int
Merge and upload records to a Foundry dataset.
Handles the force (overwrite) and diff (skip existing) modes for Foundry, which requires loading all data, merging, and writing in a single transaction.
param dm (DataManager): DataManager instance for the table
param records (list[dict[str, Any]]): Records to upload
param key_fields (list[str]): Fields used for deduplication and matching
param table_name (str): Table name for logging
param force (bool): If True, overwrite existing records with matching keys
param diff (bool): If True, skip records whose keys are already in the database
Returns
int: Number of records uploaded
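The load, merge, write pattern can be illustrated with plain lists standing in for the dataset contents. This is a sketch under stated assumptions: `merge_force` and `merge_diff` are hypothetical helpers, the returned count is assumed to be the number of incoming rows actually written, and the behaviour with neither flag set is not described in the source, so it is left out.

```python
from typing import Any

Record = dict[str, Any]


def _key(record: Record, key_fields: list[str]) -> tuple:
    """Build the matching key for a record from its key-field values."""
    return tuple(record.get(field) for field in key_fields)


def merge_force(existing: list[Record], incoming: list[Record], key_fields: list[str]) -> tuple[list[Record], int]:
    """Overwrite mode: drop existing rows whose key appears in incoming, then append incoming."""
    incoming_keys = {_key(r, key_fields) for r in incoming}
    kept = [r for r in existing if _key(r, key_fields) not in incoming_keys]
    return kept + incoming, len(incoming)


def merge_diff(existing: list[Record], incoming: list[Record], key_fields: list[str]) -> tuple[list[Record], int]:
    """Skip mode: append only incoming rows whose key is not already present."""
    existing_keys = {_key(r, key_fields) for r in existing}
    new_rows = [r for r in incoming if _key(r, key_fields) not in existing_keys]
    return existing + new_rows, len(new_rows)
```

In both modes the full merged list is what would be written back in one transaction; only the treatment of key collisions differs.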

func local_upload_with_modes(dm, records, key_fields, table_name, force, diff) → int
Upload records to a local table (SQLite or CSV) with force, diff, or default behavior.
param dm (DataManager): DataManager instance for the target table
param records (list[dict[str, Any]]): Records to upload
param key_fields (list[str]): Key fields used for duplicate detection and overwrite logic
param table_name (str): Table name for logging and error messages
param force (bool): If True, delete existing rows matching the keys before inserting
param diff (bool): If True, skip rows that already exist by key
Returns
int: Number of records inserted
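For the SQLite case, the force and diff behaviours can be sketched directly with the standard `sqlite3` module. `sqlite_upload` is a hypothetical helper that bypasses DataManager entirely. Two assumptions are baked in: with neither flag set the sketch simply inserts (the source does not describe the default), and force takes precedence if both flags are set.

```python
import sqlite3
from typing import Any


def sqlite_upload(
    conn: sqlite3.Connection,
    table: str,
    records: list[dict[str, Any]],
    key_fields: list[str],
    force: bool = False,
    diff: bool = False,
) -> int:
    """Insert records and return how many rows were inserted."""
    if not records:
        return 0
    columns = list(records[0].keys())
    col_list = ", ".join(f'"{c}"' for c in columns)
    marks = ", ".join("?" for _ in columns)
    where = " AND ".join(f'"{k}" = ?' for k in key_fields)
    insert_sql = f'INSERT INTO "{table}" ({col_list}) VALUES ({marks})'
    inserted = 0
    with conn:  # commit on success, roll back on error
        for record in records:
            key_values = [record[k] for k in key_fields]
            if force:
                # Force: delete rows with the same key before inserting.
                conn.execute(f'DELETE FROM "{table}" WHERE {where}', key_values)
            elif diff:
                # Diff: skip rows whose key already exists.
                found = conn.execute(
                    f'SELECT 1 FROM "{table}" WHERE {where} LIMIT 1', key_values
                ).fetchone()
                if found is not None:
                    continue
            conn.execute(insert_sql, [record[c] for c in columns])
            inserted += 1
    return inserted
```

Table and column names are interpolated into the SQL text here, so they must come from trusted code; only the values are bound as parameters.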

func upload_records(records, table_name, key_fields, force=False, diff=False) → int
Upload records to the database with deduplication and backend dispatch.
Deduplicates the input records, then routes them to the appropriate backend (Foundry or SQLite) with the specified conflict-resolution mode.
param records (list[dict[str, Any]]): Records to upload
param table_name (str): Target table name
param key_fields (list[str]): Fields used for duplicate detection
param force (bool, default False): If True, overwrite existing records with matching keys
param diff (bool, default False): If True, skip records whose keys are already in the database
Returns
int: Number of records uploaded

func init_sqlite_tables(tables_to_init, reset=False) → dict[str, bool]
Initialize SQLite database tables, establishing each schema via a placeholder record.
Creates the database file if needed, then creates each table by inserting and removing a placeholder record (mirroring the Foundry initialization pattern). With reset=True, drops existing tables before recreating them.
param tables_to_init (list[tuple[str, dict, list[str]]]): List of (table_name, placeholder_record, columns) tuples to initialize
param reset (bool, default False): Drop and recreate tables that already exist
Returns
dict: {table_name: was_created}
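A rough sketch of the placeholder pattern for a single table, using only the standard `sqlite3` module. `init_table_from_placeholder` and the type mapping are hypothetical. With raw `sqlite3` the table must be created explicitly, so here the placeholder only supplies column types; the insert-and-delete step is kept to mirror the described pattern and is not claimed to match the module's actual mechanism.

```python
import sqlite3
from typing import Any

# Assumed mapping from Python value types to SQLite column types; anything else becomes TEXT.
_SQLITE_TYPES = {int: "INTEGER", float: "REAL", bytes: "BLOB"}


def init_table_from_placeholder(
    conn: sqlite3.Connection,
    table: str,
    placeholder: dict[str, Any],
    columns: list[str],
    reset: bool = False,
) -> bool:
    """Create the table from a placeholder record; return True if it was (re)created."""
    exists = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (table,)
    ).fetchone() is not None
    if exists and not reset:
        return False
    col_defs = []
    for column in columns:
        sql_type = _SQLITE_TYPES.get(type(placeholder.get(column)), "TEXT")
        col_defs.append(f'"{column}" {sql_type}')
    marks = ", ".join("?" for _ in columns)
    with conn:  # commit on success, roll back on error
        if exists:
            conn.execute(f'DROP TABLE "{table}"')
        conn.execute(f'CREATE TABLE "{table}" ({", ".join(col_defs)})')
        # Insert the placeholder, then remove it so the table starts empty.
        conn.execute(f'INSERT INTO "{table}" VALUES ({marks})', [placeholder.get(c) for c in columns])
        conn.execute(f'DELETE FROM "{table}"')
    return True
```

Calling this once per (table_name, placeholder_record, columns) tuple and collecting the results by table name would give a dict of the same shape as the documented return value.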

func init_csv_tables(tables_to_init, reset=False) → dict[str, bool]
Initialize CSV file tables with headers.
Creates a CSV file for each table containing only the column headers. With reset=True, overwrites existing files.
param tables_to_init (list[tuple[str, dict, list[str]]]): List of (table_name, placeholder_record, columns) tuples to initialize
param reset (bool, default False): Overwrite existing CSV files
Returns
dict: {table_name: was_created}
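Header-only CSV initialization for one table might look like the following. `init_csv_table` is a hypothetical helper, and the file location is supplied by the caller because the source does not say how the module maps table names to paths. Leaving an existing file untouched when reset is False is inferred from the reset description, not stated outright.

```python
import csv
from pathlib import Path


def init_csv_table(path: Path, columns: list[str], reset: bool = False) -> bool:
    """Write a header-only CSV; return True if the file was created or overwritten."""
    if path.exists() and not reset:
        return False  # assumption: existing files are left untouched unless reset=True
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings, as its documentation recommends.
    with path.open("w", newline="", encoding="utf-8") as handle:
        csv.writer(handle).writerow(columns)
    return True
```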

func query_existing_hashes(table_name) → set[str]
Query the database for all existing hashes in a table, excluding the placeholder.
param table_name (str): Table name to query
Returns
set: Hash values currently in the table

func init_foundry_tables(tables_to_init, reset=False) → dict[str, bool]
Initialize Foundry dataset tables.
Creates Foundry datasets with placeholder records to establish their schemas.
param tables_to_init (list[tuple[str, dict, list[str]]]): List of (table_name, placeholder_record, columns) tuples to initialize
param reset (bool, default False): Recreate tables even if they already exist
Returns
dict: {table_name: was_created}
