| skipped 18 lines |
19 | 19 | | isInferredIndex, |
20 | 20 | | waitInSec, |
21 | 21 | | } from './utils'; |
22 | | - | import { isStixExportableData } from '../schema/stixCoreObject'; |
| 22 | + | import { INTERNAL_EXPORTABLE_TYPES, isStixExportableData } from '../schema/stixCoreObject'; |
23 | 23 | | import { DatabaseError, FunctionalError, UnsupportedError } from '../config/errors'; |
24 | 24 | | import { now, utcDate } from '../utils/format'; |
25 | 25 | | import RedisStore, { REDIS_PREFIX } from './sessionStore-redis'; |
| skipped 5 lines |
31 | 31 | | import type { |
32 | 32 | | CreateEventOpts, |
33 | 33 | | DeleteEvent, |
34 | | - | DeleteEventOpts, |
35 | 34 | | Event, |
| 35 | + | EventOpts, |
36 | 36 | | MergeEvent, |
37 | 37 | | StreamEvent, |
38 | 38 | | UpdateEvent, |
| skipped 1 lines |
40 | 40 | | } from '../types/event'; |
41 | 41 | | import type { StixCoreObject } from '../types/stix-common'; |
42 | 42 | | import type { EditContext } from '../generated/graphql'; |
43 | | - | import { RELATION_PARTICIPATE_TO } from '../schema/internalRelationship'; |
44 | 43 | | import { telemetry } from '../config/tracing'; |
45 | 44 | | |
46 | 45 | | const USE_SSL = booleanConf('redis:use_ssl', false); |
| skipped 7 lines |
54 | 53 | | const REDIS_EXPIRE_TIME = 90; |
55 | 54 | | const MAX_RETRY_COMMAND = 10; |
56 | 55 | | |
57 | | - | const allowedInternalEventTypes = [RELATION_PARTICIPATE_TO]; /* Removed: entity types that the old return rule accepted in addition to STIX-exportable data. */ |
58 | | - | const isStreamPublishable = (instance: StoreObject) => { |
| 56 | + | const isStreamPublishable = (instance: StoreObject, opts: EventOpts) => { /* Decides whether an event for `instance` may be pushed to the stream; `opts` now carries the caller's publishStreamEvent choice. */ |
59 | 57 | | const isInferredInstance = isInferredIndex(instance._index); /* Inference status is derived from the index the instance is stored in. */ |
60 | 58 | | if (isInferredInstance && !INCLUDE_INFERENCES) return false; /* Inferred instances are never published unless INCLUDE_INFERENCES is enabled. */ |
61 | | - | return isStixExportableData(instance) || allowedInternalEventTypes.includes(instance.entity_type); /* Old rule: STIX-exportable data, or an allow-listed entity type. */ |
| 59 | + | return opts.publishStreamEvent === undefined || opts.publishStreamEvent; /* New rule: publish by default, skip only when publishStreamEvent is set to a falsy value. Exportability is no longer checked here. */ |
62 | 60 | | }; |
63 | 61 | | |
64 | 62 | | const redisOptions = (database: number): RedisOptions => ({ |
| skipped 320 lines |
385 | 383 | | }); |
386 | 384 | | return cmdArgs; |
387 | 385 | | }; |
388 | | - | const isDataStreamable = (instance: StoreObject, opts: { publishStreamEvent?: boolean }) => { /* Removed helper: true when the instance is STIX-exportable and publishing was not disabled through opts. */ |
389 | | - | return isStixExportableData(instance) && (opts.publishStreamEvent === undefined || opts.publishStreamEvent); /* An undefined publishStreamEvent counts as enabled. */ |
390 | | - | }; /* The removed scope expressions in the event builders used this result to pick 'external' over 'internal'. */ |
391 | | - | const pushToStream = async (context: AuthContext, user: AuthUser, client: Redis, instance: StoreObject, event: Event) => { |
392 | | - | if (isStreamPublishable(instance)) { |
| 386 | + | const pushToStream = async (context: AuthContext, user: AuthUser, client: Redis, instance: StoreObject, event: Event, opts: EventOpts) => { |
| 387 | + | if (isStreamPublishable(instance, opts)) { |
393 | 388 | | const pushToStreamFn = async () => { |
394 | 389 | | if (streamTrimming) { |
395 | 390 | | await client.call('XADD', REDIS_STREAM_NAME, 'MAXLEN', '~', streamTrimming, '*', ...mapJSToStream(event)); |
| skipped 8 lines |
404 | 399 | | }; |
405 | 400 | | |
406 | 401 | | // Merge |
407 | | - | const buildMergeEvent = (user: AuthUser, previous: StoreObject, instance: StoreObject, sourceEntities: Array<StoreObject>, impacts: any): MergeEvent => { |
| 402 | + | interface MergeImpacts { /* Typed shape of the `impacts` argument of the merge event builders, replacing the former `any`. */ |
| 403 | + | updatedRelations: Array<string>; /* NOTE(review): presumably identifiers of relations rewritten by the merge; confirm against the caller. */ |
| 404 | + | dependencyDeletions: Array<StoreObject>; /* NOTE(review): presumably store objects deleted as a side effect of the merge; confirm against the caller. */ |
| 405 | + | } |
| 406 | + | const buildMergeEvent = (user: AuthUser, previous: StoreObject, instance: StoreObject, sourceEntities: Array<StoreObject>, impacts: MergeImpacts): MergeEvent => { |
408 | 407 | | const message = generateMergeMessage(instance, sourceEntities); |
409 | 408 | | const { updatedRelations, dependencyDeletions } = impacts; |
410 | 409 | | const previousStix = convertStoreToStix(previous) as StixCoreObject; |
| skipped 1 lines |
412 | 411 | | return { |
413 | 412 | | version: EVENT_CURRENT_VERSION, |
414 | 413 | | type: EVENT_TYPE_MERGE, |
415 | | - | scope: 'external', |
| 414 | + | scope: INTERNAL_EXPORTABLE_TYPES.includes(instance.entity_type) ? 'internal' : 'external', |
416 | 415 | | message, |
417 | 416 | | origin: user.origin, |
418 | 417 | | data: currentStix, |
| skipped 12 lines |
431 | 430 | | initialInstance: StoreObject, |
432 | 431 | | mergedInstance: StoreObject, |
433 | 432 | | sourceEntities: Array<StoreObject>, |
434 | | - | impacts: any |
| 433 | + | impacts: MergeImpacts, |
| 434 | + | opts: EventOpts, |
435 | 435 | | ) => { |
436 | 436 | | try { |
437 | 437 | | const event = buildMergeEvent(user, initialInstance, mergedInstance, sourceEntities, impacts); |
438 | | - | await pushToStream(context, user, clientBase, mergedInstance, event); |
| 438 | + | await pushToStream(context, user, clientBase, mergedInstance, event, opts); |
439 | 439 | | return event; |
440 | 440 | | } catch (e) { |
441 | 441 | | throw DatabaseError('Error in store merge event', { error: e }); |
| skipped 15 lines |
457 | 457 | | return { |
458 | 458 | | version: EVENT_CURRENT_VERSION, |
459 | 459 | | type: EVENT_TYPE_UPDATE, |
460 | | - | scope: isDataStreamable(instance, opts) ? 'external' : 'internal', |
| 460 | + | scope: INTERNAL_EXPORTABLE_TYPES.includes(instance.entity_type) ? 'internal' : 'external', |
461 | 461 | | message, |
462 | 462 | | origin: user.origin, |
463 | 463 | | data: stix, |
| skipped 8 lines |
472 | 472 | | try { |
473 | 473 | | if (isStixExportableData(instance)) { |
474 | 474 | | const event = buildUpdateEvent(user, previous, instance, message, opts); |
475 | | - | if (opts.publishStreamEvent === undefined || opts.publishStreamEvent) { |
476 | | - | await pushToStream(context, user, clientBase, instance, event); |
477 | | - | } |
| 475 | + | await pushToStream(context, user, clientBase, instance, event, opts); |
478 | 476 | | return event; |
479 | 477 | | } |
480 | 478 | | return undefined; |
| skipped 2 lines |
483 | 481 | | } |
484 | 482 | | }; |
485 | 483 | | // Create |
486 | | - | export const buildCreateEvent = (user: AuthUser, instance: StoreObject, message: string, opts: CreateEventOpts): Event => { |
| 484 | + | export const buildCreateEvent = (user: AuthUser, instance: StoreObject, message: string): Event => { |
487 | 485 | | const stix = convertStoreToStix(instance) as StixCoreObject; |
488 | 486 | | return { |
489 | 487 | | version: EVENT_CURRENT_VERSION, |
490 | 488 | | type: EVENT_TYPE_CREATE, |
491 | | - | scope: isDataStreamable(instance, opts) ? 'external' : 'internal', |
| 489 | + | scope: INTERNAL_EXPORTABLE_TYPES.includes(instance.entity_type) ? 'internal' : 'external', |
492 | 490 | | message, |
493 | 491 | | origin: user.origin, |
494 | 492 | | data: stix, |
| skipped 4 lines |
499 | 497 | | if (isStixExportableData(instance)) { |
500 | 498 | | const { withoutMessage = false } = opts; |
501 | 499 | | const message = withoutMessage ? '-' : generateCreateMessage(instance); |
502 | | - | const event = buildCreateEvent(user, instance, message, opts); |
503 | | - | await pushToStream(context, user, clientBase, instance, event); |
| 500 | + | const event = buildCreateEvent(user, instance, message); |
| 501 | + | await pushToStream(context, user, clientBase, instance, event, opts); |
504 | 502 | | return event; |
505 | 503 | | } |
506 | 504 | | return undefined; |
| skipped 4 lines |
511 | 509 | | export const storeCreateEntityEvent = async (context: AuthContext, user: AuthUser, instance: StoreObject, message: string, opts: CreateEventOpts = {}) => { |
512 | 510 | | try { |
513 | 511 | | if (isStixExportableData(instance)) { |
514 | | - | const event = buildCreateEvent(user, instance, message, opts); |
515 | | - | if (opts.publishStreamEvent === undefined || opts.publishStreamEvent) { |
516 | | - | await pushToStream(context, user, clientBase, instance, event); |
517 | | - | } |
| 512 | + | const event = buildCreateEvent(user, instance, message); |
| 513 | + | await pushToStream(context, user, clientBase, instance, event, opts); |
518 | 514 | | return event; |
519 | 515 | | } |
520 | 516 | | return undefined; |
| skipped 8 lines |
529 | 525 | | instance: StoreObject, |
530 | 526 | | message: string, |
531 | 527 | | deletions: Array<StoreObject>, |
532 | | - | opts: DeleteEventOpts |
533 | 528 | | ): Promise<DeleteEvent> => { |
534 | 529 | | const stix = convertStoreToStix(instance) as StixCoreObject; |
535 | 530 | | return { |
536 | 531 | | version: EVENT_CURRENT_VERSION, |
537 | 532 | | type: EVENT_TYPE_DELETE, |
538 | | - | scope: isDataStreamable(instance, opts) ? 'external' : 'internal', |
| 533 | + | scope: INTERNAL_EXPORTABLE_TYPES.includes(instance.entity_type) ? 'internal' : 'external', |
539 | 534 | | message, |
540 | 535 | | origin: user.origin, |
541 | 536 | | data: stix, |
| skipped 2 lines |
544 | 539 | | } |
545 | 540 | | }; |
546 | 541 | | }; |
547 | | - | export const storeDeleteEvent = async (context: AuthContext, user: AuthUser, instance: StoreObject, deletions: Array<StoreObject>, opts: DeleteEventOpts = {}) => { |
| 542 | + | export const storeDeleteEvent = async (context: AuthContext, user: AuthUser, instance: StoreObject, deletions: Array<StoreObject>, opts: EventOpts = {}) => { |
548 | 543 | | try { |
549 | 544 | | if (isStixExportableData(instance)) { |
550 | 545 | | const message = generateDeleteMessage(instance); |
551 | | - | const event = await buildDeleteEvent(user, instance, message, deletions, opts); |
552 | | - | await pushToStream(context, user, clientBase, instance, event); |
| 546 | + | const event = await buildDeleteEvent(user, instance, message, deletions); |
| 547 | + | await pushToStream(context, user, clientBase, instance, event, opts); |
553 | 548 | | return event; |
554 | 549 | | } |
555 | 550 | | return undefined; |
| skipped 151 lines |