[mpich-discuss] Questions about non-blocking collective calls...

Eric Chamberland Eric.Chamberland at giref.ulaval.ca
Wed Oct 21 14:45:32 CDT 2015


Hi,

A long time ago (in 2002) we implemented here a non-blocking MPI_Igather 
with equivalent MPI_Isend/MPI_Irecv calls (see the 2 attached files).

A very convenient advantage of this version is that I can do some work 
on the root process as soon as it starts receiving data...  Then it waits 
for the next communication to terminate and processes the received data.

Now, I am looking at MPI_Igather (and all the non-blocking collective MPI 
calls), and I am somewhat surprised (or ignorant) that I cannot have the 
root rank receive some data, then process it, then wait until the next 
"MPI_Irecv" terminates...

In other words, an MPI_Igather generates only 1 MPI_Request, but I would 
like to have either "p" MPI_Requests generated (with p = size of the 
communicator), or be able to call MPI_Waitany "p" times with the same 
MPI_Request...  Am I normal? :)
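
To make the comparison concrete, here is a condensed, self-contained sketch 
of the two patterns (the complete programs are attached below); processChunk 
is only a placeholder for whatever work the root does on each piece of data:

#include "mpi.h"
#include <iostream>
#include <vector>

// Placeholder for the work the root does on one rank's contribution.
static void processChunk(const int *chunk, int n) {
    std::cout << "processing " << n << " ints, first value = " << chunk[0] << std::endl;
}

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    const int lNbVal = 1000;
    std::vector<int> sendarray(lNbVal, rank);
    std::vector<int> rbuf;
    if (rank == 0) rbuf.resize(size * lNbVal);

    // Pattern A: hand-rolled gather, one request per sender.  The root
    // completes the receives one at a time with MPI_Waitany and can work
    // on each piece as soon as it has arrived.
    {
        MPI_Request sendReq;
        MPI_Isend(sendarray.data(), lNbVal, MPI_INT, 0, rank, MPI_COMM_WORLD, &sendReq);
        if (rank == 0) {
            std::vector<MPI_Request> recvReqs(size);
            for (int p = 0; p < size; ++p)
                MPI_Irecv(rbuf.data() + p * lNbVal, lNbVal, MPI_INT, p, p,
                          MPI_COMM_WORLD, &recvReqs[p]);
            for (int p = 0; p < size; ++p) {
                int idx;
                MPI_Waitany(size, recvReqs.data(), &idx, MPI_STATUS_IGNORE);
                processChunk(rbuf.data() + idx * lNbVal, lNbVal);
            }
        }
        MPI_Wait(&sendReq, MPI_STATUS_IGNORE);
    }

    // Pattern B: native non-blocking gather, a single request.  The root
    // can only start working once the whole collective has completed.
    // (The recv arguments are ignored on non-root ranks.)
    {
        MPI_Request req;
        MPI_Igather(sendarray.data(), lNbVal, MPI_INT,
                    rbuf.data(), lNbVal, MPI_INT, 0, MPI_COMM_WORLD, &req);
        MPI_Wait(&req, MPI_STATUS_IGNORE);
        if (rank == 0)
            for (int p = 0; p < size; ++p)
                processChunk(rbuf.data() + p * lNbVal, lNbVal);
    }

    MPI_Finalize();
    return 0;
}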

So my 3 questions are:

#1- Is there a way to use MPI_Igather with MPI_Waitany (or something 
else?) to process the data as it is received?

#2- Big question: will our implementation with MPI_Isend/MPI_Irecv scale 
to a large number of processes?  What are the possible drawbacks of 
doing it the way we did?

#3- Why should I replace our implementation with the native MPI_Igather?

Thanks!

Eric
-------------- next part --------------
#include "mpi.h"
#include <cstdio>
#include <cstdlib>
#include <fstream>

using namespace std;

// Print the MPI error code and message, then abort the whole job.
void abortOnError(int ierr) {
    if (ierr != MPI_SUCCESS) {
        printf("ERROR Returned by MPI: %d\n", ierr);
        char lErrorString[MPI_MAX_ERROR_STRING];
        int lLongueur = 0;
        MPI_Error_string(ierr, lErrorString, &lLongueur);
        printf("ERROR_string Returned by MPI: %s\n", lErrorString);
        MPI_Abort( MPI_COMM_WORLD, 1 );
    }
}

int main(int argc, char *argv[])
{
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    MPI_Request *lRecvRequests = 0;
    MPI_Request lSendRequest = MPI_REQUEST_NULL;
    const int lNbVal = 1000;
    int sendarray[lNbVal];

    for (int i = 0; i< lNbVal;++i) {
        sendarray[i] = rank;
    }
    int root = 0, *rbuf = 0;
    if (root == rank) {
        rbuf = (int *)malloc(size*lNbVal*sizeof(int));
        lRecvRequests = (MPI_Request*) malloc(size*sizeof(MPI_Request));
    }

    if (root == rank) {
        //rank 0 receive from everybody:
        for (int p = 0; p < size; ++p) {
            int tag = p;
            abortOnError(MPI_Irecv(rbuf+lNbVal*p, lNbVal, MPI_INT, p, tag, MPI_COMM_WORLD, &lRecvRequests[p]));
        }
    }

    //All ranks sends data to rank 0:
    int tag = rank;
    abortOnError(MPI_Isend(sendarray, lNbVal, MPI_INT, root, tag, MPI_COMM_WORLD, &lSendRequest));

    if (root == rank) {
        //Show memory used on rank 0:
        std::ifstream lFicProcSelfStatus("/proc/self/status");
        std::string lTmpLine;
        while (std::getline(lFicProcSelfStatus, lTmpLine, '\n')) {
            std::cout << lTmpLine << std::endl;
        }
    }


    MPI_Status  lStatusReception;
    lStatusReception.MPI_ERROR  = MPI_SUCCESS;
    int lCount = 0;
    int lIndex = 0;

    if (root == rank) {
        //Explore what we received:
        for (int p = 0; p < size; ++p) {
            abortOnError(MPI_Waitany(size,lRecvRequests, &lIndex, &lStatusReception));
            abortOnError(MPI_Get_count(&lStatusReception, MPI_INT, &lCount));
            int received_rank =  lStatusReception.MPI_SOURCE;
            std::cout << "Received from rank: " << received_rank << "), " << lCount << " values : " << std::endl;
            std::cout << "Doing some work with asynchronously received data..." <<std::endl;
            std::cout << "First 5 values: ";
            for (int j = 0; j <5;++j) {
                std::cout  << *(rbuf+lNbVal*received_rank+j) << " ";
            }
            std::cout << std::endl;
        }
    }

    //Every rank (including the root) must complete its send request before finalizing:
    abortOnError(MPI_Wait(&lSendRequest, MPI_STATUS_IGNORE));

    MPI_Finalize();
    return 0;

}
-------------- next part --------------
#include "mpi.h"
#include <cstdio>
#include <cstdlib>
#include <fstream>

using namespace std;

// Print the MPI error code and message, then abort the whole job.
void abortOnError(int ierr) {
    if (ierr != MPI_SUCCESS) {
        printf("ERROR Returned by MPI: %d\n", ierr);
        char lErrorString[MPI_MAX_ERROR_STRING];
        int lLongueur = 0;
        MPI_Error_string(ierr, lErrorString, &lLongueur);
        printf("ERROR_string Returned by MPI: %s\n", lErrorString);
        MPI_Abort( MPI_COMM_WORLD, 1 );
    }
}

int main(int argc, char *argv[])
{
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    const int lNbVal = 1000;

    int sendarray[lNbVal];
    for (int i = 0; i< lNbVal;++i) {
        sendarray[i] = rank;
    }
    int root = 0, *rbuf = 0;
    if (0 == rank) {
        rbuf = (int *)malloc(size*lNbVal*sizeof(int));
    }
    MPI_Request lRequest;

    abortOnError(MPI_Igather( sendarray, lNbVal, MPI_INT, rbuf, lNbVal, MPI_INT, root, MPI_COMM_WORLD, &lRequest));

    if (root == rank) {
        //Show memory used on rank 0:
        std::ifstream lFicProcSelfStatus("/proc/self/status");
        std::string lTmpLine;
        while (std::getline(lFicProcSelfStatus, lTmpLine, '\n')) {
            std::cout << lTmpLine << std::endl;
        }
    }

    MPI_Status  lStatusReception;
    lStatusReception.MPI_ERROR  = MPI_SUCCESS;
    int lCount = 0;

    if (0 == rank) {
        abortOnError(MPI_Wait(&lRequest, &lStatusReception));
        //Explore what we got:
        for (int p = 0; p < size; ++p) {
            abortOnError(MPI_Get_count(&lStatusReception, MPI_INT, &lCount));

            std::cout << "Received from rank: " << p  << " (?==" <<lStatusReception.MPI_SOURCE << "), " << lCount << " values : " << std::endl;
            std::cout << "First 5 values: ";
            for (int j = 0; j <5;++j) {
                std::cout << *(rbuf+lNbVal*p+j) << " ";
            }
            std::cout << std::endl;
        }
    }
    else {
        abortOnError(MPI_Wait(&lRequest, &lStatusReception));
        abortOnError(MPI_Get_count(&lStatusReception, MPI_INT, &lCount));
        std::cout << "rank: " << rank  << " sent " << lCount << " values." << std::endl;
    }

    MPI_Finalize();
    return 0;

}