DagsterDocs

Source code for dagster.core.instance

import inspect
import logging
import os
import sys
import tempfile
import time
import warnings
import weakref
from collections import defaultdict
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Union

import yaml
from dagster import check
from dagster.core.definitions.events import AssetKey
from dagster.core.definitions.pipeline import PipelineDefinition, PipelineSubsetDefinition
from dagster.core.definitions.pipeline_base import InMemoryPipeline
from dagster.core.errors import (
    DagsterHomeNotSetError,
    DagsterInvariantViolationError,
    DagsterNoStepsToExecuteException,
    DagsterRunAlreadyExists,
    DagsterRunConflict,
)
from dagster.core.storage.pipeline_run import (
    PipelineRun,
    PipelineRunStatus,
    PipelineRunsFilter,
    RunRecord,
)
from dagster.core.storage.tags import MEMOIZED_RUN_TAG
from dagster.core.system_config.objects import ResolvedRunConfig
from dagster.core.utils import str_format_list
from dagster.serdes import ConfigurableClass
from dagster.seven import get_current_datetime_in_utc
from dagster.utils.error import serializable_error_info_from_exc_info

from .config import DAGSTER_CONFIG_YAML_FILENAME, is_dagster_home_set
from .ref import InstanceRef

# 'airflow_execution_date' and 'is_airflow_ingest_pipeline' are hardcoded tags used in the
# airflow ingestion logic (see: dagster_pipeline_factory.py). 'airflow_execution_date' stores the
# 'execution_date' used in Airflow operator execution and 'is_airflow_ingest_pipeline' determines
# whether 'airflow_execution_date' is needed.
# https://github.com/dagster-io/dagster/issues/2403
AIRFLOW_EXECUTION_DATE_STR = "airflow_execution_date"
IS_AIRFLOW_INGEST_PIPELINE_STR = "is_airflow_ingest_pipeline"


if TYPE_CHECKING:
    from dagster.core.events import DagsterEvent, DagsterEventType
    from dagster.core.host_representation import HistoricalPipeline
    from dagster.core.snap import PipelineSnapshot
    from dagster.daemon.types import DaemonHeartbeat


def is_memoized_run(tags):
    return tags is not None and MEMOIZED_RUN_TAG in tags and tags.get(MEMOIZED_RUN_TAG) == "true"


def _check_run_equality(pipeline_run, candidate_run):
    check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
    check.inst_param(candidate_run, "candidate_run", PipelineRun)

    field_diff = {}
    for field in pipeline_run._fields:
        expected_value = getattr(pipeline_run, field)
        candidate_value = getattr(candidate_run, field)
        if expected_value != candidate_value:
            field_diff[field] = (expected_value, candidate_value)

    return field_diff


def _format_field_diff(field_diff):
    return "\n".join(
        [
            (
                "    {field_name}:\n"
                + "        Expected: {expected_value}\n"
                + "        Received: {candidate_value}"
            ).format(
                field_name=field_name,
                expected_value=expected_value,
                candidate_value=candidate_value,
            )
            for field_name, (
                expected_value,
                candidate_value,
            ) in field_diff.items()
        ]
    )


class _EventListenerLogHandler(logging.Handler):
    def __init__(self, instance):
        self._instance = instance
        super(_EventListenerLogHandler, self).__init__()

    def emit(self, record):
        from dagster.core.events.log import construct_event_record, StructuredLoggerMessage

        try:
            event = construct_event_record(
                StructuredLoggerMessage(
                    name=record.name,
                    message=record.msg,
                    level=record.levelno,
                    meta=record.dagster_meta,
                    record=record,
                )
            )

            self._instance.handle_new_event(event)

        except Exception as e:  # pylint: disable=W0703
            logging.critical("Error during instance event listen")
            logging.exception(str(e))
            raise


class InstanceType(Enum):
    PERSISTENT = "PERSISTENT"
    EPHEMERAL = "EPHEMERAL"


class MayHaveInstanceWeakref:
    """Mixin for classes that can have a weakref back to a Dagster instance."""

    def __init__(self):
        self._instance_weakref: weakref.ref = None

    @property
    def _instance(self):
        return (
            self._instance_weakref()
            # Backcompat with custom subclasses that don't call super().__init__()
            # in their own __init__ implementations
            if (hasattr(self, "_instance_weakref") and self._instance_weakref is not None)
            else None
        )

    def register_instance(self, instance):
        check.inst_param(instance, "instance", DagsterInstance)
        check.invariant(
            # Backcompat with custom subclasses that don't call super().__init__()
            # in their own __init__ implementations
            (not hasattr(self, "_instance_weakref") or self._instance_weakref is None),
            "Must only call initialize once",
        )

        # Store a weakref to avoid a circular reference / enable GC
        self._instance_weakref = weakref.ref(instance)


[docs]class DagsterInstance: """Core abstraction for managing Dagster's access to storage and other resources. Use DagsterInstance.get() to grab the current DagsterInstance which will load based on the values in the ``dagster.yaml`` file in ``$DAGSTER_HOME``. Alternatively, DagsterInstance.ephemeral() can use used which provides a set of transient in-memory components. Configuration of this class should be done by setting values in ``$DAGSTER_HOME/dagster.yaml``. For example, to use Postgres for run and event log storage, you can write a ``dagster.yaml`` such as the following: .. literalinclude:: ../../../../docs/sections/deploying/postgres_dagster.yaml :caption: dagster.yaml :language: YAML Args: instance_type (InstanceType): Indicates whether the instance is ephemeral or persistent. Users should not attempt to set this value directly or in their ``dagster.yaml`` files. local_artifact_storage (LocalArtifactStorage): The local artifact storage is used to configure storage for any artifacts that require a local disk, such as schedules, or when using the filesystem system storage to manage files and intermediates. By default, this will be a :py:class:`dagster.core.storage.root.LocalArtifactStorage`. Configurable in ``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass` machinery. run_storage (RunStorage): The run storage is used to store metadata about ongoing and past pipeline runs. By default, this will be a :py:class:`dagster.core.storage.runs.SqliteRunStorage`. Configurable in ``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass` machinery. event_storage (EventLogStorage): Used to store the structured event logs generated by pipeline runs. By default, this will be a :py:class:`dagster.core.storage.event_log.SqliteEventLogStorage`. Configurable in ``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass` machinery. compute_log_manager (ComputeLogManager): The compute log manager handles stdout and stderr logging for solid compute functions. By default, this will be a :py:class:`dagster.core.storage.local_compute_log_manager.LocalComputeLogManager`. Configurable in ``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass` machinery. run_coordinator (RunCoordinator): A runs coordinator may be used to manage the execution of pipeline runs. run_launcher (Optional[RunLauncher]): Optionally, a run launcher may be used to enable a Dagster instance to launch pipeline runs, e.g. on a remote Kubernetes cluster, in addition to running them locally. settings (Optional[Dict]): Specifies certain per-instance settings, such as feature flags. These are set in the ``dagster.yaml`` under a set of whitelisted keys. ref (Optional[InstanceRef]): Used by internal machinery to pass instances across process boundaries. """ _PROCESS_TEMPDIR = None def __init__( self, instance_type, local_artifact_storage, run_storage, event_storage, compute_log_manager, schedule_storage=None, scheduler=None, run_coordinator=None, run_launcher=None, settings=None, ref=None, ): from dagster.core.storage.compute_log_manager import ComputeLogManager from dagster.core.storage.event_log import EventLogStorage from dagster.core.storage.root import LocalArtifactStorage from dagster.core.storage.runs import RunStorage from dagster.core.storage.schedules import ScheduleStorage from dagster.core.scheduler import Scheduler from dagster.core.run_coordinator import RunCoordinator from dagster.core.launcher import RunLauncher self._instance_type = check.inst_param(instance_type, "instance_type", InstanceType) self._local_artifact_storage = check.inst_param( local_artifact_storage, "local_artifact_storage", LocalArtifactStorage ) self._event_storage = check.inst_param(event_storage, "event_storage", EventLogStorage) self._event_storage.register_instance(self) self._run_storage = check.inst_param(run_storage, "run_storage", RunStorage) self._run_storage.register_instance(self) self._compute_log_manager = check.inst_param( compute_log_manager, "compute_log_manager", ComputeLogManager ) self._compute_log_manager.register_instance(self) self._scheduler = check.opt_inst_param(scheduler, "scheduler", Scheduler) self._schedule_storage = check.opt_inst_param( schedule_storage, "schedule_storage", ScheduleStorage ) if self._schedule_storage: self._schedule_storage.register_instance(self) self._run_coordinator = check.inst_param(run_coordinator, "run_coordinator", RunCoordinator) self._run_coordinator.register_instance(self) if hasattr(self._run_coordinator, "initialize") and inspect.ismethod( getattr(self._run_coordinator, "initialize") ): warnings.warn( "The initialize method on RunCoordinator has been deprecated as of 0.11.0 and will " "no longer be called during DagsterInstance init. Instead, the DagsterInstance " "will be made automatically available on any run coordinator associated with a " "DagsterInstance. In test, you may need to call RunCoordinator.register_instance() " "(mixed in from MayHaveInstanceWeakref). If you need to make use of the instance " "to set up your custom RunCoordinator, you should override " "RunCoordintor.register_instance(). This warning will be removed in 0.12.0." ) self._run_launcher = check.inst_param(run_launcher, "run_launcher", RunLauncher) self._run_launcher.register_instance(self) if hasattr(self._run_launcher, "initialize") and inspect.ismethod( getattr(self._run_launcher, "initialize") ): warnings.warn( "The initialize method on RunLauncher has been deprecated as of 0.11.0 and will " "no longer be called during DagsterInstance init. Instead, the DagsterInstance " "will be made automatically available on any run launcher associated with a " "DagsterInstance. In test, you may need to call RunLauncher.register_instance() " "(mixed in from MayHaveInstanceWeakref). If you need to make use of the instance " "to set up your custom RunLauncher, you should override " "RunLauncher.register_instance(). This warning will be removed in 0.12.0." ) self._settings = check.opt_dict_param(settings, "settings") self._ref = check.opt_inst_param(ref, "ref", InstanceRef) self._subscribers = defaultdict(list) # ctors @staticmethod def ephemeral(tempdir=None, preload=None): from dagster.core.run_coordinator import DefaultRunCoordinator from dagster.core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher from dagster.core.storage.event_log import InMemoryEventLogStorage from dagster.core.storage.root import LocalArtifactStorage from dagster.core.storage.runs import InMemoryRunStorage from dagster.core.storage.noop_compute_log_manager import NoOpComputeLogManager if tempdir is None: tempdir = DagsterInstance.temp_storage() return DagsterInstance( InstanceType.EPHEMERAL, local_artifact_storage=LocalArtifactStorage(tempdir), run_storage=InMemoryRunStorage(preload=preload), event_storage=InMemoryEventLogStorage(preload=preload), compute_log_manager=NoOpComputeLogManager(), run_coordinator=DefaultRunCoordinator(), run_launcher=SyncInMemoryRunLauncher(), ) @staticmethod def get(): dagster_home_path = os.getenv("DAGSTER_HOME") if not dagster_home_path: raise DagsterHomeNotSetError( ( "The environment variable $DAGSTER_HOME is not set. \n" "Dagster requires this environment variable to be set to an existing directory in your filesystem. " "This directory is used to store metadata across sessions, or load the dagster.yaml " "file which can configure storing metadata in an external database.\n" "You can resolve this error by exporting the environment variable. For example, you can run the following command in your shell or include it in your shell configuration file:\n" '\texport DAGSTER_HOME="~/dagster_home"\n' "Alternatively, DagsterInstance.ephemeral() can be used for a transient instance.\n" ) ) dagster_home_path = os.path.expanduser(dagster_home_path) if not os.path.isabs(dagster_home_path): raise DagsterInvariantViolationError( ( '$DAGSTER_HOME "{}" must be an absolute path. Dagster requires this ' "environment variable to be set to an existing directory in your filesystem." ).format(dagster_home_path) ) if not (os.path.exists(dagster_home_path) and os.path.isdir(dagster_home_path)): raise DagsterInvariantViolationError( ( '$DAGSTER_HOME "{}" is not a directory or does not exist. Dagster requires this ' "environment variable to be set to an existing directory in your filesystem" ).format(dagster_home_path) ) return DagsterInstance.from_config(dagster_home_path) @staticmethod def local_temp(tempdir=None, overrides=None): if tempdir is None: tempdir = DagsterInstance.temp_storage() return DagsterInstance.from_ref(InstanceRef.from_dir(tempdir, overrides=overrides)) @staticmethod def from_config( config_dir, config_filename=DAGSTER_CONFIG_YAML_FILENAME, ): instance_ref = InstanceRef.from_dir(config_dir, config_filename=config_filename) return DagsterInstance.from_ref(instance_ref) @staticmethod def from_ref(instance_ref): check.inst_param(instance_ref, "instance_ref", InstanceRef) # DagsterInstance doesn't implement ConfigurableClass, but we may still sometimes want to # have custom subclasses of DagsterInstance. This machinery allows for those custom # subclasses to receive additional keyword arguments passed through the config YAML. Note # that unlike a ConfigurableClass, these additional arguments are not type checked -- the # raw Python dict returned by yaml.load is just splatted into kwargs. klass = instance_ref.custom_instance_class or DagsterInstance kwargs = instance_ref.custom_instance_class_config return klass( instance_type=InstanceType.PERSISTENT, local_artifact_storage=instance_ref.local_artifact_storage, run_storage=instance_ref.run_storage, event_storage=instance_ref.event_storage, compute_log_manager=instance_ref.compute_log_manager, schedule_storage=instance_ref.schedule_storage, scheduler=instance_ref.scheduler, run_coordinator=instance_ref.run_coordinator, run_launcher=instance_ref.run_launcher, settings=instance_ref.settings, ref=instance_ref, **kwargs, ) # flags @property def is_persistent(self): return self._instance_type == InstanceType.PERSISTENT @property def is_ephemeral(self): return self._instance_type == InstanceType.EPHEMERAL def get_ref(self): if self._ref: return self._ref check.failed( "Attempted to prepare an ineligible DagsterInstance ({inst_type}) for cross " "process communication.{dagster_home_msg}".format( inst_type=self._instance_type, dagster_home_msg="\nDAGSTER_HOME environment variable is not set, set it to " "a directory on the filesystem for dagster to use for storage and cross " "process coordination." if os.getenv("DAGSTER_HOME") is None else "", ) ) @property def root_directory(self): return self._local_artifact_storage.base_dir @staticmethod def temp_storage(): if DagsterInstance._PROCESS_TEMPDIR is None: DagsterInstance._PROCESS_TEMPDIR = tempfile.TemporaryDirectory() return DagsterInstance._PROCESS_TEMPDIR.name def _info(self, component): # ConfigurableClass may not have inst_data if it's a direct instantiation # which happens for ephemeral instances if isinstance(component, ConfigurableClass) and component.inst_data: return component.inst_data.info_dict() if type(component) is dict: return component return component.__class__.__name__ def _info_str_for_component(self, component_name, component): return yaml.dump( {component_name: self._info(component)}, default_flow_style=False, sort_keys=False ) def info_dict(self): settings = self._settings if self._settings else {} ret = { "local_artifact_storage": self._info(self._local_artifact_storage), "run_storage": self._info(self._run_storage), "event_log_storage": self._info(self._event_storage), "compute_logs": self._info(self._compute_log_manager), "schedule_storage": self._info(self._schedule_storage), "scheduler": self._info(self._scheduler), "run_coordinator": self._info(self._run_coordinator), "run_launcher": self._info(self._run_launcher), } ret.update( { settings_key: self._info(settings_value) for settings_key, settings_value in settings.items() } ) return ret def info_str(self): return yaml.dump(self.info_dict(), default_flow_style=False, sort_keys=False) @property def run_storage(self): return self._run_storage @property def event_log_storage(self): return self._event_storage # schedule storage @property def schedule_storage(self): return self._schedule_storage @property def scheduler(self): return self._scheduler @property def scheduler_class(self): return self.scheduler.__class__.__name__ if self.scheduler else None # run coordinator @property def run_coordinator(self): return self._run_coordinator # run launcher @property def run_launcher(self): return self._run_launcher # compute logs @property def compute_log_manager(self): return self._compute_log_manager def get_settings(self, settings_key): check.str_param(settings_key, "settings_key") if self._settings and settings_key in self._settings: return self._settings.get(settings_key) return {} @property def telemetry_enabled(self): if self.is_ephemeral: return False dagster_telemetry_enabled_default = True telemetry_settings = self.get_settings("telemetry") if not telemetry_settings: return dagster_telemetry_enabled_default if "enabled" in telemetry_settings: return telemetry_settings["enabled"] else: return dagster_telemetry_enabled_default def upgrade(self, print_fn=None): from dagster.core.storage.migration.utils import upgrading_instance with upgrading_instance(self): if print_fn: print_fn("Updating run storage...") self._run_storage.upgrade() self._run_storage.build_missing_indexes(print_fn=print_fn) if print_fn: print_fn("Updating event storage...") self._event_storage.upgrade() if print_fn: print_fn("Updating schedule storage...") self._schedule_storage.upgrade() def optimize_for_dagit(self, statement_timeout): if self._schedule_storage: self._schedule_storage.validate_stored_schedules(self.scheduler_class) self._schedule_storage.optimize_for_dagit(statement_timeout=statement_timeout) self._run_storage.optimize_for_dagit(statement_timeout=statement_timeout) self._event_storage.optimize_for_dagit(statement_timeout=statement_timeout) def reindex(self, print_fn=lambda _: None): print_fn("Checking for reindexing...") self._event_storage.reindex(print_fn) self._run_storage.reindex(print_fn) print_fn("Done.") def dispose(self): self._run_storage.dispose() self.run_coordinator.dispose() self._run_launcher.dispose() self._event_storage.dispose() self._compute_log_manager.dispose() # run storage def get_run_by_id(self, run_id: str) -> PipelineRun: return self._run_storage.get_run_by_id(run_id) def get_pipeline_snapshot(self, snapshot_id: str) -> "PipelineSnapshot": return self._run_storage.get_pipeline_snapshot(snapshot_id) def has_pipeline_snapshot(self, snapshot_id: str) -> bool: return self._run_storage.has_pipeline_snapshot(snapshot_id) def get_historical_pipeline(self, snapshot_id: str) -> "HistoricalPipeline": from dagster.core.host_representation import HistoricalPipeline snapshot = self._run_storage.get_pipeline_snapshot(snapshot_id) parent_snapshot = ( self._run_storage.get_pipeline_snapshot(snapshot.lineage_snapshot.parent_snapshot_id) if snapshot.lineage_snapshot else None ) return HistoricalPipeline( self._run_storage.get_pipeline_snapshot(snapshot_id), snapshot_id, parent_snapshot ) def has_historical_pipeline(self, snapshot_id): return self._run_storage.has_pipeline_snapshot(snapshot_id) def get_execution_plan_snapshot(self, snapshot_id): return self._run_storage.get_execution_plan_snapshot(snapshot_id) def get_run_stats(self, run_id): return self._event_storage.get_stats_for_run(run_id) def get_run_step_stats(self, run_id, step_keys=None): return self._event_storage.get_step_stats_for_run(run_id, step_keys) def get_run_tags(self): return self._run_storage.get_run_tags() def get_run_group(self, run_id): return self._run_storage.get_run_group(run_id) def create_run_for_pipeline( self, pipeline_def, execution_plan=None, run_id=None, run_config=None, mode=None, solids_to_execute=None, step_keys_to_execute=None, status=None, tags=None, root_run_id=None, parent_run_id=None, solid_selection=None, ): from dagster.core.execution.plan.plan import ExecutionPlan from dagster.core.snap import snapshot_from_execution_plan check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) check.opt_inst_param(execution_plan, "execution_plan", ExecutionPlan) # note that solids_to_execute is required to execute the solid subset, which is the # frozenset version of the previous solid_subset. # solid_selection is not required and will not be converted to solids_to_execute here. # i.e. this function doesn't handle solid queries. # solid_selection is only used to pass the user queries further down. check.opt_set_param(solids_to_execute, "solids_to_execute", of_type=str) check.opt_list_param(solid_selection, "solid_selection", of_type=str) if solids_to_execute: if isinstance(pipeline_def, PipelineSubsetDefinition): # for the case when pipeline_def is created by IPipeline or ExternalPipeline check.invariant( solids_to_execute == pipeline_def.solids_to_execute, "Cannot create a PipelineRun from pipeline subset {pipeline_solids_to_execute} " "that conflicts with solids_to_execute arg {solids_to_execute}".format( pipeline_solids_to_execute=str_format_list(pipeline_def.solids_to_execute), solids_to_execute=str_format_list(solids_to_execute), ), ) else: # for cases when `create_run_for_pipeline` is directly called pipeline_def = pipeline_def.get_pipeline_subset_def( solids_to_execute=solids_to_execute ) if execution_plan: final_execution_plan = execution_plan check.invariant( step_keys_to_execute is None, "Should not pass execution_plan and step_keys_to_execute to create_run", ) step_keys_to_execute = execution_plan.step_keys_to_execute else: resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode) full_execution_plan = ExecutionPlan.build( InMemoryPipeline(pipeline_def), resolved_run_config, ) if is_memoized_run(tags): from dagster.core.execution.resolve_versions import resolve_memoized_execution_plan if step_keys_to_execute: raise DagsterInvariantViolationError( "step_keys_to_execute parameter cannot be used in conjunction with memoized " "pipeline runs." ) final_execution_plan = resolve_memoized_execution_plan( full_execution_plan, pipeline_def, run_config, self, resolved_run_config, ) # TODO: tighter integration with existing step_keys_to_execute functionality step_keys_to_execute = final_execution_plan.step_keys_to_execute if not step_keys_to_execute: raise DagsterNoStepsToExecuteException( "No steps found to execute. " "This is because every step in the plan has already been memoized." ) elif step_keys_to_execute: final_execution_plan = full_execution_plan.build_subset_plan( step_keys_to_execute, pipeline_def, resolved_run_config ) else: final_execution_plan = full_execution_plan return self.create_run( pipeline_name=pipeline_def.name, run_id=run_id, run_config=run_config, mode=check.opt_str_param(mode, "mode", default=pipeline_def.get_default_mode_name()), solid_selection=solid_selection, solids_to_execute=solids_to_execute, step_keys_to_execute=step_keys_to_execute, status=status, tags=tags, root_run_id=root_run_id, parent_run_id=parent_run_id, pipeline_snapshot=pipeline_def.get_pipeline_snapshot(), execution_plan_snapshot=snapshot_from_execution_plan( final_execution_plan, pipeline_def.get_pipeline_snapshot_id(), ), parent_pipeline_snapshot=pipeline_def.get_parent_pipeline_snapshot(), ) def _construct_run_with_snapshots( self, pipeline_name, run_id, run_config, mode, solids_to_execute, step_keys_to_execute, status, tags, root_run_id, parent_run_id, pipeline_snapshot, execution_plan_snapshot, parent_pipeline_snapshot, solid_selection=None, external_pipeline_origin=None, ): # https://github.com/dagster-io/dagster/issues/2403 if tags and IS_AIRFLOW_INGEST_PIPELINE_STR in tags: if AIRFLOW_EXECUTION_DATE_STR not in tags: tags[AIRFLOW_EXECUTION_DATE_STR] = get_current_datetime_in_utc().isoformat() check.invariant( not (not pipeline_snapshot and execution_plan_snapshot), "It is illegal to have an execution plan snapshot and not have a pipeline snapshot. " "It is possible to have no execution plan snapshot since we persist runs " "that do not successfully compile execution plans in the scheduled case.", ) pipeline_snapshot_id = ( self._ensure_persisted_pipeline_snapshot(pipeline_snapshot, parent_pipeline_snapshot) if pipeline_snapshot else None ) execution_plan_snapshot_id = ( self._ensure_persisted_execution_plan_snapshot( execution_plan_snapshot, pipeline_snapshot_id, step_keys_to_execute ) if execution_plan_snapshot and pipeline_snapshot_id else None ) return PipelineRun( pipeline_name=pipeline_name, run_id=run_id, run_config=run_config, mode=mode, solid_selection=solid_selection, solids_to_execute=solids_to_execute, step_keys_to_execute=step_keys_to_execute, status=status, tags=tags, root_run_id=root_run_id, parent_run_id=parent_run_id, pipeline_snapshot_id=pipeline_snapshot_id, execution_plan_snapshot_id=execution_plan_snapshot_id, external_pipeline_origin=external_pipeline_origin, ) def _ensure_persisted_pipeline_snapshot(self, pipeline_snapshot, parent_pipeline_snapshot): from dagster.core.snap import create_pipeline_snapshot_id, PipelineSnapshot check.inst_param(pipeline_snapshot, "pipeline_snapshot", PipelineSnapshot) check.opt_inst_param(parent_pipeline_snapshot, "parent_pipeline_snapshot", PipelineSnapshot) if pipeline_snapshot.lineage_snapshot: if not self._run_storage.has_pipeline_snapshot( pipeline_snapshot.lineage_snapshot.parent_snapshot_id ): check.invariant( create_pipeline_snapshot_id(parent_pipeline_snapshot) == pipeline_snapshot.lineage_snapshot.parent_snapshot_id, "Parent pipeline snapshot id out of sync with passed parent pipeline snapshot", ) returned_pipeline_snapshot_id = self._run_storage.add_pipeline_snapshot( parent_pipeline_snapshot ) check.invariant( pipeline_snapshot.lineage_snapshot.parent_snapshot_id == returned_pipeline_snapshot_id ) pipeline_snapshot_id = create_pipeline_snapshot_id(pipeline_snapshot) if not self._run_storage.has_pipeline_snapshot(pipeline_snapshot_id): returned_pipeline_snapshot_id = self._run_storage.add_pipeline_snapshot( pipeline_snapshot ) check.invariant(pipeline_snapshot_id == returned_pipeline_snapshot_id) return pipeline_snapshot_id def _ensure_persisted_execution_plan_snapshot( self, execution_plan_snapshot, pipeline_snapshot_id, step_keys_to_execute ): from dagster.core.snap.execution_plan_snapshot import ( ExecutionPlanSnapshot, create_execution_plan_snapshot_id, ) check.inst_param(execution_plan_snapshot, "execution_plan_snapshot", ExecutionPlanSnapshot) check.str_param(pipeline_snapshot_id, "pipeline_snapshot_id") check.opt_list_param(step_keys_to_execute, "step_keys_to_execute", of_type=str) check.invariant( execution_plan_snapshot.pipeline_snapshot_id == pipeline_snapshot_id, ( "Snapshot mismatch: Snapshot ID in execution plan snapshot is " '"{ep_pipeline_snapshot_id}" and snapshot_id created in memory is ' '"{pipeline_snapshot_id}"' ).format( ep_pipeline_snapshot_id=execution_plan_snapshot.pipeline_snapshot_id, pipeline_snapshot_id=pipeline_snapshot_id, ), ) check.invariant( set(step_keys_to_execute) == set(execution_plan_snapshot.step_keys_to_execute) if step_keys_to_execute else set(execution_plan_snapshot.step_keys_to_execute) == set([step.key for step in execution_plan_snapshot.steps]), "We encode step_keys_to_execute twice in our stack, unfortunately. This check " "ensures that they are consistent. We check that step_keys_to_execute in the plan " "matches the step_keys_to_execute params if it is set. If it is not, this indicates " "a full execution plan, and so we verify that.", ) execution_plan_snapshot_id = create_execution_plan_snapshot_id(execution_plan_snapshot) if not self._run_storage.has_execution_plan_snapshot(execution_plan_snapshot_id): returned_execution_plan_snapshot_id = self._run_storage.add_execution_plan_snapshot( execution_plan_snapshot ) check.invariant(execution_plan_snapshot_id == returned_execution_plan_snapshot_id) return execution_plan_snapshot_id def create_run( self, pipeline_name, run_id, run_config, mode, solids_to_execute, step_keys_to_execute, status, tags, root_run_id, parent_run_id, pipeline_snapshot, execution_plan_snapshot, parent_pipeline_snapshot, solid_selection=None, external_pipeline_origin=None, ): pipeline_run = self._construct_run_with_snapshots( pipeline_name=pipeline_name, run_id=run_id, run_config=run_config, mode=mode, solid_selection=solid_selection, solids_to_execute=solids_to_execute, step_keys_to_execute=step_keys_to_execute, status=status, tags=tags, root_run_id=root_run_id, parent_run_id=parent_run_id, pipeline_snapshot=pipeline_snapshot, execution_plan_snapshot=execution_plan_snapshot, parent_pipeline_snapshot=parent_pipeline_snapshot, external_pipeline_origin=external_pipeline_origin, ) return self._run_storage.add_run(pipeline_run) def register_managed_run( self, pipeline_name, run_id, run_config, mode, solids_to_execute, step_keys_to_execute, tags, root_run_id, parent_run_id, pipeline_snapshot, execution_plan_snapshot, parent_pipeline_snapshot, solid_selection=None, ): # The usage of this method is limited to dagster-airflow, specifically in Dagster # Operators that are executed in Airflow. Because a common workflow in Airflow is to # retry dags from arbitrary tasks, we need any node to be capable of creating a # PipelineRun. # # The try-except DagsterRunAlreadyExists block handles the race when multiple "root" tasks # simultaneously execute self._run_storage.add_run(pipeline_run). When this happens, only # one task succeeds in creating the run, while the others get DagsterRunAlreadyExists # error; at this point, the failed tasks try again to fetch the existing run. # https://github.com/dagster-io/dagster/issues/2412 pipeline_run = self._construct_run_with_snapshots( pipeline_name=pipeline_name, run_id=run_id, run_config=run_config, mode=mode, solid_selection=solid_selection, solids_to_execute=solids_to_execute, step_keys_to_execute=step_keys_to_execute, status=PipelineRunStatus.MANAGED, tags=tags, root_run_id=root_run_id, parent_run_id=parent_run_id, pipeline_snapshot=pipeline_snapshot, execution_plan_snapshot=execution_plan_snapshot, parent_pipeline_snapshot=parent_pipeline_snapshot, ) def get_run(): candidate_run = self.get_run_by_id(pipeline_run.run_id) field_diff = _check_run_equality(pipeline_run, candidate_run) if field_diff: raise DagsterRunConflict( "Found conflicting existing run with same id {run_id}. Runs differ in:" "\n{field_diff}".format( run_id=pipeline_run.run_id, field_diff=_format_field_diff(field_diff), ), ) return candidate_run if self.has_run(pipeline_run.run_id): return get_run() try: return self._run_storage.add_run(pipeline_run) except DagsterRunAlreadyExists: return get_run() def add_run(self, pipeline_run: PipelineRun): return self._run_storage.add_run(pipeline_run) def handle_run_event(self, run_id: str, event: "DagsterEvent"): return self._run_storage.handle_run_event(run_id, event) def add_run_tags(self, run_id: str, new_tags: Dict[str, str]): return self._run_storage.add_run_tags(run_id, new_tags) def has_run(self, run_id: str) -> bool: return self._run_storage.has_run(run_id) def get_runs( self, filters: PipelineRunsFilter = None, cursor: str = None, limit: int = None ) -> Iterable[PipelineRun]: return self._run_storage.get_runs(filters, cursor, limit) def get_runs_count(self, filters: PipelineRunsFilter = None) -> int: return self._run_storage.get_runs_count(filters) def get_run_groups( self, filters: PipelineRunsFilter = None, cursor: str = None, limit: int = None ) -> Dict[str, Dict[str, Union[Iterable[PipelineRun], int]]]: return self._run_storage.get_run_groups(filters=filters, cursor=cursor, limit=limit)
[docs] def get_run_records( self, filters: PipelineRunsFilter = None, limit: int = None, order_by: str = None, ascending: bool = False, ) -> List[RunRecord]: """Return a list of run records stored in the run storage, sorted by the given column in given order. Args: filters (Optional[PipelineRunsFilter]): the filter by which to filter runs. limit (Optional[int]): Number of results to get. Defaults to infinite. order_by (Optional[str]): Name of the column to sort by. Defaults to id. ascending (Optional[bool]): Sort the result in ascending order if True, descending otherwise. Defaults to descending. Returns: List[RunRecord]: List of run records stored in the run storage. """ return self._run_storage.get_run_records(filters, limit, order_by, ascending)
def wipe(self): self._run_storage.wipe() self._event_storage.wipe() def delete_run(self, run_id: str): self._run_storage.delete_run(run_id) self._event_storage.delete_events(run_id) # event storage def logs_after(self, run_id, cursor, of_type: "DagsterEventType" = None): return self._event_storage.get_logs_for_run(run_id, cursor=cursor, of_type=of_type) def all_logs(self, run_id, of_type: "DagsterEventType" = None): return self._event_storage.get_logs_for_run(run_id, of_type=of_type) def watch_event_logs(self, run_id, cursor, cb): return self._event_storage.watch(run_id, cursor, cb) # asset storage def all_asset_keys(self): return self._event_storage.all_asset_keys() def has_asset_key(self, asset_key: AssetKey) -> bool: return self._event_storage.has_asset_key(asset_key) def events_for_asset_key( self, asset_key, partitions=None, before_cursor=None, after_cursor=None, cursor=None, before_timestamp=None, limit=None, ascending=False, ): check.inst_param(asset_key, "asset_key", AssetKey) return self._event_storage.get_asset_events( asset_key, partitions, before_cursor, after_cursor, limit, before_timestamp=before_timestamp, ascending=ascending, include_cursor=True, cursor=cursor, ) def run_ids_for_asset_key(self, asset_key): check.inst_param(asset_key, "asset_key", AssetKey) return self._event_storage.get_asset_run_ids(asset_key) def all_asset_tags(self): return self._event_storage.all_asset_tags() def get_asset_tags(self, asset_key): check.inst_param(asset_key, "asset_key", AssetKey) return self._event_storage.get_asset_tags(asset_key) def wipe_assets(self, asset_keys): check.list_param(asset_keys, "asset_keys", of_type=AssetKey) for asset_key in asset_keys: self._event_storage.wipe_asset(asset_key) # event subscriptions def get_logger(self): logger = logging.Logger("__event_listener") logger.addHandler(_EventListenerLogHandler(self)) logger.setLevel(10) return logger def handle_new_event(self, event): run_id = event.run_id self._event_storage.store_event(event) if event.is_dagster_event and event.dagster_event.is_pipeline_event: self._run_storage.handle_run_event(run_id, event.dagster_event) for sub in self._subscribers[run_id]: sub(event) def add_event_listener(self, run_id, cb): self._subscribers[run_id].append(cb)
[docs] def report_engine_event( self, message, pipeline_run, engine_event_data=None, cls=None, step_key=None, ): """ Report a EngineEvent that occurred outside of a pipeline execution context. """ from dagster.core.events import EngineEventData, DagsterEvent, DagsterEventType from dagster.core.events.log import EventLogEntry check.class_param(cls, "cls") check.str_param(message, "message") check.inst_param(pipeline_run, "pipeline_run", PipelineRun) engine_event_data = check.opt_inst_param( engine_event_data, "engine_event_data", EngineEventData, EngineEventData([]), ) if cls: message = "[{}] {}".format(cls.__name__, message) log_level = logging.INFO if engine_event_data and engine_event_data.error: log_level = logging.ERROR dagster_event = DagsterEvent( event_type_value=DagsterEventType.ENGINE_EVENT.value, pipeline_name=pipeline_run.pipeline_name, message=message, event_specific_data=engine_event_data, ) event_record = EventLogEntry( message=message, user_message=message, level=log_level, pipeline_name=pipeline_run.pipeline_name, run_id=pipeline_run.run_id, error_info=None, timestamp=time.time(), step_key=step_key, dagster_event=dagster_event, ) self.handle_new_event(event_record) return dagster_event
def report_run_canceling(self, run, message=None): from dagster.core.events import DagsterEvent, DagsterEventType from dagster.core.events.log import EventLogEntry check.inst_param(run, "run", PipelineRun) message = check.opt_str_param( message, "message", "Sending pipeline termination request.", ) canceling_event = DagsterEvent( event_type_value=DagsterEventType.PIPELINE_CANCELING.value, pipeline_name=run.pipeline_name, message=message, ) event_record = EventLogEntry( message=message, user_message="", level=logging.INFO, pipeline_name=run.pipeline_name, run_id=run.run_id, error_info=None, timestamp=time.time(), dagster_event=canceling_event, ) self.handle_new_event(event_record) def report_run_canceled( self, pipeline_run, message=None, ): from dagster.core.events import DagsterEvent, DagsterEventType from dagster.core.events.log import EventLogEntry check.inst_param(pipeline_run, "pipeline_run", PipelineRun) message = check.opt_str_param( message, "mesage", "This pipeline run has been marked as canceled from outside the execution context.", ) dagster_event = DagsterEvent( event_type_value=DagsterEventType.PIPELINE_CANCELED.value, pipeline_name=pipeline_run.pipeline_name, message=message, ) event_record = EventLogEntry( message=message, user_message=message, level=logging.ERROR, pipeline_name=pipeline_run.pipeline_name, run_id=pipeline_run.run_id, error_info=None, timestamp=time.time(), dagster_event=dagster_event, ) self.handle_new_event(event_record) return dagster_event def report_run_failed(self, pipeline_run, message=None): from dagster.core.events import DagsterEvent, DagsterEventType from dagster.core.events.log import EventLogEntry check.inst_param(pipeline_run, "pipeline_run", PipelineRun) message = check.opt_str_param( message, "message", "This pipeline run has been marked as failed from outside the execution context.", ) dagster_event = DagsterEvent( event_type_value=DagsterEventType.PIPELINE_FAILURE.value, pipeline_name=pipeline_run.pipeline_name, message=message, ) event_record = EventLogEntry( message=message, user_message=message, level=logging.ERROR, pipeline_name=pipeline_run.pipeline_name, run_id=pipeline_run.run_id, error_info=None, timestamp=time.time(), dagster_event=dagster_event, ) self.handle_new_event(event_record) return dagster_event # directories def file_manager_directory(self, run_id): return self._local_artifact_storage.file_manager_dir(run_id) def intermediates_directory(self, run_id): return self._local_artifact_storage.intermediates_dir(run_id) def storage_directory(self): return self._local_artifact_storage.storage_dir def schedules_directory(self): return self._local_artifact_storage.schedules_dir # Runs coordinator
[docs] def submit_run(self, run_id, external_pipeline): """Submit a pipeline run to the coordinator. This method delegates to the ``RunCoordinator``, configured on the instance, and will call its implementation of ``RunCoordinator.submit_run()`` to send the run to the coordinator for execution. Runs should be created in the instance (e.g., by calling ``DagsterInstance.create_run()``) *before* this method is called, and should be in the ``PipelineRunStatus.NOT_STARTED`` state. They also must have a non-null ExternalPipelineOrigin. Args: run_id (str): The id of the run. """ from dagster.core.host_representation import ExternalPipelineOrigin run = self.get_run_by_id(run_id) check.inst( run.external_pipeline_origin, ExternalPipelineOrigin, "External pipeline origin must be set for submitted runs", ) try: submitted_run = self._run_coordinator.submit_run( run, external_pipeline=external_pipeline ) except: from dagster.core.events import EngineEventData error = serializable_error_info_from_exc_info(sys.exc_info()) self.report_engine_event( error.message, run, EngineEventData.engine_error(error), ) self.report_run_failed(run) raise return submitted_run
# Run launcher
[docs] def launch_run(self, run_id, external_pipeline): """Launch a pipeline run. This method is typically called using `instance.submit_run` rather than being invoked directly. This method delegates to the ``RunLauncher``, if any, configured on the instance, and will call its implementation of ``RunLauncher.launch_run()`` to begin the execution of the specified run. Runs should be created in the instance (e.g., by calling ``DagsterInstance.create_run()``) *before* this method is called, and should be in the ``PipelineRunStatus.NOT_STARTED`` state. Args: run_id (str): The id of the run the launch. """ run = self.get_run_by_id(run_id) from dagster.core.events import EngineEventData, DagsterEvent, DagsterEventType from dagster.core.events.log import EventLogEntry launch_started_event = DagsterEvent( event_type_value=DagsterEventType.PIPELINE_STARTING.value, pipeline_name=run.pipeline_name, ) event_record = EventLogEntry( message="", user_message="", level=logging.INFO, pipeline_name=run.pipeline_name, run_id=run.run_id, error_info=None, timestamp=time.time(), dagster_event=launch_started_event, ) self.handle_new_event(event_record) run = self.get_run_by_id(run_id) try: self._run_launcher.launch_run(run, external_pipeline=external_pipeline) except: error = serializable_error_info_from_exc_info(sys.exc_info()) self.report_engine_event( error.message, run, EngineEventData.engine_error(error), ) self.report_run_failed(run) raise return run
# Scheduler def reconcile_scheduler_state(self, external_repository): return self._scheduler.reconcile_scheduler_state(self, external_repository) def start_schedule_and_update_storage_state(self, external_schedule): return self._scheduler.start_schedule_and_update_storage_state(self, external_schedule) def stop_schedule_and_update_storage_state(self, schedule_origin_id): return self._scheduler.stop_schedule_and_update_storage_state(self, schedule_origin_id) def stop_schedule_and_delete_from_storage(self, schedule_origin_id): return self._scheduler.stop_schedule_and_delete_from_storage(self, schedule_origin_id) def running_schedule_count(self, schedule_origin_id): if self._scheduler: return self._scheduler.running_schedule_count(self, schedule_origin_id) return 0 def scheduler_debug_info(self): from dagster.core.scheduler import SchedulerDebugInfo from dagster.core.definitions.run_request import JobType from dagster.core.scheduler.job import JobStatus errors = [] schedules = [] for schedule_state in self.all_stored_job_state(job_type=JobType.SCHEDULE): if schedule_state.status == JobStatus.RUNNING and not self.running_schedule_count( schedule_state.job_origin_id ): errors.append( "Schedule {schedule_name} is set to be running, but the scheduler is not " "running the schedule.".format(schedule_name=schedule_state.job_name) ) elif schedule_state.status == JobStatus.STOPPED and self.running_schedule_count( schedule_state.job_origin_id ): errors.append( "Schedule {schedule_name} is set to be stopped, but the scheduler is still running " "the schedule.".format(schedule_name=schedule_state.job_name) ) if self.running_schedule_count(schedule_state.job_origin_id) > 1: errors.append( "Duplicate jobs found: More than one job for schedule {schedule_name} are " "running on the scheduler.".format(schedule_name=schedule_state.job_name) ) schedule_info = { schedule_state.job_name: { "status": schedule_state.status.value, "cron_schedule": schedule_state.job_specific_data.cron_schedule, "repository_pointer": schedule_state.origin.get_repo_cli_args(), "schedule_origin_id": schedule_state.job_origin_id, "repository_origin_id": schedule_state.repository_origin_id, } } schedules.append(yaml.safe_dump(schedule_info, default_flow_style=False)) return SchedulerDebugInfo( scheduler_config_info=self._info_str_for_component("Scheduler", self.scheduler), scheduler_info=self.scheduler.debug_info(), schedule_storage=schedules, errors=errors, ) # Schedule Storage def start_sensor(self, external_sensor): from dagster.core.scheduler.job import JobState, JobStatus, SensorJobData from dagster.core.definitions.run_request import JobType job_state = self.get_job_state(external_sensor.get_external_origin_id()) if not job_state: self.add_job_state( JobState( external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING, SensorJobData(min_interval=external_sensor.min_interval_seconds), ) ) elif job_state.status != JobStatus.RUNNING: self.update_job_state(job_state.with_status(JobStatus.RUNNING)) def stop_sensor(self, job_origin_id): from dagster.core.scheduler.job import JobStatus job_state = self.get_job_state(job_origin_id) if job_state: self.update_job_state(job_state.with_status(JobStatus.STOPPED)) def all_stored_job_state(self, repository_origin_id=None, job_type=None): return self._schedule_storage.all_stored_job_state(repository_origin_id, job_type) def get_job_state(self, job_origin_id): return self._schedule_storage.get_job_state(job_origin_id) def add_job_state(self, job_state): return self._schedule_storage.add_job_state(job_state) def update_job_state(self, job_state): return self._schedule_storage.update_job_state(job_state) def delete_job_state(self, job_origin_id): return self._schedule_storage.delete_job_state(job_origin_id) def get_job_ticks(self, job_origin_id, before=None, after=None, limit=None): return self._schedule_storage.get_job_ticks( job_origin_id, before=before, after=after, limit=limit ) def get_latest_job_tick(self, job_origin_id): return self._schedule_storage.get_latest_job_tick(job_origin_id) def create_job_tick(self, job_tick_data): return self._schedule_storage.create_job_tick(job_tick_data) def update_job_tick(self, tick): return self._schedule_storage.update_job_tick(tick) def get_job_tick_stats(self, job_origin_id): return self._schedule_storage.get_job_tick_stats(job_origin_id) def purge_job_ticks(self, job_origin_id, tick_status, before): self._schedule_storage.purge_job_ticks(job_origin_id, tick_status, before) def wipe_all_schedules(self): if self._scheduler: self._scheduler.wipe(self) self._schedule_storage.wipe() def logs_path_for_schedule(self, schedule_origin_id): return self._scheduler.get_logs_path(self, schedule_origin_id) def __enter__(self): return self def __exit__(self, exception_type, exception_value, traceback): self.dispose()
[docs] def get_addresses_for_step_output_versions(self, step_output_versions): """ For each given step output, finds whether an output exists with the given version, and returns its address if it does. Args: step_output_versions (Dict[(str, StepOutputHandle), str]): (pipeline name, step output handle) -> version. Returns: Dict[(str, StepOutputHandle), str]: (pipeline name, step output handle) -> address. For each step output, an address if there is one and None otherwise. """ return self._event_storage.get_addresses_for_step_output_versions(step_output_versions)
# dagster daemon
[docs] def add_daemon_heartbeat(self, daemon_heartbeat: "DaemonHeartbeat"): """Called on a regular interval by the daemon""" self._run_storage.add_daemon_heartbeat(daemon_heartbeat)
[docs] def get_daemon_heartbeats(self) -> Dict[str, "DaemonHeartbeat"]: """Latest heartbeats of all daemon types""" return self._run_storage.get_daemon_heartbeats()
def wipe_daemon_heartbeats(self): self._run_storage.wipe_daemon_heartbeats() def get_required_daemon_types(self): from dagster.core.run_coordinator import QueuedRunCoordinator from dagster.core.scheduler import DagsterDaemonScheduler from dagster.daemon.daemon import SchedulerDaemon, SensorDaemon, BackfillDaemon from dagster.daemon.run_coordinator.queued_run_coordinator_daemon import ( QueuedRunCoordinatorDaemon, ) if self.is_ephemeral: return [] daemons = [SensorDaemon.daemon_type(), BackfillDaemon.daemon_type()] if isinstance(self.scheduler, DagsterDaemonScheduler): daemons.append(SchedulerDaemon.daemon_type()) if isinstance(self.run_coordinator, QueuedRunCoordinator): daemons.append(QueuedRunCoordinatorDaemon.daemon_type()) return daemons # backfill def get_backfills(self, status=None, cursor=None, limit=None): return self._run_storage.get_backfills(status=status, cursor=cursor, limit=limit) def get_backfill(self, backfill_id): return self._run_storage.get_backfill(backfill_id) def add_backfill(self, partition_backfill): self._run_storage.add_backfill(partition_backfill) def update_backfill(self, partition_backfill): return self._run_storage.update_backfill(partition_backfill)