From 097a53d0db81df3a79245c441eb0665b54d63599 Mon Sep 17 00:00:00 2001 From: Astro Date: Sun, 17 Jul 2022 02:54:00 +0200 Subject: [PATCH] feat(mqtt sink): Implement an MQTT sink --- Cargo.lock | 34 +++ Cargo.toml | 3 + src/internal_events/mod.rs | 4 + src/internal_events/mqtt.rs | 75 ++++++ src/sinks/mod.rs | 2 + src/sinks/mqtt/config.rs | 122 ++++++++++ src/sinks/mqtt/mod.rs | 10 + src/sinks/mqtt/sink.rs | 222 ++++++++++++++++++ .../reference/configuration/sinks/mqtt.md | 14 ++ .../cue/reference/components/sinks/mqtt.cue | 142 +++++++++++ 10 files changed, 628 insertions(+) create mode 100644 src/internal_events/mqtt.rs create mode 100644 src/sinks/mqtt/config.rs create mode 100644 src/sinks/mqtt/mod.rs create mode 100644 src/sinks/mqtt/sink.rs create mode 100644 website/content/en/docs/reference/configuration/sinks/mqtt.md create mode 100644 website/cue/reference/components/sinks/mqtt.cue diff --git a/Cargo.lock b/Cargo.lock index c0f30014f..b12ec7b3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4410,6 +4410,15 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "mqttbytes" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12793a86f38eb258c5dbddf801dfd521c1d7a9def6e3a3de1ee248441c9dcc28" +dependencies = [ + "bytes 1.1.0", +] + [[package]] name = "multer" version = "2.0.2" @@ -5360,6 +5369,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "pollster" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7" + [[package]] name = "portpicker" version = "1.0.0" @@ -6178,6 +6193,24 @@ dependencies = [ "xmlparser", ] +[[package]] +name = "rumqttc" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613456fdab2f811204edb36f25e397cc62956c85326c274d153bc0b17c44a777" +dependencies = [ + "async-channel", + "bytes 1.1.0", + "http", + "log", + "mqttbytes", + "pollster", + "thiserror", + "tokio", + "tokio-rustls 0.22.0", + "webpki 0.21.4", +] + [[package]] name = "rust-argon2" version = "0.8.3" @@ -8387,6 +8420,7 @@ dependencies = [ "rmp-serde", "rmpv", "roaring", + "rumqttc", "schannel", "seahash", "security-framework", diff --git a/Cargo.toml b/Cargo.toml index bfc37c8ac..f3386bb15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -280,6 +280,7 @@ rdkafka = { version = "0.27.0", default-features = false, features = ["tokio", " redis = { version = "0.21.5", default-features = false, features = ["connection-manager", "tokio-comp", "tokio-native-tls-comp"], optional = true } regex = { version = "1.5.6", default-features = false, features = ["std", "perf"] } roaring = { version = "0.9.0", default-features = false, optional = true } +rumqttc = { version = "0.9", default-features = false, optional = true } seahash = { version = "4.1.0", default-features = false, optional = true } semver = { version = "1.0.10", default-features = false, features = ["serde", "std"], optional = true } smallvec = { version = "1", default-features = false, features = ["union"] } @@ -627,6 +628,7 @@ sinks-logs = [ "sinks-kafka", "sinks-logdna", "sinks-loki", + "sinks-mqtt", "sinks-nats", "sinks-new_relic_logs", "sinks-new_relic", @@ -680,6 +682,7 @@ sinks-influxdb = [] sinks-kafka = ["dep:rdkafka"] sinks-logdna = [] sinks-loki = [] +sinks-mqtt = ["dep:rumqttc"] sinks-nats = ["dep:nats", "dep:nkeys"] sinks-new_relic_logs = ["sinks-http"] sinks-new_relic = [] diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 16cdded31..dcf1d1175 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -141,6 +141,8 @@ mod unix; mod vector; #[cfg(feature = "sinks-websocket")] mod websocket; +#[cfg(feature = "sinks-mqtt")] +mod mqtt; #[cfg(any( feature = "sources-file", @@ -310,6 +312,8 @@ pub(crate) use self::unix::*; pub(crate) use self::vector::*; #[cfg(feature = "sinks-websocket")] pub(crate) use self::websocket::*; +#[cfg(feature = "sinks-mqtt")] +pub(crate) use self::mqtt::*; #[cfg(windows)] pub(crate) use self::windows::*; pub(crate) use self::{ diff --git a/src/internal_events/mqtt.rs b/src/internal_events/mqtt.rs new file mode 100644 index 000000000..6dcedb83a --- /dev/null +++ b/src/internal_events/mqtt.rs @@ -0,0 +1,75 @@ +use std::fmt::Debug; + +use metrics::counter; +use rumqttc::{ClientError, ConnectionError}; +use vector_core::internal_event::InternalEvent; + +use super::prelude::{error_stage, error_type}; + +#[derive(Debug)] +pub struct MqttConnectionShutdown; + +impl InternalEvent for MqttConnectionShutdown { + fn emit(self) { + warn!(message = "Closed by the server."); + counter!("connection_shutdown_total", 1); + } + + fn name(&self) -> Option<&'static str> { + Some("MqttConnectionShutdown") + } +} + +#[derive(Debug)] +pub struct MqttConnectionError { + pub error: ConnectionError, +} + +impl InternalEvent for MqttConnectionError { + fn emit(self) { + error!( + message = "MQTT connection error.", + error = %self.error, + // error_code = "ws_connection_error", + error_type = error_type::WRITER_FAILED, + stage = error_stage::SENDING, + ); + counter!( + "component_errors_total", 1, + // "error_code" => "ws_connection_error", + "error_type" => error_type::WRITER_FAILED, + "stage" => error_stage::SENDING, + ); + } + + fn name(&self) -> Option<&'static str> { + Some("MqttConnectionError") + } +} + +#[derive(Debug)] +pub struct MqttClientError { + pub error: ClientError, +} + +impl InternalEvent for MqttClientError { + fn emit(self) { + error!( + message = "MQTT client error.", + error = %self.error, + // error_code = "ws_client_error", + error_type = error_type::WRITER_FAILED, + stage = error_stage::SENDING, + ); + counter!( + "component_errors_total", 1, + // "error_code" => "ws_client_error", + "error_type" => error_type::WRITER_FAILED, + "stage" => error_stage::SENDING, + ); + } + + fn name(&self) -> Option<&'static str> { + Some("MqttClientError") + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 313d703fa..df7d5439b 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -58,6 +58,8 @@ pub mod kafka; pub mod logdna; #[cfg(feature = "sinks-loki")] pub mod loki; +#[cfg(feature = "sinks-mqtt")] +pub mod mqtt; #[cfg(feature = "sinks-nats")] pub mod nats; #[cfg(feature = "sinks-new_relic")] diff --git a/src/sinks/mqtt/config.rs b/src/sinks/mqtt/config.rs new file mode 100644 index 000000000..17b28b73c --- /dev/null +++ b/src/sinks/mqtt/config.rs @@ -0,0 +1,122 @@ +use rumqttc::{MqttOptions, Transport}; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::{ + config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, + sinks::{ + util::encoding::{ + EncodingConfig, EncodingConfigAdapter, StandardEncodings, StandardEncodingsMigrator, + }, + mqtt::sink::{TlsSnafu, MqttConnector, MqttError, MqttSink}, + Healthcheck, VectorSink, + }, + tls::{MaybeTlsSettings, TlsEnableableConfig}, +}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct MqttSinkConfig { + pub host: String, + #[serde(default = "default_port")] + pub port: u16, + pub user: String, + pub password: String, + #[serde(default = "default_client_id")] + pub client_id: String, + #[serde(default = "default_keep_alive")] + pub keep_alive: u16, + #[serde(default = "default_clean_session")] + pub clean_session: bool, + pub tls: Option, + pub topic: String, + pub encoding: + EncodingConfigAdapter, StandardEncodingsMigrator>, +} + +const fn default_port() -> u16 { + 1883 +} + +fn default_client_id() -> String { + "vector".into() +} + +const fn default_keep_alive() -> u16 { + 60 +} + +const fn default_clean_session() -> bool { + false +} + +impl GenerateConfig for MqttSinkConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + host: "localhost".into(), + port: default_port(), + user: "admin".into(), + password: "secret".into(), + client_id: default_client_id(), + keep_alive: default_keep_alive(), + clean_session: default_clean_session(), + tls: None, + topic: "vector".into(), + encoding: EncodingConfig::from(StandardEncodings::Json).into(), + }) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "mqtt")] +impl SinkConfig for MqttSinkConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let connector = self.build_connector()?; + let sink = MqttSink::new(self, connector.clone(), cx.acker())?; + + Ok(( + VectorSink::from_event_streamsink(sink), + Box::pin(async move { connector.healthcheck().await }), + )) + } + + fn input(&self) -> Input { + Input::log() + } + + fn sink_type(&self) -> &'static str { + "mqtt" + } + + fn acknowledgements(&self) -> Option<&AcknowledgementsConfig> { + None + } +} + +impl MqttSinkConfig { + fn build_connector(&self) -> Result { + let tls = MaybeTlsSettings::from_config(&self.tls, false) + .context(TlsSnafu)?; + let mut options = MqttOptions::new(&self.client_id, &self.host, self.port); + options.set_keep_alive(self.keep_alive); + options.set_clean_session(self.clean_session); + options.set_credentials(&self.user, &self.password); + if let Some(tls) = tls.tls() { + let ca = tls.authorities_pem().flatten().collect(); + let client_auth = None; + let alpn = Some(vec!["mqtt".into()]); + options.set_transport(Transport::tls(ca, client_auth, alpn)); + } + MqttConnector::new(options, self.topic.clone()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } +} diff --git a/src/sinks/mqtt/mod.rs b/src/sinks/mqtt/mod.rs new file mode 100644 index 000000000..78858ee96 --- /dev/null +++ b/src/sinks/mqtt/mod.rs @@ -0,0 +1,10 @@ +mod config; +mod sink; + +pub use config::MqttSinkConfig; + +use crate::config::SinkDescription; + +inventory::submit! { + SinkDescription::new::("websocket") +} diff --git a/src/sinks/mqtt/sink.rs b/src/sinks/mqtt/sink.rs new file mode 100644 index 000000000..3f06dbe6a --- /dev/null +++ b/src/sinks/mqtt/sink.rs @@ -0,0 +1,222 @@ +use std::{ + fmt::Debug, +}; + +use async_trait::async_trait; +use bytes::BytesMut; +use futures::{ + pin_mut, + stream::BoxStream, + Stream, StreamExt, +}; +use rumqttc::{ + AsyncClient, ClientError, ConnectionError, + Event as MqttEvent, EventLoop, MqttOptions, + Packet, QoS, +}; +use snafu::{ResultExt, Snafu}; +use tokio_util::codec::Encoder as _; +use vector_core::{ + buffers::Acker, + internal_event::{BytesSent, EventsSent}, + ByteSizeOf, +}; + +use crate::{ + codecs::Encoder, + emit, + event::{Event, EventStatus, Finalizable}, + internal_events::{ + ConnectionOpen, OpenGauge, MqttClientError, MqttConnectionError, + }, + internal_events::TemplateRenderingError, + sinks::util::StreamSink, + sinks::{util::encoding::Transformer, mqtt::config::MqttSinkConfig}, + template::{Template, TemplateParseError}, + tls::TlsError, +}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum MqttError { + #[snafu(display("invalid topic template: {}", source))] + TopicTemplate { source: TemplateParseError }, + #[snafu(display("MQTT connection: {}", source))] + Connection { source: ConnectionError }, + #[snafu(display("TLS error: {}", source))] + Tls { source: TlsError }, + #[snafu(display("MQTT client: {}", source))] + Client { source: ClientError }, +} + +#[derive(Clone)] +pub struct MqttConnector { + options: MqttOptions, + topic: Template, +} + +impl MqttConnector { + pub fn new(options: MqttOptions, topic: String) -> Result { + let topic = Template::try_from(topic).context(TopicTemplateSnafu)?; + Ok(Self { + options, + topic, + }) + } + + fn connect(&self) -> (AsyncClient, EventLoop) { + AsyncClient::new(self.options.clone(), 1024) + } + + pub async fn healthcheck(&self) -> crate::Result<()> { + let (client, connection) = self.connect(); + drop(client); + drop(connection); + Ok(()) + } +} + +pub struct MqttSink { + transformer: Transformer, + encoder: Encoder<()>, + connector: MqttConnector, + acker: Acker, +} + +impl MqttSink { + pub fn new( + config: &MqttSinkConfig, + connector: MqttConnector, + acker: Acker, + ) -> crate::Result { + let transformer = config.encoding.transformer(); + let serializer = config.encoding.encoding()?; + let encoder = Encoder::<()>::new(serializer); + + Ok(Self { + transformer, + encoder, + connector, + acker, + }) + } + + async fn handle_events( + &mut self, + input: &mut I, + client: &mut AsyncClient, + connection: &mut EventLoop, + ) -> Result<(), ()> + where + I: Stream + Unpin, + { + loop { + tokio::select! { + msg = connection.poll() => { + match msg { + Ok(msg) => + if let MqttEvent::Incoming(Packet::PubComp(_)) = msg { + self.acker.ack(1); + }, + Err(error) => { + emit!(MqttConnectionError { error }); + return Err(()); + } + } + }, + + event = input.next() => { + let mut event = if let Some(event) = event { + event + } else { + break; + }; + + let finalizers = event.take_finalizers(); + + let topic = match self.connector.topic.render_string(&event) { + Ok(topic) => topic, + Err(error) => { + emit!(TemplateRenderingError { + error, + field: Some("topic"), + drop_event: true, + }); + finalizers.update_status(EventStatus::Errored); + self.acker.ack(1); + continue; + } + }; + + self.transformer.transform(&mut event); + + let event_byte_size = event.size_of(); + + let mut bytes = BytesMut::new(); + let res = match self.encoder.encode(event, &mut bytes) { + Ok(()) => { + finalizers.update_status(EventStatus::Delivered); + + let message = bytes.to_vec(); + let message_len = message.len(); + + let qos = QoS::ExactlyOnce; + let retain = false; + client.publish(&topic, qos, retain, message).await.map(|_| { + emit!(EventsSent { + count: 1, + byte_size: event_byte_size, + output: None + }); + emit!(BytesSent { + byte_size: message_len, + protocol: "mqtt" + }); + }) + }, + Err(_) => { + // Error is handled by `Encoder`. + finalizers.update_status(EventStatus::Errored); + Ok(()) + } + }; + + if let Err(error) = res { + emit!(MqttClientError { error }); + return Err(()); + } + }, + + else => break, + } + } + + Ok(()) + } +} + +#[async_trait] +impl StreamSink for MqttSink { + async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let input = input.fuse().peekable(); + pin_mut!(input); + + while input.as_mut().peek().await.is_some() { + let (client, connection) = self.connector.connect(); + pin_mut!(client); + pin_mut!(connection); + + let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count })); + + if self + .handle_events(&mut input, &mut client, &mut connection) + .await + .is_ok() + { + let _ = client.disconnect().await; + } + } + + Ok(()) + } +} diff --git a/website/content/en/docs/reference/configuration/sinks/mqtt.md b/website/content/en/docs/reference/configuration/sinks/mqtt.md new file mode 100644 index 000000000..294597fe6 --- /dev/null +++ b/website/content/en/docs/reference/configuration/sinks/mqtt.md @@ -0,0 +1,14 @@ +--- +title: MQTT +description: Deliver observability event data to an MQTT broker +kind: sink +layout: component +tags: ["mqtt", "component", "sink"] +--- + +{{/* +This doc is generated using: + +1. The template in layouts/docs/component.html +2. The relevant CUE data in cue/reference/components/... +*/}} diff --git a/website/cue/reference/components/sinks/mqtt.cue b/website/cue/reference/components/sinks/mqtt.cue new file mode 100644 index 000000000..cc2c01781 --- /dev/null +++ b/website/cue/reference/components/sinks/mqtt.cue @@ -0,0 +1,142 @@ +package metadata + +components: sinks: mqtt: { + title: "MQTT" + + classes: { + commonly_used: false + delivery: "best_effort" + development: "beta" + egress_method: "stream" + service_providers: [] + stateful: false + } + + features: { + acknowledgements: true + healthcheck: enabled: false + send: { + compression: enabled: false + encoding: { + enabled: true + codec: { + enabled: true + enum: ["json", "text"] + } + } + request: enabled: false + tls: { + enabled: true + can_verify_certificate: true + can_verify_hostname: true + enabled_default: false + } + to: { + service: services.mqtt + interface: { + socket: { + direction: "outgoing" + protocols: ["tcp"] + ssl: "optional" + } + } + } + } + } + + support: { + targets: { + "aarch64-unknown-linux-gnu": true + "aarch64-unknown-linux-musl": true + "armv7-unknown-linux-gnueabihf": true + "armv7-unknown-linux-musleabihf": true + "x86_64-apple-darwin": true + "x86_64-pc-windows-msv": true + "x86_64-unknown-linux-gnu": true + "x86_64-unknown-linux-musl": true + } + requirements: [] + warnings: [] + notices: [] + } + + configuration: { + host: { + description: """ + The MQTT broker to connect to. + """ + required: true + warnings: [] + type: string: { + examples: ["mqtt.example.com"] + syntax: "literal" + } + } + port: { + description: """ + MQTT service port to connect to. + """ + required: false + type: uint: { + default: 1883 + } + } + user: { + description: """ + MQTT username + """ + required: true + type: str: {} + } + password: { + description: """ + MQTT password + """ + required: true + type: str: {} + } + client_id: { + description: """ + MQTT client Id + """ + required: false + type: str: {} + } + keep_alive: { + description: """ + MQTT keep-alive + """ + required: false + type: uint: { + default: 60 + } + } + clean_session: { + description: """ + Removes all the state from queues & instructs the broker to clean all the client state after disconnect. + """ + required: false + type: bool: { + default: false + } + } + } + + input: { + logs: true + metrics: null + traces: false + } + + telemetry: metrics: { + open_connections: components.sources.internal_metrics.output.metrics.open_connections + connection_shutdown_total: components.sources.internal_metrics.output.metrics.connection_shutdown_total + connection_errors_total: components.sources.internal_metrics.output.metrics.connection_errors_total + events_in_total: components.sources.internal_metrics.output.metrics.events_in_total + events_out_total: components.sources.internal_metrics.output.metrics.events_out_total + component_sent_bytes_total: components.sources.internal_metrics.output.metrics.component_sent_bytes_total + component_sent_events_total: components.sources.internal_metrics.output.metrics.component_sent_events_total + events_out_total: components.sources.internal_metrics.output.metrics.events_out_total + component_sent_event_bytes_total: components.sources.internal_metrics.output.metrics.component_sent_event_bytes_total + } +} -- 2.36.1