Resources are objects that are shared across the implementations of multiple software-defined assets and ops and that can be plugged in after defining those ops and assets.
Resources typically model external components that assets and ops interact with. For example, a resource might be a connection to a data warehouse like Snowflake or a service like Slack.
So, why use resources?
Plug in different implementations in different environments - If you have a heavy external dependency that you want to use in production, but avoid using in testing, you can accomplish this by providing different resources in each environment. Check out Separating Business Logic from Environments for more info about this capability.
Share configuration across multiple ops or assets - Resources are configurable and shared, so you can supply configuration in one place instead of configuring the ops and assets individually.
Share implementations across multiple ops or assets - When multiple ops access the same external services, resources provide a standard way to structure your code to share the implementations.
ResourceDefinition is the class for resource definitions. You almost never want to initialize this class directly. Instead, use the @resource decorator, which returns a ResourceDefinition.
To define a resource, use the @resource decorator. Wrap a function that takes an init_context as the first parameter, which is an instance of InitResourceContext. From this function, return or yield the object that you would like to be available as a resource.
from dagster import resource
class ExternalCerealFetcher:
    def fetch_new_cereals(self, start_ts, end_ts):
        pass


@resource
def cereal_fetcher(init_context):
    return ExternalCerealFetcher()
Resources can be provided to software-defined assets by passing them to Definitions. The resources provided to Definitions are automatically bound to the assets.
from dagster import Definitions
defs = Definitions(
    assets=[asset_requires_resource],
    resources={"foo": foo_resource},
)
When defining asset jobs (using define_asset_job), you don't need to provide resources to the job directly. The job will make use of the resources provided to the assets.
Like software-defined assets, ops use resource keys to access resources:
from dagster import op
CREATE_TABLE_1_QUERY = "create table_1 as select * from table_0"


@op(required_resource_keys={"database"})
def op_requires_resources(context):
    context.resources.database.execute_query(CREATE_TABLE_1_QUERY)
Jobs provide resources to the ops inside them. A job has a dictionary that maps resource keys to resource definitions. You can supply this dictionary to the resource_defs argument when using either of the ways to construct a job: GraphDefinition.to_job or @job.
Supplying resources when using GraphDefinition.to_job is especially common, because you can build multiple jobs from the same graph that are distinguished by their different resources.
Supplying resources directly to @job is also useful when there aren't multiple jobs for the same graph, for example when you want to use an off-the-shelf resource or supply configuration in one place instead of in every op.
from dagster import job
@job(resource_defs={"database": database_resource})
def do_database_stuff_job():
    op_requires_resources()
Dagster resources can serve as context managers, for scenarios where it is necessary to perform some sort of cleanup of the resource after execution. Let’s take the example of a database connection. We might want to clean up the connection once we are done using it. We can incorporate this into our resource like so:
At spinup time, Dagster will run the code within the try block, and be expecting a single yield. Having more than one yield will cause an error. The yielded object will be available to code that requires the resource:
Once execution finishes, the finally block of the resource init function will run. In the case of our db_connection resource, this will run the cleanup function.
An important nuance is that resources are initialized (and torn down) once per process. With the in-process executor, which runs all steps in a single process, resources are initialized at the beginning of execution and torn down only after all steps have finished executing. In contrast, with the multiprocess executor (or other out-of-process executors), where each step runs in its own process, the resource is initialized at the beginning of each step's execution and its finally block runs at the end of that step's execution.
There are scenarios where you might want to reuse the code written within your resources outside of the context of execution. Consider a case where you have a resource db_connection, and you want to use that resource outside of the context of an execution. You can use the build_resources API to initialize this resource outside of execution.
Resources can depend upon other resources. Use the required_resource_keys parameter of the @resource decorator to specify which resources to depend upon. Access the required resources through the context object provided to the wrapped function.
Providing resources to assets while using @repository
For those still using @repository to organize definitions, use with_resources to provide resources to assets. This function takes in a sequence of assets and returns transformed versions of those assets with the provided resources specified.
from dagster import repository, with_resources
@repository
def repo():
    return [
        *with_resources(
            definitions=[asset_requires_resource],
            resource_defs={"foo": foo_resource},
        )
    ]
The with_resources function returns a copy of each asset, bound to the provided resources. Attempting to load a module containing an asset redefined via with_resources in the outermost scope along with the original asset may result in an "asset is defined multiple times" error. In this case, you may need to place your with_resources call within a repository definition.