[mpich-commits] [mpich] MPICH primary repository branch, master, updated. v3.1.3-187-g5046197
Service Account
noreply at mpich.org
Wed Nov 12 20:09:19 CST 2014
This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "MPICH primary repository".
The branch, master has been updated
via 504619783b12672cf2329768f3bb9688790d6cb2 (commit)
from 1ea97753f08b047ef73e4b071bc2c97c6c6705d0 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
http://git.mpich.org/mpich.git/commitdiff/504619783b12672cf2329768f3bb9688790d6cb2
commit 504619783b12672cf2329768f3bb9688790d6cb2
Author: Antonio J. Pena <apenya at mcs.anl.gov>
Date: Wed Oct 22 16:23:56 2014 -0500
Fix Portals4 RMA
Full redesign, mainly of the functions in ptl_nm.c and the
communications involving the "control" portal. Still some
problems with flow control.
Signed-off-by: Ken Raffenetti <raffenet at mcs.anl.gov>
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
index f94fa9a..62dd474 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
@@ -99,6 +99,7 @@ typedef struct {
ptl_pt_index_t ptg;
ptl_pt_index_t ptc;
ptl_pt_index_t ptr;
+ ptl_pt_index_t ptrc;
int id_initialized; /* TRUE iff id and pt have been initialized */
MPIDI_msg_sz_t num_queued_sends; /* number of reqs for this vc in sendq */
} MPID_nem_ptl_vc_area;
@@ -153,7 +154,7 @@ typedef struct {
int MPID_nem_ptl_nm_init(void);
int MPID_nem_ptl_nm_finalize(void);
-int MPID_nem_ptl_nm_event_handler(const ptl_event_t *e);
+int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e);
int MPID_nem_ptl_sendq_complete_with_error(MPIDI_VC_t *vc, int req_errno);
int MPID_nem_ptl_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPIDI_msg_sz_t hdr_sz);
int MPID_nem_ptl_iStartContigMsg(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz, void *data, MPIDI_msg_sz_t data_sz,
@@ -165,7 +166,7 @@ int MPID_nem_ptl_poll_finalize(void);
int MPID_nem_ptl_poll(int is_blocking_poll);
int MPID_nem_ptl_vc_terminated(MPIDI_VC_t *vc);
int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg,
- ptl_pt_index_t *ptc, ptl_pt_index_t *ptr);
+ ptl_pt_index_t *ptc, ptl_pt_index_t *ptr, ptl_pt_index_t *ptrc);
void MPI_nem_ptl_pack_byte(MPID_Segment *segment, MPI_Aint first, MPI_Aint last, void *buf,
MPID_nem_ptl_pack_overflow_t *overflow);
int MPID_nem_ptl_unpack_byte(MPID_Segment *segment, MPI_Aint first, MPI_Aint last, void *buf,
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
index 96ada05..a631723 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
@@ -20,6 +20,7 @@
#define PTIG_KEY "PTIG"
#define PTIC_KEY "PTIC"
#define PTIR_KEY "PTIR"
+#define PTIRC_KEY "PTIRC"
ptl_handle_ni_t MPIDI_nem_ptl_ni;
ptl_pt_index_t MPIDI_nem_ptl_pt;
@@ -28,6 +29,7 @@ ptl_pt_index_t MPIDI_nem_ptl_control_pt; /* portal for MPICH control messages *
ptl_pt_index_t MPIDI_nem_ptl_rpt_pt; /* portal for rportals control messages */
ptl_handle_eq_t MPIDI_nem_ptl_target_eq;
ptl_handle_eq_t MPIDI_nem_ptl_origin_eq;
+ptl_pt_index_t MPIDI_nem_ptl_control_rpt_pt; /* portal for rportals control messages */
ptl_handle_md_t MPIDI_nem_ptl_global_md;
ptl_ni_limits_t MPIDI_nem_ptl_ni_limits;
@@ -114,7 +116,7 @@ static int get_target_info(int rank, ptl_process_t *id, ptl_pt_index_t local_dat
}
else if (local_data_pt == MPIDI_nem_ptl_control_pt) {
*target_data_pt = vc_ptl->ptc;
- *target_control_pt = PTL_PT_ANY;
+ *target_control_pt = vc_ptl->ptrc;
}
fn_exit:
@@ -208,6 +210,11 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
PTL_PT_ANY, &MPIDI_nem_ptl_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
+ /* allocate portal for MPICH control messages */
+ ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_target_eq,
+ PTL_PT_ANY, &MPIDI_nem_ptl_control_rpt_pt);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
+
/* create an MD that covers all of memory */
md.start = 0;
md.length = (ptl_size_t)-1;
@@ -226,14 +233,14 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* allow rportal to manage the get and control portals, but we
- * don't expect retransmission to be needed on these portals, so
+ * don't expect retransmission to be needed on the get portal, so
* we pass PTL_PT_ANY as the dummy portal. unfortunately, portals
* does not have an "invalid" PT constant, which would have been
* more appropriate to pass over here. */
ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_origin_eq, MPIDI_nem_ptl_get_pt, PTL_PT_ANY);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
- ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_origin_eq, MPIDI_nem_ptl_control_pt, PTL_PT_ANY);
+ ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_origin_eq, MPIDI_nem_ptl_control_pt, MPIDI_nem_ptl_control_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* create business card */
@@ -300,6 +307,9 @@ static int ptl_finalize(void)
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+ ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_rpt_pt);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+
ret = PtlNIFini(MPIDI_nem_ptl_ni);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlnifini", "**ptlnifini %s", MPID_nem_ptl_strerror(ret));
@@ -367,6 +377,12 @@ static int get_business_card(int my_rank, char **bc_val_p, int *val_max_sz_p)
MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
}
+ str_errno = MPIU_Str_add_binary_arg(bc_val_p, val_max_sz_p, PTIRC_KEY, (char *)&MPIDI_nem_ptl_control_rpt_pt,
+ sizeof(MPIDI_nem_ptl_control_rpt_pt));
+ if (str_errno) {
+ MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
+ MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
+ }
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_GET_BUSINESS_CARD);
@@ -435,6 +451,8 @@ static int vc_init(MPIDI_VC_t *vc)
vc_ptl->id_initialized = FALSE;
vc_ptl->num_queued_sends = 0;
+ mpi_errno = MPID_nem_ptl_init_id(vc);
+
MPIDI_FUNC_EXIT(MPID_STATE_VC_INIT);
return mpi_errno;
}
@@ -457,7 +475,7 @@ static int vc_destroy(MPIDI_VC_t *vc)
#define FUNCNAME MPID_nem_ptl_get_id_from_bc
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg, ptl_pt_index_t *ptc, ptl_pt_index_t *ptr)
+int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg, ptl_pt_index_t *ptc, ptl_pt_index_t *ptr, ptl_pt_index_t *ptrc)
{
int mpi_errno = MPI_SUCCESS;
int ret;
@@ -484,6 +502,9 @@ int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, pt
ret = MPIU_Str_get_binary_arg(business_card, PTIR_KEY, (char *)ptr, sizeof(ptr), &len);
MPIU_ERR_CHKANDJUMP(ret != MPIU_STR_SUCCESS || len != sizeof(*ptr), mpi_errno, MPI_ERR_OTHER, "**badbusinesscard");
+ ret = MPIU_Str_get_binary_arg(business_card, PTIRC_KEY, (char *)ptrc, sizeof(ptr), &len);
+ MPIU_ERR_CHKANDJUMP(ret != MPIU_STR_SUCCESS || len != sizeof(*ptrc), mpi_errno, MPI_ERR_OTHER, "**badbusinesscard");
+
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_GET_ID_FROM_BC);
return mpi_errno;
@@ -509,8 +530,6 @@ int vc_terminate(MPIDI_VC_t *vc)
outstanding sends with an error and terminate
connection immediately. */
MPIU_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
- mpi_errno = MPID_nem_ptl_sendq_complete_with_error(vc, req_errno);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPID_nem_ptl_vc_terminated(vc);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
} else if (vc_ptl->num_queued_sends == 0) {
@@ -576,7 +595,7 @@ int MPID_nem_ptl_init_id(MPIDI_VC_t *vc)
mpi_errno = vc->pg->getConnInfo(vc->pg_rank, bc, val_max_sz, vc->pg);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
- mpi_errno = MPID_nem_ptl_get_id_from_bc(bc, &vc_ptl->id, &vc_ptl->pt, &vc_ptl->ptg, &vc_ptl->ptc, &vc_ptl->ptr);
+ mpi_errno = MPID_nem_ptl_get_id_from_bc(bc, &vc_ptl->id, &vc_ptl->pt, &vc_ptl->ptg, &vc_ptl->ptc, &vc_ptl->ptr, &vc_ptl->ptrc);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
vc_ptl->id_initialized = TRUE;
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
index f0d447d..60e8db8 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
@@ -1,46 +1,42 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
- * (C) 2012 by Argonne National Laboratory.
+ * (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "ptl_impl.h"
+#include "stddef.h" /* C99; for offsetof */
#include <mpl_utlist.h>
#include "rptl.h"
-#define NUM_SEND_BUFS 100
-#define NUM_RECV_BUFS 100
-#define BUFLEN (sizeof(MPIDI_CH3_Pkt_t) + PTL_MAX_EAGER)
+#define NUM_RECV_BUFS 50
+#define CTL_TAG 0
+#define PAYLOAD_SIZE (PTL_MAX_EAGER - offsetof(buf_t, packet) - sizeof(MPIDI_CH3_Pkt_t))
+#define SENDBUF_SIZE(sent_sz_) (offsetof(buf_t, packet) + sizeof(MPIDI_CH3_Pkt_t) + (sent_sz_))
+#define SENDBUF(req_) REQ_PTL(req_)->chunk_buffer[0]
+#define TMPBUF(req_) REQ_PTL(req_)->chunk_buffer[1]
+#define NEW_TAG(tag_) do { \
+ global_tag += 2; \
+ if (global_tag == CTL_TAG) \
+ global_tag += 2; \
+ (tag_) = global_tag; \
+} while(0)
+#define GET_TAG(tag_) (((tag_) >> 1) << 1)
+#define DONE_TAG(tag_) ((tag_) | 0x1)
+
+typedef struct {
+ size_t remaining;
+ ptl_match_bits_t tag;
+ char packet[PTL_MAX_EAGER];
+} buf_t;
+
+static buf_t recvbufs[NUM_RECV_BUFS];
+static ptl_me_t mes[NUM_RECV_BUFS];
+static ptl_handle_me_t me_handles[NUM_RECV_BUFS];
+static unsigned long long put_cnt = 0; /* required to not finalizing too early */
+static MPID_Request *done_req;
+static ptl_match_bits_t global_tag = 0;
-typedef struct MPID_nem_ptl_sendbuf {
- struct MPID_nem_ptl_sendbuf *next;
- union {
- struct {
- MPIDI_CH3_Pkt_t hdr;
- char payload[PTL_MAX_EAGER];
- } hp; /* header+payload */
- char p[BUFLEN]; /* just payload */
- } buf;
-} MPID_nem_ptl_sendbuf_t;
-
-static MPID_nem_ptl_sendbuf_t sendbuf[NUM_SEND_BUFS];
-static MPID_nem_ptl_sendbuf_t *free_head = NULL;
-static MPID_nem_ptl_sendbuf_t *free_tail = NULL;
-
-static char recvbuf[BUFLEN][NUM_RECV_BUFS];
-static ptl_me_t recvbuf_me[NUM_RECV_BUFS];
-static ptl_handle_me_t recvbuf_me_handle[NUM_RECV_BUFS];
-
-#define FREE_EMPTY() (free_head == NULL)
-#define FREE_HEAD() free_head
-#define FREE_PUSH(buf_p) MPL_LL_PREPEND(free_head, free_tail, buf_p)
-#define FREE_POP(buf_pp) do { *(buf_pp) = free_head; MPL_LL_DELETE(free_head, free_tail, free_head); } while (0)
-
-static struct {MPID_Request *head, *tail;} send_queue;
-
-static int send_queued(void);
-
-static void vc_dbg_print_sendq(FILE *stream, MPIDI_VC_t *vc) {/* FIXME: write real function */ return;}
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_nm_init
@@ -56,36 +52,33 @@ int MPID_nem_ptl_nm_init(void)
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_NM_INIT);
- MPIU_Assert(BUFLEN == sizeof(sendbuf->buf));
-
- /* init send */
- for (i = 0; i < NUM_SEND_BUFS; ++i)
- FREE_PUSH(&sendbuf[i]);
-
- send_queue.head = send_queue.tail = NULL;
-
- MPID_nem_net_module_vc_dbg_print_sendq = vc_dbg_print_sendq;
-
/* init recv */
id_any.phys.pid = PTL_PID_ANY;
id_any.phys.nid = PTL_NID_ANY;
for (i = 0; i < NUM_RECV_BUFS; ++i) {
- recvbuf_me[i].start = recvbuf[i];
- recvbuf_me[i].length = BUFLEN;
- recvbuf_me[i].ct_handle = PTL_CT_NONE;
- recvbuf_me[i].uid = PTL_UID_ANY;
- recvbuf_me[i].options = (PTL_ME_OP_PUT | PTL_ME_USE_ONCE | PTL_ME_EVENT_UNLINK_DISABLE |
- PTL_ME_EVENT_LINK_DISABLE | PTL_ME_IS_ACCESSIBLE);
- recvbuf_me[i].match_id = id_any;
- recvbuf_me[i].match_bits = 0;
- recvbuf_me[i].ignore_bits = (ptl_match_bits_t)~0;
-
- ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &recvbuf_me[i], PTL_PRIORITY_LIST, (void *)(uint64_t)i,
- &recvbuf_me_handle[i]);
- MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s", MPID_nem_ptl_strerror(ret));
+ mes[i].start = &recvbufs[i];
+ mes[i].length = sizeof(buf_t);
+ mes[i].ct_handle = PTL_CT_NONE;
+ mes[i].uid = PTL_UID_ANY;
+ mes[i].options = (PTL_ME_OP_PUT | PTL_ME_USE_ONCE | PTL_ME_EVENT_UNLINK_DISABLE |
+ PTL_ME_EVENT_LINK_DISABLE | PTL_ME_IS_ACCESSIBLE);
+ mes[i].match_id = id_any;
+ mes[i].match_bits = CTL_TAG;
+ mes[i].ignore_bits = 0;
+
+ ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &mes[i],
+ PTL_PRIORITY_LIST, (void *)(uint64_t)i, &me_handles[i]);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s",
+ MPID_nem_ptl_strerror(ret));
}
+ done_req = MPID_Request_create();
+ MPIU_Assert(done_req != NULL);
+ done_req->dev.OnDataAvail = NULL;
+ SENDBUF(done_req) = NULL;
+ REQ_PTL(done_req)->event_handler = MPID_nem_ptl_nm_ctl_event_handler;
+
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_INIT);
return mpi_errno;
@@ -106,11 +99,16 @@ int MPID_nem_ptl_nm_finalize(void)
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_NM_FINALIZE);
+ while (put_cnt) MPID_nem_ptl_poll(1); /* Wait for puts to finish */
+
for (i = 0; i < NUM_RECV_BUFS; ++i) {
- ret = PtlMEUnlink(recvbuf_me_handle[i]);
- MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeunlink", "**ptlmeunlink %s", MPID_nem_ptl_strerror(ret));
+ ret = PtlMEUnlink(me_handles[i]);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeunlink", "**ptlmeunlink %s",
+ MPID_nem_ptl_strerror(ret));
}
+ MPIDI_CH3_Request_destroy(done_req);
+
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_FINALIZE);
return mpi_errno;
@@ -119,111 +117,140 @@ int MPID_nem_ptl_nm_finalize(void)
}
#undef FUNCNAME
-#define FUNCNAME MPID_nem_ptl_sendq_complete_with_error
+#define FUNCNAME meappend_done
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-int MPID_nem_ptl_sendq_complete_with_error(MPIDI_VC_t *vc, int req_errno)
+static inline int meappend_done(ptl_process_t id, MPID_Request *req, ptl_match_bits_t tag)
{
int mpi_errno = MPI_SUCCESS;
- MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_SENDQ_COMPLETE_WITH_ERROR);
-
- MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_SENDQ_COMPLETE_WITH_ERROR);
-
+ int ret;
+ ptl_me_t me;
+ ptl_handle_me_t me_handle;
+ MPIDI_STATE_DECL(MPID_STATE_MEAPPEND_DONE);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_MEAPPEND_DONE);
+
+ me.start = NULL;
+ me.length = 0;
+ me.ct_handle = PTL_CT_NONE;
+ me.uid = PTL_UID_ANY;
+ me.options = ( PTL_ME_OP_PUT | PTL_ME_USE_ONCE | PTL_ME_IS_ACCESSIBLE |
+ PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE );
+ me.match_id = id;
+ me.match_bits = DONE_TAG(tag);
+ me.ignore_bits = 0;
+ me.min_free = 0;
+ ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &me, PTL_PRIORITY_LIST, req,
+ &me_handle);
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlMEAppend(req=%p tag=%#lx)", req, DONE_TAG(tag)));
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s",
+ MPID_nem_ptl_strerror(ret));
+ ++put_cnt;
fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_SENDQ_COMPLETE_WITH_ERROR);
+ MPIDI_FUNC_EXIT(MPID_STATE_MEAPPEND_DONE);
return mpi_errno;
fn_fail:
goto fn_exit;
}
-
-
#undef FUNCNAME
-#define FUNCNAME save_iov
+#define FUNCNAME meappend_large
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-static inline void save_iov(MPID_Request *sreq, void *hdr, void *data, MPIDI_msg_sz_t data_sz)
+static inline int meappend_large(ptl_process_t id, MPID_Request *req, ptl_match_bits_t tag, void *buf, size_t remaining)
{
- int index = 0;
- MPIDI_STATE_DECL(MPID_STATE_SAVE_IOV);
-
- MPIDI_FUNC_ENTER(MPID_STATE_SAVE_IOV);
-
- MPIU_Assert(hdr || data_sz);
-
- if (hdr) {
- sreq->dev.pending_pkt = *(MPIDI_CH3_Pkt_t *)hdr;
- sreq->dev.iov[index].MPID_IOV_BUF = &sreq->dev.pending_pkt;
- sreq->dev.iov[index].MPID_IOV_LEN = sizeof(MPIDI_CH3_Pkt_t);
- ++index;
- }
- if (data_sz) {
- sreq->dev.iov[index].MPID_IOV_BUF = data;
- sreq->dev.iov[index].MPID_IOV_LEN = data_sz;
- ++index;
+ int mpi_errno = MPI_SUCCESS;
+ int ret;
+ ptl_me_t me;
+ MPIDI_STATE_DECL(MPID_STATE_MEAPPEND_LARGE);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_MEAPPEND_LARGE);
+
+ me.start = buf;
+ me.length = remaining < MPIDI_nem_ptl_ni_limits.max_msg_size ?
+ remaining : MPIDI_nem_ptl_ni_limits.max_msg_size;
+ me.ct_handle = PTL_CT_NONE;
+ me.uid = PTL_UID_ANY;
+ me.options = ( PTL_ME_OP_GET | PTL_ME_USE_ONCE | PTL_ME_IS_ACCESSIBLE |
+ PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE );
+ me.match_id = id;
+ me.match_bits = GET_TAG(tag);
+ me.ignore_bits = 0;
+ me.min_free = 0;
+
+ while (remaining) {
+ int incomplete;
+ ptl_handle_me_t foo_me_handle;
+
+ MPIDI_CH3U_Request_increment_cc(req, &incomplete); /* Cannot avoid GET events from poll infrastructure */
+
+ ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &me, PTL_PRIORITY_LIST, req,
+ &foo_me_handle);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s",
+ MPID_nem_ptl_strerror(ret));
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlMEAppend(req=%p tag=%#lx)", req, GET_TAG(tag)));
+
+ me.start = (char *)me.start + me.length;
+ remaining -= me.length;
+ if (remaining < MPIDI_nem_ptl_ni_limits.max_msg_size)
+ me.length = remaining;
}
- sreq->dev.iov_count = index;
- MPIDI_FUNC_EXIT(MPID_STATE_SAVE_IOV);
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_MEAPPEND_LARGE);
+ return mpi_errno;
+ fn_fail:
+ goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME send_pkt
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-static inline int send_pkt(MPIDI_VC_t *vc, void **vhdr_p, void **vdata_p, MPIDI_msg_sz_t *data_sz_p)
+static inline int send_pkt(MPIDI_VC_t *vc, void *hdr_p, void *data_p, MPIDI_msg_sz_t data_sz,
+ MPID_Request *sreq)
{
int mpi_errno = MPI_SUCCESS;
- MPID_nem_ptl_sendbuf_t *sb;
MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
int ret;
- MPIDI_CH3_Pkt_t **hdr_p = (MPIDI_CH3_Pkt_t **)vhdr_p;
- char **data_p = (char **)vdata_p;
+ buf_t *sendbuf;
+ const size_t sent_sz = data_sz < PAYLOAD_SIZE ? data_sz : PAYLOAD_SIZE;
+ const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz);
MPIDI_STATE_DECL(MPID_STATE_SEND_PKT);
MPIDI_FUNC_ENTER(MPID_STATE_SEND_PKT);
- if (!vc_ptl->id_initialized) {
- mpi_errno = MPID_nem_ptl_init_id(vc);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
- }
+ sendbuf = MPIU_Malloc(sendbuf_sz);
+ MPIU_Assert(sendbuf != NULL);
+ MPIU_Memcpy(sendbuf->packet, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
+ sendbuf->remaining = data_sz - sent_sz;
+ NEW_TAG(sendbuf->tag);
+ TMPBUF(sreq) = NULL;
- if (MPIDI_CH3I_Sendq_empty(send_queue) && !FREE_EMPTY()) {
- MPIDI_msg_sz_t len;
- /* send header and first chunk of data */
- FREE_POP(&sb);
- sb->buf.hp.hdr = **hdr_p;
- len = *data_sz_p;
- if (len > PTL_MAX_EAGER)
- len = PTL_MAX_EAGER;
- MPIU_Memcpy(sb->buf.hp.payload, *data_p, len);
- ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
- MPIDI_Process.my_pg_rank, 1);
- MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
- sizeof(sb->buf.hp.hdr) + len, vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
- vc_ptl->ptc, sb));
- *hdr_p = NULL;
- *data_p += len;
- *data_sz_p -= len;
-
- /* send additional data chunks if necessary */
- while (*data_sz_p && !FREE_EMPTY()) {
- FREE_POP(&sb);
- len = *data_sz_p;
- if (len > BUFLEN)
- len = BUFLEN;
- MPIU_Memcpy(sb->buf.p, *data_p, len);
- ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb, MPIDI_Process.my_pg_rank, 1);
- MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p", len,
- vc_ptl->id.phys.nid, vc_ptl->id.phys.pid, vc_ptl->ptc, sb));
- *data_p += len;
- *data_sz_p -= len;
- }
+ if (data_sz) {
+ MPIU_Memcpy(sendbuf->packet + sizeof(MPIDI_CH3_Pkt_t), data_p, sent_sz);
+ if (sendbuf->remaining) /* Post MEs for the remote gets */
+ mpi_errno = meappend_large(vc_ptl->id, sreq, sendbuf->tag, (char *)data_p + sent_sz, sendbuf->remaining);
+ if (mpi_errno)
+ goto fn_fail;
}
+ SENDBUF(sreq) = sendbuf;
+ REQ_PTL(sreq)->event_handler = MPID_nem_ptl_nm_ctl_event_handler;
+
+ /* Post ME for the DONE message */
+ mpi_errno = meappend_done(vc_ptl->id, sreq, sendbuf->tag);
+ if (mpi_errno)
+ goto fn_fail;
+
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sendbuf, sendbuf_sz, PTL_NO_ACK_REQ,
+ vc_ptl->id, vc_ptl->ptc, CTL_TAG, 0, sreq, MPIDI_Process.my_pg_rank, 1);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
+ MPID_nem_ptl_strerror(ret));
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x)",
+ sendbuf_sz, vc_ptl->id.phys.nid,
+ vc_ptl->id.phys.pid, vc_ptl->ptc));
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_SEND_PKT);
return mpi_errno;
@@ -235,116 +262,61 @@ static inline int send_pkt(MPIDI_VC_t *vc, void **vhdr_p, void **vdata_p, MPIDI_
#define FUNCNAME send_noncontig_pkt
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void **vhdr_p, int *complete)
+static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
{
int mpi_errno = MPI_SUCCESS;
- MPID_nem_ptl_sendbuf_t *sb;
MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
int ret;
- MPIDI_msg_sz_t last;
- MPIDI_CH3_Pkt_t **hdr_p = (MPIDI_CH3_Pkt_t **)vhdr_p;
+ buf_t *sendbuf;
+ const size_t sent_sz = sreq->dev.segment_size < PAYLOAD_SIZE ? sreq->dev.segment_size : PAYLOAD_SIZE;
+ size_t sendbuf_sz = SENDBUF_SIZE(sent_sz);
MPIDI_STATE_DECL(MPID_STATE_SEND_NONCONTIG_PKT);
-
MPIDI_FUNC_ENTER(MPID_STATE_SEND_NONCONTIG_PKT);
- *complete = 0;
- MPID_nem_ptl_init_req(sreq);
+ MPIU_Assert(sreq->dev.segment_first == 0);
- if (!vc_ptl->id_initialized) {
- mpi_errno = MPID_nem_ptl_init_id(vc);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
- }
+ sendbuf = MPIU_Malloc(sendbuf_sz);
+ MPIU_Assert(sendbuf != NULL);
+ MPIU_Memcpy(sendbuf->packet, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
+ sendbuf->remaining = sreq->dev.segment_size - sent_sz;
+ NEW_TAG(sendbuf->tag);
+ TMPBUF(sreq) = NULL;
- if (MPIDI_CH3I_Sendq_empty(send_queue) && !FREE_EMPTY()) {
- /* send header and first chunk of data */
- FREE_POP(&sb);
- sb->buf.hp.hdr = **hdr_p;
-
- MPIU_Assert(sreq->dev.segment_first == 0);
-
- last = sreq->dev.segment_size;
- if (last > PTL_MAX_EAGER)
- last = PTL_MAX_EAGER;
- MPI_nem_ptl_pack_byte(sreq->dev.segment_ptr, 0, last, sb->buf.hp.payload, &REQ_PTL(sreq)->overflow[0]);
- ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + last, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
- MPIDI_Process.my_pg_rank, 1);
- MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
- sizeof(sb->buf.hp.hdr) + last, vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
- vc_ptl->ptc, sb));
- *vhdr_p = NULL;
-
- if (last == sreq->dev.segment_size) {
- *complete = 1;
- goto fn_exit;
- }
-
- /* send additional data chunks */
- sreq->dev.segment_first = last;
+ if (sreq->dev.segment_size) {
+ MPIDI_msg_sz_t last = sent_sz;
+ MPID_Segment_pack(sreq->dev.segment_ptr, 0, &last, sendbuf->packet + sizeof(MPIDI_CH3_Pkt_t));
- while (!FREE_EMPTY()) {
- FREE_POP(&sb);
-
+ if (sendbuf->remaining) { /* Post MEs for the remote gets */
+ TMPBUF(sreq) = MPIU_Malloc(sendbuf->remaining);
+ sreq->dev.segment_first = last;
last = sreq->dev.segment_size;
- if (last > sreq->dev.segment_first+BUFLEN)
- last = sreq->dev.segment_first+BUFLEN;
+ MPID_Segment_pack(sreq->dev.segment_ptr, sreq->dev.segment_first, &last, TMPBUF(sreq));
+ MPIU_Assert(last == sreq->dev.segment_size);
- MPI_nem_ptl_pack_byte(sreq->dev.segment_ptr, sreq->dev.segment_first, last, sb->buf.p, &REQ_PTL(sreq)->overflow[0]);
- sreq->dev.segment_first = last;
- ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, last - sreq->dev.segment_first, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
- MPIDI_Process.my_pg_rank, 1);
- MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
- last - sreq->dev.segment_first, vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
- vc_ptl->ptc, sb));
-
- if (last == sreq->dev.segment_size) {
- *complete = 1;
- goto fn_exit;
- }
+ mpi_errno = meappend_large(vc_ptl->id, sreq, sendbuf->tag, TMPBUF(sreq), sendbuf->remaining);
+ if (mpi_errno)
+ goto fn_fail;
}
}
- fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_SEND_NONCONTIG_PKT);
- return mpi_errno;
- fn_fail:
- goto fn_exit;
-}
-
+ SENDBUF(sreq) = sendbuf;
+ REQ_PTL(sreq)->event_handler = MPID_nem_ptl_nm_ctl_event_handler;
-#undef FUNCNAME
-#define FUNCNAME enqueue_request
-#undef FCNAME
-#define FCNAME MPIU_QUOTE(FUNCNAME)
-static int enqueue_request(MPIDI_VC_t *vc, MPID_Request *sreq)
-{
- int mpi_errno = MPI_SUCCESS;
- MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
- MPIDI_STATE_DECL(MPID_STATE_ENQUEUE_REQUEST);
+ /* Post ME for the DONE message */
+ mpi_errno = meappend_done(vc_ptl->id, sreq, sendbuf->tag);
+ if (mpi_errno)
+ goto fn_fail;
- MPIDI_FUNC_ENTER(MPID_STATE_ENQUEUE_REQUEST);
-
- MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "enqueuing");
- MPIU_Assert(FREE_EMPTY() || !MPIDI_CH3I_Sendq_empty(send_queue));
- MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPID_IOV_LEN > 0);
-
- sreq->ch.vc = vc;
- sreq->dev.iov_offset = 0;
-
- ++(vc_ptl->num_queued_sends);
-
- if (FREE_EMPTY()) {
- MPIDI_CH3I_Sendq_enqueue(&send_queue, sreq);
- } else {
- /* there are other sends in the queue before this one: try to send from the queue */
- MPIDI_CH3I_Sendq_enqueue(&send_queue, sreq);
- mpi_errno = send_queued();
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
- }
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sendbuf, sendbuf_sz, PTL_NO_ACK_REQ,
+ vc_ptl->id, vc_ptl->ptc, CTL_TAG, 0, sreq, MPIDI_Process.my_pg_rank, 1);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
+ MPID_nem_ptl_strerror(ret));
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x)",
+ sendbuf_sz, vc_ptl->id.phys.nid,
+ vc_ptl->id.phys.pid, vc_ptl->ptc));
fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_ENQUEUE_REQUEST);
+ MPIDI_FUNC_EXIT(MPID_STATE_SEND_NONCONTIG_PKT);
return mpi_errno;
fn_fail:
goto fn_exit;
@@ -358,46 +330,14 @@ static int enqueue_request(MPIDI_VC_t *vc, MPID_Request *sreq)
int MPID_nem_ptl_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPIDI_msg_sz_t hdr_sz)
{
int mpi_errno = MPI_SUCCESS;
- int complete = 0;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_SENDNONCONTIG);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_SENDNONCONTIG);
- MPIU_ERR_SETFATALANDJUMP(mpi_errno, MPI_ERR_OTHER, "**notimpl");
MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));
-
- mpi_errno = send_noncontig_pkt(vc, sreq, &hdr, &complete);
+ mpi_errno = send_noncontig_pkt(vc, sreq, hdr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
- if (complete) {
- /* sent whole message */
- int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
- reqFn = sreq->dev.OnDataAvail;
- if (!reqFn) {
- MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
- MPIDI_CH3U_Request_complete(sreq);
- MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
- goto fn_exit;
- } else {
- complete = 0;
- mpi_errno = reqFn(vc, sreq, &complete);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-
- if (complete) {
- MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
- goto fn_exit;
- }
- /* not completed: more to send */
- }
- }
-
- REQ_PTL(sreq)->noncontig = TRUE;
- save_iov(sreq, hdr, NULL, 0); /* save the header in IOV if necessary */
-
- /* enqueue request */
- mpi_errno = enqueue_request(vc, sreq);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_SENDNONCONTIG);
return mpi_errno;
@@ -409,40 +349,25 @@ int MPID_nem_ptl_SendNoncontig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MP
#define FUNCNAME MPID_nem_ptl_iStartContigMsg
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-int MPID_nem_ptl_iStartContigMsg(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz, void *data, MPIDI_msg_sz_t data_sz,
- MPID_Request **sreq_ptr)
+int MPID_nem_ptl_iStartContigMsg(MPIDI_VC_t *vc, void *hdr, MPIDI_msg_sz_t hdr_sz, void *data,
+ MPIDI_msg_sz_t data_sz, MPID_Request **sreq_ptr)
{
int mpi_errno = MPI_SUCCESS;
- MPID_Request *sreq = NULL;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_ISTARTCONTIGMSG);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_ISTARTCONTIGMSG);
MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));
- mpi_errno = send_pkt(vc, &hdr, &data, &data_sz);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-
- if (hdr == NULL && data_sz == 0) {
- /* sent whole message */
- *sreq_ptr = NULL;
- goto fn_exit;
- }
-
/* create a request */
- sreq = MPID_Request_create();
- MPIU_Assert(sreq != NULL);
- MPIU_Object_set_ref(sreq, 2);
- sreq->kind = MPID_REQUEST_SEND;
-
- sreq->dev.OnDataAvail = 0;
- REQ_PTL(sreq)->noncontig = FALSE;
- save_iov(sreq, hdr, data, data_sz);
-
- /* enqueue request */
- mpi_errno = enqueue_request(vc, sreq);
+ *sreq_ptr = MPID_Request_create();
+ MPIU_Assert(*sreq_ptr != NULL);
+ MPIU_Object_set_ref(*sreq_ptr, 2);
+ (*sreq_ptr)->kind = MPID_REQUEST_SEND;
+ (*sreq_ptr)->dev.OnDataAvail = NULL;
+ (*sreq_ptr)->dev.user_buf = NULL;
+
+ mpi_errno = send_pkt(vc, hdr, data, data_sz, *sreq_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-
- *sreq_ptr = sreq;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_ISTARTCONTIGMSG);
@@ -464,40 +389,7 @@ int MPID_nem_ptl_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPID
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_ISENDCONTIG);
MPIU_Assert(hdr_sz <= sizeof(MPIDI_CH3_Pkt_t));
- mpi_errno = send_pkt(vc, &hdr, &data, &data_sz);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-
- if (hdr == NULL && data_sz == 0) {
- /* sent whole message */
- int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
- reqFn = sreq->dev.OnDataAvail;
- if (!reqFn) {
- MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
- MPIDI_CH3U_Request_complete(sreq);
- MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
- goto fn_exit;
- } else {
- int complete = 0;
-
- mpi_errno = reqFn(vc, sreq, &complete);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-
- if (complete) {
- MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
- goto fn_exit;
- }
- /* not completed: more to send */
- }
- } else {
- save_iov(sreq, hdr, data, data_sz);
- }
-
- REQ_PTL(sreq)->noncontig = FALSE;
-
- /* enqueue request */
- MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPID_IOV_LEN > 0);
-
- mpi_errno = enqueue_request(vc, sreq);
+ mpi_errno = send_pkt(vc, hdr, data, data_sz, sreq);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
fn_exit:
@@ -508,159 +400,156 @@ int MPID_nem_ptl_iSendContig(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr, MPID
}
#undef FUNCNAME
-#define FUNCNAME send_queued
+#define FUNCNAME on_data_avail
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-static int send_queued(void)
+static inline void on_data_avail(MPID_Request * req)
{
- int mpi_errno = MPI_SUCCESS;
- MPID_nem_ptl_sendbuf_t *sb;
- int ret;
- MPIDI_STATE_DECL(MPID_STATE_SEND_QUEUED);
-
- MPIDI_FUNC_ENTER(MPID_STATE_SEND_QUEUED);
-
- while (!MPIDI_CH3I_Sendq_empty(send_queue) && !FREE_EMPTY()) {
- int complete = TRUE;
- MPIDI_msg_sz_t send_len = 0;
- int i;
- MPID_Request *sreq;
- int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
-
- sreq = MPIDI_CH3I_Sendq_head(send_queue); /* don't dequeue until we're finished sending this request */
- FREE_POP(&sb);
-
- /* copy the iov */
- MPIU_Assert(sreq->dev.iov_count <= 2);
- for (i = sreq->dev.iov_offset; i < sreq->dev.iov_count + sreq->dev.iov_offset; ++i) {
- MPIDI_msg_sz_t len;
- len = sreq->dev.iov[i].iov_len;
- if (len > BUFLEN)
- len = BUFLEN;
- MPIU_Memcpy(sb->buf.p, sreq->dev.iov[i].iov_base, len);
- send_len += len;
- if (len < sreq->dev.iov[i].iov_len) {
- /* ran out of space in buffer */
- sreq->dev.iov[i].iov_base = (char *)sreq->dev.iov[i].iov_base + len;
- sreq->dev.iov[i].iov_len -= len;
- sreq->dev.iov_offset = i;
- complete = FALSE;
- break;
- }
- }
+ MPIDI_STATE_DECL(MPID_STATE_ON_DATA_AVAIL);
+ MPIDI_FUNC_ENTER(MPID_STATE_ON_DATA_AVAIL);
- /* copy any noncontig data if there's room left in the send buffer */
- if (send_len < BUFLEN && REQ_PTL(sreq)->noncontig) {
- MPIDI_msg_sz_t last;
- MPIU_Assert(complete); /* if complete has been set to false, there can't be any space left in the send buffer */
- last = sreq->dev.segment_size;
- if (last > sreq->dev.segment_first+BUFLEN) {
- last = sreq->dev.segment_first+BUFLEN;
- complete = FALSE;
- }
- MPI_nem_ptl_pack_byte(sreq->dev.segment_ptr, sreq->dev.segment_first, last, sb->buf.p, &REQ_PTL(sreq)->overflow[0]);
- send_len += last - sreq->dev.segment_first;
- sreq->dev.segment_first = last;
- }
- ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, send_len, PTL_NO_ACK_REQ, VC_PTL(sreq->ch.vc)->id, VC_PTL(sreq->ch.vc)->ptc, 0, 0, sb,
- MPIDI_Process.my_pg_rank, 1);
- MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
-
- if (!complete)
- continue;
-
- /* sent all of the data */
- reqFn = sreq->dev.OnDataAvail;
- if (!reqFn) {
- MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
- MPIDI_CH3U_Request_complete(sreq);
- } else {
- complete = 0;
- mpi_errno = reqFn(sreq->ch.vc, sreq, &complete);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-
- if (!complete)
- continue;
- }
-
- /* completed the request */
- --(VC_PTL(sreq->ch.vc)->num_queued_sends);
- MPIDI_CH3I_Sendq_dequeue(&send_queue, &sreq);
+ int (*reqFn) (MPIDI_VC_t *, MPID_Request *, int *);
+ reqFn = req->dev.OnDataAvail;
+ if (!reqFn) {
+ MPIDI_CH3U_Request_complete(req);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
-
- if (VC_PTL(sreq->ch.vc)->num_queued_sends == 0 && sreq->ch.vc->state == MPIDI_VC_STATE_CLOSED) {
- /* this VC is closing, if this was the last req queued for that vc, call vc_terminated() */
- mpi_errno = MPID_nem_ptl_vc_terminated(sreq->ch.vc);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
- }
-
}
-
- fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_SEND_QUEUED);
- return mpi_errno;
- fn_fail:
- goto fn_exit;
-}
-
-
-#undef FUNCNAME
-#define FUNCNAME handle_ack
-#undef FCNAME
-#define FCNAME MPIU_QUOTE(FUNCNAME)
-static int handle_ack(const ptl_event_t *e)
-{
- int mpi_errno = MPI_SUCCESS;
- MPIDI_STATE_DECL(HANDLE_ACK);
-
- MPIDI_FUNC_ENTER(HANDLE_ACK);
- MPIU_Assert(e->type == PTL_EVENT_SEND);
-
- FREE_PUSH((MPID_nem_ptl_sendbuf_t *)e->user_ptr);
-
- if (!MPIDI_CH3I_Sendq_empty(send_queue)) {
- mpi_errno = send_queued();
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+ else {
+ int complete;
+ MPIDI_VC_t *vc = req->ch.vc;
+ reqFn(vc, req, &complete);
+ MPIU_Assert(complete == TRUE);
}
-
- fn_exit:
- MPIDI_FUNC_EXIT(HANDLE_ACK);
- return mpi_errno;
- fn_fail:
- goto fn_exit;
+ MPIDI_FUNC_EXIT(MPID_STATE_ON_DATA_AVAIL);
}
#undef FUNCNAME
-#define FUNCNAME MPID_nem_ptl_nm_event_handler
+#define FUNCNAME MPID_nem_ptl_nm_ctl_event_handler
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-int MPID_nem_ptl_nm_event_handler(const ptl_event_t *e)
+int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
{
int mpi_errno = MPI_SUCCESS;
- MPIDI_VC_t *vc;
- int ret;
- MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_NM_EVENT_HANDLER);
+ MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_NM_CTL_EVENT_HANDLER);
- MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_NM_EVENT_HANDLER);
+ MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_NM_CTL_EVENT_HANDLER);
+
+ switch(e->type) {
- switch (e->type) {
case PTL_EVENT_PUT:
- MPIDI_PG_Get_vc_set_active(MPIDI_Process.my_pg, (uint64_t)e->hdr_data, &vc);
- mpi_errno = MPID_nem_handle_pkt(vc, e->start, e->rlength);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-
- MPIU_Assert(e->start == recvbuf[(uint64_t)e->user_ptr]);
- ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &recvbuf_me[(uint64_t)e->user_ptr],
- PTL_PRIORITY_LIST, e->user_ptr, &recvbuf_me_handle[(uint64_t)e->user_ptr]);
- MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s", MPID_nem_ptl_strerror(ret));
+ if (e->match_bits != CTL_TAG) {
+ MPIU_Free(SENDBUF((MPID_Request *)e->user_ptr));
+ MPIU_Free(TMPBUF((MPID_Request *)e->user_ptr));
+ on_data_avail((MPID_Request *)e->user_ptr);
+ --put_cnt;
+ }
+ else {
+ int ret;
+ const uint64_t buf_idx = (uint64_t) e->user_ptr;
+ const size_t packet_sz = e->mlength - offsetof(buf_t, packet);
+ MPIDI_VC_t *vc;
+ MPID_nem_ptl_vc_area * vc_ptl;
+
+ MPIU_Assert(e->start == &recvbufs[buf_idx]);
+
+ MPIDI_PG_Get_vc(MPIDI_Process.my_pg, (uint64_t)e->hdr_data, &vc);
+ vc_ptl = VC_PTL(vc);
+
+ if (recvbufs[buf_idx].remaining == 0) {
+ mpi_errno = MPID_nem_handle_pkt(vc, recvbufs[buf_idx].packet, packet_sz);
+ if (mpi_errno)
+ MPIU_ERR_POP(mpi_errno);
+ /* Notify we're done */
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc,
+ DONE_TAG(recvbufs[buf_idx].tag), 0, done_req, MPIDI_Process.my_pg_rank, 0);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
+ MPID_nem_ptl_strerror(ret));
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
+ "PtlPut(size=0 id=(%#x,%#x) pt=%#x tag=%#lx)",
+ vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
+ vc_ptl->ptc, DONE_TAG(recvbufs[buf_idx].tag)));
+ }
+ else {
+ int incomplete;
+ size_t size;
+ char *buf_ptr;
+
+ MPID_Request *req = MPID_Request_create();
+ MPIU_Assert(req != NULL);
+ MPIDI_CH3U_Request_decrement_cc(req, &incomplete); /* We'll increment it below */
+ REQ_PTL(req)->event_handler = MPID_nem_ptl_nm_ctl_event_handler;
+ REQ_PTL(req)->bytes_put = packet_sz + recvbufs[buf_idx].remaining;
+ TMPBUF(req) = MPIU_Malloc(REQ_PTL(req)->bytes_put);
+ MPIU_Assert(TMPBUF(req) != NULL);
+ MPIU_Memcpy(TMPBUF(req), recvbufs[buf_idx].packet, packet_sz);
+
+ req->ch.vc = vc;
+
+ req->dev.match.parts.tag = recvbufs[buf_idx].tag;
+
+ size = recvbufs[buf_idx].remaining < MPIDI_nem_ptl_ni_limits.max_msg_size ?
+ recvbufs[buf_idx].remaining : MPIDI_nem_ptl_ni_limits.max_msg_size;
+ buf_ptr = (char *)TMPBUF(req) + packet_sz;
+ while (recvbufs[buf_idx].remaining) {
+ MPIDI_CH3U_Request_increment_cc(req, &incomplete); /* Will be decremented - and eventually freed in REPLY */
+ ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)buf_ptr,
+ size, vc_ptl->id, vc_ptl->ptc, GET_TAG(recvbufs[buf_idx].tag), 0, req);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s",
+ MPID_nem_ptl_strerror(ret));
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
+ "PtlGet(size=%lu id=(%#x,%#x) pt=%#x tag=%#lx)", size,
+ vc_ptl->id.phys.nid,
+ vc_ptl->id.phys.pid, vc_ptl->ptc, GET_TAG(recvbufs[buf_idx].tag)));
+ buf_ptr += size;
+ recvbufs[buf_idx].remaining -= size;
+ if (recvbufs[buf_idx].remaining < MPIDI_nem_ptl_ni_limits.max_msg_size)
+ size = recvbufs[buf_idx].remaining;
+ }
+ }
+
+ /* Repost the recv buffer */
+ ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &mes[buf_idx],
+ PTL_PRIORITY_LIST, e->user_ptr /* buf_idx */, &me_handles[buf_idx]);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend",
+ "**ptlmeappend %s", MPID_nem_ptl_strerror(ret));
+ }
break;
- case PTL_EVENT_ACK:
- mpi_errno = handle_ack(e);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+ case PTL_EVENT_REPLY:
+ {
+ int incomplete;
+ MPID_Request *const rreq = e->user_ptr;
+
+ MPIDI_CH3U_Request_decrement_cc(rreq, &incomplete);
+ if (!incomplete) {
+ int ret;
+ MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(rreq->ch.vc);
+
+ mpi_errno = MPID_nem_handle_pkt(rreq->ch.vc, TMPBUF(rreq), REQ_PTL(rreq)->bytes_put);
+ if (mpi_errno)
+ MPIU_ERR_POP(mpi_errno);
+
+ /* Notify we're done */
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc,
+ DONE_TAG(rreq->dev.match.parts.tag), 0, done_req, MPIDI_Process.my_pg_rank, 0);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
+ MPID_nem_ptl_strerror(ret));
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
+ "PtlPut(size=0 id=(%#x,%#x) pt=%#x tag=%#lx)",
+ vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
+ vc_ptl->ptc, DONE_TAG((ptl_match_bits_t)SENDBUF(rreq))));
+
+ /* Free resources */
+ MPIU_Free(TMPBUF(rreq));
+ MPID_Request_release(rreq);
+ }
+ }
break;
- case PTL_EVENT_SEND:
- /* ignore */
+
+ case PTL_EVENT_GET:
+ MPIDI_CH3U_Request_complete((MPID_Request *)e->user_ptr);
break;
+
default:
MPIU_Error_printf("Received unexpected event type: %d %s", e->type, MPID_nem_ptl_strevent(e));
MPIU_ERR_INTERNALANDJUMP(mpi_errno, "Unexpected event type");
@@ -668,7 +557,7 @@ int MPID_nem_ptl_nm_event_handler(const ptl_event_t *e)
}
fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_EVENT_HANDLER);
+ MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_CTL_EVENT_HANDLER);
return mpi_errno;
fn_fail:
goto fn_exit;
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c
index 26a1eb2..857c9ec 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c
@@ -143,23 +143,21 @@ int MPID_nem_ptl_poll(int is_blocking_poll)
break;
}
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptleqget", "**ptleqget %s", MPID_nem_ptl_strerror(ret));
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "Received event %s ni_fail=%s list=%s user_ptr=%p hdr_data=%#lx mlength=%lu",
- MPID_nem_ptl_strevent(&event), MPID_nem_ptl_strnifail(event.ni_fail_type),
- MPID_nem_ptl_strlist(event.ptl_list), event.user_ptr, event.hdr_data, event.mlength));
-
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "Received event %s pt_idx=%d ni_fail=%s list=%s user_ptr=%p hdr_data=%#lx mlength=%lu rlength=%lu",
+ MPID_nem_ptl_strevent(&event), event.pt_index, MPID_nem_ptl_strnifail(event.ni_fail_type),
+ MPID_nem_ptl_strlist(event.ptl_list), event.user_ptr, event.hdr_data, event.mlength, event.rlength));
MPIU_ERR_CHKANDJUMP2(event.ni_fail_type != PTL_NI_OK && event.ni_fail_type != PTL_NI_NO_MATCH, mpi_errno, MPI_ERR_OTHER, "**ptlni_fail", "**ptlni_fail %s %s", MPID_nem_ptl_strevent(&event), MPID_nem_ptl_strnifail(event.ni_fail_type));
-
- /* handle control messages */
- if (event.pt_index == MPIDI_nem_ptl_control_pt) {
- mpi_errno = MPID_nem_ptl_nm_event_handler(&event);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
- goto fn_exit;
- }
-
+
switch (event.type) {
case PTL_EVENT_PUT:
if (event.ptl_list == PTL_OVERFLOW_LIST)
break;
+ if (event.pt_index == MPIDI_nem_ptl_control_pt) {
+ mpi_errno = MPID_nem_ptl_nm_ctl_event_handler(&event);
+ if (mpi_errno)
+ MPIU_ERR_POP(mpi_errno);
+ break;
+ }
case PTL_EVENT_PUT_OVERFLOW:
case PTL_EVENT_GET:
case PTL_EVENT_ACK:
@@ -168,8 +166,10 @@ int MPID_nem_ptl_poll(int is_blocking_poll)
MPID_Request * const req = event.user_ptr;
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "req = %p", req);
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "REQ_PTL(req)->event_handler = %p", REQ_PTL(req)->event_handler);
- mpi_errno = REQ_PTL(req)->event_handler(&event);
- if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+ if (REQ_PTL(req)->event_handler) {
+ mpi_errno = REQ_PTL(req)->event_handler(&event);
+ if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+ }
break;
}
case PTL_EVENT_AUTO_FREE:
@@ -179,8 +179,8 @@ int MPID_nem_ptl_poll(int is_blocking_poll)
case PTL_EVENT_AUTO_UNLINK:
overflow_me_handle[(size_t)event.user_ptr] = PTL_INVALID_HANDLE;
break;
- case PTL_EVENT_LINK:
case PTL_EVENT_SEND:
+ case PTL_EVENT_LINK:
/* ignore */
break;
default:
-----------------------------------------------------------------------
Summary of changes:
.../channels/nemesis/netmod/portals4/ptl_impl.h | 5 +-
.../channels/nemesis/netmod/portals4/ptl_init.c | 33 +-
.../ch3/channels/nemesis/netmod/portals4/ptl_nm.c | 781 +++++++++-----------
.../channels/nemesis/netmod/portals4/ptl_poll.c | 30 +-
4 files changed, 379 insertions(+), 470 deletions(-)
hooks/post-receive
--
MPICH primary repository
More information about the commits
mailing list