Adding Stateful Ingestion to a Source
Currently, datahub supports the Stale Metadata Removal and the Redunant Run Elimination use-cases on top of the more generic stateful ingestion capability available for the sources. This document describes how to add support for these two use-cases to new sources.
Adding Stale Metadata Removal to a Source
Adding the stale metadata removal use-case to a new source involves modifying the source config, source report, and the source itself.
For a full example of all changes required: Adding stale metadata removal to the MongoDB source.
The datahub.ingestion.source.state.stale_entity_removal_handler module provides the supporting infrastructure for all the steps described above and substantially simplifies the implementation on the source side. Below is a detailed explanation of each of these steps along with examples.
1. Modify the source config
The source's config must inherit from StatefulIngestionConfigBase
, and should declare a field named stateful_ingestion
of type Optional[StatefulStaleMetadataRemovalConfig]
.
Example:
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
StatefulIngestionConfigBase,
)
class MySourceConfig(StatefulIngestionConfigBase):
# ...<other config params>...
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
2. Modify the source report
The report class of the source should inherit from StaleEntityRemovalSourceReport
instead of SourceReport
.
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
)
@dataclass
class MySourceReport(StatefulIngestionReport):
# <other fields here>
pass
3. Modify the source
- The source must inherit from
StatefulIngestionSourceBase
instead ofSource
. - The source should contain a custom
get_workunit_processors
method.
from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionSourceBase
from datahub.ingestion.source.state.stale_entity_removal_handler import StaleEntityRemovalHandler
class MySource(StatefulIngestionSourceBase):
def __init__(self, config: MySourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
self.config = config
self.report = MySourceReport()
# other initialization code here
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
]
# other methods here
Adding Redundant Run Elimination to a Source
This use-case applies to the sources that drive ingestion by querying logs over a specified duration via the config(such as snowflake usage, bigquery usage etc.). It typically involves expensive and long-running queries. To add redundant run elimination to a new source to prevent the expensive reruns for the same time range(potentially due to a user error or a scheduler malfunction), the following steps are required.
- Update the
SourceConfig
- Update the
SourceReport
- Modify the
Source
to- Instantiate the RedundantRunSkipHandler object.
- Check if the current run should be skipped.
- Update the state for the current run(start & end times).
The datahub.ingestion.source.state.redundant_run_skip_handler modules provides the supporting infrastructure required for all the steps described above.
NOTE: The handler currently uses a simple state, the BaseUsageCheckpointState, across all sources it supports (unlike the StaleEntityRemovalHandler).
1. Modifying the SourceConfig
The SourceConfig
must inherit from the StatefulRedundantRunSkipConfig class.
Examples:
- Snowflake Usage
from datahub.ingestion.source.state.redundant_run_skip_handler import (
StatefulRedundantRunSkipConfig,
)
class SnowflakeStatefulIngestionConfig(StatefulRedundantRunSkipConfig):
pass
2. Modifying the SourceReport
The SourceReport
must inherit from the StatefulIngestionReport class.
Examples:
- Snowflake Usage
@dataclass
class SnowflakeUsageReport(BaseSnowflakeReport, StatefulIngestionReport):
# <members specific to snowflake usage report>
3. Modifying the Source
The source must inherit from StatefulIngestionSourceBase
.
3.1 Instantiate RedundantRunSkipHandler in the __init__
method of the source.
The source should instantiate an instance of the RedundantRunSkipHandler
in its __init__
method.
Examples:
Snowflake Usage
from datahub.ingestion.source.state.redundant_run_skip_handler import (
RedundantRunSkipHandler,
)
class SnowflakeUsageSource(StatefulIngestionSourceBase):
def __init__(self, config: SnowflakeUsageConfig, ctx: PipelineContext):
super(SnowflakeUsageSource, self).__init__(config, ctx)
self.config: SnowflakeUsageConfig = config
self.report: SnowflakeUsageReport = SnowflakeUsageReport()
# Create and register the stateful ingestion use-case handlers.
self.redundant_run_skip_handler = RedundantRunSkipHandler(
source=self,
config=self.config,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)
3.2 Checking if the current run should be skipped.
The sources can query if the current run should be skipped using should_skip_this_run
method of RedundantRunSkipHandler
. This should done from the get_workunits
method, before doing any other work.
Example code:
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
# Skip a redundant run
if self.redundant_run_skip_handler.should_skip_this_run(
cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
):
return
# Generate the workunits.
3.3 Updating the state for the current run.
The source should use the update_state
method of RedundantRunSkipHandler
to update the current run's state if the run has not been skipped. This step can be performed in the get_workunits
if the run has not been skipped.
Example code:
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
# Skip a redundant run
if self.redundant_run_skip_handler.should_skip_this_run(
cur_start_time_millis=self.config.start_time
):
return
# Generate the workunits.
# <code for generating the workunits>
# Update checkpoint state for this run.
self.redundant_run_skip_handler.update_state(
start_time_millis=self.config.start_time,
end_time_millis=self.config.end_time,
)