[mpich-commits] [mpich] MPICH primary repository branch, master, updated. v3.1.2-73-gbfc0924

Service Account noreply at mpich.org
Thu Aug 7 10:04:23 CDT 2014


This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "MPICH primary repository".

The branch, master has been updated
       via  bfc09241bd576e70546de494a8df6ae67c4d21be (commit)
      from  12909511b352b3b6d069b8d1f933b383f8447704 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
http://git.mpich.org/mpich.git/commitdiff/bfc09241bd576e70546de494a8df6ae67c4d21be

commit bfc09241bd576e70546de494a8df6ae67c4d21be
Author: Paul Coffman <pkcoff at us.ibm.com>
Date:   Wed Jul 30 00:05:22 2014 -0500

    Reduce P2PContig communication with local compute
    
    Additional P2PContig optimizations that improve performance by trading
    some communication during aggregation for local computation -
    most helpful at scale
    
    Signed-off-by: Rob Latham <robl at mcs.anl.gov>

diff --git a/src/mpi/romio/adio/common/p2p_aggregation.c b/src/mpi/romio/adio/common/p2p_aggregation.c
index b8e00e8..46cfe55 100644
--- a/src/mpi/romio/adio/common/p2p_aggregation.c
+++ b/src/mpi/romio/adio/common/p2p_aggregation.c
@@ -4,6 +4,8 @@
 
 #include <pthread.h>
 
+// #define p2pcontigtrace 1
+
 void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
 	const void *buf,
 	int *error_code,
@@ -28,6 +30,8 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
     MPI_Comm_size(fd->comm, &nprocs);
     MPI_Comm_rank(fd->comm, &myrank);
 
+	ADIO_Offset myOffsetStart = st_offsets[myrank], myOffsetEnd = end_offsets[myrank];
+
     int myAggRank = -1; // if I am an aggregor this is my index into fd->hints->ranklist
     int iAmUsedAgg = 0;
 
@@ -54,28 +58,19 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
 	}
     }
 
-    /* determine how much data and to whom I need to send (also record the
-     * offset in the buffer for procs that span file domains) */
+    /* Determine how much data and to whom I need to send.  For source proc
+     * targets, also determine the target file domain offsets locally to
+     * reduce communication overhead */
     int *targetAggsForMyData = (int *)ADIOI_Malloc(naggs * sizeof(int));
-    int *dataSizeToSendPerTargetAgg = (int *)ADIOI_Malloc(naggs * sizeof(int));
-    int *bufferOffsetToSendPerTargetAgg = (int *)ADIOI_Malloc(naggs * sizeof(int));
+    ADIO_Offset *targetAggsForMyDataFDStart = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
+    ADIO_Offset *targetAggsForMyDataFDEnd = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
     int numTargetAggs = 0;
     int i;
     for (i=0;i<naggs;i++) {
-	if ( ((st_offsets[myrank] >= fd_start[i]) &&  (st_offsets[myrank] <= fd_end[i])) || ((end_offsets[myrank] >= fd_start[i]) &&  (end_offsets[myrank] <= fd_end[i]))) {
+	if ( ((myOffsetStart >= fd_start[i]) &&  (myOffsetStart <= fd_end[i])) || ((myOffsetEnd >= fd_start[i]) &&  (myOffsetEnd <= fd_end[i]))) {
 	    targetAggsForMyData[numTargetAggs] = fd->hints->ranklist[i];
-	    if ( ((st_offsets[myrank] >= fd_start[i]) &&  (st_offsets[myrank] <= fd_end[i])) && ((end_offsets[myrank] >= fd_start[i]) &&  (end_offsets[myrank] <= fd_end[i]))) {
-		dataSizeToSendPerTargetAgg[numTargetAggs] = (end_offsets[myrank] - st_offsets[myrank])+1;
-		bufferOffsetToSendPerTargetAgg[numTargetAggs] = 0;
-	    }
-	    else if ((st_offsets[myrank] >= fd_start[i]) &&  (st_offsets[myrank] <= fd_end[i])) { // starts in this fd and goes past it
-		dataSizeToSendPerTargetAgg[numTargetAggs] = (fd_end[i] - st_offsets[myrank]) +1;
-		bufferOffsetToSendPerTargetAgg[numTargetAggs] = 0;
-	    }
-	    else { // starts in fd before this and ends in it
-		dataSizeToSendPerTargetAgg[numTargetAggs] = (end_offsets[myrank] - fd_start[i]) +1;
-		bufferOffsetToSendPerTargetAgg[numTargetAggs] = fd_start[i]- st_offsets[myrank];
-	    }
+	    targetAggsForMyDataFDStart[numTargetAggs] = fd_start[i];
+	    targetAggsForMyDataFDEnd[numTargetAggs] = fd_end[i];
 	    numTargetAggs++;
 	}
     }
@@ -86,7 +81,6 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
     ADIO_Offset *remainingDataOffsetToGetPerProc=NULL;
 
     int numSourceProcs = 0;
-    int totalDataSizeToGet = 0;
 
     if (iAmUsedAgg) { /* for the used aggregators figure out how much data I
 			 need from what procs */
@@ -100,8 +94,6 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
 	remainingDataAmountToGetPerProc = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
 	remainingDataOffsetToGetPerProc = (ADIO_Offset *)ADIOI_Malloc(numSourceProcs * sizeof(ADIO_Offset));
 
-	/* TODO: here was a spot where the balancecontig code figured out bridge ranks */
-
 	/* everybody has the st_offsets and end_offsets for all ranks so if I am a
 	 * used aggregator go thru them and figure out which ranks have data that
 	 * falls into my file domain assigned to me */
@@ -121,9 +113,8 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
 		    remainingDataAmountToGetPerProc[numSourceProcs] = (end_offsets[i] - fd_start[myAggRank]) +1;
 		    remainingDataOffsetToGetPerProc[numSourceProcs] = fd_start[myAggRank];
 		}
-		totalDataSizeToGet += remainingDataAmountToGetPerProc[numSourceProcs];
 #ifdef p2pcontigtrace
-		printf("getting %ld bytes from source proc %d in fd %d with borders %ld to %ld\n",remainingDataAmountToGetPerProc[numSourceProcs],fd->hints->ranklist[myAggRank],i,fd_start[myAggRank],fd_end[myAggRank]);
+		printf("getting %ld bytes from source proc %d in fd rank %d with borders %ld to %ld\n",remainingDataAmountToGetPerProc[numSourceProcs],i,fd->hints->ranklist[myAggRank],fd_start[myAggRank],fd_end[myAggRank]);
 #endif
 		numSourceProcs++;
 	    }
@@ -133,12 +124,8 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
     int *amountOfDataReqestedByTargetAgg = (int *)ADIOI_Malloc(naggs * sizeof(int));
     for (i=0;i<numTargetAggs;i++) {
 	amountOfDataReqestedByTargetAgg[i] = 0;
-#ifdef p2pcontigtrace
-	printf("Need to send %ld bytes at buffer offset %d to agg %d\n", dataSizeToSendPerTargetAgg[i],bufferOffsetToSendPerTargetAgg[i],targetAggsForMyData[i]);
-#endif
     }
 
-    int totalAmountDataSent = 0;
     int totalAmountDataReceived = 0;
     MPI_Request *mpiSizeToSendRequest = (MPI_Request *) ADIOI_Malloc(numTargetAggs * sizeof(MPI_Request));
     MPI_Request *mpiRecvDataRequest = (MPI_Request *) ADIOI_Malloc(numSourceProcs * sizeof(MPI_Request));
@@ -177,7 +164,9 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
 
     int *dataSizeGottenThisRoundPerProc = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
     int *mpiRequestMapPerProc = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
-    int *mpiSendRequestMapPerProc = (int *)ADIOI_Malloc(numTargetAggs * sizeof(int));
+    int *targetAggIndexesForMyDataThisRound = (int *)ADIOI_Malloc(numTargetAggs * sizeof(int));
+    int *sendBufferOffsetsThisRound = (int *)ADIOI_Malloc(numTargetAggs * sizeof(int));
+    int *bufferAmountsToSendThisRound = (int *)ADIOI_Malloc(numTargetAggs * sizeof(int));
 
 #ifdef ROMIO_GPFS
     endTimeBase = MPI_Wtime();
@@ -190,6 +179,71 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
     int roundIter;
     for (roundIter=0;roundIter<numberOfRounds;roundIter++) {
 
+	/* determine what target aggs I need to send data to this round */
+	int numTargetAggsThisRound = 0;
+	for (i=0;i<numTargetAggs;i++) {
+	    if ( ((myOffsetStart >= targetAggsForMyDataFDStart[i]) && (myOffsetStart <= targetAggsForMyDataFDEnd[i])) ||
+		    ((myOffsetEnd >= targetAggsForMyDataFDStart[i]) && (myOffsetEnd <= targetAggsForMyDataFDEnd[i]))) {
+		// we know that we need to send data to this target agg at some point, now need to figure out how much this round
+
+		// here are the offsets currently being collected by the aggregator during this round
+		ADIO_Offset currentRoundFDStartForMyTargetAgg = (ADIO_Offset)((ADIO_Offset)targetAggsForMyDataFDStart[i] + (ADIO_Offset)((ADIO_Offset)roundIter*(ADIO_Offset)coll_bufsize));
+		ADIO_Offset currentRoundFDEndForMyTargetAgg = (ADIO_Offset)((ADIO_Offset)targetAggsForMyDataFDStart[i] + (ADIO_Offset)((ADIO_Offset)(roundIter+1)*(ADIO_Offset)coll_bufsize) - (ADIO_Offset)1);
+		if (currentRoundFDEndForMyTargetAgg > targetAggsForMyDataFDEnd[i])
+		    currentRoundFDEndForMyTargetAgg = targetAggsForMyDataFDEnd[i];
+
+#ifdef p2pcontigtrace
+		printf("roundIter %d target iter %d targetAggsForMyData is %d myOffsetStart is %ld myOffsetEnd is %ld targetAggsForMyDataFDStart is %ld targetAggsForMyDataFDEnd is %ld currentRoundFDStartForMyTargetAgg is %ld currentRoundFDEndForMyTargetAgg is %ld\n",
+			roundIter,i,targetAggsForMyData[i],myOffsetStart,myOffsetEnd,
+			targetAggsForMyDataFDStart[i],targetAggsForMyDataFDEnd[i],
+			currentRoundFDStartForMyTargetAgg,currentRoundFDEndForMyTargetAgg);
+#endif
+
+		/* send the portion of my data that is within
+		 * currentRoundFDStartForMyTargetAgg to
+		 * currentRoundFDEndForMyTargetAgg */
+		/* find the offset into the send buffer and the amount
+		 * of data to send */
+		int sendBufferOffset = 0;
+		int bufferAmountToSend = 0;
+
+		if ((myOffsetStart >= currentRoundFDStartForMyTargetAgg) && (myOffsetStart <= currentRoundFDEndForMyTargetAgg)) {
+		    if (myOffsetEnd > currentRoundFDEndForMyTargetAgg)
+			bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - myOffsetStart) +1;
+		    else
+			bufferAmountToSend = (myOffsetEnd - myOffsetStart) +1;
+		}
+		else if ((myOffsetEnd >= currentRoundFDStartForMyTargetAgg) && (myOffsetEnd <= currentRoundFDEndForMyTargetAgg)) {
+		    sendBufferOffset = (int) (currentRoundFDStartForMyTargetAgg - myOffsetStart);
+		    if (myOffsetEnd > currentRoundFDEndForMyTargetAgg)
+			bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) +1;
+		    else
+			bufferAmountToSend = (myOffsetEnd - currentRoundFDStartForMyTargetAgg) +1;
+		}
+		else if ((myOffsetStart <= currentRoundFDStartForMyTargetAgg) && (myOffsetEnd >= currentRoundFDEndForMyTargetAgg)) {
+		    sendBufferOffset = (int) (currentRoundFDStartForMyTargetAgg - myOffsetStart);
+		    bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) +1;
+		}
+
+		if (bufferAmountToSend > 0) { // we have data to send this round
+		    targetAggIndexesForMyDataThisRound[numTargetAggsThisRound] = i;
+		    sendBufferOffsetsThisRound[numTargetAggsThisRound] = sendBufferOffset;
+		    bufferAmountsToSendThisRound[numTargetAggsThisRound] = bufferAmountToSend;
+#ifdef p2pcontigtrace
+		    printf("bufferAmountToSend is %d sendBufferOffset is %d\n",bufferAmountToSend,sendBufferOffset);
+#endif
+		    /* only need to be pinged by the agg for rounds after the first one - for the first one just
+		     * send the data without being pinged */
+		    if (roundIter > 0)
+			MPI_Irecv(&amountOfDataReqestedByTargetAgg[numTargetAggsThisRound],1,
+				MPI_INT,targetAggsForMyData[i],0,
+				fd->comm,&mpiSizeToSendRequest[numTargetAggsThisRound]);
+		    numTargetAggsThisRound++;
+
+		}
+	    }
+	}
+
 	// determine what offsets define the portion of the file domain the agg is writing this round
 	if (iAmUsedAgg) {
 	    if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
@@ -202,22 +256,8 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
 #endif
 	}
 
-	int numRecvToWaitFor = 0;
 	int irecv,isend;
-
-	/* the source procs receive the amount of data the aggs want them to send */
-#ifdef ROMIO_GPFS
-	startTimeBase = MPI_Wtime();
-#endif
-	for (i=0;i<numTargetAggs;i++) {
-	    MPI_Irecv(&amountOfDataReqestedByTargetAgg[i],1,
-		    MPI_INT,targetAggsForMyData[i],0,
-		    fd->comm,&mpiSizeToSendRequest[i]);
-	    numRecvToWaitFor++;
-#ifdef p2pcontigtrace
-	    printf("MPI_Irecv from rank %d\n",targetAggsForMyData[i]);
-#endif
-	}
+	int numSourceProcsSentData = 0;
 
 	// the aggs send the amount of data they need to their source procs
 	for (i=0;i<numSourceProcs;i++) {
@@ -239,30 +279,44 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
 #ifdef p2pcontigtrace
 	    printf("dataSizeGottenThisRoundPerProc[%d] set to %d - remainingDataOffsetToGetPerProc is %d remainingDataAmountToGetPerProc is %d currentRoundFDStart is %d currentRoundFDEnd is %d\n",i,dataSizeGottenThisRoundPerProc[i],remainingDataOffsetToGetPerProc[i],remainingDataAmountToGetPerProc[i],currentRoundFDStart,currentRoundFDEnd);
 #endif
-	    MPI_Isend(&dataSizeGottenThisRoundPerProc[i],1,MPI_INT,
-		    sourceProcsForMyData[i],0,fd->comm,&mpiSendDataSizeRequest[i]);
-
+	    if (dataSizeGottenThisRoundPerProc[i] > 0) {
+		if (roundIter > 0) {
+		    MPI_Isend(&dataSizeGottenThisRoundPerProc[i],1,MPI_INT,
+			    sourceProcsForMyData[i],0,fd->comm,
+			    &mpiSendDataSizeRequest[numSourceProcsSentData]);
+		    numSourceProcsSentData++;
+		}
+	    }
 	}
 
-        int numDataSendToWaitFor = 0;
-	/* the source procs send the requested data to the aggs - only send if
-	 * requested more than 0 bytes */
-	for (i = 0; i < numRecvToWaitFor; i++) {
-	    MPI_Waitany(numRecvToWaitFor,mpiSizeToSendRequest,&irecv,&mpiWaitAnyStatusFromTargetAggs);
+	int numDataSendToWaitFor = 0;
+	/* the source procs send the requested data to the aggs */
+	for (i = 0; i < numTargetAggsThisRound; i++) {
+
+		/* the source procs aren't pinged by the target aggs on the first round */
+	    if (roundIter > 0) {
+
+		MPI_Waitany(numTargetAggsThisRound,mpiSizeToSendRequest,
+			&irecv,&mpiWaitAnyStatusFromTargetAggs);
 
 #ifdef p2pcontigtrace
-	    printf("was sent request for %d bytes from rank %d irecv index %d\n",amountOfDataReqestedByTargetAgg[irecv],targetAggsForMyData[irecv],irecv);
+		printf("irecv is %d amountOfDataReqestedByTargetAgg is %d bufferAmountsToSendThisRound is %d sendBufferOffsetsThisRound is %d targetAggsForMyData is %d\n",irecv,amountOfDataReqestedByTargetAgg[irecv], bufferAmountsToSendThisRound[irecv], sendBufferOffsetsThisRound[irecv],targetAggsForMyData[targetAggIndexesForMyDataThisRound[irecv]]);
 #endif
+		ADIOI_Assert(amountOfDataReqestedByTargetAgg[irecv] == bufferAmountsToSendThisRound[irecv]);
+		MPI_Isend(&((char*)buf)[sendBufferOffsetsThisRound[irecv]],
+			bufferAmountsToSendThisRound[irecv],MPI_BYTE,
+			targetAggsForMyData[targetAggIndexesForMyDataThisRound[irecv]],
+			0,fd->comm,&mpiSendDataToTargetAggRequest[irecv]);
 
-	    if (amountOfDataReqestedByTargetAgg[irecv] > 0) {
-		MPI_Isend(&((char*)buf)[bufferOffsetToSendPerTargetAgg[irecv]],amountOfDataReqestedByTargetAgg[irecv],MPI_BYTE,
-			targetAggsForMyData[irecv],0,fd->comm,&mpiSendDataToTargetAggRequest[irecv]);
-		totalAmountDataSent += amountOfDataReqestedByTargetAgg[irecv];
-		bufferOffsetToSendPerTargetAgg[irecv] += amountOfDataReqestedByTargetAgg[irecv];
-                mpiSendRequestMapPerProc[numDataSendToWaitFor] = irecv;
-                numDataSendToWaitFor++;
 	    }
-
+	    else {
+#ifdef p2pcontigtrace
+		printf("i is %d bufferAmountsToSendThisRound is %d sendBufferOffsetsThisRound is %d targetAggsForMyData is %d\n",i, bufferAmountsToSendThisRound[i], sendBufferOffsetsThisRound[i],targetAggsForMyData[targetAggIndexesForMyDataThisRound[i]]);
+#endif
+		MPI_Isend(&((char*)buf)[sendBufferOffsetsThisRound[i]],bufferAmountsToSendThisRound[i],MPI_BYTE,
+			targetAggsForMyData[targetAggIndexesForMyDataThisRound[i]],0,fd->comm,&mpiSendDataToTargetAggRequest[i]);
+	    }
+        numDataSendToWaitFor++;
 	}
 
 #ifdef ROMIO_GPFS
@@ -316,8 +370,8 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
 
 	/* clean up the MPI_Request object for the MPI_Isend which told the
 	 * source procs how much data to send */
-        for (i=0;i<numSourceProcs;i++) {
-           MPI_Waitany(numSourceProcs,mpiSendDataSizeRequest,
+        for (i=0;i<numSourceProcsSentData;i++) {
+           MPI_Waitany(numSourceProcsSentData,mpiSendDataSizeRequest,
 		   &isend,&mpiIsendStatusForSize);
         }
 
@@ -376,7 +430,7 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
 	if (iAmUsedAgg)
 	    currentRoundFDStart += coll_bufsize;
         for (i = 0; i < numDataSendToWaitFor; i++) {
-          MPI_Wait(&mpiSendDataToTargetAggRequest[mpiSendRequestMapPerProc[i]],
+          MPI_Wait(&mpiSendDataToTargetAggRequest[i],
 		  &mpiIsendStatusForData);
         }
 
@@ -405,8 +459,11 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
     }
 
     ADIOI_Free(targetAggsForMyData);
-    ADIOI_Free(dataSizeToSendPerTargetAgg);
-    ADIOI_Free(bufferOffsetToSendPerTargetAgg);
+    ADIOI_Free(targetAggsForMyDataFDStart);
+    ADIOI_Free(targetAggsForMyDataFDEnd);
+    ADIOI_Free(targetAggIndexesForMyDataThisRound);
+    ADIOI_Free(sendBufferOffsetsThisRound);
+    ADIOI_Free(bufferAmountsToSendThisRound);
     ADIOI_Free(amountOfDataReqestedByTargetAgg);
     ADIOI_Free(mpiSizeToSendRequest);
     ADIOI_Free(mpiRecvDataRequest);
@@ -414,7 +471,6 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
     ADIOI_Free(mpiSendDataToTargetAggRequest);
     ADIOI_Free(dataSizeGottenThisRoundPerProc);
     ADIOI_Free(mpiRequestMapPerProc);
-    ADIOI_Free(mpiSendRequestMapPerProc);
 
     /* TODO: still need a barrier here? */
     MPI_Barrier(fd->comm);
@@ -449,6 +505,8 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
     MPI_Comm_size(fd->comm, &nprocs);
     MPI_Comm_rank(fd->comm, &myrank);
 
+    ADIO_Offset myOffsetStart = st_offsets[myrank], myOffsetEnd = end_offsets[myrank];
+
     int myAggRank = -1; // if I am an aggregor this is my index into fd->hints->ranklist
     int iAmUsedAgg = 0;
 
@@ -470,27 +528,20 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
 	}
     }
 
-    // for my offset range determine how much data and from whom I need to get (also record the offset in the buffer for procs that span file domains)
+    /* for my offset range determine how much data and from whom I need to get
+     * it.  For source ag targets, also determine the source file domain
+     * offsets locally to reduce communication overhead */
     int *sourceAggsForMyData = (int *)ADIOI_Malloc(naggs * sizeof(int));
-    int *dataSizeToGetPerSourceAgg = (int *)ADIOI_Malloc(naggs * sizeof(int));
-    int *bufferOffsetToGetPerSourceAgg = (int *)ADIOI_Malloc(naggs * sizeof(int));
+    ADIO_Offset *sourceAggsForMyDataFDStart = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
+    ADIO_Offset *sourceAggsForMyDataFDEnd = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
     int numSourceAggs = 0;
     int i;
     for (i=0;i<naggs;i++) {
-	if ( ((st_offsets[myrank] >= fd_start[i]) &&  (st_offsets[myrank] <= fd_end[i])) || ((end_offsets[myrank] >= fd_start[i]) &&  (end_offsets[myrank] <= fd_end[i]))) {
+	if ( ((myOffsetStart >= fd_start[i]) && (myOffsetStart <= fd_end[i])) ||
+		((myOffsetEnd >= fd_start[i]) &&  (myOffsetEnd <= fd_end[i]))) {
 	    sourceAggsForMyData[numSourceAggs] = fd->hints->ranklist[i];
-	    if ( ((st_offsets[myrank] >= fd_start[i]) &&  (st_offsets[myrank] <= fd_end[i])) && ((end_offsets[myrank] >= fd_start[i]) &&  (end_offsets[myrank] <= fd_end[i]))) {
-		dataSizeToGetPerSourceAgg[numSourceAggs] = (end_offsets[myrank] - st_offsets[myrank])+1;
-		bufferOffsetToGetPerSourceAgg[numSourceAggs] = 0;
-	    }
-	    else if ((st_offsets[myrank] >= fd_start[i]) &&  (st_offsets[myrank] <= fd_end[i])) { // starts in this fd and goes past it
-		dataSizeToGetPerSourceAgg[numSourceAggs] = (fd_end[i] - st_offsets[myrank]) +1;
-		bufferOffsetToGetPerSourceAgg[numSourceAggs] = 0;
-	    }
-	    else { // starts in fd before this and ends in it
-		dataSizeToGetPerSourceAgg[numSourceAggs] = (end_offsets[myrank] - fd_start[i]) +1;
-		bufferOffsetToGetPerSourceAgg[numSourceAggs] = fd_start[i]- st_offsets[myrank];
-	    }
+	    sourceAggsForMyDataFDStart[numSourceAggs] = fd_start[i];
+	    sourceAggsForMyDataFDEnd[numSourceAggs] = fd_end[i];
 	    numSourceAggs++;
 	}
     }
@@ -504,7 +555,6 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
     ADIO_Offset *remainingDataOffsetToSendPerProc=NULL;
 
     int numTargetProcs = 0;
-    int totalDataSizeToSend = 0;
 
     if (iAmUsedAgg) {
 	/* for the used aggregators figure out how much data I need from what procs */
@@ -524,8 +574,6 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
 	remainingDataOffsetToSendPerProc =
 	    (ADIO_Offset *)ADIOI_Malloc(numTargetProcs * sizeof(ADIO_Offset));
 
-	/* TODO: some balancecontig logic might need to go here */
-
 	/* everybody has the st_offsets and end_offsets for all ranks so if I am a
 	 * used aggregator go thru them and figure out which ranks have data that
 	 * falls into my file domain assigned to me */
@@ -545,25 +593,15 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
 		    remainingDataAmountToSendPerProc[numTargetProcs] = (end_offsets[i] - fd_start[myAggRank]) +1;
 		    remainingDataOffsetToSendPerProc[numTargetProcs] = fd_start[myAggRank];
 		}
-		totalDataSizeToSend += remainingDataAmountToSendPerProc[numTargetProcs];
 		numTargetProcs++;
 	    }
 	}
     }
 
-    int *amountOfDataReqestedFromSourceAgg = (int *)ADIOI_Malloc(naggs * sizeof(int));
-    for (i=0;i<numSourceAggs;i++) {
-	amountOfDataReqestedFromSourceAgg[i] = 0;
-    }
-
 
-    int totalAmountDataSent = 0;
-    MPI_Request *mpiSizeToSendRequest = (MPI_Request *) ADIOI_Malloc(numSourceAggs * sizeof(MPI_Request));
     MPI_Request *mpiRecvDataFromSourceAggsRequest = (MPI_Request *) ADIOI_Malloc(numSourceAggs * sizeof(MPI_Request));
-    MPI_Request *mpiSendDataSizeRequest = (MPI_Request *) ADIOI_Malloc(numTargetProcs * sizeof(MPI_Request));
-
     MPI_Request *mpiSendDataToTargetProcRequest = (MPI_Request *) ADIOI_Malloc(numTargetProcs * sizeof(MPI_Request));
-    MPI_Status mpiWaitAnyStatusFromTargetAggs,mpiWaitAnyStatusFromSourceProcs,mpiIsendStatusForSize,mpiIsendStatusForData;
+    MPI_Status mpiWaitAnyStatusFromSourceProcs,mpiIsendStatusForData;
 
     /* use the two-phase buffer allocated in the file_open - no app should ever
      * be both reading and writing at the same time */
@@ -575,10 +613,6 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
     // compute number of rounds
     ADIO_Offset numberOfRounds = (ADIO_Offset)((((ADIO_Offset)(end_offsets[nprocs-1]-st_offsets[0]))/((ADIO_Offset)((ADIO_Offset)coll_bufsize*(ADIO_Offset)naggs)))) + 1;
 
-#ifdef p2pcontigtrace
-    printf("need to send %d bytes - coll_bufsize is %d\n",totalDataSizeToSend,coll_bufsize);
-#endif
-
     ADIO_Offset currentRoundFDStart = 0, nextRoundFDStart = 0;
     ADIO_Offset currentRoundFDEnd = 0, nextRoundFDEnd = 0;
 
@@ -588,6 +622,9 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
     }
 
     int *dataSizeSentThisRoundPerProc = (int *)ADIOI_Malloc(numTargetProcs * sizeof(int));
+    int *sourceAggIndexesForMyDataThisRound = (int *)ADIOI_Malloc(numSourceAggs * sizeof(int));
+    int *recvBufferOffsetsThisRound = (int *)ADIOI_Malloc(numSourceAggs * sizeof(int));
+    int *bufferAmountsToGetThisRound = (int *)ADIOI_Malloc(numSourceAggs * sizeof(int));
     *error_code = MPI_SUCCESS;
 
     int currentReadBuf = 0;
@@ -705,14 +742,68 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
 	    } // useIOBuffer
 	} // IAmUsedAgg
 
-	/* the source procs receive the amount of data the aggs will be sending them */
+	/* determine what source aggs I need to get data from this round and
+	 * recv only from them */
+	int numSourceAggsThisRound = 0;
 	for (i=0;i<numSourceAggs;i++) {
-	    MPI_Irecv(&amountOfDataReqestedFromSourceAgg[i],1,
-		    MPI_INT,sourceAggsForMyData[i],0,
-		    fd->comm,&mpiSizeToSendRequest[i]);
+	    if ( ((myOffsetStart >= sourceAggsForMyDataFDStart[i]) && (myOffsetStart <= sourceAggsForMyDataFDEnd[i]))
+		    || ((myOffsetEnd >= sourceAggsForMyDataFDStart[i]) && (myOffsetEnd <= sourceAggsForMyDataFDEnd[i])) ) {
+		/* we know that we need to get data from this source agg at
+		 * some point, now need to figure out how much this round */
+
+		/* here are the offsets currently being sent by the aggregator
+		 * during this round */
+		ADIO_Offset currentRoundFDStartForMySourceAgg =
+		    (ADIO_Offset)((ADIO_Offset)sourceAggsForMyDataFDStart[i] +
+			    (ADIO_Offset)((ADIO_Offset)roundIter*(ADIO_Offset)coll_bufsize));
+		ADIO_Offset currentRoundFDEndForMySourceAgg =
+		    (ADIO_Offset)((ADIO_Offset)sourceAggsForMyDataFDStart[i] +
+			    (ADIO_Offset)((ADIO_Offset)(roundIter+1)*(ADIO_Offset)coll_bufsize) - (ADIO_Offset)1);
+		if (currentRoundFDEndForMySourceAgg > sourceAggsForMyDataFDEnd[i])
+		    currentRoundFDEndForMySourceAgg = sourceAggsForMyDataFDEnd[i];
+
+#ifdef p2pcontigtrace
+		printf("roundIter %d source iter %d sourceAggsForMyData is %d myOffsetStart is %ld myOffsetEnd is %ld sourceAggsForMyDataFDStart is %ld sourceAggsForMyDataFDEnd is %ld currentRoundFDStartForMySourceAgg is %ld currentRoundFDEndForMySourceAgg is %ld\n",roundIter,i,sourceAggsForMyData[i],myOffsetStart,myOffsetEnd,sourceAggsForMyDataFDStart[i],sourceAggsForMyDataFDEnd[i],currentRoundFDStartForMySourceAgg,currentRoundFDEndForMySourceAgg);
+#endif
+
+		// get the portion of my data that is within currentRoundFDStartForMySourceAgg to currentRoundFDEndForMySourceAgg
+		// find the offset into the recv buffer and the amount of data to get
+		int recvBufferOffset = 0;
+		int bufferAmountToGet = 0;
+
+		if ((myOffsetStart >= currentRoundFDStartForMySourceAgg) && (myOffsetStart <= currentRoundFDEndForMySourceAgg)) {
+		    if (myOffsetEnd > currentRoundFDEndForMySourceAgg)
+			bufferAmountToGet = (currentRoundFDEndForMySourceAgg - myOffsetStart) +1;
+		    else
+			bufferAmountToGet = (myOffsetEnd - myOffsetStart) +1;
+		}
+		else if ((myOffsetEnd >= currentRoundFDStartForMySourceAgg) && (myOffsetEnd <= currentRoundFDEndForMySourceAgg)) {
+		    recvBufferOffset = (int) (currentRoundFDStartForMySourceAgg - myOffsetStart);
+		    if (myOffsetEnd > currentRoundFDEndForMySourceAgg)
+			bufferAmountToGet = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) +1;
+		    else
+			bufferAmountToGet = (myOffsetEnd - currentRoundFDStartForMySourceAgg) +1;
+		}
+		else if ((myOffsetStart <= currentRoundFDStartForMySourceAgg) && (myOffsetEnd >= currentRoundFDEndForMySourceAgg)) {
+		    recvBufferOffset = (int) (currentRoundFDStartForMySourceAgg - myOffsetStart);
+		    bufferAmountToGet = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) +1;
+		}
+
+
+		if (bufferAmountToGet > 0) { // we have data to get this round
+		    sourceAggIndexesForMyDataThisRound[numSourceAggsThisRound] = i;
+		    recvBufferOffsetsThisRound[numSourceAggsThisRound] = recvBufferOffset;
+		    bufferAmountsToGetThisRound[numSourceAggsThisRound] = bufferAmountToGet;
+#ifdef p2pcontigtrace
+		    printf("bufferAmountToGet is %d recvBufferOffset is %d\n",bufferAmountToGet,recvBufferOffset);
+#endif
+		    numSourceAggsThisRound++;
+		}
+	    }
 	}
 
-	// the aggs send the amount of data they will be sending to their source procs
+	/* the aggs determine the amount of data they will be sending to their
+	 * source procs */
 	for (i=0;i<numTargetProcs;i++) {
 	    if ((remainingDataOffsetToSendPerProc[i] >= currentRoundFDStart) &&
 		    (remainingDataOffsetToSendPerProc[i] <= currentRoundFDEnd)) {
@@ -737,31 +828,18 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
 	    else
 		dataSizeSentThisRoundPerProc[i] = 0;
 
-	    MPI_Isend(&dataSizeSentThisRoundPerProc[i],1,MPI_INT,
-		    targetProcsForMyData[i],0,fd->comm,&mpiSendDataSizeRequest[i]);
-
 	}
 
-	/* the source procs get the requested data amount from the aggs and then
-	 * receive that amount of data - only recv if requested more than 0 bytes */
-	int numDataRecvToWaitFor = 0;
-	for (i = 0; i < numSourceAggs; i++) {
-	    MPI_Waitany(numSourceAggs,mpiSizeToSendRequest,
-		    &irecv,&mpiWaitAnyStatusFromTargetAggs);
-	    if (amountOfDataReqestedFromSourceAgg[irecv] > 0) {
-
-		MPI_Irecv(&((char*)buf)[bufferOffsetToGetPerSourceAgg[irecv]],
-			amountOfDataReqestedFromSourceAgg[irecv],MPI_BYTE,
-			sourceAggsForMyData[irecv],0,fd->comm,
-			&mpiRecvDataFromSourceAggsRequest[numDataRecvToWaitFor]);
-		totalAmountDataSent += amountOfDataReqestedFromSourceAgg[irecv];
-		bufferOffsetToGetPerSourceAgg[irecv] += amountOfDataReqestedFromSourceAgg[irecv];
-
-		numDataRecvToWaitFor++;
-	    }
+	/* the target procs get the data from the source aggs */
+	for (i = 0; i < numSourceAggsThisRound; i++) {
+	    MPI_Irecv(&((char*)buf)[recvBufferOffsetsThisRound[i]],
+		    bufferAmountsToGetThisRound[i],MPI_BYTE,
+		    sourceAggsForMyData[sourceAggIndexesForMyDataThisRound[i]],0,fd->comm,
+		    &mpiRecvDataFromSourceAggsRequest[i]);
 	}
 
-	// the aggs send the data to the source procs
+	// the source aggs send the data to the target procs
+	int numTargetProcsSentThisRound = 0;
 	for (i=0;i<numTargetProcs;i++) {
 
 	    int currentWBOffset = 0;
@@ -773,28 +851,25 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
 		MPI_Isend(&((char*)read_buf)[currentWBOffset],
 			dataSizeSentThisRoundPerProc[i],
 			MPI_BYTE,targetProcsForMyData[i],0,
-			fd->comm,&mpiSendDataToTargetProcRequest[i]);
+			fd->comm,&mpiSendDataToTargetProcRequest[numTargetProcsSentThisRound]);
+		numTargetProcsSentThisRound++;
 		remainingDataAmountToSendPerProc[i] -= dataSizeSentThisRoundPerProc[i];
 		remainingDataOffsetToSendPerProc[i] += dataSizeSentThisRoundPerProc[i];
 	    }
 	}
 
 	// wait for the target procs to get their data
-	for (i = 0; i < numDataRecvToWaitFor; i++) {
-	    MPI_Waitany(numDataRecvToWaitFor,mpiRecvDataFromSourceAggsRequest,
+	for (i = 0; i < numSourceAggsThisRound; i++) {
+	    MPI_Waitany(numSourceAggsThisRound,mpiRecvDataFromSourceAggsRequest,
 		    &irecv,&mpiWaitAnyStatusFromSourceProcs);
 	}
 
 	nextRoundFDStart = currentRoundFDStart + coll_bufsize;
 
-
         // clean up the MPI_Isend MPI_Requests
-        for (i=0;i<numTargetProcs;i++) {
-          MPI_Waitany(numTargetProcs,mpiSendDataSizeRequest,
-		  &isend,&mpiIsendStatusForSize);
-          if (dataSizeSentThisRoundPerProc[isend] > 0) {
-            MPI_Wait(&mpiSendDataToTargetProcRequest[isend],&mpiIsendStatusForData);
-          }
+        for (i=0;i<numTargetProcsSentThisRound;i++) {
+          MPI_Waitany(numTargetProcsSentThisRound,mpiSendDataToTargetProcRequest,
+		  &isend,&mpiIsendStatusForData);
         }
 
 	MPI_Barrier(fd->comm); // need to sync up the source aggs which did the isend with the target procs which did the irecvs to give the target procs time to get the data before overwriting with next round readcontig
@@ -816,14 +891,15 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
     }
 
     ADIOI_Free(sourceAggsForMyData);
-    ADIOI_Free(dataSizeToGetPerSourceAgg);
-    ADIOI_Free(bufferOffsetToGetPerSourceAgg);
-    ADIOI_Free(amountOfDataReqestedFromSourceAgg);
-    ADIOI_Free(mpiSizeToSendRequest);
+    ADIOI_Free(sourceAggsForMyDataFDStart);
+    ADIOI_Free(sourceAggsForMyDataFDEnd);
+
     ADIOI_Free(mpiRecvDataFromSourceAggsRequest);
-    ADIOI_Free(mpiSendDataSizeRequest);
     ADIOI_Free(mpiSendDataToTargetProcRequest);
     ADIOI_Free(dataSizeSentThisRoundPerProc);
+    ADIOI_Free(sourceAggIndexesForMyDataThisRound);
+    ADIOI_Free(recvBufferOffsetsThisRound);
+    ADIOI_Free(bufferAmountsToGetThisRound);
 
     /* TODO: is Barrier here needed? */
     MPI_Barrier(fd->comm);

-----------------------------------------------------------------------

Summary of changes:
 src/mpi/romio/adio/common/p2p_aggregation.c |  356 ++++++++++++++++-----------
 1 files changed, 216 insertions(+), 140 deletions(-)


hooks/post-receive
-- 
MPICH primary repository


More information about the commits mailing list