# pylint: disable=super-init-not-called
from typing import AbstractSet, Any, Dict, NamedTuple, Optional, Union, cast
from dagster import check
from dagster.config import Shape
from dagster.core.definitions.composition import PendingNodeInvocation
from dagster.core.definitions.dependency import Solid, SolidHandle
from dagster.core.definitions.hook import HookDefinition
from dagster.core.definitions.mode import ModeDefinition
from dagster.core.definitions.pipeline import PipelineDefinition
from dagster.core.definitions.resource import IContainsGenerator, Resources, ScopedResourcesBuilder
from dagster.core.definitions.solid import SolidDefinition
from dagster.core.definitions.step_launcher import StepLauncher
from dagster.core.errors import (
DagsterInvalidConfigError,
DagsterInvalidInvocationError,
DagsterInvalidPropertyError,
DagsterInvariantViolationError,
)
from dagster.core.execution.build_resources import build_resources
from dagster.core.instance import DagsterInstance
from dagster.core.log_manager import DagsterLogManager
from dagster.core.storage.pipeline_run import PipelineRun
from dagster.core.types.dagster_type import DagsterType
from dagster.utils import merge_dicts
from dagster.utils.backcompat import experimental_fn_warning
from dagster.utils.forked_pdb import ForkedPdb
from .compute import SolidExecutionContext
from .system import StepExecutionContext, TypeCheckContext
def _property_msg(prop_name: str, method_name: str) -> str:
return (
f"The {prop_name} {method_name} is not set on the context when a solid is directly invoked."
)
class UnboundSolidExecutionContext(SolidExecutionContext):
"""The ``context`` object available as the first argument to a solid's compute function when
being invoked directly. Can also be used as a context manager.
"""
def __init__(
self,
solid_config: Any,
resources_dict: Optional[Dict[str, Any]],
instance: Optional[DagsterInstance],
): # pylint: disable=super-init-not-called
from dagster.core.execution.context_creation_pipeline import initialize_console_manager
from dagster.core.execution.api import ephemeral_instance_if_missing
self._solid_config = solid_config
self._instance_provided = (
check.opt_inst_param(instance, "instance", DagsterInstance) is not None
)
# Construct ephemeral instance if missing
self._instance_cm = ephemeral_instance_if_missing(instance)
# Pylint can't infer that the ephemeral_instance context manager has an __enter__ method,
# so ignore lint error
self._instance = self._instance_cm.__enter__() # pylint: disable=no-member
# Open resource context manager
self._resources_cm = build_resources(
check.opt_dict_param(resources_dict, "resources_dict", key_type=str), instance
)
self._resources = self._resources_cm.__enter__() # pylint: disable=no-member
self._resources_contain_cm = isinstance(self._resources, IContainsGenerator)
self._log = initialize_console_manager(None)
self._pdb: Optional[ForkedPdb] = None
self._cm_scope_entered = False
def __enter__(self):
self._cm_scope_entered = True
return self
def __exit__(self, *exc):
self._resources_cm.__exit__(*exc) # pylint: disable=no-member
if self._instance_provided:
self._instance_cm.__exit__(*exc) # pylint: disable=no-member
def __del__(self):
if self._resources_contain_cm and not self._cm_scope_entered:
self._resources_cm.__exit__(None, None, None) # pylint: disable=no-member
if self._instance_provided and not self._cm_scope_entered:
self._instance_cm.__exit__(None, None, None) # pylint: disable=no-member
@property
def solid_config(self) -> Any:
return self._solid_config
@property
def resources(self) -> Resources:
if self._resources_contain_cm and not self._cm_scope_entered:
raise DagsterInvariantViolationError(
"At least one provided resource is a generator, but attempting to access "
"resources outside of context manager scope. You can use the following syntax to "
"open a context manager: `with build_solid_context(...) as context:`"
)
return self._resources
@property
def pipeline_run(self) -> PipelineRun:
raise DagsterInvalidPropertyError(_property_msg("pipeline_run", "property"))
@property
def instance(self) -> DagsterInstance:
return self._instance
@property
def pdb(self) -> ForkedPdb:
"""dagster.utils.forked_pdb.ForkedPdb: Gives access to pdb debugging from within the solid.
Example:
.. code-block:: python
@solid
def debug_solid(context):
context.pdb.set_trace()
"""
if self._pdb is None:
self._pdb = ForkedPdb()
return self._pdb
@property
def step_launcher(self) -> Optional[StepLauncher]:
raise DagsterInvalidPropertyError(_property_msg("step_launcher", "property"))
@property
def run_id(self) -> str:
"""str: Hard-coded value to indicate that we are directly invoking solid."""
return "EPHEMERAL"
@property
def run_config(self) -> dict:
raise DagsterInvalidPropertyError(_property_msg("run_config", "property"))
@property
def pipeline_def(self) -> PipelineDefinition:
raise DagsterInvalidPropertyError(_property_msg("pipeline_def", "property"))
@property
def pipeline_name(self) -> str:
raise DagsterInvalidPropertyError(_property_msg("pipeline_name", "property"))
@property
def mode_def(self) -> ModeDefinition:
raise DagsterInvalidPropertyError(_property_msg("mode_def", "property"))
@property
def log(self) -> DagsterLogManager:
"""DagsterLogManager: A console manager constructed for this context."""
return self._log
@property
def solid_handle(self) -> SolidHandle:
raise DagsterInvalidPropertyError(_property_msg("solid_handle", "property"))
@property
def solid(self) -> Solid:
raise DagsterInvalidPropertyError(_property_msg("solid", "property"))
@property
def solid_def(self) -> SolidDefinition:
raise DagsterInvalidPropertyError(_property_msg("solid_def", "property"))
def has_tag(self, key: str) -> bool:
raise DagsterInvalidPropertyError(_property_msg("has_tag", "method"))
def get_tag(self, key: str) -> str:
raise DagsterInvalidPropertyError(_property_msg("get_tag", "method"))
def get_step_execution_context(self) -> StepExecutionContext:
raise DagsterInvalidPropertyError(_property_msg("get_step_execution_context", "methods"))
def bind(
self, solid_def_or_invocation: Union[SolidDefinition, PendingNodeInvocation]
) -> "BoundSolidExecutionContext":
solid_def = (
solid_def_or_invocation
if isinstance(solid_def_or_invocation, SolidDefinition)
else solid_def_or_invocation.node_def
)
_validate_resource_requirements(self.resources, solid_def)
solid_config = _resolve_bound_config(self.solid_config, solid_def)
return BoundSolidExecutionContext(
solid_def=solid_def,
solid_config=solid_config,
resources=self.resources,
instance=self.instance,
log_manager=self.log,
pdb=self.pdb,
tags=solid_def_or_invocation.tags
if isinstance(solid_def_or_invocation, PendingNodeInvocation)
else None,
hook_defs=solid_def_or_invocation.hook_defs
if isinstance(solid_def_or_invocation, PendingNodeInvocation)
else None,
alias=solid_def_or_invocation.given_alias
if isinstance(solid_def_or_invocation, PendingNodeInvocation)
else None,
)
def _validate_resource_requirements(resources: "Resources", solid_def: SolidDefinition) -> None:
"""Validate correctness of resources against required resource keys"""
resources_dict = resources._asdict() # type: ignore[attr-defined]
required_resource_keys: AbstractSet[str] = solid_def.required_resource_keys or set()
for resource_key in required_resource_keys:
if resource_key not in resources_dict:
raise DagsterInvalidInvocationError(
f'Solid "{solid_def.name}" requires resource "{resource_key}", but no resource '
"with that key was found on the context."
)
def _resolve_bound_config(solid_config: Any, solid_def: SolidDefinition) -> Any:
"""Validate config against config schema, and return validated config."""
from dagster.config.validate import process_config
# Config processing system expects the top level config schema to be a dictionary, but solid
# config schema can be scalar. Thus, we wrap it in another layer of indirection.
outer_config_shape = Shape({"config": solid_def.get_config_field()})
config_evr = process_config(
outer_config_shape, {"config": solid_config} if solid_config else {}
)
if not config_evr.success:
raise DagsterInvalidConfigError(
"Error in config for solid ",
config_evr.errors,
solid_config,
)
validated_config = config_evr.value.get("config")
mapped_config_evr = solid_def.apply_config_mapping({"config": validated_config})
if not mapped_config_evr.success:
raise DagsterInvalidConfigError(
"Error in config for solid ", mapped_config_evr.errors, solid_config
)
validated_config = mapped_config_evr.value.get("config")
return validated_config
class BoundSolidExecutionContext(SolidExecutionContext):
"""The solid execution context that is passed to the compute function during invocation.
This context is bound to a specific solid definition, for which the resources and config have
been validated.
"""
def __init__(
self,
solid_def: SolidDefinition,
solid_config: Any,
resources: "Resources",
instance: DagsterInstance,
log_manager: DagsterLogManager,
pdb: Optional[ForkedPdb],
tags: Optional[Dict[str, str]],
hook_defs: Optional[AbstractSet[HookDefinition]],
alias: Optional[str],
):
self._solid_def = solid_def
self._solid_config = solid_config
self._resources = resources
self._instance = instance
self._log = log_manager
self._pdb = pdb
self._tags = merge_dicts(self._solid_def.tags, tags) if tags else self._solid_def.tags
self._hook_defs = hook_defs
self._alias = alias if alias else self._solid_def.name
@property
def solid_config(self) -> Any:
return self._solid_config
@property
def resources(self) -> Resources:
return self._resources
@property
def pipeline_run(self) -> PipelineRun:
raise DagsterInvalidPropertyError(_property_msg("pipeline_run", "property"))
@property
def instance(self) -> DagsterInstance:
return self._instance
@property
def pdb(self) -> ForkedPdb:
"""dagster.utils.forked_pdb.ForkedPdb: Gives access to pdb debugging from within the solid.
Example:
.. code-block:: python
@solid
def debug_solid(context):
context.pdb.set_trace()
"""
if self._pdb is None:
self._pdb = ForkedPdb()
return self._pdb
@property
def step_launcher(self) -> Optional[StepLauncher]:
raise DagsterInvalidPropertyError(_property_msg("step_launcher", "property"))
@property
def run_id(self) -> str:
"""str: Hard-coded value to indicate that we are directly invoking solid."""
return "EPHEMERAL"
@property
def run_config(self) -> dict:
raise DagsterInvalidPropertyError(_property_msg("run_config", "property"))
@property
def pipeline_def(self) -> PipelineDefinition:
raise DagsterInvalidPropertyError(_property_msg("pipeline_def", "property"))
@property
def pipeline_name(self) -> str:
raise DagsterInvalidPropertyError(_property_msg("pipeline_name", "property"))
@property
def mode_def(self) -> ModeDefinition:
raise DagsterInvalidPropertyError(_property_msg("mode_def", "property"))
@property
def log(self) -> DagsterLogManager:
"""DagsterLogManager: A console manager constructed for this context."""
return self._log
@property
def solid_handle(self) -> SolidHandle:
raise DagsterInvalidPropertyError(_property_msg("solid_handle", "property"))
@property
def solid(self) -> Solid:
raise DagsterInvalidPropertyError(_property_msg("solid", "property"))
@property
def solid_def(self) -> SolidDefinition:
return self._solid_def
def has_tag(self, key: str) -> bool:
return key in self._tags
def get_tag(self, key: str) -> str:
return self._tags.get(key)
@property
def alias(self) -> str:
return self._alias
def get_step_execution_context(self) -> StepExecutionContext:
raise DagsterInvalidPropertyError(_property_msg("get_step_execution_context", "methods"))
def for_type(self, dagster_type: DagsterType) -> TypeCheckContext:
resources = cast(NamedTuple, self.resources)
return TypeCheckContext(
self.run_id, self.log, ScopedResourcesBuilder(resources._asdict()), dagster_type
)
[docs]def build_solid_context(
resources: Optional[Dict[str, Any]] = None,
config: Optional[Any] = None,
instance: Optional[DagsterInstance] = None,
) -> UnboundSolidExecutionContext:
"""Builds solid execution context from provided parameters.
``build_solid_context`` can be used as either a function or context manager. If there is a
provided resource that is a context manager, then ``build_solid_context`` must be used as a
context manager. This function can be used to provide the context argument when directly
invoking a solid.
Args:
resources (Optional[Dict[str, Any]]): The resources to provide to the context. These can be
either values or resource definitions.
config (Optional[Any]): The solid config to provide to the context.
instance (Optional[DagsterInstance]): The dagster instance configured for the context.
Defaults to DagsterInstance.ephemeral().
Examples:
.. code-block:: python
context = build_solid_context()
solid_to_invoke(context)
with build_solid_context(resources={"foo": context_manager_resource}) as context:
solid_to_invoke(context)
"""
experimental_fn_warning("build_solid_context")
return UnboundSolidExecutionContext(
resources_dict=check.opt_dict_param(resources, "resources", key_type=str),
solid_config=config,
instance=check.opt_inst_param(instance, "instance", DagsterInstance),
)