Currently the two coupled codes are compiled separately. They are linked through the CPL library by running both in MPI's MPMD mode,
mpiexec -n 64 ./md : -n 8 ./cfd
with all exchanges between them performed through CPL_send and CPL_recv over the single MPI_COMM_WORLD
created by the MPMD job.
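To make the shared-world layout concrete, the minimal program below (an illustration, not part of the CPL library) simply reports its rank; run as either executable of the MPMD command above, every process sees the same 72-process MPI_COMM_WORLD, with the MD ranks first and the CFD ranks after them.
// Minimal illustration: under MPMD both executables share one MPI_COMM_WORLD,
// so the reported world size is the sum of the -n arguments (72 above).
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    int world_rank, world_size;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    printf("%s is world rank %d of %d\n", argv[0], world_rank, world_size);
    MPI_Finalize();
    return 0;
}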
The problem with this approach is that they share a single MPI_COMM_WORLD.
When new codes are coupled, MPI_COMM_WORLD must be changed throughout the code wherever communication assumes that code is the only one in the world (which is true of pretty much every piece of software!). Although this is not too much work, the changes must then be maintained as a
patch or re-applied with each new version of the code from the
downstream repository. As an example, the LAMMPS socket (or APP)
currently has four different patches just from changes in the last two
years (and this is an established code).
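The kind of change such a patch has to make is sketched below (this is an illustrative fragment, not an actual LAMMPS patch): each code derives a realm communicator from the shared world, for example with MPI_Comm_split, and every internal MPI call that previously used MPI_COMM_WORLD must be redirected to it.
// Illustrative sketch of the substitution a coupling patch must make everywhere.
#include <mpi.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    // Each executable uses a distinct colour (say 0 for MD, 1 for CFD) so the
    // shared world is split into one communicator per code.
    int colour = 0;  // would be 1 in the CFD build
    MPI_Comm realm_comm;
    MPI_Comm_split(MPI_COMM_WORLD, colour, world_rank, &realm_comm);

    // Every size/rank query and collective inside the code must now use
    // realm_comm where it previously used MPI_COMM_WORLD, e.g.
    int nprocs;
    MPI_Comm_size(realm_comm, &nprocs);   // was MPI_Comm_size(MPI_COMM_WORLD, ...)
    MPI_Barrier(realm_comm);              // was MPI_Barrier(MPI_COMM_WORLD)

    MPI_Comm_free(&realm_comm);
    MPI_Finalize();
    return 0;
}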
Much better is to provide a function,
cplexec -n 64 ./md : -n 8 ./cfd
which launches the two codes, each with its own MPI_COMM_WORLD. All
intercomms are then set up by the CPL library and no changes are required
to either code. One promising candidate would be a
C++ program which calls MPI_Comm_spawn
based on the command line input supplied. This would look something like,
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "mpi.h"

#define NREALMS 2

int main(int argc, char **argv)
{
    // Parse the command line to get executable names, processor counts
    // and the total (universe) size
    int n = 0;
    int UNIVERSE_SIZE = 0;
    int nprocs[NREALMS];
    char *ptr;
    char *execnames[NREALMS];
    MPI_Comm parentcomm, intercomm[NREALMS];

    // Parse input arguments of the form "-n <nproc> <executable>"
    for (int i = 0; i < argc - 2; ++i)
    {
        if (strcmp("-n", argv[i]) == 0 && n < NREALMS) {
            int nproc = strtol(argv[i+1], &ptr, 10);
            UNIVERSE_SIZE += nproc;
            nprocs[n] = nproc;
            execnames[n] = argv[i+2];
            printf("%d, %d, %s, %s \n", nprocs[n], UNIVERSE_SIZE, argv[i+2], execnames[n]);
            n += 1;
        }
    }

    // Now spawn the programs you have parsed
    MPI_Init(NULL, NULL);
    MPI_Comm_get_parent(&parentcomm);
    if (parentcomm == MPI_COMM_NULL)
    {
        // Loop over the parsed realms and spawn each executable
        for (int i = 0; i < n; ++i) {
            MPI_Comm_spawn(execnames[i], MPI_ARGV_NULL, nprocs[i],
                           MPI_INFO_NULL, 0, MPI_COMM_WORLD,
                           &intercomm[i], MPI_ERRCODES_IGNORE);
        }
    } else {
        printf("ERROR -- parent comm has its own parent!?\n");
        return 1;
    }
    fflush(stdout);
    MPI_Finalize();
    return 0;
}
compiled using mpic++ ./code.cpp -o ./cplexec
and run as above,
cplexec -n 64 ./md -n 8 ./cfd
with sample md.exe and cfd.exe of the form,
#include <iostream>
#include <mpi.h>
#include <string.h>
#include <stdlib.h>    /* malloc, free, rand */

int main(int argc, char *argv[])
{
    MPI_Init(NULL, NULL);

    // Get the inter-communicator to the parent which spawned this process
    MPI_Comm parent, comm;
    MPI_Comm_get_parent(&parent);

    int np, rank, nu;
    MPI_Comm_size(MPI_COMM_WORLD, &np);    // Find out number of processes
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);  // Find out process rank
    MPI_Comm_remote_size(parent, &nu);     // Number of processes in the parent
    comm = MPI_COMM_WORLD;                 // Default communicator for this code
    printf("I'm the spawned %s %d %d %d \n", argv[0], np, rank, nu);

    // Allocate an array and gather some data
    float sub_avg = float(rank);
    float *sub_avgs = NULL;
    if (rank == 0) {
        sub_avgs = (float *)malloc(sizeof(float) * np);
    }
    MPI_Gather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, 0,
               MPI_COMM_WORLD);
    if (rank == 0) {
        for (int i = 0; i < np; ++i)
            printf("proc %d sent %f \n ", i, sub_avgs[i]);
        free(sub_avgs);
    }

    fflush(stdout);
    MPI_Finalize();
    return 0;
}
There is a major problem with all this: the process which spawns the CFD and MD child processes then sits doing nothing. I don't think we can simply finalise it and be sure it won't take the children with it. One option would be to merge the parent into one of the child runs. This can be achieved by adding,
//Merge parent with cfd processors
MPI_Comm comm;
MPI_Intercomm_merge(intercomm[1], 0, &comm );
MPI_Barrier(comm);
on the parent and,
if ( strcmp("./cfd.exe", argv[0]) == 0){
    MPI_Intercomm_merge(parent, 1, &comm);
    MPI_Barrier(comm);
} else {
    comm = MPI_COMM_WORLD;
}
to the child. However, the merged cfd and parent still don't share an MPI_COMM_WORLD, so this won't work in general! In addition, the topology may be set up extremely badly, with the parent on an entirely different node from the remaining cfd processors with which it will commonly communicate (spawning is discouraged in general for reasons of efficiency).
The use of MPI ports and connect presents a better option. This allows two previously unrelated MPI jobs to link up and begin sharing an intercomm. In principle, this requires names to be posted on an ompi-server (or the mpich equivalent). However, for scientific computing, writing to a shared file space is a common feature and one which we can safely assume is possible. In fact, many coupling schemes actually go no further than file-based writing and reading between codes.
So, if we were to open a port in one MPI program and write it to a file, all we would need to do in the second program is read the port name from this file, connect and begin sending information. The code to do this is,
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <fstream>
using namespace std;
int main( int argc, char *argv[] )
{
int num_errors = 0;
int rank, size;
char port1[MPI_MAX_PORT_NAME];
char port2[MPI_MAX_PORT_NAME];
MPI_Status status;
MPI_Comm comm1, comm2;
int data = 0;
char *ptr;
int runno = strtol(argv[1], &ptr, 10);
for (int i = 0; i < argc; ++i)
printf("inputs %d %d %s \n", i,runno, argv[i]);
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (runno == 0)
{
printf("0: opening ports.\n");fflush(stdout);
MPI_Open_port(MPI_INFO_NULL, port1);
printf("opened port1: <%s>\n", port1);
//Write port file
ofstream myfile;
myfile.open("port");
if( !myfile )
cout << "Opening file failed" << endl;
myfile << port1 << endl;
if( !myfile )
cout << "Write failed" << endl;
myfile.close();
printf("Port %s written to file \n", port1); fflush(stdout);
printf("accepting port1.\n");fflush(stdout);
//Establish connection and send data
MPI_Comm_accept(port1, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm1);
printf("sending 5 \n");fflush(stdout);
data = 5;
MPI_Send(&data, 1, MPI_INT, 0, 0, comm1);
MPI_Close_port(port1);
}
else if (runno == 1)
{
//Read port file
size_t chars_read = 0;
ifstream myfile;
//Wait until file exists and is avaialble
myfile.open("port");
while(!myfile){
myfile.open("port");
cout << "Opening file failed" << myfile << endl;
usleep(30000);
}
while( myfile && chars_read < 255 ) {
myfile >> port1[ chars_read ];
if( myfile )
++chars_read;
if( port1[ chars_read - 1 ] == '\n' )
break;
}
printf("Reading port %s from file \n", port1); fflush(stdout);
remove( "port" );
//Establish connection and recieve data
MPI_Comm_connect(port1, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm1);
MPI_Recv(&data, 1, MPI_INT, 0, 0, comm1, &status);
printf("Received %d 1\n", data); fflush(stdout);
}
//Barrier on intercomm before disconnecting
MPI_Barrier(comm1);
MPI_Comm_disconnect(&comm1);
MPI_Finalize();
return 0;
}
This code should be compiled once and then run as two separate MPI jobs, with the run number runno specified by a command line option,
mpiexec ./a.out 0 & mpiexec ./a.out 1
This means that CPL_init can be written to check whether MPI_COMM_WORLD
contains everything and, if not, attempt to open a port between the two codes.
Once open, this can be used in the same way CPL_INTER_COMM is in the main setup, and extended to an intracomm by merging the two with MPI_Intercomm_create(CPL_REALM_COMM, comm_size - 1, CPL_WORLD_COMM ... etc.
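A rough sketch of this check is given below. This is not the CPL library implementation: the init_intercomm name, the expected_total_procs parameter, the choice that realm 0 accepts while realm 1 connects, and the assumption that realm 0's ranks come first in the MPMD world are all illustrative, but it shows how an initialisation routine could decide between the MPMD path and the port-based fallback using plain MPI.
// Sketch only (not the CPL library implementation): build an intercomm to the
// other code. The port name is exchanged through a file called "port" in the
// shared file space, as in the example above.
#include <mpi.h>
#include <stdio.h>
#include <unistd.h>

MPI_Comm init_intercomm(MPI_Comm realm_comm, int realm, int expected_total_procs)
{
    int world_size, realm_size, realm_rank;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    MPI_Comm_size(realm_comm, &realm_size);
    MPI_Comm_rank(realm_comm, &realm_rank);

    MPI_Comm intercomm;
    if (world_size == expected_total_procs) {
        // MPMD run: both codes already share MPI_COMM_WORLD, so create the
        // intercomm directly (assumes realm 0's ranks come first in the world).
        int remote_leader = (realm == 0) ? realm_size : 0;
        MPI_Intercomm_create(realm_comm, 0, MPI_COMM_WORLD, remote_leader,
                             0, &intercomm);
    } else if (realm == 0) {
        // Separate worlds: open a port, publish it via the shared file space
        // and accept the connection from the other code.
        char port[MPI_MAX_PORT_NAME];
        if (realm_rank == 0) {
            MPI_Open_port(MPI_INFO_NULL, port);
            FILE *f = fopen("port", "w");
            fprintf(f, "%s\n", port);
            fclose(f);
        }
        MPI_Comm_accept(port, MPI_INFO_NULL, 0, realm_comm, &intercomm);
    } else {
        // Wait for the port file written by realm 0, then connect.
        char port[MPI_MAX_PORT_NAME] = "";
        if (realm_rank == 0) {
            FILE *f = NULL;
            while ((f = fopen("port", "r")) == NULL)
                usleep(30000);
            fscanf(f, "%s", port);
            fclose(f);
        }
        MPI_Comm_connect(port, MPI_INFO_NULL, 0, realm_comm, &intercomm);
    }
    return intercomm;
}
The resulting intercomm could then be merged to an intracomm with MPI_Intercomm_merge where a combined world communicator is needed.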
This all means that cplexec is not strictly needed; instead, both codes can simply be run using something like,
mpiexec -n 64 ./md.exe > MD_run_hist & mpiexec -n 8 ./cfd.exe > CFD_run_hist
It is this approach that the CPL library uses. A cplexec function is still provided to create both jobs as Python subprocesses, check that all libraries are consistent between the two runs, kill both jobs cleanly if either fails, and attempt to ensure any errors are raised correctly.