[mpich-commits] [mpich] MPICH primary repository branch, master, updated. v3.2b2-68-g72956d9
Service Account
noreply at mpich.org
Sat May 30 10:45:24 CDT 2015
This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "MPICH primary repository".
The branch, master has been updated
via 72956d99ea05f9b5dc0d88aae21330fb7ff15fe3 (commit)
via bd08290f0948a00b571fef0257cd5de08aeebfb2 (commit)
via e50bec3af65eb54005dd0150599a387a52dd1fda (commit)
via 25e40e4358f3a5008f78d670b01a4c2951214b7c (commit)
via e0eaed63e0a2228215b67672386e7323eb2049d5 (commit)
via 6f62c424b6add5339cdb295a794b8808ac7d8f98 (commit)
via caeb3b3a60bc89616c0ccb28d0127f96459607c9 (commit)
via a5a9c964a00179191ad9a4446a3e074227fdff2b (commit)
via b0668be93a65729668b9987a27b7171b472b3f59 (commit)
via c292eca93f2035d2cfcfb616f7f7acbb59766e15 (commit)
via 19bdecfa2485036f82118608f58bbc171d9c2596 (commit)
via 08d72e3790144b3058b6c9aaebe44f70a4329282 (commit)
via 6b693f72fdafb07962dc1be996c1b9fcb2c19f8d (commit)
via e595d1ec065faa03c43c7c750b9b1b0c81abc11d (commit)
via 6155592fbfd62974c157320016e333a8913eec3e (commit)
from ccf8bcb8e35debb869eb1508d4748d89c22b3286 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
http://git.mpich.org/mpich.git/commitdiff/72956d99ea05f9b5dc0d88aae21330fb7ff15fe3
commit 72956d99ea05f9b5dc0d88aae21330fb7ff15fe3
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Tue May 19 15:50:03 2015 -0500
Merge issuing functions for streamed msgs and non-streamed msgs in CH3.
Originally, CH3 had two issuing functions for RMA messages:
issue_from_origin_buffer() for issuing non-streamed messages and
issue_from_origin_buffer_stream() for issuing streamed messages. Most
of the code in those two functions is the same, so here we merge them
into one function. The merged function takes stream_offset and
stream_size as input arguments; for non-streamed messages, we pass
stream_offset as 0 and stream_size as the size of the entire message.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
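A minimal sketch of the resulting call pattern (taken from the call sites in
the diff below): non-streamed callers such as issue_put_op() pass offset 0 and
the full message size, while streamed ACC/GACC callers pass the current stream
unit.

    /* non-streamed PUT: the whole message is one "stream unit" */
    MPI_Aint origin_type_size;
    MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
    mpi_errno = issue_from_origin_buffer(rma_op, vc, 0,
                                         rma_op->origin_count * origin_type_size,
                                         &curr_req);

    /* streamed ACC/GACC: one call per stream unit */
    mpi_errno = issue_from_origin_buffer(rma_op, vc, stream_offset, stream_size,
                                         &curr_req);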
diff --git a/src/mpid/ch3/include/mpid_rma_issue.h b/src/mpid/ch3/include/mpid_rma_issue.h
index e6cc3f7..f530a24 100644
--- a/src/mpid/ch3/include/mpid_rma_issue.h
+++ b/src/mpid/ch3/include/mpid_rma_issue.h
@@ -163,200 +163,9 @@ static int create_datatype(int *ints, MPI_Aint * displaces, MPI_Datatype * datat
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
+ MPIDI_msg_sz_t stream_offset, MPIDI_msg_sz_t stream_size,
MPID_Request ** req_ptr)
{
- MPI_Aint origin_type_size;
- MPI_Datatype target_datatype;
- MPID_Datatype *target_dtp = NULL, *origin_dtp = NULL;
- int is_origin_contig;
- MPID_IOV iov[MPID_IOV_LIMIT];
- MPID_Request *req = NULL;
- int count;
- int *ints = NULL;
- int *blocklens = NULL;
- MPI_Aint *displaces = NULL;
- MPI_Datatype *datatypes = NULL;
- MPI_Aint dt_true_lb;
- int mpi_errno = MPI_SUCCESS;
- MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
-
- MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
-
- /* Judge if target datatype is derived datatype. */
- MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
- if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
- MPID_Datatype_get_ptr(target_datatype, target_dtp);
-
- /* Fill derived datatype info. */
- mpi_errno = fill_in_derived_dtp_info(rma_op, target_dtp);
- if (mpi_errno != MPI_SUCCESS)
- MPIU_ERR_POP(mpi_errno);
-
- /* Set dataloop size in pkt header */
- MPIDI_CH3_PKT_RMA_SET_DATALOOP_SIZE(rma_op->pkt, target_dtp->dataloop_size, mpi_errno);
- }
-
- /* Judge if origin datatype is derived datatype. */
- if (!MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
- MPID_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp);
- }
-
- MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
-
- /* check if origin data is contiguous and get true lb */
- MPID_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig);
- MPID_Datatype_get_true_lb(rma_op->origin_datatype, &dt_true_lb);
-
- iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) & (rma_op->pkt);
- iov[0].MPID_IOV_LEN = sizeof(rma_op->pkt);
-
- if (target_dtp == NULL) {
- /* basic datatype on target */
- if (is_origin_contig) {
- /* origin data is contiguous */
- int iovcnt = 2;
-
- iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb);
- iov[1].MPID_IOV_LEN = rma_op->origin_count * origin_type_size;
-
- MPIU_THREAD_CS_ENTER(CH3COMM, vc);
- mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &req);
- MPIU_THREAD_CS_EXIT(CH3COMM, vc);
- MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
-
- if (origin_dtp != NULL) {
- if (req == NULL) {
- MPID_Datatype_release(origin_dtp);
- }
- else {
- /* this will cause the datatype to be freed when the request
- * is freed. */
- req->dev.datatype_ptr = origin_dtp;
- }
- }
- }
- else {
- /* origin data is non-contiguous */
- req = MPID_Request_create();
- MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
-
- MPIU_Object_set_ref(req, 2);
- req->kind = MPID_REQUEST_SEND;
-
- req->dev.segment_ptr = MPID_Segment_alloc();
- MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno,
- MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
-
- if (origin_dtp != NULL) {
- req->dev.datatype_ptr = origin_dtp;
- /* this will cause the datatype to be freed when the request
- * is freed. */
- }
- MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count,
- rma_op->origin_datatype, req->dev.segment_ptr, 0);
- req->dev.segment_first = 0;
- req->dev.segment_size = rma_op->origin_count * origin_type_size;
-
- req->dev.OnFinal = 0;
- req->dev.OnDataAvail = 0;
-
- MPIU_THREAD_CS_ENTER(CH3COMM, vc);
- mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
- MPIU_THREAD_CS_EXIT(CH3COMM, vc);
- MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
- }
- }
- else {
- /* derived datatype on target */
- MPID_Datatype *combined_dtp = NULL;
-
- req = MPID_Request_create();
- if (req == NULL) {
- MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
- }
-
- MPIU_Object_set_ref(req, 2);
- req->kind = MPID_REQUEST_SEND;
-
- req->dev.segment_ptr = MPID_Segment_alloc();
- MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
- "**nomem", "**nomem %s", "MPID_Segment_alloc");
-
- /* create a new datatype containing the dtype_info, dataloop, and origin data */
-
- count = 3;
- ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
- blocklens = &ints[1];
- displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
- datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
-
- ints[0] = count;
-
- displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
- blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
- datatypes[0] = MPI_BYTE;
-
- displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
- MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
- datatypes[1] = MPI_BYTE;
-
- displaces[2] = MPIU_PtrToAint(rma_op->origin_addr);
- blocklens[2] = rma_op->origin_count;
- datatypes[2] = rma_op->origin_datatype;
-
- mpi_errno = create_datatype(ints, displaces, datatypes, &combined_dtp);
- if (mpi_errno)
- MPIU_ERR_POP(mpi_errno);
-
- req->dev.datatype_ptr = combined_dtp;
- /* combined_datatype will be freed when request is freed */
-
- MPID_Segment_init(MPI_BOTTOM, 1, combined_dtp->handle, req->dev.segment_ptr, 0);
- req->dev.segment_first = 0;
- req->dev.segment_size = combined_dtp->size;
-
- req->dev.OnFinal = 0;
- req->dev.OnDataAvail = 0;
-
- MPIU_THREAD_CS_ENTER(CH3COMM, vc);
- mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
- MPIU_THREAD_CS_EXIT(CH3COMM, vc);
- MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
-
- MPIU_Free(ints);
- MPIU_Free(displaces);
- MPIU_Free(datatypes);
-
- /* we're done with the datatypes */
- if (origin_dtp != NULL)
- MPID_Datatype_release(origin_dtp);
- MPID_Datatype_release(target_dtp);
- }
-
- (*req_ptr) = req;
-
- fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
- return mpi_errno;
- fn_fail:
- if ((*req_ptr)) {
- if ((*req_ptr)->dev.datatype_ptr)
- MPID_Datatype_release((*req_ptr)->dev.datatype_ptr);
- MPID_Request_release((*req_ptr));
- }
- (*req_ptr) = NULL;
- goto fn_exit;
-}
-
-
-#undef FUNCNAME
-#define FUNCNAME issue_from_origin_buffer_stream
-#undef FCNAME
-#define FCNAME MPIDI_QUOTE(FUNCNAME)
-static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
- MPIDI_msg_sz_t stream_offset, MPIDI_msg_sz_t stream_size,
- MPID_Request ** req_ptr)
-{
MPI_Datatype target_datatype;
MPID_Datatype *target_dtp = NULL, *origin_dtp = NULL;
int is_origin_contig;
@@ -371,9 +180,9 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPI_Aint dt_true_lb;
MPIDI_CH3_Pkt_flags_t flags;
int mpi_errno = MPI_SUCCESS;
- MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
+ MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
- MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
+ MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
/* Judge if target datatype is derived datatype. */
MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
@@ -522,56 +331,76 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPIDI_msg_sz_t first = stream_offset;
MPIDI_msg_sz_t last = stream_offset + stream_size;
- req->dev.segment_ptr = MPID_Segment_alloc();
- MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
- "**nomem", "**nomem %s", "MPID_Segment_alloc");
-
/* create a new datatype containing the dtype_info, dataloop, and origin data */
- segp = MPID_Segment_alloc();
- MPIU_ERR_CHKANDJUMP1(segp == NULL, mpi_errno, MPI_ERR_OTHER,
- "**nomem", "**nomem %s", "MPID_Segment_alloc");
- MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count, rma_op->origin_datatype, segp,
- 0);
-
- MPID_Datatype_get_ptr(rma_op->origin_datatype, dtp);
- vec_len = dtp->max_contig_blocks * rma_op->origin_count + 1;
- dloop_vec = (DLOOP_VECTOR *) MPIU_Malloc(vec_len * sizeof(DLOOP_VECTOR));
- /* --BEGIN ERROR HANDLING-- */
- if (!dloop_vec) {
- mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
- FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);
- goto fn_fail;
- }
- /* --END ERROR HANDLING-- */
+ if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ segp = MPID_Segment_alloc();
+ MPIU_ERR_CHKANDJUMP1(segp == NULL, mpi_errno, MPI_ERR_OTHER,
+ "**nomem", "**nomem %s", "MPID_Segment_alloc");
+
+ MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count, rma_op->origin_datatype,
+ segp, 0);
+
+ MPID_Datatype_get_ptr(rma_op->origin_datatype, dtp);
+ vec_len = dtp->max_contig_blocks * rma_op->origin_count + 1;
+ dloop_vec = (DLOOP_VECTOR *) MPIU_Malloc(vec_len * sizeof(DLOOP_VECTOR));
+ /* --BEGIN ERROR HANDLING-- */
+ if (!dloop_vec) {
+ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
+ FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);
+ goto fn_fail;
+ }
+ /* --END ERROR HANDLING-- */
- MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
+ MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
- count = 2 + vec_len;
+ count = 2 + vec_len;
- ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
- blocklens = &ints[1];
- displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
- datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
+ ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
+ blocklens = &ints[1];
+ displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
+ datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
- ints[0] = count;
+ ints[0] = count;
- displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
- blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
- datatypes[0] = MPI_BYTE;
+ displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
+ blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
+ datatypes[0] = MPI_BYTE;
- displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
- MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
- datatypes[1] = MPI_BYTE;
+ displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
+ MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
+ datatypes[1] = MPI_BYTE;
+
+ for (i = 0; i < vec_len; i++) {
+ displaces[i + 2] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
+ MPIU_Assign_trunc(blocklens[i + 2], dloop_vec[i].DLOOP_VECTOR_LEN, int);
+ datatypes[i + 2] = MPI_BYTE;
+ }
- for (i = 0; i < vec_len; i++) {
- displaces[i + 2] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
- MPIU_Assign_trunc(blocklens[i + 2], dloop_vec[i].DLOOP_VECTOR_LEN, int);
- datatypes[i + 2] = MPI_BYTE;
+ MPID_Segment_free(segp);
+ MPIU_Free(dloop_vec);
}
+ else {
+ count = 3;
+ ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
+ blocklens = &ints[1];
+ displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
+ datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
+
+ ints[0] = count;
+
+ displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
+ blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
+ datatypes[0] = MPI_BYTE;
- MPID_Segment_free(segp);
- MPIU_Free(dloop_vec);
+ displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
+ MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
+ datatypes[1] = MPI_BYTE;
+
+ displaces[2] = MPIU_PtrToAint(rma_op->origin_addr);
+ blocklens[2] = rma_op->origin_count;
+ datatypes[2] = rma_op->origin_datatype;
+ }
mpi_errno = create_datatype(ints, displaces, datatypes, &combined_dtp);
if (mpi_errno)
@@ -580,6 +409,10 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
req->dev.datatype_ptr = combined_dtp;
/* combined_datatype will be freed when request is freed */
+ req->dev.segment_ptr = MPID_Segment_alloc();
+ MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
+ "**nomem", "**nomem %s", "MPID_Segment_alloc");
+
MPID_Segment_init(MPI_BOTTOM, 1, combined_dtp->handle, req->dev.segment_ptr, 0);
req->dev.segment_first = 0;
req->dev.segment_size = combined_dtp->size;
@@ -605,7 +438,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
fn_exit:
(*req_ptr) = req;
- MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
+ MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
return mpi_errno;
fn_fail:
if ((*req_ptr)) {
@@ -648,7 +481,10 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
- mpi_errno = issue_from_origin_buffer(rma_op, vc, &curr_req);
+ MPI_Aint origin_type_size;
+ MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
+ mpi_errno = issue_from_origin_buffer(rma_op, vc, 0,
+ rma_op->origin_count * origin_type_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
@@ -785,8 +621,7 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
- mpi_errno =
- issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
+ mpi_errno = issue_from_origin_buffer(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
@@ -1010,8 +845,7 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_offset;
}
- mpi_errno =
- issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
+ mpi_errno = issue_from_origin_buffer(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
@@ -1332,7 +1166,9 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
- mpi_errno = issue_from_origin_buffer(rma_op, vc, &curr_req);
+ MPI_Aint origin_dtp_size;
+ MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_dtp_size);
+ mpi_errno = issue_from_origin_buffer(rma_op, vc, 0, 1 * origin_dtp_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
http://git.mpich.org/mpich.git/commitdiff/bd08290f0948a00b571fef0257cd5de08aeebfb2
commit bd08290f0948a00b571fef0257cd5de08aeebfb2
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Thu May 14 11:18:57 2015 -0500
Rename request handlers for ACC/GACC.
Here we rename the request handlers for ACC/GACC as follows:
MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete
--> MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete
MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete
--> MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete
Those handlers were originally triggered when the
target side received the complete derived datatype info.
We have now extended them so that they are also triggered
when the extended packet header or the derived datatype
info arrives, hence the rename.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
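As a rough illustration of the new trigger (mirroring the
ch3u_rma_pkthandler.c hunks below; the iov length line is an assumption and is
not shown in the diff), the renamed handler is installed as the OnDataAvail
callback and the receive iov is first pointed at the extended packet header:

    if (req->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
        /* streamed op: receive the extended packet header first; the
         * metadata handler fires once it has arrived */
        req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete;
        req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.ext_hdr_ptr;
        req->dev.iov[0].MPID_IOV_LEN = req->dev.ext_hdr_sz;   /* assumed */
    }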
diff --git a/src/mpid/ch3/include/mpidimpl.h b/src/mpid/ch3/include/mpidimpl.h
index 46cb9a8..70c6ac5 100644
--- a/src/mpid/ch3/include/mpidimpl.h
+++ b/src/mpid/ch3/include/mpidimpl.h
@@ -1951,12 +1951,12 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *, MPID_Request *,
int * );
int MPIDI_CH3_ReqHandler_FOPRecvComplete( MPIDI_VC_t *, MPID_Request *,
int * );
-int MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete( MPIDI_VC_t *,
- MPID_Request *,
- int * );
-int MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete( MPIDI_VC_t *,
- MPID_Request *,
- int * );
+int MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete( MPIDI_VC_t *,
+ MPID_Request *,
+ int * );
+int MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete( MPIDI_VC_t *,
+ MPID_Request *,
+ int * );
int MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete( MPIDI_VC_t *,
MPID_Request *, int * );
int MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete( MPIDI_VC_t *,
diff --git a/src/mpid/ch3/src/ch3u_handle_recv_req.c b/src/mpid/ch3/src/ch3u_handle_recv_req.c
index 4e4c2dc..e7aebdb 100644
--- a/src/mpid/ch3/src/ch3u_handle_recv_req.c
+++ b/src/mpid/ch3/src/ch3u_handle_recv_req.c
@@ -549,11 +549,11 @@ int MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unu
}
#undef FUNCNAME
-#define FUNCNAME MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete
+#define FUNCNAME MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
-int MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
- MPID_Request * rreq, int *complete)
+int MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
+ MPID_Request * rreq, int *complete)
{
int mpi_errno = MPI_SUCCESS;
MPID_Datatype *new_dtp = NULL;
@@ -562,9 +562,9 @@ int MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((u
MPI_Aint stream_offset;
MPI_Aint type_size;
MPI_Datatype basic_dtp;
- MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMDERIVEDDTRECVCOMPLETE);
+ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMMETADATARECVCOMPLETE);
- MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMDERIVEDDTRECVCOMPLETE);
+ MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMMETADATARECVCOMPLETE);
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
MPIU_Assert(rreq->dev.ext_hdr_ptr != NULL);
@@ -644,17 +644,17 @@ int MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((u
*complete = FALSE;
fn_fail:
- MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMDERIVEDDTRECVCOMPLETE);
+ MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMMETADATARECVCOMPLETE);
return mpi_errno;
}
#undef FUNCNAME
-#define FUNCNAME MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete
+#define FUNCNAME MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
-int MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
- MPID_Request * rreq, int *complete)
+int MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
+ MPID_Request * rreq, int *complete)
{
int mpi_errno = MPI_SUCCESS;
MPID_Datatype *new_dtp = NULL;
@@ -663,9 +663,9 @@ int MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((
MPI_Aint stream_offset;
MPI_Aint type_size;
MPI_Datatype basic_dtp;
- MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMDERIVEDDTRECVCOMPLETE);
+ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);
- MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMDERIVEDDTRECVCOMPLETE);
+ MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
MPIU_Assert(rreq->dev.ext_hdr_ptr != NULL);
@@ -745,7 +745,7 @@ int MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((
*complete = FALSE;
fn_fail:
- MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMDERIVEDDTRECVCOMPLETE);
+ MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);
return mpi_errno;
}
diff --git a/src/mpid/ch3/src/ch3u_rma_pkthandler.c b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
index 70a2bc9..54e953b 100644
--- a/src/mpid/ch3/src/ch3u_rma_pkthandler.c
+++ b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
@@ -662,7 +662,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.datatype = accum_pkt->datatype;
if (req->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
- req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete;
+ req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete;
/* if this is a streamed op pkt, set iov to receive extended pkt header. */
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.ext_hdr_ptr;
@@ -722,7 +722,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
int metadata_sz = 0;
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV_DERIVED_DT);
- req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete;
+ req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete;
req->dev.datatype = MPI_DATATYPE_NULL;
if (accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM)
@@ -765,7 +765,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
*buflen = sizeof(MPIDI_CH3_Pkt_t) + metadata_sz;
/* All dtype data has been received, call req handler */
- mpi_errno = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(vc, req, &complete);
+ mpi_errno = MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete(vc, req, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_ACCUMULATE");
if (complete) {
@@ -969,7 +969,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.datatype = get_accum_pkt->datatype;
if (req->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
- req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete;
+ req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete;
/* if this is a streamed op pkt, set iov to receive extended pkt header. */
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.ext_hdr_ptr;
@@ -1028,7 +1028,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
int metadata_sz = 0;
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV_DERIVED_DT);
- req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete;
+ req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete;
req->dev.datatype = MPI_DATATYPE_NULL;
if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM)
@@ -1071,7 +1071,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
*buflen = sizeof(MPIDI_CH3_Pkt_t) + metadata_sz;
/* All dtype data has been received, call req handler */
- mpi_errno = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(vc, req, &complete);
+ mpi_errno = MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete(vc, req, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_ACCUMULATE");
if (complete) {
http://git.mpich.org/mpich.git/commitdiff/e50bec3af65eb54005dd0150599a387a52dd1fda
commit e50bec3af65eb54005dd0150599a387a52dd1fda
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Thu May 14 10:39:06 2015 -0500
Issue extended packet header in nemesis/netmod.
There are two APIs used to issue data when a request has already
been created and is passed in:
iSendv() --- issues contiguous data;
sendNoncontig_fn() --- issues non-contiguous data.
In this patch, we modify the implementations of those two functions
in nemesis and the netmods (tcp/mxm/ptl) so that they also issue the
extended packet header stored in the request.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
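A minimal sketch of the common pattern, modeled on the tcp iSendContig()
change below: the issuing function builds its iov with the fixed CH3 packet
header, then the extended header (only if the request carries one), then the
payload.

    MPID_IOV iov[3];
    int iov_n = 0;

    iov[iov_n].MPID_IOV_BUF = hdr;                      /* fixed CH3 packet header */
    iov[iov_n].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
    iov_n++;

    if (sreq->dev.ext_hdr_sz != 0) {                    /* extended RMA header, if any */
        iov[iov_n].MPID_IOV_BUF = sreq->dev.ext_hdr_ptr;
        iov[iov_n].MPID_IOV_LEN = sreq->dev.ext_hdr_sz;
        iov_n++;
    }

    iov[iov_n].MPID_IOV_BUF = data;                     /* payload */
    iov[iov_n].MPID_IOV_LEN = data_sz;
    iov_n++;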
diff --git a/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h b/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h
index fbf2895..22baa82 100644
--- a/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h
+++ b/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h
@@ -19,15 +19,17 @@ static inline int MPID_nem_mpich_send_header (void* buf, int size, MPIDI_VC_t *v
static inline int MPID_nem_mpich_sendv (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *again);
static inline void MPID_nem_mpich_dequeue_fastbox (int local_rank);
static inline void MPID_nem_mpich_enqueue_fastbox (int local_rank);
-static inline int MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *again);
+static inline int MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov,
+ void *ext_header, MPIDI_msg_sz_t ext_header_sz,
+ MPIDI_VC_t *vc, int *again);
static inline int MPID_nem_recv_seqno_matches (MPID_nem_queue_ptr_t qhead);
static inline int MPID_nem_mpich_test_recv (MPID_nem_cell_ptr_t *cell, int *in_fbox, int in_blocking_progress);
static inline int MPID_nem_mpich_blocking_recv (MPID_nem_cell_ptr_t *cell, int *in_fbox, int completions);
static inline int MPID_nem_mpich_test_recv_wait (MPID_nem_cell_ptr_t *cell, int *in_fbox, int timeout);
static inline int MPID_nem_mpich_release_cell (MPID_nem_cell_ptr_t cell, MPIDI_VC_t *vc);
static inline void MPID_nem_mpich_send_seg_header (MPID_Segment *segment, MPIDI_msg_sz_t *segment_first,
- MPIDI_msg_sz_t segment_size, void *header, MPIDI_msg_sz_t header_sz,
- MPIDI_VC_t *vc, int *again);
+ MPIDI_msg_sz_t segment_size, void *header, MPIDI_msg_sz_t header_sz,
+ void *ext_header, MPIDI_msg_sz_t ext_header_sz, MPIDI_VC_t *vc, int *again);
static inline void MPID_nem_mpich_send_seg (MPID_Segment *segment, MPIDI_msg_sz_t *segment_first, MPIDI_msg_sz_t segment_size,
MPIDI_VC_t *vc, int *again);
@@ -271,7 +273,9 @@ MPID_nem_mpich_sendv (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *again)
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int
-MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *again)
+MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov,
+ void *ext_hdr_ptr, MPIDI_msg_sz_t ext_hdr_sz,
+ MPIDI_VC_t *vc, int *again)
{
int mpi_errno = MPI_SUCCESS;
MPID_nem_cell_ptr_t el;
@@ -279,6 +283,7 @@ MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *ag
MPIDI_msg_sz_t payload_len;
int my_rank;
MPIDI_CH3I_VC *vc_ch = &vc->ch;
+ MPI_Aint buf_offset = 0;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_MPICH_SENDV_HEADER);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_MPICH_SENDV_HEADER);
@@ -291,7 +296,8 @@ MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *ag
my_rank = MPID_nem_mem_region.rank;
#ifdef USE_FASTBOX
- if (*n_iov == 2 && (*iov)[1].MPID_IOV_LEN + sizeof(MPIDI_CH3_Pkt_t) <= MPID_NEM_FBOX_DATALEN)
+ /* Note: use fastbox only when there is no streaming optimization. */
+ if (ext_hdr_sz == 0 && *n_iov == 2 && (*iov)[1].MPID_IOV_LEN + sizeof(MPIDI_CH3_Pkt_t) <= MPID_NEM_FBOX_DATALEN)
{
MPID_nem_fbox_mpich_t *pbox = vc_ch->fbox_out;
@@ -343,12 +349,19 @@ MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *ag
#endif /*PREFETCH_CELL */
MPIU_Memcpy((void *)el->pkt.mpich.p.payload, (*iov)->MPID_IOV_BUF, sizeof(MPIDI_CH3_Pkt_t));
+ buf_offset += sizeof(MPIDI_CH3_Pkt_t);
- cell_buf = (char *)(el->pkt.mpich.p.payload) + sizeof(MPIDI_CH3_Pkt_t);
+ if (ext_hdr_sz > 0) {
+ /* when extended packet header exists, copy it */
+ MPIU_Memcpy((void *)((char *)(el->pkt.mpich.p.payload) + buf_offset), ext_hdr_ptr, ext_hdr_sz);
+ buf_offset += ext_hdr_sz;
+ }
+
+ cell_buf = (char *)(el->pkt.mpich.p.payload) + buf_offset;
++(*iov);
--(*n_iov);
- payload_len = MPID_NEM_MPICH_DATA_LEN - sizeof(MPIDI_CH3_Pkt_t);
+ payload_len = MPID_NEM_MPICH_DATA_LEN - buf_offset;
while (*n_iov && payload_len >= (*iov)->MPID_IOV_LEN)
{
size_t _iov_len = (*iov)->MPID_IOV_LEN;
@@ -419,13 +432,15 @@ MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov, MPIDI_VC_t *vc, int *ag
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline void
MPID_nem_mpich_send_seg_header (MPID_Segment *segment, MPIDI_msg_sz_t *segment_first, MPIDI_msg_sz_t segment_size,
- void *header, MPIDI_msg_sz_t header_sz, MPIDI_VC_t *vc, int *again)
+ void *header, MPIDI_msg_sz_t header_sz, void *ext_header, MPIDI_msg_sz_t ext_header_sz,
+ MPIDI_VC_t *vc, int *again)
{
MPID_nem_cell_ptr_t el;
MPIDI_msg_sz_t datalen;
int my_rank;
MPIDI_msg_sz_t last;
MPIDI_CH3I_VC *vc_ch = &vc->ch;
+ MPI_Aint buf_offset = 0;
MPIU_Assert(vc_ch->is_local); /* netmods will have their own implementation */
MPIU_Assert(header_sz <= sizeof(MPIDI_CH3_Pkt_t));
@@ -436,7 +451,7 @@ MPID_nem_mpich_send_seg_header (MPID_Segment *segment, MPIDI_msg_sz_t *segment_f
my_rank = MPID_nem_mem_region.rank;
#ifdef USE_FASTBOX
- if (sizeof(MPIDI_CH3_Pkt_t) + segment_size <= MPID_NEM_FBOX_DATALEN)
+ if (ext_header_sz == 0 && sizeof(MPIDI_CH3_Pkt_t) + segment_size <= MPID_NEM_FBOX_DATALEN)
{
MPID_nem_fbox_mpich_t *pbox = vc_ch->fbox_out;
@@ -508,14 +523,22 @@ MPID_nem_mpich_send_seg_header (MPID_Segment *segment, MPIDI_msg_sz_t *segment_f
/* copy header */
MPIU_Memcpy((void *)el->pkt.mpich.p.payload, header, header_sz);
+ buf_offset += sizeof(MPIDI_CH3_Pkt_t);
+
+ if (ext_header_sz > 0) {
+ /* when extended packet header exists, copy it */
+ MPIU_Memcpy((void *)((char *)(el->pkt.mpich.p.payload) + buf_offset), ext_header, ext_header_sz);
+ buf_offset += ext_header_sz;
+ }
+
/* copy data */
- if (segment_size - *segment_first <= MPID_NEM_MPICH_DATA_LEN - sizeof(MPIDI_CH3_Pkt_t))
+ if (segment_size - *segment_first <= MPID_NEM_MPICH_DATA_LEN - buf_offset)
last = segment_size;
else
- last = *segment_first + MPID_NEM_MPICH_DATA_LEN - sizeof(MPIDI_CH3_Pkt_t);
-
- MPID_Segment_pack(segment, *segment_first, &last, (char *)el->pkt.mpich.p.payload + sizeof(MPIDI_CH3_Pkt_t));
- datalen = sizeof(MPIDI_CH3_Pkt_t) + last - *segment_first;
+ last = *segment_first + MPID_NEM_MPICH_DATA_LEN - buf_offset;
+
+ MPID_Segment_pack(segment, *segment_first, &last, (char *)el->pkt.mpich.p.payload + buf_offset);
+ datalen = buf_offset + last - *segment_first;
*segment_first = last;
el->pkt.mpich.source = my_rank;
diff --git a/src/mpid/ch3/channels/nemesis/include/mpid_nem_post.h b/src/mpid/ch3/channels/nemesis/include/mpid_nem_post.h
index e06aecd..07415e9 100644
--- a/src/mpid/ch3/channels/nemesis/include/mpid_nem_post.h
+++ b/src/mpid/ch3/channels/nemesis/include/mpid_nem_post.h
@@ -95,10 +95,12 @@ int MPID_nem_mpich_getv (MPID_IOV **s_iov, int *s_niov, MPID_IOV **d_iov, int *d
#if !defined (MPID_NEM_INLINE) || !MPID_NEM_INLINE
int MPID_nem_mpich_send_header(void* buf, int size, struct MPIDI_VC *vc, int *again);
int MPID_nem_mpich_sendv(MPID_IOV **iov, int *n_iov, struct MPIDI_VC *vc, int *again);
-int MPID_nem_mpich_sendv_header(MPID_IOV **iov, int *n_iov, struct MPIDI_VC *vc, int *again);
+int MPID_nem_mpich_sendv_header(MPID_IOV **iov, int *n_iov, void *ext_header,
+ MPIDI_msg_sz_t ext_header_sz, struct MPIDI_VC *vc, int *again);
void MPID_nem_mpich_send_seg(MPID_Segment segment, MPIDI_msg_sz_t *segment_first, MPIDI_msg_sz_t segment_sz, struct MPIDI_VC *vc, int *again);
void MPID_nem_mpich_send_seg_header(MPID_Segment segment, MPIDI_msg_sz_t *segment_first, MPIDI_msg_sz_t segment_size,
- void *header, MPIDI_msg_sz_t header_sz, struct MPIDI_VC *vc, int *again);
+ void *header, MPIDI_msg_sz_t header_sz, void *ext_header,
+ MPIDI_msg_sz_t ext_header_sz, struct MPIDI_VC *vc, int *again);
int MPID_nem_mpich_test_recv(MPID_nem_cell_ptr_t *cell, int *in_fbox, int in_blocking_progress);
int MPID_nem_mpich_test_recv_wait(MPID_nem_cell_ptr_t *cell, int *in_fbox, int timeout);
int MPID_nem_recv_seqno_matches(MPID_nem_queue_ptr_t qhead) ;
diff --git a/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_impl.h b/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_impl.h
index 43070da..e44a0a3 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_impl.h
+++ b/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_impl.h
@@ -132,7 +132,7 @@ static inline list_item_t *list_dequeue(list_head_t * list_head)
#define MXM_MPICH_MAX_ADDR_SIZE 512
#define MXM_MPICH_ENDPOINT_KEY "endpoint_id"
#define MXM_MPICH_MAX_REQ 100
-#define MXM_MPICH_MAX_IOV 2
+#define MXM_MPICH_MAX_IOV 3
/* The vc provides a generic buffer in which network modules can store
diff --git a/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_send.c b/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_send.c
index 2c23d5e..c113413 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_send.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_send.c
@@ -54,13 +54,22 @@ int MPID_nem_mxm_iSendContig(MPIDI_VC_t * vc, MPID_Request * sreq, void *hdr, MP
req_area->ctx = sreq;
req_area->iov_buf = req_area->tmp_buf;
- req_area->iov_count = 1;
- req_area->iov_buf[0].ptr = (void *) &(sreq->dev.pending_pkt);
- req_area->iov_buf[0].length = sizeof(MPIDI_CH3_Pkt_t);
+ req_area->iov_count = 0;
+
+ req_area->iov_buf[req_area->iov_count].ptr = (void *) &(sreq->dev.pending_pkt);
+ req_area->iov_buf[req_area->iov_count].length = sizeof(MPIDI_CH3_Pkt_t);
+ (req_area->iov_count)++;
+
+ if (sreq->dev.ext_hdr_sz != 0) {
+ req_area->iov_buf[req_area->iov_count].ptr = (void *) (sreq->dev.ext_hdr_ptr);
+ req_area->iov_buf[req_area->iov_count].length = sreq->dev.ext_hdr_sz;
+ (req_area->iov_count)++;
+ }
+
if (data_sz) {
- req_area->iov_count = 2;
- req_area->iov_buf[1].ptr = (void *) data;
- req_area->iov_buf[1].length = data_sz;
+ req_area->iov_buf[req_area->iov_count].ptr = (void *) data;
+ req_area->iov_buf[req_area->iov_count].length = data_sz;
+ (req_area->iov_count)++;
}
vc_area->pending_sends += 1;
@@ -175,9 +184,17 @@ int MPID_nem_mxm_SendNoncontig(MPIDI_VC_t * vc, MPID_Request * sreq, void *hdr,
req_area->ctx = sreq;
req_area->iov_buf = req_area->tmp_buf;
- req_area->iov_count = 1;
- req_area->iov_buf[0].ptr = (void *) &(sreq->dev.pending_pkt);
- req_area->iov_buf[0].length = sizeof(MPIDI_CH3_Pkt_t);
+ req_area->iov_count = 0;
+
+ req_area->iov_buf[req_area->iov_count].ptr = (void *) &(sreq->dev.pending_pkt);
+ req_area->iov_buf[req_area->iov_count].length = sizeof(MPIDI_CH3_Pkt_t);
+ (req_area->iov_count)++;
+
+ if (sreq->dev.ext_hdr_ptr != NULL) {
+ req_area->iov_buf[req_area->iov_count].ptr = (void *) (sreq->dev.ext_hdr_ptr);
+ req_area->iov_buf[req_area->iov_count].length = sreq->dev.ext_hdr_sz;
+ (req_area->iov_count)++;
+ }
last = sreq->dev.segment_size;
@@ -193,9 +210,9 @@ int MPID_nem_mxm_SendNoncontig(MPIDI_VC_t * vc, MPID_Request * sreq, void *hdr,
MPID_Segment_pack(sreq->dev.segment_ptr, sreq->dev.segment_first, &last, sreq->dev.tmpbuf);
MPIU_Assert(last == sreq->dev.segment_size);
- req_area->iov_count = 2;
- req_area->iov_buf[1].ptr = sreq->dev.tmpbuf;
- req_area->iov_buf[1].length = last - sreq->dev.segment_first;
+ req_area->iov_buf[req_area->iov_count].ptr = sreq->dev.tmpbuf;
+ req_area->iov_buf[req_area->iov_count].length = last - sreq->dev.segment_first;
+ (req_area->iov_count)++;
}
vc_area->pending_sends += 1;
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
index 1766acb..d924954 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
@@ -96,6 +96,8 @@ static inline MPID_nem_ptl_req_area * REQ_PTL(MPID_Request *req) {
(sreq_)->dev.datatype_ptr = NULL; \
(sreq_)->dev.segment_ptr = NULL; \
(sreq_)->dev.tmpbuf = NULL; \
+ (sreq_)->dev.ext_hdr_ptr = NULL; \
+ (sreq_)->dev.ext_hdr_sz = 0; \
\
MPID_nem_ptl_init_req(sreq_); \
} while (0)
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
index bb1d361..0f106c4 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
@@ -152,8 +152,8 @@ static inline int send_pkt(MPIDI_VC_t *vc, void *hdr_p, void *data_p, MPIDI_msg_
MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
int ret;
char *sendbuf;
- const size_t sent_sz = data_sz < PAYLOAD_SIZE ? data_sz : PAYLOAD_SIZE;
- const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz);
+ const size_t sent_sz = data_sz < (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz) ? data_sz : (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz);
+ const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz+sreq->dev.ext_hdr_sz);
const size_t remaining = data_sz - sent_sz;
ptl_match_bits_t match_bits = NPTL_MATCH(CTL_TAG, 0, MPIDI_Process.my_pg_rank);
MPIDI_STATE_DECL(MPID_STATE_SEND_PKT);
@@ -163,12 +163,19 @@ static inline int send_pkt(MPIDI_VC_t *vc, void *hdr_p, void *data_p, MPIDI_msg_
sendbuf = MPIU_Malloc(sendbuf_sz);
MPIU_Assert(sendbuf != NULL);
MPIU_Memcpy(sendbuf, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
+
+ if (sreq->dev.ext_hdr_sz > 0) {
+ /* copy extended packet header to send buf */
+ MPIU_Memcpy(sendbuf + sizeof(MPIDI_CH3_Pkt_t),
+ sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz);
+ }
+
TMPBUF(sreq) = NULL;
REQ_PTL(sreq)->num_gets = 0;
REQ_PTL(sreq)->put_done = 0;
if (data_sz) {
- MPIU_Memcpy(sendbuf + sizeof(MPIDI_CH3_Pkt_t), data_p, sent_sz);
+ MPIU_Memcpy(sendbuf + sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz, data_p, sent_sz);
if (remaining) /* Post MEs for the remote gets */
mpi_errno = meappend_large(vc_ptl->id, sreq, NPTL_MATCH(GET_TAG, 0, MPIDI_Process.my_pg_rank),
(char *)data_p + sent_sz, remaining);
@@ -208,8 +215,8 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
int ret;
char *sendbuf;
const size_t data_sz = sreq->dev.segment_size - sreq->dev.segment_first;
- const size_t sent_sz = data_sz < PAYLOAD_SIZE ? data_sz : PAYLOAD_SIZE;
- const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz);
+ const size_t sent_sz = data_sz < (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz) ? data_sz : (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz);
+ const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz+sreq->dev.ext_hdr_sz);
const size_t remaining = data_sz - sent_sz;
ptl_match_bits_t match_bits = NPTL_MATCH(CTL_TAG, 0, MPIDI_Process.my_pg_rank);
MPIDI_STATE_DECL(MPID_STATE_SEND_NONCONTIG_PKT);
@@ -218,6 +225,13 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
sendbuf = MPIU_Malloc(sendbuf_sz);
MPIU_Assert(sendbuf != NULL);
MPIU_Memcpy(sendbuf, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
+
+ if (sreq->dev.ext_hdr_sz > 0) {
+ /* copy extended packet header to send buf */
+ MPIU_Memcpy(sendbuf + sizeof(MPIDI_CH3_Pkt_t),
+ sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz);
+ }
+
TMPBUF(sreq) = NULL;
REQ_PTL(sreq)->num_gets = 0;
REQ_PTL(sreq)->put_done = 0;
@@ -225,7 +239,7 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
if (data_sz) {
MPIDI_msg_sz_t first = sreq->dev.segment_first;
MPIDI_msg_sz_t last = sreq->dev.segment_first + sent_sz;
- MPID_Segment_pack(sreq->dev.segment_ptr, first, &last, sendbuf + sizeof(MPIDI_CH3_Pkt_t));
+ MPID_Segment_pack(sreq->dev.segment_ptr, first, &last, sendbuf + sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz);
if (remaining) { /* Post MEs for the remote gets */
TMPBUF(sreq) = MPIU_Malloc(remaining);
diff --git a/src/mpid/ch3/channels/nemesis/netmod/tcp/tcp_send.c b/src/mpid/ch3/channels/nemesis/netmod/tcp/tcp_send.c
index 80c503d..cc40a12 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/tcp/tcp_send.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/tcp/tcp_send.c
@@ -524,14 +524,24 @@ int MPID_nem_tcp_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPID
{
if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
- MPID_IOV iov[2];
-
- iov[0].MPID_IOV_BUF = hdr;
- iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
- iov[1].MPID_IOV_BUF = data;
- iov[1].MPID_IOV_LEN = data_sz;
+ MPID_IOV iov[3];
+ int iov_n = 0;
+
+ iov[iov_n].MPID_IOV_BUF = hdr;
+ iov[iov_n].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
+ iov_n++;
+
+ if (sreq->dev.ext_hdr_sz != 0) {
+ iov[iov_n].MPID_IOV_BUF = sreq->dev.ext_hdr_ptr;
+ iov[iov_n].MPID_IOV_LEN = sreq->dev.ext_hdr_sz;
+ iov_n++;
+ }
+
+ iov[iov_n].MPID_IOV_BUF = data;
+ iov[iov_n].MPID_IOV_LEN = data_sz;
+ iov_n++;
- offset = MPL_large_writev(sc->fd, iov, 2);
+ offset = MPL_large_writev(sc->fd, iov, iov_n);
if (offset == 0) {
int req_errno = MPI_SUCCESS;
@@ -556,7 +566,7 @@ int MPID_nem_tcp_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPID
}
MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "write " MPIDI_MSG_SZ_FMT, offset);
- if (offset == sizeof(MPIDI_CH3_Pkt_t) + data_sz)
+ if (offset == sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz + data_sz)
{
/* sent whole message */
int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
@@ -599,25 +609,45 @@ int MPID_nem_tcp_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPID
/* save iov */
+ sreq->dev.iov_count = 0;
if (offset < sizeof(MPIDI_CH3_Pkt_t))
{
sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)hdr;
- sreq->dev.iov[0].MPID_IOV_BUF = (char *)&sreq->dev.pending_pkt + offset;
- sreq->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t) - offset;
+
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = (char *)&sreq->dev.pending_pkt + offset;
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t) - offset;
+ sreq->dev.iov_count++;
+
+ if (sreq->dev.ext_hdr_sz > 0) {
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = sreq->dev.ext_hdr_ptr;
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = sreq->dev.ext_hdr_sz;
+ sreq->dev.iov_count++;
+ }
+
if (data_sz)
{
- sreq->dev.iov[1].MPID_IOV_BUF = data;
- sreq->dev.iov[1].MPID_IOV_LEN = data_sz;
- sreq->dev.iov_count = 2;
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = data;
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = data_sz;
+ sreq->dev.iov_count++;
+ }
+ }
+ else if (offset < sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz) {
+ MPIU_Assert(sreq->dev.ext_hdr_sz > 0);
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = sreq->dev.ext_hdr_ptr;
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = sreq->dev.ext_hdr_sz;
+ sreq->dev.iov_count++;
+
+ if (data_sz) {
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = data;
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = data_sz;
+ sreq->dev.iov_count++;
}
- else
- sreq->dev.iov_count = 1;
}
else
{
- sreq->dev.iov[0].MPID_IOV_BUF = (char *)data + (offset - sizeof(MPIDI_CH3_Pkt_t));
- sreq->dev.iov[0].MPID_IOV_LEN = data_sz - (offset - sizeof(MPIDI_CH3_Pkt_t));
- sreq->dev.iov_count = 1;
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_BUF = (char *)data + (offset - sizeof(MPIDI_CH3_Pkt_t) - sreq->dev.ext_hdr_sz);
+ sreq->dev.iov[sreq->dev.iov_count].MPID_IOV_LEN = data_sz - (offset - sizeof(MPIDI_CH3_Pkt_t) - sreq->dev.ext_hdr_sz);
+ sreq->dev.iov_count++;
}
enqueue_request:
@@ -668,21 +698,31 @@ int MPID_nem_tcp_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *header,
MPIDI_msg_sz_t offset;
int complete;
MPID_nem_tcp_vc_area *vc_tcp = VC_TCP(vc);
+ int iov_offset;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_SENDNONCONTIG);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_SENDNONCONTIG);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "tcp_SendNoncontig");
MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));
-
- iov[0].MPID_IOV_BUF = header;
- iov[0].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
- iov_n = MPID_IOV_LIMIT - 1;
+ iov_n = 0;
+
+ iov[iov_n].MPID_IOV_BUF = header;
+ iov[iov_n].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
+ iov_n++;
+
+ if (sreq->dev.ext_hdr_ptr != NULL) {
+ iov[iov_n].MPID_IOV_BUF = sreq->dev.ext_hdr_ptr;
+ iov[iov_n].MPID_IOV_LEN = sreq->dev.ext_hdr_sz;
+ iov_n++;
+ }
+
+ iov_offset = iov_n;
- mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[1], &iov_n);
+ mpi_errno = MPIDI_CH3U_Request_load_send_iov(sreq, &iov[iov_offset], &iov_n);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|loadsendiov");
- iov_n += 1;
+ iov_n += iov_offset;
offset = 0;
if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
diff --git a/src/mpid/ch3/channels/nemesis/src/ch3_isendv.c b/src/mpid/ch3/channels/nemesis/src/ch3_isendv.c
index 15ac7c8..26125c5 100644
--- a/src/mpid/ch3/channels/nemesis/src/ch3_isendv.c
+++ b/src/mpid/ch3/channels/nemesis/src/ch3_isendv.c
@@ -72,7 +72,9 @@ int MPIDI_CH3_iSendv (MPIDI_VC_t *vc, MPID_Request *sreq, MPID_IOV *iov, int n_i
int remaining_n_iov = n_iov;
MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "iSendv");
- mpi_errno = MPID_nem_mpich_sendv_header (&remaining_iov, &remaining_n_iov, vc, &again);
+ mpi_errno = MPID_nem_mpich_sendv_header (&remaining_iov, &remaining_n_iov,
+ sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz,
+ vc, &again);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
while (!again && remaining_n_iov > 0)
{
diff --git a/src/mpid/ch3/channels/nemesis/src/ch3_istartmsgv.c b/src/mpid/ch3/channels/nemesis/src/ch3_istartmsgv.c
index 10af38c..09e3c34 100644
--- a/src/mpid/ch3/channels/nemesis/src/ch3_istartmsgv.c
+++ b/src/mpid/ch3/channels/nemesis/src/ch3_istartmsgv.c
@@ -89,7 +89,8 @@ int MPIDI_CH3_iStartMsgv (MPIDI_VC_t *vc, MPID_IOV *iov, int n_iov, MPID_Request
MPIU_DBG_MSG_D (CH3_CHANNEL, VERBOSE, " + len=%d ", total);
});
- mpi_errno = MPID_nem_mpich_sendv_header (&remaining_iov, &remaining_n_iov, vc, &again);
+ mpi_errno = MPID_nem_mpich_sendv_header (&remaining_iov, &remaining_n_iov,
+ NULL, 0, vc, &again);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
while (!again && (remaining_n_iov > 0))
{
diff --git a/src/mpid/ch3/channels/nemesis/src/ch3_progress.c b/src/mpid/ch3/channels/nemesis/src/ch3_progress.c
index 351bf81..6e699b7 100644
--- a/src/mpid/ch3/channels/nemesis/src/ch3_progress.c
+++ b/src/mpid/ch3/channels/nemesis/src/ch3_progress.c
@@ -195,7 +195,8 @@ int MPIDI_CH3I_Shm_send_progress(void)
iov = &sreq->dev.iov[sreq->dev.iov_offset];
n_iov = sreq->dev.iov_count;
- mpi_errno = MPID_nem_mpich_sendv_header(&iov, &n_iov, sreq->ch.vc, &again);
+ mpi_errno = MPID_nem_mpich_sendv_header(&iov, &n_iov, sreq->dev.ext_hdr_ptr,
+ sreq->dev.ext_hdr_sz, sreq->ch.vc, &again);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
if (!again)
{
@@ -219,7 +220,8 @@ int MPIDI_CH3I_Shm_send_progress(void)
else
{
MPID_nem_mpich_send_seg_header(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size,
- &sreq->dev.pending_pkt, sreq->ch.header_sz, sreq->ch.vc, &again);
+ &sreq->dev.pending_pkt, sreq->ch.header_sz, sreq->dev.ext_hdr_ptr,
+ sreq->dev.ext_hdr_sz, sreq->ch.vc, &again);
if (!again)
{
MPIDI_CH3I_shm_active_send = sreq;
diff --git a/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c b/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c
index 24fccde..fb14c55 100644
--- a/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c
+++ b/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c
@@ -50,7 +50,8 @@ int MPIDI_CH3I_SendNoncontig( MPIDI_VC_t *vc, MPID_Request *sreq, void *header,
}
/* send as many cells of data as you can */
- MPID_nem_mpich_send_seg_header(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size, header, hdr_sz, vc, &again);
+ MPID_nem_mpich_send_seg_header(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size,
+ header, hdr_sz, sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz, vc, &again);
while(!again && sreq->dev.segment_first < sreq->dev.segment_size)
MPID_nem_mpich_send_seg(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size, vc, &again);
http://git.mpich.org/mpich.git/commitdiff/25e40e4358f3a5008f78d670b01a4c2951214b7c
commit 25e40e4358f3a5008f78d670b01a4c2951214b7c
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Mon May 18 15:00:37 2015 -0500
Add extended packet header in CH3 layer used by RMA messages
Here we add an extended packet header in the CH3 layer, used to
transmit attributes that are needed only by RMA and not by
two-sided communication. The key implementation details are as
follows:
Origin side:
(1) The extended packet header is stored in the request, and the
request is passed to the issuing function (iSendv() or
sendNoncontig_fn()) in the lower layer. The issuing function checks
whether the extended packet header exists in the request and, if so,
issues that header. (The modifications in the lower layer are in the
next commit.)
(2) A fast path is used when (origin data is contiguous &&
target datatype is predefined && extended packet header is not used).
In that case, we do not need to create a request beforehand but can
use the iStartMsgv() issuing function, which tries to issue the
entire message as soon as possible.
Target side:
(1) Two request handlers are used when the extended packet header is
present or the target datatype is derived. The first handler is
triggered when the extended packet header / target datatype info
arrives, and the second is triggered when the actual data arrives.
(2) When the target side receives a stream unit that is piggybacked
with a LOCK, it drops the stream_offset in the extended packet
header, since that stream unit must be the first one and its
stream_offset must be 0.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
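Condensed sketch of the origin-side fast-path decision described above (names
as they appear in the mpid_rma_issue.h hunks below):

    MPIDI_CH3_PKT_RMA_GET_FLAGS(rma_op->pkt, flags, mpi_errno);
    if (!(flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM)   /* no extended header needed  */
        && target_dtp == NULL                      /* predefined target datatype */
        && is_origin_contig) {                     /* contiguous origin data     */
        /* fast path: no request up front, issue via MPIDI_CH3_iStartMsgv() */
    }
    else {
        /* create the request, attach the extended packet header if streamed,
         * then issue via MPIDI_CH3_iSendv() or vc->sendNoncontig_fn() */
    }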
diff --git a/src/mpid/ch3/include/mpid_rma_issue.h b/src/mpid/ch3/include/mpid_rma_issue.h
index 3ae5ab6..e6cc3f7 100644
--- a/src/mpid/ch3/include/mpid_rma_issue.h
+++ b/src/mpid/ch3/include/mpid_rma_issue.h
@@ -361,6 +361,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPID_Datatype *target_dtp = NULL, *origin_dtp = NULL;
int is_origin_contig;
MPID_IOV iov[MPID_IOV_LIMIT];
+ int iovcnt = 0;
MPID_Request *req = NULL;
int count;
int *ints = NULL;
@@ -368,6 +369,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPI_Aint *displaces = NULL;
MPI_Datatype *datatypes = NULL;
MPI_Aint dt_true_lb;
+ MPIDI_CH3_Pkt_flags_t flags;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
@@ -398,52 +400,104 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPID_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig);
MPID_Datatype_get_true_lb(rma_op->origin_datatype, &dt_true_lb);
- iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) & (rma_op->pkt);
- iov[0].MPID_IOV_LEN = sizeof(rma_op->pkt);
+ iov[iovcnt].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) & (rma_op->pkt);
+ iov[iovcnt].MPID_IOV_LEN = sizeof(rma_op->pkt);
+ iovcnt++;
+
+ MPIDI_CH3_PKT_RMA_GET_FLAGS(rma_op->pkt, flags, mpi_errno);
+ if (!(flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) && target_dtp == NULL && is_origin_contig) {
+ /* Fast path --- use iStartMsgv() to issue the data, which does not need a request
+ * to be passed in:
+ * (1) non-streamed op (do not need to send extended packet header);
+ * (2) target datatype is predefined (do not need to send derived datatype info);
+ * (3) origin datatype is contiguous (do not need to pack the data and send);
+ */
+
+ iov[iovcnt].MPID_IOV_BUF =
+ (MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
+ iov[iovcnt].MPID_IOV_LEN = stream_size;
+ iovcnt++;
+
+ MPIU_THREAD_CS_ENTER(CH3COMM, vc);
+ mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &req);
+ MPIU_THREAD_CS_EXIT(CH3COMM, vc);
+ MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
+
+ if (origin_dtp != NULL) {
+ if (req == NULL) {
+ MPID_Datatype_release(origin_dtp);
+ }
+ else {
+ /* this will cause the datatype to be freed when the request
+ * is freed. */
+ req->dev.datatype_ptr = origin_dtp;
+ }
+ }
+
+ goto fn_exit;
+ }
+
+ /* Normal path: use iSendv() and sendNoncontig_fn() to issue the data, which
+ * always need a request to be passed in. */
+
+ /* create a new request */
+ req = MPID_Request_create();
+ MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
+
+ MPIU_Object_set_ref(req, 2);
+ req->kind = MPID_REQUEST_SEND;
+
+ /* allocate and fill in extended packet header in the request */
+ if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ MPIU_Assert(rma_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
+ rma_op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM);
+ if (rma_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE) {
+ req->dev.ext_hdr_ptr = MPIU_Malloc(sizeof(MPIDI_CH3_Ext_pkt_accum_t));
+ if (!req->dev.ext_hdr_ptr) {
+ MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
+ "MPIDI_CH3_Ext_pkt_accum_t");
+ }
+ req->dev.ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_accum_t);
+ ((MPIDI_CH3_Ext_pkt_accum_t *) req->dev.ext_hdr_ptr)->stream_offset = stream_offset;
+ }
+ else {
+ req->dev.ext_hdr_ptr = MPIU_Malloc(sizeof(MPIDI_CH3_Ext_pkt_get_accum_t));
+ if (!req->dev.ext_hdr_ptr) {
+ MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
+ "MPIDI_CH3_Ext_pkt_get_accum_t");
+ }
+ req->dev.ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
+ ((MPIDI_CH3_Ext_pkt_get_accum_t *) req->dev.ext_hdr_ptr)->stream_offset = stream_offset;
+ }
+ }
if (target_dtp == NULL) {
/* basic datatype on target */
+
+ if (origin_dtp != NULL) {
+ req->dev.datatype_ptr = origin_dtp;
+ /* this will cause the datatype to be freed when the request
+ * is freed. */
+ }
+
if (is_origin_contig) {
/* origin data is contiguous */
- int iovcnt = 2;
-
- iov[1].MPID_IOV_BUF =
+ iov[iovcnt].MPID_IOV_BUF =
(MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
- iov[1].MPID_IOV_LEN = stream_size;
+ iov[iovcnt].MPID_IOV_LEN = stream_size;
+ iovcnt++;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
- mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &req);
+ mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
-
- if (origin_dtp != NULL) {
- if (req == NULL) {
- MPID_Datatype_release(origin_dtp);
- }
- else {
- /* this will cause the datatype to be freed when the request
- * is freed. */
- req->dev.datatype_ptr = origin_dtp;
- }
- }
}
else {
/* origin data is non-contiguous */
- req = MPID_Request_create();
- MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
-
- MPIU_Object_set_ref(req, 2);
- req->kind = MPID_REQUEST_SEND;
-
req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno,
MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
- if (origin_dtp != NULL) {
- req->dev.datatype_ptr = origin_dtp;
- /* this will cause the datatype to be freed when the request
- * is freed. */
- }
MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count,
rma_op->origin_datatype, req->dev.segment_ptr, 0);
req->dev.segment_first = stream_offset;
@@ -468,14 +522,6 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPIDI_msg_sz_t first = stream_offset;
MPIDI_msg_sz_t last = stream_offset + stream_size;
- req = MPID_Request_create();
- if (req == NULL) {
- MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
- }
-
- MPIU_Object_set_ref(req, 2);
- req->kind = MPID_REQUEST_SEND;
-
req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
@@ -556,9 +602,9 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPID_Datatype_release(target_dtp);
}
+ fn_exit:
(*req_ptr) = req;
- fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
return mpi_errno;
fn_fail:
@@ -709,6 +755,11 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0);
+ /* If there are more than one stream unit, mark the current packet
+ * as stream packet */
+ if (stream_unit_count > 1)
+ flags |= MPIDI_CH3_PKT_FLAG_RMA_STREAM;
+
rest_len = total_len;
MPIU_Assert(rma_op->issued_stream_count >= 0);
for (j = 0; j < stream_unit_count; j++) {
@@ -734,8 +785,6 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
- accum_pkt->info.metadata.stream_offset = stream_offset;
-
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
@@ -884,6 +933,11 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0);
+ /* If there is more than one stream unit, mark the current packet
+ * as a stream packet. */
+ if (stream_unit_count > 1)
+ flags |= MPIDI_CH3_PKT_FLAG_RMA_STREAM;
+
rest_len = total_len;
rma_op->reqs_size = stream_unit_count;
@@ -943,9 +997,18 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
- get_accum_pkt->info.metadata.stream_offset = stream_offset;
+ if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ /* allocate extended packet header in request */
+ resp_req->dev.ext_hdr_ptr = MPIU_Malloc(sizeof(MPIDI_CH3_Ext_pkt_get_accum_t));
+ if (!resp_req->dev.ext_hdr_ptr) {
+ MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
+ "MPIDI_CH3_Ext_pkt_get_accum_t");
+ }
+ resp_req->dev.ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
- resp_req->dev.stream_offset = stream_offset;
+ ((MPIDI_CH3_Ext_pkt_get_accum_t *) resp_req->dev.ext_hdr_ptr)->stream_offset =
+ stream_offset;
+ }
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
diff --git a/src/mpid/ch3/include/mpidimpl.h b/src/mpid/ch3/include/mpidimpl.h
index 061c0f2..46cb9a8 100644
--- a/src/mpid/ch3/include/mpidimpl.h
+++ b/src/mpid/ch3/include/mpidimpl.h
@@ -354,6 +354,8 @@ extern MPIDI_Process_t MPIDI_Process;
(sreq_)->dev.iov_count = 0; \
(sreq_)->dev.iov_offset = 0; \
(sreq_)->dev.tmpbuf = NULL; \
+ (sreq_)->dev.ext_hdr_ptr = NULL; \
+ (sreq_)->dev.ext_hdr_sz = 0; \
MPIDI_Request_clear_dbg(sreq_); \
}
@@ -388,6 +390,8 @@ extern MPIDI_Process_t MPIDI_Process;
(rreq_)->dev.OnFinal = NULL; \
(rreq_)->dev.drop_data = FALSE; \
(rreq_)->dev.tmpbuf = NULL; \
+ (rreq_)->dev.ext_hdr_ptr = NULL; \
+ (rreq_)->dev.ext_hdr_sz = 0; \
MPIDI_CH3_REQUEST_INIT(rreq_);\
}
diff --git a/src/mpid/ch3/include/mpidpkt.h b/src/mpid/ch3/include/mpidpkt.h
index ca5ed2d..1abc4b2 100644
--- a/src/mpid/ch3/include/mpidpkt.h
+++ b/src/mpid/ch3/include/mpidpkt.h
@@ -129,7 +129,8 @@ typedef enum {
MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED = 1024,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED = 2048,
MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_NO_ACK = 4096,
- MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP = 8192
+ MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP = 8192,
+ MPIDI_CH3_PKT_FLAG_RMA_STREAM = 16384
} MPIDI_CH3_Pkt_flags_t;
typedef struct MPIDI_CH3_Pkt_send {
@@ -522,31 +523,16 @@ MPIDI_CH3_PKT_DEFS
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \
- (pkt_).put.info.metadata.dataloop_size = (dataloop_size_); \
+ (pkt_).put.info.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET): \
- (pkt_).get.info.metadata.dataloop_size = (dataloop_size_); \
+ (pkt_).get.info.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \
- (pkt_).accum.info.metadata.dataloop_size = (dataloop_size_); \
+ (pkt_).accum.info.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
- (pkt_).get_accum.info.metadata.dataloop_size = (dataloop_size_); \
- break; \
- default: \
- MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
- } \
- }
-
-#define MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET(pkt_, stream_offset_, err_) \
- { \
- err_ = MPI_SUCCESS; \
- switch((pkt_).type) { \
- case (MPIDI_CH3_PKT_ACCUMULATE): \
- (stream_offset_) = (pkt_).accum.info.metadata.stream_offset; \
- break; \
- case (MPIDI_CH3_PKT_GET_ACCUM): \
- (stream_offset_) = (pkt_).get_accum.info.metadata.stream_offset; \
+ (pkt_).get_accum.info.dataloop_size = (dataloop_size_); \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
@@ -609,12 +595,7 @@ typedef struct MPIDI_CH3_Pkt_put {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
- /* note that we use struct here in order
- * to consistently access dataloop_size
- * by "pkt->info.metadata.dataloop_size". */
- struct {
- int dataloop_size;
- } metadata;
+ int dataloop_size;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_put_t;
@@ -626,12 +607,7 @@ typedef struct MPIDI_CH3_Pkt_get {
int count;
MPI_Datatype datatype;
struct {
- /* note that we use struct here in order
- * to consistently access dataloop_size
- * by "pkt->info.metadata.dataloop_size". */
- struct {
- int dataloop_size; /* for derived datatypes */
- } metadata;
+ int dataloop_size; /* for derived datatypes */
} info;
MPI_Request request_handle;
MPI_Win target_win_handle;
@@ -662,10 +638,7 @@ typedef struct MPIDI_CH3_Pkt_accum {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
- struct {
- int dataloop_size;
- MPI_Aint stream_offset;
- } metadata;
+ int dataloop_size;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_accum_t;
@@ -680,10 +653,7 @@ typedef struct MPIDI_CH3_Pkt_get_accum {
MPI_Op op;
MPI_Win target_win_handle;
union {
- struct {
- int dataloop_size;
- MPI_Aint stream_offset;
- } metadata;
+ int dataloop_size;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_get_accum_t;
@@ -865,6 +835,14 @@ typedef union MPIDI_CH3_Pkt {
#endif
} MPIDI_CH3_Pkt_t;
+typedef struct MPIDI_CH3_Ext_pkt_accum {
+ MPI_Aint stream_offset;
+} MPIDI_CH3_Ext_pkt_accum_t;
+
+typedef struct MPIDI_CH3_Ext_pkt_get_accum {
+ MPI_Aint stream_offset;
+} MPIDI_CH3_Ext_pkt_get_accum_t;
+
#if defined(MPID_USE_SEQUENCE_NUMBERS)
typedef struct MPIDI_CH3_Pkt_send_container {
MPIDI_CH3_Pkt_send_t pkt;
diff --git a/src/mpid/ch3/include/mpidpre.h b/src/mpid/ch3/include/mpidpre.h
index 09d5d12..255b9d3 100644
--- a/src/mpid/ch3/include/mpidpre.h
+++ b/src/mpid/ch3/include/mpidpre.h
@@ -447,8 +447,8 @@ typedef struct MPIDI_Request {
struct MPIDI_RMA_Lock_entry *lock_queue_entry;
MPI_Request resp_request_handle; /* Handle for get_accumulate response */
- MPI_Aint stream_offset; /* used when streaming ACC/GACC packets, specifying the start
- location of the current streaming unit. */
+ void *ext_hdr_ptr; /* pointer to extended packet header */
+ MPIDI_msg_sz_t ext_hdr_sz;
MPIDI_REQUEST_SEQNUM
diff --git a/src/mpid/ch3/include/mpidrma.h b/src/mpid/ch3/include/mpidrma.h
index d6f4c15..3ddb28e 100644
--- a/src/mpid/ch3/include/mpidrma.h
+++ b/src/mpid/ch3/include/mpidrma.h
@@ -364,11 +364,13 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
int complete = 0;
MPIDI_msg_sz_t data_len;
char *data_buf = NULL;
+ MPIDI_CH3_Pkt_flags_t flags;
/* This is PUT, ACC, GACC, FOP */
MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE((*pkt), target_dtp, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT((*pkt), target_count, mpi_errno);
+ MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
MPID_Datatype_get_extent_macro(target_dtp, type_extent);
MPID_Datatype_get_size_macro(target_dtp, type_size);
@@ -378,17 +380,29 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
buf_size = type_extent * target_count;
}
else {
- MPI_Aint stream_offset, stream_elem_count;
- MPI_Aint total_len, rest_len;
+ MPI_Aint stream_elem_count;
+ MPI_Aint total_len;
- MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET((*pkt), stream_offset, mpi_errno);
stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent;
total_len = type_size * target_count;
- rest_len = total_len - stream_offset;
- recv_data_sz = MPIR_MIN(rest_len, type_size * stream_elem_count);
+ recv_data_sz = MPIR_MIN(total_len, type_size * stream_elem_count);
buf_size = type_extent * (recv_data_sz / type_size);
}
+ if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ MPIU_Assert(pkt->type == MPIDI_CH3_PKT_ACCUMULATE ||
+ pkt->type == MPIDI_CH3_PKT_GET_ACCUM);
+
+ if (pkt->type == MPIDI_CH3_PKT_ACCUMULATE) {
+ recv_data_sz += sizeof(MPIDI_CH3_Ext_pkt_accum_t);
+ buf_size += sizeof(MPIDI_CH3_Ext_pkt_accum_t);
+ }
+ else {
+ recv_data_sz += sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
+ buf_size += sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
+ }
+ }
+
if (new_ptr != NULL) {
if (win_ptr->current_lock_data_bytes + buf_size < MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) {
new_ptr->data = MPIU_Malloc(buf_size);
@@ -404,10 +418,8 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
MPIDI_CH3_Pkt_t new_pkt;
MPIDI_CH3_Pkt_lock_t *lock_pkt = &new_pkt.lock;
MPI_Win target_win_handle;
- MPIDI_CH3_Pkt_flags_t flags;
MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE((*pkt), target_win_handle, mpi_errno);
- MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
if (pkt->type == MPIDI_CH3_PKT_PUT || pkt->type == MPIDI_CH3_PKT_ACCUMULATE) {
MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
diff --git a/src/mpid/ch3/src/ch3u_handle_recv_req.c b/src/mpid/ch3/src/ch3u_handle_recv_req.c
index b39616f..4e4c2dc 100644
--- a/src/mpid/ch3/src/ch3u_handle_recv_req.c
+++ b/src/mpid/ch3/src/ch3u_handle_recv_req.c
@@ -139,6 +139,7 @@ int MPIDI_CH3_ReqHandler_AccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq,
MPIDI_CH3_Pkt_flags_t flags = rreq->dev.flags;
MPI_Datatype basic_type;
MPI_Aint predef_count, predef_dtp_size;
+ MPI_Aint stream_offset;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE);
@@ -176,12 +177,19 @@ int MPIDI_CH3_ReqHandler_AccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq,
predef_count = rreq->dev.recv_data_sz / predef_dtp_size;
MPIU_Assert(predef_count > 0);
+ if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ MPIU_Assert(rreq->dev.ext_hdr_ptr != NULL);
+ stream_offset = ((MPIDI_CH3_Ext_pkt_accum_t *) rreq->dev.ext_hdr_ptr)->stream_offset;
+ }
+ else
+ stream_offset = 0;
+
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
/* accumulate data from tmp_buf into user_buf */
mpi_errno = do_accumulate_op(rreq->dev.user_buf, predef_count, basic_type,
rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype,
- rreq->dev.stream_offset, rreq->dev.op);
+ stream_offset, rreq->dev.op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno) {
@@ -233,6 +241,7 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
MPI_Datatype basic_type;
MPI_Aint predef_count, predef_dtp_size;
MPI_Aint dt_true_lb;
+ MPI_Aint stream_offset;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);
@@ -262,6 +271,14 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
(rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
+ if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ MPIU_Assert(rreq->dev.ext_hdr_ptr != NULL);
+ stream_offset = ((MPIDI_CH3_Ext_pkt_get_accum_t *) rreq->dev.ext_hdr_ptr)->stream_offset;
+ }
+ else {
+ stream_offset = 0;
+ }
+
/* check if data is contiguous and get true lb */
MPID_Datatype_is_contig(rreq->dev.datatype, &is_contig);
MPID_Datatype_get_true_lb(rreq->dev.datatype, &dt_true_lb);
@@ -284,11 +301,11 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
if (is_contig) {
MPIU_Memcpy(resp_req->dev.user_buf,
(void *) ((char *) rreq->dev.real_user_buf + dt_true_lb +
- rreq->dev.stream_offset), rreq->dev.recv_data_sz);
+ stream_offset), rreq->dev.recv_data_sz);
}
else {
MPID_Segment *seg = MPID_Segment_alloc();
- MPI_Aint first = rreq->dev.stream_offset;
+ MPI_Aint first = stream_offset;
MPI_Aint last = first + rreq->dev.recv_data_sz;
if (seg == NULL) {
@@ -306,7 +323,7 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
/* accumulate data from tmp_buf into user_buf */
mpi_errno = do_accumulate_op(rreq->dev.user_buf, predef_count, basic_type,
rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype,
- rreq->dev.stream_offset, rreq->dev.op);
+ stream_offset, rreq->dev.op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
@@ -542,18 +559,49 @@ int MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((u
MPID_Datatype *new_dtp = NULL;
MPI_Aint basic_type_extent, basic_type_size;
MPI_Aint total_len, rest_len, stream_elem_count;
+ MPI_Aint stream_offset;
+ MPI_Aint type_size;
+ MPI_Datatype basic_dtp;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMDERIVEDDTRECVCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMDERIVEDDTRECVCOMPLETE);
- /* create derived datatype */
- create_derived_datatype(rreq, &new_dtp);
+ if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ MPIU_Assert(rreq->dev.ext_hdr_ptr != NULL);
+ stream_offset = ((MPIDI_CH3_Ext_pkt_accum_t *) rreq->dev.ext_hdr_ptr)->stream_offset;
+ }
+ else
+ stream_offset = 0;
- /* update new request to get the data */
- MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_ACCUM_RECV);
+ if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RECV_DERIVED_DT) {
+ /* create derived datatype */
+ create_derived_datatype(rreq, &new_dtp);
- MPID_Datatype_get_size_macro(new_dtp->basic_type, basic_type_size);
- MPID_Datatype_get_extent_macro(new_dtp->basic_type, basic_type_extent);
+ /* update new request to get the data */
+ MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_ACCUM_RECV);
+
+ MPIU_Assert(rreq->dev.datatype == MPI_DATATYPE_NULL);
+ rreq->dev.datatype = new_dtp->handle;
+ rreq->dev.datatype_ptr = new_dtp;
+ /* this will cause the datatype to be freed when the
+ * request is freed. free dtype_info here. */
+ MPIU_Free(rreq->dev.dtype_info);
+
+ type_size = new_dtp->size;
+
+ basic_dtp = new_dtp->basic_type;
+ }
+ else {
+ MPIU_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RECV);
+ MPIU_Assert(rreq->dev.datatype != MPI_DATATYPE_NULL);
+
+ MPID_Datatype_get_size_macro(rreq->dev.datatype, type_size);
+
+ basic_dtp = rreq->dev.datatype;
+ }
+
+ MPID_Datatype_get_size_macro(basic_dtp, basic_type_size);
+ MPID_Datatype_get_extent_macro(basic_dtp, basic_type_extent);
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(rreq));
/* allocate a SRBuf for receiving stream unit */
@@ -570,17 +618,12 @@ int MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((u
/* --END ERROR HANDLING-- */
rreq->dev.user_buf = rreq->dev.tmpbuf;
- rreq->dev.datatype = new_dtp->handle;
- total_len = new_dtp->size * rreq->dev.user_count;
- rest_len = total_len - rreq->dev.stream_offset;
+ total_len = type_size * rreq->dev.user_count;
+ rest_len = total_len - stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / basic_type_extent;
rreq->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * basic_type_size);
- rreq->dev.datatype_ptr = new_dtp;
- /* this will cause the datatype to be freed when the
- * request is freed. free dtype_info here. */
- MPIU_Free(rreq->dev.dtype_info);
rreq->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1((rreq->dev.segment_ptr == NULL), mpi_errno, MPI_ERR_OTHER, "**nomem",
@@ -588,7 +631,7 @@ int MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((u
MPID_Segment_init(rreq->dev.user_buf,
(rreq->dev.recv_data_sz / basic_type_size),
- new_dtp->basic_type, rreq->dev.segment_ptr, 0);
+ basic_dtp, rreq->dev.segment_ptr, 0);
rreq->dev.segment_first = 0;
rreq->dev.segment_size = rreq->dev.recv_data_sz;
@@ -617,18 +660,49 @@ int MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((
MPID_Datatype *new_dtp = NULL;
MPI_Aint basic_type_extent, basic_type_size;
MPI_Aint total_len, rest_len, stream_elem_count;
+ MPI_Aint stream_offset;
+ MPI_Aint type_size;
+ MPI_Datatype basic_dtp;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMDERIVEDDTRECVCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMDERIVEDDTRECVCOMPLETE);
- /* create derived datatype */
- create_derived_datatype(rreq, &new_dtp);
+ if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ MPIU_Assert(rreq->dev.ext_hdr_ptr != NULL);
+ stream_offset = ((MPIDI_CH3_Ext_pkt_get_accum_t *) rreq->dev.ext_hdr_ptr)->stream_offset;
+ }
+ else
+ stream_offset = 0;
+
+ if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_ACCUM_RECV_DERIVED_DT) {
+ /* create derived datatype */
+ create_derived_datatype(rreq, &new_dtp);
- /* update new request to get the data */
- MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
+ /* update new request to get the data */
+ MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
- MPID_Datatype_get_size_macro(new_dtp->basic_type, basic_type_size);
- MPID_Datatype_get_extent_macro(new_dtp->basic_type, basic_type_extent);
+ MPIU_Assert(rreq->dev.datatype == MPI_DATATYPE_NULL);
+ rreq->dev.datatype = new_dtp->handle;
+ rreq->dev.datatype_ptr = new_dtp;
+ /* this will cause the datatype to be freed when the
+ * request is freed. free dtype_info here. */
+ MPIU_Free(rreq->dev.dtype_info);
+
+ type_size = new_dtp->size;
+
+ basic_dtp = new_dtp->basic_type;
+ }
+ else {
+ MPIU_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
+ MPIU_Assert(rreq->dev.datatype != MPI_DATATYPE_NULL);
+
+ MPID_Datatype_get_size_macro(rreq->dev.datatype, type_size);
+
+ basic_dtp = rreq->dev.datatype;
+ }
+
+ MPID_Datatype_get_size_macro(basic_dtp, basic_type_size);
+ MPID_Datatype_get_extent_macro(basic_dtp, basic_type_extent);
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(rreq));
/* allocate a SRBuf for receiving stream unit */
@@ -645,17 +719,12 @@ int MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((
/* --END ERROR HANDLING-- */
rreq->dev.user_buf = rreq->dev.tmpbuf;
- rreq->dev.datatype = new_dtp->handle;
- total_len = new_dtp->size * rreq->dev.user_count;
- rest_len = total_len - rreq->dev.stream_offset;
+ total_len = type_size * rreq->dev.user_count;
+ rest_len = total_len - stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / basic_type_extent;
rreq->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * basic_type_size);
- rreq->dev.datatype_ptr = new_dtp;
- /* this will cause the datatype to be freed when the
- * request is freed. free dtype_info here. */
- MPIU_Free(rreq->dev.dtype_info);
rreq->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1((rreq->dev.segment_ptr == NULL), mpi_errno, MPI_ERR_OTHER, "**nomem",
@@ -663,7 +732,7 @@ int MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((
MPID_Segment_init(rreq->dev.user_buf,
(rreq->dev.recv_data_sz / basic_type_size),
- new_dtp->basic_type, rreq->dev.segment_ptr, 0);
+ basic_dtp, rreq->dev.segment_ptr, 0);
rreq->dev.segment_first = 0;
rreq->dev.segment_size = rreq->dev.recv_data_sz;
@@ -1122,19 +1191,20 @@ static inline int perform_acc_in_lock_queue(MPID_Win * win_ptr, MPIDI_RMA_Lock_e
else {
MPIU_Assert(acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE);
MPI_Aint type_size, type_extent;
- MPI_Aint total_len, rest_len, recv_count;
+ MPI_Aint total_len, recv_count;
MPID_Datatype_get_size_macro(acc_pkt->datatype, type_size);
MPID_Datatype_get_extent_macro(acc_pkt->datatype, type_extent);
total_len = type_size * acc_pkt->count;
- rest_len = total_len - acc_pkt->info.metadata.stream_offset;
- recv_count = MPIR_MIN((rest_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
+ recv_count = MPIR_MIN((total_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
MPIU_Assert(recv_count > 0);
+ /* Note: stream_offset is 0 here because the stream unit piggybacked
+ * with LOCK must be the first stream unit. */
mpi_errno = do_accumulate_op(lock_entry->data, recv_count, acc_pkt->datatype,
acc_pkt->addr, acc_pkt->count, acc_pkt->datatype,
- acc_pkt->info.metadata.stream_offset, acc_pkt->op);
+ 0, acc_pkt->op);
}
if (win_ptr->shm_allocated == TRUE)
@@ -1169,7 +1239,7 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
int is_contig;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_extent;
- MPI_Aint total_len, rest_len, recv_count;
+ MPI_Aint total_len, recv_count;
/* Piggyback candidate should have basic datatype for target datatype. */
MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype));
@@ -1263,8 +1333,7 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, type_extent);
total_len = type_size * get_accum_pkt->count;
- rest_len = total_len - get_accum_pkt->info.metadata.stream_offset;
- recv_count = MPIR_MIN((rest_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
+ recv_count = MPIR_MIN((total_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
MPIU_Assert(recv_count > 0);
sreq->dev.user_buf = (void *) MPIU_Malloc(recv_count * type_size);
@@ -1278,14 +1347,15 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
/* Copy data from target window to temporary buffer */
+ /* Note: stream_offset is 0 here because the stream unit piggybacked
+ * with LOCK must be the first stream unit. */
+
if (is_contig) {
- MPIU_Memcpy(sreq->dev.user_buf,
- (void *) ((char *) get_accum_pkt->addr +
- get_accum_pkt->info.metadata.stream_offset), recv_count * type_size);
+ MPIU_Memcpy(sreq->dev.user_buf, get_accum_pkt->addr, recv_count * type_size);
}
else {
MPID_Segment *seg = MPID_Segment_alloc();
- MPI_Aint first = get_accum_pkt->info.metadata.stream_offset;
+ MPI_Aint first = 0;
MPI_Aint last = first + type_size * recv_count;
if (seg == NULL) {
@@ -1304,7 +1374,7 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
mpi_errno = do_accumulate_op(lock_entry->data, recv_count, get_accum_pkt->datatype,
get_accum_pkt->addr, get_accum_pkt->count, get_accum_pkt->datatype,
- get_accum_pkt->info.metadata.stream_offset, get_accum_pkt->op);
+ 0, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
@@ -1815,6 +1885,26 @@ int MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete(MPIDI_VC_t * vc,
mpi_errno);
MPID_Win_get_ptr(target_win_handle, win_ptr);
+ if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM && (rreq->dev.lock_queue_entry)->data != NULL) {
+
+ MPIU_Assert(lock_queue_entry->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
+ lock_queue_entry->pkt.type == MPIDI_CH3_PKT_GET_ACCUM);
+
+ int ext_hdr_sz;
+
+ if (lock_queue_entry->pkt.type == MPIDI_CH3_PKT_ACCUMULATE)
+ ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_accum_t);
+ else
+ ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
+
+ /* Here we drop the received stream_offset, because the stream unit piggybacked with
+ * LOCK must be the first stream unit, whose stream_offset is 0. */
+ rreq->dev.recv_data_sz -= ext_hdr_sz;
+ memmove((rreq->dev.lock_queue_entry)->data,
+ (void *) ((char *) ((rreq->dev.lock_queue_entry)->data) + ext_hdr_sz),
+ rreq->dev.recv_data_sz);
+ }
+
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED) {
requested_lock = MPI_LOCK_SHARED;
}
diff --git a/src/mpid/ch3/src/ch3u_request.c b/src/mpid/ch3/src/ch3u_request.c
index 63b5f6e..0be196d 100644
--- a/src/mpid/ch3/src/ch3u_request.c
+++ b/src/mpid/ch3/src/ch3u_request.c
@@ -94,8 +94,9 @@ MPID_Request * MPID_Request_create(void)
req->dev.OnFinal = NULL;
req->dev.user_buf = NULL;
req->dev.drop_data = FALSE;
- req->dev.stream_offset = 0;
req->dev.tmpbuf = NULL;
+ req->dev.ext_hdr_ptr = NULL;
+ req->dev.ext_hdr_sz = 0;
#ifdef MPIDI_CH3_REQUEST_INIT
MPIDI_CH3_REQUEST_INIT(req);
#endif
@@ -171,6 +172,10 @@ void MPIDI_CH3_Request_destroy(MPID_Request * req)
MPIDI_CH3U_SRBuf_free(req);
}
+ if (req->dev.ext_hdr_ptr != NULL) {
+ MPIU_Free(req->dev.ext_hdr_ptr);
+ }
+
MPIU_Handle_obj_free(&MPID_Request_mem, req);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_REQUEST_DESTROY);
diff --git a/src/mpid/ch3/src/ch3u_rma_ops.c b/src/mpid/ch3/src/ch3u_rma_ops.c
index 9611716..6a23a9f 100644
--- a/src/mpid/ch3/src/ch3u_rma_ops.c
+++ b/src/mpid/ch3/src/ch3u_rma_ops.c
@@ -198,7 +198,7 @@ int MPIDI_CH3I_Put(const void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
put_pkt->count = target_count;
put_pkt->datatype = target_datatype;
- put_pkt->info.metadata.dataloop_size = 0;
+ put_pkt->info.dataloop_size = 0;
put_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
put_pkt->source_win_handle = win_ptr->handle;
put_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
@@ -387,7 +387,7 @@ int MPIDI_CH3I_Get(void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
- get_pkt->info.metadata.dataloop_size = 0;
+ get_pkt->info.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt)
@@ -612,11 +612,10 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
accum_pkt->count = target_count;
accum_pkt->datatype = target_datatype;
- accum_pkt->info.metadata.dataloop_size = 0;
+ accum_pkt->info.dataloop_size = 0;
accum_pkt->op = op;
accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
accum_pkt->source_win_handle = win_ptr->handle;
- accum_pkt->info.metadata.stream_offset = 0;
accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (accum_pkt->info.data);
@@ -810,7 +809,7 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
- get_pkt->info.metadata.dataloop_size = 0;
+ get_pkt->info.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
@@ -931,10 +930,9 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_accum_pkt->count = target_count;
get_accum_pkt->datatype = target_datatype;
- get_accum_pkt->info.metadata.dataloop_size = 0;
+ get_accum_pkt->info.dataloop_size = 0;
get_accum_pkt->op = op;
get_accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
- get_accum_pkt->info.metadata.stream_offset = 0;
get_accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (get_accum_pkt->info.data);
@@ -1346,7 +1344,7 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = 1;
get_pkt->datatype = datatype;
- get_pkt->info.metadata.dataloop_size = 0;
+ get_pkt->info.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
diff --git a/src/mpid/ch3/src/ch3u_rma_pkthandler.c b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
index 120fb1b..70a2bc9 100644
--- a/src/mpid/ch3/src/ch3u_rma_pkthandler.c
+++ b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
@@ -293,24 +293,24 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
- req->dev.dataloop = MPIU_Malloc(put_pkt->info.metadata.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(put_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- put_pkt->info.metadata.dataloop_size);
+ put_pkt->info.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
- if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.metadata.dataloop_size) {
+ if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- put_pkt->info.metadata.dataloop_size);
+ put_pkt->info.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- put_pkt->info.metadata.dataloop_size;
+ put_pkt->info.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(vc, req, &complete);
@@ -325,7 +325,7 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) req->dev.dtype_info);
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.metadata.dataloop_size;
+ req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
@@ -519,24 +519,24 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
- req->dev.dataloop = MPIU_Malloc(get_pkt->info.metadata.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(get_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- get_pkt->info.metadata.dataloop_size);
+ get_pkt->info.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
- if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.metadata.dataloop_size) {
+ if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- get_pkt->info.metadata.dataloop_size);
+ get_pkt->info.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- get_pkt->info.metadata.dataloop_size;
+ get_pkt->info.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete(vc, req, &complete);
@@ -549,7 +549,7 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.metadata.dataloop_size;
+ req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
@@ -582,7 +582,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_size;
- MPI_Aint stream_elem_count, rest_len, total_len;
+ MPI_Aint stream_elem_count, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
@@ -640,90 +640,129 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.target_win_handle = accum_pkt->target_win_handle;
req->dev.source_win_handle = accum_pkt->source_win_handle;
req->dev.flags = accum_pkt->flags;
- req->dev.stream_offset = accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = MPI_REQUEST_NULL;
- req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
/* get start location of data and length of data */
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
+ if (req->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ /* allocate extended header in the request */
+ req->dev.ext_hdr_ptr = MPIU_Malloc(sizeof(MPIDI_CH3_Ext_pkt_accum_t));
+ if (!req->dev.ext_hdr_ptr) {
+ MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
+ "MPIDI_CH3_Ext_pkt_accum_t");
+ }
+ req->dev.ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_accum_t);
+ }
+
if (MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype)) {
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV);
req->dev.datatype = accum_pkt->datatype;
- MPID_Datatype_get_extent_macro(accum_pkt->datatype, extent);
+ if (req->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete;
- MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
- /* allocate a SRBuf for receiving stream unit */
- MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
- /* --BEGIN ERROR HANDLING-- */
- if (req->dev.tmpbuf_sz == 0) {
- MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
- mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
- FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
- "**nomem %d", MPIDI_CH3U_SRBuf_size);
- req->status.MPI_ERROR = mpi_errno;
- goto fn_fail;
+ /* if this is a streamed op packet, set the iov to receive the extended packet header. */
+ req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.ext_hdr_ptr;
+ req->dev.iov[0].MPID_IOV_LEN = req->dev.ext_hdr_sz;
+ req->dev.iov_count = 1;
+
+ *buflen = sizeof(MPIDI_CH3_Pkt_t);
}
- /* --END ERROR HANDLING-- */
+ else {
+ req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
+
+ MPID_Datatype_get_extent_macro(accum_pkt->datatype, extent);
+
+ MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
+ /* allocate a SRBuf for receiving stream unit */
+ MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
+ /* --BEGIN ERROR HANDLING-- */
+ if (req->dev.tmpbuf_sz == 0) {
+ MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
+ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
+ FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
+ "**nomem %d", MPIDI_CH3U_SRBuf_size);
+ req->status.MPI_ERROR = mpi_errno;
+ goto fn_fail;
+ }
+ /* --END ERROR HANDLING-- */
- req->dev.user_buf = req->dev.tmpbuf;
+ req->dev.user_buf = req->dev.tmpbuf;
- MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
+ MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
- total_len = type_size * accum_pkt->count;
- rest_len = total_len - req->dev.stream_offset;
- stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
+ total_len = type_size * accum_pkt->count;
+ stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
- req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
- MPIU_Assert(req->dev.recv_data_sz > 0);
+ req->dev.recv_data_sz = MPIR_MIN(total_len, stream_elem_count * type_size);
+ MPIU_Assert(req->dev.recv_data_sz > 0);
- mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
- MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
- "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
+ mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
+ MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
+ "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
- /* return the number of bytes processed in this function */
- *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
+ /* return the number of bytes processed in this function */
+ *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
- if (complete) {
- mpi_errno = MPIDI_CH3_ReqHandler_AccumRecvComplete(vc, req, &complete);
- if (mpi_errno)
- MPIU_ERR_POP(mpi_errno);
if (complete) {
- *rreqp = NULL;
- goto fn_exit;
+ mpi_errno = MPIDI_CH3_ReqHandler_AccumRecvComplete(vc, req, &complete);
+ if (mpi_errno)
+ MPIU_ERR_POP(mpi_errno);
+ if (complete) {
+ *rreqp = NULL;
+ goto fn_exit;
+ }
}
}
}
else {
+ int metadata_sz = 0;
+
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV_DERIVED_DT);
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete;
req->dev.datatype = MPI_DATATYPE_NULL;
+ if (accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM)
+ metadata_sz += sizeof(MPIDI_CH3_Ext_pkt_accum_t);
+
req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
if (!req->dev.dtype_info) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
"MPIDI_RMA_dtype_info");
}
+ metadata_sz += sizeof(MPIDI_RMA_dtype_info);
- req->dev.dataloop = MPIU_Malloc(accum_pkt->info.metadata.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(accum_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- accum_pkt->info.metadata.dataloop_size);
+ accum_pkt->info.dataloop_size);
}
+ metadata_sz += accum_pkt->info.dataloop_size;
+
+ if (data_len >= metadata_sz) {
+ int buf_offset = 0;
+
+ if (accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ /* copy extended header */
+ MPIU_Memcpy(req->dev.ext_hdr_ptr, data_buf + buf_offset,
+ sizeof(MPIDI_CH3_Ext_pkt_accum_t));
+ buf_offset += sizeof(MPIDI_CH3_Ext_pkt_accum_t);
+ }
- if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
- MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
- MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- accum_pkt->info.metadata.dataloop_size);
+ MPIU_Memcpy(req->dev.dtype_info, data_buf + buf_offset,
+ sizeof(MPIDI_RMA_dtype_info));
+ buf_offset += sizeof(MPIDI_RMA_dtype_info);
- *buflen =
- sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- accum_pkt->info.metadata.dataloop_size;
+ MPIU_Memcpy(req->dev.dataloop, data_buf + buf_offset,
+ accum_pkt->info.dataloop_size);
+ buf_offset += accum_pkt->info.dataloop_size;
+
+ *buflen = sizeof(MPIDI_CH3_Pkt_t) + metadata_sz;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(vc, req, &complete);
@@ -735,11 +774,23 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
}
}
else {
- req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
- req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
- req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.metadata.dataloop_size;
- req->dev.iov_count = 2;
+ int iov_n = 0;
+
+ if (accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ req->dev.iov[iov_n].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.ext_hdr_ptr;
+ req->dev.iov[iov_n].MPID_IOV_LEN = req->dev.ext_hdr_sz;
+ iov_n++;
+ }
+
+ req->dev.iov[iov_n].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
+ req->dev.iov[iov_n].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
+ iov_n++;
+
+ req->dev.iov[iov_n].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
+ req->dev.iov[iov_n].MPID_IOV_LEN = accum_pkt->info.dataloop_size;
+ iov_n++;
+
+ req->dev.iov_count = iov_n;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
@@ -776,7 +827,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPID_Win *win_ptr;
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
- MPI_Aint stream_elem_count, rest_len, total_len;
+ MPI_Aint stream_elem_count, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
@@ -894,92 +945,130 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.real_user_buf = get_accum_pkt->addr;
req->dev.target_win_handle = get_accum_pkt->target_win_handle;
req->dev.flags = get_accum_pkt->flags;
- req->dev.stream_offset = get_accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = get_accum_pkt->request_handle;
- req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
/* get start location of data and length of data */
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
+ if (req->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ /* allocate extended header in the request */
+ req->dev.ext_hdr_ptr = MPIU_Malloc(sizeof(MPIDI_CH3_Ext_pkt_get_accum_t));
+ if (!req->dev.ext_hdr_ptr) {
+ MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
+ "MPIDI_CH3_Ext_pkt_get_accum_t");
+ }
+ req->dev.ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
+ }
+
if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
MPI_Aint type_size;
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
req->dev.datatype = get_accum_pkt->datatype;
- MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, extent);
+ if (req->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete;
- MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
- /* allocate a SRBuf for receiving stream unit */
- MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
- /* --BEGIN ERROR HANDLING-- */
- if (req->dev.tmpbuf_sz == 0) {
- MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
- mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
- FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
- "**nomem %d", MPIDI_CH3U_SRBuf_size);
- req->status.MPI_ERROR = mpi_errno;
- goto fn_fail;
+ /* if this is a streamed op packet, set the iov to receive the extended packet header. */
+ req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.ext_hdr_ptr;
+ req->dev.iov[0].MPID_IOV_LEN = req->dev.ext_hdr_sz;
+ req->dev.iov_count = 1;
+
+ *buflen = sizeof(MPIDI_CH3_Pkt_t);
}
- /* --END ERROR HANDLING-- */
+ else {
+ req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
+
+ MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, extent);
+
+ MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
+ /* allocate a SRBuf for receiving stream unit */
+ MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
+ /* --BEGIN ERROR HANDLING-- */
+ if (req->dev.tmpbuf_sz == 0) {
+ MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
+ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
+ FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
+ "**nomem %d", MPIDI_CH3U_SRBuf_size);
+ req->status.MPI_ERROR = mpi_errno;
+ goto fn_fail;
+ }
+ /* --END ERROR HANDLING-- */
- req->dev.user_buf = req->dev.tmpbuf;
+ req->dev.user_buf = req->dev.tmpbuf;
- MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
- total_len = type_size * get_accum_pkt->count;
- rest_len = total_len - req->dev.stream_offset;
- stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
+ MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
+ total_len = type_size * get_accum_pkt->count;
+ stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
- req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
- MPIU_Assert(req->dev.recv_data_sz > 0);
+ req->dev.recv_data_sz = MPIR_MIN(total_len, stream_elem_count * type_size);
+ MPIU_Assert(req->dev.recv_data_sz > 0);
- mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
- MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
- "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
+ mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
+ MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
+ "**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
- /* return the number of bytes processed in this function */
- *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
+ /* return the number of bytes processed in this function */
+ *buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
- if (complete) {
- mpi_errno = MPIDI_CH3_ReqHandler_GaccumRecvComplete(vc, req, &complete);
- if (mpi_errno)
- MPIU_ERR_POP(mpi_errno);
if (complete) {
- *rreqp = NULL;
- goto fn_exit;
+ mpi_errno = MPIDI_CH3_ReqHandler_GaccumRecvComplete(vc, req, &complete);
+ if (mpi_errno)
+ MPIU_ERR_POP(mpi_errno);
+ if (complete) {
+ *rreqp = NULL;
+ goto fn_exit;
+ }
}
}
}
else {
+ int metadata_sz = 0;
+
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV_DERIVED_DT);
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete;
req->dev.datatype = MPI_DATATYPE_NULL;
+ if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM)
+ metadata_sz += sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
+
req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
if (!req->dev.dtype_info) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
"MPIDI_RMA_dtype_info");
}
+ metadata_sz += sizeof(MPIDI_RMA_dtype_info);
- req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.metadata.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- get_accum_pkt->info.metadata.dataloop_size);
+ get_accum_pkt->info.dataloop_size);
}
+ metadata_sz += get_accum_pkt->info.dataloop_size;
+
+ if (data_len >= metadata_sz) {
+ int buf_offset = 0;
+
+ if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ /* copy extended header */
+ MPIU_Memcpy(req->dev.ext_hdr_ptr, data_buf + buf_offset,
+ sizeof(MPIDI_CH3_Ext_pkt_get_accum_t));
+ buf_offset += sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
+ }
- if (data_len >=
- sizeof(MPIDI_RMA_dtype_info) + get_accum_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
- MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
- MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- get_accum_pkt->info.metadata.dataloop_size);
+ MPIU_Memcpy(req->dev.dtype_info, data_buf + buf_offset,
+ sizeof(MPIDI_RMA_dtype_info));
+ buf_offset += sizeof(MPIDI_RMA_dtype_info);
- *buflen =
- sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- get_accum_pkt->info.metadata.dataloop_size;
+ MPIU_Memcpy(req->dev.dataloop, data_buf + buf_offset,
+ get_accum_pkt->info.dataloop_size);
+ buf_offset += get_accum_pkt->info.dataloop_size;
+
+ *buflen = sizeof(MPIDI_CH3_Pkt_t) + metadata_sz;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(vc, req, &complete);
@@ -991,11 +1080,23 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
}
}
else {
- req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
- req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
- req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = get_accum_pkt->info.metadata.dataloop_size;
- req->dev.iov_count = 2;
+ int iov_n = 0;
+
+ if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
+ req->dev.iov[iov_n].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.ext_hdr_ptr;
+ req->dev.iov[iov_n].MPID_IOV_LEN = req->dev.ext_hdr_sz;
+ iov_n++;
+ }
+
+ req->dev.iov[iov_n].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
+ req->dev.iov[iov_n].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
+ iov_n++;
+
+ req->dev.iov[iov_n].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
+ req->dev.iov[iov_n].MPID_IOV_LEN = get_accum_pkt->info.dataloop_size;
+ iov_n++;
+
+ req->dev.iov_count = iov_n;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
@@ -1518,6 +1619,7 @@ int MPIDI_CH3_PktHandler_Get_AccumResp(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPI_Aint stream_elem_count;
MPI_Aint total_len, rest_len;
MPI_Aint real_stream_offset;
+ MPI_Aint contig_stream_offset = 0;
if (MPIR_DATATYPE_IS_PREDEFINED(req->dev.datatype)) {
basic_type = req->dev.datatype;
@@ -1530,12 +1632,16 @@ int MPIDI_CH3_PktHandler_Get_AccumResp(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPID_Datatype_get_extent_macro(basic_type, basic_type_extent);
MPID_Datatype_get_size_macro(basic_type, basic_type_size);
+ if (req->dev.ext_hdr_ptr != NULL)
+ contig_stream_offset =
+ ((MPIDI_CH3_Ext_pkt_get_accum_t *) req->dev.ext_hdr_ptr)->stream_offset;
+
total_len = type_size * req->dev.user_count;
- rest_len = total_len - req->dev.stream_offset;
+ rest_len = total_len - contig_stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / basic_type_extent;
req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * basic_type_size);
- real_stream_offset = (req->dev.stream_offset / basic_type_size) * basic_type_extent;
+ real_stream_offset = (contig_stream_offset / basic_type_size) * basic_type_extent;
if (MPIR_DATATYPE_IS_PREDEFINED(req->dev.datatype)) {
req->dev.user_buf = (void *) ((char *) req->dev.user_buf + real_stream_offset);
@@ -1552,8 +1658,8 @@ int MPIDI_CH3_PktHandler_Get_AccumResp(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.segment_ptr = MPID_Segment_alloc();
MPID_Segment_init(req->dev.user_buf, req->dev.user_count, req->dev.datatype,
req->dev.segment_ptr, 0);
- req->dev.segment_first = req->dev.stream_offset;
- req->dev.segment_size = req->dev.stream_offset + req->dev.recv_data_sz;
+ req->dev.segment_first = contig_stream_offset;
+ req->dev.segment_size = contig_stream_offset + req->dev.recv_data_sz;
mpi_errno = MPIDI_CH3U_Request_load_recv_iov(req);
if (mpi_errno != MPI_SUCCESS) {
@@ -2007,7 +2113,7 @@ int MPIDI_CH3_PktPrint_Put(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->put.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->put.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->put.datatype));
- MPIU_DBG_PRINTF((" dataloop_size. 0x%08X\n", pkt->put.info.metadata.dataloop_size));
+ MPIU_DBG_PRINTF((" dataloop_size. 0x%08X\n", pkt->put.info.dataloop_size));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->put.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->put.source_win_handle));
/*MPIU_DBG_PRINTF((" win_ptr ...... 0x%08X\n", pkt->put.win_ptr)); */
@@ -2020,7 +2126,7 @@ int MPIDI_CH3_PktPrint_Get(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->get.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->get.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->get.datatype));
- MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->get.info.metadata.dataloop_size));
+ MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->get.info.dataloop_size));
MPIU_DBG_PRINTF((" request ...... 0x%08X\n", pkt->get.request_handle));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->get.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->get.source_win_handle));
@@ -2045,7 +2151,7 @@ int MPIDI_CH3_PktPrint_Accumulate(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->accum.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->accum.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->accum.datatype));
- MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->accum.info.metadata.dataloop_size));
+ MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->accum.info.dataloop_size));
MPIU_DBG_PRINTF((" op ........... 0x%08X\n", pkt->accum.op));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->accum.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->accum.source_win_handle));
http://git.mpich.org/mpich.git/commitdiff/e0eaed63e0a2228215b67672386e7323eb2049d5
commit e0eaed63e0a2228215b67672386e7323eb2049d5
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Wed May 13 15:04:58 2015 -0500
Initialize tmpbuf in request to NULL when creating a new request.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
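The point of starting tmpbuf at NULL is that the request-destroy path can release it
unconditionally, without a separate "was a temporary buffer attached" flag. A minimal
sketch of that pattern, under simplified, hypothetical names (request_t, request_create,
request_destroy); the real object is MPID_Request:

    #include <stdlib.h>

    /* Hypothetical, simplified request object. */
    typedef struct {
        void *tmpbuf;   /* stays NULL until a temporary buffer is attached */
    } request_t;

    static request_t *request_create(void)
    {
        request_t *req = malloc(sizeof(*req));
        if (req != NULL)
            req->tmpbuf = NULL;   /* the fix: never leave this uninitialized */
        return req;
    }

    static void request_destroy(request_t *req)
    {
        free(req->tmpbuf);   /* free(NULL) is a no-op, so this is always safe */
        free(req);
    }

    int main(void)
    {
        request_t *req = request_create();
        request_destroy(req);
        return 0;
    }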
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
index 30dcde9..1766acb 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
@@ -95,6 +95,7 @@ static inline MPID_nem_ptl_req_area * REQ_PTL(MPID_Request *req) {
(sreq_)->dev.state = 0; \
(sreq_)->dev.datatype_ptr = NULL; \
(sreq_)->dev.segment_ptr = NULL; \
+ (sreq_)->dev.tmpbuf = NULL; \
\
MPID_nem_ptl_init_req(sreq_); \
} while (0)
diff --git a/src/mpid/ch3/include/mpidimpl.h b/src/mpid/ch3/include/mpidimpl.h
index 995a81f..061c0f2 100644
--- a/src/mpid/ch3/include/mpidimpl.h
+++ b/src/mpid/ch3/include/mpidimpl.h
@@ -353,6 +353,7 @@ extern MPIDI_Process_t MPIDI_Process;
(sreq_)->dev.OnFinal = NULL; \
(sreq_)->dev.iov_count = 0; \
(sreq_)->dev.iov_offset = 0; \
+ (sreq_)->dev.tmpbuf = NULL; \
MPIDI_Request_clear_dbg(sreq_); \
}
@@ -386,6 +387,7 @@ extern MPIDI_Process_t MPIDI_Process;
(rreq_)->dev.OnDataAvail = NULL; \
(rreq_)->dev.OnFinal = NULL; \
(rreq_)->dev.drop_data = FALSE; \
+ (rreq_)->dev.tmpbuf = NULL; \
MPIDI_CH3_REQUEST_INIT(rreq_);\
}
diff --git a/src/mpid/ch3/src/ch3u_request.c b/src/mpid/ch3/src/ch3u_request.c
index 6793ce8..63b5f6e 100644
--- a/src/mpid/ch3/src/ch3u_request.c
+++ b/src/mpid/ch3/src/ch3u_request.c
@@ -95,6 +95,7 @@ MPID_Request * MPID_Request_create(void)
req->dev.user_buf = NULL;
req->dev.drop_data = FALSE;
req->dev.stream_offset = 0;
+ req->dev.tmpbuf = NULL;
#ifdef MPIDI_CH3_REQUEST_INIT
MPIDI_CH3_REQUEST_INIT(req);
#endif
http://git.mpich.org/mpich.git/commitdiff/6f62c424b6add5339cdb295a794b8808ac7d8f98
commit 6f62c424b6add5339cdb295a794b8808ac7d8f98
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Thu Apr 30 12:38:39 2015 -0700
Revert "Move 'stream_offset' out of RMA packet struct."
This reverts commit 19f29078c001ff4176330815b066eac3da7b9e52.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
diff --git a/src/mpid/ch3/include/mpid_rma_issue.h b/src/mpid/ch3/include/mpid_rma_issue.h
index d6ee801..3ae5ab6 100644
--- a/src/mpid/ch3/include/mpid_rma_issue.h
+++ b/src/mpid/ch3/include/mpid_rma_issue.h
@@ -501,7 +501,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
- count = 3 + vec_len;
+ count = 2 + vec_len;
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
@@ -518,16 +518,10 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
- req->dev.stream_offset = stream_offset;
-
- displaces[2] = MPIU_PtrToAint(&(req->dev.stream_offset));
- blocklens[2] = sizeof(req->dev.stream_offset);
- datatypes[2] = MPI_BYTE;
-
for (i = 0; i < vec_len; i++) {
- displaces[i + 3] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
- MPIU_Assign_trunc(blocklens[i + 3], dloop_vec[i].DLOOP_VECTOR_LEN, int);
- datatypes[i + 3] = MPI_BYTE;
+ displaces[i + 2] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
+ MPIU_Assign_trunc(blocklens[i + 2], dloop_vec[i].DLOOP_VECTOR_LEN, int);
+ datatypes[i + 2] = MPI_BYTE;
}
MPID_Segment_free(segp);
@@ -740,11 +734,7 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
- if (MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype)) {
- accum_pkt->addr = (void *) ((char *) rma_op->original_target_addr
- + j * stream_elem_count * predefined_dtp_extent);
- accum_pkt->count = stream_size / predefined_dtp_size;
- }
+ accum_pkt->info.metadata.stream_offset = stream_offset;
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
@@ -953,11 +943,7 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
- if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
- get_accum_pkt->addr = (void *) ((char *) rma_op->original_target_addr
- + j * stream_elem_count * predefined_dtp_extent);
- get_accum_pkt->count = stream_size / predefined_dtp_size;
- }
+ get_accum_pkt->info.metadata.stream_offset = stream_offset;
resp_req->dev.stream_offset = stream_offset;
diff --git a/src/mpid/ch3/include/mpid_rma_types.h b/src/mpid/ch3/include/mpid_rma_types.h
index ff8c63d..be0e869 100644
--- a/src/mpid/ch3/include/mpid_rma_types.h
+++ b/src/mpid/ch3/include/mpid_rma_types.h
@@ -60,9 +60,6 @@ typedef struct MPIDI_RMA_Op {
int result_count;
MPI_Datatype result_datatype;
- /* used in streaming ACCs */
- void *original_target_addr;
-
struct MPID_Request **reqs;
int reqs_size;
diff --git a/src/mpid/ch3/include/mpidpkt.h b/src/mpid/ch3/include/mpidpkt.h
index eec2083..ca5ed2d 100644
--- a/src/mpid/ch3/include/mpidpkt.h
+++ b/src/mpid/ch3/include/mpidpkt.h
@@ -522,16 +522,31 @@ MPIDI_CH3_PKT_DEFS
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \
- (pkt_).put.info.dataloop_size = (dataloop_size_); \
+ (pkt_).put.info.metadata.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET): \
- (pkt_).get.info.dataloop_size = (dataloop_size_); \
+ (pkt_).get.info.metadata.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \
- (pkt_).accum.info.dataloop_size = (dataloop_size_); \
+ (pkt_).accum.info.metadata.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
- (pkt_).get_accum.info.dataloop_size = (dataloop_size_); \
+ (pkt_).get_accum.info.metadata.dataloop_size = (dataloop_size_); \
+ break; \
+ default: \
+ MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
+ } \
+ }
+
+#define MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET(pkt_, stream_offset_, err_) \
+ { \
+ err_ = MPI_SUCCESS; \
+ switch((pkt_).type) { \
+ case (MPIDI_CH3_PKT_ACCUMULATE): \
+ (stream_offset_) = (pkt_).accum.info.metadata.stream_offset; \
+ break; \
+ case (MPIDI_CH3_PKT_GET_ACCUM): \
+ (stream_offset_) = (pkt_).get_accum.info.metadata.stream_offset; \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
@@ -594,7 +609,12 @@ typedef struct MPIDI_CH3_Pkt_put {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
- int dataloop_size;
+ /* note that we use struct here in order
+ * to consistently access dataloop_size
+ * by "pkt->info.metadata.dataloop_size". */
+ struct {
+ int dataloop_size;
+ } metadata;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_put_t;
@@ -608,8 +628,10 @@ typedef struct MPIDI_CH3_Pkt_get {
struct {
/* note that we use struct here in order
* to consistently access dataloop_size
- * by "pkt->info.dataloop_size". */
- int dataloop_size; /* for derived datatypes */
+ * by "pkt->info.metadata.dataloop_size". */
+ struct {
+ int dataloop_size; /* for derived datatypes */
+ } metadata;
} info;
MPI_Request request_handle;
MPI_Win target_win_handle;
@@ -640,7 +662,10 @@ typedef struct MPIDI_CH3_Pkt_accum {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
- int dataloop_size;
+ struct {
+ int dataloop_size;
+ MPI_Aint stream_offset;
+ } metadata;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_accum_t;
@@ -655,7 +680,10 @@ typedef struct MPIDI_CH3_Pkt_get_accum {
MPI_Op op;
MPI_Win target_win_handle;
union {
- int dataloop_size;
+ struct {
+ int dataloop_size;
+ MPI_Aint stream_offset;
+ } metadata;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_get_accum_t;
diff --git a/src/mpid/ch3/include/mpidrma.h b/src/mpid/ch3/include/mpidrma.h
index 50ea300..d6f4c15 100644
--- a/src/mpid/ch3/include/mpidrma.h
+++ b/src/mpid/ch3/include/mpidrma.h
@@ -373,8 +373,21 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
MPID_Datatype_get_extent_macro(target_dtp, type_extent);
MPID_Datatype_get_size_macro(target_dtp, type_size);
- recv_data_sz = type_size * target_count;
- buf_size = type_extent * target_count;
+ if (pkt->type == MPIDI_CH3_PKT_PUT) {
+ recv_data_sz = type_size * target_count;
+ buf_size = type_extent * target_count;
+ }
+ else {
+ MPI_Aint stream_offset, stream_elem_count;
+ MPI_Aint total_len, rest_len;
+
+ MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET((*pkt), stream_offset, mpi_errno);
+ stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent;
+ total_len = type_size * target_count;
+ rest_len = total_len - stream_offset;
+ recv_data_sz = MPIR_MIN(rest_len, type_size * stream_elem_count);
+ buf_size = type_extent * (recv_data_sz / type_size);
+ }
if (new_ptr != NULL) {
if (win_ptr->current_lock_data_bytes + buf_size < MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) {
diff --git a/src/mpid/ch3/src/ch3u_handle_recv_req.c b/src/mpid/ch3/src/ch3u_handle_recv_req.c
index 33ecb54..b39616f 100644
--- a/src/mpid/ch3/src/ch3u_handle_recv_req.c
+++ b/src/mpid/ch3/src/ch3u_handle_recv_req.c
@@ -1115,20 +1115,26 @@ static inline int perform_acc_in_lock_queue(MPID_Win * win_ptr, MPIDI_RMA_Lock_e
if (acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
/* All data fits in packet header */
- /* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
- that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno = do_accumulate_op(acc_pkt->info.data, acc_pkt->count, acc_pkt->datatype,
acc_pkt->addr, acc_pkt->count, acc_pkt->datatype,
- 0/* stream offset */, acc_pkt->op);
+ 0, acc_pkt->op);
}
else {
MPIU_Assert(acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE);
+ MPI_Aint type_size, type_extent;
+ MPI_Aint total_len, rest_len, recv_count;
- /* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
- that is piggybacked with LOCK flag must be the first stream unit */
- mpi_errno = do_accumulate_op(lock_entry->data, acc_pkt->count, acc_pkt->datatype,
+ MPID_Datatype_get_size_macro(acc_pkt->datatype, type_size);
+ MPID_Datatype_get_extent_macro(acc_pkt->datatype, type_extent);
+
+ total_len = type_size * acc_pkt->count;
+ rest_len = total_len - acc_pkt->info.metadata.stream_offset;
+ recv_count = MPIR_MIN((rest_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
+ MPIU_Assert(recv_count > 0);
+
+ mpi_errno = do_accumulate_op(lock_entry->data, recv_count, acc_pkt->datatype,
acc_pkt->addr, acc_pkt->count, acc_pkt->datatype,
- 0/* stream offset */, acc_pkt->op);
+ acc_pkt->info.metadata.stream_offset, acc_pkt->op);
}
if (win_ptr->shm_allocated == TRUE)
@@ -1162,6 +1168,8 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPID_IOV iov[MPID_IOV_LIMIT];
int is_contig;
int mpi_errno = MPI_SUCCESS;
+ MPI_Aint type_extent;
+ MPI_Aint total_len, rest_len, recv_count;
/* Piggyback candidate should have basic datatype for target datatype. */
MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype));
@@ -1221,12 +1229,10 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
/* Perform ACCUMULATE OP */
/* All data fits in packet header */
- /* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
- that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno =
do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->count,
get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
- get_accum_pkt->datatype, 0/* stream offset */, get_accum_pkt->op);
+ get_accum_pkt->datatype, 0, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
@@ -1254,7 +1260,14 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPIU_Assert(get_accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM);
- sreq->dev.user_buf = (void *) MPIU_Malloc(get_accum_pkt->count * type_size);
+ MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, type_extent);
+
+ total_len = type_size * get_accum_pkt->count;
+ rest_len = total_len - get_accum_pkt->info.metadata.stream_offset;
+ recv_count = MPIR_MIN((rest_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
+ MPIU_Assert(recv_count > 0);
+
+ sreq->dev.user_buf = (void *) MPIU_Malloc(recv_count * type_size);
MPID_Datatype_is_contig(get_accum_pkt->datatype, &is_contig);
@@ -1265,16 +1278,15 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
/* Copy data from target window to temporary buffer */
- /* NOTE: here we copy data from stream_offset = 0, because
- the unit that is piggybacked with LOCK flag must be the
- first stream unit. */
if (is_contig) {
- MPIU_Memcpy(sreq->dev.user_buf, get_accum_pkt->addr, get_accum_pkt->count * type_size);
+ MPIU_Memcpy(sreq->dev.user_buf,
+ (void *) ((char *) get_accum_pkt->addr +
+ get_accum_pkt->info.metadata.stream_offset), recv_count * type_size);
}
else {
MPID_Segment *seg = MPID_Segment_alloc();
- MPI_Aint first = 0;
- MPI_Aint last = first + type_size * get_accum_pkt->count;
+ MPI_Aint first = get_accum_pkt->info.metadata.stream_offset;
+ MPI_Aint last = first + type_size * recv_count;
if (seg == NULL) {
if (win_ptr->shm_allocated == TRUE)
@@ -1290,11 +1302,9 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
/* Perform ACCUMULATE OP */
- /* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
- that is piggybacked with LOCK flag must be the first stream unit */
- mpi_errno = do_accumulate_op(lock_entry->data, get_accum_pkt->count, get_accum_pkt->datatype,
+ mpi_errno = do_accumulate_op(lock_entry->data, recv_count, get_accum_pkt->datatype,
get_accum_pkt->addr, get_accum_pkt->count, get_accum_pkt->datatype,
- 0/* stream offset */, get_accum_pkt->op);
+ get_accum_pkt->info.metadata.stream_offset, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
@@ -1320,7 +1330,7 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) sreq->dev.user_buf);
- iov[1].MPID_IOV_LEN = get_accum_pkt->count * type_size;
+ iov[1].MPID_IOV_LEN = recv_count * type_size;
iovcnt = 2;
mpi_errno = MPIDI_CH3_iSendv(lock_entry->vc, sreq, iov, iovcnt);
diff --git a/src/mpid/ch3/src/ch3u_rma_ops.c b/src/mpid/ch3/src/ch3u_rma_ops.c
index 136c260..9611716 100644
--- a/src/mpid/ch3/src/ch3u_rma_ops.c
+++ b/src/mpid/ch3/src/ch3u_rma_ops.c
@@ -198,7 +198,7 @@ int MPIDI_CH3I_Put(const void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
put_pkt->count = target_count;
put_pkt->datatype = target_datatype;
- put_pkt->info.dataloop_size = 0;
+ put_pkt->info.metadata.dataloop_size = 0;
put_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
put_pkt->source_win_handle = win_ptr->handle;
put_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
@@ -387,7 +387,7 @@ int MPIDI_CH3I_Get(void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
- get_pkt->info.dataloop_size = 0;
+ get_pkt->info.metadata.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt)
@@ -612,10 +612,11 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
accum_pkt->count = target_count;
accum_pkt->datatype = target_datatype;
- accum_pkt->info.dataloop_size = 0;
+ accum_pkt->info.metadata.dataloop_size = 0;
accum_pkt->op = op;
accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
accum_pkt->source_win_handle = win_ptr->handle;
+ accum_pkt->info.metadata.stream_offset = 0;
accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (accum_pkt->info.data);
@@ -624,12 +625,6 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
MPIU_ERR_POP(mpi_errno);
}
- /* NOTE: here we backup the original starting address for the entire operation
- on target window in 'original_target_addr', because when actually issuing
- this operation, we may stream this operation and overwrite 'addr' with the
- starting address for the streaming unit. */
- new_ptr->original_target_addr = accum_pkt->addr;
-
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
mpi_errno = MPIDI_CH3I_Win_enqueue_op(win_ptr, new_ptr);
@@ -815,7 +810,7 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
- get_pkt->info.dataloop_size = 0;
+ get_pkt->info.metadata.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
@@ -936,9 +931,10 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_accum_pkt->count = target_count;
get_accum_pkt->datatype = target_datatype;
- get_accum_pkt->info.dataloop_size = 0;
+ get_accum_pkt->info.metadata.dataloop_size = 0;
get_accum_pkt->op = op;
get_accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
+ get_accum_pkt->info.metadata.stream_offset = 0;
get_accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (get_accum_pkt->info.data);
@@ -946,12 +942,6 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
-
- /* NOTE: here we backup the original starting address for the entire operation
- on target window in 'original_target_addr', because when actually issuing
- this operation, we may stream this operation and overwrite 'addr' with the
- starting address for the streaming unit. */
- new_ptr->original_target_addr = get_accum_pkt->addr;
}
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
@@ -1356,7 +1346,7 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = 1;
get_pkt->datatype = datatype;
- get_pkt->info.dataloop_size = 0;
+ get_pkt->info.metadata.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
diff --git a/src/mpid/ch3/src/ch3u_rma_pkthandler.c b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
index 5ec61a4..120fb1b 100644
--- a/src/mpid/ch3/src/ch3u_rma_pkthandler.c
+++ b/src/mpid/ch3/src/ch3u_rma_pkthandler.c
@@ -293,24 +293,24 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
- req->dev.dataloop = MPIU_Malloc(put_pkt->info.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(put_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- put_pkt->info.dataloop_size);
+ put_pkt->info.metadata.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
- if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.dataloop_size) {
+ if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- put_pkt->info.dataloop_size);
+ put_pkt->info.metadata.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- put_pkt->info.dataloop_size;
+ put_pkt->info.metadata.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(vc, req, &complete);
@@ -325,7 +325,7 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) req->dev.dtype_info);
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.dataloop_size;
+ req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.metadata.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
@@ -519,24 +519,24 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
- req->dev.dataloop = MPIU_Malloc(get_pkt->info.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(get_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- get_pkt->info.dataloop_size);
+ get_pkt->info.metadata.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
- if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.dataloop_size) {
+ if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- get_pkt->info.dataloop_size);
+ get_pkt->info.metadata.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- get_pkt->info.dataloop_size;
+ get_pkt->info.metadata.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete(vc, req, &complete);
@@ -549,7 +549,7 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.dataloop_size;
+ req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.metadata.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
@@ -574,6 +574,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
MPIDI_CH3_Pkt_accum_t *accum_pkt = &pkt->accum;
MPID_Request *req = NULL;
+ MPI_Aint extent;
int complete = 0;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
@@ -581,6 +582,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_size;
+ MPI_Aint stream_elem_count, rest_len, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
@@ -638,6 +640,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.target_win_handle = accum_pkt->target_win_handle;
req->dev.source_win_handle = accum_pkt->source_win_handle;
req->dev.flags = accum_pkt->flags;
+ req->dev.stream_offset = accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = MPI_REQUEST_NULL;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
@@ -650,6 +653,8 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV);
req->dev.datatype = accum_pkt->datatype;
+ MPID_Datatype_get_extent_macro(accum_pkt->datatype, extent);
+
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
@@ -668,7 +673,11 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
- req->dev.recv_data_sz = type_size * accum_pkt->count;
+ total_len = type_size * accum_pkt->count;
+ rest_len = total_len - req->dev.stream_offset;
+ stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
+
+ req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
@@ -700,25 +709,21 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
- req->dev.dataloop = MPIU_Malloc(accum_pkt->info.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(accum_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- accum_pkt->info.dataloop_size);
+ accum_pkt->info.metadata.dataloop_size);
}
- if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size +
- sizeof(req->dev.stream_offset)) {
+ if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- accum_pkt->info.dataloop_size);
- MPIU_Memcpy(&(req->dev.stream_offset),
- data_buf + sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size,
- sizeof(req->dev.stream_offset));
+ accum_pkt->info.metadata.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- accum_pkt->info.dataloop_size + sizeof(req->dev.stream_offset);
+ accum_pkt->info.metadata.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(vc, req, &complete);
@@ -733,10 +738,8 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.dataloop_size;
- req->dev.iov[2].MPID_IOV_BUF = &(req->dev.stream_offset);
- req->dev.iov[2].MPID_IOV_LEN = sizeof(req->dev.stream_offset);
- req->dev.iov_count = 3;
+ req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.metadata.dataloop_size;
+ req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
@@ -766,12 +769,14 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &pkt->get_accum;
MPID_Request *req = NULL;
+ MPI_Aint extent;
int complete = 0;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
MPID_Win *win_ptr;
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
+ MPI_Aint stream_elem_count, rest_len, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
@@ -889,6 +894,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.real_user_buf = get_accum_pkt->addr;
req->dev.target_win_handle = get_accum_pkt->target_win_handle;
req->dev.flags = get_accum_pkt->flags;
+ req->dev.stream_offset = get_accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = get_accum_pkt->request_handle;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
@@ -903,6 +909,8 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
req->dev.datatype = get_accum_pkt->datatype;
+ MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, extent);
+
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
@@ -920,7 +928,11 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.user_buf = req->dev.tmpbuf;
MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
- req->dev.recv_data_sz = type_size * get_accum_pkt->count;
+ total_len = type_size * get_accum_pkt->count;
+ rest_len = total_len - req->dev.stream_offset;
+ stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
+
+ req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
@@ -952,26 +964,22 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
- req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.dataloop_size);
+ req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
- get_accum_pkt->info.dataloop_size);
+ get_accum_pkt->info.metadata.dataloop_size);
}
if (data_len >=
- sizeof(MPIDI_RMA_dtype_info) + get_accum_pkt->info.dataloop_size +
- sizeof(req->dev.stream_offset)) {
+ sizeof(MPIDI_RMA_dtype_info) + get_accum_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
- get_accum_pkt->info.dataloop_size);
- MPIU_Memcpy(&(req->dev.stream_offset),
- data_buf + sizeof(MPIDI_RMA_dtype_info) +
- get_accum_pkt->info.dataloop_size, sizeof(req->dev.stream_offset));
+ get_accum_pkt->info.metadata.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
- get_accum_pkt->info.dataloop_size + sizeof(req->dev.stream_offset);
+ get_accum_pkt->info.metadata.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete(vc, req, &complete);
@@ -986,10 +994,8 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
- req->dev.iov[1].MPID_IOV_LEN = get_accum_pkt->info.dataloop_size;
- req->dev.iov[2].MPID_IOV_BUF = &(req->dev.stream_offset);
- req->dev.iov[2].MPID_IOV_LEN = sizeof(req->dev.stream_offset);
- req->dev.iov_count = 3;
+ req->dev.iov[1].MPID_IOV_LEN = get_accum_pkt->info.metadata.dataloop_size;
+ req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
@@ -2001,7 +2007,7 @@ int MPIDI_CH3_PktPrint_Put(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->put.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->put.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->put.datatype));
- MPIU_DBG_PRINTF((" dataloop_size. 0x%08X\n", pkt->put.info.dataloop_size));
+ MPIU_DBG_PRINTF((" dataloop_size. 0x%08X\n", pkt->put.info.metadata.dataloop_size));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->put.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->put.source_win_handle));
/*MPIU_DBG_PRINTF((" win_ptr ...... 0x%08X\n", pkt->put.win_ptr)); */
@@ -2014,7 +2020,7 @@ int MPIDI_CH3_PktPrint_Get(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->get.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->get.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->get.datatype));
- MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->get.info.dataloop_size));
+ MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->get.info.metadata.dataloop_size));
MPIU_DBG_PRINTF((" request ...... 0x%08X\n", pkt->get.request_handle));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->get.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->get.source_win_handle));
@@ -2039,7 +2045,7 @@ int MPIDI_CH3_PktPrint_Accumulate(FILE * fp, MPIDI_CH3_Pkt_t * pkt)
MPIU_DBG_PRINTF((" addr ......... %p\n", pkt->accum.addr));
MPIU_DBG_PRINTF((" count ........ %d\n", pkt->accum.count));
MPIU_DBG_PRINTF((" datatype ..... 0x%08X\n", pkt->accum.datatype));
- MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->accum.info.dataloop_size));
+ MPIU_DBG_PRINTF((" dataloop_size. %d\n", pkt->accum.info.metadata.dataloop_size));
MPIU_DBG_PRINTF((" op ........... 0x%08X\n", pkt->accum.op));
MPIU_DBG_PRINTF((" target ....... 0x%08X\n", pkt->accum.target_win_handle));
MPIU_DBG_PRINTF((" source ....... 0x%08X\n", pkt->accum.source_win_handle));
http://git.mpich.org/mpich.git/commitdiff/caeb3b3a60bc89616c0ccb28d0127f96459607c9
commit caeb3b3a60bc89616c0ccb28d0127f96459607c9
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Wed Apr 29 06:39:24 2015 -0500
Modify comments about piggybacking LOCK with op.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
diff --git a/src/mpid/ch3/include/mpidrma.h b/src/mpid/ch3/include/mpidrma.h
index 4c90166..50ea300 100644
--- a/src/mpid/ch3/include/mpidrma.h
+++ b/src/mpid/ch3/include/mpidrma.h
@@ -365,7 +365,7 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
MPIDI_msg_sz_t data_len;
char *data_buf = NULL;
- /* This is PUT, ACC, GACC */
+ /* This is PUT, ACC, GACC, FOP */
MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE((*pkt), target_dtp, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT((*pkt), target_count, mpi_errno);
http://git.mpich.org/mpich.git/commitdiff/a5a9c964a00179191ad9a4446a3e074227fdff2b
commit a5a9c964a00179191ad9a4446a3e074227fdff2b
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Mon May 18 14:30:43 2015 -0500
Modify comments about issuing origin data in RMA.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
diff --git a/src/mpid/ch3/include/mpid_rma_issue.h b/src/mpid/ch3/include/mpid_rma_issue.h
index 9345b70..d6ee801 100644
--- a/src/mpid/ch3/include/mpid_rma_issue.h
+++ b/src/mpid/ch3/include/mpid_rma_issue.h
@@ -213,7 +213,7 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
if (target_dtp == NULL) {
/* basic datatype on target */
if (is_origin_contig) {
- /* basic datatype on origin */
+ /* origin data is contiguous */
int iovcnt = 2;
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb);
@@ -236,7 +236,7 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
}
}
else {
- /* derived datatype on origin */
+ /* origin data is non-contiguous */
req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
@@ -404,7 +404,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
if (target_dtp == NULL) {
/* basic datatype on target */
if (is_origin_contig) {
- /* basic datatype on origin */
+ /* origin data is contiguous */
int iovcnt = 2;
iov[1].MPID_IOV_BUF =
@@ -428,7 +428,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
}
}
else {
- /* derived datatype on origin */
+ /* origin data is non-contiguous */
req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
http://git.mpich.org/mpich.git/commitdiff/b0668be93a65729668b9987a27b7171b472b3f59
commit b0668be93a65729668b9987a27b7171b472b3f59
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Mon May 18 14:29:57 2015 -0500
Bug-fix: add true_lb when directly issuing contiguous derived data.
In RMA communication, when the data to be sent is a derived datatype
that is nevertheless contiguous, we can avoid copying/packing and issue
it directly from the user buffer. However, such data may have a
non-zero lower bound, so the true lb must be added when calculating the
starting address. This patch fixes that.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
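As background (not part of this patch): a derived datatype can be laid out
contiguously and still have a non-zero true lower bound, in which case the
first byte of actual data lives at origin_addr + true_lb rather than at
origin_addr. The standalone sketch below illustrates this with a made-up
hindexed type; the sizes and names are for illustration only.

    #include <mpi.h>
    #include <stdio.h>

    int main(int argc, char **argv)
    {
        MPI_Datatype dtp;
        MPI_Aint true_lb, true_extent;
        int blocklen = 4;
        MPI_Aint disp = 2 * sizeof(int);   /* data starts 2 ints into the buffer */

        MPI_Init(&argc, &argv);

        /* One contiguous block of 4 ints placed at byte offset 'disp':
         * the type is contiguous, but its true lower bound is non-zero. */
        MPI_Type_create_hindexed(1, &blocklen, &disp, MPI_INT, &dtp);
        MPI_Type_commit(&dtp);

        MPI_Type_get_true_extent(dtp, &true_lb, &true_extent);
        printf("true_lb = %ld, true_extent = %ld\n",
               (long) true_lb, (long) true_extent);

        /* Issuing such data directly from the user buffer therefore has to
         * start at (char *) buf + true_lb, which is what this patch adds. */

        MPI_Type_free(&dtp);
        MPI_Finalize();
        return 0;
    }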
diff --git a/src/mpid/ch3/include/mpid_rma_issue.h b/src/mpid/ch3/include/mpid_rma_issue.h
index ff6005a..9345b70 100644
--- a/src/mpid/ch3/include/mpid_rma_issue.h
+++ b/src/mpid/ch3/include/mpid_rma_issue.h
@@ -176,6 +176,7 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
int *blocklens = NULL;
MPI_Aint *displaces = NULL;
MPI_Datatype *datatypes = NULL;
+ MPI_Aint dt_true_lb;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
@@ -201,7 +202,10 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
}
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
+
+ /* check if origin data is contiguous and get true lb */
MPID_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig);
+ MPID_Datatype_get_true_lb(rma_op->origin_datatype, &dt_true_lb);
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) & (rma_op->pkt);
iov[0].MPID_IOV_LEN = sizeof(rma_op->pkt);
@@ -212,7 +216,7 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
/* basic datatype on origin */
int iovcnt = 2;
- iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr);
+ iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb);
iov[1].MPID_IOV_LEN = rma_op->origin_count * origin_type_size;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
@@ -363,6 +367,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
int *blocklens = NULL;
MPI_Aint *displaces = NULL;
MPI_Datatype *datatypes = NULL;
+ MPI_Aint dt_true_lb;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
@@ -389,7 +394,9 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPID_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp);
}
+ /* check if origin data is contiguous and get true lb */
MPID_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig);
+ MPID_Datatype_get_true_lb(rma_op->origin_datatype, &dt_true_lb);
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) & (rma_op->pkt);
iov[0].MPID_IOV_LEN = sizeof(rma_op->pkt);
@@ -401,7 +408,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
int iovcnt = 2;
iov[1].MPID_IOV_BUF =
- (MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + stream_offset);
+ (MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
iov[1].MPID_IOV_LEN = stream_size;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
diff --git a/src/mpid/ch3/src/ch3u_handle_recv_req.c b/src/mpid/ch3/src/ch3u_handle_recv_req.c
index 5e62e66..33ecb54 100644
--- a/src/mpid/ch3/src/ch3u_handle_recv_req.c
+++ b/src/mpid/ch3/src/ch3u_handle_recv_req.c
@@ -232,6 +232,7 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
int is_contig;
MPI_Datatype basic_type;
MPI_Aint predef_count, predef_dtp_size;
+ MPI_Aint dt_true_lb;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);
@@ -261,7 +262,9 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
(rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
+ /* check if data is contiguous and get true lb */
MPID_Datatype_is_contig(rreq->dev.datatype, &is_contig);
+ MPID_Datatype_get_true_lb(rreq->dev.datatype, &dt_true_lb);
resp_req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
@@ -280,8 +283,8 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
if (is_contig) {
MPIU_Memcpy(resp_req->dev.user_buf,
- (void *) ((char *) rreq->dev.real_user_buf + rreq->dev.stream_offset),
- rreq->dev.recv_data_sz);
+ (void *) ((char *) rreq->dev.real_user_buf + dt_true_lb +
+ rreq->dev.stream_offset), rreq->dev.recv_data_sz);
}
else {
MPID_Segment *seg = MPID_Segment_alloc();
diff --git a/src/mpid/common/datatype/mpid_datatype.h b/src/mpid/common/datatype/mpid_datatype.h
index 7af878d..7fd50be 100644
--- a/src/mpid/common/datatype/mpid_datatype.h
+++ b/src/mpid/common/datatype/mpid_datatype.h
@@ -565,6 +565,20 @@ do { \
} \
} while (0)
+/* helper macro: takes an MPI_Datatype handle value and returns true_lb in
+ * (*true_lb_) */
+#define MPID_Datatype_get_true_lb(dtype_, true_lb_) \
+ do { \
+ if (HANDLE_GET_KIND(dtype_) == HANDLE_KIND_BUILTIN) { \
+ *(true_lb_) = 0; \
+ } \
+ else { \
+ MPID_Datatype *dtp_ = NULL; \
+ MPID_Datatype_get_ptr((dtype_), dtp_); \
+ *(true_lb_) = dtp_->true_lb; \
+ } \
+ } while (0)
+
/* Datatype functions */
int MPID_Type_commit(MPI_Datatype *type);
http://git.mpich.org/mpich.git/commitdiff/c292eca93f2035d2cfcfb616f7f7acbb59766e15
commit c292eca93f2035d2cfcfb616f7f7acbb59766e15
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Thu May 28 20:38:51 2015 -0500
Correct usage of req's segment_first and segment_size in Nemesis.
Originally the implementation of sendNoncontig() in Nemesis assumed
that req->dev.segment_first is 0 and req->dev.segment_size is the size
of the data, which implicitly assumes that the data passed down from
the upper layer is always the entire message and rules out streaming
that data in the upper layer. The correct usage is that
req->dev.segment_first specifies the current starting location and
req->dev.segment_size specifies the current ending location. Commit
5132e070e0499b3a6fa2955710bef4c699d531fc fixed this issue, but missed
some places; this patch fixes the remaining ones.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
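To make the convention concrete, here is a hedged standalone sketch of a
streaming loop that uses a (first, last) window the way the commit message
describes; pack_window(), send_in_stream_units() and the other names are
hypothetical illustrations, not MPICH internals.

    #include <stddef.h>

    typedef long msg_sz_t;

    /* Toy packing routine: copies bytes [*first, last) of a logical message
     * into outbuf and advances *first to how far it got. */
    static void pack_window(const char *msg, msg_sz_t *first, msg_sz_t last,
                            char *outbuf)
    {
        msg_sz_t i, n = last - *first;
        for (i = 0; i < n; i++)
            outbuf[i] = msg[*first + i];
        *first = last;
    }

    static void send_in_stream_units(const char *msg, msg_sz_t total,
                                     char *outbuf, msg_sz_t stream_unit)
    {
        /* segment_first is the current starting offset and segment_size the
         * current ending offset; neither is required to be 0 or 'total'. */
        msg_sz_t segment_first = 0;
        while (segment_first < total) {
            msg_sz_t segment_size = segment_first + stream_unit;
            if (segment_size > total)
                segment_size = total;
            pack_window(msg, &segment_first, segment_size, outbuf);
            /* ... hand outbuf to the lower layer here ... */
        }
    }

    int main(void)
    {
        char msg[] = "0123456789", out[4];
        send_in_stream_units(msg, 10, out, 4);
        return 0;
    }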
diff --git a/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c b/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c
index a45950f..24fccde 100644
--- a/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c
+++ b/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c
@@ -22,6 +22,7 @@ int MPIDI_CH3I_SendNoncontig( MPIDI_VC_t *vc, MPID_Request *sreq, void *header,
{
int mpi_errno = MPI_SUCCESS;
int again = 0;
+ int orig_segment_first = sreq->dev.segment_first;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SENDNONCONTIG);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SENDNONCONTIG);
@@ -58,7 +59,7 @@ int MPIDI_CH3I_SendNoncontig( MPIDI_VC_t *vc, MPID_Request *sreq, void *header,
/* we didn't finish sending everything */
sreq->ch.noncontig = TRUE;
sreq->ch.vc = vc;
- if (sreq->dev.segment_first == 0) /* nothing was sent, save header */
+ if (sreq->dev.segment_first == orig_segment_first) /* nothing was sent, save header */
{
sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)header;
sreq->ch.header_sz = hdr_sz;
http://git.mpich.org/mpich.git/commitdiff/19bdecfa2485036f82118608f58bbc171d9c2596
commit 19bdecfa2485036f82118608f58bbc171d9c2596
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Thu May 28 20:48:53 2015 -0500
Add asserts and comments in Nemesis and MXM netmod about streaming offset.
Note that we move the definition of the streaming unit size to
mpidimpl.h in CH3, so that Nemesis can see it and perform an assertion
check on it.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
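The compile-time relationship being asserted can be sketched in isolation as
follows; the two sizes are made-up placeholders (not the values MPICH
actually uses), and C11 _Static_assert stands in for MPIU_Static_assert.

    /* If the ACC streaming unit is larger than the fastbox payload, a
     * streamed unit can never take the fastbox path, so the fastbox code may
     * safely assume *segment_first == 0. */
    #define FBOX_DATALEN      (16 * 1024)   /* hypothetical fastbox payload */
    #define ACC_STREAM_SIZE   (256 * 1024)  /* hypothetical streaming unit  */

    _Static_assert(ACC_STREAM_SIZE > FBOX_DATALEN,
                   "RMA ACC streaming unit size must exceed the fastbox size");

    int main(void) { return 0; }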
diff --git a/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h b/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h
index e537dc2..fbf2895 100644
--- a/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h
+++ b/src/mpid/ch3/channels/nemesis/include/mpid_nem_inline.h
@@ -440,6 +440,16 @@ MPID_nem_mpich_send_seg_header (MPID_Segment *segment, MPIDI_msg_sz_t *segment_f
{
MPID_nem_fbox_mpich_t *pbox = vc_ch->fbox_out;
+ /* Add a compiler time check on streaming unit size and FASTBOX size */
+ MPIU_Static_assert((MPIDI_CH3U_Acc_stream_size > MPID_NEM_FBOX_DATALEN),
+ "RMA ACC Streaming unit size <= FASTBOX size in Nemesis.");
+
+ /* NOTE: when FASTBOX is being used, streaming optimization is never triggered,
+ * because streaming unit size is larger than FASTBOX size. In such case,
+ * first offset (*segment_first) is zero, and last offset (segment_size)
+ * is the data size */
+ MPIU_Assert(*segment_first == 0);
+
if (MPID_nem_fbox_is_full((MPID_nem_fbox_common_ptr_t)pbox))
goto usequeue_l;
diff --git a/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_send.c b/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_send.c
index fd52108..2c23d5e 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_send.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/mxm/mxm_send.c
@@ -180,6 +180,13 @@ int MPID_nem_mxm_SendNoncontig(MPIDI_VC_t * vc, MPID_Request * sreq, void *hdr,
req_area->iov_buf[0].length = sizeof(MPIDI_CH3_Pkt_t);
last = sreq->dev.segment_size;
+
+ /* NOTE: currently upper layer never pass packet with data that has
+ * either "last <= 0" or "last-sreq->dev.segment_first <=0" to this
+ * layer. In future, if upper layer passes such kind of packet, the
+ * judgement of the following IF branch needs to be modified. */
+ MPIU_Assert(last > 0 && last - sreq->dev.segment_first > 0);
+
if (last > 0) {
sreq->dev.tmpbuf = MPIU_Malloc((size_t) (sreq->dev.segment_size - sreq->dev.segment_first));
MPIU_Assert(sreq->dev.tmpbuf);
diff --git a/src/mpid/ch3/include/mpid_rma_issue.h b/src/mpid/ch3/include/mpid_rma_issue.h
index 17c6930..ff6005a 100644
--- a/src/mpid/ch3/include/mpid_rma_issue.h
+++ b/src/mpid/ch3/include/mpid_rma_issue.h
@@ -10,9 +10,6 @@
#include "mpl_utlist.h"
#include "mpid_rma_types.h"
-/* define ACC stream size as the SRBuf size */
-#define MPIDI_CH3U_Acc_stream_size MPIDI_CH3U_SRBuf_size
-
/* =========================================================== */
/* auxiliary functions */
/* =========================================================== */
diff --git a/src/mpid/ch3/include/mpid_rma_shm.h b/src/mpid/ch3/include/mpid_rma_shm.h
index 103e8cc..a34e64f 100644
--- a/src/mpid/ch3/include/mpid_rma_shm.h
+++ b/src/mpid/ch3/include/mpid_rma_shm.h
@@ -10,9 +10,6 @@
#include "mpl_utlist.h"
#include "mpid_rma_types.h"
-/* define ACC stream size as the SRBuf size */
-#define MPIDI_CH3U_Acc_stream_size MPIDI_CH3U_SRBuf_size
-
static inline int do_accumulate_op(void *source_buf, int source_count, MPI_Datatype source_dtp,
void *target_buf, int target_count, MPI_Datatype target_dtp,
MPI_Aint stream_offset, MPI_Op acc_op);
diff --git a/src/mpid/ch3/include/mpidimpl.h b/src/mpid/ch3/include/mpidimpl.h
index b9d6a75..995a81f 100644
--- a/src/mpid/ch3/include/mpidimpl.h
+++ b/src/mpid/ch3/include/mpidimpl.h
@@ -925,6 +925,11 @@ extern MPIDI_CH3U_SRBuf_element_t * MPIDI_CH3U_SRBuf_pool;
END SEND/RECEIVE BUFFER SECTION
-------------------------------*/
+/* define ACC stream size as the SRBuf size */
+#if !defined(MPIDI_CH3U_Acc_stream_size)
+#define MPIDI_CH3U_Acc_stream_size MPIDI_CH3U_SRBuf_size
+#endif
+
/*----------------------------
BEGIN DEBUGGING TOOL SECTION
----------------------------*/
http://git.mpich.org/mpich.git/commitdiff/08d72e3790144b3058b6c9aaebe44f70a4329282
commit 08d72e3790144b3058b6c9aaebe44f70a4329282
Author: Xin Zhao <xinzhao3 at illinois.edu>
Date: Tue May 19 15:10:11 2015 -0500
Add a test for derived ACCs working with MPI_Win_flush_local.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
diff --git a/test/mpi/rma/Makefile.am b/test/mpi/rma/Makefile.am
index 50785e9..5c6de78 100644
--- a/test/mpi/rma/Makefile.am
+++ b/test/mpi/rma/Makefile.am
@@ -145,7 +145,8 @@ noinst_PROGRAMS = \
atomic_rmw_cas \
atomic_rmw_gacc \
acc-pairtype \
- manyget
+ manyget \
+ derived-acc-flush_local
if BUILD_MPIX_TESTS
noinst_PROGRAMS += aint
diff --git a/test/mpi/rma/derived-acc-flush_local.c b/test/mpi/rma/derived-acc-flush_local.c
new file mode 100644
index 0000000..97668db
--- /dev/null
+++ b/test/mpi/rma/derived-acc-flush_local.c
@@ -0,0 +1,125 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ * (C) 2015 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+/* This code tests the case when one process issues large number
+ * of MPI_Accumulate operations (with large derived datatype) and
+ * issues a MPI_Win_flush_local at end. */
+
+/* FIXME: we should merge this into a comprehensive test for RMA
+ * operations + MPI_Win_flush_local. */
+
+#include "mpi.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+
+#define DATA_SIZE 1000000
+#define COUNT 5000
+#define BLOCKLENGTH (DATA_SIZE/COUNT)
+#define STRIDE BLOCKLENGTH
+#define OPS_NUM 500
+
+int main(int argc, char *argv[])
+{
+ int rank, nproc;
+ int i, j;
+ MPI_Win win;
+ int *tar_buf = NULL;
+ int *orig_buf = NULL;
+ MPI_Datatype derived_dtp;
+ int errors = 0;
+
+ MPI_Init(&argc, &argv);
+
+ MPI_Comm_size(MPI_COMM_WORLD, &nproc);
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+ MPI_Alloc_mem(sizeof(int) * DATA_SIZE, MPI_INFO_NULL, &orig_buf);
+ MPI_Alloc_mem(sizeof(int) * DATA_SIZE, MPI_INFO_NULL, &tar_buf);
+
+ for (i = 0; i < DATA_SIZE; i++) {
+ orig_buf[i] = 1;
+ tar_buf[i] = 0;
+ }
+
+ MPI_Type_vector(COUNT, BLOCKLENGTH - 1, STRIDE, MPI_INT, &derived_dtp);
+ MPI_Type_commit(&derived_dtp);
+
+ MPI_Win_create(tar_buf, sizeof(int) * DATA_SIZE, sizeof(int),
+ MPI_INFO_NULL, MPI_COMM_WORLD, &win);
+
+ /***** test between rank 0 and rank 1 *****/
+
+ if (rank == 1) {
+ MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);
+
+ for (i = 0; i < OPS_NUM; i++) {
+ MPI_Accumulate(orig_buf, 1, derived_dtp,
+ 0, 0, DATA_SIZE - COUNT, MPI_INT, MPI_SUM, win);
+ MPI_Win_flush_local(0, win);
+ }
+
+ MPI_Win_unlock(0, win);
+ }
+
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ /* check results */
+ if (rank == 0) {
+ for (i = 0; i < DATA_SIZE - COUNT; i++) {
+ if (tar_buf[i] != OPS_NUM) {
+ printf("tar_buf[%d] = %d, expected %d\n", i, tar_buf[i], OPS_NUM);
+ errors++;
+ }
+ }
+ }
+
+ for (i = 0; i < DATA_SIZE; i++) {
+ tar_buf[i] = 0;
+ }
+
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ /***** test between rank 0 and rank 2 *****/
+
+ if (rank == 2) {
+ MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);
+
+ for (i = 0; i < OPS_NUM; i++) {
+ MPI_Accumulate(orig_buf, 1, derived_dtp,
+ 0, 0, DATA_SIZE - COUNT, MPI_INT, MPI_SUM, win);
+ MPI_Win_flush_local(0, win);
+ }
+
+ MPI_Win_unlock(0, win);
+ }
+
+ MPI_Barrier(MPI_COMM_WORLD);
+
+ /* check results */
+ if (rank == 0) {
+ for (i = 0; i < DATA_SIZE - COUNT; i++) {
+ if (tar_buf[i] != OPS_NUM) {
+ printf("tar_buf[%d] = %d, expected %d\n", i, tar_buf[i], OPS_NUM);
+ errors++;
+ }
+ }
+
+ if (errors == 0)
+ printf(" No Errors\n");
+ }
+
+ MPI_Win_free(&win);
+
+ MPI_Type_free(&derived_dtp);
+
+ MPI_Free_mem(orig_buf);
+ MPI_Free_mem(tar_buf);
+
+ MPI_Finalize();
+
+ return 0;
+}
diff --git a/test/mpi/rma/testlist.in b/test/mpi/rma/testlist.in
index 649e88a..c707b6c 100644
--- a/test/mpi/rma/testlist.in
+++ b/test/mpi/rma/testlist.in
@@ -134,6 +134,7 @@ atomic_rmw_gacc 3
@mpix at aint 2 strict=false
acc-pairtype 2
manyget 2 xfail=ticket2264
+derived-acc-flush_local 3 mpiversion=3.0
## This test is not strictly correct. This was meant to test out the
## case when MPI_Test is not nonblocking. However, we ended up
http://git.mpich.org/mpich.git/commitdiff/6b693f72fdafb07962dc1be996c1b9fcb2c19f8d
commit 6b693f72fdafb07962dc1be996c1b9fcb2c19f8d
Author: Min Si <msi at il.is.s.u-tokyo.ac.jp>
Date: Tue May 19 13:16:14 2015 -0500
Added contig derived dt tests in mtest datatype.
The original mtest routine only generated tests for noncontiguous
derived datatypes. This patch adds two tests for every derived
datatype: (1) a contiguous ddt (stride = block length) without a
lower bound; (2) a contiguous ddt with a lower bound.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
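A hedged standalone sketch of the two new shapes (parameter values are
illustrative only): a vector whose stride equals its block length is laid
out contiguously, and a single shifted block gives a contiguous layout with
a non-zero lower bound.

    #include <mpi.h>
    #include <stdio.h>

    int main(int argc, char **argv)
    {
        MPI_Datatype contig_ddt, contig_ddt_lb;
        MPI_Aint lb, extent;
        int count = 8, blocklen = 4, stride = 4;   /* stride == block length */
        int full_len = count * blocklen;
        MPI_Aint disp = 2 * sizeof(int);           /* non-zero lower bound */

        MPI_Init(&argc, &argv);

        /* (1) contiguous ddt without a lower bound */
        MPI_Type_vector(count, blocklen, stride, MPI_INT, &contig_ddt);
        MPI_Type_commit(&contig_ddt);

        /* (2) contiguous ddt whose data starts 2 ints into the buffer */
        MPI_Type_create_hindexed(1, &full_len, &disp, MPI_INT, &contig_ddt_lb);
        MPI_Type_commit(&contig_ddt_lb);

        MPI_Type_get_extent(contig_ddt, &lb, &extent);
        printf("contig ddt:         lb=%ld extent=%ld\n", (long) lb, (long) extent);
        MPI_Type_get_extent(contig_ddt_lb, &lb, &extent);
        printf("contig ddt with lb: lb=%ld extent=%ld\n", (long) lb, (long) extent);

        MPI_Type_free(&contig_ddt);
        MPI_Type_free(&contig_ddt_lb);
        MPI_Finalize();
        return 0;
    }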
diff --git a/test/mpi/util/mtest_datatype_gen.c b/test/mpi/util/mtest_datatype_gen.c
index 315ef81..dfacdbd 100644
--- a/test/mpi/util/mtest_datatype_gen.c
+++ b/test/mpi/util/mtest_datatype_gen.c
@@ -54,6 +54,8 @@ static int verbose = 0; /* Message level (0 is none) */
* L count & S block length & L stride
* S count & L block length & L stride
* S count & L block length & S stride & S lower-bound
+ * contiguous (stride = block length)
+ * contiguous (stride = block length) & S lower-bound
*
* How to add a new structure for each datatype:
* 1. Add structure definition in function MTestDdtStructDefine.
@@ -97,7 +99,7 @@ static int datatype_index = 0;
/* Routine and internal parameters to define the range of datatype tests */
/* ------------------------------------------------------------------------ */
-#define MTEST_DDT_NUM_SUBTESTS 5 /* 5 kinds of derived datatype structure */
+#define MTEST_DDT_NUM_SUBTESTS 7 /* 7 kinds of derived datatype structure */
static MTestDdtCreator mtestDdtCreators[MTEST_DDT_MAX];
static int MTEST_BDT_START_IDX = -1;
@@ -295,6 +297,19 @@ static inline int MTestDdtStructDefine(int ddt_index, MPI_Aint tot_count, MPI_Ai
_stride = _blen * 2;
_lb = _short / 2; /* make sure lb < blen */
break;
+ case 5:
+ /* Contig ddt (stride = block length) without lb */
+ _count = _align_tot_count / _short;
+ _blen = _short;
+ _stride = _blen;
+ break;
+ case 6:
+ /* Contig ddt (stride = block length) with lb */
+ _count = _short;
+ _blen = _align_tot_count / _short;
+ _stride = _blen;
+ _lb = _short / 2; /* make sure lb < blen */
+ break;
default:
/* Undefined index */
merr = 1;
http://git.mpich.org/mpich.git/commitdiff/e595d1ec065faa03c43c7c750b9b1b0c81abc11d
commit e595d1ec065faa03c43c7c750b9b1b0c81abc11d
Author: Min Si <msi at il.is.s.u-tokyo.ac.jp>
Date: Tue May 19 13:02:09 2015 -0500
Added lower-bound tests in mtest datatype routine.
The original mtest routine only generated datatype tests with a zero
lower bound. This patch adds, for every derived datatype, a test that
checks the datatype structure with a non-zero lower bound.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
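A hedged standalone sketch of the lower-bound variant, mirroring the
displs[i] = lb + stride * i change below; the concrete numbers are
illustrative only.

    #include <mpi.h>
    #include <stdio.h>

    int main(int argc, char **argv)
    {
        enum { NBLOCK = 4, BLOCKLEN = 2, STRIDE = 3, LB = 1 };
        int blens[NBLOCK], displs[NBLOCK], i;
        MPI_Datatype idx;
        MPI_Aint lb, extent, true_lb, true_extent;

        MPI_Init(&argc, &argv);

        for (i = 0; i < NBLOCK; i++) {
            blens[i] = BLOCKLEN;
            displs[i] = LB + STRIDE * i;   /* every block shifted by the lb */
        }
        MPI_Type_indexed(NBLOCK, blens, displs, MPI_INT, &idx);
        MPI_Type_commit(&idx);

        MPI_Type_get_extent(idx, &lb, &extent);
        MPI_Type_get_true_extent(idx, &true_lb, &true_extent);
        printf("lb=%ld extent=%ld true_lb=%ld true_extent=%ld\n",
               (long) lb, (long) extent, (long) true_lb, (long) true_extent);

        MPI_Type_free(&idx);
        MPI_Finalize();
        return 0;
    }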
diff --git a/test/mpi/util/mtest_datatype.c b/test/mpi/util/mtest_datatype.c
index ea59d50..249dedf 100644
--- a/test/mpi/util/mtest_datatype.c
+++ b/test/mpi/util/mtest_datatype.c
@@ -623,9 +623,10 @@ static int MTestTypeSubarrayCheckbuf(MTestDatatype * mtype)
* nblock: Number of blocks.
* blocklen: Number of elements in each block. The total number of elements in
* this datatype is set as (nblock * blocklen).
+ * lb: Lower bound of the new datatype (ignored).
* oldtype: Datatype of element.
*/
-static int MTestTypeContiguousCreate(int nblock, int blocklen, int stride,
+static int MTestTypeContiguousCreate(int nblock, int blocklen, int stride, int lb,
MPI_Datatype oldtype, const char *typename_prefix,
MTestDatatype * mtype)
{
@@ -667,9 +668,10 @@ static int MTestTypeContiguousCreate(int nblock, int blocklen, int stride,
* nblock: Number of blocks.
* blocklen: Number of elements in each block.
* stride: Strided number of elements between blocks.
+ * lb: Lower bound of the new datatype (ignored).
* oldtype: Datatype of element.
*/
-static int MTestTypeVectorCreate(int nblock, int blocklen, int stride,
+static int MTestTypeVectorCreate(int nblock, int blocklen, int stride, int lb,
MPI_Datatype oldtype, const char *typename_prefix,
MTestDatatype * mtype)
{
@@ -715,9 +717,10 @@ static int MTestTypeVectorCreate(int nblock, int blocklen, int stride,
* nblock: Number of blocks.
* blocklen: Number of elements in each block.
* stride: Strided number of elements between blocks.
+ * lb: Lower bound of the new datatype (ignored).
* oldtype: Datatype of element.
*/
-static int MTestTypeHvectorCreate(int nblock, int blocklen, int stride,
+static int MTestTypeHvectorCreate(int nblock, int blocklen, int stride, int lb,
MPI_Datatype oldtype, const char *typename_prefix,
MTestDatatype * mtype)
{
@@ -766,9 +769,10 @@ static int MTestTypeHvectorCreate(int nblock, int blocklen, int stride,
* blocklen: Number of elements in each block. Each block has the same length.
* stride: Strided number of elements between two adjacent blocks. The
* displacement of each block is set as (index of current block * stride).
+ * lb: Lower bound of the new datatype.
* oldtype: Datatype of element.
*/
-static int MTestTypeIndexedCreate(int nblock, int blocklen, int stride,
+static int MTestTypeIndexedCreate(int nblock, int blocklen, int stride, int lb,
MPI_Datatype oldtype, const char *typename_prefix,
MTestDatatype * mtype)
{
@@ -794,8 +798,8 @@ static int MTestTypeIndexedCreate(int nblock, int blocklen, int stride,
mtype->nblock = nblock;
for (i = 0; i < nblock; i++) {
mtype->index[i] = blocklen;
- mtype->displs[i] = stride * i; /*stride between the start of two blocks */
- mtype->displ_in_bytes[i] = stride * i * mtype->basesize;
+ mtype->displs[i] = lb + stride * i; /*stride between the start of two blocks */
+ mtype->displ_in_bytes[i] = (lb + stride * i) * mtype->basesize;
}
/* Indexed uses displacement in oldtypes */
@@ -807,8 +811,8 @@ static int MTestTypeIndexedCreate(int nblock, int blocklen, int stride,
MTestPrintError(merr);
memset(type_name, 0, sizeof(type_name));
- sprintf(type_name, "%s %s (%d nblock %d blocklen %d stride)", typename_prefix, "index", nblock,
- blocklen, stride);
+ sprintf(type_name, "%s %s (%d nblock %d blocklen %d stride %d lb)", typename_prefix,
+ "index", nblock, blocklen, stride, lb);
merr = MPI_Type_set_name(mtype->datatype, (char *) type_name);
if (merr)
MTestPrintError(merr);
@@ -828,9 +832,10 @@ static int MTestTypeIndexedCreate(int nblock, int blocklen, int stride,
* blocklen: Number of elements in each block. Each block has the same length.
* stride: Strided number of elements between two adjacent blocks. The byte
* displacement of each block is set as (index of current block * stride * size of oldtype).
+ * lb: Lower bound of the new datatype.
* oldtype: Datatype of element.
*/
-static inline int MTestTypeHindexedCreate(int nblock, int blocklen, int stride,
+static inline int MTestTypeHindexedCreate(int nblock, int blocklen, int stride, int lb,
MPI_Datatype oldtype, const char *typename_prefix,
MTestDatatype * mtype)
{
@@ -855,7 +860,7 @@ static inline int MTestTypeHindexedCreate(int nblock, int blocklen, int stride,
mtype->nblock = nblock;
for (i = 0; i < nblock; i++) {
mtype->index[i] = blocklen;
- mtype->displ_in_bytes[i] = stride * i * mtype->basesize;
+ mtype->displ_in_bytes[i] = (lb + stride * i) * mtype->basesize;
}
/* Hindexed uses displacement in bytes */
@@ -868,8 +873,8 @@ static inline int MTestTypeHindexedCreate(int nblock, int blocklen, int stride,
MTestPrintError(merr);
memset(type_name, 0, sizeof(type_name));
- sprintf(type_name, "%s %s (%d nblock %d blocklen %d stride)", typename_prefix, "hindex", nblock,
- blocklen, stride);
+ sprintf(type_name, "%s %s (%d nblock %d blocklen %d stride %d lb)", typename_prefix,
+ "hindex", nblock, blocklen, stride, lb);
merr = MPI_Type_set_name(mtype->datatype, (char *) type_name);
if (merr)
MTestPrintError(merr);
@@ -891,9 +896,10 @@ static inline int MTestTypeHindexedCreate(int nblock, int blocklen, int stride,
* blocklen: Number of elements in each block.
* stride: Strided number of elements between two adjacent blocks. The
* displacement of each block is set as (index of current block * stride).
+ * lb: Lower bound of the new datatype.
* oldtype: Datatype of element.
*/
-static int MTestTypeIndexedBlockCreate(int nblock, int blocklen, int stride,
+static int MTestTypeIndexedBlockCreate(int nblock, int blocklen, int stride, int lb,
MPI_Datatype oldtype, const char *typename_prefix,
MTestDatatype * mtype)
{
@@ -918,8 +924,8 @@ static int MTestTypeIndexedBlockCreate(int nblock, int blocklen, int stride,
mtype->nblock = nblock;
mtype->blksize = blocklen * mtype->basesize;
for (i = 0; i < nblock; i++) {
- mtype->displs[i] = stride * i;
- mtype->displ_in_bytes[i] = stride * i * mtype->basesize;
+ mtype->displs[i] = lb + stride * i;
+ mtype->displ_in_bytes[i] = (lb + stride * i) * mtype->basesize;
}
/* Indexed-block uses displacement in oldtypes */
@@ -932,8 +938,8 @@ static int MTestTypeIndexedBlockCreate(int nblock, int blocklen, int stride,
MTestPrintError(merr);
memset(type_name, 0, sizeof(type_name));
- sprintf(type_name, "%s %s (%d nblock %d blocklen %d stride)", typename_prefix, "index_block",
- nblock, blocklen, stride);
+ sprintf(type_name, "%s %s (%d nblock %d blocklen %d stride %d lb)", typename_prefix,
+ "index_block", nblock, blocklen, stride, lb);
merr = MPI_Type_set_name(mtype->datatype, (char *) type_name);
if (merr)
MTestPrintError(merr);
@@ -953,9 +959,10 @@ static int MTestTypeIndexedBlockCreate(int nblock, int blocklen, int stride,
* blocklen: Number of elements in each block.
* stride: Strided number of elements between two adjacent blocks. The byte
* displacement of each block is set as (index of current block * stride * size of oldtype).
+ * lb: Lower bound of the new datatype.
* oldtype: Datatype of element.
*/
-static int MTestTypeHindexedBlockCreate(int nblock, int blocklen, int stride,
+static int MTestTypeHindexedBlockCreate(int nblock, int blocklen, int stride, int lb,
MPI_Datatype oldtype, const char *typename_prefix,
MTestDatatype * mtype)
{
@@ -979,7 +986,7 @@ static int MTestTypeHindexedBlockCreate(int nblock, int blocklen, int stride,
mtype->nblock = nblock;
mtype->blksize = blocklen * mtype->basesize;
for (i = 0; i < nblock; i++) {
- mtype->displ_in_bytes[i] = stride * i * mtype->basesize;
+ mtype->displ_in_bytes[i] = (lb + stride * i) * mtype->basesize;
}
/* Hindexed-block uses displacement in bytes */
@@ -992,8 +999,8 @@ static int MTestTypeHindexedBlockCreate(int nblock, int blocklen, int stride,
MTestPrintError(merr);
memset(type_name, 0, sizeof(type_name));
- sprintf(type_name, "%s %s (%d nblock %d blocklen %d stride)", typename_prefix, "hindex_block",
- nblock, blocklen, stride);
+ sprintf(type_name, "%s %s (%d nblock %d blocklen %d stride %d lb)", typename_prefix,
+ "hindex_block", nblock, blocklen, stride, lb);
merr = MPI_Type_set_name(mtype->datatype, (char *) type_name);
if (merr)
MTestPrintError(merr);
@@ -1014,9 +1021,10 @@ static int MTestTypeHindexedBlockCreate(int nblock, int blocklen, int stride,
* blocklen: Number of elements in each block. Each block has the same length.
* stride: Strided number of elements between two adjacent blocks. The byte
* displacement of each block is set as (index of current block * stride * size of oldtype).
+ * lb: Lower bound of the new datatype.
* oldtype: Datatype of element. Each block has the same oldtype.
*/
-static int MTestTypeStructCreate(int nblock, int blocklen, int stride,
+static int MTestTypeStructCreate(int nblock, int blocklen, int stride, int lb,
MPI_Datatype oldtype, const char *typename_prefix,
MTestDatatype * mtype)
{
@@ -1042,7 +1050,7 @@ static int MTestTypeStructCreate(int nblock, int blocklen, int stride,
mtype->nblock = nblock;
mtype->blksize = blocklen * mtype->basesize;
for (i = 0; i < nblock; i++) {
- mtype->displ_in_bytes[i] = stride * i * mtype->basesize;
+ mtype->displ_in_bytes[i] = (lb + stride * i) * mtype->basesize;
mtype->old_datatypes[i] = oldtype;
mtype->index[i] = blocklen;
}
@@ -1057,8 +1065,8 @@ static int MTestTypeStructCreate(int nblock, int blocklen, int stride,
MTestPrintError(merr);
memset(type_name, 0, sizeof(type_name));
- sprintf(type_name, "%s %s (%d nblock %d blocklen %d stride)", typename_prefix, "struct",
- nblock, blocklen, stride);
+ sprintf(type_name, "%s %s (%d nblock %d blocklen %d stride %d lb)", typename_prefix,
+ "struct", nblock, blocklen, stride, lb);
merr = MPI_Type_set_name(mtype->datatype, (char *) type_name);
if (merr)
MTestPrintError(merr);
@@ -1074,15 +1082,15 @@ static int MTestTypeStructCreate(int nblock, int blocklen, int stride,
/*
* Setup order-C subarray type info and handlers.
*
- * A 2D-subarray datatype specified with order C and located in the left-middle
+ * A 2D-subarray datatype specified with order C and located in the right-bottom
* of the full array is created by using input parameters.
- * Number of elements in the dimensions of the full array: {nblock + 2, stride}
+ * Number of elements in the dimensions of the full array: {nblock + lb, stride}
* Number of elements in the dimensions of the subarray: {nblock, blocklen}
* Starting of the subarray in each dimension: {1, stride - blocklen}
* order: MPI_ORDER_C
* oldtype: oldtype
*/
-static int MTestTypeSubArrayOrderCCreate(int nblock, int blocklen, int stride,
+static int MTestTypeSubArrayOrderCCreate(int nblock, int blocklen, int stride, int lb,
MPI_Datatype oldtype, const char *typename_prefix,
MTestDatatype * mtype)
{
@@ -1095,11 +1103,11 @@ static int MTestTypeSubArrayOrderCCreate(int nblock, int blocklen, int stride,
if (merr)
MTestPrintError(merr);
- mtype->arr_sizes[0] = nblock + 2; /* {row, col} */
+ mtype->arr_sizes[0] = nblock + lb; /* {row, col} */
mtype->arr_sizes[1] = stride;
mtype->arr_subsizes[0] = nblock; /* {row, col} */
mtype->arr_subsizes[1] = blocklen;
- mtype->arr_starts[0] = 1; /* {row, col} */
+ mtype->arr_starts[0] = lb; /* {row, col} */
mtype->arr_starts[1] = stride - blocklen;
mtype->order = MPI_ORDER_C;
@@ -1131,15 +1139,15 @@ static int MTestTypeSubArrayOrderCCreate(int nblock, int blocklen, int stride,
/*
* Setup order-Fortran subarray type info and handlers.
*
- * A 2D-subarray datatype specified with order Fortran and located in the middle
+ * A 2D-subarray datatype specified with order Fortran and located in the right
* bottom of the full array is created by using input parameters.
- * Number of elements in the dimensions of the full array: {stride, nblock + 2}
+ * Number of elements in the dimensions of the full array: {stride, nblock + lb}
* Number of elements in the dimensions of the subarray: {blocklen, nblock}
- * Starting of the subarray in each dimension: {stride - blocklen, 1}
+ * Starting of the subarray in each dimension: {stride - blocklen, lb}
* order: MPI_ORDER_FORTRAN
* oldtype: oldtype
*/
-static int MTestTypeSubArrayOrderFortranCreate(int nblock, int blocklen, int stride,
+static int MTestTypeSubArrayOrderFortranCreate(int nblock, int blocklen, int stride, int lb,
MPI_Datatype oldtype, const char *typename_prefix,
MTestDatatype * mtype)
{
@@ -1154,11 +1162,11 @@ static int MTestTypeSubArrayOrderFortranCreate(int nblock, int blocklen, int str
/* use the same row and col as that of order-c subarray for buffer
* initialization and check because we access buffer in order-c */
- mtype->arr_sizes[0] = nblock + 2; /* {row, col} */
+ mtype->arr_sizes[0] = nblock + lb; /* {row, col} */
mtype->arr_sizes[1] = stride;
mtype->arr_subsizes[0] = nblock; /* {row, col} */
mtype->arr_subsizes[1] = blocklen;
- mtype->arr_starts[0] = 1; /* {row, col} */
+ mtype->arr_starts[0] = lb; /* {row, col} */
mtype->arr_starts[1] = stride - blocklen;
mtype->order = MPI_ORDER_FORTRAN;
diff --git a/test/mpi/util/mtest_datatype.h b/test/mpi/util/mtest_datatype.h
index c33a8e4..e549406 100644
--- a/test/mpi/util/mtest_datatype.h
+++ b/test/mpi/util/mtest_datatype.h
@@ -50,7 +50,7 @@ enum MTEST_MIN_DERIVED_DT {
MTEST_MIN_DDT_MAX
};
-typedef int (*MTestDdtCreator) (int, int, int, MPI_Datatype, const char *, MTestDatatype *);
+typedef int (*MTestDdtCreator) (int, int, int, int, MPI_Datatype, const char *, MTestDatatype *);
extern void MTestTypeCreatorInit(MTestDdtCreator * creators);
extern void MTestTypeMinCreatorInit(MTestDdtCreator * creators);
diff --git a/test/mpi/util/mtest_datatype_gen.c b/test/mpi/util/mtest_datatype_gen.c
index 5c12371..315ef81 100644
--- a/test/mpi/util/mtest_datatype_gen.c
+++ b/test/mpi/util/mtest_datatype_gen.c
@@ -53,6 +53,7 @@ static int verbose = 0; /* Message level (0 is none) */
* S count & L block length & S stride
* L count & S block length & L stride
* S count & L block length & L stride
+ * S count & L block length & S stride & S lower-bound
*
* How to add a new structure for each datatype:
* 1. Add structure definition in function MTestDdtStructDefine.
@@ -95,7 +96,8 @@ static int datatype_index = 0;
/* ------------------------------------------------------------------------ */
/* Routine and internal parameters to define the range of datatype tests */
/* ------------------------------------------------------------------------ */
-#define MTEST_DDT_NUM_SUBTESTS 4 /* 4 kinds of derived datatype structure */
+
+#define MTEST_DDT_NUM_SUBTESTS 5 /* 5 kinds of derived datatype structure */
static MTestDdtCreator mtestDdtCreators[MTEST_DDT_MAX];
static int MTEST_BDT_START_IDX = -1;
@@ -236,12 +238,15 @@ static inline void MTestInitDatatypeEnv()
/* Routine to define various sets of blocklen/count/stride for derived datatypes. */
/* ------------------------------------------------------------------------------ */
-static inline int MTestDdtStructDefine(int ddt_index, MPI_Aint tot_count, MPI_Aint *count,
- MPI_Aint *blen, MPI_Aint *stride, MPI_Aint *align_tot_count)
+static inline int MTestDdtStructDefine(int ddt_index, MPI_Aint tot_count, MPI_Aint * count,
+ MPI_Aint * blen, MPI_Aint * stride,
+ MPI_Aint * align_tot_count, MPI_Aint * lb)
{
int merr = 0;
int ddt_c_st;
MPI_Aint _short = 0, _align_tot_count = 0, _count = 0, _blen = 0, _stride = 0;
+ MPI_Aint _lb = 0;
+
ddt_c_st = ddt_index % MTEST_DDT_NUM_SUBTESTS;
/* Get short value according to user specified tot_count.
@@ -283,6 +288,13 @@ static inline int MTestDdtStructDefine(int ddt_index, MPI_Aint tot_count, MPI_Ai
_blen = _short;
_stride = _blen * 10;
break;
+ case 4:
+ /* Large block length with lb */
+ _count = _short;
+ _blen = _align_tot_count / _short;
+ _stride = _blen * 2;
+ _lb = _short / 2; /* make sure lb < blen */
+ break;
default:
/* Undefined index */
merr = 1;
@@ -293,6 +305,7 @@ static inline int MTestDdtStructDefine(int ddt_index, MPI_Aint tot_count, MPI_Ai
*count = _count;
*blen = _blen;
*stride = _stride;
+ *lb = _lb;
return merr;
}
@@ -366,7 +379,7 @@ static inline int MTestGetSendDerivedDatatypes(MTestDatatype * sendtype,
int merr = 0;
int ddt_datatype_index, ddt_c_dt;
MPI_Count tsize = 1;
- MPI_Aint blen, stride, count, align_tot_count;;
+ MPI_Aint blen, stride, count, align_tot_count, lb;
MPI_Datatype old_type = MPI_DOUBLE;
/* Check index */
@@ -381,7 +394,7 @@ static inline int MTestGetSendDerivedDatatypes(MTestDatatype * sendtype,
/* Set datatype structure */
merr = MTestDdtStructDefine(ddt_datatype_index, tot_count, &count, &blen,
- &stride, &align_tot_count);
+ &stride, &align_tot_count, &lb);
if (merr) {
printf("Wrong index: global %d, send %d send-ddt %d, or undefined ddt structure in %s\n",
datatype_index, ddt_datatype_index, ddt_c_dt, __FUNCTION__);
@@ -390,7 +403,7 @@ static inline int MTestGetSendDerivedDatatypes(MTestDatatype * sendtype,
}
/* Create send datatype */
- merr = mtestDdtCreators[ddt_c_dt] (count, blen, stride, old_type, "send", sendtype);
+ merr = mtestDdtCreators[ddt_c_dt] (count, blen, stride, lb, old_type, "send", sendtype);
if (merr)
return merr;
@@ -412,7 +425,7 @@ static inline int MTestGetRecvDerivedDatatypes(MTestDatatype * sendtype,
int merr = 0;
int ddt_datatype_index, ddt_c_dt;
MPI_Count tsize;
- MPI_Aint blen, stride, count, align_tot_count;
+ MPI_Aint blen, stride, count, align_tot_count, lb;
MPI_Datatype old_type = MPI_DOUBLE;
/* Check index */
@@ -427,7 +440,7 @@ static inline int MTestGetRecvDerivedDatatypes(MTestDatatype * sendtype,
/* Set datatype structure */
merr = MTestDdtStructDefine(ddt_datatype_index, tot_count, &count, &blen,
- &stride, &align_tot_count);
+ &stride, &align_tot_count, &lb);
if (merr) {
printf("Wrong index: global %d, recv %d recv-ddt %d, or undefined ddt structure in %s\n",
datatype_index, ddt_datatype_index, ddt_c_dt, __FUNCTION__);
@@ -435,7 +448,7 @@ static inline int MTestGetRecvDerivedDatatypes(MTestDatatype * sendtype,
}
/* Create receive datatype */
- merr = mtestDdtCreators[ddt_c_dt] (count, blen, stride, old_type, "recv", recvtype);
+ merr = mtestDdtCreators[ddt_c_dt] (count, blen, stride, lb, old_type, "recv", recvtype);
if (merr)
return merr;
@@ -504,15 +517,19 @@ int MTestGetDatatypes(MTestDatatype * sendtype, MTestDatatype * recvtype, MPI_Ai
if (verbose >= 2 && datatype_index > 0) {
MPI_Count ssize, rsize;
+ MPI_Aint slb, rlb, sextent, rextent;
const char *sendtype_nm = MTestGetDatatypeName(sendtype);
const char *recvtype_nm = MTestGetDatatypeName(recvtype);
MPI_Type_size_x(sendtype->datatype, &ssize);
MPI_Type_size_x(recvtype->datatype, &rsize);
- MTestPrintfMsg(2, "Get datatypes: send = %s(size %d count %d basesize %d), "
- "recv = %s(size %d count %d basesize %d), tot_count=%d\n",
- sendtype_nm, ssize, sendtype->count, sendtype->basesize,
- recvtype_nm, rsize, recvtype->count, recvtype->basesize,
+ MPI_Type_get_extent(sendtype->datatype, &slb, &sextent);
+ MPI_Type_get_extent(recvtype->datatype, &rlb, &rextent);
+
+ MTestPrintfMsg(2, "Get datatypes: send = %s(size %d ext %ld lb %ld count %d basesize %d), "
+ "recv = %s(size %d ext %ld lb %ld count %d basesize %d), tot_count=%d\n",
+ sendtype_nm, ssize, sextent, slb, sendtype->count, sendtype->basesize,
+ recvtype_nm, rsize, rextent, rlb, recvtype->count, recvtype->basesize,
tot_count);
fflush(stdout);
}
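[Editor's note] For readers skimming the diff above, the following minimal standalone sketch (not part of the repository) illustrates the placement rule the new lb parameter introduces for the block-based creators: each block is placed at (lb + stride * i) * basesize bytes. The helper name make_hindexed_block_with_lb and the parameter values in main are hypothetical.

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

static int make_hindexed_block_with_lb(int nblock, int blocklen, int stride, int lb,
                                       MPI_Datatype oldtype, MPI_Datatype * newtype)
{
    MPI_Aint basesize, *displs;
    int i, tsize, merr;

    merr = MPI_Type_size(oldtype, &tsize);      /* element size in bytes */
    if (merr != MPI_SUCCESS)
        return merr;
    basesize = (MPI_Aint) tsize;

    displs = (MPI_Aint *) malloc(nblock * sizeof(MPI_Aint));
    for (i = 0; i < nblock; i++)
        displs[i] = (lb + (MPI_Aint) stride * i) * basesize;    /* byte displacements */

    /* hindexed_block takes its displacements in bytes */
    merr = MPI_Type_create_hindexed_block(nblock, blocklen, displs, oldtype, newtype);
    if (merr == MPI_SUCCESS)
        merr = MPI_Type_commit(newtype);
    free(displs);
    return merr;
}

int main(int argc, char **argv)
{
    MPI_Datatype dt;
    MPI_Aint lb, extent;

    MPI_Init(&argc, &argv);
    /* e.g. 4 blocks of 2 doubles, stride 5 elements, lower-bound offset 1 element */
    make_hindexed_block_with_lb(4, 2, 5, 1, MPI_DOUBLE, &dt);
    MPI_Type_get_extent(dt, &lb, &extent);
    printf("lb = %ld bytes, extent = %ld bytes\n", (long) lb, (long) extent);
    MPI_Type_free(&dt);
    MPI_Finalize();
    return 0;
}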
http://git.mpich.org/mpich.git/commitdiff/6155592fbfd62974c157320016e333a8913eec3e
commit 6155592fbfd62974c157320016e333a8913eec3e
Author: Min Si <msi at il.is.s.u-tokyo.ac.jp>
Date: Tue May 19 23:15:12 2015 -0500
Bug fix: use lb + extent for derived dt in mtest datatype.
The original mtest routines used the extent as the space required by a
derived datatype (i.e., to calculate the per-datatype offset in a data
buffer). This is incorrect when the lower bound is not zero: the space
must be lower bound + extent. This patch fixes that.
Signed-off-by: Pavan Balaji <balaji at anl.gov>
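[Editor's note] The sketch below (not part of the commit) illustrates the sizing rule described in the commit message: the space per datatype element is computed from MPI_Type_get_extent as lb + extent, rather than from the extent alone. The helper buffer_bytes_for and the example datatype are hypothetical.

#include <stdio.h>
#include <mpi.h>

/* Bytes of buffer space to reserve for 'count' elements of 'dt',
 * computed as lb + extent per element. */
static MPI_Aint buffer_bytes_for(MPI_Datatype dt, MPI_Aint count)
{
    MPI_Aint lb = 0, extent = 0;
    MPI_Type_get_extent(dt, &lb, &extent);      /* replaces the deprecated MPI_Type_extent */
    return (lb + extent) * count;
}

int main(int argc, char **argv)
{
    MPI_Datatype dt;
    int displ = 3;      /* block starts 3 doubles in, so the type has a nonzero lb */

    MPI_Init(&argc, &argv);
    MPI_Type_create_indexed_block(1, 2, &displ, MPI_DOUBLE, &dt);
    MPI_Type_commit(&dt);

    /* With a nonzero lower bound, the extent alone under-counts the span the
     * data occupies from the start of the buffer; lb + extent covers it. */
    printf("space for 4 elements: %ld bytes\n", (long) buffer_bytes_for(dt, 4));

    MPI_Type_free(&dt);
    MPI_Finalize();
    return 0;
}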
diff --git a/test/mpi/util/mtest_datatype.c b/test/mpi/util/mtest_datatype.c
index 39e3ceb..ea59d50 100644
--- a/test/mpi/util/mtest_datatype.c
+++ b/test/mpi/util/mtest_datatype.c
@@ -77,15 +77,17 @@ static inline void MTestTypeReset(MTestDatatype * mtype)
*/
static void *MTestTypeContigInit(MTestDatatype * mtype)
{
- MPI_Aint size;
+ MPI_Aint extent = 0, lb = 0, size;
int merr;
if (mtype->count > 0) {
unsigned char *p;
MPI_Aint i, totsize;
- merr = MPI_Type_extent(mtype->datatype, &size);
+ merr = MPI_Type_get_extent(mtype->datatype, &lb, &extent);
if (merr)
MTestPrintError(merr);
+
+ size = extent + lb;
totsize = size * mtype->count;
if (!mtype->buf) {
mtype->buf = (void *) malloc(totsize);
@@ -117,13 +119,15 @@ static int MTestTypeContigCheckbuf(MTestDatatype * mtype)
unsigned char *p;
unsigned char expected;
int err = 0, merr;
- MPI_Aint i, totsize, size;
+ MPI_Aint i, totsize, size, extent = 0, lb = 0;
p = (unsigned char *) mtype->buf;
if (p) {
- merr = MPI_Type_extent(mtype->datatype, &size);
+ merr = MPI_Type_get_extent(mtype->datatype, &lb, &extent);
if (merr)
MTestPrintError(merr);
+
+ size = lb + extent;
totsize = size * mtype->count;
for (i = 0; i < totsize; i++) {
expected = (unsigned char) (0xff ^ (i & 0xff));
@@ -149,7 +153,7 @@ static int MTestTypeContigCheckbuf(MTestDatatype * mtype)
*/
static void *MTestTypeVectorInit(MTestDatatype * mtype)
{
- MPI_Aint size, totsize, dt_offset, byte_offset;
+ MPI_Aint extent = 0, lb = 0, size, totsize, dt_offset, byte_offset;
int merr;
if (mtype->count > 0) {
@@ -157,9 +161,11 @@ static void *MTestTypeVectorInit(MTestDatatype * mtype)
int j, k, nc;
MPI_Aint i;
- merr = MPI_Type_extent(mtype->datatype, &size);
+ merr = MPI_Type_get_extent(mtype->datatype, &lb, &extent);
if (merr)
MTestPrintError(merr);
+
+ size = extent + lb;
totsize = mtype->count * size;
if (!mtype->buf) {
mtype->buf = (void *) malloc(totsize);
@@ -207,15 +213,16 @@ static int MTestTypeVectorCheckbuf(MTestDatatype * mtype)
unsigned char *p;
unsigned char expected;
int i, err = 0, merr;
- MPI_Aint size = 0, byte_offset, dt_offset;
+ MPI_Aint size = 0, byte_offset, dt_offset, extent, lb;
p = (unsigned char *) mtype->buf;
if (p) {
int j, k, nc;
- merr = MPI_Type_extent(mtype->datatype, &size);
+ merr = MPI_Type_get_extent(mtype->datatype, &lb, &extent);
if (merr)
MTestPrintError(merr);
+ size = extent + lb;
nc = 0;
dt_offset = 0;
/* For each datatype */
@@ -253,7 +260,7 @@ static int MTestTypeVectorCheckbuf(MTestDatatype * mtype)
*/
static void *MTestTypeIndexedInit(MTestDatatype * mtype)
{
- MPI_Aint size = 0, totsize, dt_offset, offset;
+ MPI_Aint extent = 0, lb = 0, size, totsize, dt_offset, offset;
int merr;
if (mtype->count > 0) {
@@ -262,9 +269,11 @@ static void *MTestTypeIndexedInit(MTestDatatype * mtype)
MPI_Aint i;
/* Allocate buffer */
- merr = MPI_Type_extent(mtype->datatype, &size);
+ merr = MPI_Type_get_extent(mtype->datatype, &lb, &extent);
if (merr)
MTestPrintError(merr);
+
+ size = extent + lb;
totsize = size * mtype->count;
if (!mtype->buf) {
@@ -320,15 +329,16 @@ static int MTestTypeIndexedCheckbuf(MTestDatatype * mtype)
unsigned char *p;
unsigned char expected;
int err = 0, merr;
- MPI_Aint size = 0, offset, dt_offset;
+ MPI_Aint size = 0, offset, dt_offset, extent = 0, lb = 0;
p = (unsigned char *) mtype->buf;
if (p) {
int i, j, k, b, nc;
- merr = MPI_Type_extent(mtype->datatype, &size);
+ merr = MPI_Type_get_extent(mtype->datatype, &lb, &extent);
if (merr)
MTestPrintError(merr);
+ size = lb + extent;
nc = 0;
dt_offset = 0;
/* For each datatype */
@@ -368,7 +378,7 @@ static int MTestTypeIndexedCheckbuf(MTestDatatype * mtype)
*/
static void *MTestTypeIndexedBlockInit(MTestDatatype * mtype)
{
- MPI_Aint size = 0, totsize, offset, dt_offset;
+ MPI_Aint extent = 0, lb = 0, size, totsize, offset, dt_offset;
int merr;
if (mtype->count > 0) {
@@ -377,9 +387,10 @@ static void *MTestTypeIndexedBlockInit(MTestDatatype * mtype)
MPI_Aint i;
/* Allocate the send/recv buffer */
- merr = MPI_Type_extent(mtype->datatype, &size);
+ merr = MPI_Type_get_extent(mtype->datatype, &lb, &extent);
if (merr)
MTestPrintError(merr);
+ size = extent + lb;
totsize = size * mtype->count;
if (!mtype->buf) {
@@ -431,15 +442,16 @@ static int MTestTypeIndexedBlockCheckbuf(MTestDatatype * mtype)
unsigned char *p;
unsigned char expected;
int err = 0, merr;
- MPI_Aint size = 0, offset, dt_offset;
+ MPI_Aint size = 0, offset, dt_offset, lb = 0, extent = 0;
p = (unsigned char *) mtype->buf;
if (p) {
int i, j, k, nc;
- merr = MPI_Type_extent(mtype->datatype, &size);
+ merr = MPI_Type_get_extent(mtype->datatype, &lb, &extent);
if (merr)
MTestPrintError(merr);
+ size = lb + extent;
nc = 0;
dt_offset = 0;
/* For each datatype */
@@ -475,7 +487,7 @@ static int MTestTypeIndexedBlockCheckbuf(MTestDatatype * mtype)
*/
static void *MTestTypeSubarrayInit(MTestDatatype * mtype)
{
- MPI_Aint size = 0, totsize, offset, dt_offset, byte_offset;
+ MPI_Aint extent = 0, lb = 0, size, totsize, offset, dt_offset, byte_offset;
int merr;
if (mtype->count > 0) {
@@ -484,9 +496,11 @@ static void *MTestTypeSubarrayInit(MTestDatatype * mtype)
MPI_Aint i;
/* Allocate the send/recv buffer */
- merr = MPI_Type_extent(mtype->datatype, &size);
+ merr = MPI_Type_get_extent(mtype->datatype, &lb, &extent);
if (merr)
MTestPrintError(merr);
+
+ size = extent + lb;
totsize = size * mtype->count;
if (!mtype->buf) {
@@ -548,15 +562,17 @@ static int MTestTypeSubarrayCheckbuf(MTestDatatype * mtype)
unsigned char *p;
unsigned char expected;
int err = 0, merr;
- MPI_Aint size, offset, dt_offset, byte_offset;
+ MPI_Aint size, offset, dt_offset, byte_offset, lb = 0, extent = 0;
p = (unsigned char *) mtype->buf;
if (p) {
int j, k, i, b, nc;
- merr = MPI_Type_extent(mtype->datatype, &size);
+ merr = MPI_Type_get_extent(mtype->datatype, &lb, &extent);
if (merr)
MTestPrintError(merr);
+ size = lb + extent;
+
int ncol, sub_ncol, sub_nrow, sub_col_start, sub_row_start;
ncol = mtype->arr_sizes[1];
sub_nrow = mtype->arr_subsizes[0];
@@ -1239,15 +1255,17 @@ int MTestTypeDupCreate(MPI_Datatype oldtype, MTestDatatype * mtype)
*/
void *MTestTypeInitRecv(MTestDatatype * mtype)
{
- MPI_Aint size;
+ MPI_Aint size, extent = 0, lb = 0;
int merr;
if (mtype->count > 0) {
signed char *p;
MPI_Aint i, totsize;
- merr = MPI_Type_extent(mtype->datatype, &size);
+ merr = MPI_Type_get_extent(mtype->datatype, &lb, &extent);
if (merr)
MTestPrintError(merr);
+
+ size = extent + lb;
totsize = size * mtype->count;
if (!mtype->buf) {
mtype->buf = (void *) malloc(totsize);
-----------------------------------------------------------------------
Summary of changes:
.../ch3/channels/nemesis/include/mpid_nem_inline.h | 61 ++-
.../ch3/channels/nemesis/include/mpid_nem_post.h | 6 +-
.../ch3/channels/nemesis/netmod/mxm/mxm_impl.h | 2 +-
.../ch3/channels/nemesis/netmod/mxm/mxm_send.c | 48 ++-
.../channels/nemesis/netmod/portals4/ptl_impl.h | 3 +
.../ch3/channels/nemesis/netmod/portals4/ptl_nm.c | 26 +-
.../ch3/channels/nemesis/netmod/tcp/tcp_send.c | 88 +++-
src/mpid/ch3/channels/nemesis/src/ch3_isendv.c | 4 +-
src/mpid/ch3/channels/nemesis/src/ch3_istartmsgv.c | 3 +-
src/mpid/ch3/channels/nemesis/src/ch3_progress.c | 6 +-
.../ch3/channels/nemesis/src/ch3i_eagernoncontig.c | 6 +-
src/mpid/ch3/include/mpid_rma_issue.h | 469 ++++++++------------
src/mpid/ch3/include/mpid_rma_shm.h | 3 -
src/mpid/ch3/include/mpid_rma_types.h | 3 -
src/mpid/ch3/include/mpidimpl.h | 23 +-
src/mpid/ch3/include/mpidpkt.h | 14 +-
src/mpid/ch3/include/mpidpre.h | 4 +-
src/mpid/ch3/include/mpidrma.h | 35 ++-
src/mpid/ch3/src/ch3u_handle_recv_req.c | 235 +++++++---
src/mpid/ch3/src/ch3u_request.c | 8 +-
src/mpid/ch3/src/ch3u_rma_ops.c | 12 -
src/mpid/ch3/src/ch3u_rma_pkthandler.c | 310 +++++++++----
src/mpid/common/datatype/mpid_datatype.h | 14 +
test/mpi/rma/Makefile.am | 3 +-
test/mpi/rma/derived-acc-flush_local.c | 125 ++++++
test/mpi/rma/testlist.in | 1 +
test/mpi/util/mtest_datatype.c | 142 ++++---
test/mpi/util/mtest_datatype.h | 2 +-
test/mpi/util/mtest_datatype_gen.c | 58 ++-
29 files changed, 1085 insertions(+), 629 deletions(-)
create mode 100644 test/mpi/rma/derived-acc-flush_local.c
hooks/post-receive
--
MPICH primary repository