Loading src/cluster.cpp +42 −10 Original line number Diff line number Diff line Loading @@ -534,16 +534,35 @@ void Cluster::health_loop() { return {online, total}; }); // Verify local store is writable // Verify local store is writable without polluting blocks.bin. // Previous approach wrote a sentinel record every health cycle // which accumulated thousands of dead records in the data file. bool store_ok = true; if (store_) { if (!cfg_.store_path.empty()) { try { const uint64_t sentinel_gid = 0; const uint32_t sentinel_idx = UINT32_MAX; uint8_t probe = 0x01; store_ok = store_->store(sentinel_gid, sentinel_idx, &probe, 1); if (store_ok) store_->remove_group(sentinel_gid); std::string probe_path = cfg_.store_path + "/.health_probe"; #ifdef _WIN32 int probe_fd = ::_open(probe_path.c_str(), _O_WRONLY | _O_CREAT | _O_TRUNC, _S_IREAD | _S_IWRITE); if (probe_fd >= 0) { store_ok = (::_write(probe_fd, "OK", 2) == 2); ::_close(probe_fd); ::_unlink(probe_path.c_str()); } else { store_ok = false; } #else int probe_fd = ::open(probe_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); if (probe_fd >= 0) { store_ok = (::write(probe_fd, "OK", 2) == 2); ::close(probe_fd); ::unlink(probe_path.c_str()); } else { store_ok = false; } #endif } catch (...) { store_ok = false; } Loading Loading @@ -716,9 +735,22 @@ Cluster::ScrubResult Cluster::scrub() { continue; } // remove + store ensures correct block placement on all nodes // SAFETY: store FIRST so correct blocks exist on correct nodes. // If this throws, original data remains on old nodes. scrub_client_->store(gid, data.data(), data.size()); // Data is now safely placed. Remove all copies and re-store // for clean distribution without leftover misplaced blocks. try { scrub_client_->remove(gid); scrub_client_->store(gid, data.data(), data.size()); } catch (const std::exception& ce) { // Cleanup pass failed — data is still safe from the // first store(). Misplaced blocks may remain. std::cerr << "[SCRUB] group " << gid << " — cleanup pass failed: " << ce.what() << " (data is safe)\n"; } result.groups_repaired++; std::cerr << "[SCRUB] group " << gid << " — repaired\n"; } catch (const std::exception& e) { Loading src/cluster.h +1 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ #ifndef _WIN32 #include <unistd.h> #include <fcntl.h> #include <ifaddrs.h> #include <netinet/in.h> #include <arpa/inet.h> Loading Loading
src/cluster.cpp +42 −10 Original line number Diff line number Diff line Loading @@ -534,16 +534,35 @@ void Cluster::health_loop() { return {online, total}; }); // Verify local store is writable // Verify local store is writable without polluting blocks.bin. // Previous approach wrote a sentinel record every health cycle // which accumulated thousands of dead records in the data file. bool store_ok = true; if (store_) { if (!cfg_.store_path.empty()) { try { const uint64_t sentinel_gid = 0; const uint32_t sentinel_idx = UINT32_MAX; uint8_t probe = 0x01; store_ok = store_->store(sentinel_gid, sentinel_idx, &probe, 1); if (store_ok) store_->remove_group(sentinel_gid); std::string probe_path = cfg_.store_path + "/.health_probe"; #ifdef _WIN32 int probe_fd = ::_open(probe_path.c_str(), _O_WRONLY | _O_CREAT | _O_TRUNC, _S_IREAD | _S_IWRITE); if (probe_fd >= 0) { store_ok = (::_write(probe_fd, "OK", 2) == 2); ::_close(probe_fd); ::_unlink(probe_path.c_str()); } else { store_ok = false; } #else int probe_fd = ::open(probe_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); if (probe_fd >= 0) { store_ok = (::write(probe_fd, "OK", 2) == 2); ::close(probe_fd); ::unlink(probe_path.c_str()); } else { store_ok = false; } #endif } catch (...) { store_ok = false; } Loading Loading @@ -716,9 +735,22 @@ Cluster::ScrubResult Cluster::scrub() { continue; } // remove + store ensures correct block placement on all nodes // SAFETY: store FIRST so correct blocks exist on correct nodes. // If this throws, original data remains on old nodes. scrub_client_->store(gid, data.data(), data.size()); // Data is now safely placed. Remove all copies and re-store // for clean distribution without leftover misplaced blocks. try { scrub_client_->remove(gid); scrub_client_->store(gid, data.data(), data.size()); } catch (const std::exception& ce) { // Cleanup pass failed — data is still safe from the // first store(). Misplaced blocks may remain. std::cerr << "[SCRUB] group " << gid << " — cleanup pass failed: " << ce.what() << " (data is safe)\n"; } result.groups_repaired++; std::cerr << "[SCRUB] group " << gid << " — repaired\n"; } catch (const std::exception& e) { Loading
src/cluster.h +1 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ #ifndef _WIN32 #include <unistd.h> #include <fcntl.h> #include <ifaddrs.h> #include <netinet/in.h> #include <arpa/inet.h> Loading