[mpich-devel] Deadlock when using MPIX_Grequest interface

Giuseppe Congiu giuseppe.congiu at seagate.com
Tue Feb 28 11:59:16 CST 2017


Hello,

I am trying to integrate an asynchronous API into ROMIO, but I am experiencing
some problems, probably caused by improper use of the MPIX_Grequest
interface.

Short description of the integrated functions:
I have a ROMIO implementation with extended MPI-IO hints that can be used
to divert written data, during collective I/O, to a local file system
(typically running on an SSD). The full description of the modification can
be found at this link: https://github.com/gcongiu/E10/tree/beegfs-devel

The BeeGFS ROMIO driver uses the cache API internally provided by BeeGFS to
move data around, while the custom ROMIO implementation provides support
for any other file system (in the "common" layer).

Description of the problem:
I am currently having problems making the BeeGFS driver work properly. More
specifically, I am noticing that when the shared file is large (32GB in my
configuration) and I write multiple files one after the other (using a
custom coll_perf benchmark), the write of the i-th file (for i > 0, with
0 <= i < 4) gets stuck.

BeeGFS provides a deeper_cache_flush_range() function to flush data from the
cache to the global file. It is non-blocking and submits a range
(offset, length) for a given filename to a cache daemon that transfers
the data. Multiple ranges can be submitted one after the other.

Completion can be checked with deeper_cache_flush_wait(), which takes the
filename as input. Since the MPI_Grequest model requires an external
thread to make progress by invoking MPI_Grequest_complete(), I have opted
for the MPIX_Grequest interface, which allows progress
(MPI_Grequest_complete) to be made inside MPI while MPI_Wait() is invoked.
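
To make the distinction concrete, the plain MPI_Grequest model would require
something like the following (a minimal, self-contained sketch for
illustration only, not part of my changes; the worker thread stands in for
the BeeGFS cache daemon):

#include <mpi.h>
#include <pthread.h>

static int query_fn(void *extra_state, MPI_Status *status) {
    MPI_Status_set_cancelled(status, 0);
    MPI_Status_set_elements(status, MPI_BYTE, 0);
    return MPI_SUCCESS;
}

static int free_fn(void *extra_state) { return MPI_SUCCESS; }
static int cancel_fn(void *extra_state, int complete) { return MPI_SUCCESS; }

/* external progress thread: does the blocking work, then completes the request */
static void *worker(void *arg) {
    MPI_Request *req = (MPI_Request *)arg;
    /* the blocking call, e.g. deeper_cache_flush_wait(), would go here */
    MPI_Grequest_complete(*req);       /* without this, MPI_Wait() never returns */
    return NULL;
}

int main(int argc, char **argv) {
    MPI_Request req;
    pthread_t tid;
    int provided;

    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    MPI_Grequest_start(query_fn, free_fn, cancel_fn, NULL, &req);
    pthread_create(&tid, NULL, worker, &req);
    MPI_Wait(&req, MPI_STATUS_IGNORE); /* MPI itself makes no progress on req */
    pthread_join(tid, NULL);
    MPI_Finalize();
    return 0;
}

With MPIX_Grequest the poll/wait callbacks are instead invoked from inside
MPI_Wait()/MPI_Test(), so no extra thread is needed.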

deeper_cache_flush_range() does not require the application to keep any
handle for the submitted request. The only thing the application needs is
the name of the file to pass to deeper_cache_flush_wait().
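
So the intended usage pattern, as I understand it, is roughly the following
(a sketch only; the prototypes are reconstructed from how the functions are
called in my code below, and the header choice, argument types and offsets
are my guesses):

#include <sys/types.h>
#include <stddef.h>

/* prototypes as inferred from the calls in the ROMIO snippet below */
int deeper_cache_flush_range(const char *path, off_t offset, size_t len, int flags);
int deeper_cache_flush_wait(const char *path, int flags);

int flush_file(const char *filename, int flags) {
    /* submit ranges for asynchronous flushing; no request handle is returned */
    deeper_cache_flush_range(filename, (off_t)0,
                             (size_t)(64 * 1024 * 1024), flags);
    deeper_cache_flush_range(filename, (off_t)(64 * 1024 * 1024),
                             (size_t)(64 * 1024 * 1024), flags);

    /* block until all ranges submitted for this file have been flushed */
    return deeper_cache_flush_wait(filename, flags);
}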

Here is a code snippet of the corresponding non-blocking implementation in
ROMIO:

/* arguments for callback function */
struct callback {
    ADIO_File fd_;
    ADIOI_Sync_req_t req_;
};

/*
 * ADIOI_BEEGFS_Sync_thread_start - start synchronisation of req
 */
int ADIOI_BEEGFS_Sync_thread_start(ADIOI_Sync_thread_t t) {
    ADIOI_Atomic_queue_t q = t->sub_;
    ADIOI_Sync_req_t r;
    int retval, count, fflags, error_code, i;
    ADIO_Offset offset, len;
    MPI_Count datatype_size;
    MPI_Datatype datatype;
    ADIO_Request *req;
    char myname[] = "ADIOI_BEEGFS_SYNC_THREAD_START";

    r = ADIOI_Atomic_queue_front(q);
    ADIOI_Atomic_queue_pop(q);

    ADIOI_Sync_req_get_key(r, ADIOI_SYNC_ALL, &offset,
                           &datatype, &count, &req, &error_code, &fflags);

    MPI_Type_size_x(datatype, &datatype_size);
    len = (ADIO_Offset)datatype_size * (ADIO_Offset)count;

    retval = deeper_cache_flush_range(t->fd_->filename, (off_t)offset,
                                      (size_t)len, fflags);

    if (retval == DEEPER_RETVAL_SUCCESS && ADIOI_BEEGFS_greq_class == 1) {
        MPIX_Grequest_class_create(ADIOI_BEEGFS_Sync_req_query,
                                   ADIOI_BEEGFS_Sync_req_free,
                                   MPIU_Greq_cancel_fn,
                                   ADIOI_BEEGFS_Sync_req_poll,
                                   ADIOI_BEEGFS_Sync_req_wait,
                                   &ADIOI_BEEGFS_greq_class);
    } else {
        /* --BEGIN ERROR HANDLING-- */
        return MPIO_Err_create_code(MPI_SUCCESS,
                                    MPIR_ERR_RECOVERABLE,
                                    "ADIOI_BEEGFS_Cache_sync_req",
                                    __LINE__, MPI_ERR_IO, "**io %s",
                                    strerror(errno));
        /* --END ERROR HANDLING-- */
    }

    /* init args for the callback functions */
    struct callback *args = (struct callback *)ADIOI_Malloc(sizeof(struct callback));
    args->fd_ = t->fd_;
    args->req_ = r;

    MPIX_Grequest_class_allocate(ADIOI_BEEGFS_greq_class, args, req);

    return MPI_SUCCESS;
}

/*
 * ADIOI_BEEGFS_Sync_req_poll -
 */
int ADIOI_BEEGFS_Sync_req_poll(void *extra_state, MPI_Status *status) {
    struct callback *cb = (struct callback *)extra_state;
    ADIOI_Sync_req_t r = (ADIOI_Sync_req_t)cb->req_;
    ADIO_File fd = (ADIO_File)cb->fd_;
    char *filename = fd->filename;
    int count, cache_flush_flags, error_code;
    MPI_Datatype datatype;
    ADIO_Offset offset;
    MPI_Aint lb, extent;
    ADIO_Offset len;
    ADIO_Request *req;

    ADIOI_Sync_req_get_key(r, ADIOI_SYNC_ALL, &offset,
                           &datatype, &count, &req, &error_code, &cache_flush_flags);

    int retval = deeper_cache_flush_wait(filename, cache_flush_flags);

    MPI_Type_get_extent(datatype, &lb, &extent);
    len = (ADIO_Offset)extent * (ADIO_Offset)count;

    if (fd->hints->e10_cache_coherent == ADIOI_HINT_ENABLE)
        ADIOI_UNLOCK(fd, offset, SEEK_SET, len);

    /* mark generalized request as completed */
    MPI_Grequest_complete(*req);

    if (retval != DEEPER_RETVAL_SUCCESS)
        goto fn_exit_error;

    MPI_Status_set_cancelled(status, 0);
    MPI_Status_set_elements(status, datatype, count);
    status->MPI_SOURCE = MPI_UNDEFINED;
    status->MPI_TAG = MPI_UNDEFINED;

    ADIOI_Free(cb);

    return MPI_SUCCESS;

fn_exit_error:
    ADIOI_Free(cb);

    return MPIO_Err_create_code(MPI_SUCCESS,
                                MPIR_ERR_RECOVERABLE,
                                "ADIOI_BEEGFS_Sync_req_poll",
                                __LINE__, MPI_ERR_IO, "**io %s",
                                strerror(errno));
}

/*
 * ADIOI_BEEGFS_Sync_req_wait -
 */
int ADIOI_BEEGFS_Sync_req_wait(int count, void **array_of_states, double timeout,
                               MPI_Status *status) {
    return ADIOI_BEEGFS_Sync_req_poll(*array_of_states, status);
}

/*
 * ADIOI_BEEGFS_Sync_req_query -
 */
int ADIOI_BEEGFS_Sync_req_query(void *extra_state, MPI_Status *status) {
    return MPI_SUCCESS;
}

/*
 * ADIOI_BEEGFS_Sync_req_free -
 */
int ADIOI_BEEGFS_Sync_req_free(void *extra_state) {
    return MPI_SUCCESS;
}

/*
 * ADIOI_BEEGFS_Sync_req_cancel -
 */
int ADIOI_BEEGFS_Sync_req_cancel(void *extra_state, int complete) {
    return MPI_SUCCESS;
}
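
To check my understanding of the MPIX extension, the expected flow boils
down to the following toy example (a self-contained sketch, not ROMIO code;
the poll callback here pretends the flush completed on the first poll):

#include <mpi.h>

static int query_fn(void *extra_state, MPI_Status *status) {
    MPI_Status_set_cancelled(status, 0);
    MPI_Status_set_elements(status, MPI_BYTE, 0);
    return MPI_SUCCESS;
}

static int free_fn(void *extra_state) { return MPI_SUCCESS; }
static int cancel_fn(void *extra_state, int complete) { return MPI_SUCCESS; }

/* extra_state carries the request handle so the callbacks can complete it */
static int poll_fn(void *extra_state, MPI_Status *status) {
    MPI_Request *req = (MPI_Request *)extra_state;
    /* the real code would check/wait for the cache flush here */
    MPI_Grequest_complete(*req);
    return MPI_SUCCESS;
}

static int wait_fn(int count, void **array_of_states, double timeout,
                   MPI_Status *status) {
    return poll_fn(array_of_states[0], status);
}

int main(int argc, char **argv) {
    MPIX_Grequest_class cls;
    MPI_Request req;

    MPI_Init(&argc, &argv);
    MPIX_Grequest_class_create(query_fn, free_fn, cancel_fn, poll_fn, wait_fn, &cls);
    MPIX_Grequest_class_allocate(cls, &req, &req); /* extra_state = &req */
    MPI_Wait(&req, MPI_STATUS_IGNORE);             /* drives poll_fn/wait_fn */
    MPI_Finalize();
    return 0;
}

In my ROMIO code the extra_state is the struct callback above, and the
ADIO_Request pointer is fetched from the ADIOI_Sync_req_t stored in it.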

I have had a look at the implementation of MPI_File_iwrite() in
ad_iwrite.c, but that uses POSIX AIO, so I am not sure I am doing things
properly here.
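
As far as I can tell, the AIO-based pattern there amounts to something like
this (a generic illustration from my reading, not the actual ROMIO code):
the aiocb lives in extra_state and the poll callback only checks the status
without blocking.

#include <aio.h>
#include <errno.h>
#include <sys/types.h>
#include <mpi.h>

/* non-blocking poll in the POSIX AIO style: complete the generalized
 * request only once the asynchronous write has actually finished */
static int aio_poll_fn(void *extra_state, MPI_Status *status) {
    struct aiocb *cb = (struct aiocb *)extra_state;

    if (aio_error(cb) == EINPROGRESS)
        return MPI_SUCCESS;              /* not done yet; MPI will poll again */

    ssize_t nbytes = aio_return(cb);     /* reap the result exactly once */
    MPI_Status_set_elements(status, MPI_BYTE, (int)nbytes);
    MPI_Status_set_cancelled(status, 0);
    /* the request handle, also kept in extra_state in the real code,
     * would be completed here with MPI_Grequest_complete() */
    return MPI_SUCCESS;
}

My poll function above, in contrast, calls deeper_cache_flush_wait() directly.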

Additionally, the problem does not show up for small files (e.g. 1GB), which
makes it hard to debug.

BTW, I am using weak scaling, so every process always writes the same
amount of data (64MB); I just change the number of procs in the test.

To test the modification with coll_perf I am using two configurations: 512
procs (64 nodes, 8 procs/node) and 16 procs (8 nodes, 2 procs/node).

The 16 procs configuration writes 4 files of 1GB each (16 x 64MB); the 512
procs configuration writes 4 files of 32GB each (512 x 64MB).

Can someone spot any problem in my code?

Thanks,

-- 
Giuseppe Congiu · Research Engineer II
Seagate Technology, LLC
office: +44 (0)23 9249 6082
www.seagate.com