refactor: 🏗️
This commit is contained in:
parent
7b662c4dcc
commit
984d18683e
6 changed files with 286 additions and 351 deletions
|
|
@ -1,5 +1,4 @@
|
|||
import { TDraftEvent } from '@common/types'
|
||||
import { isReplyNoteEvent } from '@renderer/lib/event'
|
||||
import { formatPubkey } from '@renderer/lib/pubkey'
|
||||
import { tagNameEquals } from '@renderer/lib/tag'
|
||||
import { isWebsocketUrl, normalizeUrl } from '@renderer/lib/url'
|
||||
|
|
@ -23,12 +22,23 @@ const BIG_RELAY_URLS = [
|
|||
'wss://relay.noswhere.com/'
|
||||
]
|
||||
|
||||
type TTimelineRef = [string, number]
|
||||
|
||||
class ClientService {
|
||||
static instance: ClientService
|
||||
|
||||
private defaultRelayUrls: string[] = BIG_RELAY_URLS
|
||||
private pool = new SimplePool()
|
||||
|
||||
private timelines: Record<
|
||||
string,
|
||||
| {
|
||||
refs: TTimelineRef[]
|
||||
filter: Omit<Filter, 'since' | 'until'> & { limit: number }
|
||||
urls: string[]
|
||||
}
|
||||
| undefined
|
||||
> = {}
|
||||
private eventCache = new LRUCache<string, Promise<NEvent | undefined>>({ max: 10000 })
|
||||
private eventDataLoader = new DataLoader<string, NEvent | undefined>(
|
||||
(ids) => Promise.all(ids.map((id) => this._fetchEvent(id))),
|
||||
|
|
@ -38,10 +48,6 @@ class ClientService {
|
|||
this.eventBatchLoadFn.bind(this),
|
||||
{ cache: false }
|
||||
)
|
||||
private repliesCache = new LRUCache<string, { refs: [string, number][]; until?: number }>({
|
||||
max: 1000
|
||||
})
|
||||
private notificationsCache: [string, number][] = []
|
||||
private profileCache = new LRUCache<string, Promise<TProfile>>({ max: 10000 })
|
||||
private profileDataloader = new DataLoader<string, TProfile>(
|
||||
(ids) => Promise.all(ids.map((id) => this._fetchProfile(id))),
|
||||
|
|
@ -91,26 +97,65 @@ class ClientService {
|
|||
this.defaultRelayUrls = Array.from(new Set(urls.concat(BIG_RELAY_URLS)))
|
||||
}
|
||||
|
||||
getDefaultRelayUrls() {
|
||||
return this.defaultRelayUrls
|
||||
}
|
||||
|
||||
async publishEvent(relayUrls: string[], event: NEvent) {
|
||||
return await Promise.any(this.pool.publish(relayUrls, event))
|
||||
}
|
||||
|
||||
subscribeEventsWithAuth(
|
||||
private async generateTimelineKey(urls: string[], filter: Filter): Promise<string> {
|
||||
const paramsStr = JSON.stringify({ urls: urls.sort(), filter })
|
||||
const encoder = new TextEncoder()
|
||||
const data = encoder.encode(paramsStr)
|
||||
const hashBuffer = await crypto.subtle.digest('SHA-256', data)
|
||||
const hashArray = Array.from(new Uint8Array(hashBuffer))
|
||||
return hashArray.map((b) => b.toString(16).padStart(2, '0')).join('')
|
||||
}
|
||||
|
||||
async subscribeTimeline(
|
||||
urls: string[],
|
||||
filter: Filter,
|
||||
filter: Omit<Filter, 'since' | 'until'> & { limit: number }, // filter with limit,
|
||||
{
|
||||
onEose,
|
||||
onEvents,
|
||||
onNew
|
||||
}: {
|
||||
onEose: (events: NEvent[]) => void
|
||||
onEvents: (events: NEvent[], eosed: boolean) => void
|
||||
onNew: (evt: NEvent) => void
|
||||
},
|
||||
signer?: (evt: TDraftEvent) => Promise<NEvent>
|
||||
{
|
||||
signer,
|
||||
needSort = true
|
||||
}: {
|
||||
signer?: (evt: TDraftEvent) => Promise<NEvent>
|
||||
needSort?: boolean
|
||||
} = {}
|
||||
) {
|
||||
const key = await this.generateTimelineKey(urls, filter)
|
||||
const timeline = this.timelines[key]
|
||||
let cachedEvents: NEvent[] = []
|
||||
let since: number | undefined
|
||||
if (timeline && timeline.refs.length) {
|
||||
cachedEvents = (
|
||||
await Promise.all(
|
||||
timeline.refs.slice(0, filter.limit).map(([id]) => this.eventCache.get(id))
|
||||
)
|
||||
).filter(Boolean) as NEvent[]
|
||||
if (cachedEvents.length) {
|
||||
onEvents(cachedEvents, false)
|
||||
since = cachedEvents[0].created_at + 1
|
||||
}
|
||||
}
|
||||
|
||||
if (!timeline && needSort) {
|
||||
this.timelines[key] = { refs: [], filter, urls }
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
||||
const that = this
|
||||
const _knownIds = new Set<string>()
|
||||
const events: NEvent[] = []
|
||||
let events: NEvent[] = []
|
||||
let started = 0
|
||||
let eosed = 0
|
||||
const subPromises = urls.map(async (url) => {
|
||||
|
|
@ -121,19 +166,52 @@ class ClientService {
|
|||
|
||||
function startSub() {
|
||||
started++
|
||||
return relay.subscribe([filter], {
|
||||
return relay.subscribe([since ? { ...filter, since } : filter], {
|
||||
alreadyHaveEvent: (id: string) => {
|
||||
const have = _knownIds.has(id)
|
||||
if (have) {
|
||||
return true
|
||||
}
|
||||
_knownIds.add(id)
|
||||
return have
|
||||
return false
|
||||
},
|
||||
onevent(evt: NEvent) {
|
||||
if (eosed === started) {
|
||||
onNew(evt)
|
||||
} else {
|
||||
events.push(evt)
|
||||
}
|
||||
that.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
// not eosed yet, push to events
|
||||
if (eosed < started) {
|
||||
return events.push(evt)
|
||||
}
|
||||
// eosed, (algo relay feeds) no need to sort and cache
|
||||
if (!needSort) {
|
||||
return onNew(evt)
|
||||
}
|
||||
|
||||
const timeline = that.timelines[key]
|
||||
if (!timeline || !timeline.refs.length) {
|
||||
return onNew(evt)
|
||||
}
|
||||
// the event is newer than the first ref, insert it to the front
|
||||
if (evt.created_at > timeline.refs[0][1]) {
|
||||
onNew(evt)
|
||||
return timeline.refs.unshift([evt.id, evt.created_at])
|
||||
}
|
||||
|
||||
let idx = 0
|
||||
for (const ref of timeline.refs) {
|
||||
if (evt.created_at > ref[1] || (evt.created_at === ref[1] && evt.id < ref[0])) {
|
||||
break
|
||||
}
|
||||
// the event is already in the cache
|
||||
if (evt.created_at === ref[1] && evt.id === ref[0]) {
|
||||
return
|
||||
}
|
||||
idx++
|
||||
}
|
||||
// the event is too old, ignore it
|
||||
if (idx >= timeline.refs.length) return
|
||||
|
||||
// insert the event to the right position
|
||||
timeline.refs.splice(idx, 0, [evt.id, evt.created_at])
|
||||
},
|
||||
onclose(reason: string) {
|
||||
if (reason.startsWith('auth-required:')) {
|
||||
|
|
@ -151,255 +229,91 @@ class ClientService {
|
|||
},
|
||||
oneose() {
|
||||
eosed++
|
||||
if (eosed === started) {
|
||||
onEose(events)
|
||||
if (eosed < started) return
|
||||
|
||||
// (algo feeds) no need to sort and cache
|
||||
if (!needSort) {
|
||||
return onEvents(events, true)
|
||||
}
|
||||
events = events.sort((a, b) => b.created_at - a.created_at).slice(0, filter.limit)
|
||||
|
||||
const timeline = that.timelines[key]
|
||||
// no cache yet
|
||||
if (!timeline || !timeline.refs.length) {
|
||||
that.timelines[key] = {
|
||||
refs: events.map((evt) => [evt.id, evt.created_at]),
|
||||
filter,
|
||||
urls
|
||||
}
|
||||
return onEvents(events, true)
|
||||
}
|
||||
|
||||
const newEvents = events.filter((evt) => {
|
||||
const firstRef = timeline.refs[0]
|
||||
return (
|
||||
evt.created_at > firstRef[1] ||
|
||||
(evt.created_at === firstRef[1] && evt.id < firstRef[0])
|
||||
)
|
||||
})
|
||||
const newRefs = newEvents.map((evt) => [evt.id, evt.created_at] as TTimelineRef)
|
||||
|
||||
if (newRefs.length >= filter.limit) {
|
||||
// if new refs are more than limit, means old refs are too old, replace them
|
||||
timeline.refs = newRefs
|
||||
onEvents(newEvents, true)
|
||||
} else {
|
||||
// merge new refs with old refs
|
||||
timeline.refs = newRefs.concat(timeline.refs)
|
||||
onEvents(newEvents.concat(cachedEvents), true)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
return () => {
|
||||
onEose = () => {}
|
||||
onNew = () => {}
|
||||
subPromises.forEach((subPromise) => {
|
||||
subPromise.then((sub) => {
|
||||
sub.close()
|
||||
return {
|
||||
timelineKey: key,
|
||||
closer: () => {
|
||||
onEvents = () => {}
|
||||
onNew = () => {}
|
||||
subPromises.forEach((subPromise) => {
|
||||
subPromise.then((sub) => {
|
||||
sub.close()
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async subscribeReplies(
|
||||
relayUrls: string[],
|
||||
parentEventId: string,
|
||||
limit: number,
|
||||
{
|
||||
onReplies,
|
||||
onNew
|
||||
}: {
|
||||
onReplies: (events: NEvent[], isCache: boolean, until?: number) => void
|
||||
onNew: (evt: NEvent) => void
|
||||
}
|
||||
) {
|
||||
let cache = this.repliesCache.get(parentEventId)
|
||||
const refs = cache?.refs ?? []
|
||||
let replies: NEvent[] = []
|
||||
if (cache) {
|
||||
replies = (await Promise.all(cache.refs.map(([id]) => this.eventCache.get(id)))).filter(
|
||||
Boolean
|
||||
) as NEvent[]
|
||||
onReplies(replies, true, cache.until)
|
||||
} else {
|
||||
cache = { refs }
|
||||
this.repliesCache.set(parentEventId, cache)
|
||||
}
|
||||
const since = replies.length ? replies[replies.length - 1].created_at + 1 : undefined
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
||||
const that = this
|
||||
const events: NEvent[] = []
|
||||
let hasEosed = false
|
||||
const closer = this.pool.subscribeMany(
|
||||
relayUrls.length > 0 ? relayUrls : this.defaultRelayUrls,
|
||||
[
|
||||
{
|
||||
'#e': [parentEventId],
|
||||
kinds: [kinds.ShortTextNote],
|
||||
limit,
|
||||
since
|
||||
}
|
||||
],
|
||||
{
|
||||
onevent(evt: NEvent) {
|
||||
if (hasEosed) {
|
||||
if (!isReplyNoteEvent(evt)) return
|
||||
onNew(evt)
|
||||
} else {
|
||||
events.push(evt)
|
||||
}
|
||||
that.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
},
|
||||
oneose() {
|
||||
hasEosed = true
|
||||
const newReplies = events
|
||||
.sort((a, b) => a.created_at - b.created_at)
|
||||
.slice(0, limit)
|
||||
.filter(isReplyNoteEvent)
|
||||
replies = replies.concat(newReplies)
|
||||
// first fetch
|
||||
if (!since) {
|
||||
cache.until = events.length >= limit ? events[0].created_at - 1 : undefined
|
||||
}
|
||||
onReplies(replies, false, cache.until)
|
||||
const lastRefCreatedAt = refs.length ? refs[refs.length - 1][1] : undefined
|
||||
if (lastRefCreatedAt) {
|
||||
refs.push(
|
||||
...newReplies
|
||||
.filter((reply) => reply.created_at > lastRefCreatedAt)
|
||||
.map((evt) => [evt.id, evt.created_at] as [string, number])
|
||||
)
|
||||
} else {
|
||||
refs.push(...newReplies.map((evt) => [evt.id, evt.created_at] as [string, number]))
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
return () => {
|
||||
onReplies = () => {}
|
||||
onNew = () => {}
|
||||
closer.close()
|
||||
}
|
||||
}
|
||||
|
||||
async subscribeNotifications(
|
||||
pubkey: string,
|
||||
limit: number,
|
||||
{
|
||||
onNotifications,
|
||||
onNew
|
||||
}: {
|
||||
onNotifications: (events: NEvent[], isCache: boolean) => void
|
||||
onNew: (evt: NEvent) => void
|
||||
}
|
||||
) {
|
||||
let cachedNotifications: NEvent[] = []
|
||||
if (this.notificationsCache.length) {
|
||||
cachedNotifications = (
|
||||
await Promise.all(this.notificationsCache.map(([id]) => this.eventCache.get(id)))
|
||||
).filter(Boolean) as NEvent[]
|
||||
onNotifications(cachedNotifications, true)
|
||||
}
|
||||
const since = this.notificationsCache.length ? this.notificationsCache[0][1] + 1 : undefined
|
||||
async loadMoreTimeline(key: string, until: number, limit: number) {
|
||||
const timeline = this.timelines[key]
|
||||
if (!timeline) return []
|
||||
|
||||
const relayList = await this.fetchRelayList(pubkey)
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
||||
const that = this
|
||||
const events: NEvent[] = []
|
||||
let hasEosed = false
|
||||
let count = 0
|
||||
const closer = this.pool.subscribeMany(
|
||||
relayList.read.length >= 4
|
||||
? relayList.read
|
||||
: relayList.read.concat(this.defaultRelayUrls).slice(0, 4),
|
||||
[
|
||||
{
|
||||
kinds: [kinds.ShortTextNote, kinds.Repost, kinds.Reaction],
|
||||
'#p': [pubkey],
|
||||
limit,
|
||||
since
|
||||
}
|
||||
],
|
||||
{
|
||||
onevent(evt: NEvent) {
|
||||
count++
|
||||
if (hasEosed) {
|
||||
if (evt.pubkey === pubkey) return
|
||||
onNew(evt)
|
||||
} else {
|
||||
events.push(evt)
|
||||
}
|
||||
that.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
},
|
||||
oneose() {
|
||||
hasEosed = true
|
||||
const newNotifications = events
|
||||
.sort((a, b) => b.created_at - a.created_at)
|
||||
.slice(0, limit)
|
||||
.filter((evt) => evt.pubkey !== pubkey)
|
||||
if (count >= limit) {
|
||||
that.notificationsCache = newNotifications.map(
|
||||
(evt) => [evt.id, evt.created_at] as [string, number]
|
||||
const { filter, urls, refs } = timeline
|
||||
const startIdx = refs.findIndex(([, createdAt]) => createdAt < until)
|
||||
const cachedEvents =
|
||||
startIdx >= 0
|
||||
? ((
|
||||
await Promise.all(
|
||||
refs.slice(startIdx, startIdx + limit).map(([id]) => this.eventCache.get(id))
|
||||
)
|
||||
onNotifications(newNotifications, false)
|
||||
} else {
|
||||
that.notificationsCache = [
|
||||
...newNotifications.map((evt) => [evt.id, evt.created_at] as [string, number]),
|
||||
...that.notificationsCache
|
||||
]
|
||||
onNotifications(newNotifications.concat(cachedNotifications), false)
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
return () => {
|
||||
onNotifications = () => {}
|
||||
onNew = () => {}
|
||||
closer.close()
|
||||
).filter(Boolean) as NEvent[])
|
||||
: []
|
||||
if (cachedEvents.length >= limit) {
|
||||
return cachedEvents
|
||||
}
|
||||
}
|
||||
const restLimit = limit - cachedEvents.length
|
||||
const restUntil = cachedEvents.length
|
||||
? cachedEvents[cachedEvents.length - 1].created_at - 1
|
||||
: until
|
||||
|
||||
async fetchMoreReplies(relayUrls: string[], parentEventId: string, until: number, limit: number) {
|
||||
let events = await this.pool.querySync(relayUrls, {
|
||||
'#e': [parentEventId],
|
||||
kinds: [kinds.ShortTextNote],
|
||||
limit,
|
||||
until
|
||||
})
|
||||
let events = await this.pool.querySync(urls, { ...filter, until: restUntil, limit: restLimit })
|
||||
events.forEach((evt) => {
|
||||
this.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
})
|
||||
events = events.sort((a, b) => a.created_at - b.created_at).slice(0, limit)
|
||||
const replies = events.filter((evt) => isReplyNoteEvent(evt))
|
||||
let cache = this.repliesCache.get(parentEventId)
|
||||
if (!cache) {
|
||||
cache = { refs: [] }
|
||||
this.repliesCache.set(parentEventId, cache)
|
||||
}
|
||||
const refs = cache.refs
|
||||
const firstRefCreatedAt = refs.length ? refs[0][1] : undefined
|
||||
const newRefs = firstRefCreatedAt
|
||||
? replies
|
||||
.filter((evt) => evt.created_at < firstRefCreatedAt)
|
||||
.map((evt) => [evt.id, evt.created_at] as [string, number])
|
||||
: replies.map((evt) => [evt.id, evt.created_at] as [string, number])
|
||||
|
||||
if (newRefs.length) {
|
||||
refs.unshift(...newRefs)
|
||||
}
|
||||
cache.until = events.length >= limit ? events[0].created_at - 1 : undefined
|
||||
return { replies, until: cache.until }
|
||||
}
|
||||
|
||||
async fetchMoreNotifications(pubkey: string, until: number, limit: number) {
|
||||
const relayList = await this.fetchRelayList(pubkey)
|
||||
const events = await this.pool.querySync(
|
||||
relayList.read.length >= 4
|
||||
? relayList.read
|
||||
: relayList.read.concat(this.defaultRelayUrls).slice(0, 4),
|
||||
{
|
||||
kinds: [kinds.ShortTextNote, kinds.Repost, kinds.Reaction],
|
||||
'#p': [pubkey],
|
||||
limit,
|
||||
until
|
||||
}
|
||||
)
|
||||
events.forEach((evt) => {
|
||||
this.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
})
|
||||
const notifications = events
|
||||
.sort((a, b) => b.created_at - a.created_at)
|
||||
.slice(0, limit)
|
||||
.filter((evt) => evt.pubkey !== pubkey)
|
||||
|
||||
const cacheLastCreatedAt = this.notificationsCache.length
|
||||
? this.notificationsCache[this.notificationsCache.length - 1][1]
|
||||
: undefined
|
||||
this.notificationsCache = this.notificationsCache.concat(
|
||||
(cacheLastCreatedAt
|
||||
? notifications.filter((evt) => evt.created_at < cacheLastCreatedAt)
|
||||
: notifications
|
||||
).map((evt) => [evt.id, evt.created_at] as [string, number])
|
||||
)
|
||||
|
||||
return notifications
|
||||
}
|
||||
|
||||
clearNotificationsCache() {
|
||||
this.notificationsCache = []
|
||||
events = events.sort((a, b) => b.created_at - a.created_at).slice(0, restLimit)
|
||||
timeline.refs.push(...events.map((evt) => [evt.id, evt.created_at] as TTimelineRef))
|
||||
return cachedEvents.concat(events)
|
||||
}
|
||||
|
||||
async fetchEvents(relayUrls: string[], filter: Filter, cache = false) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue