vfs server: redesigned internal control flow

This patch reworks the VFS server to make its internal control flow easier
to understand.

- The packet-stream handling is hidden from the node types.

- Introduced the notion of a "job" as an abstraction from the raw packet
  stream.

- The processing of requests is split into three phases:
  (1) the acceptance of new jobs,
  (2) the execution of jobs, and
  (3) the delivery of acknowledgements.
  A simplified sketch of this flow is given after this list.

- There is no longer a global fifo of pending nodes. Instead, each
  session has a fifo of "active" nodes, which are nodes that wait
  for the completion of jobs.

- The Io_node type no longer uses virtual functions as hooks for
  the derived classes. Instead, it provides a number of utility
  functions.

- The control flow of the 'Io_progress_handler' is now the same as
  for the session-local packet-stream handler.

- Removed the dependency on 'file_system/node.h', which is a relic
  of the ram_fs.
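
For orientation, the following stand-alone sketch models the job abstraction
and the three processing phases with plain C++ standard containers. It is
illustrative only: the types 'Packet', 'Node', and 'Session' and the function
'process_packets' are simplified stand-ins for the server's real classes
(whose actual implementation appears in the diff below), and the use of
standard containers has no counterpart in the Genode code.

  // Simplified model: phase 1 imports packets as jobs, phase 2 executes
  // the jobs of all active nodes, phase 3 delivers acknowledgements.
  #include <algorithm>
  #include <cstdio>
  #include <deque>
  #include <list>

  struct Packet { unsigned handle; bool succeeded = false; };

  struct Node
  {
      std::deque<Packet> jobs;             // accepted but unfinished requests
      std::deque<Packet> acknowledgements; // completed requests awaiting delivery

      bool active() const { return !jobs.empty() || !acknowledgements.empty(); }

      void execute_jobs()
      {
          // phase 2: perform the I/O of each job (stubbed as immediate success)
          while (!jobs.empty()) {
              Packet p = jobs.front();
              jobs.pop_front();
              p.succeeded = true;
              acknowledgements.push_back(p);
          }
      }
  };

  struct Session
  {
      std::deque<Packet> submit_queue; // packets submitted by the client
      std::deque<Packet> ack_queue;    // acknowledgements returned to the client
      std::list<Node *>  active_nodes; // session-local fifo of busy nodes
      Node               node;         // a single node, for brevity

      bool process_packets()
      {
          bool progress = false;

          // phase 1: accept new jobs from the submit queue
          while (!submit_queue.empty()) {
              node.jobs.push_back(submit_queue.front());
              submit_queue.pop_front();
              if (std::find(active_nodes.begin(), active_nodes.end(), &node)
                  == active_nodes.end())
                  active_nodes.push_back(&node);
              progress = true;
          }

          // phase 2: execute the jobs of all active nodes
          for (Node *n : active_nodes)
              n->execute_jobs();

          // phase 3: deliver acknowledgements, dropping nodes that became idle
          active_nodes.remove_if([&] (Node *n) {
              while (!n->acknowledgements.empty()) {
                  ack_queue.push_back(n->acknowledgements.front());
                  n->acknowledgements.pop_front();
                  progress = true;
              }
              return !n->active();
          });

          return progress;
      }
  };

  int main()
  {
      Session session;
      session.submit_queue.push_back(Packet{1});
      session.submit_queue.push_back(Packet{2});

      while (session.process_packets()) { }

      std::printf("delivered %zu acknowledgements\n", session.ack_queue.size());
  }

In the real server, phase 2 is driven by the I/O progress of the VFS backend
rather than completing immediately, and 'process_packets' bounds its number of
iterations so that a saturated submit queue cannot starve other activities.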

While reworking the code, the following additional semantic changes
were made.

- The handling of SEEK_TAIL no longer overwrites the file content at the
  end of the file.

- Removed diagnostic message at session-creation time.

- Added a write loop for continuous files (a schematic sketch follows below).
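
The write loop can be pictured as follows. This is a schematic sketch, not the
code of this patch; 'write_all' and 'partial_write' are hypothetical names,
with 'partial_write' standing in for a single VFS write operation that may
consume fewer bytes than offered.

  #include <cstddef>

  // Schematic write loop: keep issuing partial writes until the whole
  // buffer is consumed or the backend cannot make progress anymore.
  template <typename FN>
  std::size_t write_all(char const *buf, std::size_t count, FN const &partial_write)
  {
      std::size_t written = 0;
      while (written < count) {
          std::size_t const n = partial_write(buf + written, count - written);
          if (n == 0)
              break; // no progress possible right now, retry later
          written += n;
      }
      return written;
  }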

Issue #3547
Issue #2303
Author:    Norman Feske, 2019-11-05 18:55:31 +01:00
Committer: Christian Helmuth
Parent:    f0de187bbb
Commit:    9b7915facb

2 changed files with 1029 additions and 690 deletions


@@ -2,6 +2,7 @@
  * \brief VFS File_system server
  * \author Emery Hemingway
  * \author Christian Helmuth
+ * \author Norman Feske
  * \date 2015-08-16
  */
@@ -25,7 +26,7 @@
 #include <util/fifo.h>
 #include <vfs/simple_env.h>
 
-/* Local includes */
+/* local includes */
 #include "assert.h"
 #include "node.h"
@@ -40,6 +41,7 @@ namespace Vfs_server {
 	class Root;
 
 	typedef Genode::Fifo<Session_component> Session_queue;
+	typedef Genode::Entrypoint::Io_progress_handler Io_progress_handler;
 
 	/**
 	 * Convenience utities for parsing quotas
@@ -82,7 +84,8 @@ class Vfs_server::Session_resources
 class Vfs_server::Session_component : private Session_resources,
                                       public ::File_system::Session_rpc_object,
-                                      private Session_queue::Element
+                                      private Session_queue::Element,
+                                      private Watch_node::Watch_node_response_handler
 {
 	friend Session_queue;
@@ -92,19 +95,21 @@ class Vfs_server::Session_component : private Session_resources,
 		Genode::Entrypoint &_ep;
 
+		Io_progress_handler &_io_progress_handler;
+
 		Packet_stream &_stream { *tx_sink() };
 
-		/* global queue of nodes to process after an I/O signal */
-		Node_queue &_pending_nodes;
+		/* nodes of this session with active jobs or pending acknowledgements */
+		Node_queue _active_nodes { };
 
-		/* global queue of sessions for which packets await progress */
-		Session_queue &_pending_sessions;
+		/* global queue of sessions with active jobs */
+		Session_queue &_active_sessions;
 
 		/* collection of open nodes local to this session */
 		Node_space _node_space { };
 
-		Genode::Signal_handler<Session_component> _process_packet_handler {
-			_ep, *this, &Session_component::_process_packets };
+		Genode::Signal_handler<Session_component> _packet_stream_handler {
+			_ep, *this, &Session_component::_handle_packet_stream };
 
 		/*
 		 * The root node needs be allocated with the session struct
@@ -160,104 +165,220 @@ class Vfs_server::Session_component : private Session_resources,
 		 ** Packet-stream processing **
 		 ******************************/
 
-		/**
-		 * Attempt to process the head of the packet queue
-		 *
-		 * Return true if the packet can be popped from the
-		 * queue or false if the the packet cannot be processed
-		 * or further queued.
-		 */
-		bool _process_packet()
-		{
-			/* leave the packet queued so that it cannot leak */
-			Packet_descriptor packet = _stream.peek_packet();
-
-			/* assume failure by default */
-			packet.succeeded(false);
-
-			if ((packet.length() > packet.size())) {
-				/* not a valid packet */
-				_stream.acknowledge_packet(packet);
-				return true;
-			}
-
-			bool handle_invalid = true;
-			bool result = true;
-
-			try {
-				_apply(packet.handle(), [&] (Io_node &node) {
-					handle_invalid = false;
-					result = node.process_packet(packet);
-				});
-			}
-			catch (File_system::Invalid_handle) { }
-
-			/* send the packet back if the handle is missing */
-			if (handle_invalid)
-				_stream.acknowledge_packet(packet);
-
-			return (handle_invalid || result);
-		}
-
-	protected:
-
-		friend Vfs_server::Root;
-		using Session_queue::Element::enqueued;
-
-		/**
-		 * Called by the global Io_progress_handler as
-		 * well as the local signal handler
-		 *
-		 * Return true if the packet queue was emptied
-		 */
-		bool process_packets()
-		{
-			/**
-			 * Process packets in batches, otherwise a client that
-			 * submits packets as fast as they are processed will
-			 * starve the signal handler.
-			 */
-			int quantum = TX_QUEUE_SIZE;
-
-			while (_stream.packet_avail()) {
-				if (_process_packet()) {
-					/*
-					 * the packet was rejected or associated with
-					 * a handle, pop it from the packet queue
-					 */
-					_stream.get_packet();
-				} else {
-					/* no progress */
-					return false;
-				}
-
-				if (--quantum == 0) {
-					/* come back to this later */
-					Genode::Signal_transmitter(_process_packet_handler).submit();
-					return false;
-				}
-			}
-			return true;
-		}
+		bool _try_import_jobs_from_submit_queue()
+		{
+			bool overall_progress = false;
+
+			for (;;) {
+
+				bool progress_in_iteration = false;
+
+				if (!_stream.packet_avail())
+					break;
+
+				/* ensure that ack for one malformed packet can be returned */
+				if (!_stream.ready_to_ack())
+					break;
+
+				Packet_descriptor packet = _stream.peek_packet();
+
+				auto drop_packet_from_submit_queue = [&] ()
+				{
+					_stream.get_packet();
+
+					overall_progress      = true;
+					progress_in_iteration = true;
+				};
+
+				auto consume_and_ack_invalid_packet = [&] ()
+				{
+					drop_packet_from_submit_queue();
+					packet.succeeded(false);
+					_stream.acknowledge_packet(packet);
+
+					overall_progress      = true;
+					progress_in_iteration = true;
+				};
+
+				/* test for invalid packet */
+				if (packet.length() > packet.size()) {
+					consume_and_ack_invalid_packet();
+					continue;
+				}
+
+				try {
+					_apply(packet.handle(), [&] (Io_node &node) {
+
+						if (!node.job_acceptable())
+							return;
+
+						Payload_ptr payload_ptr { _stream.packet_content(packet) };
+
+						switch (node.submit_job(packet, payload_ptr)) {
+
+						case Node::Submit_result::ACCEPTED:
+							if (!node.enqueued())
+								_active_nodes.enqueue(node);
+							drop_packet_from_submit_queue();
+							break;
+
+						case Node::Submit_result::DENIED:
+							consume_and_ack_invalid_packet();
+							break;
+
+						case Node::Submit_result::STALLED:
+							/* keep request packet in submit queue */
+							break;
+						}
+					});
+				}
+				catch (File_system::Invalid_handle) {
+					consume_and_ack_invalid_packet();
+				}
+
+				if (!progress_in_iteration)
+					break;
+			}
+			return overall_progress;
+		}
+
+		void _execute_jobs()
+		{
+			/* nodes with jobs that cannot make progress right now */
+			Node_queue requeued_nodes { };
+
+			_active_nodes.dequeue_all([&] (Node &node) {
+
+				if (node.job_in_progress())
+					node.execute_job();
+
+				requeued_nodes.enqueue(node);
+			});
+
+			_active_nodes = requeued_nodes;
+		}
+
+		bool _try_acknowledge_jobs()
+		{
+			bool overall_progress = false;
+
+			for (;;) {
+
+				if (!_stream.ready_to_ack())
+					break;
+
+				if (_active_nodes.empty())
+					break;
+
+				bool progress_in_iteration = false;
+
+				_active_nodes.dequeue([&] (Node &node) {
+
+					/*
+					 * Deliver only one acknowledgement per iteration to
+					 * re-check the 'ready_to_ack' condition for each
+					 * acknowledgement.
+					 */
+					if (node.acknowledgement_pending()) {
+						_stream.acknowledge_packet(node.dequeue_acknowledgement());
+						progress_in_iteration = true;
+					}
+
+					/*
+					 * If there is still another acknowledgement pending, keep
+					 * the node enqueud to process it in the next iteration.
+					 * This can happen if there is a READ_READY acknowledgement
+					 * in addition to the acknowledgement of an operation.
+					 */
+					if (node.active())
+						_active_nodes.enqueue(node);
+				});
+
+				overall_progress |= progress_in_iteration;
+
+				if (!progress_in_iteration)
+					break;
+			}
+			return overall_progress;
+		}
+
+	public:
+
+		enum class Process_packets_result { NONE, PROGRESS, TOO_MUCH_PROGRESS };
+
+		/**
+		 * Called by the global Io_progress_handler as well as the
+		 * session-local packet-stream handler
+		 *
+		 * \return true if progress was made
+		 */
+		Process_packets_result process_packets()
+		{
+			bool overall_progress = false;
+
+			/*
+			 * Upper bound for the number of iterations. When reached,
+			 * cancel the handler and trigger the re-execution via a local
+			 * signal. This gives the config handler and the RPC functions
+			 * a chance to run in situations when the submit queue of the
+			 * packet stream is always saturated.
+			 */
+			unsigned iterations = 200;
+
+			for (;;) {
+
+				if (--iterations == 0)
+					return Process_packets_result::TOO_MUCH_PROGRESS;
+
+				/* true if progress can be made in this iteration */
+				bool progress_in_iteration = false;
+
+				progress_in_iteration |= _try_import_jobs_from_submit_queue();
+
+				_execute_jobs();
+
+				progress_in_iteration |= _try_acknowledge_jobs();
+
+				if (!progress_in_iteration)
+					break;
+
+				overall_progress |= progress_in_iteration;
+			}
+			return overall_progress ? Process_packets_result::PROGRESS
+			                        : Process_packets_result::NONE;
+		}
+
+		bool no_longer_active() const
+		{
+			return Session_queue::Element::enqueued() && _active_nodes.empty();
+		}
+
+		bool no_longer_idle() const
+		{
+			return !Session_queue::Element::enqueued() && !_active_nodes.empty();
+		}
 
 	private:
 
 		/**
-		 * Called by signal handler
+		 * Signal handler called for session-local packet-stream signals
 		 */
-		void _process_packets()
+		void _handle_packet_stream()
 		{
-			bool done = process_packets();
+			Process_packets_result const progress = process_packets();
 
-			if (done && enqueued()) {
-				/* this session is idle */
-				_pending_sessions.remove(*this);
-			} else
-
-			if (!done && !enqueued()) {
-				/* this session needs unblocking */
-				_pending_sessions.enqueue(*this);
-			}
+			if (no_longer_idle())
+				_active_sessions.enqueue(*this);
+
+			if (progress == Process_packets_result::TOO_MUCH_PROGRESS)
+				Genode::Signal_transmitter(_packet_stream_handler).submit();
+
+			/*
+			 * The activity of the session may have an unblocking effect on
+			 * other sessions. So we call the global 'Io_progress_handler' to
+			 * attempt the packet processing of all active sessions.
+			 */
+			if (progress == Process_packets_result::PROGRESS)
+				_io_progress_handler.handle_io_progress();
 		}
 
 		/**
@@ -294,6 +415,23 @@ class Vfs_server::Session_component : private Session_resources,
 			destroy(_alloc, &node);
 		}
 
+		/**
+		 * Watch_node::Watch_node_response_handler interface
+		 */
+		void handle_watch_node_response(Watch_node &node) override
+		{
+			if (!node.enqueued())
+				_active_nodes.enqueue(node);
+
+			/*
+			 * The acknowledgement and dequeuing will be delivered by
+			 * 'Session_component::_try_acknowledge_jobs'. Mark the session as
+			 * active to consider it for the acknowledgement handling.
+			 */
+			if (!enqueued())
+				_active_sessions.enqueue(*this);
+		}
+
 	public:
 
 		/**
@@ -305,8 +443,8 @@ class Vfs_server::Session_component : private Session_resources,
 		                  Genode::Cap_quota cap_quota,
 		                  size_t tx_buf_size,
 		                  Vfs::File_system &vfs,
-		                  Node_queue &pending_nodes,
-		                  Session_queue &pending_sessions,
+		                  Session_queue &active_sessions,
+		                  Io_progress_handler &io_progress_handler,
 		                  char const *root_path,
 		                  bool writable)
 		:
@@ -314,18 +452,14 @@ class Vfs_server::Session_component : private Session_resources,
 			Session_rpc_object(_packet_ds.cap(), env.rm(), env.ep().rpc_ep()),
 			_vfs(vfs),
 			_ep(env.ep()),
-			_pending_nodes(pending_nodes),
-			_pending_sessions(pending_sessions),
+			_io_progress_handler(io_progress_handler),
+			_active_sessions(active_sessions),
 			_root_path(root_path),
 			_label(label),
 			_writable(writable)
 		{
-			/*
-			 * Register an I/O signal handler for
-			 * packet-avail and ready-to-ack signals.
-			 */
-			_tx.sigh_packet_avail(_process_packet_handler);
-			_tx.sigh_ready_to_ack(_process_packet_handler);
+			_tx.sigh_packet_avail(_packet_stream_handler);
+			_tx.sigh_ready_to_ack(_packet_stream_handler);
 		}
 
 		/**
@@ -338,7 +472,7 @@ class Vfs_server::Session_component : private Session_resources,
 					_close(node); })) { }
 
 			if (enqueued())
-				_pending_sessions.remove(*this);
+				_active_sessions.remove(*this);
 		}
 
 		/**
@@ -373,13 +507,13 @@ class Vfs_server::Session_component : private Session_resources,
 			if (!create && !_vfs.directory(path_str))
 				throw Lookup_failed();
 
-			Directory *dir;
-			try { dir = new (_alloc) Directory(_node_space, _vfs, _alloc,
-			                                   _pending_nodes, _stream,
-			                                   path_str, create); }
-			catch (Out_of_memory) { throw Out_of_ram(); }
+			Directory &dir = *new (_alloc)
+				Directory(_node_space, _vfs, _alloc, path_str, create);
 
-			return Dir_handle(dir->id().value);
+			if (create)
+				_io_progress_handler.handle_io_progress();
+
+			return Dir_handle(dir.id().value);
 		}
 
 		File_handle file(Dir_handle dir_handle, Name const &name,
@@ -406,8 +540,8 @@ class Vfs_server::Session_component : private Session_resources,
 				char const *name_str = name.string();
 				_assert_valid_name(name_str);
 
-				return Symlink_handle {dir.symlink(
-					_node_space, _vfs, _alloc, name_str,
+				return Symlink_handle {
+					dir.symlink(_node_space, _vfs, _alloc, name_str,
 					_writable ? READ_WRITE : READ_ONLY, create).value
 				};
 			});
@@ -420,18 +554,14 @@ class Vfs_server::Session_component : private Session_resources,
 			_assert_valid_path(path_str);
 
 			/* re-root the path */
-			Path sub_path(path_str+1, _root_path.base());
+			Path const sub_path(path_str + 1, _root_path.base());
 			path_str = sub_path.base();
 
 			if (sub_path != "/" && !_vfs.leaf_path(path_str))
 				throw Lookup_failed();
 
-			Node *node;
-			try { node = new (_alloc) Node(_node_space, path_str,
-			                               _pending_nodes, _stream); }
-			catch (Out_of_memory) { throw Out_of_ram(); }
-
-			return Node_handle { node->id().value };
+			Node &node = *new (_alloc) Node(_node_space, path_str);
+
+			return Node_handle { node.id().value };
 		}
 
 		Watch_handle watch(::File_system::Path const &path) override
@@ -441,7 +571,7 @@ class Vfs_server::Session_component : private Session_resources,
 			_assert_valid_path(path_str);
 
 			/* re-root the path */
-			Path sub_path(path_str+1, _root_path.base());
+			Path const sub_path(path_str + 1, _root_path.base());
 			path_str = sub_path.base();
 
 			Vfs::Vfs_watch_handle *vfs_handle = nullptr;
@@ -458,26 +588,40 @@ class Vfs_server::Session_component : private Session_resources,
 				throw Out_of_caps();
 			}
 
-			Node *node;
-			try { node = new (_alloc)
-				Watch_node(_node_space, path_str, *vfs_handle,
-				           _pending_nodes, _stream); }
-			catch (Out_of_memory) { throw Out_of_ram(); }
-
-			return Watch_handle { node->id().value };
+			Node &node = *new (_alloc)
+				Watch_node(_node_space, path_str, *vfs_handle, *this);
+
+			return Watch_handle { node.id().value };
 		}
 
 		void close(Node_handle handle) override
 		{
 			/*
-			 * churn the packet queue so that any pending
-			 * packets on this handle are processed
+			 * Churn the packet queue so that any pending packets on this
+			 * handle are processed.
 			 */
-			process_packets();
+			_io_progress_handler.handle_io_progress();
 
-			try { _apply_node(handle, [&] (Node &node) {
-				_close(node); }); }
+			/*
+			 * Closing a written file or symlink may have triggered a watch handler.
+			 */
+			bool node_modified = false;
+
+			try {
+				_apply_node(handle, [&] (Node &node) {
+
+					if (node.enqueued())
+						_active_nodes.remove(node);
+
+					node_modified = node.modified();
+
+					_close(node);
+				});
+			}
 			catch (::File_system::Invalid_handle) { }
+
+			if (node_modified)
+				_io_progress_handler.handle_io_progress();
 		}
 
 		Status status(Node_handle node_handle) override
@@ -547,14 +691,21 @@ class Vfs_server::Session_component : private Session_resources,
 				Path path(name_str, dir.path());
 
 				assert_unlink(_vfs.unlink(path.base()));
-				dir.mark_as_updated();
 			});
+
+			/*
+			 * The unlinking may have triggered a directory-watch handler,
+			 * or a watch handler of the deleted file.
+			 */
+			_io_progress_handler.handle_io_progress();
 		}
 
 		void truncate(File_handle file_handle, file_size_t size) override
 		{
 			_apply(file_handle, [&] (File &file) {
 				file.truncate(size); });
+
+			_io_progress_handler.handle_io_progress();
 		}
 
 		void move(Dir_handle from_dir_handle, Name const &from_name,
@@ -575,18 +726,19 @@ class Vfs_server::Session_component : private Session_resources,
 					Path to_path( to_str, to_dir.path());
 
 					assert_rename(_vfs.rename(from_path.base(), to_path.base()));
+				});
+			});
 
-					from_dir.mark_as_updated();
-					to_dir.mark_as_updated();
-				});
-			});
+			/* the move may have triggered a directory watch handler */
+			_io_progress_handler.handle_io_progress();
 		}
 
 		void control(Node_handle, Control) override { }
 };
 
 
-class Vfs_server::Root : public Genode::Root_component<Session_component>
+class Vfs_server::Root : public Genode::Root_component<Session_component>,
+                         private Genode::Entrypoint::Io_progress_handler
 {
 	private:
@@ -604,6 +756,9 @@ class Vfs_server::Root : public Genode::Root_component<Session_component>
 			}
 		}
 
+		Genode::Signal_handler<Root> _reactivate_handler {
+			_env.ep(), *this, &Root::handle_io_progress };
+
 		Genode::Signal_handler<Root> _config_handler {
 			_env.ep(), *this, &Root::_config_update };
@@ -620,73 +775,65 @@ class Vfs_server::Root : public Genode::Root_component<Session_component>
 		Genode::Heap _vfs_heap { &_env.ram(), &_env.rm() };
 		Vfs::Simple_env _vfs_env { _env, _vfs_heap, vfs_config() };
 
-		/**
-		 * Object for post-I/O-signal processing
-		 *
-		 * This allows packet and VFS backend signals to
-		 * be dispatched quickly followed by a processing
-		 * of sessions that might be unblocked.
-		 */
-		struct Io_progress_handler : Genode::Entrypoint::Io_progress_handler
-		{
-			/* All nodes with a packet operation awaiting an I/O signal */
-			Node_queue pending_nodes { };
-
-			/* All sessions with packet queues that await processing */
-			Session_queue pending_sessions { };
-
-			/**
-			 * Post-signal hook invoked by entrypoint
-			 */
-			void handle_io_progress() override
-			{
-				bool handle_progress = false;
-
-				/* process handles awaiting progress */
-				{
-					/* nodes to process later */
-					Node_queue retry { };
-
-					/* empty the pending nodes and process */
-					pending_nodes.dequeue_all([&] (Node &node) {
-						if (node.process_io()) {
-							handle_progress = true;
-						} else {
-							if (!node.enqueued()) {
-								retry.enqueue(node);
-							}
-						}
-					});
-
-					/* requeue the unprocessed nodes in order */
-					retry.dequeue_all([&] (Node &node) {
-						pending_nodes.enqueue(node); });
-				}
-
-				/*
-				 * if any pending handles were processed then
-				 * process session packet queues awaiting progress
-				 */
-				if (handle_progress) {
-					/* sessions to process later */
-					Session_queue retry { };
-
-					/* empty the pending nodes and process */
-					pending_sessions.dequeue_all([&] (Session_component &session) {
-						if (!session.process_packets()) {
-							/* requeue the session if there are packets remaining */
-							if (!session.enqueued()) {
-								retry.enqueue(session);
-							}
-						}
-					});
-
-					/* requeue the unprocessed sessions in order */
-					retry.dequeue_all([&] (Session_component &session) {
-						pending_sessions.enqueue(session); });
-				}
-			}
-		} _progress_handler { };
+		/* sessions with active jobs */
+		Session_queue _active_sessions { };
+
+		/**
+		 * Entrypoint::Io_progress_handler interface
+		 */
+		void handle_io_progress() override
+		{
+			bool yield = false;
+
+			unsigned iterations = 200;
+
+			for (;;) {
+
+				/* limit maximum number of iterations */
+				if (--iterations == 0) {
+					yield = true;
+					break;
+				}
+
+				bool progress = false;
+
+				Session_queue still_active_sessions { };
+
+				_active_sessions.dequeue_all([&] (Session_component &session) {
+
+					typedef Session_component::Process_packets_result Result;
+
+					switch (session.process_packets()) {
+
+					case Result::PROGRESS:
+						progress = true;
+						break;
+
+					case Result::TOO_MUCH_PROGRESS:
+						yield = true;
+						break;
+
+					case Result::NONE:
+						break;
+					}
+
+					if (!session.no_longer_active())
+						still_active_sessions.enqueue(session);
+				});
+
+				_active_sessions = still_active_sessions;
+
+				if (!progress)
+					break;
+			}
+
+			/*
+			 * Submit a local signal to re-schedule another execution of
+			 * 'handle_io_progress' if the loop was exited via 'yield'.
+			 */
+			if (yield)
+				Genode::Signal_transmitter(_reactivate_handler).submit();
+		}
 
 	protected:
@@ -774,8 +921,7 @@ class Vfs_server::Root : public Genode::Root_component<Session_component>
 				Genode::Ram_quota{ram_quota},
 				Genode::Cap_quota{cap_quota},
 				tx_buf_size, _vfs_env.root_dir(),
-				_progress_handler.pending_nodes,
-				_progress_handler.pending_sessions,
+				_active_sessions, *this,
 				session_root.base(), writeable);
 
 			auto ram_used = _env.pd().used_ram().value - initial_ram_usage;
@@ -792,7 +938,6 @@ class Vfs_server::Root : public Genode::Root_component<Session_component>
 				         ", '", label, "'");
 			}
 
-			Genode::log("session opened for '", label, "' at '", session_root, "'");
 			return session;
 		}
@@ -820,7 +965,7 @@ class Vfs_server::Root : public Genode::Root_component<Session_component>
 		Root_component<Session_component>(&env.ep().rpc_ep(), &md_alloc),
 		_env(env)
 	{
-		_env.ep().register_io_progress_handler(_progress_handler);
+		_env.ep().register_io_progress_handler(*this);
 		_config_rom.sigh(_config_handler);
 		env.parent().announce(env.ep().manage(*this));
 	}

(The diff of the second changed file is suppressed because it is too large.)