[mpich-commits] [mpich] MPICH primary repository branch, master, updated. v3.2b4-205-g2470e84
Service Account
noreply at mpich.org
Wed Sep 2 14:58:02 CDT 2015
This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "MPICH primary repository".
The branch, master has been updated
via 2470e846446a7f565f4bee8f2fe2d227ce464155 (commit)
from 0c218fbbfc236b1234bd839ff210255a5aa9ee43 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
http://git.mpich.org/mpich.git/commitdiff/2470e846446a7f565f4bee8f2fe2d227ce464155
commit 2470e846446a7f565f4bee8f2fe2d227ce464155
Author: Paul Coffman <pcoffman at anl.gov>
Date: Tue Jul 14 21:24:02 2015 +0000
assorted fixes to ROMIO Collective IO One-sided
This commit addresses several design deficiencies in the ROMIO one-sided
collective IO algorithm and fixes several bugs. Specifically:
- Re-engineering of one-sided algorithm to more easily support other
file systems such as lustre, reworking and simplifying how the data in
the source buffer is tracked, packing non-contiguous source data
comprising the same contiguous chunk of target data to make just 1
rdma call instead of several, and providing a framework in which
multiple file domains within an aggregator can exist.
- Initial mpi_gather for offsets now includes the total amount of data
each rank needs to write - this was needed because the one-sided
algorithm needs to distinguish between ranks with 0 and 1 bytes (those
that have and don't have data) and the difference in starting and
ending offsets for 1 byte is 0 the same as 0 bytes so it cannot be
distiguished --- some corner cases which are implicitly handled by the
two-phase algorithm need to be explicitly handled by the one-sided
algorithm.
- Added env var GPFSMPIO_ONESIDED_ALWAYS_RMW which when set will force
the one-sided write to always first read the write buffer offset range
from the disk to pre-fill any holes in the data.
- In the case where holes are found during the one-sided write and
GPFSMPIO_ONESIDED_NO_RMW is unset resulting in a read-modify-write
rewrite, instead of calling the baseline two-phase algorithm recall
the one-sided algorithm with GPFSMPIO_ONESIDED_ALWAYS_RMW set.
- Removed the one-sided active synchronization code path since it would
add significant memory overhead to support the non-contiguous source
data buffer packing and on blue gene was always slower than passive
anyhow.
- Fixed bugs related to loss of precision on calculations involving
offsets defined as long (ADIO_Offset) and sizes defined as int.
- Fixed bug with the number of aggregation rounds where in some cases
not enough rounds were being performed - essentially take the max
number of rounds needed by any aggregator instead of the average.
- Fixed bug related to an offset range crossing a file-domain boundary
where the source buffer data pointer was not advanced properly,
resulting in incorrect data from the source buffer being written.
- Replaced the GPFSMPIO_AGGMETHOD env var to specify the aggregation
method for both read and write with GPFSMPIO_WRITE_AGGMETHOD and
GPFSMPIO_READ_AGGMETHOD to individually specify them --- on BlueGene
/Q there are one-sided bugs in pami which are in some cases exposed by
the mpi_put used by the write and other cases the mpi_get used by the
read -- the user needs the ability to use what works.
Signed-off-by: Rob Latham <robl at mcs.anl.gov>
diff --git a/src/mpi/romio/adio/ad_gpfs/ad_gpfs_rdcoll.c b/src/mpi/romio/adio/ad_gpfs/ad_gpfs_rdcoll.c
index dd71643..910b51e 100644
--- a/src/mpi/romio/adio/ad_gpfs/ad_gpfs_rdcoll.c
+++ b/src/mpi/romio/adio/ad_gpfs/ad_gpfs_rdcoll.c
@@ -114,6 +114,7 @@ void ADIOI_GPFS_ReadStridedColl(ADIO_File fd, void *buf, int count,
ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
*fd_end = NULL, *end_offsets = NULL;
ADIO_Offset *gpfs_offsets0 = NULL, *gpfs_offsets = NULL;
+ ADIO_Offset *count_sizes;
int ii;
ADIO_Offset *len_list = NULL;
int *buf_idx = NULL;
@@ -174,7 +175,36 @@ void ADIOI_GPFS_ReadStridedColl(ADIO_File fd, void *buf, int count,
st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
+ ADIO_Offset my_count_size;
+ /* One-sided aggregation needs the amount of data per rank as well because the difference in
+ * starting and ending offsets for 1 byte is 0 the same as 0 bytes so it cannot be distiguished.
+ */
+ if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
+ count_sizes = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
+ MPI_Count buftype_size;
+ MPI_Type_size_x(datatype, &buftype_size);
+ my_count_size = (ADIO_Offset) count * (ADIO_Offset)buftype_size;
+ }
if (gpfsmpio_tunegather) {
+ if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
+ gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
+ gpfs_offsets = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
+ for (ii=0; ii<nprocs; ii++) {
+ gpfs_offsets0[ii*3] = 0;
+ gpfs_offsets0[ii*3+1] = 0;
+ gpfs_offsets0[ii*3+2] = 0;
+ }
+ gpfs_offsets0[myrank*3] = start_offset;
+ gpfs_offsets0[myrank*3+1] = end_offset;
+ gpfs_offsets0[myrank*3+2] = my_count_size;
+ MPI_Allreduce( gpfs_offsets0, gpfs_offsets, nprocs*3, ADIO_OFFSET, MPI_MAX, fd->comm );
+ for (ii=0; ii<nprocs; ii++) {
+ st_offsets [ii] = gpfs_offsets[ii*3] ;
+ end_offsets[ii] = gpfs_offsets[ii*3+1];
+ count_sizes[ii] = gpfs_offsets[ii*3+2];
+ }
+ }
+ else {
gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
gpfs_offsets = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
for (ii=0; ii<nprocs; ii++) {
@@ -190,6 +220,7 @@ void ADIOI_GPFS_ReadStridedColl(ADIO_File fd, void *buf, int count,
st_offsets [ii] = gpfs_offsets[ii*2] ;
end_offsets[ii] = gpfs_offsets[ii*2+1];
}
+ }
ADIOI_Free( gpfs_offsets0 );
ADIOI_Free( gpfs_offsets );
} else {
@@ -197,6 +228,10 @@ void ADIOI_GPFS_ReadStridedColl(ADIO_File fd, void *buf, int count,
ADIO_OFFSET, fd->comm);
MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
ADIO_OFFSET, fd->comm);
+ if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
+ MPI_Allgather(&count_sizes, 1, ADIO_OFFSET, count_sizes, 1,
+ ADIO_OFFSET, fd->comm);
+ }
}
GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_PATANA, GPFSMPIO_CIO_T_GATHER )
@@ -259,31 +294,67 @@ void ADIOI_GPFS_ReadStridedColl(ADIO_File fd, void *buf, int count,
* needs to be mapped to an actual rank in the communicator later.
*
*/
- if (gpfsmpio_tuneblocking)
+ int currentNonZeroDataIndex = 0;
+ if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
+ /* Take out the 0-data offsets by shifting the indexes with data to the
+ * front and keeping track of the non-zero data index for use as the
+ * length. By doing this we will optimally use all available aggs
+ * and spread the actual data across them instead of having offsets
+ * with empty data potentially dilute the file domains and create
+ * problems for the one-sided aggregation.
+ */
+ for (i=0; i<nprocs; i++) {
+ if (count_sizes[i] > 0) {
+ st_offsets[currentNonZeroDataIndex] = st_offsets[i];
+ end_offsets[currentNonZeroDataIndex] = end_offsets[i];
+ currentNonZeroDataIndex++;
+ }
+ }
+ }
+ if (gpfsmpio_tuneblocking) {
+ if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
+ ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets, currentNonZeroDataIndex,
+ nprocs_for_coll, &min_st_offset,
+ &fd_start, &fd_end, &fd_size, fd->fs_ptr);
+ }
+ else {
ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets, nprocs,
nprocs_for_coll, &min_st_offset,
&fd_start, &fd_end, &fd_size, fd->fs_ptr);
- else
+ }
+ }
+ else {
+ if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
+ ADIOI_Calc_file_domains(st_offsets, end_offsets, currentNonZeroDataIndex,
+ nprocs_for_coll, &min_st_offset,
+ &fd_start, &fd_end,
+ fd->hints->min_fdomain_size, &fd_size,
+ fd->hints->striping_unit);
+ }
+ else {
ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
nprocs_for_coll, &min_st_offset,
&fd_start, &fd_end,
fd->hints->min_fdomain_size, &fd_size,
fd->hints->striping_unit);
+ }
+ }
GPFSMPIO_T_CIO_SET_GET( r, 1, 1, GPFSMPIO_CIO_T_MYREQ, GPFSMPIO_CIO_T_FD_PART );
- if ((gpfsmpio_aggmethod == 1) || (gpfsmpio_aggmethod == 2)) {
+ if ((gpfsmpio_read_aggmethod == 1) || (gpfsmpio_read_aggmethod == 2)) {
/* If the user has specified to use a one-sided aggregation method then do that at
* this point instead of the two-phase I/O.
*/
ADIOI_OneSidedReadAggregation(fd, offset_list, len_list, contig_access_count, buf,
- datatype,error_code, st_offsets, end_offsets, fd_start, fd_end);
- GPFSMPIO_T_CIO_REPORT( 1, fd, myrank, nprocs)
+ datatype,error_code, st_offsets, end_offsets, currentNonZeroDataIndex, fd_start, fd_end);
+ GPFSMPIO_T_CIO_REPORT( 0, fd, myrank, nprocs)
ADIOI_Free(offset_list);
ADIOI_Free(len_list);
ADIOI_Free(st_offsets);
ADIOI_Free(end_offsets);
ADIOI_Free(fd_start);
ADIOI_Free(fd_end);
+ ADIOI_Free(count_sizes);
goto fn_exit;
}
if (gpfsmpio_p2pcontig==1) {
diff --git a/src/mpi/romio/adio/ad_gpfs/ad_gpfs_tuning.c b/src/mpi/romio/adio/ad_gpfs/ad_gpfs_tuning.c
index 0d44176..333612b 100644
--- a/src/mpi/romio/adio/ad_gpfs/ad_gpfs_tuning.c
+++ b/src/mpi/romio/adio/ad_gpfs/ad_gpfs_tuning.c
@@ -37,11 +37,13 @@ long bglocklessmpio_f_type;
int gpfsmpio_bg_nagg_pset;
int gpfsmpio_pthreadio;
int gpfsmpio_p2pcontig;
-int gpfsmpio_aggmethod;
+int gpfsmpio_write_aggmethod;
+int gpfsmpio_read_aggmethod;
int gpfsmpio_balancecontig;
int gpfsmpio_devnullio;
int gpfsmpio_bridgeringagg;
int gpfsmpio_onesided_no_rmw;
+int gpfsmpio_onesided_always_rmw;
int gpfsmpio_onesided_inform_rmw;
double gpfsmpio_prof_cw [GPFSMPIO_CIO_LAST+1];
@@ -108,8 +110,10 @@ double gpfsmpio_prof_cr [GPFSMPIO_CIO_LAST+1];
* 3.) There are no gaps between the offsets.
* 4.) No single rank has a data size which spans multiple file domains.
*
- * - GPFSMPIO_AGGMETHOD - Replaces the two-phase collective IO aggregation with a one-
- * sided algorithm, significantly reducing communication and memory overhead. Fully
+ * - GPFSMPIO_WRITE_AGGMETHOD/GPFSMPIO_READ_AGGMETHOD - Replaces the two-phase
+ * collective IO aggregation
+ * with a one-sided algorithm, significantly reducing communication and
+ * memory overhead. Fully
* supports all datasets and datatypes, the only caveat is that any holes in the data
* when writing to a pre-existing file are ignored -- there is no read-modify-write
* support to maintain the correctness of regions of pre-existing data so every byte
@@ -124,7 +128,7 @@ double gpfsmpio_prof_cr [GPFSMPIO_CIO_LAST+1];
* optimal performance for this is achieved when paired with PAMID_TYPED_ONESIDED=1.
* - Default is 0
*
- * - GPFSMPIO_ONESIDED_NO_RMW - For one-sided aggregation (GPFSMPIO_AGGMETHOD = 1 or 2)
+ * - GPFSMPIO_ONESIDED_NO_RMW - For one-sided aggregation (GPFSMPIO_WRITE_AGGMETHOD = 1 or 2)
* disable the detection of holes in the data when writing to a pre-existing
* file requiring a read-modify-write, thereby avoiding the communication
* overhead for this detection.
@@ -200,9 +204,13 @@ void ad_gpfs_get_env_vars() {
x = getenv( "GPFSMPIO_P2PCONTIG" );
if (x) gpfsmpio_p2pcontig = atoi(x);
- gpfsmpio_aggmethod = 0;
- x = getenv( "GPFSMPIO_AGGMETHOD" );
- if (x) gpfsmpio_aggmethod = atoi(x);
+ gpfsmpio_write_aggmethod = 0;
+ x = getenv( "GPFSMPIO_WRITE_AGGMETHOD" );
+ if (x) gpfsmpio_write_aggmethod = atoi(x);
+
+ gpfsmpio_read_aggmethod = 0;
+ x = getenv( "GPFSMPIO_READ_AGGMETHOD" );
+ if (x) gpfsmpio_read_aggmethod = atoi(x);
gpfsmpio_balancecontig = 0;
x = getenv( "GPFSMPIO_BALANCECONTIG" );
@@ -220,6 +228,12 @@ void ad_gpfs_get_env_vars() {
x = getenv( "GPFSMPIO_ONESIDED_NO_RMW" );
if (x) gpfsmpio_onesided_no_rmw = atoi(x);
+ gpfsmpio_onesided_always_rmw = 0;
+ x = getenv( "GPFSMPIO_ONESIDED_ALWAYS_RMW" );
+ if (x) gpfsmpio_onesided_always_rmw = atoi(x);
+ if (gpfsmpio_onesided_always_rmw)
+ gpfsmpio_onesided_no_rmw = 1;
+
gpfsmpio_onesided_inform_rmw = 0;
x = getenv( "GPFSMPIO_ONESIDED_INFORM_RMW" );
if (x) gpfsmpio_onesided_inform_rmw = atoi(x);
diff --git a/src/mpi/romio/adio/ad_gpfs/ad_gpfs_tuning.h b/src/mpi/romio/adio/ad_gpfs/ad_gpfs_tuning.h
index 56e1588..35fdc8d 100644
--- a/src/mpi/romio/adio/ad_gpfs/ad_gpfs_tuning.h
+++ b/src/mpi/romio/adio/ad_gpfs/ad_gpfs_tuning.h
@@ -66,11 +66,13 @@ extern int gpfsmpio_tuneblocking;
extern long bglocklessmpio_f_type;
extern int gpfsmpio_pthreadio;
extern int gpfsmpio_p2pcontig;
-extern int gpfsmpio_aggmethod;
+extern int gpfsmpio_write_aggmethod;
+extern int gpfsmpio_read_aggmethod;
extern int gpfsmpio_balancecontig;
extern int gpfsmpio_devnullio;
extern int gpfsmpio_bridgeringagg;
extern int gpfsmpio_onesided_no_rmw;
+extern int gpfsmpio_onesided_always_rmw;
extern int gpfsmpio_onesided_inform_rmw;
/* Default is, well, kind of complicated. Blue Gene /L and /P had "psets": one
diff --git a/src/mpi/romio/adio/ad_gpfs/ad_gpfs_wrcoll.c b/src/mpi/romio/adio/ad_gpfs/ad_gpfs_wrcoll.c
index b8d4366..e374b59 100644
--- a/src/mpi/romio/adio/ad_gpfs/ad_gpfs_wrcoll.c
+++ b/src/mpi/romio/adio/ad_gpfs/ad_gpfs_wrcoll.c
@@ -132,6 +132,7 @@ void ADIOI_GPFS_WriteStridedColl(ADIO_File fd, const void *buf, int count,
ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
*fd_end = NULL, *end_offsets = NULL;
ADIO_Offset *gpfs_offsets0 = NULL, *gpfs_offsets = NULL;
+ ADIO_Offset *count_sizes;
int ii;
int *buf_idx = NULL;
@@ -171,11 +172,41 @@ void ADIOI_GPFS_WriteStridedColl(ADIO_File fd, const void *buf, int count,
/* each process communicates its start and end offsets to other
processes. The result is an array each of start and end offsets stored
in order of process rank. */
-
+
st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
+ ADIO_Offset my_count_size;
+ /* One-sided aggregation needs the amount of data per rank as well because
+ * the difference in starting and ending offsets for 1 byte is 0 the same
+ * as 0 bytes so it cannot be distiguished.
+ */
+ if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
+ count_sizes = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
+ MPI_Count buftype_size;
+ MPI_Type_size_x(datatype, &buftype_size);
+ my_count_size = (ADIO_Offset) count * (ADIO_Offset)buftype_size;
+ }
if (gpfsmpio_tunegather) {
+ if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
+ gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
+ gpfs_offsets = (ADIO_Offset *) ADIOI_Malloc(3*nprocs*sizeof(ADIO_Offset));
+ for (ii=0; ii<nprocs; ii++) {
+ gpfs_offsets0[ii*3] = 0;
+ gpfs_offsets0[ii*3+1] = 0;
+ gpfs_offsets0[ii*3+2] = 0;
+ }
+ gpfs_offsets0[myrank*3] = start_offset;
+ gpfs_offsets0[myrank*3+1] = end_offset;
+ gpfs_offsets0[myrank*3+2] = my_count_size;
+ MPI_Allreduce( gpfs_offsets0, gpfs_offsets, nprocs*3, ADIO_OFFSET, MPI_MAX, fd->comm );
+ for (ii=0; ii<nprocs; ii++) {
+ st_offsets [ii] = gpfs_offsets[ii*3] ;
+ end_offsets[ii] = gpfs_offsets[ii*3+1];
+ count_sizes[ii] = gpfs_offsets[ii*3+2];
+ }
+ }
+ else {
gpfs_offsets0 = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
gpfs_offsets = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
for (ii=0; ii<nprocs; ii++) {
@@ -191,6 +222,7 @@ void ADIOI_GPFS_WriteStridedColl(ADIO_File fd, const void *buf, int count,
st_offsets [ii] = gpfs_offsets[ii*2] ;
end_offsets[ii] = gpfs_offsets[ii*2+1];
}
+ }
ADIOI_Free( gpfs_offsets0 );
ADIOI_Free( gpfs_offsets );
} else {
@@ -198,6 +230,10 @@ void ADIOI_GPFS_WriteStridedColl(ADIO_File fd, const void *buf, int count,
ADIO_OFFSET, fd->comm);
MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
ADIO_OFFSET, fd->comm);
+ if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
+ MPI_Allgather(&count_sizes, 1, ADIO_OFFSET, count_sizes, 1,
+ ADIO_OFFSET, fd->comm);
+ }
}
GPFSMPIO_T_CIO_SET_GET(w, 1, 1, GPFSMPIO_CIO_T_PATANA, GPFSMPIO_CIO_T_GATHER )
@@ -250,25 +286,61 @@ void ADIOI_GPFS_WriteStridedColl(ADIO_File fd, const void *buf, int count,
done by (logically) dividing the file into file domains (FDs); each
process may directly access only its own file domain. */
- if (gpfsmpio_tuneblocking)
+ int currentValidDataIndex = 0;
+ if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
+ /* Take out the 0-data offsets by shifting the indexes with data to the front
+ * and keeping track of the valid data index for use as the length.
+ */
+ for (i=0; i<nprocs; i++) {
+ if (count_sizes[i] > 0) {
+ st_offsets[currentValidDataIndex] = st_offsets[i];
+ end_offsets[currentValidDataIndex] = end_offsets[i];
+ currentValidDataIndex++;
+ }
+ }
+ }
+
+ if (gpfsmpio_tuneblocking) {
+ if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
+ ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets,
+ currentValidDataIndex,
+ nprocs_for_coll, &min_st_offset,
+ &fd_start, &fd_end, &fd_size, fd->fs_ptr);
+ }
+ else {
+
ADIOI_GPFS_Calc_file_domains(fd, st_offsets, end_offsets, nprocs,
nprocs_for_coll, &min_st_offset,
&fd_start, &fd_end, &fd_size, fd->fs_ptr);
- else
+ }
+ }
+ else {
+ if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
+ ADIOI_Calc_file_domains(st_offsets, end_offsets, currentValidDataIndex,
+ nprocs_for_coll, &min_st_offset,
+ &fd_start, &fd_end,
+ fd->hints->min_fdomain_size, &fd_size,
+ fd->hints->striping_unit);
+ }
+ else {
ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
nprocs_for_coll, &min_st_offset,
&fd_start, &fd_end,
fd->hints->min_fdomain_size, &fd_size,
fd->hints->striping_unit);
+ }
+ }
GPFSMPIO_T_CIO_SET_GET( w, 1, 1, GPFSMPIO_CIO_T_MYREQ, GPFSMPIO_CIO_T_FD_PART );
- if ((gpfsmpio_aggmethod == 1) || (gpfsmpio_aggmethod == 2)) {
+ if ((gpfsmpio_write_aggmethod == 1) || (gpfsmpio_write_aggmethod == 2)) {
/* If the user has specified to use a one-sided aggregation method then do that at
* this point instead of the two-phase I/O.
*/
int holeFound = 0;
- ADIOI_OneSidedWriteAggregation(fd, offset_list, len_list, contig_access_count, buf, datatype, error_code, st_offsets, end_offsets, fd_start, fd_end, &holeFound);
+ ADIOI_OneSidedWriteAggregation(fd, offset_list, len_list, contig_access_count,
+ buf, datatype, error_code, st_offsets, end_offsets,
+ currentValidDataIndex, fd_start, fd_end, &holeFound);
int anyHolesFound = 0;
if (!gpfsmpio_onesided_no_rmw)
MPI_Allreduce(&holeFound, &anyHolesFound, 1, MPI_INT, MPI_MAX, fd->comm);
@@ -280,15 +352,35 @@ void ADIOI_GPFS_WriteStridedColl(ADIO_File fd, const void *buf, int count,
ADIOI_Free(end_offsets);
ADIOI_Free(fd_start);
ADIOI_Free(fd_end);
+ ADIOI_Free(count_sizes);
goto fn_exit;
}
else {
- /* Holes are found in the data and the user has not set gpfsmpio_onesided_no_rmw ---
- * fall thru and perform the two-phase aggregation and if the user has gpfsmpio_onesided_inform_rmw
- * set then inform him of this condition and behavior.
+ /* Holes are found in the data and the user has not set
+ * gpfsmpio_onesided_no_rmw --- set gpfsmpio_onesided_always_rmw to 1
+ * and re-call ADIOI_OneSidedWriteAggregation and if the user has
+ * gpfsmpio_onesided_inform_rmw set then inform him of this condition
+ * and behavior.
*/
+
if (gpfsmpio_onesided_inform_rmw && (myrank ==0))
- FPRINTF(stderr,"Information: Holes found during one-sided write aggregation algorithm --- additionally performing default two-phase aggregation algorithm\n");
+ FPRINTF(stderr,"Information: Holes found during one-sided "
+ "write aggregation algorithm --- re-running one-sided "
+ "write aggregation with GPFSMPIO_ONESIDED_ALWAYS_RMW set to 1.\n");
+ gpfsmpio_onesided_always_rmw = 1;
+ int prev_gpfsmpio_onesided_no_rmw = gpfsmpio_onesided_no_rmw;
+ gpfsmpio_onesided_no_rmw = 1;
+ ADIOI_OneSidedWriteAggregation(fd, offset_list, len_list, contig_access_count, buf, datatype, error_code, st_offsets, end_offsets, currentValidDataIndex, fd_start, fd_end, &holeFound);
+ gpfsmpio_onesided_no_rmw = prev_gpfsmpio_onesided_no_rmw;
+ GPFSMPIO_T_CIO_REPORT( 1, fd, myrank, nprocs)
+ ADIOI_Free(offset_list);
+ ADIOI_Free(len_list);
+ ADIOI_Free(st_offsets);
+ ADIOI_Free(end_offsets);
+ ADIOI_Free(fd_start);
+ ADIOI_Free(fd_end);
+ ADIOI_Free(count_sizes);
+ goto fn_exit;
}
}
if (gpfsmpio_p2pcontig==1) {
diff --git a/src/mpi/romio/adio/common/onesided_aggregation.c b/src/mpi/romio/adio/common/onesided_aggregation.c
index f4ae0de..fedf9a0 100644
--- a/src/mpi/romio/adio/common/onesided_aggregation.c
+++ b/src/mpi/romio/adio/common/onesided_aggregation.c
@@ -5,16 +5,15 @@
#include "../ad_gpfs/ad_gpfs_tuning.h"
#else
int gpfsmpio_onesided_no_rmw = 0;
-int gpfsmpio_aggmethod = 0;
+int gpfsmpio_write_aggmethod = 0;
+int gpfsmpio_read_aggmethod = 0;
+int gpfsmpio_onesided_always_rmw = 0;
#endif
#include <pthread.h>
// #define onesidedtrace 1
-/* Uncomment this to use fences for the one-sided communication.
- */
-// #define ACTIVE_TARGET 1
/* This data structure holds the number of extents, the index into the flattened buffer and the remnant length
* beyond the flattened buffer index corresponding to the base buffer offset for non-contiguous source data
@@ -26,6 +25,29 @@ typedef struct NonContigSourceBufOffset {
ADIO_Offset indiceOffset;
} NonContigSourceBufOffset;
+/* This data structure holds the access state of the source buffer for target
+ * file domains within aggregators corresponding to the target data blocks. It
+ * is designed to be initialized with a starting point for a given file domain
+ * with an aggregator, after which the data access for data written to a given
+ * file domain from this compute is linear and uninterupted, and this serves as
+ * a key optimization for feeding the target aggs. For contigous source data
+ * the starting point is a single-value offset, for non-contiguous data it is
+ * the number of extents, the index into the flattened buffer and the remnant
+ * length beyond the flattened buffer index. The validity of the usage of this
+ * structure relies on the requirement that only 1 aggregator can write to a
+ * given file domain. */
+typedef struct FDSourceBufferState {
+
+ ADIO_Offset indiceOffset;
+ MPI_Aint bufTypeExtent;
+ int dataTypeExtent;
+ int flatBufIndice;
+
+ ADIO_Offset sourceBufferOffset;
+
+} FDSourceBufferState;
+
+
static int ADIOI_OneSidedSetup(ADIO_File fd, int procs) {
int ret = MPI_SUCCESS;
@@ -52,6 +74,107 @@ int ADIOI_OneSidedCleanup(ADIO_File fd)
return ret;
}
+/* This funtion packs a contigous buffer of data from the non-contgious source
+ * buffer for a specified chunk of data and advances the FDSourceBufferState
+ * machinery, so subsequent calls with the FDSourceBufferState will return the
+ * next linear chunk.
+ * Parameters:
+ * in: sourceDataBuffer - pointer to source data buffer.
+ * in: flatBuf - pointer to flattened source data buffer
+ * in: targetNumBytes - number of bytes to return and advance.
+ * in: packing - whether data is being packed from the source buffer to the
+ * packed buffer (1) or unpacked from the packed buffer to the source
+ * buffer (0)
+ * in/out: currentFDSourceBufferState - pointer to FDSourceBufferState structure, current
+ * data used as starting point, will be updated with
+ * the new state after targetNumBytes advance.
+ * out: packedDataBufer - pointer to the output packed data buffer. If the
+ * value is NULL then no data will be written.
+ *
+ */
+inline static void nonContigSourceDataBufferAdvance(char *sourceDataBuffer,
+ ADIOI_Flatlist_node *flatBuf, int targetNumBytes, int packing,
+ FDSourceBufferState *currentFDSourceBufferState, char *packedDataBufer)
+{
+ // make currentDataTypeExtent and bufTypeExtent ADIO_Offset since they are
+ // used in offset calculations
+ ADIO_Offset currentIndiceOffset = currentFDSourceBufferState->indiceOffset;
+ ADIO_Offset bufTypeExtent = (ADIO_Offset)currentFDSourceBufferState->bufTypeExtent;
+ ADIO_Offset currentDataTypeExtent =
+ (ADIO_Offset)currentFDSourceBufferState->dataTypeExtent;
+ int currentFlatBufIndice = currentFDSourceBufferState->flatBufIndice;
+
+ int targetSendDataIndex = 0;
+
+#ifdef onesidedtrace
+ printf("nonContigSourceDataBufferAdvance: currentFlatBufIndice is %d currentDataTypeExtent is %ld currentIndiceOffset is %ld\n",currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset);
+#endif
+
+ int remainingBytesToLoad = targetNumBytes;
+ while (remainingBytesToLoad > 0) {
+ if ((flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset) >= remainingBytesToLoad) { // we can get the rest of our data from this indice
+ ADIO_Offset physicalSourceBufferOffset = (currentDataTypeExtent * bufTypeExtent) + flatBuf->indices[currentFlatBufIndice] + currentIndiceOffset;
+
+#ifdef onesidedtrace
+ printf("loading remainingBytesToLoad %d from src buffer offset %ld to targetSendDataIndex %d\n",remainingBytesToLoad,physicalSourceBufferOffset,targetSendDataIndex);
+#endif
+
+ if (packedDataBufer != NULL) {
+ if (packing)
+ memcpy(&(packedDataBufer[targetSendDataIndex]),&(sourceDataBuffer[physicalSourceBufferOffset]),remainingBytesToLoad);
+ else
+ memcpy(&(sourceDataBuffer[physicalSourceBufferOffset]),&(packedDataBufer[targetSendDataIndex]),remainingBytesToLoad);
+ }
+
+ targetSendDataIndex += remainingBytesToLoad;
+ currentIndiceOffset += (ADIO_Offset)remainingBytesToLoad;
+ if (currentIndiceOffset >= flatBuf->blocklens[currentFlatBufIndice]) {
+ currentIndiceOffset = (ADIO_Offset)0;
+ currentFlatBufIndice++;
+ if (currentFlatBufIndice == flatBuf->count) {
+ currentFlatBufIndice = 0;
+ currentDataTypeExtent++;
+ }
+ }
+ remainingBytesToLoad = 0;
+
+ }
+ else { // we can only get part of our data from this indice
+ int amountDataToLoad = (flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset);
+ ADIO_Offset physicalSourceBufferOffset = (currentDataTypeExtent * bufTypeExtent) + flatBuf->indices[currentFlatBufIndice] + currentIndiceOffset;
+
+#ifdef onesidedtrace
+ printf("loading amountDataToLoad %d from src buffer offset %ld to targetSendDataIndex %d\n",amountDataToLoad,physicalSourceBufferOffset,targetSendDataIndex);
+#endif
+ if (packedDataBufer != NULL) {
+ if (packing)
+ memcpy(&(packedDataBufer[targetSendDataIndex]),&(sourceDataBuffer[physicalSourceBufferOffset]),amountDataToLoad);
+ else
+ memcpy(&(sourceDataBuffer[physicalSourceBufferOffset]),&(packedDataBufer[targetSendDataIndex]),amountDataToLoad);
+ }
+
+ targetSendDataIndex += amountDataToLoad;
+ currentIndiceOffset = (ADIO_Offset)0;
+ currentFlatBufIndice++;
+ if (currentFlatBufIndice == flatBuf->count) {
+ currentFlatBufIndice = 0;
+ currentDataTypeExtent++;
+ }
+ remainingBytesToLoad -= amountDataToLoad;
+ }
+ } // while
+
+ /* update machinery with new flatbuf position
+ */
+ currentFDSourceBufferState->indiceOffset = currentIndiceOffset;
+ currentFDSourceBufferState->dataTypeExtent = (int) currentDataTypeExtent;
+ currentFDSourceBufferState->flatBufIndice = currentFlatBufIndice;
+#ifdef onesidedtrace
+ printf("source buf advanced to currentFlatBufIndice %d currentDataTypeExtent %ld currentIndiceOffset %ld\n",currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset);
+#endif
+}
+
+
void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
ADIO_Offset *offset_list,
ADIO_Offset *len_list,
@@ -61,11 +184,32 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
int *error_code,
ADIO_Offset *st_offsets,
ADIO_Offset *end_offsets,
+ int numNonZeroDataOffsets,
ADIO_Offset *fd_start,
ADIO_Offset* fd_end,
int *hole_found)
{
+ int i,j; /* generic iterators */
+
+#ifdef onesidedtrace
+ if (buf == NULL) {
+ printf("ADIOI_OneSidedWriteAggregation - buf is NULL contig_access_count is %d\n",contig_access_count);
+ for (i=0;i<contig_access_count;i++)
+ printf("offset_list[%d] is %ld len_list[%d] is %ld\n",
+ i,offset_list[i],i,len_list[i]);
+ }
+ if (contig_access_count < 0)
+ printf("ADIOI_OneSidedWriteAggregation - contig_access_count "
+ "of %d is less than 0\n",contig_access_count);
+#endif
+
+ int lenListOverZero = 0;
+ for (i=0;((i<contig_access_count) && (!lenListOverZero));i++)
+ if (len_list[i] > 0)
+ lenListOverZero = 1;
+
+
*error_code = MPI_SUCCESS; /* initialize to success */
#ifdef ROMIO_GPFS
@@ -73,7 +217,6 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
startTimeBase = MPI_Wtime();
#endif
- int i,j; /* generic iterators */
MPI_Status status;
pthread_t io_thread;
void *thread_ret;
@@ -82,6 +225,10 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
int nprocs,myrank;
MPI_Comm_size(fd->comm, &nprocs);
MPI_Comm_rank(fd->comm, &myrank);
+#ifdef onesidedtrace
+printf("ADIOI_OneSidedWriteAggregation started on rank %d\n",myrank);
+#endif
+
if (fd->io_buf_window == MPI_WIN_NULL ||
fd->io_buf_put_amounts_window == MPI_WIN_NULL)
@@ -101,14 +248,8 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
ADIOI_Flatlist_node *flatBuf=NULL;
ADIOI_Datatype_iscontig(datatype, &bufTypeIsContig);
- /* maxNumContigOperations keeps track of how many different chunks we will need to send
- * for the purpose of pre-allocating the data structures to hold them.
- */
- int maxNumContigOperations = contig_access_count;
-
if (!bufTypeIsContig) {
- /* Flatten the non-contiguous source datatype.
- */
+ /* Flatten the non-contiguous source datatype and set the extent. */
flatBuf = ADIOI_Flatten_and_find(datatype);
MPI_Type_extent(datatype, &bufTypeExtent);
#ifdef onesidedtrace
@@ -118,34 +259,67 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
#endif
}
+ int naggs = fd->hints->cb_nodes;
+
+ /* Track the state of the source buffer for feeding the target data blocks.
+ * For GPFS the number of file domains per agg is always 1 so we just need 1 agg
+ * dimension to track the data, in the case of lustre we will need 2 dimensions
+ * agg and file domain since aggs write to multiple file domains in the case of lustre.
+ * This structure will be modified as the data is written to reflect the current state
+ * of the offset.
+ */
+
+#ifdef onesidedtrace
+ printf("sizeof(FDSourceBufferState) is %d - make sure is 32 for 32-byte memalign optimal\n",sizeof(FDSourceBufferState));
+#endif
+ FDSourceBufferState *currentFDSourceBufferState;
+
+ currentFDSourceBufferState = (FDSourceBufferState *) ADIOI_Malloc(naggs * sizeof(FDSourceBufferState));
+ for (i=0;i<naggs;i++) {
+ /* initialize based on the bufType to indicate that it is unset.
+ */
+ if (bufTypeIsContig) {
+ currentFDSourceBufferState[i].sourceBufferOffset = -1;
+ }
+ else {
+ currentFDSourceBufferState[i].indiceOffset = -1;
+ }
+ }
+
#ifdef onesidedtrace
printf(" ADIOI_OneSidedWriteAggregation bufTypeIsContig is %d contig_access_count is %d\n",bufTypeIsContig,contig_access_count);
#endif
+ /* maxNumContigOperations keeps track of how many different chunks we will need to send
+ * for the purpose of pre-allocating the data structures to hold them.
+ */
+ int maxNumContigOperations = contig_access_count;
+
ADIO_Offset lastFileOffset = 0, firstFileOffset = -1;
- /* Get the total range being written.
+ /* Get the total range being written - in the case of just 1 byte the starting and ending offsets
+ * will match the same as they would for 0 bytes so to distinguish we need the actual data count.
*/
- for (j=0;j<nprocs;j++) {
- if (end_offsets[j] > st_offsets[j]) {
- /* Guard against ranks with empty data.
- */
+ for (j=0;j<numNonZeroDataOffsets;j++) {
+#ifdef onesidedtrace
+printf("end_offsets[%d] is %ld st_offsets[%d] is %ld\n",j,end_offsets[j],j,st_offsets[j]);
+#endif
lastFileOffset = ADIOI_MAX(lastFileOffset,end_offsets[j]);
if (firstFileOffset == -1)
firstFileOffset = st_offsets[j];
else
firstFileOffset = ADIOI_MIN(firstFileOffset,st_offsets[j]);
- }
}
int myAggRank = -1; /* if I am an aggregor this is my index into fd->hints->ranklist */
int iAmUsedAgg = 0; /* whether or not this rank is used as an aggregator. */
- int naggs = fd->hints->cb_nodes;
- int coll_bufsize = fd->hints->cb_buffer_size;
+ /* Make coll_bufsize an ADIO_Offset since it is used in calculations with offsets.
+ */
+ ADIO_Offset coll_bufsize = (ADIO_Offset)(fd->hints->cb_buffer_size);
#ifdef ROMIO_GPFS
if (gpfsmpio_pthreadio == 1) {
/* split buffer in half for a kind of double buffering with the threads*/
- coll_bufsize = fd->hints->cb_buffer_size/2;
+ coll_bufsize = (ADIO_Offset)(fd->hints->cb_buffer_size/2);
}
#endif
@@ -176,22 +350,29 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
printf("contig_access_count is %d lastFileOffset is %ld firstFileOffset is %ld\n",contig_access_count,lastFileOffset,firstFileOffset);
for (j=0;j<contig_access_count;j++) {
printf("offset_list[%d]: %ld , len_list[%d]: %ld\n",j,offset_list[j],j,len_list[j]);
+
}
#endif
- /* Determine how much data and to whom I need to send. For source proc
- * targets, also determine the target file domain offsets locally to
- * reduce communication overhead.
+ /* Compute number of rounds.
+ */
+ int numberOfRounds = 0;
+ for (j=0;j<naggs;j++) {
+ int currentNumberOfRounds = (int)(((fd_end[j] - fd_start[j])+(ADIO_Offset)1)/coll_bufsize);
+ if (((ADIO_Offset)currentNumberOfRounds*coll_bufsize) < ((fd_end[j] - fd_start[j])+(ADIO_Offset)1))
+ currentNumberOfRounds++;
+ if (currentNumberOfRounds > numberOfRounds)
+ numberOfRounds = currentNumberOfRounds;
+ }
+
+ /* Data structures to track what data this compute needs to send to whom.
+ * For lustre they will all need another dimension for the file domain.
*/
int *targetAggsForMyData = (int *)ADIOI_Malloc(naggs * sizeof(int));
ADIO_Offset *targetAggsForMyDataFDStart = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
ADIO_Offset *targetAggsForMyDataFDEnd = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
int numTargetAggs = 0;
- /* Compute number of rounds.
- */
- ADIO_Offset numberOfRounds = (ADIO_Offset)((((ADIO_Offset)(end_offsets[nprocs-1]-st_offsets[0]))/((ADIO_Offset)((ADIO_Offset)coll_bufsize*(ADIO_Offset)naggs)))) + 1;
-
/* This data structure holds the beginning offset and len list index for the range to be written
* coresponding to the round and target agg. Initialize to -1 to denote being unset.
*/
@@ -209,41 +390,6 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
for (i=0;i<numberOfRounds;i++)
targetAggsForMyDataLastOffLenIndex[i] = (int *)ADIOI_Malloc(naggs * sizeof(int));
- /* This data structure holds the base buffer offset for contiguous source data for the range to be written
- * coresponding to the round and target agg. Initialize to -1 to denote being unset.yeah
- */
- ADIO_Offset **baseSourceBufferOffset;
-
- if (bufTypeIsContig) {
- baseSourceBufferOffset= (ADIO_Offset **)ADIOI_Malloc(numberOfRounds * sizeof(ADIO_Offset *));
- for (i=0;i<numberOfRounds;i++) {
- baseSourceBufferOffset[i] = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
- for (j=0;j<naggs;j++)
- baseSourceBufferOffset[i][j] = -1;
- }
- }
- ADIO_Offset currentSourceBufferOffset = 0;
-
- /* This data structure holds the number of extents, the index into the flattened buffer and the remnant length
- * beyond the flattened buffer indice corresponding to the base buffer offset for non-contiguous source data
- * for the range to be written coresponding to the round and target agg.
- */
- NonContigSourceBufOffset **baseNonContigSourceBufferOffset;
- if (!bufTypeIsContig) {
- baseNonContigSourceBufferOffset = (NonContigSourceBufOffset **) ADIOI_Malloc(numberOfRounds * sizeof(NonContigSourceBufOffset *));
- for (i=0;i<numberOfRounds;i++) {
- baseNonContigSourceBufferOffset[i] = (NonContigSourceBufOffset *)ADIOI_Malloc(naggs * sizeof(NonContigSourceBufOffset));
- /* initialize flatBufIndice to -1 to indicate that it is unset.
- */
- for (j=0;j<naggs;j++)
- baseNonContigSourceBufferOffset[i][j].flatBufIndice = -1;
- }
- }
-
- int currentDataTypeExtent = 0;
- int currentFlatBufIndice=0;
- ADIO_Offset currentIndiceOffset = 0;
-
#ifdef onesidedtrace
printf("NumberOfRounds is %d\n",numberOfRounds);
for (i=0;i<naggs;i++)
@@ -255,6 +401,11 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
int currentAggRankListIndex = 0;
int maxNumNonContigSourceChunks = 0;
+ ADIO_Offset currentSourceBufferOffset = 0;
+ int currentDataTypeExtent = 0;
+ int currentFlatBufIndice=0;
+ ADIO_Offset currentIndiceOffset = 0;
+
/* This denotes the coll_bufsize boundaries within the source buffer for writing for the same round.
*/
ADIO_Offset intraRoundCollBufsizeOffset = 0;
@@ -266,26 +417,30 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
targetAggsForMyDataCurrentRoundIter[i] = 0;
/* This is the first of the two main loops in this algorithm. The purpose of this loop is essentially to populate
- * the data structures defined above for what source data blocks needs to go where (target agg) and when (round iter).
+ * the data structures defined above for what source data blocks needs to go where (target agg and file domain) and when
+ * (round iter). For lustre essentially an additional layer of nesting will be required for the multiple file domains
+ * within the target agg.
*/
+ if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero) {
int blockIter;
for (blockIter=0;blockIter<contig_access_count;blockIter++) {
/* Determine the starting source buffer offset for this block - for iter 0 skip it since that value is 0.
*/
if (blockIter>0) {
- if (bufTypeIsContig)
+ if (bufTypeIsContig) {
currentSourceBufferOffset += len_list[blockIter-1];
+ }
else {
+
/* Non-contiguous source datatype, count up the extents and indices to this point
- * in the blocks for use in computing the source buffer offset.
+ * in the blocks for use in computing the source starting buffer offset for target aggs
+ * and file domains.
*/
ADIO_Offset sourceBlockTotal = 0;
int lastIndiceUsed = currentFlatBufIndice;
int numNonContigSourceChunks = 0;
-#ifdef onesidedtrace
- printf("blockIter %d len_list[blockIter-1] is %d currentIndiceOffset is %ld currentFlatBufIndice is %d\n",blockIter,len_list[blockIter-1],currentIndiceOffset,currentFlatBufIndice);
-#endif
+
while (sourceBlockTotal < len_list[blockIter-1]) {
numNonContigSourceChunks++;
sourceBlockTotal += (flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset);
@@ -295,7 +450,7 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
currentFlatBufIndice = 0;
currentDataTypeExtent++;
}
- currentIndiceOffset = 0;
+ currentIndiceOffset = (ADIO_Offset)0;
}
if (sourceBlockTotal > len_list[blockIter-1]) {
currentFlatBufIndice--;
@@ -304,34 +459,39 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
currentFlatBufIndice = flatBuf->count-1;
}
currentIndiceOffset = len_list[blockIter-1] - (sourceBlockTotal - flatBuf->blocklens[lastIndiceUsed]);
- ADIOI_Assert((currentIndiceOffset >= 0) && (currentIndiceOffset < flatBuf->blocklens[currentFlatBufIndice]));
+ // ADIOI_Assert((currentIndiceOffset >= 0) && (currentIndiceOffset < flatBuf->blocklens[currentFlatBufIndice]));
}
else
- currentIndiceOffset = 0;
- maxNumContigOperations += numNonContigSourceChunks;
+ currentIndiceOffset = (ADIO_Offset)0;
+ maxNumContigOperations += (numNonContigSourceChunks+2);
if (numNonContigSourceChunks > maxNumNonContigSourceChunks)
maxNumNonContigSourceChunks = numNonContigSourceChunks;
+
#ifdef onesidedtrace
printf("blockiter %d currentFlatBufIndice is now %d currentDataTypeExtent is now %d currentIndiceOffset is now %ld maxNumContigOperations is now %d\n",blockIter,currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset,maxNumContigOperations);
#endif
} // !bufTypeIsContig
} // blockIter > 0
- /* For the first iteration we need to include these maxNumContigOperations and maxNumNonContigSourceChunks
- * for non-contig case even though we did not need to compute the starting offset.
+ /* For the last iteration we need to include these maxNumContigOperations and maxNumNonContigSourceChunks
+ * for non-contig case even though we did not need to compute the next starting offset.
*/
- if ((blockIter == 0) && (!bufTypeIsContig)) {
+ if ((blockIter == (contig_access_count-1)) && (!bufTypeIsContig)) {
ADIO_Offset sourceBlockTotal = 0;
int tmpCurrentFlatBufIndice = currentFlatBufIndice;
- while (sourceBlockTotal < len_list[0]) {
- maxNumContigOperations++;
- maxNumNonContigSourceChunks++;
+ int lastNumNonContigSourceChunks = 0;
+ while (sourceBlockTotal < len_list[blockIter]) {
+ lastNumNonContigSourceChunks++;
sourceBlockTotal += flatBuf->blocklens[tmpCurrentFlatBufIndice];
tmpCurrentFlatBufIndice++;
if (tmpCurrentFlatBufIndice == flatBuf->count) {
tmpCurrentFlatBufIndice = 0;
}
}
+ maxNumContigOperations += (lastNumNonContigSourceChunks+2);
+ if (lastNumNonContigSourceChunks > maxNumNonContigSourceChunks)
+ maxNumNonContigSourceChunks = lastNumNonContigSourceChunks;
+
}
ADIO_Offset blockStart = offset_list[blockIter], blockEnd = offset_list[blockIter]+len_list[blockIter]-(ADIO_Offset)1;
@@ -351,11 +511,13 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
/* Determine if this is a new target agg.
*/
if (blockIter>0) {
- if ((offset_list[blockIter-1]+len_list[blockIter-1]-(ADIO_Offset)1) < fd_start[currentAggRankListIndex])
+ if ((offset_list[blockIter-1]+len_list[blockIter-1]-(ADIO_Offset)1) < fd_start[currentAggRankListIndex]) {
numTargetAggs++;
+ }
}
- /* Determine which round to start writing.
+ /* Determine which round to start writing - data is written coll_bufsize per round from the aggregator
+ * so if our starting offset in the file domain is multiple coll_bufsize that will correspond to the round.
*/
if ((blockStart - fd_start[currentAggRankListIndex]) >= coll_bufsize) {
ADIO_Offset currentRoundBlockStart = fd_start[currentAggRankListIndex];
@@ -386,15 +548,31 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
targetAggsForMyDataFDEnd[numTargetAggs] = lastFileOffset;
}
targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
- if (bufTypeIsContig)
- baseSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = currentSourceBufferOffset;
+ /* Set the source buffer state starting point for data access for this
+ agg and file domain. */
+
+ if (bufTypeIsContig) {
+ if (currentFDSourceBufferState[numTargetAggs].sourceBufferOffset == -1) {
+
+ currentFDSourceBufferState[numTargetAggs].sourceBufferOffset = currentSourceBufferOffset;
+#ifdef onesidedtrace
+ printf("For agg %d sourceBufferOffset initialized to %ld\n",currentAggRankListIndex,currentSourceBufferOffset);
+#endif
+ }
+ }
else {
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].flatBufIndice = currentFlatBufIndice;
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].dataTypeExtent = currentDataTypeExtent;
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].indiceOffset = currentIndiceOffset;
+ if (currentFDSourceBufferState[numTargetAggs].indiceOffset == -1) {
+ currentFDSourceBufferState[numTargetAggs].indiceOffset = currentIndiceOffset;
+ currentFDSourceBufferState[numTargetAggs].bufTypeExtent = bufTypeExtent;
+ currentFDSourceBufferState[numTargetAggs].dataTypeExtent = currentDataTypeExtent;
+ currentFDSourceBufferState[numTargetAggs].flatBufIndice = currentFlatBufIndice;
+#ifdef onesidedtrace
+ printf("For agg %d dataTypeExtent initialized to %d flatBufIndice to %d indiceOffset to %ld\n",numTargetAggs,currentDataTypeExtent,currentFlatBufIndice,currentIndiceOffset);
+#endif
+ }
}
- intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + ((targetAggsForMyDataCurrentRoundIter[numTargetAggs]+1) * coll_bufsize);
+ intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + ((ADIO_Offset)(targetAggsForMyDataCurrentRoundIter[numTargetAggs]+1) * coll_bufsize);
#ifdef onesidedtrace
printf("Initial settings numTargetAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld currentSourceBufferOffset set to %ld intraRoundCollBufsizeOffset set to %ld\n",numTargetAggs,blockIter,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter],currentSourceBufferOffset,intraRoundCollBufsizeOffset);
@@ -405,12 +583,15 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
*/
targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
- /* If this blocks extends into the next file domain handle this situation.
+ /* If this blocks extends into the next file domain advance to the next target aggs and source buffer states.
*/
if (blockEnd > fd_end[currentAggRankListIndex]) {
#ifdef onesidedtrace
- printf("large block, blockEnd %ld >= fd_end[currentAggRankListIndex] %ld\n",blockEnd,fd_end[currentAggRankListIndex]);
+ printf("block extends past current fd, blockEnd %ld >= fd_end[currentAggRankListIndex] %ld total block size is %ld blockStart was %ld\n",blockEnd,fd_end[currentAggRankListIndex], len_list[blockIter],blockStart);
#endif
+ ADIO_Offset amountToAdvanceSBOffsetForFD = 0;
+ int additionalFDCounter = 0;
+
while (blockEnd >= fd_end[currentAggRankListIndex]) {
ADIO_Offset thisAggBlockEnd = fd_end[currentAggRankListIndex];
if (thisAggBlockEnd >= intraRoundCollBufsizeOffset) {
@@ -418,14 +599,6 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
targetAggsForMyDataCurrentRoundIter[numTargetAggs]++;
intraRoundCollBufsizeOffset += coll_bufsize;
targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
- if (bufTypeIsContig)
- baseSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = currentSourceBufferOffset;
- else {
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].flatBufIndice = currentFlatBufIndice;
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].dataTypeExtent = currentDataTypeExtent;
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].indiceOffset = currentIndiceOffset;
- }
-
targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
#ifdef onesidedtrace
printf("targetAggsForMyDataCurrentRoundI%d] is now %d intraRoundCollBufsizeOffset is now %ld\n",numTargetAggs,targetAggsForMyDataCurrentRoundIter[numTargetAggs],intraRoundCollBufsizeOffset);
@@ -433,6 +606,7 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
} // while (thisAggBlockEnd >= intraRoundCollBufsizeOffset)
} // if (thisAggBlockEnd >= intraRoundCollBufsizeOffset)
+ int prevAggRankListIndex = currentAggRankListIndex;
currentAggRankListIndex++;
/* Skip over unused aggs.
@@ -463,15 +637,76 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
targetAggsForMyDataFDEnd[numTargetAggs] = lastFileOffset;
}
targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
- if (bufTypeIsContig)
- baseSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = currentSourceBufferOffset;
- else {
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].flatBufIndice = currentFlatBufIndice;
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].dataTypeExtent = currentDataTypeExtent;
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].indiceOffset = currentIndiceOffset;
+ /* For the first additonal file domain the source buffer offset
+ * will be incremented relative to the state of this first main
+ * loop but for subsequent full file domains the offset will be
+ * incremented by the size
+ * of the file domain.
+ */
+ if (additionalFDCounter == 0)
+ amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex]
+ - blockStart) + (ADIO_Offset)1;
+ else
+ amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex]
+ -fd_start[prevAggRankListIndex]) +(ADIO_Offset)1;
+
+ if (bufTypeIsContig) {
+ ADIOI_Assert(numTargetAggs > 0);
+ if (currentFDSourceBufferState[numTargetAggs].sourceBufferOffset == -1) {
+ if (additionalFDCounter == 0) { // first file domain, still use the current data counter
+ currentFDSourceBufferState[numTargetAggs].sourceBufferOffset =
+ currentSourceBufferOffset+amountToAdvanceSBOffsetForFD;
+ }
+ else { // 2nd file domain, advance full file domain from last source buffer state
+ currentFDSourceBufferState[numTargetAggs].sourceBufferOffset =
+ currentFDSourceBufferState[numTargetAggs-1].sourceBufferOffset+amountToAdvanceSBOffsetForFD;
+ }
+
+#ifdef onesidedtrace
+ printf("Crossed into new FD - for agg %d sourceBufferOffset initialized to %ld amountToAdvanceSBOffsetForFD is %ld\n",numTargetAggs,currentFDSourceBufferState[numTargetAggs].sourceBufferOffset,amountToAdvanceSBOffsetForFD);
+#endif
+ }
}
+ else if (currentFDSourceBufferState[numTargetAggs].indiceOffset == -1) {
+ // non-contiguos source buffer
+ ADIOI_Assert(numTargetAggs > 0);
+
+ /* Initialize the source buffer state appropriately and then
+ * advance it with the
+ * nonContigSourceDataBufferAdvance function.
+ */
+ if (additionalFDCounter == 0) {
+ // first file domain, still use the current data counter
+ currentFDSourceBufferState[numTargetAggs].indiceOffset =
+ currentIndiceOffset;
+ currentFDSourceBufferState[numTargetAggs].bufTypeExtent = bufTypeExtent;
+ currentFDSourceBufferState[numTargetAggs].dataTypeExtent =
+ currentDataTypeExtent;
+ currentFDSourceBufferState[numTargetAggs].flatBufIndice =
+ currentFlatBufIndice;
+ }
+ else {
+ // 2nd file domain, advance full file domain from last source buffer state
+ currentFDSourceBufferState[numTargetAggs].indiceOffset =
+ currentFDSourceBufferState[numTargetAggs-1].indiceOffset;
+ currentFDSourceBufferState[numTargetAggs].bufTypeExtent =
+ currentFDSourceBufferState[numTargetAggs-1].bufTypeExtent;
+ currentFDSourceBufferState[numTargetAggs].dataTypeExtent =
+ currentFDSourceBufferState[numTargetAggs-1].dataTypeExtent;
+ currentFDSourceBufferState[numTargetAggs].flatBufIndice =
+ currentFDSourceBufferState[numTargetAggs-1].flatBufIndice;
+ }
+ nonContigSourceDataBufferAdvance(((char*)buf), flatBuf,
+ (int)amountToAdvanceSBOffsetForFD, 1,
+ ¤tFDSourceBufferState[numTargetAggs], NULL);
#ifdef onesidedtrace
- printf("large block init settings numTargetAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld\n",numTargetAggs,i,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter]);
+ printf("Crossed into new FD - for agg %d dataTypeExtent initialized to %d flatBufIndice to %d indiceOffset to %ld amountToAdvanceSBOffsetForFD is %d\n",numTargetAggs,currentFDSourceBufferState[numTargetAggs].dataTypeExtent,currentFDSourceBufferState[numTargetAggs].flatBufIndice,currentFDSourceBufferState[numTargetAggs].indiceOffset,amountToAdvanceSBOffsetForFD);
+#endif
+ }
+ additionalFDCounter++;
+
+#ifdef onesidedtrace
+ printf("block extended beyond fd init settings numTargetAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld\n",numTargetAggs,i,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter]);
#endif
intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + coll_bufsize;
targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
@@ -479,28 +714,22 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
} // while (blockEnd >= fd_end[currentAggRankListIndex])
} // if (blockEnd > fd_end[currentAggRankListIndex])
- /* Else if we are still in the same file domain / target agg but have gone past the coll_bufsize and need
- * to advance to the next round handle this situation.
+ /* If we are still in the same file domain / target agg but have gone
+ * past the coll_bufsize and need to advance to the next round -
+ * initialize tracking data appropriately.
*/
- else if (blockEnd >= intraRoundCollBufsizeOffset) {
+ if (blockEnd >= intraRoundCollBufsizeOffset) {
ADIO_Offset currentBlockEnd = blockEnd;
while (currentBlockEnd >= intraRoundCollBufsizeOffset) {
targetAggsForMyDataCurrentRoundIter[numTargetAggs]++;
intraRoundCollBufsizeOffset += coll_bufsize;
targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
- if (bufTypeIsContig)
- baseSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = currentSourceBufferOffset;
- else {
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].flatBufIndice = currentFlatBufIndice;
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].dataTypeExtent = currentDataTypeExtent;
- baseNonContigSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs].indiceOffset = currentIndiceOffset;
- }
targetAggsForMyDataLastOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
#ifdef onesidedtrace
printf("smaller than fd currentBlockEnd is now %ld intraRoundCollBufsizeOffset is now %ld targetAggsForMyDataCurrentRoundIter[%d] is now %d\n",currentBlockEnd, intraRoundCollBufsizeOffset, numTargetAggs,targetAggsForMyDataCurrentRoundIter[numTargetAggs]);
#endif
} // while (currentBlockEnd >= intraRoundCollBufsizeOffset)
- } // else if (blockEnd >= intraRoundCollBufsizeOffset)
+ } // if (blockEnd >= intraRoundCollBufsizeOffset)
/* Need to advance numTargetAggs if this is the last target offset to
* include this one.
@@ -518,6 +747,8 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
}
#endif
+ } // if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero)
+
ADIOI_Free(targetAggsForMyDataCurrentRoundIter);
int currentWriteBuf = 0;
@@ -544,12 +775,9 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
for (i=0;i<nprocs;i++)
write_buf_put_amounts[i] = 0;
}
-#ifdef ACTIVE_TARGET
- MPI_Win_fence(0, write_buf_window);
- if (!gpfsmpio_onesided_no_rmw)
- MPI_Win_fence(0, fd->io_buf_put_amounts_window);
-#endif
+ /* Counters to track the offset range being written by the used aggs.
+ */
ADIO_Offset currentRoundFDStart = 0;
ADIO_Offset currentRoundFDEnd = 0;
@@ -563,7 +791,33 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
if (currentRoundFDEnd > lastFileOffset)
currentRoundFDEnd = lastFileOffset;
}
+#ifdef onesidedtrace
+printf("iAmUsedAgg - currentRoundFDStart initialized to %ld currentRoundFDEnd to %ld\n",currentRoundFDStart,currentRoundFDEnd);
+#endif
+ if (gpfsmpio_onesided_always_rmw) { // read in the first buffer
+ ADIO_Offset tmpCurrentRoundFDEnd = 0;
+ if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
+ if (myAggRank == greatestFileDomainAggRank) {
+ if (fd_end[myAggRank] > lastFileOffset)
+ tmpCurrentRoundFDEnd = lastFileOffset;
+ else
+ tmpCurrentRoundFDEnd = fd_end[myAggRank];
+ }
+ else
+ tmpCurrentRoundFDEnd = fd_end[myAggRank];
+ }
+ else
+ tmpCurrentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset)1;
+#ifdef onesidedtrace
+printf("gpfsmpio_onesided_always_rmw - first buffer pre-read for file offsets %ld to %ld total is %d\n",currentRoundFDStart,tmpCurrentRoundFDEnd,(int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1);
+#endif
+ ADIO_ReadContig(fd, write_buf, (int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1,
+ MPI_BYTE, ADIO_EXPLICIT_OFFSET,currentRoundFDStart, &status, error_code);
+
+ }
}
+ if (gpfsmpio_onesided_always_rmw) // wait until the first buffer is read
+ MPI_Barrier(fd->comm);
#ifdef ROMIO_GPFS
endTimeBase = MPI_Wtime();
@@ -571,25 +825,20 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
startTimeBase = MPI_Wtime();
#endif
- ADIO_Offset currentBaseSourceBufferOffset = 0;
-
- /* These data structures are used to track the offset/len pairs for non-contiguous source buffers that are
- * to be used for each block of data in the offset list. Allocate them once with the maximum size and then
- * reuse the space throughout the algoritm below.
- */
- ADIO_Offset *nonContigSourceOffsets;
- int *nonContigSourceLens;
- if (!bufTypeIsContig) {
- nonContigSourceOffsets = (ADIO_Offset *)ADIOI_Malloc((maxNumNonContigSourceChunks+2) * sizeof(ADIO_Offset));
- nonContigSourceLens = (int *)ADIOI_Malloc((maxNumNonContigSourceChunks+2) * sizeof(int));
- }
/* This is the second main loop of the algorithm, actually nested loop of target aggs within rounds. There are 2 flavors of this.
- * For gpfsmpio_aggmethod of 1 each nested iteration for the target agg does an mpi_put on a contiguous chunk using a primative datatype
- * determined using the data structures from the first main loop. For gpfsmpio_aggmethod of 2 each nested iteration for the target agg
+ * For gpfsmpio_write_aggmethod of 1 each nested iteration for the target
+ * agg does an mpi_put on a contiguous chunk using a primative datatype
+ * determined using the data structures from the first main loop. For
+ * gpfsmpio_write_aggmethod of 2 each nested iteration for the target agg
* builds up data to use in created a derived data type for 1 mpi_put that is done for the target agg for each round.
+ * To support lustre there will need to be an additional layer of nesting
+ * for the multiple file domains within target aggs.
*/
int roundIter;
+
for (roundIter=0;roundIter<numberOfRounds;roundIter++) {
+ if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero) {
+
int aggIter;
for (aggIter=0;aggIter<numTargetAggs;aggIter++) {
@@ -597,27 +846,28 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
int numBytesPutThisAggRound = 0;
/* If we have data for the round/agg process it.
*/
- if ((bufTypeIsContig && (baseSourceBufferOffset[roundIter][aggIter] != -1)) || (!bufTypeIsContig && (baseNonContigSourceBufferOffset[roundIter][aggIter].flatBufIndice != -1))) {
-
- ADIO_Offset currentRoundFDStartForMyTargetAgg = (ADIO_Offset)((ADIO_Offset)targetAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)roundIter*(ADIO_Offset)coll_bufsize));
- ADIO_Offset currentRoundFDEndForMyTargetAgg = (ADIO_Offset)((ADIO_Offset)targetAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)(roundIter+1)*(ADIO_Offset)coll_bufsize) - (ADIO_Offset)1);
+ if (targetAggsForMyDataFirstOffLenIndex[roundIter][aggIter] != -1) {
+ ADIO_Offset currentRoundFDStartForMyTargetAgg = (ADIO_Offset)((ADIO_Offset)targetAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)roundIter*coll_bufsize));
+ ADIO_Offset currentRoundFDEndForMyTargetAgg = (ADIO_Offset)((ADIO_Offset)targetAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)(roundIter+1)*coll_bufsize) - (ADIO_Offset)1);
int targetAggContigAccessCount = 0;
/* These data structures are used for the derived datatype mpi_put
- * in the gpfsmpio_aggmethod of 2 case.
+ * in the gpfsmpio_write_aggmethod of 2 case.
*/
int *targetAggBlockLengths;
MPI_Aint *targetAggDisplacements, *sourceBufferDisplacements;
MPI_Datatype *targetAggDataTypes;
+ char *derivedTypePackedSourceBuffer;
+ int derivedTypePackedSourceBufferOffset = 0;
int allocatedDerivedTypeArrays = 0;
+ ADIO_Offset amountOfDataWrittenThisRoundAgg = 0;
+
#ifdef onesidedtrace
printf("roundIter %d processing targetAggsForMyData %d \n",roundIter,targetAggsForMyData[aggIter]);
#endif
- ADIO_Offset sourceBufferOffset = 0;
-
/* Process the range of offsets for this target agg.
*/
int offsetIter;
@@ -628,67 +878,11 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
ADIO_Offset offsetStart = offset_list[offsetIter], offsetEnd = (offset_list[offsetIter]+len_list[offsetIter]-(ADIO_Offset)1);
- /* Get the base source buffer offset for this iterataion of the target agg in the round.
- * For the first one just get the predetermined base value, for subsequent ones keep track
- * of the value for each iteration and increment it.
- */
- if (offsetIter == startingOffLenIndex) {
- if (bufTypeIsContig)
- currentBaseSourceBufferOffset = baseSourceBufferOffset[roundIter][aggIter];
- else {
- currentIndiceOffset = baseNonContigSourceBufferOffset[roundIter][aggIter].indiceOffset;
- currentDataTypeExtent = baseNonContigSourceBufferOffset[roundIter][aggIter].dataTypeExtent;
- currentFlatBufIndice = baseNonContigSourceBufferOffset[roundIter][aggIter].flatBufIndice;
- }
- }
- else {
- if (bufTypeIsContig)
- currentBaseSourceBufferOffset += len_list[offsetIter-1];
- else {
-
- /* For non-contiguous source datatype advance the flattened buffer machinery to this offset.
- * Note that currentDataTypeExtent, currentFlatBufIndice and currentIndiceOffset are used and
- * advanced across the offsetIters.
- */
- ADIO_Offset sourceBlockTotal = 0;
- int lastIndiceUsed = currentFlatBufIndice;
- while (sourceBlockTotal < len_list[offsetIter-1]) {
- sourceBlockTotal += (flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset);
- lastIndiceUsed = currentFlatBufIndice;
- currentFlatBufIndice++;
- if (currentFlatBufIndice == flatBuf->count) {
- currentFlatBufIndice = 0;
- currentDataTypeExtent++;
- }
- currentIndiceOffset = 0;
- } // while
- if (sourceBlockTotal > len_list[offsetIter-1]) {
- currentFlatBufIndice--;
- if (currentFlatBufIndice < 0 ) {
- currentDataTypeExtent--;
- currentFlatBufIndice = flatBuf->count-1;
- }
- currentIndiceOffset = len_list[offsetIter-1] - (sourceBlockTotal - flatBuf->blocklens[lastIndiceUsed]);
- ADIOI_Assert((currentIndiceOffset >= 0) && (currentIndiceOffset < flatBuf->blocklens[currentFlatBufIndice]));
- }
- else
- currentIndiceOffset = 0;
-#ifdef onesidedtrace
- printf("offsetIter %d contig_access_count target agg %d currentFlatBufIndice is now %d currentDataTypeExtent is now %d currentIndiceOffset is now %ld\n",offsetIter,aggIter,currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset);
-#endif
- }
- }
-
- /* For source buffer contiguous here is the offset we will use for this iteration.
- */
- if (bufTypeIsContig)
- sourceBufferOffset = currentBaseSourceBufferOffset;
-
#ifdef onesidedtrace
- printf("roundIter %d target iter %d targetAggsForMyData is %d offset_list[%d] is %ld len_list[%d] is %ld targetAggsForMyDataFDStart is %ld targetAggsForMyDataFDEnd is %ld currentRoundFDStartForMyTargetAgg is %ld currentRoundFDEndForMyTargetAgg is %ld targetAggsForMyDataFirstOffLenIndex is %ld baseSourceBufferOffset is %ld\n",
+ printf("roundIter %d target iter %d targetAggsForMyData is %d offset_list[%d] is %ld len_list[%d] is %ld targetAggsForMyDataFDStart is %ld targetAggsForMyDataFDEnd is %ld currentRoundFDStartForMyTargetAgg is %ld currentRoundFDEndForMyTargetAgg is %ld targetAggsForMyDataFirstOffLenIndex is %ld\n",
roundIter,aggIter,targetAggsForMyData[aggIter],offsetIter,offset_list[offsetIter],offsetIter,len_list[offsetIter],
targetAggsForMyDataFDStart[aggIter],targetAggsForMyDataFDEnd[aggIter],
- currentRoundFDStartForMyTargetAgg,currentRoundFDEndForMyTargetAgg, targetAggsForMyDataFirstOffLenIndex[roundIter][aggIter], baseSourceBufferOffset[roundIter][aggIter]);
+ currentRoundFDStartForMyTargetAgg,currentRoundFDEndForMyTargetAgg, targetAggsForMyDataFirstOffLenIndex[roundIter][aggIter]);
#endif
/* Determine the amount of data and exact source buffer offsets to use.
@@ -707,15 +901,11 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
else
bufferAmountToSend = (offsetEnd - currentRoundFDStartForMyTargetAgg) +1;
if (offsetStart < currentRoundFDStartForMyTargetAgg) {
- if (bufTypeIsContig)
- sourceBufferOffset += (currentRoundFDStartForMyTargetAgg-offsetStart);
offsetStart = currentRoundFDStartForMyTargetAgg;
}
}
else if ((offsetStart <= currentRoundFDStartForMyTargetAgg) && (offsetEnd >= currentRoundFDEndForMyTargetAgg)) {
bufferAmountToSend = (currentRoundFDEndForMyTargetAgg - currentRoundFDStartForMyTargetAgg) +1;
- if (bufTypeIsContig)
- sourceBufferOffset += (currentRoundFDStartForMyTargetAgg-offsetStart);
offsetStart = currentRoundFDStartForMyTargetAgg;
}
@@ -724,7 +914,7 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
printf("bufferAmountToSend is %d\n",bufferAmountToSend);
#endif
if (bufferAmountToSend > 0) { /* we have data to send this round */
- if (gpfsmpio_aggmethod == 2) {
+ if (gpfsmpio_write_aggmethod == 2) {
/* Only allocate these arrays if we are using method 2 and only do it once for this round/target agg.
*/
if (!allocatedDerivedTypeArrays) {
@@ -732,151 +922,88 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
targetAggDisplacements = (MPI_Aint *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Aint));
sourceBufferDisplacements = (MPI_Aint *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Aint));
targetAggDataTypes = (MPI_Datatype *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Datatype));
- allocatedDerivedTypeArrays = 1;
- }
- }
- /* For the non-contiguous source datatype we need to determine the number,size and offsets of the chunks
- * from the source buffer to be sent to this contiguous chunk defined by the round/agg/iter in the target.
- */
- int numNonContigSourceChunks = 0;
- ADIO_Offset baseDatatypeInstanceOffset = 0;
-
- if (!bufTypeIsContig) {
+ if (!bufTypeIsContig) {
+ int k;
+ for (k=targetAggsForMyDataFirstOffLenIndex[roundIter][aggIter];k<=targetAggsForMyDataLastOffLenIndex[roundIter][aggIter];k++)
+ amountOfDataWrittenThisRoundAgg += len_list[k];
- currentSourceBufferOffset = (ADIO_Offset)((ADIO_Offset)currentDataTypeExtent * (ADIO_Offset)bufTypeExtent) + flatBuf->indices[currentFlatBufIndice] + currentIndiceOffset;
#ifdef onesidedtrace
- printf("!bufTypeIsContig currentSourceBufferOffset set to %ld for roundIter %d target %d currentDataTypeExtent %d flatBuf->indices[currentFlatBufIndice] %ld currentIndiceOffset %ld currentFlatBufIndice %d\n",currentSourceBufferOffset,roundIter,aggIter,currentDataTypeExtent,flatBuf->indices[currentFlatBufIndice],currentIndiceOffset,currentFlatBufIndice);
+ printf("derivedTypePackedSourceBuffer mallocing %ld\n",amountOfDataWrittenThisRoundAgg);
#endif
-
- /* Use a tmp variable for the currentFlatBufIndice and currentIndiceOffset as they are used across the offsetIters
- * to compute the starting point for this iteration and will be modified now to compute the data chunks for
- * this iteration.
- */
- int tmpFlatBufIndice = currentFlatBufIndice;
- ADIO_Offset tmpIndiceOffset = currentIndiceOffset;
-
- /* now populate the nonContigSourceOffsets and nonContigSourceLens arrays for use in the one-sided operations.
- */
- int ncArrayIndex = 0;
- int remainingBytesToLoadedIntoNCArrays = bufferAmountToSend;
- int datatypeInstances = currentDataTypeExtent;
- while (remainingBytesToLoadedIntoNCArrays > 0) {
- ADIOI_Assert(ncArrayIndex < (maxNumNonContigSourceChunks+2));
- nonContigSourceOffsets[ncArrayIndex] = currentSourceBufferOffset;
-
- if ((flatBuf->blocklens[tmpFlatBufIndice] - tmpIndiceOffset) >= remainingBytesToLoadedIntoNCArrays) {
- nonContigSourceLens[ncArrayIndex] = remainingBytesToLoadedIntoNCArrays;
- remainingBytesToLoadedIntoNCArrays = 0;
+ if (amountOfDataWrittenThisRoundAgg > 0)
+ derivedTypePackedSourceBuffer = (char *)ADIOI_Malloc(amountOfDataWrittenThisRoundAgg * sizeof(char));
+ else
+ derivedTypePackedSourceBuffer = NULL;
}
- else {
- nonContigSourceLens[ncArrayIndex] = (int)(flatBuf->blocklens[tmpFlatBufIndice] - tmpIndiceOffset);
- remainingBytesToLoadedIntoNCArrays -= (flatBuf->blocklens[tmpFlatBufIndice] - tmpIndiceOffset);
- tmpIndiceOffset = 0;
- tmpFlatBufIndice++;
- if (tmpFlatBufIndice == flatBuf->count) {
- tmpFlatBufIndice = 0;
- datatypeInstances++;
- baseDatatypeInstanceOffset = datatypeInstances * bufTypeExtent;
- }
- currentSourceBufferOffset = baseDatatypeInstanceOffset + flatBuf->indices[tmpFlatBufIndice];
- }
-#ifdef onesidedtrace
- printf("currentSourceBufferOffset set to %ld off of baseDatatypeInstanceOffset of %ld + tmpFlatBufIndice %d with value %ld\n ncArrayIndex is %d nonContigSourceOffsets[ncArrayIndex] is %ld nonContigSourceLens[ncArrayIndex] is %ld\n",currentSourceBufferOffset,baseDatatypeInstanceOffset, tmpFlatBufIndice, flatBuf->indices[tmpFlatBufIndice],ncArrayIndex,nonContigSourceOffsets[ncArrayIndex],nonContigSourceLens[ncArrayIndex]);
-#endif
- ncArrayIndex++;
- numNonContigSourceChunks++;
- } // while
-
- } // !bufTypeIsContig
+ allocatedDerivedTypeArrays = 1;
+ }
+ }
/* Determine the offset into the target window.
*/
- MPI_Aint targetDisplacementToUseThisRound = (MPI_Aint) ((ADIO_Offset)offsetStart - currentRoundFDStartForMyTargetAgg);
+ MPI_Aint targetDisplacementToUseThisRound = (MPI_Aint) (offsetStart - currentRoundFDStartForMyTargetAgg);
/* If using the thread writer select the appropriate side of the split window.
*/
if (useIOBuffer && (write_buf == write_buf1)) {
- targetDisplacementToUseThisRound += coll_bufsize;
+ targetDisplacementToUseThisRound += (MPI_Aint) coll_bufsize;
}
- /* For gpfsmpio_aggmethod of 1 do the mpi_put using the primitive MPI_BYTE type on each contiguous chunk of source data.
+ /* For gpfsmpio_write_aggmethod of 1 do the mpi_put using the primitive MPI_BYTE type for each contiguous
+ * chunk in the target, of source data is non-contiguous then pack the data first.
*/
- if (gpfsmpio_aggmethod == 1) {
-#ifndef ACTIVE_TARGET
+
+ if (gpfsmpio_write_aggmethod == 1) {
MPI_Win_lock(MPI_LOCK_SHARED, targetAggsForMyData[aggIter], 0, write_buf_window);
-#endif
if (bufTypeIsContig) {
- MPI_Put(((char*)buf) + sourceBufferOffset,bufferAmountToSend, MPI_BYTE,targetAggsForMyData[aggIter],targetDisplacementToUseThisRound, bufferAmountToSend,MPI_BYTE,write_buf_window);
+ MPI_Put(((char*)buf) + currentFDSourceBufferState[aggIter].sourceBufferOffset,bufferAmountToSend, MPI_BYTE,targetAggsForMyData[aggIter],targetDisplacementToUseThisRound, bufferAmountToSend,MPI_BYTE,write_buf_window);
+ currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset)bufferAmountToSend;
}
else {
- for (i=0;i<numNonContigSourceChunks;i++) {
- MPI_Put(((char*)buf) + nonContigSourceOffsets[i],nonContigSourceLens[i], MPI_BYTE,targetAggsForMyData[aggIter],targetDisplacementToUseThisRound, nonContigSourceLens[i],MPI_BYTE,write_buf_window);
-
-#ifdef onesidedtrace
- printf("mpi_put[%d] nonContigSourceOffsets is %d of nonContigSourceLens %d to target disp %d first int of data: %d\n ",i,nonContigSourceOffsets[i],nonContigSourceLens[i],targetDisplacementToUseThisRound, ((int*)(((char*)buf) + nonContigSourceOffsets[i]))[0]);
-#endif
- targetDisplacementToUseThisRound += nonContigSourceLens[i];
- }
+ char *putSourceData = (char *) ADIOI_Malloc(bufferAmountToSend*sizeof(char));
+ nonContigSourceDataBufferAdvance(((char*)buf), flatBuf, bufferAmountToSend, 1, ¤tFDSourceBufferState[aggIter], putSourceData);
+ MPI_Put(putSourceData,bufferAmountToSend, MPI_BYTE,targetAggsForMyData[aggIter],targetDisplacementToUseThisRound, bufferAmountToSend,MPI_BYTE,write_buf_window);
+ ADIOI_Free(putSourceData);
}
-#ifndef ACTIVE_TARGET
MPI_Win_unlock(targetAggsForMyData[aggIter], write_buf_window);
-#endif
}
- /* For gpfsmpio_aggmethod of 2 populate the data structures for this round/agg for this offset iter
+ /* For gpfsmpio_write_aggmethod of 2 populate the data structures for this round/agg for this offset iter
* to be used subsequently when building the derived type for 1 mpi_put for all the data for this
* round/agg.
*/
- else if (gpfsmpio_aggmethod == 2) {
+ else if (gpfsmpio_write_aggmethod == 2) {
if (bufTypeIsContig) {
targetAggBlockLengths[targetAggContigAccessCount]= bufferAmountToSend;
targetAggDataTypes[targetAggContigAccessCount] = MPI_BYTE;
targetAggDisplacements[targetAggContigAccessCount] = targetDisplacementToUseThisRound;
- sourceBufferDisplacements[targetAggContigAccessCount] = (MPI_Aint)sourceBufferOffset;
+ sourceBufferDisplacements[targetAggContigAccessCount] = (MPI_Aint)currentFDSourceBufferState[aggIter].sourceBufferOffset;
+ currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset)bufferAmountToSend;
targetAggContigAccessCount++;
}
else {
- for (i=0;i<numNonContigSourceChunks;i++) {
- if (nonContigSourceLens[i] > 0) {
- targetAggBlockLengths[targetAggContigAccessCount]= nonContigSourceLens[i];
- targetAggDataTypes[targetAggContigAccessCount] = MPI_BYTE;
- targetAggDisplacements[targetAggContigAccessCount] = targetDisplacementToUseThisRound;
- sourceBufferDisplacements[targetAggContigAccessCount] = (MPI_Aint)nonContigSourceOffsets[i];
-#ifdef onesidedtrace
- printf("mpi_put building arrays for iter %d - sourceBufferDisplacements is %d of nonContigSourceLens %d to target disp %d targetAggContigAccessCount is %d targetAggBlockLengths[targetAggContigAccessCount] is %d\n",i,sourceBufferDisplacements[targetAggContigAccessCount],nonContigSourceLens[i],targetAggDisplacements[targetAggContigAccessCount],targetAggContigAccessCount, targetAggBlockLengths[targetAggContigAccessCount]);
-#endif
- targetAggContigAccessCount++;
- targetDisplacementToUseThisRound += nonContigSourceLens[i];
- }
- }
+ nonContigSourceDataBufferAdvance(((char*)buf), flatBuf, bufferAmountToSend, 1, ¤tFDSourceBufferState[aggIter], &derivedTypePackedSourceBuffer[derivedTypePackedSourceBufferOffset]);
+ targetAggBlockLengths[targetAggContigAccessCount]= bufferAmountToSend;
+ targetAggDataTypes[targetAggContigAccessCount] = MPI_BYTE;
+ targetAggDisplacements[targetAggContigAccessCount] = targetDisplacementToUseThisRound;
+ sourceBufferDisplacements[targetAggContigAccessCount] = (MPI_Aint)derivedTypePackedSourceBufferOffset;
+ targetAggContigAccessCount++;
+ derivedTypePackedSourceBufferOffset += (ADIO_Offset)bufferAmountToSend;
}
}
#ifdef onesidedtrace
- printf("roundIter %d bufferAmountToSend is %d sourceBufferOffset is %d offsetStart is %ld currentRoundFDStartForMyTargetAgg is %ld targetDisplacementToUseThisRound is %ld targetAggsForMyDataFDStart[aggIter] is %ld\n",roundIter, bufferAmountToSend,sourceBufferOffset, offsetStart,currentRoundFDStartForMyTargetAgg,targetDisplacementToUseThisRound,targetAggsForMyDataFDStart[aggIter]);
+ printf("roundIter %d bufferAmountToSend is %d offsetStart is %ld currentRoundFDStartForMyTargetAgg is %ld targetDisplacementToUseThisRound is %ld targetAggsForMyDataFDStart[aggIter] is %ld\n",roundIter, bufferAmountToSend, offsetStart,currentRoundFDStartForMyTargetAgg,targetDisplacementToUseThisRound,targetAggsForMyDataFDStart[aggIter]);
#endif
} // bufferAmountToSend > 0
} // contig list
- /* For gpfsmpio_aggmethod of 2 now build the derived type using the data from this round/agg and do 1 single mpi_put.
+ /* For gpfsmpio_write_aggmethod of 2 now build the derived type using the data from this round/agg and do 1 single mpi_put.
*/
- if (gpfsmpio_aggmethod == 2) {
+ if (gpfsmpio_write_aggmethod == 2) {
+
MPI_Datatype sourceBufferDerivedDataType, targetBufferDerivedDataType;
- if (targetAggContigAccessCount > 0) {
- /* Rebase source buffer offsets to 0 if there are any negative offsets for safety
- * when iteracting with PAMI.
- */
- MPI_Aint lowestDisplacement = 0;
- for (i=0;i<targetAggContigAccessCount;i++) {
- if (sourceBufferDisplacements[i] < lowestDisplacement)
- lowestDisplacement = sourceBufferDisplacements[i];
- }
- if (lowestDisplacement < 0) {
- lowestDisplacement *= -1;
- for (i=0;i<targetAggContigAccessCount;i++)
- sourceBufferDisplacements[i] += lowestDisplacement;
- }
MPI_Type_create_struct(targetAggContigAccessCount, targetAggBlockLengths, sourceBufferDisplacements, targetAggDataTypes, &sourceBufferDerivedDataType);
MPI_Type_commit(&sourceBufferDerivedDataType);
MPI_Type_create_struct(targetAggContigAccessCount, targetAggBlockLengths, targetAggDisplacements, targetAggDataTypes, &targetBufferDerivedDataType);
@@ -885,22 +1012,27 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
#ifdef onesidedtrace
printf("mpi_put of derived type to agg %d targetAggContigAccessCount is %d\n",targetAggsForMyData[aggIter],targetAggContigAccessCount);
#endif
-#ifndef ACTIVE_TARGET
+ if (targetAggContigAccessCount > 0) {
MPI_Win_lock(MPI_LOCK_SHARED, targetAggsForMyData[aggIter], 0, write_buf_window);
-#endif
+ if (bufTypeIsContig) {
+ MPI_Put(((char*)buf),1, sourceBufferDerivedDataType,targetAggsForMyData[aggIter],0, 1,targetBufferDerivedDataType,write_buf_window);
+ }
+ else {
+ MPI_Put(derivedTypePackedSourceBuffer,1, sourceBufferDerivedDataType,targetAggsForMyData[aggIter],0, 1,targetBufferDerivedDataType,write_buf_window);
+ }
- MPI_Put((((char*)buf) - lowestDisplacement),1, sourceBufferDerivedDataType,targetAggsForMyData[aggIter],0, 1,targetBufferDerivedDataType,write_buf_window);
-#ifndef ACTIVE_TARGET
MPI_Win_unlock(targetAggsForMyData[aggIter], write_buf_window);
-#endif
-
}
+
if (allocatedDerivedTypeArrays) {
ADIOI_Free(targetAggBlockLengths);
ADIOI_Free(targetAggDisplacements);
ADIOI_Free(targetAggDataTypes);
ADIOI_Free(sourceBufferDisplacements);
+ if (!bufTypeIsContig)
+ if (derivedTypePackedSourceBuffer != NULL)
+ ADIOI_Free(derivedTypePackedSourceBuffer);
}
if (targetAggContigAccessCount > 0) {
MPI_Type_free(&sourceBufferDerivedDataType);
@@ -908,27 +1040,18 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
}
}
if (!gpfsmpio_onesided_no_rmw) {
-#ifndef ACTIVE_TARGET
MPI_Win_lock(MPI_LOCK_SHARED, targetAggsForMyData[aggIter], 0, fd->io_buf_put_amounts_window);
-#endif
MPI_Put(&numBytesPutThisAggRound,1, MPI_INT,targetAggsForMyData[aggIter],myrank, 1,MPI_INT,fd->io_buf_put_amounts_window);
-
-#ifndef ACTIVE_TARGET
MPI_Win_unlock(targetAggsForMyData[aggIter], fd->io_buf_put_amounts_window);
-#endif
}
} // baseoffset != -1
} // target aggs
+ } /// contig_access_count > 0
- /* the source procs send the requested data to the aggs */
-
-#ifdef ACTIVE_TARGET
- MPI_Win_fence(0, write_buf_window);
- if (!gpfsmpio_onesided_no_rmw)
- MPI_Win_fence(0, fd->io_buf_put_amounts_window);
-#else
- MPI_Barrier(fd->comm);
+#ifdef onesidedtrace
+printf("first barrier roundIter %d\n",roundIter);
#endif
+ MPI_Barrier(fd->comm);
if (iAmUsedAgg) {
/* Determine what offsets define the portion of the file domain the agg is writing this round.
@@ -944,10 +1067,10 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
currentRoundFDEnd = fd_end[myAggRank];
}
else
- currentRoundFDEnd = currentRoundFDStart + coll_bufsize - 1;
+ currentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset)1;
#ifdef onesidedtrace
- printf("currentRoundFDStart is %ld currentRoundFDEnd is %ld within file domeain %ld to %ld\n",currentRoundFDStart,currentRoundFDEnd,fd_start[myAggRank],fd_end[myAggRank]);
+ printf("used agg about to writecontig - currentRoundFDStart is %ld currentRoundFDEnd is %ld within file domeain %ld to %ld\n",currentRoundFDStart,currentRoundFDEnd,fd_start[myAggRank],fd_end[myAggRank]);
#endif
int doWriteContig = 1;
@@ -1017,18 +1140,41 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
}
}
- if (iAmUsedAgg)
- currentRoundFDStart += coll_bufsize;
+ if (iAmUsedAgg) {
+ currentRoundFDStart += coll_bufsize;
- if (roundIter<(numberOfRounds-1))
+ if (gpfsmpio_onesided_always_rmw && (roundIter<(numberOfRounds-1))) { // read in the buffer for the next round unless this is the last round
+ ADIO_Offset tmpCurrentRoundFDEnd = 0;
+ if ((fd_end[myAggRank] - currentRoundFDStart) < coll_bufsize) {
+ if (myAggRank == greatestFileDomainAggRank) {
+ if (fd_end[myAggRank] > lastFileOffset)
+ tmpCurrentRoundFDEnd = lastFileOffset;
+ else
+ tmpCurrentRoundFDEnd = fd_end[myAggRank];
+ }
+ else
+ tmpCurrentRoundFDEnd = fd_end[myAggRank];
+ }
+ else
+ tmpCurrentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset)1;
+#ifdef onesidedtrace
+printf("gpfsmpio_onesided_always_rmw - round %d buffer pre-read for file offsets %ld to %ld total is %d\n",roundIter, currentRoundFDStart,tmpCurrentRoundFDEnd,(int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1);
+#endif
+ ADIO_ReadContig(fd, write_buf, (int)(tmpCurrentRoundFDEnd - currentRoundFDStart)+1,
+ MPI_BYTE, ADIO_EXPLICIT_OFFSET,currentRoundFDStart, &status, error_code);
+
+ }
+ }
+
+ if (roundIter<(numberOfRounds-1)) {
+#ifdef onesidedtrace
+printf("second barrier roundIter %d\n",roundIter);
+#endif
MPI_Barrier(fd->comm);
+ }
} /* for-loop roundIter */
- if (!bufTypeIsContig) {
- ADIOI_Free(nonContigSourceOffsets);
- ADIOI_Free(nonContigSourceLens);
- }
#ifdef ROMIO_GPFS
endTimeBase = MPI_Wtime();
@@ -1044,6 +1190,9 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
}
+#ifdef onesidedtrace
+printf("freeing datastructures\n");
+#endif
ADIOI_Free(targetAggsForMyData);
ADIOI_Free(targetAggsForMyDataFDStart);
ADIOI_Free(targetAggsForMyDataFDEnd);
@@ -1051,17 +1200,11 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
for (i=0;i<numberOfRounds;i++) {
ADIOI_Free(targetAggsForMyDataFirstOffLenIndex[i]);
ADIOI_Free(targetAggsForMyDataLastOffLenIndex[i]);
- if (bufTypeIsContig)
- ADIOI_Free(baseSourceBufferOffset[i]);
- else
- ADIOI_Free(baseNonContigSourceBufferOffset[i]);
}
ADIOI_Free(targetAggsForMyDataFirstOffLenIndex);
ADIOI_Free(targetAggsForMyDataLastOffLenIndex);
- if (bufTypeIsContig)
- ADIOI_Free(baseSourceBufferOffset);
- else
- ADIOI_Free(baseNonContigSourceBufferOffset);
+
+ ADIOI_Free(currentFDSourceBufferState);
if (!bufTypeIsContig)
ADIOI_Delete_flattened(datatype);
@@ -1078,9 +1221,26 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
int *error_code,
ADIO_Offset *st_offsets,
ADIO_Offset *end_offsets,
+ int numNonZeroDataOffsets,
ADIO_Offset *fd_start,
ADIO_Offset* fd_end)
{
+ int i,j; /* generic iterators */
+
+#ifdef onesidedtrace
+ if (buf == NULL) {
+ printf("ADIOI_OneSidedWriteAggregation - buf is NULL contig_access_count is %d\n",contig_access_count);
+ for (i=0;i<contig_access_count;i++)
+ printf("offset_list[%d] is %ld len_list[%d] is %ld\n",i,offset_list[i],i,len_list[i]);
+ }
+ if (contig_access_count < 0)
+ printf("ADIOI_OneSidedWriteAggregation - contig_access_count of %d is less than 0\n",contig_access_count);
+#endif
+
+ int lenListOverZero = 0;
+ for (i=0;((i<contig_access_count) && (!lenListOverZero));i++)
+ if (len_list[i] > 0)
+ lenListOverZero = 1;
*error_code = MPI_SUCCESS; /* initialize to success */
@@ -1093,12 +1253,15 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
pthread_t io_thread;
void *thread_ret;
ADIOI_IO_ThreadFuncData io_thread_args;
- int i,j; /* generic iterators */
int nprocs,myrank;
MPI_Comm_size(fd->comm, &nprocs);
MPI_Comm_rank(fd->comm, &myrank);
+#ifdef onesidedtrace
+printf("ADIOI_OneSidedReadAggregation started on rank %d\n",myrank);
+#endif
+
if (fd->io_buf_window == MPI_WIN_NULL ||
fd->io_buf_put_amounts_window == MPI_WIN_NULL)
{
@@ -1116,41 +1279,78 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
ADIOI_Flatlist_node *flatBuf=NULL;
ADIOI_Datatype_iscontig(datatype, &bufTypeIsContig);
- /* maxNumContigOperations keeps track of how many different chunks we will need to recv
- * for the purpose of pre-allocating the data structures to hold them.
- */
- int maxNumContigOperations = contig_access_count;
-
if (!bufTypeIsContig) {
/* Flatten the non-contiguous source datatype.
*/
flatBuf = ADIOI_Flatten_and_find(datatype);
MPI_Type_extent(datatype, &bufTypeExtent);
+#ifdef onesidedtrace
+ printf("flatBuf->count is %d bufTypeExtent is %d\n", flatBuf->count,bufTypeExtent);
+ for (i=0;i<flatBuf->count;i++)
+ printf("flatBuf->blocklens[%d] is %d flatBuf->indices[%d] is %ld\n",i,flatBuf->blocklens[i],i,flatBuf->indices[i]);
+#endif
}
#ifdef onesidedtrace
printf("ADIOI_OneSidedReadAggregation bufTypeIsContig is %d contig_access_count is %d\n",bufTypeIsContig,contig_access_count);
#endif
+ int naggs = fd->hints->cb_nodes;
+
+ /* Track the state of the source buffer for feeding the target data blocks.
+ * For GPFS the number of file domains per agg is always 1 so we just need 1 agg
+ * dimension to track the data, in the case of lustre we will need 2 dimensions
+ * agg and file domain since aggs write to multiple file domains in the
+ * case of lustre.
+ * This structure will be modified as the data is written to reflect the
+ * current state of the offset.
+ */
+
+#ifdef onesidedtrace
+ printf("sizeof(FDSourceBufferState) is %d - make sure is 32 for 32-byte memalign optimal\n",sizeof(FDSourceBufferState));
+#endif
+ FDSourceBufferState *currentFDSourceBufferState;
+
+ currentFDSourceBufferState = (FDSourceBufferState *) ADIOI_Malloc(naggs * sizeof(FDSourceBufferState));
+ for (i=0;i<naggs;i++) {
+ /* initialize based on the bufType to indicate that it is unset.
+ */
+ if (bufTypeIsContig) {
+ currentFDSourceBufferState[i].sourceBufferOffset = -1;
+ }
+ else {
+ currentFDSourceBufferState[i].indiceOffset = -1;
+ }
+ }
+
+#ifdef onesidedtrace
+ printf(" ADIOI_OneSidedReadAggregation bufTypeIsContig is %d contig_access_count is %d\n",bufTypeIsContig,contig_access_count);
+#endif
+
+ /* maxNumContigOperations keeps track of how many different chunks we will
+ * need to recv for the purpose of pre-allocating the data structures to
+ * hold them.
+ */
+ int maxNumContigOperations = contig_access_count;
+
+
ADIO_Offset lastFileOffset = 0, firstFileOffset = -1;
/* Get the total range being read.
*/
- for (j=0;j<nprocs;j++) {
- if (end_offsets[j] > st_offsets[j]) {
- /* Guard against ranks with empty data.
- */
+ for (j=0;j<numNonZeroDataOffsets;j++) {
+#ifdef onesidedtrace
+printf("end_offsets[%d] is %ld st_offsets[%d] is %ld\n",j,end_offsets[j],j,st_offsets[j]);
+#endif
lastFileOffset = ADIOI_MAX(lastFileOffset,end_offsets[j]);
if (firstFileOffset == -1)
firstFileOffset = st_offsets[j];
else
firstFileOffset = ADIOI_MIN(firstFileOffset,st_offsets[j]);
- }
}
int myAggRank = -1; /* if I am an aggregor this is my index into fd->hints->ranklist */
int iAmUsedAgg = 0; /* whether or not this rank is used as an aggregator. */
- int naggs = fd->hints->cb_nodes;
int coll_bufsize = fd->hints->cb_buffer_size;
#ifdef ROMIO_GPFS
if (gpfsmpio_pthreadio == 1) {
@@ -1189,20 +1389,26 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
}
#endif
+ /* Compute number of rounds.
+ */
+ int numberOfRounds = 0;
+ for (j=0;j<naggs;j++) {
+ int currentNumberOfRounds = (int)(((fd_end[j] - fd_start[j])+(ADIO_Offset)1)/coll_bufsize);
+ if (((ADIO_Offset)currentNumberOfRounds*coll_bufsize) < ((fd_end[j] - fd_start[j])+(ADIO_Offset)1))
+ currentNumberOfRounds++;
+ if (currentNumberOfRounds > numberOfRounds)
+ numberOfRounds = currentNumberOfRounds;
+ }
- /* for my offset range determine how much data and from whom I need to get
- * it. For source ag sources, also determine the source file domain
- * offsets locally to reduce communication overhead */
- int *sourceAggsForMyData = (int *)ADIOI_Malloc(naggs * sizeof(int));
+ /* Data structures to track what data this compute needs to receive from whom.
+ * For lustre they will all need another dimension for the file domain.
+ */ int *sourceAggsForMyData = (int *)ADIOI_Malloc(naggs * sizeof(int));
ADIO_Offset *sourceAggsForMyDataFDStart = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
ADIO_Offset *sourceAggsForMyDataFDEnd = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
int numSourceAggs = 0;
- /* compute number of rounds */
- ADIO_Offset numberOfRounds = (ADIO_Offset)((((ADIO_Offset)(end_offsets[nprocs-1]-st_offsets[0]))/((ADIO_Offset)((ADIO_Offset)coll_bufsize*(ADIO_Offset)naggs)))) + 1;
-
/* This data structure holds the beginning offset and len list index for the range to be read
- * coresponding to the round and source agg.
+ * coresponding to the round and source agg. Initialize to -1 to denote being unset.
*/
int **sourceAggsForMyDataFirstOffLenIndex = (int **)ADIOI_Malloc(numberOfRounds * sizeof(int *));
for (i=0;i<numberOfRounds;i++) {
@@ -1218,40 +1424,6 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
for (i=0;i<numberOfRounds;i++)
sourceAggsForMyDataLastOffLenIndex[i] = (int *)ADIOI_Malloc(naggs * sizeof(int));
- /* This data structure holds the base buffer offset for contiguous source data for the range to be read
- * coresponding to the round and source agg.
- */
- ADIO_Offset **baseRecvBufferOffset;
- if (bufTypeIsContig) {
- baseRecvBufferOffset = (ADIO_Offset **)ADIOI_Malloc(numberOfRounds * sizeof(ADIO_Offset *));
- for (i=0;i<numberOfRounds;i++) {
- baseRecvBufferOffset[i] = (ADIO_Offset *)ADIOI_Malloc(naggs * sizeof(ADIO_Offset));
- for (j=0;j<naggs;j++)
- baseRecvBufferOffset[i][j] = -1;
- }
- }
- ADIO_Offset currentRecvBufferOffset = 0;
-
- /* This data structure holds the number of extents, the index into the flattened buffer and the remnant length
- * beyond the flattened buffer indice corresponding to the base buffer offset for non-contiguous source data
- * for the range to be written coresponding to the round and target agg.
- */
- NonContigSourceBufOffset **baseNonContigSourceBufferOffset;
- if (!bufTypeIsContig) {
- baseNonContigSourceBufferOffset = (NonContigSourceBufOffset **) ADIOI_Malloc(numberOfRounds * sizeof(NonContigSourceBufOffset *));
- for (i=0;i<numberOfRounds;i++) {
- baseNonContigSourceBufferOffset[i] = (NonContigSourceBufOffset *)ADIOI_Malloc(naggs * sizeof(NonContigSourceBufOffset));
- /* initialize flatBufIndice to -1 to indicate that it is unset.
- */
- for (j=0;j<naggs;j++)
- baseNonContigSourceBufferOffset[i][j].flatBufIndice = -1;
- }
- }
-
- int currentDataTypeExtent = 0;
- int currentFlatBufIndice=0;
- ADIO_Offset currentIndiceOffset = 0;
-
#ifdef onesidedtrace
printf("NumberOfRounds is %d\n",numberOfRounds);
for (i=0;i<naggs;i++)
@@ -1263,6 +1435,11 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
int currentAggRankListIndex = 0;
int maxNumNonContigSourceChunks = 0;
+ ADIO_Offset currentRecvBufferOffset = 0;
+ int currentDataTypeExtent = 0;
+ int currentFlatBufIndice=0;
+ ADIO_Offset currentIndiceOffset = 0;
+
/* This denotes the coll_bufsize boundaries within the source buffer for reading for 1 round.
*/
ADIO_Offset intraRoundCollBufsizeOffset = 0;
@@ -1274,8 +1451,11 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
sourceAggsForMyDataCurrentRoundIter[i] = 0;
/* This is the first of the two main loops in this algorithm. The purpose of this loop is essentially to populate
- * the data structures defined above for what source data blocks needs to go where (source agg) and when (round iter).
+ * the data structures defined above for what read data blocks needs to go where (source agg and file domain) and when
+ * (round iter). For lustre essentially an additional layer of nesting will be required for the multiple file domains
+ * within the source agg.
*/
+ if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero) {
int blockIter;
for (blockIter=0;blockIter<contig_access_count;blockIter++) {
@@ -1290,7 +1470,7 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
* in the blocks.
*/
ADIO_Offset sourceBlockTotal = 0;
- int lastIndiceUsed;
+ int lastIndiceUsed = currentFlatBufIndice;
int numNonContigSourceChunks = 0;
while (sourceBlockTotal < len_list[blockIter-1]) {
numNonContigSourceChunks++;
@@ -1301,7 +1481,7 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
currentFlatBufIndice = 0;
currentDataTypeExtent++;
}
- currentIndiceOffset = 0;
+ currentIndiceOffset = (ADIO_Offset)0;
}
if (sourceBlockTotal > len_list[blockIter-1]) {
currentFlatBufIndice--;
@@ -1310,34 +1490,39 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
currentFlatBufIndice = flatBuf->count-1;
}
currentIndiceOffset = len_list[blockIter-1] - (sourceBlockTotal - flatBuf->blocklens[lastIndiceUsed]);
- ADIOI_Assert((currentIndiceOffset >= 0) && (currentIndiceOffset < flatBuf->blocklens[currentFlatBufIndice]));
+ // ADIOI_Assert((currentIndiceOffset >= 0) && (currentIndiceOffset < flatBuf->blocklens[currentFlatBufIndice]));
}
else
currentIndiceOffset = 0;
- maxNumContigOperations += numNonContigSourceChunks;
+ maxNumContigOperations += (numNonContigSourceChunks+2);
if (numNonContigSourceChunks > maxNumNonContigSourceChunks)
maxNumNonContigSourceChunks = numNonContigSourceChunks;
+
#ifdef onesidedtrace
printf("block iter %d currentFlatBufIndice is now %d currentDataTypeExtent is now %d currentIndiceOffset is now %ld maxNumContigOperations is now %d\n",blockIter,currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset,maxNumContigOperations);
#endif
} // !bufTypeIsContig
} // blockIter > 0
- /* For the first iteration we need to include these maxNumContigOperations and maxNumNonContigSourceChunks
- * for non-contig case even though we did not need to compute the starting offset.
+ /* For the last iteration we need to include these maxNumContigOperations and maxNumNonContigSourceChunks
+ * for non-contig case even though we did not need to compute the next starting offset.
*/
- if ((blockIter == 0) && (!bufTypeIsContig)) {
+ if ((blockIter == (contig_access_count-1)) && (!bufTypeIsContig)) {
ADIO_Offset sourceBlockTotal = 0;
int tmpCurrentFlatBufIndice = currentFlatBufIndice;
- while (sourceBlockTotal < len_list[0]) {
- maxNumContigOperations++;
- maxNumNonContigSourceChunks++;
+ int lastNumNonContigSourceChunks = 0;
+ while (sourceBlockTotal < len_list[blockIter]) {
+ lastNumNonContigSourceChunks++;
sourceBlockTotal += flatBuf->blocklens[tmpCurrentFlatBufIndice];
tmpCurrentFlatBufIndice++;
if (tmpCurrentFlatBufIndice == flatBuf->count) {
tmpCurrentFlatBufIndice = 0;
}
}
+ maxNumContigOperations += (lastNumNonContigSourceChunks+2);
+ if (lastNumNonContigSourceChunks > maxNumNonContigSourceChunks)
+ maxNumNonContigSourceChunks = lastNumNonContigSourceChunks;
+
}
ADIO_Offset blockStart = offset_list[blockIter], blockEnd = offset_list[blockIter]+len_list[blockIter]-(ADIO_Offset)1;
@@ -1388,15 +1573,30 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
sourceAggsForMyDataFDEnd[numSourceAggs] = lastFileOffset;
}
sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
- if (bufTypeIsContig)
- baseRecvBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = currentRecvBufferOffset;
- else {
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].flatBufIndice = currentFlatBufIndice;
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].dataTypeExtent = currentDataTypeExtent;
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].indiceOffset = currentIndiceOffset;
- }
- intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + ((sourceAggsForMyDataCurrentRoundIter[numSourceAggs]+1) * coll_bufsize);
+ /* Set the source buffer state starting point for data access for this agg and file domain.
+ */
+ if (bufTypeIsContig) {
+ if (currentFDSourceBufferState[numSourceAggs].sourceBufferOffset == -1) {
+
+ currentFDSourceBufferState[numSourceAggs].sourceBufferOffset = currentRecvBufferOffset;
+#ifdef onesidedtrace
+ printf("For agg %d sourceBufferOffset initialized to %ld\n",currentAggRankListIndex,currentRecvBufferOffset);
+#endif
+ }
+ }
+ else {
+ if (currentFDSourceBufferState[numSourceAggs].indiceOffset == -1) {
+ currentFDSourceBufferState[numSourceAggs].indiceOffset = currentIndiceOffset;
+ currentFDSourceBufferState[numSourceAggs].bufTypeExtent = bufTypeExtent;
+ currentFDSourceBufferState[numSourceAggs].dataTypeExtent = currentDataTypeExtent;
+ currentFDSourceBufferState[numSourceAggs].flatBufIndice = currentFlatBufIndice;
+#ifdef onesidedtrace
+ printf("For agg %d dataTypeExtent initialized to %d flatBufIndice to %d indiceOffset to %ld\n",numSourceAggs,currentDataTypeExtent,currentFlatBufIndice,currentIndiceOffset);
+#endif
+ }
+ }
+ intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + ((ADIO_Offset)(sourceAggsForMyDataCurrentRoundIter[numSourceAggs]+1) * coll_bufsize);
#ifdef onesidedtrace
printf("init settings numSourceAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld currentRecvBufferOffset set to %ld intraRoundCollBufsizeOffset set to %ld\n",numSourceAggs,blockIter,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter],currentRecvBufferOffset,intraRoundCollBufsizeOffset);
#endif
@@ -1407,12 +1607,14 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
*/
sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
- /* If this blocks extends into the next file domain handle this situation.
+ /* If this blocks extends into the next file domain advance to the next source aggs and source buffer states.
*/
if (blockEnd > fd_end[currentAggRankListIndex]) {
#ifdef onesidedtrace
- printf("large block, blockEnd %ld >= fd_end[currentAggRankListIndex] %ld\n",blockEnd,fd_end[currentAggRankListIndex]);
+ printf("block extends past current fd, blockEnd %ld >= fd_end[currentAggRankListIndex] %ld total block size is %ld blockStart was %ld\n",blockEnd,fd_end[currentAggRankListIndex], len_list[blockIter],blockStart);
#endif
+ ADIO_Offset amountToAdvanceSBOffsetForFD = 0;
+ int additionalFDCounter = 0;
while (blockEnd >= fd_end[currentAggRankListIndex]) {
ADIO_Offset thisAggBlockEnd = fd_end[currentAggRankListIndex];
if (thisAggBlockEnd >= intraRoundCollBufsizeOffset) {
@@ -1420,14 +1622,6 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
sourceAggsForMyDataCurrentRoundIter[numSourceAggs]++;
intraRoundCollBufsizeOffset += coll_bufsize;
sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
- if (bufTypeIsContig)
- baseRecvBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = currentRecvBufferOffset;
- else {
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].flatBufIndice = currentFlatBufIndice;
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].dataTypeExtent = currentDataTypeExtent;
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].indiceOffset = currentIndiceOffset;
- }
-
sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
#ifdef onesidedtrace
printf("sourceAggsForMyDataCurrentRoundI%d] is now %d intraRoundCollBufsizeOffset is now %ld\n",numSourceAggs,sourceAggsForMyDataCurrentRoundIter[numSourceAggs],intraRoundCollBufsizeOffset);
@@ -1435,6 +1629,7 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
} // while (thisAggBlockEnd >= intraRoundCollBufsizeOffset)
} // if (thisAggBlockEnd >= intraRoundCollBufsizeOffset)
+ int prevAggRankListIndex = currentAggRankListIndex;
currentAggRankListIndex++;
/* Skip over unused aggs.
@@ -1465,16 +1660,66 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
sourceAggsForMyDataFDEnd[numSourceAggs] = lastFileOffset;
}
sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
- if (bufTypeIsContig)
- baseRecvBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = currentRecvBufferOffset;
- else {
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].flatBufIndice = currentFlatBufIndice;
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].dataTypeExtent = currentDataTypeExtent;
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].indiceOffset = currentIndiceOffset;
+
+
+ /* For the first additonal file domain the source buffer offset
+ * will be incremented relative to the state of this first main
+ * loop but for subsequent full file domains the offset will be
+ * incremented by the size of the file domain.
+ */
+ if (additionalFDCounter == 0)
+ amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex] - blockStart) + (ADIO_Offset)1;
+ else
+ amountToAdvanceSBOffsetForFD = (fd_end[prevAggRankListIndex]-fd_start[prevAggRankListIndex]) +(ADIO_Offset)1;
+
+ if (bufTypeIsContig) {
+ ADIOI_Assert(numSourceAggs > 0);
+ if (currentFDSourceBufferState[numSourceAggs].sourceBufferOffset == -1) {
+ if (additionalFDCounter == 0) { // first file domain, still use the current data counter
+ currentFDSourceBufferState[numSourceAggs].sourceBufferOffset = currentRecvBufferOffset+amountToAdvanceSBOffsetForFD;
+ }
+ else { // 2nd file domain, advance full file domain from last source buffer state
+ currentFDSourceBufferState[numSourceAggs].sourceBufferOffset = currentFDSourceBufferState[numSourceAggs-1].sourceBufferOffset+amountToAdvanceSBOffsetForFD;
+ }
+
+#ifdef onesidedtrace
+ printf("Crossed into new FD - for agg %d sourceBufferOffset initialized to %ld amountToAdvanceSBOffsetForFD is %ld\n",numSourceAggs,currentFDSourceBufferState[numSourceAggs].sourceBufferOffset,amountToAdvanceSBOffsetForFD);
+#endif
+ }
+ }
+ else if (currentFDSourceBufferState[numSourceAggs].indiceOffset == -1) {
+ // non-contiguos source buffer
+ ADIOI_Assert(numSourceAggs > 0);
+
+ /* Initialize the source buffer state appropriately and then
+ * advance it with the nonContigSourceDataBufferAdvance function.
+ */
+ if (additionalFDCounter == 0) {
+ // first file domain, still use the current data counter
+ currentFDSourceBufferState[numSourceAggs].indiceOffset = currentIndiceOffset;
+ currentFDSourceBufferState[numSourceAggs].bufTypeExtent = bufTypeExtent;
+ currentFDSourceBufferState[numSourceAggs].dataTypeExtent = currentDataTypeExtent;
+ currentFDSourceBufferState[numSourceAggs].flatBufIndice = currentFlatBufIndice;
+ }
+ else {
+ // 2nd file domain, advance full file domain from last source
+ // buffer state
+ currentFDSourceBufferState[numSourceAggs].indiceOffset = currentFDSourceBufferState[numSourceAggs-1].indiceOffset;
+ currentFDSourceBufferState[numSourceAggs].bufTypeExtent = currentFDSourceBufferState[numSourceAggs-1].bufTypeExtent;
+ currentFDSourceBufferState[numSourceAggs].dataTypeExtent = currentFDSourceBufferState[numSourceAggs-1].dataTypeExtent;
+ currentFDSourceBufferState[numSourceAggs].flatBufIndice = currentFDSourceBufferState[numSourceAggs-1].flatBufIndice;
+ }
+ nonContigSourceDataBufferAdvance(((char*)buf), flatBuf, (int)amountToAdvanceSBOffsetForFD, 0, ¤tFDSourceBufferState[numSourceAggs], NULL);
+#ifdef onesidedtrace
+ printf("Crossed into new FD - for agg %d dataTypeExtent initialized to %d flatBufIndice to %d indiceOffset to %ld amountToAdvanceSBOffsetForFD is %d\n",numSourceAggs,currentFDSourceBufferState[numSourceAggs].dataTypeExtent,currentFDSourceBufferState[numSourceAggs].flatBufIndice,currentFDSourceBufferState[numSourceAggs].indiceOffset,amountToAdvanceSBOffsetForFD);
+#endif
}
+ additionalFDCounter++;
+
+
#ifdef onesidedtrace
- printf("large block init settings numSourceAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld\n",numSourceAggs,i,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter]);
+ printf("block extended beyond fd init settings numSourceAggs %d offset_list[%d] with value %ld past fd border %ld with len %ld\n",numSourceAggs,i,offset_list[blockIter],fd_start[currentAggRankListIndex],len_list[blockIter]);
#endif
intraRoundCollBufsizeOffset = fd_start[currentAggRankListIndex] + coll_bufsize;
sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
@@ -1482,28 +1727,21 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
} // while (blockEnd >= fd_end[currentAggRankListIndex])
} // if (blockEnd > fd_end[currentAggRankListIndex])
- /* Else if we are still in the same file domain / source agg but have gone past the coll_bufsize and need
+ /* If we are still in the same file domain / source agg but have gone past the coll_bufsize and need
* to advance to the next round handle this situation.
*/
- else if (blockEnd >= intraRoundCollBufsizeOffset) {
+ if (blockEnd >= intraRoundCollBufsizeOffset) {
ADIO_Offset currentBlockEnd = blockEnd;
while (currentBlockEnd >= intraRoundCollBufsizeOffset) {
sourceAggsForMyDataCurrentRoundIter[numSourceAggs]++;
intraRoundCollBufsizeOffset += coll_bufsize;
sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
- if (bufTypeIsContig)
- baseRecvBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = currentRecvBufferOffset;
- else {
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].flatBufIndice = currentFlatBufIndice;
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].dataTypeExtent = currentDataTypeExtent;
- baseNonContigSourceBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs].indiceOffset = currentIndiceOffset;
- }
sourceAggsForMyDataLastOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
#ifdef onesidedtrace
printf("block less than fd currentBlockEnd is now %ld intraRoundCollBufsizeOffset is now %ld sourceAggsForMyDataCurrentRoundIter[%d] is now %d\n",currentBlockEnd, intraRoundCollBufsizeOffset, numSourceAggs,sourceAggsForMyDataCurrentRoundIter[numSourceAggs]);
#endif
} // while (currentBlockEnd >= intraRoundCollBufsizeOffset)
- } // else if (blockEnd >= intraRoundCollBufsizeOffset)
+ } // if (blockEnd >= intraRoundCollBufsizeOffset)
/* Need to advance numSourceAggs if this is the last source offset to
* include this one.
@@ -1521,6 +1759,8 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
}
#endif
+ } // if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero)
+
ADIOI_Free(sourceAggsForMyDataCurrentRoundIter);
/* use the two-phase buffer allocated in the file_open - no app should ever
@@ -1541,10 +1781,6 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
MPI_Win read_buf_window = fd->io_buf_window;
-#ifdef ACTIVE_TARGET
- MPI_Win_fence(0, read_buf_window);
-#endif
-
ADIO_Offset currentRoundFDStart = 0, nextRoundFDStart = 0;
ADIO_Offset currentRoundFDEnd = 0, nextRoundFDEnd = 0;
@@ -1563,6 +1799,13 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
if (nextRoundFDEnd > lastFileOffset)
nextRoundFDEnd = lastFileOffset;
}
+#ifdef onesidedtrace
+printf("iAmUsedAgg - currentRoundFDStart initialized "
+ "to %ld currentRoundFDEnd to %ld\n",
+ currentRoundFDStart,currentRoundFDEnd);
+#endif
+
+
}
#ifdef ROMIO_GPFS
@@ -1572,26 +1815,18 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
#endif
- ADIO_Offset currentBaseRecvBufferOffset = 0;
-
- /* These data structures are used to track the offset/len pairs for non-contiguous source buffers that are
- * to be used for each block of data in the offset list. Allocate them once with the maximum size and then
- * reuse the space throughout the algoritm below.
- */
- ADIO_Offset *nonContigSourceOffsets;
- int *nonContigSourceLens;
- if (!bufTypeIsContig) {
- nonContigSourceOffsets = (ADIO_Offset *)ADIOI_Malloc((maxNumNonContigSourceChunks+2) * sizeof(ADIO_Offset));
- nonContigSourceLens = (int *)ADIOI_Malloc((maxNumNonContigSourceChunks+2) * sizeof(int));
- }
- /* This is the second main loop of the algorithm, actually nested loop of source aggs within rounds. There are 2 flavors of this.
- * For gpfsmpio_aggmethod of 1 each nested iteration for the source agg does an mpi_put on a contiguous chunk using a primative datatype
- * determined using the data structures from the first main loop. For gpfsmpio_aggmethod of 2 each nested iteration for the source agg
- * builds up data to use in created a derived data type for 1 mpi_put that is done for the source agg for each round.
+ /* This is the second main loop of the algorithm, actually nested loop of target aggs within rounds. There are 2 flavors of this.
+ * For gpfsmpio_read_aggmethod of 1 each nested iteration for the source agg does an mpi_put on a contiguous chunk using a primative datatype
+ * determined using the data structures from the first main loop. For gpfsmpio_read_aggmethod of 2 each nested iteration for the source agg
+ * builds up data to use in created a derived data type for 1 mpi_put that is done for the target agg for each round.
+ * To support lustre there will need to be an additional layer of nesting for the multiple file domains
+ * within target aggs.
*/
int roundIter;
for (roundIter=0;roundIter<numberOfRounds;roundIter++) {
+ if ((contig_access_count > 0) && (buf != NULL) && lenListOverZero)
+ {
/* determine what offsets define the portion of the file domain the agg is reading this round */
if (iAmUsedAgg) {
@@ -1604,7 +1839,7 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
amountDataToReadThisRound = ((currentRoundFDEnd-currentRoundFDStart)+1);
}
else {
- currentRoundFDEnd = currentRoundFDStart + coll_bufsize - 1;
+ currentRoundFDEnd = currentRoundFDStart + coll_bufsize - (ADIO_Offset)1;
amountDataToReadThisRound = coll_bufsize;
}
@@ -1629,7 +1864,7 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
amountDataToReadNextRound = ((nextRoundFDEnd-nextRoundFDStart)+1);
}
else {
- nextRoundFDEnd = nextRoundFDStart + coll_bufsize - 1;
+ nextRoundFDEnd = nextRoundFDStart + coll_bufsize - (ADIO_Offset)1;
amountDataToReadNextRound = coll_bufsize;
}
@@ -1712,101 +1947,34 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
/* If we have data for the round/agg process it.
*/
- if ((bufTypeIsContig && (baseRecvBufferOffset[roundIter][aggIter] != -1)) || (!bufTypeIsContig && (baseNonContigSourceBufferOffset[roundIter][aggIter].flatBufIndice != -1))) {
+ if (sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter] != -1) {
- ADIO_Offset currentRoundFDStartForMySourceAgg = (ADIO_Offset)((ADIO_Offset)sourceAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)roundIter*(ADIO_Offset)coll_bufsize));
- ADIO_Offset currentRoundFDEndForMySourceAgg = (ADIO_Offset)((ADIO_Offset)sourceAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)(roundIter+1)*(ADIO_Offset)coll_bufsize) - (ADIO_Offset)1);
+ ADIO_Offset currentRoundFDStartForMySourceAgg = (ADIO_Offset)((ADIO_Offset)sourceAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)roundIter*coll_bufsize));
+ ADIO_Offset currentRoundFDEndForMySourceAgg = (ADIO_Offset)((ADIO_Offset)sourceAggsForMyDataFDStart[aggIter] + (ADIO_Offset)((ADIO_Offset)(roundIter+1)*coll_bufsize) - (ADIO_Offset)1);
int sourceAggContigAccessCount = 0;
- /* These data structures are used for the derived datatype mpi_put
- * in the gpfsmpio_aggmethod of 2 case.
+ /* These data structures are used for the derived datatype mpi_get
+ * in the gpfsmpio_read_aggmethod of 2 case.
*/
int *sourceAggBlockLengths;
MPI_Aint *sourceAggDisplacements, *recvBufferDisplacements;
MPI_Datatype *sourceAggDataTypes;
-
+ char *derivedTypePackedSourceBuffer;
+ int derivedTypePackedSourceBufferOffset = 0;
int allocatedDerivedTypeArrays = 0;
-
- ADIO_Offset recvBufferOffset = 0;
+ ADIO_Offset amountOfDataReadThisRoundAgg = 0;
/* Process the range of offsets for this source agg.
*/
int offsetIter;
- for (offsetIter=sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter];offsetIter<=sourceAggsForMyDataLastOffLenIndex[roundIter][aggIter];offsetIter++) {
+ int startingOffLenIndex = sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter], endingOffLenIndex = sourceAggsForMyDataLastOffLenIndex[roundIter][aggIter];
+ for (offsetIter=startingOffLenIndex;offsetIter<=endingOffLenIndex;offsetIter++) {
if (currentRoundFDEndForMySourceAgg > sourceAggsForMyDataFDEnd[aggIter])
currentRoundFDEndForMySourceAgg = sourceAggsForMyDataFDEnd[aggIter];
ADIO_Offset offsetStart = offset_list[offsetIter], offsetEnd = (offset_list[offsetIter]+len_list[offsetIter]-(ADIO_Offset)1);
- /* Get the base source buffer offset for this iterataion of the source agg in the round.
- * For the first one just get the predetermined base value, for subsequent ones keep track
- * of the value for each iteration and increment it.
- */
- if (offsetIter == sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter]) {
- if (bufTypeIsContig)
- currentBaseRecvBufferOffset = baseRecvBufferOffset[roundIter][aggIter];
- else {
- currentFlatBufIndice = baseNonContigSourceBufferOffset[roundIter][aggIter].flatBufIndice;
- currentDataTypeExtent = baseNonContigSourceBufferOffset[roundIter][aggIter].dataTypeExtent;
- currentIndiceOffset = baseNonContigSourceBufferOffset[roundIter][aggIter].indiceOffset;
-#ifdef onesidedtrace
- printf("currentFlatBufIndice initially set to %d starting this round/agg %d/%d\n",currentFlatBufIndice,roundIter,aggIter);
-#endif
- }
- }
- else {
- if (bufTypeIsContig)
- currentBaseRecvBufferOffset += len_list[offsetIter-1];
- else {
-
- /* For non-contiguous source datatype advance the flattened buffer machinery to this offset.
- * Note that currentDataTypeExtent, currentFlatBufIndice and currentIndiceOffset are used and
- * advanced across the offsetIters.
- */
- ADIO_Offset sourceBlockTotal = 0;
- int lastIndiceUsed;
- while (sourceBlockTotal < len_list[offsetIter-1]) {
- sourceBlockTotal += (flatBuf->blocklens[currentFlatBufIndice] - currentIndiceOffset);
- lastIndiceUsed = currentFlatBufIndice;
- currentFlatBufIndice++;
- if (currentFlatBufIndice == flatBuf->count) {
- currentFlatBufIndice = 0;
- currentDataTypeExtent++;
- }
- currentIndiceOffset = 0;
- } // while
- if (sourceBlockTotal > len_list[offsetIter-1]) {
- currentFlatBufIndice--;
- if (currentFlatBufIndice < 0 ) {
- currentDataTypeExtent--;
- currentFlatBufIndice = flatBuf->count-1;
- }
- currentIndiceOffset = len_list[offsetIter-1] - (sourceBlockTotal - flatBuf->blocklens[lastIndiceUsed]);
- ADIOI_Assert((currentIndiceOffset >= 0) && (currentIndiceOffset < flatBuf->blocklens[currentFlatBufIndice]));
- }
- else
- currentIndiceOffset = 0;
-#ifdef onesidedtrace
- printf("contig_access_count source agg %d currentFlatBufIndice is now %d currentDataTypeExtent is now %d currentIndiceOffset is now %ld\n",aggIter,currentFlatBufIndice,currentDataTypeExtent,currentIndiceOffset);
-#endif
- }
-
- }
-
- /* For source buffer contiguous here is the offset we will use for this iteration.
- */
- if (bufTypeIsContig)
- recvBufferOffset = currentBaseRecvBufferOffset;
-
-#ifdef onesidedtrace
- printf("roundIter %d source iter %d sourceAggsForMyData is %d offset_list[%d] is %ld len_list[%d] is %ld sourceAggsForMyDataFDStart is %ld sourceAggsForMyDataFDEnd is %ld currentRoundFDStartForMySourceAgg is %ld currentRoundFDEndForMySourceAgg is %ld sourceAggsForMyDataFirstOffLenIndex is %d baseRecvBufferOffset is %ld\n",
- roundIter,aggIter,sourceAggsForMyData[aggIter],offsetIter,offset_list[offsetIter],offsetIter,len_list[offsetIter],
- sourceAggsForMyDataFDStart[aggIter],sourceAggsForMyDataFDEnd[aggIter],
- currentRoundFDStartForMySourceAgg,currentRoundFDEndForMySourceAgg,sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter],baseRecvBufferOffset[roundIter][aggIter]);
-#endif
-
-
/* Determine the amount of data and exact source buffer offsets to use.
*/
int bufferAmountToRecv = 0;
@@ -1823,20 +1991,16 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
else
bufferAmountToRecv = (offsetEnd - currentRoundFDStartForMySourceAgg) +1;
if (offsetStart < currentRoundFDStartForMySourceAgg) {
- if (bufTypeIsContig)
- recvBufferOffset += (currentRoundFDStartForMySourceAgg-offsetStart);
offsetStart = currentRoundFDStartForMySourceAgg;
}
}
else if ((offsetStart <= currentRoundFDStartForMySourceAgg) && (offsetEnd >= currentRoundFDEndForMySourceAgg)) {
bufferAmountToRecv = (currentRoundFDEndForMySourceAgg - currentRoundFDStartForMySourceAgg) +1;
- if (bufTypeIsContig)
- recvBufferOffset += (currentRoundFDStartForMySourceAgg-offsetStart);
offsetStart = currentRoundFDStartForMySourceAgg;
}
if (bufferAmountToRecv > 0) { /* we have data to recv this round */
- if (gpfsmpio_aggmethod == 2) {
+ if (gpfsmpio_read_aggmethod == 2) {
/* Only allocate these arrays if we are using method 2 and only do it once for this round/source agg.
*/
if (!allocatedDerivedTypeArrays) {
@@ -1844,162 +2008,106 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
sourceAggDisplacements = (MPI_Aint *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Aint));
recvBufferDisplacements = (MPI_Aint *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Aint));
sourceAggDataTypes = (MPI_Datatype *)ADIOI_Malloc(maxNumContigOperations * sizeof(MPI_Datatype));
- allocatedDerivedTypeArrays = 1;
- }
- }
-
- /* For the non-contiguous source datatype we need to determine the number,size and offsets of the chunks
- * from the source buffer to be sent to this contiguous chunk defined by the round/agg/iter in the source.
- */
- int numNonContigSourceChunks = 0;
- ADIO_Offset baseDatatypeInstanceOffset = 0;
+ if (!bufTypeIsContig) {
+ int k;
+ for (i=sourceAggsForMyDataFirstOffLenIndex[roundIter][aggIter];i<=sourceAggsForMyDataLastOffLenIndex[roundIter][aggIter];k++)
+ amountOfDataReadThisRoundAgg += len_list[k];
- if (!bufTypeIsContig) {
-
- currentRecvBufferOffset = (ADIO_Offset)((ADIO_Offset)currentDataTypeExtent * (ADIO_Offset)bufTypeExtent) + flatBuf->indices[currentFlatBufIndice] + currentIndiceOffset;
#ifdef onesidedtrace
- printf("!bufTypeIsContig currentRecvBufferOffset set to %ld for roundIter %d source %d currentDataTypeExtent %d flatBuf->indices[currentFlatBufIndice] %ld currentIndiceOffset %ld currentFlatBufIndice %d\n",currentRecvBufferOffset,roundIter,aggIter,currentDataTypeExtent,flatBuf->indices[currentFlatBufIndice],currentIndiceOffset,currentFlatBufIndice);
+ printf("derivedTypePackedSourceBuffer mallocing %ld\n",amountOfDataReadThisRoundAgg);
#endif
-
- /* Use a tmp variable for the currentFlatBufIndice and currentIndiceOffset as they are used across the offsetIters
- * to compute the starting point for this iteration and will be modified now to compute the data chunks for
- * this iteration.
- */
- int tmpFlatBufIndice = currentFlatBufIndice;
- ADIO_Offset tmpIndiceOffset = currentIndiceOffset;
-
- /* now populate the nonContigSourceOffsets and nonContigSourceLens arrays for use in the one-sided operations.
- */
- int ncArrayIndex = 0;
- int remainingBytesToLoadedIntoNCArrays = bufferAmountToRecv;
-
- int datatypeInstances = currentDataTypeExtent;
- while (remainingBytesToLoadedIntoNCArrays > 0) {
- ADIOI_Assert(ncArrayIndex < (maxNumNonContigSourceChunks+2));
- nonContigSourceOffsets[ncArrayIndex] = currentRecvBufferOffset;
-
- if ((flatBuf->blocklens[tmpFlatBufIndice] - tmpIndiceOffset) >= remainingBytesToLoadedIntoNCArrays) {
- nonContigSourceLens[ncArrayIndex] = remainingBytesToLoadedIntoNCArrays;
- remainingBytesToLoadedIntoNCArrays = 0;
+ if (amountOfDataReadThisRoundAgg > 0)
+ derivedTypePackedSourceBuffer = (char *)ADIOI_Malloc(amountOfDataReadThisRoundAgg * sizeof(char));
+ else
+ derivedTypePackedSourceBuffer = NULL;
}
- else {
- nonContigSourceLens[ncArrayIndex] = (int)(flatBuf->blocklens[tmpFlatBufIndice] - tmpIndiceOffset);
- remainingBytesToLoadedIntoNCArrays -= (flatBuf->blocklens[tmpFlatBufIndice] - tmpIndiceOffset);
- tmpIndiceOffset = 0;
- tmpFlatBufIndice++;
- if (tmpFlatBufIndice == flatBuf->count) {
- tmpFlatBufIndice = 0;
- datatypeInstances++;
- baseDatatypeInstanceOffset = datatypeInstances * bufTypeExtent;
- }
- currentRecvBufferOffset = baseDatatypeInstanceOffset + flatBuf->indices[tmpFlatBufIndice];
- }
- ncArrayIndex++;
- numNonContigSourceChunks++;
- } // while
-#ifdef onesidedtrace
- printf("currentRecvBufferOffset finally set to %ld\n",currentRecvBufferOffset);
-#endif
- } // !bufTypeIsContig
+ allocatedDerivedTypeArrays = 1;
+ }
+ }
/* Determine the offset into the source window.
*/
- MPI_Aint sourceDisplacementToUseThisRound = (MPI_Aint) ((ADIO_Offset)offsetStart - currentRoundFDStartForMySourceAgg);
+ MPI_Aint sourceDisplacementToUseThisRound = (MPI_Aint) (offsetStart - currentRoundFDStartForMySourceAgg);
/* If using the thread reader select the appropriate side of the split window.
*/
if (useIOBuffer && (read_buf == read_buf1)) {
- sourceDisplacementToUseThisRound += coll_bufsize;
+ sourceDisplacementToUseThisRound += (MPI_Aint)coll_bufsize;
}
- /* For gpfsmpio_aggmethod of 1 do the mpi_put using the primitive MPI_BYTE type on each contiguous chunk of source data.
+ /* For gpfsmpio_read_aggmethod of 1 do the mpi_get using the primitive MPI_BYTE type from each
+ * contiguous chunk from the target, if the source is non-contiguous then unpack the data after
+ * the MPI_Win_unlock is done to make sure the data has arrived first.
*/
- if (gpfsmpio_aggmethod == 1) {
-#ifndef ACTIVE_TARGET
+ if (gpfsmpio_read_aggmethod == 1) {
MPI_Win_lock(MPI_LOCK_SHARED, sourceAggsForMyData[aggIter], 0, read_buf_window);
-#endif
+ char *getSourceData = NULL;
if (bufTypeIsContig) {
- MPI_Get(((char*)buf) + recvBufferOffset,bufferAmountToRecv, MPI_BYTE,sourceAggsForMyData[aggIter],sourceDisplacementToUseThisRound, bufferAmountToRecv,MPI_BYTE,read_buf_window);
+ MPI_Get(((char*)buf) + currentFDSourceBufferState[aggIter].sourceBufferOffset,bufferAmountToRecv, MPI_BYTE,sourceAggsForMyData[aggIter],sourceDisplacementToUseThisRound, bufferAmountToRecv,MPI_BYTE,read_buf_window);
+ currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset)bufferAmountToRecv;
+
}
else {
- for (i=0;i<numNonContigSourceChunks;i++) {
- MPI_Get(((char*)buf) + nonContigSourceOffsets[i],nonContigSourceLens[i], MPI_BYTE,sourceAggsForMyData[aggIter],sourceDisplacementToUseThisRound, nonContigSourceLens[i],MPI_BYTE,read_buf_window);
-#ifdef onesidedtrace
- printf("mpi_put[%d] nonContigSourceOffsets is %d of nonContigSourceLens %d to source disp %d\n",i,nonContigSourceOffsets[i],nonContigSourceLens[i],sourceDisplacementToUseThisRound);
-#endif
- sourceDisplacementToUseThisRound += nonContigSourceLens[i];
- }
+ getSourceData = (char *) ADIOI_Malloc(bufferAmountToRecv*sizeof(char));
+ MPI_Get(getSourceData,bufferAmountToRecv, MPI_BYTE,sourceAggsForMyData[aggIter],sourceDisplacementToUseThisRound, bufferAmountToRecv,MPI_BYTE,read_buf_window);
+
}
-#ifndef ACTIVE_TARGET
MPI_Win_unlock(sourceAggsForMyData[aggIter], read_buf_window);
-#endif
-
+ if (!bufTypeIsContig) {
+ nonContigSourceDataBufferAdvance(((char*)buf), flatBuf, bufferAmountToRecv, 0, ¤tFDSourceBufferState[aggIter], getSourceData);
+ ADIOI_Free(getSourceData);
+ }
}
- /* For gpfsmpio_aggmethod of 2 populate the data structures for this round/agg for this offset iter
+ /* For gpfsmpio_read_aggmethod of 2 populate the data structures for this round/agg for this offset iter
* to be used subsequently when building the derived type for 1 mpi_put for all the data for this
* round/agg.
*/
- else if (gpfsmpio_aggmethod == 2) {
-
+ else if (gpfsmpio_read_aggmethod == 2) {
if (bufTypeIsContig) {
sourceAggBlockLengths[sourceAggContigAccessCount]= bufferAmountToRecv;
sourceAggDataTypes[sourceAggContigAccessCount] = MPI_BYTE;
sourceAggDisplacements[sourceAggContigAccessCount] = sourceDisplacementToUseThisRound;
- recvBufferDisplacements[sourceAggContigAccessCount] = (MPI_Aint)recvBufferOffset;
+ recvBufferDisplacements[sourceAggContigAccessCount] = (MPI_Aint)currentFDSourceBufferState[aggIter].sourceBufferOffset;
+ currentFDSourceBufferState[aggIter].sourceBufferOffset += (ADIO_Offset)bufferAmountToRecv;
sourceAggContigAccessCount++;
}
else {
- for (i=0;i<numNonContigSourceChunks;i++) {
- sourceAggBlockLengths[sourceAggContigAccessCount]= nonContigSourceLens[i];
- sourceAggDataTypes[sourceAggContigAccessCount] = MPI_BYTE;
- sourceAggDisplacements[sourceAggContigAccessCount] = sourceDisplacementToUseThisRound;
- recvBufferDisplacements[sourceAggContigAccessCount] = (MPI_Aint)nonContigSourceOffsets[i];
-#ifdef onesidedtrace
- printf("mpi_put building arrays for iter %d - recvBufferDisplacements is %d of nonContigSourceLens %d to source disp %d sourceAggContigAccessCount is %d sourceAggBlockLengths[sourceAggContigAccessCount] is %d\n",i,recvBufferDisplacements[sourceAggContigAccessCount],nonContigSourceLens[i],sourceAggDisplacements[sourceAggContigAccessCount],sourceAggContigAccessCount, sourceAggBlockLengths[sourceAggContigAccessCount]);
-#endif
- sourceAggContigAccessCount++;
- sourceDisplacementToUseThisRound += nonContigSourceLens[i];
- }
+ sourceAggBlockLengths[sourceAggContigAccessCount]= bufferAmountToRecv;
+ sourceAggDataTypes[sourceAggContigAccessCount] = MPI_BYTE;
+ sourceAggDisplacements[sourceAggContigAccessCount] = sourceDisplacementToUseThisRound;
+ recvBufferDisplacements[sourceAggContigAccessCount] = (MPI_Aint)derivedTypePackedSourceBufferOffset;
+ derivedTypePackedSourceBufferOffset += (ADIO_Offset)bufferAmountToRecv;
+ sourceAggContigAccessCount++;
}
}
-#ifdef onesidedtrace
- printf("roundIter %d bufferAmountToRecv is %d recvBufferOffset is %d offsetStart is %ld currentRoundFDStartForMySourceAgg is %ld sourceDisplacementToUseThisRound is %ld sourceAggsForMyDataFDStart[aggIter] is %ld\n",roundIter, bufferAmountToRecv,recvBufferOffset, offsetStart,currentRoundFDStartForMySourceAgg,sourceDisplacementToUseThisRound,sourceAggsForMyDataFDStart[aggIter]);
-#endif
-
} // bufferAmountToRecv > 0
} // contig list
- /* For gpfsmpio_aggmethod of 2 now build the derived type using the data from this round/agg and do 1 single mpi_put.
+ /* For gpfsmpio_read_aggmethod of 2 now build the derived type using the data from this round/agg and do 1 single mpi_put.
*/
- if (gpfsmpio_aggmethod == 2) {
+ if (gpfsmpio_read_aggmethod == 2) {
MPI_Datatype recvBufferDerivedDataType, sourceBufferDerivedDataType;
- if (sourceAggContigAccessCount > 0) {
- /* Rebase source buffer offsets to 0 if there are any negative offsets for safety
- * when iteracting with PAMI.
- */
- MPI_Aint lowestDisplacement = 0;
- for (i=0;i<sourceAggContigAccessCount;i++) {
- if (recvBufferDisplacements[i] < lowestDisplacement)
- lowestDisplacement = recvBufferDisplacements[i];
- }
- if (lowestDisplacement < 0) {
- lowestDisplacement *= -1;
- for (i=0;i<sourceAggContigAccessCount;i++)
- recvBufferDisplacements[i] += lowestDisplacement;
- }
+
MPI_Type_create_struct(sourceAggContigAccessCount, sourceAggBlockLengths, recvBufferDisplacements, sourceAggDataTypes, &recvBufferDerivedDataType);
MPI_Type_commit(&recvBufferDerivedDataType);
MPI_Type_create_struct(sourceAggContigAccessCount, sourceAggBlockLengths, sourceAggDisplacements, sourceAggDataTypes, &sourceBufferDerivedDataType);
MPI_Type_commit(&sourceBufferDerivedDataType);
-#ifndef ACTIVE_TARGET
+
+ if (sourceAggContigAccessCount > 0) {
+
MPI_Win_lock(MPI_LOCK_SHARED, sourceAggsForMyData[aggIter], 0, read_buf_window);
-#endif
- MPI_Get((((char*)buf) - lowestDisplacement),1, recvBufferDerivedDataType,sourceAggsForMyData[aggIter],0, 1,sourceBufferDerivedDataType,read_buf_window);
+ if (bufTypeIsContig) {
+ MPI_Get(((char*)buf),1, recvBufferDerivedDataType,sourceAggsForMyData[aggIter],0, 1,sourceBufferDerivedDataType,read_buf_window);
+ }
+ else {
+ MPI_Get(derivedTypePackedSourceBuffer,1, recvBufferDerivedDataType,sourceAggsForMyData[aggIter],0, 1,sourceBufferDerivedDataType,read_buf_window);
+ }
-#ifndef ACTIVE_TARGET
MPI_Win_unlock(sourceAggsForMyData[aggIter], read_buf_window);
-#endif
+ if (!bufTypeIsContig) {
+ nonContigSourceDataBufferAdvance(((char*)buf), flatBuf, derivedTypePackedSourceBufferOffset, 0, ¤tFDSourceBufferState[aggIter], derivedTypePackedSourceBuffer);
+ }
}
if (allocatedDerivedTypeArrays) {
@@ -2007,6 +2115,9 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
ADIOI_Free(sourceAggDisplacements);
ADIOI_Free(sourceAggDataTypes);
ADIOI_Free(recvBufferDisplacements);
+ if (!bufTypeIsContig)
+ if (derivedTypePackedSourceBuffer != NULL)
+ ADIOI_Free(derivedTypePackedSourceBuffer);
}
if (sourceAggContigAccessCount > 0) {
MPI_Type_free(&recvBufferDerivedDataType);
@@ -2015,23 +2126,15 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
}
} // baseoffset != -1
} // source aggs
-
+ } // contig_access_count > 0
/* the source procs recv the requested data to the aggs */
-#ifdef ACTIVE_TARGET
- MPI_Win_fence(0, read_buf_window);
-#else
MPI_Barrier(fd->comm);
-#endif
nextRoundFDStart = currentRoundFDStart + coll_bufsize;
} /* for-loop roundIter */
- if (!bufTypeIsContig) {
- ADIOI_Free(nonContigSourceOffsets);
- ADIOI_Free(nonContigSourceLens);
- }
#ifdef ROMIO_GPFS
endTimeBase = MPI_Wtime();
gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH] += (endTimeBase-startTimeBase);
@@ -2053,17 +2156,11 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
for (i=0;i<numberOfRounds;i++) {
ADIOI_Free(sourceAggsForMyDataFirstOffLenIndex[i]);
ADIOI_Free(sourceAggsForMyDataLastOffLenIndex[i]);
- if (bufTypeIsContig)
- ADIOI_Free(baseRecvBufferOffset[i]);
- else
- ADIOI_Free(baseNonContigSourceBufferOffset[i]);
}
ADIOI_Free(sourceAggsForMyDataFirstOffLenIndex);
ADIOI_Free(sourceAggsForMyDataLastOffLenIndex);
- if (bufTypeIsContig)
- ADIOI_Free(baseRecvBufferOffset);
- else
- ADIOI_Free(baseNonContigSourceBufferOffset);
+
+ ADIOI_Free(currentFDSourceBufferState);
if (!bufTypeIsContig)
ADIOI_Delete_flattened(datatype);
diff --git a/src/mpi/romio/adio/include/adioi.h b/src/mpi/romio/adio/include/adioi.h
index 63190fb..f715be7 100644
--- a/src/mpi/romio/adio/include/adioi.h
+++ b/src/mpi/romio/adio/include/adioi.h
@@ -707,6 +707,7 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
int *error_code,
ADIO_Offset *st_offsets,
ADIO_Offset *end_offsets,
+ int numNonZeroDataOffsets,
ADIO_Offset *fd_start,
ADIO_Offset* fd_end,
int *hole_found);
@@ -719,6 +720,7 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
int *error_code,
ADIO_Offset *st_offsets,
ADIO_Offset *end_offsets,
+ int numNonZeroDataOffsets,
ADIO_Offset *fd_start,
ADIO_Offset* fd_end);
ADIO_Offset ADIOI_GEN_SeekIndividual(ADIO_File fd, ADIO_Offset offset,
-----------------------------------------------------------------------
Summary of changes:
src/mpi/romio/adio/ad_gpfs/ad_gpfs_rdcoll.c | 81 ++-
src/mpi/romio/adio/ad_gpfs/ad_gpfs_tuning.c | 28 +-
src/mpi/romio/adio/ad_gpfs/ad_gpfs_tuning.h | 4 +-
src/mpi/romio/adio/ad_gpfs/ad_gpfs_wrcoll.c | 110 ++-
src/mpi/romio/adio/common/onesided_aggregation.c | 1443 ++++++++++++----------
src/mpi/romio/adio/include/adioi.h | 2 +
6 files changed, 973 insertions(+), 695 deletions(-)
hooks/post-receive
--
MPICH primary repository
More information about the commits
mailing list