refactor: subscribe
This commit is contained in:
parent
2a4968568a
commit
74d7f9be29
5 changed files with 157 additions and 188 deletions
|
|
@ -40,10 +40,8 @@ class ClientService extends EventTarget {
|
|||
| string[]
|
||||
| undefined
|
||||
> = {}
|
||||
private eventCache = new LRUCache<string, Promise<NEvent | undefined>>({ max: 10000 })
|
||||
private eventDataLoader = new DataLoader<string, NEvent | undefined>(
|
||||
(ids) => Promise.all(ids.map((id) => this._fetchEvent(id))),
|
||||
{ cacheMap: this.eventCache }
|
||||
private eventDataLoader = new DataLoader<string, NEvent | undefined>((ids) =>
|
||||
Promise.all(ids.map((id) => this._fetchEvent(id)))
|
||||
)
|
||||
private fetchEventFromBigRelaysDataloader = new DataLoader<string, NEvent | undefined>(
|
||||
this.fetchEventsFromBigRelays.bind(this),
|
||||
|
|
@ -343,9 +341,10 @@ class ClientService extends EventTarget {
|
|||
|
||||
async function startSub() {
|
||||
startedCount++
|
||||
const relay = await that.pool.ensureRelay(url, { connectionTimeout: 2000 }).catch(() => {
|
||||
const relay = await that.pool.ensureRelay(url, { connectionTimeout: 5000 }).catch(() => {
|
||||
return undefined
|
||||
})
|
||||
// cannot connect to relay
|
||||
if (!relay) {
|
||||
if (!eosed) {
|
||||
eosedCount++
|
||||
|
|
@ -356,6 +355,7 @@ class ClientService extends EventTarget {
|
|||
close: () => {}
|
||||
}
|
||||
}
|
||||
|
||||
return relay.subscribe(filters, {
|
||||
receivedEvent: (relay, id) => {
|
||||
that.trackEventSeenOn(id, relay)
|
||||
|
|
@ -372,44 +372,52 @@ class ClientService extends EventTarget {
|
|||
onevent?.(evt)
|
||||
},
|
||||
oneose: () => {
|
||||
// make sure eosed is not called multiple times
|
||||
if (eosed) return
|
||||
|
||||
eosedCount++
|
||||
eosed = eosedCount >= startedCount
|
||||
|
||||
oneose?.(eosed)
|
||||
},
|
||||
onclose: (reason: string) => {
|
||||
if (!reason.startsWith('auth-required')) {
|
||||
closedCount++
|
||||
closeReasons.push(reason)
|
||||
if (closedCount >= startedCount) {
|
||||
onclose?.(closeReasons)
|
||||
// auth-required
|
||||
if (reason.startsWith('auth-required') && !hasAuthed) {
|
||||
// already logged in
|
||||
if (that.signer) {
|
||||
relay
|
||||
.auth(async (authEvt: EventTemplate) => {
|
||||
const evt = await that.signer!.signEvent(authEvt)
|
||||
if (!evt) {
|
||||
throw new Error('sign event failed')
|
||||
}
|
||||
return evt as VerifiedEvent
|
||||
})
|
||||
.then(() => {
|
||||
hasAuthed = true
|
||||
if (!eosed) {
|
||||
subPromises.push(startSub())
|
||||
}
|
||||
})
|
||||
.catch(() => {
|
||||
// ignore
|
||||
})
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
if (hasAuthed) return
|
||||
|
||||
if (that.signer) {
|
||||
relay
|
||||
.auth(async (authEvt: EventTemplate) => {
|
||||
const evt = await that.signer!.signEvent(authEvt)
|
||||
if (!evt) {
|
||||
throw new Error('sign event failed')
|
||||
}
|
||||
return evt as VerifiedEvent
|
||||
})
|
||||
.then(() => {
|
||||
hasAuthed = true
|
||||
if (!eosed) {
|
||||
subPromises.push(startSub())
|
||||
}
|
||||
})
|
||||
.catch(() => {
|
||||
// ignore
|
||||
})
|
||||
} else if (startLogin) {
|
||||
startLogin()
|
||||
// open login dialog
|
||||
if (startLogin) {
|
||||
startLogin()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// close the subscription
|
||||
closedCount++
|
||||
closeReasons.push(reason)
|
||||
if (closedCount >= startedCount) {
|
||||
onclose?.(closeReasons)
|
||||
}
|
||||
return
|
||||
},
|
||||
eoseTimeout: 10_000 // 10s
|
||||
})
|
||||
|
|
@ -432,56 +440,24 @@ class ClientService extends EventTarget {
|
|||
}
|
||||
|
||||
private async query(urls: string[], filter: Filter | Filter[], onevent?: (evt: NEvent) => void) {
|
||||
const relays = Array.from(new Set(urls))
|
||||
const filters = Array.isArray(filter) ? filter : [filter]
|
||||
const _knownIds = new Set<string>()
|
||||
const events: NEvent[] = []
|
||||
await Promise.allSettled(
|
||||
relays.map(async (url) => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
||||
const that = this
|
||||
const relay = await this.pool.ensureRelay(url)
|
||||
let hasAuthed = false
|
||||
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const startQuery = () => {
|
||||
const sub = relay.subscribe(filters, {
|
||||
receivedEvent(relay, id) {
|
||||
that.trackEventSeenOn(id, relay)
|
||||
},
|
||||
onclose(reason) {
|
||||
if (!reason.startsWith('auth-required') || hasAuthed) {
|
||||
resolve()
|
||||
return
|
||||
}
|
||||
|
||||
if (that.signer) {
|
||||
relay
|
||||
.auth((authEvt: EventTemplate) => that.signer!.signEvent(authEvt))
|
||||
.then(() => {
|
||||
hasAuthed = true
|
||||
startQuery()
|
||||
})
|
||||
.catch(reject)
|
||||
}
|
||||
},
|
||||
oneose() {
|
||||
sub.close()
|
||||
resolve()
|
||||
},
|
||||
onevent(evt) {
|
||||
if (_knownIds.has(evt.id)) return
|
||||
_knownIds.add(evt.id)
|
||||
events.push(evt)
|
||||
onevent?.(evt)
|
||||
}
|
||||
})
|
||||
return await new Promise<NEvent[]>((resolve) => {
|
||||
const events: NEvent[] = []
|
||||
const sub = this.subscribe(urls, filter, {
|
||||
onevent(evt) {
|
||||
onevent?.(evt)
|
||||
events.push(evt)
|
||||
},
|
||||
oneose: (eosed) => {
|
||||
if (eosed) {
|
||||
sub.close()
|
||||
resolve(events)
|
||||
}
|
||||
startQuery()
|
||||
})
|
||||
},
|
||||
onclose: () => {
|
||||
resolve(events)
|
||||
}
|
||||
})
|
||||
)
|
||||
return events
|
||||
})
|
||||
}
|
||||
|
||||
private async _subscribeTimeline(
|
||||
|
|
@ -509,20 +485,14 @@ class ClientService extends EventTarget {
|
|||
let since: number | undefined
|
||||
if (timeline && !Array.isArray(timeline) && timeline.refs.length && needSort) {
|
||||
cachedEvents = (
|
||||
await Promise.all(
|
||||
timeline.refs.slice(0, filter.limit).map(([id]) => this.eventCache.get(id))
|
||||
)
|
||||
).filter(Boolean) as NEvent[]
|
||||
await this.eventDataLoader.loadMany(timeline.refs.slice(0, filter.limit).map(([id]) => id))
|
||||
).filter((evt) => !!evt && !(evt instanceof Error)) as NEvent[]
|
||||
if (cachedEvents.length) {
|
||||
onEvents([...cachedEvents], false)
|
||||
since = cachedEvents[0].created_at + 1
|
||||
}
|
||||
}
|
||||
|
||||
if (!timeline && needSort) {
|
||||
this.timelines[key] = { refs: [], filter, urls: relays }
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
||||
const that = this
|
||||
let events: NEvent[] = []
|
||||
|
|
@ -544,12 +514,8 @@ class ClientService extends EventTarget {
|
|||
if (!timeline || Array.isArray(timeline) || !timeline.refs.length) {
|
||||
return onNew(evt)
|
||||
}
|
||||
// the event is newer than the first ref, insert it to the front
|
||||
if (evt.created_at > timeline.refs[0][1]) {
|
||||
onNew(evt)
|
||||
return timeline.refs.unshift([evt.id, evt.created_at])
|
||||
}
|
||||
|
||||
// find the right position to insert
|
||||
let idx = 0
|
||||
for (const ref of timeline.refs) {
|
||||
if (evt.created_at > ref[1] || (evt.created_at === ref[1] && evt.id < ref[0])) {
|
||||
|
|
@ -564,6 +530,11 @@ class ClientService extends EventTarget {
|
|||
// the event is too old, ignore it
|
||||
if (idx >= timeline.refs.length) return
|
||||
|
||||
// new event
|
||||
if (idx === 0) {
|
||||
onNew(evt)
|
||||
}
|
||||
|
||||
// insert the event to the right position
|
||||
timeline.refs.splice(idx, 0, [evt.id, evt.created_at])
|
||||
},
|
||||
|
|
@ -575,7 +546,7 @@ class ClientService extends EventTarget {
|
|||
}
|
||||
if (!eosed) {
|
||||
events = events.sort((a, b) => b.created_at - a.created_at).slice(0, filter.limit)
|
||||
return onEvents([...events.concat(cachedEvents)], false)
|
||||
return onEvents([...events.concat(cachedEvents).slice(0, filter.limit)], false)
|
||||
}
|
||||
|
||||
events = events.sort((a, b) => b.created_at - a.created_at).slice(0, filter.limit)
|
||||
|
|
@ -590,22 +561,16 @@ class ClientService extends EventTarget {
|
|||
return onEvents([...events], true)
|
||||
}
|
||||
|
||||
const newEvents = events.filter((evt) => {
|
||||
const firstRef = timeline.refs[0]
|
||||
return (
|
||||
evt.created_at > firstRef[1] || (evt.created_at === firstRef[1] && evt.id < firstRef[0])
|
||||
)
|
||||
})
|
||||
const newRefs = newEvents.map((evt) => [evt.id, evt.created_at] as TTimelineRef)
|
||||
const newRefs = events.map((evt) => [evt.id, evt.created_at] as TTimelineRef)
|
||||
|
||||
if (newRefs.length >= filter.limit) {
|
||||
if (events.length >= filter.limit) {
|
||||
// if new refs are more than limit, means old refs are too old, replace them
|
||||
timeline.refs = newRefs
|
||||
onEvents([...newEvents], true)
|
||||
onEvents([...events], true)
|
||||
} else {
|
||||
// merge new refs with old refs
|
||||
timeline.refs = newRefs.concat(timeline.refs)
|
||||
onEvents([...newEvents.concat(cachedEvents)], true)
|
||||
onEvents([...events.concat(cachedEvents).slice(0, filter.limit)], true)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
@ -625,14 +590,14 @@ class ClientService extends EventTarget {
|
|||
if (!timeline || Array.isArray(timeline)) return []
|
||||
|
||||
const { filter, urls, refs } = timeline
|
||||
const startIdx = refs.findIndex(([, createdAt]) => createdAt < until)
|
||||
const startIdx = refs.findIndex(([, createdAt]) => createdAt <= until)
|
||||
const cachedEvents =
|
||||
startIdx >= 0
|
||||
? ((
|
||||
await Promise.all(
|
||||
refs.slice(startIdx, startIdx + limit).map(([id]) => this.eventCache.get(id))
|
||||
await this.eventDataLoader.loadMany(
|
||||
refs.slice(startIdx, startIdx + limit).map(([id]) => id)
|
||||
)
|
||||
).filter(Boolean) as NEvent[])
|
||||
).filter((evt) => !!evt && !(evt instanceof Error)) as NEvent[])
|
||||
: []
|
||||
if (cachedEvents.length >= limit) {
|
||||
return cachedEvents
|
||||
|
|
@ -640,7 +605,7 @@ class ClientService extends EventTarget {
|
|||
|
||||
until = cachedEvents.length ? cachedEvents[cachedEvents.length - 1].created_at - 1 : until
|
||||
limit = limit - cachedEvents.length
|
||||
let events = await this.query(urls, { ...filter, until: until, limit: limit })
|
||||
let events = await this.query(urls, { ...filter, until, limit })
|
||||
events.forEach((evt) => {
|
||||
this.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
})
|
||||
|
|
@ -687,7 +652,7 @@ class ClientService extends EventTarget {
|
|||
break
|
||||
}
|
||||
if (eventId) {
|
||||
const cache = await this.eventCache.get(eventId)
|
||||
const cache = await this.eventDataLoader.load(eventId)
|
||||
if (cache) {
|
||||
return cache
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue