From 9b7915facbd2062c9dfbb2a191e77f102c20db88 Mon Sep 17 00:00:00 2001 From: Norman Feske Date: Tue, 5 Nov 2019 18:55:31 +0100 Subject: [PATCH] vfs server: redesigned internal control flow This patch reworks the VFS server to become easier to understand. - The packet-stream handling is hidden from the node types. - Introduced the notion of a "job" as an abstraction from the raw packet stream. - The processing of requests is split into three phases: (1) accepting new jobs, (2) the execution of jobs, and (3) the delivery of acknowledgements. - There is no longer a global fifo of pending nodes. Instead, each session has a fifo of "active" nodes, which are nodes that wait for the completion of jobs. - The Io_node type no longer uses virtual functions as hooks for the derrived classes. Instead, it provides a number of utility functions. - The control flow of the 'Io_progress_handler' is now the same as for the session-local packet-stream handler. - Removed dependency from 'file_system/node.h', which is a relic from the ram_fs. While reworking the code, the following semantic changes were made additionally. - The handling of SEEK_TAIL no longer overwrites the file content at the end of the file. - Removed diagnostic message at session-creation time. - Write loop for continuous files Issue #3547 Issue #2303 --- repos/os/src/server/vfs/main.cc | 541 ++++++++------ repos/os/src/server/vfs/node.h | 1178 ++++++++++++++++++------------- 2 files changed, 1029 insertions(+), 690 deletions(-) diff --git a/repos/os/src/server/vfs/main.cc b/repos/os/src/server/vfs/main.cc index 6b4534921..0f9ee4304 100644 --- a/repos/os/src/server/vfs/main.cc +++ b/repos/os/src/server/vfs/main.cc @@ -2,6 +2,7 @@ * \brief VFS File_system server * \author Emery Hemingway * \author Christian Helmuth + * \author Norman Feske * \date 2015-08-16 */ @@ -25,7 +26,7 @@ #include #include -/* Local includes */ +/* local includes */ #include "assert.h" #include "node.h" @@ -39,7 +40,8 @@ namespace Vfs_server { class Vfs_env; class Root; - typedef Genode::Fifo Session_queue; + typedef Genode::Fifo Session_queue; + typedef Genode::Entrypoint::Io_progress_handler Io_progress_handler; /** * Convenience utities for parsing quotas @@ -82,7 +84,8 @@ class Vfs_server::Session_resources class Vfs_server::Session_component : private Session_resources, public ::File_system::Session_rpc_object, - private Session_queue::Element + private Session_queue::Element, + private Watch_node::Watch_node_response_handler { friend Session_queue; @@ -92,19 +95,21 @@ class Vfs_server::Session_component : private Session_resources, Genode::Entrypoint &_ep; + Io_progress_handler &_io_progress_handler; + Packet_stream &_stream { *tx_sink() }; - /* global queue of nodes to process after an I/O signal */ - Node_queue &_pending_nodes; + /* nodes of this session with active jobs or pending acknowledgements */ + Node_queue _active_nodes { }; - /* global queue of sessions for which packets await progress */ - Session_queue &_pending_sessions; + /* global queue of sessions with active jobs */ + Session_queue &_active_sessions; /* collection of open nodes local to this session */ Node_space _node_space { }; - Genode::Signal_handler _process_packet_handler { - _ep, *this, &Session_component::_process_packets }; + Genode::Signal_handler _packet_stream_handler { + _ep, *this, &Session_component::_handle_packet_stream }; /* * The root node needs be allocated with the session struct @@ -160,104 +165,220 @@ class Vfs_server::Session_component : private Session_resources, ** Packet-stream processing ** ******************************/ - /** - * Attempt to process the head of the packet queue - * - * Return true if the packet can be popped from the - * queue or false if the the packet cannot be processed - * or further queued. - */ - bool _process_packet() + bool _try_import_jobs_from_submit_queue() { - /* leave the packet queued so that it cannot leak */ - Packet_descriptor packet = _stream.peek_packet(); + bool overall_progress = false; - /* assume failure by default */ - packet.succeeded(false); + for (;;) { - if ((packet.length() > packet.size())) { - /* not a valid packet */ - _stream.acknowledge_packet(packet); - return true; + bool progress_in_iteration = false; + + if (!_stream.packet_avail()) + break; + + /* ensure that ack for one malformed packet can be returned */ + if (!_stream.ready_to_ack()) + break; + + Packet_descriptor packet = _stream.peek_packet(); + + auto drop_packet_from_submit_queue = [&] () + { + _stream.get_packet(); + + overall_progress = true; + progress_in_iteration = true; + }; + + auto consume_and_ack_invalid_packet = [&] () + { + drop_packet_from_submit_queue(); + packet.succeeded(false); + _stream.acknowledge_packet(packet); + + overall_progress = true; + progress_in_iteration = true; + }; + + /* test for invalid packet */ + if (packet.length() > packet.size()) { + consume_and_ack_invalid_packet(); + continue; + } + + try { + _apply(packet.handle(), [&] (Io_node &node) { + + if (!node.job_acceptable()) + return; + + Payload_ptr payload_ptr { _stream.packet_content(packet) }; + + switch (node.submit_job(packet, payload_ptr)) { + + case Node::Submit_result::ACCEPTED: + if (!node.enqueued()) + _active_nodes.enqueue(node); + drop_packet_from_submit_queue(); + break; + + case Node::Submit_result::DENIED: + consume_and_ack_invalid_packet(); + break; + + case Node::Submit_result::STALLED: + /* keep request packet in submit queue */ + break; + } + }); + } + catch (File_system::Invalid_handle) { + consume_and_ack_invalid_packet(); } + + if (!progress_in_iteration) + break; } - - bool handle_invalid = true; - bool result = true; - - try { - _apply(packet.handle(), [&] (Io_node &node) { - handle_invalid = false; - result = node.process_packet(packet); - }); - } - catch (File_system::Invalid_handle) { } - - /* send the packet back if the handle is missing */ - if (handle_invalid) - _stream.acknowledge_packet(packet); - - return (handle_invalid || result); + return overall_progress; } - protected: - - friend Vfs_server::Root; - using Session_queue::Element::enqueued; - - /** - * Called by the global Io_progress_handler as - * well as the local signal handler - * - * Return true if the packet queue was emptied - */ - bool process_packets() + void _execute_jobs() { - /** - * Process packets in batches, otherwise a client that - * submits packets as fast as they are processed will - * starve the signal handler. - */ - int quantum = TX_QUEUE_SIZE; + /* nodes with jobs that cannot make progress right now */ + Node_queue requeued_nodes { }; + + _active_nodes.dequeue_all([&] (Node &node) { + + if (node.job_in_progress()) + node.execute_job(); + + requeued_nodes.enqueue(node); + }); + + _active_nodes = requeued_nodes; + } + + bool _try_acknowledge_jobs() + { + bool overall_progress = false; + + for (;;) { + if (!_stream.ready_to_ack()) + break; + + if (_active_nodes.empty()) + break; + + bool progress_in_iteration = false; + + _active_nodes.dequeue([&] (Node &node) { - while (_stream.packet_avail()) { - if (_process_packet()) { /* - * the packet was rejected or associated with - * a handle, pop it from the packet queue + * Deliver only one acknowledgement per iteration to + * re-check the 'ready_to_ack' condition for each + * acknowledgement. */ - _stream.get_packet(); - } else { - /* no progress */ - return false; - } + if (node.acknowledgement_pending()) { + _stream.acknowledge_packet(node.dequeue_acknowledgement()); + progress_in_iteration = true; + } - if (--quantum == 0) { - /* come back to this later */ - Genode::Signal_transmitter(_process_packet_handler).submit(); - return false; - } + /* + * If there is still another acknowledgement pending, keep + * the node enqueud to process it in the next iteration. + * This can happen if there is a READ_READY acknowledgement + * in addition to the acknowledgement of an operation. + */ + if (node.active()) + _active_nodes.enqueue(node); + }); + + overall_progress |= progress_in_iteration; + + if (!progress_in_iteration) + break; } - return true; + return overall_progress; + } + + public: + + enum class Process_packets_result { NONE, PROGRESS, TOO_MUCH_PROGRESS }; + + /** + * Called by the global Io_progress_handler as well as the + * session-local packet-stream handler + * + * \return true if progress was made + */ + Process_packets_result process_packets() + { + bool overall_progress = false; + + /* + * Upper bound for the number of iterations. When reached, + * cancel the handler and trigger the re-execution via a local + * signal. This gives the config handler and the RPC functions + * a chance to run in situations when the submit queue of the + * packet stream is always saturated. + */ + unsigned iterations = 200; + + for (;;) { + + if (--iterations == 0) + return Process_packets_result::TOO_MUCH_PROGRESS; + + /* true if progress can be made in this iteration */ + bool progress_in_iteration = false; + + progress_in_iteration |= _try_import_jobs_from_submit_queue(); + + _execute_jobs(); + + progress_in_iteration |= _try_acknowledge_jobs(); + + if (!progress_in_iteration) + break; + + overall_progress |= progress_in_iteration; + } + return overall_progress ? Process_packets_result::PROGRESS + : Process_packets_result::NONE; + } + + bool no_longer_active() const + { + return Session_queue::Element::enqueued() && _active_nodes.empty(); + } + + bool no_longer_idle() const + { + return !Session_queue::Element::enqueued() && !_active_nodes.empty(); } private: /** - * Called by signal handler + * Signal handler called for session-local packet-stream signals */ - void _process_packets() + void _handle_packet_stream() { - bool done = process_packets(); + Process_packets_result const progress = process_packets(); - if (done && enqueued()) { - /* this session is idle */ - _pending_sessions.remove(*this); - } else - if (!done && !enqueued()) { - /* this session needs unblocking */ - _pending_sessions.enqueue(*this); - } + if (no_longer_idle()) + _active_sessions.enqueue(*this); + + if (progress == Process_packets_result::TOO_MUCH_PROGRESS) + Genode::Signal_transmitter(_packet_stream_handler).submit(); + + /* + * The activity of the session may have an unblocking effect on + * other sessions. So we call the global 'Io_progress_handler' to + * attempt the packet processing of all active sessions. + */ + if (progress == Process_packets_result::PROGRESS) + _io_progress_handler.handle_io_progress(); } /** @@ -294,38 +415,51 @@ class Vfs_server::Session_component : private Session_resources, destroy(_alloc, &node); } + /** + * Watch_node::Watch_node_response_handler interface + */ + void handle_watch_node_response(Watch_node &node) override + { + if (!node.enqueued()) + _active_nodes.enqueue(node); + + /* + * The acknowledgement and dequeuing will be delivered by + * 'Session_component::_try_acknowledge_jobs'. Mark the session as + * active to consider it for the acknowledgement handling. + */ + if (!enqueued()) + _active_sessions.enqueue(*this); + } + public: /** * Constructor */ - Session_component(Genode::Env &env, - char const *label, - Genode::Ram_quota ram_quota, - Genode::Cap_quota cap_quota, - size_t tx_buf_size, - Vfs::File_system &vfs, - Node_queue &pending_nodes, - Session_queue &pending_sessions, - char const *root_path, - bool writable) + Session_component(Genode::Env &env, + char const *label, + Genode::Ram_quota ram_quota, + Genode::Cap_quota cap_quota, + size_t tx_buf_size, + Vfs::File_system &vfs, + Session_queue &active_sessions, + Io_progress_handler &io_progress_handler, + char const *root_path, + bool writable) : Session_resources(env.pd(), env.rm(), ram_quota, cap_quota, tx_buf_size), Session_rpc_object(_packet_ds.cap(), env.rm(), env.ep().rpc_ep()), _vfs(vfs), _ep(env.ep()), - _pending_nodes(pending_nodes), - _pending_sessions(pending_sessions), + _io_progress_handler(io_progress_handler), + _active_sessions(active_sessions), _root_path(root_path), _label(label), _writable(writable) { - /* - * Register an I/O signal handler for - * packet-avail and ready-to-ack signals. - */ - _tx.sigh_packet_avail(_process_packet_handler); - _tx.sigh_ready_to_ack(_process_packet_handler); + _tx.sigh_packet_avail(_packet_stream_handler); + _tx.sigh_ready_to_ack(_packet_stream_handler); } /** @@ -338,7 +472,7 @@ class Vfs_server::Session_component : private Session_resources, _close(node); })) { } if (enqueued()) - _pending_sessions.remove(*this); + _active_sessions.remove(*this); } /** @@ -373,13 +507,13 @@ class Vfs_server::Session_component : private Session_resources, if (!create && !_vfs.directory(path_str)) throw Lookup_failed(); - Directory *dir; - try { dir = new (_alloc) Directory(_node_space, _vfs, _alloc, - _pending_nodes, _stream, - path_str, create); } - catch (Out_of_memory) { throw Out_of_ram(); } + Directory &dir = *new (_alloc) + Directory(_node_space, _vfs, _alloc, path_str, create); - return Dir_handle(dir->id().value); + if (create) + _io_progress_handler.handle_io_progress(); + + return Dir_handle(dir.id().value); } File_handle file(Dir_handle dir_handle, Name const &name, @@ -406,9 +540,9 @@ class Vfs_server::Session_component : private Session_resources, char const *name_str = name.string(); _assert_valid_name(name_str); - return Symlink_handle {dir.symlink( - _node_space, _vfs, _alloc, name_str, - _writable ? READ_WRITE : READ_ONLY, create).value + return Symlink_handle { + dir.symlink(_node_space, _vfs, _alloc, name_str, + _writable ? READ_WRITE : READ_ONLY, create).value }; }); } @@ -420,18 +554,14 @@ class Vfs_server::Session_component : private Session_resources, _assert_valid_path(path_str); /* re-root the path */ - Path sub_path(path_str+1, _root_path.base()); + Path const sub_path(path_str + 1, _root_path.base()); path_str = sub_path.base(); if (sub_path != "/" && !_vfs.leaf_path(path_str)) throw Lookup_failed(); - Node *node; + Node &node = *new (_alloc) Node(_node_space, path_str); - try { node = new (_alloc) Node(_node_space, path_str, - _pending_nodes, _stream); } - catch (Out_of_memory) { throw Out_of_ram(); } - - return Node_handle { node->id().value }; + return Node_handle { node.id().value }; } Watch_handle watch(::File_system::Path const &path) override @@ -441,7 +571,7 @@ class Vfs_server::Session_component : private Session_resources, _assert_valid_path(path_str); /* re-root the path */ - Path sub_path(path_str+1, _root_path.base()); + Path const sub_path(path_str + 1, _root_path.base()); path_str = sub_path.base(); Vfs::Vfs_watch_handle *vfs_handle = nullptr; @@ -458,26 +588,40 @@ class Vfs_server::Session_component : private Session_resources, throw Out_of_caps(); } - Node *node; - try { node = new (_alloc) - Watch_node(_node_space, path_str, *vfs_handle, - _pending_nodes, _stream); } - catch (Out_of_memory) { throw Out_of_ram(); } + Node &node = *new (_alloc) + Watch_node(_node_space, path_str, *vfs_handle, *this); - return Watch_handle { node->id().value }; + return Watch_handle { node.id().value }; } void close(Node_handle handle) override { /* - * churn the packet queue so that any pending - * packets on this handle are processed + * Churn the packet queue so that any pending packets on this + * handle are processed. */ - process_packets(); + _io_progress_handler.handle_io_progress(); - try { _apply_node(handle, [&] (Node &node) { - _close(node); }); } + /* + * Closing a written file or symlink may have triggered a watch handler. + */ + bool node_modified = false; + + try { + _apply_node(handle, [&] (Node &node) { + + if (node.enqueued()) + _active_nodes.remove(node); + + node_modified = node.modified(); + + _close(node); + }); + } catch (::File_system::Invalid_handle) { } + + if (node_modified) + _io_progress_handler.handle_io_progress(); } Status status(Node_handle node_handle) override @@ -547,14 +691,21 @@ class Vfs_server::Session_component : private Session_resources, Path path(name_str, dir.path()); assert_unlink(_vfs.unlink(path.base())); - dir.mark_as_updated(); }); + + /* + * The unlinking may have triggered a directory-watch handler, + * or a watch handler of the deleted file. + */ + _io_progress_handler.handle_io_progress(); } void truncate(File_handle file_handle, file_size_t size) override { _apply(file_handle, [&] (File &file) { file.truncate(size); }); + + _io_progress_handler.handle_io_progress(); } void move(Dir_handle from_dir_handle, Name const &from_name, @@ -575,18 +726,19 @@ class Vfs_server::Session_component : private Session_resources, Path to_path( to_str, to_dir.path()); assert_rename(_vfs.rename(from_path.base(), to_path.base())); - - from_dir.mark_as_updated(); - to_dir.mark_as_updated(); }); }); + + /* the move may have triggered a directory watch handler */ + _io_progress_handler.handle_io_progress(); } void control(Node_handle, Control) override { } }; -class Vfs_server::Root : public Genode::Root_component +class Vfs_server::Root : public Genode::Root_component, + private Genode::Entrypoint::Io_progress_handler { private: @@ -604,6 +756,9 @@ class Vfs_server::Root : public Genode::Root_component } } + Genode::Signal_handler _reactivate_handler { + _env.ep(), *this, &Root::handle_io_progress }; + Genode::Signal_handler _config_handler { _env.ep(), *this, &Root::_config_update }; @@ -620,73 +775,65 @@ class Vfs_server::Root : public Genode::Root_component Genode::Heap _vfs_heap { &_env.ram(), &_env.rm() }; Vfs::Simple_env _vfs_env { _env, _vfs_heap, vfs_config() }; + /* sessions with active jobs */ + Session_queue _active_sessions { }; + /** - * Object for post-I/O-signal processing - * - * This allows packet and VFS backend signals to - * be dispatched quickly followed by a processing - * of sessions that might be unblocked. + * Entrypoint::Io_progress_handler interface */ - struct Io_progress_handler : Genode::Entrypoint::Io_progress_handler + void handle_io_progress() override { - /* All nodes with a packet operation awaiting an I/O signal */ - Node_queue pending_nodes { }; + bool yield = false; - /* All sessions with packet queues that await processing */ - Session_queue pending_sessions { }; + unsigned iterations = 200; - /** - * Post-signal hook invoked by entrypoint - */ - void handle_io_progress() override - { - bool handle_progress = false; + for (;;) { - /* process handles awaiting progress */ - { - /* nodes to process later */ - Node_queue retry { }; - - /* empty the pending nodes and process */ - pending_nodes.dequeue_all([&] (Node &node) { - if (node.process_io()) { - handle_progress = true; - } else { - if (!node.enqueued()) { - retry.enqueue(node); - } - } - }); - - /* requeue the unprocessed nodes in order */ - retry.dequeue_all([&] (Node &node) { - pending_nodes.enqueue(node); }); + /* limit maximum number of iterations */ + if (--iterations == 0) { + yield = true; + break; } - /* - * if any pending handles were processed then - * process session packet queues awaiting progress - */ - if (handle_progress) { - /* sessions to process later */ - Session_queue retry { }; + bool progress = false; - /* empty the pending nodes and process */ - pending_sessions.dequeue_all([&] (Session_component &session) { - if (!session.process_packets()) { - /* requeue the session if there are packets remaining */ - if (!session.enqueued()) { - retry.enqueue(session); - } - } - }); + Session_queue still_active_sessions { }; - /* requeue the unprocessed sessions in order */ - retry.dequeue_all([&] (Session_component &session) { - pending_sessions.enqueue(session); }); - } + _active_sessions.dequeue_all([&] (Session_component &session) { + + typedef Session_component::Process_packets_result Result; + + switch (session.process_packets()) { + + case Result::PROGRESS: + progress = true; + break; + + case Result::TOO_MUCH_PROGRESS: + yield = true; + break; + + case Result::NONE: + break; + } + + if (!session.no_longer_active()) + still_active_sessions.enqueue(session); + }); + + _active_sessions = still_active_sessions; + + if (!progress) + break; } - } _progress_handler { }; + + /* + * Submit a local signal to re-schedule another execution of + * 'handle_io_progress' if the loop was exited via 'yield'. + */ + if (yield) + Genode::Signal_transmitter(_reactivate_handler).submit(); + } protected: @@ -774,8 +921,7 @@ class Vfs_server::Root : public Genode::Root_component Genode::Ram_quota{ram_quota}, Genode::Cap_quota{cap_quota}, tx_buf_size, _vfs_env.root_dir(), - _progress_handler.pending_nodes, - _progress_handler.pending_sessions, + _active_sessions, *this, session_root.base(), writeable); auto ram_used = _env.pd().used_ram().value - initial_ram_usage; @@ -792,7 +938,6 @@ class Vfs_server::Root : public Genode::Root_component ", '", label, "'"); } - Genode::log("session opened for '", label, "' at '", session_root, "'"); return session; } @@ -820,7 +965,7 @@ class Vfs_server::Root : public Genode::Root_component Root_component(&env.ep().rpc_ep(), &md_alloc), _env(env) { - _env.ep().register_io_progress_handler(_progress_handler); + _env.ep().register_io_progress_handler(*this); _config_rom.sigh(_config_handler); env.parent().announce(env.ep().manage(*this)); } diff --git a/repos/os/src/server/vfs/node.h b/repos/os/src/server/vfs/node.h index 2e4eb3c88..ba64b8d92 100644 --- a/repos/os/src/server/vfs/node.h +++ b/repos/os/src/server/vfs/node.h @@ -2,6 +2,7 @@ * \brief Internal nodes of VFS server * \author Emery Hemingway * \author Christian Helmuth + * \author Norman Feske * \date 2016-03-29 */ @@ -16,7 +17,6 @@ #define _VFS__NODE_H_ /* Genode includes */ -#include #include #include #include @@ -52,6 +52,8 @@ namespace Vfs_server { typedef Genode::Allocator::Out_of_memory Out_of_memory; + struct Payload_ptr { char *ptr; }; + /** * Type trait for determining the node type for a given handle type */ @@ -75,16 +77,14 @@ namespace Vfs_server { /* * Note that the file objects are created at the * VFS in the local node constructors, this is to - * ensure that in the case of file creating that the + * ensure that in the case of file creation, the * Out_of_ram exception is thrown before the VFS is * modified. */ } -class Vfs_server::Node : public ::File_system::Node_base, - private Node_space::Element, - private Node_queue::Element +class Vfs_server::Node : Node_space::Element, Node_queue::Element { private: @@ -98,56 +98,128 @@ class Vfs_server::Node : public ::File_system::Node_base, protected: - /* - * Global queue of nodes that await - * some response from the VFS libray + /** + * Packet descriptor to be added to the acknowledgement queue * - * A global collection is perhaps dangerous - * but ensures fairness across sessions + * The '_acked_packet' is reset by 'submit_job' and assigned + * to a valid descriptor by 'try_execute_job'. The validity of the + * packet descriptor is tracked by '_acked_packet_valid'. */ - Node_queue &_response_queue; + Packet_descriptor _acked_packet { }; - /* stream used for reply packets */ - Packet_stream &_stream; + bool _submit_accepted = false; + + bool _acked_packet_valid = false; + + bool _packet_in_progress = false; + + enum class Read_ready_state { DONT_CARE, REQUESTED, READY }; + + Read_ready_state _read_ready_state { Read_ready_state::DONT_CARE }; public: friend Node_queue; using Node_queue::Element::enqueued; - Node(Node_space &space, - char const *node_path, - Node_queue &response_queue, - Packet_stream &stream) - : Node_space::Element(*this, space), - _path(node_path), - _response_queue(response_queue), - _stream(stream) + Node(Node_space &space, char const *node_path) + : + Node_space::Element(*this, space), _path(node_path) { } - virtual ~Node() - { - if (enqueued()) - _response_queue.remove(*this); - } + virtual ~Node() { } using Node_space::Element::id; char const *path() const { return _path.base(); } + enum class Submit_result { DENIED, ACCEPTED, STALLED }; + + bool job_in_progress() const { return _packet_in_progress; } + /** - * Process pending activity, called by post-signal hook + * Return true if node is ready to accept 'submit_job' * - * Default implementation is to return true so that the - * node is removed from the pending handle queue. + * Each node can deal with only one job at a time, except for file + * nodes, which accept a job in addition to an already submitted + * READ_READY request (which leaves '_packet_in_progress' untouched). */ - virtual bool process_io() { return true; } + bool job_acceptable() const { return !job_in_progress(); } + + /** + * Submit job to node + * + * When called, the node is expected to be idle (neither queued in + * the active-nodes queue nor the finished-nodes queue). + */ + virtual Submit_result submit_job(Packet_descriptor, Payload_ptr) + { + return Submit_result::DENIED; + } + + /** + * Execute submitted job + * + * This function must not be called if 'job_in_progress()' is false. + */ + virtual void execute_job() + { + Genode::warning("Node::execute_job unexpectedly called"); + } + + /** + * Return true if the node has at least one acknowledgement ready + */ + bool acknowledgement_pending() const + { + return (_read_ready_state == Read_ready_state::READY) + || _acked_packet_valid; + } + + bool active() const + { + return acknowledgement_pending() + || job_in_progress() + || (_read_ready_state == Read_ready_state::REQUESTED); + } + + /** + * Return and consume one pending acknowledgement + */ + Packet_descriptor dequeue_acknowledgement() + { + if (_read_ready_state == Read_ready_state::READY) { + _read_ready_state = Read_ready_state::DONT_CARE; + + Packet_descriptor ack_read_ready(Packet_descriptor(), + Node_handle { id().value }, + Packet_descriptor::READ_READY, + 0, 0); + ack_read_ready.succeeded(true); + return ack_read_ready; + } + + if (_acked_packet_valid) { + _acked_packet_valid = false; + return _acked_packet; + } + + Genode::warning("dequeue_acknowledgement called with no pending ack"); + return Packet_descriptor(); + } + + /** + * Return true if node was written to + */ + virtual bool modified() const { return false; } /** * Print for debugging */ - void print(Genode::Output &out) const { - out.out_string(_path.base()); } + void print(Genode::Output &out) const + { + Genode::print(out, _path.string(), " (id=", id(), ")"); + } }; @@ -155,11 +227,8 @@ class Vfs_server::Node : public ::File_system::Node_base, * Super-class for nodes that process read/write packets */ class Vfs_server::Io_node : public Vfs_server::Node, - public Vfs::Io_response_handler{ - public: - - enum Op_state { IDLE, READ_QUEUED, SYNC_QUEUED }; - + public Vfs::Io_response_handler +{ private: /* @@ -170,166 +239,231 @@ class Vfs_server::Io_node : public Vfs_server::Node, Mode const _mode; - bool _packet_queued = false; - bool _packet_op_pending = false; - protected: + Payload_ptr _payload_ptr { }; + + bool _modified = false; + Vfs::Vfs_handle &_handle; + void _import_job(Packet_descriptor packet, Payload_ptr payload_ptr) + { + /* + * Accept a READ_READY request without occupying '_packet'. + * This way, another request can follow a READ_READY request + * without blocking on the completion of READ_READY. + */ + if (packet.operation() == Packet_descriptor::READ_READY) { + _read_ready_state = Read_ready_state::REQUESTED; + return; + } + + if (job_in_progress()) + Genode::error("job unexpectedly submitted to busy node"); + + _packet = packet; + _payload_ptr = payload_ptr; + _acked_packet_valid = false; + _acked_packet = Packet_descriptor { }; + } + + void _acknowledge_as_success(size_t count) + { + /* + * Keep '_packet' and '_payload_ptr' intact to allow for the + * conversion of directory entries in 'Directory::execute_job'. + */ + + _packet_in_progress = false; + _acked_packet_valid = true; + _acked_packet = _packet; + + _acked_packet.length(count); + _acked_packet.succeeded(true); + } + + void _acknowledge_as_failure() + { + _packet = Packet_descriptor(); + _payload_ptr = Payload_ptr { nullptr }; + _packet_in_progress = false; + _acked_packet_valid = true; + _acked_packet = _packet; + + _acked_packet.succeeded(false); + } + /** - * Packets that have been removed from the - * packet stream are transfered here + * Current job of this node, assigned by 'submit_job' */ Packet_descriptor _packet { }; - /** - * Abstract read implementation - * - * Returns true if the pending packet - * shall be returned to client - */ - bool _vfs_read(char *dst, file_size count, - file_offset seek_offset, file_size &out_count) + protected: + + Submit_result _submit_read_at(file_offset seek_offset) { - if (!(_mode & READ_ONLY)) return true; + if (!(_mode & READ_ONLY)) + return Submit_result::DENIED; _handle.seek(seek_offset); - if (!_packet_op_pending) { - /* if the read cannot be queued with the VFS then stop here */ - if (!_handle.fs().queue_read(&_handle, count)) { - return false; - } - _packet_op_pending = true; + bool const queuing_succeeded = + _handle.fs().queue_read(&_handle, _packet.length()); + + if (queuing_succeeded) + _packet_in_progress = true; + + return queuing_succeeded ? Submit_result::ACCEPTED + : Submit_result::STALLED; + } + + Submit_result _submit_write_at(file_offset seek_offset) + { + if (!(_mode & WRITE_ONLY)) + return Submit_result::DENIED; + + _handle.seek(seek_offset); + + _packet_in_progress = true; + return Submit_result::ACCEPTED; + } + + Submit_result _submit_sync() + { + bool const queuing_succeeded = _handle.fs().queue_sync(&_handle); + + if (queuing_succeeded) + _packet_in_progress = true; + + return queuing_succeeded ? Submit_result::ACCEPTED + : Submit_result::STALLED; + } + + Submit_result _submit_read_ready() + { + _read_ready_state = Read_ready_state::REQUESTED; + + if (_handle.fs().read_ready(&_handle)) { + /* if the handle is ready, send a packet back immediately */ + read_ready_response(); + } else { + /* register to send READ_READY acknowledgement later */ + _handle.fs().notify_read_ready(&_handle); } + return Submit_result::ACCEPTED; + } - Read_result result = _handle.fs().complete_read( - &_handle, dst, count, out_count); + Submit_result _submit_content_changed() + { + Genode::warning("client unexpectedly submitted CONTENT_CHANGED packet"); + return Submit_result::DENIED; + } - switch (result) { + Submit_result _submit_write_timestamp() + { + if (!(_mode & WRITE_ONLY)) + return Submit_result::DENIED; + + _packet_in_progress = true; + return Submit_result::ACCEPTED; + } + + void _execute_read() + { + file_size out_count = 0; + + switch (_handle.fs().complete_read(&_handle, _payload_ptr.ptr, + _packet.length(), out_count)) { case Read_result::READ_OK: - _packet.succeeded(true); + _acknowledge_as_success(out_count); break; case Read_result::READ_ERR_IO: case Read_result::READ_ERR_INVALID: - _packet.length(out_count); + _acknowledge_as_failure(); break; case Read_result::READ_ERR_WOULD_BLOCK: case Read_result::READ_ERR_AGAIN: case Read_result::READ_ERR_INTERRUPT: case Read_result::READ_QUEUED: - /* packet is still pending */ - return false; + break; } - - /* packet is processed */ - _packet_op_pending = false; - return true; } /** - * Abstract write implementation + * Try to execute write operation at the VFS * - * Returns true if the pending packet - * shall be returned to client + * \return number of consumed bytes */ - bool _vfs_write(char const *src, file_size count, - file_offset seek_offset, file_size &out_count) + file_size _execute_write(char const *src_ptr, size_t length) { - if (!(_mode & WRITE_ONLY)) - return true; - - _handle.seek(seek_offset); - + file_size out_count = 0; try { - Write_result result = _handle.fs().write( - &_handle, src, count, out_count); + switch (_handle.fs().write(&_handle, src_ptr, length, out_count)) { + case Write_result::WRITE_ERR_AGAIN: + case Write_result::WRITE_ERR_WOULD_BLOCK: + break; - if (result == Write_result::WRITE_OK) { - mark_as_updated(); - _packet.succeeded(true); + case Write_result::WRITE_ERR_INVALID: + case Write_result::WRITE_ERR_IO: + case Write_result::WRITE_ERR_INTERRUPT: + _acknowledge_as_failure(); + break; + + case Write_result::WRITE_OK: + break; } } - catch (Vfs::File_io_service::Insufficient_buffer) - { - /* packet is pending */ - return false; - } + catch (Vfs::File_io_service::Insufficient_buffer) { /* re-execute */ } - /* packet is processed */ - return true; + _modified = true; - /* No further error handling! */ + return out_count; } - inline - void _drop_packet() + void _execute_sync() { - _packet = Packet_descriptor(); - _packet_queued = false; - } + switch (_handle.fs().complete_sync(&_handle)) { - inline - void _ack_packet(size_t count) - { - _packet.length(count); - _stream.acknowledge_packet(_packet); - _packet = Packet_descriptor(); - _packet_queued = false; - } - - /** - * Abstract sync implementation - */ - bool _sync() - { - if (!_packet_op_pending) { - /* if the sync cannot be queued with the VFS then stop here */ - if (!_handle.fs().queue_sync(&_handle)) { - return false; - } - _packet_op_pending = true; - } - - Sync_result result = _handle.fs().complete_sync(&_handle); - - switch (result) { case Sync_result::SYNC_OK: - _packet.succeeded(true); + _acknowledge_as_success(0); break; case Sync_result::SYNC_ERR_INVALID: + _acknowledge_as_failure(); break; case Sync_result::SYNC_QUEUED: - /* packet still pending */ - return false; + break; } - - /* packet processed */ - _ack_packet(0); - _packet_op_pending = false; - return true; } - /** - * Virtual methods for specialized node-type I/O - */ - virtual bool _read() = 0; - virtual bool _write() = 0; + void _execute_write_timestamp() + { + try { + _packet.with_timestamp([&] (File_system::Timestamp const time) { + Vfs::Timestamp ts { .value = time.value }; + _handle.fs().update_modification_timestamp(&_handle, ts); + }); + _acknowledge_as_success(0); + } + catch (Vfs::File_io_service::Insufficient_buffer) { } + + _modified = true; + } public: - Io_node(Node_space &space, char const *node_path, Mode node_mode, - Node_queue &response_queue, Packet_stream &stream, + Io_node(Node_space &space, + char const *path, + Mode mode, Vfs_handle &handle) - : Node(space, node_path, response_queue, stream), - _mode(node_mode), _handle(handle) + : + Node(space, path), _mode(mode), _handle(handle) { - _handle.handler(this); + _handle.handler(this); // XXX remove? } virtual ~Io_node() @@ -340,87 +474,18 @@ class Vfs_server::Io_node : public Vfs_server::Node, using Node_space::Element::id; - /** - * Process the packet that is queued at this handle - * - * Return true if the node was processed and is now idle. - */ - bool process_io() override - { - if (!_packet_queued) return true; - if (!_stream.ready_to_ack()) - return false; - - bool result = true; - - switch (_packet.operation()) { - case Packet_descriptor::READ: result = _read(); break; - case Packet_descriptor::WRITE: result = _write(); break; - case Packet_descriptor::SYNC: result = _sync(); break; - - case Packet_descriptor::READ_READY: - /* - * the read-ready pending state is managed - * by the VFS, this packet can be discarded - */ - _drop_packet(); - - if (_handle.fs().read_ready(&_handle)) { - /* if the handle is ready, send a packet back immediately */ - read_ready_response(); - } else { - /* register to send READ_READY later */ - _handle.fs().notify_read_ready(&_handle); - } - - break; - - case Packet_descriptor::CONTENT_CHANGED: - /* discard this packet */ - _drop_packet(); - break; - - case Packet_descriptor::WRITE_TIMESTAMP: - try { - _packet.with_timestamp([&] (File_system::Timestamp const time) { - Vfs::Timestamp ts { .value = time.value }; - _handle.fs().update_modification_timestamp(&_handle, ts); - }); - _packet.succeeded(true); - _ack_packet(0); - } catch (Vfs::File_io_service::Insufficient_buffer) { - /* packet is pending */ - result = false; - } - break; - } - - return result; - } - - /** - * Process a packet by queuing it locally or sending - * an immediate response. Return false if no progress - * can be made. - * - * Called by packet stream signal handler - */ - bool process_packet(Packet_descriptor const &packet) - { - /* attempt to clear any pending packet */ - if (!process_io()) - return false; - - /* otherwise store the packet locally and process */ - _packet = packet; - _packet_queued = true; - process_io(); - return true; - } - Mode mode() const { return _mode; } + /******************************** + ** Vfs_server::Node interface ** + ********************************/ + + void execute_job() override { } + + bool modified() const override { return _modified; } + + /**************************************** ** Vfs::Io_response_handler interface ** ****************************************/ @@ -430,40 +495,27 @@ class Vfs_server::Io_node : public Vfs_server::Node, */ void read_ready_response() override { - if (!_stream.ready_to_ack()) { - /* log a message to catch loops */ - Genode::warning("deferring READ_READY response"); - _handle.fs().notify_read_ready(&_handle); - return; - } - - /* Send packet immediately, though this could be queued */ - Packet_descriptor packet(Packet_descriptor(), - Node_handle { id().value }, - Packet_descriptor::READ_READY, - 0, 0); - packet.succeeded(true); - _stream.acknowledge_packet(packet); + if (_read_ready_state == Read_ready_state::REQUESTED) + _read_ready_state = Read_ready_state::READY; } /** * Called by the VFS plugin of this handle */ - void io_progress_response() override - { - /* - * do not process packet immediately, - * queue to maintain ordering (priorities?) - */ - if (!enqueued()) - _response_queue.enqueue(*this); - } + void io_progress_response() override { } }; class Vfs_server::Watch_node final : public Vfs_server::Node, public Vfs::Watch_response_handler { + public: + + struct Watch_node_response_handler : Genode::Interface + { + virtual void handle_watch_node_response(Watch_node &) = 0; + }; + private: /* @@ -474,20 +526,26 @@ class Vfs_server::Watch_node final : public Vfs_server::Node, Vfs::Vfs_watch_handle &_watch_handle; + Watch_node_response_handler &_watch_node_response_handler; + public: - Watch_node(Node_space &space, char const *path, - Vfs::Vfs_watch_handle &handle, - Node_queue &response_queue, - Packet_stream &stream) - : Node(space, path, response_queue, stream), - _watch_handle(handle) + Watch_node(Node_space &space, + char const *path, + Vfs::Vfs_watch_handle &handle, + Watch_node_response_handler &watch_node_response_handler) + : + Node(space, path), + _watch_handle(handle), + _watch_node_response_handler(watch_node_response_handler) { _watch_handle.handler(this); } - ~Watch_node() { - _watch_handle.close(); } + ~Watch_node() + { + _watch_handle.close(); + } /******************************************* @@ -496,9 +554,13 @@ class Vfs_server::Watch_node final : public Vfs_server::Node, void watch_response() override { - /* send a packet immediately otherwise defer */ - if (!process_io() && !enqueued()) - _response_queue.enqueue(*this); + _acked_packet = Packet_descriptor(Packet_descriptor(), + Node_handle { id().value }, + Packet_descriptor::CONTENT_CHANGED, + 0, 0); + _acked_packet.succeeded(true); + _acked_packet_valid = true; + _watch_node_response_handler.handle_watch_node_response(*this); } @@ -506,92 +568,49 @@ class Vfs_server::Watch_node final : public Vfs_server::Node, ** Vfs_server::Node interface ** ********************************/ - /** - * Called by global I/O progress handler - */ - bool process_io() override + Submit_result submit_job(Packet_descriptor, Payload_ptr) override { - if (!_stream.ready_to_ack()) return false; + /* + * This can only happen if a client misbehaves by submitting + * work to a watch handle. + */ + Genode::warning("job unexpectedly submitted to watch handle"); - Packet_descriptor packet(Packet_descriptor(), - Node_handle { id().value }, - Packet_descriptor::CONTENT_CHANGED, - 0, 0); - packet.succeeded(true); - _stream.acknowledge_packet(packet); - return true; + /* don't reset '_acked_packet' as defined in the constructor */ + + return Submit_result::DENIED; } }; struct Vfs_server::Symlink : Io_node { - protected: + private: - /******************** - ** Node interface ** - ********************/ + typedef Genode::String Write_buffer; - bool _read() override + Write_buffer _write_buffer { }; + + bool _partial_operation() const { + /* partial read or write is not supported */ if (_packet.position() != 0) { - /* partial read is not supported */ - _ack_packet(0); + Genode::warning("attempt for partial operation on a symlink"); return true; } - - file_size out_count = 0; - bool result = _vfs_read(_stream.packet_content(_packet), - _packet.length(), 0, out_count); - if (result) - _ack_packet(out_count); - return result; + return false; } - bool _write() override + bool _max_path_length_exceeded() const { - if (_packet.position() != 0) { - /* partial write is not supported */ - _ack_packet(0); - return true; - } - - file_size count = _packet.length(); - - /* - * if the symlink target is too long return a short result - * because a competent File_system client will error on a - * length mismatch - */ - if (count > MAX_PATH_LEN) { - _ack_packet(1); - return true; - } - - /* ensure symlink gets something null-terminated */ - Genode::String target(Genode::Cstring( - _stream.packet_content(_packet), count)); - size_t const target_len = target.length()-1; - - file_size out_count = 0; - bool result = _vfs_write(target.string(), target_len, 0, out_count); - - if (result) { - _ack_packet(out_count); - if (out_count > 0) { - mark_as_updated(); - notify_listeners(); - } - } - return result; + return _packet.length() >= MAX_PATH_LEN; } - static - Vfs_handle &_open(Vfs::File_system &vfs, Genode::Allocator &alloc, - char const *link_path, bool create) + static Vfs_handle &_open(Vfs::File_system &vfs, Genode::Allocator &alloc, + char const *path, bool create) { Vfs_handle *h = nullptr; - assert_openlink(vfs.openlink(link_path, create, &h, alloc)); + assert_openlink(vfs.openlink(path, create, &h, alloc)); return *h; } @@ -600,14 +619,81 @@ struct Vfs_server::Symlink : Io_node Symlink(Node_space &space, Vfs::File_system &vfs, Genode::Allocator &alloc, - Node_queue &response_queue, - Packet_stream &stream, - char const *link_path, + char const *path, Mode mode, bool create) - : Io_node(space, link_path, mode, response_queue, stream, - _open(vfs, alloc, link_path, create)) + : + Io_node(space, path, mode, _open(vfs, alloc, path, create)) { } + + Submit_result submit_job(Packet_descriptor packet, Payload_ptr payload_ptr) override + { + _import_job(packet, payload_ptr); + + switch (packet.operation()) { + + case Packet_descriptor::READ: + + if (_partial_operation()) + return Submit_result::DENIED; + + return _submit_read_at(0); + + case Packet_descriptor::WRITE: + { + if (_partial_operation() || _max_path_length_exceeded()) + return Submit_result::DENIED; + + /* accessed by 'execute_job' */ + _write_buffer = Write_buffer(Genode::Cstring(_payload_ptr.ptr, + packet.length())); + return _submit_write_at(0); + } + + case Packet_descriptor::SYNC: return _submit_sync(); + case Packet_descriptor::READ_READY: return _submit_read_ready(); + case Packet_descriptor::CONTENT_CHANGED: return _submit_content_changed(); + case Packet_descriptor::WRITE_TIMESTAMP: return _submit_write_timestamp(); + } + + Genode::warning("invalid operation ", (int)_packet.operation(), " " + "requested from symlink node"); + + return Submit_result::DENIED; + } + + void execute_job() override + { + switch (_packet.operation()) { + + /* + * Write symlink content from '_write_buffer' instead of the + * '_payload_ptr'. In contrast to '_payload_ptr', which points + * to shared memory, the null-termination of the content of + * '_write_buffer' does not depend on the goodwill of the client. + */ + case Packet_descriptor::WRITE: + { + size_t const count = _write_buffer.length(); + + if (_execute_write(_write_buffer.string(), count) == count) + _acknowledge_as_success(count); + else + _acknowledge_as_failure(); + break; + } + + /* generic */ + case Packet_descriptor::READ: _execute_read(); break; + case Packet_descriptor::SYNC: _execute_sync(); break; + case Packet_descriptor::WRITE_TIMESTAMP: _execute_write_timestamp(); break; + + /* never executed */ + case Packet_descriptor::READ_READY: + case Packet_descriptor::CONTENT_CHANGED: + break; + } + } }; @@ -621,68 +707,54 @@ class Vfs_server::File : public Io_node File(File const &); File &operator = (File const &); - char const *_leaf_path = nullptr; /* offset pointer to Node::_path */ + char const * const _leaf_path = nullptr; /* offset pointer to Node::_path */ - inline - seek_off_t seek_tail(file_size count) + typedef Directory_service::Stat Stat; + + template + void _with_stat(FN const &fn) { typedef Directory_service::Stat_result Result; - Vfs::Directory_service::Stat st; - /* if stat fails, try and see if the VFS will seek to the end */ - return (_handle.ds().stat(_leaf_path, st) == Result::STAT_OK) - ? ((count < st.size) ? (st.size - count) : 0) - : (seek_off_t)SEEK_TAIL; + Vfs::Directory_service::Stat stat { }; + if (_handle.ds().stat(_leaf_path, stat) == Result::STAT_OK) + fn(stat); } + seek_off_t _seek_pos() + { + seek_off_t seek_pos = _packet.position(); + + if (seek_pos == (seek_off_t)SEEK_TAIL) + _with_stat([&] (Stat const &stat) { + seek_pos = stat.size; }); + + return seek_pos; + } + + enum class Write_type { UNKNOWN, CONTINUOUS, TRANSACTIONAL }; + + Write_type _write_type = Write_type::UNKNOWN; + + /** + * Number of bytes consumed by VFS write + * + * Used for the incremental write to continuous files. + */ + seek_off_t _write_pos = 0; + + bool _watch_read_ready = false; + protected: - bool _read() override - { - file_size out_count = 0; - file_size count = _packet.length(); - seek_off_t seek_offset = _packet.position(); - - if (seek_offset == (seek_off_t)SEEK_TAIL) - seek_offset = seek_tail(count); - - bool result = _vfs_read(_stream.packet_content(_packet), - count, seek_offset, out_count); - if (result) - _ack_packet(out_count); - return result; - } - - bool _write() override - { - file_size out_count = 0; - file_size count = _packet.length(); - seek_off_t seek_offset = _packet.position(); - - if (seek_offset == (seek_off_t)SEEK_TAIL) - seek_offset = seek_tail(count); - - bool result = _vfs_write(_stream.packet_content(_packet), - count, seek_offset, out_count); - if (result) { - _ack_packet(out_count); - if (out_count > 0) { - mark_as_updated(); - notify_listeners(); - } - } - return result; - } - - static - Vfs_handle &_open(Vfs::File_system &vfs, Genode::Allocator &alloc, - char const *file_path, Mode fs_mode, bool create) + static Vfs_handle &_open(Vfs::File_system &vfs, Genode::Allocator &alloc, + char const *path, Mode mode, bool create) { Vfs_handle *h = nullptr; - unsigned vfs_mode = (fs_mode-1) | + unsigned vfs_mode = (mode-1) | (create ? Vfs::Directory_service::OPEN_MODE_CREATE : 0); - assert_open(vfs.open(file_path, vfs_mode, &h, alloc)); + assert_open(vfs.open(path, vfs_mode, &h, alloc)); return *h; } @@ -691,121 +763,195 @@ class Vfs_server::File : public Io_node File(Node_space &space, Vfs::File_system &vfs, Genode::Allocator &alloc, - Node_queue &response_queue, - Packet_stream &stream, - char const *file_path, - Mode fs_mode, + char const *path, + Mode mode, bool create) : - Io_node(space, file_path, fs_mode, response_queue, stream, - _open(vfs, alloc, file_path, fs_mode, create)) - { - _leaf_path = vfs.leaf_path(path()); - } + Io_node(space, path, mode, _open(vfs, alloc, path, mode, create)), + _leaf_path(vfs.leaf_path(Node::path())) + { } void truncate(file_size_t size) { assert_truncate(_handle.fs().ftruncate(&_handle, size)); - mark_as_updated(); + } + + Submit_result submit_job(Packet_descriptor packet, Payload_ptr payload_ptr) override + { + _import_job(packet, payload_ptr); + + _write_type = Write_type::UNKNOWN; + _write_pos = 0; + + switch (packet.operation()) { + + case Packet_descriptor::READ: return _submit_read_at(_seek_pos()); + case Packet_descriptor::WRITE: return _submit_write_at(_seek_pos()); + case Packet_descriptor::SYNC: return _submit_sync(); + case Packet_descriptor::READ_READY: return _submit_read_ready(); + case Packet_descriptor::CONTENT_CHANGED: return _submit_content_changed(); + case Packet_descriptor::WRITE_TIMESTAMP: return _submit_write_timestamp(); + } + + Genode::warning("invalid operation ", (int)_packet.operation(), " " + "requested from file node"); + + return Submit_result::DENIED; + } + + void execute_job() override + { + switch (_packet.operation()) { + + case Packet_descriptor::WRITE: + { + size_t const count = _packet.length() - _write_pos; + char const * const src_ptr = _payload_ptr.ptr + _write_pos; + size_t const consumed = _execute_write(src_ptr, count); + + if (consumed == count) { + _acknowledge_as_success(count); + break; + } + + /* + * The write request was only partially successful. + * Continue writing if the file is continuous. + * Return an error if the file is transactional. + */ + + /* determine write type once via 'stat' */ + if (_write_type == Write_type::UNKNOWN) { + _write_type = Write_type::TRANSACTIONAL; + + _with_stat([&] (Stat const &stat) { + if (stat.type == Vfs::Node_type::CONTINUOUS_FILE) + _write_type = Write_type::CONTINUOUS; }); + } + + if (_write_type == Write_type::TRANSACTIONAL) { + _acknowledge_as_failure(); + break; + } + + /* + * Keep executing the write operation for the remaining bytes. + * The seek offset used for subsequent VFS write operations + * is incremented automatically by the VFS handle. + */ + _write_pos += consumed; + break; + } + + /* generic */ + case Packet_descriptor::READ: _execute_read(); break; + case Packet_descriptor::SYNC: _execute_sync(); break; + case Packet_descriptor::WRITE_TIMESTAMP: _execute_write_timestamp(); break; + + /* never executed */ + case Packet_descriptor::READ_READY: + case Packet_descriptor::CONTENT_CHANGED: + break; + } } }; struct Vfs_server::Directory : Io_node { - protected: + private: - /******************** - ** Node interface ** - ********************/ + typedef Directory_service::Dirent Vfs_dirent; + typedef File_system::Directory_entry Fs_dirent; - bool _read() override + bool _position_and_length_aligned_with_dirent_size() { - if (_packet.length() < sizeof(Directory_entry)) { - _ack_packet(0); - return true; - } + if (_packet.length() < sizeof(Directory_entry)) + return false; - seek_off_t const seek_offset = _packet.position(); + if (_packet.length() % sizeof(::File_system::Directory_entry)) + return false; - size_t const blocksize = sizeof(::File_system::Directory_entry); + if (_packet.position() % sizeof(::File_system::Directory_entry)) + return false; - unsigned const index = (seek_offset / blocksize); - - file_size out_count = 0; - - Directory_service::Dirent vfs_dirent { }; - - bool const result = _vfs_read((char*)&vfs_dirent, - sizeof(vfs_dirent), - index * sizeof(vfs_dirent), - out_count); - vfs_dirent.sanitize(); - - if (result) { - if (out_count != sizeof(vfs_dirent)) { - _ack_packet(0); - return true; - } - - auto fs_dirent_type = [&] (Vfs::Directory_service::Dirent_type type) - { - using From = Vfs::Directory_service::Dirent_type; - using To = ::File_system::Node_type; - - /* - * This should never be taken because 'END' is checked as a - * precondition prior the call to of this function. - */ - To const default_result = To::CONTINUOUS_FILE; - - switch (type) { - case From::END: return default_result; - case From::DIRECTORY: return To::DIRECTORY; - case From::SYMLINK: return To::SYMLINK; - case From::CONTINUOUS_FILE: return To::CONTINUOUS_FILE; - case From::TRANSACTIONAL_FILE: return To::TRANSACTIONAL_FILE; - } - return default_result; - }; - - if (vfs_dirent.type == Vfs::Directory_service::Dirent_type::END) { - _ack_packet(0); - - } else { - - ::File_system::Directory_entry &fs_dirent = - *(Directory_entry *)_stream.packet_content(_packet); - - fs_dirent = { - .inode = vfs_dirent.fileno, - .type = fs_dirent_type(vfs_dirent.type), - .rwx = { - .readable = vfs_dirent.rwx.readable, - .writeable = vfs_dirent.rwx.writeable, - .executable = vfs_dirent.rwx.executable }, - .name = { vfs_dirent.name.buf } - }; - - _ack_packet(sizeof(Directory_entry)); - } - return true; - } - return false; - } - - bool _write() override - { - _ack_packet(0); return true; } - static - Vfs_handle &_open(Vfs::File_system &vfs, Genode::Allocator &alloc, - char const *dir_path, bool create) + static Fs_dirent _convert_dirent(Vfs_dirent from) + { + from.sanitize(); + + auto fs_dirent_type = [&] (Vfs::Directory_service::Dirent_type type) + { + using From = Vfs::Directory_service::Dirent_type; + using To = ::File_system::Node_type; + + /* + * This should never be taken because 'END' is checked as a + * precondition prior the call to of this function. + */ + To const default_result = To::CONTINUOUS_FILE; + + switch (type) { + case From::END: return default_result; + case From::DIRECTORY: return To::DIRECTORY; + case From::SYMLINK: return To::SYMLINK; + case From::CONTINUOUS_FILE: return To::CONTINUOUS_FILE; + case From::TRANSACTIONAL_FILE: return To::TRANSACTIONAL_FILE; + } + return default_result; + }; + + return { + .inode = from.fileno, + .type = fs_dirent_type(from.type), + .rwx = { + .readable = from.rwx.readable, + .writeable = from.rwx.writeable, + .executable = from.rwx.executable }, + .name = { from.name.buf } + }; + } + + /** + * Convert VFS directory entry to FS directory entry in place in the + * payload buffer + * + * \return size of converted data in bytes + */ + file_size _convert_vfs_dirents_to_fs_dirents() + { + static_assert(sizeof(Vfs_dirent) == sizeof(Fs_dirent)); + + file_offset const step = sizeof(Fs_dirent); + file_offset const length = _packet.length(); + + file_size converted_length = 0; + + for (file_offset offset = 0; offset + step <= length; offset += step) { + + char * const ptr = _payload_ptr.ptr + offset; + + Vfs_dirent &vfs_dirent = *(Vfs_dirent *)(ptr); + Fs_dirent &fs_dirent = *(Fs_dirent *)(ptr); + + if (vfs_dirent.type == Vfs::Directory_service::Dirent_type::END) + break; + + fs_dirent = _convert_dirent(vfs_dirent); + + converted_length += step; + } + + return converted_length; + } + + static Vfs_handle &_open(Vfs::File_system &vfs, Genode::Allocator &alloc, + char const *path, bool create) { Vfs_handle *h = nullptr; - assert_opendir(vfs.opendir(dir_path, create, &h, alloc)); + assert_opendir(vfs.opendir(path, create, &h, alloc)); return *h; } @@ -814,12 +960,10 @@ struct Vfs_server::Directory : Io_node Directory(Node_space &space, Vfs::File_system &vfs, Genode::Allocator &alloc, - Node_queue &response_queue, - Packet_stream &stream, - char const *dir_path, + char const *path, bool create) - : Io_node(space, dir_path, READ_ONLY, response_queue, stream, - _open(vfs, alloc, dir_path, create)) + : + Io_node(space, path, READ_ONLY, _open(vfs, alloc, path, create)) { } /** @@ -828,23 +972,15 @@ struct Vfs_server::Directory : Io_node Node_space::Id file(Node_space &space, Vfs::File_system &vfs, Genode::Allocator &alloc, - char const *file_path, + char const *path, Mode mode, bool create) { - Path subpath(file_path, path()); - char const *path_str = subpath.base(); + File &file = *new (alloc) + File(space, vfs, alloc, + Path(path, Node::path()).base(), mode, create); - File *file; - try { - file = new (alloc) File(space, vfs, alloc, - _response_queue, _stream, - path_str, mode, create); - } catch (Out_of_memory) { throw Out_of_ram(); } - - if (create) - mark_as_updated(); - return file->id(); + return file.id(); } /** @@ -853,21 +989,79 @@ struct Vfs_server::Directory : Io_node Node_space::Id symlink(Node_space &space, Vfs::File_system &vfs, Genode::Allocator &alloc, - char const *link_path, + char const *path, Mode mode, bool create) { - Path subpath(link_path, path()); - char const *path_str = subpath.base(); + Symlink &link = *new (alloc) + Symlink(space, vfs, alloc, + Path(path, Node::path()).base(), mode, create); - Symlink *link; - try { link = new (alloc) Symlink(space, vfs, alloc, - _response_queue, _stream, - path_str, mode, create); } - catch (Out_of_memory) { throw Out_of_ram(); } - if (create) - mark_as_updated(); - return link->id(); + return link.id(); + } + + + /******************************** + ** Vfs_server::Node interface ** + ********************************/ + + Submit_result submit_job(Packet_descriptor packet, Payload_ptr payload_ptr) override + { + _import_job(packet, payload_ptr); + + switch (packet.operation()) { + + case Packet_descriptor::READ: + + if (!_position_and_length_aligned_with_dirent_size()) + return Submit_result::DENIED; + + return _submit_read_at(_packet.position()); + + case Packet_descriptor::WRITE: + return Submit_result::DENIED; + + case Packet_descriptor::SYNC: return _submit_sync(); + case Packet_descriptor::READ_READY: return _submit_read_ready(); + case Packet_descriptor::CONTENT_CHANGED: return _submit_content_changed(); + case Packet_descriptor::WRITE_TIMESTAMP: return _submit_write_timestamp(); + } + + Genode::warning("invalid operation ", (int)_packet.operation(), " " + "requested from directory node"); + return Submit_result::DENIED; + } + + void execute_job() override + { + switch (_packet.operation()) { + + case Packet_descriptor::READ: + _execute_read(); + + if (_acked_packet_valid) { + file_size const length = _convert_vfs_dirents_to_fs_dirents(); + + /* + * Overwrite the acknowledgement assigned by + * '_execute_read' with an acknowledgement featuring the + * converted length. This way, the client reads only the + * number of bytes until the end of the directory. + */ + _acknowledge_as_success(length); + } + break; + + /* generic */ + case Packet_descriptor::SYNC: _execute_sync(); break; + case Packet_descriptor::WRITE_TIMESTAMP: _execute_write_timestamp(); break; + + /* never executed */ + case Packet_descriptor::WRITE: + case Packet_descriptor::READ_READY: + case Packet_descriptor::CONTENT_CHANGED: + break; + } } };