New VFS plugin for emulating POSIX pipes

Add a new plugin for creating pipes between pairs of VFS handles. It is
intended to replace the libc_pipe plugin, one of the last remaining libc
plugins.

In contrast to the libc_pipe plugin, this plugin defers cross-handle
notification until I/O signal handling rather than block and unblock
readers using a semaphore. This is a performance regression in the case
of multiple threads blocking on a pipe, but shall be an intermediate
mechanism pending renovations within the libc VFS and threading layers.
As a side effect, threads blocked on a pipe might not be resumed until
the main thread suspends and dispatches I/O signals.

The "test-libc_pipe" test has been adjusted to use the VFS pipe plugin
and tests both local pipes and pipes hosted remotely in the VFS server.

Fix #2303
This commit is contained in:
Ehmry - 2019-07-09 14:16:46 +02:00
parent cff389758e
commit 8509e35e62
14 changed files with 728 additions and 20 deletions

View File

@ -0,0 +1,5 @@
SRC_CC = plugin.cc
vpath %.cc $(REP_DIR)/src/lib/vfs/pipe
SHARED_LIB = yes

View File

@ -0,0 +1,9 @@
MIRROR_FROM_REP_DIR := lib/mk/vfs_pipe.mk src/lib/vfs/pipe
content: $(MIRROR_FROM_REP_DIR) LICENSE
$(MIRROR_FROM_REP_DIR):
$(mirror_from_rep_dir)
LICENSE:
cp $(GENODE_DIR)/LICENSE $@

View File

@ -0,0 +1 @@
2019-06-04 ecacc703584c04e70085ad12628ee40885e9e50c

View File

@ -0,0 +1,4 @@
base
os
so
vfs

View File

@ -0,0 +1,12 @@
The VFS pipe plugin exposes a control file for creating pipes and a set of pipe
directories. Opening and reading the "/new" returns a relative path to a
directory. That directory represents a pipe and contains an "in" and "out" file
for writing and reading respectively to the pipe.
Reads and writes are non-blocking and will complete short operations without
error, with the exception of reads on an empty pipe, which return READ_QUEUED.
The read and write capacity of a pipe may be queried by stat'ing the size of
"out" and "in" files.
When all "in" and "out" handles on a pipe as well as the initial handle on "new"
are closed, the pipe is destroyed.

View File

@ -0,0 +1,587 @@
/*
* \brief VFS pipe plugin
* \author Emery Hemingway
* \date 2019-05-29
*/
/*
* Copyright (C) 2019 Genode Labs GmbH
*
* This file is part of the Genode OS framework, which is distributed
* under the terms of the GNU Affero General Public License version 3.
*/
#include <vfs/file_system_factory.h>
#include <os/path.h>
#include <os/ring_buffer.h>
#include <base/registry.h>
namespace Vfs_pipe {
using namespace Vfs;
typedef Vfs::Directory_service::Open_result Open_result;
typedef Vfs::File_io_service::Write_result Write_result;
typedef Vfs::File_io_service::Read_result Read_result;
typedef Genode::Path<32> Path;
enum { PIPE_BUF_SIZE = 8192 };
typedef Genode::Ring_buffer<unsigned char, PIPE_BUF_SIZE+1> Pipe_buffer;
struct Pipe_handle;
typedef Genode::Fifo_element<Pipe_handle> Handle_element;
typedef Genode::Fifo<Handle_element> Handle_fifo;
typedef Genode::Registry<Pipe_handle>::Element Pipe_handle_registry_element;
typedef Genode::Registry<Pipe_handle> Pipe_handle_registry;
class Pipe;
typedef Genode::Id_space<Pipe> Pipe_space;
struct New_pipe_handle;
class File_system;
}
struct Vfs_pipe::Pipe_handle
: Vfs::Vfs_handle, private Pipe_handle_registry_element
{
Pipe &pipe;
Handle_element io_progress_elem { *this };
Handle_element read_ready_elem { *this };
Pipe_handle(Vfs::File_system &fs,
Genode::Allocator &alloc,
unsigned flags,
Pipe_handle_registry &registry,
Pipe &p)
: Vfs::Vfs_handle(fs, fs, alloc, flags),
Pipe_handle_registry_element(registry, *this),
pipe(p)
{ }
virtual ~Pipe_handle();
Write_result write(const char *buf,
file_size count,
file_size &out_count);
Read_result read(char *buf,
file_size count,
file_size &out_count);
bool read_ready();
bool notify_read_ready();
};
struct Vfs_pipe::Pipe
{
Genode::Allocator &alloc;
Pipe_space::Element space_elem;
Pipe_buffer buffer { };
Pipe_handle_registry registry { };
Handle_fifo io_progress_waiters { };
Handle_fifo read_ready_waiters { };
Genode::Signal_context_capability &notify_sigh;
bool new_handle_active { true };
Pipe(Genode::Allocator &alloc, Pipe_space &space,
Genode::Signal_context_capability &notify_sigh)
: alloc(alloc), space_elem(*this, space), notify_sigh(notify_sigh) { }
typedef Genode::String<8> Name;
Name name() const
{
return Name(space_elem.id().value);
}
/**
* Check if pipe is referenced, if not, destroy
*/
void cleanup()
{
bool alive = new_handle_active;
if (!alive)
registry.for_each([&alive] (Pipe_handle&) {
alive = true; });
if (!alive)
destroy(alloc, this);
}
/**
* Remove "/new" handle reference
*/
void remove_new_handle() {
new_handle_active = false; }
/**
* Detach a handle
*/
void remove(Pipe_handle &handle)
{
if (handle.io_progress_elem.enqueued())
io_progress_waiters.remove(handle.io_progress_elem);
if (handle.read_ready_elem.enqueued())
read_ready_waiters.remove(handle.read_ready_elem);
}
/**
* Open a write or read handle
*/
Open_result open(Vfs::File_system &fs,
Path const &filename,
Vfs::Vfs_handle **handle,
Genode::Allocator &alloc)
{
if (filename == "/in") {
*handle = new (alloc)
Pipe_handle(fs, alloc, Directory_service::OPEN_MODE_WRONLY, registry, *this);
return Open_result::OPEN_OK;
}
if (filename == "/out") {
*handle = new (alloc)
Pipe_handle(fs, alloc, Directory_service::OPEN_MODE_RDONLY, registry, *this);
return Open_result::OPEN_OK;
}
return Open_result::OPEN_ERR_UNACCESSIBLE;
}
/**
* Use a signal as a hack to defer notifications
* until the "io_progress_handler".
*/
void submit_signal() {
Genode::Signal_transmitter(notify_sigh).submit(); }
/**
* Notify handles waiting for activity
*/
void notify()
{
io_progress_waiters.dequeue_all([] (Handle_element &elem) {
elem.object().io_progress_response(); });
read_ready_waiters.dequeue_all([] (Handle_element &elem) {
elem.object().read_ready_response(); });
}
Write_result write(Pipe_handle &handle,
const char *buf, file_size count,
file_size &out_count)
{
file_size out = 0;
bool notify = buffer.empty();
while (out < count && 0 < buffer.avail_capacity()) {
buffer.add(*(buf++));
++out;
}
out_count = out;
if (out < count)
io_progress_waiters.enqueue(handle.io_progress_elem);
if (notify)
submit_signal();
return Write_result::WRITE_OK;
}
Read_result read(Pipe_handle &handle,
char *buf, file_size count,
file_size &out_count)
{
bool notify = buffer.avail_capacity() == 0;
file_size out = 0;
while (out < count && !buffer.empty()) {
*(buf++) = buffer.get();
++out;
}
out_count = out;
if (!out) {
io_progress_waiters.enqueue(handle.io_progress_elem);
return Read_result::READ_QUEUED;
}
if (notify)
submit_signal();
return Read_result::READ_OK;
}
};
Vfs_pipe::Pipe_handle::~Pipe_handle() {
pipe.remove(*this); }
Vfs_pipe::Write_result
Vfs_pipe::Pipe_handle::write(const char *buf,
file_size count,
file_size &out_count) {
return Pipe_handle::pipe.write(*this, buf, count, out_count); }
Vfs_pipe::Read_result
Vfs_pipe::Pipe_handle::read(char *buf,
file_size count,
file_size &out_count) {
return Pipe_handle::pipe.read(*this, buf, count, out_count); }
bool
Vfs_pipe::Pipe_handle::read_ready() {
return !pipe.buffer.empty(); }
bool
Vfs_pipe::Pipe_handle::notify_read_ready()
{
if (!read_ready_elem.enqueued())
pipe.read_ready_waiters.enqueue(read_ready_elem);
return true;
}
struct Vfs_pipe::New_pipe_handle : Vfs::Vfs_handle
{
Pipe &pipe;
New_pipe_handle(Vfs::File_system &fs,
Genode::Allocator &alloc,
unsigned flags,
Pipe_space &pipe_space,
Genode::Signal_context_capability &notify_sigh)
: Vfs::Vfs_handle(fs, fs, alloc, flags),
pipe(*(new (alloc) Pipe(alloc, pipe_space, notify_sigh)))
{ }
~New_pipe_handle()
{
pipe.remove_new_handle();
}
Read_result read(char *buf,
file_size count,
file_size &out_count)
{
auto name = pipe.name();
if (name.length() < count) {
memcpy(buf, name.string(), name.length());
out_count = name.length();
return Read_result::READ_OK;
}
return Read_result::READ_ERR_INVALID;
}
};
class Vfs_pipe::File_system : public Vfs::File_system
{
private:
Pipe_space _pipe_space { };
/*
* XXX: a hack to defer cross-thread notifications at
* the libc until the io_progress handler
*/
Genode::Io_signal_handler<File_system> _notify_handler;
Genode::Signal_context_capability _notify_cap { _notify_handler };
void _notify_any()
{
_pipe_space.for_each<Pipe&>([] (Pipe &pipe) {
pipe.notify(); });
}
public:
File_system(Vfs::Env &env)
: _notify_handler(env.env().ep(), *this, &File_system::_notify_any) { }
const char* type() override { return "pipe"; }
/***********************
** Directory service **
***********************/
Genode::Dataspace_capability dataspace(char const*) override {
return Genode::Dataspace_capability(); }
void release(char const*, Dataspace_capability) override { }
Open_result open(const char *cpath,
unsigned mode,
Vfs::Vfs_handle **handle,
Genode::Allocator &alloc) override
{
Path path(cpath);
if (path == "/new") {
if ((Directory_service::OPEN_MODE_ACCMODE & mode) == Directory_service::OPEN_MODE_WRONLY)
return Open_result::OPEN_ERR_NO_PERM;
*handle = new (alloc)
New_pipe_handle(*this, alloc, mode, _pipe_space, _notify_cap);
return Open_result::OPEN_OK;
}
path.strip_last_element();
if (!path.has_single_element())
return Open_result::OPEN_ERR_UNACCESSIBLE;
Pipe_space::Id id { ~0UL };
if (!ascii_to(path.last_element(), id.value))
return Open_result::OPEN_ERR_UNACCESSIBLE;
Open_result result = Open_result::OPEN_ERR_UNACCESSIBLE;
try {
_pipe_space.apply<Pipe&>(id, [&] (Pipe &pipe) {
Path filename(cpath);
filename.keep_only_last_element();
result = pipe.open(*this, filename, handle, alloc);
});
}
catch (Pipe_space::Unknown_id) { }
return result;
}
Opendir_result opendir(char const *cpath, bool create,
Vfs_handle **handle,
Allocator &alloc) override
{
/* open dummy handles on directories */
if (create) return OPENDIR_ERR_PERMISSION_DENIED;
Path path(cpath);
if (path == "/") {
*handle = new (alloc)
Vfs_handle(*this, *this, alloc, 0);
return OPENDIR_OK;
}
Opendir_result result { OPENDIR_ERR_LOOKUP_FAILED };
if (path.has_single_element()) {
Pipe_space::Id id { ~0UL };
if (ascii_to(path.last_element(), id.value)) try {
_pipe_space.apply<Pipe&>(id, [&] (Pipe&) {
*handle = new (alloc)
Vfs_handle(*this, *this, alloc, 0);
result = OPENDIR_OK;
});
}
catch (Pipe_space::Unknown_id) { }
}
return result;
}
void close(Vfs::Vfs_handle *vfs_handle) override
{
Pipe *pipe = nullptr;
if (Pipe_handle *handle = dynamic_cast<Pipe_handle*>(vfs_handle)) {
pipe = &handle->pipe;
} else
if (New_pipe_handle *handle = dynamic_cast<New_pipe_handle*>(vfs_handle)) {
pipe = &handle->pipe;
}
destroy(vfs_handle->alloc(), vfs_handle);
if (pipe)
pipe->cleanup();
}
Stat_result stat(const char *cpath, Vfs::Directory_service::Stat &buf) override
{
Stat_result result { STAT_ERR_NO_ENTRY };
Path path(cpath);
if (path == "/new") {
buf.size = 1;
buf.mode = STAT_MODE_FILE;
buf.inode = (Genode::addr_t)this;
buf.device = (Genode::addr_t)this;
return STAT_OK;
}
if (path.has_single_element()) {
Pipe_space::Id id { ~0UL };
if (ascii_to(path.last_element(), id.value)) try {
_pipe_space.apply<Pipe&>(id, [&] (Pipe &pipe) {
buf.size = 2;
buf.mode = STAT_MODE_DIRECTORY;
buf.inode = (Genode::addr_t)&pipe;;
buf.device = (Genode::addr_t)this;
result = STAT_OK;
});
}
catch (Pipe_space::Unknown_id) { }
} else {
/* maybe this is /N/in or /N/out */
path.strip_last_element();
if (!path.has_single_element())
/* too many directory levels */
return result;
Pipe_space::Id id { ~0UL };
if (ascii_to(path.last_element(), id.value)) try {
_pipe_space.apply<Pipe&>(id, [&] (Pipe &pipe) {
Path filename(cpath);
filename.keep_only_last_element();
if (filename == "/in") {
buf.size = pipe.buffer.avail_capacity();
buf.mode = STAT_MODE_FILE;
buf.inode = Genode::addr_t(&pipe)+1;
buf.device = Genode::addr_t(this);
result = STAT_OK;
} else
if (filename == "/out") {
buf.size = pipe.buffer.size()
- pipe.buffer.avail_capacity();
buf.mode = STAT_MODE_FILE;
buf.inode = Genode::addr_t(&pipe)+2;
buf.device = Genode::addr_t(this);
result = STAT_OK;
}
});
}
catch (Pipe_space::Unknown_id) { }
}
return result;
}
Unlink_result unlink(const char*) override {
return UNLINK_ERR_NO_ENTRY; }
Rename_result rename(const char*, const char*) override {
return RENAME_ERR_NO_ENTRY; }
file_size num_dirent(char const *) override {
return ~0UL; }
bool directory(char const *cpath) override
{
Path path(cpath);
if (path == "/") return true;
if (!path.has_single_element())
return Open_result::OPEN_ERR_UNACCESSIBLE;
Pipe_space::Id id { ~0UL };
if (!ascii_to(path.last_element(), id.value))
return false;
bool result = false;
try {
_pipe_space.apply<Pipe&>(id, [&] (Pipe &) {
result = true; });
}
catch (Pipe_space::Unknown_id) { }
return result;
}
const char* leaf_path(const char *cpath) override
{
Path path(cpath);
if (path == "/") return cpath;
if (path == "/new") return cpath;
char const *result = nullptr;
if (!path.has_single_element()) {
/* maybe this is /N/in or /N/out */
path.strip_last_element();
if (!path.has_single_element())
/* too many directory levels */
return nullptr;
Path filename(cpath);
filename.keep_only_last_element();
if (filename != "/in" && filename != "/out")
/* not a pipe file */
return nullptr;
}
Pipe_space::Id id { ~0UL };
if (ascii_to(path.last_element(), id.value)) try {
/* check if the pipe directory exists */
_pipe_space.apply<Pipe&>(id, [&] (Pipe &) {
result = cpath; });
}
catch (Pipe_space::Unknown_id) { }
return result;
}
/**********************
** File I/O service **
**********************/
Write_result write(Vfs_handle *vfs_handle,
const char *src, file_size count,
file_size &out_count) override
{
if (Pipe_handle *handle = dynamic_cast<Pipe_handle*>(vfs_handle))
return handle->write(src, count, out_count);
return WRITE_ERR_INVALID;
}
Read_result complete_read(Vfs_handle *vfs_handle,
char *dst, file_size count,
file_size &out_count) override
{
if (Pipe_handle *handle = dynamic_cast<Pipe_handle*>(vfs_handle))
return handle->read(dst, count, out_count);
if (New_pipe_handle *handle = dynamic_cast<New_pipe_handle*>(vfs_handle))
return handle->read(dst, count, out_count);
return READ_ERR_INVALID;
}
bool read_ready(Vfs_handle *vfs_handle) override
{
if (Pipe_handle *handle = dynamic_cast<Pipe_handle*>(vfs_handle))
return handle->read_ready();
return true;
}
bool notify_read_ready(Vfs_handle *vfs_handle) override
{
if (Pipe_handle *handle = dynamic_cast<Pipe_handle*>(vfs_handle))
return handle->notify_read_ready();
return false;
}
Ftruncate_result ftruncate(Vfs_handle*, file_size) override {
return FTRUNCATE_ERR_NO_PERM; }
Sync_result complete_sync(Vfs_handle*) override {
return SYNC_OK; }
};
extern "C" Vfs::File_system_factory *vfs_file_system_factory(void)
{
struct Factory : Vfs::File_system_factory
{
Vfs::File_system *create(Vfs::Env &env, Genode::Xml_node) override
{
return new (env.alloc())
Vfs_pipe::File_system(env);
}
};
static Factory f;
return &f;
}

View File

@ -0,0 +1,2 @@
TARGET = dummy-vfs_pipe
LIBS = vfs_pipe

View File

@ -2,4 +2,6 @@ _/src/init
_/src/test-libc_pipe
_/src/libc
_/src/vfs
_/src/vfs_pipe
_/src/posix
_/src/sequence

View File

@ -2,40 +2,71 @@
<events>
<timeout meaning="failed" sec="30" />
<log meaning="succeeded">child "test-libc_pipe" exited with exit value 0</log>
<log meaning="succeeded">child "sequence" exited with exit value 0</log>
<log meaning="failed">Error: </log>
</events>
<content>
<rom label="ld.lib.so"/>
<rom label="libc.lib.so"/>
<rom label="vfs.lib.so"/>
<rom label="libm.lib.so"/>
<rom label="libc_pipe.lib.so"/>
<rom label="posix.lib.so"/>
<rom label="sequence"/>
<rom label="test-libc_pipe"/>
<rom label="vfs"/>
<rom label="vfs.lib.so"/>
<rom label="vfs_pipe.lib.so"/>
</content>
<config>
<parent-provides>
<service name="ROM"/>
<service name="IRQ"/>
<service name="IO_MEM"/>
<service name="IO_PORT"/>
<service name="PD"/>
<service name="RM"/>
<service name="CPU"/>
<service name="LOG"/>
<service name="Timer"/>
</parent-provides>
<default-route>
<any-service> <parent/> <any-child/> </any-service>
</default-route>
<default caps="100"/>
<start name="test-libc_pipe">
<default caps="256"/>
<start name="pipes_fs">
<binary name="vfs"/>
<provides> <service name="File_system"/> </provides>
<resource name="RAM" quantum="4M"/>
<config>
<vfs> <dir name="dev"> <log/> </dir> </vfs>
<libc stdout="/dev/log" stderr="/dev/log"/>
<vfs> <pipe/> </vfs>
<default-policy root="/" writeable="yes"/>
</config>
</start>
<start name="sequence">
<resource name="RAM" quantum="4M"/>
<config>
<start name="libc_pipe_local">
<binary name="test-libc_pipe"/>
<config>
<vfs>
<dir name="dev">
<dir name="pipe"> <pipe/> </dir>
<log/>
</dir>
</vfs>
<libc stdout="/dev/log" stderr="/dev/log" pipe="/dev/pipe"/>
</config>
</start>
<start name="libc_pipe_remote">
<binary name="test-libc_pipe"/>
<config>
<vfs>
<dir name="dev">
<dir name="pipe"> <fs/> </dir>
<log/>
</dir>
</vfs>
<libc stdout="/dev/log" stderr="/dev/log" pipe="/dev/pipe"/>
</config>
</start>
</config>
</start>
</config>

View File

@ -1,11 +1,2 @@
SRC_DIR = src/test/libc_pipe
include $(GENODE_DIR)/repos/base/recipes/src/content.inc
MIRROR_FROM_REP_DIR := include/libc-plugin \
lib/mk/libc_pipe.mk \
src/lib/libc_pipe
content: $(MIRROR_FROM_REP_DIR)
$(MIRROR_FROM_REP_DIR):
$(mirror_from_rep_dir)

View File

@ -124,6 +124,13 @@ namespace Libc {
char const *string() const { return _value.string(); }
};
char const *config_pipe() __attribute__((weak));
char const *config_pipe()
{
static Config_attr attr("pipe", "");
return attr.string();
}
char const *config_rtc() __attribute__((weak));
char const *config_rtc()
{
@ -1313,6 +1320,59 @@ int Libc::Vfs_plugin::munmap(void *addr, ::size_t)
}
int Libc::Vfs_plugin::pipe(Libc::File_descriptor *pipefdo[2])
{
Absolute_path base_path(Libc::config_pipe());
if (base_path == "") {
Genode::error(__func__, ": pipe fs not mounted");
return Errno(EACCES);
}
Libc::File_descriptor *meta_fd { nullptr };
{
Absolute_path new_path = base_path;
new_path.append("/new");
meta_fd = open(new_path.base(), O_RDONLY, Libc::ANY_FD);
if (!meta_fd) {
Genode::error("failed to create pipe at ", new_path);
return Errno(EACCES);
}
char buf[32];
int const n = read(meta_fd, buf, sizeof(buf)-1);
if (n < 1) {
Genode::error("failed to read pipe at ", new_path);
close(meta_fd);
return Errno(EACCES);
}
buf[n] = '\0';
base_path.append("/");
base_path.append(buf);
} {
Absolute_path out_path = base_path;
out_path.append("/out");
pipefdo[0] = open(out_path.base(), O_RDONLY, Libc::ANY_FD);
if (!pipefdo[0])
Genode::error("failed to open pipe end at ", out_path);
} {
Absolute_path in_path = base_path;
in_path.append("/in");
pipefdo[1] = open(in_path.base(), O_WRONLY, Libc::ANY_FD);
if (!pipefdo[1])
Genode::error("failed to open pipe end at ", in_path);
}
close(meta_fd);
if (!pipefdo[0] || !pipefdo[1])
return Errno(EACCES);
return 0;
}
bool Libc::Vfs_plugin::poll(File_descriptor &fd, struct pollfd &pfd)
{
if (fd.plugin != this) return false;

View File

@ -115,6 +115,7 @@ class Libc::Vfs_plugin : public Libc::Plugin
bool supports_access(const char *, int) override { return true; }
bool supports_mkdir(const char *, mode_t) override { return true; }
bool supports_open(const char *, int) override { return true; }
bool supports_pipe() override { return true; }
bool supports_poll() override { return true; }
bool supports_readlink(const char *, char *, ::size_t) override { return true; }
bool supports_rename(const char *, const char *) override { return true; }
@ -147,6 +148,7 @@ class Libc::Vfs_plugin : public Libc::Plugin
int ioctl(Libc::File_descriptor *, int , char *) override;
::off_t lseek(Libc::File_descriptor *fd, ::off_t offset, int whence) override;
int mkdir(const char *, mode_t) override;
int pipe(Libc::File_descriptor *pipefdo[2]) override;
bool poll(File_descriptor &fdo, struct pollfd &pfd) override;
ssize_t read(Libc::File_descriptor *, void *, ::size_t) override;
ssize_t readlink(const char *, char *, ::size_t) override;

View File

@ -1,5 +1,5 @@
TARGET = test-libc_pipe
LIBS = base posix libc_pipe
LIBS = base posix
SRC_CC = main.cc
CC_CXX_WARN_STRICT =

View File

@ -84,6 +84,8 @@ class Genode::Ring_buffer
public:
constexpr static size_t size() { return QUEUE_SIZE; }
class Overflow : public Exception { };
/**