From c79205c26a77df9086bd6294ae6285a7346e6656 Mon Sep 17 00:00:00 2001 From: Nicolas Werner Date: Sun, 5 Jul 2020 05:29:07 +0200 Subject: [PATCH] Use new timeline cache structure --- src/Cache.cpp | 246 +++++++++++++++++++++++++++++--------------------- src/Cache_p.h | 24 +++-- 2 files changed, 154 insertions(+), 116 deletions(-) diff --git a/src/Cache.cpp b/src/Cache.cpp index 2824960b..26291cfd 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -37,7 +37,7 @@ //! Should be changed when a breaking change occurs in the cache format. //! This will reset client's data. -static const std::string CURRENT_CACHE_FORMAT_VERSION("2020.05.01"); +static const std::string CURRENT_CACHE_FORMAT_VERSION("2020.07.05"); static const std::string SECRET("secret"); static lmdb::val NEXT_BATCH_KEY("next_batch"); @@ -46,8 +46,9 @@ static lmdb::val CACHE_FORMAT_VERSION_KEY("cache_format_version"); constexpr size_t MAX_RESTORED_MESSAGES = 30'000; -constexpr auto DB_SIZE = 32ULL * 1024ULL * 1024ULL * 1024ULL; // 32 GB -constexpr auto MAX_DBS = 8092UL; +constexpr auto DB_SIZE = 32ULL * 1024ULL * 1024ULL * 1024ULL; // 32 GB +constexpr auto MAX_DBS = 8092UL; +constexpr auto BATCH_SIZE = 100; //! Cache databases and their format. //! @@ -63,7 +64,6 @@ constexpr auto SYNC_STATE_DB("sync_state"); //! Read receipts per room/event. constexpr auto READ_RECEIPTS_DB("read_receipts"); constexpr auto NOTIFICATIONS_DB("sent_notifications"); -//! TODO: delete pending_receipts database on old cache versions //! Encryption related databases. @@ -93,20 +93,6 @@ namespace { std::unique_ptr instance_ = nullptr; } -int -numeric_key_comparison(const MDB_val *a, const MDB_val *b) -{ - auto lhs = std::stoull(std::string((char *)a->mv_data, a->mv_size)); - auto rhs = std::stoull(std::string((char *)b->mv_data, b->mv_size)); - - if (lhs < rhs) - return 1; - else if (lhs == rhs) - return 0; - - return -1; -} - Cache::Cache(const QString &userId, QObject *parent) : QObject{parent} , env_{nullptr} @@ -697,6 +683,27 @@ Cache::runMigrations() return false; } + nhlog::db()->info("Successfully deleted pending receipts database."); + return true; + }}, + {"2020.07.05", + [this]() { + try { + auto txn = lmdb::txn::begin(env_, nullptr); + auto room_ids = getRoomIds(txn); + + for (const auto &room_id : room_ids) { + auto messagesDb = lmdb::dbi::open( + txn, std::string(room_id + "/messages").c_str(), MDB_CREATE); + lmdb::dbi_drop(txn, messagesDb, true); + } + txn.commit(); + } catch (const lmdb::error &) { + nhlog::db()->critical( + "Failed to delete messages database in migration!"); + return false; + } + nhlog::db()->info("Successfully deleted pending receipts database."); return true; }}, @@ -1232,38 +1239,64 @@ Cache::getTimelineMentions() return notifs; } -mtx::responses::Timeline -Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id) +Cache::Messages +Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id, int64_t index, bool forward) { // TODO(nico): Limit the messages returned by this maybe? - auto db = getMessagesDb(txn, room_id); + auto orderDb = getEventOrderDb(txn, room_id); + auto eventsDb = getEventsDb(txn, room_id); - mtx::responses::Timeline timeline; - std::string timestamp, msg; + Messages messages{}; - auto cursor = lmdb::cursor::open(txn, db); + lmdb::val indexVal, val; + + auto cursor = lmdb::cursor::open(txn, orderDb); + if (index == std::numeric_limits::max()) { + if (cursor.get(indexVal, val, forward ? MDB_FIRST : MDB_LAST)) { + index = *indexVal.data(); + } else { + messages.end_of_cache = true; + return messages; + } + } else { + if (cursor.get(indexVal, val, MDB_SET)) { + index = *indexVal.data(); + } else { + messages.end_of_cache = true; + return messages; + } + } - size_t index = 0; + int counter = 0; - while (cursor.get(timestamp, msg, MDB_NEXT) && index < MAX_RESTORED_MESSAGES) { - auto obj = json::parse(msg); + bool ret; + while ((ret = cursor.get(indexVal, val, forward ? MDB_NEXT : MDB_LAST)) && + counter++ < BATCH_SIZE) { + auto obj = json::parse(std::string(val.data(), val.size())); - if (obj.count("event") == 0 || obj.count("token") == 0) - continue; + if (obj.count("event_id") == 0) + break; - mtx::events::collections::TimelineEvent event; - mtx::events::collections::from_json(obj.at("event"), event); + lmdb::val event; + bool success = lmdb::dbi_get( + txn, eventsDb, lmdb::val(obj["event_id"].get()), event); + if (!success) + continue; - index += 1; + mtx::events::collections::TimelineEvent te; + mtx::events::collections::from_json( + json::parse(std::string(event.data(), event.size())), te); - timeline.events.push_back(event.data); - timeline.prev_batch = obj.at("token").get(); + messages.timeline.events.push_back(std::move(te.data)); + // timeline.prev_batch = obj.at("token").get(); } cursor.close(); - std::reverse(timeline.events.begin(), timeline.events.end()); + // std::reverse(timeline.events.begin(), timeline.events.end()); + messages.next_index = *indexVal.data(); + messages.end_of_cache = !ret; - return timeline; + return messages; } QMap @@ -1306,55 +1339,59 @@ Cache::roomInfo(bool withInvites) std::string Cache::getLastEventId(lmdb::txn &txn, const std::string &room_id) { - auto db = getMessagesDb(txn, room_id); - - if (db.size(txn) == 0) - return {}; - - std::string timestamp, msg; - - auto cursor = lmdb::cursor::open(txn, db); - while (cursor.get(timestamp, msg, MDB_NEXT)) { - auto obj = json::parse(msg); + auto orderDb = getEventOrderDb(txn, room_id); - if (obj.count("event") == 0) - continue; + lmdb::val indexVal, val; - cursor.close(); - return obj["event"]["event_id"]; + auto cursor = lmdb::cursor::open(txn, orderDb); + if (!cursor.get(indexVal, val, MDB_LAST)) { + return {}; } - cursor.close(); - return {}; + auto obj = json::parse(std::string(val.data(), val.size())); + + if (obj.count("event_id") == 0) + return {}; + else + return obj["event_id"]; } DescInfo Cache::getLastMessageInfo(lmdb::txn &txn, const std::string &room_id) { - auto db = getMessagesDb(txn, room_id); - - if (db.size(txn) == 0) + auto orderDb = getEventOrderDb(txn, room_id); + auto eventsDb = getEventsDb(txn, room_id); + if (orderDb.size(txn) == 0) return DescInfo{}; - std::string timestamp, msg; - const auto local_user = utils::localUser(); DescInfo fallbackDesc{}; - auto cursor = lmdb::cursor::open(txn, db); - while (cursor.get(timestamp, msg, MDB_NEXT)) { - auto obj = json::parse(msg); + lmdb::val indexVal, val; - if (obj.count("event") == 0) + auto cursor = lmdb::cursor::open(txn, orderDb); + cursor.get(indexVal, val, MDB_LAST); + while (cursor.get(indexVal, val, MDB_PREV)) { + auto temp = json::parse(std::string(val.data(), val.size())); + + if (temp.count("event_id") == 0) + break; + + lmdb::val event; + bool success = lmdb::dbi_get( + txn, eventsDb, lmdb::val(temp["event_id"].get()), event); + if (!success) continue; - if (fallbackDesc.event_id.isEmpty() && obj["event"]["type"] == "m.room.member" && - obj["event"]["state_key"] == local_user.toStdString() && - obj["event"]["content"]["membership"] == "join") { - uint64_t ts = obj["event"]["origin_server_ts"]; + auto obj = json::parse(std::string(event.data(), event.size())); + + if (fallbackDesc.event_id.isEmpty() && obj["type"] == "m.room.member" && + obj["state_key"] == local_user.toStdString() && + obj["content"]["membership"] == "join") { + uint64_t ts = obj["origin_server_ts"]; auto time = QDateTime::fromMSecsSinceEpoch(ts); - fallbackDesc = DescInfo{QString::fromStdString(obj["event"]["event_id"]), + fallbackDesc = DescInfo{QString::fromStdString(obj["event_id"]), local_user, tr("You joined this room."), utils::descriptiveTime(time), @@ -1362,17 +1399,16 @@ Cache::getLastMessageInfo(lmdb::txn &txn, const std::string &room_id) time}; } - if (!(obj["event"]["type"] == "m.room.message" || - obj["event"]["type"] == "m.sticker" || - obj["event"]["type"] == "m.room.encrypted")) + if (!(obj["type"] == "m.room.message" || obj["type"] == "m.sticker" || + obj["type"] == "m.room.encrypted")) continue; - mtx::events::collections::TimelineEvent event; - mtx::events::collections::from_json(obj.at("event"), event); + mtx::events::collections::TimelineEvent te; + mtx::events::collections::from_json(obj, te); cursor.close(); return utils::getMessageDescription( - event.data, local_user, QString::fromStdString(room_id)); + te.data, local_user, QString::fromStdString(room_id)); } cursor.close(); @@ -1954,7 +1990,6 @@ Cache::saveTimelineMessages(lmdb::txn &txn, const std::string &room_id, const mtx::responses::Timeline &res) { - auto db = getMessagesDb(txn, room_id); auto eventsDb = getEventsDb(txn, room_id); auto orderDb = getEventOrderDb(txn, room_id); @@ -1966,7 +2001,7 @@ Cache::saveTimelineMessages(lmdb::txn &txn, lmdb::val indexVal, val; int64_t index = 0; - auto cursor = lmdb::cursor::open(txn, orderDb); + auto cursor = lmdb::cursor::open(txn, orderDb); if (cursor.get(indexVal, val, MDB_LAST)) { index = *indexVal.data(); } @@ -1979,17 +2014,6 @@ Cache::saveTimelineMessages(lmdb::txn &txn, lmdb::dbi_put( txn, eventsDb, lmdb::val(redaction->redacts), lmdb::val(event.dump())); } else { - json obj = json::object(); - - obj["event"] = event; - obj["token"] = res.prev_batch; - - lmdb::dbi_put( - txn, - db, - lmdb::val(std::to_string(event["origin_server_ts"].get())), - lmdb::val(obj.dump())); - lmdb::dbi_put(txn, eventsDb, lmdb::val(event["event_id"].get()), @@ -1997,9 +2021,16 @@ Cache::saveTimelineMessages(lmdb::txn &txn, ++index; + json orderEntry = json::object(); + orderEntry["event_id"] = event["event_id"]; + if (first) + orderEntry["prev_batch"] = res.prev_batch; + + nhlog::db()->debug("saving '{}'", orderEntry.dump()); + lmdb::cursor_put(cursor.handle(), lmdb::val(&index, sizeof(index)), - lmdb::val(first ? res.prev_batch : ""), + lmdb::val(orderEntry.dump()), MDB_APPEND); first = false; } @@ -2138,34 +2169,43 @@ Cache::getRoomIds(lmdb::txn &txn) void Cache::deleteOldMessages() { + lmdb::val indexVal, val; + auto txn = lmdb::txn::begin(env_); auto room_ids = getRoomIds(txn); - for (const auto &id : room_ids) { - auto msg_db = getMessagesDb(txn, id); - - std::string ts, event; - uint64_t idx = 0; + for (const auto &room_id : room_ids) { + auto orderDb = getEventOrderDb(txn, room_id); + auto eventsDb = getEventsDb(txn, room_id); + auto cursor = lmdb::cursor::open(txn, orderDb); - const auto db_size = msg_db.size(txn); - if (db_size <= 3 * MAX_RESTORED_MESSAGES) + int64_t first, last; + if (cursor.get(indexVal, val, MDB_LAST)) { + last = *indexVal.data(); + } else { continue; + } + if (cursor.get(indexVal, val, MDB_FIRST)) { + first = *indexVal.data(); + } else { + continue; + } - nhlog::db()->info("[{}] message count: {}", id, db_size); + size_t message_count = static_cast(last - first); + if (message_count < MAX_RESTORED_MESSAGES) + continue; - auto cursor = lmdb::cursor::open(txn, msg_db); - while (cursor.get(ts, event, MDB_NEXT)) { - idx += 1; + while (cursor.get(indexVal, val, MDB_NEXT) && + message_count-- < MAX_RESTORED_MESSAGES) { + auto obj = json::parse(std::string(val.data(), val.size())); - if (idx > MAX_RESTORED_MESSAGES) - lmdb::cursor_del(cursor); + if (obj.count("event_id") != 0) + lmdb::dbi_del( + txn, eventsDb, lmdb::val(obj["event_id"].get())); + lmdb::cursor_del(cursor); } - cursor.close(); - - nhlog::db()->info("[{}] updated message count: {}", id, msg_db.size(txn)); } - txn.commit(); } diff --git a/src/Cache_p.h b/src/Cache_p.h index 5f01f736..37486ca0 100644 --- a/src/Cache_p.h +++ b/src/Cache_p.h @@ -18,6 +18,7 @@ #pragma once +#include #include #include @@ -38,9 +39,6 @@ #include "CacheCryptoStructs.h" #include "CacheStructs.h" -int -numeric_key_comparison(const MDB_val *a, const MDB_val *b); - class Cache : public QObject { Q_OBJECT @@ -250,7 +248,16 @@ private: const std::string &room_id, const mtx::responses::Timeline &res); - mtx::responses::Timeline getTimelineMessages(lmdb::txn &txn, const std::string &room_id); + struct Messages + { + mtx::responses::Timeline timeline; + uint64_t next_index; + bool end_of_cache = false; + }; + Messages getTimelineMessages(lmdb::txn &txn, + const std::string &room_id, + int64_t index = std::numeric_limits::max(), + bool forward = false); //! Remove a room from the cache. // void removeLeftRoom(lmdb::txn &txn, const std::string &room_id); @@ -402,15 +409,6 @@ private: return lmdb::dbi::open(txn, "pending_receipts", MDB_CREATE); } - lmdb::dbi getMessagesDb(lmdb::txn &txn, const std::string &room_id) - { - auto db = - lmdb::dbi::open(txn, std::string(room_id + "/messages").c_str(), MDB_CREATE); - lmdb::dbi_set_compare(txn, db, numeric_key_comparison); - - return db; - } - lmdb::dbi getEventsDb(lmdb::txn &txn, const std::string &room_id) { return lmdb::dbi::open(txn, std::string(room_id + "/events").c_str(), MDB_CREATE);