libc_fs: Make plugin thread safe

Issue #1090
This commit is contained in:
Sebastian Sumpf 2014-03-10 12:06:10 +01:00 committed by Norman Feske
parent e7f3e99aab
commit 45b4d6b187
1 changed files with 67 additions and 117 deletions

View File

@ -86,19 +86,18 @@ class Plugin_context : public Libc::Plugin_context,
public:
bool in_flight;
Plugin_context(File_system::File_handle handle)
: _type(TYPE_FILE), _node_handle(handle), _fd_flags(0),
_status_flags(0), _seek_offset(~0), in_flight(false) { }
_status_flags(0), _seek_offset(~0) { }
Plugin_context(File_system::Dir_handle handle)
: _type(TYPE_DIR), _node_handle(handle), _fd_flags(0),
_status_flags(0), _seek_offset(0), in_flight(false) { }
_status_flags(0), _seek_offset(0){ }
Plugin_context(File_system::Symlink_handle handle)
: _type(TYPE_SYMLINK), _node_handle(handle), _fd_flags(0),
_status_flags(0), _seek_offset(~0), in_flight(false) { }
_status_flags(0), _seek_offset(~0) { }
File_system::Node_handle node_handle() const { return _node_handle; }
@ -154,31 +153,12 @@ static inline Plugin_context *context(Libc::File_descriptor *fd)
return fd->context ? static_cast<Plugin_context *>(fd->context) : 0;
}
static void wait_for_acknowledgement(File_system::Session::Tx::Source &source)
static size_t const session_max_packet_size(File_system::Session::Tx::Source &source)
{
::File_system::Packet_descriptor packet = source.get_acked_packet();
if (verbose)
PDBG("got acknowledgement for packet of size %zd", packet.size());
using Policy = File_system::Session::Tx_policy;
static_cast<Plugin_context *>(packet.ref())->in_flight = false;
source.release_packet(packet);
}
/**
* Collect pending packet acknowledgements, freeing the space occupied
* by the packet in the bulk buffer
*
* This function should be called prior enqueing new packets into the
* packet stream to free up space in the bulk buffer.
*/
static void collect_acknowledgements(File_system::Session::Tx::Source &source)
{
while (source.ack_avail())
wait_for_acknowledgement(source);
return source.bulk_buffer_size() - (sizeof(Policy::Ack_queue)
+ sizeof(Policy::Submit_queue));
}
@ -217,6 +197,8 @@ class Plugin : public Libc::Plugin
{
private:
Genode::Lock _rw_lock;
::off_t _file_size(Libc::File_descriptor *fd)
{
struct stat stat_buf;
@ -287,13 +269,6 @@ class Plugin : public Libc::Plugin
int close(Libc::File_descriptor *fd)
{
/* wait for the completion of all operations of the context */
while (context(fd)->in_flight) {
if (verbose)
PDBG("wait_for_acknowledgement");
wait_for_acknowledgement(*file_system()->tx());
}
file_system()->close(context(fd)->node_handle());
Genode::destroy(Genode::env()->heap(), context(fd));
@ -563,78 +538,58 @@ class Plugin : public Libc::Plugin
ssize_t read(Libc::File_descriptor *fd, void *buf, ::size_t count)
{
try {
File_system::Session::Tx::Source &source = *file_system()->tx();
Genode::Lock::Guard guard(_rw_lock);
size_t const max_packet_size = source.bulk_buffer_size() / 2;
File_system::Session::Tx::Source &source = *file_system()->tx();
size_t remaining_count = count;
size_t const max_packet_size = session_max_packet_size(source);
if (context(fd)->seek_offset() == ~0)
context(fd)->seek_offset(0);
size_t remaining_count = count;
while (remaining_count) {
if (context(fd)->seek_offset() == ~0)
context(fd)->seek_offset(0);
collect_acknowledgements(source);
while (remaining_count) {
size_t curr_packet_size = Genode::min(remaining_count, max_packet_size);
size_t curr_packet_size = Genode::min(remaining_count, max_packet_size);
File_system::Packet_descriptor packet(
source.alloc_packet(curr_packet_size),
static_cast<File_system::Packet_ref *>(context(fd)),
context(fd)->node_handle(),
File_system::Packet_descriptor::READ,
curr_packet_size,
context(fd)->seek_offset());
off_t const seek_offset = context(fd)->seek_offset();
File_system::Packet_descriptor
packet(source.alloc_packet(curr_packet_size),
static_cast<File_system::Packet_ref *>(context(fd)),
context(fd)->node_handle(),
File_system::Packet_descriptor::READ,
curr_packet_size,
context(fd)->seek_offset());
/* pass packet to server side */
source.submit_packet(packet);
packet = source.get_acked_packet();
/* mark context as having an operation in flight */
context(fd)->in_flight = true;
/*
* XXX check if acked packet belongs to request,
* needed for thread safety
*/
/* pass packet to server side */
source.submit_packet(packet);
size_t read_num_bytes = Genode::min(packet.length(), curr_packet_size);
do {
packet = source.get_acked_packet();
static_cast<Plugin_context *>(packet.ref())->in_flight = false;
/* copy-out payload into destination buffer */
memcpy(buf, source.packet_content(packet), read_num_bytes);
if (packet.operation() == File_system::Packet_descriptor::WRITE)
source.release_packet(packet);
source.release_packet(packet);
} while (context(fd)->in_flight);
/* prepare next iteration */
context(fd)->advance_seek_offset(read_num_bytes);
buf = (void *)((Genode::addr_t)buf + read_num_bytes);
remaining_count -= read_num_bytes;
/*
* XXX check if acked packet belongs to request,
* needed for thread safety
*/
size_t read_num_bytes = Genode::min(packet.length(), curr_packet_size);
/* copy-out payload into destination buffer */
memcpy(buf, source.packet_content(packet), read_num_bytes);
source.release_packet(packet);
/* prepare next iteration */
context(fd)->advance_seek_offset(read_num_bytes);
buf = (void *)((Genode::addr_t)buf + read_num_bytes);
remaining_count -= read_num_bytes;
/*
* If we received less bytes than requested, we reached the end
* of the file.
*/
if (read_num_bytes < curr_packet_size)
break;
}
return count - remaining_count;
} catch (File_system::Session::Tx::Source::Packet_alloc_failed) {
PERR("Packet_alloc_failed during read (count=%zd)", count);
return -1;
/*
* If we received less bytes than requested, we reached the end
* of the file.
*/
if (read_num_bytes < curr_packet_size)
break;
}
return count - remaining_count;
}
ssize_t readlink(const char *path, char *buf, size_t bufsiz)
@ -777,47 +732,42 @@ class Plugin : public Libc::Plugin
ssize_t write(Libc::File_descriptor *fd, const void *buf, ::size_t count)
{
Genode::Lock::Guard guard(_rw_lock);
File_system::Session::Tx::Source &source = *file_system()->tx();
size_t const max_packet_size = source.bulk_buffer_size() / 2;
size_t const max_packet_size = session_max_packet_size(source);
size_t remaining_count = count;
while (remaining_count) {
collect_acknowledgements(source);
size_t curr_packet_size = Genode::min(remaining_count, max_packet_size);
try {
File_system::Packet_descriptor
packet(source.alloc_packet(curr_packet_size),
static_cast<File_system::Packet_ref *>(context(fd)),
context(fd)->node_handle(),
File_system::Packet_descriptor::WRITE,
curr_packet_size,
context(fd)->seek_offset());
File_system::Packet_descriptor
packet(source.alloc_packet(curr_packet_size),
static_cast<File_system::Packet_ref *>(context(fd)),
context(fd)->node_handle(),
File_system::Packet_descriptor::WRITE,
curr_packet_size,
context(fd)->seek_offset());
/* mark context as having an operation in flight */
context(fd)->in_flight = true;
/* copy-in payload into packet */
memcpy(source.packet_content(packet), buf, curr_packet_size);
/* copy-in payload into packet */
memcpy(source.packet_content(packet), buf, curr_packet_size);
/* pass packet to server side */
source.submit_packet(packet);
packet = source.get_acked_packet();
/* pass packet to server side */
source.submit_packet(packet);
/* prepare next iteration */
context(fd)->advance_seek_offset(curr_packet_size);
buf = (void *)((Genode::addr_t)buf + curr_packet_size);
remaining_count -= curr_packet_size;
source.release_packet(packet);
/* prepare next iteration */
context(fd)->advance_seek_offset(curr_packet_size);
buf = (void *)((Genode::addr_t)buf + curr_packet_size);
remaining_count -= curr_packet_size;
} catch (File_system::Session::Tx::Source::Packet_alloc_failed) {
do {
wait_for_acknowledgement(source);
} while (context(fd)->in_flight);
}
}
if (verbose)
PDBG("write returns %zd", count);
return count;