# manager_calling_workers.py
# Call with: mpiexec -np W python manager_calling_workers.py
# where (W-1) is the desired number of workers.
#
# The main part of this script calls the manager() function if rank == 0 and
# the worker() function otherwise.
# The manager() first sends work to all workers and then listens for their
# replies, giving additional work to any worker that completes. After all
# work has been handed out, the manager() receives the remaining results and
# then sends a kill signal to all workers.

from mpi4py import MPI
import time
import numpy as np
import sys

WORKTAG = 0
DIETAG = 1


# A queue of work items, stored either as a list or as the rows of a 2-D numpy array
class Work():
    def __init__(self, work_items):
        self.work_items = work_items[:]

    def get_next_item(self):
        if len(self.work_items) == 0:
            return None
        if isinstance(self.work_items, list):
            return self.work_items.pop(0)
        elif isinstance(self.work_items, np.ndarray):
            to_return = self.work_items[0, :]
            self.work_items = np.delete(self.work_items, 0, 0)
            return to_return
        else:
            raise TypeError("Unknown Data Type")


def manager(wi):
    all_data = []
    size = MPI.COMM_WORLD.Get_size()
    work_to_do = Work(wi)
    comm = MPI.COMM_WORLD
    status = MPI.Status()
    count_active_workers = 0

    # Send a task to each worker (provided there are enough tasks)
    for i in range(1, size):
        anext = work_to_do.get_next_item()
        if anext is None:
            break
        comm.send(obj=anext, dest=i, tag=WORKTAG)
        count_active_workers += 1

    # Keep appending received data to all_data and sending out the next task
    # for as long as tasks remain
    while True:
        anext = work_to_do.get_next_item()
        if anext is None:
            break
        data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        all_data.append(data)
        comm.send(obj=anext, dest=status.Get_source(), tag=WORKTAG)

    # Receive the final result from each active worker
    for i in range(1, min(size, count_active_workers + 1)):
        data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
        all_data.append(data)

    # Send a kill signal to each rank
    for i in range(1, size):
        comm.send(obj=None, dest=i, tag=DIETAG)

    return all_data


def worker(do_work):
    comm = MPI.COMM_WORLD
    status = MPI.Status()

    # Receive work items from the manager and apply do_work to them;
    # exit when DIETAG is received
    while True:
        data = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
        if status.Get_tag() == DIETAG:
            break
        k = do_work(data)
        comm.send(obj=k, dest=0)


def main(work_list, do_work):
    rank = MPI.COMM_WORLD.Get_rank()

    if rank == 0:
        all_dat = manager(work_list)
        A = sorted(all_dat, key=lambda k: k[1])  # Sort results by function value
        # dtype=object is needed because each result tuple mixes arrays,
        # numbers, and strings
        np.save('outfile', np.array(A, dtype=object))
    else:
        worker(do_work)


# A do_work function to pass to the workers that spawns new MPI ranks
def spawn_function_evaluation_using_MPI(a):
    start_time = time.time()
    x = np.array(a, dtype=float)
    ranks_for_this_f_eval = int(np.ceil(np.random.rand() * 5)) + 1  # A random number between 2 and 6
    ranks_for_this_f_eval = 1  # Overrides the random choice above; use a single rank per evaluation

    # Spawn an instance of function.py with ranks_for_this_f_eval ranks,
    # telling it the length of x to expect
    comm_to_function = MPI.COMM_SELF.Spawn(sys.executable,
                                           args=['function.py', str(len(np.atleast_1d(x)))],
                                           maxprocs=ranks_for_this_f_eval)
    comm_to_function.Bcast(x, root=MPI.ROOT)  # Broadcast the starting point to function.py
    y = np.array(0.0)  # Initialize the variable that will store the function evaluation
    comm_to_function.Reduce(None, y, op=MPI.SUM, root=MPI.ROOT)  # Sum the output from function.py
    comm_to_function.Disconnect()
    end_time = time.time()

    # Return a tuple with the point evaluated, the function value at x, the
    # MPI rank that spawned the process, the number of ranks used for the
    # evaluation, the start and end times, and a note string
    return (x, y, MPI.COMM_WORLD.Get_rank(), ranks_for_this_f_eval,
            start_time, end_time, 'Note')


if __name__ == "__main__":
    A = np.random.rand(30, 2) * 0.14 + 0.01  # Generate numbers between 0.01 and 0.15
    A[:, 1] *= -1  # Multiply the second column by -1
    main(A, spawn_function_evaluation_using_MPI)  # Pass vectors to the function, not scalars
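
# ---------------------------------------------------------------------------
# The spawned script function.py is not included here. Below is a minimal,
# hypothetical sketch of what it could contain (the objective sum(x**2) and
# all names in it are assumptions, not the original implementation). It must
# mirror the collective calls made by the parent above: receive x via Bcast,
# compute a partial value, and Reduce the partial values back to the parent.
# On the spawned side, the intercommunicator to the parent is obtained with
# MPI.Comm.Get_parent(), and the parent's rank in the collectives is 0,
# matching root=MPI.ROOT on the parent side.
#
#   # function.py (hypothetical sketch)
#   from mpi4py import MPI
#   import numpy as np
#   import sys
#
#   parent = MPI.Comm.Get_parent()  # Intercommunicator back to the spawning rank
#   n = int(sys.argv[1])            # Length of x, passed on the command line
#   x = np.empty(n, dtype=float)
#   parent.Bcast(x, root=0)         # Matches comm_to_function.Bcast(x, root=MPI.ROOT)
#
#   # Each spawned rank contributes an equal share of the (assumed) objective
#   # so that the parent's summing Reduce recovers the full value
#   y = np.array(np.sum(x**2) / MPI.COMM_WORLD.Get_size())
#
#   parent.Reduce(y, None, op=MPI.SUM, root=0)  # Matches comm_to_function.Reduce(...)
#   parent.Disconnect()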
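
# ---------------------------------------------------------------------------
# The manager/worker machinery above does not depend on process spawning:
# any callable that accepts one work item and returns a tuple in the layout
# main() expects can be passed as do_work. A hypothetical in-process example
# for testing without MPI.COMM_SELF.Spawn (the objective is again an
# assumption):
#
#   def local_function_evaluation(a):
#       start_time = time.time()
#       x = np.array(a, dtype=float)
#       y = np.array(np.sum(x**2))  # Hypothetical objective evaluated in-process
#       end_time = time.time()
#       return (x, y, MPI.COMM_WORLD.Get_rank(), 1, start_time, end_time, 'Note')
#
#   main(A, local_function_evaluation)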