Source code for cplpy


from ctypes import c_char_p, c_char, c_int, c_double, c_bool, c_void_p, byref, POINTER, util, pointer, cdll
import ctypes
import mpi4py
from distutils.version import StrictVersion
from mpi4py import MPI
import numpy as np
from numpy.ctypeslib import ndpointer, load_library
from functools import wraps
import os
import time
import subprocess as sp
from subprocess import STDOUT, check_output, CalledProcessError
import shutil
import pickle
import errno


__all__ = ["CPL", "cart_create", "run_test", "prepare_config", "parametrize_file"]


#class OpenMPI_Not_Supported(Exception):
#    pass

class CPLLibraryNotFound(Exception):
    pass

class mpi4py_version_error(Exception):
    pass


# TODO: Raise exception of library not loaded
_loaded = False

# All Python types except integers, strings, and unicode strings have to be
# wrapped in their corresponding ctypes type, so that they can be converted
# to the required C

_CPL_GET_VARS = {"icmin_olap": c_int, "jcmin_olap": c_int, "kcmin_olap": c_int,
                 "icmax_olap": c_int, "jcmax_olap": c_int, "kcmax_olap": c_int,
                 "icmin_cnst": c_int, "jcmin_cnst": c_int, "kcmin_cnst": c_int,
                 "icmax_cnst": c_int, "jcmax_cnst": c_int, "kcmax_cnst": c_int,
                 "icmin_bnry": c_int, "jcmin_bnry": c_int, "kcmin_bnry": c_int,
                 "icmax_bnry": c_int, "jcmax_bnry": c_int, "kcmax_bnry": c_int,
                 "ncx": c_int, "ncy": c_int, "ncz": c_int,
                 "npx_md": c_int, "npy_md": c_int, "npz_md": c_int,
                 "npx_cfd": c_int, "npy_cfd": c_int, "npz_cfd": c_int,
                 "overlap": c_int, "xl_md": c_double, "yl_md": c_double,
                 "nsteps_md": c_int, "nsteps_cfd": c_int, "nsteps_coupled": c_int,
                 "zl_md": c_double, "xl_cfd": c_double, "yl_cfd": c_double,
                 "zl_cfd": c_double, "dx" : c_double, "dy" : c_double, "dz" : c_double,
                 "x_orig_cfd": c_double,"y_orig_cfd": c_double,"z_orig_cfd": c_double,
                 "x_orig_md": c_double,"y_orig_md": c_double,"z_orig_md": c_double,
                 "timestep_ratio": c_int
                 }

_CPL_SET_VARS = {"output_mode": c_int}

_CPL_FILE_VARS_TYPES = {"CPL_INT", "CPL_DOUBLE", "CPL_INT_ARRAY", "CPL_DOUBLE_ARRAY"}

class CPL_VAR_TYPES():
    INT = 1
    DOUBLE = 2
    BOOL = 3
    STRING = 4
    INT_ARRAY = 5
    DOUBLE_ARRAY = 6
    BOOL_ARRAY = 7
    STRING_ARRAY = 8


_CPL_GET_FILE_VARS = {CPL_VAR_TYPES.DOUBLE: ("get_real_param", c_double), 
                      CPL_VAR_TYPES.DOUBLE_ARRAY:("get_real_array_param", POINTER(c_double)),
                      CPL_VAR_TYPES.INT: ("get_int_param", c_int), 
                      CPL_VAR_TYPES.INT_ARRAY: ("get_int_array_param", POINTER(c_int)),
                      CPL_VAR_TYPES.BOOL: ("get_boolean_param", c_bool), 
                      CPL_VAR_TYPES.BOOL_ARRAY: ("get_boolean_array_param", POINTER(c_bool)),
                      CPL_VAR_TYPES.STRING: ("get_string_param", c_char_p), 
                      CPL_VAR_TYPES.STRING_ARRAY: ("get_string_array_param", POINTER(c_char_p))}
# Decorator to abort all processes if an exception is thrown. This
# avoids getting blocked when the exception do not occurs in every
# process.
def abortMPI(func):
    @wraps(func)
    def handleExcepts(self, *args, **kwargs):
        retval = None
        try:
            retval = func(self, *args, **kwargs)
        except Exception as e:
            print (e)
            # Dirty workaround to let the output be printed.
            time.sleep(2)
            MPI.COMM_WORLD.Abort(errorcode=1)
        else:
            return retval
    return handleExcepts


[docs]class CPL: # Shared attribute containing the library CFD_REALM = 1 MD_REALM = 2 GATHER_SCATTER = 1 SEND_RECEIVE = 2 NULL_REALM = 0 _libname = "libcpl" try: _lib_path = os.environ["CPL_LIBRARY_PATH"] if os.path.exists(_lib_path + "/"+ _libname + ".so"): _cpl_lib = load_library(_libname, _lib_path) else: raise CPLLibraryNotFound("Compiled CPL library libcpl.so not found at " + _lib_path + "/"+ _libname + ".so") except KeyError as e: print(("CPL info: ", "CPL_LIBRARY_PATH not defined. Looking in system directories...")) try: _cpl_lib = cdll.LoadLibrary(_libname + ".so") print(("CPL info: ", "Success!")) except OSError as e: raise CPLLibraryNotFound("Library libcpl.so not found!") #TODO: Check this #time.sleep(2) #MPI.COMM_WORLD.Abort(errorcode=1) # Check for JSON support by cheking if load_param_file symbol exists JSON_SUPPORT = True try: _cpl_lib.CPLC_load_param_file except: JSON_SUPPORT = False def __init__(self): self._var = POINTER(POINTER(c_char_p)) self.realm = None # py_test_python function py_test_python = _cpl_lib.CPLC_test_python py_test_python.argtypes = \ [c_int, c_double, c_bool, ndpointer(np.int32, ndim=2, flags='aligned, f_contiguous'), ndpointer(np.float64, ndim=2, flags='aligned, f_contiguous'), ndpointer(np.int32, shape=(2,), flags='aligned, f_contiguous'), ndpointer(np.int32, shape=(2,), flags='aligned, f_contiguous')] @abortMPI def test_python(self, int_p, doub_p, bool_p, int_pptr, doub_pptr): int_pptr_dims = np.array(int_pptr.shape, order='F', dtype=np.int32) doub_pptr_dims = np.array(doub_pptr.shape, order='F', dtype=np.int32) self.py_test_python(int_p, doub_p, bool_p, int_pptr, doub_pptr, int_pptr_dims, doub_pptr_dims) #NOTE: Using CPLC_init_Fort and Comm.f2py() and Comm.py2f() we achieve integration # with MPICH and OpenMPI seamlessly. mpi4py >= 2.0.0 is needed. if StrictVersion(mpi4py.__version__) < StrictVersion('2.0.0'): raise mpi4py_version_error("Comm.f2py() and Comm.py2f()" + " require mpi4py >= 2.0.0") #Detect if OpenMPI or MPICH mpicc=str(mpi4py.get_config()['mpicc']) mpishow=sp.check_output(["mpicc","-show"]).decode("utf-8") if ("open" in mpicc or "open" in mpishow): MPI_version = "OPENMPI" ompi_info = sp.check_output("ompi_info").split("\n") for m in ompi_info: if ("Open MPI:" in m): ompi_major_version_no = int(m.split(":")[-1].split(".")[0]) elif ("mpich" in mpicc or "mpich" in mpishow): MPI_version = "MPICH" else: print(("UNKNOWN MPI VERSION FROM ", mpicc)) MPI_version = "UNKNOWN" _py_init = _cpl_lib.CPLC_init_Fort _py_init.argtypes = [c_int, POINTER(c_int)] # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
[docs] @abortMPI def init(self, calling_realm): """ (cfd+md) Splits MPI_COMM_WORLD in both the CFD and MD code respectively and create intercommunicator between CFD and MD **Remarks** Assumes MPI has been initialised `MPI_init` and communicator MPI_COMM_WORLD exists and contains all processors in both CFD and MD regions **Synopsis** .. code-block:: python CPL.init(callingrealm) **Inputs** - *callingrealm* - Should identify calling processor as either CFD_REALM (integer with value 1) or MD_REALM (integer with value 2). **Outputs** - RETURNED_REALM_COMM - Communicator based on callingrealm value local to CFD or MD processor and resulting from the split of MPI_COMM_WORLD **Example** .. literalinclude:: ../../../examples/cpl_init/md_init.py **Errors** COUPLER_ERROR_REALM = 1 wrong realm value COUPLER_ERROR_ONE_REALM = 2 one realm missing COUPLER_ERROR_INIT = 3 ! initialisation error """ self.realm = calling_realm MPI_Comm_int = c_int() self._py_init(calling_realm, byref(MPI_Comm_int)) self.COMM = MPI.Comm.f2py(MPI_Comm_int.value) return self.COMM
if JSON_SUPPORT: _py_load_param_file = _cpl_lib.CPLC_load_param_file _py_load_param_file.argtypes = [c_char_p] @abortMPI def load_param_file(self, fname): self._py_load_param_file(c_char_p(fname), c_int(len(fname))) if JSON_SUPPORT: _py_close_param_file = _cpl_lib.CPLC_close_param_file @abortMPI def close_param_file(self): self._py_close_param_file() @abortMPI def get_file_var(self, section, var_name, var_type): try: fun_name = _CPL_GET_FILE_VARS[var_type][0] var_ctype = _CPL_GET_FILE_VARS[var_type][1] fun = getattr(self._cpl_lib, "CPLC_" + fun_name) fun.argtypes = [c_char_p, c_char_p, POINTER(var_ctype)] except KeyError: print(("CPL-ERROR: CPL Library function '" + str(fun_name) + "' not found!")) raise KeyError else: self._var = var_ctype() if ("array" in fun_name): print ("ENTRO") var_len = c_int() fun.argtypes.append(POINTER(c_int)) print ("EY") fun(c_char_p(section), c_char_p(var_name), byref(self._var), byref(var_len)) print(("len:" , var_len.value)) #print (self._var[0]) #print (byref(var[0])) a = ([self._var[i] for i in range(var_len.value)]) return a else: fun(c_char_p(section), c_char_p(var_name), byref(self._var)) return self._var.value _py_finalize = _cpl_lib.CPLC_finalize @abortMPI def finalize(self): self._py_finalize() # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_setup_cfd = _cpl_lib.CPLC_setup_cfd_Fort py_setup_cfd.argtypes = \ [c_int, ndpointer(np.float64, shape=(3,), flags='aligned, f_contiguous'), ndpointer(np.float64, shape=(3,), flags='aligned, f_contiguous'), ndpointer(np.int32, shape=(3,), flags='aligned, f_contiguous')]
[docs] @abortMPI def setup_cfd(self, icomm_grid, xyzL, xyz_orig, ncxyz): """ Initialisation routine for coupler module - Every variable is sent and stored to ensure both md and cfd region have an identical list of parameters **Remarks** Assumes CPL has been initialised `CPL.init` and communicator MD_REALM exists **Synopsis** .. code-block:: python CPL.setup_cfd(icomm_grid, xyzL, xyz_orig, ncxyz) **Inputs** - *icomm_grid* - Communicator based on CFD processor topology returned from a call to MPI_CART_CREATE. - *xyzL* - CFD domain size. - *xyz_orig* - CFD origin. - *ncxyz* - Number of CFD cells in global domain. """ if ( ((type(icomm_grid) is list) and (len(icomm_grid) is 3)) or ((type(icomm_grid) is np.array) and (icomm_grid.shape[0] is 3))): icomm_grid = self.COMM.Create_cart([icomm_grid[0], icomm_grid[1], icomm_grid[2]]) if ((type(xyzL) is list) or (xyzL.dtype != np.float64) or (not xyzL.flags["F_CONTIGUOUS"])): xyzL = np.array(xyzL, order='F', dtype=np.float64) if ((type(xyz_orig) is list) or (xyz_orig.dtype != np.float64) or (not xyz_orig.flags["F_CONTIGUOUS"])): xyz_orig = np.array(xyz_orig, order='F', dtype=np.float64) if ((type(ncxyz) is list) or (ncxyz.dtype != np.int32) or (not ncxyz.flags["F_CONTIGUOUS"])): ncxyz = np.array(ncxyz, order='F', dtype=np.int32) self.py_setup_cfd(icomm_grid.py2f(), xyzL, xyz_orig, ncxyz)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_setup_md = _cpl_lib.CPLC_setup_md_Fort py_setup_md.argtypes = \ [c_int, ndpointer(np.float64, shape=(3,), flags='aligned, f_contiguous'), ndpointer(np.float64, shape=(3,), flags='aligned, f_contiguous')]
[docs] @abortMPI def setup_md(self, icomm_grid, xyzL, xyz_orig): """ Initialisation routine for coupler module - Every variable is sent and stored to ensure both md and cfd region have an identical list of parameters **Remarks** Assumes CPL has been initialised `CPL.init` and communicator MD_REALM exists **Synopsis** .. code-block:: python CPL.md_init(icomm_grid, xyzL, xyz_orig) **Inputs** - *icomm_grid* - Communicator based on MD processor topology returned from a call to MPI_CART_CREATE. - *xyzL* - MD domain size. - *xyz_orig* - MD origin. """ if ( ((type(icomm_grid) is list) and (len(icomm_grid) is 3)) or ((type(icomm_grid) is np.array) and (icomm_grid.shape[0] is 3))): icomm_grid = self.COMM.Create_cart([icomm_grid[0], icomm_grid[1], icomm_grid[2]]) if ((type(xyzL) is list) or (xyzL.dtype != np.float64) or (not xyzL.flags["F_CONTIGUOUS"])): xyzL = np.array(xyzL, order='F', dtype=np.float64) if ((type(xyz_orig) is list) or (xyz_orig.dtype != np.float64) or (not xyz_orig.flags["F_CONTIGUOUS"])): xyz_orig = np.array(xyz_orig, order='F', dtype=np.float64) self.py_setup_md(icomm_grid.py2f(), xyzL, xyz_orig)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_proc_extents = _cpl_lib.CPLC_proc_extents py_proc_extents.argtypes = \ [ndpointer(np.int32, shape=(3,), flags='aligned, f_contiguous'), c_int, ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous')] @abortMPI def proc_extents(self, coord, realm): coord = self._type_check(coord) extents = np.zeros(6, order='F', dtype=np.int32) self.py_proc_extents(coord, realm, extents) return extents # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_my_proc_extents = _cpl_lib.CPLC_my_proc_extents py_my_proc_extents.argtypes = \ [ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous')] @abortMPI def my_proc_extents(self): extents = np.zeros(6, order='F', dtype=np.int32) self.py_my_proc_extents(extents) return extents # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_proc_portion = _cpl_lib.CPLC_proc_portion py_proc_portion.argtypes = \ [ndpointer(np.int32, shape=(3,), flags='aligned, f_contiguous'), c_int, ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous'), ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous')]
[docs] @abortMPI def proc_portion(self, coord, realm, limits): """ Get maximum and minimum cell indices, i.e. the 'portion', of the input cell extents 'limits' that is contributed by the processor specified by processor coord. **Remarks** Assumes the coupler has been initialised with `CPL.init <#cplpy.CPL.init>`_ and topological mapping has been setup using either `CPL.setup_md <#cplpy.CPL.setup_md>`_ or `CPL.setup_cfd <#cplpy.CPL.setup_cfd>`_ as appropriate. - Note: limits(6) and portion(6) are of the form: (xmin,xmax,ymin,ymax,zmin,zmax) **Synopsis** .. code-block:: python cpl.proc_portion(coord, realm, limits) **Inputs** - coord - processor cartesian coordinate, list or numpy array of 3 integers - realm - cfd_realm (1) or md_realm (2) (integer) - limits - Array of cell extents that specify the input region, list or numpy array of 6 integers **Outputs** - portion - Array of cell extents that define the local processor's contribution to the input region 'limits', numpy array of 6 integers """ coord = self._type_check(coord) limits = self._type_check(limits) portion = np.zeros(6, order='F', dtype=np.int32) self.py_proc_portion(coord, realm, limits, portion) return portion
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_my_proc_portion = _cpl_lib.CPLC_my_proc_portion py_my_proc_portion.argtypes = \ [ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous'), ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous')]
[docs] @abortMPI def my_proc_portion(self, limits): """ Get maximum and minimum cell indices, i.e. the 'portion' on calling process. **Remarks** Assumes the coupler has been initialised with `CPL.init <#cplpy.CPL.init>`_ and topological mapping has been setup using either `CPL.setup_md <#cplpy.CPL.setup_md>`_ or `CPL.setup_cfd <#cplpy.CPL.setup_cfd>`_ as appropriate. - Note: limits(6) and portion(6) are of the form: (xmin,xmax,ymin,ymax,zmin,zmax) **Synopsis** .. code-block:: python CPL.my_proc_portion(limits) **Inputs** - limits - Array of cell extents that specify the input region, list or numpy array of 6 integers **Outputs** - portion - Array of cell extents that define the local processor's contribution to the input region 'limits', numpy array of 6 integers """ limits = self._type_check(limits) portion = np.zeros(6, order='F', dtype=np.int32) self.py_my_proc_portion(limits, portion) return portion
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_map_cfd2md_coord = _cpl_lib.CPLC_map_cfd2md_coord py_map_cfd2md_coord.argtypes = \ [ndpointer(np.float64, shape=(3,), flags='aligned, f_contiguous'), ndpointer(np.float64, shape=(3,), flags='aligned, f_contiguous')] @abortMPI def map_cfd2md_coord(self, coord_cfd): coord_cfd = self._type_check(coord_cfd) coord_md = np.zeros(3, order='F', dtype=np.float64) self.py_map_cfd2md_coord(coord_cfd, coord_md) return coord_md # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_map_md2cfd_coord = _cpl_lib.CPLC_map_md2cfd_coord py_map_md2cfd_coord.argtypes = \ [ndpointer(np.float64, shape=(3,), flags='aligned, f_contiguous'), ndpointer(np.float64, shape=(3,), flags='aligned, f_contiguous')] @abortMPI def map_md2cfd_coord(self, coord_md): coord_md = self._type_check(coord_md) coord_cfd = np.zeros(3, order='F', dtype=np.float64) self.py_map_md2cfd_coord(coord_md, coord_cfd) return coord_cfd # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_map_glob2loc_cell = _cpl_lib.CPLC_map_glob2loc_cell py_map_glob2loc_cell.argtypes = \ [ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous'), ndpointer(np.int32, shape=(3,), flags='aligned, f_contiguous'), ndpointer(np.int32, shape=(3,), flags='aligned, f_contiguous')] @abortMPI def map_glob2loc_cell(self, limits, glob_cell): limits = self._type_check(limits) glob_cell = self._type_check(glob_cell) loc_cell = np.zeros(3, order='F', dtype=np.int32) self.py_map_glob2loc_cell(limits, glob_cell, loc_cell) return loc_cell # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_map_cell2coord = _cpl_lib.CPLC_map_cell2coord py_map_cell2coord.argtypes = \ [c_int, c_int, c_int, ndpointer(np.float64, shape=(3,), flags='aligned, f_contiguous')] @abortMPI def map_cell2coord(self, i, j, k): coord = np.zeros(3, order='F', dtype=np.float64) self.py_map_cell2coord(i, j, k, coord) return coord # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_map_coord2cell = _cpl_lib.CPLC_map_coord2cell py_map_coord2cell.argtypes = \ [c_double, c_double, c_double, ndpointer(np.int32, shape=(3,), flags='aligned, f_contiguous')] @abortMPI def map_coord2cell(self, x, y, z): cell = np.zeros(3, order='F', dtype=np.int32) self.py_map_coord2cell(x, y, z, cell) return cell # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_get_no_cells = _cpl_lib.CPLC_get_no_cells py_get_no_cells.argtypes = \ [ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous'), ndpointer(np.int32, shape=(3,), flags='aligned, f_contiguous')] @abortMPI def get_no_cells(self, limits): limits = self._type_check(limits) no_cells = np.zeros(3, order='F', dtype=np.int32) self.py_get_no_cells(limits, no_cells) return no_cells # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - #Limits of overlap region py_get_olap_limits = _cpl_lib.CPLC_get_olap_limits py_get_olap_limits.argtypes = \ [ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous')] @abortMPI def get_olap_limits(self): limits = np.zeros(6, order='F', dtype=np.int32) self.py_get_olap_limits(limits) return limits # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - #Limits of contraint region py_get_cnst_limits = _cpl_lib.CPLC_get_cnst_limits py_get_cnst_limits.argtypes = \ [ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous')] @abortMPI def get_cnst_limits(self): limits = np.zeros(6, order='F', dtype=np.int32) self.py_get_cnst_limits(limits) return limits # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - #Limits of boundary region py_get_bnry_limits = _cpl_lib.CPLC_get_bnry_limits py_get_bnry_limits.argtypes = \ [ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous')] @abortMPI def get_bnry_limits(self): limits = np.zeros(6, order='F', dtype=np.int32) self.py_get_bnry_limits(limits) return limits # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_set_timing = _cpl_lib.CPLC_set_timing py_set_timing.argtypes = \ [c_int, c_int, c_double] #Don't call abortMPI so it can be handled nicely in Python. #@abortMPI def set_timing(self, initialstep, nsteps, dt): class DepricatedException(Exception): """Raise Error as function should not be used""" raise DepricatedException("CPL set_timing is depricated and should not be used") self.py_set_timing(initialstep, nsteps, dt) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_send = _cpl_lib.CPLC_send py_send.argtypes = \ [ndpointer(np.float64, flags='aligned, f_contiguous'), ndpointer(np.int32, ndim=1, flags='aligned, f_contiguous'), ndpointer(np.int32, ndim=1, flags='aligned, f_contiguous'), POINTER(c_bool)] py_send_min = _cpl_lib.CPLC_send_min py_send_min.argtypes = \ [ndpointer(np.float64, flags='aligned, f_contiguous'), ndpointer(np.int32, ndim=1, flags='aligned, f_contiguous'), POINTER(c_bool)]
[docs] @abortMPI def send(self, asend, limits=None): """ Send four dimensional array *asend* of data from all processors in the current realm with data between global cell array *limits* to the corresponding processors from the other realm. **Remarks** Assumes the coupler has been initialised with `CPL.init <#cplpy.CPL.init>`_ and topological mapping has been setup using either `CPL.setup_md <#cplpy.CPL.setup_md>`_ or `CPL.setup_cfd <#cplpy.CPL.setup_cfd>`_ as appropriate. **Synopsis** .. code-block:: python CPL.send(asend, limits=None) **Inputs** - asend - Array of data to send. Should be a four dimensional Numpy array allocated using the number of cells on the current processor between the limits. For example, if overlap limits are 8 cells, between cells 0 and 7 split over 2 procesors, the first processor will have from 0 to 3 and the second from 4 to 7. This should be be obtained from `CPL.my_proc_portion(limits, portion) <#cplpy.CPL.my_proc_portion>`_ to allocate a Numpy array, or allocated using the helper function `CPL.get_arrays <#cplpy.CPL.get_arrays>`_ with appropriate sizes. . - limits [Optional] - Optional arguments limits specify if global limits of overlap region not used. These are in the global cell coordinates, and must match the corresponding recieve. **Outputs** - send_flag - Returned flag which indicates success or failure of send process **Example** This example links with the `CPL.recv <#cplpy.CPL.recv>`_ examples .. literalinclude:: ../../../examples/sendrecv_globcell/python/cfd_send_cells.py """ asend = self._type_check(asend) asend_shape = np.array(asend.shape, order='F', dtype=np.int32) send_flag = c_bool() if limits is None: self.py_send_min(asend, asend_shape, byref(send_flag)) else: self.py_send(asend, asend_shape, limits, byref(send_flag)) return send_flag.value
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_recv = _cpl_lib.CPLC_recv py_recv.argtypes = \ [ndpointer(np.float64, flags='aligned, f_contiguous'), ndpointer(np.int32, ndim=1, flags='aligned, f_contiguous'), ndpointer(np.int32, ndim=1, flags='aligned, f_contiguous'), POINTER(c_bool)] py_recv_min = _cpl_lib.CPLC_recv_min py_recv_min.argtypes = \ [ndpointer(np.float64, flags='aligned, f_contiguous'), ndpointer(np.int32, ndim=1, flags='aligned, f_contiguous'), POINTER(c_bool)]
[docs] @abortMPI def recv(self, arecv, limits=None): """ Receive data from to local grid from the associated ranks from the other realm **Remarks** Assumes the coupler has been initialised with `CPL.init <#cplpy.CPL.init>`_ and topological mapping has been setup using either `CPL.setup_md <#cplpy.CPL.setup_md>`_ or `CPL.setup_cfd <#cplpy.CPL.setup_cfd>`_ as appropriate. **Synopsis** .. code-block:: python CPL.recv(arecv, limits=None) **Inputs** - arecv - Array of data to recv. Should be a four dimensional Numpy array allocated using the number of cells on the current processor between the limits. For example, if overlap limits are 8 cells, between cells 0 and 7 split over 2 procesors, the first processor will have from 0 to 3 and the second from 4 to 7. This should be be obtained from `CPL.my_proc_portion(limits, portion) <#cplpy.CPL.my_proc_portion>`_ to allocate a Numpy array, or allocated using the helper function `CPL.get_arrays <#cplpy.CPL.get_arrays>`_ with appropriate sizes. - limits [Optional] - Limits in global cell coordinates, must be the same as corresponding send command. **Outputs** - recv_flag - Returned flag which indicates success or failure of recv process **Example** This example links with the `CPL.send <#cplpy.CPL.send>`_ examples .. literalinclude:: ../../../examples/sendrecv_globcell/python/md_recv_cells.py """ arecv = self._type_check(arecv) arecv_shape = np.array(arecv.shape, order='F', dtype=np.int32) recv_flag = c_bool() if limits is None: self.py_recv_min(arecv, arecv_shape, byref(recv_flag)) else: self.py_recv(arecv, arecv_shape, limits, byref(recv_flag)) return arecv, recv_flag.value
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_gather = _cpl_lib.CPLC_gather py_gather.argtypes = \ [ndpointer(np.float64, flags='aligned, f_contiguous'), ndpointer(np.int32, ndim=1, flags='aligned, f_contiguous'), ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous'), ndpointer(np.float64, flags='aligned, f_contiguous'), ndpointer(np.int32, ndim=1, flags='aligned, f_contiguous')] @abortMPI def gather(self, gather_array, limits, recv_array): gather_array = self._type_check(gather_array) recv_array = self._type_check(recv_array) gather_shape = np.array(gather_array.shape, order='F', dtype=np.int32) recv_shape = np.array(recv_array.shape, order='F', dtype=np.int32) self.py_gather(gather_array, gather_shape, limits, recv_array, recv_shape) return recv_array # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_scatter = _cpl_lib.CPLC_scatter py_scatter.argtypes = \ [ndpointer(np.float64, flags='aligned, f_contiguous'), ndpointer(np.int32, ndim=1, flags='aligned, f_contiguous'), ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous'), ndpointer(np.float64, flags='aligned, f_contiguous'), ndpointer(np.int32, ndim=1, flags='aligned, f_contiguous')] @abortMPI def scatter(self, scatter_array, limits, recv_array): scatter_array = self._type_check(scatter_array) recv_array = self._type_check(recv_array) scatter_shape = np.array(scatter_array.shape, order='F', dtype=np.int32) recv_shape = np.array(recv_array.shape, order='F', dtype=np.int32) self.py_scatter(scatter_array, scatter_shape, limits, recv_array, recv_shape) return recv_array # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_swaphalos = _cpl_lib.CPLC_swaphalos py_swaphalos.argtypes = \ [ndpointer(np.float64, flags='aligned, f_contiguous'), ndpointer(np.int32, ndim=1, flags='aligned, f_contiguous')] @abortMPI def swaphalos(self, A): A = self._type_check(A) A_shape = np.array(A.shape, order='F', dtype=np.int32) self.py_swaphalos(A, A_shape) return A # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_overlap = _cpl_lib.CPLC_overlap py_overlap.argtypes = [] @abortMPI def overlap(self): self.py_overlap.restype = c_bool return self.py_overlap() # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - py_is_proc_inside = _cpl_lib.CPLC_is_proc_inside py_is_proc_inside.argtypes = \ [ndpointer(np.int32, shape=(6,), flags='aligned, f_contiguous')] @abortMPI def is_proc_inside(self, region): self.py_is_proc_inside.restype = c_bool region = self._type_check(region) return self.py_is_proc_inside(region) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @abortMPI def get(self, var_name): try: var_type = _CPL_GET_VARS[var_name] fun = getattr(self._cpl_lib, "CPLC_" + var_name) except KeyError: print(("CPL-ERROR: CPL Library function '" + str(var_name) + "' not found!")) print ("Available options include: ") for var in _CPL_GET_VARS: print(var) raise KeyError else: fun.restype = var_type return fun() # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @abortMPI def set(self, var_name, value): try: var_type = _CPL_SET_VARS[var_name] fun = getattr(self._cpl_lib, "CPLC_set_" + var_name) except KeyError: print(("CPL-ERROR: CPL Library function '" + str(var_name) + "' not found!")) raise KeyError else: fun.argtypes = [var_type] return fun(var_type(value)) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @abortMPI def _type_check(self, A): if type(A) is list: ndtype = type(A[0]) if ndtype == float: ndtype = np.float64 elif ndtype == int: ndtype = np.int32 A = np.asfortranarray(A, dtype=ndtype) if not A.flags["F_CONTIGUOUS"]: A = np.require(A, requirements=['F']) if not A.flags["ALIGNED"]: A = np.require(A, requirements=['A']) return A # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
[docs] @abortMPI def get_arrays(self, recv_size, send_size): """ Return recv array and send array based on constraint/boundary sizes **Example** A minimal example is possible using CPL.get_arrays, which shows paired sending and recv commands .. literalinclude:: ../../../examples/minimal_send_recv_mocks/minimal_MD.py .. literalinclude:: ../../../examples/minimal_send_recv_mocks/minimal_CFD.py """ #Get constraint region cnst_limits = self.get_cnst_limits(); cnst_portion = self.my_proc_portion(cnst_limits) cnst_ncxl, cnst_ncyl, cnst_nczl = self.get_no_cells(cnst_portion) #Get overlap region BC_limits = self.get_bnry_limits() BC_portion = self.my_proc_portion(BC_limits) BC_ncxl, BC_ncyl, BC_nczl = self.get_no_cells(BC_portion) #Allocate send and recv arrays recv_array = np.zeros((recv_size, BC_ncxl, BC_ncyl, BC_nczl), order='F', dtype=np.float64) send_array = np.zeros((send_size, cnst_ncxl, cnst_ncyl, cnst_nczl), order='F', dtype=np.float64) return recv_array, send_array
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @abortMPI def dump_region(self, region, array, fname, comm, components={}, coords="mine"): lines = "" portion = self.my_proc_portion(region) cell_coords = np.array(3) dx = self.get("dx") dy = self.get("dy") dz = self.get("dz") def_func = lambda x : x components_dic = components if callable(components): def_func = components if not components or callable(components): components_idx = list(range(0, array.shape[0])) for c_idx in components_idx: components_dic[c_idx] = def_func for k,v in list(components_dic.items()): if v is None: components_dic[k] = def_func #if self.overlap(): if self.is_proc_inside(portion): ncx, ncy, ncz = self.get_no_cells(portion) if (ncx, ncy, ncz) != array.shape[1:]: print ("self-Error in dump_region(): array and processor portion of different size.") MPI.COMM_WORLD.Abort(errorcode=1) for i in range(portion[0], portion[1]+1): for j in range(portion[2], portion[3]+1): for k in range(portion[4], portion[5]+1): cell_coords = self.map_cell2coord(i, j, k) if coords != "mine": if self.realm == CPL.CFD_REALM: cell_coords = self.map_cfd2md_coord(cell_coords) else: cell_coords = self.map_md2cfd_coord(cell_coords) [i_loc, j_loc, k_loc] = self.map_glob2loc_cell(portion, [i, j, k]) lines += str(cell_coords[0] + dx/2.0) + " "\ + str(cell_coords[1] + dy/2.0) + " "\ + str(cell_coords[2] + dz/2.0) for k, f in list(components_dic.items()): lines += " " + str(f(array[k, i_loc, j_loc, k_loc])) lines += "\n" # Gather all the forces from every processor and dump them to a file at the root lines = comm.gather(lines, root=0) myrank = comm.Get_rank() if myrank == 0: with open(fname, "w") as file_out: file_out.writelines(lines)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def cart_create(old_comm, dims, periods, coords): dummy_cart_comm = old_comm.Create_cart(dims, periods) temp_comm = old_comm.Split(0, dummy_cart_comm.Get_rank()) new_cart_comm = temp_comm.Create_cart(dims, periods) comm_coords = new_cart_comm.Get_coords(new_cart_comm.Get_rank()) if (not (coords == comm_coords).all()): print ("cart_create Error") exit() return new_cart_comm # -----------------------------TESTING ROUTINES------------------------------ # CONFIG_FILE = "COUPLER.in" TEST_DIR = os.path.dirname(os.path.realpath(__file__)) TEST_NAME = os.path.basename(os.path.realpath(__file__)) TESTS_DIR_NAMES = ["initialisation", "mapping"] def copyanything(src_dir, dst_dir, name): src_dir = os.path.join(src_dir, name) try: shutil.copytree(src_dir, os.path.join(dst_dir, name)) except OSError as exc: if exc.errno == errno.ENOTDIR: shutil.copy(src_dir, dst_dir) else: raise def parametrize_file(source, dest, params): with open(source, "r+") as param_file_in: lines = param_file_in.readlines() for (k, v) in list(params.items()): lines = [l.replace("$[" + str(k) + "]", str(v)) for l in lines] with open(dest, "w+") as param_file_out: param_file_out.writelines(lines) def parametrize_config(template_dir, params): # It assumes is in the temp directory with cpl/ folder accessible # from this level. source = os.path.join(template_dir, CONFIG_FILE) dest = os.path.join("cpl/", CONFIG_FILE) parametrize_file(source, dest, params) def prepare_config(tmpdir, test_dir, md_fname, cfd_fname): tmpdir.mkdir("cpl") copyanything(test_dir, tmpdir.strpath, md_fname) copyanything(test_dir, tmpdir.strpath, cfd_fname) os.chdir(tmpdir.strpath) def run_test(template_dir, config_params, md_exec, md_fname, md_args, cfd_exec, cfd_fname, cfd_args, md_params, cfd_params, err_msg, debug=False, mpirun="split", printoutput=False): from distutils.spawn import find_executable import sys parametrize_config(template_dir, config_params) #Save parameter dictonaries to be read by md/cfd codes pickle.dump(md_params, open("md_params.dic", "wb")) pickle.dump(cfd_params, open("cfd_params.dic", "wb")) try: mdprocs = md_params["npx"] * md_params["npy"] * md_params["npz"] cfdprocs = cfd_params["npx"] * cfd_params["npy"] * cfd_params["npz"] if find_executable("mpiexec") is None: print("Error: mpiexec not found.") sys.exit(1) if find_executable(md_exec)is None: print(("Error: %s not found." % md_exec)) sys.exit(1) if find_executable(cfd_exec)is None: print(("Error: %s not found." % cfd_exec)) sys.exit(1) if os.path.exists(md_fname) and os.path.exists(cfd_fname): if "port" in mpirun: # For OpenMPI >= 3.0.0 # cmd = " ".join(["mpiexec --oversubscribe", "-n", str(mdprocs), md_exec, md_args, # "& PID=$!;", "mpiexec --oversubscribe", "-n", str(cfdprocs), cmd = " ".join(["mpiexec", "-n", str(mdprocs), md_exec, md_args, "& PID=$!;", "mpiexec", "-n", str(cfdprocs), cfd_exec, cfd_args, "; wait $PID"]) # cmd = " ".join(["cplexec", # "-m ", str(mdprocs), " ' ", md_exec, md_args, " ' " # "-c ", str(cfdprocs), " ' ", cfd_exec, cfd_args, " ' "]) elif "split" in mpirun: # For OpenMPI >= 3.0.0 # cmd = " ".join(["mpiexec --oversubscribe", "-n", str(mdprocs), md_exec, md_args, cmd = " ".join(["mpiexec","-n", str(mdprocs), md_exec, md_args, ":", "-n", str(cfdprocs), cfd_exec, cfd_args]) else: raise ValueError("MPIrun type unknown", mpirun) #Check for OpenMPI version greater than 3 and add oversubscribe option if (CPL.MPI_version == "OPENMPI"): if (CPL.ompi_major_version_no >= 3): cmd.replace("mpiexec","mpiexec --oversubscribe") cmd.replace("mpiexec","mpiexec --oversubscribe") if debug: print(("\nMPI run: " + cmd)) out = check_output(cmd, stderr=STDOUT, shell=True) if printoutput: print(out) else: print(("Current directory: " + os.getcwd())) print((md_fname + " or " + cfd_fname + " are not found.")) assert False return False #This checsk the error message is as expected except CalledProcessError as exc: print((exc.output)) if err_msg != "": print(("ERROR = ", err_msg)) assert err_msg in exc.output.decode("utf-8") else: assert exc.output.decode("utf-8") == "" else: if err_msg != "": assert False else: assert True return True def exec_tests(test="all"): import pytest import os test_path = os.path.dirname(os.path.realpath(__file__)) test_path = os.path.join(test_path, "test") print(test_path) if test != "all": test_path = os.path.join(test_path, test) pytest.main(["-v", test_path]) def get_test_dir(): import os return os.path.join(os.path.dirname(os.path.realpath(__file__)), "test") if __name__ == "__main__": lib = CPL()