<div dir="ltr">Hello Rob,<div> <br></div><div class="gmail_extra"><div class="gmail_quote"><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
I'm excited to see someone using the MPIX_Grequest interface.  We used<br>
the MPIX_Grequest interface to implement non-blocking collective I/O,<br>
and had some bad interactions between libc's aio and the grequest<br>
callbacks.  I don't know if you are running into something similar.<br></blockquote><div><br></div><div>Maybe. Do you have a description of the problem somewhere?</div><div> </div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
Do you have any desire or plans to submit these changes into upstream<br>
ROMIO?  </blockquote><div><br></div><div>The idea would be to push these changes to upstream ROMIO if this is relevant for the community. </div><div>  </div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
> The BeeGFS ROMIO driver uses the cache API internally provided by<br>
> BeeGFS to move data around, while the custom ROMIO implementation<br>
> provides support for any other file system (in the "common" layer).<br>
><br>
> Description of the problem:<br>
> I am currently having problems making the BeeGFS driver work<br>
> properly. More specifically I am noticing that when the size of the<br>
> shared file is large (32GB in my configuration) and I am writing<br>
> multiple files one after the other (using a custom coll_perf<br>
> benchmark) the i-th file (for i > 0 with 0 <= i < 4) gets stuck.<br>
><br>
> BeeGFS provides a deeper_cache_flush_range() function to flush data<br>
> in the cache to the global file. This is non blocking and will submit<br>
> a range (offset, length) for a certain filename to a cache deamon<br>
> that transfers the data. Multiple ranges can be submitted one after<br>
> the other.?<br>
><br>
> Completion can be checked using deeper_cache_flush_wait() using the<br>
> filename as input. Since the MPI_Grequest model requires the external<br>
> thread to make progress by invoking MPI_Grequest_complete() I have<br>
> opted for the MPIX_Grequest interface which allows me to make<br>
> progress (MPI_Grequest_complete) inside MPI while invoking the<br>
> MPI_Wait() function.<br>
<br>
The original intent of the MPIX_Grequest extensions is that something<br>
else (in this case the AIO library) was making progress in the<br>
background and all we needed was a way to check if things were done or<br>
not.  I suspect the callbacks could do some work, but blocking<br>
callbacks haven't seen a lot of testing (in 10+ years the extensions<br>
themselves haven't seen a lot of activity, either)<br></blockquote><div><br></div><div>In principle here I have the same intent. The difference is that I cannot check on progress since</div><div>BeeGFS does not provide a way for checking the status of a single request. Instead it only</div><div>offers a blocking wait interface for all the requests submitted for a certain file (identified</div><div>by the filename). Thus I need to invoke deeper_cache_flush_wait() from inside one of the</div><div>callbacks.</div><div><br></div><div>If this is a problem I can revert to MPI_Grequest but I need to spawn a thread that launches </div><div>deeper_cache_flush_range() for every new request and then waits in </div><div>deeper_cache_flush_wait() for all submitted requests to complete before calling </div><div>MPI_Grequest_complete() this should work but is adding an unnecessary thread.</div><div> </div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
><br>
> deeper_cache_flush_range() does not require the application to keep<br>
> any handler for the submitted request. The only thing the application<br>
> needs is the name of the file to pass to deeper_cache_flush_wait().<br>
><br>
> Follows a code snippet for the corresponding non blocking<br>
> implementation in ROMIO:<br>
><br>
> /* arguments for callback function */<br>
> struct callback {<br>
> ? ? ADIO_File fd_;<br>
> ? ? ADIOI_Sync_req_t req_;<br>
> };<br>
><br>
> /*<br>
> ?* ADIOI_BEEGFS_Sync_thread_start - start synchronisation of req<br>
> ?*/<br>
> int ADIOI_BEEGFS_Sync_thread_start<wbr>(ADIOI_Sync_thread_t t) {<br>
> ? ? ADIOI_Atomic_queue_t q = t->sub_;<br>
> ? ? ADIOI_Sync_req_t r;<br>
> ? ? int retval, count, fflags, error_code, i;<br>
> ? ? ADIO_Offset offset, len;<br>
> ? ? MPI_Count datatype_size;<br>
> ? ? MPI_Datatype datatype;<br>
> ? ? ADIO_Request *req;<br>
> ? ? char myname[] = "ADIOI_BEEGFS_SYNC_THREAD_STAR<wbr>T";<br>
><br>
> ? ? r = ADIOI_Atomic_queue_front(q);<br>
> ? ? ADIOI_Atomic_queue_pop(q);<br>
><br>
> ? ? ADIOI_Sync_req_get_key(r, ADIOI_SYNC_ALL, &offset,<br>
>        ? ?&datatype, &count, &req, &error_code, &fflags);<br>
><br>
> ? ? MPI_Type_size_x(datatype, &datatype_size);<br>
> ? ? len = (ADIO_Offset)datatype_size * (ADIO_Offset)count;<br>
><br>
> ? ? retval = deeper_cache_flush_range(t->fd<wbr>_->filename,<br>
> (off_t)offset, (size_t)len, fflags);<br>
><br>
> ? ? if (retval == DEEPER_RETVAL_SUCCESS && ADIOI_BEEGFS_greq_class ==<br>
> 1) {<br>
>       MPIX_Grequest_class_create(AD<wbr>IOI_BEEGFS_Sync_req_query,<br>
>               ?? ? ? ? ? ? ? ? ? ADIOI_BEEGFS_Sync_req_free,<br>
>               ?? ? ? ? ? ? ? ? ? MPIU_Greq_cancel_fn,<br>
>                       ?? ? ? ? ? ADIOI_BEEGFS_Sync_req_poll,<br>
>                       ?? ? ? ? ? ADIOI_BEEGFS_Sync_req_wait,<br>
>                               ?? &ADIOI_BEEGFS_greq_class);<br>
> ? ? } else {<br>
>       /* --BEGIN ERROR HANDLING-- */<br>
>       return MPIO_Err_create_code(MPI_SUCCE<wbr>SS,<br>
>               ?? ? ? ? ? ? ? ? ? ?MPIR_ERR_RECOVERABLE,<br>
>               ?? ? ? ? ? ? ? ? ? ?"ADIOI_BEEGFS_Cache_sync_req"<wbr>,<br>
>                       ?? ? ? ? ? ?__LINE__, MPI_ERR_IO, "**io %s",<br>
>                       ?? ? ? ? ? ?strerror(errno));<br>
> ? ? ? ? /* --END ERROR HANDLING-- */<br>
> ? ? }<br>
><br>
> ? ? /* init args for the callback functions */<br>
> ? ? struct callback *args = (struct callback<br>
> *)ADIOI_Malloc(sizeof(struct callback));<br>
> ? ? args->fd_ = t->fd_;<br>
> ? ? args->req_ = r;<br>
><br>
> ? ? MPIX_Grequest_class_allocate(A<wbr>DIOI_BEEGFS_greq_class, args, req);<br>
><br>
> ? ? return MPI_SUCCESS;<br>
> }<br>
><br>
> /*<br>
> ?* ADIOI_BEEGFS_Sync_req_poll -<br>
> ?*/<br>
> int ADIOI_BEEGFS_Sync_req_poll(voi<wbr>d *extra_state, MPI_Status *status)<br>
> {<br>
> ? ? struct callback *cb = (struct callback *)extra_state;<br>
> ? ? ADIOI_Sync_req_t r = (ADIOI_Sync_req_t)cb->req_;<br>
> ? ? ADIO_File fd = (ADIO_File)cb->fd_;<br>
> ? ? char *filename = fd->filename;<br>
> ? ? int count, cache_flush_flags, error_code;<br>
> ? ? MPI_Datatype datatype;<br>
> ? ? ADIO_Offset offset;<br>
> ? ? MPI_Aint lb, extent;<br>
> ? ? ADIO_Offset len;<br>
> ? ? ADIO_Request *req;<br>
><br>
> ? ? ADIOI_Sync_req_get_key(r, ADIOI_SYNC_ALL, &offset,<br>
>        ? ?&datatype, &count, &req, &error_code, &cache_flush_flags);<br>
><br>
> ? ? int retval = deeper_cache_flush_wait(filena<wbr>me,<br>
> cache_flush_flags);<br>
><br>
> ? ? MPI_Type_get_extent(datatype, &lb, &extent);<br>
> ? ? len = (ADIO_Offset)extent * (ADIO_Offset)count;<br>
><br>
> ? ? if (fd->hints->e10_cache_coherent == ADIOI_HINT_ENABLE)<br>
>       ADIOI_UNLOCK(fd, offset, SEEK_SET, len);<br>
><br>
> ? ? /* mark generilized request as completed */<br>
> ? ? MPI_Grequest_complete(*req);<br>
><br>
> ? ? if (retval != DEEPER_RETVAL_SUCCESS)<br>
> ? ? ? ? goto fn_exit_error;<br>
><br>
> ? ? MPI_Status_set_cancelled(statu<wbr>s, 0);<br>
> ? ? MPI_Status_set_elements(status<wbr>, datatype, count);<br>
> ? ? status->MPI_SOURCE = MPI_UNDEFINED;<br>
> ? ? status->MPI_TAG = MPI_UNDEFINED;<br>
><br>
> ? ? ADIOI_Free(cb);<br>
><br>
> ? ? return MPI_SUCCESS;<br>
><br>
> fn_exit_error:<br>
> ? ? ADIOI_Free(cb);<br>
><br>
> ? ? return MPIO_Err_create_code(MPI_SUCCE<wbr>SS,<br>
> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? MPIR_ERR_RECOVERABLE,<br>
> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "ADIOI_BEEGFS_Sync_req_poll",<br>
> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? __LINE__, MPI_ERR_IO, "**io %s",<br>
> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? strerror(errno));<br>
> }<br>
><br>
> /*<br>
> ?* ADIOI_BEEGFS_Sync_req_wait -<br>
> ?*/<br>
> int ADIOI_BEEGFS_Sync_req_wait(int count, void **array_of_states,<br>
> double timeout, MPI_Status *status) {<br>
> ? ? return ADIOI_BEEGFS_Sync_req_poll(*ar<wbr>ray_of_states, status);<br>
> }<br>
><br>
> /*<br>
> ?* ADIOI_BEEGFS_Sync_req_query -<br>
> ?*/<br>
> int ADIOI_BEEGFS_Sync_req_query(vo<wbr>id *extra_state, MPI_Status<br>
> *status) {<br>
> ? ? return MPI_SUCCESS;<br>
> }<br>
><br>
> /*<br>
> ?* ADIOI_BEEGFS_Sync_req_free -<br>
> ?*/<br>
> int ADIOI_BEEGFS_Sync_req_free(voi<wbr>d *extra_state) {<br>
> ? ? return MPI_SUCCESS;<br>
> }<br>
><br>
> /*<br>
> ?* ADIOI_BEEGFS_Sync_req_cancel -<br>
> ?*/<br>
> int ADIOI_BEEGFS_Sync_req_cancel(v<wbr>oid *extra_state, int complete) {<br>
> ? ? return MPI_SUCCESS;<br>
> } ?<br>
><br>
> I have had a look at the implementation for MPI_File_iwrite() in<br>
> ad_iwrite.c but this uses POSIX AIO, thus I am not sure I am doing<br>
> things properly here.<br>
<br>
the poll_fn() should be non-blocking.  think MPI_Test more than unix<br>
poll/select/epoll.  it's going to get called in a tight loop.<br></blockquote><div><br></div><div>So calling a blocking function inside the poll callback is not advised. I am currently </div><div>referring to "Extending the MPI-2 Generalised Request Interface" paper and some</div><div>iwrite example in ROMIO for my implementation. Is there a more detailed document</div><div>for MPIX_Grequest?</div></div><br clear="all"><div>Thank you,</div><div><br></div>-- <br><div class="m_4455921819236832215m_-4784891663962875185gmail_signature" data-smartmail="gmail_signature"><div dir="ltr">Giuseppe Congiu <strong>·</strong> Research Engineer II<br>
Seagate Technology, LLC<br>
office: <a href="tel:+44%2023%209249%206082" value="+442392496082" target="_blank">+44 (0)23 9249 6082</a> <strong>·</strong> mobile: <br>
<a href="http://www.seagate.com" target="_blank">www.seagate.com</a><br></div></div>
</div></div>