[mpich-commits] [mpich] MPICH primary repository branch, master, updated. v3.1.3-193-ged81938
Service Account
noreply at mpich.org
Wed Nov 12 20:12:27 CST 2014
This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "MPICH primary repository".
The branch, master has been updated
via ed819382454bfd187f30d0861df35bb919cb93a8 (commit)
via f67c19850e3ba739189ed8ed15a097aca743d161 (commit)
via aa350fba3064f119061e94056d3d04083e5382a6 (commit)
via f68228a9f5ef63ef2062520ac66decc16b20f513 (commit)
via 5087383555964e15106cc66123a4c084f5326dff (commit)
via a839daf81cb8f044a0090eee537a1e83930582e4 (commit)
from 504619783b12672cf2329768f3bb9688790d6cb2 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
http://git.mpich.org/mpich.git/commitdiff/ed819382454bfd187f30d0861df35bb919cb93a8
commit ed819382454bfd187f30d0861df35bb919cb93a8
Author: Antonio Pena Monferrer <apenya at mcs.anl.gov>
Date: Wed Nov 12 19:55:23 2014 -0600
Code cleanup to fix compiler warnings in rportals
Signed-off-by: Pavan Balaji <balaji at anl.gov>
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
index af67c47..4de135c 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
@@ -128,7 +128,6 @@ int MPID_nem_ptl_rptl_init(int world_size, uint64_t max_origin_events,
ptl_pt_index_t * target_data_pt,
ptl_pt_index_t * target_control_pt))
{
- int mpi_errno = MPI_SUCCESS;
int ret = PTL_OK;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_INIT);
@@ -393,7 +392,7 @@ static int alloc_op(struct rptl_op **op, struct rptl_target *target)
#define FUNCNAME free_op
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-void free_op(struct rptl_op *op)
+static void free_op(struct rptl_op *op)
{
MPIDI_STATE_DECL(MPID_STATE_FREE_OP);
@@ -414,7 +413,7 @@ static int rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size
#define FUNCNAME poke_progress
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-int poke_progress(void)
+static int poke_progress(void)
{
int ret = PTL_OK;
struct rptl_target *target;
@@ -965,13 +964,11 @@ static int retrieve_event(ptl_event_t * event)
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
{
- struct rptl_op *op;
- struct rptl *rptl;
- ptl_event_t e;
+ struct rptl_op *op = NULL;
+ struct rptl *rptl = NULL;
int ret = PTL_OK, tmp_ret = PTL_OK;
int mpi_errno = MPI_SUCCESS;
struct rptl_target *target;
- MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_EQGET);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_EQGET);
@@ -1208,13 +1205,11 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
}
fn_exit:
- MPIU_CHKPMEM_COMMIT();
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_EQGET);
return ret;
fn_fail:
if (mpi_errno)
ret = PTL_FAIL;
- MPIU_CHKPMEM_REAP();
goto fn_exit;
}
http://git.mpich.org/mpich.git/commitdiff/f67c19850e3ba739189ed8ed15a097aca743d161
commit f67c19850e3ba739189ed8ed15a097aca743d161
Author: Pavan Balaji <balaji at anl.gov>
Date: Wed Nov 12 13:51:16 2014 -0600
Change "flow_control" to data/control portal type.
The terminology "flow_control" was a bit of a misnomer since we do
more than just enable/disable flow control based on whether messages
are on the data or control portal.
Signed-off-by: Antonio Pena Monferrer <apenya at mcs.anl.gov>
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
index e6b4de6..af67c47 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
@@ -408,7 +408,7 @@ void free_op(struct rptl_op *op)
static int rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
ptl_ack_req_t ack_req, ptl_process_t target_id, ptl_pt_index_t pt_index,
ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr,
- ptl_hdr_data_t hdr_data, int flow_control);
+ ptl_hdr_data_t hdr_data, enum rptl_pt_type pt_type);
#undef FUNCNAME
#define FUNCNAME poke_progress
@@ -458,9 +458,8 @@ int poke_progress(void)
/* make sure the user setup a control portal */
assert(control_pt != PTL_PT_ANY);
- /* disable flow control for control messages */
ret = rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt,
- 0, 0, NULL, RPTL_CONTROL_MSG_UNPAUSE, 0);
+ 0, 0, NULL, RPTL_CONTROL_MSG_UNPAUSE, RPTL_PT_CONTROL);
RPTLU_ERR_POP(ret, "Error sending unpause message\n");
}
}
@@ -494,9 +493,8 @@ int poke_progress(void)
target->state = RPTL_TARGET_STATE_PAUSE_ACKED;
- /* disable flow control for control messages */
ret = rptl_put(target->rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0,
- 0, NULL, RPTL_CONTROL_MSG_PAUSE_ACK, 0);
+ 0, NULL, RPTL_CONTROL_MSG_PAUSE_ACK, RPTL_PT_CONTROL);
RPTLU_ERR_POP(ret, "Error sending pause ack message\n");
continue;
@@ -611,7 +609,7 @@ int poke_progress(void)
static int rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
ptl_ack_req_t ack_req, ptl_process_t target_id, ptl_pt_index_t pt_index,
ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr,
- ptl_hdr_data_t hdr_data, int flow_control)
+ ptl_hdr_data_t hdr_data, enum rptl_pt_type pt_type)
{
struct rptl_op *op;
int ret = PTL_OK;
@@ -644,11 +642,11 @@ static int rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size
/* place to store the send and ack events */
op->u.put.send = NULL;
op->u.put.ack = NULL;
- op->u.put.flow_control = flow_control;
+ op->u.put.pt_type = pt_type;
op->events_ready = 0;
op->target = target;
- if (op->u.put.flow_control)
+ if (op->u.put.pt_type == RPTL_PT_DATA)
MPL_DL_APPEND(target->data_op_list, op);
else
MPL_DL_APPEND(target->control_op_list, op);
@@ -675,7 +673,7 @@ int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, pt
ptl_hdr_data_t hdr_data)
{
return rptl_put(md_handle, local_offset, length, ack_req, target_id, pt_index, match_bits,
- remote_offset, user_ptr, hdr_data, 1);
+ remote_offset, user_ptr, hdr_data, RPTL_PT_DATA);
}
@@ -761,9 +759,8 @@ static int send_pause_messages(struct rptl *rptl)
/* make sure the user setup a control portal */
assert(control_pt != PTL_PT_ANY);
- /* disable flow control for control messages */
ret = rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0, 0,
- NULL, RPTL_CONTROL_MSG_PAUSE, 0);
+ NULL, RPTL_CONTROL_MSG_PAUSE, RPTL_PT_CONTROL);
RPTLU_ERR_POP(ret, "Error sending pause message\n");
}
@@ -1099,7 +1096,7 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
/* we should not get NACKs on the control portal */
if (event->type == PTL_EVENT_ACK)
- assert(op->u.put.flow_control);
+ assert(op->u.put.pt_type == RPTL_PT_DATA);
op->state = RPTL_OP_STATE_NACKED;
@@ -1159,8 +1156,9 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
op->events_ready = 1;
event->user_ptr = op->u.put.user_ptr;
- /* if flow control is not set, ignore the ACK event */
- if (op->u.put.flow_control == 0) {
+ /* if the message is over the control portal, ignore the
+ * ACK event */
+ if (op->u.put.pt_type == RPTL_PT_CONTROL) {
MPIU_Free(op->u.put.ack);
MPL_DL_DELETE(op->target->control_op_list, op);
free_op(op);
@@ -1177,8 +1175,9 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
op->events_ready = 1;
event->user_ptr = op->u.put.user_ptr;
- /* if flow control is not set, ignore ACK event */
- if (op->u.put.flow_control == 0) {
+ /* if the message is over the control portal, ignore ACK
+ * event */
+ if (op->u.put.pt_type == RPTL_PT_CONTROL) {
MPIU_Free(op->u.put.send);
MPL_DL_DELETE(op->target->control_op_list, op);
free_op(op);
@@ -1190,7 +1189,7 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
else if (!(op->u.put.ack_req & PTL_ACK_REQ)) {
memcpy(event, op->u.put.send, sizeof(ptl_event_t));
MPIU_Free(op->u.put.send);
- /* flow control is set, we should be in the data op list */
+ /* we should be in the data op list */
MPL_DL_DELETE(op->target->data_op_list, op);
free_op(op);
}
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
index 08e95e7..7ce31d9 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
@@ -30,6 +30,11 @@
} \
}
+enum rptl_pt_type {
+ RPTL_PT_DATA,
+ RPTL_PT_CONTROL
+};
+
struct rptl_target;
struct rptl_op {
enum {
@@ -59,7 +64,7 @@ struct rptl_op {
/* internal variables store events */
ptl_event_t *send;
ptl_event_t *ack;
- int flow_control;
+ enum rptl_pt_type pt_type;
} put;
struct {
ptl_handle_md_t md_handle;
http://git.mpich.org/mpich.git/commitdiff/aa350fba3064f119061e94056d3d04083e5382a6
commit aa350fba3064f119061e94056d3d04083e5382a6
Author: Pavan Balaji <balaji at anl.gov>
Date: Wed Nov 12 13:45:36 2014 -0600
Do not expose the flowcontrol parameter to the user.
Signed-off-by: Antonio Pena Monferrer <apenya at mcs.anl.gov>
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
index 996f4be..481b88b 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
@@ -245,7 +245,7 @@ static inline int send_pkt(MPIDI_VC_t *vc, void *hdr_p, void *data_p, MPIDI_msg_
goto fn_fail;
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sendbuf, sendbuf_sz, PTL_NO_ACK_REQ,
- vc_ptl->id, vc_ptl->ptc, CTL_TAG, 0, sreq, MPIDI_Process.my_pg_rank, 1);
+ vc_ptl->id, vc_ptl->ptc, CTL_TAG, 0, sreq, MPIDI_Process.my_pg_rank);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x)",
@@ -308,7 +308,7 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
goto fn_fail;
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sendbuf, sendbuf_sz, PTL_NO_ACK_REQ,
- vc_ptl->id, vc_ptl->ptc, CTL_TAG, 0, sreq, MPIDI_Process.my_pg_rank, 1);
+ vc_ptl->id, vc_ptl->ptc, CTL_TAG, 0, sreq, MPIDI_Process.my_pg_rank);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x)",
@@ -461,7 +461,7 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
MPIU_ERR_POP(mpi_errno);
/* Notify we're done */
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc,
- DONE_TAG(recvbufs[buf_idx].tag), 0, done_req, MPIDI_Process.my_pg_rank, 1);
+ DONE_TAG(recvbufs[buf_idx].tag), 0, done_req, MPIDI_Process.my_pg_rank);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
@@ -531,7 +531,7 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
/* Notify we're done */
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc,
- DONE_TAG(rreq->dev.match.parts.tag), 0, done_req, MPIDI_Process.my_pg_rank, 1);
+ DONE_TAG(rreq->dev.match.parts.tag), 0, done_req, MPIDI_Process.my_pg_rank);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_send.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_send.c
index a6f7d4b..51c0016 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_send.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_send.c
@@ -267,7 +267,7 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "&REQ_PTL(sreq)->event_handler = %p", &(REQ_PTL(sreq)->event_handler));
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)buf + dt_true_lb), data_sz, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag, data_sz), 1);
+ NPTL_HEADER(ssend_flag, data_sz));
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", data_sz, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag, data_sz));
MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "id.nid = %#x", vc_ptl->id.phys.nid);
@@ -304,7 +304,7 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->event_handler = handler_send_complete;
ret = MPID_nem_ptl_rptl_put(REQ_PTL(sreq)->md, 0, data_sz, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag, data_sz), 1);
+ NPTL_HEADER(ssend_flag, data_sz));
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("sreq", data_sz, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag, data_sz));
goto fn_exit;
@@ -321,7 +321,7 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->event_handler = handler_send_complete;
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq)->chunk_buffer[0], data_sz, PTL_ACK_REQ,
vc_ptl->id, vc_ptl->pt, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag, data_sz), 1);
+ NPTL_HEADER(ssend_flag, data_sz));
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", data_sz, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag, data_sz));
goto fn_exit;
@@ -338,7 +338,7 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->event_handler = handler_large;
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)buf + dt_true_lb), PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz), 1);
+ NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", PTL_LARGE_THRESHOLD, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
goto fn_exit;
@@ -408,7 +408,7 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->event_handler = handler_large;
ret = MPID_nem_ptl_rptl_put(REQ_PTL(sreq)->md, 0, PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
- NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz), 1);
+ NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("req", PTL_LARGE_THRESHOLD, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
goto fn_exit;
@@ -432,7 +432,7 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->event_handler = handler_large;
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq)->chunk_buffer[0], PTL_LARGE_THRESHOLD,
PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank),
- 0, sreq, NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz), 1);
+ 0, sreq, NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", PTL_LARGE_THRESHOLD, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
index 210bfdc..e6b4de6 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
@@ -405,6 +405,11 @@ void free_op(struct rptl_op *op)
}
+static int rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
+ ptl_ack_req_t ack_req, ptl_process_t target_id, ptl_pt_index_t pt_index,
+ ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr,
+ ptl_hdr_data_t hdr_data, int flow_control);
+
#undef FUNCNAME
#define FUNCNAME poke_progress
#undef FCNAME
@@ -454,8 +459,8 @@ int poke_progress(void)
assert(control_pt != PTL_PT_ANY);
/* disable flow control for control messages */
- ret = MPID_nem_ptl_rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt,
- 0, 0, NULL, RPTL_CONTROL_MSG_UNPAUSE, 0);
+ ret = rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt,
+ 0, 0, NULL, RPTL_CONTROL_MSG_UNPAUSE, 0);
RPTLU_ERR_POP(ret, "Error sending unpause message\n");
}
}
@@ -490,7 +495,7 @@ int poke_progress(void)
target->state = RPTL_TARGET_STATE_PAUSE_ACKED;
/* disable flow control for control messages */
- ret = MPID_nem_ptl_rptl_put(target->rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0,
+ ret = rptl_put(target->rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0,
0, NULL, RPTL_CONTROL_MSG_PAUSE_ACK, 0);
RPTLU_ERR_POP(ret, "Error sending pause ack message\n");
@@ -600,20 +605,20 @@ int poke_progress(void)
#undef FUNCNAME
-#define FUNCNAME MPID_nem_ptl_rptl_put
+#define FUNCNAME rptl_put
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
- ptl_ack_req_t ack_req, ptl_process_t target_id, ptl_pt_index_t pt_index,
- ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr,
- ptl_hdr_data_t hdr_data, int flow_control)
+static int rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
+ ptl_ack_req_t ack_req, ptl_process_t target_id, ptl_pt_index_t pt_index,
+ ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr,
+ ptl_hdr_data_t hdr_data, int flow_control)
{
struct rptl_op *op;
int ret = PTL_OK;
struct rptl_target *target;
- MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
+ MPIDI_STATE_DECL(MPID_STATE_RPTL_PUT);
- MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
+ MPIDI_FUNC_ENTER(MPID_STATE_RPTL_PUT);
ret = find_target(target_id, &target);
RPTLU_ERR_POP(ret, "error finding target structure\n");
@@ -652,7 +657,7 @@ int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, pt
RPTLU_ERR_POP(ret, "Error from poke_progress\n");
fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
+ MPIDI_FUNC_EXIT(MPID_STATE_RPTL_PUT);
return ret;
fn_fail:
@@ -661,6 +666,20 @@ int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, pt
#undef FUNCNAME
+#define FUNCNAME rptl_put
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
+ ptl_ack_req_t ack_req, ptl_process_t target_id, ptl_pt_index_t pt_index,
+ ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr,
+ ptl_hdr_data_t hdr_data)
+{
+ return rptl_put(md_handle, local_offset, length, ack_req, target_id, pt_index, match_bits,
+ remote_offset, user_ptr, hdr_data, 1);
+}
+
+
+#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_rptl_get
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
@@ -743,7 +762,7 @@ static int send_pause_messages(struct rptl *rptl)
assert(control_pt != PTL_PT_ANY);
/* disable flow control for control messages */
- ret = MPID_nem_ptl_rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0, 0,
+ ret = rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0, 0,
NULL, RPTL_CONTROL_MSG_PAUSE, 0);
RPTLU_ERR_POP(ret, "Error sending pause message\n");
}
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
index c5f1254..08e95e7 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
@@ -171,7 +171,7 @@ int MPID_nem_ptl_rptl_ptfini(ptl_pt_index_t pt_index);
int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
ptl_ack_req_t ack_req, ptl_process_t target_id, ptl_pt_index_t pt_index,
ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr,
- ptl_hdr_data_t hdr_data, int flow_control);
+ ptl_hdr_data_t hdr_data);
int MPID_nem_ptl_rptl_get(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
ptl_process_t target_id, ptl_pt_index_t pt_index,
http://git.mpich.org/mpich.git/commitdiff/f68228a9f5ef63ef2062520ac66decc16b20f513
commit f68228a9f5ef63ef2062520ac66decc16b20f513
Author: Pavan Balaji <balaji at anl.gov>
Date: Tue Nov 4 22:03:36 2014 -0600
Several updates to the rportals code.
We now use a target structure for each target ID that we want to send
data to. This allows us to separate out target-specific states and
more cleanly manage operations to a single target.
Signed-off-by: Antonio Pena Monferrer <apenya at mcs.anl.gov>
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
index 60e8db8..996f4be 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c
@@ -461,7 +461,7 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
MPIU_ERR_POP(mpi_errno);
/* Notify we're done */
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc,
- DONE_TAG(recvbufs[buf_idx].tag), 0, done_req, MPIDI_Process.my_pg_rank, 0);
+ DONE_TAG(recvbufs[buf_idx].tag), 0, done_req, MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
@@ -531,7 +531,7 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
/* Notify we're done */
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc,
- DONE_TAG(rreq->dev.match.parts.tag), 0, done_req, MPIDI_Process.my_pg_rank, 0);
+ DONE_TAG(rreq->dev.match.parts.tag), 0, done_req, MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
index da126e4..210bfdc 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
@@ -60,39 +60,9 @@
#define IDS_ARE_EQUAL(t1, t2) \
(t1.phys.nid == t2.phys.nid && t1.phys.pid == t2.phys.pid)
-#define RPTL_OP_POOL_SEGMENT_COUNT (1024)
-
static struct {
struct rptl *rptl_list;
-
- struct rptl_op_pool_segment {
- struct rptl_op op[RPTL_OP_POOL_SEGMENT_COUNT];
- struct rptl_op_pool_segment *next;
- struct rptl_op_pool_segment *prev;
- } *op_segment_list;
- struct rptl_op *op_pool;
-
- struct rptl_op *op_list;
-
- /* targets that we do not send messages to either because they
- * sent a PAUSE message or because we received a NACK from them */
- struct rptl_paused_target {
- ptl_process_t id;
- enum rptl_paused_target_state {
- RPTL_TARGET_STATE_FLOWCONTROL,
- RPTL_TARGET_STATE_DISABLED,
- RPTL_TARGET_STATE_RECEIVED_PAUSE,
- RPTL_TARGET_STATE_PAUSE_ACKED
- } state;
-
- /* the rptl on which the pause message came in, since we need
- * to use it to send the pause ack to the right target
- * portal */
- struct rptl *rptl;
-
- struct rptl_paused_target *next;
- struct rptl_paused_target *prev;
- } *paused_target_list;
+ struct rptl_target *target_list;
int world_size;
uint64_t origin_events_left;
@@ -102,82 +72,42 @@ static struct {
#undef FUNCNAME
-#define FUNCNAME alloc_target
+#define FUNCNAME find_target
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-static int alloc_target(ptl_process_t id, enum rptl_paused_target_state state, struct rptl *rptl)
+static int find_target(ptl_process_t id, struct rptl_target **target)
{
int mpi_errno = MPI_SUCCESS;
int ret = PTL_OK;
- struct rptl_paused_target *target;
+ struct rptl_target *t;
MPIU_CHKPMEM_DECL(1);
- MPIDI_STATE_DECL(MPID_STATE_ALLOC_TARGET);
+ MPIDI_STATE_DECL(MPID_STATE_FIND_TARGET);
- MPIDI_FUNC_ENTER(MPID_STATE_ALLOC_TARGET);
+ MPIDI_FUNC_ENTER(MPID_STATE_FIND_TARGET);
- for (target = rptl_info.paused_target_list; target; target = target->next)
- if (IDS_ARE_EQUAL(target->id, id))
+ for (t = rptl_info.target_list; t; t = t->next)
+ if (IDS_ARE_EQUAL(t->id, id))
break;
- /* if a paused target does not already exist, create one */
- if (target == NULL) {
- /* create a new paused target */
- MPIU_CHKPMEM_MALLOC(target, struct rptl_paused_target *, sizeof(struct rptl_paused_target),
- mpi_errno, "rptl paused target");
- MPL_DL_APPEND(rptl_info.paused_target_list, target);
-
- target->id = id;
- target->state = state;
- target->rptl = rptl;
- }
- else if (target->state < state) {
- target->state = state;
- target->rptl = rptl;
- }
- else {
- /* target already exists and is in a higher state than the
- * state we are trying to set. e.g., this is possible if we
- * got a PAUSE event from a different portal and acked. */
+ /* if the target does not already exist, create one */
+ if (t == NULL) {
+ MPIU_CHKPMEM_MALLOC(t, struct rptl_target *, sizeof(struct rptl_target), mpi_errno, "rptl target");
+ MPL_DL_APPEND(rptl_info.target_list, t);
+
+ t->id = id;
+ t->state = RPTL_TARGET_STATE_ACTIVE;
+ t->rptl = NULL;
+ t->op_segment_list = NULL;
+ t->op_pool = NULL;
+ t->data_op_list = NULL;
+ t->control_op_list = NULL;
}
- fn_exit:
- MPIU_CHKPMEM_COMMIT();
- MPIDI_FUNC_EXIT(MPID_STATE_ALLOC_TARGET);
- return ret;
-
- fn_fail:
- if (mpi_errno)
- ret = PTL_FAIL;
- MPIU_CHKPMEM_REAP();
- goto fn_exit;
-}
-
-
-#undef FUNCNAME
-#define FUNCNAME alloc_op_segment
-#undef FCNAME
-#define FCNAME MPIU_QUOTE(FUNCNAME)
-static int alloc_op_segment(void)
-{
- struct rptl_op_pool_segment *op_segment;
- int mpi_errno = MPI_SUCCESS;
- int i;
- int ret = PTL_OK;
- MPIU_CHKPMEM_DECL(1);
- MPIDI_STATE_DECL(MPID_STATE_ALLOC_OP_SEGMENT);
-
- MPIDI_FUNC_ENTER(MPID_STATE_ALLOC_OP_SEGMENT);
-
- MPIU_CHKPMEM_MALLOC(op_segment, struct rptl_op_pool_segment *, sizeof(struct rptl_op_pool_segment),
- mpi_errno, "op pool segment");
- MPL_DL_APPEND(rptl_info.op_segment_list, op_segment);
-
- for (i = 0; i < RPTL_OP_POOL_SEGMENT_COUNT; i++)
- MPL_DL_APPEND(rptl_info.op_pool, &op_segment->op[i]);
+ *target = t;
fn_exit:
MPIU_CHKPMEM_COMMIT();
- MPIDI_FUNC_EXIT(MPID_STATE_ALLOC_OP_SEGMENT);
+ MPIDI_FUNC_EXIT(MPID_STATE_FIND_TARGET);
return ret;
fn_fail:
@@ -205,14 +135,8 @@ int MPID_nem_ptl_rptl_init(int world_size, uint64_t max_origin_events,
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_INIT);
rptl_info.rptl_list = NULL;
+ rptl_info.target_list = NULL;
- rptl_info.op_pool = NULL;
- ret = alloc_op_segment();
- RPTLU_ERR_POP(ret, "error allocating op segment\n");
-
- rptl_info.op_list = NULL;
-
- rptl_info.paused_target_list = NULL;
rptl_info.world_size = world_size;
rptl_info.origin_events_left = max_origin_events;
rptl_info.get_target_info = get_target_info;
@@ -236,24 +160,36 @@ int MPID_nem_ptl_rptl_drain_eq(int eq_count, ptl_handle_eq_t *eq)
ptl_event_t event;
struct rptl_op_pool_segment *op_segment;
int i;
+ struct rptl_target *target, *t;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_FINALIZE);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_FINALIZE);
- while (rptl_info.op_list) {
- for (i = 0; i < eq_count; i++) {
- /* read and ignore all events */
- ret = MPID_nem_ptl_rptl_eqget(eq[i], &event);
- if (ret == PTL_EQ_EMPTY)
- ret = PTL_OK;
- RPTLU_ERR_POP(ret, "Error calling MPID_nem_ptl_rptl_eqget\n");
+ for (target = rptl_info.target_list; target; target = target->next) {
+ while (target->control_op_list || target->data_op_list) {
+ for (i = 0; i < eq_count; i++) {
+ /* read and ignore all events */
+ ret = MPID_nem_ptl_rptl_eqget(eq[i], &event);
+ if (ret == PTL_EQ_EMPTY)
+ ret = PTL_OK;
+ RPTLU_ERR_POP(ret, "Error calling MPID_nem_ptl_rptl_eqget\n");
+ }
}
}
- while (rptl_info.op_segment_list) {
- op_segment = rptl_info.op_segment_list;
- MPL_DL_DELETE(rptl_info.op_segment_list, op_segment);
- MPIU_Free(op_segment);
+ for (target = rptl_info.target_list; target;) {
+ assert(target->data_op_list == NULL);
+ assert(target->control_op_list == NULL);
+
+ while (target->op_segment_list) {
+ op_segment = target->op_segment_list;
+ MPL_DL_DELETE(target->op_segment_list, op_segment);
+ MPIU_Free(op_segment);
+ }
+
+ t = target->next;
+ MPIU_Free(target);
+ target = t;
}
fn_exit:
@@ -328,7 +264,7 @@ int MPID_nem_ptl_rptl_ptinit(ptl_handle_ni_t ni_handle, ptl_handle_eq_t eq_handl
MPIU_CHKPMEM_MALLOC(rptl, struct rptl *, sizeof(struct rptl), mpi_errno, "rptl");
MPL_DL_APPEND(rptl_info.rptl_list, rptl);
- rptl->local_state = RPTL_LOCAL_STATE_NORMAL;
+ rptl->local_state = RPTL_LOCAL_STATE_ACTIVE;
rptl->pause_ack_counter = 0;
rptl->data.ob_max_count = 0;
@@ -415,26 +351,40 @@ int MPID_nem_ptl_rptl_ptfini(ptl_pt_index_t pt_index)
#define FUNCNAME alloc_op
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-int alloc_op(struct rptl_op **op)
+static int alloc_op(struct rptl_op **op, struct rptl_target *target)
{
int ret = PTL_OK;
+ struct rptl_op_pool_segment *op_segment;
+ int mpi_errno = MPI_SUCCESS;
+ int i;
+ MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_ALLOC_OP);
MPIDI_FUNC_ENTER(MPID_STATE_ALLOC_OP);
- if (rptl_info.op_pool == NULL) {
- ret = alloc_op_segment();
- RPTLU_ERR_POP(ret, "error allocating op segment\n");
+ assert(target);
+
+ if (target->op_pool == NULL) {
+ MPIU_CHKPMEM_MALLOC(op_segment, struct rptl_op_pool_segment *, sizeof(struct rptl_op_pool_segment),
+ mpi_errno, "op pool segment");
+ MPL_DL_APPEND(target->op_segment_list, op_segment);
+
+ for (i = 0; i < RPTL_OP_POOL_SEGMENT_COUNT; i++)
+ MPL_DL_APPEND(target->op_pool, &op_segment->op[i]);
}
- *op = rptl_info.op_pool;
- MPL_DL_DELETE(rptl_info.op_pool, *op);
+ *op = target->op_pool;
+ MPL_DL_DELETE(target->op_pool, *op);
fn_exit:
+ MPIU_CHKPMEM_COMMIT();
MPIDI_FUNC_EXIT(MPID_STATE_ALLOC_OP);
return ret;
fn_fail:
+ if (mpi_errno)
+ ret = PTL_FAIL;
+ MPIU_CHKPMEM_REAP();
goto fn_exit;
}
@@ -449,74 +399,199 @@ void free_op(struct rptl_op *op)
MPIDI_FUNC_ENTER(MPID_STATE_FREE_OP);
- MPL_DL_APPEND(rptl_info.op_pool, op);
+ MPL_DL_APPEND(op->target->op_pool, op);
MPIDI_FUNC_EXIT(MPID_STATE_FREE_OP);
}
#undef FUNCNAME
-#define FUNCNAME issue_op
+#define FUNCNAME poke_progress
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-int issue_op(struct rptl_op *op)
+int poke_progress(void)
{
int ret = PTL_OK;
- struct rptl_paused_target *target;
- MPIDI_STATE_DECL(MPID_STATE_ISSUE_OP);
+ struct rptl_target *target;
+ struct rptl_op *op;
+ struct rptl *rptl;
+ int i;
+ int mpi_errno = MPI_SUCCESS;
+ ptl_process_t id;
+ ptl_pt_index_t data_pt, control_pt;
+ MPIDI_STATE_DECL(MPID_STATE_POKE_PROGRESS);
- MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_OP);
+ MPIDI_FUNC_ENTER(MPID_STATE_POKE_PROGRESS);
- if (op->op_type == RPTL_OP_PUT) {
- for (target = rptl_info.paused_target_list; target; target = target->next)
- if (IDS_ARE_EQUAL(target->id, op->u.put.target_id))
- break;
+ /* make progress on local RPTLs */
+ for (rptl = rptl_info.rptl_list; rptl; rptl = rptl->next) {
+ /* if the local state is active, there's nothing to do */
+ if (rptl->local_state == RPTL_LOCAL_STATE_ACTIVE)
+ continue;
- if (target && op->u.put.flow_control)
- goto fn_exit;
+ /* if we are in a local AWAITING PAUSE ACKS state, see if we
+ * can send out the unpause message */
+ if (rptl->local_state == RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS &&
+ rptl->pause_ack_counter == rptl_info.world_size) {
+ /* if we are over the max count limit, do not send an
+ * unpause message yet */
+ if (rptl->data.ob_curr_count > rptl->data.ob_max_count)
+ continue;
- if (rptl_info.origin_events_left < 2) {
- ret = alloc_target(op->u.put.target_id, RPTL_TARGET_STATE_FLOWCONTROL, NULL);
- RPTLU_ERR_POP(ret, "error allocating paused target\n");
- goto fn_exit;
+ ret = PtlPTEnable(rptl->ni, rptl->data.pt);
+ RPTLU_ERR_POP(ret, "Error returned while reenabling PT\n");
+
+ rptl->local_state = RPTL_LOCAL_STATE_ACTIVE;
+
+ for (i = 0; i < rptl_info.world_size; i++) {
+ mpi_errno = rptl_info.get_target_info(i, &id, rptl->data.pt, &data_pt, &control_pt);
+ if (mpi_errno) {
+ ret = PTL_FAIL;
+ RPTLU_ERR_POP(ret, "Error getting target info\n");
+ }
+
+ /* make sure the user setup a control portal */
+ assert(control_pt != PTL_PT_ANY);
+
+ /* disable flow control for control messages */
+ ret = MPID_nem_ptl_rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt,
+ 0, 0, NULL, RPTL_CONTROL_MSG_UNPAUSE, 0);
+ RPTLU_ERR_POP(ret, "Error sending unpause message\n");
+ }
}
- rptl_info.origin_events_left -= 2;
-
- /* force request for an ACK even if the user didn't ask for
- * it. replace the user pointer with the OP id. */
- ret =
- PtlPut(op->u.put.md_handle, op->u.put.local_offset, op->u.put.length,
- PTL_ACK_REQ, op->u.put.target_id, op->u.put.pt_index,
- op->u.put.match_bits, op->u.put.remote_offset, op,
- op->u.put.hdr_data);
- RPTLU_ERR_POP(ret, "Error issuing PUT\n");
}
- else {
- for (target = rptl_info.paused_target_list; target; target = target->next)
- if (IDS_ARE_EQUAL(target->id, op->u.get.target_id))
+
+ /* make progress on targets */
+ for (target = rptl_info.target_list; target; target = target->next) {
+ if (target->state == RPTL_TARGET_STATE_RECEIVED_PAUSE) {
+ for (op = target->data_op_list; op; op = op->next)
+ if (op->state == RPTL_OP_STATE_ISSUED)
+ break;
+ if (op)
+ continue;
+
+ /* send a pause ack message */
+ assert(target->rptl);
+ for (i = 0; i < rptl_info.world_size; i++) {
+ /* find the target that has this target id and get the
+ * control portal information for it */
+ mpi_errno = rptl_info.get_target_info(i, &id, target->rptl->data.pt, &data_pt, &control_pt);
+ if (mpi_errno) {
+ ret = PTL_FAIL;
+ RPTLU_ERR_POP(ret, "Error getting target info\n");
+ }
+ if (IDS_ARE_EQUAL(id, target->id))
+ break;
+ }
+
+ /* make sure the user setup a control portal */
+ assert(control_pt != PTL_PT_ANY);
+
+ target->state = RPTL_TARGET_STATE_PAUSE_ACKED;
+
+ /* disable flow control for control messages */
+ ret = MPID_nem_ptl_rptl_put(target->rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0,
+ 0, NULL, RPTL_CONTROL_MSG_PAUSE_ACK, 0);
+ RPTLU_ERR_POP(ret, "Error sending pause ack message\n");
+
+ continue;
+ }
+
+ /* issue out all the control messages first */
+ for (op = target->control_op_list; op; op = op->next) {
+ assert(op->op_type == RPTL_OP_PUT);
+
+ /* skip all the issued ops */
+ if (op->state == RPTL_OP_STATE_ISSUED)
+ continue;
+
+ /* we should not get any NACKs on the control portal */
+ assert(op->state != RPTL_OP_STATE_NACKED);
+
+ if (rptl_info.origin_events_left < 2) {
+ /* too few origin events left. we can't issue this op
+ * or any following op to this target in order to
+ * maintain ordering */
break;
+ }
- if (target)
- goto fn_exit;
+ rptl_info.origin_events_left -= 2;
- if (rptl_info.origin_events_left < 1) {
- ret = alloc_target(op->u.get.target_id, RPTL_TARGET_STATE_FLOWCONTROL, NULL);
- RPTLU_ERR_POP(ret, "error allocating paused target\n");
- goto fn_exit;
+ /* force request for an ACK even if the user didn't ask
+ * for it. replace the user pointer with the OP id. */
+ ret = PtlPut(op->u.put.md_handle, op->u.put.local_offset, op->u.put.length,
+ PTL_ACK_REQ, op->u.put.target_id, op->u.put.pt_index,
+ op->u.put.match_bits, op->u.put.remote_offset, op,
+ op->u.put.hdr_data);
+ RPTLU_ERR_POP(ret, "Error issuing PUT\n");
+
+ op->state = RPTL_OP_STATE_ISSUED;
}
- rptl_info.origin_events_left--;
- ret =
- PtlGet(op->u.get.md_handle, op->u.get.local_offset, op->u.get.length,
- op->u.get.target_id, op->u.get.pt_index, op->u.get.match_bits,
- op->u.get.remote_offset, op);
- RPTLU_ERR_POP(ret, "Error issuing GET\n");
- }
+ if (target->state == RPTL_TARGET_STATE_DISABLED || target->state == RPTL_TARGET_STATE_PAUSE_ACKED)
+ continue;
- op->state = RPTL_OP_STATE_ISSUED;
+ /* then issue out all the data messages */
+ for (op = target->data_op_list; op; op = op->next) {
+ if (op->op_type == RPTL_OP_PUT) {
+ /* skip all the issued ops */
+ if (op->state == RPTL_OP_STATE_ISSUED)
+ continue;
+
+ /* if an op has been nacked, don't issue anything else
+ * to this target */
+ if (op->state == RPTL_OP_STATE_NACKED)
+ break;
+
+ if (rptl_info.origin_events_left < 2) {
+ /* too few origin events left. we can't issue
+ * this op or any following op to this target in
+ * order to maintain ordering */
+ break;
+ }
+
+ rptl_info.origin_events_left -= 2;
+
+ /* force request for an ACK even if the user didn't
+ * ask for it. replace the user pointer with the OP
+ * id. */
+ ret = PtlPut(op->u.put.md_handle, op->u.put.local_offset, op->u.put.length,
+ PTL_ACK_REQ, op->u.put.target_id, op->u.put.pt_index,
+ op->u.put.match_bits, op->u.put.remote_offset, op,
+ op->u.put.hdr_data);
+ RPTLU_ERR_POP(ret, "Error issuing PUT\n");
+ }
+ else if (op->op_type == RPTL_OP_GET) {
+ /* skip all the issued ops */
+ if (op->state == RPTL_OP_STATE_ISSUED)
+ continue;
+
+ /* if an op has been nacked, don't issue anything else
+ * to this target */
+ if (op->state == RPTL_OP_STATE_NACKED)
+ break;
+
+ if (rptl_info.origin_events_left < 1) {
+ /* too few origin events left. we can't issue
+ * this op or any following op to this target in
+ * order to maintain ordering */
+ break;
+ }
+
+ rptl_info.origin_events_left--;
+
+ ret = PtlGet(op->u.get.md_handle, op->u.get.local_offset, op->u.get.length,
+ op->u.get.target_id, op->u.get.pt_index, op->u.get.match_bits,
+ op->u.get.remote_offset, op);
+ RPTLU_ERR_POP(ret, "Error issuing GET\n");
+ }
+
+ op->state = RPTL_OP_STATE_ISSUED;
+ }
+ }
fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_OP);
+ MPIDI_FUNC_EXIT(MPID_STATE_POKE_PROGRESS);
return ret;
fn_fail:
@@ -535,11 +610,15 @@ int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, pt
{
struct rptl_op *op;
int ret = PTL_OK;
+ struct rptl_target *target;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
- ret = alloc_op(&op);
+ ret = find_target(target_id, &target);
+ RPTLU_ERR_POP(ret, "error finding target structure\n");
+
+ ret = alloc_op(&op, target);
RPTLU_ERR_POP(ret, "error allocating op\n");
op->op_type = RPTL_OP_PUT;
@@ -562,12 +641,15 @@ int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, pt
op->u.put.ack = NULL;
op->u.put.flow_control = flow_control;
op->events_ready = 0;
+ op->target = target;
- MPL_DL_APPEND(rptl_info.op_list, op);
+ if (op->u.put.flow_control)
+ MPL_DL_APPEND(target->data_op_list, op);
+ else
+ MPL_DL_APPEND(target->control_op_list, op);
- /* if we are not in a PAUSED state, issue the operation */
- ret = issue_op(op);
- RPTLU_ERR_POP(ret, "Error from issue_op\n");
+ ret = poke_progress();
+ RPTLU_ERR_POP(ret, "Error from poke_progress\n");
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_PUT);
@@ -588,12 +670,15 @@ int MPID_nem_ptl_rptl_get(ptl_handle_md_t md_handle, ptl_size_t local_offset, pt
{
struct rptl_op *op;
int ret = PTL_OK;
- struct rptl_paused_target *target;
+ struct rptl_target *target;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_GET);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_GET);
- ret = alloc_op(&op);
+ ret = find_target(target_id, &target);
+ RPTLU_ERR_POP(ret, "error finding target structure\n");
+
+ ret = alloc_op(&op, target);
RPTLU_ERR_POP(ret, "error allocating op\n");
op->op_type = RPTL_OP_GET;
@@ -610,15 +695,12 @@ int MPID_nem_ptl_rptl_get(ptl_handle_md_t md_handle, ptl_size_t local_offset, pt
op->u.get.user_ptr = user_ptr;
op->events_ready = 0;
+ op->target = target;
- MPL_DL_APPEND(rptl_info.op_list, op);
+ MPL_DL_APPEND(target->data_op_list, op);
- for (target = rptl_info.paused_target_list; target; target = target->next)
- if (IDS_ARE_EQUAL(target->id, target_id))
- break;
-
- ret = issue_op(op);
- RPTLU_ERR_POP(ret, "Error from issue_op\n");
+ ret = poke_progress();
+ RPTLU_ERR_POP(ret, "Error from poke_progress\n");
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_GET);
@@ -657,6 +739,9 @@ static int send_pause_messages(struct rptl *rptl)
RPTLU_ERR_POP(ret, "Error getting target info while sending pause messages\n");
}
+ /* make sure the user setup a control portal */
+ assert(control_pt != PTL_PT_ANY);
+
/* disable flow control for control messages */
ret = MPID_nem_ptl_rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0, 0,
NULL, RPTL_CONTROL_MSG_PAUSE, 0);
@@ -673,167 +758,35 @@ static int send_pause_messages(struct rptl *rptl)
#undef FUNCNAME
-#define FUNCNAME send_pause_ack_messages
+#define FUNCNAME clear_nacks
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-static int send_pause_ack_messages(void)
+static int clear_nacks(ptl_process_t target_id)
{
+ struct rptl_target *target;
struct rptl_op *op;
int ret = PTL_OK;
- struct rptl_paused_target *target;
- MPIDI_STATE_DECL(MPID_STATE_SEND_PAUSE_ACK_MESSAGES);
+ MPIDI_STATE_DECL(MPID_STATE_CLEAR_NACKS);
- MPIDI_FUNC_ENTER(MPID_STATE_SEND_PAUSE_ACK_MESSAGES);
-
- for (target = rptl_info.paused_target_list; target; target = target->next) {
- if (target->state != RPTL_TARGET_STATE_RECEIVED_PAUSE)
- continue;
-
- for (op = rptl_info.op_list; op; op = op->next) {
- if (op->op_type == RPTL_OP_GET && IDS_ARE_EQUAL(op->u.get.target_id, target->id) &&
- op->state == RPTL_OP_STATE_ISSUED)
- break;
-
- if (op->op_type == RPTL_OP_PUT && IDS_ARE_EQUAL(op->u.put.target_id, target->id)) {
- if (op->state == RPTL_OP_STATE_ISSUED)
- break;
- if (op->u.put.send || op->u.put.ack)
- break;
- }
- }
+ MPIDI_FUNC_ENTER(MPID_STATE_CLEAR_NACKS);
- if (op == NULL) {
- ptl_process_t id;
- ptl_pt_index_t data_pt, control_pt;
- int i;
- int mpi_errno = MPI_SUCCESS;
+ ret = find_target(target_id, &target);
+ RPTLU_ERR_POP(ret, "error finding target\n");
- for (i = 0; i < rptl_info.world_size; i++) {
- /* find the target that has this target id and get the
- * control portal information for it */
- mpi_errno = rptl_info.get_target_info(i, &id, target->rptl->data.pt, &data_pt, &control_pt);
- if (mpi_errno) {
- ret = PTL_FAIL;
- RPTLU_ERR_POP(ret,
- "Error getting target info while sending pause ack message\n");
- }
- if (IDS_ARE_EQUAL(id, target->id))
- break;
- }
-
- /* disable flow control for control messages */
- ret =
- MPID_nem_ptl_rptl_put(target->rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0,
- 0, NULL, RPTL_CONTROL_MSG_PAUSE_ACK, 0);
- RPTLU_ERR_POP(ret, "Error sending pause ack message\n");
-
- if (target->state == RPTL_TARGET_STATE_RECEIVED_PAUSE)
- target->state = RPTL_TARGET_STATE_PAUSE_ACKED;
- }
- }
-
- fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_SEND_PAUSE_ACK_MESSAGES);
- return ret;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-#undef FUNCNAME
-#define FUNCNAME send_unpause_messages
-#undef FCNAME
-#define FCNAME MPIU_QUOTE(FUNCNAME)
-static int send_unpause_messages(void)
-{
- int i, mpi_errno = MPI_SUCCESS;
- ptl_process_t id;
- ptl_pt_index_t data_pt, control_pt;
- int ret = PTL_OK;
- struct rptl *rptl;
- MPIDI_STATE_DECL(MPID_STATE_SEND_UNPAUSE_MESSAGES);
-
- MPIDI_FUNC_ENTER(MPID_STATE_SEND_UNPAUSE_MESSAGES);
-
- for (rptl = rptl_info.rptl_list; rptl; rptl = rptl->next) {
- assert(rptl->local_state != RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS ||
- rptl->control.pt != PTL_PT_ANY);
- if (rptl->control.pt == PTL_PT_ANY)
- continue;
- if (rptl->local_state != RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS)
- continue;
-
- if (rptl->pause_ack_counter == rptl_info.world_size) {
- /* if we are over the max count limit, do not send an
- * unpause message yet */
- if (rptl->data.ob_curr_count > rptl->data.ob_max_count)
- goto fn_exit;
-
- ret = PtlPTEnable(rptl->ni, rptl->data.pt);
- RPTLU_ERR_POP(ret, "Error returned while reenabling PT\n");
-
- rptl->local_state = RPTL_LOCAL_STATE_NORMAL;
-
- for (i = 0; i < rptl_info.world_size; i++) {
- mpi_errno = rptl_info.get_target_info(i, &id, rptl->data.pt, &data_pt, &control_pt);
- if (mpi_errno) {
- ret = PTL_FAIL;
- RPTLU_ERR_POP(ret,
- "Error getting target info while sending unpause messages\n");
- }
-
- /* disable flow control for control messages */
- ret =
- MPID_nem_ptl_rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt,
- 0, 0, NULL, RPTL_CONTROL_MSG_UNPAUSE, 0);
- RPTLU_ERR_POP(ret, "Error sending unpause message\n");
- }
- }
- }
-
- fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_SEND_UNPAUSE_MESSAGES);
- return ret;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-#undef FUNCNAME
-#define FUNCNAME reissue_ops
-#undef FCNAME
-#define FCNAME MPIU_QUOTE(FUNCNAME)
-static int reissue_ops(ptl_process_t target_id)
-{
- struct rptl_paused_target *target;
- struct rptl_op *op;
- int ret = PTL_OK;
- MPIDI_STATE_DECL(MPID_STATE_REISSUE_OPS);
-
- MPIDI_FUNC_ENTER(MPID_STATE_REISSUE_OPS);
-
- for (target = rptl_info.paused_target_list; target; target = target->next)
- if (IDS_ARE_EQUAL(target->id, target_id))
- break;
- assert(target);
-
- MPL_DL_DELETE(rptl_info.paused_target_list, target);
- MPIU_Free(target);
-
- for (op = rptl_info.op_list; op; op = op->next) {
+ for (op = target->data_op_list; op; op = op->next) {
if ((op->op_type == RPTL_OP_PUT && IDS_ARE_EQUAL(op->u.put.target_id, target_id)) ||
(op->op_type == RPTL_OP_GET && IDS_ARE_EQUAL(op->u.get.target_id, target_id))) {
- if (op->state != RPTL_OP_STATE_ISSUED) {
- ret = issue_op(op);
- RPTLU_ERR_POP(ret, "Error calling issue_op\n");
- }
+ if (op->state == RPTL_OP_STATE_NACKED)
+ op->state = RPTL_OP_STATE_QUEUED;
}
}
+ target->state = RPTL_TARGET_STATE_ACTIVE;
+
+ ret = poke_progress();
+ RPTLU_ERR_POP(ret, "error in poke_progress\n");
fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_REISSUE_OPS);
+ MPIDI_FUNC_EXIT(MPID_STATE_CLEAR_NACKS);
return ret;
fn_fail:
@@ -845,11 +798,11 @@ static int reissue_ops(ptl_process_t target_id)
#define FUNCNAME get_event_info
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-static void get_event_info(ptl_event_t * event, struct rptl **ret_rptl, struct rptl_op **ret_op)
+static int get_event_info(ptl_event_t * event, struct rptl **ret_rptl, struct rptl_op **ret_op)
{
struct rptl *rptl;
struct rptl_op *op;
- struct rptl_paused_target *target, *tmp;
+ int ret = PTL_OK;
MPIDI_STATE_DECL(MPID_STATE_GET_EVENT_INFO);
MPIDI_FUNC_ENTER(MPID_STATE_GET_EVENT_INFO);
@@ -860,18 +813,9 @@ static void get_event_info(ptl_event_t * event, struct rptl **ret_rptl, struct r
rptl_info.origin_events_left++;
- if (rptl_info.origin_events_left >= 2) {
- for (target = rptl_info.paused_target_list; target;) {
- if (target->state == RPTL_TARGET_STATE_FLOWCONTROL) {
- tmp = target->next;
- MPL_DL_DELETE(rptl_info.paused_target_list, target);
- MPIU_Free(target);
- target = tmp;
- }
- else
- target = target->next;
- }
- }
+ /* see if there are any pending ops to be issued */
+ ret = poke_progress();
+ RPTLU_ERR_POP(ret, "Error returned from poke_progress\n");
assert(op);
rptl = NULL;
@@ -892,7 +836,7 @@ static void get_event_info(ptl_event_t * event, struct rptl **ret_rptl, struct r
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_GET_EVENT_INFO);
- return;
+ return ret;
fn_fail:
goto fn_exit;
@@ -953,72 +897,52 @@ static int stash_event(struct rptl_op *op, ptl_event_t event)
#define FUNCNAME retrieve_event
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-static void retrieve_event(struct rptl *rptl, struct rptl_op *op, ptl_event_t * event)
+static int retrieve_event(ptl_event_t * event)
{
+ struct rptl_target *target;
+ struct rptl_op *op;
+ int have_event = 0;
MPIDI_STATE_DECL(MPID_STATE_RETRIEVE_EVENT);
MPIDI_FUNC_ENTER(MPID_STATE_RETRIEVE_EVENT);
- assert(op->op_type == RPTL_OP_PUT);
- assert(op->u.put.send || op->u.put.ack);
-
- if (op->u.put.send) {
- memcpy(event, op->u.put.send, sizeof(ptl_event_t));
- MPIU_Free(op->u.put.send);
- }
- else {
- memcpy(event, op->u.put.ack, sizeof(ptl_event_t));
- MPIU_Free(op->u.put.ack);
- }
- event->user_ptr = op->u.put.user_ptr;
-
- MPL_DL_DELETE(rptl_info.op_list, op);
- free_op(op);
-
- fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_RETRIEVE_EVENT);
- return;
-
- fn_fail:
- goto fn_exit;
-}
-
-
-#undef FUNCNAME
-#define FUNCNAME issue_pending_ops
-#undef FCNAME
-#define FCNAME MPIU_QUOTE(FUNCNAME)
-static int issue_pending_ops(void)
-{
- struct rptl_paused_target *target, *tmp;
- struct rptl_op *op;
- int ret = PTL_OK;
- MPIDI_STATE_DECL(MPID_STATE_ISSUE_PENDING_OPS);
+ /* FIXME: this is an expensive loop over all pending operations
+ * everytime the user does an eqget */
+ for (target = rptl_info.target_list; target; target = target->next) {
+ for (op = target->data_op_list; op; op = op->next) {
+ if (op->events_ready) {
+ assert(op->op_type == RPTL_OP_PUT);
+ assert(op->u.put.send || op->u.put.ack);
+
+ if (op->u.put.send) {
+ memcpy(event, op->u.put.send, sizeof(ptl_event_t));
+ MPIU_Free(op->u.put.send);
+ op->u.put.send = NULL;
+ }
+ else {
+ memcpy(event, op->u.put.ack, sizeof(ptl_event_t));
+ MPIU_Free(op->u.put.ack);
+ op->u.put.ack = NULL;
+ }
+ event->user_ptr = op->u.put.user_ptr;
- MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_PENDING_OPS);
+ MPL_DL_DELETE(target->data_op_list, op);
+ free_op(op);
- for (op = rptl_info.op_list; op; op = op->next) {
- if (op->state == RPTL_OP_STATE_QUEUED) {
- for (target = rptl_info.paused_target_list; target; target = target->next)
- if ((op->op_type == RPTL_OP_PUT && IDS_ARE_EQUAL(op->u.put.target_id, target->id)) ||
- (op->op_type == RPTL_OP_GET && IDS_ARE_EQUAL(op->u.get.target_id, target->id)))
- break;
- if (target == NULL) {
- ret = issue_op(op);
- RPTLU_ERR_POP(ret, "error issuing op\n");
+ have_event = 1;
+ goto fn_exit;
}
}
}
fn_exit:
- MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_PENDING_OPS);
- return ret;
+ MPIDI_FUNC_EXIT(MPID_STATE_RETRIEVE_EVENT);
+ return have_event;
fn_fail:
goto fn_exit;
}
-
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_rptl_eqget
#undef FCNAME
@@ -1029,44 +953,21 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
struct rptl *rptl;
ptl_event_t e;
int ret = PTL_OK, tmp_ret = PTL_OK;
- struct rptl_paused_target *target;
int mpi_errno = MPI_SUCCESS;
+ struct rptl_target *target;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_EQGET);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_EQGET);
+ ret = poke_progress();
+ RPTLU_ERR_POP(ret, "error poking progress\n");
+
/* before we poll the eq, we need to check if there are any
* completed operations that need to be returned */
- /* FIXME: this is an expensive loop over all pending operations
- * everytime the user does an eqget */
- for (op = rptl_info.op_list; op; op = op->next) {
- if (op->events_ready) {
- retrieve_event(rptl, op, event);
- ret = PTL_OK;
- goto fn_exit;
- }
- }
-
- /* see if pause ack messages need to be sent out */
- tmp_ret = send_pause_ack_messages();
- if (tmp_ret) {
- ret = tmp_ret;
- RPTLU_ERR_POP(ret, "Error returned from send_pause_ack_messages\n");
- }
-
- /* see if unpause messages need to be sent out */
- tmp_ret = send_unpause_messages();
- if (tmp_ret) {
- ret = tmp_ret;
- RPTLU_ERR_POP(ret, "Error returned from send_unpause_messages\n");
- }
-
- /* see if there are any pending ops to be issued */
- tmp_ret = issue_pending_ops();
- if (tmp_ret) {
- ret = tmp_ret;
- RPTLU_ERR_POP(ret, "Error returned from issue_pending_ops\n");
+ if (retrieve_event(event)) {
+ ret = PTL_OK;
+ goto fn_exit;
}
ret = PtlEQGet(eq_handle, event);
@@ -1074,7 +975,11 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
goto fn_exit;
/* find the rptl and op associated with this event */
- get_event_info(event, &rptl, &op);
+ tmp_ret = get_event_info(event, &rptl, &op);
+ if (tmp_ret) {
+ ret = tmp_ret;
+ RPTLU_ERR_POP(ret, "Error returned from get_event_info\n");
+ }
/* PT_DISABLED events only occur on the target */
if (event->type == PTL_EVENT_PT_DISABLED) {
@@ -1088,14 +993,16 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
* recover from disable events */
assert(rptl->control.pt != PTL_PT_ANY);
- rptl->local_state = RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS;
- rptl->pause_ack_counter = 0;
+ if (rptl->local_state == RPTL_LOCAL_STATE_ACTIVE) {
+ rptl->local_state = RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS;
+ rptl->pause_ack_counter = 0;
- /* send out pause messages */
- tmp_ret = send_pause_messages(rptl);
- if (tmp_ret) {
- ret = tmp_ret;
- RPTLU_ERR_POP(ret, "Error returned from send_pause_messages\n");
+ /* send out pause messages */
+ tmp_ret = send_pause_messages(rptl);
+ if (tmp_ret) {
+ ret = tmp_ret;
+ RPTLU_ERR_POP(ret, "Error returned from send_pause_messages\n");
+ }
}
}
@@ -1139,21 +1046,25 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
rptl->control.me_idx = 0;
if (event->hdr_data == RPTL_CONTROL_MSG_PAUSE) {
- tmp_ret = alloc_target(event->initiator, RPTL_TARGET_STATE_RECEIVED_PAUSE, rptl);
+ tmp_ret = find_target(event->initiator, &target);
if (tmp_ret) {
ret = tmp_ret;
- RPTLU_ERR_POP(ret, "Error returned from alloc_target\n");
+ RPTLU_ERR_POP(ret, "Error finding target\n");
}
+ assert(target->state < RPTL_TARGET_STATE_RECEIVED_PAUSE);
+ target->state = RPTL_TARGET_STATE_RECEIVED_PAUSE;
+ target->rptl = rptl;
}
else if (event->hdr_data == RPTL_CONTROL_MSG_PAUSE_ACK) {
rptl->pause_ack_counter++;
}
else { /* got an UNPAUSE message */
- /* reissue all operations to this target */
- tmp_ret = reissue_ops(event->initiator);
+ /* clear NACKs from all operations to this target and poke
+ * progress */
+ tmp_ret = clear_nacks(event->initiator);
if (tmp_ret) {
ret = tmp_ret;
- RPTLU_ERR_POP(ret, "Error returned from reissue_ops\n");
+ RPTLU_ERR_POP(ret, "Error returned from clear_nacks\n");
}
}
}
@@ -1167,6 +1078,10 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
/* hide the event from the user */
ret = PTL_EQ_EMPTY;
+ /* we should not get NACKs on the control portal */
+ if (event->type == PTL_EVENT_ACK)
+ assert(op->u.put.flow_control);
+
op->state = RPTL_OP_STATE_NACKED;
if (op->op_type == RPTL_OP_PUT) {
@@ -1191,12 +1106,17 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
}
if (op->op_type == RPTL_OP_PUT)
- tmp_ret = alloc_target(op->u.put.target_id, RPTL_TARGET_STATE_DISABLED, NULL);
+ tmp_ret = find_target(op->u.put.target_id, &target);
else
- tmp_ret = alloc_target(op->u.get.target_id, RPTL_TARGET_STATE_DISABLED, NULL);
+ tmp_ret = find_target(op->u.get.target_id, &target);
if (tmp_ret) {
ret = tmp_ret;
- RPTLU_ERR_POP(ret, "Error returned from alloc_target\n");
+ RPTLU_ERR_POP(ret, "Error finding target\n");
+ }
+
+ if (target->state == RPTL_TARGET_STATE_ACTIVE) {
+ target->state = RPTL_TARGET_STATE_DISABLED;
+ target->rptl = NULL;
}
}
@@ -1205,7 +1125,9 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
assert(op->op_type == RPTL_OP_GET);
event->user_ptr = op->u.get.user_ptr;
- MPL_DL_DELETE(rptl_info.op_list, op);
+
+ /* GET operations only go into the data op list */
+ MPL_DL_DELETE(op->target->data_op_list, op);
free_op(op);
}
@@ -1218,9 +1140,11 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
op->events_ready = 1;
event->user_ptr = op->u.put.user_ptr;
- /* if flow control is not set, ignore events */
+ /* if flow control is not set, ignore the ACK event */
if (op->u.put.flow_control == 0) {
- retrieve_event(rptl, op, event);
+ MPIU_Free(op->u.put.ack);
+ MPL_DL_DELETE(op->target->control_op_list, op);
+ free_op(op);
ret = PTL_EQ_EMPTY;
}
}
@@ -1234,17 +1158,23 @@ int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
op->events_ready = 1;
event->user_ptr = op->u.put.user_ptr;
- /* if flow control is not set, ignore events */
+ /* if flow control is not set, ignore ACK event */
if (op->u.put.flow_control == 0) {
- retrieve_event(rptl, op, event);
+ MPIU_Free(op->u.put.send);
+ MPL_DL_DELETE(op->target->control_op_list, op);
+ free_op(op);
ret = PTL_EQ_EMPTY;
}
- /* if the user asked for an ACK, just return this event.
- * if not, discard this event and retrieve the send
- * event. */
- else if (!(op->u.put.ack_req & PTL_ACK_REQ))
- retrieve_event(rptl, op, event);
+ /* if the user did not ask for an ACK discard this event
+ * and return the send event. */
+ else if (!(op->u.put.ack_req & PTL_ACK_REQ)) {
+ memcpy(event, op->u.put.send, sizeof(ptl_event_t));
+ MPIU_Free(op->u.put.send);
+ /* flow control is set, we should be in the data op list */
+ MPL_DL_DELETE(op->target->data_op_list, op);
+ free_op(op);
+ }
}
else {
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
index 359e24f..c5f1254 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
@@ -30,6 +30,7 @@
} \
}
+struct rptl_target;
struct rptl_op {
enum {
RPTL_OP_PUT,
@@ -73,6 +74,7 @@ struct rptl_op {
} u;
int events_ready;
+ struct rptl_target *target;
struct rptl_op *next;
struct rptl_op *prev;
@@ -85,7 +87,7 @@ struct rptl_op {
struct rptl {
/* local portal state */
enum {
- RPTL_LOCAL_STATE_NORMAL,
+ RPTL_LOCAL_STATE_ACTIVE,
RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS
} local_state;
uint64_t pause_ack_counter;
@@ -122,6 +124,37 @@ struct rptl {
struct rptl *prev;
};
+#define RPTL_OP_POOL_SEGMENT_COUNT (1024)
+
+struct rptl_target {
+ ptl_process_t id;
+
+ enum rptl_target_state {
+ RPTL_TARGET_STATE_ACTIVE,
+ RPTL_TARGET_STATE_DISABLED,
+ RPTL_TARGET_STATE_RECEIVED_PAUSE,
+ RPTL_TARGET_STATE_PAUSE_ACKED
+ } state;
+
+ /* when we get a pause message, we need to know which rptl it came
+ * in on, so we can figure out what the corresponding target
+ * portal is. for this, we store the local rptl */
+ struct rptl *rptl;
+
+ struct rptl_op_pool_segment {
+ struct rptl_op op[RPTL_OP_POOL_SEGMENT_COUNT];
+ struct rptl_op_pool_segment *next;
+ struct rptl_op_pool_segment *prev;
+ } *op_segment_list;
+ struct rptl_op *op_pool;
+
+ struct rptl_op *data_op_list;
+ struct rptl_op *control_op_list;
+
+ struct rptl_target *next;
+ struct rptl_target *prev;
+};
+
int MPID_nem_ptl_rptl_init(int world_size, uint64_t max_origin_events,
int (*get_target_info) (int rank, ptl_process_t * id,
ptl_pt_index_t local_data_pt,
http://git.mpich.org/mpich.git/commitdiff/5087383555964e15106cc66123a4c084f5326dff
commit 5087383555964e15106cc66123a4c084f5326dff
Author: Ken Raffenetti <raffenet at mcs.anl.gov>
Date: Tue Nov 11 22:53:49 2014 -0600
portals4: use a separate EQ per PT
Signed-off-by: Antonio Pena Monferrer <apenya at mcs.anl.gov>
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
index 61ceaed..e5e1aea 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
@@ -19,8 +19,12 @@ extern ptl_pt_index_t MPIDI_nem_ptl_pt;
extern ptl_pt_index_t MPIDI_nem_ptl_get_pt; /* portal for gets by receiver */
extern ptl_pt_index_t MPIDI_nem_ptl_control_pt; /* portal for MPICH control messages */
extern ptl_pt_index_t MPIDI_nem_ptl_rpt_pt; /* portal for MPICH control messages */
-extern ptl_handle_eq_t MPIDI_nem_ptl_target_eq;
extern ptl_handle_eq_t MPIDI_nem_ptl_origin_eq;
+extern ptl_handle_eq_t MPIDI_nem_ptl_eq;
+extern ptl_handle_eq_t MPIDI_nem_ptl_get_eq;
+extern ptl_handle_eq_t MPIDI_nem_ptl_control_eq;
+extern ptl_handle_eq_t MPIDI_nem_ptl_origin_eq;
+extern ptl_handle_eq_t MPIDI_nem_ptl_rpt_eq;
extern ptl_handle_md_t MPIDI_nem_ptl_global_md;
extern ptl_ni_limits_t MPIDI_nem_ptl_ni_limits;
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
index 5087c92..ffad963 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
@@ -28,7 +28,9 @@ ptl_pt_index_t MPIDI_nem_ptl_pt;
ptl_pt_index_t MPIDI_nem_ptl_get_pt; /* portal for gets by receiver */
ptl_pt_index_t MPIDI_nem_ptl_control_pt; /* portal for MPICH control messages */
ptl_pt_index_t MPIDI_nem_ptl_rpt_pt; /* portal for rportals control messages */
-ptl_handle_eq_t MPIDI_nem_ptl_target_eq;
+ptl_handle_eq_t MPIDI_nem_ptl_eq;
+ptl_handle_eq_t MPIDI_nem_ptl_get_eq;
+ptl_handle_eq_t MPIDI_nem_ptl_control_eq;
ptl_handle_eq_t MPIDI_nem_ptl_origin_eq;
ptl_pt_index_t MPIDI_nem_ptl_control_rpt_pt; /* portal for rportals control messages */
ptl_pt_index_t MPIDI_nem_ptl_get_rpt_pt; /* portal for rportals control messages */
@@ -184,7 +186,14 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
PTL_PID_ANY, &desired, &MPIDI_nem_ptl_ni_limits, &MPIDI_nem_ptl_ni);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlniinit", "**ptlniinit %s", MPID_nem_ptl_strerror(ret));
- ret = PtlEQAlloc(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_ni_limits.max_eqs, &MPIDI_nem_ptl_target_eq);
+ /* allocate EQs for each portal */
+ ret = PtlEQAlloc(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_ni_limits.max_eqs, &MPIDI_nem_ptl_eq);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptleqalloc", "**ptleqalloc %s", MPID_nem_ptl_strerror(ret));
+
+ ret = PtlEQAlloc(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_ni_limits.max_eqs, &MPIDI_nem_ptl_get_eq);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptleqalloc", "**ptleqalloc %s", MPID_nem_ptl_strerror(ret));
+
+ ret = PtlEQAlloc(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_ni_limits.max_eqs, &MPIDI_nem_ptl_control_eq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptleqalloc", "**ptleqalloc %s", MPID_nem_ptl_strerror(ret));
/* allocate a separate EQ for origin events. with this, we can implement rate-limit operations
@@ -193,32 +202,32 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptleqalloc", "**ptleqalloc %s", MPID_nem_ptl_strerror(ret));
/* allocate portal for matching messages */
- ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_target_eq,
+ ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_eq,
PTL_PT_ANY, &MPIDI_nem_ptl_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* allocate portal for large messages where receiver does a get */
- ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_target_eq,
+ ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_get_eq,
PTL_PT_ANY, &MPIDI_nem_ptl_get_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* allocate portal for MPICH control messages */
- ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_target_eq,
+ ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_control_eq,
PTL_PT_ANY, &MPIDI_nem_ptl_control_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* allocate portal for MPICH control messages */
- ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_target_eq,
+ ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_eq,
PTL_PT_ANY, &MPIDI_nem_ptl_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* allocate portal for MPICH control messages */
- ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_target_eq,
+ ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_get_eq,
PTL_PT_ANY, &MPIDI_nem_ptl_get_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* allocate portal for MPICH control messages */
- ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_target_eq,
+ ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_control_eq,
PTL_PT_ANY, &MPIDI_nem_ptl_control_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
@@ -276,7 +285,7 @@ static int ptl_finalize(void)
{
int mpi_errno = MPI_SUCCESS;
int ret;
- ptl_handle_eq_t eqs[2];
+ ptl_handle_eq_t eqs[4];
MPIDI_STATE_DECL(MPID_STATE_PTL_FINALIZE);
MPIDI_FUNC_ENTER(MPID_STATE_PTL_FINALIZE);
@@ -288,9 +297,11 @@ static int ptl_finalize(void)
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* shut down portals */
- eqs[0] = MPIDI_nem_ptl_target_eq;
- eqs[1] = MPIDI_nem_ptl_origin_eq;
- ret = MPID_nem_ptl_rptl_drain_eq(2, eqs);
+ eqs[0] = MPIDI_nem_ptl_eq;
+ eqs[1] = MPIDI_nem_ptl_get_eq;
+ eqs[2] = MPIDI_nem_ptl_control_eq;
+ eqs[3] = MPIDI_nem_ptl_origin_eq;
+ ret = MPID_nem_ptl_rptl_drain_eq(4, eqs);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
ret = MPID_nem_ptl_rptl_ptfini(MPIDI_nem_ptl_pt);
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c
index 857c9ec..85ef2f8 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_poll.c
@@ -131,16 +131,26 @@ int MPID_nem_ptl_poll(int is_blocking_poll)
/* MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_POLL); */
while (1) {
- /* check both origin and target EQs for events */
- ret = MPID_nem_ptl_rptl_eqget(MPIDI_nem_ptl_target_eq, &event);
+ /* check EQs for events */
+ ret = MPID_nem_ptl_rptl_eqget(MPIDI_nem_ptl_eq, &event);
MPIU_ERR_CHKANDJUMP(ret == PTL_EQ_DROPPED, mpi_errno, MPI_ERR_OTHER, "**eqdropped");
if (ret == PTL_EQ_EMPTY) {
- ret = MPID_nem_ptl_rptl_eqget(MPIDI_nem_ptl_origin_eq, &event);
+ ret = MPID_nem_ptl_rptl_eqget(MPIDI_nem_ptl_get_eq, &event);
MPIU_ERR_CHKANDJUMP(ret == PTL_EQ_DROPPED, mpi_errno, MPI_ERR_OTHER, "**eqdropped");
- /* if both queues are empty, exit the loop */
- if (ret == PTL_EQ_EMPTY)
- break;
+ if (ret == PTL_EQ_EMPTY) {
+ ret = MPID_nem_ptl_rptl_eqget(MPIDI_nem_ptl_control_eq, &event);
+ MPIU_ERR_CHKANDJUMP(ret == PTL_EQ_DROPPED, mpi_errno, MPI_ERR_OTHER, "**eqdropped");
+
+ if (ret == PTL_EQ_EMPTY) {
+ ret = MPID_nem_ptl_rptl_eqget(MPIDI_nem_ptl_origin_eq, &event);
+ MPIU_ERR_CHKANDJUMP(ret == PTL_EQ_DROPPED, mpi_errno, MPI_ERR_OTHER, "**eqdropped");
+ }
+
+ /* all EQs are empty */
+ if (ret == PTL_EQ_EMPTY)
+ break;
+ }
}
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptleqget", "**ptleqget %s", MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "Received event %s pt_idx=%d ni_fail=%s list=%s user_ptr=%p hdr_data=%#lx mlength=%lu rlength=%lu",
http://git.mpich.org/mpich.git/commitdiff/a839daf81cb8f044a0090eee537a1e83930582e4
commit a839daf81cb8f044a0090eee537a1e83930582e4
Author: Antonio Pena Monferrer <apenya at mcs.anl.gov>
Date: Tue Nov 11 21:02:14 2014 -0600
Added internal control portal for the get portal
Signed-off-by: Pavan Balaji <balaji at anl.gov>
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
index 62dd474..61ceaed 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
@@ -99,6 +99,7 @@ typedef struct {
ptl_pt_index_t ptg;
ptl_pt_index_t ptc;
ptl_pt_index_t ptr;
+ ptl_pt_index_t ptrg;
ptl_pt_index_t ptrc;
int id_initialized; /* TRUE iff id and pt have been initialized */
MPIDI_msg_sz_t num_queued_sends; /* number of reqs for this vc in sendq */
@@ -166,7 +167,7 @@ int MPID_nem_ptl_poll_finalize(void);
int MPID_nem_ptl_poll(int is_blocking_poll);
int MPID_nem_ptl_vc_terminated(MPIDI_VC_t *vc);
int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg,
- ptl_pt_index_t *ptc, ptl_pt_index_t *ptr, ptl_pt_index_t *ptrc);
+ ptl_pt_index_t *ptc, ptl_pt_index_t *ptr, ptl_pt_index_t *ptrg, ptl_pt_index_t *ptrc);
void MPI_nem_ptl_pack_byte(MPID_Segment *segment, MPI_Aint first, MPI_Aint last, void *buf,
MPID_nem_ptl_pack_overflow_t *overflow);
int MPID_nem_ptl_unpack_byte(MPID_Segment *segment, MPI_Aint first, MPI_Aint last, void *buf,
diff --git a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
index a631723..5087c92 100644
--- a/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
+++ b/src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_init.c
@@ -20,6 +20,7 @@
#define PTIG_KEY "PTIG"
#define PTIC_KEY "PTIC"
#define PTIR_KEY "PTIR"
+#define PTIRG_KEY "PTIRG"
#define PTIRC_KEY "PTIRC"
ptl_handle_ni_t MPIDI_nem_ptl_ni;
@@ -30,6 +31,7 @@ ptl_pt_index_t MPIDI_nem_ptl_rpt_pt; /* portal for rportals control messages */
ptl_handle_eq_t MPIDI_nem_ptl_target_eq;
ptl_handle_eq_t MPIDI_nem_ptl_origin_eq;
ptl_pt_index_t MPIDI_nem_ptl_control_rpt_pt; /* portal for rportals control messages */
+ptl_pt_index_t MPIDI_nem_ptl_get_rpt_pt; /* portal for rportals control messages */
ptl_handle_md_t MPIDI_nem_ptl_global_md;
ptl_ni_limits_t MPIDI_nem_ptl_ni_limits;
@@ -212,6 +214,11 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
/* allocate portal for MPICH control messages */
ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_target_eq,
+ PTL_PT_ANY, &MPIDI_nem_ptl_get_rpt_pt);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
+
+ /* allocate portal for MPICH control messages */
+ ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_target_eq,
PTL_PT_ANY, &MPIDI_nem_ptl_control_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
@@ -237,7 +244,7 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
* we pass PTL_PT_ANY as the dummy portal. unfortunately, portals
* does not have an "invalid" PT constant, which would have been
* more appropriate to pass over here. */
- ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_origin_eq, MPIDI_nem_ptl_get_pt, PTL_PT_ANY);
+ ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_origin_eq, MPIDI_nem_ptl_get_pt, MPIDI_nem_ptl_get_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_origin_eq, MPIDI_nem_ptl_control_pt, MPIDI_nem_ptl_control_rpt_pt);
@@ -307,6 +314,9 @@ static int ptl_finalize(void)
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+ ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_get_rpt_pt);
+ MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
+
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
@@ -377,6 +387,12 @@ static int get_business_card(int my_rank, char **bc_val_p, int *val_max_sz_p)
MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
}
+ str_errno = MPIU_Str_add_binary_arg(bc_val_p, val_max_sz_p, PTIRG_KEY, (char *)&MPIDI_nem_ptl_get_rpt_pt,
+ sizeof(MPIDI_nem_ptl_get_rpt_pt));
+ if (str_errno) {
+ MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
+ MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
+ }
str_errno = MPIU_Str_add_binary_arg(bc_val_p, val_max_sz_p, PTIRC_KEY, (char *)&MPIDI_nem_ptl_control_rpt_pt,
sizeof(MPIDI_nem_ptl_control_rpt_pt));
if (str_errno) {
@@ -475,7 +491,7 @@ static int vc_destroy(MPIDI_VC_t *vc)
#define FUNCNAME MPID_nem_ptl_get_id_from_bc
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
-int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg, ptl_pt_index_t *ptc, ptl_pt_index_t *ptr, ptl_pt_index_t *ptrc)
+int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg, ptl_pt_index_t *ptc, ptl_pt_index_t *ptr, ptl_pt_index_t *ptrg, ptl_pt_index_t *ptrc)
{
int mpi_errno = MPI_SUCCESS;
int ret;
@@ -502,6 +518,9 @@ int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, pt
ret = MPIU_Str_get_binary_arg(business_card, PTIR_KEY, (char *)ptr, sizeof(ptr), &len);
MPIU_ERR_CHKANDJUMP(ret != MPIU_STR_SUCCESS || len != sizeof(*ptr), mpi_errno, MPI_ERR_OTHER, "**badbusinesscard");
+ ret = MPIU_Str_get_binary_arg(business_card, PTIRG_KEY, (char *)ptrg, sizeof(ptr), &len);
+ MPIU_ERR_CHKANDJUMP(ret != MPIU_STR_SUCCESS || len != sizeof(*ptrc), mpi_errno, MPI_ERR_OTHER, "**badbusinesscard");
+
ret = MPIU_Str_get_binary_arg(business_card, PTIRC_KEY, (char *)ptrc, sizeof(ptr), &len);
MPIU_ERR_CHKANDJUMP(ret != MPIU_STR_SUCCESS || len != sizeof(*ptrc), mpi_errno, MPI_ERR_OTHER, "**badbusinesscard");
@@ -595,7 +614,7 @@ int MPID_nem_ptl_init_id(MPIDI_VC_t *vc)
mpi_errno = vc->pg->getConnInfo(vc->pg_rank, bc, val_max_sz, vc->pg);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
- mpi_errno = MPID_nem_ptl_get_id_from_bc(bc, &vc_ptl->id, &vc_ptl->pt, &vc_ptl->ptg, &vc_ptl->ptc, &vc_ptl->ptr, &vc_ptl->ptrc);
+ mpi_errno = MPID_nem_ptl_get_id_from_bc(bc, &vc_ptl->id, &vc_ptl->pt, &vc_ptl->ptg, &vc_ptl->ptc, &vc_ptl->ptr, &vc_ptl->ptrg, &vc_ptl->ptrc);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
vc_ptl->id_initialized = TRUE;
-----------------------------------------------------------------------
Summary of changes:
.../channels/nemesis/netmod/portals4/ptl_impl.h | 9 +-
.../channels/nemesis/netmod/portals4/ptl_init.c | 58 +-
.../ch3/channels/nemesis/netmod/portals4/ptl_nm.c | 8 +-
.../channels/nemesis/netmod/portals4/ptl_poll.c | 22 +-
.../channels/nemesis/netmod/portals4/ptl_send.c | 12 +-
.../ch3/channels/nemesis/netmod/portals4/rptl.c | 871 +++++++++-----------
.../ch3/channels/nemesis/netmod/portals4/rptl.h | 44 +-
7 files changed, 525 insertions(+), 499 deletions(-)
hooks/post-receive
--
MPICH primary repository
More information about the commits
mailing list