dagster_aws.s3.S3ComputeLogManager(bucket, local_dir=None, inst_data=None, prefix='dagster', use_ssl=True, verify=True, verify_cert_path=None, endpoint_url=None, skip_empty_files=False)
Logs solid compute function stdout and stderr to S3.
Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml
such as the following:
compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: "mycorp-dagster-compute-logs"
    local_dir: "/tmp/cool"
    prefix: "dagster-test-"
    use_ssl: true
    verify: true
    verify_cert_path: "/path/to/cert/bundle.pem"
    endpoint_url: "http://alternate-s3-host.io"
    skip_empty_files: true
bucket (str) – The name of the s3 bucket to which to log.
local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default: dagster.seven.get_system_temp_directory().
prefix (Optional[str]) – Prefix for the log file keys.
use_ssl (Optional[bool]) – Whether or not to use SSL. Default True.
verify (Optional[bool]) – Whether or not to verify SSL certificates. Default True.
verify_cert_path (Optional[str]) – A filename of the CA cert bundle to use. Only used if verify is set to True.
endpoint_url (Optional[str]) – Override for the S3 endpoint url.
skip_empty_files (Optional[bool]) – Skip upload of empty log files.
inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute log manager when instantiated from config.
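The way verify and verify_cert_path combine can be sketched as follows. This is a minimal illustration, assuming the two options are collapsed into the single verify argument that boto3 clients accept (False disables certificate verification, a string names a CA bundle file). The helper name resolve_verify is hypothetical and is not part of dagster_aws.

```python
from typing import Optional, Union


def resolve_verify(
    verify: bool = True, verify_cert_path: Optional[str] = None
) -> Union[bool, str]:
    """Collapse the two config options into one boto3-style ``verify`` value.

    Hypothetical helper for illustration only; not part of dagster_aws.
    """
    if not verify:
        # Verification disabled: any configured CA bundle is ignored.
        return False
    # Verification enabled: use the custom CA bundle if one was given,
    # otherwise fall back to the default trust store.
    return verify_cert_path if verify_cert_path is not None else True


verify_arg = resolve_verify(True, "/path/to/cert/bundle.pem")
```

Under this assumption, the value in verify_arg is what would be handed to the S3 client as its verify argument.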
dagster_aws.s3.s3_file_manager ResourceDefinition
FileManager that provides abstract access to S3.
Implements the FileManager API.
dagster_aws.s3.s3_resource ResourceDefinition
Resource that gives solids access to S3.
The underlying S3 session is created by calling boto3.session.Session(profile_name).
The returned resource object is an S3 client, an instance of botocore.client.S3.
Attach this resource definition to a ModeDefinition in order to make it available to your solids.
Example
from dagster import ModeDefinition, execute_solid, solid
from dagster_aws.s3 import s3_resource


@solid(required_resource_keys={'s3'})
def example_s3_solid(context):
    return context.resources.s3.list_objects_v2(
        Bucket='my-bucket',
        Prefix='some-key'
    )


result = execute_solid(
    example_s3_solid,
    run_config={
        'resources': {
            's3': {
                'config': {
                    'region_name': 'us-west-1',
                }
            }
        }
    },
    mode_def=ModeDefinition(resource_defs={'s3': s3_resource}),
)
Note that your solids must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions.
You may configure this resource as follows:
resources:
  s3:
    config:
      region_name: "us-west-1"
      # Optional[str]: Specifies a custom region for the S3 session. Default is chosen
      # through the ordinary boto credential chain.
      use_unsigned_session: false
      # Optional[bool]: Specifies whether to use an unsigned S3 session. Default: True
      endpoint_url: "http://localhost"
      # Optional[str]: Specifies a custom endpoint for the S3 session. Default is None.
      profile_name: "dev"
      # Optional[str]: Specifies a custom profile for S3 session. Default is default
      # profile as specified in ~/.aws/credentials file
dagster_aws.s3.S3Coordinate DagsterType
A dagster.DagsterType intended to make it easier to pass information about files on S3 from solid to solid. Objects of this type should be dicts with 'bucket' and 'key' keys, and may be hydrated from config in the intuitive way, e.g., for an input with the name s3_file:
inputs:
  s3_file:
    value:
      bucket: my-bucket
      key: my-key
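Because an S3Coordinate value is a plain dict with 'bucket' and 'key' entries, downstream code can treat it like any other mapping. The sketch below shows one way to check such a dict and turn it into an s3:// URI. The helper s3_coordinate_to_uri is hypothetical and is not provided by dagster_aws.

```python
from typing import Dict


def s3_coordinate_to_uri(coordinate: Dict[str, str]) -> str:
    """Build an s3:// URI from a dict shaped like an S3Coordinate.

    Hypothetical helper for illustration only; not part of dagster_aws.
    """
    # An S3Coordinate-style dict must carry both of these keys.
    missing = {"bucket", "key"} - set(coordinate)
    if missing:
        raise ValueError(f"S3 coordinate is missing keys: {sorted(missing)}")
    return f"s3://{coordinate['bucket']}/{coordinate['key']}"


# Same values as the config example above.
coordinate = {"bucket": "my-bucket", "key": "my-key"}
uri = s3_coordinate_to_uri(coordinate)
```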
dagster_aws.s3.s3_pickle_io_manager IOManagerDefinition
Persistent IO manager using S3 for storage.
Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for S3 and the backing bucket.
Attach this resource definition to a ModeDefinition in order to make it available to your pipeline:
pipeline_def = PipelineDefinition(
    mode_defs=[
        ModeDefinition(
            resource_defs={'io_manager': s3_pickle_io_manager, "s3": s3_resource, ...},
        ), ...
    ], ...
)
You may configure this storage as follows:
resources:
  io_manager:
    config:
      s3_bucket: my-cool-bucket
      s3_prefix: good/prefix-for-files-
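The pickling round trip that such an IO manager performs can be illustrated without touching S3. The sketch below uses an in-memory dict as a stand-in for the bucket. The class InMemoryPickleStore, its method signatures, and the key layout are all assumptions made for illustration; the real manager's key layout and API may differ.

```python
import pickle
from typing import Any, Dict


class InMemoryPickleStore:
    """Stand-in for an S3-backed pickle IO manager (illustration only)."""

    def __init__(self, s3_bucket: str, s3_prefix: str) -> None:
        self.s3_bucket = s3_bucket
        self.s3_prefix = s3_prefix
        # Maps object key -> pickled bytes; a real manager would call S3 here.
        self._objects: Dict[str, bytes] = {}

    def _key(self, run_id: str, step_key: str, output_name: str) -> str:
        # Hypothetical key layout; the real manager's layout may differ.
        return f"{self.s3_prefix}{run_id}/{step_key}/{output_name}"

    def handle_output(self, run_id: str, step_key: str, output_name: str, obj: Any) -> str:
        # Serialize the output with pickle and store it under its key.
        key = self._key(run_id, step_key, output_name)
        self._objects[key] = pickle.dumps(obj)
        return key

    def load_input(self, run_id: str, step_key: str, output_name: str) -> Any:
        # Fetch the stored bytes and unpickle them for the downstream step.
        return pickle.loads(self._objects[self._key(run_id, step_key, output_name)])


store = InMemoryPickleStore("my-cool-bucket", "good/prefix-for-files-")
key = store.handle_output("run-1", "my_solid", "result", {"rows": [1, 2, 3]})
restored = store.load_input("run-1", "my_solid", "result")
```

Since every output passes through pickle, anything a solid returns must be picklable for this style of storage to work.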
dagster_aws.redshift.redshift_resource ResourceDefinition
This resource enables connecting to a Redshift cluster and issuing queries against that cluster.
Example
from dagster import ModeDefinition, execute_solid, solid
from dagster_aws.redshift import redshift_resource


@solid(required_resource_keys={'redshift'})
def example_redshift_solid(context):
    return context.resources.redshift.execute_query('SELECT 1', fetch_results=True)


result = execute_solid(
    example_redshift_solid,
    run_config={
        'resources': {
            'redshift': {
                'config': {
                    'host': 'my-redshift-cluster.us-east-1.redshift.amazonaws.com',
                    'port': 5439,
                    'user': 'dagster',
                    'password': 'dagster',
                    'database': 'dev',
                }
            }
        }
    },
    mode_def=ModeDefinition(resource_defs={'redshift': redshift_resource}),
)
assert result.output_value() == [(1,)]
dagster_aws.emr.emr_pyspark_step_launcher ResourceDefinition
spark_config:
cluster_id: Name of the job flow (cluster) on which to execute.
region_name: The AWS region that the cluster is in.
action_on_failure: The EMR action to take when the cluster step fails: https://docs.aws.amazon.com/emr/latest/APIReference/API_StepConfig.html
staging_bucket: S3 bucket to use for passing files between the plan process and EMR process.
staging_prefix: S3 key prefix inside the staging_bucket to use for files passed between the plan process and EMR process.
wait_for_logs: If set, the system will wait for EMR logs to appear on S3. Note that logs are copied every 5 minutes, so enabling this will add several minutes to the job runtime.
local_pipeline_package_path: Absolute path to the package that contains the pipeline definition(s) whose steps will execute remotely on EMR. This is a path on the local filesystem of the process executing the pipeline. The expectation is that this package will also be available on the Python path of the launched process running the Spark step on EMR, either deployed on step launch via the deploy_local_pipeline_package option, referenced on S3 via the s3_pipeline_package_path option, or installed on the cluster via bootstrap actions.
deploy_local_pipeline_package: If set, before every step run, the launcher will zip up all the code in local_pipeline_package_path, upload it to S3, and pass it to spark-submit’s --py-files option. This gives the remote process access to up-to-date user code. If not set, the assumption is that some other mechanism is used for distributing code to the EMR cluster. If this option is set to True, s3_pipeline_package_path should not also be set.
s3_pipeline_package_path: If set, this path will be passed to the --py-files option of spark-submit. This should usually be a path to a zip file. If this option is set, deploy_local_pipeline_package should not be set to True.
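The rule that deploy_local_pipeline_package and s3_pipeline_package_path should not be combined can be checked before a run is launched. The following is a minimal sketch of such a check over a plain config dict. The function check_package_options is hypothetical and is not part of dagster_aws; whether the library itself enforces this rule is not stated here.

```python
from typing import Any, Dict


def check_package_options(config: Dict[str, Any]) -> None:
    """Reject configs that set both code-distribution options.

    Hypothetical pre-flight check for illustration; not part of dagster_aws.
    """
    deploy = bool(config.get("deploy_local_pipeline_package", False))
    s3_path = config.get("s3_pipeline_package_path")
    if deploy and s3_path:
        raise ValueError(
            "Set either deploy_local_pipeline_package or "
            "s3_pipeline_package_path, not both."
        )


# Valid: code is zipped and uploaded on every step launch.
check_package_options(
    {
        "local_pipeline_package_path": "/path/to/package",
        "deploy_local_pipeline_package": True,
    }
)
```

The path /path/to/package is a placeholder, not a value taken from the documentation above.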
dagster_aws.cloudwatch.cloudwatch_logger LoggerDefinition
Core class for defining loggers.
Loggers are pipeline-scoped logging handlers, which will be automatically invoked whenever solids in a pipeline log messages.
logger_fn (Callable[[InitLoggerContext], logging.Logger]) – User-provided function to instantiate the logger. This logger will be automatically invoked whenever the methods on context.log are called from within solid compute logic.
config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.logger_config. If not set, Dagster will accept any config provided.
description (Optional[str]) – A human-readable description of this logger.
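The shape of a logger_fn can be sketched with the standard library alone. In the example below, a SimpleNamespace stands in for InitLoggerContext so that the snippet runs without Dagster installed. The config keys log_level and name are assumptions chosen for illustration, not the schema of cloudwatch_logger.

```python
import logging
from types import SimpleNamespace


def example_logger_fn(init_context) -> logging.Logger:
    """Build a logging.Logger from init_context.logger_config.

    The ``log_level`` and ``name`` keys are hypothetical config fields.
    """
    config = init_context.logger_config
    logger = logging.getLogger(config.get("name", "example"))
    # setLevel accepts level names such as "DEBUG" as well as integers.
    logger.setLevel(config.get("log_level", "INFO"))
    if not logger.handlers:
        # Attach a handler only once so repeated calls do not duplicate output.
        logger.addHandler(logging.StreamHandler())
    return logger


# Stand-in for InitLoggerContext, which exposes the config as logger_config.
fake_context = SimpleNamespace(logger_config={"log_level": "DEBUG", "name": "my_logger"})
logger = example_logger_fn(fake_context)
logger.debug("logger initialized")
```

In a real pipeline the function would receive an actual InitLoggerContext, and the returned logger would be wrapped in a LoggerDefinition rather than called directly.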