feat: outbox model (#4)
This commit is contained in:
parent
bd3078bcd0
commit
ead1710392
21 changed files with 171 additions and 59 deletions
|
|
@ -1,11 +1,13 @@
|
|||
import { TRelayGroup } from '@common/types'
|
||||
import { formatPubkey } from '@renderer/lib/pubkey'
|
||||
import { TProfile } from '@renderer/types'
|
||||
import { tagNameEquals } from '@renderer/lib/tag'
|
||||
import { TProfile, TRelayList } from '@renderer/types'
|
||||
import DataLoader from 'dataloader'
|
||||
import { LRUCache } from 'lru-cache'
|
||||
import { Filter, kinds, Event as NEvent, SimplePool } from 'nostr-tools'
|
||||
import { EVENT_TYPES, eventBus } from './event-bus.service'
|
||||
import storage from './storage.service'
|
||||
import { isWebsocketUrl, normalizeUrl } from '@renderer/lib/url'
|
||||
|
||||
const BIG_RELAY_URLS = [
|
||||
'wss://relay.damus.io/',
|
||||
|
|
@ -24,24 +26,31 @@ class ClientService {
|
|||
private eventCache = new LRUCache<string, Promise<NEvent | undefined>>({
|
||||
max: 10000,
|
||||
fetchMethod: async (filterStr) => {
|
||||
const [event] = await this.fetchEvents(JSON.parse(filterStr))
|
||||
const [event] = await this.fetchEvents(
|
||||
BIG_RELAY_URLS.concat(this.relayUrls),
|
||||
JSON.parse(filterStr)
|
||||
)
|
||||
return event
|
||||
}
|
||||
})
|
||||
|
||||
private eventDataloader = new DataLoader<string, NEvent | undefined>(
|
||||
this.eventBatchLoadFn.bind(this),
|
||||
{
|
||||
cacheMap: new LRUCache<string, Promise<NEvent | undefined>>({ max: 10000 })
|
||||
}
|
||||
)
|
||||
|
||||
private profileDataloader = new DataLoader<string, TProfile | undefined>(
|
||||
this.profileBatchLoadFn.bind(this),
|
||||
{
|
||||
cacheMap: new LRUCache<string, Promise<TProfile | undefined>>({ max: 10000 })
|
||||
}
|
||||
)
|
||||
private relayListDataLoader = new DataLoader<string, TRelayList>(
|
||||
this.relayListBatchLoadFn.bind(this),
|
||||
{
|
||||
cacheMap: new LRUCache<string, Promise<TRelayList>>({ max: 10000 })
|
||||
}
|
||||
)
|
||||
|
||||
constructor() {
|
||||
if (!ClientService.instance) {
|
||||
|
|
@ -68,9 +77,8 @@ class ClientService {
|
|||
return this.pool.listConnectionStatus()
|
||||
}
|
||||
|
||||
async publishEvent(event: NEvent) {
|
||||
// TODO: outbox
|
||||
return await Promise.any(this.pool.publish(this.relayUrls, event))
|
||||
async publishEvent(relayUrls: string[], event: NEvent) {
|
||||
return await Promise.any(this.pool.publish(this.relayUrls.concat(relayUrls), event))
|
||||
}
|
||||
|
||||
subscribeEvents(
|
||||
|
|
@ -103,9 +111,10 @@ class ClientService {
|
|||
})
|
||||
}
|
||||
|
||||
async fetchEvents(filter: Filter, relayUrls: string[] = this.relayUrls) {
|
||||
async fetchEvents(relayUrls: string[], filter: Filter) {
|
||||
await this.initPromise
|
||||
return await this.pool.querySync(relayUrls, filter)
|
||||
// If relayUrls is empty, use this.relayUrls
|
||||
return await this.pool.querySync(relayUrls.length > 0 ? relayUrls : this.relayUrls, filter)
|
||||
}
|
||||
|
||||
async fetchEventByFilter(filter: Filter) {
|
||||
|
|
@ -120,8 +129,12 @@ class ClientService {
|
|||
return this.profileDataloader.load(pubkey)
|
||||
}
|
||||
|
||||
async fetchRelayList(pubkey: string): Promise<TRelayList> {
|
||||
return this.relayListDataLoader.load(pubkey)
|
||||
}
|
||||
|
||||
private async eventBatchLoadFn(ids: readonly string[]) {
|
||||
const events = await this.fetchEvents({
|
||||
const events = await this.fetchEvents(this.relayUrls, {
|
||||
ids: ids as string[],
|
||||
limit: ids.length
|
||||
})
|
||||
|
|
@ -133,11 +146,11 @@ class ClientService {
|
|||
const missingIds = ids.filter((id) => !eventsMap.has(id))
|
||||
if (missingIds.length > 0) {
|
||||
const missingEvents = await this.fetchEvents(
|
||||
BIG_RELAY_URLS.filter((url) => !this.relayUrls.includes(url)),
|
||||
{
|
||||
ids: missingIds,
|
||||
limit: missingIds.length
|
||||
},
|
||||
BIG_RELAY_URLS.filter((url) => !this.relayUrls.includes(url))
|
||||
}
|
||||
)
|
||||
for (const event of missingEvents) {
|
||||
eventsMap.set(event.id, event)
|
||||
|
|
@ -148,7 +161,7 @@ class ClientService {
|
|||
}
|
||||
|
||||
private async profileBatchLoadFn(pubkeys: readonly string[]) {
|
||||
const events = await this.fetchEvents({
|
||||
const events = await this.fetchEvents(this.relayUrls, {
|
||||
authors: pubkeys as string[],
|
||||
kinds: [kinds.Metadata],
|
||||
limit: pubkeys.length
|
||||
|
|
@ -165,12 +178,12 @@ class ClientService {
|
|||
const missingPubkeys = pubkeys.filter((pubkey) => !eventsMap.has(pubkey))
|
||||
if (missingPubkeys.length > 0) {
|
||||
const missingEvents = await this.fetchEvents(
|
||||
BIG_RELAY_URLS.filter((url) => !this.relayUrls.includes(url)),
|
||||
{
|
||||
authors: missingPubkeys,
|
||||
kinds: [kinds.Metadata],
|
||||
limit: missingPubkeys.length
|
||||
},
|
||||
BIG_RELAY_URLS.filter((url) => !this.relayUrls.includes(url))
|
||||
}
|
||||
)
|
||||
for (const event of missingEvents) {
|
||||
const pubkey = event.pubkey
|
||||
|
|
@ -187,6 +200,49 @@ class ClientService {
|
|||
})
|
||||
}
|
||||
|
||||
private async relayListBatchLoadFn(pubkeys: readonly string[]) {
|
||||
const events = await this.fetchEvents(BIG_RELAY_URLS.concat(this.relayUrls), {
|
||||
authors: pubkeys as string[],
|
||||
kinds: [kinds.RelayList],
|
||||
limit: pubkeys.length
|
||||
})
|
||||
const eventsMap = new Map<string, NEvent>()
|
||||
for (const event of events) {
|
||||
const pubkey = event.pubkey
|
||||
const existing = eventsMap.get(pubkey)
|
||||
if (!existing || existing.created_at < event.created_at) {
|
||||
eventsMap.set(pubkey, event)
|
||||
}
|
||||
}
|
||||
|
||||
return pubkeys.map((pubkey) => {
|
||||
const event = eventsMap.get(pubkey)
|
||||
const relayList = { write: [], read: [] } as TRelayList
|
||||
if (!event) return relayList
|
||||
|
||||
event.tags.filter(tagNameEquals('r')).forEach(([, url, type]) => {
|
||||
if (!url || !isWebsocketUrl(url)) return
|
||||
|
||||
const normalizedUrl = normalizeUrl(url)
|
||||
switch (type) {
|
||||
case 'w':
|
||||
relayList.write.push(normalizedUrl)
|
||||
break
|
||||
case 'r':
|
||||
relayList.read.push(normalizedUrl)
|
||||
break
|
||||
default:
|
||||
relayList.write.push(normalizedUrl)
|
||||
relayList.read.push(normalizedUrl)
|
||||
}
|
||||
})
|
||||
return {
|
||||
write: relayList.write.slice(0, 3),
|
||||
read: relayList.read.slice(0, 3)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private parseProfileFromEvent(event: NEvent): TProfile {
|
||||
try {
|
||||
const profileObj = JSON.parse(event.content)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue