diff --git a/hosts/containers/broker/default.nix b/hosts/containers/broker/default.nix index 2feef667..a5ec0c2b 100644 --- a/hosts/containers/broker/default.nix +++ b/hosts/containers/broker/default.nix @@ -8,6 +8,8 @@ let ''; fqdn = "broker.serv.zentralwerk.org"; + + mqttWebsocketPort = 9001; in { c3d2 = { @@ -38,7 +40,10 @@ in default = true; enableACME = true; forceSSL = true; - # TODO: provide websocket + locations."/mqtt" = { + proxyPass = "http://localhost:${toString mqttWebsocketPort}/"; + proxyWebsockets = true; + }; }; }; @@ -66,8 +71,7 @@ in ]; }; }; - in - [ { + in [ { address = "0.0.0.0"; port = 1883; inherit users; @@ -91,6 +95,11 @@ in keyfile = "/run/credentials/mosquitto.service/key.pem"; }; inherit users; + } { + settings.protocol = "websockets"; + address = "::"; + port = mqttWebsocketPort; + inherit users; } ]; }; systemd.services.mosquitto = { diff --git a/overlay/vector-mqtt-sink.patch b/overlay/vector-mqtt-sink.patch new file mode 100644 index 00000000..098ef654 --- /dev/null +++ b/overlay/vector-mqtt-sink.patch @@ -0,0 +1,777 @@ +From 097a53d0db81df3a79245c441eb0665b54d63599 Mon Sep 17 00:00:00 2001 +From: Astro +Date: Sun, 17 Jul 2022 02:54:00 +0200 +Subject: [PATCH] feat(mqtt sink): Implement an MQTT sink + +--- + Cargo.lock | 34 +++ + Cargo.toml | 3 + + src/internal_events/mod.rs | 4 + + src/internal_events/mqtt.rs | 75 ++++++ + src/sinks/mod.rs | 2 + + src/sinks/mqtt/config.rs | 122 ++++++++++ + src/sinks/mqtt/mod.rs | 10 + + src/sinks/mqtt/sink.rs | 222 ++++++++++++++++++ + .../reference/configuration/sinks/mqtt.md | 14 ++ + .../cue/reference/components/sinks/mqtt.cue | 142 +++++++++++ + 10 files changed, 628 insertions(+) + create mode 100644 src/internal_events/mqtt.rs + create mode 100644 src/sinks/mqtt/config.rs + create mode 100644 src/sinks/mqtt/mod.rs + create mode 100644 src/sinks/mqtt/sink.rs + create mode 100644 website/content/en/docs/reference/configuration/sinks/mqtt.md + create mode 100644 website/cue/reference/components/sinks/mqtt.cue + +diff --git a/Cargo.lock b/Cargo.lock +index c0f30014f..b12ec7b3e 100644 +--- a/Cargo.lock ++++ b/Cargo.lock +@@ -4410,6 +4410,15 @@ dependencies = [ + "webpki-roots", + ] + ++[[package]] ++name = "mqttbytes" ++version = "0.5.0" ++source = "registry+https://github.com/rust-lang/crates.io-index" ++checksum = "12793a86f38eb258c5dbddf801dfd521c1d7a9def6e3a3de1ee248441c9dcc28" ++dependencies = [ ++ "bytes 1.1.0", ++] ++ + [[package]] + name = "multer" + version = "2.0.2" +@@ -5360,6 +5369,12 @@ dependencies = [ + "winapi 0.3.9", + ] + ++[[package]] ++name = "pollster" ++version = "0.2.5" ++source = "registry+https://github.com/rust-lang/crates.io-index" ++checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7" ++ + [[package]] + name = "portpicker" + version = "1.0.0" +@@ -6178,6 +6193,24 @@ dependencies = [ + "xmlparser", + ] + ++[[package]] ++name = "rumqttc" ++version = "0.9.0" ++source = "registry+https://github.com/rust-lang/crates.io-index" ++checksum = "613456fdab2f811204edb36f25e397cc62956c85326c274d153bc0b17c44a777" ++dependencies = [ ++ "async-channel", ++ "bytes 1.1.0", ++ "http", ++ "log", ++ "mqttbytes", ++ "pollster", ++ "thiserror", ++ "tokio", ++ "tokio-rustls 0.22.0", ++ "webpki 0.21.4", ++] ++ + [[package]] + name = "rust-argon2" + version = "0.8.3" +@@ -8387,6 +8420,7 @@ dependencies = [ + "rmp-serde", + "rmpv", + "roaring", ++ "rumqttc", + "schannel", + "seahash", + "security-framework", +diff --git a/Cargo.toml b/Cargo.toml +index bfc37c8ac..f3386bb15 100644 +--- a/Cargo.toml ++++ b/Cargo.toml +@@ -280,6 +280,7 @@ rdkafka = { version = "0.27.0", default-features = false, features = ["tokio", " + redis = { version = "0.21.5", default-features = false, features = ["connection-manager", "tokio-comp", "tokio-native-tls-comp"], optional = true } + regex = { version = "1.5.6", default-features = false, features = ["std", "perf"] } + roaring = { version = "0.9.0", default-features = false, optional = true } ++rumqttc = { version = "0.9", default-features = false, optional = true } + seahash = { version = "4.1.0", default-features = false, optional = true } + semver = { version = "1.0.10", default-features = false, features = ["serde", "std"], optional = true } + smallvec = { version = "1", default-features = false, features = ["union"] } +@@ -627,6 +628,7 @@ sinks-logs = [ + "sinks-kafka", + "sinks-logdna", + "sinks-loki", ++ "sinks-mqtt", + "sinks-nats", + "sinks-new_relic_logs", + "sinks-new_relic", +@@ -680,6 +682,7 @@ sinks-influxdb = [] + sinks-kafka = ["dep:rdkafka"] + sinks-logdna = [] + sinks-loki = [] ++sinks-mqtt = ["dep:rumqttc"] + sinks-nats = ["dep:nats", "dep:nkeys"] + sinks-new_relic_logs = ["sinks-http"] + sinks-new_relic = [] +diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs +index 16cdded31..dcf1d1175 100644 +--- a/src/internal_events/mod.rs ++++ b/src/internal_events/mod.rs +@@ -141,6 +141,8 @@ mod unix; + mod vector; + #[cfg(feature = "sinks-websocket")] + mod websocket; ++#[cfg(feature = "sinks-mqtt")] ++mod mqtt; + + #[cfg(any( + feature = "sources-file", +@@ -310,6 +312,8 @@ pub(crate) use self::unix::*; + pub(crate) use self::vector::*; + #[cfg(feature = "sinks-websocket")] + pub(crate) use self::websocket::*; ++#[cfg(feature = "sinks-mqtt")] ++pub(crate) use self::mqtt::*; + #[cfg(windows)] + pub(crate) use self::windows::*; + pub(crate) use self::{ +diff --git a/src/internal_events/mqtt.rs b/src/internal_events/mqtt.rs +new file mode 100644 +index 000000000..6dcedb83a +--- /dev/null ++++ b/src/internal_events/mqtt.rs +@@ -0,0 +1,75 @@ ++use std::fmt::Debug; ++ ++use metrics::counter; ++use rumqttc::{ClientError, ConnectionError}; ++use vector_core::internal_event::InternalEvent; ++ ++use super::prelude::{error_stage, error_type}; ++ ++#[derive(Debug)] ++pub struct MqttConnectionShutdown; ++ ++impl InternalEvent for MqttConnectionShutdown { ++ fn emit(self) { ++ warn!(message = "Closed by the server."); ++ counter!("connection_shutdown_total", 1); ++ } ++ ++ fn name(&self) -> Option<&'static str> { ++ Some("MqttConnectionShutdown") ++ } ++} ++ ++#[derive(Debug)] ++pub struct MqttConnectionError { ++ pub error: ConnectionError, ++} ++ ++impl InternalEvent for MqttConnectionError { ++ fn emit(self) { ++ error!( ++ message = "MQTT connection error.", ++ error = %self.error, ++ // error_code = "ws_connection_error", ++ error_type = error_type::WRITER_FAILED, ++ stage = error_stage::SENDING, ++ ); ++ counter!( ++ "component_errors_total", 1, ++ // "error_code" => "ws_connection_error", ++ "error_type" => error_type::WRITER_FAILED, ++ "stage" => error_stage::SENDING, ++ ); ++ } ++ ++ fn name(&self) -> Option<&'static str> { ++ Some("MqttConnectionError") ++ } ++} ++ ++#[derive(Debug)] ++pub struct MqttClientError { ++ pub error: ClientError, ++} ++ ++impl InternalEvent for MqttClientError { ++ fn emit(self) { ++ error!( ++ message = "MQTT client error.", ++ error = %self.error, ++ // error_code = "ws_client_error", ++ error_type = error_type::WRITER_FAILED, ++ stage = error_stage::SENDING, ++ ); ++ counter!( ++ "component_errors_total", 1, ++ // "error_code" => "ws_client_error", ++ "error_type" => error_type::WRITER_FAILED, ++ "stage" => error_stage::SENDING, ++ ); ++ } ++ ++ fn name(&self) -> Option<&'static str> { ++ Some("MqttClientError") ++ } ++} +diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs +index 313d703fa..df7d5439b 100644 +--- a/src/sinks/mod.rs ++++ b/src/sinks/mod.rs +@@ -58,6 +58,8 @@ pub mod kafka; + pub mod logdna; + #[cfg(feature = "sinks-loki")] + pub mod loki; ++#[cfg(feature = "sinks-mqtt")] ++pub mod mqtt; + #[cfg(feature = "sinks-nats")] + pub mod nats; + #[cfg(feature = "sinks-new_relic")] +diff --git a/src/sinks/mqtt/config.rs b/src/sinks/mqtt/config.rs +new file mode 100644 +index 000000000..17b28b73c +--- /dev/null ++++ b/src/sinks/mqtt/config.rs +@@ -0,0 +1,122 @@ ++use rumqttc::{MqttOptions, Transport}; ++use serde::{Deserialize, Serialize}; ++use snafu::ResultExt; ++ ++use crate::{ ++ config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, ++ sinks::{ ++ util::encoding::{ ++ EncodingConfig, EncodingConfigAdapter, StandardEncodings, StandardEncodingsMigrator, ++ }, ++ mqtt::sink::{TlsSnafu, MqttConnector, MqttError, MqttSink}, ++ Healthcheck, VectorSink, ++ }, ++ tls::{MaybeTlsSettings, TlsEnableableConfig}, ++}; ++ ++#[derive(Clone, Debug, Deserialize, Serialize)] ++pub struct MqttSinkConfig { ++ pub host: String, ++ #[serde(default = "default_port")] ++ pub port: u16, ++ pub user: String, ++ pub password: String, ++ #[serde(default = "default_client_id")] ++ pub client_id: String, ++ #[serde(default = "default_keep_alive")] ++ pub keep_alive: u16, ++ #[serde(default = "default_clean_session")] ++ pub clean_session: bool, ++ pub tls: Option, ++ pub topic: String, ++ pub encoding: ++ EncodingConfigAdapter, StandardEncodingsMigrator>, ++} ++ ++const fn default_port() -> u16 { ++ 1883 ++} ++ ++fn default_client_id() -> String { ++ "vector".into() ++} ++ ++const fn default_keep_alive() -> u16 { ++ 60 ++} ++ ++const fn default_clean_session() -> bool { ++ false ++} ++ ++impl GenerateConfig for MqttSinkConfig { ++ fn generate_config() -> toml::Value { ++ toml::Value::try_from(Self { ++ host: "localhost".into(), ++ port: default_port(), ++ user: "admin".into(), ++ password: "secret".into(), ++ client_id: default_client_id(), ++ keep_alive: default_keep_alive(), ++ clean_session: default_clean_session(), ++ tls: None, ++ topic: "vector".into(), ++ encoding: EncodingConfig::from(StandardEncodings::Json).into(), ++ }) ++ .unwrap() ++ } ++} ++ ++#[async_trait::async_trait] ++#[typetag::serde(name = "mqtt")] ++impl SinkConfig for MqttSinkConfig { ++ async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { ++ let connector = self.build_connector()?; ++ let sink = MqttSink::new(self, connector.clone(), cx.acker())?; ++ ++ Ok(( ++ VectorSink::from_event_streamsink(sink), ++ Box::pin(async move { connector.healthcheck().await }), ++ )) ++ } ++ ++ fn input(&self) -> Input { ++ Input::log() ++ } ++ ++ fn sink_type(&self) -> &'static str { ++ "mqtt" ++ } ++ ++ fn acknowledgements(&self) -> Option<&AcknowledgementsConfig> { ++ None ++ } ++} ++ ++impl MqttSinkConfig { ++ fn build_connector(&self) -> Result { ++ let tls = MaybeTlsSettings::from_config(&self.tls, false) ++ .context(TlsSnafu)?; ++ let mut options = MqttOptions::new(&self.client_id, &self.host, self.port); ++ options.set_keep_alive(self.keep_alive); ++ options.set_clean_session(self.clean_session); ++ options.set_credentials(&self.user, &self.password); ++ if let Some(tls) = tls.tls() { ++ let ca = tls.authorities_pem().flatten().collect(); ++ let client_auth = None; ++ let alpn = Some(vec!["mqtt".into()]); ++ options.set_transport(Transport::tls(ca, client_auth, alpn)); ++ } ++ MqttConnector::new(options, self.topic.clone()) ++ } ++} ++ ++#[cfg(test)] ++mod test { ++ use super::*; ++ ++ #[test] ++ fn generate_config() { ++ crate::test_util::test_generate_config::(); ++ } ++} +diff --git a/src/sinks/mqtt/mod.rs b/src/sinks/mqtt/mod.rs +new file mode 100644 +index 000000000..78858ee96 +--- /dev/null ++++ b/src/sinks/mqtt/mod.rs +@@ -0,0 +1,10 @@ ++mod config; ++mod sink; ++ ++pub use config::MqttSinkConfig; ++ ++use crate::config::SinkDescription; ++ ++inventory::submit! { ++ SinkDescription::new::("websocket") ++} +diff --git a/src/sinks/mqtt/sink.rs b/src/sinks/mqtt/sink.rs +new file mode 100644 +index 000000000..3f06dbe6a +--- /dev/null ++++ b/src/sinks/mqtt/sink.rs +@@ -0,0 +1,222 @@ ++use std::{ ++ fmt::Debug, ++}; ++ ++use async_trait::async_trait; ++use bytes::BytesMut; ++use futures::{ ++ pin_mut, ++ stream::BoxStream, ++ Stream, StreamExt, ++}; ++use rumqttc::{ ++ AsyncClient, ClientError, ConnectionError, ++ Event as MqttEvent, EventLoop, MqttOptions, ++ Packet, QoS, ++}; ++use snafu::{ResultExt, Snafu}; ++use tokio_util::codec::Encoder as _; ++use vector_core::{ ++ buffers::Acker, ++ internal_event::{BytesSent, EventsSent}, ++ ByteSizeOf, ++}; ++ ++use crate::{ ++ codecs::Encoder, ++ emit, ++ event::{Event, EventStatus, Finalizable}, ++ internal_events::{ ++ ConnectionOpen, OpenGauge, MqttClientError, MqttConnectionError, ++ }, ++ internal_events::TemplateRenderingError, ++ sinks::util::StreamSink, ++ sinks::{util::encoding::Transformer, mqtt::config::MqttSinkConfig}, ++ template::{Template, TemplateParseError}, ++ tls::TlsError, ++}; ++ ++#[derive(Debug, Snafu)] ++#[snafu(visibility(pub))] ++pub enum MqttError { ++ #[snafu(display("invalid topic template: {}", source))] ++ TopicTemplate { source: TemplateParseError }, ++ #[snafu(display("MQTT connection: {}", source))] ++ Connection { source: ConnectionError }, ++ #[snafu(display("TLS error: {}", source))] ++ Tls { source: TlsError }, ++ #[snafu(display("MQTT client: {}", source))] ++ Client { source: ClientError }, ++} ++ ++#[derive(Clone)] ++pub struct MqttConnector { ++ options: MqttOptions, ++ topic: Template, ++} ++ ++impl MqttConnector { ++ pub fn new(options: MqttOptions, topic: String) -> Result { ++ let topic = Template::try_from(topic).context(TopicTemplateSnafu)?; ++ Ok(Self { ++ options, ++ topic, ++ }) ++ } ++ ++ fn connect(&self) -> (AsyncClient, EventLoop) { ++ AsyncClient::new(self.options.clone(), 1024) ++ } ++ ++ pub async fn healthcheck(&self) -> crate::Result<()> { ++ let (client, connection) = self.connect(); ++ drop(client); ++ drop(connection); ++ Ok(()) ++ } ++} ++ ++pub struct MqttSink { ++ transformer: Transformer, ++ encoder: Encoder<()>, ++ connector: MqttConnector, ++ acker: Acker, ++} ++ ++impl MqttSink { ++ pub fn new( ++ config: &MqttSinkConfig, ++ connector: MqttConnector, ++ acker: Acker, ++ ) -> crate::Result { ++ let transformer = config.encoding.transformer(); ++ let serializer = config.encoding.encoding()?; ++ let encoder = Encoder::<()>::new(serializer); ++ ++ Ok(Self { ++ transformer, ++ encoder, ++ connector, ++ acker, ++ }) ++ } ++ ++ async fn handle_events( ++ &mut self, ++ input: &mut I, ++ client: &mut AsyncClient, ++ connection: &mut EventLoop, ++ ) -> Result<(), ()> ++ where ++ I: Stream + Unpin, ++ { ++ loop { ++ tokio::select! { ++ msg = connection.poll() => { ++ match msg { ++ Ok(msg) => ++ if let MqttEvent::Incoming(Packet::PubComp(_)) = msg { ++ self.acker.ack(1); ++ }, ++ Err(error) => { ++ emit!(MqttConnectionError { error }); ++ return Err(()); ++ } ++ } ++ }, ++ ++ event = input.next() => { ++ let mut event = if let Some(event) = event { ++ event ++ } else { ++ break; ++ }; ++ ++ let finalizers = event.take_finalizers(); ++ ++ let topic = match self.connector.topic.render_string(&event) { ++ Ok(topic) => topic, ++ Err(error) => { ++ emit!(TemplateRenderingError { ++ error, ++ field: Some("topic"), ++ drop_event: true, ++ }); ++ finalizers.update_status(EventStatus::Errored); ++ self.acker.ack(1); ++ continue; ++ } ++ }; ++ ++ self.transformer.transform(&mut event); ++ ++ let event_byte_size = event.size_of(); ++ ++ let mut bytes = BytesMut::new(); ++ let res = match self.encoder.encode(event, &mut bytes) { ++ Ok(()) => { ++ finalizers.update_status(EventStatus::Delivered); ++ ++ let message = bytes.to_vec(); ++ let message_len = message.len(); ++ ++ let qos = QoS::ExactlyOnce; ++ let retain = false; ++ client.publish(&topic, qos, retain, message).await.map(|_| { ++ emit!(EventsSent { ++ count: 1, ++ byte_size: event_byte_size, ++ output: None ++ }); ++ emit!(BytesSent { ++ byte_size: message_len, ++ protocol: "mqtt" ++ }); ++ }) ++ }, ++ Err(_) => { ++ // Error is handled by `Encoder`. ++ finalizers.update_status(EventStatus::Errored); ++ Ok(()) ++ } ++ }; ++ ++ if let Err(error) = res { ++ emit!(MqttClientError { error }); ++ return Err(()); ++ } ++ }, ++ ++ else => break, ++ } ++ } ++ ++ Ok(()) ++ } ++} ++ ++#[async_trait] ++impl StreamSink for MqttSink { ++ async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { ++ let input = input.fuse().peekable(); ++ pin_mut!(input); ++ ++ while input.as_mut().peek().await.is_some() { ++ let (client, connection) = self.connector.connect(); ++ pin_mut!(client); ++ pin_mut!(connection); ++ ++ let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count })); ++ ++ if self ++ .handle_events(&mut input, &mut client, &mut connection) ++ .await ++ .is_ok() ++ { ++ let _ = client.disconnect().await; ++ } ++ } ++ ++ Ok(()) ++ } ++} +diff --git a/website/content/en/docs/reference/configuration/sinks/mqtt.md b/website/content/en/docs/reference/configuration/sinks/mqtt.md +new file mode 100644 +index 000000000..294597fe6 +--- /dev/null ++++ b/website/content/en/docs/reference/configuration/sinks/mqtt.md +@@ -0,0 +1,14 @@ ++--- ++title: MQTT ++description: Deliver observability event data to an MQTT broker ++kind: sink ++layout: component ++tags: ["mqtt", "component", "sink"] ++--- ++ ++{{/* ++This doc is generated using: ++ ++1. The template in layouts/docs/component.html ++2. The relevant CUE data in cue/reference/components/... ++*/}} +diff --git a/website/cue/reference/components/sinks/mqtt.cue b/website/cue/reference/components/sinks/mqtt.cue +new file mode 100644 +index 000000000..cc2c01781 +--- /dev/null ++++ b/website/cue/reference/components/sinks/mqtt.cue +@@ -0,0 +1,142 @@ ++package metadata ++ ++components: sinks: mqtt: { ++ title: "MQTT" ++ ++ classes: { ++ commonly_used: false ++ delivery: "best_effort" ++ development: "beta" ++ egress_method: "stream" ++ service_providers: [] ++ stateful: false ++ } ++ ++ features: { ++ acknowledgements: true ++ healthcheck: enabled: false ++ send: { ++ compression: enabled: false ++ encoding: { ++ enabled: true ++ codec: { ++ enabled: true ++ enum: ["json", "text"] ++ } ++ } ++ request: enabled: false ++ tls: { ++ enabled: true ++ can_verify_certificate: true ++ can_verify_hostname: true ++ enabled_default: false ++ } ++ to: { ++ service: services.mqtt ++ interface: { ++ socket: { ++ direction: "outgoing" ++ protocols: ["tcp"] ++ ssl: "optional" ++ } ++ } ++ } ++ } ++ } ++ ++ support: { ++ targets: { ++ "aarch64-unknown-linux-gnu": true ++ "aarch64-unknown-linux-musl": true ++ "armv7-unknown-linux-gnueabihf": true ++ "armv7-unknown-linux-musleabihf": true ++ "x86_64-apple-darwin": true ++ "x86_64-pc-windows-msv": true ++ "x86_64-unknown-linux-gnu": true ++ "x86_64-unknown-linux-musl": true ++ } ++ requirements: [] ++ warnings: [] ++ notices: [] ++ } ++ ++ configuration: { ++ host: { ++ description: """ ++ The MQTT broker to connect to. ++ """ ++ required: true ++ warnings: [] ++ type: string: { ++ examples: ["mqtt.example.com"] ++ syntax: "literal" ++ } ++ } ++ port: { ++ description: """ ++ MQTT service port to connect to. ++ """ ++ required: false ++ type: uint: { ++ default: 1883 ++ } ++ } ++ user: { ++ description: """ ++ MQTT username ++ """ ++ required: true ++ type: str: {} ++ } ++ password: { ++ description: """ ++ MQTT password ++ """ ++ required: true ++ type: str: {} ++ } ++ client_id: { ++ description: """ ++ MQTT client Id ++ """ ++ required: false ++ type: str: {} ++ } ++ keep_alive: { ++ description: """ ++ MQTT keep-alive ++ """ ++ required: false ++ type: uint: { ++ default: 60 ++ } ++ } ++ clean_session: { ++ description: """ ++ Removes all the state from queues & instructs the broker to clean all the client state after disconnect. ++ """ ++ required: false ++ type: bool: { ++ default: false ++ } ++ } ++ } ++ ++ input: { ++ logs: true ++ metrics: null ++ traces: false ++ } ++ ++ telemetry: metrics: { ++ open_connections: components.sources.internal_metrics.output.metrics.open_connections ++ connection_shutdown_total: components.sources.internal_metrics.output.metrics.connection_shutdown_total ++ connection_errors_total: components.sources.internal_metrics.output.metrics.connection_errors_total ++ events_in_total: components.sources.internal_metrics.output.metrics.events_in_total ++ events_out_total: components.sources.internal_metrics.output.metrics.events_out_total ++ component_sent_bytes_total: components.sources.internal_metrics.output.metrics.component_sent_bytes_total ++ component_sent_events_total: components.sources.internal_metrics.output.metrics.component_sent_events_total ++ events_out_total: components.sources.internal_metrics.output.metrics.events_out_total ++ component_sent_event_bytes_total: components.sources.internal_metrics.output.metrics.component_sent_event_bytes_total ++ } ++} +-- +2.36.1 +