"""
NRV-Multi-Core/Parallel computing handling.
"""
import numpy as np
from ._NRV_Singleton import NRV_singleton
try:
import mpi4py.MPI as mpi
comm = mpi.COMM_WORLD
MCore_Flag = True
except ImportError:
MCore_Flag = False
[docs]
class Mcore_handler(metaclass=NRV_singleton):
"""
Class to handle parallel processing (cores, no threads) in NRV2
"""
[docs]
def __init__(self, Flag):
"""
Instantiation of Mcore
Parameters
----------
Flag : bool
should be true if mpi4py is installed, to be handled by user !
"""
super().__init__()
self.Flag = Flag
if self.Flag:
self.rank = comm.rank
self.size = comm.size
else:
self.rank = 0
self.size = 1
[docs]
def is_alone(self):
"""
Check if the process is runing alone or if other instances have been launched.
Returns
-------
alone : bool
True if the programm is the only instance launched, else False
"""
return self.size == 1
[docs]
def is_master(self):
"""
Check if the process is master or not whe parallel computing
Returns
-------
master : bool
True if multiple instances are launched and the current process is the master (rank 0)
"""
return not self.is_alone() and self.rank == 0
[docs]
def do_master_only_work(self):
"""
Check if the process is alone or it is the master to perform non splitable job
Returns
-------
only : bool
True if the process is alone or if it is the master, else False
"""
return self.is_alone() or self.is_master()
[docs]
def say_hello(self):
"""
Display a sentence from each process on prompt. For debug only
"""
if self.is_alone():
print("Hi, I am the only core launched")
elif self.is_master():
print("Hi, I am the master core")
else:
print("Hi, I am a slave core, my ID is " + str(self.rank))
[docs]
def split_job_from_arrays(self, len_arrays, stype="default"):
"""
Split an array for parallel independant computing, by sharing independant sub-spaces \
of array index
Parameters
----------
len_arrays : int
length of the array containing the full job to perform in parallel
stype : str
method used to split the array:
"comb":
Returns
-------
mask : np.array
subspace of the array indexes, specific to each instantiation of the programm
"""
if self.is_alone():
mask = np.arange(len_arrays)
else:
if stype == "comb":
mask = np.arange(len_arrays)
mask = np.where(self.rank == mask % self.size)[0]
else:
if self.is_master():
all_indexes = np.arange(len_arrays)
mask_chunks = np.array_split(all_indexes, self.size, axis=0)
else:
mask_chunks = None
mask = comm.scatter(mask_chunks, root=0)
return mask
[docs]
def split_job_from_arrays_to_slaves(self, len_arrays):
"""
Split an array for parallel independant computing, by sharing independant sub-spaces \
of array index, the master gets a table of all jobs to do initialized to False
Parameters
----------
len_arrays : int
length of the array containing the full job to perform in parallel
Returns
-------
mask : np.array
subspace of the array indexes, specific to each instantiation of the programm
"""
if self.is_master():
all_indexes = np.arange(len_arrays)
jobs_to_do = np.asarray(np.full(len(all_indexes), False))
chunks = np.array_split(all_indexes, self.size - 1, axis=0)
chunks.insert(0, jobs_to_do)
else:
chunks = None
chunk = comm.scatter(chunks, root=0)
return chunk
[docs]
def master_broadcasts_array_to_all(self, var):
"""
Broadcast an array to all instances of the process (share jobs performed by the master only)
Parameters
----------
var : np.array
variable to broadcast, from the master only, esle None
Returns
-------
data : np.array
variable broadcasted in all instances
"""
return self.master_broadcasts_to_all(var)
[docs]
def master_broadcasts_to_all(self, var):
"""
Broadcast an array to all instances of the process (share jobs performed by the master only)
Parameters
----------
var : np.array or dict
variable to broadcast, from the master only, esle None
Returns
-------
data : np.array or dict
variable broadcasted in all instances
"""
if self.is_alone():
data = var
else:
if self.rank == 0:
data = var
else:
data = None
data = comm.bcast(data, root=0)
return data
[docs]
def gather_jobs(self, partial_result):
"""
Gather the jobs performed by all instances to the master
Parameters
----------
partial_result : dict
individual result from an instance
Returns
-------
result : dict
global dict if master or alone, else None
"""
if self.is_alone():
final_result = partial_result
else:
# Not ideal but prevent overwrite the type of master
if not self.is_master() and "nrv_type" in partial_result:
partial_result.pop("nrv_type")
list_results = comm.gather(partial_result, root=0)
if self.is_master():
final_result = list_results[0]
for i in range(1, len(list_results)):
final_result.update(list_results[i])
else:
final_result = None
return final_result
[docs]
def gather_jobs_as_array(self, partial_result):
"""
Gather the jobs performed by all instances to the master
Parameters
----------
partial_result : np.array
individual result from an instance
Returns
-------
result : np.array
global array if master or alone, else None
"""
if self.is_alone():
final_result = partial_result
else:
results = comm.gather(partial_result, root=0)
if self.is_master():
final_result = np.concatenate(tuple(results))
else:
final_result = None
return final_result
[docs]
def sum_jobs(self, partial_result):
"""
Gather, sum and broadcast array for conservative results.
Parameters
----------
partial_result : np.array
individual result from an instance
Returns
-------
result : np.array
global array as the sum of all partial results from each core,
returned to all cores
"""
if self.is_alone():
final_result = partial_result
else:
results = comm.gather(partial_result, root=0)
# gather and sum results
sumed_result = None
if self.is_master():
list_results = list(results)
sumed_result = list_results[0]
for result in list_results[1:]:
sumed_result += result
# broadcast to all
final_result = self.master_broadcasts_array_to_all(sumed_result)
return final_result
[docs]
def send_data_to_master(self, data):
"""
Send a dictionary of data directly to the master.
Parameters
----------
data : dict
data to send
"""
comm.send(data, dest=0)
[docs]
def recieve_data_from_slave(self):
"""
Recieve data from anay source
Returns:
--------
data
"""
data = comm.recv(source=mpi.ANY_SOURCE)
return data
[docs]
def send_back_array_to_dest(self, data, destination):
"""
Send a numpy array to a slave
Parameters
----------
data : np.array
data to send
destination : int
ID of the process to send the data
"""
comm.send(data, dest=destination)
[docs]
def recieve_potential_array_from_master(self):
"""
Recieve potenatial data from the master as a numpy array
Parameters
----------
N_points : int
number of point of the x-coordinate vector
N_elec : int
number of electrode in the simulation
"""
data = comm.recv(source=0)
return data
[docs]
def send_synchronization_flag(self):
"""
Blocking collective communication to force all process to synchronize to a specific line of code
Returns
-------
bool
a flag set to True
"""
if self.rank == 0:
Validation_Flag = True
else:
Validation_Flag = None
Validation_Flag = comm.bcast(Validation_Flag, root=0)
return Validation_Flag
# public interface
MCH = Mcore_handler(MCore_Flag)
[docs]
def synchronize_processes():
"""
synchronize all processes, used to wait saving complete before loading
Returns
-------
bool
a flag set to True
"""
if not MCH.is_alone():
comm.Barrier()
return 0