Submitting Cluster Jobs

Overview

Questions

  • How do I submit workflows on HPC systems?

  • How can I combine many simulations into a single cluster job?

Objectives

  • Demonstrate the use of resources to execute with MPI domain decomposition.

  • Demonstrate the use of groups to execute an action on multiple jobs in parallel.

Cluster Jobs

On HPC systems, you submit cluster jobs to a queue, and the scheduler then executes them on the compute nodes. Row generates cluster job submission scripts that execute the actions in your workflow. HPC systems provide access to many more CPU cores or GPUs than a typical workstation has, so use them to execute the most time-consuming actions. You can increase throughput (the number of simulations completed per unit time) by executing many simulations in parallel. You can reduce latency (the time to complete a single simulation) by executing each simulation on more than one MPI rank with domain decomposition. You can also combine the two.

This tutorial will demonstrate how to use row to submit HOOMD-blue actions in cluster jobs. See Parallel Simulations with MPI to learn more about MPI domain decomposition. See the row user documentation and your HPC center’s documentation for an introduction to HPC job queues.

Domain decomposition

The resources table in workflow.toml instructs row to request the chosen resources in the cluster job. The values used in this tutorial are for example purposes only. Choose a number of MPI ranks, equilibration steps, and cluster job walltime appropriate for your project.

HOOMD-blue uses MPI for interprocess communication. In this case, the 2 processes per directory equate to 2 MPI ranks per directory using domain decomposition:

[action.resources]
processes.per_directory = 2

You also need to select the mpi launcher:

[[action]]
...
launchers = ["mpi"]

Executing many simulations in parallel

Some HPC systems schedule jobs only by full node or limit the number of cluster jobs you can queue at one time. You may also have many thousands of directories and want to reduce the time spent waiting in queue. In both of these cases, you can use MPI to group the execution of an action on many directories into one cluster job (see Parallel Simulations with MPI for an introduction to MPI partitions). You can use MPI partitions alone (one rank per directory) or in combination with MPI domain decomposition (more than one rank per directory).

Row groups eligible directories together every time it submits an action for execution. It groups ALL eligible directories by default. The randomize and compress actions in the previous section take *jobs as an argument and loop over the directories in serial. When executing in serial like this, the wall time scales with the number of directories. Those actions correspondingly set resources.walltime.per_directory and row computes the total wall time needed for the cluster job based on the group’s size. Randomize and compress execute very quickly, so the total wall times to process the directories in serial are manageable.
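The difference between a per-directory and a per-submission wall time can be modeled in a few lines. This is a simplified sketch, not row's implementation. The helper names are hypothetical, and rounding partial minutes up is an assumption of this sketch. For randomize and compress, the request grows with the group size. For equilibrate, it stays fixed at 720 minutes, which matches --time=720 in the script generated later in this section.

```python
from typing import Optional


def to_minutes(walltime: str) -> int:
    """Convert an "HH:MM:SS" string to minutes (partial minutes rounded up here)."""
    hours, minutes, seconds = (int(part) for part in walltime.split(":"))
    total_seconds = hours * 3600 + minutes * 60 + seconds
    return -(-total_seconds // 60)  # ceiling division


def cluster_job_minutes(
    group_size: int,
    per_directory: Optional[str] = None,
    per_submission: Optional[str] = None,
) -> int:
    """Total walltime one cluster job requests under this simplified model."""
    if (per_directory is None) == (per_submission is None):
        raise ValueError("set exactly one of per_directory or per_submission")
    if per_directory is not None:
        # Serial loop over the group: the request grows with group_size.
        return group_size * to_minutes(per_directory)
    # Parallel execution: one fixed request for the whole submission.
    return to_minutes(per_submission)


print(cluster_job_minutes(3, per_directory="00:05:00"))   # randomize   -> 15
print(cluster_job_minutes(3, per_directory="00:10:00"))   # compress    -> 30
print(cluster_job_minutes(3, per_submission="12:00:00"))  # equilibrate -> 720
```

With per_submission, a group of 64 directories requests the same 720 minutes as a group of 3.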

The equilibrate action takes much longer, so executing it in serial over many directories is not feasible. To execute equilibrate in parallel, you need to:

  1. Launch up to maximum_size directories per cluster job. Set walltime.per_submission so that the total wall time does not scale with the number of directories.

    [action.group]
    maximum_size = 64
    
    [action.resources]
    walltime.per_submission = "12:00:00"
    
  2. Choose the job based on the partition in equilibrate.py (shown in the previous section):

    communicator = hoomd.communicator.Communicator(
        ranks_per_partition=RANKS_PER_PARTITION
    )
    job = jobs[communicator.partition]
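The rank-to-partition-to-directory mapping in step 2 can be modeled without importing hoomd. This pure-Python sketch assumes that ranks form partitions in contiguous blocks, so partition = rank // ranks_per_partition. Treat that as an assumption about HOOMD-blue's internals, not a documented guarantee. The helper names are hypothetical. The directory names are the three that appear in the dry-run output later in this section.

```python
def partition_of(rank: int, ranks_per_partition: int) -> int:
    """Partition index of an MPI rank, assuming contiguous blocks of ranks."""
    return rank // ranks_per_partition


def assign_directories(num_ranks, ranks_per_partition, directories):
    """Map every MPI rank to the directory that its partition executes."""
    if num_ranks % ranks_per_partition != 0:
        raise ValueError("num_ranks must be a multiple of ranks_per_partition")
    num_partitions = num_ranks // ranks_per_partition
    if num_partitions != len(directories):
        # Too few partitions would skip directories; too many would make
        # jobs[partition] raise IndexError.
        raise ValueError("need exactly one partition per directory")
    return {
        rank: directories[partition_of(rank, ranks_per_partition)]
        for rank in range(num_ranks)
    }


directories = [
    "59363805e6f46a715bc154b38dffc4e4",
    "972b10bd6b308f65f0bc3a06db58cf9d",
    "c1a59a95a0e8b4526b28cf12aa0a689e",
]
for rank, directory in assign_directories(6, 2, directories).items():
    print(f"rank {rank} -> partition {partition_of(rank, 2)} -> {directory}")
```

In this model, ranks 0 and 1 share the first directory, ranks 2 and 3 the second, and ranks 4 and 5 the third. Each pair domain-decomposes one simulation.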

Submitting cluster jobs

The example is small for demonstration purposes. It uses 2 MPI ranks per directory and executes all three directories in parallel in one cluster job. In production work, choose the number of ranks per directory (processes.per_directory) and the number of directories per cluster job (maximum_size) so that each cluster job uses an integer number of whole nodes, leaving no cores or GPUs idle. For example, 16 ranks per directory and 32 directories per cluster job use 512 ranks, or 4 whole 128-core nodes, per cluster job. When the total number of directories is not an integer multiple of maximum_size, row forms one smaller tail group to cover the remainder. If that tail group doesn't fill a whole node, row submits its cluster job to a shared queue (if the HPC system provides one).
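The sizing rule above can be checked with a short planning sketch. It splits a workspace into full groups of maximum_size plus one smaller tail group, then reports whether each cluster job fills whole nodes. plan_cluster_jobs and the 100-directory workspace are hypothetical, and row's actual grouping may differ in details such as directory ordering.

```python
def plan_cluster_jobs(num_directories, maximum_size, ranks_per_directory, cores_per_node):
    """Split directories into groups of at most maximum_size (hypothetical helper)."""
    plan = []
    remaining = num_directories
    while remaining > 0:
        size = min(maximum_size, remaining)  # the last group may be a smaller tail
        ranks = size * ranks_per_directory
        plan.append({
            "directories": size,
            "ranks": ranks,
            "nodes": ranks / cores_per_node,
            "whole_nodes": ranks % cores_per_node == 0,
        })
        remaining -= size
    return plan


# A hypothetical workspace of 100 directories on 128-core nodes.
for job in plan_cluster_jobs(100, maximum_size=32, ranks_per_directory=16, cores_per_node=128):
    print(job)
```

This prints three 512-rank jobs that each fill 4 nodes, followed by a 4-directory tail job of 64 ranks (half a node). That tail job is the kind row would send to a shared queue.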

Here is the full workflow.toml that would allow cluster jobs to use up to one full 128-core node:

with open("workflow.toml", "w") as workflow:
    workflow.write("""
[default.action]
command = "python project.py --action $ACTION_NAME {directories}"
submit_options.anvil.account = "my_account"

[[action]]
name = "randomize"
products = ["random.gsd"]
resources.walltime.per_directory = "00:05:00"

[[action]]
name = "compress"
previous_actions = ["randomize"]
products = ["compressed.gsd"]
resources.walltime.per_directory = "00:10:00"

[[action]]
name = "equilibrate"
previous_actions = ["compress"]
products = ["trajectory.gsd"]
launchers = ["mpi"]

[action.group]
maximum_size = 64

[action.resources]
processes.per_directory = 2
walltime.per_submission = "12:00:00"
""")

This tutorial assumes that you are using the Purdue Anvil cluster. It sets a placeholder cluster account in the default action submit_options.anvil.account. Adjust the account name and/or the cluster name as appropriate for your HPC system.

! row show status
Action      Completed Submitted Eligible Waiting Remaining cost
randomize           3         0        0       0
compress            3         0        0       0
equilibrate         0         0        3       0   72 CPU-hours
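The 72 CPU-hours estimate for equilibrate is consistent with 2 processes per directory × 3 directories × 12 hours of walltime.per_submission. This sketch reproduces that arithmetic. It assumes the estimate is simply the total process count times the requested walltime, and it is not row's code.

```python
def cpu_hours(processes_per_directory: int, num_directories: int,
              walltime_hours: float) -> float:
    """CPU-hours for a job that holds every process for its full walltime."""
    processes = processes_per_directory * num_directories
    return processes * walltime_hours


print(cpu_hours(2, 3, 12.0))   # equilibrate in this tutorial -> 72.0
print(cpu_hours(2, 64, 12.0))  # a full group of maximum_size = 64 -> 1536.0
```

A full group of 64 directories occupies 128 processes (one 128-core node) for 12 hours.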

The equilibrate step is ready to execute. Row automates the cluster job submission process with row submit. Use the --dry-run flag first to ensure that the generated cluster jobs are correct (--dry-run displays the generated submission script(s) and skips the submission step):

! row submit --dry-run
#!/bin/bash
#SBATCH --job-name=equilibrate-59363805e6f46a715bc154b38dffc4e4+2
#SBATCH --output=equilibrate-%j.out
#SBATCH --partition=shared
#SBATCH --ntasks=6
#SBATCH --mem-per-cpu=1800M
#SBATCH --time=720
#SBATCH --account=my_account

directories=(
59363805e6f46a715bc154b38dffc4e4
972b10bd6b308f65f0bc3a06db58cf9d
c1a59a95a0e8b4526b28cf12aa0a689e
)

export ACTION_WORKSPACE_PATH=workspace
export ACTION_CLUSTER=anvil
export ACTION_NAME=equilibrate
export ACTION_PROCESSES=6
export ACTION_WALLTIME_IN_MINUTES=720
export ACTION_PROCESSES_PER_DIRECTORY=2

trap 'printf %s\\n "${directories[@]}" | /Users/joaander/.cargo/bin/row scan --no-progress -a equilibrate - || exit 3' EXIT
srun --mpi=pmi2 --ntasks=6 python project.py --action $ACTION_NAME "${directories[@]}" || { >&2 echo "[row] Error executing command."; exit 1; }

In this configuration, row submits one cluster job that launches project.py with 6 ranks.
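The generated script exports ACTION_PROCESSES and ACTION_PROCESSES_PER_DIRECTORY. One possible approach is to read these values in the action instead of hard-coding RANKS_PER_PARTITION, so that changing processes.per_directory in workflow.toml is enough. This is not necessarily what the tutorial's equilibrate.py does. The helper partition_layout is hypothetical.

```python
import os
from typing import Mapping, Tuple


def partition_layout(environ: Mapping[str, str] = os.environ) -> Tuple[int, int]:
    """Return (ranks_per_partition, num_partitions) from the exported variables."""
    per_directory = int(environ["ACTION_PROCESSES_PER_DIRECTORY"])
    total = int(environ["ACTION_PROCESSES"])
    if total % per_directory != 0:
        raise ValueError(
            "ACTION_PROCESSES is not a multiple of ACTION_PROCESSES_PER_DIRECTORY"
        )
    return per_directory, total // per_directory


# Values exported by the generated script above.
example_environ = {"ACTION_PROCESSES": "6", "ACTION_PROCESSES_PER_DIRECTORY": "2"}
ranks_per_partition, num_partitions = partition_layout(example_environ)
print(ranks_per_partition, num_partitions)  # 2 3

# Inside an action, ranks_per_partition could then be passed to
# hoomd.communicator.Communicator(ranks_per_partition=ranks_per_partition),
# and num_partitions should equal the number of directories passed in.
```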

The partitioned communicator

hoomd.communicator.Communicator(ranks_per_partition=2)

will form 3 partitions, each using 2 MPI ranks to domain decompose the simulation. The partition index selects which directory each partition will execute:

job = jobs[communicator.partition]

If there were more than 64 directories in this workspace, row would generate more than one cluster job due to the maximum_size = 64.

When you are sure that the resources are configured correctly, you can execute row submit on a cluster to submit the job to the queue. Row will track the cluster jobs and prevent you from submitting an action on a directory that is still in queue or running.

Summary

In this section of the tutorial, you configured the equilibrate action to execute on many directories in parallel and used row to generate a cluster job for submission.

This is the end of the tutorial on organizing and executing simulations.

This tutorial only teaches the basics of row. Read the row documentation to learn more.