feat: zap (#107)
This commit is contained in:
parent
407a6fb802
commit
249593d547
72 changed files with 2582 additions and 818 deletions
|
|
@ -17,6 +17,7 @@ import {
|
|||
SimplePool,
|
||||
VerifiedEvent
|
||||
} from 'nostr-tools'
|
||||
import { SubscribeManyParams } from 'nostr-tools/abstract-pool'
|
||||
import { AbstractRelay } from 'nostr-tools/abstract-relay'
|
||||
import indexedDb from './indexed-db.service'
|
||||
|
||||
|
|
@ -44,28 +45,23 @@ class ClientService extends EventTarget {
|
|||
{ cacheMap: this.eventCache }
|
||||
)
|
||||
private fetchEventFromBigRelaysDataloader = new DataLoader<string, NEvent | undefined>(
|
||||
this.eventBatchLoadFn.bind(this),
|
||||
{ cache: false }
|
||||
)
|
||||
private profileEventDataloader = new DataLoader<string, NEvent | undefined>(
|
||||
(ids) => Promise.all(ids.map((id) => this._fetchProfileEvent(id))),
|
||||
{
|
||||
cache: false,
|
||||
maxBatchSize: 50
|
||||
}
|
||||
this.fetchEventsFromBigRelays.bind(this),
|
||||
{ cache: false, batchScheduleFn: (callback) => setTimeout(callback, 200) }
|
||||
)
|
||||
private fetchProfileEventFromBigRelaysDataloader = new DataLoader<string, NEvent | undefined>(
|
||||
this.profileEventBatchLoadFn.bind(this),
|
||||
{
|
||||
batchScheduleFn: (callback) => setTimeout(callback, 200),
|
||||
cacheMap: new LRUCache<string, Promise<NEvent | undefined>>({ max: 1000 }),
|
||||
maxBatchSize: 50
|
||||
maxBatchSize: 20
|
||||
}
|
||||
)
|
||||
private relayListEventDataLoader = new DataLoader<string, NEvent | undefined>(
|
||||
this.relayListEventBatchLoadFn.bind(this),
|
||||
{
|
||||
batchScheduleFn: (callback) => setTimeout(callback, 200),
|
||||
cacheMap: new LRUCache<string, Promise<NEvent | undefined>>({ max: 1000 }),
|
||||
maxBatchSize: 50
|
||||
maxBatchSize: 20
|
||||
}
|
||||
)
|
||||
private followListCache = new LRUCache<string, Promise<NEvent | undefined>>({
|
||||
|
|
@ -166,7 +162,8 @@ class ClientService extends EventTarget {
|
|||
needSort?: boolean
|
||||
} = {}
|
||||
) {
|
||||
const key = this.generateTimelineKey(urls, filter)
|
||||
const relays = Array.from(new Set(urls))
|
||||
const key = this.generateTimelineKey(relays, filter)
|
||||
const timeline = this.timelines[key]
|
||||
let cachedEvents: NEvent[] = []
|
||||
let since: number | undefined
|
||||
|
|
@ -183,7 +180,7 @@ class ClientService extends EventTarget {
|
|||
}
|
||||
|
||||
if (!timeline && needSort) {
|
||||
this.timelines[key] = { refs: [], filter, urls }
|
||||
this.timelines[key] = { refs: [], filter, urls: relays }
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
||||
|
|
@ -193,7 +190,7 @@ class ClientService extends EventTarget {
|
|||
let startedCount = 0
|
||||
let eosedCount = 0
|
||||
let eosed = false
|
||||
const subPromises = urls.map(async (url) => {
|
||||
const subPromises = relays.map(async (url) => {
|
||||
const relay = await this.pool.ensureRelay(url)
|
||||
let hasAuthed = false
|
||||
|
||||
|
|
@ -345,11 +342,19 @@ class ClientService extends EventTarget {
|
|||
}
|
||||
}
|
||||
|
||||
async query(urls: string[], filter: Filter) {
|
||||
subscribe(urls: string[], filter: Filter | Filter[], params: SubscribeManyParams) {
|
||||
const relays = Array.from(new Set(urls))
|
||||
const filters = Array.isArray(filter) ? filter : [filter]
|
||||
return this.pool.subscribeMany(relays, filters, params)
|
||||
}
|
||||
|
||||
private async query(urls: string[], filter: Filter | Filter[], onevent?: (evt: NEvent) => void) {
|
||||
const relays = Array.from(new Set(urls))
|
||||
const filters = Array.isArray(filter) ? filter : [filter]
|
||||
const _knownIds = new Set<string>()
|
||||
const events: NEvent[] = []
|
||||
await Promise.allSettled(
|
||||
urls.map(async (url) => {
|
||||
relays.map(async (url) => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
||||
const that = this
|
||||
const relay = await this.pool.ensureRelay(url)
|
||||
|
|
@ -357,7 +362,7 @@ class ClientService extends EventTarget {
|
|||
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const startQuery = () => {
|
||||
relay.subscribe([filter], {
|
||||
relay.subscribe(filters, {
|
||||
receivedEvent(relay, id) {
|
||||
that.trackEventSeenOn(id, relay)
|
||||
},
|
||||
|
|
@ -384,6 +389,7 @@ class ClientService extends EventTarget {
|
|||
if (_knownIds.has(evt.id)) return
|
||||
_knownIds.add(evt.id)
|
||||
events.push(evt)
|
||||
onevent?.(evt)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -421,10 +427,22 @@ class ClientService extends EventTarget {
|
|||
return events
|
||||
}
|
||||
|
||||
async fetchEvents(relayUrls: string[], filter: Filter, cache = false) {
|
||||
async fetchEvents(
|
||||
urls: string[],
|
||||
filter: Filter | Filter[],
|
||||
{
|
||||
onevent,
|
||||
cache = false
|
||||
}: {
|
||||
onevent?: (evt: NEvent) => void
|
||||
cache?: boolean
|
||||
} = {}
|
||||
) {
|
||||
const relays = Array.from(new Set(urls))
|
||||
const events = await this.query(
|
||||
relayUrls.length > 0 ? relayUrls : this.currentRelayUrls.concat(BIG_RELAY_URLS),
|
||||
filter
|
||||
relays.length > 0 ? relays : this.currentRelayUrls.concat(BIG_RELAY_URLS),
|
||||
filter,
|
||||
onevent
|
||||
)
|
||||
if (cache) {
|
||||
events.forEach((evt) => {
|
||||
|
|
@ -460,12 +478,70 @@ class ClientService extends EventTarget {
|
|||
this.eventDataLoader.prime(event.id, Promise.resolve(event))
|
||||
}
|
||||
|
||||
async fetchProfileEvent(id: string): Promise<NEvent | undefined> {
|
||||
return await this.profileEventDataloader.load(id)
|
||||
async fetchProfileEvent(id: string, skipCache: boolean = false): Promise<NEvent | undefined> {
|
||||
let pubkey: string | undefined
|
||||
let relays: string[] = []
|
||||
if (/^[0-9a-f]{64}$/.test(id)) {
|
||||
pubkey = id
|
||||
} else {
|
||||
const { data, type } = nip19.decode(id)
|
||||
switch (type) {
|
||||
case 'npub':
|
||||
pubkey = data
|
||||
break
|
||||
case 'nprofile':
|
||||
pubkey = data.pubkey
|
||||
if (data.relays) relays = data.relays
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if (!pubkey) {
|
||||
throw new Error('Invalid id')
|
||||
}
|
||||
if (!skipCache) {
|
||||
const localProfile = await indexedDb.getReplaceableEvent(pubkey, kinds.Metadata)
|
||||
if (localProfile) {
|
||||
this.addUsernameToIndex(localProfile)
|
||||
return localProfile
|
||||
}
|
||||
}
|
||||
const profileFromBigRelays = await this.fetchProfileEventFromBigRelaysDataloader.load(pubkey)
|
||||
if (profileFromBigRelays) {
|
||||
this.addUsernameToIndex(profileFromBigRelays)
|
||||
await indexedDb.putReplaceableEvent(profileFromBigRelays)
|
||||
return profileFromBigRelays
|
||||
}
|
||||
|
||||
if (!relays.length) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
const profileEvent = await this.tryHarderToFetchEvent(
|
||||
relays,
|
||||
{
|
||||
authors: [pubkey],
|
||||
kinds: [kinds.Metadata],
|
||||
limit: 1
|
||||
},
|
||||
true
|
||||
)
|
||||
|
||||
if (profileEvent) {
|
||||
this.addUsernameToIndex(profileEvent)
|
||||
indexedDb.putReplaceableEvent(profileEvent)
|
||||
}
|
||||
|
||||
return profileEvent
|
||||
}
|
||||
|
||||
async fetchProfile(id: string): Promise<TProfile | undefined> {
|
||||
const profileEvent = await this.fetchProfileEvent(id)
|
||||
async fetchProfile(id: string, skipCache: boolean = false): Promise<TProfile | undefined> {
|
||||
let profileEvent: NEvent | undefined
|
||||
if (skipCache) {
|
||||
profileEvent = await this.fetchProfileEvent(id, skipCache)
|
||||
} else {
|
||||
profileEvent = await this.fetchProfileEvent(id)
|
||||
}
|
||||
if (profileEvent) {
|
||||
return getProfileFromProfileEvent(profileEvent)
|
||||
}
|
||||
|
|
@ -478,11 +554,6 @@ class ClientService extends EventTarget {
|
|||
}
|
||||
}
|
||||
|
||||
updateProfileCache(event: NEvent) {
|
||||
this.profileEventDataloader.clear(event.pubkey)
|
||||
this.profileEventDataloader.prime(event.pubkey, Promise.resolve(event))
|
||||
}
|
||||
|
||||
async fetchProfiles(relayUrls: string[], filter: Filter): Promise<TProfile[]> {
|
||||
const events = await this.query(relayUrls, {
|
||||
...filter,
|
||||
|
|
@ -490,7 +561,6 @@ class ClientService extends EventTarget {
|
|||
})
|
||||
|
||||
const profileEvents = events.sort((a, b) => b.created_at - a.created_at)
|
||||
profileEvents.forEach((profile) => this.profileEventDataloader.prime(profile.pubkey, profile))
|
||||
await Promise.all(profileEvents.map((profile) => this.addUsernameToIndex(profile)))
|
||||
return profileEvents.map((profileEvent) => getProfileFromProfileEvent(profileEvent))
|
||||
}
|
||||
|
|
@ -519,17 +589,22 @@ class ClientService extends EventTarget {
|
|||
return event
|
||||
}
|
||||
|
||||
async fetchFollowings(pubkey: string) {
|
||||
const followListEvent = await this.fetchFollowListEvent(pubkey)
|
||||
async fetchFollowings(pubkey: string, storeToIndexedDb = false) {
|
||||
const followListEvent = await this.fetchFollowListEvent(pubkey, storeToIndexedDb)
|
||||
return followListEvent ? extractPubkeysFromEventTags(followListEvent.tags) : []
|
||||
}
|
||||
|
||||
updateFollowListCache(pubkey: string, event: NEvent) {
|
||||
this.followListCache.set(pubkey, Promise.resolve(event))
|
||||
updateFollowListCache(event: NEvent) {
|
||||
this.followListCache.set(event.pubkey, Promise.resolve(event))
|
||||
}
|
||||
|
||||
updateRelayListCache(event: NEvent) {
|
||||
this.relayListEventDataLoader.clear(event.pubkey)
|
||||
this.relayListEventDataLoader.prime(event.pubkey, Promise.resolve(event))
|
||||
}
|
||||
|
||||
async calculateOptimalReadRelays(pubkey: string) {
|
||||
const followings = await this.fetchFollowings(pubkey)
|
||||
const followings = await this.fetchFollowings(pubkey, true)
|
||||
const [selfRelayListEvent, ...relayListEvents] = await this.relayListEventDataLoader.loadMany([
|
||||
pubkey,
|
||||
...followings
|
||||
|
|
@ -544,7 +619,6 @@ class ClientService extends EventTarget {
|
|||
pubkeyRelayListMap.set(evt.pubkey, getRelayListFromRelayListEvent(evt).write)
|
||||
}
|
||||
})
|
||||
|
||||
let uncoveredPubkeys = [...followings]
|
||||
const readRelays: { url: string; pubkeys: string[] }[] = []
|
||||
while (uncoveredPubkeys.length) {
|
||||
|
|
@ -571,7 +645,6 @@ class ClientService extends EventTarget {
|
|||
}
|
||||
}
|
||||
if (!maxCoveredRelay) break
|
||||
|
||||
readRelays.push(maxCoveredRelay)
|
||||
uncoveredPubkeys = uncoveredPubkeys.filter(
|
||||
(pubkey) => !maxCoveredRelay!.pubkeys.includes(pubkey)
|
||||
|
|
@ -588,12 +661,13 @@ class ClientService extends EventTarget {
|
|||
}
|
||||
|
||||
async initUserIndexFromFollowings(pubkey: string, signal: AbortSignal) {
|
||||
const followings = await this.fetchFollowings(pubkey)
|
||||
for (let i = 0; i * 50 < followings.length; i++) {
|
||||
const followings = await this.fetchFollowings(pubkey, true)
|
||||
for (let i = 0; i * 20 < followings.length; i++) {
|
||||
if (signal.aborted) return
|
||||
|
||||
await this.profileEventDataloader.loadMany(followings.slice(i * 50, (i + 1) * 50))
|
||||
await new Promise((resolve) => setTimeout(resolve, 30000))
|
||||
await Promise.all(
|
||||
followings.slice(i * 20, (i + 1) * 20).map((pubkey) => this.fetchProfileEvent(pubkey))
|
||||
)
|
||||
await new Promise((resolve) => setTimeout(resolve, 1000))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -665,9 +739,7 @@ class ClientService extends EventTarget {
|
|||
let event: NEvent | undefined
|
||||
if (filter.ids) {
|
||||
event = await this.fetchEventById(relays, filter.ids[0])
|
||||
}
|
||||
|
||||
if (!event) {
|
||||
} else {
|
||||
event = await this.tryHarderToFetchEvent(relays, filter)
|
||||
}
|
||||
|
||||
|
|
@ -678,62 +750,6 @@ class ClientService extends EventTarget {
|
|||
return event
|
||||
}
|
||||
|
||||
private async _fetchProfileEvent(id: string): Promise<NEvent | undefined> {
|
||||
let pubkey: string | undefined
|
||||
let relays: string[] = []
|
||||
if (/^[0-9a-f]{64}$/.test(id)) {
|
||||
pubkey = id
|
||||
} else {
|
||||
const { data, type } = nip19.decode(id)
|
||||
switch (type) {
|
||||
case 'npub':
|
||||
pubkey = data
|
||||
break
|
||||
case 'nprofile':
|
||||
pubkey = data.pubkey
|
||||
if (data.relays) relays = data.relays
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if (!pubkey) {
|
||||
throw new Error('Invalid id')
|
||||
}
|
||||
const localProfile = await indexedDb.getReplaceableEvent(pubkey, kinds.Metadata)
|
||||
if (localProfile) {
|
||||
this.addUsernameToIndex(localProfile)
|
||||
return localProfile
|
||||
}
|
||||
const profileFromBigRelays = await this.fetchProfileEventFromBigRelaysDataloader.load(pubkey)
|
||||
if (profileFromBigRelays) {
|
||||
this.addUsernameToIndex(profileFromBigRelays)
|
||||
await indexedDb.putReplaceableEvent(profileFromBigRelays)
|
||||
return profileFromBigRelays
|
||||
}
|
||||
|
||||
const profileEvent = await this.tryHarderToFetchEvent(
|
||||
relays,
|
||||
{
|
||||
authors: [pubkey],
|
||||
kinds: [kinds.Metadata],
|
||||
limit: 1
|
||||
},
|
||||
true
|
||||
)
|
||||
if (pubkey !== id) {
|
||||
this.profileEventDataloader.prime(pubkey, Promise.resolve(profileEvent))
|
||||
}
|
||||
|
||||
if (profileEvent) {
|
||||
await Promise.allSettled([
|
||||
this.addUsernameToIndex(profileEvent),
|
||||
indexedDb.putReplaceableEvent(profileEvent)
|
||||
])
|
||||
}
|
||||
|
||||
return profileEvent
|
||||
}
|
||||
|
||||
private async addUsernameToIndex(profileEvent: NEvent) {
|
||||
try {
|
||||
const profileObj = JSON.parse(profileEvent.content)
|
||||
|
|
@ -772,7 +788,7 @@ class ClientService extends EventTarget {
|
|||
return events.sort((a, b) => b.created_at - a.created_at)[0]
|
||||
}
|
||||
|
||||
private async eventBatchLoadFn(ids: readonly string[]) {
|
||||
private async fetchEventsFromBigRelays(ids: readonly string[]) {
|
||||
const events = await this.query(BIG_RELAY_URLS, {
|
||||
ids: Array.from(new Set(ids)),
|
||||
limit: ids.length
|
||||
|
|
@ -803,10 +819,8 @@ class ClientService extends EventTarget {
|
|||
return eventsMap.get(pubkey)
|
||||
})
|
||||
|
||||
await Promise.allSettled(
|
||||
profileEvents.map(
|
||||
(profileEvent) => profileEvent && indexedDb.putReplaceableEvent(profileEvent)
|
||||
)
|
||||
profileEvents.forEach(
|
||||
(profileEvent) => profileEvent && indexedDb.putReplaceableEvent(profileEvent)
|
||||
)
|
||||
return profileEvents
|
||||
}
|
||||
|
|
@ -830,9 +844,7 @@ class ClientService extends EventTarget {
|
|||
eventsMap.set(pubkey, event)
|
||||
}
|
||||
}
|
||||
await Promise.allSettled(
|
||||
Array.from(eventsMap.values()).map((evt) => indexedDb.putReplaceableEvent(evt))
|
||||
)
|
||||
Array.from(eventsMap.values()).forEach((evt) => indexedDb.putReplaceableEvent(evt))
|
||||
nonExistingPubkeys.forEach((pubkey) => {
|
||||
const event = eventsMap.get(pubkey)
|
||||
if (event) {
|
||||
|
|
@ -846,6 +858,11 @@ class ClientService extends EventTarget {
|
|||
}
|
||||
|
||||
private async _fetchFollowListEvent(pubkey: string) {
|
||||
const storedFollowListEvent = await indexedDb.getReplaceableEvent(pubkey, kinds.Contacts)
|
||||
if (storedFollowListEvent) {
|
||||
return storedFollowListEvent
|
||||
}
|
||||
|
||||
const relayList = await this.fetchRelayList(pubkey)
|
||||
const followListEvents = await this.query(relayList.write.concat(BIG_RELAY_URLS), {
|
||||
authors: [pubkey],
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue