Last Updated: June 11, 2025

Managing Long-Running Tasks on SaladCloud with 🐕 Kelpie

Managing long-running tasks, such as molecular simulations, LoRA training, and LLM finetuning, presents unique challenges on SaladCloud, due primarily to the interruptible nature of nodes. At the core of every solution to this problem are a job queue and progress checkpoints. The job queue is responsible for distributing tasks to workers and detecting when a worker has been interrupted. Workloads should save checkpoints of their progress and upload them to cloud storage, so that work can resume from the last checkpoint in the event of an interruption. Workers should also upload completed artifacts to cloud storage.
Basic Architecture

Basic architecture for long-running tasks on SaladCloud

We will use 🐕 Kelpie as our job queue and Cloudflare R2, an S3-compatible object storage service, as our cloud storage. We prefer R2 to AWS S3 for many SaladCloud workloads because R2 does not charge for egress data; since SaladCloud's distributed nodes are not in datacenters, they may incur egress fees from other providers. Kelpie handles all interactions with the storage service, so your job only needs to write to the local file system, and Kelpie will take care of uploading the files to R2. For this guide, we will build an application that slowly calculates a sum over n steps, sleeping for 30 seconds between steps to simulate work. We will set up a storage bucket and a checkpoint-saving system, and enable Kelpie's autoscaling to scale the number of workers based on the number of jobs in the queue.

The Job Queue: Kelpie

๐Ÿ• Kelpie is an open-source job queue that is particularly focused on the challenges of running extremely long tasks on interruptible hardware, and has been used in production for hundreds of thousands of hours of molecular dynamics simulations and AI model finetuning. It is designed to be simple to instrument, and to be able to integrate with any containerized workload. It executes scripts in a container according to a job definition, and optionally handles downloading input data, uploading output data, and syncing progress checkpoints to your s3-compatible storage. It also provides a mechanism for scaling your container group in response to job volume. It has deep integrations with SaladCloud that are very convenient for our purposes. It has no cost to use on SaladCloud. You can use your Salad API key to authenticate with Kelpie, which will allow you to submit and manage jobs. Kelpie can authenticate automatically from a SaladCloud node. Kelpie uses the Salad container group ID as a queue name, which can be retrieved with the Get Container Group Endpoint. You can explore the full API with the Swagger UI.

Cloud Storage: R2

R2 is a cloud storage service from Cloudflare that is compatible with the S3 API. It is a great choice for SaladCloud workloads because it does not charge egress fees, while SaladCloud's distributed nodes are mostly not in datacenters and therefore may incur egress fees from other providers. From the Cloudflare dashboard, navigate to "R2 Object Storage", and click "Create Bucket".
The R2 Object Storage Console

Give your bucket a meaningful name, and select an appropriate location. We are going to use the standard storage class, and automatic location.
Creating a new bucket

Once your bucket is created, you will need to create an access key and secret key. Select "Manage API tokens" from the "{ } API" menu, and click "Create Token".
Navigate to Manage API Tokens

You still need an API token to access your bucket

Create a token with "Object Read & Write" permissions, and only grant it access to the bucket we've just created. Since secret rotation is outside the scope of this guide, we're going to use the "forever" TTL. However, it is best practice to use shorter-lived secrets and to have easy, automatic mechanisms in place to rotate them as needed. Once created, you will be given an access key and secret key. Save these somewhere safe, as you will not be able to retrieve them again. The application code will get these keys from environment variables, so you will need to set them in your environment. That page also shows the S3 endpoint URL for your bucket; save this as well, as it will be needed in the application code.
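Before wiring these credentials into the worker, it can be worth a quick sanity check that they can actually reach your bucket. The snippet below is a minimal sketch using boto3 (not otherwise required by this guide); it assumes the AWS_* environment variables described above are set, and that the bucket name matches whatever you chose when creating the bucket.
import os

import boto3

# boto3 reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment.
s3 = boto3.client(
    "s3",
    endpoint_url=os.environ["AWS_ENDPOINT_URL"],
    region_name=os.getenv("AWS_REGION", "auto"),
)

# Replace with the bucket name you chose when creating the bucket.
bucket_name = "long-running-tasks-checkpoints-and-artifacts"

# Listing objects on a brand-new bucket returns an empty result, which still
# confirms that the credentials and endpoint URL are valid.
response = s3.list_objects_v2(Bucket=bucket_name, MaxKeys=5)
for obj in response.get("Contents", []):
    print(obj["Key"])
print("Credentials OK")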

Instrumenting Our Application

Using Kelpie just requires adding the Kelpie worker binary to your container and setting it to run as the command in your Dockerfile. The worker binary will read the job queue and execute your job's command with the provided arguments. The snippet below shows the required additions:
# Start with a base image that has the dependencies you need,
# and can successfully run your script.
FROM yourimage:yourtag

# Add the kelpie binary to your container image
ARG KELPIE_VERSION=0.6.0
ADD https://github.com/SaladTechnologies/kelpie/releases/download/${KELPIE_VERSION}/kelpie /kelpie
RUN chmod +x /kelpie

# Use kelpie as the "main" command. Kelpie will then execute your
# command with the provided arguments and environment variables
# from the job definition, from the WORKDIR of the container.
CMD ["/kelpie"]

Code

Our actual application code can be very simple:
import json
import time
import sys

def do_the_actual_work(num_steps: int, checkpoint: dict, checkpoint_file: str) -> int | None:
    '''
    Do the actual work for the job. This function will simulate work by
    sleeping for 30 seconds and incrementing the step and sum in the
    checkpoint.

    Parameters:
    - num_steps: int, the number of steps to run
    - checkpoint: dict, the checkpoint
    - checkpoint_file: str, the checkpoint file
    '''
    print(f"Max steps: {num_steps}", flush=True)
    print(f"Starting step: {checkpoint['step']}", flush=True)
    while checkpoint['step'] < num_steps:
        # Simulate work
        print(f"step {checkpoint['step']}", flush=True)
        time.sleep(30)
        # Update the checkpoint.
        checkpoint['step'] += 1
        checkpoint['sum'] += checkpoint['step']
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint, f)

    print(f"Job Finished. Sum: {checkpoint['sum']}", flush=True)
    return checkpoint['sum']


if __name__ == '__main__':
    '''
    Main function to run the job. This function will read the number of steps
    to run, the checkpoint file, and the output file from the command line
    arguments.
    '''
    usage = f"Usage: python {sys.argv[0]} <num_steps> <checkpoint_file> <output_file>"
    if len(sys.argv) != 4:
        print(usage, file=sys.stderr)
        sys.exit(1)
    num_steps = int(sys.argv[1])
    checkpoint_file = sys.argv[2]
    output_file = sys.argv[3]

    checkpoint = {'step': 0, 'sum': 0}

    # Load the checkpoint if it exists
    try:
        with open(checkpoint_file, 'r') as f:
            checkpoint = json.load(f)
    except FileNotFoundError:
        # If the checkpoint file does not exist, we start from scratch
        pass

    final_sum = do_the_actual_work(num_steps, checkpoint, checkpoint_file)

    # Write the final sum to the output file
    with open(output_file, 'w') as f:
        f.write(str(final_sum))
You'll notice that the script does not interact with Kelpie or the storage service directly. It simply reads the command-line arguments, which are provided by Kelpie, and writes to the local file system. Kelpie handles the rest, including uploading the checkpoint file and output file to the storage service, and updating the job status in the queue.

Dockerfile

FROM python:3.10.12-slim-buster

# Set the working directory for the application.
WORKDIR /app

# Our app has no dependencies, but yours probably does.
# If you have a requirements.txt, uncomment the next two lines
# COPY requirements.txt .
# RUN pip install -r requirements.txt

# Copy your main application code into the container.
COPY main.py .

# Set up directories for checkpoints and outputs.
RUN mkdir -p checkpoints && \
    mkdir -p outputs

# Add the kelpie binary to your container image
ARG KELPIE_VERSION=0.6.0
ADD https://github.com/SaladTechnologies/kelpie/releases/download/${KELPIE_VERSION}/kelpie /kelpie
RUN chmod +x /kelpie

# Use kelpie as the "main" command. Kelpie will then execute your
# command with the provided arguments and environment variables
# from the job definition, from the WORKDIR of the container.
CMD ["/kelpie"]

Building and Testing the Worker

Now, build the Docker image, using a tag that makes sense for you.
docker build -t saladtechnologies/lrt-worker-examples:kelpie .
You can push this image to Docker Hub or any other container registry you prefer.
docker push saladtechnologies/lrt-worker-examples:kelpie
We can test the image locally by running it with our Salad API Key and a contrived container group ID. First, generate a random UUID to use as the container group ID. You can use the uuidgen command on Linux or macOS, or use an online UUID generator. Do this again to simulate a machine ID.
uuidgen
# b26a8cb1-1806-454f-80df-9721a6e76910
uuidgen
# 4b659cb9-d508-4e1e-9cee-7116d36dd542
Create a file called worker.env with the following contents, replacing the placeholder values with your Salad API key, organization and project names, the two UUIDs you generated above, and your R2 credentials. Some of these will not be required in production, as they can be derived automatically from the SaladCloud node, but we will set them explicitly for local testing.
SALAD_API_KEY=your-api-key
SALAD_ORGANIZATION=your-organization-name
SALAD_PROJECT=your-project-name
SALAD_CONTAINER_GROUP_ID=b26a8cb1-1806-454f-80df-9721a6e76910
SALAD_MACHINE_ID=4b659cb9-d508-4e1e-9cee-7116d36dd542
AWS_ACCESS_KEY_ID=your-access-key-id
AWS_SECRET_ACCESS_KEY=your-secret-access-key
AWS_ENDPOINT_URL=your-s3-endpoint-url
AWS_REGION=auto
Then, run the container with the environment variables set:
docker run --rm \
  --env-file worker.env \
  saladtechnologies/lrt-worker-examples:kelpie
You should see the Kelpie worker start up and wait for jobs to be submitted to the queue. You will see logs indicating that it is failing to set the deletion cost, because we are not running on SaladCloud, but this is not an issue for our purposes.

Submitting Jobs to the Queue

Now we need to populate the queue with jobs. First, we'll define some environment variables in a new file, submitter.env.
SALAD_API_KEY=your-api-key
SALAD_ORGANIZATION=your-organization-name
SALAD_PROJECT=your-project-name
SALAD_CONTAINER_GROUP_ID=b26a8cb1-1806-454f-80df-9721a6e76910
BUCKET_NAME=long-running-tasks-checkpoints-and-artifacts
Source these variables into your environment with the following command:
export $(grep -v '^#' submitter.env | xargs -d '\n')
Suppose we have a CSV file (data.csv) with 10,000 jobs that we want to submit. It looks like this, with one row per job:
job_id,steps
job-0,600
job-1,600
job-2,600
job-3,600
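If you do not already have such a file, a throwaway script like the following (purely hypothetical, just for testing) will generate a data.csv in the same shape, with 10,000 jobs of 600 steps each:
import csv

# Generate a test data.csv matching the format above:
# a header row, then 10,000 rows of job_id,steps.
with open('data.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['job_id', 'steps'])
    for i in range(10000):
        writer.writerow([f'job-{i}', 600])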
We can submit all of these jobs with the following code:
import csv
import os
import requests
import sys

salad_api_key = os.getenv('SALAD_API_KEY')
salad_organization = os.getenv('SALAD_ORGANIZATION')
salad_project = os.getenv('SALAD_PROJECT')
container_group_id = os.getenv('SALAD_CONTAINER_GROUP_ID')
bucket_name = os.getenv('BUCKET_NAME')

kelpie_api_url = "https://kelpie.saladexamples.com"
batch_size = 100

kelpie_headers = {
    "Salad-Api-Key": salad_api_key,
    "Salad-Organization": salad_organization,
    "Salad-Project": salad_project,
    "Content-Type": "application/json"
}


def submit_one_batch(jobs: list) -> None:
    '''
    Kelpie API can accept up to 100 jobs in a single batch submission.
    '''
    response = requests.post(
        f"{kelpie_api_url}/jobs/batch", headers=kelpie_headers, json=jobs)
    if response.status_code != 202:
        print(f"Error submitting jobs: {response.text}")


def job_to_kelpie_job(id: str, steps: int) -> dict:
    return {
        # Our Container Group ID is used as a Job Queue ID
        "container_group_id": container_group_id,

        # We define how to run our job
        "command": "python",
        "arguments": [
            "main.py",
            str(steps),
            f"checkpoints/checkpoint.json",
            f"outputs/sum.txt"
        ],

        # We define the storage actions needed for the job
        "sync": {

            # Before the job runs, we download the checkpoint file
            "before": [
                {
                    "bucket": bucket_name,
                    "prefix": f"{id}/checkpoints/",
                    "local_path": "checkpoints/",
                    "direction": "download"
                }
            ],

            # During the job, we upload the checkpoint file when it changes
            "during": [
                {
                    "bucket": bucket_name,
                    "prefix": f"{id}/checkpoints/",
                    "local_path": "checkpoints/",
                    "direction": "upload"
                }
            ],

            # After the job runs, we upload the output file
            "after": [
                {
                    "bucket": bucket_name,
                    "prefix": f"{id}/outputs/",
                    "local_path": "outputs/",
                    "direction": "upload"
                }
            ]
        }
    }


if __name__ == "__main__":
    '''
    Main function to read the jobs from a CSV file and submit them to the Kelpie API.
    The CSV file should have two columns: job_id and steps.
    '''

    # We can pass a max number of jobs to submit as an argument. Helpful for testing.
    max_jobs = sys.argv[1] if len(sys.argv) > 1 else None

    jobs = []
    total_jobs = 0
    with open('data.csv', 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            job_id = row['job_id']
            steps = int(row['steps'])
            jobs.append(job_to_kelpie_job(job_id, steps))
            total_jobs += 1
            if max_jobs and total_jobs >= int(max_jobs):
                break
            if len(jobs) >= batch_size:
                submit_one_batch(jobs)
                jobs = []
    if jobs:
        submit_one_batch(jobs)
        jobs = []
    print(
        f"All jobs submitted successfully. Total jobs submitted: {total_jobs}")

This code reads the jobs from a CSV file, converts them to Kelpie job definitions, and submits them to the Kelpie API in batches of 100. The job definitions specify the command to run, the arguments to pass, and the storage actions to take before, during, and after the job runs. The checkpoint file is downloaded before the job runs, uploaded during the job when it changes, and the output file is uploaded after the job runs. You can run this code with the following command:
python submit_jobs.py 1
You should see output indicating that the jobs were submitted successfully. If you check your running container, you should see it pick up a job and start working.

Deploying the Worker to SaladCloud

To deploy our worker to SaladCloud, we need to create a new Container Group. This can be done via the API, SDKs, or the Portal; we'll use the Portal here. We'll create a new Container Group using the image we just pushed to Docker Hub, request 100 replicas (the maximum via the Portal), and set most, but not all, of our environment variables from worker.env.
Creating a new Container Group

Our application is extremely simple, so we're going to request only 1 vCPU, 1 GB of RAM, and no GPU. Your hardware requirements are likely significantly higher than this.
Setting the hardware requirements

All CPU-only jobs are prioritized as "Batch" (the lowest tier), and we don't need any additional storage for this particular application.
Setting the job priority and storage requirements

We do not need the container gateway, as our application pulls its work from a queue. We also do not need health probes, as those are primarily for services accessed via Container Gateway. Go ahead and hit deploy, and you'll be taken to the container group page, where you can see its status. First, it will prepare by pulling the container image into our high-performance cache.
Preparing the container

Once it's prepared, SaladCloud will start allocating replicas.
Allocating replicas

Once the replicas are allocated, they will start downloading the container image from our internal cache. This can take a few minutes, depending on the size of the image.
Downloading the images to the replicas

After a minute or so, we should see our instances up and running.
Instances up and running

Validating That It Works

Now that our cluster is up and running, we need to retrieve the container group ID from the Get Container Group Endpoint, set it in our submitter.env variables, and then run our job submitter script to submit jobs to the queue.
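If you prefer to script that lookup, the following sketch works; it assumes the standard SaladCloud public API route for fetching a container group by its name (check the Get Container Group Endpoint documentation for the exact path and response shape), and the container group name is a placeholder for whatever you chose in the Portal.
import os

import requests

org = os.environ['SALAD_ORGANIZATION']
project = os.environ['SALAD_PROJECT']
container_group_name = 'lrt-worker-kelpie'  # hypothetical name from the Portal

# Assumed route, per the SaladCloud public API docs for Get Container Group.
url = (
    'https://api.salad.com/api/public'
    f'/organizations/{org}/projects/{project}/containers/{container_group_name}'
)
response = requests.get(url, headers={'Salad-Api-Key': os.environ['SALAD_API_KEY']})
response.raise_for_status()

# The response should include the container group's ID, which Kelpie uses as
# the queue name.
print(response.json()['id'])
With SALAD_CONTAINER_GROUP_ID updated in submitter.env, re-source the environment and run the submitter: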
export $(grep -v '^#' submitter.env | xargs -d '\n')
python submit_jobs.py
Then, we'll be able to see our jobs being processed in our container group logs:
Kelpie worker hard at work

We can also see this by querying the Kelpie API for the jobs in the queue. The List Jobs Endpoint lets us query by status and container group ID.
curl -X 'GET' \
  'https://kelpie.saladexamples.com/jobs?status=running&container_group_id=<your-container-group-id>&page_size=2' \
  -H 'accept: application/json' \
  -H 'Salad-Api-Key: <your-salad-api-key>' \
  -H 'Salad-Organization: <your-salad-organization>' \
  -H 'Salad-Project: <your-salad-project>'
You'll get a list of jobs that are currently running, and you can see the job ID, status, and other details. Learn more about job lifecycle and job statuses.
{
  "_count": 2,
  "jobs": [
    {
      "id": "0376cf91-6287-4783-82ae-456054c5d4ef",
      "user_id": "5b0f6331-fc03-423c-919e-11728c2b97b2",
      "status": "running",
      "created": "2025-06-11T14:59:14.000Z",
      "started": "2025-06-11T14:59:18.000Z",
      "heartbeat": "2025-06-11T15:05:03.000Z",
      "num_failures": 0,
      "machine_id": "5bcbf73a-a4cc-2d50-ac39-5493edb9431a",
      "command": "python",
      "arguments": ["main.py", "600", "checkpoints/checkpoint.json", "outputs/sum.txt"],
      "environment": {},
      "max_failures": 3,
      "heartbeat_interval": 30,
      "container_group_id": "26506e01-6356-48ab-8b63-fd4c32cd881f",
      "webhook": null,
      "compression": false,
      "num_heartbeats": 12,
      "sync": {
        "before": [
          {
            "bucket": "long-running-tasks-checkpoints-and-artifacts",
            "prefix": "job-15/checkpoints/",
            "local_path": "checkpoints/",
            "direction": "download"
          }
        ],
        "during": [
          {
            "bucket": "long-running-tasks-checkpoints-and-artifacts",
            "prefix": "job-15/checkpoints/",
            "local_path": "checkpoints/",
            "direction": "upload"
          }
        ],
        "after": [
          {
            "bucket": "long-running-tasks-checkpoints-and-artifacts",
            "prefix": "job-15/outputs/",
            "local_path": "outputs/",
            "direction": "upload"
          }
        ]
      }
    },
    {
      "id": "30af44d4-9c01-44eb-8bcd-6ecf7a63c1f9",
      "user_id": "5b0f6331-fc03-423c-919e-11728c2b97b2",
      "status": "running",
      "created": "2025-06-11T14:59:14.000Z",
      "started": "2025-06-11T14:59:30.000Z",
      "heartbeat": "2025-06-11T15:09:56.000Z",
      "num_failures": 0,
      "machine_id": "60127913-82f0-be50-a4df-1d052b8daf84",
      "command": "python",
      "arguments": ["main.py", "600", "checkpoints/checkpoint.json", "outputs/sum.txt"],
      "environment": {},
      "max_failures": 3,
      "heartbeat_interval": 30,
      "container_group_id": "26506e01-6356-48ab-8b63-fd4c32cd881f",
      "webhook": null,
      "compression": false,
      "num_heartbeats": 19,
      "sync": {
        "before": [
          {
            "bucket": "long-running-tasks-checkpoints-and-artifacts",
            "prefix": "job-56/checkpoints/",
            "local_path": "checkpoints/",
            "direction": "download"
          }
        ],
        "during": [
          {
            "bucket": "long-running-tasks-checkpoints-and-artifacts",
            "prefix": "job-56/checkpoints/",
            "local_path": "checkpoints/",
            "direction": "upload"
          }
        ],
        "after": [
          {
            "bucket": "long-running-tasks-checkpoints-and-artifacts",
            "prefix": "job-56/outputs/",
            "local_path": "outputs/",
            "direction": "upload"
          }
        ]
      }
    }
  ]
}
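The same endpoint is easy to poll from Python if you would rather not use curl. This is a small sketch using the headers and query parameters shown above; the page size and status filter are just examples.
import os

import requests

kelpie_api_url = 'https://kelpie.saladexamples.com'
headers = {
    'Salad-Api-Key': os.environ['SALAD_API_KEY'],
    'Salad-Organization': os.environ['SALAD_ORGANIZATION'],
    'Salad-Project': os.environ['SALAD_PROJECT'],
}

# Query running jobs for our container group, 10 at a time.
params = {
    'status': 'running',
    'container_group_id': os.environ['SALAD_CONTAINER_GROUP_ID'],
    'page_size': 10,
}
response = requests.get(f'{kelpie_api_url}/jobs', headers=headers, params=params)
response.raise_for_status()

for job in response.json()['jobs']:
    print(job['id'], job['status'], f"heartbeats={job['num_heartbeats']}")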
From the R2 console, we can see that our bucket is being filled with checkpoints and results.
Checkpoints and results in the R2 bucket

Autoscaling

Kelpie supports autoscaling based on the number of jobs in the queue. You can enable autoscaling by first adding the Kelpie user (currently shawn.rushefsky@salad.com) to your organization, and then creating an autoscaling rule through the Kelpie API with the Create Scaling Rule Endpoint:
curl -X 'POST' \
  'https://kelpie.saladexamples.com/scaling-rules' \
  -H 'accept: application/json' \
  -H 'Salad-Api-Key: <your-salad-api-key>' \
  -H 'Salad-Organization: <your-salad-organization>' \
  -H 'Salad-Project: <your-salad-project>' \
  -H 'Content-Type: application/json' \
  -d '{
  "container_group_id": "<your-container-group-id>",
  "min_replicas": 0,
  "max_replicas": 100,
  "idle_threshold_seconds": 0
  }'
Every 5 minutes, the Kelpie API evaluates the number of jobs in the queue, and scales the number of replicas to be equal to the number of jobs in the queue, up to the maximum number of replicas specified in the scaling rule. If there are no jobs in the queue, the API will scale down the number of replicas to the minimum specified in the scaling rule, including stopping the container group if the minimum is 0. Learn more about Kelpie autoscaling.
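Conceptually, each evaluation reduces to clamping the queue depth to the bounds of the scaling rule. This is not Kelpie's actual implementation, just the behavior described above expressed as a sketch (the idle_threshold_seconds setting is not modeled here).
def desired_replicas(jobs_in_queue: int, min_replicas: int, max_replicas: int) -> int:
    '''Scale replicas to match queue depth, clamped to the rule's bounds.

    A conceptual sketch of the behavior described above, not Kelpie's code.
    '''
    if jobs_in_queue == 0:
        # With no work in the queue, scale down to the minimum; a minimum of 0
        # means the container group is stopped entirely.
        return min_replicas
    return max(min_replicas, min(jobs_in_queue, max_replicas))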

Conclusion

In this guide, we covered how to manage long-running tasks on SaladCloud using Kelpie as a job queue and R2 as cloud storage. We built a simple application that simulates work by calculating a sum, and set up checkpoints and output files to be uploaded to R2. We also covered how to submit jobs to the queue, how to deploy the worker to SaladCloud, and how to enable autoscaling based on the number of jobs in the queue. This setup can be used for a wide range of long-running tasks, such as molecular simulations, LoRA training, and LLM finetuning, and can be easily adapted to your specific needs.