from collections import namedtuple
from typing import Any, Dict, List, NamedTuple, Optional, Set, TypeVar, Union
from dagster import check
from dagster.core.definitions.events import AssetKey
from dagster.core.errors import (
DagsterError,
DagsterInvalidDefinitionError,
DagsterInvariantViolationError,
)
from dagster.core.types.dagster_type import DagsterType, resolve_dagster_type
from dagster.utils.backcompat import experimental_arg_warning
from .inference import InferredOutputProps
from .input import NoValueSentinel
from .utils import DEFAULT_OUTPUT, check_valid_name
# Type variable bound to OutputDefinition; used to annotate ``combine_with_inferred`` so that
# the annotated return type matches the type of the instance the method is called on.
TOut = TypeVar("TOut", bound="OutputDefinition")
class NoNameSentinel:
    """Marker passed as ``name`` to request an output definition that has no name.

    ``OutputDefinition.__init__`` compares its ``name`` argument against this class by
    identity; when it matches, the stored name is ``None`` instead of a validated string.
    """
class OutputDefinition:
    """Defines an output from a solid's compute function.

    Solids can have multiple outputs, in which case outputs cannot be anonymous.

    Many solids have only one output, in which case the user can provide a single output
    definition that will be given the default name, "result".

    Output definitions may be typed using the Dagster type system.

    Args:
        dagster_type (Optional[Union[Type, DagsterType]]): The type of this output.
            Users should provide the Python type of the objects that they expect the solid to
            yield for this output, or a :py:class:`DagsterType` that defines a runtime check that
            they want to be run on this output. Defaults to :py:class:`Any`.
        name (Optional[str]): Name of the output. (default: "result")
        description (Optional[str]): Human-readable description of the output.
        is_required (Optional[bool]): Whether the presence of this field is required.
            (default: True)
        io_manager_key (Optional[str]): The resource key of the output manager used for this
            output. (default: "io_manager").
        metadata (Optional[Dict[str, Any]]): A dict of the metadata for the output.
            For example, users can provide a file path if the data object will be stored in a
            filesystem, or provide information of a database table when it is going to load the
            data into the table.
        asset_key (Optional[Union[AssetKey, OutputContext -> AssetKey]]): (Experimental) An
            AssetKey (or function that produces an AssetKey from the OutputContext) which should
            be associated with this OutputDefinition. Used for tracking lineage information
            through Dagster.
        asset_partitions (Optional[Union[Set[str], OutputContext -> Set[str]]]): (Experimental) A
            set of partitions of the given asset_key (or a function that produces this set of
            partitions from the OutputContext) which should be associated with this
            OutputDefinition.
    """

    def __init__(
        self,
        dagster_type=None,
        name=None,
        description=None,
        is_required=None,
        io_manager_key=None,
        metadata=None,
        asset_key=None,
        asset_partitions=None,
        # make sure new parameters are updated in combine_with_inferred below
    ):
        # NoNameSentinel requests a nameless definition; any other value is validated as a
        # name, with None falling back to DEFAULT_OUTPUT.
        if name is NoNameSentinel:
            self._name = None
        else:
            self._name = check_valid_name(check.opt_str_param(name, "name", DEFAULT_OUTPUT))

        # Remember whether a type was given explicitly so combine_with_inferred knows whether
        # it may substitute the inferred annotation.
        self._type_not_set = dagster_type is None
        self._dagster_type = resolve_dagster_type(dagster_type)
        self._description = check.opt_str_param(description, "description")
        self._is_required = check.opt_bool_param(is_required, "is_required", default=True)
        self._manager_key = check.opt_str_param(
            io_manager_key, "io_manager_key", default="io_manager"
        )
        self._metadata = metadata

        if asset_key:
            experimental_arg_warning("asset_key", "OutputDefinition.__init__")

        # Normalize asset_key to either None or a function of the output context.
        if asset_key is None:
            self._asset_key_fn = None
        elif callable(asset_key):
            self._asset_key_fn = asset_key
        else:
            static_key = check.opt_inst_param(asset_key, "asset_key", AssetKey)
            self._asset_key_fn = lambda _: static_key

        if asset_partitions:
            experimental_arg_warning("asset_partitions", "OutputDefinition.__init__")
            check.param_invariant(
                asset_key is not None,
                "asset_partitions",
                'Cannot specify "asset_partitions" argument without also specifying "asset_key"',
            )

        # Normalize asset_partitions the same way: None or a function of the output context.
        if asset_partitions is None:
            self._asset_partitions_fn = None
        elif callable(asset_partitions):
            self._asset_partitions_fn = asset_partitions
        else:
            static_partitions = check.opt_set_param(asset_partitions, "asset_partitions", str)
            self._asset_partitions_fn = lambda _: static_partitions

    @property
    def name(self):
        """The output's name, or None when constructed with ``NoNameSentinel``."""
        return self._name

    @property
    def dagster_type(self):
        """The resolved type of this output."""
        return self._dagster_type

    @property
    def description(self):
        """Human-readable description of the output, if any."""
        return self._description

    @property
    def optional(self):
        """Inverse of :py:attr:`is_required`."""
        return not self._is_required

    @property
    def is_required(self):
        """Whether this output is required."""
        return self._is_required

    @property
    def io_manager_key(self):
        """Resource key of the manager used for this output."""
        return self._manager_key

    @property
    def metadata(self):
        """The metadata object passed to the constructor, stored as given."""
        return self._metadata

    @property
    def is_dynamic(self):
        """Always False for a plain OutputDefinition."""
        return False

    @property
    def is_asset(self):
        """True when an asset key (or asset key function) was provided."""
        return self._asset_key_fn is not None

    def get_asset_key(self, context) -> Optional[AssetKey]:
        """Get the AssetKey associated with this OutputDefinition for the given
        :py:class:`OutputContext` (if any).

        Args:
            context (OutputContext): The OutputContext that this OutputDefinition is being
                evaluated in

        Returns:
            Optional[AssetKey]: Result of the asset key function, or None if none was set.
        """
        key_fn = self._asset_key_fn
        return None if key_fn is None else key_fn(context)

    def get_asset_partitions(self, context) -> Optional[Set[str]]:
        """Get the set of partitions associated with this OutputDefinition for the given
        :py:class:`OutputContext` (if any).

        Args:
            context (OutputContext): The OutputContext that this OutputDefinition is being
                evaluated in

        Returns:
            Optional[Set[str]]: Result of the partitions function, or None if none was set.
        """
        partitions_fn = self._asset_partitions_fn
        return None if partitions_fn is None else partitions_fn(context)

    def mapping_from(self, solid_name, output_name=None):
        """Create an output mapping from an output of a child solid.

        In a CompositeSolidDefinition, you can use this helper function to construct
        an :py:class:`OutputMapping` from the output of a child solid.

        Args:
            solid_name (str): The name of the child solid from which to map this output.
            output_name (Optional[str]): The name of the child solid's output from which to map
                this output.

        Examples:

            .. code-block:: python

                output_mapping = OutputDefinition(Int).mapping_from('child_solid')
        """
        pointer = OutputPointer(solid_name, output_name)
        return OutputMapping(self, pointer)

    @staticmethod
    def create_from_inferred(inferred: InferredOutputProps) -> "OutputDefinition":
        """Build an OutputDefinition from the inferred annotation and description only."""
        return OutputDefinition(
            dagster_type=_checked_inferred_type(inferred.annotation),
            description=inferred.description,
        )

    def combine_with_inferred(self: TOut, inferred: InferredOutputProps) -> TOut:
        """Return a new definition of the same class with gaps filled from ``inferred``.

        The inferred annotation is used only when no dagster_type was passed to the
        constructor, and the inferred description only when this definition has none.
        All other settings are carried over unchanged.
        """
        resolved_type = (
            _checked_inferred_type(inferred.annotation)
            if self._type_not_set
            else self._dagster_type
        )
        resolved_description = (
            inferred.description if self._description is None else self.description
        )

        return type(self)(
            name=self._name,
            dagster_type=resolved_type,
            description=resolved_description,
            is_required=self._is_required,
            io_manager_key=self._manager_key,
            metadata=self._metadata,
            asset_key=self._asset_key_fn,
            asset_partitions=self._asset_partitions_fn,
        )
def _checked_inferred_type(inferred: Any) -> DagsterType:
    """Resolve an inferred return annotation to a DagsterType.

    Args:
        inferred: The annotation object to resolve.

    Returns:
        DagsterType: The result of ``resolve_dagster_type(inferred)``.

    Raises:
        DagsterInvalidDefinitionError: If resolution raises a DagsterError; the original
            error is chained as the cause.
    """
    try:
        resolved = resolve_dagster_type(inferred)
    except DagsterError as err:
        message = (
            f"Problem using type '{inferred}' from return type annotation, correct the issue "
            "or explicitly set the dagster_type on your OutputDefinition."
        )
        raise DagsterInvalidDefinitionError(message) from err
    return resolved
class DynamicOutputDefinition(OutputDefinition):
    """(Experimental) An :py:class:`OutputDefinition <dagster.OutputDefinition>` variant for an
    output that will dynamically alter the graph at runtime.

    When using in a composition function such as :py:func:`@pipeline <dagster.pipeline>`,
    dynamic outputs must be used with either

    * ``map`` - clone downstream solids for each separate :py:class:`DynamicOutput`
    * ``collect`` - gather across all :py:class:`DynamicOutput` in to a list

    Uses the same constructor as :py:class:`OutputDefinition <dagster.OutputDefinition>`

        .. code-block:: python

            @solid(
                config_schema={
                    "path": Field(str, default_value=file_relative_path(__file__, "sample"))
                },
                output_defs=[DynamicOutputDefinition(str)],
            )
            def files_in_directory(context):
                path = context.solid_config["path"]
                dirname, _, filenames = next(os.walk(path))
                for file in filenames:
                    yield DynamicOutput(os.path.join(dirname, file), mapping_key=_clean(file))

            @pipeline
            def process_directory():
                files = files_in_directory()

                # use map to invoke a solid on each dynamic output
                file_results = files.map(process_file)

                # use collect to gather the results in to a list
                summarize_directory(file_results.collect())
    """

    @property
    def is_dynamic(self):
        """Always True; this is the only behavior that differs from the base class."""
        return True
class OutputPointer(namedtuple("_OutputPointer", "solid_name output_name")):
    """Immutable (solid_name, output_name) pair identifying one output of a solid.

    Args:
        solid_name (str): Name of the solid; must be a string.
        output_name (Optional[str]): Name of the output on that solid. When omitted,
            ``DEFAULT_OUTPUT`` is used.
    """

    def __new__(cls, solid_name, output_name=None):
        # Validate in the same order as the fields: solid name first, then output name.
        checked_solid = check.str_param(solid_name, "solid_name")
        checked_output = check.opt_str_param(output_name, "output_name", DEFAULT_OUTPUT)
        return super().__new__(cls, checked_solid, checked_output)
class OutputMapping(namedtuple("_OutputMapping", "definition maps_from")):
    """Defines an output mapping for a composite solid.

    Args:
        definition (OutputDefinition): Defines the output of the composite solid.
        maps_from (OutputPointer): Identifies the child solid and the name of its output from
            which the composite solid's output is mapped.
    """

    def __new__(cls, definition, maps_from):
        # Both fields are type-checked before the tuple is built.
        checked_definition = check.inst_param(definition, "definition", OutputDefinition)
        checked_pointer = check.inst_param(maps_from, "maps_from", OutputPointer)
        return super().__new__(cls, checked_definition, checked_pointer)
class Out(
    namedtuple(
        "_Out",
        "dagster_type name description is_required io_manager_key metadata asset_key "
        "asset_partitions",
    )
):
    """Experimental replacement for OutputDefinition intended to decrease verbosity.

    The fields mirror the keyword arguments of ``OutputDefinition.__init__`` and are stored
    without validation; ``dagster_type`` defaults to ``NoValueSentinel`` so that an omitted
    type can be told apart from an explicit ``None``.
    """

    def __new__(
        cls,
        dagster_type=NoValueSentinel,
        name=None,
        description=None,
        is_required=None,
        io_manager_key=None,
        metadata=None,
        asset_key=None,
        asset_partitions=None,
        # make sure new parameters are also forwarded by to_definition below
    ):
        # Positional order matches the namedtuple field order declared above.
        return super().__new__(
            cls,
            dagster_type,
            name,
            description,
            is_required,
            io_manager_key,
            metadata,
            asset_key,
            asset_partitions,
        )

    def to_definition(self, inferred: InferredOutputProps) -> "OutputDefinition":
        """Build the equivalent OutputDefinition.

        Args:
            inferred (InferredOutputProps): Inferred properties; only ``annotation`` is read,
                and only when no dagster_type was given to this Out.

        Returns:
            OutputDefinition: A definition built from this Out's fields.
        """
        if self.dagster_type is NoValueSentinel:
            resolved_type = inferred.annotation
        else:
            resolved_type = self.dagster_type

        # Field names are identical to OutputDefinition's keyword arguments.
        definition_kwargs = dict(self._asdict())
        definition_kwargs["dagster_type"] = resolved_type
        return OutputDefinition(**definition_kwargs)
class MultiOut(NamedTuple("_MultiOut", [("outs", List[Out])])):
    """Experimental replacement for providing a list of output definitions, to decrease verbosity.

    Args:
        outs (Union[List[Out], Dict[str, Out]]): Either a list of :py:class:`Out`, or a dict
            mapping output name to an unnamed :py:class:`Out`. In the dict form each Out takes
            the dict key as its name.

    Raises:
        DagsterInvariantViolationError: If a dict is provided and any of its Outs already has
            a name.
    """

    def __new__(cls, outs: Union[List[Out], Dict[str, Out]]):
        if isinstance(outs, dict):
            if any(out.name is not None for out in outs.values()):
                raise DagsterInvariantViolationError(
                    "Cannot provide name to Out if providing a dictionary of outs. The Out will "
                    "take on the dict key as the name."
                )
            # Copy every Out with the dict key filled in as its name; other fields are kept.
            outs = [out._replace(name=key) for key, out in outs.items()]

        return super().__new__(cls, check.list_param(outs, "outs", Out))

    def to_definition_list(self, inferred: InferredOutputProps) -> List[OutputDefinition]:
        """Build one OutputDefinition per Out, in order.

        An explicit ``dagster_type`` on an Out takes precedence. Only when it was left as
        ``NoValueSentinel`` is the positional entry of the inferred annotation's ``__args__``
        used (or None when there is no annotation), matching ``Out.to_definition``.

        Args:
            inferred (InferredOutputProps): Inferred properties; ``annotation`` is indexed
                through ``__args__`` by output position when it is truthy.

        Returns:
            List[OutputDefinition]: Definitions in the same order as ``outs``.
        """
        output_defs = []
        for idx, out in enumerate(self.outs):
            annotation_type = inferred.annotation.__args__[idx] if inferred.annotation else None
            # Previously the explicit type on the Out was silently discarded here.
            dagster_type = (
                out.dagster_type if out.dagster_type is not NoValueSentinel else annotation_type
            )
            output_defs.append(
                OutputDefinition(
                    dagster_type=dagster_type,
                    name=out.name,
                    description=out.description,
                    is_required=out.is_required,
                    io_manager_key=out.io_manager_key,
                    metadata=out.metadata,
                    asset_key=out.asset_key,
                    asset_partitions=out.asset_partitions,
                )
            )
        return output_defs