MDFactory

db_operations

wait_for_schema(dataset, expected_columns, timeout_seconds=SCHEMA_POLL_TIMEOUT_SECONDS, interval_seconds=SCHEMA_POLL_INTERVAL_SECONDS) -> None

Wait for Foundry schema to be visible with expected columns.

Polls the dataset until its schema matches the expected columns or the timeout elapses.

dataset: Any

Foundry dataset object that exposes a query_foundry_sql method

expected_columns: list[str]

List of column names that should be visible

timeout_seconds: int = SCHEMA_POLL_TIMEOUT_SECONDS

Maximum time to wait, in seconds

interval_seconds: int = SCHEMA_POLL_INTERVAL_SECONDS

Time between polls, in seconds

Returns

None
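The polling loop described above can be sketched as follows. This is a minimal, hypothetical helper rather than the library's code: `get_columns` stands in for the schema query the real function runs through `query_foundry_sql`, "matches" is interpreted as "every expected column is visible", and raising `TimeoutError` on expiry is an assumption, since the documentation only states that the function returns `None`.

```python
import time
from typing import Callable, Iterable


def wait_for_columns(
    get_columns: Callable[[], Iterable[str]],
    expected_columns: list[str],
    timeout_seconds: float = 60.0,
    interval_seconds: float = 2.0,
) -> None:
    """Poll get_columns() until every expected column is visible.

    Hypothetical helper: get_columns stands in for a schema query against
    the dataset. Raises TimeoutError if the columns never appear.
    """
    # monotonic() is immune to wall-clock adjustments during the wait
    deadline = time.monotonic() + timeout_seconds
    expected = set(expected_columns)
    while True:
        visible = set(get_columns())
        if expected <= visible:
            return  # schema is ready
        if time.monotonic() >= deadline:
            missing = sorted(expected - visible)
            raise TimeoutError(f"columns still missing after {timeout_seconds}s: {missing}")
        time.sleep(interval_seconds)
```

Checking the deadline only after a failed poll guarantees at least one schema query, even with a zero timeout.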

dedupe_records(records, key_fields) -> list[dict[str, Any]]

Deduplicate records by key fields, keeping the last occurrence.

records: list[dict[str, Any]]

List of record dicts to deduplicate

key_fields: list[str]

Field names to use as deduplication key

Returns

list

Deduplicated records (last occurrence wins)
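"Last occurrence wins" deduplication can be sketched with a dict keyed on the tuple of key-field values. This is an illustrative version, not the library's implementation; it assumes every record contains all key fields and that their values are hashable, and the output order (by position of each key's last occurrence) is a choice made here.

```python
from typing import Any


def dedupe_records_sketch(
    records: list[dict[str, Any]], key_fields: list[str]
) -> list[dict[str, Any]]:
    """Keep only the last record seen for each key tuple."""
    latest: dict[tuple, dict[str, Any]] = {}
    for record in records:
        key = tuple(record[field] for field in key_fields)
        # Pop then re-insert so dict order follows each key's last occurrence
        latest.pop(key, None)
        latest[key] = record
    return list(latest.values())
```

Ordering by last occurrence mirrors what pandas `drop_duplicates(keep="last")` produces, so list-based and DataFrame-based deduplication agree.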

dedupe_dataframe(df, key_fields, table_name) -> pd.DataFrame

Deduplicate DataFrame rows by key fields, keeping the last occurrence.

df: pd.DataFrame

DataFrame to deduplicate

key_fields: list[str]

Column names to use as deduplication key

table_name: str

Table name for logging

Returns

pandas.DataFrame

Deduplicated DataFrame
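In pandas, the same behavior maps onto `DataFrame.drop_duplicates` with `subset` and `keep="last"`. The sketch below is an assumption about how this could look, including the log message format; it only illustrates why `table_name` is passed in.

```python
import logging

import pandas as pd

logger = logging.getLogger(__name__)


def dedupe_dataframe_sketch(df: pd.DataFrame, key_fields: list[str], table_name: str) -> pd.DataFrame:
    """Drop rows with duplicate keys, keeping the last, and log how many were removed."""
    deduped = df.drop_duplicates(subset=key_fields, keep="last")
    removed = len(df) - len(deduped)
    if removed:
        # table_name is used only to make the log line identifiable
        logger.info("%s: dropped %d duplicate row(s)", table_name, removed)
    return deduped
```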

drop_placeholder(df) -> pd.DataFrame

Remove placeholder rows from a DataFrame.

df: pd.DataFrame

DataFrame that may contain placeholder rows

Returns

pandas.DataFrame

DataFrame with placeholder rows removed
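How placeholder rows are recognized is not documented here. The sketch below assumes, hypothetically, that the placeholder carries a sentinel value in a `hash` column; both `PLACEHOLDER_HASH` and the `hash_column` parameter are illustrative names, not part of the documented signature.

```python
import pandas as pd

# Hypothetical sentinel: the real placeholder marker is not documented here.
PLACEHOLDER_HASH = "__placeholder__"


def drop_placeholder_sketch(df: pd.DataFrame, hash_column: str = "hash") -> pd.DataFrame:
    """Return df without rows whose hash column equals the placeholder sentinel."""
    if hash_column not in df.columns:
        return df  # no hash column, so nothing can be a placeholder
    return df[df[hash_column] != PLACEHOLDER_HASH]
```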

foundry_merge_upload(dm, records, key_fields, table_name, force, diff) -> int

Merge and upload records to a Foundry dataset.

Handles force (overwrite) and diff (skip existing) modes for Foundry, which requires loading all existing data, merging it with the new records, and writing the result in a single transaction.

dm: DataManager

DataManager instance for the table

records: list[dict[str, Any]]

Records to upload

key_fields: list[str]

Fields used for deduplication/matching

table_name: str

Table name for logging

force: bool

If True, overwrite existing records with matching keys

diff: bool

If True, skip records with keys already in database

Returns

int

Number of records uploaded
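Because the whole dataset is rewritten in one transaction, the force and diff modes reduce to an in-memory merge of existing and incoming rows. The sketch below shows only that merge step, with no Foundry calls; `merge_for_rewrite` is a hypothetical name, and treating force and diff as mutually exclusive and rejecting key collisions in the default mode are assumptions, not documented behavior.

```python
from typing import Any


def merge_for_rewrite(
    existing: list[dict[str, Any]],
    new: list[dict[str, Any]],
    key_fields: list[str],
    force: bool,
    diff: bool,
) -> tuple[list[dict[str, Any]], int]:
    """Combine existing and new rows into the full table to write back.

    Returns (rows_to_write, number_of_new_records_uploaded).
    """
    if force and diff:
        raise ValueError("force and diff are mutually exclusive")

    def key(row: dict[str, Any]) -> tuple:
        return tuple(row[f] for f in key_fields)

    existing_keys = {key(r) for r in existing}
    if diff:
        # Skip incoming rows whose key is already stored
        to_add = [r for r in new if key(r) not in existing_keys]
        return existing + to_add, len(to_add)
    if force:
        # Drop stored rows that the incoming batch overwrites
        new_keys = {key(r) for r in new}
        kept = [r for r in existing if key(r) not in new_keys]
        return kept + new, len(new)
    # Default mode (assumption): refuse to write duplicate keys
    clashes = existing_keys & {key(r) for r in new}
    if clashes:
        raise ValueError(f"{len(clashes)} key(s) already exist; use force or diff")
    return existing + new, len(new)
```

The returned row list is what a full-rewrite backend would write; the count is what the function would report as uploaded.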

local_upload_with_modes(dm, records, key_fields, table_name, force, diff) -> int

Upload records to a local table (SQLite or CSV), applying force, diff, or default conflict handling.

dm: DataManager

DataManager instance for the target table

records: list[dict[str, Any]]

Records to upload

key_fields: list[str]

Key fields used for duplicate detection and overwrite logic

table_name: str

Table name for logging and error messages

force: bool

If True, delete existing rows matching keys before insert

diff: bool

If True, skip rows that already exist by key

Returns

int

Number of records inserted
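For the SQLite case, the force and diff modes map directly onto per-key SQL statements. This sketch uses the standard `sqlite3` module against an already-created table, not the DataManager; the function name is hypothetical, and the default mode here is a plain `INSERT` (an assumption), which raises `IntegrityError` only if the table defines a unique constraint on the key fields.

```python
import sqlite3
from typing import Any


def sqlite_upload_with_modes(
    conn: sqlite3.Connection,
    table: str,
    records: list[dict[str, Any]],
    key_fields: list[str],
    force: bool = False,
    diff: bool = False,
) -> int:
    """Insert records into an existing SQLite table using force/diff semantics.

    Table and column names are interpolated into SQL, so they must come from
    trusted code, never from user input. Values are always bound parameters.
    """
    if not records:
        return 0
    columns = list(records[0])
    where = " AND ".join(f'"{k}" = ?' for k in key_fields)
    col_sql = ", ".join(f'"{c}"' for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    inserted = 0
    with conn:  # one transaction: commit on success, roll back on error
        for rec in records:
            key_vals = [rec[k] for k in key_fields]
            if force:
                # Overwrite: remove any stored row with the same key first
                conn.execute(f'DELETE FROM "{table}" WHERE {where}', key_vals)
            elif diff:
                # Skip: leave the stored row untouched if the key exists
                hit = conn.execute(
                    f'SELECT 1 FROM "{table}" WHERE {where} LIMIT 1', key_vals
                ).fetchone()
                if hit:
                    continue
            conn.execute(
                f'INSERT INTO "{table}" ({col_sql}) VALUES ({placeholders})',
                [rec[c] for c in columns],
            )
            inserted += 1
    return inserted
```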

upload_records(records, table_name, key_fields, force=False, diff=False) -> int

Upload records to database with deduplication and backend dispatch.

Deduplicates input records, then routes to the appropriate backend (Foundry or SQLite) with the specified conflict-resolution mode.

records: list[dict[str, Any]]

Records to upload

table_name: str

Target table name

key_fields: list[str]

Fields used for duplicate detection

force: bool = False

If True, overwrite existing records with matching keys

diff: bool = False

If True, skip records with keys already in database

Returns

int

Number of records uploaded

init_sqlite_tables(tables_to_init, reset=False) -> dict[str, bool]

Initialize SQLite database tables with schema via placeholder records.

Creates the database file if needed, then creates each table by inserting and removing a placeholder record (mirroring the Foundry initialization pattern). With reset=True, drops existing tables before recreating them.

tables_to_init: list[tuple[str, dict, list[str]]]

List of (table_name, placeholder_record, columns) to initialize

reset: bool = False

If True, drop and recreate tables that already exist

Returns

dict

{table_name: was_created}
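The point of a placeholder record is that its values carry the column types. The sketch below makes that explicit with the standard `sqlite3` module: instead of the documented insert-then-remove step, it infers SQLite types from the placeholder and issues a `CREATE TABLE` directly. The function name, the `conn` parameter, and the type mapping are assumptions for illustration.

```python
import sqlite3
from typing import Any


def _sqlite_type(value: Any) -> str:
    # bool is a subclass of int, so booleans also map to INTEGER
    if isinstance(value, int):
        return "INTEGER"
    if isinstance(value, float):
        return "REAL"
    return "TEXT"


def init_sqlite_tables_sketch(
    conn: sqlite3.Connection,
    tables_to_init: list[tuple[str, dict[str, Any], list[str]]],
    reset: bool = False,
) -> dict[str, bool]:
    """Create each table with column types inferred from its placeholder record."""
    results: dict[str, bool] = {}
    with conn:
        for table, placeholder, columns in tables_to_init:
            exists = conn.execute(
                "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (table,)
            ).fetchone() is not None
            if exists and not reset:
                results[table] = False  # keep the existing table untouched
                continue
            conn.execute(f'DROP TABLE IF EXISTS "{table}"')
            col_defs = ", ".join(f'"{c}" {_sqlite_type(placeholder.get(c))}' for c in columns)
            conn.execute(f'CREATE TABLE "{table}" ({col_defs})')
            results[table] = True
    return results
```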

init_csv_tables(tables_to_init, reset=False) -> dict[str, bool]

Initialize CSV file tables with headers.

Creates CSV files for each table with column headers only. With reset=True, overwrites existing files.

tables_to_init: list[tuple[str, dict, list[str]]]

List of (table_name, placeholder_record, columns) to initialize

reset: bool = False

If True, overwrite existing CSV files

Returns

dict

{table_name: was_created}
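A header-only CSV initializer can be sketched with the standard `csv` module. The `directory` parameter and the `<table>.csv` file naming are hypothetical, since the documented signature does not say where files are written; the placeholder record is ignored because a CSV file has no column types.

```python
import csv
from pathlib import Path
from typing import Any


def init_csv_tables_sketch(
    directory: Path,
    tables_to_init: list[tuple[str, dict[str, Any], list[str]]],
    reset: bool = False,
) -> dict[str, bool]:
    """Write a header-only CSV per table; existing files are kept unless reset=True."""
    directory.mkdir(parents=True, exist_ok=True)
    results: dict[str, bool] = {}
    for table, _placeholder, columns in tables_to_init:
        path = directory / f"{table}.csv"
        if path.exists() and not reset:
            results[table] = False
            continue
        # newline="" lets the csv module control line endings itself
        with path.open("w", newline="") as fh:
            csv.writer(fh).writerow(columns)
        results[table] = True
    return results
```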

query_existing_hashes(table_name) -> set[str]

Query the database for all hashes currently stored in a table, excluding the placeholder row.

table_name: str

Table name to query

Returns

set

Set of hash values currently in table
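Against a SQLite backend, this lookup reduces to a single `SELECT DISTINCT` that filters out the placeholder. In this sketch the `hash` column name, the placeholder sentinel, and the explicit `conn` parameter are all hypothetical; note that SQL `!=` also drops rows whose hash is `NULL`.

```python
import sqlite3

# Hypothetical names: the real hash column and placeholder marker are not documented here.
HASH_COLUMN = "hash"
PLACEHOLDER_HASH = "__placeholder__"


def query_existing_hashes_sketch(conn: sqlite3.Connection, table_name: str) -> set[str]:
    """Return every distinct stored hash except the placeholder's."""
    rows = conn.execute(
        f'SELECT DISTINCT "{HASH_COLUMN}" FROM "{table_name}" WHERE "{HASH_COLUMN}" != ?',
        (PLACEHOLDER_HASH,),
    )
    return {row[0] for row in rows}
```

Returning a set makes the "skip existing" check in diff mode an O(1) membership test per incoming record.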

init_foundry_tables(tables_to_init, reset=False) -> dict[str, bool]

Initialize Foundry dataset tables.

Creates Foundry datasets with placeholder records to establish schemas.

tables_to_init: list[tuple[str, dict, list[str]]]

List of (table_name, placeholder_record, columns) to initialize

reset: bool = False

If True, recreate tables even if they already exist

Returns

dict

{table_name: was_created}