Loading src/cluster.cpp +45 −0 Original line number Diff line number Diff line Loading @@ -429,6 +429,7 @@ namespace authdb { if (running_) return; running_ = true; server_thread_ = std::thread(&Cluster::server_loop, this); health_monitor_thread_ = std::thread(&Cluster::health_monitor_loop, this); } size_t Cluster::waitForPeers(int timeout_s) { Loading Loading @@ -471,7 +472,11 @@ namespace authdb { void Cluster::stop() { if (!running_) return; running_ = false; monitor_cv_.notify_all(); if (server_) server_->stop(); if (health_monitor_thread_.joinable()) { health_monitor_thread_.join(); } if (server_thread_.joinable()) { server_thread_.join(); } Loading @@ -488,6 +493,46 @@ namespace authdb { running_ = false; } void Cluster::health_monitor_loop() { size_t k = cfg_.data_blocks; size_t n = cfg_.data_blocks + cfg_.parity_blocks; while (running_) { { std::unique_lock<std::mutex> lk(monitor_mutex_); monitor_cv_.wait_for(lk, std::chrono::seconds(10), [this]{ return !running_.load(); }); } if (!running_ || !pclient_) continue; try { std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) continue; auto health = pclient_->get_cluster_status(); if (health.nodes_online >= n) { if (degraded_) { std::cerr << "Cluster: recovered — all " << health.nodes_online << "/" << n << " nodes online" << std::endl; degraded_ = false; } } else if (health.nodes_online >= k) { if (!degraded_) { std::cerr << "Cluster: DEGRADED — " << health.nodes_online << "/" << n << " nodes online" << std::endl; } degraded_ = true; } else { std::cerr << "Cluster: CRITICAL — only " << health.nodes_online << "/" << n << " nodes online (need " << k << ")" << std::endl; degraded_ = true; } } catch (const std::exception &e) { std::cerr << "Cluster: health probe failed: " << e.what() << std::endl; degraded_ = true; } } } void Cluster::pushSession(const SessionData &sess) { if (!pclient_) return; Loading src/cluster.h +8 −0 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ #include <vector> #include <memory> #include <mutex> #include <condition_variable> #include <thread> #include <atomic> #include <utility> Loading Loading @@ -203,10 +204,16 @@ namespace authdb { }; void listAllSessions(std::vector<SessionInfo> &out); bool isDegraded() const { return degraded_; } private: ClusterConfig cfg_; std::atomic<bool> running_{false}; std::atomic<bool> degraded_{false}; std::thread server_thread_; std::thread health_monitor_thread_; std::mutex monitor_mutex_; std::condition_variable monitor_cv_; std::shared_ptr<paritypp::block_store> store_; std::shared_ptr<paritypp::memory_block_store> session_store_; // sessions only Loading @@ -217,6 +224,7 @@ namespace authdb { std::timed_mutex client_mutex_; void server_loop(); void health_monitor_loop(); }; // Global cluster instance Loading Loading
src/cluster.cpp +45 −0 Original line number Diff line number Diff line Loading @@ -429,6 +429,7 @@ namespace authdb { if (running_) return; running_ = true; server_thread_ = std::thread(&Cluster::server_loop, this); health_monitor_thread_ = std::thread(&Cluster::health_monitor_loop, this); } size_t Cluster::waitForPeers(int timeout_s) { Loading Loading @@ -471,7 +472,11 @@ namespace authdb { void Cluster::stop() { if (!running_) return; running_ = false; monitor_cv_.notify_all(); if (server_) server_->stop(); if (health_monitor_thread_.joinable()) { health_monitor_thread_.join(); } if (server_thread_.joinable()) { server_thread_.join(); } Loading @@ -488,6 +493,46 @@ namespace authdb { running_ = false; } void Cluster::health_monitor_loop() { size_t k = cfg_.data_blocks; size_t n = cfg_.data_blocks + cfg_.parity_blocks; while (running_) { { std::unique_lock<std::mutex> lk(monitor_mutex_); monitor_cv_.wait_for(lk, std::chrono::seconds(10), [this]{ return !running_.load(); }); } if (!running_ || !pclient_) continue; try { std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(2)); if (!lock.owns_lock()) continue; auto health = pclient_->get_cluster_status(); if (health.nodes_online >= n) { if (degraded_) { std::cerr << "Cluster: recovered — all " << health.nodes_online << "/" << n << " nodes online" << std::endl; degraded_ = false; } } else if (health.nodes_online >= k) { if (!degraded_) { std::cerr << "Cluster: DEGRADED — " << health.nodes_online << "/" << n << " nodes online" << std::endl; } degraded_ = true; } else { std::cerr << "Cluster: CRITICAL — only " << health.nodes_online << "/" << n << " nodes online (need " << k << ")" << std::endl; degraded_ = true; } } catch (const std::exception &e) { std::cerr << "Cluster: health probe failed: " << e.what() << std::endl; degraded_ = true; } } } void Cluster::pushSession(const SessionData &sess) { if (!pclient_) return; Loading
src/cluster.h +8 −0 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ #include <vector> #include <memory> #include <mutex> #include <condition_variable> #include <thread> #include <atomic> #include <utility> Loading Loading @@ -203,10 +204,16 @@ namespace authdb { }; void listAllSessions(std::vector<SessionInfo> &out); bool isDegraded() const { return degraded_; } private: ClusterConfig cfg_; std::atomic<bool> running_{false}; std::atomic<bool> degraded_{false}; std::thread server_thread_; std::thread health_monitor_thread_; std::mutex monitor_mutex_; std::condition_variable monitor_cv_; std::shared_ptr<paritypp::block_store> store_; std::shared_ptr<paritypp::memory_block_store> session_store_; // sessions only Loading @@ -217,6 +224,7 @@ namespace authdb { std::timed_mutex client_mutex_; void server_loop(); void health_monitor_loop(); }; // Global cluster instance Loading