Loading src/cluster.cpp +35 −17 Original line number Diff line number Diff line Loading @@ -310,28 +310,24 @@ namespace authdb { } std::vector<uint32_t> session_intercepting_store::list_blocks(uint64_t group_id) { // Merge blocks from both stores auto inner_blocks = inner_->list_blocks(group_id); auto sess_blocks = session_store_->list_blocks(group_id); if (sess_blocks.empty()) return inner_blocks; if (inner_blocks.empty()) return sess_blocks; std::set<uint32_t> merged(inner_blocks.begin(), inner_blocks.end()); merged.insert(sess_blocks.begin(), sess_blocks.end()); return std::vector<uint32_t>(merged.begin(), merged.end()); if (is_session_group(group_id)) return session_store_->list_blocks(group_id); return inner_->list_blocks(group_id); } uint32_t session_intercepting_store::total_blocks() { return inner_->total_blocks() + session_store_->total_blocks(); return inner_->total_blocks(); } uint32_t session_intercepting_store::total_groups() { return inner_->total_groups() + session_store_->total_groups(); return inner_->total_groups(); } std::vector<uint64_t> session_intercepting_store::list_groups() { auto ig = inner_->list_groups(); auto sg = session_store_->list_groups(); std::set<uint64_t> merged(ig.begin(), ig.end()); merged.insert(sg.begin(), sg.end()); return std::vector<uint64_t>(merged.begin(), merged.end()); // Only return domain groups — session groups are managed by the // dedicated session_client_ and must not appear in scrub/rebalance // operations performed by pclient_. Merging them caused pclient_ // to re-store session blocks into the domain file store, creating // ghost groups with uneven distribution across nodes. return inner_->list_groups(); } // --- Cluster --- Loading Loading @@ -1063,18 +1059,40 @@ namespace authdb { size_t n = cfg_.data_blocks + cfg_.parity_blocks; // Collect known session group IDs so we can skip them. // Session groups are replicated via the dedicated session_client_ and // must not be touched by the domain-data scrub/rebalance path — mixing // them causes blocks to be written to the wrong store on the local node // (file store instead of session memory store) and creates ghost groups // that inflate block counts unevenly across nodes. std::set<uint64_t> session_gids; if (interceptor_) { // Merge session groups from all nodes for a complete picture. // Remote nodes expose session groups via their interceptor's // list_groups() through the server, so we need to identify and // exclude them here. if (session_store_) { auto sg = session_store_->list_groups(); session_gids.insert(sg.begin(), sg.end()); } } // 1. Collect all group IDs from every node std::map<uint64_t, std::set<size_t>> group_nodes; // group_id -> set of node indices that have it for (size_t ni = 0; ni < n; ++ni) { std::vector<uint64_t> node_groups; if (pclient_->list_groups_on_node(ni, node_groups)) { for (uint64_t gid : node_groups) for (uint64_t gid : node_groups) { if (session_gids.count(gid)) continue; // skip session groups group_nodes[gid].insert(ni); } } } std::cerr << "[SCRUB] found " << group_nodes.size() << " groups across " << n << " nodes" << std::endl; << " data groups across " << n << " nodes" << " (skipped " << session_gids.size() << " session groups)" << std::endl; // 2. For each group, check block distribution and repair for (auto &[gid, present_nodes] : group_nodes) { Loading Loading
src/cluster.cpp +35 −17 Original line number Diff line number Diff line Loading @@ -310,28 +310,24 @@ namespace authdb { } std::vector<uint32_t> session_intercepting_store::list_blocks(uint64_t group_id) { // Merge blocks from both stores auto inner_blocks = inner_->list_blocks(group_id); auto sess_blocks = session_store_->list_blocks(group_id); if (sess_blocks.empty()) return inner_blocks; if (inner_blocks.empty()) return sess_blocks; std::set<uint32_t> merged(inner_blocks.begin(), inner_blocks.end()); merged.insert(sess_blocks.begin(), sess_blocks.end()); return std::vector<uint32_t>(merged.begin(), merged.end()); if (is_session_group(group_id)) return session_store_->list_blocks(group_id); return inner_->list_blocks(group_id); } uint32_t session_intercepting_store::total_blocks() { return inner_->total_blocks() + session_store_->total_blocks(); return inner_->total_blocks(); } uint32_t session_intercepting_store::total_groups() { return inner_->total_groups() + session_store_->total_groups(); return inner_->total_groups(); } std::vector<uint64_t> session_intercepting_store::list_groups() { auto ig = inner_->list_groups(); auto sg = session_store_->list_groups(); std::set<uint64_t> merged(ig.begin(), ig.end()); merged.insert(sg.begin(), sg.end()); return std::vector<uint64_t>(merged.begin(), merged.end()); // Only return domain groups — session groups are managed by the // dedicated session_client_ and must not appear in scrub/rebalance // operations performed by pclient_. Merging them caused pclient_ // to re-store session blocks into the domain file store, creating // ghost groups with uneven distribution across nodes. return inner_->list_groups(); } // --- Cluster --- Loading Loading @@ -1063,18 +1059,40 @@ namespace authdb { size_t n = cfg_.data_blocks + cfg_.parity_blocks; // Collect known session group IDs so we can skip them. // Session groups are replicated via the dedicated session_client_ and // must not be touched by the domain-data scrub/rebalance path — mixing // them causes blocks to be written to the wrong store on the local node // (file store instead of session memory store) and creates ghost groups // that inflate block counts unevenly across nodes. std::set<uint64_t> session_gids; if (interceptor_) { // Merge session groups from all nodes for a complete picture. // Remote nodes expose session groups via their interceptor's // list_groups() through the server, so we need to identify and // exclude them here. if (session_store_) { auto sg = session_store_->list_groups(); session_gids.insert(sg.begin(), sg.end()); } } // 1. Collect all group IDs from every node std::map<uint64_t, std::set<size_t>> group_nodes; // group_id -> set of node indices that have it for (size_t ni = 0; ni < n; ++ni) { std::vector<uint64_t> node_groups; if (pclient_->list_groups_on_node(ni, node_groups)) { for (uint64_t gid : node_groups) for (uint64_t gid : node_groups) { if (session_gids.count(gid)) continue; // skip session groups group_nodes[gid].insert(ni); } } } std::cerr << "[SCRUB] found " << group_nodes.size() << " groups across " << n << " nodes" << std::endl; << " data groups across " << n << " nodes" << " (skipped " << session_gids.size() << " session groups)" << std::endl; // 2. For each group, check block distribution and repair for (auto &[gid, present_nodes] : group_nodes) { Loading