[mpich-commits] [mpich] MPICH primary repository branch, master, updated. v3.2b1-87-ga568267
Service Account
noreply at mpich.org
Mon Apr 20 18:14:18 CDT 2015
This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "MPICH primary repository".
The branch, master has been updated
via a56826750cc7b99150b9b59e602dcaf13cb5f661 (commit)
via aec01b399f6ad205daf3fb5d6b3e52eb13cbe9c7 (commit)
via 001cd724bb0a6b06c5e26a47748f6b204ac0479f (commit)
via de0412c2980e5c8e34223ddceeafe8c006b9d66e (commit)
via 19f29078c001ff4176330815b066eac3da7b9e52 (commit)
from c09f396958cbef74684c3b423315b972973042a5 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
http://git.mpich.org/mpich.git/commitdiff/a56826750cc7b99150b9b59e602dcaf13cb5f661
commit a56826750cc7b99150b9b59e602dcaf13cb5f661
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Mon Apr 20 15:41:07 2015 -0500
Modify comments about atomicity in GACC/FOP handlers.
Signed-off-by: Min Si <msi at il.is.s.u-tokyo.ac.jp>
Signed-off-by: Antonio J. Pena <apenya at mcs.anl.gov>
diff --git a/src/mpid/ch3/src/ch3u_handle_recv_req.c b/src/mpid/ch3/src/ch3u_handle_recv_req.c
index ff5ee54..5e62e66 100644
--- a/src/mpid/ch3/src/ch3u_handle_recv_req.c
+++ b/src/mpid/ch3/src/ch3u_handle_recv_req.c
@@ -263,7 +263,6 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
MPID_Datatype_is_contig(rreq->dev.datatype, &is_contig);
- /* Copy data into a temporary buffer */
resp_req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(resp_req, 1);
@@ -272,9 +271,13 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
MPIU_CHKPMEM_MALLOC(resp_req->dev.user_buf, void *, rreq->dev.recv_data_sz,
mpi_errno, "GACC resp. buffer");
+ /* NOTE: 'copy data + ACC' needs to be atomic */
+
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
+ /* Copy data from target window to temporary buffer */
+
if (is_contig) {
MPIU_Memcpy(resp_req->dev.user_buf,
(void *) ((char *) rreq->dev.real_user_buf + rreq->dev.stream_offset),
@@ -397,6 +400,8 @@ int MPIDI_CH3_ReqHandler_FOPRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq, i
* operation are completed when counter reaches zero. */
win_ptr->at_completion_counter++;
+ /* NOTE: 'copy data + ACC' needs to be atomic */
+
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
@@ -1193,10 +1198,14 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
- /* Perform ACCUMULATE OP */
+
+ /* NOTE: copy 'data + ACC' needs to be atomic */
+
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
+ /* Copy data from target window to response packet header */
+
void *src = (void *) (get_accum_pkt->addr), *dest =
(void *) (get_accum_resp_pkt->info.data);
mpi_errno = immed_copy(src, dest, len);
@@ -1206,6 +1215,8 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPIU_ERR_POP(mpi_errno);
}
+ /* Perform ACCUMULATE OP */
+
/* All data fits in packet header */
/* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
that is piggybacked with LOCK flag must be the first stream unit */
@@ -1244,10 +1255,13 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPID_Datatype_is_contig(get_accum_pkt->datatype, &is_contig);
- /* Perform ACCUMULATE OP */
+ /* NOTE: 'copy data + ACC' needs to be atomic */
+
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
+ /* Copy data from target window to temporary buffer */
+
/* NOTE: here we copy data from stream_offset = 0, because
the unit that is piggybacked with LOCK flag must be the
first stream unit. */
@@ -1271,6 +1285,8 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPID_Segment_free(seg);
}
+ /* Perform ACCUMULATE OP */
+
/* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno = do_accumulate_op(lock_entry->data, get_accum_pkt->count, get_accum_pkt->datatype,
@@ -1379,9 +1395,13 @@ static inline int perform_fop_in_lock_queue(MPID_Win * win_ptr, MPIDI_RMA_Lock_e
win_ptr->at_completion_counter++;
}
+ /* NOTE: 'copy data + ACC' needs to be atomic */
+
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
+ /* Copy data from target window to temporary buffer / response packet header */
+
if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
/* copy data to resp pkt header */
void *src = fop_pkt->addr, *dest = fop_resp_pkt->info.data;
diff --git a/src/mpid/ch3/src/ch3u_rma_pkthandler.c b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
index 2bffebb..5ec61a4 100644
--- a/src/mpid/ch3/src/ch3u_rma_pkthandler.c
+++ b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
@@ -836,6 +836,8 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
(get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
+ /* NOTE: 'copy data + ACC' needs to be atomic */
+
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
@@ -1232,6 +1234,8 @@ int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
(fop_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
fop_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
+ /* NOTE: 'copy data + ACC' needs to be atomic */
+
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
http://git.mpich.org/mpich.git/commitdiff/aec01b399f6ad205daf3fb5d6b3e52eb13cbe9c7
commit aec01b399f6ad205daf3fb5d6b3e52eb13cbe9c7
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Fri Apr 17 15:05:25 2015 -0500
Bug-fix: correct the wrong judgement in RMA function.
Here we should check if the packet type is FOP_IMMED, if so,
we initialize the response packet to be FOP_RESP_IMMED. The
originally code wrongly check the packet flag instead of packet
type.
Signed-off-by: Min Si <msi at il.is.s.u-tokyo.ac.jp>
Signed-off-by: Antonio J. Pena <apenya at mcs.anl.gov>
diff --git a/src/mpid/ch3/src/ch3u_handle_recv_req.c b/src/mpid/ch3/src/ch3u_handle_recv_req.c
index 7c85ba5..ff5ee54 100644
--- a/src/mpid/ch3/src/ch3u_handle_recv_req.c
+++ b/src/mpid/ch3/src/ch3u_handle_recv_req.c
@@ -1342,7 +1342,7 @@ static inline int perform_fop_in_lock_queue(MPID_Win * win_ptr, MPIDI_RMA_Lock_e
MPID_Datatype_is_contig(fop_pkt->datatype, &is_contig);
- if (fop_pkt->flags & MPIDI_CH3_PKT_FOP_IMMED) {
+ if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP_IMMED);
}
else {
http://git.mpich.org/mpich.git/commitdiff/001cd724bb0a6b06c5e26a47748f6b204ac0479f
commit 001cd724bb0a6b06c5e26a47748f6b204ac0479f
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Fri Apr 17 13:14:40 2015 -0500
Delete assert that no longer makes sense.
After reducing the IMMED data size from 16 bytes to 8 bytes,
FOP data is no longer always fit in the packet header, hence
the assert no longer makes sense.
Signed-off-by: Min Si <msi at il.is.s.u-tokyo.ac.jp>
Signed-off-by: Antonio J. Pena <apenya at mcs.anl.gov>
diff --git a/src/mpid/ch3/src/ch3u_rma_pkthandler.c b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
index 50800ee..2bffebb 100644
--- a/src/mpid/ch3/src/ch3u_rma_pkthandler.c
+++ b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
@@ -1209,8 +1209,6 @@ int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
- MPIU_Assert(rreq == NULL); /* FOP should not have request because all data
- * can fit in packet header */
if (acquire_lock_fail) {
(*rreqp) = rreq;
goto fn_exit;
http://git.mpich.org/mpich.git/commitdiff/de0412c2980e5c8e34223ddceeafe8c006b9d66e
commit de0412c2980e5c8e34223ddceeafe8c006b9d66e
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Fri Apr 17 09:04:44 2015 -0500
Set size of IMMED data in RMA packets to 8 bytes.
Originally the size of IMMED data in RMA packets is 16 bytes
which makes the size of CH3 packet be 56 bytes. Here we reduce
the size of IMMED data in RMA packets to 8 bytes, so that the
size of CH3 packet is reduced to 48 bytes, the same with
mpich-3.1.4 (the old RMA infrastructure).
Signed-off-by: Min Si <msi at il.is.s.u-tokyo.ac.jp>
Signed-off-by: Antonio J. Pena <apenya at mcs.anl.gov>
diff --git a/src/mpid/ch3/include/mpidpkt.h b/src/mpid/ch3/include/mpidpkt.h
index 0c4ff9e..eec2083 100644
--- a/src/mpid/ch3/include/mpidpkt.h
+++ b/src/mpid/ch3/include/mpidpkt.h
@@ -23,7 +23,7 @@
#define MPIDI_EAGER_SHORT_SIZE 16
/* This is the number of ints that can be carried within an RMA packet */
-#define MPIDI_RMA_IMMED_BYTES 16
+#define MPIDI_RMA_IMMED_BYTES 8
/* Union over all types (integer, logical, and multi-language types) that are
allowed in a CAS operation. This is used to allocate enough space in the
http://git.mpich.org/mpich.git/commitdiff/19f29078c001ff4176330815b066eac3da7b9e52
commit 19f29078c001ff4176330815b066eac3da7b9e52
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Sun Mar 22 14:54:28 2015 -0500
Move 'stream_offset' out of RMA packet struct.
'stream_offset' is used to specify the starting position
(on target window) of the current streaming unit in ACC-like
operations. It is originally put in the RMA packet struct,
which potentially increases the size of CH3 packet size.
In this patch, we move 'stream_offset' out of the RMA
packet as follows: 1. when target data is basic datatype,
we use 'stream_offset' and the starting address for the entire
operation to calculate the starting address for current
streaming unit, and rewrite 'addr' in RMA packet with that
value; 2. when target data is derived datatype, we cannot do
the same thing as basic datatype because the target needs to
know both the starting address for the entire operation and
the starting address for the current streaming unit. Therefore,
we send 'stream_offset' separately to the target side.
Signed-off-by: Min Si <msi at il.is.s.u-tokyo.ac.jp>
Signed-off-by: Antonio J. Pena <apenya at mcs.anl.gov>
diff --git a/src/mpid/ch3/include/mpid_rma_issue.h b/src/mpid/ch3/include/mpid_rma_issue.h
index bd40aac..17c6930 100644
--- a/src/mpid/ch3/include/mpid_rma_issue.h
+++ b/src/mpid/ch3/include/mpid_rma_issue.h
@@ -497,7 +497,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
- count = 2 + vec_len;
+ count = 3 + vec_len;
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
@@ -514,10 +514,16 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
+ req->dev.stream_offset = stream_offset;
+
+ displaces[2] = MPIU_PtrToAint(&(req->dev.stream_offset));
+ blocklens[2] = sizeof(req->dev.stream_offset);
+ datatypes[2] = MPI_BYTE;
+
for (i = 0; i < vec_len; i++) {
- displaces[i + 2] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
- MPIU_Assign_trunc(blocklens[i + 2], dloop_vec[i].DLOOP_VECTOR_LEN, int);
- datatypes[i + 2] = MPI_BYTE;
+ displaces[i + 3] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
+ MPIU_Assign_trunc(blocklens[i + 3], dloop_vec[i].DLOOP_VECTOR_LEN, int);
+ datatypes[i + 3] = MPI_BYTE;
}
MPID_Segment_free(segp);
@@ -730,7 +736,11 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
- accum_pkt->info.metadata.stream_offset = stream_offset;
+ if (MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype)) {
+ accum_pkt->addr = (void *) ((char *) rma_op->original_target_addr
+ + j * stream_elem_count * predefined_dtp_extent);
+ accum_pkt->count = stream_size / predefined_dtp_size;
+ }
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
@@ -939,7 +949,11 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
- get_accum_pkt->info.metadata.stream_offset = stream_offset;
+ if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
+ get_accum_pkt->addr = (void *) ((char *) rma_op->original_target_addr
+ + j * stream_elem_count * predefined_dtp_extent);
+ get_accum_pkt->count = stream_size / predefined_dtp_size;
+ }
resp_req->dev.stream_offset = stream_offset;
diff --git a/src/mpid/ch3/include/mpid_rma_types.h b/src/mpid/ch3/include/mpid_rma_types.h
index be0e869..ff8c63d 100644
--- a/src/mpid/ch3/include/mpid_rma_types.h
+++ b/src/mpid/ch3/include/mpid_rma_types.h
@@ -60,6 +60,9 @@ typedef struct MPIDI_RMA_Op {
int result_count;
MPI_Datatype result_datatype;
+ /* used in streaming ACCs */
+ void *original_target_addr;
+
struct MPID_Request **reqs;
int reqs_size;
diff --git a/src/mpid/ch3/include/mpidpkt.h b/src/mpid/ch3/include/mpidpkt.h
index 3351a65..0c4ff9e 100644
--- a/src/mpid/ch3/include/mpidpkt.h
+++ b/src/mpid/ch3/include/mpidpkt.h
@@ -522,31 +522,16 @@ MPIDI_CH3_PKT_DEFS
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \
- (pkt_).put.info.metadata.dataloop_size = (dataloop_size_); \
+ (pkt_).put.info.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET): \
- (pkt_).get.info.metadata.dataloop_size = (dataloop_size_); \
+ (pkt_).get.info.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \
- (pkt_).accum.info.metadata.dataloop_size = (dataloop_size_); \
+ (pkt_).accum.info.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
- (pkt_).get_accum.info.metadata.dataloop_size = (dataloop_size_); \
- break; \
- default: \
- MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
- } \
- }
-
-#define MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET(pkt_, stream_offset_, err_) \
- { \
- err_ = MPI_SUCCESS; \
- switch((pkt_).type) { \
- case (MPIDI_CH3_PKT_ACCUMULATE): \
- (stream_offset_) = (pkt_).accum.info.metadata.stream_offset; \
- break; \
- case (MPIDI_CH3_PKT_GET_ACCUM): \
- (stream_offset_) = (pkt_).get_accum.info.metadata.stream_offset; \
+ (pkt_).get_accum.info.dataloop_size = (dataloop_size_); \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
@@ -609,12 +594,7 @@ typedef struct MPIDI_CH3_Pkt_put {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
- /* note that we use struct here in order
- * to consistently access dataloop_size
- * by "pkt->info.metadata.dataloop_size". */
- struct {
- int dataloop_size;
- } metadata;
+ int dataloop_size;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_put_t;
@@ -628,10 +608,8 @@ typedef struct MPIDI_CH3_Pkt_get {
struct {
/* note that we use struct here in order
* to consistently access dataloop_size
- * by "pkt->info.metadata.dataloop_size". */
- struct {
- int dataloop_size; /* for derived datatypes */
- } metadata;
+ * by "pkt->info.dataloop_size". */
+ int dataloop_size; /* for derived datatypes */
} info;
MPI_Request request_handle;
MPI_Win target_win_handle;
@@ -662,10 +640,7 @@ typedef struct MPIDI_CH3_Pkt_accum {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
- struct {
- int dataloop_size;
- MPI_Aint stream_offset;
- } metadata;
+ int dataloop_size;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_accum_t;
@@ -680,10 +655,7 @@ typedef struct MPIDI_CH3_Pkt_get_accum {
MPI_Op op;
MPI_Win target_win_handle;
union {
- struct {
- int dataloop_size;
- MPI_Aint stream_offset;
- } metadata;
+ int dataloop_size;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_get_accum_t;
diff --git a/src/mpid/ch3/include/mpidrma.h b/src/mpid/ch3/include/mpidrma.h
index cc05214..4c90166 100644
--- a/src/mpid/ch3/include/mpidrma.h
+++ b/src/mpid/ch3/include/mpidrma.h
@@ -373,21 +373,8 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
MPID_Datatype_get_extent_macro(target_dtp, type_extent);
MPID_Datatype_get_size_macro(target_dtp, type_size);
- if (pkt->type == MPIDI_CH3_PKT_PUT) {
- recv_data_sz = type_size * target_count;
- buf_size = type_extent * target_count;
- }
- else {
- MPI_Aint stream_offset, stream_elem_count;
- MPI_Aint total_len, rest_len;
-
- MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET((*pkt), stream_offset, mpi_errno);
- stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent;
- total_len = type_size * target_count;
- rest_len = total_len - stream_offset;
- recv_data_sz = MPIR_MIN(rest_len, type_size * stream_elem_count);
- buf_size = type_extent * (recv_data_sz / type_size);
- }
+ recv_data_sz = type_size * target_count;
+ buf_size = type_extent * target_count;
if (new_ptr != NULL) {
if (win_ptr->current_lock_data_bytes + buf_size < MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) {
diff --git a/src/mpid/ch3/src/ch3u_handle_recv_req.c b/src/mpid/ch3/src/ch3u_handle_recv_req.c
index 6e0d943..7c85ba5 100644
--- a/src/mpid/ch3/src/ch3u_handle_recv_req.c
+++ b/src/mpid/ch3/src/ch3u_handle_recv_req.c
@@ -1107,26 +1107,20 @@ static inline int perform_acc_in_lock_queue(MPID_Win * win_ptr, MPIDI_RMA_Lock_e
if (acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
/* All data fits in packet header */
+ /* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
+ that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno = do_accumulate_op(acc_pkt->info.data, acc_pkt->count, acc_pkt->datatype,
acc_pkt->addr, acc_pkt->count, acc_pkt->datatype,
- 0, acc_pkt->op);
+ 0/* stream offset */, acc_pkt->op);
}
else {
MPIU_Assert(acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE);
- MPI_Aint type_size, type_extent;
- MPI_Aint total_len, rest_len, recv_count;
- MPID_Datatype_get_size_macro(acc_pkt->datatype, type_size);
- MPID_Datatype_get_extent_macro(acc_pkt->datatype, type_extent);
-
- total_len = type_size * acc_pkt->count;
- rest_len = total_len - acc_pkt->info.metadata.stream_offset;
- recv_count = MPIR_MIN((rest_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
- MPIU_Assert(recv_count > 0);
-
- mpi_errno = do_accumulate_op(lock_entry->data, recv_count, acc_pkt->datatype,
+ /* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
+ that is piggybacked with LOCK flag must be the first stream unit */
+ mpi_errno = do_accumulate_op(lock_entry->data, acc_pkt->count, acc_pkt->datatype,
acc_pkt->addr, acc_pkt->count, acc_pkt->datatype,
- acc_pkt->info.metadata.stream_offset, acc_pkt->op);
+ 0/* stream offset */, acc_pkt->op);
}
if (win_ptr->shm_allocated == TRUE)
@@ -1160,8 +1154,6 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPID_IOV iov[MPID_IOV_LIMIT];
int is_contig;
int mpi_errno = MPI_SUCCESS;
- MPI_Aint type_extent;
- MPI_Aint total_len, rest_len, recv_count;
/* Piggyback candidate should have basic datatype for target datatype. */
MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype));
@@ -1215,10 +1207,12 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
}
/* All data fits in packet header */
+ /* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
+ that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno =
do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->count,
get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
- get_accum_pkt->datatype, 0, get_accum_pkt->op);
+ get_accum_pkt->datatype, 0/* stream offset */, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
@@ -1246,14 +1240,7 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPIU_Assert(get_accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM);
- MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, type_extent);
-
- total_len = type_size * get_accum_pkt->count;
- rest_len = total_len - get_accum_pkt->info.metadata.stream_offset;
- recv_count = MPIR_MIN((rest_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
- MPIU_Assert(recv_count > 0);
-
- sreq->dev.user_buf = (void *) MPIU_Malloc(recv_count * type_size);
+ sreq->dev.user_buf = (void *) MPIU_Malloc(get_accum_pkt->count * type_size);
MPID_Datatype_is_contig(get_accum_pkt->datatype, &is_contig);
@@ -1261,15 +1248,16 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
+ /* NOTE: here we copy data from stream_offset = 0, because
+ the unit that is piggybacked with LOCK flag must be the
+ first stream unit. */
if (is_contig) {
- MPIU_Memcpy(sreq->dev.user_buf,
- (void *) ((char *) get_accum_pkt->addr +
- get_accum_pkt->info.metadata.stream_offset), recv_count * type_size);
+ MPIU_Memcpy(sreq->dev.user_buf, get_accum_pkt->addr, get_accum_pkt->count * type_size);
}
else {
MPID_Segment *seg = MPID_Segment_alloc();
- MPI_Aint first = get_accum_pkt->info.metadata.stream_offset;
- MPI_Aint last = first + type_size * recv_count;
+ MPI_Aint first = 0;
+ MPI_Aint last = first + type_size * get_accum_pkt->count;
if (seg == NULL) {
if (win_ptr->shm_allocated == TRUE)
@@ -1283,9 +1271,11 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPID_Segment_free(seg);
}
- mpi_errno = do_accumulate_op(lock_entry->data, recv_count, get_accum_pkt->datatype,
+ /* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
+ that is piggybacked with LOCK flag must be the first stream unit */
+ mpi_errno = do_accumulate_op(lock_entry->data, get_accum_pkt->count, get_accum_pkt->datatype,
get_accum_pkt->addr, get_accum_pkt->count, get_accum_pkt->datatype,
- get_accum_pkt->info.metadata.stream_offset, get_accum_pkt->op);
+ 0/* stream offset */, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
@@ -1311,7 +1301,7 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) sreq->dev.user_buf);
- iov[1].MPID_IOV_LEN = recv_count * type_size;
+ iov[1].MPID_IOV_LEN = get_accum_pkt->count * type_size;
iovcnt = 2;
mpi_errno = MPIDI_CH3_iSendv(lock_entry->vc, sreq, iov, iovcnt);
diff --git a/src/mpid/ch3/src/ch3u_rma_ops.c b/src/mpid/ch3/src/ch3u_rma_ops.c
index 9611716..136c260 100644
--- a/src/mpid/ch3/src/ch3u_rma_ops.c
+++ b/src/mpid/ch3/src/ch3u_rma_ops.c
@@ -198,7 +198,7 @@ int MPIDI_CH3I_Put(const void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
put_pkt->count = target_count;
put_pkt->datatype = target_datatype;
- put_pkt->info.metadata.dataloop_size = 0;
+ put_pkt->info.dataloop_size = 0;
put_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
put_pkt->source_win_handle = win_ptr->handle;
put_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
@@ -387,7 +387,7 @@ int MPIDI_CH3I_Get(void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
- get_pkt->info.metadata.dataloop_size = 0;
+ get_pkt->info.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt)
@@ -612,11 +612,10 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
accum_pkt->count = target_count;
accum_pkt->datatype = target_datatype;
- accum_pkt->info.metadata.dataloop_size = 0;
+ accum_pkt->info.dataloop_size = 0;
accum_pkt->op = op;
accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
accum_pkt->source_win_handle = win_ptr->handle;
- accum_pkt->info.metadata.stream_offset = 0;
accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (accum_pkt->info.data);
@@ -625,6 +624,12 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
MPIU_ERR_POP(mpi_errno);
}
+ /* NOTE: here we backup the original starting address for the entire operation
+ on target window in 'original_target_addr', because when actually issuing
+ this operation, we may stream this operation and overwrite 'addr' with the
+ starting address for the streaming unit. */
+ new_ptr->original_target_addr = accum_pkt->addr;
+
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
mpi_errno = MPIDI_CH3I_Win_enqueue_op(win_ptr, new_ptr);
@@ -810,7 +815,7 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
- get_pkt->info.metadata.dataloop_size = 0;
+ get_pkt->info.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
@@ -931,10 +936,9 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_accum_pkt->count = target_count;
get_accum_pkt->datatype = target_datatype;
- get_accum_pkt->info.metadata.dataloop_size = 0;
+ get_accum_pkt->info.dataloop_size = 0;
get_accum_pkt->op = op;
get_accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
- get_accum_pkt->info.metadata.stream_offset = 0;
get_accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (get_accum_pkt->info.data);
@@ -942,6 +946,12 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
+
+ /* NOTE: here we backup the original starting address for the entire operation
+ on target window in 'original_target_addr', because when actually issuing
+ this operation, we may stream this operation and overwrite 'addr' with the
+ starting address for the streaming unit. */
+ new_ptr->original_target_addr = get_accum_pkt->addr;
}
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
@@ -1346,7 +1356,7 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = 1;
get_pkt->datatype = datatype;
- get_pkt->info.metadata.dataloop_size = 0;
+ get_pkt->info.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
diff --git a/src/mpid/ch3/src/ch3u_rma_pkthandler.c b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
index adfe90b..50800ee 100644
--- a/src/mpid/ch3/src/ch3u_rma_pkthandler.c
+++ b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
@@ -293,24 +293,24 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
- req->dev.dataloop = MPIU_Malloc(put_pkt->info.metadata.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(put_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- put_pkt->info.metadata.dataloop_size);
+ put_pkt->info.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
- if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.metadata.dataloop_size) {
+ if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- put_pkt->info.metadata.dataloop_size);
+ put_pkt->info.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- put_pkt->info.metadata.dataloop_size;
+ put_pkt->info.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(vc, req, &complete);
@@ -325,7 +325,7 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) req->dev.dtype_info);
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.metadata.dataloop_size;
+ req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
@@ -519,24 +519,24 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
- req->dev.dataloop = MPIU_Malloc(get_pkt->info.metadata.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(get_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- get_pkt->info.metadata.dataloop_size);
+ get_pkt->info.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
- if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.metadata.dataloop_size) {
+ if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- get_pkt->info.metadata.dataloop_size);
+ get_pkt->info.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- get_pkt->info.metadata.dataloop_size;
+ get_pkt->info.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete(vc, req, &complete);
@@ -549,7 +549,7 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.metadata.dataloop_size;
+ req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
@@ -574,7 +574,6 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
MPIDI_CH3_Pkt_accum_t *accum_pkt = &pkt->accum;
MPID_Request *req = NULL;
- MPI_Aint extent;
int complete = 0;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
@@ -582,7 +581,6 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_size;
- MPI_Aint stream_elem_count, rest_len, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
@@ -640,7 +638,6 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.target_win_handle = accum_pkt->target_win_handle;
req->dev.source_win_handle = accum_pkt->source_win_handle;
req->dev.flags = accum_pkt->flags;
- req->dev.stream_offset = accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = MPI_REQUEST_NULL;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
@@ -653,8 +650,6 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV);
req->dev.datatype = accum_pkt->datatype;
- MPID_Datatype_get_extent_macro(accum_pkt->datatype, extent);
-
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
@@ -673,11 +668,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
- total_len = type_size * accum_pkt->count;
- rest_len = total_len - req->dev.stream_offset;
- stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
-
- req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
+ req->dev.recv_data_sz = type_size * accum_pkt->count;
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
@@ -709,21 +700,25 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
- req->dev.dataloop = MPIU_Malloc(accum_pkt->info.metadata.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(accum_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- accum_pkt->info.metadata.dataloop_size);
+ accum_pkt->info.dataloop_size);
}
- if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.metadata.dataloop_size) {
+ if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size +
+ sizeof(req->dev.stream_offset)) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- accum_pkt->info.metadata.dataloop_size);
+ accum_pkt->info.dataloop_size);
+ MPIU_Memcpy(&(req->dev.stream_offset),
+ data_buf + sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size,
+ sizeof(req->dev.stream_offset));
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- accum_pkt->info.metadata.dataloop_size;
+ accum_pkt->info.dataloop_size + sizeof(req->dev.stream_offset);
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(vc, req, &complete);
@@ -738,8 +733,10 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.metadata.dataloop_size;
- req->dev.iov_count = 2;
+ req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.dataloop_size;
+ req->dev.iov[2].MPID_IOV_BUF = &(req->dev.stream_offset);
+ req->dev.iov[2].MPID_IOV_LEN = sizeof(req->dev.stream_offset);
+ req->dev.iov_count = 3;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
@@ -769,14 +766,12 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &pkt->get_accum;
MPID_Request *req = NULL;
- MPI_Aint extent;
int complete = 0;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
MPID_Win *win_ptr;
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
- MPI_Aint stream_elem_count, rest_len, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
@@ -892,7 +887,6 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.real_user_buf = get_accum_pkt->addr;
req->dev.target_win_handle = get_accum_pkt->target_win_handle;
req->dev.flags = get_accum_pkt->flags;
- req->dev.stream_offset = get_accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = get_accum_pkt->request_handle;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
@@ -907,8 +901,6 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
req->dev.datatype = get_accum_pkt->datatype;
- MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, extent);
-
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
@@ -926,11 +918,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.user_buf = req->dev.tmpbuf;
MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
- total_len = type_size * get_accum_pkt->count;
- rest_len = total_len - req->dev.stream_offset;
- stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
-
- req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
+ req->dev.recv_data_sz = type_size * get_accum_pkt->count;
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
@@ -962,22 +950,26 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
- req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.metadata.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- get_accum_pkt->info.metadata.dataloop_size);
+ get_accum_pkt->info.dataloop_size);
}
if (data_len >=
- sizeof(MPIDI_RMA_dtype_info) + get_accum_pkt->info.metadata.dataloop_size) {
+ sizeof(MPIDI_RMA_dtype_info) + get_accum_pkt->info.dataloop_size +
+ sizeof(req->dev.stream_offset)) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- get_accum_pkt->info.metadata.dataloop_size);
+ get_accum_pkt->info.dataloop_size);
+ MPIU_Memcpy(&(req->dev.stream_offset),
+ data_buf + sizeof(MPIDI_RMA_dtype_info) +
+ get_accum_pkt->info.dataloop_size, sizeof(req->dev.stream_offset));
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- get_accum_pkt->info.metadata.dataloop_size;
+ get_accum_pkt->info.dataloop_size + sizeof(req->dev.stream_offset);
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(vc, req, &complete);
@@ -992,8 +984,10 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = get_accum_pkt->info.metadata.dataloop_size;
- req->dev.iov_count = 2;
+ req->dev.iov[1].MPID_IOV_LEN = get_accum_pkt->info.dataloop_size;
+ req->dev.iov[2].MPID_IOV_BUF = &(req->dev.stream_offset);
+ req->dev.iov[2].MPID_IOV_LEN = sizeof(req->dev.stream_offset);
+ req->dev.iov_count = 3;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
@@ -2005,7 +1999,7 @@ int MPIDI_CH3_PktPrint_Put(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->put.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->put.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->put.datatype));
- MPIU_DBG_PRINTF((" dataloop_size. 0x%08X\n", pkt->put.info.metadata.dataloop_size));
+ MPIU_DBG_PRINTF((" dataloop_size. 0x%08X\n", pkt->put.info.dataloop_size));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->put.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->put.source_win_handle));
/*MPIU_DBG_PRINTF((" win_ptr ...... 0x%08X\n", pkt->put.win_ptr)); */
@@ -2018,7 +2012,7 @@ int MPIDI_CH3_PktPrint_Get(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->get.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->get.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->get.datatype));
- MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->get.info.metadata.dataloop_size));
+ MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->get.info.dataloop_size));
MPIU_DBG_PRINTF((" request ...... 0x%08X\n", pkt->get.request_handle));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->get.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->get.source_win_handle));
@@ -2043,7 +2037,7 @@ int MPIDI_CH3_PktPrint_Accumulate(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->accum.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->accum.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->accum.datatype));
- MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->accum.info.metadata.dataloop_size));
+ MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->accum.info.dataloop_size));
MPIU_DBG_PRINTF((" op ........... 0x%08X\n", pkt->accum.op));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->accum.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->accum.source_win_handle));
-----------------------------------------------------------------------
Summary of changes:
src/mpid/ch3/include/mpid_rma_issue.h | 26 ++++++--
src/mpid/ch3/include/mpid_rma_types.h | 3 +
src/mpid/ch3/include/mpidpkt.h | 48 +++------------
src/mpid/ch3/include/mpidrma.h | 17 +-----
src/mpid/ch3/src/ch3u_handle_recv_req.c | 82 ++++++++++++++-----------
src/mpid/ch3/src/ch3u_rma_ops.c | 26 ++++++---
src/mpid/ch3/src/ch3u_rma_pkthandler.c | 98 +++++++++++++++----------------
7 files changed, 146 insertions(+), 154 deletions(-)
hooks/post-receive
--
MPICH primary repository
More information about the commits
mailing list