| skipped 6 lines |
7 | 7 | | storeLoadById, |
8 | 8 | | updateAttribute |
9 | 9 | | } from '../database/middleware'; |
10 | | - | import { completeConnector, connectorsFor } from '../database/repository'; |
11 | | - | import { registerConnectorQueues, unregisterConnector } from '../database/rabbitmq'; |
| 10 | + | import { completeConnector, connectors, connectorsFor } from '../database/repository'; |
| 11 | + | import { registerConnectorQueues, unregisterConnector, unregisterExchanges } from '../database/rabbitmq'; |
12 | 12 | | import { ENTITY_TYPE_CONNECTOR, ENTITY_TYPE_SYNC, ENTITY_TYPE_WORK } from '../schema/internalObject'; |
13 | 13 | | import { FunctionalError, UnsupportedError } from '../config/errors'; |
14 | 14 | | import { now } from '../utils/format'; |
15 | 15 | | import { elLoadById } from '../database/engine'; |
16 | | - | import { isEmptyField, READ_INDEX_HISTORY } from '../database/utils'; |
| 16 | + | import { INTERNAL_SYNC_QUEUE, isEmptyField, READ_INDEX_HISTORY } from '../database/utils'; |
17 | 17 | | import { ABSTRACT_INTERNAL_OBJECT, CONNECTOR_INTERNAL_EXPORT_FILE } from '../schema/general'; |
18 | 18 | | import { SYSTEM_USER } from '../utils/access'; |
19 | 19 | | import { delEditContext, notify, setEditContext } from '../database/redis'; |
| skipped 5 lines |
25 | 25 | | export const loadConnectorById = (context, user, id) => { |
26 | 26 | | return storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR).then((connector) => completeConnector(connector)); |
27 | 27 | | }; |
28 | | - | |
29 | 28 | | export const connectorForWork = async (context, user, id) => { |
30 | 29 | | const work = await elLoadById(context, user, id, ENTITY_TYPE_WORK, READ_INDEX_HISTORY); |
31 | 30 | | if (work) return loadConnectorById(context, user, work.connector_id); |
32 | 31 | | return null; |
33 | 32 | | }; |
34 | | - | |
35 | 33 | | export const connectorsForExport = async (context, user, scope, onlyAlive = false) => { |
36 | 34 | | return connectorsFor(context, user, CONNECTOR_INTERNAL_EXPORT_FILE, scope, onlyAlive); |
37 | 35 | | }; |
38 | | - | // endregion |
39 | | - | |
40 | | - | // region mutations |
41 | 36 | | export const pingConnector = async (context, user, id, state) => { |
42 | 37 | | const creation = now(); |
43 | 38 | | const connector = await storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR); |
| skipped 13 lines |
57 | 52 | | } |
58 | 53 | | return storeLoadById(context, user, id, 'Connector').then((data) => completeConnector(data)); |
59 | 54 | | }; |
60 | | - | |
61 | 55 | | export const resetStateConnector = async (context, user, id) => { |
62 | 56 | | const patch = { connector_state: '', connector_state_reset: true }; |
63 | 57 | | await patchAttribute(context, user, id, ENTITY_TYPE_CONNECTOR, patch); |
64 | 58 | | return storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR).then((data) => completeConnector(data)); |
65 | 59 | | }; |
| 60 | + | export const registerConnector = async (context, user, connectorData) => { |
| 61 | + | // eslint-disable-next-line camelcase |
| 62 | + | const { id, name, type, scope, auto = null, only_contextual = null } = connectorData; |
| 63 | + | const connector = await storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR); |
| 64 | + | // Register queues |
| 65 | + | await registerConnectorQueues(id, name, type, scope); |
| 66 | + | if (connector) { |
| 67 | + | // Simple connector update |
| 68 | + | const patch = { |
| 69 | + | name, |
| 70 | + | updated_at: now(), |
| 71 | + | connector_user_id: user.id, |
| 72 | + | connector_scope: scope && scope.length > 0 ? scope.join(',') : null, |
| 73 | + | auto, |
| 74 | + | only_contextual, |
| 75 | + | }; |
| 76 | + | const { element } = await patchAttribute(context, user, id, ENTITY_TYPE_CONNECTOR, patch); |
| 77 | + | // Notify configuration change for caching system |
| 78 | + | await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].EDIT_TOPIC, element, user); |
| 79 | + | return storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR).then((data) => completeConnector(data)); |
| 80 | + | } |
| 81 | + | // Need to create the connector |
| 82 | + | const connectorToCreate = { |
| 83 | + | internal_id: id, |
| 84 | + | name, |
| 85 | + | connector_type: type, |
| 86 | + | connector_scope: scope && scope.length > 0 ? scope.join(',') : null, |
| 87 | + | auto, |
| 88 | + | only_contextual, |
| 89 | + | connector_user_id: user.id, |
| 90 | + | }; |
| 91 | + | const createdConnector = await createEntity(context, user, connectorToCreate, ENTITY_TYPE_CONNECTOR); |
| 92 | + | // Notify configuration change for caching system |
| 93 | + | await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].ADDED_TOPIC, createdConnector, user); |
| 94 | + | // Return the connector |
| 95 | + | return completeConnector(createdConnector); |
| 96 | + | }; |
| 97 | + | export const connectorDelete = async (context, user, connectorId) => { |
| 98 | + | await deleteWorkForConnector(context, user, connectorId); |
| 99 | + | await unregisterConnector(connectorId); |
| 100 | + | const { element } = await internalDeleteElementById(context, user, connectorId); |
| 101 | + | // Notify configuration change for caching system |
| 102 | + | await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].DELETE_TOPIC, element, user); |
| 103 | + | return element.internal_id; |
| 104 | + | }; |
| 105 | + | // endregion |
66 | 106 | | |
67 | 107 | | // region syncs |
68 | 108 | | export const patchSync = async (context, user, id, patch) => { |
| skipped 74 lines |
143 | 183 | | }; |
144 | 184 | | // endregion |
145 | 185 | | |
146 | | - | export const registerConnector = async (context, user, connectorData) => { |
147 | | - | // eslint-disable-next-line camelcase |
148 | | - | const { id, name, type, scope, auto = null, only_contextual = null } = connectorData; |
149 | | - | const connector = await storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR); |
150 | | - | // Register queues |
151 | | - | await registerConnectorQueues(id, name, type, scope); |
152 | | - | if (connector) { |
153 | | - | // Simple connector update |
154 | | - | const patch = { |
155 | | - | name, |
156 | | - | updated_at: now(), |
157 | | - | connector_user_id: user.id, |
158 | | - | connector_scope: scope && scope.length > 0 ? scope.join(',') : null, |
159 | | - | auto, |
160 | | - | only_contextual, |
161 | | - | }; |
162 | | - | const { element } = await patchAttribute(context, user, id, ENTITY_TYPE_CONNECTOR, patch); |
163 | | - | // Notify configuration change for caching system |
164 | | - | await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].EDIT_TOPIC, element, user); |
165 | | - | return storeLoadById(context, user, id, ENTITY_TYPE_CONNECTOR).then((data) => completeConnector(data)); |
| 186 | + | // region testing |
| 187 | + | export const deleteQueues = async (context, user) => { |
| 188 | + | try { await unregisterConnector(INTERNAL_SYNC_QUEUE); } catch (e) { /* nothing */ } |
| 189 | + | const platformConnectors = await connectors(context, user); |
| 190 | + | for (let index = 0; index < platformConnectors.length; index += 1) { |
| 191 | + | const connector = platformConnectors[index]; |
| 192 | + | await unregisterConnector(connector.id); |
166 | 193 | | } |
167 | | - | // Need to create the connector |
168 | | - | const connectorToCreate = { |
169 | | - | internal_id: id, |
170 | | - | name, |
171 | | - | connector_type: type, |
172 | | - | connector_scope: scope && scope.length > 0 ? scope.join(',') : null, |
173 | | - | auto, |
174 | | - | only_contextual, |
175 | | - | connector_user_id: user.id, |
176 | | - | }; |
177 | | - | const createdConnector = await createEntity(context, user, connectorToCreate, ENTITY_TYPE_CONNECTOR); |
178 | | - | // Notify configuration change for caching system |
179 | | - | await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].ADDED_TOPIC, createdConnector, user); |
180 | | - | // Return the connector |
181 | | - | return completeConnector(createdConnector); |
182 | | - | }; |
183 | | - | |
184 | | - | export const connectorDelete = async (context, user, connectorId) => { |
185 | | - | await deleteWorkForConnector(context, user, connectorId); |
186 | | - | await unregisterConnector(connectorId); |
187 | | - | const { element } = await internalDeleteElementById(context, user, connectorId); |
188 | | - | // Notify configuration change for caching system |
189 | | - | await notify(BUS_TOPICS[ABSTRACT_INTERNAL_OBJECT].DELETE_TOPIC, element, user); |
190 | | - | return element.internal_id; |
| 194 | + | try { await unregisterExchanges(); } catch (e) { /* nothing */ } |
191 | 195 | | }; |
192 | 196 | | // endregion |
193 | 197 | | |