refactor
This commit is contained in:
parent
9beaffb272
commit
bfc07545b3
28 changed files with 665 additions and 608 deletions
|
|
@ -1,55 +1,53 @@
|
|||
import { TRelayGroup } from '@common/types'
|
||||
import { TEventStats } from '@renderer/types'
|
||||
import { formatPubkey } from '@renderer/lib/pubkey'
|
||||
import { TEventStats, TProfile } from '@renderer/types'
|
||||
import DataLoader from 'dataloader'
|
||||
import { LRUCache } from 'lru-cache'
|
||||
import { Filter, kinds, Event as NEvent, SimplePool } from 'nostr-tools'
|
||||
import { EVENT_TYPES, eventBus } from './event-bus.service'
|
||||
import storage from './storage.service'
|
||||
|
||||
const BIG_RELAY_URLS = [
|
||||
'wss://relay.damus.io/',
|
||||
'wss://nos.lol/',
|
||||
'wss://relay.nostr.band/',
|
||||
'wss://relay.noswhere.com/'
|
||||
]
|
||||
|
||||
class ClientService {
|
||||
static instance: ClientService
|
||||
|
||||
private pool = new SimplePool()
|
||||
private relayUrls: string[] = BIG_RELAY_URLS
|
||||
private initPromise!: Promise<void>
|
||||
private relayUrls: string[] = []
|
||||
private cache = new LRUCache<string, NEvent>({
|
||||
max: 10000,
|
||||
fetchMethod: async (filter) => this.fetchEvent(JSON.parse(filter))
|
||||
})
|
||||
|
||||
// Event cache
|
||||
private eventsCache = new LRUCache<string, Promise<NEvent | undefined>>({
|
||||
max: 10000,
|
||||
ttl: 1000 * 60 * 10 // 10 minutes
|
||||
})
|
||||
private fetchEventQueue = new Map<
|
||||
string,
|
||||
{
|
||||
resolve: (value: NEvent | undefined) => void
|
||||
reject: (reason: any) => void
|
||||
}
|
||||
>()
|
||||
private fetchEventTimer: NodeJS.Timeout | null = null
|
||||
|
||||
// Event stats cache
|
||||
private eventStatsCache = new LRUCache<string, Promise<TEventStats>>({
|
||||
max: 10000,
|
||||
ttl: 1000 * 60 * 10, // 10 minutes
|
||||
fetchMethod: async (id) => this._fetchEventStatsById(id)
|
||||
})
|
||||
|
||||
// Profile cache
|
||||
private profilesCache = new LRUCache<string, Promise<NEvent | undefined>>({
|
||||
private eventCache = new LRUCache<string, Promise<NEvent | undefined>>({
|
||||
max: 10000,
|
||||
ttl: 1000 * 60 * 10 // 10 minutes
|
||||
})
|
||||
private fetchProfileQueue = new Map<
|
||||
string,
|
||||
{
|
||||
resolve: (value: NEvent | undefined) => void
|
||||
reject: (reason: any) => void
|
||||
fetchMethod: async (filterStr) => {
|
||||
const [event] = await this.fetchEvents(JSON.parse(filterStr))
|
||||
return event
|
||||
}
|
||||
>()
|
||||
private fetchProfileTimer: NodeJS.Timeout | null = null
|
||||
})
|
||||
|
||||
private eventDataloader = new DataLoader<string, NEvent | undefined>(
|
||||
this.eventBatchLoadFn.bind(this),
|
||||
{
|
||||
cacheMap: new LRUCache<string, Promise<NEvent | undefined>>({ max: 10000 })
|
||||
}
|
||||
)
|
||||
|
||||
private profileDataloader = new DataLoader<string, TProfile | undefined>(
|
||||
this.profileBatchLoadFn.bind(this),
|
||||
{
|
||||
cacheMap: new LRUCache<string, Promise<TProfile | undefined>>({ max: 10000 })
|
||||
}
|
||||
)
|
||||
|
||||
constructor() {
|
||||
if (!ClientService.instance) {
|
||||
|
|
@ -69,12 +67,6 @@ class ClientService {
|
|||
|
||||
onRelayGroupsChange(relayGroups: TRelayGroup[]) {
|
||||
const newRelayUrls = relayGroups.find((group) => group.isActive)?.relayUrls ?? []
|
||||
if (
|
||||
newRelayUrls.length === this.relayUrls.length &&
|
||||
newRelayUrls.every((url) => this.relayUrls.includes(url))
|
||||
) {
|
||||
return
|
||||
}
|
||||
this.relayUrls = newRelayUrls
|
||||
}
|
||||
|
||||
|
|
@ -82,70 +74,40 @@ class ClientService {
|
|||
return this.pool.listConnectionStatus()
|
||||
}
|
||||
|
||||
async fetchEvents(filters: Filter[]) {
|
||||
await this.initPromise
|
||||
return new Promise<NEvent[]>((resolve) => {
|
||||
const events: NEvent[] = []
|
||||
this.pool.subscribeManyEose(this.relayUrls, filters, {
|
||||
onevent(event) {
|
||||
events.push(event)
|
||||
},
|
||||
onclose() {
|
||||
resolve(events)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async fetchEventWithCache(filter: Filter) {
|
||||
return this.cache.fetch(JSON.stringify(filter))
|
||||
}
|
||||
|
||||
async fetchEvent(filter: Filter) {
|
||||
const events = await this.fetchEvents([{ ...filter, limit: 1 }])
|
||||
return events.length ? events[0] : undefined
|
||||
}
|
||||
|
||||
async fetchEventById(id: string): Promise<NEvent | undefined> {
|
||||
const cache = this.eventsCache.get(id)
|
||||
if (cache) {
|
||||
return cache
|
||||
subscribeEvents(
|
||||
urls: string[],
|
||||
filter: Filter,
|
||||
opts: {
|
||||
onEose: (events: NEvent[]) => void
|
||||
onNew: (evt: NEvent) => void
|
||||
}
|
||||
|
||||
const promise = new Promise<NEvent | undefined>((resolve, reject) => {
|
||||
this.fetchEventQueue.set(id, { resolve, reject })
|
||||
if (this.fetchEventTimer) {
|
||||
return
|
||||
}
|
||||
|
||||
this.fetchEventTimer = setTimeout(async () => {
|
||||
this.fetchEventTimer = null
|
||||
const queue = new Map(this.fetchEventQueue)
|
||||
this.fetchEventQueue.clear()
|
||||
|
||||
try {
|
||||
const ids = Array.from(queue.keys())
|
||||
const events = await this.fetchEvents([{ ids, limit: ids.length }])
|
||||
for (const event of events) {
|
||||
queue.get(event.id)?.resolve(event)
|
||||
queue.delete(event.id)
|
||||
}
|
||||
|
||||
for (const [, job] of queue) {
|
||||
job.resolve(undefined)
|
||||
}
|
||||
queue.clear()
|
||||
} catch (err) {
|
||||
for (const [id, job] of queue) {
|
||||
this.eventsCache.delete(id)
|
||||
job.reject(err)
|
||||
}
|
||||
) {
|
||||
console.log('subscribeEvents', urls, filter)
|
||||
const events: NEvent[] = []
|
||||
let eose = false
|
||||
return this.pool.subscribeMany(urls, [filter], {
|
||||
onevent: (evt) => {
|
||||
if (eose) {
|
||||
opts.onNew(evt)
|
||||
} else {
|
||||
events.push(evt)
|
||||
}
|
||||
}, 20)
|
||||
},
|
||||
oneose: () => {
|
||||
eose = true
|
||||
opts.onEose(events.sort((a, b) => b.created_at - a.created_at))
|
||||
},
|
||||
onclose: () => {
|
||||
if (!eose) {
|
||||
opts.onEose(events.sort((a, b) => b.created_at - a.created_at))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
this.eventsCache.set(id, promise)
|
||||
return promise
|
||||
async fetchEvents(filter: Filter, relayUrls: string[] = this.relayUrls) {
|
||||
await this.initPromise
|
||||
return await this.pool.querySync(relayUrls, filter)
|
||||
}
|
||||
|
||||
async fetchEventStatsById(id: string): Promise<TEventStats> {
|
||||
|
|
@ -153,70 +115,116 @@ class ClientService {
|
|||
return stats ?? { reactionCount: 0, repostCount: 0 }
|
||||
}
|
||||
|
||||
async fetchEventByFilter(filter: Filter) {
|
||||
return this.eventCache.fetch(JSON.stringify({ ...filter, limit: 1 }))
|
||||
}
|
||||
|
||||
async fetchEventById(id: string): Promise<NEvent | undefined> {
|
||||
return this.eventDataloader.load(id)
|
||||
}
|
||||
|
||||
async fetchProfile(pubkey: string): Promise<TProfile | undefined> {
|
||||
return this.profileDataloader.load(pubkey)
|
||||
}
|
||||
|
||||
private async _fetchEventStatsById(id: string) {
|
||||
const [reactionEvents, repostEvents] = await Promise.all([
|
||||
this.fetchEvents([{ '#e': [id], kinds: [kinds.Reaction] }]),
|
||||
this.fetchEvents([{ '#e': [id], kinds: [kinds.Repost] }])
|
||||
this.fetchEvents({ '#e': [id], kinds: [kinds.Reaction] }),
|
||||
this.fetchEvents({ '#e': [id], kinds: [kinds.Repost] })
|
||||
])
|
||||
|
||||
return { reactionCount: reactionEvents.length, repostCount: repostEvents.length }
|
||||
}
|
||||
|
||||
async fetchProfile(pubkey: string): Promise<NEvent | undefined> {
|
||||
const cache = this.profilesCache.get(pubkey)
|
||||
if (cache) {
|
||||
return cache
|
||||
private async eventBatchLoadFn(ids: readonly string[]) {
|
||||
const events = await this.fetchEvents({
|
||||
ids: ids as string[],
|
||||
limit: ids.length
|
||||
})
|
||||
const eventsMap = new Map<string, NEvent>()
|
||||
for (const event of events) {
|
||||
eventsMap.set(event.id, event)
|
||||
}
|
||||
|
||||
const promise = new Promise<NEvent | undefined>((resolve, reject) => {
|
||||
this.fetchProfileQueue.set(pubkey, { resolve, reject })
|
||||
if (this.fetchProfileTimer) {
|
||||
return
|
||||
const missingIds = ids.filter((id) => !eventsMap.has(id))
|
||||
if (missingIds.length > 0) {
|
||||
const missingEvents = await this.fetchEvents(
|
||||
{
|
||||
ids: missingIds,
|
||||
limit: missingIds.length
|
||||
},
|
||||
BIG_RELAY_URLS.filter((url) => !this.relayUrls.includes(url))
|
||||
)
|
||||
for (const event of missingEvents) {
|
||||
eventsMap.set(event.id, event)
|
||||
}
|
||||
}
|
||||
|
||||
this.fetchProfileTimer = setTimeout(async () => {
|
||||
this.fetchProfileTimer = null
|
||||
const queue = new Map(this.fetchProfileQueue)
|
||||
this.fetchProfileQueue.clear()
|
||||
return ids.map((id) => eventsMap.get(id))
|
||||
}
|
||||
|
||||
try {
|
||||
const pubkeys = Array.from(queue.keys())
|
||||
const events = await this.fetchEvents([
|
||||
{
|
||||
authors: pubkeys,
|
||||
kinds: [0],
|
||||
limit: pubkeys.length
|
||||
}
|
||||
])
|
||||
const eventsMap = new Map<string, NEvent>()
|
||||
for (const event of events) {
|
||||
const pubkey = event.pubkey
|
||||
const existing = eventsMap.get(pubkey)
|
||||
if (!existing || existing.created_at < event.created_at) {
|
||||
eventsMap.set(pubkey, event)
|
||||
}
|
||||
}
|
||||
|
||||
for (const [pubkey, job] of queue) {
|
||||
const event = eventsMap.get(pubkey)
|
||||
if (event) {
|
||||
job.resolve(event)
|
||||
} else {
|
||||
job.resolve(undefined)
|
||||
}
|
||||
queue.delete(pubkey)
|
||||
}
|
||||
} catch (err) {
|
||||
for (const [pubkey, job] of queue) {
|
||||
this.profilesCache.delete(pubkey)
|
||||
job.reject(err)
|
||||
}
|
||||
}
|
||||
}, 20)
|
||||
private async profileBatchLoadFn(pubkeys: readonly string[]) {
|
||||
const events = await this.fetchEvents({
|
||||
authors: pubkeys as string[],
|
||||
kinds: [kinds.Metadata],
|
||||
limit: pubkeys.length
|
||||
})
|
||||
const eventsMap = new Map<string, NEvent>()
|
||||
for (const event of events) {
|
||||
const pubkey = event.pubkey
|
||||
const existing = eventsMap.get(pubkey)
|
||||
if (!existing || existing.created_at < event.created_at) {
|
||||
eventsMap.set(pubkey, event)
|
||||
}
|
||||
}
|
||||
|
||||
this.profilesCache.set(pubkey, promise)
|
||||
return promise
|
||||
const missingPubkeys = pubkeys.filter((pubkey) => !eventsMap.has(pubkey))
|
||||
if (missingPubkeys.length > 0) {
|
||||
const missingEvents = await this.fetchEvents(
|
||||
{
|
||||
authors: missingPubkeys,
|
||||
kinds: [kinds.Metadata],
|
||||
limit: missingPubkeys.length
|
||||
},
|
||||
BIG_RELAY_URLS.filter((url) => !this.relayUrls.includes(url))
|
||||
)
|
||||
for (const event of missingEvents) {
|
||||
const pubkey = event.pubkey
|
||||
const existing = eventsMap.get(pubkey)
|
||||
if (!existing || existing.created_at < event.created_at) {
|
||||
eventsMap.set(pubkey, event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return pubkeys.map((pubkey) => {
|
||||
const event = eventsMap.get(pubkey)
|
||||
return event ? this.parseProfileFromEvent(event) : undefined
|
||||
})
|
||||
}
|
||||
|
||||
private parseProfileFromEvent(event: NEvent): TProfile {
|
||||
try {
|
||||
const profileObj = JSON.parse(event.content)
|
||||
return {
|
||||
pubkey: event.pubkey,
|
||||
banner: profileObj.banner,
|
||||
avatar: profileObj.picture,
|
||||
username:
|
||||
profileObj.display_name?.trim() ||
|
||||
profileObj.name?.trim() ||
|
||||
profileObj.nip05?.split('@')[0]?.trim() ||
|
||||
formatPubkey(event.pubkey),
|
||||
nip05: profileObj.nip05,
|
||||
about: profileObj.about
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(err)
|
||||
return {
|
||||
pubkey: event.pubkey,
|
||||
username: formatPubkey(event.pubkey)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue