/*
  mpic++ -std=c++17 -g -ggdb -fno-omit-frame-pointer -lpthread -O0 \
      thread_test_2.cpp -o thread_test_2 \
  && mpirun -n 2 thread_test_2 1 2000 8
*/
#include "mpi.h"

#include <unistd.h>

#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <thread>
#include <vector>

using std::thread;

// CommManager creates a communicator for each pair of MPI processes; these
// pair communicators are used later to create windows shared by only two ranks.
struct CommManager {
  CommManager(int nprocs) : nprocs_(nprocs) {
    MPI_Group world_group;
    MPI_Comm_group(MPI_COMM_WORLD, &world_group);

    const auto num_pairs = nprocs_ * nprocs_;
    comms_ = new MPI_Comm[num_pairs];
    // zero-initialize so we can later tell which slots were populated
    std::memset(comms_, 0, num_pairs * sizeof(MPI_Comm));

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int created = 0;
    // populate it, creating a new MPI_Comm on each iteration
    for (auto i = 0; i < nprocs_; ++i) {
      for (auto j = i + 1; j < nprocs_; ++j) {
        // we always access the array in (lower, higher) order, so we only
        // populate those entries as well. First create the group.
        MPI_Group pair_group;
        const int ranks[2] = {i, j};
        MPI_Group_incl(world_group, 2, ranks, &pair_group);

        MPI_Comm new_comm;
        const auto ret = MPI_Comm_create(MPI_COMM_WORLD, pair_group, &new_comm);
        assert(ret == MPI_SUCCESS);
        ++created;
        MPI_Group_free(&pair_group);

        if (rank != i && rank != j) {
          assert(new_comm == MPI_COMM_NULL);
        }

        // now, insert it into the repository
        const auto idx = IndexOrdered(i, j);
        assert(comms_[idx] == 0);
        comms_[idx] = new_comm;
        fprintf(stderr, "[r%d] new MPI_Comm for (%d, %d) -> %s (idx: %d)\n",
                rank, i, j, new_comm == MPI_COMM_NULL ? "NULL" : "valid", idx);
      }
    }
    assert(created == nprocs_ * (nprocs_ - 1) / 2);
    MPI_Group_free(&world_group);
  }

  ~CommManager() { delete[] comms_; }

  void Free() {
    int num_free = 0;
    for (auto i = 0; i < nprocs_ * nprocs_; ++i) {
      if (comms_[i] != 0 && comms_[i] != MPI_COMM_NULL) {
        MPI_Comm_free(&comms_[i]);
        num_free++;
      }
    }
    const auto expected = nprocs_ * (nprocs_ - 1) / 2;
    assert(num_free == expected);
  }

  MPI_Comm Get(int comm_world_rank1, int comm_world_rank2) {
    assert(comm_world_rank1 != comm_world_rank2);
    return comms_[Index(comm_world_rank1, comm_world_rank2)];
  }

 private:
  MPI_Comm* comms_;
  int nprocs_;

  int Index(int r1, int r2) const {
    // we always look up the "top half" of the matrix
    if (r1 < r2) {
      return IndexOrdered(r1, r2);
    } else {
      return IndexOrdered(r2, r1);
    }
  }

  int IndexOrdered(int lower, int higher) const {
    assert(lower < higher);
    return (lower * nprocs_) + higher;
  }
};
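// Illustrative sketch only (not called by this test, and the helper name is
// made up here): roughly how CommManager is meant to be driven, assuming MPI
// has already been initialized and every rank of MPI_COMM_WORLD enters the
// constructor collectively.
[[maybe_unused]] static void comm_manager_usage_sketch(int nprocs) {
  CommManager mgr(nprocs);          // collective: creates one comm per rank pair
  if (nprocs >= 2) {
    MPI_Comm pair = mgr.Get(0, 1);  // same handle as mgr.Get(1, 0)
    (void)pair;                     // MPI_COMM_NULL unless this rank is 0 or 1
  }
  mgr.Free();                       // frees every non-NULL pair communicator
}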
// A Channel is created by each process up front, before the worker threads
// start using it.
struct Channel {
  MPI_Win win_;
  const int mpi_rank_;
  const int num_threads_;
  const int sender_global_thread_rank_;
  const int receiver_global_thread_rank_;
  const int count_;
  int* buf_;

  Channel(int my_mpi_rank, int num_threads, int sender_global_thread_rank,
          int receiver_global_thread_rank, int count, CommManager& comm_mgr)
      : mpi_rank_(my_mpi_rank),
        num_threads_(num_threads),
        sender_global_thread_rank_(sender_global_thread_rank),
        receiver_global_thread_rank_(receiver_global_thread_rank),
        count_(count) {
    int ret = MPI_Alloc_mem(count_ * sizeof(int), MPI_INFO_NULL, &buf_);
    assert(ret == MPI_SUCCESS);

    chan_comm_ = comm_mgr.Get(SenderMpiRank(), ReceiverMpiRank());
    MPI_Comm_group(chan_comm_, &comm_group_);

    int chan_comm_rank;
    MPI_Comm_rank(chan_comm_, &chan_comm_rank);
    assert(chan_comm_rank == 0 || chan_comm_rank == 1);
    partner_rank_ = 1 - chan_comm_rank;
    MPI_Group_incl(comm_group_, 1, &partner_rank_, &origin_or_target_group_);

    if (mpi_rank_ == SenderMpiRank()) {
      fprintf(stderr,
              "[mpi %d]\tcreating SEND window (global ranks: %d -> %d)\n",
              my_mpi_rank, sender_global_thread_rank,
              receiver_global_thread_rank);
      // the sender exposes no memory; it only originates Puts
      MPI_Win_create(nullptr, 0, sizeof(int), MPI_INFO_NULL, chan_comm_, &win_);
      char win_name[MPI_MAX_OBJECT_NAME];
      snprintf(win_name, sizeof(win_name), "S: %d -> %d",
               sender_global_thread_rank_, receiver_global_thread_rank);
      MPI_Win_set_name(win_, win_name);
    } else {  // receiver
      fprintf(stderr,
              "[mpi %d]\tcreating RECV window (global ranks: %d -> %d)\n",
              my_mpi_rank, sender_global_thread_rank,
              receiver_global_thread_rank);
      // window size is in bytes
      MPI_Win_create(buf_, count_ * sizeof(int), sizeof(int), MPI_INFO_NULL,
                     chan_comm_, &win_);
      char win_name[MPI_MAX_OBJECT_NAME];
      snprintf(win_name, sizeof(win_name), "R: %d -> %d",
               sender_global_thread_rank_, receiver_global_thread_rank);
      MPI_Win_set_name(win_, win_name);
    }
  }

  ~Channel() = default;

  void Free() {
    MPI_Free_mem(buf_);
    MPI_Group_free(&comm_group_);
    MPI_Group_free(&origin_or_target_group_);
    MPI_Win_free(&win_);
  }

  int SenderMpiRank() const {
    return sender_global_thread_rank_ / num_threads_;
  }
  int ReceiverMpiRank() const {
    return receiver_global_thread_rank_ / num_threads_;
  }

  void Send() {
    int ret;
    ret = MPI_Win_start(origin_or_target_group_, 0, win_);
    assert(ret == MPI_SUCCESS);
    ret = MPI_Put(buf_, count_, MPI_INT, partner_rank_, 0, count_, MPI_INT,
                  win_);
    assert(ret == MPI_SUCCESS);
    ret = MPI_Win_complete(win_);
    assert(ret == MPI_SUCCESS);
  }

  void Recv() {
    int ret;
    ret = MPI_Win_post(origin_or_target_group_, 0, win_);
    assert(ret == MPI_SUCCESS);
    ret = MPI_Win_wait(win_);
    assert(ret == MPI_SUCCESS);
  }

 private:
  MPI_Comm chan_comm_;
  MPI_Group comm_group_, origin_or_target_group_;
  int partner_rank_;
};
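// How Send()/Recv() above pair up: this is MPI's post-start-complete-wait
// (PSCW) synchronization, sketched here for one channel. This is commentary
// on the calls already made above, not additional code. The sender thread is
// the origin of the MPI_Put; the receiver contributed buf_ when the window
// was created collectively over the pair communicator.
//
//   sender thread (origin), Channel::Send():
//     MPI_Win_start(origin_or_target_group_, 0, win_);  // access epoch
//     MPI_Put(buf_, count_, MPI_INT, partner_rank_, 0, count_, MPI_INT, win_);
//     MPI_Win_complete(win_);
//
//   receiver thread (target), Channel::Recv():
//     MPI_Win_post(origin_or_target_group_, 0, win_);   // exposure epoch on buf_
//     MPI_Win_wait(win_);                               // returns after the sender's complete
//
// MPI_Win_wait returns only once the matching MPI_Win_complete has finished,
// so the receiver may safely read buf_ after each Recv().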
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////

// ranks are global ranks
int buf_val(int sender_rank, int recv_rank, int idx) {
  return (sender_rank * 100000) + (recv_rank * 1000) + idx;
}

void send_message(int global_thread_rank, Channel& chan, int iters) {
  for (auto i = 0; i < chan.count_; ++i) {
    chan.buf_[i] =
        buf_val(global_thread_rank, chan.receiver_global_thread_rank_, i);
  }

  char win_name[MPI_MAX_OBJECT_NAME];
  int resultlen;
  MPI_Win_get_name(chan.win_, win_name, &resultlen);
  fprintf(stderr, " [r%d]\tSEND_MESSAGE win name: %s\n", global_thread_rank,
          win_name);

  for (auto iter = 0; iter < iters; ++iter) {
    chan.Send();
  }
}

void recv_message(int global_thread_rank, Channel& chan, int iters) {
  char win_name[MPI_MAX_OBJECT_NAME];
  int resultlen;
  MPI_Win_get_name(chan.win_, win_name, &resultlen);
  fprintf(stderr, " [r%d]\tRECV_MESSAGE win name: %s\n", global_thread_rank,
          win_name);

  for (auto iter = 0; iter < iters; ++iter) {
    // fprintf(stderr, KCYN " [r%d]\t i: %d: receiving...\n" KRESET,
    //         global_thread_rank, iter);
    chan.Recv();
    // fprintf(stderr, KCYN " [r%d]\t i: %d: receiving DONE...\n" KRESET,
    //         global_thread_rank, iter);

    for (auto i = 0; i < chan.count_; ++i) {
      const auto expected =
          buf_val(chan.sender_global_thread_rank_, global_thread_rank, i);
      if (chan.buf_[i] != expected) {
        fprintf(stderr,
                "[r%d] Expected %d but got %d for index %d in message "
                "from global rank %d\n",
                global_thread_rank, expected, chan.buf_[i], i,
                chan.sender_global_thread_rank_);
        exit(EXIT_FAILURE);
      }
    }
  }
}

void thread_runner(unsigned thread_num, int mpi_rank, int mpi_size,
                   int num_threads, int count, int iters,
                   std::vector<Channel> chans) {
  const auto global_thread_rank = (mpi_rank * num_threads) + thread_num;
  const auto is_sender = global_thread_rank < num_threads;
  Channel& my_chan = chans[thread_num];

  char hostname[16];
  auto ret = gethostname(hostname, sizeof(hostname));
  assert(ret == 0);

  if (is_sender) {
    fprintf(stderr,
            "[r%d - %s] SENDER thread %u, mpi rank %d (mpi size %d) "
            "sending to global rank %d\n",
            global_thread_rank, hostname, thread_num, mpi_rank, mpi_size,
            my_chan.receiver_global_thread_rank_);
    send_message(global_thread_rank, my_chan, iters);
  } else {
    fprintf(stderr,
            "[r%d - %s] RECEIVER thread %u, mpi rank %d (mpi size %d) "
            "receiving from global rank %d\n",
            global_thread_rank, hostname, thread_num, mpi_rank, mpi_size,
            my_chan.sender_global_thread_rank_);
    recv_message(global_thread_rank, my_chan, iters);
  }
}

int main(int argc, char* argv[]) {
  assert(argc == 4);
  int num_ints = atoi(argv[1]);
  int iters = atoi(argv[2]);
  int num_threads = atoi(argv[3]);

  // each process initializes MPI
  int provided;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
  assert(provided == MPI_THREAD_MULTIPLE);

  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  assert(size == 2);

  CommManager comm_mgr(size);

  std::vector<Channel> chans;
  // each process creates a channel going from thread i --> thread i + offset,
  // where offset = num_threads
  for (auto global_thread_rank = 0; global_thread_rank < num_threads;
       ++global_thread_rank) {
    const auto global_sender = global_thread_rank;
    const auto global_receiver = global_thread_rank + num_threads;
    chans.emplace_back(rank, num_threads, global_sender, global_receiver,
                       num_ints, comm_mgr);
  }
  assert(chans.size() == static_cast<size_t>(num_threads));

  // now, kick off the threads, which will do the work
  std::vector<thread> threads;
  threads.reserve(num_threads);
  for (auto i = 0; i < num_threads; ++i) {
    threads.emplace_back(thread(thread_runner, i, rank, size, num_threads,
                                num_ints, iters, chans));
  }
  for (auto& t : threads) {
    t.join();
  }

  MPI_Barrier(MPI_COMM_WORLD);
  if (rank == 0) {
    fprintf(stderr, "\n--- SUCCESS ---\n\n");
  }

  for (auto& c : chans) {
    c.Free();
  }
  comm_mgr.Free();

  MPI_Finalize();
  return EXIT_SUCCESS;
}
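// Worked example of the rank arithmetic above, for the command line in the
// header comment (mpirun -n 2 thread_test_2 1 2000 8, i.e. num_threads = 8):
//
//   global_thread_rank = mpi_rank * num_threads + thread_num
//
//   mpi rank 0, threads 0..7  ->  global ranks 0..7   (senders)
//   mpi rank 1, threads 0..7  ->  global ranks 8..15  (receivers)
//
// Channel i connects global thread i to global thread i + 8, and its payload
// is buf_val(i, i + 8, idx); e.g. channel 3 carries
// 3 * 100000 + 11 * 1000 + idx = 311000 + idx.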