[mpich-devel] devel Digest, Vol 44, Issue 1

Giuseppe Congiu giuseppe.congiu at seagate.com
Wed Mar 8 03:25:45 CST 2017


Hello Rob,


> I'm excited to see someone using the MPIX_Grequest interface.  We used
> the MPIX_Grequest interface to implement non-blocking collective I/O,
> and had some bad interactions between libc's aio and the grequest
> callbacks.  I don't know if you are running into something similar.
>

Maybe. Do you have a description of the problem somewhere?


> Do you have any desire or plans to submit these changes into upstream
> ROMIO?


The idea would be to push these changes upstream to ROMIO if they are
relevant to the community.


> > The BeeGFS ROMIO driver uses the cache API internally provided by
> > BeeGFS to move data around, while the custom ROMIO implementation
> > provides support for any other file system (in the "common" layer).
> >
> > Description of the problem:
> > I am currently having problems making the BeeGFS driver work
> > properly. More specifically, I am noticing that when the shared file
> > is large (32GB in my configuration) and I write multiple files one
> > after the other (using a custom coll_perf benchmark), the i-th file
> > (for i > 0, with files numbered 0 <= i < 4) gets stuck.
> >
> > BeeGFS provides a deeper_cache_flush_range() function to flush data
> > in the cache to the global file. This is non-blocking and will submit
> > a range (offset, length) for a certain filename to a cache daemon
> > that transfers the data. Multiple ranges can be submitted one after
> > the other.
> >
> > Completion can be checked with deeper_cache_flush_wait(), which takes
> > the filename as input. Since the MPI_Grequest model requires an
> > external thread to make progress by invoking MPI_Grequest_complete(),
> > I have opted for the MPIX_Grequest interface, which allows me to make
> > progress (MPI_Grequest_complete) inside MPI while in MPI_Wait().
>
> The original intent of the MPIX_Grequest extensions is that something
> else (in this case the AIO library) was making progress in the
> background and all we needed was a way to check if things were done or
> not.  I suspect the callbacks could do some work, but blocking
> callbacks haven't seen a lot of testing (in 10+ years the extensions
> themselves haven't seen a lot of activity, either).
>

In principle I have the same intent here. The difference is that I cannot
check on progress, since BeeGFS does not provide a way to check the status
of a single request. Instead, it only offers a blocking wait interface for
all the requests submitted for a certain file (identified by the filename).
Thus I need to invoke deeper_cache_flush_wait() from inside one of the
callbacks.
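
To make the model concrete, the usage pattern is roughly the following
(the offsets, lengths and flags are just placeholders here):

    /* submit any number of ranges for the same file; each call returns
     * immediately and hands the range over to the cache daemon */
    deeper_cache_flush_range(filename, offset1, len1, flags);
    deeper_cache_flush_range(filename, offset2, len2, flags);

    /* the only completion interface: block until *all* ranges submitted
     * for this filename have been flushed */
    deeper_cache_flush_wait(filename, flags);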

If this is a problem I can revert to MPI_Grequest, but then I need to
spawn a thread that launches deeper_cache_flush_range() for every new
request and then waits in deeper_cache_flush_wait() for all submitted
requests to complete before calling MPI_Grequest_complete(). This should
work, but it adds an unnecessary thread.
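
For clarity, this is roughly what that fallback would look like (a minimal
sketch; sync_args, sync_thread and sync_start are hypothetical names, and
error handling is omitted):

    #include <mpi.h>
    #include <pthread.h>
    #include <stdlib.h>
    #include <sys/types.h>
    /* deeper_cache_flush_range/_wait come from the BeeGFS deeper cache header */

    /* hypothetical per-request state for the fallback */
    struct sync_args {
        const char *filename;
        off_t       offset;
        size_t      len;
        int         flags;
        MPI_Request req;
    };

    static int query_fn(void *extra_state, MPI_Status *status)
    {
        struct sync_args *a = (struct sync_args *)extra_state;
        MPI_Status_set_cancelled(status, 0);
        MPI_Status_set_elements(status, MPI_BYTE, (int)a->len);
        status->MPI_SOURCE = MPI_UNDEFINED;
        status->MPI_TAG = MPI_UNDEFINED;
        return MPI_SUCCESS;
    }

    static int free_fn(void *extra_state) { free(extra_state); return MPI_SUCCESS; }

    static int cancel_fn(void *extra_state, int complete) { return MPI_SUCCESS; }

    static void *sync_thread(void *v)
    {
        struct sync_args *a = (struct sync_args *)v;
        /* submit the range, then block until everything for this file is flushed */
        deeper_cache_flush_range(a->filename, a->offset, a->len, a->flags);
        deeper_cache_flush_wait(a->filename, a->flags);
        /* the external thread makes progress, as the MPI_Grequest model requires */
        MPI_Grequest_complete(a->req);
        return NULL;
    }

    /* create the generalized request and hand the blocking work to a thread */
    int sync_start(struct sync_args *a, MPI_Request *req)
    {
        pthread_t t;
        MPI_Grequest_start(query_fn, free_fn, cancel_fn, a, req);
        a->req = *req;
        pthread_create(&t, NULL, sync_thread, a);
        pthread_detach(t);
        return MPI_SUCCESS;
    }

As I said, this should work, but every outstanding request then costs a
thread.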


> >
> > deeper_cache_flush_range() does not require the application to keep
> > any handle for the submitted request. The only thing the application
> > needs is the name of the file to pass to deeper_cache_flush_wait().
> >
> > Below is a code snippet of the corresponding non-blocking
> > implementation in ROMIO:
> >
> > /* arguments for callback function */
> > struct callback {
> >     ADIO_File fd_;
> >     ADIOI_Sync_req_t req_;
> > };
> >
> > /*
> >  * ADIOI_BEEGFS_Sync_thread_start - start synchronisation of req
> >  */
> > int ADIOI_BEEGFS_Sync_thread_start(ADIOI_Sync_thread_t t) {
> >     ADIOI_Atomic_queue_t q = t->sub_;
> >     ADIOI_Sync_req_t r;
> >     int retval, count, fflags, error_code, i;
> >     ADIO_Offset offset, len;
> >     MPI_Count datatype_size;
> >     MPI_Datatype datatype;
> >     ADIO_Request *req;
> >     char myname[] = "ADIOI_BEEGFS_SYNC_THREAD_START";
> >
> >     r = ADIOI_Atomic_queue_front(q);
> >     ADIOI_Atomic_queue_pop(q);
> >
> >     ADIOI_Sync_req_get_key(r, ADIOI_SYNC_ALL, &offset, &datatype,
> >                            &count, &req, &error_code, &fflags);
> >
> >     MPI_Type_size_x(datatype, &datatype_size);
> >     len = (ADIO_Offset)datatype_size * (ADIO_Offset)count;
> >
> >     retval = deeper_cache_flush_range(t->fd_->filename, (off_t)offset,
> >                                       (size_t)len, fflags);
> >
> >     if (retval == DEEPER_RETVAL_SUCCESS && ADIOI_BEEGFS_greq_class == 1) {
> >         MPIX_Grequest_class_create(ADIOI_BEEGFS_Sync_req_query,
> >                                    ADIOI_BEEGFS_Sync_req_free,
> >                                    MPIU_Greq_cancel_fn,
> >                                    ADIOI_BEEGFS_Sync_req_poll,
> >                                    ADIOI_BEEGFS_Sync_req_wait,
> >                                    &ADIOI_BEEGFS_greq_class);
> >     } else {
> >         /* --BEGIN ERROR HANDLING-- */
> >         return MPIO_Err_create_code(MPI_SUCCESS,
> >                                     MPIR_ERR_RECOVERABLE,
> >                                     "ADIOI_BEEGFS_Cache_sync_req",
> >                                     __LINE__, MPI_ERR_IO, "**io %s",
> >                                     strerror(errno));
> >         /* --END ERROR HANDLING-- */
> >     }
> >
> >     /* init args for the callback functions */
> >     struct callback *args = (struct callback *)ADIOI_Malloc(sizeof(struct callback));
> >     args->fd_ = t->fd_;
> >     args->req_ = r;
> >
> >     MPIX_Grequest_class_allocate(ADIOI_BEEGFS_greq_class, args, req);
> >
> >     return MPI_SUCCESS;
> > }
> >
> > /*
> >  * ADIOI_BEEGFS_Sync_req_poll -
> >  */
> > int ADIOI_BEEGFS_Sync_req_poll(void *extra_state, MPI_Status *status) {
> >     struct callback *cb = (struct callback *)extra_state;
> >     ADIOI_Sync_req_t r = (ADIOI_Sync_req_t)cb->req_;
> >     ADIO_File fd = (ADIO_File)cb->fd_;
> >     char *filename = fd->filename;
> >     int count, cache_flush_flags, error_code;
> >     MPI_Datatype datatype;
> >     ADIO_Offset offset;
> >     MPI_Aint lb, extent;
> >     ADIO_Offset len;
> >     ADIO_Request *req;
> >
> >     ADIOI_Sync_req_get_key(r, ADIOI_SYNC_ALL, &offset, &datatype,
> >                            &count, &req, &error_code, &cache_flush_flags);
> >
> >     int retval = deeper_cache_flush_wait(filename, cache_flush_flags);
> >
> >     MPI_Type_get_extent(datatype, &lb, &extent);
> >     len = (ADIO_Offset)extent * (ADIO_Offset)count;
> >
> >     if (fd->hints->e10_cache_coherent == ADIOI_HINT_ENABLE)
> >         ADIOI_UNLOCK(fd, offset, SEEK_SET, len);
> >
> >     /* mark generalized request as completed */
> >     MPI_Grequest_complete(*req);
> >
> >     if (retval != DEEPER_RETVAL_SUCCESS)
> >         goto fn_exit_error;
> >
> >     MPI_Status_set_cancelled(status, 0);
> >     MPI_Status_set_elements(status, datatype, count);
> >     status->MPI_SOURCE = MPI_UNDEFINED;
> >     status->MPI_TAG = MPI_UNDEFINED;
> >
> >     ADIOI_Free(cb);
> >
> >     return MPI_SUCCESS;
> >
> > fn_exit_error:
> >     ADIOI_Free(cb);
> >
> >     return MPIO_Err_create_code(MPI_SUCCESS,
> >                                 MPIR_ERR_RECOVERABLE,
> >                                 "ADIOI_BEEGFS_Sync_req_poll",
> >                                 __LINE__, MPI_ERR_IO, "**io %s",
> >                                 strerror(errno));
> > }
> >
> > /*
> >  * ADIOI_BEEGFS_Sync_req_wait -
> >  */
> > int ADIOI_BEEGFS_Sync_req_wait(int count, void **array_of_states,
> >                                double timeout, MPI_Status *status) {
> >     return ADIOI_BEEGFS_Sync_req_poll(*array_of_states, status);
> > }
> >
> > /*
> >  * ADIOI_BEEGFS_Sync_req_query -
> >  */
> > int ADIOI_BEEGFS_Sync_req_query(void *extra_state, MPI_Status *status) {
> >     return MPI_SUCCESS;
> > }
> >
> > /*
> >  * ADIOI_BEEGFS_Sync_req_free -
> >  */
> > int ADIOI_BEEGFS_Sync_req_free(void *extra_state) {
> >     return MPI_SUCCESS;
> > }
> >
> > /*
> >  * ADIOI_BEEGFS_Sync_req_cancel -
> >  */
> > int ADIOI_BEEGFS_Sync_req_cancel(void *extra_state, int complete) {
> >     return MPI_SUCCESS;
> > }
> >
> > I have had a look at the implementation for MPI_File_iwrite() in
> > ad_iwrite.c but this uses POSIX AIO, thus I am not sure I am doing
> > things properly here.
>
> the poll_fn() should be non-blocking.  think MPI_Test more than unix
> poll/select/epoll.  it's going to get called in a tight loop.
>

So calling a blocking function inside the poll callback is not advised. I
am currently referring to the "Extending the MPI-2 Generalised Request
Interface" paper and some of the iwrite examples in ROMIO for my
implementation. Is there a more detailed document for MPIX_Grequest?
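
Just to check my understanding: is the idea then to move the blocking call
into the wait callback and leave the poll callback empty? Something like
the sketch below (the completed and flags_ fields and the mpi_req_ pointer
would be hypothetical additions to struct callback; status handling and
error paths are omitted):

    /* poll_fn: non-blocking; since BeeGFS has no per-request status
     * query there is nothing useful to check here */
    int ADIOI_BEEGFS_Sync_req_poll(void *extra_state, MPI_Status *status)
    {
        return MPI_SUCCESS;
    }

    /* wait_fn: blocking is allowed here; one deeper_cache_flush_wait()
     * covers every range submitted for the file */
    int ADIOI_BEEGFS_Sync_req_wait(int count, void **array_of_states,
                                   double timeout, MPI_Status *status)
    {
        int i;
        for (i = 0; i < count; i++) {
            struct callback *cb = (struct callback *)array_of_states[i];
            if (cb->completed)
                continue;
            if (deeper_cache_flush_wait(cb->fd_->filename, cb->flags_) ==
                DEEPER_RETVAL_SUCCESS) {
                cb->completed = 1;
                MPI_Grequest_complete(*cb->mpi_req_);
            }
        }
        return MPI_SUCCESS;
    }

The open question with this layout is that a plain MPI_Test could then
never complete the request, since only the wait path makes progress.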

Thank you,

-- 
Giuseppe Congiu · Research Engineer II
Seagate Technology, LLC
office: +44 (0)23 9249 6082
www.seagate.com