[mpich-commits] [mpich] MPICH primary repository branch, master, updated. v3.1.3-120-gdb61d8d
Service Account
noreply at mpich.org
Tue Nov 4 17:53:22 CST 2014
This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "MPICH primary repository".
The branch, master has been updated
via db61d8d89ed21803983b557a46e48b0340404902 (commit)
via ea444c344b47eac1d897206c80c9a047eb73f51a (commit)
via 3e005f037fc949467cbb5c94c5656280a62bdc27 (commit)
from 73859c0ebd49da4d4522fa239afe2d3bfa235f8e (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
http://git.mpich.org/mpich.git/commitdiff/db61d8d89ed21803983b557a46e48b0340404902
commit db61d8d89ed21803983b557a46e48b0340404902
Author: Min Si <msi at il.is.s.u-tokyo.ac.jp>
Date: Sat Nov 1 23:19:10 2014 -0500
Add req RMA op tests checking local completion.
Rput/accumulate + wait guarantees local completion, which means we can
modify local buffer after wait is finished. These two tests check the local
completion of Rput and Raccumulate by modifying local buffer after wait
and then checking remote data. We expect the remote data should be equal
to the local data before modifying.
Signed-off-by: Xin Zhao <xinzhao3 at illinois.edu>
diff --git a/test/mpi/rma/Makefile.am b/test/mpi/rma/Makefile.am
index 13529c3..e370fbf 100644
--- a/test/mpi/rma/Makefile.am
+++ b/test/mpi/rma/Makefile.am
@@ -137,6 +137,8 @@ noinst_PROGRAMS = \
acc-loc \
fence_shm \
get-struct \
+ rput_local_comp \
+ racc_local_comp \
at_complete
strided_acc_indexed_LDADD = $(LDADD) -lm
diff --git a/test/mpi/rma/racc_local_comp.c b/test/mpi/rma/racc_local_comp.c
new file mode 100644
index 0000000..ea9e57c
--- /dev/null
+++ b/test/mpi/rma/racc_local_comp.c
@@ -0,0 +1,132 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ *
+ * (C) 2014 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+#include <mpi.h>
+#include <stdio.h>
+#include <assert.h>
+#include "mpitest.h"
+
+#define ITER 100
+#define MAX_SIZE 65536
+
+int main(int argc, char *argv[])
+{
+ int rank, nproc, i;
+ int errors = 0, all_errors = 0;
+ int *buf = NULL, *winbuf = NULL;
+ MPI_Win window;
+
+ MPI_Init(&argc, &argv);
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+ MPI_Comm_size(MPI_COMM_WORLD, &nproc);
+
+ if (nproc < 2) {
+ if (rank == 0)
+ printf("Error: must be run with two or more processes\n");
+ MPI_Abort(MPI_COMM_WORLD, 1);
+ }
+
+ MPI_Alloc_mem(MAX_SIZE * sizeof(int), MPI_INFO_NULL, &buf);
+ MPI_Alloc_mem(MAX_SIZE * sizeof(int), MPI_INFO_NULL, &winbuf);
+ MPI_Win_create(winbuf, MAX_SIZE * sizeof(int), sizeof(int), MPI_INFO_NULL,
+ MPI_COMM_WORLD, &window);
+
+ MPI_Win_lock_all(0, window);
+
+ /* Test Raccumulate local completion with small data.
+ * Small data is always copied to header packet as immediate data. */
+ if (rank == 1) {
+ for (i = 0; i < ITER; i++) {
+ MPI_Request acc_req;
+ int val = -1;
+
+ buf[0] = rank * i;
+ MPI_Raccumulate(&buf[0], 1, MPI_INT, 0, 0, 1, MPI_INT, MPI_MAX, window, &acc_req);
+ MPI_Wait(&acc_req, MPI_STATUS_IGNORE);
+
+ /* reset local buffer to check local completion */
+ buf[0] = 0;
+ MPI_Win_flush(0, window);
+
+ MPI_Get(&val, 1, MPI_INT, 0, 0, 1, MPI_INT, window);
+ MPI_Win_flush(0, window);
+
+ if (val != rank * i) {
+ printf("%d - Got %d in small Raccumulate test, expected %d (%d * %d)\n", rank, val,
+ rank * i, rank, i);
+ errors++;
+ }
+ }
+ }
+
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ /* Test Raccumulate local completion with large data .
+ * Large data is not suitable for 1-copy optimization, and always sent out
+ * from user buffer. */
+ if (rank == 1) {
+ for (i = 0; i < ITER; i++) {
+ MPI_Request acc_req;
+ int val0 = -1, val1 = -1, val2 = -1;
+ int j;
+
+ /* initialize data */
+ for (j = 0; j < MAX_SIZE; j++) {
+ buf[j] = rank + j + i;
+ }
+
+ MPI_Raccumulate(buf, MAX_SIZE, MPI_INT, 0, 0, MAX_SIZE, MPI_INT, MPI_REPLACE, window,
+ &acc_req);
+ MPI_Wait(&acc_req, MPI_STATUS_IGNORE);
+
+ /* reset local buffer to check local completion */
+ buf[0] = 0;
+ buf[MAX_SIZE - 1] = 0;
+ buf[MAX_SIZE / 2] = 0;
+ MPI_Win_flush(0, window);
+
+ /* get remote values which are modified in local buffer after wait */
+ MPI_Get(&val0, 1, MPI_INT, 0, 0, 1, MPI_INT, window);
+ MPI_Get(&val1, 1, MPI_INT, 0, MAX_SIZE - 1, 1, MPI_INT, window);
+ MPI_Get(&val2, 1, MPI_INT, 0, MAX_SIZE / 2, 1, MPI_INT, window);
+ MPI_Win_flush(0, window);
+
+ if (val0 != rank + i) {
+ printf("%d - Got %d in large Raccumulate test, expected %d\n", rank,
+ val0, rank + i);
+ errors++;
+ }
+ if (val1 != rank + MAX_SIZE - 1 + i) {
+ printf("%d - Got %d in large Raccumulate test, expected %d\n", rank,
+ val1, rank + MAX_SIZE - 1 + i);
+ errors++;
+ }
+ if (val2 != rank + MAX_SIZE / 2 + i) {
+ printf("%d - Got %d in large Raccumulate test, expected %d\n", rank,
+ val2, rank + MAX_SIZE / 2 + i);
+ errors++;
+ }
+ }
+ }
+
+ MPI_Win_unlock_all(window);
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ MPI_Win_free(&window);
+ if (buf)
+ MPI_Free_mem(buf);
+ if (winbuf)
+ MPI_Free_mem(winbuf);
+
+ MPI_Reduce(&errors, &all_errors, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
+
+ if (rank == 0 && all_errors == 0)
+ printf(" No Errors\n");
+
+ MPI_Finalize();
+
+ return 0;
+}
diff --git a/test/mpi/rma/rput_local_comp.c b/test/mpi/rma/rput_local_comp.c
new file mode 100644
index 0000000..0d1f682
--- /dev/null
+++ b/test/mpi/rma/rput_local_comp.c
@@ -0,0 +1,129 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ *
+ * (C) 2014 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+#include <mpi.h>
+#include <stdio.h>
+#include <assert.h>
+#include "mpitest.h"
+
+#define ITER 100
+#define MAX_SIZE 65536
+
+int main(int argc, char *argv[])
+{
+ int rank, nproc, i;
+ int errors = 0, all_errors = 0;
+ int *buf = NULL, *winbuf = NULL;
+ MPI_Win window;
+
+ MPI_Init(&argc, &argv);
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+ MPI_Comm_size(MPI_COMM_WORLD, &nproc);
+
+ if (nproc < 2) {
+ if (rank == 0)
+ printf("Error: must be run with two or more processes\n");
+ MPI_Abort(MPI_COMM_WORLD, 1);
+ }
+
+ MPI_Alloc_mem(MAX_SIZE * sizeof(int), MPI_INFO_NULL, &buf);
+ MPI_Alloc_mem(MAX_SIZE * sizeof(int), MPI_INFO_NULL, &winbuf);
+ MPI_Win_create(winbuf, MAX_SIZE * sizeof(int), sizeof(int), MPI_INFO_NULL,
+ MPI_COMM_WORLD, &window);
+
+ MPI_Win_lock_all(0, window);
+
+ /* Test Rput local completion with small data.
+ * Small data is always copied to header packet as immediate data. */
+ if (rank == 1) {
+ for (i = 0; i < ITER; i++) {
+ MPI_Request put_req;
+ int val = -1;
+
+ buf[0] = rank;
+ MPI_Rput(&buf[0], 1, MPI_INT, 0, 0, 1, MPI_INT, window, &put_req);
+ MPI_Wait(&put_req, MPI_STATUS_IGNORE);
+
+ /* reset local buffer to check local completion */
+ buf[0] = 0;
+ MPI_Win_flush(0, window);
+
+ MPI_Get(&val, 1, MPI_INT, 0, 0, 1, MPI_INT, window);
+ MPI_Win_flush(0, window);
+
+ if (val != rank) {
+ printf("%d - Got %d in small Rput test, expected %d\n", rank, val, rank);
+ errors++;
+ }
+ }
+ }
+
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ /* Test Rput local completion with large data .
+ * Large data is not suitable for 1-copy optimization, and always sent out
+ * from user buffer. */
+ if (rank == 1) {
+ for (i = 0; i < ITER; i++) {
+ MPI_Request put_req;
+ int val0 = -1, val1 = -1, val2 = -1;
+ int j;
+
+ /* initialize data */
+ for (j = 0; j < MAX_SIZE; j++) {
+ buf[j] = rank + j + i;
+ }
+
+ MPI_Rput(buf, MAX_SIZE, MPI_INT, 0, 0, MAX_SIZE, MPI_INT, window, &put_req);
+ MPI_Wait(&put_req, MPI_STATUS_IGNORE);
+
+ /* reset local buffer to check local completion */
+ buf[0] = 0;
+ buf[MAX_SIZE - 1] = 0;
+ buf[MAX_SIZE / 2] = 0;
+ MPI_Win_flush(0, window);
+
+ /* get remote values which are modified in local buffer after wait */
+ MPI_Get(&val0, 1, MPI_INT, 0, 0, 1, MPI_INT, window);
+ MPI_Get(&val1, 1, MPI_INT, 0, MAX_SIZE - 1, 1, MPI_INT, window);
+ MPI_Get(&val2, 1, MPI_INT, 0, MAX_SIZE / 2, 1, MPI_INT, window);
+ MPI_Win_flush(0, window);
+
+ if (val0 != rank + i) {
+ printf("%d - Got %d in large Rput test, expected %d\n", rank, val0, rank + i);
+ errors++;
+ }
+ if (val1 != rank + MAX_SIZE - 1 + i) {
+ printf("%d - Got %d in large Rput test, expected %d\n", rank, val1,
+ rank + MAX_SIZE - 1 + i);
+ errors++;
+ }
+ if (val2 != rank + MAX_SIZE / 2 + i) {
+ printf("%d - Got %d in large Rput test, expected %d\n", rank, val2,
+ rank + MAX_SIZE / 2 + i);
+ errors++;
+ }
+ }
+ }
+
+ MPI_Win_unlock_all(window);
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ MPI_Win_free(&window);
+ if (buf)
+ MPI_Free_mem(buf);
+ if (winbuf)
+ MPI_Free_mem(winbuf);
+
+ MPI_Reduce(&errors, &all_errors, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
+
+ if (rank == 0 && all_errors == 0)
+ printf(" No Errors\n");
+
+ MPI_Finalize();
+
+ return 0;
+}
diff --git a/test/mpi/rma/testlist.in b/test/mpi/rma/testlist.in
index d13d7cf..cb60752 100644
--- a/test/mpi/rma/testlist.in
+++ b/test/mpi/rma/testlist.in
@@ -102,6 +102,8 @@ flush 4 mpiversion=3.0
reqops 4 mpiversion=3.0
req_example 4 mpiversion=3.0
req_example_shm 4 mpiversion=3.0
+rput_local_comp 2 mpiversion=3.0
+racc_local_comp 2 mpiversion=3.0
win_info 4 mpiversion=3.0
linked_list_lockall 4 mpiversion=3.0
pscw_ordering 4 mpiversion=3.0
http://git.mpich.org/mpich.git/commitdiff/ea444c344b47eac1d897206c80c9a047eb73f51a
commit ea444c344b47eac1d897206c80c9a047eb73f51a
Author: Min Si <msi at il.is.s.u-tokyo.ac.jp>
Date: Thu Oct 30 15:23:06 2014 -0500
Bug-fix: trigger OnFinal at end when receiving multiple segments.
There are two request handlers used when receiving data:
(1) OnDataAvail, which is triggered when data is arrived;
(2) OnFinal, which is triggered when receiving data is finished;
In progress engine, only OnDataAvail is triggered when a request is
completed. The upper ch3 layer should change OnDataAvail to OnFinal when
the coming receiving data will complete the request.
However, in the original implementation, when receiving multiple
segments for a large receive data, the OnDataAvail was reset to 0
at the last segment hence the final action was lost. This patch fixed
this bug.
In RMA target put/acc/gacc packet handlers, OnDataAvail was reset to
OnFinal function if OnDataAvail is 0 due to this bug. This patch also
rewrites this part so that packet handlers only sets proper OnFinal
handler at beginning and let the receiving data function change
OnDataAvail to OnFinal at the last segment.
Signed-off-by: Xin Zhao <xinzhao3 at illinois.edu>
diff --git a/src/mpid/ch3/src/ch3u_handle_recv_pkt.c b/src/mpid/ch3/src/ch3u_handle_recv_pkt.c
index 35c4721..1a7bf4f 100644
--- a/src/mpid/ch3/src/ch3u_handle_recv_pkt.c
+++ b/src/mpid/ch3/src/ch3u_handle_recv_pkt.c
@@ -164,10 +164,8 @@ int MPIDI_CH3U_Receive_data_found(MPID_Request *rreq, char *buf, MPIDI_msg_sz_t
*complete = FALSE;
}
- /* FIXME: We want to set the OnDataAvail to the appropriate
- function, which depends on whether this is an RMA
- request or a pt-to-pt request. */
- rreq->dev.OnDataAvail = 0;
+ /* Trigger OnFinal when receiving the last segment */
+ rreq->dev.OnDataAvail = rreq->dev.OnFinal;
}
else {
/* user buffer is not contiguous or is too small to hold
@@ -206,7 +204,8 @@ int MPIDI_CH3U_Receive_data_found(MPID_Request *rreq, char *buf, MPIDI_msg_sz_t
}
/* --END ERROR HANDLING-- */
*buflen = data_sz;
- rreq->dev.OnDataAvail = 0;
+ /* Trigger OnFinal when receiving the last segment */
+ rreq->dev.OnDataAvail = rreq->dev.OnFinal;
*complete = TRUE;
}
else
diff --git a/src/mpid/ch3/src/ch3u_rma_pkthandler.c b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
index 067ca0d..6a0bbab 100644
--- a/src/mpid/ch3/src/ch3u_rma_pkthandler.c
+++ b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
@@ -225,6 +225,7 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.target_win_handle = put_pkt->target_win_handle;
req->dev.source_win_handle = put_pkt->source_win_handle;
req->dev.flags = put_pkt->flags;
+ req->dev.OnFinal = MPIDI_CH3_ReqHandler_PutRecvComplete;
if (MPIR_DATATYPE_IS_PREDEFINED(put_pkt->datatype)) {
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PUT_RESP);
@@ -256,12 +257,6 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_PUT");
- /* FIXME: Only change the handling of completion if
- * post_data_receive reset the handler. There should
- * be a cleaner way to do this */
- if (!req->dev.OnDataAvail) {
- req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PutRecvComplete;
- }
/* return the number of bytes processed in this function */
*buflen = sizeof(MPIDI_CH3_Pkt_t) + data_len;
@@ -280,7 +275,6 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
/* derived datatype */
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PUT_RESP_DERIVED_DT);
req->dev.datatype = MPI_DATATYPE_NULL;
- req->dev.OnFinal = MPIDI_CH3_ReqHandler_PutRecvComplete;
req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
@@ -554,6 +548,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.flags = accum_pkt->flags;
req->dev.resp_request_handle = MPI_REQUEST_NULL;
+ req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
if (MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype)) {
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RESP);
@@ -598,12 +593,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
- /* FIXME: Only change the handling of completion if
- * post_data_receive reset the handler. There should
- * be a cleaner way to do this */
- if (!req->dev.OnDataAvail) {
- req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumRecvComplete;
- }
+
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
@@ -621,7 +611,6 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RESP_DERIVED_DT);
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete;
req->dev.datatype = MPI_DATATYPE_NULL;
- req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
@@ -732,6 +721,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.flags = get_accum_pkt->flags;
req->dev.resp_request_handle = get_accum_pkt->request_handle;
+ req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RESP);
@@ -778,12 +768,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
- /* FIXME: Only change the handling of completion if
- * post_data_receive reset the handler. There should
- * be a cleaner way to do this */
- if (!req->dev.OnDataAvail) {
- req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
- }
+
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
@@ -801,7 +786,6 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RESP_DERIVED_DT);
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete;
req->dev.datatype = MPI_DATATYPE_NULL;
- req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
http://git.mpich.org/mpich.git/commitdiff/3e005f037fc949467cbb5c94c5656280a62bdc27
commit 3e005f037fc949467cbb5c94c5656280a62bdc27
Author: Min Si <msi at il.is.s.u-tokyo.ac.jp>
Date: Mon Nov 3 20:51:25 2014 -0600
Implement true request-based RMA operations.
There are two requests associated with each request-based
operation: one normal internal request (req) and one newly
added user request (ureq). We return ureq to user when
request-based op call returns.
The ureq is initialized with completion counter (CC) to 1
and ref count to 2 (one is referenced by CH3 and another
is referenced by user). If the corresponding op can be
finished immediately in CH3, the runtime will complete ureq
in CH3, and let user's MPI_Wait/Test to destroy ureq. If
corresponding op cannot be finished immediately, we will
first increment ref count to 3 (because now there are
three places needed to reference ureq: user, CH3,
progress engine). Progress engine will complete ureq when
op is completed, then CH3 will release its reference during
garbage collection, finally user's MPI_Wait/Test will
destroy ureq.
The ureq can be completed in following three ways:
1. If op is issued and completed immediately in CH3
(req is NULL), we just complete ureq before free op.
2. If op is issued but not completed, we remember the ureq
handler in req and specify OnDataAvail / OnFinal handlers
in req to a newly added request handler, which will complete
user reqeust. The handler is triggered at three places:
2-a. when progress engine completes a put/acc req;
2-b. when get/getacc handler completes a get/getacc req;
2-c. when progress engine completes a get/getacc req;
3. If op is not issued (i.e., wait for lock granted), the 2nd
way will be eventually performed when such op is issued by
progress engine.
Signed-off-by: Xin Zhao <xinzhao3 at illinois.edu>
diff --git a/src/include/mpiimpl.h b/src/include/mpiimpl.h
index 7402b0c..99918ad 100644
--- a/src/include/mpiimpl.h
+++ b/src/include/mpiimpl.h
@@ -1613,12 +1613,12 @@ struct MPID_Win;
typedef struct MPID_RMA_Ops {
int (*Win_free)(struct MPID_Win **);
- int (*Put)(const void *, int, MPI_Datatype, int, MPI_Aint, int, MPI_Datatype,
- struct MPID_Win *);
- int (*Get)(void *, int, MPI_Datatype, int, MPI_Aint, int, MPI_Datatype,
- struct MPID_Win *);
- int (*Accumulate)(const void *, int, MPI_Datatype, int, MPI_Aint, int,
- MPI_Datatype, MPI_Op, struct MPID_Win *);
+ int (*Put) (const void *, int, MPI_Datatype, int, MPI_Aint, int,
+ MPI_Datatype, struct MPID_Win *, MPID_Request *);
+ int (*Get) (void *, int, MPI_Datatype, int, MPI_Aint, int, MPI_Datatype,
+ struct MPID_Win *, MPID_Request *);
+ int (*Accumulate) (const void *, int, MPI_Datatype, int, MPI_Aint, int,
+ MPI_Datatype, MPI_Op, struct MPID_Win *, MPID_Request *);
int (*Win_fence)(int, struct MPID_Win *);
int (*Win_post)(MPID_Group *, int, struct MPID_Win *);
@@ -1649,7 +1649,7 @@ typedef struct MPID_RMA_Ops {
int (*Get_accumulate)(const void *, int , MPI_Datatype, void *, int,
MPI_Datatype, int, MPI_Aint, int, MPI_Datatype, MPI_Op,
- struct MPID_Win *);
+ struct MPID_Win *, MPID_Request *);
int (*Fetch_and_op)(const void *, void *, MPI_Datatype, int, MPI_Aint, MPI_Op,
struct MPID_Win *);
int (*Compare_and_swap)(const void *, const void *, void *, MPI_Datatype, int,
diff --git a/src/mpi/rma/accumulate.c b/src/mpi/rma/accumulate.c
index 93ff2f9..f79e945 100644
--- a/src/mpi/rma/accumulate.c
+++ b/src/mpi/rma/accumulate.c
@@ -148,7 +148,7 @@ int MPI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
mpi_errno = MPIU_RMA_CALL(win_ptr,Accumulate(origin_addr, origin_count,
origin_datatype,
target_rank, target_disp, target_count,
- target_datatype, op, win_ptr));
+ target_datatype, op, win_ptr, NULL));
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
/* ... end of body of routine ... */
diff --git a/src/mpi/rma/get.c b/src/mpi/rma/get.c
index e8ba531..51c7226 100644
--- a/src/mpi/rma/get.c
+++ b/src/mpi/rma/get.c
@@ -142,7 +142,7 @@ int MPI_Get(void *origin_addr, int origin_count, MPI_Datatype
mpi_errno = MPIU_RMA_CALL(win_ptr,
Get(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count,
- target_datatype, win_ptr));
+ target_datatype, win_ptr, NULL));
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
/* ... end of body of routine ... */
diff --git a/src/mpi/rma/get_accumulate.c b/src/mpi/rma/get_accumulate.c
index 8f58b20..2898d8d 100644
--- a/src/mpi/rma/get_accumulate.c
+++ b/src/mpi/rma/get_accumulate.c
@@ -198,7 +198,7 @@ int MPI_Get_accumulate(const void *origin_addr, int origin_count,
result_addr, result_count,
result_datatype,
target_rank, target_disp, target_count,
- target_datatype, op, win_ptr));
+ target_datatype, op, win_ptr, NULL));
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
/* ... end of body of routine ... */
diff --git a/src/mpi/rma/put.c b/src/mpi/rma/put.c
index 1d4dc75..a8cbb2f 100644
--- a/src/mpi/rma/put.c
+++ b/src/mpi/rma/put.c
@@ -142,7 +142,7 @@ int MPI_Put(const void *origin_addr, int origin_count, MPI_Datatype
mpi_errno = MPIU_RMA_CALL(win_ptr,
Put(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count,
- target_datatype, win_ptr));
+ target_datatype, win_ptr, NULL));
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
/* ... end of body of routine ... */
diff --git a/src/mpid/ch3/include/mpid_rma_oplist.h b/src/mpid/ch3/include/mpid_rma_oplist.h
index d73273f..45dec03 100644
--- a/src/mpid/ch3/include/mpid_rma_oplist.h
+++ b/src/mpid/ch3/include/mpid_rma_oplist.h
@@ -378,6 +378,15 @@ static inline int MPIDI_CH3I_RMA_Cleanup_ops_target(MPID_Win * win_ptr, MPIDI_RM
/* No errors, free the request */
MPID_Request_release(curr_op->request);
+ /* Release user request */
+ if (curr_op->ureq) {
+ /* User request must be completed by progress engine */
+ MPIU_Assert(MPID_Request_is_complete(curr_op->ureq));
+
+ /* Release the ch3 ref */
+ MPID_Request_release(curr_op->ureq);
+ }
+
/* dequeue the operation and free it */
MPL_LL_DELETE(*op_list, *op_list_tail, curr_op);
MPIDI_CH3I_Win_op_free(win_ptr, curr_op);
diff --git a/src/mpid/ch3/include/mpid_rma_types.h b/src/mpid/ch3/include/mpid_rma_types.h
index 61500d7..1687436 100644
--- a/src/mpid/ch3/include/mpid_rma_types.h
+++ b/src/mpid/ch3/include/mpid_rma_types.h
@@ -82,6 +82,8 @@ typedef struct MPIDI_RMA_Op {
MPIDI_RMA_Pool_type_t pool_type;
int is_dt;
int piggyback_lock_candidate;
+
+ MPID_Request *ureq;
} MPIDI_RMA_Op_t;
typedef struct MPIDI_RMA_Target {
diff --git a/src/mpid/ch3/include/mpidimpl.h b/src/mpid/ch3/include/mpidimpl.h
index ff3cb17..c1b229a 100644
--- a/src/mpid/ch3/include/mpidimpl.h
+++ b/src/mpid/ch3/include/mpidimpl.h
@@ -1154,11 +1154,11 @@ int MPIDI_Win_create(void *, MPI_Aint, int, MPID_Info *, MPID_Comm *,
int MPIDI_Win_free(MPID_Win **);
int MPIDI_Put(const void *, int, MPI_Datatype, int, MPI_Aint, int,
- MPI_Datatype, MPID_Win *);
+ MPI_Datatype, MPID_Win *, MPID_Request * ureq);
int MPIDI_Get(void *, int, MPI_Datatype, int, MPI_Aint, int,
- MPI_Datatype, MPID_Win *);
+ MPI_Datatype, MPID_Win *, MPID_Request * ureq);
int MPIDI_Accumulate(const void *, int, MPI_Datatype, int, MPI_Aint, int,
- MPI_Datatype, MPI_Op, MPID_Win *);
+ MPI_Datatype, MPI_Op, MPID_Win *, MPID_Request * ureq);
int MPIDI_Win_fence(int, MPID_Win *);
int MPIDI_Win_post(MPID_Group *group_ptr, int assert, MPID_Win *win_ptr);
@@ -1187,7 +1187,8 @@ int MPIDI_Win_get_info(MPID_Win *win, MPID_Info **info_used);
int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
MPI_Datatype origin_datatype, void *result_addr, int result_count,
MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
- int target_count, MPI_Datatype target_datatype, MPI_Op op, MPID_Win *win);
+ int target_count, MPI_Datatype target_datatype, MPI_Op op, MPID_Win *win,
+ MPID_Request * ureq);
int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
MPI_Datatype datatype, int target_rank, MPI_Aint target_disp,
MPI_Op op, MPID_Win *win);
@@ -1904,6 +1905,10 @@ int MPIDI_CH3_ReqHandler_GetSendComplete( MPIDI_VC_t *, MPID_Request *,
int * );
int MPIDI_CH3_ReqHandler_GaccumLikeSendComplete( MPIDI_VC_t *, MPID_Request *,
int * );
+/* Request-based operation handler */
+int MPIDI_CH3_ReqHandler_ReqOpsComplete(MPIDI_VC_t *, MPID_Request *,
+ int *);
+
/* Thread Support */
#ifdef MPICH_IS_THREADED
#if MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_GLOBAL
diff --git a/src/mpid/ch3/include/mpidpre.h b/src/mpid/ch3/include/mpidpre.h
index b193256..4ae4f9b 100644
--- a/src/mpid/ch3/include/mpidpre.h
+++ b/src/mpid/ch3/include/mpidpre.h
@@ -446,7 +446,8 @@ typedef struct MPIDI_Request {
/* For derived datatypes at target */
struct MPIDI_RMA_dtype_info *dtype_info;
void *dataloop;
- /* req. handle needed to implement derived datatype gets */
+ /* req. handle needed to implement derived datatype gets.
+ * It also used for remembering user request of request-based RMA operations. */
MPI_Request request_handle;
MPI_Win target_win_handle;
MPI_Win source_win_handle;
diff --git a/src/mpid/ch3/src/Makefile.mk b/src/mpid/ch3/src/Makefile.mk
index c3b528e..c121985 100644
--- a/src/mpid/ch3/src/Makefile.mk
+++ b/src/mpid/ch3/src/Makefile.mk
@@ -14,6 +14,7 @@ mpi_core_sources += \
src/mpid/ch3/src/ch3u_handle_recv_req.c \
src/mpid/ch3/src/ch3u_handle_revoke_pkt.c \
src/mpid/ch3/src/ch3u_handle_send_req.c \
+ src/mpid/ch3/src/ch3u_handle_op_req.c \
src/mpid/ch3/src/ch3u_port.c \
src/mpid/ch3/src/ch3u_recvq.c \
src/mpid/ch3/src/ch3u_request.c \
diff --git a/src/mpid/ch3/src/ch3u_handle_op_req.c b/src/mpid/ch3/src/ch3u_handle_op_req.c
new file mode 100644
index 0000000..1109b6f
--- /dev/null
+++ b/src/mpid/ch3/src/ch3u_handle_op_req.c
@@ -0,0 +1,43 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ * (C) 2014 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include "mpidimpl.h"
+#include "mpidrma.h"
+
+/* Note the the following function is called when request-based RMA operation
+ is completed at origin side. Here we complete the user request associated
+ with this request-based operation. */
+#undef FUNCNAME
+#define FUNCNAME MPIDI_CH3_ReqHandler_ReqOpsComplete
+#undef FCNAME
+#define FCNAME MPIDI_QUOTE(FUNCNAME)
+int MPIDI_CH3_ReqHandler_ReqOpsComplete(MPIDI_VC_t * vc, MPID_Request * sreq, int *complete)
+{
+ int mpi_errno = MPI_SUCCESS;
+ MPID_Request *ureq = NULL;
+
+ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_REQOPSCOMPLETE);
+ MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_REQOPSCOMPLETE);
+
+ MPID_Request_get_ptr(sreq->dev.request_handle, ureq);
+ MPIU_Assert(ureq != NULL);
+
+ /* Complete user request and release ref of completion handler.
+ * Note that ch3 ref is released by later clean_up call. */
+ MPID_Request_set_completed(ureq);
+ MPID_Request_release(ureq);
+
+ MPIDI_CH3U_Request_complete(sreq);
+ *complete = TRUE;
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_REQOPSCOMPLETE);
+ return mpi_errno;
+
+ fn_fail:
+ goto fn_exit;
+}
+
diff --git a/src/mpid/ch3/src/ch3u_rma_oplist.c b/src/mpid/ch3/src/ch3u_rma_oplist.c
index 42f37f1..10bc147 100644
--- a/src/mpid/ch3/src/ch3u_rma_oplist.c
+++ b/src/mpid/ch3/src/ch3u_rma_oplist.c
@@ -264,7 +264,10 @@ static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t *targe
goto fn_exit;
if (curr_op->next == NULL &&
- target->sync.sync_flag == MPIDI_RMA_SYNC_NONE) {
+ target->sync.sync_flag == MPIDI_RMA_SYNC_NONE &&
+ /* always issue if it is a request-based RMA,
+ * otherwise a wait call before unlock will be blocked.*/
+ curr_op->ureq == NULL) {
/* skip last OP. */
goto finish_issue;
}
@@ -300,6 +303,12 @@ static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t *targe
MPIU_ERR_POP(mpi_errno);
if (!curr_op->request) {
+ if (curr_op->ureq) {
+ /* Complete user request and release the ch3 ref */
+ MPID_Request_set_completed(curr_op->ureq);
+ MPID_Request_release(curr_op->ureq);
+ }
+
/* Sending is completed immediately. */
MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_op_list),
&(target->pending_op_list_tail), curr_op);
@@ -321,6 +330,35 @@ static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t *targe
MPIDI_CH3I_RMA_Ops_append(&(target->read_op_list),
&(target->read_op_list_tail), curr_op);
}
+
+ /* Setup user request info in order to be completed following send request.*/
+ if (curr_op->ureq) {
+ /* Increase ref for completion handler */
+ MPIU_Object_add_ref(curr_op->ureq);
+ curr_op->request->dev.request_handle = curr_op->ureq->handle;
+
+ /* Setup user request completion handler.
+ *
+ * The handler is triggered when send request is completed at
+ * following places:
+ * - progress engine: complete PUT/ACC req.
+ * - GET/GET_ACC packet handler: complete GET/GET_ACC reqs.
+ *
+ * We always set OnFinal which should be called when sending or
+ * receiving the last segment. However, short put/acc ops are
+ * issued in one packet and the lower layer only check OnDataAvail
+ * so we have to set OnDataAvail as well.
+ *
+ * Note that a noncontig send also uses OnDataAvail to loop all
+ * segments but it must be changed to OnFinal when sending the
+ * last segment, so it is also correct for us.
+ *
+ * TODO: implement stack for overriding functions*/
+ if (curr_op->request->dev.OnDataAvail == NULL) {
+ curr_op->request->dev.OnDataAvail = MPIDI_CH3_ReqHandler_ReqOpsComplete;
+ }
+ curr_op->request->dev.OnFinal = MPIDI_CH3_ReqHandler_ReqOpsComplete;
+ }
win_ptr->active_req_cnt++;
}
diff --git a/src/mpid/ch3/src/ch3u_rma_ops.c b/src/mpid/ch3/src/ch3u_rma_ops.c
index eb832cf..b15d668 100644
--- a/src/mpid/ch3/src/ch3u_rma_ops.c
+++ b/src/mpid/ch3/src/ch3u_rma_ops.c
@@ -41,7 +41,8 @@ cvars:
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Put(const void *origin_addr, int origin_count, MPI_Datatype
origin_datatype, int target_rank, MPI_Aint target_disp,
- int target_count, MPI_Datatype target_datatype, MPID_Win * win_ptr)
+ int target_count, MPI_Datatype target_datatype, MPID_Win * win_ptr,
+ MPID_Request * ureq)
{
int mpi_errno = MPI_SUCCESS;
int dt_contig ATTRIBUTE((unused)), rank;
@@ -91,6 +92,12 @@ int MPIDI_Put(const void *origin_addr, int origin_count, MPI_Datatype
target_disp, target_count, target_datatype, win_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
+
+ if (ureq) {
+ /* Complete user request and release the ch3 ref */
+ MPID_Request_set_completed(ureq);
+ MPID_Request_release(ureq);
+ }
}
else {
MPIDI_RMA_Op_t *new_ptr = NULL;
@@ -119,6 +126,12 @@ int MPIDI_Put(const void *origin_addr, int origin_count, MPI_Datatype
new_ptr->origin_count = origin_count;
new_ptr->origin_datatype = origin_datatype;
new_ptr->target_rank = target_rank;
+ new_ptr->ureq = NULL; /* reset user request */
+
+ /* Remember user request */
+ if (ureq) {
+ new_ptr->ureq = ureq;
+ }
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
@@ -214,7 +227,8 @@ int MPIDI_Put(const void *origin_addr, int origin_count, MPI_Datatype
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Get(void *origin_addr, int origin_count, MPI_Datatype
origin_datatype, int target_rank, MPI_Aint target_disp,
- int target_count, MPI_Datatype target_datatype, MPID_Win * win_ptr)
+ int target_count, MPI_Datatype target_datatype, MPID_Win * win_ptr,
+ MPID_Request * ureq)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_msg_sz_t data_sz;
@@ -264,6 +278,12 @@ int MPIDI_Get(void *origin_addr, int origin_count, MPI_Datatype
target_disp, target_count, target_datatype, win_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
+
+ if (ureq) {
+ /* Complete user request and release the ch3 ref */
+ MPID_Request_set_completed(ureq);
+ MPID_Request_release(ureq);
+ }
}
else {
MPIDI_RMA_Op_t *new_ptr = NULL;
@@ -291,6 +311,12 @@ int MPIDI_Get(void *origin_addr, int origin_count, MPI_Datatype
new_ptr->origin_count = origin_count;
new_ptr->origin_datatype = origin_datatype;
new_ptr->target_rank = target_rank;
+ new_ptr->ureq = NULL; /* reset user request */
+
+ /* Remember user request */
+ if (ureq) {
+ new_ptr->ureq = ureq;
+ }
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
@@ -363,7 +389,8 @@ int MPIDI_Get(void *origin_addr, int origin_count, MPI_Datatype
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
origin_datatype, int target_rank, MPI_Aint target_disp,
- int target_count, MPI_Datatype target_datatype, MPI_Op op, MPID_Win * win_ptr)
+ int target_count, MPI_Datatype target_datatype, MPI_Op op,
+ MPID_Win * win_ptr, MPID_Request * ureq)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_msg_sz_t data_sz;
@@ -414,6 +441,12 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
op, win_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
+
+ if (ureq) {
+ /* Complete user request and release the ch3 ref */
+ MPID_Request_set_completed(ureq);
+ MPID_Request_release(ureq);
+ }
}
else {
MPIDI_RMA_Op_t *new_ptr = NULL;
@@ -443,6 +476,12 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
new_ptr->origin_count = origin_count;
new_ptr->origin_datatype = origin_datatype;
new_ptr->target_rank = target_rank;
+ new_ptr->ureq = NULL; /* reset user request */
+
+ /* Remember user request */
+ if (ureq) {
+ new_ptr->ureq = ureq;
+ }
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
@@ -539,7 +578,7 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
MPI_Datatype origin_datatype, void *result_addr, int result_count,
MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
int target_count, MPI_Datatype target_datatype, MPI_Op op,
- MPID_Win * win_ptr)
+ MPID_Win * win_ptr, MPID_Request * ureq)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_msg_sz_t data_sz;
@@ -592,6 +631,12 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
target_datatype, op, win_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
+
+ if (ureq) {
+ /* Complete user request and release the ch3 ref */
+ MPID_Request_set_completed(ureq);
+ MPID_Request_release(ureq);
+ }
}
else {
MPIDI_RMA_Op_t *new_ptr = NULL;
@@ -706,6 +751,13 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
}
}
+ new_ptr->ureq = NULL; /* reset user request */
+
+ /* Remember user request */
+ if (ureq) {
+ new_ptr->ureq = ureq;
+ }
+
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
mpi_errno = MPIDI_CH3I_Win_enqueue_op(win_ptr, new_ptr);
@@ -831,6 +883,7 @@ int MPIDI_Compare_and_swap(const void *origin_addr, const void *compare_addr,
new_ptr->compare_datatype = datatype;
new_ptr->target_rank = target_rank;
new_ptr->piggyback_lock_candidate = 1; /* CAS is always able to piggyback LOCK */
+ new_ptr->ureq = NULL; /* reset user request */
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
@@ -994,6 +1047,8 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
}
}
+ new_ptr->ureq = NULL; /* reset user request */
+
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
mpi_errno = MPIDI_CH3I_Win_enqueue_op(win_ptr, new_ptr);
diff --git a/src/mpid/ch3/src/ch3u_rma_pkthandler.c b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
index 31caba3..067ca0d 100644
--- a/src/mpid/ch3/src/ch3u_rma_pkthandler.c
+++ b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
@@ -1253,7 +1253,15 @@ int MPIDI_CH3_PktHandler_Get_AccumResp(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_GET_ACCUM_RESP");
if (complete) {
- MPIDI_CH3U_Request_complete(req);
+ /* Request-based RMA defines final actions for completing user request. */
+ int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
+ reqFn = req->dev.OnFinal;
+
+ if (reqFn) {
+ mpi_errno = reqFn(vc, req, &complete);
+ } else {
+ MPIDI_CH3U_Request_complete(req);
+ }
*rreqp = NULL;
}
/* return the number of bytes processed in this function */
@@ -1363,7 +1371,16 @@ int MPIDI_CH3_PktHandler_GetResp(MPIDI_VC_t * vc ATTRIBUTE((unused)),
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s",
"MPIDI_CH3_PKT_GET_RESP");
if (complete) {
- MPIDI_CH3U_Request_complete(req);
+ /* Request-based RMA defines final actions for completing user request. */
+ int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
+ reqFn = req->dev.OnFinal;
+
+ if (reqFn) {
+ mpi_errno = reqFn(vc, req, &complete);
+ } else {
+ MPIDI_CH3U_Request_complete(req);
+ }
+
*rreqp = NULL;
}
/* return the number of bytes processed in this function */
diff --git a/src/mpid/ch3/src/ch3u_rma_reqops.c b/src/mpid/ch3/src/ch3u_rma_reqops.c
index 7d6904a..803cc62 100644
--- a/src/mpid/ch3/src/ch3u_rma_reqops.c
+++ b/src/mpid/ch3/src/ch3u_rma_reqops.c
@@ -1,142 +1,13 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
- * (C) 2012 by Argonne National Laboratory.
+ * (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpidi_ch3_impl.h"
#include "mpidrma.h"
-/* Request-based RMA operations are implemented using generalized requests.
- * Below are the generalized request state and handlers for these operations.
- */
-
-typedef struct {
- MPID_Request *request;
- MPID_Win *win_ptr;
- int target_rank;
-} req_state_t;
-
-
-#undef FUNCNAME
-#define FUNCNAME req_poll
-#undef FCNAME
-#define FCNAME MPIDI_QUOTE(FUNCNAME)
-static int req_poll(void *state, MPI_Status * status)
-{
- int mpi_errno = MPI_SUCCESS;
- req_state_t *req_state = (req_state_t *) state;
-
- MPIU_UNREFERENCED_ARG(status);
-
- /* Call flush to complete the operation. Check that a passive target epoch
- * is still active first; the user could complete the request after calling
- * unlock. */
- /* FIXME: We need per-operation completion to make this more efficient. */
- if (req_state->win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
- req_state->win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED ||
- req_state->win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED ||
- req_state->win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_GRANTED) {
- mpi_errno = req_state->win_ptr->RMAFns.Win_flush(req_state->target_rank,
- req_state->win_ptr);
- }
-
- if (mpi_errno != MPI_SUCCESS) {
- MPIU_ERR_POP(mpi_errno);
- }
-
- MPIR_Grequest_complete_impl(req_state->request);
-
- fn_exit:
- return mpi_errno;
- fn_fail:
- goto fn_exit;
-}
-
-
-#undef FUNCNAME
-#define FUNCNAME req_wait
-#undef FCNAME
-#define FCNAME MPIDI_QUOTE(FUNCNAME)
-static int req_wait(int count, void **states, double timeout, MPI_Status * status)
-{
- int mpi_errno = MPI_SUCCESS;
- int i;
-
- for (i = 0; i < count; i++) {
- /* Call poll to complete the operation */
- mpi_errno = req_poll(states[i], status);
- if (mpi_errno != MPI_SUCCESS) {
- MPIU_ERR_POP(mpi_errno);
- }
- }
-
- fn_exit:
- return mpi_errno;
- fn_fail:
- goto fn_exit;
-}
-
-
-#undef FUNCNAME
-#define FUNCNAME req_query
-#undef FCNAME
-#define FCNAME MPIDI_QUOTE(FUNCNAME)
-static int req_query(void *state, MPI_Status * status)
-{
- int mpi_errno = MPI_SUCCESS;
-
- MPIU_UNREFERENCED_ARG(state);
-
- /* All status fields, except the error code, are undefined */
- MPIR_STATUS_SET_COUNT(*status, 0);
- MPIR_STATUS_SET_CANCEL_BIT(*status, FALSE);
- status->MPI_SOURCE = MPI_UNDEFINED;
- status->MPI_TAG = MPI_UNDEFINED;
-
- fn_exit:
- status->MPI_ERROR = mpi_errno;
- return mpi_errno;
- fn_fail:
- goto fn_exit;
-}
-
-
-#undef FUNCNAME
-#define FUNCNAME req_free
-#undef FCNAME
-#define FCNAME MPIDI_QUOTE(FUNCNAME)
-static int req_free(void *state)
-{
- MPIU_Free(state);
-
- return MPI_SUCCESS;
-}
-
-
-#undef FUNCNAME
-#define FUNCNAME req_cancel
-#undef FCNAME
-#define FCNAME MPIDI_QUOTE(FUNCNAME)
-static int req_cancel(void *state, int complete)
-{
- int mpi_errno = MPI_SUCCESS;
-
- MPIU_UNREFERENCED_ARG(state);
-
- /* This operation can't be cancelled */
- if (!complete) {
- MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**rmareqcancel");
- }
-
- fn_exit:
- return mpi_errno;
- fn_fail:
- goto fn_exit;
-}
-
-
#undef FUNCNAME
#define FUNCNAME MPIDI_Rput
#undef FCNAME
@@ -151,9 +22,7 @@ int MPIDI_Rput(const void *origin_addr, int origin_count,
MPID_Datatype *dtp;
MPI_Aint dt_true_lb ATTRIBUTE((unused));
MPIDI_msg_sz_t data_sz;
- req_state_t *req_state;
- MPIDI_VC_t *orig_vc, *target_vc;
- MPIU_CHKPMEM_DECL(1);
+ MPID_Request *ureq;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_RPUT);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_RPUT);
@@ -165,60 +34,38 @@ int MPIDI_Rput(const void *origin_addr, int origin_count,
win_ptr->states.access_state != MPIDI_RMA_LOCK_ALL_GRANTED,
mpi_errno, MPI_ERR_RMA_SYNC, "**rmasync");
- MPIU_CHKPMEM_MALLOC(req_state, req_state_t *,
- sizeof(req_state_t), mpi_errno, "req-based RMA state");
-
- req_state->win_ptr = win_ptr;
- req_state->target_rank = target_rank;
-
MPIDI_Datatype_get_info(origin_count, origin_datatype, dt_contig, data_sz, dtp, dt_true_lb);
+ /* Create user request, initially cc=1, ref=1 */
+ ureq = MPID_Request_create();
+ MPIU_ERR_CHKANDJUMP(ureq == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
+ ureq->kind = MPID_REQUEST_SEND;
+
/* Enqueue or perform the RMA operation */
if (target_rank != MPI_PROC_NULL && data_sz != 0) {
+
+ /* This request is referenced by user and ch3 by default. */
+ MPIU_Object_set_ref(ureq, 2);
+
mpi_errno = win_ptr->RMAFns.Put(origin_addr, origin_count,
origin_datatype, target_rank,
- target_disp, target_count, target_datatype, win_ptr);
-
- if (mpi_errno != MPI_SUCCESS) {
- MPIU_ERR_POP(mpi_errno);
- }
- }
+ target_disp, target_count, target_datatype, win_ptr, ureq);
- MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &orig_vc);
- MPIDI_Comm_get_vc(win_ptr->comm_ptr, target_rank, &target_vc);
-
- /* If the operation is already complete, return a completed request.
- * Otherwise, generate a grequest. */
- /* FIXME: We still may need to flush or sync for shared memory windows */
- if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->comm_ptr->rank ||
- (win_ptr->shm_allocated == TRUE && orig_vc->node_id == target_vc->node_id) || data_sz == 0)
- {
- mpi_errno = MPIR_Grequest_start_impl(req_query,
- req_free, req_cancel, req_state, &req_state->request);
if (mpi_errno != MPI_SUCCESS) {
MPIU_ERR_POP(mpi_errno);
}
-
- MPIR_Grequest_complete_impl(req_state->request);
}
else {
- mpi_errno = MPIX_Grequest_start_impl(req_query,
- req_free,
- req_cancel,
- req_poll, req_wait, req_state, &req_state->request);
-
- if (mpi_errno != MPI_SUCCESS) {
- MPIU_ERR_POP(mpi_errno);
- }
+ /* set cc=0 if it is not a valid operation. */
+ MPID_Request_set_completed(ureq);
}
- *request = req_state->request;
+ *request = ureq;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_RPUT);
return mpi_errno;
fn_fail:
- MPIU_CHKPMEM_REAP();
goto fn_exit;
}
@@ -237,9 +84,7 @@ int MPIDI_Rget(void *origin_addr, int origin_count,
MPID_Datatype *dtp;
MPI_Aint dt_true_lb ATTRIBUTE((unused));
MPIDI_msg_sz_t data_sz;
- req_state_t *req_state;
- MPIDI_VC_t *orig_vc, *target_vc;
- MPIU_CHKPMEM_DECL(1);
+ MPID_Request *ureq;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_RGET);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_RGET);
@@ -251,60 +96,38 @@ int MPIDI_Rget(void *origin_addr, int origin_count,
win_ptr->states.access_state != MPIDI_RMA_LOCK_ALL_GRANTED,
mpi_errno, MPI_ERR_RMA_SYNC, "**rmasync");
- MPIU_CHKPMEM_MALLOC(req_state, req_state_t *,
- sizeof(req_state_t), mpi_errno, "req-based RMA state");
-
- req_state->win_ptr = win_ptr;
- req_state->target_rank = target_rank;
-
MPIDI_Datatype_get_info(origin_count, origin_datatype, dt_contig, data_sz, dtp, dt_true_lb);
+ /* Create user request, initially cc=1, ref=1 */
+ ureq = MPID_Request_create();
+ MPIU_ERR_CHKANDJUMP(ureq == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
+ ureq->kind = MPID_REQUEST_SEND;
+
/* Enqueue or perform the RMA operation */
if (target_rank != MPI_PROC_NULL && data_sz != 0) {
+
+ /* This request is referenced by user and ch3 by default. */
+ MPIU_Object_set_ref(ureq, 2);
+
mpi_errno = win_ptr->RMAFns.Get(origin_addr, origin_count,
origin_datatype, target_rank,
- target_disp, target_count, target_datatype, win_ptr);
+ target_disp, target_count, target_datatype, win_ptr, ureq);
if (mpi_errno != MPI_SUCCESS) {
MPIU_ERR_POP(mpi_errno);
}
}
-
- MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &orig_vc);
- MPIDI_Comm_get_vc(win_ptr->comm_ptr, target_rank, &target_vc);
-
- /* If the operation is already complete, return a completed request.
- * Otherwise, generate a grequest. */
- /* FIXME: We still may need to flush or sync for shared memory windows */
- if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->comm_ptr->rank ||
- (win_ptr->shm_allocated == TRUE && orig_vc->node_id == target_vc->node_id) || data_sz == 0)
- {
- mpi_errno = MPIR_Grequest_start_impl(req_query,
- req_free, req_cancel, req_state, &req_state->request);
- if (mpi_errno != MPI_SUCCESS) {
- MPIU_ERR_POP(mpi_errno);
- }
-
- MPIR_Grequest_complete_impl(req_state->request);
- }
else {
- mpi_errno = MPIX_Grequest_start_impl(req_query,
- req_free,
- req_cancel,
- req_poll, req_wait, req_state, &req_state->request);
-
- if (mpi_errno != MPI_SUCCESS) {
- MPIU_ERR_POP(mpi_errno);
- }
+ /* set cc=0 if it is not a valid operation. */
+ MPID_Request_set_completed(ureq);
}
- *request = req_state->request;
+ *request = ureq;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_RGET);
return mpi_errno;
fn_fail:
- MPIU_CHKPMEM_REAP();
goto fn_exit;
}
@@ -323,9 +146,7 @@ int MPIDI_Raccumulate(const void *origin_addr, int origin_count,
MPID_Datatype *dtp;
MPI_Aint dt_true_lb ATTRIBUTE((unused));
MPIDI_msg_sz_t data_sz;
- req_state_t *req_state;
- MPIDI_VC_t *orig_vc, *target_vc;
- MPIU_CHKPMEM_DECL(1);
+ MPID_Request *ureq;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_RACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_RACCUMULATE);
@@ -337,60 +158,38 @@ int MPIDI_Raccumulate(const void *origin_addr, int origin_count,
win_ptr->states.access_state != MPIDI_RMA_LOCK_ALL_GRANTED,
mpi_errno, MPI_ERR_RMA_SYNC, "**rmasync");
- MPIU_CHKPMEM_MALLOC(req_state, req_state_t *,
- sizeof(req_state_t), mpi_errno, "req-based RMA state");
-
- req_state->win_ptr = win_ptr;
- req_state->target_rank = target_rank;
+ /* Create user request, initially cc=1, ref=1 */
+ ureq = MPID_Request_create();
+ MPIU_ERR_CHKANDJUMP(ureq == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
+ ureq->kind = MPID_REQUEST_SEND;
MPIDI_Datatype_get_info(origin_count, origin_datatype, dt_contig, data_sz, dtp, dt_true_lb);
/* Enqueue or perform the RMA operation */
if (target_rank != MPI_PROC_NULL && data_sz != 0) {
+
+ /* This request is referenced by user and ch3 by default. */
+ MPIU_Object_set_ref(ureq, 2);
+
mpi_errno = win_ptr->RMAFns.Accumulate(origin_addr, origin_count,
origin_datatype, target_rank,
target_disp, target_count,
- target_datatype, op, win_ptr);
- if (mpi_errno != MPI_SUCCESS) {
- MPIU_ERR_POP(mpi_errno);
- }
- }
-
- MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &orig_vc);
- MPIDI_Comm_get_vc(win_ptr->comm_ptr, target_rank, &target_vc);
-
- /* If the operation is already complete, return a completed request.
- * Otherwise, generate a grequest. */
- /* FIXME: We still may need to flush or sync for shared memory windows */
- if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->comm_ptr->rank ||
- (win_ptr->shm_allocated == TRUE && orig_vc->node_id == target_vc->node_id) || data_sz == 0)
- {
- mpi_errno = MPIR_Grequest_start_impl(req_query,
- req_free, req_cancel, req_state, &req_state->request);
+ target_datatype, op, win_ptr, ureq);
if (mpi_errno != MPI_SUCCESS) {
MPIU_ERR_POP(mpi_errno);
}
-
- MPIR_Grequest_complete_impl(req_state->request);
}
else {
- mpi_errno = MPIX_Grequest_start_impl(req_query,
- req_free,
- req_cancel,
- req_poll, req_wait, req_state, &req_state->request);
-
- if (mpi_errno != MPI_SUCCESS) {
- MPIU_ERR_POP(mpi_errno);
- }
+ /* set cc=0 if it is not a valid operation. */
+ MPID_Request_set_completed(ureq);
}
- *request = req_state->request;
+ *request = ureq;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_RACCUMULATE);
return mpi_errno;
fn_fail:
- MPIU_CHKPMEM_REAP();
goto fn_exit;
}
@@ -410,9 +209,7 @@ int MPIDI_Rget_accumulate(const void *origin_addr, int origin_count,
MPID_Datatype *dtp;
MPI_Aint dt_true_lb ATTRIBUTE((unused));
MPIDI_msg_sz_t data_sz, trg_data_sz;
- req_state_t *req_state;
- MPIDI_VC_t *orig_vc, *target_vc;
- MPIU_CHKPMEM_DECL(1);
+ MPID_Request *ureq;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_RGET_ACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_RGET_ACCUMULATE);
@@ -424,11 +221,10 @@ int MPIDI_Rget_accumulate(const void *origin_addr, int origin_count,
win_ptr->states.access_state != MPIDI_RMA_LOCK_ALL_GRANTED,
mpi_errno, MPI_ERR_RMA_SYNC, "**rmasync");
- MPIU_CHKPMEM_MALLOC(req_state, req_state_t *,
- sizeof(req_state_t), mpi_errno, "req-based RMA state");
-
- req_state->win_ptr = win_ptr;
- req_state->target_rank = target_rank;
+ /* Create user request, initially cc=1, ref=1 */
+ ureq = MPID_Request_create();
+ MPIU_ERR_CHKANDJUMP(ureq == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
+ ureq->kind = MPID_REQUEST_SEND;
/* Note that GACC is only a no-op if no data goes in both directions */
MPIDI_Datatype_get_info(origin_count, origin_datatype, dt_contig, data_sz, dtp, dt_true_lb);
@@ -436,50 +232,30 @@ int MPIDI_Rget_accumulate(const void *origin_addr, int origin_count,
/* Enqueue or perform the RMA operation */
if (target_rank != MPI_PROC_NULL && (data_sz != 0 || trg_data_sz != 0)) {
+
+ /* This request is referenced by user and ch3 by default. */
+ MPIU_Object_set_ref(ureq, 2);
+
mpi_errno = win_ptr->RMAFns.Get_accumulate(origin_addr, origin_count,
origin_datatype, result_addr,
result_count, result_datatype,
target_rank, target_disp,
- target_count, target_datatype, op, win_ptr);
- if (mpi_errno != MPI_SUCCESS) {
- MPIU_ERR_POP(mpi_errno);
- }
- }
-
- MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &orig_vc);
- MPIDI_Comm_get_vc(win_ptr->comm_ptr, target_rank, &target_vc);
-
- /* If the operation is already complete, return a completed request.
- * Otherwise, generate a grequest. */
- /* FIXME: We still may need to flush or sync for shared memory windows */
- if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->comm_ptr->rank ||
- (win_ptr->shm_allocated == TRUE && orig_vc->node_id == target_vc->node_id) ||
- (data_sz == 0 && trg_data_sz == 0)) {
- mpi_errno = MPIR_Grequest_start_impl(req_query,
- req_free, req_cancel, req_state, &req_state->request);
+ target_count, target_datatype,
+ op, win_ptr, ureq);
if (mpi_errno != MPI_SUCCESS) {
MPIU_ERR_POP(mpi_errno);
}
-
- MPIR_Grequest_complete_impl(req_state->request);
}
else {
- mpi_errno = MPIX_Grequest_start_impl(req_query,
- req_free,
- req_cancel,
- req_poll, req_wait, req_state, &req_state->request);
-
- if (mpi_errno != MPI_SUCCESS) {
- MPIU_ERR_POP(mpi_errno);
- }
+ /* set cc=0 if it is not a valid operation. */
+ MPID_Request_set_completed(ureq);
}
- *request = req_state->request;
+ *request = ureq;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_RGET_ACCUMULATE);
return mpi_errno;
fn_fail:
- MPIU_CHKPMEM_REAP();
goto fn_exit;
}
-----------------------------------------------------------------------
Summary of changes:
src/include/mpiimpl.h | 14 +-
src/mpi/rma/accumulate.c | 2 +-
src/mpi/rma/get.c | 2 +-
src/mpi/rma/get_accumulate.c | 2 +-
src/mpi/rma/put.c | 2 +-
src/mpid/ch3/include/mpid_rma_oplist.h | 9 +
src/mpid/ch3/include/mpid_rma_types.h | 2 +
src/mpid/ch3/include/mpidimpl.h | 13 +-
src/mpid/ch3/include/mpidpre.h | 3 +-
src/mpid/ch3/src/Makefile.mk | 1 +
src/mpid/ch3/src/ch3u_handle_op_req.c | 43 ++++
src/mpid/ch3/src/ch3u_handle_recv_pkt.c | 9 +-
src/mpid/ch3/src/ch3u_rma_oplist.c | 40 ++++-
src/mpid/ch3/src/ch3u_rma_ops.c | 63 ++++++-
src/mpid/ch3/src/ch3u_rma_pkthandler.c | 47 +++--
src/mpid/ch3/src/ch3u_rma_reqops.c | 336 +++++-------------------------
test/mpi/rma/Makefile.am | 2 +
test/mpi/rma/racc_local_comp.c | 132 ++++++++++++
test/mpi/rma/rput_local_comp.c | 129 ++++++++++++
test/mpi/rma/testlist.in | 2 +
20 files changed, 524 insertions(+), 329 deletions(-)
create mode 100644 src/mpid/ch3/src/ch3u_handle_op_req.c
create mode 100644 test/mpi/rma/racc_local_comp.c
create mode 100644 test/mpi/rma/rput_local_comp.c
hooks/post-receive
--
MPICH primary repository
More information about the commits
mailing list