[mpich-discuss] Questions about non-blocking collective calls...
Eric Chamberland
Eric.Chamberland at giref.ulaval.ca
Wed Oct 21 14:45:32 CDT 2015
Hi,
A long time ago (in 2002) we implemented a non-blocking gather here with
equivalent calls to MPI_Isend/MPI_Irecv (see the 2 attached files).
A very convenient advantage of this version is that I can do some work
on the root process as soon as it starts receiving data: it waits for
the next communication to complete, then processes the received data.
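Roughly, the pattern is the following (a condensed sketch of what the
first attached file does; buffer setup and error checking are omitted,
and "process()" is just a placeholder for the per-rank work):

  // Root posts one receive per rank, every rank posts one send:
  if (rank == root)
    for (int p = 0; p < size; ++p)
      MPI_Irecv(rbuf + n*p, n, MPI_INT, p, /*tag=*/p, comm, &reqs[p]);
  MPI_Isend(sendbuf, n, MPI_INT, root, /*tag=*/rank, comm, &sreq);

  // Root handles each contribution in completion order:
  if (rank == root)
    for (int p = 0; p < size; ++p) {
      int idx; MPI_Status st;
      MPI_Waitany(size, reqs, &idx, &st);
      process(rbuf + n*st.MPI_SOURCE);  // work overlaps the remaining receives
    }
  MPI_Wait(&sreq, MPI_STATUS_IGNORE);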
Now I am looking at MPI_Igather (and all the non-blocking collective MPI
calls), and I am somewhat surprised (or ignorant) that I cannot have the
root rank receive some data, process it, then wait until the next
receive completes...
In other words, an MPI_Igather generates only 1 MPI_Request, but I would
like either to get "p" MPI_Requests (with p = size of the communicator),
or to be able to call MPI_Waitany "p" times on that single
MPI_Request... Am I normal? :)
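With the native call, the best I can see is a single wait on the whole
collective (condensed from the second attached file; names are
placeholders):

  MPI_Request req;
  MPI_Igather(sendbuf, n, MPI_INT, rbuf, n, MPI_INT, root, comm, &req);
  // ... overlap some unrelated work here ...
  MPI_Wait(&req, MPI_STATUS_IGNORE);  // only now is all of rbuf usable at root
  // There is no per-rank completion, so processing of rank p's block
  // cannot start before the whole gather has finished.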
So my 3 questions are:
#1- Is there a way to use MPI_Igather with MPI_Waitany (or something
else?) to process data as it is received?
#2- Big question: will our implementation with MPI_Isend/MPI_Irecv scale
to a large number of processes? What are the possible drawbacks of
doing it the way we did?
#3- Why should I replace our implementation with the native MPI_Igather?
Thanks!
Eric
-------------- next part --------------
#include "mpi.h"
#include <cstdio>
#include <cstdlib>
#include <fstream>
using namespace std;
void abortOnError(int ierr) {
if (ierr != MPI_SUCCESS) {
printf("ERROR Returned by MPI: %d\n",ierr);
char* lCharPtr = new char[MPI_MAX_ERROR_STRING];
int lLongueur = 0;
MPI_Error_string(ierr,lCharPtr, &lLongueur);
printf("ERROR_string Returned by MPI: %s\n",lCharPtr);
MPI_Abort( MPI_COMM_WORLD, 1 );
}
}
int main(int argc, char *argv[])
{
int rank, size;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Request *lRecvRequests = 0;
MPI_Request lSendRequest = 0;
const int lNbVal = 1000;
int sendarray[lNbVal];
for (int i = 0; i< lNbVal;++i) {
sendarray[i] = rank;
}
int root = 0, *rbuf;
if (root == rank) {
rbuf = (int *)malloc(size*lNbVal*sizeof(int));
lRecvRequests = (MPI_Request*) malloc(size*sizeof(MPI_Request));
}
if (root == rank) {
//rank 0 receive from everybody:
for (int p = 0; p < size; ++p) {
int tag = p;
abortOnError(MPI_Irecv(rbuf+lNbVal*p, lNbVal, MPI_INT, p, tag, MPI_COMM_WORLD, &lRecvRequests[p]));
}
}
//All ranks sends data to rank 0:
int tag = rank;
abortOnError(MPI_Isend(sendarray, lNbVal, MPI_INT, root, tag, MPI_COMM_WORLD, &lSendRequest));
if (root == rank) {
//Show memory used on rank 0:
std::ifstream lFicProcSelfStatus("/proc/self/status");
std::string lTmpLine;
while (lFicProcSelfStatus) {
std::getline(lFicProcSelfStatus, lTmpLine, '\n');
std::cout << lTmpLine <<std::endl;
}
}
MPI_Status lStatusReception;
lStatusReception.MPI_ERROR = MPI_SUCCESS;
int lCount = 0;
int lIndex = 0;
if (root == rank) {
//Explore what we received:
for (int p = 0; p < size; ++p) {
abortOnError(MPI_Waitany(size,lRecvRequests, &lIndex, &lStatusReception));
abortOnError(MPI_Get_count(&lStatusReception, MPI_INT, &lCount));
int received_rank = lStatusReception.MPI_SOURCE;
std::cout << "Received from rank: " << received_rank << "), " << lCount << " values : " << std::endl;
std::cout << "Doing some work with asynchronously received data..." <<std::endl;
std::cout << "First 5 values: ";
for (int j = 0; j <5;++j) {
std::cout << *(rbuf+lNbVal*received_rank+j) << " ";
}
std::cout << std::endl;
}
}
MPI_Finalize();
return 0;
}
-------------- next part --------------
#include "mpi.h"
#include <cstdio>
#include <cstdlib>
#include <fstream>
using namespace std;
void abortOnError(int ierr) {
if (ierr != MPI_SUCCESS) {
printf("ERROR Returned by MPI: %d\n",ierr);
char* lCharPtr = new char[MPI_MAX_ERROR_STRING];
int lLongueur = 0;
MPI_Error_string(ierr,lCharPtr, &lLongueur);
printf("ERROR_string Returned by MPI: %s\n",lCharPtr);
MPI_Abort( MPI_COMM_WORLD, 1 );
}
}
int main(int argc, char *argv[])
{
int rank, size;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
const int lNbVal = 1000;
int sendarray[lNbVal];
for (int i = 0; i< lNbVal;++i) {
sendarray[i] = rank;
}
int root = 0, *rbuf;
if (0 == rank) {
rbuf = (int *)malloc(size*lNbVal*sizeof(int));
}
MPI_Request lRequest;
abortOnError(MPI_Igather( sendarray, lNbVal, MPI_INT, rbuf, lNbVal, MPI_INT, root, MPI_COMM_WORLD, &lRequest));
if (root == rank) {
//Show memory used on rank 0:
std::ifstream lFicProcSelfStatus("/proc/self/status");
std::string lTmpLine;
while (lFicProcSelfStatus) {
std::getline(lFicProcSelfStatus, lTmpLine, '\n');
std::cout << lTmpLine <<std::endl;
}
}
MPI_Status lStatusReception;
lStatusReception.MPI_ERROR = MPI_SUCCESS;
int lCount = 0;
if (0 == rank) {
abortOnError(MPI_Wait(&lRequest, &lStatusReception));
//Explore what we got:
for (int p = 0; p < size; ++p) {
abortOnError(MPI_Get_count(&lStatusReception, MPI_INT, &lCount));
std::cout << "Received from rank: " << p << " (?==" <<lStatusReception.MPI_SOURCE << "), " << lCount << " values : " << std::endl;
std::cout << "First 5 values: ";
for (int j = 0; j <5;++j) {
std::cout << *(rbuf+lNbVal*p+j) << " ";
}
std::cout << std::endl;
}
}
else {
abortOnError(MPI_Wait(&lRequest, &lStatusReception));
abortOnError(MPI_Get_count(&lStatusReception, MPI_INT, &lCount));
std::cout << "rank: " << rank << " sent " << lCount << " values." << std::endl;
}
MPI_Finalize();
return 0;
}