Snowflake with PySpark (dagster-snowflake-pyspark)

This library provides an integration with the Snowflake data warehouse and PySpark data processing library.

To use this library, you should first ensure that you have an appropriate Snowflake user configured to access your data warehouse.

dagster_snowflake_pyspark.snowflake_pyspark_io_manager IOManagerDefinition

Config Schema:
database (dagster.StringSource):

Name of the database to use.

account (dagster.StringSource):

Your Snowflake account name. For more details, see https://bit.ly/2FBL320.

user (dagster.StringSource):

User login name.

password (dagster.StringSource, optional):

User password.

warehouse (dagster.StringSource, optional):

Name of the warehouse to use.

schema (dagster.StringSource, optional):

Name of the schema to use.

role (dagster.StringSource, optional):

Name of the role to use.

private_key (dagster.StringSource, optional):

Raw private key to use. See https://docs.snowflake.com/en/user-guide/key-pair-auth.html for details.

private_key_path (dagster.StringSource, optional):

Path to the private key. See https://docs.snowflake.com/en/user-guide/key-pair-auth.html for details.

private_key_password (dagster.StringSource, optional):

The password of the private key. See https://docs.snowflake.com/en/user-guide/key-pair-auth.html for details.

An IO manager definition that reads inputs from and writes PySpark DataFrames to Snowflake. When using the snowflake_pyspark_io_manager, any inputs and outputs without type annotations will be loaded as PySpark DataFrames.

Returns:

IOManagerDefinition

Examples

from dagster_snowflake_pyspark import snowflake_pyspark_io_manager
from pyspark.sql import DataFrame
from dagster import Definitions, asset

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": snowflake_pyspark_io_manager.configured({
            "database": "my_database",
            "warehouse": "my_warehouse", # required for snowflake_pyspark_io_manager
            "account" : {"env": "SNOWFLAKE_ACCOUNT"},
            "password": {"env": "SNOWFLAKE_PASSWORD"},
            ...
        })
    }
)

Note that the warehouse configuration value is required when using the snowflake_pyspark_io_manager.
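
To authenticate with a key pair instead of a password, supply the private_key (or private_key_path) and private_key_password values described in the config schema above. A minimal sketch, assuming key-pair authentication is set up for your Snowflake user (the key path and environment variable names here are placeholders):

from dagster_snowflake_pyspark import snowflake_pyspark_io_manager

# key-pair authentication: pass private_key_path/private_key_password
# instead of password (the path and env var names below are placeholders)
key_pair_io_manager = snowflake_pyspark_io_manager.configured({
    "database": "my_database",
    "warehouse": "my_warehouse",
    "account": {"env": "SNOWFLAKE_ACCOUNT"},
    "user": {"env": "SNOWFLAKE_USER"},
    "private_key_path": "/path/to/key.p8",
    "private_key_password": {"env": "SNOWFLAKE_PRIVATE_KEY_PASSWORD"},
})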

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the IO Manager. For assets, the schema will be determined from the asset key. For ops, the schema can be specified by including a “schema” entry in output metadata. If “schema” is not provided via config or on the asset/op, “public” will be used for the schema.

from dagster import Out, op
from pyspark.sql import DataFrame

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> DataFrame:
    # the returned value will be stored at my_schema.my_table
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

from dagster import AssetIn, asset
from pyspark.sql import DataFrame

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: DataFrame) -> DataFrame:
    # my_table will just contain the data from column "a"
    ...
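
The “columns” metadata works the same way for ops. A sketch using In (the op name here is illustrative):

from dagster import In, op
from pyspark.sql import DataFrame

@op(
    ins={"my_table": In(metadata={"columns": ["a"]})}
)
def process_table_a(my_table: DataFrame) -> DataFrame:
    # my_table will only contain the data from column "a"
    ...
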
class dagster_snowflake_pyspark.SnowflakePySparkTypeHandler(*args, **kwds)

Plugin for the Snowflake I/O Manager that can store and load PySpark DataFrames as Snowflake tables.

Examples

from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pyspark import SnowflakePySparkTypeHandler
from pyspark.sql import DataFrame
from dagster import Definitions, asset, job

snowflake_io_manager = build_snowflake_io_manager([SnowflakePySparkTypeHandler()])

@asset
def my_asset() -> DataFrame:
    ...

defs = Definitions(
    assets=[my_asset],
    resources={
        "io_manager": snowflake_io_manager.configured(...)
    }
)

# OR

@job(resource_defs={'io_manager': snowflake_io_manager})
def my_job():
    ...
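
Because build_snowflake_io_manager accepts a list of type handlers, a single IO manager can store and load several DataFrame types. A sketch, assuming the dagster-snowflake-pandas package (which provides SnowflakePandasTypeHandler) is also installed:

import pandas as pd
from dagster import Definitions, asset
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from dagster_snowflake_pyspark import SnowflakePySparkTypeHandler
from pyspark.sql import DataFrame

# a single IO manager that handles both pandas and PySpark DataFrames
snowflake_io_manager = build_snowflake_io_manager(
    [SnowflakePandasTypeHandler(), SnowflakePySparkTypeHandler()]
)

@asset
def spark_table() -> DataFrame:
    ...

@asset
def pandas_table() -> pd.DataFrame:
    ...

defs = Definitions(
    assets=[spark_table, pandas_table],
    resources={"io_manager": snowflake_io_manager.configured(...)},
)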