Dagster provides a GraphQL Python Client to interface with Dagster's GraphQL API from Python.
Name | Description |
---|---|
DagsterGraphQLClient (experimental) | The client class to interact with Dagster's GraphQL API from Python. |
DagsterGraphQLClientError | The exception that the client raises upon a response error. |
The Dagster Python Client provides bindings in Python to programmatically interact with Dagster's GraphQL API.
When is this useful? Dagster exposes a powerful GraphQL API, but this level of flexibility is not always necessary. For example, when submitting a new pipeline run, you may only want to think about the pipeline name and configuration, not about maintaining a long GraphQL query. DagsterGraphQLClient addresses this by providing a simple Python interface to the GraphQL API.
Note that not all GraphQL methods on the API are available in Python yet. DagsterGraphQLClient currently provides only the following methods:
DagsterGraphQLClient.submit_pipeline_execution
DagsterGraphQLClient.get_run_status
DagsterGraphQLClient.reload_repository_location
DagsterGraphQLClient.shutdown_repository_location
The snippet below shows example instantiation of the client:
from dagster_graphql import DagsterGraphQLClient
client = DagsterGraphQLClient("localhost", port_number=3000)
You can use the client to get the status of a pipeline run as follows:
from dagster_graphql import DagsterGraphQLClientError
from dagster import PipelineRunStatus
try:
    status: PipelineRunStatus = client.get_run_status(RUN_ID)
    if status == PipelineRunStatus.SUCCESS:
        do_something_on_success()
    else:
        do_something_else()
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc
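Because get_run_status returns the status at the moment of the call, waiting for a run to finish usually means polling. The following is a minimal sketch of one way to do that, not part of the client itself: wait_for_run, terminal_statuses, poll_interval, timeout, and sleep are hypothetical names introduced here, and the only client method it relies on is get_run_status as shown above.

```python
import time
from typing import Any, Callable, Collection


def wait_for_run(
    client: Any,
    run_id: str,
    terminal_statuses: Collection[Any],
    poll_interval: float = 5.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Hypothetical helper: poll client.get_run_status(run_id) until the
    returned status is in terminal_statuses, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        status = client.get_run_status(run_id)
        if status in terminal_statuses:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Run {run_id} still has status {status!r} after {timeout} seconds"
            )
        # Wait before asking again so the GraphQL server is not hammered.
        sleep(poll_interval)
```

The caller decides which statuses count as terminal, for example a set containing PipelineRunStatus.SUCCESS and PipelineRunStatus.FAILURE. Any DagsterGraphQLClientError raised by get_run_status propagates unchanged, so the try/except pattern shown above still applies around the call.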
You can also reload a repository location in a Dagster deployment.
This reloads all repositories in that repository location. This is useful in a variety of contexts, including refreshing Dagit without restarting the server. Example usage is as follows:
from dagster_graphql import (
    ReloadRepositoryLocationInfo,
    ReloadRepositoryLocationStatus,
)
reload_info: ReloadRepositoryLocationInfo = client.reload_repository_location(REPO_NAME)
if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:
    do_something_on_success()
else:
    raise Exception(
        "Repository location reload failed because of a "
        f"{reload_info.failure_type} error: {reload_info.message}"
    )
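If a deployment has several repository locations, the same check can be applied to each one in turn. The sketch below is one possible shape for that, under stated assumptions: reload_locations and its parameters are hypothetical names, and it uses only the status, failure_type, and message attributes that appear in the example above.

```python
from typing import Any, Dict, Iterable


def reload_locations(
    client: Any,
    location_names: Iterable[str],
    success_status: Any,
) -> Dict[str, str]:
    """Hypothetical helper: reload each repository location and return a
    mapping of location name to failure description for those that failed."""
    failures: Dict[str, str] = {}
    for name in location_names:
        info = client.reload_repository_location(name)
        if info.status != success_status:
            # failure_type and message are read the same way as in the
            # single-location example.
            failures[name] = f"{info.failure_type}: {info.message}"
    return failures
```

Pass ReloadRepositoryLocationStatus.SUCCESS as success_status. An empty result means every reload succeeded; collecting failures instead of raising on the first one lets the caller report all broken locations at once.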
You can use the client to submit a pipeline run as follows:
from dagster_graphql import DagsterGraphQLClientError
try:
    new_run_id: str = client.submit_pipeline_execution(
        PIPELINE_NAME,
        repository_location_name=REPO_LOCATION_NAME,
        repository_name=REPO_NAME,
        run_config={},
        mode="default",
    )
    do_something_on_success(new_run_id)
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc
You can also submit a pipeline from a preset with the client:
from dagster_graphql import DagsterGraphQLClientError
try:
    new_run_id: str = client.submit_pipeline_execution(
        PIPELINE_NAME,
        repository_location_name=REPO_LOCATION_NAME,
        repository_name=REPO_NAME,
        preset=PRESET_NAME,
    )
    do_something_on_success(new_run_id)
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc
If you're running your own gRPC server, we generally recommend updating your repository code by building a new Docker image with a new tag and redeploying your server using that new image. Sometimes, however, you may want to restart your server without changing the image, for example when your pipeline definitions are generated programmatically from a database and you want to restart the server and regenerate your repositories even though the underlying Python code hasn't changed. In these situations, reload_repository_location is insufficient: it refreshes Dagit's information about the repositories but doesn't actually restart the server or reload the repository definitions.
One way to cause your server to restart and your repositories to be reloaded is to run your server in an environment that automatically restarts services when they fail, such as Kubernetes (or docker-compose with restart: always set on the service), and then use the shutdown_repository_location function on the GraphQL client to shut down the server. Your environment then restarts the server, and Dagit detects the restart automatically.
Example usage:
from dagster_graphql import (
    ShutdownRepositoryLocationInfo,
    ShutdownRepositoryLocationStatus,
)
shutdown_info: ShutdownRepositoryLocationInfo = client.shutdown_repository_location(REPO_NAME)
if shutdown_info.status == ShutdownRepositoryLocationStatus.SUCCESS:
    do_something_on_success()
else:
    raise Exception(f"Repository location shutdown failed: {shutdown_info.message}")
Note that specifying the repository location name and repository name is not always necessary when submitting a pipeline run; the GraphQL client will infer both if the pipeline name is unique. For example:
from dagster_graphql import DagsterGraphQLClientError
try:
    new_run_id: str = client.submit_pipeline_execution(
        PIPELINE_NAME,
        run_config={},
        mode="default",
    )
    do_something_on_success(new_run_id)
except DagsterGraphQLClientError as exc:
    do_something_with_exc(exc)
    raise exc
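The submission examples in this document use two call shapes: run_config together with mode, or preset alone, each with optional repository names. A small helper can assemble the keyword arguments for either shape. This is only a sketch: build_submit_kwargs is a hypothetical name, and the rule that the two shapes are mutually exclusive is this sketch's own policy, mirroring the examples rather than documenting the client's validation.

```python
from typing import Any, Dict, Optional


def build_submit_kwargs(
    run_config: Optional[Dict[str, Any]] = None,
    mode: Optional[str] = None,
    preset: Optional[str] = None,
    repository_location_name: Optional[str] = None,
    repository_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build keyword arguments for
    client.submit_pipeline_execution, omitting anything left unset."""
    if preset is not None and (run_config is not None or mode is not None):
        raise ValueError("Pass either preset, or run_config and mode, not both")
    if preset is None and (run_config is None or mode is None):
        raise ValueError("Without a preset, both run_config and mode are required")

    kwargs: Dict[str, Any] = {}
    if preset is not None:
        kwargs["preset"] = preset
    else:
        kwargs["run_config"] = run_config
        kwargs["mode"] = mode
    # Repository names are included only when given, leaving the client
    # free to infer them from a unique pipeline name.
    if repository_location_name is not None:
        kwargs["repository_location_name"] = repository_location_name
    if repository_name is not None:
        kwargs["repository_name"] = repository_name
    return kwargs
```

A call would then look like client.submit_pipeline_execution(PIPELINE_NAME, **build_submit_kwargs(preset=PRESET_NAME)), wrapped in the same try/except DagsterGraphQLClientError block as the examples above.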