#include "BufferExchange.h"

//
// NOTE(review): this translation unit was recovered from a mangled copy in which
// every "<...>" span (template parameter lists, and loop conditions such as
// "i<N; i++ ) { ... }" up to the next '>') had been stripped by a tag-eating
// extraction pass.  Template syntax and several loop interiors below are
// best-effort reconstructions and are marked NOTE(review) where guessed;
// confirm against version control before trusting them.
//

//
// Block until any one request in "reqs" completes.
// Returns the index of the completed request, or -1 when no active requests
// remain (MPI_Waitany signalled MPI_UNDEFINED).
//
template <typename T>
int BufferExchange<T>::_wait( std::vector<MPI_Request> &reqs )
{
	int result, index;

	START_ROUTINE_TIMER

	result = MPI_Waitany( reqs.size(), &reqs[0], &index, MPI_STATUS_IGNORE );
	if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Waitany()\n" );

	END_ROUTINE_TIMER

	if( index == MPI_UNDEFINED ) return -1;
	else return index;
}

//
// Block until every request in "reqs" has completed.
// The disabled branch drains requests one at a time via _wait() and logs the
// interval between successive completions - useful when diagnosing slow peers.
//
template <typename T>
void BufferExchange<T>::_waitall( std::vector<MPI_Request> &reqs )
{
	int result;

	START_ROUTINE_TIMER

	if( 1 )
	{
		// default - use MPI_Waitall()
		result = MPI_Waitall( reqs.size(), &reqs[0], MPI_STATUSES_IGNORE );
		if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Waitall()" );
	}
	else
	{
		// print time intervals between each receive:
		double s, s2, e;

		Log( "%s:\n", __func__ );
		s = GetTime();
		s2 = s;
		while( (result = _wait(reqs)) != -1 )
		{
			e = GetTime();
			Log( "\t peer index %d : %g, %g\n", result, e-s2, e-s );
			s2 = e;
		}
		e = GetTime();
		// BUGFIX: original passed a stray "result" argument for a single %g,
		// logging "result" reinterpreted as the elapsed time.
		Log( "\t total: %g\n", e-s );
	}

	END_ROUTINE_TIMER
}

//
// Assign fixed, distinct tags so length traffic, full-exchange buffer traffic
// and plain exchange traffic never match one another's receives.
//
template <typename T>
BufferExchange<T>::BufferExchange()
{
	full_exchange_length_tag = 1;
	full_exchange_buffer_tag = 2;
	exchange_tag = 3;
}

template <typename T>
BufferExchange<T>::~BufferExchange()
{
}

//
// Reset the exchange state for a new set of peer ranks: size the request and
// buffer arrays to match, null the request handles and empty the buffers.
//
template <typename T>
void BufferExchange<T>::Clear( IntVec &_ranks )
{
	size_t N;

	ranks = _ranks;
	N = ranks.size();

	recv_reqs.resize( N );
	send_reqs.resize( N );

	recv_buffers.resize( N );
	send_buffers.resize( N );

	// NOTE(review): loop body reconstructed (lost in extraction) - presumed to
	// null the handles and empty the per-peer buffers; confirm against original.
	for( size_t i=0; i<N; i++ )
	{
		recv_reqs[i] = MPI_REQUEST_NULL;
		send_reqs[i] = MPI_REQUEST_NULL;
		recv_buffers[i].clear();
		send_buffers[i].clear();
	}
}

//
// Exchange buffers with all peer ranks when receive sizes are NOT known in
// advance: first swap buffer lengths (in units of T), then swap the buffers
// themselves as each length transfer completes.  On return, recv_buffers[i]
// holds recv_lengths[i] elements from ranks[i] (empty where the peer sent a
// zero length).  Also logs resident-memory deltas for each phase.
//
template <typename T>
void BufferExchange<T>::FullExchange()
{
	const int divisor = 1024; // (1024*1024);

	size_t N;
	int result, index;

	unsigned int vs;
	unsigned int rss0, rss1, rss2, rss3, rss4;

	START_ROUTINE_TIMER

	//
	// Repurpose existing MPI_Request arrays, rename for clarity.
	//
	std::vector<MPI_Request> &length_reqs = recv_reqs;
	std::vector<MPI_Request> &buffer_reqs = send_reqs;

	N = ranks.size();

	getmem( &rss0, &vs );

	//
	// Incoming/outgoing buffer lengths IN UNITS OF T!
	//
	recv_lengths.resize( N );
	send_lengths.resize( N );

	//
	// Single flat array of handles for both incoming and outgoing lengths, and incoming/outgoing buffers (hence N*2).
	//
	length_reqs.resize( N*2 );
	buffer_reqs.resize( N*2 );
	for( size_t i=0; i<2*N; i++ )
	{
		length_reqs[i] = MPI_REQUEST_NULL;
		buffer_reqs[i] = MPI_REQUEST_NULL;
	}

	getmem( &rss1, &vs );

	//
	// Send & receive buffer lengths - order important, as we assume 0->(N-1) are receives, N->(N*2)-1 are sends.
	//
	// NOTE(review): loop body reconstructed - element datatype/tag presumed;
	// lengths are shipped as raw bytes to stay agnostic of the length type.
	//
	for( size_t i=0; i<N; i++ )
	{
		send_lengths[i] = send_buffers[i].size();

		result = MPI_Irecv( (char *)&recv_lengths[i], sizeof(recv_lengths[i]), MPI_CHAR,
			ranks[i], full_exchange_length_tag, MPI_COMM_WORLD, &length_reqs[i] );
		if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Irecv" );

		result = MPI_Isend( (char *)&send_lengths[i], sizeof(send_lengths[i]), MPI_CHAR,
			ranks[i], full_exchange_length_tag, MPI_COMM_WORLD, &length_reqs[N+i] );
		if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Isend" );
	}

	getmem( &rss2, &vs );

	//
	// As each length request completes, launch the matching buffer transfer.
	// Active buffer requests are packed into buffer_reqs[0..n_buffer_reqs).
	// NOTE(review): loop header reconstructed (lost in extraction).
	//
	int n_buffer_reqs = 0;
	while( 1 )
	{
		result = MPI_Waitany( length_reqs.size(), &length_reqs[0], &index, MPI_STATUS_IGNORE );
		if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Waitany" );
		if( index == MPI_UNDEFINED ) break; // all length requests complete

		if( index < (int)N )
		{
			//
			// Length received; if length > 0, receive actual data.
			//
			int i = index;
			if( recv_lengths[i] > 0 )
			{
				recv_buffers[i].resize( recv_lengths[i] );
				result = MPI_Irecv( (char *)&recv_buffers[i][0], sizeof(T)*recv_lengths[i], MPI_CHAR, ranks[i], full_exchange_buffer_tag, MPI_COMM_WORLD, &buffer_reqs[n_buffer_reqs] );
				if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Irecv" );
				n_buffer_reqs++;
			}
			else
			{
				// prevent stale previous data in recv_buffers[i]: ensure new size() returns zero where received a zero length!
				recv_buffers[i].clear();
			}
		}
		else
		{
			//
			// Length sent; if length > 0, send actual data.
			//
			int i = index-N;
			if( send_lengths[i] > 0 )
			{
				result = MPI_Isend( (char *)&send_buffers[i][0], sizeof(T)*send_lengths[i], MPI_CHAR, ranks[i], full_exchange_buffer_tag, MPI_COMM_WORLD, &buffer_reqs[n_buffer_reqs] );
				if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Isend" );
				n_buffer_reqs++;
			}
		}
	}

	getmem( &rss3, &vs );

	//
	// Wait individual
	//
	/*
	while( 1 )
	{
		int index;
		result = MPI_Waitany( n_buffer_reqs, &buffer_reqs[0], &index, MPI_STATUSES_IGNORE );
		if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Waitany" );
		if( index == MPI_UNDEFINED ) break;
	}
	*/

	//
	// Wait all
	//
	result = MPI_Waitall( n_buffer_reqs, &buffer_reqs[0], MPI_STATUSES_IGNORE );
	if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Waitall" );

	getmem( &rss4, &vs );

	Log( "%s() memory deltas:\n", __func__ );
	Log( "\t initial setup: %f\n", (double)(rss1-rss0)/divisor );
	Log( "\t send/recv lengths: %f\n", (double)(rss2-rss1)/divisor );
	Log( "\t send/recv buffers: %f\n", (double)(rss3-rss2)/divisor );
	Log( "\t final waitall: %f\n", (double)(rss4-rss3)/divisor );

	END_ROUTINE_TIMER
}

//
// Post a nonblocking receive from each peer whose recv buffer is non-empty.
// Assumes recv_buffers[] have already been sized to the expected incoming
// lengths (e.g. by a prior FullExchange) - TODO confirm with callers.
//
template <typename T>
void BufferExchange<T>::PostRecvs()
{
	int result;

	START_ROUTINE_TIMER

	// NOTE(review): loop header and "len" computation reconstructed.
	for( size_t i=0; i<ranks.size(); i++ )
	{
		size_t len = sizeof(T) * recv_buffers[i].size();
		recv_reqs[i] = MPI_REQUEST_NULL;
		if( len > 0 )
		{
			result = MPI_Irecv( (char *)&recv_buffers[i][0], len, MPI_CHAR, ranks[i], exchange_tag, MPI_COMM_WORLD, &recv_reqs[i] );
			if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Irecv( %d bytes ) to rank %d (%d)\n", (int)len, ranks[i], (int)i );
		}
	}

	END_ROUTINE_TIMER
}

//
// Post a nonblocking send to each peer whose send buffer is non-empty.
//
template <typename T>
void BufferExchange<T>::PostSends()
{
	int result;

	START_ROUTINE_TIMER

	// NOTE(review): loop header and "len" computation reconstructed.
	for( size_t i=0; i<ranks.size(); i++ )
	{
		size_t len = sizeof(T) * send_buffers[i].size();
		send_reqs[i] = MPI_REQUEST_NULL;
		if( len > 0 )
		{
			result = MPI_Isend( (char *)&send_buffers[i][0], len, MPI_CHAR, ranks[i], exchange_tag, MPI_COMM_WORLD, &send_reqs[i] );
			if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Isend( %d bytes ) to rank %d (%d)\n", (int)len, ranks[i], (int)i );
		}
	}

	END_ROUTINE_TIMER
}

//
// Wait for any single receive/send to complete; returns its peer index, or -1
// when none remain active.
//
template <typename T>
int BufferExchange<T>::WaitRecv()
{
	return _wait( recv_reqs );
}

template <typename T>
int BufferExchange<T>::WaitSend()
{
	return _wait( send_reqs );
}

//
// Wait for all outstanding receives/sends to complete.
//
template <typename T>
void BufferExchange<T>::WaitRecvs()
{
	_waitall( recv_reqs );
}

template <typename T>
void BufferExchange<T>::WaitSends()
{
	_waitall( send_reqs );
}

//
// Cancel any receive posted by PostRecvs() that has not yet matched a send.
//
template <typename T>
void BufferExchange<T>::CancelRecvs()
{
	int result;

	START_ROUTINE_TIMER

	// NOTE(review): loop body reconstructed (lost in extraction).  MPI_Cancel
	// only marks the request; the follow-up MPI_Wait completes/frees it.
	// Confirm this matches the original cleanup sequence.
	for( size_t i=0; i<recv_reqs.size(); i++ )
	{
		if( recv_reqs[i] == MPI_REQUEST_NULL ) continue;

		result = MPI_Cancel( &recv_reqs[i] );
		if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Cancel" );

		result = MPI_Wait( &recv_reqs[i], MPI_STATUS_IGNORE );
		if( result != MPI_SUCCESS ) ERROR_MACRO( "MPI_Wait" );
	}

	END_ROUTINE_TIMER
}

//
// Instantiate!
//
// NOTE(review): the explicit instantiation's template argument was lost in
// extraction - restore it from version control, e.g.:
//	template class BufferExchange<PayloadType>;
//