This guide includes two examples:

- A pipeline that passes PySpark DataFrames between solids, using a custom IO manager.
- A pipeline that runs its solids as Spark steps on an EMR cluster, using the dagster-pyspark package.

Passing PySpark DataFrames between solids requires a little bit of extra care, for a couple of reasons:

- Spark has a lazy execution model, which means that no data is actually processed until an action like write or collect is called on a DataFrame.
- PySpark DataFrames cannot be pickled, which means that IO managers like the fs_io_manager won't work for them.

In this example, we've defined an IO manager that knows how to store and retrieve PySpark DataFrames that are produced and consumed by solids.
This example assumes that all the outputs within the pipeline will be PySpark DataFrames and stored in the same way. To learn how to use different IO managers for different outputs within the same pipeline, take a look at the IO Manager concept page.
This example writes out DataFrames to the local file system, but it can be tweaked to write to cloud object stores like S3 by changing the write and read invocations; a sketch of an S3-backed variant follows the example below.
import os
from dagster import IOManager, ModeDefinition, io_manager, pipeline, repository, solid
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
class LocalParquetStore(IOManager):
    def _get_path(self, context):
        return os.path.join(context.run_id, context.step_key, context.name)

    def handle_output(self, context, obj):
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._get_path(context.upstream_output))


@io_manager
def local_parquet_store(_):
    return LocalParquetStore()


@solid
def make_people():
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(rows, schema)


@solid
def filter_over_50(people):
    return people.filter(people["age"] > 50)


@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": local_parquet_store})])
def my_pipeline():
    filter_over_50(make_people())
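For instance, here is a minimal sketch of an S3-backed variant of the IO manager above. The bucket name my-bucket and the s3a:// path layout are placeholders, and Spark needs the Hadoop S3A connector and AWS credentials configured for the reads and writes to succeed.

from dagster import IOManager, io_manager
from pyspark.sql import SparkSession


class S3ParquetStore(IOManager):
    # Sketch: same structure as LocalParquetStore, but the paths point at an
    # S3 bucket via the s3a:// scheme. "my-bucket" is a placeholder.
    def _get_path(self, context):
        return "/".join(["s3a://my-bucket", context.run_id, context.step_key, context.name])

    def handle_output(self, context, obj):
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._get_path(context.upstream_output))


@io_manager
def s3_parquet_store(_):
    return S3ParquetStore()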
The second example demonstrates how to run solids as Spark steps on an EMR cluster: each of the three solids is executed as a separate EMR step on the same cluster.
from pathlib import Path
from dagster import (
    ModeDefinition,
    make_python_type_usable_as_dagster_type,
    pipeline,
    repository,
    solid,
)
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource
from dagster_pyspark import DataFrame as DagsterPySparkDataFrame
from dagster_pyspark import pyspark_resource
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
# Make pyspark.sql.DataFrame map to dagster_pyspark.DataFrame
make_python_type_usable_as_dagster_type(python_type=DataFrame, dagster_type=DagsterPySparkDataFrame)
@solid(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def make_people(context) -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)


@solid(required_resource_keys={"pyspark_step_launcher"})
def filter_over_50(_, people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)


@solid(required_resource_keys={"pyspark_step_launcher"})
def count_people(_, people: DataFrame) -> int:
    return people.count()


emr_mode = ModeDefinition(
    name="emr",
    resource_defs={
        "pyspark_step_launcher": emr_pyspark_step_launcher.configured(
            {
                "cluster_id": {"env": "EMR_CLUSTER_ID"},
                "local_pipeline_package_path": str(Path(__file__).parent),
                "deploy_local_pipeline_package": True,
                "region_name": "us-west-1",
                "staging_bucket": "my_staging_bucket",
                "wait_for_logs": True,
            }
        ),
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.executor.memory": "2g"}}),
        "s3": s3_resource,
        "io_manager": s3_pickle_io_manager.configured(
            {"s3_bucket": "my_staging_bucket", "s3_prefix": "simple-pyspark"}
        ),
    },
)


local_mode = ModeDefinition(
    name="local",
    resource_defs={
        "pyspark_step_launcher": no_step_launcher,
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.default.parallelism": 1}}),
    },
)


@pipeline(mode_defs=[emr_mode, local_mode])
def my_pipeline():
    count_people(filter_over_50(make_people()))
It accomplishes this by using the emr_pyspark_step_launcher, which knows how to launch an EMR step that runs the contents of a solid. The example defines a mode that links the resource key "pyspark_step_launcher" to the emr_pyspark_step_launcher resource definition, and then requires the "pyspark_step_launcher" resource key on each solid that it wants to launch remotely.
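For example (a sketch, assuming the pipeline module above is importable as written), you can pick the mode with the Python API; the "local" and "emr" mode names come from the ModeDefinitions in the example:

from dagster import execute_pipeline

# Run everything in-process with no_step_launcher; no EMR cluster is involved.
execute_pipeline(my_pipeline, mode="local")

# Launch each solid as a separate EMR step on the cluster named by EMR_CLUSTER_ID.
execute_pipeline(my_pipeline, mode="emr")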
The EMR PySpark step launcher relies on S3 to shuttle config and events to and from EMR.
More generally, a step launcher is any resource that extends the StepLauncher abstract class. Its methods are invoked at the point where a solid would otherwise execute in-process, and they instead launch a remote process with the solid running inside it. To use a step launcher for a particular solid, set a required resource key on the solid that points to that resource.
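As a rough sketch, a custom step launcher could look something like the following. MyClusterStepLauncher and my_cluster_step_launcher are hypothetical names, and the exact abstract-method signature of StepLauncher has varied across Dagster releases, so check the version you're running:

from dagster import resource
from dagster.core.definitions.step_launcher import StepLauncher


class MyClusterStepLauncher(StepLauncher):
    # Hypothetical launcher: serialize the step, submit it to a remote system,
    # and stream the DagsterEvents it emits back to the host process.
    # The signature below matches older Dagster releases and may differ in yours.
    def launch_step(self, step_context, prior_attempts_count):
        raise NotImplementedError("Submit the step remotely and yield its DagsterEvents")


@resource
def my_cluster_step_launcher(_):
    return MyClusterStepLauncher()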