Submitting Cluster Jobs

Overview

Questions

  • How do I submit workflows on HPC resources?

  • How can I combine many simulations into a single cluster job?

Objectives

  • Show how to structure the workflow in project.py and use signac-flow’s command line interface.

  • Demonstrate the use of directives to set walltime limits and execute with MPI domain decomposition.

  • Explain how to use aggregate operations with MPI partitions to increase throughput.

Cluster jobs

On HPC resources, you submit cluster jobs to the queue and they execute on the compute nodes. You can use signac-flow to generate cluster job submission scripts that execute the steps in your workflow. Use this to execute the expensive or time-consuming workflow steps on HPC resources that provide access to many more CPU cores or GPUs than are available on a typical workstation. You can use these resources to increase throughput (the number of simulations completed per unit time) by executing many simulations at the same time, to reduce latency (the time to complete a single simulation) by executing each simulation on more than one MPI rank with domain decomposition, or some combination of the two.

This tutorial will demonstrate how to use signac-flow to submit HOOMD-blue workflow steps in cluster jobs. See Parallel Simulations with MPI to learn more about MPI domain decomposition in HOOMD-blue.

Command line interface

Use the command line interface to submit cluster jobs on HPC resources. Place the entire workflow in a Python file and add a __main__ entry point that calls Project().main(). project.py contains the workflow step code from the previous two tutorial sections along with the additions described below.

Define parameters:

N_RANKS = 2
N_EQUIL_STEPS = 200000
CLUSTER_JOB_WALLTIME = 1
HOOMD_RUN_WALLTIME_LIMIT = CLUSTER_JOB_WALLTIME * 3600 - 10 * 60

The values used in this tutorial are for example purposes only. You should choose a number of MPI ranks, equilibration steps, and cluster job walltime appropriate for your project. In this tutorial, the workflow step stops 10 minutes before the end of the cluster job. In your own workflows, you may need a larger or smaller buffer depending on how long it takes to write out the final system state and exit cleanly.
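For example, a hypothetical 4-hour cluster job that reserves a 15 minute buffer would use the same formula:

CLUSTER_JOB_WALLTIME = 4  # hours (hypothetical value for illustration)
HOOMD_RUN_WALLTIME_LIMIT = CLUSTER_JOB_WALLTIME * 3600 - 15 * 60  # 13500 seconds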

Set the number of MPI ranks and cluster job walltime for the equilibrate operation:

@flow.directives(nranks=N_RANKS, walltime=CLUSTER_JOB_WALLTIME)

signac-flow directives set options for the generated cluster job. Here, nranks sets the number of MPI ranks that the workflow step uses and walltime sets the cluster job walltime in hours.
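Directives can request other resources as well. For example, a hypothetical GPU-enabled operation (not part of this tutorial's workflow) could request one GPU per signac job, assuming your cluster's environment template supports the ngpu directive:

@Project.operation
@flow.directives(ngpu=1, walltime=CLUSTER_JOB_WALLTIME)
def hypothetical_gpu_step(job):
    # Hypothetical sketch only: create a GPU device and execute a workflow
    # step on it.
    gpu = hoomd.device.GPU()
    ...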

Execute the entrypoint:

if __name__ == '__main__':
    Project().main()
[3]:
%pycat project.py
import math

import flow
import hoomd

# parameters
N_RANKS = 2
N_EQUIL_STEPS = 200000
CLUSTER_JOB_WALLTIME = 1
HOOMD_RUN_WALLTIME_LIMIT = CLUSTER_JOB_WALLTIME * 3600 - 10 * 60


def create_simulation(job):
    cpu = hoomd.device.CPU()
    sim = hoomd.Simulation(device=cpu, seed=job.statepoint.seed)
    mc = hoomd.hpmc.integrate.ConvexPolyhedron()
    mc.shape['octahedron'] = dict(vertices=[
        (-0.5, 0, 0),
        (0.5, 0, 0),
        (0, -0.5, 0),
        (0, 0.5, 0),
        (0, 0, -0.5),
        (0, 0, 0.5),
    ])
    sim.operations.integrator = mc

    return sim


class Project(flow.FlowProject):
    pass


@Project.operation
@Project.pre.true('initialized')
@Project.post.true('randomized')
def randomize(job):
    sim = create_simulation(job)
    sim.create_state_from_gsd(filename=job.fn('lattice.gsd'))
    sim.run(10e3)
    hoomd.write.GSD.write(state=sim.state,
                          mode='xb',
                          filename=job.fn('random.gsd'))
    job.document['randomized'] = True


@Project.operation
@Project.pre.after(randomize)
@Project.post.true('compressed_step')
def compress(job):
    sim = create_simulation(job)
    sim.create_state_from_gsd(filename=job.fn('random.gsd'))

    a = math.sqrt(2) / 2
    V_particle = 1 / 3 * math.sqrt(2) * a**3

    initial_box = sim.state.box
    final_box = hoomd.Box.from_box(initial_box)
    final_box.volume = (sim.state.N_particles * V_particle
                        / job.statepoint.volume_fraction)
    compress = hoomd.hpmc.update.QuickCompress(
        trigger=hoomd.trigger.Periodic(10), target_box=final_box)
    sim.operations.updaters.append(compress)

    periodic = hoomd.trigger.Periodic(10)
    tune = hoomd.hpmc.tune.MoveSize.scale_solver(moves=['a', 'd'],
                                                 target=0.2,
                                                 trigger=periodic,
                                                 max_translation_move=0.2,
                                                 max_rotation_move=0.2)
    sim.operations.tuners.append(tune)

    while not compress.complete and sim.timestep < 1e6:
        sim.run(1000)

    if not compress.complete:
        raise RuntimeError("Compression failed to complete")

    hoomd.write.GSD.write(state=sim.state,
                          mode='xb',
                          filename=job.fn('compressed.gsd'))
    job.document['compressed_step'] = sim.timestep


@Project.operation
@Project.pre.after(compress)
@Project.post(lambda job: job.document.get('timestep', 0) - job.document[
    'compressed_step'] >= N_EQUIL_STEPS)
# Cluster job directives.
@flow.directives(nranks=N_RANKS, walltime=CLUSTER_JOB_WALLTIME)
def equilibrate(job):
    end_step = job.document['compressed_step'] + N_EQUIL_STEPS

    sim = create_simulation(job)

    sim.operations.integrator.a = job.document.get('a', {})
    sim.operations.integrator.d = job.document.get('d', {})

    if job.isfile('restart.gsd'):
        sim.create_state_from_gsd(filename=job.fn('restart.gsd'))
    else:
        sim.create_state_from_gsd(filename=job.fn('compressed.gsd'))

    gsd_writer = hoomd.write.GSD(filename=job.fn('trajectory.gsd'),
                                 trigger=hoomd.trigger.Periodic(10_000),
                                 mode='ab')
    sim.operations.writers.append(gsd_writer)

    tune = hoomd.hpmc.tune.MoveSize.scale_solver(
        moves=['a', 'd'],
        target=0.2,
        trigger=hoomd.trigger.And([
            hoomd.trigger.Periodic(100),
            hoomd.trigger.Before(job.document['compressed_step'] + 5_000)
        ]))
    sim.operations.tuners.append(tune)

    try:
        while sim.timestep < end_step:
            sim.run(min(100_000, end_step - sim.timestep))

            if (sim.device.communicator.walltime + sim.walltime >=
                    HOOMD_RUN_WALLTIME_LIMIT):
                break
    finally:
        hoomd.write.GSD.write(state=sim.state,
                              mode='wb',
                              filename=job.fn('restart.gsd'))

        job.document['timestep'] = sim.timestep
        job.document['a'] = sim.operations.integrator.a.to_base()
        job.document['d'] = sim.operations.integrator.d.to_base()

        if sim.device.communicator.rank == 0:
            walltime = sim.device.communicator.walltime
            print(f'{job.id} ended on step {sim.timestep} '
                  f'after {walltime} seconds')


# Entrypoint.
if __name__ == '__main__':
    Project().main()

Check the status using python3 project.py status:

[4]:
!python3 project.py status --detailed --no-overview -p volume_fraction
Using environment configuration: StandardEnvironment
Fetching status: 100%|██████████████████████████| 9/9 [00:00<00:00, 3802.25it/s]
Fetching labels: 100%|████████████████████████| 3/3 [00:00<00:00, 165564.63it/s]

Detailed View:

job id                            operation          volume_fraction  labels
--------------------------------  ---------------  -----------------  --------
59363805e6f46a715bc154b38dffc4e4  equilibrate [U]                0.6
972b10bd6b308f65f0bc3a06db58cf9d  equilibrate [U]                0.4
c1a59a95a0e8b4526b28cf12aa0a689e  equilibrate [U]                0.5

[U]:unknown [R]:registered [I]:inactive [S]:submitted [H]:held [Q]:queued [A]:active [E]:error


The equilibrate step is ready to execute. signac-flow automates the cluster job submission process with python3 project.py submit. Use the --pretend flag first to ensure that the generated cluster jobs are correct (this displays the generated submission scripts without submitting them):

$ python3 project.py submit --pretend

Submitting cluster job 'octahedra_se/972b10bd6b308f65f0bc3a06db58cf9d/equilibrate/cdc78829a46f27e11ee8a98049bf0575':
 - Group: equilibrate(972b10bd6b308f65f0bc3a06db58cf9d)

# Submit command: sbatch
#!/bin/bash
#SBATCH --job-name="octahedra_se/972b10bd6b308f65f0bc3a06db58cf9d/equilibrate/cdc78829a46f27e11ee8a98049bf0575"
#SBATCH --partition=standard
#SBATCH -t 01:00:00
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=2

set -e
set -u

# equilibrate(972b10bd6b308f65f0bc3a06db58cf9d)
python project.py run -o equilibrate -j 972b10bd6b308f65f0bc3a06db58cf9d
# Eligible to run:
# mpiexec -n 2  python project.py exec equilibrate 972b10bd6b308f65f0bc3a06db58cf9d


Submitting cluster job 'octahedra_se/59363805e6f46a715bc154b38dffc4e4/equilibrate/2c15943de4918753dc2373cd33d527ec':
 - Group: equilibrate(59363805e6f46a715bc154b38dffc4e4)
# Submit command: sbatch
#!/bin/bash
#SBATCH --job-name="octahedra_se/59363805e6f46a715bc154b38dffc4e4/equilibrate/2c15943de4918753dc2373cd33d527ec"
#SBATCH --partition=standard
#SBATCH -t 01:00:00
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=2

set -e
set -u

# equilibrate(59363805e6f46a715bc154b38dffc4e4)
python project.py run -o equilibrate -j 59363805e6f46a715bc154b38dffc4e4
# Eligible to run:
# mpiexec -n 2  python project.py exec equilibrate 59363805e6f46a715bc154b38dffc4e4


Submitting cluster job 'octahedra_se/c1a59a95a0e8b4526b28cf12aa0a689e/equilibrate/e1ffbf0eafe27af17b2ffc6e0c4c6dd1':
 - Group: equilibrate(c1a59a95a0e8b4526b28cf12aa0a689e)
# Submit command: sbatch
#!/bin/bash
#SBATCH --job-name="octahedra_se/c1a59a95a0e8b4526b28cf12aa0a689e/equilibrate/e1ffbf0eafe27af17b2ffc6e0c4c6dd1"
#SBATCH --partition=standard
#SBATCH -t 01:00:00
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=2

set -e
set -u

# equilibrate(c1a59a95a0e8b4526b28cf12aa0a689e)
python project.py run -o equilibrate -j c1a59a95a0e8b4526b28cf12aa0a689e
# Eligible to run:
# mpiexec -n 2  python project.py exec equilibrate c1a59a95a0e8b4526b28cf12aa0a689e

In this configuration, signac-flow submits one cluster job for each signac job. Each cluster job executes HOOMD-blue with 2 MPI ranks to domain decompose the simulation. You can use one cluster job for each signac job on HPC resources that have shared queues and allow you to submit many cluster jobs.

Partitioning jobs

Some HPC resources schedule jobs only by full node or limit the number of cluster jobs you can queue at one time. Or you may have many thousands of signac jobs and want to reduce the time spent waiting in the queue. In these cases, use MPI to partition one cluster job so that it executes many signac jobs in parallel (see Parallel Simulations with MPI for an introduction to MPI partitions). You can use MPI partitions alone (one rank per signac job) or in combination with MPI domain decomposition (more than one rank per signac job).
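As a minimal sketch of the partition concept (hypothetical values, assuming the script is launched with 6 MPI ranks, e.g. mpiexec -n 6 python script.py):

import hoomd

# Split 6 MPI ranks into 3 partitions of 2 ranks each. Ranks in the same
# partition can execute one simulation together with domain decomposition.
communicator = hoomd.communicator.Communicator(ranks_per_partition=2)
print(f'rank {communicator.rank} belongs to partition {communicator.partition} '
      f'of {communicator.num_partitions}')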

Aggregate operations in signac-flow are workflow steps that execute on a list of signac jobs. For example, you could use aggregate operations to loop over simulations and average results generated with different random number seeds. See the signac-flow documentation for more information on aggregation in general.
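For instance, a hypothetical aggregate operation (not part of this tutorial's workflow) that averages a quantity over all signac jobs might look like this sketch, assuming the same signac-flow API used elsewhere in this tutorial:

@Project.operation
@flow.aggregator()
def average_density(*jobs):
    # Hypothetical example: 'density' is assumed to be stored in each job
    # document by an earlier workflow step.
    densities = [job.document['density'] for job in jobs]
    print(sum(densities) / len(densities))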

This tutorial aggregates signac jobs in groups of a defined size and executes each group in parallel using MPI partitions. The example is small for demonstration purposes: it uses 2 MPI ranks per signac job and executes all three signac jobs in one cluster job. In production work, you should choose the number of ranks per signac job (RANKS_PER_PARTITION) and the number of signac jobs per cluster job (JOBS_PER_AGGREGATE) so that each cluster job fills a whole number of nodes with no idle cores or GPUs. For example, use 16 ranks per signac job and 32 signac jobs per cluster job to fill 4 whole 128-core nodes per cluster job.
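A quick arithmetic check like the following sketch (using the hypothetical 128-core node from the example above) confirms that a cluster job fills whole nodes:

# Hypothetical sizing check. CORES_PER_NODE depends on your HPC resource.
RANKS_PER_PARTITION = 16
JOBS_PER_AGGREGATE = 32
CORES_PER_NODE = 128

total_ranks = RANKS_PER_PARTITION * JOBS_PER_AGGREGATE  # 512 ranks
assert total_ranks % CORES_PER_NODE == 0, 'cluster job does not fill whole nodes'
print(f'{total_ranks} ranks fill {total_ranks // CORES_PER_NODE} whole nodes')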

project_partitioned.py modifies project.py so that the equilibrate step executes in a partition:

[3]:
%pycat project_partitioned.py
import math

import flow
import hoomd

RANKS_PER_PARTITION = 2
JOBS_PER_AGGREGATE = 3
N_EQUIL_STEPS = 200000
CLUSTER_JOB_WALLTIME = 1
HOOMD_RUN_WALLTIME_LIMIT = CLUSTER_JOB_WALLTIME * 3600 - 10 * 60


def create_simulation(job, communicator=None):
    # communicator is None for the serial operations (randomize, compress) and
    # a partitioned communicator for the equilibrate aggregate operation.
    cpu = hoomd.device.CPU(communicator=communicator)
    sim = hoomd.Simulation(device=cpu, seed=job.statepoint.seed)
    mc = hoomd.hpmc.integrate.ConvexPolyhedron()
    mc.shape['octahedron'] = dict(vertices=[
        (-0.5, 0, 0),
        (0.5, 0, 0),
        (0, -0.5, 0),
        (0, 0.5, 0),
        (0, 0, -0.5),
        (0, 0, 0.5),
    ])
    sim.operations.integrator = mc

    return sim


class Project(flow.FlowProject):
    pass


@Project.operation
@Project.pre.true('initialized')
@Project.post.true('randomized')
def randomize(job):
    sim = create_simulation(job)
    sim.create_state_from_gsd(filename=job.fn('lattice.gsd'))
    sim.run(10e3)
    hoomd.write.GSD.write(state=sim.state,
                          mode='xb',
                          filename=job.fn('random.gsd'))
    job.document['randomized'] = True


@Project.operation
@Project.pre.after(randomize)
@Project.post.true('compressed_step')
def compress(job):
    sim = create_simulation(job)
    sim.create_state_from_gsd(filename=job.fn('random.gsd'))

    a = math.sqrt(2) / 2
    V_particle = 1 / 3 * math.sqrt(2) * a**3

    initial_box = sim.state.box
    final_box = hoomd.Box.from_box(initial_box)
    final_box.volume = (sim.state.N_particles * V_particle
                        / job.statepoint.volume_fraction)
    compress = hoomd.hpmc.update.QuickCompress(
        trigger=hoomd.trigger.Periodic(10), target_box=final_box)
    sim.operations.updaters.append(compress)

    periodic = hoomd.trigger.Periodic(10)
    tune = hoomd.hpmc.tune.MoveSize.scale_solver(moves=['a', 'd'],
                                                 target=0.2,
                                                 trigger=periodic,
                                                 max_translation_move=0.2,
                                                 max_rotation_move=0.2)
    sim.operations.tuners.append(tune)

    while not compress.complete and sim.timestep < 1e6:
        sim.run(1000)

    if not compress.complete:
        raise RuntimeError("Compression failed to complete")

    hoomd.write.GSD.write(state=sim.state,
                          mode='xb',
                          filename=job.fn('compressed.gsd'))
    job.document['compressed_step'] = sim.timestep


def equilibrated(job):
    return job.document.get(
        'timestep', 0) - job.document['compressed_step'] >= N_EQUIL_STEPS


@Project.operation
@flow.aggregator.groupsof(num=JOBS_PER_AGGREGATE)
@Project.pre.true('compressed_step')
@Project.post(lambda *jobs: all(equilibrated(job) for job in jobs))
@flow.directives(nranks=lambda *jobs: RANKS_PER_PARTITION * len(jobs),
                 walltime=CLUSTER_JOB_WALLTIME)
def equilibrate(*jobs):
    communicator = hoomd.communicator.Communicator(
        ranks_per_partition=RANKS_PER_PARTITION)
    job = jobs[communicator.partition]

    end_step = job.document['compressed_step'] + N_EQUIL_STEPS

    sim = create_simulation(job, communicator)

    sim.operations.integrator.a = job.document.get('a', {})
    sim.operations.integrator.d = job.document.get('d', {})

    if job.isfile('restart.gsd'):
        sim.create_state_from_gsd(filename=job.fn('restart.gsd'))
    else:
        sim.create_state_from_gsd(filename=job.fn('compressed.gsd'))

    gsd_writer = hoomd.write.GSD(filename=job.fn('trajectory.gsd'),
                                 trigger=hoomd.trigger.Periodic(10_000),
                                 mode='ab')
    sim.operations.writers.append(gsd_writer)

    tune = hoomd.hpmc.tune.MoveSize.scale_solver(
        moves=['a', 'd'],
        target=0.2,
        trigger=hoomd.trigger.And([
            hoomd.trigger.Periodic(100),
            hoomd.trigger.Before(job.document['compressed_step'] + 5_000)
        ]))
    sim.operations.tuners.append(tune)

    try:
        while sim.timestep < end_step:
            sim.run(min(100_000, end_step - sim.timestep))

            if (sim.device.communicator.walltime + sim.walltime >=
                    HOOMD_RUN_WALLTIME_LIMIT):
                break
    finally:
        hoomd.write.GSD.write(state=sim.state,
                              mode='wb',
                              filename=job.fn('restart.gsd'))

        job.document['timestep'] = sim.timestep
        job.document['a'] = sim.operations.integrator.a.to_base()
        job.document['d'] = sim.operations.integrator.d.to_base()

        if sim.device.communicator.rank == 0:
            walltime = sim.device.communicator.walltime
            print(f'{job.id} ended on step {sim.timestep} '
                  f'after {walltime} seconds')


if __name__ == '__main__':
    Project().main()

Here are the differences between project.py and project_partitioned.py:

  • The variables RANKS_PER_PARTITION and JOBS_PER_AGGREGATE set how signac-flow partitions signac jobs into cluster jobs.

  • create_simulation takes the communicator argument and passes it to the device constructor.

  • The decorator @flow.aggregator.groupsof(num=JOBS_PER_AGGREGATE) defines equilibrate as an aggregate operation that acts on up to JOBS_PER_AGGREGATE signac jobs at a time.

  • The pre and post conditions on equilibrate now apply to the whole aggregate of signac jobs:

    @Project.pre.true('compressed_step')
    @Project.post(lambda *jobs: all(equilibrated(job) for job in jobs))

  • The directives set the number of ranks needed by all signac jobs in the aggregate:

    @flow.directives(nranks=lambda *jobs: RANKS_PER_PARTITION * len(jobs),
                     walltime=CLUSTER_JOB_WALLTIME)

  • equilibrate is now a function of an array of signac jobs and chooses which job to execute based on the communicator's partition:

    def equilibrate(*jobs):
        communicator = hoomd.communicator.Communicator(
            ranks_per_partition=RANKS_PER_PARTITION)
        job = jobs[communicator.partition]

    The remainder of equilibrate is unchanged from project.py.

Here is the cluster job generated by project_partitioned.py:

$ python3 project_partitioned.py submit --pretend

Submitting cluster job 'octahedra_se/agg-e202cc8c2ce0bc1ef7a9d9fcdcd62b6d/equilibrate/614e7ec5470deb1e958ac9863ed1fb07':
 - Group: equilibrate(agg-e202cc8c2ce0bc1ef7a9d9fcdcd62b6d)

# Submit command: sbatch
#!/bin/bash
#SBATCH --job-name="octahedra_se/agg-e202cc8c2ce0bc1ef7a9d9fcdcd62b6d/equilibrate/614e7ec5470deb1e958ac9863ed1fb07"
#SBATCH --partition=standard
#SBATCH -t 01:00:00
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=6

set -e
set -u

# equilibrate(agg-e202cc8c2ce0bc1ef7a9d9fcdcd62b6d)
python project_partitioned.py run -o equilibrate -j agg-e202cc8c2ce0bc1ef7a9d9fcdcd62b6d
# Eligible to run:
# mpiexec -n 6  python project_partitioned.py exec equilibrate agg-e202cc8c2ce0bc1ef7a9d9fcdcd62b6d

It generates only one cluster job that uses MPI to execute the workflow step on 6 ranks, which equilibrate splits into 3 partitions that each execute one signac job on 2 ranks.

Ensure that your partitioned jobs set the product RANKS_PER_PARTITION * JOBS_PER_AGGREGATE to a number of CPU cores or GPUs that fills a whole number of cluster nodes on your HPC resource. Failing to do so wastes resources on HPC resources that schedule by node.

Summary

In this section of the tutorial, you defined the workflow in a file and used the signac-flow command line interface to generate cluster jobs for submission. You also learned how to use partitions to use HPC resources more effectively by fully utilizing compute nodes with fewer cluster jobs.

This is the end of the tutorial on organizing and executing simulations.

This tutorial only teaches the basics of signac-flow. Read the signac-flow documentation to learn more.