Add write completion loop to VFS server and libc
Loop in the VFS server and the libc VFS plugin on partial writes. This implies that the VFS server will not process successive packets for an open node until a write packet has been processed completely. The libc will now suspend and reissue write requests to the VFS until the write has been submitted completely. Ref #2303 Fix #2971
This commit is contained in:
parent
d33bef2e49
commit
cff389758e
|
@ -479,9 +479,9 @@ class Vfs::Lxip_data_file final : public Vfs::Lxip_file
|
|||
return (_sock.ops->poll(&f, &_sock, nullptr) & (POLLIN_SET));
|
||||
}
|
||||
|
||||
Lxip::ssize_t write(Lxip_vfs_file_handle &,
|
||||
Lxip::ssize_t write(Lxip_vfs_file_handle &handle,
|
||||
char const *src, Genode::size_t len,
|
||||
file_size /* ignored */) override
|
||||
file_size out_len) override
|
||||
{
|
||||
using namespace Linux;
|
||||
|
||||
|
@ -494,7 +494,13 @@ class Vfs::Lxip_data_file final : public Vfs::Lxip_file
|
|||
|
||||
Lxip::ssize_t res = _sock.ops->sendmsg(&_sock, &msg, len);
|
||||
|
||||
if (res < 0) _write_err = res;
|
||||
if (res < 0) {
|
||||
_write_err = res;
|
||||
} else {
|
||||
out_len = res;
|
||||
if (size_t(res) < len)
|
||||
handle.io_enqueue(_io_progress_waiters);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -1888,7 +1894,7 @@ class Vfs::Lxip_file_system : public Vfs::File_system,
|
|||
static_cast<Vfs::Lxip_vfs_handle*>(vfs_handle);
|
||||
|
||||
try { return handle->write(src, count, out_count); }
|
||||
catch (File::Would_block) { return WRITE_ERR_WOULD_BLOCK; }
|
||||
catch (File::Would_block) { return WRITE_OK; }
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -513,7 +513,7 @@ ssize_t Libc::Vfs_plugin::write(Libc::File_descriptor *fd, const void *buf,
|
|||
|
||||
Vfs::Vfs_handle *handle;
|
||||
void const *buf;
|
||||
::size_t count;
|
||||
::size_t const count;
|
||||
Vfs::file_size &out_count;
|
||||
Result &out_result;
|
||||
|
||||
|
@ -526,14 +526,16 @@ ssize_t Libc::Vfs_plugin::write(Libc::File_descriptor *fd, const void *buf,
|
|||
|
||||
bool suspend() override
|
||||
{
|
||||
Vfs::file_size out = 0;
|
||||
try {
|
||||
out_result = VFS_THREAD_SAFE(handle->fs().write(handle, (char const *)buf,
|
||||
count, out_count));
|
||||
retry = false;
|
||||
out_result = VFS_THREAD_SAFE(handle->fs().write(handle, (char const *)buf+out_count,
|
||||
count - out_count, out));
|
||||
} catch (Vfs::File_io_service::Insufficient_buffer) {
|
||||
retry = true;
|
||||
out_result = Result::WRITE_OK;
|
||||
}
|
||||
|
||||
out_count += out;
|
||||
retry = (out_result == Result::WRITE_OK) && (out_count < count);
|
||||
return retry;
|
||||
}
|
||||
} check(handle, buf, count, out_count, out_result);
|
||||
|
|
|
@ -105,7 +105,7 @@ extern "C" {
|
|||
struct pbuf *p, err_t err);
|
||||
static err_t tcp_delayed_recv_callback(void *arg, struct tcp_pcb *tpcb,
|
||||
struct pbuf *p, err_t err);
|
||||
/* static err_t tcp_sent_callback(void *arg, struct tcp_pcb *tpcb, u16_t len); */
|
||||
static err_t tcp_sent_callback(void *arg, struct tcp_pcb *tpcb, u16_t len);
|
||||
static void tcp_err_callback(void *arg, err_t err);
|
||||
}
|
||||
|
||||
|
@ -423,9 +423,15 @@ Lwip::Read_result Lwip::Lwip_file_handle::read(char *dst, file_size count,
|
|||
Lwip::Write_result Lwip::Lwip_file_handle::write(char const *src, file_size count,
|
||||
file_size &out_count)
|
||||
{
|
||||
return (socket)
|
||||
? socket->write(*this, src, count, out_count)
|
||||
: Write_result::WRITE_ERR_INVALID;
|
||||
if (!socket) return Write_result::WRITE_ERR_INVALID;
|
||||
|
||||
Lwip::Write_result res = socket->write(*this, src, count, out_count);
|
||||
|
||||
/* if the write is partial, notify the handle when the write is ACKed */
|
||||
if (res == Write_result::WRITE_OK && out_count < count)
|
||||
socket->io_progress_queue.enqueue(_io_progress_waiter);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
bool Lwip::Lwip_file_handle::notify_read_ready()
|
||||
|
@ -833,6 +839,7 @@ class Lwip::Udp_socket_dir final :
|
|||
}
|
||||
result = Read_result::READ_OK;
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
case Lwip_file_handle::PEEK:
|
||||
|
@ -1059,10 +1066,7 @@ class Lwip::Tcp_socket_dir final :
|
|||
tcp_arg(_pcb, this);
|
||||
|
||||
tcp_recv(_pcb, tcp_recv_callback);
|
||||
|
||||
/* Disabled, do not track acknowledgements */
|
||||
/* tcp_sent(_pcb, tcp_sent_callback); */
|
||||
|
||||
tcp_sent(_pcb, tcp_sent_callback);
|
||||
tcp_err(_pcb, tcp_err_callback);
|
||||
}
|
||||
|
||||
|
@ -1361,7 +1365,7 @@ class Lwip::Tcp_socket_dir final :
|
|||
switch(handle.kind) {
|
||||
case Lwip_file_handle::DATA:
|
||||
if (state == READY) {
|
||||
Write_result res = Write_result::WRITE_ERR_WOULD_BLOCK;
|
||||
Write_result res = Write_result::WRITE_OK;
|
||||
file_size out = 0;
|
||||
/*
|
||||
* write in a loop to account for LwIP chunking
|
||||
|
@ -1381,8 +1385,6 @@ class Lwip::Tcp_socket_dir final :
|
|||
count -= n;
|
||||
src += n;
|
||||
out += n;
|
||||
/* pending_ack += n; */
|
||||
res = Write_result::WRITE_OK;
|
||||
}
|
||||
|
||||
/* send queued data */
|
||||
|
@ -1555,12 +1557,8 @@ err_t tcp_delayed_recv_callback(void *arg, struct tcp_pcb *pcb, struct pbuf *buf
|
|||
};
|
||||
|
||||
|
||||
/**
|
||||
* This would be the ACK callback, we could defer sync completion
|
||||
* until then, but performance is expected to be unacceptable.
|
||||
*
|
||||
static
|
||||
err_t tcp_sent_callback(void *arg, struct tcp_pcb*, u16_t len)
|
||||
err_t tcp_sent_callback(void *arg, struct tcp_pcb *pcb, u16_t)
|
||||
{
|
||||
if (!arg) {
|
||||
tcp_abort(pcb);
|
||||
|
@ -1568,12 +1566,11 @@ err_t tcp_sent_callback(void *arg, struct tcp_pcb*, u16_t len)
|
|||
}
|
||||
|
||||
Lwip::Tcp_socket_dir *socket_dir = static_cast<Lwip::Tcp_socket_dir *>(arg);
|
||||
socket_dir->pending_ack -= len;
|
||||
|
||||
/* unblock partial writers */
|
||||
socket_dir->process_io();
|
||||
socket_dir->process_write_ready();
|
||||
return ERR_OK;
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
static
|
||||
|
@ -1838,25 +1835,14 @@ class Lwip::File_system final : public Vfs::File_system, public Lwip::Directory
|
|||
char const *src, file_size count,
|
||||
file_size &out_count) override
|
||||
{
|
||||
Write_result res = Write_result::WRITE_ERR_INVALID;
|
||||
out_count = 0;
|
||||
|
||||
if ((vfs_handle->status_flags() & OPEN_MODE_ACCMODE) == OPEN_MODE_RDONLY)
|
||||
return Write_result::WRITE_ERR_INVALID;
|
||||
if (Lwip_handle *handle = dynamic_cast<Lwip_handle*>(vfs_handle)) {
|
||||
while (true) {
|
||||
res = handle->write(src, count, out_count);
|
||||
if (res != WRITE_ERR_WOULD_BLOCK || out_count) break;
|
||||
if (Lwip_handle *handle = dynamic_cast<Lwip_handle*>(vfs_handle))
|
||||
return handle->write(src, count, out_count);
|
||||
|
||||
/*
|
||||
* XXX: block for signals until the write completes
|
||||
* or fails, this is not how it should be done, but
|
||||
* it's how lxip does it
|
||||
*/
|
||||
_ep.wait_and_dispatch_one_io_signal();
|
||||
}
|
||||
}
|
||||
return res;
|
||||
return Write_result::WRITE_ERR_INVALID;
|
||||
}
|
||||
|
||||
Read_result complete_read(Vfs_handle *vfs_handle,
|
||||
|
|
|
@ -251,6 +251,8 @@ class Vfs_server::Io_node : public Vfs_server::Node,
|
|||
if (result == Write_result::WRITE_OK) {
|
||||
mark_as_updated();
|
||||
_packet.succeeded(true);
|
||||
} else {
|
||||
_packet.succeeded(false);
|
||||
}
|
||||
}
|
||||
catch (Vfs::File_io_service::Insufficient_buffer)
|
||||
|
@ -609,6 +611,8 @@ class Vfs_server::File : public Io_node
|
|||
|
||||
char const *_leaf_path = nullptr; /* offset pointer to Node::_path */
|
||||
|
||||
file_size _write_offset { 0 };
|
||||
|
||||
inline
|
||||
seek_off_t seek_tail(file_size count)
|
||||
{
|
||||
|
@ -648,9 +652,15 @@ class Vfs_server::File : public Io_node
|
|||
if (seek_offset == (seek_off_t)SEEK_TAIL)
|
||||
seek_offset = seek_tail(count);
|
||||
|
||||
bool result = _vfs_write(_stream.packet_content(_packet),
|
||||
count, seek_offset, out_count);
|
||||
bool result = _vfs_write(_stream.packet_content(_packet)+_write_offset,
|
||||
count-_write_offset, seek_offset, out_count);
|
||||
if (result) {
|
||||
/* loop until the write completes or produces an error */
|
||||
if (_packet.succeeded() && ((out_count+_write_offset) < count)) {
|
||||
_write_offset += out_count;
|
||||
return false;
|
||||
}
|
||||
_write_offset = 0;
|
||||
_ack_packet(out_count);
|
||||
if (out_count > 0) {
|
||||
mark_as_updated();
|
||||
|
|
Loading…
Reference in New Issue