// src/storage/SqlStore.js
import { Q } from '@nozbe/watermelondb'
import {
MatrixEvent, MemoryStore, SyncAccumulator, User,
} from 'matrix-js-sdk'
import { Adapter } from '../util/reactAdapter'
import { STORE_PREFIX } from './SqlStoreSchema'
const debug = require('debug')('ditto:services:matrix:SqlStore')
// Minimum time between two non-forced writes of the store to storage
// (compared against the timestamp of the last write in wantsSave()).
// If this value is too small we'll be writing very often which will cause
// noticeable stop-the-world pauses. If this value is too big we'll be writing
// so infrequently that the /sync size gets bigger on reload. Writing more
// often does not affect the length of the pause since the entire /sync
// response is persisted each time.
const WRITE_DELAY_MS = 1000 * 60 // once every minute
/**
 * A matrix-js-sdk store that keeps live state in memory (through MemoryStore)
 * and persists the accumulated /sync data, user presence, out-of-band room
 * members and client options through the storage returned by
 * `Adapter.getStorage()`.
 */
export default class SqlStore extends MemoryStore {
  constructor() {
    super()
    this._syncAccumulator = new SyncAccumulator()
    this.startedUp = false
    this._isNewlyCreated = false
    this._isSaving = false
    this._syncTs = 0
    // Records the last-modified-time of each user at the last point we saved
    // to storage, such that we can derive the set of users that have been
    // modified since we last saved.
    this._userModifiedMap = {
      // user_id : timestamp
    }
  }

  /**
   * Loads the persisted account data, sync token, rooms and users into the
   * sync accumulator and the in-memory store. Subsequent calls after a
   * successful load are no-ops. Load errors are logged, not thrown, and leave
   * `startedUp` false.
   *
   * @returns {Promise<void>}
   */
  async startup() {
    debug('Startup')
    if (this.startedUp) {
      debug('Startup: already started')
      return
    }
    try {
      await Adapter.getInteractionManager().runAfterInteractions({
        name: `SqlStore.load.${STORE_PREFIX}data`,
        gen: async () => {
          const jsonAccountData = await Adapter.getStorage().getItem(`${STORE_PREFIX}account_data`)
          const accountData = jsonAccountData ? JSON.parse(jsonAccountData) : null
          const jsonSyncData = await Adapter.getStorage().getItem(`${STORE_PREFIX}sync`)
          const syncData = jsonSyncData ? JSON.parse(jsonSyncData) : {
            nextBatch: null,
            groupsData: null,
          }
          // No persisted sync payload means this store has never been saved
          if (!jsonSyncData) this._isNewlyCreated = true

          const roomsData = { join: {}, invite: {}, leave: {} }
          const dbRooms = await Adapter.getStorage().getCollection(`${STORE_PREFIX}rooms`).query().fetch()
          for (const dbRoom of dbRooms) {
            roomsData[dbRoom.membership][dbRoom.id] = dbRoom.data
          }
          this._syncAccumulator.accumulate({
            next_batch: syncData.nextBatch,
            rooms: roomsData,
            groups: syncData.groupsData,
            account_data: {
              events: accountData,
            },
          })

          // NOTE(review): records are read through `userId` / `event` here but
          // written through `value` in _syncToStorage — presumably the model
          // maps between them; confirm against the users model.
          const dbUsers = await Adapter.getStorage().getCollection(`${STORE_PREFIX}users`).query().fetch()
          for (const user of dbUsers) {
            const u = new User(user.userId)
            if (user.event) {
              u.setPresenceEvent(new MatrixEvent(user.event))
            }
            this._userModifiedMap[u.userId] = u.getLastModifiedTime()
            this.storeUser(u)
          }
        },
      })
      // Only mark as started once everything loaded, so the guard above
      // actually prevents the data from being accumulated a second time.
      this.startedUp = true
    } catch (e) {
      debug('Startup: error loading from storage', e)
    }
  }

  /**
   * @returns {Promise<?Object>} A deep copy of the accumulated sync data, or
   * null when no sync token has been accumulated yet.
   */
  async getSavedSync() {
    const data = this._syncAccumulator.getJSON()
    if (!data.nextBatch) return null
    // Deep copy so callers cannot mutate the accumulator's state
    return JSON.parse(JSON.stringify(data))
  }

  /**
   * @returns {boolean} True when startup() found no persisted sync data.
   */
  isNewlyCreated() {
    return this._isNewlyCreated
  }

  /**
   * @returns {Promise<*>} The next-batch token held by the sync accumulator.
   */
  async getSavedSyncToken() {
    return this._syncAccumulator.getNextBatchToken()
  }

  /**
   * Clears the in-memory store and resets the persistent storage.
   *
   * @returns {Promise<*>} Whatever the storage's `reset()` resolves to.
   */
  async deleteAllData() {
    await super.deleteAllData()
    // `reset` must be invoked; returning the bare reference deletes nothing.
    return Adapter.getStorage().reset()
  }

  /**
   * @returns {boolean} True when more than WRITE_DELAY_MS has elapsed since
   * the last write started.
   */
  wantsSave() {
    const now = Date.now()
    return now - this._syncTs > WRITE_DELAY_MS
  }

  /**
   * Persists the store if no save is in flight and either `force` is set or
   * wantsSave() is true. The returned promise always resolves, including when
   * nothing was written.
   *
   * @param {boolean} [force] Write regardless of the write delay.
   * @returns {Promise<void>}
   */
  save(force) {
    return new Promise((resolve) => {
      if (this._isSaving || !(force || this.wantsSave())) {
        resolve()
        return
      }
      this._isSaving = true
      Adapter.getInteractionManager().runAfterInteractions(async () => {
        try {
          this._syncTs = Date.now()
          await this._syncToStorage()
        } catch (e) {
          debug('Error saving store', e)
        } finally {
          // Always release the flag and settle, otherwise one failure would
          // block every later save and leave the caller waiting forever.
          this._isSaving = false
          resolve()
        }
      })
    })
  }

  /**
   * Writes changed user presences, all accumulated rooms, the account data
   * and the sync token to storage. Rooms present in storage but absent from
   * the accumulated data are destroyed. Errors are logged and swallowed.
   *
   * @returns {Promise<void>}
   */
  async _syncToStorage() {
    try {
      const syncData = this._syncAccumulator.getJSON()

      const updatedUserPresences = []
      // Modified-times are committed to _userModifiedMap only after the batch
      // succeeded, so a failed write is retried on the next save.
      const pendingModifiedTimes = {}
      for (const u of this.getUsers()) {
        const lastModified = u.getLastModifiedTime()
        if (this._userModifiedMap[u.userId] === lastModified) continue
        if (!u.events.presence) continue
        updatedUserPresences.push({
          userId: u.userId,
          event: u.events.presence.event,
        })
        pendingModifiedTimes[u.userId] = lastModified
      }

      const usersSync = []
      await Adapter.getInteractionManager().runAfterInteractions({
        name: `SqlStore.build.${STORE_PREFIX}users`,
        gen: async () => {
          if (updatedUserPresences.length === 0) return
          const usersCollection = Adapter.getStorage().getCollection(`${STORE_PREFIX}users`)
          const dbUsers = await usersCollection.query().fetch()
          const dbUsersById = new Map(dbUsers.map((user) => [user.id, user]))
          for (const userPresence of updatedUserPresences) {
            const dbUser = dbUsersById.get(userPresence.userId)
            if (dbUser) {
              usersSync.push(dbUser.prepareUpdate((user) => {
                user.value = userPresence
              }))
            } else {
              usersSync.push(usersCollection.prepareCreate((user) => {
                user._raw.id = userPresence.userId
                user.value = userPresence
              }))
            }
          }
        },
      })

      const roomsSync = []
      await Adapter.getInteractionManager().runAfterInteractions({
        name: `SqlStore.build.${STORE_PREFIX}rooms`,
        gen: async () => {
          const roomsCollection = Adapter.getStorage().getCollection(`${STORE_PREFIX}rooms`)
          const dbRooms = await roomsCollection.query().fetch()
          // Stored rooms not yet matched against the accumulated data;
          // whatever is left after the loop is stale and gets destroyed.
          const unmatchedRoomsById = new Map(dbRooms.map((room) => [room.id, room]))
          for (const [membership, syncRooms] of Object.entries(syncData.roomsData)) {
            for (const [roomId, roomData] of Object.entries(syncRooms)) {
              const dbRoom = unmatchedRoomsById.get(roomId)
              if (dbRoom) {
                roomsSync.push(dbRoom.prepareUpdate((room) => {
                  room.membership = membership
                  room.data = roomData
                }))
                unmatchedRoomsById.delete(roomId)
              } else {
                roomsSync.push(roomsCollection.prepareCreate((room) => {
                  room._raw.id = roomId
                  room.membership = membership
                  room.data = roomData
                }))
              }
            }
          }
          for (const staleRoom of unmatchedRoomsById.values()) {
            roomsSync.push(staleRoom.prepareDestroyPermanently())
          }
        },
      })

      if (usersSync.length > 0 || roomsSync.length > 0) {
        await Adapter.getInteractionManager().runAfterInteractions({
          name: `SqlStore.store.${STORE_PREFIX}rooms_users`,
          gen: async () => {
            await Adapter.getStorage().batch([...usersSync, ...roomsSync])
          },
        })
      }
      Object.assign(this._userModifiedMap, pendingModifiedTimes)

      await Adapter.getInteractionManager().runAfterInteractions({
        name: `SqlStore.store.${STORE_PREFIX}account_data`,
        gen: async () => {
          await Adapter.getStorage().setItem(
            `${STORE_PREFIX}account_data`,
            JSON.stringify(syncData.accountData),
          )
        },
      })
      await Adapter.getInteractionManager().runAfterInteractions({
        name: `SqlStore.store.${STORE_PREFIX}sync`,
        gen: async () => {
          await Adapter.getStorage().setItem(
            `${STORE_PREFIX}sync`,
            JSON.stringify({
              nextBatch: syncData.nextBatch,
              groupsData: syncData.groupsData,
            }),
          )
        },
      })
    } catch (e) {
      debug('Error syncing to storage', e)
    }
  }

  /**
   * Feeds a /sync response into the sync accumulator.
   *
   * @param {Object} syncData The sync response to accumulate.
   * @returns {Promise<*>}
   */
  async setSyncData(syncData) {
    return this._syncAccumulator.accumulate(syncData)
  }

  /**
   * Returns the stored out-of-band membership records for a room.
   *
   * @param {string} roomId
   * @returns {Promise<?Array>} The stored records (possibly empty), or null
   * when out-of-band members have never been stored for this room.
   */
  async getOutOfBandMembers(roomId) {
    const memberships = []
    let oobWritten = false
    const dbMemberships = await Adapter.getStorage().getCollection(`${STORE_PREFIX}memberships`)
      .query(Q.where('room_id', roomId))
      .fetch()
    // NOTE(review): the marker is written as `value = { oob_written: true }`
    // but read through `membership.oob_written` — presumably the model exposes
    // it; confirm against the memberships model.
    for (const membership of dbMemberships) {
      if (membership.oob_written) {
        oobWritten = true
      } else {
        memberships.push(membership)
      }
    }
    // null (not undefined) is the "never stored" sentinel, matching MemoryStore
    if (memberships.length === 0 && !oobWritten) return null
    debug('Found %s membership events for room %s', memberships.length, roomId, memberships)
    return memberships
  }

  /**
   * Stores out-of-band membership events for a room, in memory and in storage.
   *
   * @param {string} roomId
   * @param {Array<Object>} membershipEvents Raw membership events; each needs
   * a `state_key`, which is used to build the record id.
   * @returns {Promise<void>}
   */
  async setOutOfBandMembers(roomId, membershipEvents) {
    super.setOutOfBandMembers(roomId, membershipEvents)
    const membershipsCollection = Adapter.getStorage().getCollection(`${STORE_PREFIX}memberships`)
    const dbMemberships = await membershipsCollection
      .query(Q.where('room_id', roomId))
      .fetch()
    const dbMembershipsById = new Map(
      dbMemberships.map((membership) => [membership.id, membership]),
    )

    const membershipsSync = []
    for (const membershipEvent of membershipEvents) {
      const membershipId = `${roomId}:${membershipEvent.state_key}`
      const dbMembership = dbMembershipsById.get(membershipId)
      if (dbMembership) {
        membershipsSync.push(dbMembership.prepareUpdate((membership) => {
          membership.value = membershipEvent
        }))
      } else {
        membershipsSync.push(membershipsCollection.prepareCreate((membership) => {
          membership._raw.id = membershipId
          membership.roomId = roomId
          membership.value = membershipEvent
        }))
      }
    }

    // aside from all the events, we also write a marker object to the store
    // to mark the fact that OOB members have been written for this room.
    // It's possible that 0 members need to be written as all were previously
    // known, but we still need to know whether to return null or [] from
    // getOutOfBandMembers, where null means out of band members haven't been
    // stored yet for this room
    if (!dbMembershipsById.has(roomId)) {
      membershipsSync.push(membershipsCollection.prepareCreate((membership) => {
        membership._raw.id = roomId
        membership.roomId = roomId
        membership.value = { oob_written: true }
      }))
    }
    await Adapter.getStorage().batch(membershipsSync)
  }

  /**
   * Removes the out-of-band members of a room from memory and from storage.
   *
   * @param {string} roomId
   * @returns {Promise<*>}
   */
  async clearOutOfBandMembers(roomId) {
    super.clearOutOfBandMembers(roomId)
    return Adapter.getStorage().action(async () => {
      await Adapter.getStorage().getCollection(`${STORE_PREFIX}memberships`)
        .query(Q.where('room_id', roomId))
        .destroyAllPermanently()
    })
  }

  /**
   * @returns {Promise<?Object>} The persisted client options, or null when
   * none have been stored.
   */
  async getClientOptions() {
    const jsonClientOptions = await Adapter.getStorage().getItem(`${STORE_PREFIX}client_options`)
    // JSON.parse(undefined) throws, so treat a missing item as "no options"
    return jsonClientOptions ? JSON.parse(jsonClientOptions) : null
  }

  /**
   * Stores the client options in memory and in storage.
   *
   * @param {Object} options
   * @returns {Promise<*>}
   */
  async storeClientOptions(options) {
    super.storeClientOptions(options)
    return Adapter.getStorage().setItem(`${STORE_PREFIX}client_options`, JSON.stringify(options))
  }
}