destinations.job_client_impl
SqlLoadJob Objects
class SqlLoadJob(LoadJob)
A job executing a SQL statement, without the followup trait.
SqlJobClientBase Objects
class SqlJobClientBase(JobClientBase, WithStateSync)
INFO_TABLES_QUERY_THRESHOLD
Falls back to querying all tables in the information schema when more tables than this threshold are checked.
drop_tables
def drop_tables(*tables: str, delete_schema: bool = True) -> None
Drops tables in the destination database and optionally deletes the stored schema as well. Clients that support DDL transactions will perform both operations in a single transaction.
Arguments:
tables - Names of the tables to drop.
delete_schema - If True, also delete all versions of the current schema from storage.
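A minimal usage sketch, assuming an initialized dlt pipeline against a SQL destination whose client derives from SqlJobClientBase and can be used as a context manager; the table names are placeholders:

```python
import dlt

# assumes a pipeline configured against a SQL destination such as duckdb
pipeline = dlt.pipeline(pipeline_name="demo", destination="duckdb")

with pipeline.destination_client() as client:
    # if the destination supports DDL transactions, the drops and the
    # schema deletion are performed in a single transaction
    client.drop_tables("my_table", "my_other_table", delete_schema=True)
```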
maybe_ddl_transaction
@contextlib.contextmanager
def maybe_ddl_transaction() -> Iterator[None]
Begins a transaction if the SQL client supports it; otherwise works in autocommit mode.
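A rough sketch of the fallback logic, assuming the client exposes a capabilities.supports_ddl_transactions flag and a begin_transaction() context manager on its sql_client, as dlt's SQL clients do; this is an illustration, not the verbatim implementation:

```python
import contextlib
from typing import Iterator

@contextlib.contextmanager
def maybe_ddl_transaction(self) -> Iterator[None]:
    if self.capabilities.supports_ddl_transactions:
        # wrap the DDL statements in an explicit transaction
        with self.sql_client.begin_transaction():
            yield
    else:
        # no DDL transaction support: each statement auto-commits
        yield
```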
create_table_chain_completed_followup_jobs
def create_table_chain_completed_followup_jobs(
table_chain: Sequence[TTableSchema],
completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None
) -> List[NewLoadJob]
Creates a list of followup jobs for the merge write disposition and the staging replace strategies.
start_file_load
def start_file_load(table: TTableSchema, file_path: str,
load_id: str) -> LoadJob
Starts an SqlLoadJob for files ending with .sql or returns None to let derived classes handle their specific jobs.
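A hedged sketch of the intended dispatch pattern in a derived client; MyJobClient and _start_copy_job are hypothetical names introduced for illustration:

```python
from dlt.common.schema.typing import TTableSchema
from dlt.common.destination.reference import LoadJob
from dlt.destinations.job_client_impl import SqlJobClientBase

class MyJobClient(SqlJobClientBase):
    # hypothetical derived client
    def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
        # the base class starts an SqlLoadJob for ".sql" files ...
        job = super().start_file_load(table, file_path, load_id)
        if job is None:
            # ... and returns None otherwise, so the derived class can
            # start its destination-specific job here
            job = self._start_copy_job(table, file_path, load_id)  # hypothetical helper
        return job
```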
restore_file_load
def restore_file_load(file_path: str) -> LoadJob
Returns a completed SqlLoadJob or None to let derived classes handle their specific jobs.
Completed jobs are returned because SqlLoadJob is executed atomically in start_file_load, so any job that should be recreated is already completed. The case of asking for a job that was never created is not handled; with a correctly implemented loader that cannot happen.
Arguments:
file_path (str) - A path to a job file.
Returns:
LoadJob - A restored job or None.
get_storage_tables
def get_storage_tables(
table_names: Iterable[str]
) -> Iterable[Tuple[str, TTableSchemaColumns]]
Uses INFORMATION_SCHEMA to retrieve table and column information for the tables in the table_names iterable.
Table names should be normalized according to the naming convention and will be further converted to the desired casing in order to (in most cases) create a case-insensitive name suitable for searching the information schema.
The column names are returned as they appear in the information schema. To match them with the columns of an existing table, use the schema.get_new_table_columns method and pass the correct casing. Most casing functions are irreversible, so it is not possible to convert identifiers from the INFORMATION SCHEMA back into the case-sensitive dlt schema.
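A minimal usage sketch, assuming client is an open SqlJobClientBase and the names are already normalized; treating an empty columns mapping as "table not found" is an assumption, consistent with get_storage_table below:

```python
for table_name, columns in client.get_storage_tables(["my_table", "other_table"]):
    if columns:
        # column names exactly as reported by INFORMATION_SCHEMA
        print(table_name, "->", list(columns))
    else:
        print(table_name, "-> not found in storage")
```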
get_storage_table
def get_storage_table(table_name: str) -> Tuple[bool, TTableSchemaColumns]
Uses get_storage_tables to get the schema of a single table_name.
Returns (True, columns) if the table exists and (False, {}) when it does not.
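For example, assuming an open client:

```python
exists, columns = client.get_storage_table("my_table")
if exists:
    print(sorted(columns))  # column names as reported by the information schema
else:
    assert columns == {}  # per the documented contract
```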