feat: notifications
This commit is contained in:
parent
57aa3be645
commit
94b9272042
17 changed files with 447 additions and 30 deletions
|
|
@ -41,6 +41,7 @@ class ClientService {
|
|||
private repliesCache = new LRUCache<string, { refs: [string, number][]; until?: number }>({
|
||||
max: 1000
|
||||
})
|
||||
private notificationsCache: [string, number][] = []
|
||||
private profileCache = new LRUCache<string, Promise<TProfile>>({ max: 10000 })
|
||||
private profileDataloader = new DataLoader<string, TProfile>(
|
||||
(ids) => Promise.all(ids.map((id) => this._fetchProfile(id))),
|
||||
|
|
@ -211,9 +212,8 @@ class ClientService {
|
|||
],
|
||||
{
|
||||
onevent(evt: NEvent) {
|
||||
if (!isReplyNoteEvent(evt)) return
|
||||
|
||||
if (hasEosed) {
|
||||
if (!isReplyNoteEvent(evt)) return
|
||||
onNew(evt)
|
||||
} else {
|
||||
events.push(evt)
|
||||
|
|
@ -222,7 +222,10 @@ class ClientService {
|
|||
},
|
||||
oneose() {
|
||||
hasEosed = true
|
||||
const newReplies = events.sort((a, b) => a.created_at - b.created_at)
|
||||
const newReplies = events
|
||||
.sort((a, b) => a.created_at - b.created_at)
|
||||
.slice(0, limit)
|
||||
.filter(isReplyNoteEvent)
|
||||
replies = replies.concat(newReplies)
|
||||
// first fetch
|
||||
if (!since) {
|
||||
|
|
@ -250,8 +253,87 @@ class ClientService {
|
|||
}
|
||||
}
|
||||
|
||||
async subscribeNotifications(
|
||||
pubkey: string,
|
||||
limit: number,
|
||||
{
|
||||
onNotifications,
|
||||
onNew
|
||||
}: {
|
||||
onNotifications: (events: NEvent[], isCache: boolean) => void
|
||||
onNew: (evt: NEvent) => void
|
||||
}
|
||||
) {
|
||||
let cachedNotifications: NEvent[] = []
|
||||
if (this.notificationsCache.length) {
|
||||
cachedNotifications = (
|
||||
await Promise.all(this.notificationsCache.map(([id]) => this.eventCache.get(id)))
|
||||
).filter(Boolean) as NEvent[]
|
||||
onNotifications(cachedNotifications, true)
|
||||
}
|
||||
const since = this.notificationsCache.length ? this.notificationsCache[0][1] + 1 : undefined
|
||||
|
||||
const relayList = await this.fetchRelayList(pubkey)
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
||||
const that = this
|
||||
const events: NEvent[] = []
|
||||
let hasEosed = false
|
||||
let count = 0
|
||||
const closer = this.pool.subscribeMany(
|
||||
relayList.read.length >= 4
|
||||
? relayList.read
|
||||
: relayList.read.concat(this.defaultRelayUrls).slice(0, 4),
|
||||
[
|
||||
{
|
||||
kinds: [kinds.ShortTextNote, kinds.Repost, kinds.Reaction],
|
||||
'#p': [pubkey],
|
||||
limit,
|
||||
since
|
||||
}
|
||||
],
|
||||
{
|
||||
onevent(evt: NEvent) {
|
||||
count++
|
||||
if (hasEosed) {
|
||||
if (evt.pubkey === pubkey) return
|
||||
onNew(evt)
|
||||
} else {
|
||||
events.push(evt)
|
||||
}
|
||||
that.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
},
|
||||
oneose() {
|
||||
hasEosed = true
|
||||
const newNotifications = events
|
||||
.sort((a, b) => b.created_at - a.created_at)
|
||||
.slice(0, limit)
|
||||
.filter((evt) => evt.pubkey !== pubkey)
|
||||
if (count >= limit) {
|
||||
that.notificationsCache = newNotifications.map(
|
||||
(evt) => [evt.id, evt.created_at] as [string, number]
|
||||
)
|
||||
onNotifications(newNotifications, false)
|
||||
} else {
|
||||
that.notificationsCache = [
|
||||
...newNotifications.map((evt) => [evt.id, evt.created_at] as [string, number]),
|
||||
...that.notificationsCache
|
||||
]
|
||||
onNotifications(newNotifications.concat(cachedNotifications), false)
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
return () => {
|
||||
onNotifications = () => {}
|
||||
onNew = () => {}
|
||||
closer.close()
|
||||
}
|
||||
}
|
||||
|
||||
async fetchMoreReplies(relayUrls: string[], parentEventId: string, until: number, limit: number) {
|
||||
const events = await this.pool.querySync(relayUrls, {
|
||||
let events = await this.pool.querySync(relayUrls, {
|
||||
'#e': [parentEventId],
|
||||
kinds: [kinds.ShortTextNote],
|
||||
limit,
|
||||
|
|
@ -260,7 +342,7 @@ class ClientService {
|
|||
events.forEach((evt) => {
|
||||
this.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
})
|
||||
events.sort((a, b) => a.created_at - b.created_at)
|
||||
events = events.sort((a, b) => a.created_at - b.created_at).slice(0, limit)
|
||||
const replies = events.filter((evt) => isReplyNoteEvent(evt))
|
||||
let cache = this.repliesCache.get(parentEventId)
|
||||
if (!cache) {
|
||||
|
|
@ -282,6 +364,44 @@ class ClientService {
|
|||
return { replies, until: cache.until }
|
||||
}
|
||||
|
||||
async fetchMoreNotifications(pubkey: string, until: number, limit: number) {
|
||||
const relayList = await this.fetchRelayList(pubkey)
|
||||
const events = await this.pool.querySync(
|
||||
relayList.read.length >= 4
|
||||
? relayList.read
|
||||
: relayList.read.concat(this.defaultRelayUrls).slice(0, 4),
|
||||
{
|
||||
kinds: [kinds.ShortTextNote, kinds.Repost, kinds.Reaction],
|
||||
'#p': [pubkey],
|
||||
limit,
|
||||
until
|
||||
}
|
||||
)
|
||||
events.forEach((evt) => {
|
||||
this.eventDataLoader.prime(evt.id, Promise.resolve(evt))
|
||||
})
|
||||
const notifications = events
|
||||
.sort((a, b) => b.created_at - a.created_at)
|
||||
.slice(0, limit)
|
||||
.filter((evt) => evt.pubkey !== pubkey)
|
||||
|
||||
const cacheLastCreatedAt = this.notificationsCache.length
|
||||
? this.notificationsCache[this.notificationsCache.length - 1][1]
|
||||
: undefined
|
||||
this.notificationsCache = this.notificationsCache.concat(
|
||||
(cacheLastCreatedAt
|
||||
? notifications.filter((evt) => evt.created_at < cacheLastCreatedAt)
|
||||
: notifications
|
||||
).map((evt) => [evt.id, evt.created_at] as [string, number])
|
||||
)
|
||||
|
||||
return notifications
|
||||
}
|
||||
|
||||
clearNotificationsCache() {
|
||||
this.notificationsCache = []
|
||||
}
|
||||
|
||||
async fetchEvents(relayUrls: string[], filter: Filter, cache = false) {
|
||||
const events = await this.pool.querySync(
|
||||
relayUrls.length > 0 ? relayUrls : this.defaultRelayUrls,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue