feat: adapt for algo relay

codytseng 2024-11-17 23:08:01 +08:00
parent feb04cce87
commit d1150b947a
4 changed files with 163 additions and 131 deletions


@@ -1,13 +1,20 @@
import { TRelayGroup } from '@common/types'
import { TDraftEvent, TRelayGroup } from '@common/types'
import { formatPubkey } from '@renderer/lib/pubkey'
import { tagNameEquals } from '@renderer/lib/tag'
import { isWebsocketUrl, normalizeUrl } from '@renderer/lib/url'
import { TProfile, TRelayList } from '@renderer/types'
import DataLoader from 'dataloader'
import { LRUCache } from 'lru-cache'
import { Filter, kinds, Event as NEvent, SimplePool } from 'nostr-tools'
import {
EventTemplate,
Filter,
kinds,
Event as NEvent,
SimplePool,
VerifiedEvent
} from 'nostr-tools'
import { EVENT_TYPES, eventBus } from './event-bus.service'
import storage from './storage.service'
import { isWebsocketUrl, normalizeUrl } from '@renderer/lib/url'
const BIG_RELAY_URLS = [
'wss://relay.damus.io/',
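
The widened import list (TDraftEvent, EventTemplate, VerifiedEvent) exists so callers can pass a signing callback into the new subscribeEventsWithAuth below, which uses it to answer NIP-42 auth-required challenges. A minimal sketch of such a signer, assuming TDraftEvent is shape-compatible with nostr-tools' EventTemplate and using a throwaway local key purely for illustration:

import { finalizeEvent, generateSecretKey, type Event as NEvent, type EventTemplate } from 'nostr-tools'

// Illustrative signer matching the `(evt: TDraftEvent) => Promise<NEvent>` callback
// introduced in this commit; a real app would delegate to the user's configured
// signer instead of a freshly generated secret key.
const demoSecretKey = generateSecretKey()
const demoSigner = async (draft: EventTemplate): Promise<NEvent> => {
  // finalizeEvent fills in pubkey, id and sig on the draft
  return finalizeEvent(draft, demoSecretKey)
}
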
@@ -26,10 +33,7 @@ class ClientService {
private eventByFilterCache = new LRUCache<string, Promise<NEvent | undefined>>({
max: 10000,
fetchMethod: async (filterStr) => {
const events = await this.fetchEvents(
BIG_RELAY_URLS.concat(this.relayUrls),
JSON.parse(filterStr)
)
const events = await this.fetchEvents(BIG_RELAY_URLS, JSON.parse(filterStr))
events.forEach((event) => this.addEventToCache(event))
return events.sort((a, b) => b.created_at - a.created_at)[0]
}
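
eventByFilterCache now resolves cache misses against BIG_RELAY_URLS only, instead of BIG_RELAY_URLS plus the user's relays. The surrounding mechanism is lru-cache's fetch-through mode: cache.fetch(key) invokes fetchMethod for unknown keys and memoizes the result. A standalone sketch of that pattern (the key and value here are placeholders, not the real filter/event types):

import { LRUCache } from 'lru-cache'

// Fetch-through cache: fetch() calls fetchMethod once per missing key.
const demoCache = new LRUCache<string, number>({
  max: 100,
  fetchMethod: async (filterStr) => {
    // stand-in for `this.fetchEvents(BIG_RELAY_URLS, JSON.parse(filterStr))`
    return JSON.parse(filterStr).limit ?? 0
  }
})

async function demo() {
  // the second fetch for the same key is served from the cache
  const a = await demoCache.fetch(JSON.stringify({ kinds: [1], limit: 1 }))
  const b = await demoCache.fetch(JSON.stringify({ kinds: [1], limit: 1 }))
  return [a, b]
}
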
@@ -85,44 +89,86 @@ class ClientService {
return await Promise.any(this.pool.publish(this.relayUrls.concat(relayUrls), event))
}
subscribeEvents(
subscribeEventsWithAuth(
urls: string[],
filter: Filter,
opts: {
{
onEose,
onNew
}: {
onEose: (events: NEvent[]) => void
onNew: (evt: NEvent) => void
}
},
signer?: (evt: TDraftEvent) => Promise<NEvent>
) {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const that = this
const _knownIds = new Set<string>()
const events: NEvent[] = []
let eose = false
return this.pool.subscribeMany(
urls.length > 0 ? urls : this.relayUrls.concat(BIG_RELAY_URLS),
[filter],
{
onevent: (evt) => {
if (eose) {
opts.onNew(evt)
} else {
events.push(evt)
let started = 0
let eosed = 0
const subPromises = urls.map(async (url) => {
const relay = await this.pool.ensureRelay(url)
let hasAuthed = false
return startSub()
function startSub() {
started++
return relay.subscribe([filter], {
alreadyHaveEvent: (id: string) => {
const have = _knownIds.has(id)
_knownIds.add(id)
return have
},
onevent(evt: NEvent) {
if (eosed === started) {
onNew(evt)
} else {
events.push(evt)
}
that.eventByIdCache.set(evt.id, Promise.resolve(evt))
},
onclose(reason: string) {
if (reason.startsWith('auth-required:')) {
if (!hasAuthed && signer) {
relay
.auth((authEvt: EventTemplate) => {
return signer(authEvt) as Promise<VerifiedEvent>
})
.then(() => {
hasAuthed = true
startSub()
})
}
}
},
oneose() {
eosed++
if (eosed === started) {
events.sort((a, b) => b.created_at - a.created_at)
onEose(events)
}
}
},
oneose: () => {
eose = true
opts.onEose(events.sort((a, b) => b.created_at - a.created_at))
},
onclose: () => {
if (!eose) {
opts.onEose(events.sort((a, b) => b.created_at - a.created_at))
}
}
})
}
)
})
return () => {
onEose = () => {}
onNew = () => {}
subPromises.forEach((subPromise) => {
subPromise.then((sub) => {
sub.close()
})
})
}
}
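
A hypothetical caller of the new subscribeEventsWithAuth, sketched to show the intended flow: stored events arrive via onEose once every started subscription has EOSEd, later events arrive via onNew, and a relay that closes with an 'auth-required:' reason is retried after signing its challenge with the optional signer. The relay URL and the `client`/`demoSigner` names are assumptions, not part of this commit:

const unsubscribe = client.subscribeEventsWithAuth(
  ['wss://algo-relay.example.com/'],
  { kinds: [1], limit: 50 },
  {
    onEose: (events) => console.log('initial batch:', events.length),
    onNew: (evt) => console.log('live event:', evt.id)
  },
  demoSigner // only consulted when a relay answers with auth-required
)

// later, e.g. when the view unmounts, close every per-relay subscription
unsubscribe()
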
async fetchEvents(relayUrls: string[], filter: Filter) {
await this.initPromise
// If relayUrls is empty, fall back to BIG_RELAY_URLS
return await this.pool.querySync(relayUrls.length > 0 ? relayUrls : this.relayUrls, filter)
return await this.pool.querySync(relayUrls.length > 0 ? relayUrls : BIG_RELAY_URLS, filter)
}
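
Note the changed fallback: an empty relayUrls argument now routes the query to BIG_RELAY_URLS rather than the user's own relay list. A hypothetical call hitting that branch:

// goes to BIG_RELAY_URLS because no explicit relays are given
const latestNotes = client.fetchEvents([], { kinds: [1], limit: 20 })
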
async fetchEventByFilter(filter: Filter) {
@@ -154,7 +200,7 @@ class ClientService {
}
private async eventBatchLoadFn(ids: readonly string[]) {
const events = await this.fetchEvents(this.relayUrls, {
const events = await this.fetchEvents(BIG_RELAY_URLS, {
ids: ids as string[],
limit: ids.length
})
@@ -163,25 +209,11 @@ class ClientService {
eventsMap.set(event.id, event)
}
const missingIds = ids.filter((id) => !eventsMap.has(id))
if (missingIds.length > 0) {
const missingEvents = await this.fetchEvents(
BIG_RELAY_URLS.filter((url) => !this.relayUrls.includes(url)),
{
ids: missingIds,
limit: missingIds.length
}
)
for (const event of missingEvents) {
eventsMap.set(event.id, event)
}
}
return ids.map((id) => eventsMap.get(id))
}
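
eventBatchLoadFn is a DataLoader batch function, so its resolved array has to line up index-for-index with the requested ids; that contract is why the method still ends in ids.map even after dropping the second-pass fetch for missing ids. A self-contained sketch of the same wiring (the type and the empty fetch result are placeholders):

import DataLoader from 'dataloader'

type TDemoEvent = { id: string; created_at: number }

const eventLoader = new DataLoader<string, TDemoEvent | undefined>(async (ids) => {
  // stand-in for `this.fetchEvents(BIG_RELAY_URLS, { ids, limit: ids.length })`
  const fetched: TDemoEvent[] = []
  const byId = new Map(fetched.map((event) => [event.id, event] as const))
  // DataLoader requires one entry per requested id, in the same order
  return ids.map((id) => byId.get(id))
})

// individual eventLoader.load(id) calls in the same tick coalesce into one batch
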
private async profileBatchLoadFn(pubkeys: readonly string[]) {
const events = await this.fetchEvents(this.relayUrls, {
const events = await this.fetchEvents(BIG_RELAY_URLS, {
authors: pubkeys as string[],
kinds: [kinds.Metadata],
limit: pubkeys.length
@@ -195,25 +227,6 @@
}
}
const missingPubkeys = pubkeys.filter((pubkey) => !eventsMap.has(pubkey))
if (missingPubkeys.length > 0) {
const missingEvents = await this.fetchEvents(
BIG_RELAY_URLS.filter((url) => !this.relayUrls.includes(url)),
{
authors: missingPubkeys,
kinds: [kinds.Metadata],
limit: missingPubkeys.length
}
)
for (const event of missingEvents) {
const pubkey = event.pubkey
const existing = eventsMap.get(pubkey)
if (!existing || existing.created_at < event.created_at) {
eventsMap.set(pubkey, event)
}
}
}
return pubkeys.map((pubkey) => {
const event = eventsMap.get(pubkey)
return event ? this.parseProfileFromEvent(event) : undefined
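
parseProfileFromEvent itself is outside this diff; for orientation, parsing a kind-0 metadata event usually amounts to JSON-decoding its content, roughly like the sketch below (the returned field names are assumptions, not the actual TProfile shape):

import { Event as NEvent } from 'nostr-tools'

// Rough shape of a kind-0 (Metadata) parse; not the project's actual implementation.
function parseProfileSketch(event: NEvent) {
  try {
    const metadata = JSON.parse(event.content) as Record<string, unknown>
    return {
      pubkey: event.pubkey,
      username: typeof metadata.name === 'string' ? metadata.name : undefined,
      avatar: typeof metadata.picture === 'string' ? metadata.picture : undefined
    }
  } catch {
    return { pubkey: event.pubkey }
  }
}
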
@@ -221,7 +234,7 @@ class ClientService {
}
private async relayListBatchLoadFn(pubkeys: readonly string[]) {
const events = await this.fetchEvents(BIG_RELAY_URLS.concat(this.relayUrls), {
const events = await this.fetchEvents(BIG_RELAY_URLS, {
authors: pubkeys as string[],
kinds: [kinds.RelayList],
limit: pubkeys.length