Projects STRLCPY opencti Commits acbcf36d
🤬
Revision indexing in progress... (symbol navigation in revisions will be accurate once indexing completes)
  • ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/config/default.json
    skipped 134 lines
    135 135   "use_aws_role": false
    136 136   },
    137 137   "rabbitmq": {
     138 + "queue_prefix": "",
    138 139   "hostname": "localhost",
    139 140   "vhost": "/",
    140 141   "use_ssl": false,
    skipped 24 lines
  • ■ ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/config/test.json
    1 1  {
    2 2   "app": {
    3 3   "port": 4010,
    4  - "logs_level": "info",
    5 4   "performance_logger": false,
    6 5   "sync_raw_start_remote_uri": "http://localhost:4200/graphql",
    7 6   "sync_live_start_remote_uri": "http://localhost:4200/graphql",
    8 7   "sync_direct_start_remote_uri": "http://localhost:4200/graphql",
    9 8   "sync_restore_start_remote_uri": "http://localhost:4200/graphql",
     9 + "app_logs": {
     10 + "logs_level": "error",
     11 + "logs_files": false,
     12 + "logs_console": true
     13 + },
     14 + "audit_logs": {
     15 + "logs_files": false,
     16 + "logs_console": false
     17 + },
    10 18   "telemetry": {
    11 19   "tracing": {
    12 20   "enabled": false,
    skipped 20 lines
    33 41   "index_prefix": "test",
    34 42   "url": "http://localhost:9200"
    35 43   },
     44 + "minio": {
     45 + "bucket_name": "test",
     46 + "endpoint": "localhost",
     47 + "port": 9000
     48 + },
     49 + "rabbitmq": {
     50 + "queue_prefix": "test",
     51 + "hostname": "localhost"
     52 + },
    36 53   "subscription_scheduler": {
    37 54   "enabled": false
    38 55   },
    skipped 17 lines
    56 73   },
    57 74   "retention_manager": {
    58 75   "enabled": true
    59  - },
    60  - "minio": {
    61  - "bucket_name": "test",
    62  - "endpoint": "localhost",
    63  - "port": 9000
    64  - },
    65  - "rabbitmq": {
    66  - "hostname": "localhost"
    67 76   },
    68 77   "providers": {
    69 78   "local": {
    skipped 5 lines
  • ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/package.json
    skipped 16 lines
    17 17   "build": "yarn install:python && yarn build:prod",
    18 18   "start": "yarn build:dev && node build/back.js",
    19 19   "serv": "node build/back.js",
    20  - "test": "node --max-old-space-size=8192 --expose-gc ./node_modules/jest/bin/jest.js --bail --testSequencer ./tests/utils/testSequencer.js --logHeapUsage --no-watchman --verbose --runInBand --forceExit --coverage --e test"
     20 + "test:base": "node --max-old-space-size=8192 ./node_modules/jest/bin/jest.js --bail --testSequencer ./tests/utils/testSequencer.js --no-watchman --runInBand --forceExit",
     21 + "test:dev": "yarn test:base ./tests/01-unit ./tests/02-integration ./tests/03-streams ./tests/05-rules",
     22 + "test": "yarn test:base --coverage --e test"
    21 23   },
    22 24   "pkg": {
    23 25   "scripts": "dist/**/*.js",
    skipped 159 lines
  • ■ ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/database/file-storage.js
    skipped 77 lines
    78 78   }
    79 79  };
    80 80   
     81 +export const deleteBucket = async () => {
     82 + try {
     83 + // Best-effort removal of the configured bucket via S3 DeleteBucketCommand
     84 + await s3Client.send(new s3.DeleteBucketCommand({ Bucket: bucketName }));
     85 + } catch (err) {
     86 + // Deliberately ignored: any error from the delete call is swallowed here
     87 + }
     88 +};
     89 + 
    81 90  export const isStorageAlive = () => initializeBucket();
    82 91   
    83 92  export const deleteFile = async (context, user, id) => {
    skipped 224 lines
  • ■ ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/database/rabbitmq.js
    1 1  import { readFileSync } from 'node:fs';
    2 2  import amqp from 'amqplib';
    3 3  import axios from 'axios';
    4  -import * as R from 'ramda';
    5 4  import { SemanticAttributes } from '@opentelemetry/semantic-conventions';
    6 5  import conf, { booleanConf, configureCA } from '../config/conf';
    7 6  import { DatabaseError, UnknownError } from '../config/errors';
    8 7  import { SYSTEM_USER } from '../utils/access';
    9 8  import { telemetry } from '../config/tracing';
    10  - 
    11  -export const INTERNAL_SYNC_QUEUE = 'sync';
    12  -export const CONNECTOR_EXCHANGE = 'amqp.connector.exchange';
    13  -export const WORKER_EXCHANGE = 'amqp.worker.exchange';
     9 +import { RABBIT_QUEUE_PREFIX } from './utils';
    14 10   
    15  -export const EVENT_TYPE_DEPENDENCIES = 'init-dependencies';
    16  -export const EVENT_TYPE_INIT = 'init-create';
    17  - 
    18  -export const EVENT_TYPE_CREATE = 'create';
    19  -export const EVENT_TYPE_UPDATE = 'update';
    20  -export const EVENT_TYPE_MERGE = 'merge';
    21  -export const EVENT_TYPE_DELETE = 'delete';
     11 +export const CONNECTOR_EXCHANGE = `${RABBIT_QUEUE_PREFIX}amqp.connector.exchange`;
     12 +export const WORKER_EXCHANGE = `${RABBIT_QUEUE_PREFIX}amqp.worker.exchange`;
    22 13   
    23 14  const USE_SSL = booleanConf('rabbitmq:use_ssl', false);
    24 15  const QUEUE_TYPE = conf.get('rabbitmq:queue_type');
    skipped 80 lines
    105 96   return response.data;
    106 97   });
    107 98   // Compute number of push queues
    108  - const pushQueues = R.filter((q) => R.includes('push_', q.name) && q.consumers > 0, queues);
    109  - const consumers = R.head(pushQueues) ? R.head(pushQueues).consumers : 0;
    110  - return { overview, consumers, queues };
     99 + const platformQueues = queues.filter((q) => q.name.startsWith(RABBIT_QUEUE_PREFIX));
     100 + const pushQueues = platformQueues.filter((q) => q.name.startsWith(`${RABBIT_QUEUE_PREFIX}push_`) && q.consumers > 0);
     101 + const consumers = pushQueues.length > 0 ? pushQueues[0].consumers : 0;
     102 + return { overview, consumers, queues: platformQueues };
    111 103   };
    112 104   return telemetry(context, user, 'QUEUE metrics', {
    113 105   [SemanticAttributes.DB_NAME]: 'messaging_engine',
    skipped 3 lines
    117 109   
    118 110  export const connectorConfig = (id) => ({
    119 111   connection: config(),
    120  - push: `push_${id}`,
    121  - push_exchange: 'amqp.worker.exchange',
    122  - listen: `listen_${id}`,
    123  - listen_exchange: 'amqp.connector.exchange',
     112 + push: `${RABBIT_QUEUE_PREFIX}push_${id}`,
     113 + push_exchange: WORKER_EXCHANGE,
     114 + listen: `${RABBIT_QUEUE_PREFIX}listen_${id}`,
     115 + listen_exchange: CONNECTOR_EXCHANGE,
    124 116  });
    125 117   
    126  -export const listenRouting = (connectorId) => `listen_routing_${connectorId}`;
     118 +export const listenRouting = (connectorId) => `${RABBIT_QUEUE_PREFIX}listen_routing_${connectorId}`;
    127 119   
    128  -export const pushRouting = (connectorId) => `push_routing_${connectorId}`;
     120 +export const pushRouting = (connectorId) => `${RABBIT_QUEUE_PREFIX}push_routing_${connectorId}`;
    129 121   
    130 122  export const registerConnectorQueues = async (id, name, type, scope) => {
    131  - const listenQueue = `listen_${id}`;
    132  - const pushQueue = `push_${id}`;
     123 + const listenQueue = `${RABBIT_QUEUE_PREFIX}listen_${id}`;
     124 + const pushQueue = `${RABBIT_QUEUE_PREFIX}push_${id}`;
    133 125   await amqpExecute(async (channel) => {
    134 126   // 01. Ensure exchange exists
    135 127   await channel.assertExchange(CONNECTOR_EXCHANGE, 'direct', { durable: true });
    skipped 21 lines
    157 149  };
    158 150   
    159 151  export const unregisterConnector = async (id) => {
    160  - const listen = await amqpExecute((channel) => channel.deleteQueue(`listen_${id}`));
    161  - const push = await amqpExecute((channel) => channel.deleteQueue(`push_${id}`));
     152 + const listen = await amqpExecute((channel) => channel.deleteQueue(`${RABBIT_QUEUE_PREFIX}listen_${id}`));
     153 + const push = await amqpExecute((channel) => channel.deleteQueue(`${RABBIT_QUEUE_PREFIX}push_${id}`));
    162 154   return { listen, push };
    163 155  };
    164 156   
     157 +export const unregisterExchanges = async () => {
     158 + await amqpExecute((channel) => channel.deleteExchange(CONNECTOR_EXCHANGE));
     159 + await amqpExecute((channel) => channel.deleteExchange(WORKER_EXCHANGE));
     160 +};
     161 + 
    165 162  export const rabbitMQIsAlive = async () => {
    166  - // 01. Ensure exchange exists
    167 163   await amqpExecute((channel) => channel.assertExchange(CONNECTOR_EXCHANGE, 'direct', {
    168 164   durable: true,
    169 165   })).catch(
    skipped 1 lines
    171 167   throw DatabaseError('RabbitMQ seems down', { error: e.message });
    172 168   }
    173 169   );
    174  - // 02. Ensure sync queue exists
    175  - await registerConnectorQueues(INTERNAL_SYNC_QUEUE, 'Internal sync manager', 'internal', 'sync');
    176 170  };
    177 171   
    178 172  export const pushToSync = (message) => {
    skipped 13 lines
  • ■ ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/database/redis.ts
    skipped 7 lines
    8 8  import { SemanticAttributes } from '@opentelemetry/semantic-conventions';
    9 9  import conf, { booleanConf, configureCA, DEV_MODE, ENABLED_CACHING, getStoppingState, logApp } from '../config/conf';
    10 10  import {
     11 + EVENT_TYPE_CREATE,
     12 + EVENT_TYPE_DELETE,
     13 + EVENT_TYPE_MERGE,
     14 + EVENT_TYPE_UPDATE,
    11 15   generateCreateMessage,
    12 16   generateDeleteMessage,
    13 17   generateMergeMessage,
    skipped 2 lines
    16 20   waitInSec,
    17 21  } from './utils';
    18 22  import { isStixData } from '../schema/stixCoreObject';
    19  -import { EVENT_TYPE_CREATE, EVENT_TYPE_DELETE, EVENT_TYPE_MERGE, EVENT_TYPE_UPDATE } from './rabbitmq';
    20 23  import { DatabaseError, FunctionalError, UnsupportedError } from '../config/errors';
    21 24  import { now, utcDate } from '../utils/format';
    22 25  import RedisStore from './sessionStore-redis';
    skipped 51 lines
    74 77   showFriendlyErrorStack: DEV_MODE,
    75 78  });
    76 79   
    77  -const createRedisClient = (provider: string, database?: number) : Redis => {
     80 +const createRedisClient = (provider: string, database?: number): Redis => {
    78 81   const client = new Redis(redisOptions(database ?? BASE_DATABASE));
    79 82   client.on('close', () => logApp.info(`[REDIS] Redis '${provider}' client closed`));
    80 83   client.on('ready', () => logApp.info(`[REDIS] Redis '${provider}' client ready`));
    skipped 2 lines
    83 86   client.defineCommand('cacheGet', {
    84 87   lua:
    85 88   'local index = 1\n'
    86  - + "local resolvedKeys = redis.call('mget', unpack(KEYS))\n"
     89 + + 'local resolvedKeys = redis.call(\'mget\', unpack(KEYS))\n'
    87 90   + 'for p, k in pairs(resolvedKeys) do \n'
    88 91   + ' if (k==nil or (type(k) == "boolean" and not k)) then \n'
    89 92   + ' index = index+1\n'
    90 93   + ' elseif (k:sub(0, 1) == "@") then \n'
    91 94   + ' local subKey = "cache:" .. k:sub(2, #k)\n'
    92  - + " resolvedKeys[index] = redis.call('get', subKey)\n"
     95 + + ' resolvedKeys[index] = redis.call(\'get\', subKey)\n'
    93 96   + ' index = index+1\n'
    94 97   + ' else \n'
    95 98   + ' index = index+1\n'
    skipped 495 lines
    591 594   start: (from: string | undefined) => Promise<void>;
    592 595   shutdown: () => Promise<void>;
    593 596  }
     597 + 
    594 598  export const createStreamProcessor = (user: AuthUser, provider: string, callback: any, maxRange = MAX_RANGE_MESSAGES): StreamProcessor => {
    595 599   let client: Redis;
    596 600   let startEventId: string;
    skipped 96 lines
  • ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/database/repository.js
    1 1  import { filter, includes, map, pipe } from 'ramda';
    2 2  import { ENTITY_TYPE_CONNECTOR } from '../schema/internalObject';
    3  -import { connectorConfig, INTERNAL_SYNC_QUEUE } from './rabbitmq';
     3 +import { connectorConfig } from './rabbitmq';
    4 4  import { sinceNowInMinutes } from '../utils/format';
    5 5  import { CONNECTOR_INTERNAL_ENRICHMENT, CONNECTOR_INTERNAL_IMPORT_FILE } from '../schema/general';
    6 6  import { listEntities } from './middleware-loader';
     7 +import { INTERNAL_SYNC_QUEUE } from './utils';
    7 8   
    8 9  // region global queries
    9 10  // TODO Will be removed during typescript migration
    skipped 125 lines
  • ■ ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/database/utils.js
    skipped 13 lines
    14 14  } from '../schema/stixCyberObservableRelationship';
    15 15  import { isStixMetaRelationship, META_FIELD_TO_STIX_ATTRIBUTE } from '../schema/stixMetaRelationship';
    16 16  import { isStixObject } from '../schema/stixCoreObject';
    17  -import { EVENT_TYPE_CREATE, EVENT_TYPE_DELETE } from './rabbitmq';
    18 17  import conf from '../config/conf';
    19 18  import { now, observableValue } from '../utils/format';
    20 19  import { isStixRelationship } from '../schema/stixRelationship';
    21 20  import { isDictionaryAttribute, isJsonAttribute } from '../schema/fieldDataAdapter';
    22 21   
    23 22  export const ES_INDEX_PREFIX = conf.get('elasticsearch:index_prefix') || 'opencti';
     23 +const rabbitmqPrefix = conf.get('rabbitmq:queue_prefix');
     24 +export const RABBIT_QUEUE_PREFIX = rabbitmqPrefix ? `${rabbitmqPrefix}_` : '';
     25 + 
     26 +export const INTERNAL_SYNC_QUEUE = 'sync';
     27 +export const EVENT_TYPE_CREATE = 'create';
     28 +export const EVENT_TYPE_DELETE = 'delete';
     29 +export const EVENT_TYPE_DEPENDENCIES = 'init-dependencies';
     30 +export const EVENT_TYPE_INIT = 'init-create';
     31 +export const EVENT_TYPE_UPDATE = 'update';
     32 +export const EVENT_TYPE_MERGE = 'merge';
    24 33   
    25 34  // Operations definition
    26 35  export const UPDATE_OPERATION_ADD = 'add';
    skipped 256 lines
    283 292   }
    284 293   return '-';
    285 294  };
     295 + 
    286 296  export const generateCreateMessage = (instance) => {
    287 297   return generateCreateDeleteMessage(EVENT_TYPE_CREATE, instance);
    288 298  };
    skipped 62 lines
  • ■ ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/domain/connector.js
    skipped 6 lines
    7 7   storeLoadById,
    8 8   updateAttribute
    9 9  } from '../database/middleware';
    10  -import { completeConnector, connectorsFor } from '../database/repository';
    11  -import { registerConnectorQueues, unregisterConnector } from '../database/rabbitmq';
     10 +import { completeConnector, connectors, connectorsFor } from '../database/repository';
     11 +import { registerConnectorQueues, unregisterConnector, unregisterExchanges } from '../database/rabbitmq';
    12 12  import { ENTITY_TYPE_CONNECTOR, ENTITY_TYPE_SYNC, ENTITY_TYPE_WORK } from '../schema/internalObject';
    13 13  import { FunctionalError, UnsupportedError } from '../config/errors';
    14 14  import { now } from '../utils/format';
    15 15  import { elLoadById } from '../database/engine';
    16  -import { isEmptyField, READ_INDEX_HISTORY } from '../database/utils';
     16 +import { INTERNAL_SYNC_QUEUE, isEmptyField, READ_INDEX_HISTORY } from '../database/utils';
    17 17  import { ABSTRACT_INTERNAL_OBJECT, CONNECTOR_INTERNAL_EXPORT_FILE } from '../schema/general';
    18 18  import { SYSTEM_USER } from '../utils/access';
    19 19  import { delEditContext, notify, setEditContext } from '../database/redis';
    skipped 5 lines
    25 25  export const loadConnectorById = (context, user, id) => {
    26 26   return storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR).then((connector) => completeConnector(connector));
    27 27  };
    28  - 
    29 28  export const connectorForWork = async (context, user, id) => {
    30 29   const work = await elLoadById(context, user, id, ENTITY_TYPE_WORK, READ_INDEX_HISTORY);
    31 30   if (work) return loadConnectorById(context, user, work.connector_id);
    32 31   return null;
    33 32  };
    34  - 
    35 33  export const connectorsForExport = async (context, user, scope, onlyAlive = false) => {
    36 34   return connectorsFor(context, user, CONNECTOR_INTERNAL_EXPORT_FILE, scope, onlyAlive);
    37 35  };
    38  -// endregion
    39  - 
    40  -// region mutations
    41 36  export const pingConnector = async (context, user, id, state) => {
    42 37   const creation = now();
    43 38   const connector = await storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR);
    skipped 13 lines
    57 52   }
    58 53   return storeLoadById(context, user, id, 'Connector').then((data) => completeConnector(data));
    59 54  };
    60  - 
    61 55  export const resetStateConnector = async (context, user, id) => {
    62 56   const patch = { connector_state: '', connector_state_reset: true };
    63 57   await patchAttribute(context, user, id, ENTITY_TYPE_CONNECTOR, patch);
    64 58   return storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR).then((data) => completeConnector(data));
    65 59  };
     60 +export const registerConnector = async (context, user, connectorData) => {
     61 + // eslint-disable-next-line camelcase
     62 + const { id, name, type, scope, auto = null, only_contextual = null } = connectorData;
     63 + const connector = await storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR);
     64 + // Register queues
     65 + await registerConnectorQueues(id, name, type, scope);
     66 + if (connector) {
     67 + // Simple connector update
     68 + const patch = {
     69 + name,
     70 + updated_at: now(),
     71 + connector_user_id: user.id,
     72 + connector_scope: scope && scope.length > 0 ? scope.join(',') : null,
     73 + auto,
     74 + only_contextual,
     75 + };
     76 + const { element } = await patchAttribute(context, user, id, ENTITY_TYPE_CONNECTOR, patch);
     77 + // Notify configuration change for caching system
     78 + await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].EDIT_TOPIC, element, user);
     79 + return storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR).then((data) => completeConnector(data));
     80 + }
     81 + // Need to create the connector
     82 + const connectorToCreate = {
     83 + internal_id: id,
     84 + name,
     85 + connector_type: type,
     86 + connector_scope: scope && scope.length > 0 ? scope.join(',') : null,
     87 + auto,
     88 + only_contextual,
     89 + connector_user_id: user.id,
     90 + };
     91 + const createdConnector = await createEntity(context, user, connectorToCreate, ENTITY_TYPE_CONNECTOR);
     92 + // Notify configuration change for caching system
     93 + await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].ADDED_TOPIC, createdConnector, user);
     94 + // Return the connector
     95 + return completeConnector(createdConnector);
     96 +};
     97 +export const connectorDelete = async (context, user, connectorId) => {
     98 + await deleteWorkForConnector(context, user, connectorId);
     99 + await unregisterConnector(connectorId);
     100 + const { element } = await internalDeleteElementById(context, user, connectorId);
     101 + // Notify configuration change for caching system
     102 + await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].DELETE_TOPIC, element, user);
     103 + return element.internal_id;
     104 +};
     105 +// endregion
    66 106   
    67 107  // region syncs
    68 108  export const patchSync = async (context, user, id, patch) => {
    skipped 74 lines
    143 183  };
    144 184  // endregion
    145 185   
    146  -export const registerConnector = async (context, user, connectorData) => {
    147  - // eslint-disable-next-line camelcase
    148  - const { id, name, type, scope, auto = null, only_contextual = null } = connectorData;
    149  - const connector = await storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR);
    150  - // Register queues
    151  - await registerConnectorQueues(id, name, type, scope);
    152  - if (connector) {
    153  - // Simple connector update
    154  - const patch = {
    155  - name,
    156  - updated_at: now(),
    157  - connector_user_id: user.id,
    158  - connector_scope: scope && scope.length > 0 ? scope.join(',') : null,
    159  - auto,
    160  - only_contextual,
    161  - };
    162  - const { element } = await patchAttribute(context, user, id, ENTITY_TYPE_CONNECTOR, patch);
    163  - // Notify configuration change for caching system
    164  - await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].EDIT_TOPIC, element, user);
    165  - return storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR).then((data) => completeConnector(data));
     186 +// region testing
     187 +export const deleteQueues = async (context, user) => {
     188 + try { await unregisterConnector(INTERNAL_SYNC_QUEUE); } catch (e) { /* nothing */ }
     189 + const platformConnectors = await connectors(context, user);
     190 + for (let index = 0; index < platformConnectors.length; index += 1) {
     191 + const connector = platformConnectors[index];
     192 + await unregisterConnector(connector.id);
    166 193   }
    167  - // Need to create the connector
    168  - const connectorToCreate = {
    169  - internal_id: id,
    170  - name,
    171  - connector_type: type,
    172  - connector_scope: scope && scope.length > 0 ? scope.join(',') : null,
    173  - auto,
    174  - only_contextual,
    175  - connector_user_id: user.id,
    176  - };
    177  - const createdConnector = await createEntity(context, user, connectorToCreate, ENTITY_TYPE_CONNECTOR);
    178  - // Notify configuration change for caching system
    179  - await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].ADDED_TOPIC, createdConnector, user);
    180  - // Return the connector
    181  - return completeConnector(createdConnector);
    182  -};
    183  - 
    184  -export const connectorDelete = async (context, user, connectorId) => {
    185  - await deleteWorkForConnector(context, user, connectorId);
    186  - await unregisterConnector(connectorId);
    187  - const { element } = await internalDeleteElementById(context, user, connectorId);
    188  - // Notify configuration change for caching system
    189  - await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].DELETE_TOPIC, element, user);
    190  - return element.internal_id;
     194 + try { await unregisterExchanges(); } catch (e) { /* nothing */ }
    191 195  };
    192 196  // endregion
    193 197   
  • ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/domain/log.js
    1  -import { head } from 'ramda';
    2 1  import * as R from 'ramda';
     2 +import { head } from 'ramda';
    3 3  import { elPaginate } from '../database/engine';
    4 4  import conf, { booleanConf } from '../config/conf';
    5  -import { EVENT_TYPE_CREATE } from '../database/rabbitmq';
    6 5  import { findById } from './user';
    7 6  import { storeLoadById, timeSeriesEntities } from '../database/middleware';
    8  -import { READ_INDEX_HISTORY, INDEX_HISTORY } from '../database/utils';
     7 +import { EVENT_TYPE_CREATE, INDEX_HISTORY, READ_INDEX_HISTORY } from '../database/utils';
    9 8  import { SYSTEM_USER } from '../utils/access';
    10 9   
    11 10  export const findAll = (context, user, args) => {
    skipped 46 lines
  • ■ ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/graphql/sseMiddleware.js
    skipped 6 lines
    7 7  import { createStreamProcessor, EVENT_CURRENT_VERSION, STREAM_BATCH_TIME } from '../database/redis';
    8 8  import { generateInternalId, generateStandardId } from '../schema/identifier';
    9 9  import { findById, streamCollectionGroups } from '../domain/stream';
     10 +import { internalLoadById, stixLoadById, stixLoadByIds, storeLoadByIdWithRefs } from '../database/middleware';
     11 +import { elList, ES_MAX_CONCURRENCY, MAX_SPLIT } from '../database/engine';
    10 12  import {
    11 13   EVENT_TYPE_CREATE,
    12 14   EVENT_TYPE_DELETE,
    13 15   EVENT_TYPE_DEPENDENCIES,
    14 16   EVENT_TYPE_INIT,
    15  - EVENT_TYPE_UPDATE
    16  -} from '../database/rabbitmq';
    17  -import { internalLoadById, stixLoadById, stixLoadByIds, storeLoadByIdWithRefs } from '../database/middleware';
    18  -import { elList, ES_MAX_CONCURRENCY, MAX_SPLIT } from '../database/engine';
    19  -import {
     17 + EVENT_TYPE_UPDATE,
    20 18   generateCreateMessage,
    21 19   isEmptyField,
    22 20   isNotEmptyField,
    skipped 688 lines
  • ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/initialization.js
    skipped 4 lines
    5 5  import { elCreateIndexes, elIndexExists, searchEngineInit } from './database/engine';
    6 6  import { initializeAdminUser } from './config/providers';
    7 7  import { initializeBucket, isStorageAlive } from './database/file-storage';
    8  -import { rabbitMQIsAlive } from './database/rabbitmq';
     8 +import { rabbitMQIsAlive, registerConnectorQueues } from './database/rabbitmq';
    9 9  import { addMarkingDefinition } from './domain/markingDefinition';
    10 10  import { addSettings } from './domain/settings';
    11 11  import { ROLE_DEFAULT, STREAMAPI, TAXIIAPI } from './domain/user';
    skipped 3 lines
    15 15  import { ENTITY_TYPE_MIGRATION_STATUS } from './schema/internalObject';
    16 16  import { applyMigration, lastAvailableMigrationTime } from './database/migration';
    17 17  import { createEntity, loadEntity, patchAttribute } from './database/middleware';
    18  -import { INDEX_INTERNAL_OBJECTS } from './database/utils';
     18 +import { INDEX_INTERNAL_OBJECTS, INTERNAL_SYNC_QUEUE } from './database/utils';
    19 19  import { ConfigurationError, LockTimeoutError, TYPE_LOCK_ERROR, UnknownError, UnsupportedError } from './config/errors';
    20 20  import { BYPASS, BYPASS_REFERENCE, executionContext, ROLE_ADMINISTRATOR, SYSTEM_USER } from './utils/access';
    21 21  import { smtpIsAlive } from './database/smtp';
    skipped 128 lines
    150 150   await elCreateIndexes();
    151 151   logApp.info('[INIT] Search engine indexes loaded');
    152 152   return true;
     153 +};
     154 + 
     155 +const initializeQueues = async () => {
     156 + await registerConnectorQueues(INTERNAL_SYNC_QUEUE, 'Internal sync manager', 'internal', 'sync');
    153 157  };
    154 158   
    155 159  const initializeMigration = async (context) => {
    skipped 167 lines
    323 327   const alreadyExists = await isExistingPlatform(context);
    324 328   if (!alreadyExists) {
    325 329   logApp.info('[INIT] New platform detected, initialization...');
     330 + await initializeQueues();
    326 331   await initializeBucket();
    327 332   await initializeSchema();
    328 333   await initializeMigration(context);
    skipped 27 lines
  • ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/manager/historyManager.ts
    skipped 1 lines
    2 2  import { clearIntervalAsync, setIntervalAsync, SetIntervalAsyncTimer } from 'set-interval-async/fixed';
    3 3  import { createStreamProcessor, lockResource, StreamProcessor } from '../database/redis';
    4 4  import conf, { logApp } from '../config/conf';
    5  -import { INDEX_HISTORY, isEmptyField } from '../database/utils';
     5 +import { EVENT_TYPE_UPDATE, INDEX_HISTORY, isEmptyField } from '../database/utils';
    6 6  import { TYPE_LOCK_ERROR } from '../config/errors';
    7 7  import { executionContext, SYSTEM_USER } from '../utils/access';
    8 8  import { STIX_EXT_OCTI } from '../types/stix-extensions';
    9 9  import type { StreamEvent, UpdateEvent } from '../types/event';
    10 10  import { utcDate } from '../utils/format';
    11 11  import { elIndexElements } from '../database/engine';
    12  -import { EVENT_TYPE_UPDATE } from '../database/rabbitmq';
    13 12  import type { StixRelation, StixSighting } from '../types/stix-sro';
    14 13  import { listEntities } from '../database/middleware-loader';
    15 14  import type { BasicRuleEntity, StoreProxyEntity } from '../types/store';
    skipped 195 lines
  • ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/manager/ruleManager.ts
    skipped 11 lines
    12 12   stixLoadById,
    13 13   storeLoadByIdWithRefs
    14 14  } from '../database/middleware';
    15  -import { isEmptyField, isNotEmptyField, READ_DATA_INDICES } from '../database/utils';
    16  -import { EVENT_TYPE_CREATE, EVENT_TYPE_DELETE, EVENT_TYPE_MERGE, EVENT_TYPE_UPDATE } from '../database/rabbitmq';
     15 +import {
     16 + EVENT_TYPE_CREATE,
     17 + EVENT_TYPE_DELETE, EVENT_TYPE_MERGE, EVENT_TYPE_UPDATE,
     18 + isEmptyField,
     19 + isNotEmptyField,
     20 + READ_DATA_INDICES
     21 +} from '../database/utils';
    17 22  import { RULE_PREFIX } from '../schema/general';
    18 23  import { ENTITY_TYPE_RULE_MANAGER } from '../schema/internalObject';
    19 24  import { TYPE_LOCK_ERROR } from '../config/errors';
    skipped 347 lines
  • ■ ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/src/manager/taskManager.js
    skipped 4 lines
    5 5  import { lockResource, storeCreateEntityEvent } from '../database/redis';
    6 6  import {
    7 7   ACTION_TYPE_ADD,
    8  - ACTION_TYPE_DELETE, ACTION_TYPE_ENRICHMENT,
     8 + ACTION_TYPE_DELETE,
     9 + ACTION_TYPE_ENRICHMENT,
    9 10   ACTION_TYPE_MERGE,
    10 11   ACTION_TYPE_PROMOTE,
    11 12   ACTION_TYPE_REMOVE,
    skipped 14 lines
    26 27  import {
    27 28   createRelation,
    28 29   deleteElementById,
    29  - deleteRelationsByFromAndTo, internalFindByIds,
     30 + deleteRelationsByFromAndTo,
     31 + internalFindByIds,
    30 32   internalLoadById,
    31 33   mergeEntities,
    32 34   patchAttribute,
    skipped 2 lines
    35 37  } from '../database/middleware';
    36 38  import { now } from '../utils/format';
    37 39  import {
     40 + EVENT_TYPE_CREATE,
    38 41   INDEX_INTERNAL_OBJECTS,
    39 42   READ_DATA_INDICES,
    40 43   READ_DATA_INDICES_WITHOUT_INFERRED,
    skipped 11 lines
    52 55  import { getActivatedRules, getRule } from '../domain/rules';
    53 56  import { isStixRelationship } from '../schema/stixRelationship';
    54 57  import { isStixObject } from '../schema/stixCoreObject';
    55  -import { EVENT_TYPE_CREATE } from '../database/rabbitmq';
    56 58  import { ENTITY_TYPE_INDICATOR } from '../schema/stixDomainObject';
    57 59  import { isStixCyberObservable } from '../schema/stixCyberObservable';
    58 60  import { promoteObservableToIndicator } from '../domain/stixCyberObservable';
    skipped 362 lines
  • ■ ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/tests/02-integration/00-inject/loader-test.js
    skipped 5 lines
    6 6  import { WRITE_PLATFORM_INDICES } from '../../../src/database/utils';
    7 7  import platformInit from '../../../src/initialization';
    8 8  import { deleteStream } from '../../../src/database/redis';
     9 +import { deleteQueues } from '../../../src/domain/connector';
     10 +import { deleteBucket } from '../../../src/database/file-storage';
    9 11   
    10 12  describe('Database provision', () => {
    11 13   const importOpts = [API_URI, API_TOKEN, './tests/data/DATA-TEST-STIX2_v2.json'];
    12 14   it(
    13 15   'should platform init',
    14 16   async () => {
     17 + // Platform cleanup before executing tests
     18 + // Delete the bucket
     19 + await deleteBucket();
     20 + // Delete all rabbitmq queues
     21 + await deleteQueues(testContext, ADMIN_USER);
     22 + // Remove all elastic indices
    15 23   await elDeleteIndexes(WRITE_PLATFORM_INDICES.map((id) => `${id}${ELASTIC_CREATION_PATTERN}`));
     24 + // Delete redis streams
    16 25   await deleteStream();
     26 + // Starting test with simple platform initialization
    17 27   return expect(platformInit()).resolves.toBe(true);
    18 28   },
    19 29   FIVE_MINUTES
    skipped 26 lines
  • ■ ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/tests/02-integration/01-database/rabbitmq-test.js
    skipped 7 lines
    8 8  } from '../../../src/database/rabbitmq';
    9 9  import { CONNECTOR_INTERNAL_IMPORT_FILE } from '../../../src/schema/general';
    10 10  import { ADMIN_USER, testContext } from '../../utils/testQuery';
     11 +import { RABBIT_QUEUE_PREFIX } from '../../../src/database/utils';
    11 12   
    12 13  describe('Rabbit basic and utils', () => {
    13 14   it('should rabbit in correct version', async () => {
    skipped 18 lines
    32 33   it('should register the connector', async () => {
    33 34   const config = await registerConnectorQueues(connectorId, connectorName, connectorType, connectorScope);
    34 35   expect(config.uri).not.toBeNull();
    35  - expect(config.push).toEqual(`push_${connectorId}`);
    36  - expect(config.push_exchange).toEqual('amqp.worker.exchange');
    37  - expect(config.listen).toEqual(`listen_${connectorId}`);
    38  - expect(config.listen_exchange).toEqual('amqp.connector.exchange');
     36 + expect(config.push).toEqual(`${RABBIT_QUEUE_PREFIX}push_${connectorId}`);
     37 + expect(config.push_exchange).toEqual(`${RABBIT_QUEUE_PREFIX}amqp.worker.exchange`);
     38 + expect(config.listen).toEqual(`${RABBIT_QUEUE_PREFIX}listen_${connectorId}`);
     39 + expect(config.listen_exchange).toEqual(`${RABBIT_QUEUE_PREFIX}amqp.connector.exchange`);
    39 40   });
    40 41   it('should connector queues available', async () => {
    41 42   const data = await metrics(testContext, ADMIN_USER);
    42 43   expect(data).not.toBeNull();
    43 44   expect(data.queues.length).toEqual(4);
    44 45   const aggregationMap = new Map(data.queues.map((q) => [q.name, q]));
    45  - expect(aggregationMap.get(`listen_${connectorId}`)).not.toBeUndefined();
    46  - expect(aggregationMap.get(`push_${connectorId}`)).not.toBeUndefined();
     46 + expect(aggregationMap.get(`${RABBIT_QUEUE_PREFIX}listen_${connectorId}`)).not.toBeUndefined();
     47 + expect(aggregationMap.get(`${RABBIT_QUEUE_PREFIX}push_${connectorId}`)).not.toBeUndefined();
    47 48   });
    48 49   it('should push message to connector', async () => {
    49 50   const connector = { internal_id: connectorId };
    skipped 7 lines
    57 58   expect(unregister.push.messageCount).toEqual(0);
    58 59   const data = await metrics(testContext, ADMIN_USER);
    59 60   const aggregationMap = new Map(data.queues.map((q) => [q.name, q]));
    60  - expect(aggregationMap.get(`listen_${connectorId}`)).toBeUndefined();
    61  - expect(aggregationMap.get(`push_${connectorId}`)).toBeUndefined();
     61 + expect(aggregationMap.get(`${RABBIT_QUEUE_PREFIX}listen_${connectorId}`)).toBeUndefined();
     62 + expect(aggregationMap.get(`${RABBIT_QUEUE_PREFIX}push_${connectorId}`)).toBeUndefined();
    62 63   });
    63 64  });
    64 65   
  • ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/tests/03-streams/00-Raw/raw-test.js
    1 1  import * as R from 'ramda';
    2 2  import { FIVE_MINUTES, RAW_EVENTS_SIZE } from '../../utils/testQuery';
    3 3  import { shutdownModules, startModules } from '../../../src/modules';
    4  -import {
    5  - checkStreamData,
    6  - checkStreamGenericContent,
    7  - fetchStreamEvents,
    8  -} from '../../utils/testStream';
    9  -import {
    10  - EVENT_TYPE_CREATE,
    11  - EVENT_TYPE_DELETE,
    12  - EVENT_TYPE_MERGE,
    13  - EVENT_TYPE_UPDATE,
    14  -} from '../../../src/database/rabbitmq';
     4 +import { checkStreamData, checkStreamGenericContent, fetchStreamEvents, } from '../../utils/testStream';
    15 5  import { PORT } from '../../../src/config/conf';
     6 +import { EVENT_TYPE_CREATE, EVENT_TYPE_DELETE, EVENT_TYPE_MERGE, EVENT_TYPE_UPDATE } from '../../../src/database/utils';
    16 7   
    17 8  describe('Raw streams tests', () => {
    18 9   beforeAll(async () => {
    skipped 104 lines
  • ■ ■ ■ ■ ■
    opencti-platform/opencti-graphql/tests/utils/testQuery.js
    skipped 66 lines
    67 67   schema: createSchema(),
    68 68   introspection: true,
    69 69   persistedQueries: false,
    70  - context: () => ({ user }),
     70 + context: () => {
     71 + const executeContext = executionContext('test');
     72 + executeContext.user = user;
     73 + return executeContext;
     74 + },
    71 75   });
    72 76  };
    73 77   
    skipped 3 lines
  • ■ ■ ■ ■
    opencti-platform/opencti-graphql/tests/utils/testStream.js
    skipped 3 lines
    4 4  import { ADMIN_USER, generateBasicAuth, testContext } from './testQuery';
    5 5  import { internalLoadById } from '../../src/database/middleware';
    6 6  import { isStixId } from '../../src/schema/schemaUtils';
    7  -import { EVENT_TYPE_UPDATE } from '../../src/database/rabbitmq';
    8 7  import { isStixRelationship } from '../../src/schema/stixRelationship';
    9 8  import { STIX_EXT_OCTI } from '../../src/types/stix-extensions';
     9 +import { EVENT_TYPE_UPDATE } from '../../src/database/utils';
    10 10   
    11 11  export const fetchStreamEvents = (uri, { from } = {}) => {
    12 12   const opts = {
    skipped 112 lines
Please wait...
Page is in error, reload to recover