[mpich-commits] [mpich] MPICH primary repository branch, master, updated. v3.1.3-80-gf4253c3
Service Account
noreply at mpich.org
Mon Nov 3 20:25:18 CST 2014
This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "MPICH primary repository".
The branch, master has been updated
via f4253c3872f18b226db416f11ed3db71ff66c620 (commit)
from 28f6a6890ef19d0f47fb217747b5e2190dfbb1ff (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
http://git.mpich.org/mpich.git/commitdiff/f4253c3872f18b226db416f11ed3db71ff66c620
commit f4253c3872f18b226db416f11ed3db71ff66c620
Author: Pavan Balaji <balaji at anl.gov>
Date: Tue Oct 21 13:35:02 2014 -0500
Initial draft of flow-control in the portals4 netmod.
Portals4 by itself does not provide any flow-control; it must be
managed by an upper layer such as MPICH. Before this patch we relied
on a set of unexpected-message buffers posted to the portals library.
However, since portals pulls messages off the network asynchronously,
a delayed application could fill up those buffers and cause the
portal to be disabled, which in turn made MPICH abort.
In this patch, we implement an initial version of flow-control that
allows us to reenable the portal when it gets disabled. All this is
done in the context of the "rportals" wrappers that are implemented in
the rptl.* files. We create an extra control portal that is only used
by rportals. When the primary data portal gets disabled, the target
sends PAUSE messages to all other processes. Once each process
confirms that it has no outstanding packets on the wire (i.e., all
packets have either been ACKed or NACKed), it sends a PAUSE-ACK
message. When the target receives PAUSE-ACK messages from all
processes (thus confirming that the network traffic to itself has been
quiesced), it reenables the portal and sends an UNPAUSE message to all
processes.
This patch still does not deal with origin-side resource exhaustion.
This can happen, for example, if we run out of space on the event
queue on the origin side.
Signed-off-by: Ken Raffenetti <raffenet at mcs.anl.gov>
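To make the handshake above concrete, here is a minimal, self-contained
C sketch of the protocol as described in this message. It is
illustrative only and is not part of the patch; struct fc_state and the
helper functions (send_ctl, have_outstanding_packets,
reenable_data_portal, reissue_queued_ops, make_progress) are
hypothetical stand-ins for the rptl machinery added in rptl.c below.

/* Illustrative sketch of the PAUSE / PAUSE-ACK / UNPAUSE handshake.
 * The helpers declared below are placeholders, not real rptl calls. */

enum ctl_msg { CTL_PAUSE, CTL_PAUSE_ACK, CTL_UNPAUSE };

struct fc_state {
    int world_size;
    int pause_acks;       /* PAUSE-ACKs received so far */
    int ob_curr_count;    /* messages sitting in the overflow buffers */
    int ob_max_count;     /* threshold below which we may UNPAUSE */
};

void send_ctl(int rank, enum ctl_msg msg);
int  have_outstanding_packets(int rank);   /* any unACKed/unNACKed ops? */
void reenable_data_portal(void);
void reissue_queued_ops(int rank);
void make_progress(void);

/* target: the primary data portal got disabled -> quiesce everyone */
void on_portal_disabled(struct fc_state *s)
{
    int i;
    s->pause_acks = 0;
    s->ob_max_count = s->ob_curr_count / 2;
    for (i = 0; i < s->world_size; i++)
        send_ctl(i, CTL_PAUSE);
}

/* origin: a PAUSE arrived; drain the wire, then acknowledge */
void on_pause_msg(int target)
{
    while (have_outstanding_packets(target))
        make_progress();
    send_ctl(target, CTL_PAUSE_ACK);
}

/* target: once everyone has ACKed and the overflow buffers have
 * drained to half their pause-time level, reenable and UNPAUSE */
void on_pause_ack_msg(struct fc_state *s)
{
    int i;
    if (++s->pause_acks == s->world_size &&
        s->ob_curr_count <= s->ob_max_count) {
        reenable_data_portal();
        for (i = 0; i < s->world_size; i++)
            send_ctl(i, CTL_UNPAUSE);
    }
}

/* origin: safe to retransmit anything that was NACKed or queued */
void on_unpause_msg(int target)
{
    reissue_queued_ops(target);
}

In the actual patch, the control messages are sent with
MPID_nem_ptl_rptl_put() over the dedicated control portal with flow
control disabled, and the overflow-buffer threshold is tracked per
rptl (see send_pause_messages() and send_unpause_messages() in rptl.c
below).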
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/Makefile.mk b/src/mpid/ch3/channels/nemesis/netmod/portals4/Makefile.mk
index 3901503..06c26d1 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/Makefile.mk
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/Makefile.mk
@@ -15,10 +15,12 @@ mpi_core_sources += \
src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_recv.c \
src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c \
src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_send.c \
- src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_lmt.c
+ src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_lmt.c \
+ src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
noinst_HEADERS += \
- src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
+ src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h \
+ src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
endif BUILD_NEMESIS_NETMOD_PORTALS4
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
index 6130d98..497a51d 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
@@ -18,6 +18,7 @@ extern ptl_handle_ni_t MPIDI_nem_ptl_ni;
extern ptl_pt_index_t MPIDI_nem_ptl_pt;
extern ptl_pt_index_t MPIDI_nem_ptl_get_pt; /* portal for gets by receiver */
extern ptl_pt_index_t MPIDI_nem_ptl_control_pt; /* portal for MPICH control messages */
+extern ptl_pt_index_t MPIDI_nem_ptl_rpt_pt; /* portal for rportals control messages */
extern ptl_handle_eq_t MPIDI_nem_ptl_eq;
extern ptl_handle_md_t MPIDI_nem_ptl_global_md;
@@ -88,6 +89,7 @@ typedef struct {
ptl_pt_index_t pt;
ptl_pt_index_t ptg;
ptl_pt_index_t ptc;
+ ptl_pt_index_t ptr;
int id_initialized; /* TRUE iff id and pt have been initialized */
MPIDI_msg_sz_t num_queued_sends; /* number of reqs for this vc in sendq */
} MPID_nem_ptl_vc_area;
@@ -154,7 +156,7 @@ int MPID_nem_ptl_poll_finalize(void);
int MPID_nem_ptl_poll(int is_blocking_poll);
int MPID_nem_ptl_vc_terminated(MPIDI_VC_t *vc);
int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg,
- ptl_pt_index_t *ptc);
+ ptl_pt_index_t *ptc, ptl_pt_index_t *ptr);
void MPI_nem_ptl_pack_byte(MPID_Segment *segment, MPI_Aint first, MPI_Aint last, void *buf,
MPID_nem_ptl_pack_overflow_t *overflow);
int MPID_nem_ptl_unpack_byte(MPID_Segment *segment, MPI_Aint first, MPI_Aint last, void *buf,
@@ -197,7 +199,7 @@ const char *MPID_nem_ptl_strnifail(ptl_ni_fail_t ni_fail);
const char *MPID_nem_ptl_strlist(ptl_list_t list);
#define DBG_MSG_PUT(md_, data_sz_, pg_rank_, match_, header_) do { \
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut: md=%s data_sz=%lu pg_rank=%d", md_, data_sz_, pg_rank_)); \
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put: md=%s data_sz=%lu pg_rank=%d", md_, data_sz_, pg_rank_)); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, " tag=%#lx ctx=%#lx rank=%ld match=%#lx", \
NPTL_MATCH_GET_TAG(match_), NPTL_MATCH_GET_CTX(match_), NPTL_MATCH_GET_RANK(match_), match_)); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, " flags=%c%c%c data_sz=%ld header=%#lx", \
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
index a6ef6e6..4a53a3d 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
@@ -6,6 +6,7 @@
#include "ptl_impl.h"
#include <pmi.h>
+#include "rptl.h"
#ifdef ENABLE_CHECKPOINTING
#error Checkpointing not implemented
@@ -18,11 +19,13 @@
#define PTI_KEY "PTI"
#define PTIG_KEY "PTIG"
#define PTIC_KEY "PTIC"
+#define PTIR_KEY "PTIR"
ptl_handle_ni_t MPIDI_nem_ptl_ni;
ptl_pt_index_t MPIDI_nem_ptl_pt;
ptl_pt_index_t MPIDI_nem_ptl_get_pt; /* portal for gets by receiver */
ptl_pt_index_t MPIDI_nem_ptl_control_pt; /* portal for MPICH control messages */
+ptl_pt_index_t MPIDI_nem_ptl_rpt_pt; /* portal for rportals control messages */
ptl_handle_eq_t MPIDI_nem_ptl_eq;
ptl_handle_md_t MPIDI_nem_ptl_global_md;
ptl_ni_limits_t MPIDI_nem_ptl_ni_limits;
@@ -74,6 +77,54 @@ static MPIDI_Comm_ops_t comm_ops = {
#undef FUNCNAME
+#define FUNCNAME get_target_info
+#undef FCNAME
+#define FCNAME MPIDI_QUOTE(FUNCNAME)
+static int get_target_info(int rank, ptl_process_t *id, ptl_pt_index_t local_data_pt, ptl_pt_index_t *target_data_pt,
+ ptl_pt_index_t *target_control_pt)
+{
+ int mpi_errno = MPI_SUCCESS;
+ struct MPIDI_VC *vc;
+ MPID_nem_ptl_vc_area *vc_ptl;
+ MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_GET_TARGET_INFO);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_GET_TARGET_INFO);
+
+ MPIDI_PG_Get_vc(MPIDI_Process.my_pg, rank, &vc);
+ vc_ptl = VC_PTL(vc);
+ if (!vc_ptl->id_initialized) {
+ mpi_errno = MPID_nem_ptl_init_id(vc);
+ if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+ }
+
+ *id = vc_ptl->id;
+
+ MPIU_Assert(local_data_pt == MPIDI_nem_ptl_pt || local_data_pt == MPIDI_nem_ptl_get_pt ||
+ local_data_pt == MPIDI_nem_ptl_control_pt);
+
+ if (local_data_pt == MPIDI_nem_ptl_pt) {
+ *target_data_pt = vc_ptl->pt;
+ *target_control_pt = vc_ptl->ptr;
+ }
+ else if (local_data_pt == MPIDI_nem_ptl_get_pt) {
+ *target_data_pt = vc_ptl->ptg;
+ *target_control_pt = PTL_PT_ANY;
+ }
+ else if (local_data_pt == MPIDI_nem_ptl_control_pt) {
+ *target_data_pt = vc_ptl->ptc;
+ *target_control_pt = PTL_PT_ANY;
+ }
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_GET_TARGET_INFO);
+ return mpi_errno;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
#define FUNCNAME ptl_init
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
@@ -145,6 +196,11 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
PTL_PT_ANY, &MPIDI_nem_ptl_control_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
+ /* allocate portal for rportals control messages */
+ ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_eq,
+ PTL_PT_ANY, &MPIDI_nem_ptl_rpt_pt);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
+
/* create an MD that covers all of memory */
md.start = 0;
md.length = (ptl_size_t)-1;
@@ -154,6 +210,24 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
ret = PtlMDBind(MPIDI_nem_ptl_ni, &md, &MPIDI_nem_ptl_global_md);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmdbind", "**ptlmdbind %s", MPID_nem_ptl_strerror(ret));
+ /* currently, rportals only works with a single NI and EQ */
+ ret = MPID_nem_ptl_rptl_init(MPIDI_Process.my_pg->size, 5, get_target_info);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlniinit", "**ptlniinit %s", MPID_nem_ptl_strerror(ret));
+
+ /* allow rportal to manage the primary portal and retransmit if needed */
+ ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_eq, MPIDI_nem_ptl_pt, MPIDI_nem_ptl_rpt_pt);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
+
+ /* allow rportal to manage the get and control portals, but we
+ * don't expect retransmission to be needed on these portals, so
+ * we pass PTL_PT_ANY as the dummy portal. unfortunately, portals
+ * does not have an "invalid" PT constant, which would have been
+ * more appropriate to pass over here. */
+ ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_eq, MPIDI_nem_ptl_get_pt, PTL_PT_ANY);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
+
+ ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_eq, MPIDI_nem_ptl_control_pt, PTL_PT_ANY);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* create business card */
mpi_errno = get_business_card(pg_rank, bc_val_p, val_max_sz_p);
@@ -192,15 +266,30 @@ static int ptl_finalize(void)
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* shut down portals */
+ ret = MPID_nem_ptl_rptl_drain_eq(1, &MPIDI_nem_ptl_eq);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+
+ ret = MPID_nem_ptl_rptl_ptfini(MPIDI_nem_ptl_pt);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+ ret = MPID_nem_ptl_rptl_ptfini(MPIDI_nem_ptl_get_pt);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_get_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+ ret = MPID_nem_ptl_rptl_ptfini(MPIDI_nem_ptl_control_pt);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+ ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_rpt_pt);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+
ret = PtlNIFini(MPIDI_nem_ptl_ni);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlnifini", "**ptlnifini %s", MPID_nem_ptl_strerror(ret));
@@ -262,6 +351,12 @@ static int get_business_card(int my_rank, char **bc_val_p, int *val_max_sz_p)
MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
}
+ str_errno = MPIU_Str_add_binary_arg(bc_val_p, val_max_sz_p, PTIR_KEY, (char *)&MPIDI_nem_ptl_rpt_pt,
+ sizeof(MPIDI_nem_ptl_rpt_pt));
+ if (str_errno) {
+ MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
+ MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
+ }
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_GET_BUSINESS_CARD);
@@ -345,7 +440,7 @@ static int vc_destroy(MPIDI_VC_t *vc)
#define FUNCNAME MPID_nem_ptl_get_id_from_bc
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg, ptl_pt_index_t *ptc)
+int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg, ptl_pt_index_t *ptc, ptl_pt_index_t *ptr)
{
int mpi_errno = MPI_SUCCESS;
int ret;
@@ -369,6 +464,9 @@ int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, pt
ret = MPIU_Str_get_binary_arg(business_card, PTIC_KEY, (char *)ptc, sizeof(ptc), &len);
MPIU_ERR_CHKANDJUMP(ret != MPIU_STR_SUCCESS || len != sizeof(*ptc), mpi_errno, MPI_ERR_OTHER, "**badbusinesscard");
+ ret = MPIU_Str_get_binary_arg(business_card, PTIR_KEY, (char *)ptr, sizeof(ptr), &len);
+ MPIU_ERR_CHKANDJUMP(ret != MPIU_STR_SUCCESS || len != sizeof(*ptr), mpi_errno, MPI_ERR_OTHER, "**badbusinesscard");
+
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_GET_ID_FROM_BC);
return mpi_errno;
@@ -461,7 +559,7 @@ int MPID_nem_ptl_init_id(MPIDI_VC_t *vc)
mpi_errno = vc->pg->getConnInfo(vc->pg_rank, bc, val_max_sz, vc->pg);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
- mpi_errno = MPID_nem_ptl_get_id_from_bc(bc, &vc_ptl->id, &vc_ptl->pt, &vc_ptl->ptg, &vc_ptl->ptc);
+ mpi_errno = MPID_nem_ptl_get_id_from_bc(bc, &vc_ptl->id, &vc_ptl->pt, &vc_ptl->ptg, &vc_ptl->ptc, &vc_ptl->ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
vc_ptl->id_initialized = TRUE;
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
index 1032a2b..e461bbc 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
@@ -6,6 +6,7 @@
#include "ptl_impl.h"
#include <mpl_utlist.h>
+#include "rptl.h"
#define NUM_SEND_BUFS 20
#define NUM_RECV_BUFS 20
@@ -197,10 +198,10 @@ static inline int send_pkt(MPIDI_VC_t *vc, void **vhdr_p, void **vdata_p, MPIDI_
if (len > PTL_MAX_EAGER)
len = PTL_MAX_EAGER;
MPIU_Memcpy(sb->buf.hp.payload, *data_p, len);
- ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
- MPIDI_Process.my_pg_rank);
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
+ MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
sizeof(sb->buf.hp.hdr) + len, vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
vc_ptl->ptc, sb));
*hdr_p = NULL;
@@ -214,9 +215,9 @@ static inline int send_pkt(MPIDI_VC_t *vc, void **vhdr_p, void **vdata_p, MPIDI_
if (len > BUFLEN)
len = BUFLEN;
MPIU_Memcpy(sb->buf.p, *data_p, len);
- ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb, MPIDI_Process.my_pg_rank);
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb, MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x) sb=%p", len,
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p", len,
vc_ptl->id.phys.nid, vc_ptl->id.phys.pid, vc_ptl->ptc, sb));
*data_p += len;
*data_sz_p -= len;
@@ -265,10 +266,10 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void **vhdr_p,
if (last > PTL_MAX_EAGER)
last = PTL_MAX_EAGER;
MPI_nem_ptl_pack_byte(sreq->dev.segment_ptr, 0, last, sb->buf.hp.payload, &REQ_PTL(sreq)->overflow[0]);
- ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + last, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
- MPIDI_Process.my_pg_rank);
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + last, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
+ MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
sizeof(sb->buf.hp.hdr) + last, vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
vc_ptl->ptc, sb));
*vhdr_p = NULL;
@@ -290,10 +291,10 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void **vhdr_p,
MPI_nem_ptl_pack_byte(sreq->dev.segment_ptr, sreq->dev.segment_first, last, sb->buf.p, &REQ_PTL(sreq)->overflow[0]);
sreq->dev.segment_first = last;
- ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, last - sreq->dev.segment_first, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
- MPIDI_Process.my_pg_rank);
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, last - sreq->dev.segment_first, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
+ MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
last - sreq->dev.segment_first, vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
vc_ptl->ptc, sb));
@@ -561,8 +562,8 @@ static int send_queued(void)
send_len += last - sreq->dev.segment_first;
sreq->dev.segment_first = last;
}
- ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, send_len, PTL_NO_ACK_REQ, VC_PTL(sreq->ch.vc)->id, VC_PTL(sreq->ch.vc)->ptc, 0, 0, sb,
- MPIDI_Process.my_pg_rank);
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, send_len, PTL_NO_ACK_REQ, VC_PTL(sreq->ch.vc)->id, VC_PTL(sreq->ch.vc)->ptc, 0, 0, sb,
+ MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
if (!complete)
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c
index 8104eaf..60e07a0 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c
@@ -5,6 +5,7 @@
*/
#include "ptl_impl.h"
+#include "rptl.h"
#define OVERFLOW_LENGTH (1024*1024)
#define NUM_OVERFLOW_ME 8
@@ -130,7 +131,7 @@ int MPID_nem_ptl_poll(int is_blocking_poll)
/* MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_POLL); */
while (1) {
- ret = PtlEQGet(MPIDI_nem_ptl_eq, &event);
+ ret = MPID_nem_ptl_rptl_eqget(MPIDI_nem_ptl_eq, &event);
if (ret == PTL_EQ_EMPTY)
break;
MPIU_ERR_CHKANDJUMP(ret == PTL_EQ_DROPPED, mpi_errno, MPI_ERR_OTHER, "**eqdropped");
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_recv.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_recv.c
index 15a9345..fd694f5 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_recv.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_recv.c
@@ -5,6 +5,7 @@
*/
#include "ptl_impl.h"
+#include "rptl.h"
#undef FUNCNAME
#define FUNCNAME dequeue_req
@@ -233,7 +234,7 @@ static int handler_recv_dequeue_large(const ptl_event_t *e)
if (dt_contig) {
/* recv buffer is contig */
REQ_PTL(rreq)->event_handler = handler_recv_complete;
- ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)rreq->dev.user_buf + dt_true_lb + PTL_LARGE_THRESHOLD),
+ ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)rreq->dev.user_buf + dt_true_lb + PTL_LARGE_THRESHOLD),
data_sz - PTL_LARGE_THRESHOLD, vc_ptl->id, vc_ptl->ptg, e->match_bits, 0, rreq);
DBG_MSG_GET("global", data_sz - PTL_LARGE_THRESHOLD, vc->pg_rank, e->match_bits);
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, " buf=%p", (char *)rreq->dev.user_buf + dt_true_lb + PTL_LARGE_THRESHOLD);
@@ -260,7 +261,7 @@ static int handler_recv_dequeue_large(const ptl_event_t *e)
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmdbind", "**ptlmdbind %s", MPID_nem_ptl_strerror(ret));
REQ_PTL(rreq)->event_handler = handler_recv_complete;
- ret = PtlGet(REQ_PTL(rreq)->md, 0, rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->ptg,
+ ret = MPID_nem_ptl_rptl_get(REQ_PTL(rreq)->md, 0, rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->ptg,
e->match_bits, 0, rreq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s", MPID_nem_ptl_strerror(ret));
goto fn_exit;
@@ -271,7 +272,7 @@ static int handler_recv_dequeue_large(const ptl_event_t *e)
MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, rreq->dev.segment_size - rreq->dev.segment_first, mpi_errno, "chunk_buffer");
REQ_PTL(rreq)->event_handler = handler_recv_unpack_complete;
- ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
+ ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->ptg, e->match_bits, 0, rreq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s", MPID_nem_ptl_strerror(ret));
@@ -331,7 +332,7 @@ static int handler_recv_dequeue_unpack_large(const ptl_event_t *e)
MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, rreq->dev.segment_size - rreq->dev.segment_first, mpi_errno, "chunk_buffer");
REQ_PTL(rreq)->event_handler = handler_recv_unpack_complete;
- ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
+ ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->ptg, e->match_bits, 0, rreq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s", MPID_nem_ptl_strerror(ret));
@@ -645,7 +646,7 @@ int MPID_nem_ptl_lmt_start_recv(MPIDI_VC_t *vc, MPID_Request *rreq, MPID_IOV s_
void * real_user_buf = (char *)rreq->dev.user_buf + dt_true_lb;
REQ_PTL(rreq)->event_handler = handler_recv_complete;
- ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)real_user_buf + PTL_LARGE_THRESHOLD),
+ ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)real_user_buf + PTL_LARGE_THRESHOLD),
data_sz - PTL_LARGE_THRESHOLD, vc_ptl->id, vc_ptl->ptg, match_bits, 0, rreq);
DBG_MSG_GET("global", data_sz - PTL_LARGE_THRESHOLD, vc->pg_rank, match_bits);
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, " buf=%p", (char *)real_user_buf + PTL_LARGE_THRESHOLD);
@@ -686,7 +687,7 @@ int MPID_nem_ptl_lmt_start_recv(MPIDI_VC_t *vc, MPID_Request *rreq, MPID_IOV s_
MPID_nem_ptl_strerror(ret));
REQ_PTL(rreq)->event_handler = handler_recv_complete;
- ret = PtlGet(REQ_PTL(rreq)->md, 0, rreq->dev.segment_size, vc_ptl->id, vc_ptl->ptg,
+ ret = MPID_nem_ptl_rptl_get(REQ_PTL(rreq)->md, 0, rreq->dev.segment_size, vc_ptl->id, vc_ptl->ptg,
match_bits, PTL_LARGE_THRESHOLD, rreq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s",
MPID_nem_ptl_strerror(ret));
@@ -697,7 +698,7 @@ int MPID_nem_ptl_lmt_start_recv(MPIDI_VC_t *vc, MPID_Request *rreq, MPID_IOV s_
MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, rreq->dev.segment_size,
mpi_errno, "chunk_buffer");
REQ_PTL(rreq)->event_handler = handler_recv_unpack_complete;
- ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
+ ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
rreq->dev.segment_size, vc_ptl->id, vc_ptl->ptg, match_bits,
PTL_LARGE_THRESHOLD, rreq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s",
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_send.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_send.c
index 72d64b3..a6fab84 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_send.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_send.c
@@ -5,6 +5,7 @@
*/
#include "ptl_impl.h"
+#include "rptl.h"
#undef FUNCNAME
#define FUNCNAME handler_send_complete
@@ -101,7 +102,7 @@ static int handler_pack_chunk(const ptl_event_t *e)
sreq->dev.segment_first += PTL_LARGE_THRESHOLD;
/* notify receiver */
- ret = PtlPut(MPIDI_nem_ptl_global_md, 0, 0, PTL_ACK_REQ, vc_ptl->id,
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_ACK_REQ, vc_ptl->id,
vc_ptl->pt, ?????, 0, sreq,
NPTL_HEADER(?????, MPIDI_Process.my_pg_rank, me.match_bits));
@@ -208,9 +209,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Small contig message");
REQ_PTL(sreq)->event_handler = handler_send_complete;
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "&REQ_PTL(sreq)->event_handler = %p", &(REQ_PTL(sreq)->event_handler));
- ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)buf + dt_true_lb), data_sz, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)buf + dt_true_lb), data_sz, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag, data_sz));
+ NPTL_HEADER(ssend_flag, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", data_sz, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag, data_sz));
MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "id.nid = %#x", vc_ptl->id.phys.nid);
@@ -245,9 +246,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmdbind", "**ptlmdbind %s", MPID_nem_ptl_strerror(ret));
REQ_PTL(sreq)->event_handler = handler_send_complete;
- ret = PtlPut(REQ_PTL(sreq)->md, 0, data_sz, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
+ ret = MPID_nem_ptl_rptl_put(REQ_PTL(sreq)->md, 0, data_sz, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag, data_sz));
+ NPTL_HEADER(ssend_flag, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("sreq", data_sz, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag, data_sz));
goto fn_exit;
@@ -262,9 +263,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
MPID_Segment_pack(sreq->dev.segment_ptr, sreq->dev.segment_first, &last, REQ_PTL(sreq)->chunk_buffer[0]);
MPIU_Assert(last == sreq->dev.segment_size);
REQ_PTL(sreq)->event_handler = handler_send_complete;
- ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq)->chunk_buffer[0], data_sz, PTL_ACK_REQ,
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq)->chunk_buffer[0], data_sz, PTL_ACK_REQ,
vc_ptl->id, vc_ptl->pt, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag, data_sz));
+ NPTL_HEADER(ssend_flag, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", data_sz, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag, data_sz));
goto fn_exit;
@@ -292,9 +293,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->large = TRUE;
REQ_PTL(sreq)->event_handler = handler_large;
- ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)buf + dt_true_lb), PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)buf + dt_true_lb), PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
+ NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", PTL_LARGE_THRESHOLD, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
goto fn_exit;
@@ -359,9 +360,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->large = TRUE;
REQ_PTL(sreq)->event_handler = handler_large;
- ret = PtlPut(REQ_PTL(sreq)->md, 0, PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
+ ret = MPID_nem_ptl_rptl_put(REQ_PTL(sreq)->md, 0, PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
+ NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("req", PTL_LARGE_THRESHOLD, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
goto fn_exit;
@@ -397,9 +398,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->large = TRUE;
REQ_PTL(sreq)->event_handler = handler_large;
- ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq)->chunk_buffer[0], PTL_LARGE_THRESHOLD, PTL_ACK_REQ,
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq)->chunk_buffer[0], PTL_LARGE_THRESHOLD, PTL_ACK_REQ,
vc_ptl->id, vc_ptl->pt, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
+ NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", PTL_LARGE_THRESHOLD, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
goto fn_exit;
@@ -438,9 +439,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->large = TRUE;
REQ_PTL(sreq)->event_handler = handler_large_multi;
- ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq_)->chunk_buffer[0], PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id,
+ ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq_)->chunk_buffer[0], PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id,
vc_ptl->pt, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag | NPTL_LARGE | NPTL_MULTIPLE, data_sz));
+ NPTL_HEADER(ssend_flag | NPTL_LARGE | NPTL_MULTIPLE, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
#endif
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
new file mode 100644
index 0000000..da126e4
--- /dev/null
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
@@ -0,0 +1,1272 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ * (C) 2014 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include "ptl_impl.h"
+#include "rptl.h"
+
+/*
+ * Prereqs:
+ *
+ * 1. We create an extra control portal that is only used by rportals.
+ *
+ * 2. All communication operations are logged at the origin process,
+ * and their ACKs and NACKs are kept track of. If an operation gets
+ * an ACK, it is complete and can be deleted from the logs. If an
+ * operation gets a NACK, it will need to be retransmitted once the
+ * flow-control protocol described below has completed.
+ *
+ *
+ * Flow control algorithm:
+ *
+ * 1. When the primary data portal gets disabled, the target sends
+ * PAUSE messages to all other processes.
+ *
+ * 2. Once each process confirms that it has no outstanding packets on
+ * the wire (i.e., all packets have either been ACKed or NACKed), it
+ * sends a PAUSE-ACK message.
+ *
+ * 3. When the target receives PAUSE-ACK messages from all processes
+ * (thus confirming that the network traffic to itself has been
+ * quiesced), it waits till the user has dequeued at least half the
+ * messages from the overflow buffer. This is done by keeping track
+ * of the number of messages that are injected into the overflow
+ * buffer by portals and the number of messages that are dequeued by
+ * the user.
+ *
+ * 4. Once we know that there is enough free space in the overflow
+ * buffers, the target reenables the portal and sends an UNPAUSE
+ * message to all processes.
+ *
+ *
+ * Known issues:
+ *
+ * 1. None of the error codes specified by portals allow us to return
+ * an "OTHER" error, when something bad happens internally. So we
+ * arbitrarily return PTL_FAIL when it is an internal error even
+ * though that's not a specified error return code for some portals
+ * functions. When portals functions are called internally, if they
+ * return an error, we funnel them back upstream. This is not an
+ * "issue" per se, but is still ugly.
+ *
+ * 2. None of the pt index types specified by portals allow us to
+ * return an "INVALID" pt entry, to show that a portal is invalid. So
+ * we arbitrarily use PTL_PT_ANY in such cases. Again, this is not an
+ * "issue" per se, but is ugly.
+ */
+
+#define IDS_ARE_EQUAL(t1, t2) \
+ (t1.phys.nid == t2.phys.nid && t1.phys.pid == t2.phys.pid)
+
+#define RPTL_OP_POOL_SEGMENT_COUNT (1024)
+
+static struct {
+ struct rptl *rptl_list;
+
+ struct rptl_op_pool_segment {
+ struct rptl_op op[RPTL_OP_POOL_SEGMENT_COUNT];
+ struct rptl_op_pool_segment *next;
+ struct rptl_op_pool_segment *prev;
+ } *op_segment_list;
+ struct rptl_op *op_pool;
+
+ struct rptl_op *op_list;
+
+ /* targets that we do not send messages to either because they
+ * sent a PAUSE message or because we received a NACK from them */
+ struct rptl_paused_target {
+ ptl_process_t id;
+ enum rptl_paused_target_state {
+ RPTL_TARGET_STATE_FLOWCONTROL,
+ RPTL_TARGET_STATE_DISABLED,
+ RPTL_TARGET_STATE_RECEIVED_PAUSE,
+ RPTL_TARGET_STATE_PAUSE_ACKED
+ } state;
+
+ /* the rptl on which the pause message came in, since we need
+ * to use it to send the pause ack to the right target
+ * portal */
+ struct rptl *rptl;
+
+ struct rptl_paused_target *next;
+ struct rptl_paused_target *prev;
+ } *paused_target_list;
+
+ int world_size;
+ uint64_t origin_events_left;
+ int (*get_target_info) (int rank, ptl_process_t * id, ptl_pt_index_t local_data_pt,
+ ptl_pt_index_t * target_data_pt, ptl_pt_index_t * target_control_pt);
+} rptl_info;
+
+
+#undef FUNCNAME
+#define FUNCNAME alloc_target
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static int alloc_target(ptl_process_t id, enum rptl_paused_target_state state, struct rptl *rptl)
+{
+ int mpi_errno = MPI_SUCCESS;
+ int ret = PTL_OK;
+ struct rptl_paused_target *target;
+ MPIU_CHKPMEM_DECL(1);
+ MPIDI_STATE_DECL(MPID_STATE_ALLOC_TARGET);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_ALLOC_TARGET);
+
+ for (target = rptl_info.paused_target_list; target; target = target->next)
+ if (IDS_ARE_EQUAL(target->id, id))
+ break;
+
+ /* if a paused target does not already exist, create one */
+ if (target == NULL) {
+ /* create a new paused target */
+ MPIU_CHKPMEM_MALLOC(target, struct rptl_paused_target *, sizeof(struct rptl_paused_target),
+ mpi_errno, "rptl paused target");
+ MPL_DL_APPEND(rptl_info.paused_target_list, target);
+
+ target->id = id;
+ target->state = state;
+ target->rptl = rptl;
+ }
+ else if (target->state < state) {
+ target->state = state;
+ target->rptl = rptl;
+ }
+ else {
+ /* target already exists and is in a higher state than the
+ * state we are trying to set. e.g., this is possible if we
+ * got a PAUSE event from a different portal and acked. */
+ }
+
+ fn_exit:
+ MPIU_CHKPMEM_COMMIT();
+ MPIDI_FUNC_EXIT(MPID_STATE_ALLOC_TARGET);
+ return ret;
+
+ fn_fail:
+ if (mpi_errno)
+ ret = PTL_FAIL;
+ MPIU_CHKPMEM_REAP();
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME alloc_op_segment
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static int alloc_op_segment(void)
+{
+ struct rptl_op_pool_segment *op_segment;
+ int mpi_errno = MPI_SUCCESS;
+ int i;
+ int ret = PTL_OK;
+ MPIU_CHKPMEM_DECL(1);
+ MPIDI_STATE_DECL(MPID_STATE_ALLOC_OP_SEGMENT);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_ALLOC_OP_SEGMENT);
+
+ MPIU_CHKPMEM_MALLOC(op_segment, struct rptl_op_pool_segment *, sizeof(struct rptl_op_pool_segment),
+ mpi_errno, "op pool segment");
+ MPL_DL_APPEND(rptl_info.op_segment_list, op_segment);
+
+ for (i = 0; i < RPTL_OP_POOL_SEGMENT_COUNT; i++)
+ MPL_DL_APPEND(rptl_info.op_pool, &op_segment->op[i]);
+
+ fn_exit:
+ MPIU_CHKPMEM_COMMIT();
+ MPIDI_FUNC_EXIT(MPID_STATE_ALLOC_OP_SEGMENT);
+ return ret;
+
+ fn_fail:
+ if (mpi_errno)
+ ret = PTL_FAIL;
+ MPIU_CHKPMEM_REAP();
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME MPID_nem_ptl_rptl_init
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_nem_ptl_rptl_init(int world_size, uint64_t max_origin_events,
+ int (*get_target_info) (int rank, ptl_process_t * id,
+ ptl_pt_index_t local_data_pt,
+ ptl_pt_index_t * target_data_pt,
+ ptl_pt_index_t * target_control_pt))
+{
+ int mpi_errno = MPI_SUCCESS;
+ int ret = PTL_OK;
+ MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_INIT);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_INIT);
+
+ rptl_info.rptl_list = NULL;
+
+ rptl_info.op_pool = NULL;
+ ret = alloc_op_segment();
+ RPTLU_ERR_POP(ret, "error allocating op segment\n");
+
+ rptl_info.op_list = NULL;
+
+ rptl_info.paused_target_list = NULL;
+ rptl_info.world_size = world_size;
+ rptl_info.origin_events_left = max_origin_events;
+ rptl_info.get_target_info = get_target_info;
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_INIT);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME MPID_nem_ptl_rptl_drain_eq
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_nem_ptl_rptl_drain_eq(int eq_count, ptl_handle_eq_t *eq)
+{
+ int ret = PTL_OK;
+ ptl_event_t event;
+ struct rptl_op_pool_segment *op_segment;
+ int i;
+ MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_FINALIZE);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_FINALIZE);
+
+ while (rptl_info.op_list) {
+ for (i = 0; i < eq_count; i++) {
+ /* read and ignore all events */
+ ret = MPID_nem_ptl_rptl_eqget(eq[i], &event);
+ if (ret == PTL_EQ_EMPTY)
+ ret = PTL_OK;
+ RPTLU_ERR_POP(ret, "Error calling MPID_nem_ptl_rptl_eqget\n");
+ }
+ }
+
+ while (rptl_info.op_segment_list) {
+ op_segment = rptl_info.op_segment_list;
+ MPL_DL_DELETE(rptl_info.op_segment_list, op_segment);
+ MPIU_Free(op_segment);
+ }
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_FINALIZE);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME post_empty_buffer
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static inline int post_empty_buffer(ptl_handle_ni_t ni_handle, ptl_pt_index_t pt,
+ ptl_handle_me_t * me_handle)
+{
+ int ret;
+ ptl_me_t me;
+ ptl_process_t id;
+ MPIDI_STATE_DECL(MPID_STATE_POST_EMPTY_BUFFER);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_POST_EMPTY_BUFFER);
+
+ id.phys.nid = PTL_NID_ANY;
+ id.phys.pid = PTL_PID_ANY;
+
+ me.start = NULL;
+ me.length = 0;
+ me.ct_handle = PTL_CT_NONE;
+ me.uid = PTL_UID_ANY;
+ me.options = (PTL_ME_OP_PUT | PTL_ME_OP_GET | PTL_ME_USE_ONCE | PTL_ME_IS_ACCESSIBLE |
+ PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE);
+ me.match_id = id;
+ me.match_bits = 0;
+ me.ignore_bits = 0;
+ me.min_free = 0;
+
+ ret = PtlMEAppend(ni_handle, pt, &me, PTL_PRIORITY_LIST, NULL, me_handle);
+ RPTLU_ERR_POP(ret, "Error appending empty buffer to priority list\n");
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_POST_EMPTY_BUFFER);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME MPID_nem_ptl_rptl_ptinit
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_nem_ptl_rptl_ptinit(ptl_handle_ni_t ni_handle, ptl_handle_eq_t eq_handle, ptl_pt_index_t data_pt,
+ ptl_pt_index_t control_pt)
+{
+ int ret = PTL_OK;
+ struct rptl *rptl;
+ int mpi_errno = MPI_SUCCESS;
+ int i;
+ ptl_md_t md;
+ MPIU_CHKPMEM_DECL(2);
+ MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_PTINIT);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_PTINIT);
+
+
+ /* set up the parts of the rptl that can be done before world-size
+ * or target information is known */
+ MPIU_CHKPMEM_MALLOC(rptl, struct rptl *, sizeof(struct rptl), mpi_errno, "rptl");
+ MPL_DL_APPEND(rptl_info.rptl_list, rptl);
+
+ rptl->local_state = RPTL_LOCAL_STATE_NORMAL;
+ rptl->pause_ack_counter = 0;
+
+ rptl->data.ob_max_count = 0;
+ rptl->data.ob_curr_count = 0;
+
+ rptl->data.pt = data_pt;
+ rptl->control.pt = control_pt;
+
+ rptl->ni = ni_handle;
+ rptl->eq = eq_handle;
+
+ md.start = 0;
+ md.length = (ptl_size_t) (-1);
+ md.options = 0x0;
+ md.eq_handle = rptl->eq;
+ md.ct_handle = PTL_CT_NONE;
+ ret = PtlMDBind(rptl->ni, &md, &rptl->md);
+ RPTLU_ERR_POP(ret, "Error binding new global MD\n");
+
+ /* post 2 * world_size empty buffers on the control portal */
+ if (rptl->control.pt != PTL_PT_ANY) {
+ MPIU_CHKPMEM_MALLOC(rptl->control.me, ptl_handle_me_t *,
+ 2 * rptl_info.world_size * sizeof(ptl_handle_me_t), mpi_errno,
+ "rptl target info");
+ for (i = 0; i < 2 * rptl_info.world_size; i++) {
+ ret = post_empty_buffer(rptl->ni, rptl->control.pt, &rptl->control.me[i]);
+ RPTLU_ERR_POP(ret, "Error in post_empty_buffer\n");
+ }
+ rptl->control.me_idx = 0;
+ }
+
+ fn_exit:
+ MPIU_CHKPMEM_COMMIT();
+ MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_PTINIT);
+ return ret;
+
+ fn_fail:
+ if (mpi_errno)
+ ret = PTL_FAIL;
+ MPIU_CHKPMEM_REAP();
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME MPID_nem_ptl_rptl_ptfini
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_nem_ptl_rptl_ptfini(ptl_pt_index_t pt_index)
+{
+ int i;
+ int ret = PTL_OK;
+ struct rptl *rptl;
+ MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_PTFINI);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_PTFINI);
+
+ /* find the right rptl */
+ for (rptl = rptl_info.rptl_list; rptl && rptl->data.pt != pt_index; rptl = rptl->next);
+ assert(rptl);
+
+ /* unlink and free the control buffers that were posted */
+ if (rptl->control.pt != PTL_PT_ANY) {
+ for (i = 0; i < rptl_info.world_size * 2; i++) {
+ ret = PtlMEUnlink(rptl->control.me[i]);
+ RPTLU_ERR_POP(ret, "Error unlinking control buffers\n");
+ }
+ MPIU_Free(rptl->control.me);
+ }
+
+ MPL_DL_DELETE(rptl_info.rptl_list, rptl);
+ MPIU_Free(rptl);
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_PTFINI);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME alloc_op
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int alloc_op(struct rptl_op **op)
+{
+ int ret = PTL_OK;
+ MPIDI_STATE_DECL(MPID_STATE_ALLOC_OP);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_ALLOC_OP);
+
+ if (rptl_info.op_pool == NULL) {
+ ret = alloc_op_segment();
+ RPTLU_ERR_POP(ret, "error allocating op segment\n");
+ }
+
+ *op = rptl_info.op_pool;
+ MPL_DL_DELETE(rptl_info.op_pool, *op);
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_ALLOC_OP);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME free_op
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+void free_op(struct rptl_op *op)
+{
+ MPIDI_STATE_DECL(MPID_STATE_FREE_OP);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_FREE_OP);
+
+ MPL_DL_APPEND(rptl_info.op_pool, op);
+
+ MPIDI_FUNC_EXIT(MPID_STATE_FREE_OP);
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME issue_op
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int issue_op(struct rptl_op *op)
+{
+ int ret = PTL_OK;
+ struct rptl_paused_target *target;
+ MPIDI_STATE_DECL(MPID_STATE_ISSUE_OP);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_OP);
+
+ if (op->op_type == RPTL_OP_PUT) {
+ for (target = rptl_info.paused_target_list; target; target = target->next)
+ if (IDS_ARE_EQUAL(target->id, op->u.put.target_id))
+ break;
+
+ if (target && op->u.put.flow_control)
+ goto fn_exit;
+
+ if (rptl_info.origin_events_left < 2) {
+ ret = alloc_target(op->u.put.target_id, RPTL_TARGET_STATE_FLOWCONTROL, NULL);
+ RPTLU_ERR_POP(ret, "error allocating paused target\n");
+ goto fn_exit;
+ }
+ rptl_info.origin_events_left -= 2;
+
+ /* force request for an ACK even if the user didn't ask for
+ * it. replace the user pointer with the OP id. */
+ ret =
+ PtlPut(op->u.put.md_handle, op->u.put.local_offset, op->u.put.length,
+ PTL_ACK_REQ, op->u.put.target_id, op->u.put.pt_index,
+ op->u.put.match_bits, op->u.put.remote_offset, op,
+ op->u.put.hdr_data);
+ RPTLU_ERR_POP(ret, "Error issuing PUT\n");
+ }
+ else {
+ for (target = rptl_info.paused_target_list; target; target = target->next)
+ if (IDS_ARE_EQUAL(target->id, op->u.get.target_id))
+ break;
+
+ if (target)
+ goto fn_exit;
+
+ if (rptl_info.origin_events_left < 1) {
+ ret = alloc_target(op->u.get.target_id, RPTL_TARGET_STATE_FLOWCONTROL, NULL);
+ RPTLU_ERR_POP(ret, "error allocating paused target\n");
+ goto fn_exit;
+ }
+ rptl_info.origin_events_left--;
+
+ ret =
+ PtlGet(op->u.get.md_handle, op->u.get.local_offset, op->u.get.length,
+ op->u.get.target_id, op->u.get.pt_index, op->u.get.match_bits,
+ op->u.get.remote_offset, op);
+ RPTLU_ERR_POP(ret, "Error issuing GET\n");
+ }
+
+ op->state = RPTL_OP_STATE_ISSUED;
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_OP);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME MPID_nem_ptl_rptl_put
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
+ ptl_ack_req_t ack_req, ptl_process_t target_id, ptl_pt_index_t pt_index,
+ ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr,
+ ptl_hdr_data_t hdr_data, int flow_control)
+{
+ struct rptl_op *op;
+ int ret = PTL_OK;
+ MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
+
+ ret = alloc_op(&op);
+ RPTLU_ERR_POP(ret, "error allocating op\n");
+
+ op->op_type = RPTL_OP_PUT;
+ op->state = RPTL_OP_STATE_QUEUED;
+
+ /* store the user parameters */
+ op->u.put.md_handle = md_handle;
+ op->u.put.local_offset = local_offset;
+ op->u.put.length = length;
+ op->u.put.ack_req = ack_req;
+ op->u.put.target_id = target_id;
+ op->u.put.pt_index = pt_index;
+ op->u.put.match_bits = match_bits;
+ op->u.put.remote_offset = remote_offset;
+ op->u.put.user_ptr = user_ptr;
+ op->u.put.hdr_data = hdr_data;
+
+ /* place to store the send and ack events */
+ op->u.put.send = NULL;
+ op->u.put.ack = NULL;
+ op->u.put.flow_control = flow_control;
+ op->events_ready = 0;
+
+ MPL_DL_APPEND(rptl_info.op_list, op);
+
+ /* if we are not in a PAUSED state, issue the operation */
+ ret = issue_op(op);
+ RPTLU_ERR_POP(ret, "Error from issue_op\n");
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME MPID_nem_ptl_rptl_get
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_nem_ptl_rptl_get(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
+ ptl_process_t target_id, ptl_pt_index_t pt_index,
+ ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr)
+{
+ struct rptl_op *op;
+ int ret = PTL_OK;
+ struct rptl_paused_target *target;
+ MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_GET);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_GET);
+
+ ret = alloc_op(&op);
+ RPTLU_ERR_POP(ret, "error allocating op\n");
+
+ op->op_type = RPTL_OP_GET;
+ op->state = RPTL_OP_STATE_QUEUED;
+
+ /* store the user parameters */
+ op->u.get.md_handle = md_handle;
+ op->u.get.local_offset = local_offset;
+ op->u.get.length = length;
+ op->u.get.target_id = target_id;
+ op->u.get.pt_index = pt_index;
+ op->u.get.match_bits = match_bits;
+ op->u.get.remote_offset = remote_offset;
+ op->u.get.user_ptr = user_ptr;
+
+ op->events_ready = 0;
+
+ MPL_DL_APPEND(rptl_info.op_list, op);
+
+ for (target = rptl_info.paused_target_list; target; target = target->next)
+ if (IDS_ARE_EQUAL(target->id, target_id))
+ break;
+
+ ret = issue_op(op);
+ RPTLU_ERR_POP(ret, "Error from issue_op\n");
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_GET);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME send_pause_messages
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static int send_pause_messages(struct rptl *rptl)
+{
+ int i, mpi_errno = MPI_SUCCESS;
+ ptl_process_t id;
+ ptl_pt_index_t data_pt, control_pt;
+ int ret = PTL_OK;
+ MPIDI_STATE_DECL(MPID_STATE_SEND_PAUSE_MESSAGES);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_SEND_PAUSE_MESSAGES);
+
+ /* if no control portal is setup for this rptl, we are doomed */
+ assert(rptl->control.pt != PTL_PT_ANY);
+
+ /* set the max message count in the overflow buffers we can keep
+ * before sending the unpause messages */
+ rptl->data.ob_max_count = rptl->data.ob_curr_count / 2;
+
+ for (i = 0; i < rptl_info.world_size; i++) {
+ mpi_errno = rptl_info.get_target_info(i, &id, rptl->data.pt, &data_pt, &control_pt);
+ if (mpi_errno) {
+ ret = PTL_FAIL;
+ RPTLU_ERR_POP(ret, "Error getting target info while sending pause messages\n");
+ }
+
+ /* disable flow control for control messages */
+ ret = MPID_nem_ptl_rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0, 0,
+ NULL, RPTL_CONTROL_MSG_PAUSE, 0);
+ RPTLU_ERR_POP(ret, "Error sending pause message\n");
+ }
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_SEND_PAUSE_MESSAGES);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME send_pause_ack_messages
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static int send_pause_ack_messages(void)
+{
+ struct rptl_op *op;
+ int ret = PTL_OK;
+ struct rptl_paused_target *target;
+ MPIDI_STATE_DECL(MPID_STATE_SEND_PAUSE_ACK_MESSAGES);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_SEND_PAUSE_ACK_MESSAGES);
+
+ for (target = rptl_info.paused_target_list; target; target = target->next) {
+ if (target->state != RPTL_TARGET_STATE_RECEIVED_PAUSE)
+ continue;
+
+ for (op = rptl_info.op_list; op; op = op->next) {
+ if (op->op_type == RPTL_OP_GET && IDS_ARE_EQUAL(op->u.get.target_id, target->id) &&
+ op->state == RPTL_OP_STATE_ISSUED)
+ break;
+
+ if (op->op_type == RPTL_OP_PUT && IDS_ARE_EQUAL(op->u.put.target_id, target->id)) {
+ if (op->state == RPTL_OP_STATE_ISSUED)
+ break;
+ if (op->u.put.send || op->u.put.ack)
+ break;
+ }
+ }
+
+ if (op == NULL) {
+ ptl_process_t id;
+ ptl_pt_index_t data_pt, control_pt;
+ int i;
+ int mpi_errno = MPI_SUCCESS;
+
+ for (i = 0; i < rptl_info.world_size; i++) {
+ /* find the target that has this target id and get the
+ * control portal information for it */
+ mpi_errno = rptl_info.get_target_info(i, &id, target->rptl->data.pt, &data_pt, &control_pt);
+ if (mpi_errno) {
+ ret = PTL_FAIL;
+ RPTLU_ERR_POP(ret,
+ "Error getting target info while sending pause ack message\n");
+ }
+ if (IDS_ARE_EQUAL(id, target->id))
+ break;
+ }
+
+ /* disable flow control for control messages */
+ ret =
+ MPID_nem_ptl_rptl_put(target->rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0,
+ 0, NULL, RPTL_CONTROL_MSG_PAUSE_ACK, 0);
+ RPTLU_ERR_POP(ret, "Error sending pause ack message\n");
+
+ if (target->state == RPTL_TARGET_STATE_RECEIVED_PAUSE)
+ target->state = RPTL_TARGET_STATE_PAUSE_ACKED;
+ }
+ }
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_SEND_PAUSE_ACK_MESSAGES);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME send_unpause_messages
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static int send_unpause_messages(void)
+{
+ int i, mpi_errno = MPI_SUCCESS;
+ ptl_process_t id;
+ ptl_pt_index_t data_pt, control_pt;
+ int ret = PTL_OK;
+ struct rptl *rptl;
+ MPIDI_STATE_DECL(MPID_STATE_SEND_UNPAUSE_MESSAGES);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_SEND_UNPAUSE_MESSAGES);
+
+ for (rptl = rptl_info.rptl_list; rptl; rptl = rptl->next) {
+ assert(rptl->local_state != RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS ||
+ rptl->control.pt != PTL_PT_ANY);
+ if (rptl->control.pt == PTL_PT_ANY)
+ continue;
+ if (rptl->local_state != RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS)
+ continue;
+
+ if (rptl->pause_ack_counter == rptl_info.world_size) {
+ /* if we are over the max count limit, do not send an
+ * unpause message yet */
+ if (rptl->data.ob_curr_count > rptl->data.ob_max_count)
+ goto fn_exit;
+
+ ret = PtlPTEnable(rptl->ni, rptl->data.pt);
+ RPTLU_ERR_POP(ret, "Error returned while reenabling PT\n");
+
+ rptl->local_state = RPTL_LOCAL_STATE_NORMAL;
+
+ for (i = 0; i < rptl_info.world_size; i++) {
+ mpi_errno = rptl_info.get_target_info(i, &id, rptl->data.pt, &data_pt, &control_pt);
+ if (mpi_errno) {
+ ret = PTL_FAIL;
+ RPTLU_ERR_POP(ret,
+ "Error getting target info while sending unpause messages\n");
+ }
+
+ /* disable flow control for control messages */
+ ret =
+ MPID_nem_ptl_rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt,
+ 0, 0, NULL, RPTL_CONTROL_MSG_UNPAUSE, 0);
+ RPTLU_ERR_POP(ret, "Error sending unpause message\n");
+ }
+ }
+ }
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_SEND_UNPAUSE_MESSAGES);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME reissue_ops
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static int reissue_ops(ptl_process_t target_id)
+{
+ struct rptl_paused_target *target;
+ struct rptl_op *op;
+ int ret = PTL_OK;
+ MPIDI_STATE_DECL(MPID_STATE_REISSUE_OPS);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_REISSUE_OPS);
+
+ for (target = rptl_info.paused_target_list; target; target = target->next)
+ if (IDS_ARE_EQUAL(target->id, target_id))
+ break;
+ assert(target);
+
+ MPL_DL_DELETE(rptl_info.paused_target_list, target);
+ MPIU_Free(target);
+
+ for (op = rptl_info.op_list; op; op = op->next) {
+ if ((op->op_type == RPTL_OP_PUT && IDS_ARE_EQUAL(op->u.put.target_id, target_id)) ||
+ (op->op_type == RPTL_OP_GET && IDS_ARE_EQUAL(op->u.get.target_id, target_id))) {
+ if (op->state != RPTL_OP_STATE_ISSUED) {
+ ret = issue_op(op);
+ RPTLU_ERR_POP(ret, "Error calling issue_op\n");
+ }
+ }
+ }
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_REISSUE_OPS);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME get_event_info
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static void get_event_info(ptl_event_t * event, struct rptl **ret_rptl, struct rptl_op **ret_op)
+{
+ struct rptl *rptl;
+ struct rptl_op *op;
+ struct rptl_paused_target *target, *tmp;
+ MPIDI_STATE_DECL(MPID_STATE_GET_EVENT_INFO);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_GET_EVENT_INFO);
+
+ if (event->type == PTL_EVENT_SEND || event->type == PTL_EVENT_REPLY ||
+ event->type == PTL_EVENT_ACK) {
+ op = (struct rptl_op *) event->user_ptr;
+
+ rptl_info.origin_events_left++;
+
+ if (rptl_info.origin_events_left >= 2) {
+ for (target = rptl_info.paused_target_list; target;) {
+ if (target->state == RPTL_TARGET_STATE_FLOWCONTROL) {
+ tmp = target->next;
+ MPL_DL_DELETE(rptl_info.paused_target_list, target);
+ MPIU_Free(target);
+ target = tmp;
+ }
+ else
+ target = target->next;
+ }
+ }
+
+ assert(op);
+ rptl = NULL;
+ }
+ else {
+ /* for all target-side events, we look up the rptl based on
+ * the pt_index */
+ for (rptl = rptl_info.rptl_list; rptl; rptl = rptl->next)
+ if (rptl->data.pt == event->pt_index || rptl->control.pt == event->pt_index)
+ break;
+
+ assert(rptl);
+ op = NULL;
+ }
+
+ *ret_rptl = rptl;
+ *ret_op = op;
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_GET_EVENT_INFO);
+ return;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME stash_event
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static int stash_event(struct rptl_op *op, ptl_event_t event)
+{
+ int mpi_errno = MPI_SUCCESS;
+ int ret = PTL_OK;
+ MPIU_CHKPMEM_DECL(1);
+ MPIDI_STATE_DECL(MPID_STATE_STASH_EVENT);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_STASH_EVENT);
+
+ /* make sure this is of the event type we know of */
+ assert(event.type == PTL_EVENT_SEND || event.type == PTL_EVENT_ACK);
+
+ /* only PUT events are stashed */
+ assert(op->op_type == RPTL_OP_PUT);
+
+ /* we should never stash anything once the op's events have been marked ready */
+ assert(op->events_ready == 0);
+
+ /* only one of send or ack is stashed. if we are in this
+ * function, both the events should be NULL at this point. */
+ assert(op->u.put.send == NULL && op->u.put.ack == NULL);
+
+ if (event.type == PTL_EVENT_SEND) {
+ MPIU_CHKPMEM_MALLOC(op->u.put.send, ptl_event_t *, sizeof(ptl_event_t), mpi_errno,
+ "ptl event");
+ memcpy(op->u.put.send, &event, sizeof(ptl_event_t));
+ }
+ else {
+ MPIU_CHKPMEM_MALLOC(op->u.put.ack, ptl_event_t *, sizeof(ptl_event_t), mpi_errno,
+ "ptl event");
+ memcpy(op->u.put.ack, &event, sizeof(ptl_event_t));
+ }
+
+ fn_exit:
+ MPIU_CHKPMEM_COMMIT();
+ MPIDI_FUNC_EXIT(MPID_STATE_STASH_EVENT);
+ return ret;
+
+ fn_fail:
+ if (mpi_errno)
+ ret = PTL_FAIL;
+ MPIU_CHKPMEM_REAP();
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME retrieve_event
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static void retrieve_event(struct rptl *rptl, struct rptl_op *op, ptl_event_t * event)
+{
+ MPIDI_STATE_DECL(MPID_STATE_RETRIEVE_EVENT);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_RETRIEVE_EVENT);
+
+ assert(op->op_type == RPTL_OP_PUT);
+ assert(op->u.put.send || op->u.put.ack);
+
+ if (op->u.put.send) {
+ memcpy(event, op->u.put.send, sizeof(ptl_event_t));
+ MPIU_Free(op->u.put.send);
+ }
+ else {
+ memcpy(event, op->u.put.ack, sizeof(ptl_event_t));
+ MPIU_Free(op->u.put.ack);
+ }
+ event->user_ptr = op->u.put.user_ptr;
+
+ MPL_DL_DELETE(rptl_info.op_list, op);
+ free_op(op);
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_RETRIEVE_EVENT);
+ return;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME issue_pending_ops
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static int issue_pending_ops(void)
+{
+ struct rptl_paused_target *target, *tmp;
+ struct rptl_op *op;
+ int ret = PTL_OK;
+ MPIDI_STATE_DECL(MPID_STATE_ISSUE_PENDING_OPS);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_PENDING_OPS);
+
+ for (op = rptl_info.op_list; op; op = op->next) {
+ if (op->state == RPTL_OP_STATE_QUEUED) {
+ for (target = rptl_info.paused_target_list; target; target = target->next)
+ if ((op->op_type == RPTL_OP_PUT && IDS_ARE_EQUAL(op->u.put.target_id, target->id)) ||
+ (op->op_type == RPTL_OP_GET && IDS_ARE_EQUAL(op->u.get.target_id, target->id)))
+ break;
+ if (target == NULL) {
+ ret = issue_op(op);
+ RPTLU_ERR_POP(ret, "error issuing op\n");
+ }
+ }
+ }
+
+ fn_exit:
+ MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_PENDING_OPS);
+ return ret;
+
+ fn_fail:
+ goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME MPID_nem_ptl_rptl_eqget
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
+{
+ struct rptl_op *op;
+ struct rptl *rptl = NULL;
+ ptl_event_t e;
+ int ret = PTL_OK, tmp_ret = PTL_OK;
+ struct rptl_paused_target *target;
+ int mpi_errno = MPI_SUCCESS;
+ MPIU_CHKPMEM_DECL(1);
+ MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_EQGET);
+
+ MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_EQGET);
+
+ /* before we poll the eq, we need to check if there are any
+ * completed operations that need to be returned */
+ /* FIXME: this is an expensive loop over all pending operations
+ * every time the user does an eqget */
+ for (op = rptl_info.op_list; op; op = op->next) {
+ if (op->events_ready) {
+ retrieve_event(rptl, op, event);
+ ret = PTL_OK;
+ goto fn_exit;
+ }
+ }
+
+ /* see if pause ack messages need to be sent out */
+ tmp_ret = send_pause_ack_messages();
+ if (tmp_ret) {
+ ret = tmp_ret;
+ RPTLU_ERR_POP(ret, "Error returned from send_pause_ack_messages\n");
+ }
+
+ /* see if unpause messages need to be sent out */
+ tmp_ret = send_unpause_messages();
+ if (tmp_ret) {
+ ret = tmp_ret;
+ RPTLU_ERR_POP(ret, "Error returned from send_unpause_messages\n");
+ }
+
+ /* see if there are any pending ops to be issued */
+ tmp_ret = issue_pending_ops();
+ if (tmp_ret) {
+ ret = tmp_ret;
+ RPTLU_ERR_POP(ret, "Error returned from issue_pending_ops\n");
+ }
+
+ ret = PtlEQGet(eq_handle, event);
+ if (ret == PTL_EQ_EMPTY)
+ goto fn_exit;
+
+ /* find the rptl and op associated with this event */
+ get_event_info(event, &rptl, &op);
+
+ /* PT_DISABLED events only occur on the target */
+ if (event->type == PTL_EVENT_PT_DISABLED) {
+ /* we hide PT disabled events from the user */
+ ret = PTL_EQ_EMPTY;
+
+ /* we should only receive disable events on the data pt */
+ assert(rptl->data.pt == event->pt_index);
+
+ /* if we don't have a control PT, we don't have a way to
+ * recover from disable events */
+ assert(rptl->control.pt != PTL_PT_ANY);
+
+ rptl->local_state = RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS;
+ rptl->pause_ack_counter = 0;
+
+ /* send out pause messages */
+ tmp_ret = send_pause_messages(rptl);
+ if (tmp_ret) {
+ ret = tmp_ret;
+ RPTLU_ERR_POP(ret, "Error returned from send_pause_messages\n");
+ }
+ }
+
+ /* PUT_OVERFLOW events only occur on the target and only for the
+ * data portal */
+ else if (event->type == PTL_EVENT_PUT_OVERFLOW || event->type == PTL_EVENT_GET_OVERFLOW) {
+ /* something is being pulled out of the overflow buffer,
+ * decrement counter */
+ rptl->data.ob_curr_count--;
+
+ /* we should only receive overflow events on the data pt */
+ assert(rptl->data.pt == event->pt_index);
+ }
+
+ /* PUT events only occur on the target */
+ else if (event->type == PTL_EVENT_PUT || event->type == PTL_EVENT_GET) {
+ if (rptl->data.pt == event->pt_index) {
+ /* if the event is in the OVERFLOW list, then it means we
+ * just got a match in there */
+ if (event->ptl_list == PTL_OVERFLOW_LIST)
+ rptl->data.ob_curr_count++;
+ goto fn_exit;
+ }
+
+ /* control PT should never see a GET event */
+ assert(event->type == PTL_EVENT_PUT);
+
+ /* else, this message is on the control PT, so hide this event
+ * from the user */
+ ret = PTL_EQ_EMPTY;
+
+ /* the message came in on the control PT; repost an empty buffer */
+ tmp_ret = post_empty_buffer(rptl->ni, rptl->control.pt,
+ &rptl->control.me[rptl->control.me_idx]);
+ if (tmp_ret) {
+ ret = tmp_ret;
+ RPTLU_ERR_POP(ret, "Error returned from post_empty_buffer\n");
+ }
+ rptl->control.me_idx++;
+ if (rptl->control.me_idx >= 2 * rptl_info.world_size)
+ rptl->control.me_idx = 0;
+
+ if (event->hdr_data == RPTL_CONTROL_MSG_PAUSE) {
+ tmp_ret = alloc_target(event->initiator, RPTL_TARGET_STATE_RECEIVED_PAUSE, rptl);
+ if (tmp_ret) {
+ ret = tmp_ret;
+ RPTLU_ERR_POP(ret, "Error returned from alloc_target\n");
+ }
+ }
+ else if (event->hdr_data == RPTL_CONTROL_MSG_PAUSE_ACK) {
+ rptl->pause_ack_counter++;
+ }
+ else { /* got an UNPAUSE message */
+ /* reissue all operations to this target */
+ tmp_ret = reissue_ops(event->initiator);
+ if (tmp_ret) {
+ ret = tmp_ret;
+ RPTLU_ERR_POP(ret, "Error returned from reissue_ops\n");
+ }
+ }
+ }
+
+ /* origin side events */
+ else if (event->type == PTL_EVENT_SEND || event->type == PTL_EVENT_ACK ||
+ event->type == PTL_EVENT_REPLY) {
+
+ /* this operation was NACKed (target PT disabled); hide the event from the user */
+ if (event->ni_fail_type == PTL_NI_PT_DISABLED) {
+ /* hide the event from the user */
+ ret = PTL_EQ_EMPTY;
+
+ op->state = RPTL_OP_STATE_NACKED;
+
+ if (op->op_type == RPTL_OP_PUT) {
+ assert(!(event->type == PTL_EVENT_SEND && op->u.put.send));
+ assert(!(event->type == PTL_EVENT_ACK && op->u.put.ack));
+
+ /* if we have received both events, discard them.
+ * otherwise, stash the one we received while waiting
+ * for the other. */
+ if (event->type == PTL_EVENT_SEND && op->u.put.ack) {
+ MPIU_Free(op->u.put.ack);
+ op->u.put.ack = NULL;
+ }
+ else if (event->type == PTL_EVENT_ACK && op->u.put.send) {
+ MPIU_Free(op->u.put.send);
+ op->u.put.send = NULL;
+ }
+ else {
+ ret = stash_event(op, *event);
+ RPTLU_ERR_POP(ret, "error stashing event\n");
+ }
+ }
+
+ if (op->op_type == RPTL_OP_PUT)
+ tmp_ret = alloc_target(op->u.put.target_id, RPTL_TARGET_STATE_DISABLED, NULL);
+ else
+ tmp_ret = alloc_target(op->u.get.target_id, RPTL_TARGET_STATE_DISABLED, NULL);
+ if (tmp_ret) {
+ ret = tmp_ret;
+ RPTLU_ERR_POP(ret, "Error returned from alloc_target\n");
+ }
+ }
+
+ /* if this is a REPLY event, we are done with this op */
+ else if (event->type == PTL_EVENT_REPLY) {
+ assert(op->op_type == RPTL_OP_GET);
+
+ event->user_ptr = op->u.get.user_ptr;
+ MPL_DL_DELETE(rptl_info.op_list, op);
+ free_op(op);
+ }
+
+ else if (event->type == PTL_EVENT_SEND && op->u.put.ack) {
+ assert(op->op_type == RPTL_OP_PUT);
+
+ /* we already got the other event we needed earlier. mark
+ * the op events as ready and return this current event to
+ * the user. */
+ op->events_ready = 1;
+ event->user_ptr = op->u.put.user_ptr;
+
+ /* if flow control is not set for this op, drop both events rather than returning them to the user */
+ if (op->u.put.flow_control == 0) {
+ retrieve_event(rptl, op, event);
+ ret = PTL_EQ_EMPTY;
+ }
+ }
+
+ else if (event->type == PTL_EVENT_ACK && op->u.put.send) {
+ assert(op->op_type == RPTL_OP_PUT);
+
+ /* we already got the other event we needed earlier. mark
+ * the op events as ready and return this current event to
+ * the user. */
+ op->events_ready = 1;
+ event->user_ptr = op->u.put.user_ptr;
+
+ /* if flow control is not set for this op, drop both events rather than returning them to the user */
+ if (op->u.put.flow_control == 0) {
+ retrieve_event(rptl, op, event);
+ ret = PTL_EQ_EMPTY;
+ }
+
+ /* if the user asked for an ACK, just return this event.
+ * if not, discard this event and retrieve the send
+ * event. */
+ else if (!(op->u.put.ack_req & PTL_ACK_REQ))
+ retrieve_event(rptl, op, event);
+ }
+
+ else {
+ assert(!(event->type == PTL_EVENT_SEND && op->u.put.send));
+ assert(!(event->type == PTL_EVENT_ACK && op->u.put.ack));
+
+ /* stash this event as we need to wait for the buddy event
+ * as well before returning to the user */
+ ret = stash_event(op, *event);
+ RPTLU_ERR_POP(ret, "error stashing event\n");
+ ret = PTL_EQ_EMPTY;
+ }
+ }
+
+ fn_exit:
+ MPIU_CHKPMEM_COMMIT();
+ MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_EQGET);
+ return ret;
+
+ fn_fail:
+ if (mpi_errno)
+ ret = PTL_FAIL;
+ MPIU_CHKPMEM_REAP();
+ goto fn_exit;
+}
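As an illustrative aside (a sketch, not part of this commit): the origin-side branches above pair each flow-controlled PUT's SEND and ACK events before anything is surfaced to the upper layer. Whichever event arrives first is stashed by stash_event(), and only when its buddy shows up is the operation marked events_ready so that retrieve_event() can replay the stashed one. A minimal standalone illustration of that pairing idea follows; toy_event, toy_put_op and the toy_* functions are made-up names, not MPICH code.

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* toy stand-ins for ptl_event_t and the put arm of struct rptl_op */
typedef enum { TOY_EVENT_SEND, TOY_EVENT_ACK } toy_event_type;
typedef struct { toy_event_type type; } toy_event;

typedef struct {
    toy_event *send;    /* stashed SEND, if it arrived first */
    toy_event *ack;     /* stashed ACK, if it arrived first */
    int events_ready;   /* both events seen; op may complete */
} toy_put_op;

/* stash the first event of the pair (mirrors stash_event) */
static void toy_stash(toy_put_op *op, toy_event e)
{
    toy_event **slot = (e.type == TOY_EVENT_SEND) ? &op->send : &op->ack;
    assert(*slot == NULL);
    *slot = malloc(sizeof(toy_event));
    memcpy(*slot, &e, sizeof(toy_event));
}

/* handle one incoming event; returns 1 once the pair is complete */
static int toy_handle(toy_put_op *op, toy_event e)
{
    toy_event *buddy = (e.type == TOY_EVENT_SEND) ? op->ack : op->send;
    if (buddy == NULL) {
        toy_stash(op, e);   /* first of the pair: stash it and wait */
        return 0;
    }
    op->events_ready = 1;   /* second of the pair: op can complete */
    return 1;
}

int main(void)
{
    toy_put_op op = { NULL, NULL, 0 };
    toy_event send = { TOY_EVENT_SEND }, ack = { TOY_EVENT_ACK };

    printf("after SEND: ready=%d\n", toy_handle(&op, send));   /* 0 */
    printf("after ACK:  ready=%d\n", toy_handle(&op, ack));    /* 1 */
    free(op.send);
    free(op.ack);
    return 0;
}
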
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
new file mode 100644
index 0000000..359e24f
--- /dev/null
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
@@ -0,0 +1,149 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ * (C) 2014 by Argonne National Laboratory.
+ * See COPYRIGHT in top-level directory.
+ */
+
+#if !defined RPTL_H_INCLUDED
+#define RPTL_H_INCLUDED
+
+#if !defined HAVE_MACRO_VA_ARGS
+#error "portals requires VA_ARGS support"
+#endif /* HAVE_MACRO_VA_ARGS */
+
+#if defined HAVE__FUNC__
+#define RPTLU_FUNC __func__
+#elif defined HAVE_CAP__FUNC__
+#define RPTLU_FUNC __FUNC__
+#elif defined HAVE__FUNCTION__
+#define RPTLU_FUNC __FUNCTION__
+#else
+#define RPTLU_FUNC "Unknown"
+#endif
+
+#define RPTLU_ERR_POP(ret, ...) \
+ { \
+ if (ret) { \
+ MPIU_Error_printf("%s (%d): ", RPTLU_FUNC, __LINE__); \
+ MPIU_Error_printf(__VA_ARGS__); \
+ goto fn_fail; \
+ } \
+ }
+
+struct rptl_op {
+ enum {
+ RPTL_OP_PUT,
+ RPTL_OP_GET
+ } op_type;
+
+ enum {
+ RPTL_OP_STATE_QUEUED,
+ RPTL_OP_STATE_ISSUED,
+ RPTL_OP_STATE_NACKED
+ } state;
+
+ union {
+ struct {
+ ptl_handle_md_t md_handle;
+ ptl_size_t local_offset;
+ ptl_size_t length;
+ ptl_ack_req_t ack_req;
+ ptl_process_t target_id;
+ ptl_pt_index_t pt_index;
+ ptl_match_bits_t match_bits;
+ ptl_size_t remote_offset;
+ void *user_ptr;
+ ptl_hdr_data_t hdr_data;
+
+ /* internal fields used to stash events */
+ ptl_event_t *send;
+ ptl_event_t *ack;
+ int flow_control;
+ } put;
+ struct {
+ ptl_handle_md_t md_handle;
+ ptl_size_t local_offset;
+ ptl_size_t length;
+ ptl_process_t target_id;
+ ptl_pt_index_t pt_index;
+ ptl_match_bits_t match_bits;
+ ptl_size_t remote_offset;
+ void *user_ptr;
+ } get;
+ } u;
+
+ int events_ready;
+
+ struct rptl_op *next;
+ struct rptl_op *prev;
+};
+
+#define RPTL_CONTROL_MSG_PAUSE (0)
+#define RPTL_CONTROL_MSG_PAUSE_ACK (1)
+#define RPTL_CONTROL_MSG_UNPAUSE (2)
+
+struct rptl {
+ /* local portal state */
+ enum {
+ RPTL_LOCAL_STATE_NORMAL,
+ RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS
+ } local_state;
+ uint64_t pause_ack_counter;
+
+ struct {
+ ptl_handle_eq_t eq;
+ ptl_pt_index_t pt; /* primary pt for data exchange */
+
+ /* ob_max_count refers to the number of messages that were in
+ * the overflow buffer when the pt was disabled */
+ uint64_t ob_max_count;
+
+ /* ob_curr_count refers to the current tally of messages in
+ * the overflow buffer. if we are in disabled state, when
+ * this count reaches half of the maximum count, we are ready
+ * to reenable the PT. */
+ uint64_t ob_curr_count;
+ } data;
+
+ struct {
+ ptl_pt_index_t pt; /* pt for control messages */
+
+ /* the remaining contents of the control structure are only
+ * valid when the control.pt field is not PTL_PT_ANY */
+ ptl_handle_me_t *me;
+ int me_idx;
+ } control;
+
+ ptl_handle_ni_t ni;
+ ptl_handle_eq_t eq;
+ ptl_handle_md_t md;
+
+ struct rptl *next;
+ struct rptl *prev;
+};
+
+int MPID_nem_ptl_rptl_init(int world_size, uint64_t max_origin_events,
+ int (*get_target_info) (int rank, ptl_process_t * id,
+ ptl_pt_index_t local_data_pt,
+ ptl_pt_index_t * target_data_pt,
+ ptl_pt_index_t * target_control_pt));
+
+int MPID_nem_ptl_rptl_drain_eq(int eq_count, ptl_handle_eq_t *eq);
+
+int MPID_nem_ptl_rptl_ptinit(ptl_handle_ni_t ni_handle, ptl_handle_eq_t eq_handle, ptl_pt_index_t data_pt,
+ ptl_pt_index_t control_pt);
+
+int MPID_nem_ptl_rptl_ptfini(ptl_pt_index_t pt_index);
+
+int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
+ ptl_ack_req_t ack_req, ptl_process_t target_id, ptl_pt_index_t pt_index,
+ ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr,
+ ptl_hdr_data_t hdr_data, int flow_control);
+
+int MPID_nem_ptl_rptl_get(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
+ ptl_process_t target_id, ptl_pt_index_t pt_index,
+ ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr);
+
+int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event);
+
+#endif /* RPTL_H_INCLUDED */
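As an illustrative aside (a sketch, not part of this commit): the prototypes above are the entire surface the netmod sees, and the PAUSE/PAUSE-ACK/UNPAUSE traffic stays hidden behind the usual put/get/eqget calls. Below is a rough usage sketch under the assumption that the Portals NI, EQ, MD and the two PT indices have already been created by the caller; progress_example and my_get_target_info are hypothetical names, and the include paths and the origin-event limit are assumptions.

/* sketch only; assumes it is compiled inside the portals4 netmod so the
 * usual MPICH configuration (e.g. HAVE_MACRO_VA_ARGS) and Portals types
 * are available through ptl_impl.h */
#include "ptl_impl.h"
#include "rptl.h"

/* supplied by the upper layer so rptl can locate each rank's portals */
int my_get_target_info(int rank, ptl_process_t *id,
                       ptl_pt_index_t local_data_pt,
                       ptl_pt_index_t *target_data_pt,
                       ptl_pt_index_t *target_control_pt);

int progress_example(ptl_handle_ni_t ni, ptl_handle_eq_t eq,
                     ptl_handle_md_t md, ptl_process_t peer,
                     ptl_pt_index_t data_pt, ptl_pt_index_t control_pt,
                     int world_size)
{
    int ret;
    ptl_event_t event;

    /* one-time setup: register the world and wrap the data/control PTs */
    ret = MPID_nem_ptl_rptl_init(world_size, 1024 /* assumed origin-event limit */,
                                 my_get_target_info);
    if (ret) return ret;
    ret = MPID_nem_ptl_rptl_ptinit(ni, eq, data_pt, control_pt);
    if (ret) return ret;

    /* a 128-byte put with flow control enabled (last argument) */
    ret = MPID_nem_ptl_rptl_put(md, 0, 128, PTL_ACK_REQ, peer, data_pt,
                                0, 0, NULL, 0, 1);
    if (ret) return ret;

    /* poll through the wrapper; PAUSE/PAUSE-ACK/UNPAUSE control traffic
     * and NACKed operations are absorbed here and never reach the caller */
    while ((ret = MPID_nem_ptl_rptl_eqget(eq, &event)) == PTL_EQ_EMPTY)
        ;   /* spin until an event the upper layer should see shows up */
    return ret;
}
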
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/subconfigure.m4 b/src/mpid/ch3/channels/nemesis/netmod/portals4/subconfigure.m4
index 5e98951..1b4db86 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/subconfigure.m4
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/subconfigure.m4
@@ -14,6 +14,8 @@ AC_DEFUN([PAC_SUBCFG_BODY_]PAC_SUBCFG_AUTO_SUFFIX,[
AM_COND_IF([BUILD_NEMESIS_NETMOD_PORTALS4],[
AC_MSG_NOTICE([RUNNING CONFIGURE FOR ch3:nemesis:portals4])
+ PAC_CC_FUNCTION_NAME_SYMBOL
+
PAC_SET_HEADER_LIB_PATH(portals4)
PAC_PUSH_FLAG(LIBS)
PAC_CHECK_HEADER_LIB_FATAL(portals4, portals4.h, portals, PtlInit)
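One note on the configure change (again a sketch, not part of the commit): PAC_CC_FUNCTION_NAME_SYMBOL appears to be what supplies the HAVE__FUNC__ / HAVE_CAP__FUNC__ / HAVE__FUNCTION__ definitions that rptl.h consults when choosing RPTLU_FUNC; without it the RPTLU_ERR_POP messages would fall back to reporting "Unknown" as the function name. A small sketch of how the macro is meant to be used in a caller follows; do_example_put is a hypothetical function and the include paths are assumptions.

/* sketch only; assumes it is compiled inside the portals4 netmod, where
 * ptl_impl.h/rptl.h provide the Portals types, MPIU_Error_printf, and the
 * rptl prototypes */
#include "ptl_impl.h"
#include "rptl.h"

int do_example_put(ptl_handle_md_t md, ptl_process_t target,
                   ptl_pt_index_t data_pt)
{
    int ret;

    /* flow-controlled put of 64 bytes at offset 0 */
    ret = MPID_nem_ptl_rptl_put(md, 0, 64, PTL_ACK_REQ, target, data_pt,
                                0, 0, NULL, 0, 1);
    /* on failure this prints "do_example_put (<line>): put failed"
     * (the function name comes from RPTLU_FUNC) and jumps to fn_fail */
    RPTLU_ERR_POP(ret, "put failed\n");

  fn_exit:
    return ret;
  fn_fail:
    goto fn_exit;
}
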
-----------------------------------------------------------------------
Summary of changes:
.../channels/nemesis/netmod/portals4/Makefile.mk | 6 +-
.../channels/nemesis/netmod/portals4/ptl_impl.h | 6 +-
.../channels/nemesis/netmod/portals4/ptl_init.c | 102 ++-
.../ch3/channels/nemesis/netmod/portals4/ptl_nm.c | 27 +-
.../channels/nemesis/netmod/portals4/ptl_poll.c | 3 +-
.../channels/nemesis/netmod/portals4/ptl_recv.c | 15 +-
.../channels/nemesis/netmod/portals4/ptl_send.c | 31 +-
.../ch3/channels/nemesis/netmod/portals4/rptl.c | 1272 ++++++++++++++++++++
.../ch3/channels/nemesis/netmod/portals4/rptl.h | 149 +++
.../nemesis/netmod/portals4/subconfigure.m4 | 2 +
10 files changed, 1571 insertions(+), 42 deletions(-)
create mode 100644 src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
create mode 100644 src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
hooks/post-receive
--
MPICH primary repository