Prefect & Coiled

If you process significant amounts of data but find Spark a little messy, you have probably tried out Dask. And if you want to scale Dask out across multiple nodes, Coiled is your friend. Coiled provides on-demand Dask clusters running in your own infrastructure. Best of all, it is designed to be invoked from your IDE: it lifts the computation from your laptop to the cloud only when you ask it to, and everything runs on your laptop otherwise.

It was through Coiled that I learned about Prefect. Prefect 1 had a very nice Coiled Dask executor. Prefect 2 doesn't work the same way, but it turns out to be very easy to use Coiled functions from Prefect flows. Coiled has a function decorator that can be stacked with a Prefect task decorator, so that this particular task is executed on Coiled. Don't worry, it still runs on your own infrastructure, just on a whole parallelized cluster.
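Why does stacking the two decorators work? The outer decorator wraps whatever the inner one returns, so the orchestrator schedules and tracks each run while the inner layer decides where the work actually executes. The toy below illustrates only that layering, using local threads. `remote` and `toy_task` are hypothetical stand-ins, not Coiled's or Prefect's real implementation.

```python
import functools
from concurrent.futures import ThreadPoolExecutor

# Stands in for the remote cluster in this toy (hypothetical, local threads only).
_cluster = ThreadPoolExecutor(max_workers=4)


def remote(fn):
    """Toy Coiled-style decorator (hypothetical): each call runs on the 'cluster'."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Ship the call to the cluster and block until the result comes back.
        return _cluster.submit(fn, *args, **kwargs).result()
    return wrapper


class toy_task:
    """Toy orchestrator task decorator (hypothetical): adds a .map() fan-out."""

    def __init__(self, fn):
        self.fn = fn
        self._pool = ThreadPoolExecutor(max_workers=4)

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)

    def map(self, items):
        # One tracked run per item; each returns a future the caller can wait on.
        return [self._pool.submit(self.fn, item) for item in items]


@toy_task  # outer layer: the orchestrator schedules and tracks each run
@remote    # inner layer: the actual work executes on the "cluster"
def process(file):
    return file.upper()


futures = process.map(["Hello", "World"])
results = [future.result() for future in futures]
print(results)  # ['HELLO', 'WORLD']
```

The order matters: because the task decorator is on the outside, every mapped run goes through the "remote" layer.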

The one small issue with running Coiled from Prefect is that Prefect must authenticate with Coiled using a token. This isn't hard, but I wanted to make it easier, so I created a prefect-coiled (precoil) Prefect Block to store the credentials. It also includes a tiny setup function that configures the credentials for the session and should be run before calling the Coiled function.
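The post doesn't show precoil's internals, so the following is only a stdlib sketch of the store-once, configure-per-run pattern. It assumes that Coiled reads its token from Dask's `coiled.token` config key, which Dask lets you set through the `DASK_COILED__TOKEN` environment variable. `CoiledCredentials`, its methods, and the in-memory `_REGISTRY` are hypothetical names, not precoil's or Prefect's API.

```python
from __future__ import annotations

import os
from dataclasses import dataclass

# Hypothetical in-memory stand-in for Prefect's Block storage; real Blocks
# are saved to and loaded from the Prefect server or Prefect Cloud.
_REGISTRY: dict[str, CoiledCredentials] = {}


@dataclass
class CoiledCredentials:
    """Hypothetical credentials holder mirroring a load-then-configure pattern."""

    token: str

    def save(self, name: str) -> None:
        _REGISTRY[name] = self

    @classmethod
    def load(cls, name: str) -> CoiledCredentials:
        return _REGISTRY[name]

    def configure(self) -> None:
        # Dask maps DASK_COILED__TOKEN to the config key "coiled.token".
        # Assumption: this runs before Dask reads its config; if Dask is
        # already imported, dask.config.refresh() would be needed to pick it up.
        os.environ["DASK_COILED__TOKEN"] = self.token


# One-time setup (e.g. when registering the credentials):
CoiledCredentials(token="example-token").save("coiledaccount")

# At the start of each flow run:
CoiledCredentials.load("coiledaccount").configure()
print(os.environ["DASK_COILED__TOKEN"])  # example-token
```

Keeping the token in a named, stored object means flows only need the name (`"coiledaccount"`), never the secret itself.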

from prefect import flow, get_run_logger, task
from precoil import CoiledConfig
import coiled

@task
def get_new_data_files():
    files = ["Hello", "World"]
    return files

@task
@coiled.function()
def process(file):
    # This code will be executed in Coiled!
    results = file.upper()
    print(f"Processing {file}...")
    return results

@flow(name="Coiled")
def coiled_flow():
    # Not named `coiled`, which would shadow the imported coiled module

    logger = get_run_logger()

    # Set up Coiled credentials so that you can run Coiled functions
    # "coiledaccount" refers to a Block we have registered in Prefect
    CoiledConfig.load('coiledaccount').configure()

    files = get_new_data_files()
    # One task run per file; each run executes its Coiled function remotely
    futures = process.map(files)
    for future in futures:
        logger.info(f"Done with {future.result()}")

if __name__ == '__main__':
    coiled_flow()

The precoil repo with the Prefect Coiled block can be found at https://github.com/radbrt/precoil.