1mod config;
58mod device;
59
60pub use config::ManagerConfig;
61pub use device::Device;
62
63use std::sync::Arc;
64
65use anyhow::{Context, Result};
66
67use std::time::Duration;
68
69use tokio::sync::mpsc::UnboundedReceiver;
70
71use crate::{certmanager, controller, discover, fabric::Fabric, mdns2, onboarding, transport};
72
73pub struct DeviceManager {
74 base_path: String,
75 config: ManagerConfig,
76 transport: Arc<transport::Transport>,
77 controller: Arc<controller::Controller>,
78 certmanager: Arc<dyn certmanager::CertManager>,
79 registry: std::sync::Mutex<device::DeviceRegistry>,
80 mdns: Arc<mdns2::MdnsService>,
81 mdns_receiver: tokio::sync::Mutex<UnboundedReceiver<mdns2::MdnsEvent>>,
82}
83
84impl DeviceManager {
85 pub async fn create(base_path: &str, config: ManagerConfig) -> Result<Self> {
88 std::fs::create_dir_all(base_path)
89 .context(format!("creating base directory {}", base_path))?;
90 config::save_config(base_path, &config)?;
91
92 let pem = config::pem_path(base_path);
93 let cm = certmanager::FileCertManager::new(config.fabric_id, &pem);
94 cm.bootstrap()?;
95 cm.create_user(config.controller_id)?;
96
97 let cm: Arc<dyn certmanager::CertManager> = certmanager::FileCertManager::load(&pem)?;
98 let transport = transport::Transport::new(&config.local_address).await?;
99 let controller = controller::Controller::new(&cm, &transport, config.fabric_id)?;
100 let registry = device::DeviceRegistry::load(&config::devices_path(base_path))?;
101 let (mdns, mdns_receiver) = mdns2::MdnsService::new().await?;
102
103 Ok(Self {
104 base_path: base_path.to_owned(),
105 config,
106 transport,
107 controller,
108 certmanager: cm,
109 registry: std::sync::Mutex::new(registry),
110 mdns,
111 mdns_receiver: tokio::sync::Mutex::new(mdns_receiver),
112 })
113 }
114
115 pub async fn load(base_path: &str) -> Result<Self> {
117 let config = config::load_config(base_path)?;
118 let pem = config::pem_path(base_path);
119 let cm: Arc<dyn certmanager::CertManager> = certmanager::FileCertManager::load(&pem)?;
120 let transport = transport::Transport::new(&config.local_address).await?;
121 let controller = controller::Controller::new(&cm, &transport, config.fabric_id)?;
122 let registry = device::DeviceRegistry::load(&config::devices_path(base_path))?;
123 let (mdns, mdns_receiver) = mdns2::MdnsService::new().await?;
124
125 Ok(Self {
126 base_path: base_path.to_owned(),
127 config,
128 transport,
129 controller,
130 certmanager: cm,
131 registry: std::sync::Mutex::new(registry),
132 mdns,
133 mdns_receiver: tokio::sync::Mutex::new(mdns_receiver),
134 })
135 }
136
137 pub async fn commission(
140 &self,
141 address: &str,
142 pin: u32,
143 node_id: u64,
144 name: &str,
145 ) -> Result<controller::Connection> {
146 let conn = self.transport.create_connection(address).await;
147 let connection = self
148 .controller
149 .commission(&conn, pin, node_id, self.config.controller_id)
150 .await?;
151
152 let device = Device {
153 node_id,
154 address: address.to_owned(),
155 name: name.to_owned(),
156 };
157 self.registry
158 .lock()
159 .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?
160 .add(device)?;
161
162 Ok(connection)
163 }
164
165 pub async fn connect(&self, node_id: u64) -> Result<controller::Connection> {
168 let address = {
169 let reg = self.registry.lock().map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
170 reg.get(node_id)
171 .context(format!("device {} not found in registry", node_id))?
172 .address
173 .clone()
174 };
175 self.connect_with_rediscovery(node_id, &address).await
176 }
177
178 pub async fn connect_by_name(&self, name: &str) -> Result<controller::Connection> {
181 let (node_id, address) = {
182 let reg = self.registry.lock().map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
183 let dev = reg
184 .get_by_name(name)
185 .context(format!("device '{}' not found in registry", name))?;
186 (dev.node_id, dev.address.clone())
187 };
188 self.connect_with_rediscovery(node_id, &address).await
189 }
190
191 async fn connect_with_rediscovery(&self, node_id: u64, address: &str) -> Result<controller::Connection> {
193 let conn = self.transport.create_connection(address).await;
194 match self.controller.auth_sigma(&conn, node_id, self.config.controller_id).await {
195 Ok(c) => Ok(c),
196 Err(e) => {
197 log::info!("Connection to {} failed ({}), attempting operational rediscovery...", address, e);
198 let new_address = self.discover_device(node_id, Duration::from_secs(10)).await
199 .context(format!("rediscovery for node {} after connect failure", node_id))?;
200 let conn = self.transport.create_connection(&new_address).await;
201 self.controller
202 .auth_sigma(&conn, node_id, self.config.controller_id)
203 .await
204 .context(format!("connection still failed after rediscovery at {}", new_address))
205 }
206 }
207 }
208
209 pub async fn commission_with_code(
213 &self,
214 pairing_code: &str,
215 node_id: u64,
216 name: &str,
217 ) -> Result<controller::Connection> {
218 let info = onboarding::decode_manual_pairing_code(pairing_code)
219 .context("decoding manual pairing code")?;
220 let discriminator = info.discriminator;
221 let passcode = info.passcode;
222
223 log::info!("Discovering device with discriminator {}...", discriminator);
224
225 self.mdns.add_query("_matterc._udp.local", 0xff, Duration::from_secs(5)).await;
226 let mut receiver = self.mdns_receiver.lock().await;
227
228 let (ips, port) = loop {
229 match receiver.recv().await {
230 Some(mdns2::MdnsEvent::ServiceDiscovered { name: svc_name, records: _, target }) => {
231 if svc_name != "_matterc._udp.local." {
232 continue;
233 }
234 let matter_info = match discover::extract_matter_info(&target, &self.mdns).await {
235 Ok(i) => i,
236 Err(e) => {
237 log::debug!("Failed to extract Matter info from {}: {}", target, e);
238 continue;
239 }
240 };
241 if let Some(ref d) = matter_info.discriminator {
242 if *d == discriminator.to_string() {
243 self.mdns.remove_query("_matterc._udp.local").await;
244 break (matter_info.ips, matter_info.port.unwrap_or(5540));
245 }
246 }
247 }
248 None => {
249 anyhow::bail!("no commissionable device found with discriminator {}", discriminator);
250 }
251 _ => {}
252 }
253 };
254 drop(receiver);
255
256 if ips.is_empty() {
257 anyhow::bail!("discovered device with discriminator {} but no IPs returned", discriminator);
258 }
259
260 let mut last_err = anyhow::anyhow!("no IPs to try");
261 for ip in &ips {
262 let address = if ip.is_ipv6() {
263 format!("[{}]:{}", ip, port)
264 } else {
265 format!("{}:{}", ip, port)
266 };
267 match self.commission(&address, passcode, node_id, name).await {
268 Ok(conn) => return Ok(conn),
269 Err(e) => {
270 log::debug!("Commission attempt at {} failed: {}", address, e);
271 last_err = e;
272 }
273 }
274 }
275 Err(last_err).context(format!("commissioning failed on all IPs for discriminator {}", discriminator))
276 }
277
278 #[cfg(feature = "ble")]
287 pub async fn commission_ble_with_code(
288 &self,
289 pairing_code: &str,
290 node_id: u64,
291 name: &str,
292 network_creds: crate::commission::NetworkCreds,
293 ) -> Result<controller::Connection> {
294 let info = if pairing_code.starts_with("MT:") || pairing_code.starts_with("mt:") {
295 crate::onboarding::decode_qr_payload(pairing_code)
296 } else {
297 crate::onboarding::decode_manual_pairing_code(pairing_code)
298 }
299 .context("decoding pairing code")?;
300
301 let connection = self
302 .controller
303 .commission_ble(
304 info.discriminator,
305 info.is_short_discriminator,
306 info.passcode,
307 node_id,
308 self.config.controller_id,
309 network_creds,
310 &self.mdns,
311 &self.mdns_receiver,
312 )
313 .await?;
314
315 let device = crate::devman::Device {
316 node_id,
317 address: String::new(), name: name.to_owned(),
319 };
320 self.registry
321 .lock()
322 .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?
323 .add(device)?;
324
325 Ok(connection)
326 }
327
328 pub async fn discover_device(&self, node_id: u64, timeout: Duration) -> Result<String> {
332 let ca_public_key = self.certmanager.get_ca_public_key()?;
333 let fabric = Fabric::new(self.config.fabric_id, 0, &ca_public_key);
334 let compressed = fabric.compressed().context("computing compressed fabric ID")?;
335 let instance_name = format!("{}-{:016X}", hex::encode_upper(&compressed), node_id);
336 let expected_target = format!("{}._matter._tcp.local.", instance_name);
337
338 log::info!("Operational discovery for instance {}...", instance_name);
339
340 self.mdns.add_query("_matter._tcp.local", 0xff, Duration::from_secs(10)).await;
341 let mut receiver = self.mdns_receiver.lock().await;
342
343 let (ips, port) = loop {
344 let event = tokio::time::timeout(timeout, receiver.recv())
345 .await
346 .map_err(|_| anyhow::anyhow!("operational discovery timeout for node {}", node_id))?;
347 match event {
348 Some(mdns2::MdnsEvent::ServiceDiscovered { name: svc_name, records: _, target }) => {
349 if svc_name != "_matter._tcp.local." {
350 continue;
351 }
352 if target != expected_target {
353 continue;
354 }
355 let matter_info = match discover::extract_matter_info(&target, &self.mdns).await {
356 Ok(i) => i,
357 Err(e) => {
358 log::debug!("Failed to extract Matter info from {}: {}", target, e);
359 continue;
360 }
361 };
362 self.mdns.remove_query("_matter._tcp.local").await;
363 break (matter_info.ips, matter_info.port.unwrap_or(5540));
364 }
365 None => {
366 anyhow::bail!("no operational mDNS result for instance {}", instance_name);
367 }
368 _ => {}
369 }
370 };
371 drop(receiver);
372
373 let ip = ips.first()
374 .context(format!("discovered {} but no IPs in response", instance_name))?;
375 let address = if ip.is_ipv6() {
376 format!("[{}]:{}", ip, port)
377 } else {
378 format!("{}:{}", ip, port)
379 };
380
381 self.update_device_address(node_id, &address)?;
382 Ok(address)
383 }
384
385 pub fn list_devices(&self) -> Result<Vec<Device>> {
387 let reg = self.registry.lock().map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
388 Ok(reg.list().to_vec())
389 }
390
391 pub fn get_device(&self, node_id: u64) -> Result<Option<Device>> {
393 let reg = self.registry.lock().map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
394 Ok(reg.get(node_id).cloned())
395 }
396
397 pub fn get_device_by_name(&self, name: &str) -> Result<Option<Device>> {
399 let reg = self.registry.lock().map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
400 Ok(reg.get_by_name(name).cloned())
401 }
402
403 pub fn remove_device(&self, node_id: u64) -> Result<()> {
405 self.registry
406 .lock()
407 .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?
408 .remove(node_id)
409 }
410
411 pub fn rename_device(&self, node_id: u64, name: &str) -> Result<()> {
413 self.registry
414 .lock()
415 .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?
416 .rename(node_id, name)
417 }
418
419 pub fn update_device_address(&self, node_id: u64, address: &str) -> Result<()> {
421 self.registry
422 .lock()
423 .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?
424 .update_address(node_id, address)
425 }
426
427 pub fn mdns(&self) -> &Arc<mdns2::MdnsService> {
429 &self.mdns
430 }
431
432 pub fn controller(&self) -> &Arc<controller::Controller> {
434 &self.controller
435 }
436
437 pub fn transport(&self) -> &Arc<transport::Transport> {
439 &self.transport
440 }
441
442 pub fn certmanager(&self) -> &Arc<dyn certmanager::CertManager> {
444 &self.certmanager
445 }
446
447 pub fn config(&self) -> &ManagerConfig {
449 &self.config
450 }
451
452 pub fn base_path(&self) -> &str {
454 &self.base_path
455 }
456}