91 lines
3.3 KiB
Python
91 lines
3.3 KiB
Python
"""
|
|
A task scheduler that assigns unfinished jobs to different workers.
|
|
"""
|
|
import numpy as np
|
|
|
|
def get_unplotted_indices(vals, xcoordinates, ycoordinates=None):
    """
    Find the grid points whose values have not been calculated yet.

    Args:
        vals: values at (x, y), with value -1 when the value is not yet
            calculated. Every entry that is <= 0 is treated as not yet
            calculated, so entries that are legitimately zero or negative
            are selected again on every call.
        xcoordinates: x locations, e.g., [-1, -0.5, 0, 0.5, 1]
        ycoordinates: y locations, e.g., [-1, -0.5, 0, 0.5, 1], or None
            when the plot is 1D.

    Returns:
        - an integer array of indices into the flattened vals for points
          that have not yet been calculated.
        - an array of the corresponding coordinates: one (x, y) pair per
          row for a 2D plot, or the x values alone for a 1D plot.
    """
    # Boolean mask over the flattened values; True marks an un-recorded entry.
    pending = np.asarray(vals).ravel() <= 0

    # flatnonzero always returns an integer index array, even when vals is
    # empty (np.array(range(0)) would be float64 and unusable as an index).
    inds = np.flatnonzero(pending)

    if ycoordinates is None:
        # 1D plot: asarray lets plain Python lists be used as coordinates.
        return inds, np.asarray(xcoordinates).ravel()[inds]

    # 2D plot: enumerate every coordinate pair of the mesh, then keep the
    # pending ones.
    # NOTE(review): np.meshgrid's default indexing yields arrays of shape
    # (len(ycoordinates), len(xcoordinates)); confirm vals is flattened in
    # the same layout.
    xcoord_mesh, ycoord_mesh = np.meshgrid(xcoordinates, ycoordinates)
    s1 = xcoord_mesh.ravel()[inds]
    s2 = ycoord_mesh.ravel()[inds]
    return inds, np.c_[s1, s2]
|
|
|
|
|
|
def split_inds(num_inds, nproc):
    """
    Partition the job indices 0..num_inds-1 into one contiguous range per
    MPI process.

    - Every job is assumed to cost the same amount of time.
    - Each process gets an (approximately) equal share: the first
      `num_inds % nproc` ranks receive one job more than the others, e.g.,
      9 jobs over 4 processes gives 3, 2, 2, 2 jobs to rank0..rank3.
    - When there are more processes than jobs, the high-rank processes
      receive an empty range.

    Returns:
        a list with `nproc` range objects, indexed by rank.
    """
    base, extra = divmod(num_inds, nproc)

    per_rank = []
    begin = 0
    for rank in range(nproc):
        # The first `extra` ranks absorb the leftover jobs, one each.
        size = base + 1 if rank < extra else base
        per_rank.append(range(begin, begin + size))
        begin += size

    return per_rank
|
|
|
|
|
|
def get_job_indices(vals, xcoordinates, ycoordinates, comm):
    """
    Work out which not-yet-calculated coordinates the current rank handles.

    Args:
        vals: the value matrix
        xcoordinates: x locations, e.g., [-1, -0.5, 0, 0.5, 1]
        ycoordinates: y locations, e.g., [-1, -0.5, 0, 0.5, 1]
        comm: MPI environment providing Get_rank() and Get_size(), or None
            to run as a single process (rank 0 of 1).

    Returns:
        inds: the pending indices assigned to the current rank
        coords: the coordinates matching those indices
        inds_nums: list with the number of jobs assigned to each rank
    """
    pending_inds, pending_coords = get_unplotted_indices(vals, xcoordinates, ycoordinates)

    # Without a communicator, behave like the only process of a 1-rank job.
    if comm is None:
        my_rank = 0
        world_size = 1
    else:
        my_rank = comm.Get_rank()
        world_size = comm.Get_size()

    # One contiguous range of job positions per rank.
    per_rank_ranges = split_inds(len(pending_inds), world_size)
    my_range = per_rank_ranges[my_rank]

    # Number of jobs that each MPI process needs to calculate.
    inds_nums = list(map(len, per_rank_ranges))

    return pending_inds[my_range], pending_coords[my_range], inds_nums
|