#include #include #include #include "mpi.h" #include #include #include #include #include #include #include #include #include #include #include using namespace std ; #define MAX_CLIENT 256 class mpiData { public: mpiData (char* portname) { MPI_Comm_dup(MPI_COMM_SELF, &dupSelfMain); MPI_Comm_dup(MPI_COMM_SELF, &dupSelfProxy); port = portname ; } ~mpiData () { MPI_Comm_free(&dupSelfMain); MPI_Comm_free(&dupSelfProxy); } MPI_Comm dupSelfMain; MPI_Comm dupSelfProxy; string port ; }; void *master (void* data) { cout <<"Master"<< endl; mpiData* mgr = (mpiData*) data ; MPI_Comm newComm ; MPI_Comm_connect (mgr->port.c_str(), MPI_INFO_NULL, 0, mgr->dupSelfMain, &newComm); cout << "Connected to server " << endl ; int val = 0 ; cout << "Doing MPI_Send from master" << endl ; MPI_Send(&val, 1, MPI_INT, 0, 99, newComm); cout << "MPI_Send complete" << endl ; MPI_Comm_disconnect (&newComm); pthread_exit(data); } void *proxy (void* data) { cout <<"Proxy"<< endl; mpiData* mgr = (mpiData*) data ; do { MPI_Comm newComm ; MPI_Comm_accept (mgr->port.c_str(), MPI_INFO_NULL, 0, mgr->dupSelfProxy, &newComm); cout << "Accepted a new connection" << endl ; int recv = 0 ; MPI_Status status ; cout << "Waiting on MPI Recv " << endl ; MPI_Recv(&recv, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, newComm, &status); cout << "MPI Recv compoleted " << endl ; if (status.MPI_TAG == 99 ) { MPI_Comm_disconnect (&newComm); break; } } while (1) ; pthread_exit(data); } int main (int argc, char* argv[]) { int provided ; MPI_Init_thread (&argc, &argv, MPI_THREAD_MULTIPLE, &provided); if (provided != MPI_THREAD_MULTIPLE) { cout << " Thread support not there " << endl; } const char* session = "randomsession"; // Open a port char portname[MPI_MAX_PORT_NAME]; MPI_Open_port(MPI_INFO_NULL, portname); MPI_Publish_name(session, MPI_INFO_NULL, portname); pthread_attr_t attr ; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); mpiData mgr(portname); pthread_t main_th ; int r_master = pthread_create(&main_th, &attr/*attr*/, master, (void*)&mgr); if (r_master) { cout << "Failed to create master thread" << endl; } pthread_t proxy_th ; int r_proxy = pthread_create(&proxy_th, &attr/*attr*/, proxy, (void*)&mgr); if (r_proxy) { cout << "Failed to create proxy thread" << endl; } pthread_attr_destroy(&attr); void *status ; if (pthread_join(main_th, &status)) { cout << "Join Failed" <