Skip to content

Commit

Permalink
feat: mos gateway statuses
Browse files Browse the repository at this point in the history
  • Loading branch information
Julusian committed Dec 17, 2024
1 parent 10240cb commit f29a11b
Show file tree
Hide file tree
Showing 35 changed files with 1,503 additions and 721 deletions.
3 changes: 2 additions & 1 deletion meteor/server/collections/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ export interface AsyncOnlyReadOnlyMongoCollection<DBInterface extends { _id: Pro
observeChanges(
selector: MongoQuery<DBInterface> | DBInterface['_id'],
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
options?: FindOptions<DBInterface>
findOptions?: FindOptions<DBInterface>,
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
): Promise<Meteor.LiveQueryHandle>

/**
Expand Down
7 changes: 4 additions & 3 deletions meteor/server/collections/implementations/asyncCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
async observeChanges(
selector: MongoQuery<DBInterface> | DBInterface['_id'],
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
options?: FindOptions<DBInterface>
findOptions?: FindOptions<DBInterface>,
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
): Promise<Meteor.LiveQueryHandle> {
const span = profiler.startSpan(`MongoCollection.${this.name}.observeChanges`)
if (span) {
Expand All @@ -152,8 +153,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
}
try {
const res = await this._collection
.find((selector ?? {}) as any, options as any)
.observeChangesAsync(callbacks)
.find((selector ?? {}) as any, findOptions as any)
.observeChangesAsync(callbacks, callbackOptions)
if (span) span.end()
return res
} catch (e) {
Expand Down
1 change: 1 addition & 0 deletions meteor/server/publications/_publications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import './lib/lib'

import './buckets'
import './blueprintUpgradeStatus/publication'
import './ingestStatus/publication'
import './packageManager/expectedPackages/publication'
import './packageManager/packageContainers'
import './packageManager/playoutContext'
Expand Down
132 changes: 132 additions & 0 deletions meteor/server/publications/ingestStatus/createIngestRundownStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import type { RundownId, PartId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { NrcsIngestCacheType } from '@sofie-automation/corelib/dist/dataModel/NrcsIngestDataCache'
import type { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
import {
IngestRundownStatus,
IngestPartPlaybackStatus,
IngestRundownActiveStatus,
IngestPartStatus,
} from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
import type { ReadonlyDeep } from 'type-fest'
import _ from 'underscore'
import type { ContentCache, PartFields, PartInstanceFields, PlaylistFields } from './reactiveContentCache'
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist'
import { ReactiveCacheCollection } from '../lib/ReactiveCacheCollection'
import { PartInstance } from '@sofie-automation/meteor-lib/dist/collections/PartInstances'
import { IngestPart } from '@sofie-automation/blueprints-integration'
import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part'

export function createIngestRundownStatus(
cache: ReadonlyDeep<ContentCache>,
rundownId: RundownId
): IngestRundownStatus | null {
const rundown = cache.Rundowns.findOne(rundownId)
if (!rundown) return null

const newDoc: IngestRundownStatus = {
_id: rundownId,
externalId: rundown.externalId,

active: IngestRundownActiveStatus.INACTIVE,

segments: [],
}

const playlist = cache.Playlists.findOne({
_id: rundown.playlistId,
activationId: { $exists: true },
})

if (playlist) {
newDoc.active = playlist.rehearsal ? IngestRundownActiveStatus.REHEARSAL : IngestRundownActiveStatus.ACTIVE
}

// Find the most important part instance for each part
const partInstanceMap = findPartInstanceForEachPart(playlist, rundownId, cache.PartInstances)

const nrcsSegments = cache.NrcsIngestData.find({ rundownId, type: NrcsIngestCacheType.SEGMENT }).fetch()
for (const nrcsSegment of nrcsSegments) {
const nrcsParts = cache.NrcsIngestData.find({
rundownId,
segmentId: nrcsSegment.segmentId,
type: NrcsIngestCacheType.PART,
}).fetch()

newDoc.segments.push({
externalId: nrcsSegment.data.externalId,
parts: _.compact(
nrcsParts.map((nrcsPart) => {
if (!nrcsPart.partId || !nrcsPart.segmentId) return null

const part = cache.Parts.findOne({ _id: nrcsPart.partId, rundownId })
const partInstance = partInstanceMap.get(nrcsPart.partId)

return createIngestPartStatus(playlist, partInstance, part, nrcsPart.data as IngestPart)
})
),
})
}

return newDoc
}

function findPartInstanceForEachPart(
playlist: Pick<DBRundownPlaylist, PlaylistFields> | undefined,
rundownId: RundownId,
partInstancesCache: ReadonlyDeep<ReactiveCacheCollection<Pick<PartInstance, PartInstanceFields>>>
) {
const partInstanceMap = new Map<PartId, Pick<DBPartInstance, PartInstanceFields>>()
if (!playlist) return partInstanceMap

for (const partInstance of partInstancesCache.find({}).fetch()) {
if (partInstance.rundownId !== rundownId) continue
// Ignore the next partinstance
if (partInstance._id === playlist.nextPartInfo?.partInstanceId) continue

// The current part instance is the most important
if (partInstance._id === playlist.currentPartInfo?.partInstanceId) {
partInstanceMap.set(partInstance.part._id, partInstance)
continue
}

// Take the part with the highest takeCount
const existingEntry = partInstanceMap.get(partInstance.part._id)
if (!existingEntry || existingEntry.takeCount < partInstance.takeCount) {
partInstanceMap.set(partInstance.part._id, partInstance)
}
}

return partInstanceMap
}

function createIngestPartStatus(
playlist: Pick<DBRundownPlaylist, PlaylistFields> | undefined,
partInstance: Pick<PartInstance, PartInstanceFields> | undefined,
part: Pick<DBPart, PartFields> | undefined,
ingestPart: IngestPart
): IngestPartStatus {
// Determine the playback status from the PartInstance
let playbackStatus = IngestPartPlaybackStatus.UNKNOWN
if (playlist && partInstance && partInstance.part.shouldNotifyCurrentPlayingPart) {
const isCurrentPartInstance = playlist.currentPartInfo?.partInstanceId === partInstance._id

if (isCurrentPartInstance) {
// If the current, it is playing
playbackStatus = IngestPartPlaybackStatus.PLAY
} else {
// If not the current, but has been played, it is stopped
playbackStatus = IngestPartPlaybackStatus.STOP
}
}

// Determine the ready status from the PartInstance or Part
const isReady = partInstance ? partInstance.part.ingestNotifyPartReady : part?.ingestNotifyPartReady

return {
externalId: ingestPart.externalId,

isReady: isReady ?? null,

playbackStatus,
}
}
195 changes: 195 additions & 0 deletions meteor/server/publications/ingestStatus/publication.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import { PeripheralDeviceId, RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { ReadonlyDeep } from 'type-fest'
import {
CustomPublishCollection,
meteorCustomPublish,
setUpCollectionOptimizedObserver,
SetupObserversResult,
TriggerUpdate,
} from '../../lib/customPublication'
import { logger } from '../../logging'
import { ContentCache, createReactiveContentCache } from './reactiveContentCache'
import { RundownsObserver } from '../lib/rundownsObserver'
import { RundownContentObserver } from './rundownContentObserver'
import {
PeripheralDevicePubSub,
PeripheralDevicePubSubCollectionsNames,
} from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice'
import { checkAccessAndGetPeripheralDevice } from '../../security/check'
import { check } from '../../lib/check'
import { IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
import { protectString } from '@sofie-automation/corelib/dist/protectedString'
import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown'
import { createIngestRundownStatus } from './createIngestRundownStatus'

interface IngestRundownStatusArgs {
readonly deviceId: PeripheralDeviceId
}

export interface IngestRundownStatusState {
contentCache: ReadonlyDeep<ContentCache>
}

interface IngestRundownStatusUpdateProps {
newCache: ContentCache

invalidateRundownIds: RundownId[]
invalidatePlaylistIds: RundownPlaylistId[]
}

async function setupIngestRundownStatusPublicationObservers(
args: ReadonlyDeep<IngestRundownStatusArgs>,
triggerUpdate: TriggerUpdate<IngestRundownStatusUpdateProps>
): Promise<SetupObserversResult> {
const rundownsObserver = await RundownsObserver.createForPeripheralDevice(args.deviceId, async (rundownIds) => {
logger.silly(`Creating new RundownContentObserver`, rundownIds)

// TODO - can this be done cheaper?
const cache = createReactiveContentCache(rundownIds)

// Push update
triggerUpdate({ newCache: cache })

const contentObserver = await RundownContentObserver.create(rundownIds, cache)

const innerQueries = [
cache.Playlists.find({}).observeChanges(
{
added: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
changed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
removed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
},
{ nonMutatingCallbacks: true }
),
cache.Rundowns.find({}).observeChanges(
{
added: (docId) => {
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
contentObserver.checkPlaylistIds()
},
changed: (docId) => {
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
contentObserver.checkPlaylistIds()
},
removed: (docId) => {
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
contentObserver.checkPlaylistIds()
},
},
{ nonMutatingCallbacks: true }
),
cache.Parts.find({}).observe({
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
}),
cache.PartInstances.find({}).observe({
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
}),
cache.NrcsIngestData.find({}).observe({
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
}),
]

return () => {
contentObserver.dispose()

for (const query of innerQueries) {
query.stop()
}
}
})

// Set up observers:
return [rundownsObserver]
}

async function manipulateIngestRundownStatusPublicationData(
_args: IngestRundownStatusArgs,
state: Partial<IngestRundownStatusState>,
collection: CustomPublishCollection<IngestRundownStatus>,
updateProps: Partial<ReadonlyDeep<IngestRundownStatusUpdateProps>> | undefined
): Promise<void> {
// Prepare data for publication:

if (updateProps?.newCache !== undefined) {
state.contentCache = updateProps.newCache ?? undefined
}

if (!state.contentCache) {
// Remove all the notes
collection.remove(null)

return
}

const updateAll = !updateProps || !!updateProps?.newCache
if (updateAll) {
// Remove all the notes
collection.remove(null)

const knownRundownIds = new Set(state.contentCache.RundownIds)

for (const rundownId of knownRundownIds) {
const newDoc = createIngestRundownStatus(state.contentCache, rundownId)
if (newDoc) collection.replace(newDoc)
}
} else {
const regenerateForRundownIds = new Set(updateProps.invalidateRundownIds)

// Include anything where the playlist has changed
if (updateProps.invalidatePlaylistIds && updateProps.invalidatePlaylistIds.length > 0) {
const rundownsToUpdate = state.contentCache.Rundowns.find(
{
playlistId: { $in: updateProps.invalidatePlaylistIds },
},
{
projection: {
_id: 1,
},
}
).fetch() as Pick<DBRundown, '_id'>[]

for (const rundown of rundownsToUpdate) {
regenerateForRundownIds.add(rundown._id)
}
}

for (const rundownId of regenerateForRundownIds) {
const newDoc = createIngestRundownStatus(state.contentCache, rundownId)
if (newDoc) {
collection.replace(newDoc)
} else {
collection.remove(rundownId)
}
}
}
}

meteorCustomPublish(
PeripheralDevicePubSub.ingestDeviceRundownStatus,
PeripheralDevicePubSubCollectionsNames.ingestRundownStatus,
async function (pub, deviceId: PeripheralDeviceId, token: string | undefined) {
check(deviceId, String)

await checkAccessAndGetPeripheralDevice(deviceId, token, this)

await setUpCollectionOptimizedObserver<
IngestRundownStatus,
IngestRundownStatusArgs,
IngestRundownStatusState,
IngestRundownStatusUpdateProps
>(
`pub_${PeripheralDevicePubSub.ingestDeviceRundownStatus}_${deviceId}`,
{ deviceId },
setupIngestRundownStatusPublicationObservers,
manipulateIngestRundownStatusPublicationData,
pub,
100
)
}
)
Loading

0 comments on commit f29a11b

Please sign in to comment.