feat: cache replies
This commit is contained in:
parent
e1d3d97f9a
commit
6d4bb00f8b
7 changed files with 200 additions and 120 deletions
|
|
@ -1,4 +1,5 @@
|
|||
import { TDraftEvent, TRelayGroup } from '@common/types'
|
||||
import { TDraftEvent } from '@common/types'
|
||||
import { isReplyNoteEvent } from '@renderer/lib/event'
|
||||
import { formatPubkey } from '@renderer/lib/pubkey'
|
||||
import { tagNameEquals } from '@renderer/lib/tag'
|
||||
import { isWebsocketUrl, normalizeUrl } from '@renderer/lib/url'
|
||||
|
|
@ -14,8 +15,6 @@ import {
|
|||
SimplePool,
|
||||
VerifiedEvent
|
||||
} from 'nostr-tools'
|
||||
import { EVENT_TYPES, eventBus } from './event-bus.service'
|
||||
import storage from './storage.service'
|
||||
|
||||
const BIG_RELAY_URLS = [
|
||||
'wss://relay.damus.io/',
|
||||
|
|
@ -28,8 +27,6 @@ class ClientService {
|
|||
static instance: ClientService
|
||||
|
||||
private pool = new SimplePool()
|
||||
private relayUrls: string[] = BIG_RELAY_URLS
|
||||
private initPromise!: Promise<void>
|
||||
|
||||
private eventCache = new LRUCache<string, Promise<NEvent | undefined>>({ max: 10000 })
|
||||
private eventDataLoader = new DataLoader<string, NEvent | undefined>(
|
||||
|
|
@ -40,6 +37,9 @@ class ClientService {
|
|||
this.eventBatchLoadFn.bind(this),
|
||||
{ cache: false }
|
||||
)
|
||||
private repliesCache = new LRUCache<string, { refs: [string, number][]; until?: number }>({
|
||||
max: 1000
|
||||
})
|
||||
private profileCache = new LRUCache<string, Promise<TProfile>>({ max: 10000 })
|
||||
private profileDataloader = new DataLoader<string, TProfile>(
|
||||
(ids) => Promise.all(ids.map((id) => this._fetchProfileByBench32Id(id))),
|
||||
|
|
@ -62,31 +62,17 @@ class ClientService {
|
|||
|
||||
constructor() {
|
||||
if (!ClientService.instance) {
|
||||
this.initPromise = this.init()
|
||||
ClientService.instance = this
|
||||
}
|
||||
return ClientService.instance
|
||||
}
|
||||
|
||||
async init() {
|
||||
const relayGroups = await storage.getRelayGroups()
|
||||
this.relayUrls = relayGroups.find((group) => group.isActive)?.relayUrls ?? []
|
||||
eventBus.on(EVENT_TYPES.RELAY_GROUPS_CHANGED, (event) => {
|
||||
this.onRelayGroupsChange(event.detail)
|
||||
})
|
||||
}
|
||||
|
||||
onRelayGroupsChange(relayGroups: TRelayGroup[]) {
|
||||
const newRelayUrls = relayGroups.find((group) => group.isActive)?.relayUrls ?? []
|
||||
this.relayUrls = newRelayUrls
|
||||
}
|
||||
|
||||
listConnectionStatus() {
|
||||
return this.pool.listConnectionStatus()
|
||||
}
|
||||
|
||||
async publishEvent(relayUrls: string[], event: NEvent) {
|
||||
return await Promise.any(this.pool.publish(this.relayUrls.concat(relayUrls), event))
|
||||
return await Promise.any(this.pool.publish(relayUrls, event))
|
||||
}
|
||||
|
||||
subscribeEventsWithAuth(
|
||||
|
|
@ -165,10 +151,121 @@ class ClientService {
|
|||
}
|
||||
}
|
||||
|
||||
async fetchEvents(relayUrls: string[], filter: Filter) {
|
||||
await this.initPromise
|
||||
// If relayUrls is empty, use this.relayUrls
|
||||
return await this.pool.querySync(relayUrls.length > 0 ? relayUrls : BIG_RELAY_URLS, filter)
|
||||
async subscribeReplies(
|
||||
relayUrls: string[],
|
||||
parentEventId: string,
|
||||
limit: number,
|
||||
{
|
||||
onReplies,
|
||||
onNew
|
||||
}: {
|
||||
onReplies: (events: NEvent[], until?: number) => void
|
||||
onNew: (evt: NEvent) => void
|
||||
}
|
||||
) {
|
||||
let cache = this.repliesCache.get(parentEventId)
|
||||
const refs = cache?.refs ?? []
|
||||
let replies: NEvent[] = []
|
||||
if (cache) {
|
||||
replies = (await Promise.all(cache.refs.map(([id]) => this.eventCache.get(id)))).filter(
|
||||
Boolean
|
||||
) as NEvent[]
|
||||
onReplies(replies, cache.until)
|
||||
} else {
|
||||
cache = { refs }
|
||||
this.repliesCache.set(parentEventId, cache)
|
||||
}
|
||||
const since = replies.length ? replies[replies.length - 1].created_at + 1 : undefined
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
||||
const that = this
|
||||
const events: NEvent[] = []
|
||||
let hasEosed = false
|
||||
const closer = this.pool.subscribeMany(
|
||||
relayUrls.length > 0 ? relayUrls : BIG_RELAY_URLS,
|
||||
[
|
||||
{
|
||||
'#e': [parentEventId],
|
||||
kinds: [kinds.ShortTextNote],
|
||||
limit,
|
||||
since
|
||||
}
|
||||
],
|
||||
{
|
||||
onevent(evt: NEvent) {
|
||||
if (!isReplyNoteEvent(evt)) return
|
||||
|
||||
if (hasEosed) {
|
||||
onNew(evt)
|
||||
} else {
|
||||
events.push(evt)
|
||||
}
|
||||
that.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
},
|
||||
oneose() {
|
||||
hasEosed = true
|
||||
const newReplies = events.sort((a, b) => a.created_at - b.created_at)
|
||||
replies = replies.concat(newReplies)
|
||||
refs.push(...newReplies.map((evt) => [evt.id, evt.created_at] as [string, number]))
|
||||
// first fetch
|
||||
if (!since) {
|
||||
cache.until = events.length >= limit ? events[0].created_at - 1 : undefined
|
||||
}
|
||||
onReplies(replies, cache.until)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
return () => {
|
||||
onReplies = () => {}
|
||||
onNew = () => {}
|
||||
closer.close()
|
||||
}
|
||||
}
|
||||
|
||||
async fetchMoreReplies(relayUrls: string[], parentEventId: string, until: number, limit: number) {
|
||||
const events = await this.pool.querySync(relayUrls, {
|
||||
'#e': [parentEventId],
|
||||
kinds: [kinds.ShortTextNote],
|
||||
limit,
|
||||
until
|
||||
})
|
||||
events.forEach((evt) => {
|
||||
this.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
})
|
||||
events.sort((a, b) => a.created_at - b.created_at)
|
||||
const replies = events.filter((evt) => isReplyNoteEvent(evt))
|
||||
let cache = this.repliesCache.get(parentEventId)
|
||||
if (!cache) {
|
||||
cache = { refs: [] }
|
||||
this.repliesCache.set(parentEventId, cache)
|
||||
}
|
||||
const refs = cache.refs
|
||||
const firstRefCreatedAt = refs.length ? refs[0][1] : undefined
|
||||
const newRefs = firstRefCreatedAt
|
||||
? replies
|
||||
.filter((evt) => evt.created_at < firstRefCreatedAt)
|
||||
.map((evt) => [evt.id, evt.created_at] as [string, number])
|
||||
: replies.map((evt) => [evt.id, evt.created_at] as [string, number])
|
||||
|
||||
if (newRefs.length) {
|
||||
refs.unshift(...newRefs)
|
||||
}
|
||||
cache.until = events.length >= limit ? events[0].created_at - 1 : undefined
|
||||
return { replies, until: cache.until }
|
||||
}
|
||||
|
||||
async fetchEvents(relayUrls: string[], filter: Filter, cache = false) {
|
||||
const events = await this.pool.querySync(
|
||||
relayUrls.length > 0 ? relayUrls : BIG_RELAY_URLS,
|
||||
filter
|
||||
)
|
||||
if (cache) {
|
||||
events.forEach((evt) => {
|
||||
this.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
})
|
||||
}
|
||||
return events
|
||||
}
|
||||
|
||||
async fetchEventByBench32Id(id: string): Promise<NEvent | undefined> {
|
||||
|
|
@ -351,12 +448,12 @@ class ClientService {
|
|||
}
|
||||
if (!relayUrls.length) return
|
||||
|
||||
const events = await this.fetchEvents(relayUrls, filter)
|
||||
const events = await this.pool.querySync(relayUrls, filter)
|
||||
return events.sort((a, b) => b.created_at - a.created_at)[0]
|
||||
}
|
||||
|
||||
private async eventBatchLoadFn(ids: readonly string[]) {
|
||||
const events = await this.fetchEvents(BIG_RELAY_URLS, {
|
||||
const events = await this.pool.querySync(BIG_RELAY_URLS, {
|
||||
ids: Array.from(new Set(ids)),
|
||||
limit: ids.length
|
||||
})
|
||||
|
|
@ -369,7 +466,7 @@ class ClientService {
|
|||
}
|
||||
|
||||
private async profileBatchLoadFn(pubkeys: readonly string[]) {
|
||||
const events = await this.fetchEvents(BIG_RELAY_URLS, {
|
||||
const events = await this.pool.querySync(BIG_RELAY_URLS, {
|
||||
authors: Array.from(new Set(pubkeys)),
|
||||
kinds: [kinds.Metadata],
|
||||
limit: pubkeys.length
|
||||
|
|
@ -390,7 +487,7 @@ class ClientService {
|
|||
}
|
||||
|
||||
private async relayListBatchLoadFn(pubkeys: readonly string[]) {
|
||||
const events = await this.fetchEvents(BIG_RELAY_URLS, {
|
||||
const events = await this.pool.querySync(BIG_RELAY_URLS, {
|
||||
authors: pubkeys as string[],
|
||||
kinds: [kinds.RelayList],
|
||||
limit: pubkeys.length
|
||||
|
|
@ -434,7 +531,7 @@ class ClientService {
|
|||
|
||||
private async _fetchFollowListEvent(pubkey: string) {
|
||||
const relayList = await this.fetchRelayList(pubkey)
|
||||
const followListEvents = await this.fetchEvents(relayList.write.concat(BIG_RELAY_URLS), {
|
||||
const followListEvents = await this.pool.querySync(relayList.write.concat(BIG_RELAY_URLS), {
|
||||
authors: [pubkey],
|
||||
kinds: [kinds.Contacts]
|
||||
})
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue