Commit 107b2322 authored by jan.koester's avatar jan.koester
Browse files

test

parent 8733fb5b
Loading
Loading
Loading
Loading
Loading
+22 −74
Original line number Diff line number Diff line
@@ -81,13 +81,6 @@ namespace authdb {

        uint64_t dgid = domainGroupId();

        // Fetch from local block store (file-backed) — always fast
        auto &store = g_Cluster->getStore();
        std::vector<uint8_t> local_data;
        try {
            store.fetch(dgid, 0, local_data);
        } catch (...) {}

        // Only fetch from peers every 5 seconds
        std::vector<uint8_t> peer_data;
        auto now = std::chrono::steady_clock::now();
@@ -100,87 +93,51 @@ namespace authdb {
                    auto &cmtx = g_Cluster->getClientMutex();
                    std::unique_lock<std::timed_mutex> lock(cmtx, std::chrono::seconds(3));
                    if (lock.owns_lock()) {
                        client->fetch_from_peers(dgid, 0, peer_data);
                        peer_data = client->retrieve(dgid);
                        _LastPeerFetch = now;
                    }
                } catch (const std::exception &e) {
                    std::cerr << "ClusterBackend::fetchFromCluster: fetch_from_peers failed: "
                    std::cerr << "ClusterBackend::fetchFromCluster: retrieve failed: "
                              << e.what() << std::endl;
                } catch (...) {
                    std::cerr << "ClusterBackend::fetchFromCluster: fetch_from_peers failed (unknown)" << std::endl;
                    std::cerr << "ClusterBackend::fetchFromCluster: retrieve failed (unknown)" << std::endl;
                }
            }
        }

        // Reject corrupt buffers before considering them
        if (!bufferValid(local_data))
            local_data.clear();
        if (!bufferValid(peer_data))
            peer_data.clear();

        // Merge into _Buffer — caller (lock()) holds _BufLock after this returns.
        // Instead of picking a single "best revision" buffer, merge records from
        // ALL sources (in-memory, local store, peers) so that records created on
        // remote nodes become visible locally.  vacuum() deduplicates by
        // (ruid, fieldname, type), keeping the last occurrence.
        size_t local_rev = bufferRevision(local_data);
        size_t peer_rev = bufferRevision(peer_data);
        size_t cur_rev  = bufferRevision(_Buffer);

        bool local_has_records = (!local_data.empty() && local_data.size() > sizeof(AuthHeader));
        bool peer_has_records = (!peer_data.empty() && peer_data.size() > sizeof(AuthHeader));

        // Quick check: skip merge when local store matches in-memory buffer
        // and no peer data is available
        bool local_same = (!_Buffer.empty() &&
                           local_data.size() == _Buffer.size() &&
                           (local_data.empty() ||
                            std::memcmp(local_data.data(), _Buffer.data(), _Buffer.size()) == 0));

        if (local_same && !peer_has_records)
        if (!peer_has_records)
            return;

        // Ensure _Buffer has at least a valid header as merge base
        if (_Buffer.size() < sizeof(AuthHeader)) {
            if (local_data.size() >= sizeof(AuthHeader))
                _Buffer.assign(local_data.begin(),
                               local_data.begin() + sizeof(AuthHeader));
            else if (peer_data.size() >= sizeof(AuthHeader))
            if (peer_data.size() >= sizeof(AuthHeader))
                _Buffer.assign(peer_data.begin(),
                               peer_data.begin() + sizeof(AuthHeader));
            else
                return; // no usable data anywhere
                return;
        }

        // Append records from local store and peer store into _Buffer.
        // Append records from peer data into _Buffer.
        // vacuum() will deduplicate afterwards.
        bool merged = false;

        if (local_has_records) {
            _Buffer.insert(_Buffer.end(),
                           local_data.begin() + sizeof(AuthHeader),
                           local_data.end());
            merged = true;
        }

        if (peer_has_records) {
        _Buffer.insert(_Buffer.end(),
                       peer_data.begin() + sizeof(AuthHeader),
                       peer_data.end());
            merged = true;
        }

        if (merged) {
            // Use the highest revision from any source
            size_t max_rev = std::max({cur_rev, local_rev, peer_rev});
        size_t max_rev = std::max(cur_rev, peer_rev);
        AuthHeader head;
        std::memcpy(&head, _Buffer.data(), sizeof(AuthHeader));
        head.Revesion = max_rev;
        std::memcpy(_Buffer.data(), &head, sizeof(AuthHeader));

        vacuum();
            store.store(dgid, 0, _Buffer.data(), _Buffer.size());
        }
    }

    void ClusterBackend::pushToCluster() {
@@ -269,11 +226,6 @@ namespace authdb {
        if (!g_Cluster || !g_Cluster->isRunning())
            return;

        auto &store = g_Cluster->getStore();
        if (!store.store(dgid, 0, buf.data(), buf.size())) {
            std::cerr << "ClusterBackend::pushToCluster: store.store() failed" << std::endl;
        }

        auto &client = g_Cluster->getClient();
        if (client) {
            try {
@@ -282,7 +234,7 @@ namespace authdb {
                for (int attempt = 0; attempt < 3; ++attempt) {
                    std::unique_lock<std::timed_mutex> lock(cmtx, std::chrono::seconds(3));
                    if (lock.owns_lock()) {
                        client->replicate_to_peers(dgid, 0, buf.data(), buf.size());
                        client->store(dgid, buf.data(), buf.size());
                        break;
                    }
                    if (attempt == 2) {
@@ -334,10 +286,6 @@ namespace authdb {

        uint64_t dgid = domainGroupId();

        // Remove from local block store
        auto &store = g_Cluster->getStore();
        store.remove_group(dgid);

        // Remove from peer nodes
        auto &client = g_Cluster->getClient();
        if (client) {
+77 −126
Original line number Diff line number Diff line
@@ -241,21 +241,7 @@ namespace authdb {

    bool session_intercepting_store::store(uint64_t group_id, uint32_t block_index,
                                            const uint8_t* data, size_t len) {
        bool ok = inner_->store(group_id, block_index, data, len);

        // Try to detect if this is a session block and mirror it
        if (ok && len >= 56) {
            std::vector<uint8_t> buf(data, data + len);
            uuid::uuid sid, uid, did;
            std::vector<uuid::uuid> members;
            std::string username;
            std::vector<std::pair<uuid::uuid, bool>> gpo;
            if (SessionBlock::deserialize(buf, sid, uid, did, members, username, gpo) &&
                !sid.empty() && !uid.empty()) {
                session_store_->store(group_id, block_index, data, len);
            }
        }
        return ok;
        return inner_->store(group_id, block_index, data, len);
    }

    bool session_intercepting_store::fetch(uint64_t group_id, uint32_t block_index,
@@ -264,7 +250,6 @@ namespace authdb {
    }

    bool session_intercepting_store::remove_group(uint64_t group_id) {
        session_store_->remove_group(group_id);
        return inner_->remove_group(group_id);
    }

@@ -435,17 +420,13 @@ namespace authdb {
        uint64_t sgid = sessionGroupId(uid, did);
        uint64_t sid_gid = sidGroupId(sid);

        // Store in dedicated session store (memory-only, never mixed with data)
        session_store_->store(sgid, 0, data.data(), data.size());
        session_store_->store(sid_gid, 0, data.data(), data.size());

        if (!pclient_) return;

        // Replicate raw data to all peer nodes under both keys
        // Distribute erasure-coded shards across peer nodes
        try {
            std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
            pclient_->replicate_to_peers(sgid, 0, data.data(), data.size());
            pclient_->replicate_to_peers(sid_gid, 0, data.data(), data.size());
            pclient_->store(sgid, data.data(), data.size());
            pclient_->store(sid_gid, data.data(), data.size());
        } catch (const netplus::NetException &e) {
            std::cerr << "Cluster push session failed: " << e.what() << std::endl;
        } catch (const std::exception &e) {
@@ -459,24 +440,15 @@ namespace authdb {
                                std::vector<std::pair<uuid::uuid, bool>> &gpo_results) {
        uint64_t sgid = sessionGroupId(uid, did);

        // Check local session store first
        // Retrieve from peer nodes via erasure coding
        std::vector<uint8_t> data;
        if (session_store_->fetch(sgid, 0, data)) {
            uuid::uuid d_uid, d_did;
            if (SessionBlock::deserialize(data, sid, d_uid, d_did, members, username, gpo_results)) {
                return true;
            }
        }

        // Try to fetch from peer nodes
        if (pclient_) {
            try {
                std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
                if (pclient_->fetch_from_peers(sgid, 0, data) && !data.empty()) {
                data = pclient_->retrieve(sgid);
                if (!data.empty()) {
                    uuid::uuid d_uid, d_did;
                    if (SessionBlock::deserialize(data, sid, d_uid, d_did, members, username, gpo_results)) {
                        // Cache in session store
                        session_store_->store(sgid, 0, data.data(), data.size());
                        return true;
                    }
                }
@@ -491,31 +463,21 @@ namespace authdb {
    void Cluster::removeSession(const uuid::uuid &uid, const uuid::uuid &did) {
        uint64_t sgid = sessionGroupId(uid, did);

        // Also remove the SID-keyed entry: fetch data first to get the SID
        std::vector<uint8_t> data;
        if (session_store_->fetch(sgid, 0, data)) {
        // Fetch session to get the SID for removing the SID-keyed entry
        if (pclient_) {
            try {
                std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
                std::vector<uint8_t> data = pclient_->retrieve(sgid);
                if (!data.empty()) {
                    uuid::uuid sid, d_uid, d_did;
                    std::vector<uuid::uuid> members;
                    std::string username;
                    std::vector<std::pair<uuid::uuid, bool>> gpo_results;
                    if (SessionBlock::deserialize(data, sid, d_uid, d_did, members, username, gpo_results)) {
                        uint64_t sid_gid = sidGroupId(sid);
                session_store_->remove_group(sid_gid);
                if (pclient_) {
                    try {
                        std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
                        pclient_->remove(sid_gid);
                    } catch (...) {}
                    }
                }
        }

        // Remove uid+did keyed entry
        session_store_->remove_group(sgid);

        if (pclient_) {
            try {
                std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
                pclient_->remove(sgid);
            } catch (...) {}
        }
@@ -524,30 +486,21 @@ namespace authdb {
    void Cluster::removeSessionBySid(const uuid::uuid &session_id) {
        uint64_t sid_gid = sidGroupId(session_id);

        // Fetch session data to get uid+did key as well
        std::vector<uint8_t> data;
        if (session_store_->fetch(sid_gid, 0, data)) {
        // Fetch session data to get uid+did key
        if (pclient_) {
            try {
                std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
                std::vector<uint8_t> data = pclient_->retrieve(sid_gid);
                if (!data.empty()) {
                    uuid::uuid sid, uid, did;
                    std::vector<uuid::uuid> members;
                    std::string username;
                    std::vector<std::pair<uuid::uuid, bool>> gpo_results;
                    if (SessionBlock::deserialize(data, sid, uid, did, members, username, gpo_results)) {
                        uint64_t sgid = sessionGroupId(uid, did);
                session_store_->remove_group(sgid);
                if (pclient_) {
                    try {
                        std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
                        pclient_->remove(sgid);
                    } catch (...) {}
                    }
                }
        }

        session_store_->remove_group(sid_gid);

        if (pclient_) {
            try {
                std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
                pclient_->remove(sid_gid);
            } catch (...) {}
        }
@@ -556,16 +509,11 @@ namespace authdb {
    bool Cluster::hasSession(const uuid::uuid &uid, const uuid::uuid &did) {
        uint64_t sgid = sessionGroupId(uid, did);

        // Check local session store
        std::vector<uint8_t> data;
        if (session_store_->fetch(sgid, 0, data)) return true;

        // Check peer nodes
        if (pclient_) {
            try {
                std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
                if (pclient_->fetch_from_peers(sgid, 0, data) && !data.empty())
                    return true;
                auto data = pclient_->retrieve(sgid);
                if (!data.empty()) return true;
            } catch (...) {}
        }

@@ -577,27 +525,28 @@ namespace authdb {
                                          std::vector<uuid::uuid> &members,
                                          std::string &username,
                                          std::vector<std::pair<uuid::uuid, bool>> &gpo_results) {
        // No local store — retrieve via erasure coding
        uint64_t sid_gid = sidGroupId(session_id);
        std::vector<uint8_t> data;
        if (session_store_->fetch(sid_gid, 0, data)) {
        if (pclient_) {
            try {
                std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
                auto data = pclient_->retrieve(sid_gid);
                if (!data.empty()) {
                    return SessionBlock::deserialize(data, sid, uid, did, members, username, gpo_results);
                }
            } catch (...) {}
        }
        return false;
    }

    bool Cluster::hasSessionBySid(const uuid::uuid &session_id) {
        uint64_t sid_gid = sidGroupId(session_id);

        // Check local session store
        std::vector<uint8_t> data;
        if (session_store_->fetch(sid_gid, 0, data)) return true;

        // Check peer nodes
        if (pclient_) {
            try {
                std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
                if (pclient_->fetch_from_peers(sid_gid, 0, data) && !data.empty())
                    return true;
                auto data = pclient_->retrieve(sid_gid);
                if (!data.empty()) return true;
            } catch (...) {}
        }

@@ -611,22 +560,12 @@ namespace authdb {
                                     std::vector<std::pair<uuid::uuid, bool>> &gpo_results) {
        uint64_t sid_gid = sidGroupId(session_id);

        // Check local session store first
        std::vector<uint8_t> data;
        if (session_store_->fetch(sid_gid, 0, data)) {
            if (SessionBlock::deserialize(data, sid, uid, did, members, username, gpo_results)) {
                return true;
            }
        }

        // Try to fetch from peer nodes
        if (pclient_) {
            try {
                std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
                if (pclient_->fetch_from_peers(sid_gid, 0, data) && !data.empty()) {
                auto data = pclient_->retrieve(sid_gid);
                if (!data.empty()) {
                    if (SessionBlock::deserialize(data, sid, uid, did, members, username, gpo_results)) {
                        // Cache in session store
                        session_store_->store(sid_gid, 0, data.data(), data.size());
                        return true;
                    }
                }
@@ -640,14 +579,27 @@ namespace authdb {

    void Cluster::listAllSessions(std::vector<SessionInfo> &out) {
        out.clear();
        if (!pclient_) return;
        std::set<std::string> seen_sids;

        // Enumerate dedicated session store only (memory-only, contains only sessions).
        // Peer sessions are already imported here via pushSession replication.
        auto groups = session_store_->list_groups();
        for (uint64_t gid : groups) {
            std::vector<uint8_t> data;
            if (session_store_->fetch(gid, 0, data)) {
        // Collect all group IDs across all nodes
        std::set<uint64_t> all_groups;
        try {
            std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5));
            if (!lock.owns_lock()) return;
            size_t n = pclient_->get_cluster_status().nodes_total;
            for (size_t i = 0; i < n; i++) {
                std::vector<uint64_t> node_groups;
                if (pclient_->list_groups_on_node(i, node_groups)) {
                    all_groups.insert(node_groups.begin(), node_groups.end());
                }
            }

            // Try to retrieve and deserialize each group as a session
            for (uint64_t gid : all_groups) {
                try {
                    auto data = pclient_->retrieve(gid);
                    if (data.empty()) continue;
                    SessionInfo info;
                    if (SessionBlock::deserialize(data, info.sid, info.uid, info.did,
                                                  info.members, info.username, info.gpo_results)) {
@@ -657,8 +609,9 @@ namespace authdb {
                            out.push_back(std::move(info));
                        }
                    }
                } catch (...) {}
            }
        }
        } catch (...) {}
    }

    // --- DataOpBlock serialization ---
@@ -785,16 +738,14 @@ namespace authdb {

        auto data = DataOpBlock::serialize(op, domain, uid, dataType, records);

        // Store locally for peers to fetch
        auto hash = netplus::sha256_hash(data);
        uint64_t localId = 0;
        for (int i = 0; i < 8; i++) localId = (localId << 8) | hash[i];
        store_->store(localId, 0, data.data(), data.size());

        // Replicate to all peer nodes
        // Distribute erasure-coded shards to peer nodes
        try {
            std::unique_lock<std::timed_mutex> lock(client_mutex_, std::chrono::seconds(5)); if (!lock.owns_lock()) throw std::runtime_error("cluster client lock timeout");
            pclient_->replicate_to_peers(localId, 0, data.data(), data.size());
            pclient_->store(localId, data.data(), data.size());
        } catch (const netplus::NetException &e) {
            std::cerr << "Cluster replicate failed: " << e.what() << std::endl;
        } catch (const std::exception &e) {