Projects STRLCPY graphql-engine Commits 0e822cc9
🤬
  • NDC response size limit (#439)

    - Introduce a field in the NDC `Configuration` struct that carries an
    optional limit (`usize`) value.
      - When set, reject any NDC response that is larger than the limit.
    - Define an `HttpContext` struct that captures both a `reqwest::Client` and
    an optional limit value. Replace the `http_client` argument with
    `http_context: &HttpContext` in all execute-related functions.
    - The caller of the `execute_query` function in the multitenant code needs to pass a
    reference to an `HttpContext` with the appropriate NDC response size limit.
    
    V3_GIT_ORIGIN_REV_ID: 85a3647c4d136cc8d887f343736cc011166f036f
  • Loading...
  • Rakesh Emmadi committed with hasura-bot 1 month ago
    0e822cc9
    1 parent b66c43b6
  • ■ ■ ■ ■ ■ ■
    v3/Cargo.lock
    skipped 954 lines
    955 955   "base64 0.21.7",
    956 956   "bincode",
    957 957   "build-data",
     958 + "bytes",
    958 959   "clap 4.5.4",
    959 960   "criterion",
    960 961   "derive_more",
    skipped 7 lines
    968 969   "json_value_merge",
    969 970   "lang-graphql",
    970 971   "lazy_static",
     972 + "mockito",
    971 973   "ndc-client",
    972 974   "nonempty",
    973 975   "open-dds",
    skipped 2927 lines
  • ■ ■ ■ ■ ■
    v3/crates/engine/Cargo.toml
    skipped 31 lines
    32 32  axum = { version = "0.6.20" }
    33 33  base64 = "0.21.2"
    34 34  bincode = "1.3.3"
     35 +bytes = "1.6.0"
    35 36  clap = { version = "4", features = ["derive", "env"] }
    36 37  derive_more = "0.99.17"
    37 38  futures = "0.3.29"
    skipped 28 lines
    66 67  [dev-dependencies]
    67 68  criterion = { version = "0.4", features = ["html_reports", "async_tokio"] }
    68 69  goldenfile = "1.4.5"
     70 +mockito = { version = "1.1.0", default-features = false, features = [] }
     71 +pretty_assertions = "1.3.0"
    69 72  tokio-test = "0.4.2"
    70  -pretty_assertions = "1.3.0"
    71 73   
    72 74  [package.metadata.cargo-machete]
    73 75  ignored = [
    skipped 5 lines
  • ■ ■ ■ ■ ■ ■
    v3/crates/engine/benches/execute.rs
    1 1  use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
    2 2  use engine::execute::plan::{execute_mutation_plan, execute_query_plan, generate_request_plan};
    3  -use engine::execute::{execute_query_internal, generate_ir};
     3 +use engine::execute::{execute_query_internal, generate_ir, HttpContext};
    4 4  use engine::schema::GDS;
    5 5  use hasura_authn_core::Identity;
    6 6  use lang_graphql::http::RawRequest;
    skipped 40 lines
    47 47   
    48 48   let gds = GDS::new(open_dds::traits::OpenDd::deserialize(metadata).unwrap()).unwrap();
    49 49   let schema = GDS::build_schema(&gds).unwrap();
    50  - 
    51  - let http_client = reqwest::Client::new();
     50 + let http_context = HttpContext {
     51 + client: reqwest::Client::new(),
     52 + ndc_response_size_limit: None,
     53 + };
    52 54   let runtime = Runtime::new().unwrap();
    53 55   
    54 56   let query = fs::read_to_string(request_path).unwrap();
    skipped 77 lines
    132 134   b.to_async(*runtime).iter(|| async {
    133 135   match generate_request_plan(&ir).unwrap() {
    134 136   engine::execute::plan::RequestPlan::QueryPlan(query_plan) => {
    135  - execute_query_plan(&http_client, query_plan, None).await
     137 + execute_query_plan(&http_context, query_plan, None).await
    136 138   }
    137 139   engine::execute::plan::RequestPlan::MutationPlan(mutation_plan) => {
    138  - execute_mutation_plan(&http_client, mutation_plan, None).await
     140 + execute_mutation_plan(&http_context, mutation_plan, None).await
    139 141   }
    140 142   }
    141 143   })
    skipped 6 lines
    148 150   &(&runtime, &schema, raw_request),
    149 151   |b, (runtime, schema, request)| {
    150 152   b.to_async(*runtime).iter(|| async {
    151  - execute_query_internal(&http_client, schema, &session, request.clone(), None)
     153 + execute_query_internal(&http_context, schema, &session, request.clone(), None)
    152 154   .await
    153 155   .unwrap()
    154 156   })
    skipped 71 lines
  • ■ ■ ■ ■ ■ ■
    v3/crates/engine/bin/engine/main.rs
    skipped 18 lines
    19 19   TraceableError, TraceableHttpResponse,
    20 20  };
    21 21   
    22  -use engine::authentication::{AuthConfig, AuthConfig::V1 as V1AuthConfig, AuthModeConfig};
     22 +use engine::{
     23 + authentication::{AuthConfig, AuthConfig::V1 as V1AuthConfig, AuthModeConfig},
     24 + execute::HttpContext,
     25 +};
    23 26  use engine::{schema::GDS, VERSION};
    24 27  use hasura_authn_core::Session;
    25 28  use hasura_authn_jwt::auth as jwt_auth;
    skipped 17 lines
    43 46  }
    44 47   
    45 48  struct EngineState {
    46  - http_client: reqwest::Client,
     49 + http_context: HttpContext,
    47 50   schema: gql::schema::Schema<GDS>,
    48 51   auth_config: AuthConfig,
    49 52  }
    skipped 74 lines
    124 127   let auth_config =
    125 128   read_auth_config(&server.authn_config_path).map_err(StartupError::ReadAuth)?;
    126 129   let schema = read_schema(&server.metadata_path).map_err(StartupError::ReadSchema)?;
     130 + let http_context = HttpContext {
     131 + client: reqwest::Client::new(),
     132 + ndc_response_size_limit: None,
     133 + };
    127 134   let state = Arc::new(EngineState {
    128  - http_client: reqwest::Client::new(),
     135 + http_context,
    129 136   schema,
    130 137   auth_config,
    131 138   });
    skipped 181 lines
    313 320   V1AuthConfig(auth_config) => match &auth_config.mode {
    314 321   AuthModeConfig::Webhook(webhook_config) => {
    315 322   webhook::authenticate_request(
    316  - &engine_state.http_client,
     323 + &engine_state.http_context.client,
    317 324   webhook_config,
    318 325   &headers_map,
    319 326   auth_config.allow_role_emulation_by.clone(),
    skipped 3 lines
    323 330   }
    324 331   AuthModeConfig::Jwt(jwt_secret_config) => {
    325 332   jwt_auth::authenticate_request(
    326  - &engine_state.http_client,
     333 + &engine_state.http_context.client,
    327 334   *jwt_secret_config.clone(),
    328 335   auth_config.allow_role_emulation_by.clone(),
    329 336   &headers_map,
    skipped 25 lines
    355 362   let response = tracer
    356 363   .in_span_async("Handle request", SpanVisibility::User, || {
    357 364   Box::pin(engine::execute::execute_query(
    358  - &state.http_client,
     365 + &state.http_context,
    359 366   &state.schema,
    360 367   &session,
    361 368   request,
    skipped 21 lines
    383 390   let response = tracer
    384 391   .in_span_async("Handle explain request", SpanVisibility::User, || {
    385 392   Box::pin(engine::execute::explain::execute_explain(
    386  - &state.http_client,
     393 + &state.http_context,
    387 394   &state.schema,
    388 395   &session,
    389 396   request,
    skipped 22 lines
  • ■ ■ ■ ■ ■ ■
    v3/crates/engine/src/execute/error.rs
    skipped 251 lines
    252 252   ndc_client::Error::InvalidConnectorError(invalid_connector_err) => {
    253 253   format!("invalid connector error: {0}", invalid_connector_err)
    254 254   }
     255 + ndc_client::Error::ResponseTooLarge(err) => {
     256 + format!("response received from connector is too large: {0}", err)
     257 + }
    255 258   }
    256 259  }
    257 260   
    skipped 12 lines
  • ■ ■ ■ ■ ■ ■
    v3/crates/engine/src/execute/explain.rs
    1 1  use super::remote_joins::types::{JoinNode, RemoteJoinType};
    2  -use super::ExecuteOrExplainResponse;
     2 +use super::{ExecuteOrExplainResponse, HttpContext};
    3 3  use crate::execute::ndc::client as ndc_client;
    4 4  use crate::execute::plan::{ApolloFederationSelect, NodeQueryPlan, ProcessResponseAs};
    5 5  use crate::execute::remote_joins::types::{JoinId, JoinLocations, RemoteJoin};
    skipped 11 lines
    17 17  use lang_graphql::ast::common as ast;
    18 18   
    19 19  pub async fn execute_explain(
    20  - http_client: &reqwest::Client,
     20 + http_context: &HttpContext,
    21 21   schema: &Schema<GDS>,
    22 22   session: &Session,
    23 23   request: RawRequest,
    24 24  ) -> types::ExplainResponse {
    25  - execute_explain_internal(http_client, schema, session, request)
     25 + execute_explain_internal(http_context, schema, session, request)
    26 26   .await
    27 27   .unwrap_or_else(|e| types::ExplainResponse::error(e.to_graphql_error(None)))
    28 28  }
    29 29   
    30 30  /// Explains a GraphQL query
    31 31  pub async fn execute_explain_internal(
    32  - http_client: &reqwest::Client,
     32 + http_context: &HttpContext,
    33 33   schema: &gql::schema::Schema<GDS>,
    34 34   session: &Session,
    35 35   raw_request: gql::http::RawRequest,
    36 36  ) -> Result<types::ExplainResponse, error::Error> {
    37 37   let query_response = super::execute_request_internal(
    38  - http_client,
     38 + http_context,
    39 39   schema,
    40 40   session,
    41 41   raw_request,
    skipped 13 lines
    55 55   
    56 56  /// Produce an /explain plan for a given GraphQL query.
    57 57  pub(crate) async fn explain_query_plan(
    58  - http_client: &reqwest::Client,
     58 + http_context: &HttpContext,
    59 59   query_plan: plan::QueryPlan<'_, '_, '_>,
    60 60  ) -> Result<types::Step, error::Error> {
    61 61   let mut parallel_root_steps = vec![];
    skipped 2 lines
    64 64   match node {
    65 65   NodeQueryPlan::NDCQueryExecution(ndc_query_execution) => {
    66 66   let sequence_steps = get_execution_steps(
    67  - http_client,
     67 + http_context,
    68 68   alias,
    69 69   &ndc_query_execution.process_response_as,
    70 70   ndc_query_execution.execution_tree.remote_executions,
    skipped 5 lines
    76 76   }
    77 77   NodeQueryPlan::RelayNodeSelect(Some(ndc_query_execution)) => {
    78 78   let sequence_steps = get_execution_steps(
    79  - http_client,
     79 + http_context,
    80 80   alias,
    81 81   &ndc_query_execution.process_response_as,
    82 82   ndc_query_execution.execution_tree.remote_executions,
    skipped 9 lines
    92 92   let mut parallel_steps = Vec::new();
    93 93   for ndc_query_execution in parallel_ndc_query_executions {
    94 94   let sequence_steps = get_execution_steps(
    95  - http_client,
     95 + http_context,
    96 96   alias.clone(),
    97 97   &ndc_query_execution.process_response_as,
    98 98   ndc_query_execution.execution_tree.remote_executions,
    skipped 44 lines
    143 143   
    144 144  /// Produce an /explain plan for a given GraphQL mutation.
    145 145  pub(crate) async fn explain_mutation_plan(
    146  - http_client: &reqwest::Client,
     146 + http_context: &HttpContext,
    147 147   mutation_plan: plan::MutationPlan<'_, '_, '_>,
    148 148  ) -> Result<types::Step, error::Error> {
    149 149   let mut root_steps = vec![];
    skipped 7 lines
    157 157   for (_, mutation_group) in mutation_plan.nodes {
    158 158   for (alias, ndc_mutation_execution) in mutation_group {
    159 159   let sequence_steps = get_execution_steps(
    160  - http_client,
     160 + http_context,
    161 161   alias,
    162 162   &ndc_mutation_execution.process_response_as,
    163 163   ndc_mutation_execution.join_locations,
    skipped 18 lines
    182 182  }
    183 183   
    184 184  async fn get_execution_steps<'s>(
    185  - http_client: &reqwest::Client,
     185 + http_context: &HttpContext,
    186 186   alias: gql::ast::common::Alias,
    187 187   process_response_as: &ProcessResponseAs<'s>,
    188 188   join_locations: JoinLocations<(RemoteJoin<'s, '_>, JoinId)>,
    skipped 3 lines
    192 192   let mut sequence_steps = match process_response_as {
    193 193   ProcessResponseAs::CommandResponse { .. } => {
    194 194   // A command execution node
    195  - let data_connector_explain =
    196  - fetch_explain_from_data_connector(http_client, ndc_request.clone(), data_connector)
    197  - .await;
     195 + let data_connector_explain = fetch_explain_from_data_connector(
     196 + http_context,
     197 + ndc_request.clone(),
     198 + data_connector,
     199 + )
     200 + .await;
    198 201   NonEmpty::new(Box::new(types::Step::CommandSelect(
    199 202   types::CommandSelectIR {
    200 203   command_name: alias.to_string(),
    skipped 4 lines
    205 208   }
    206 209   ProcessResponseAs::Array { .. } | ProcessResponseAs::Object { .. } => {
    207 210   // A model execution node
    208  - let data_connector_explain =
    209  - fetch_explain_from_data_connector(http_client, ndc_request.clone(), data_connector)
    210  - .await;
     211 + let data_connector_explain = fetch_explain_from_data_connector(
     212 + http_context,
     213 + ndc_request.clone(),
     214 + data_connector,
     215 + )
     216 + .await;
    211 217   NonEmpty::new(Box::new(types::Step::ModelSelect(types::ModelSelectIR {
    212 218   model_name: alias.to_string(),
    213 219   ndc_request,
    skipped 1 lines
    215 221   })))
    216 222   }
    217 223   };
    218  - if let Some(join_steps) = get_join_steps(alias.to_string(), join_locations, http_client).await {
     224 + if let Some(join_steps) = get_join_steps(alias.to_string(), join_locations, http_context).await
     225 + {
    219 226   sequence_steps.push(Box::new(types::Step::Sequence(join_steps)));
    220 227   sequence_steps.push(Box::new(types::Step::HashJoin));
    221 228   };
    skipped 8 lines
    230 237  async fn get_join_steps(
    231 238   _root_field_name: String,
    232 239   join_locations: JoinLocations<(RemoteJoin<'async_recursion, 'async_recursion>, JoinId)>,
    233  - http_client: &reqwest::Client,
     240 + http_context: &HttpContext,
    234 241  ) -> Option<NonEmpty<Box<types::Step>>> {
    235 242   let mut sequence_join_steps = vec![];
    236 243   for (alias, location) in join_locations.locations {
    skipped 3 lines
    240 247   query_request.variables = Some(vec![]);
    241 248   let ndc_request = types::NDCRequest::Query(query_request);
    242 249   let data_connector_explain = fetch_explain_from_data_connector(
    243  - http_client,
     250 + http_context,
    244 251   ndc_request.clone(),
    245 252   remote_join.target_data_connector,
    246 253   )
    skipped 18 lines
    265 272   },
    266 273   )))
    267 274   };
    268  - if let Some(rest_join_steps) = get_join_steps(alias, location.rest, http_client).await {
     275 + if let Some(rest_join_steps) = get_join_steps(alias, location.rest, http_context).await {
    269 276   sequence_steps.push(Box::new(types::Step::Sequence(rest_join_steps)));
    270 277   sequence_steps.push(Box::new(types::Step::HashJoin));
    271 278   };
    skipped 34 lines
    306 313  }
    307 314   
    308 315  async fn fetch_explain_from_data_connector(
    309  - http_client: &reqwest::Client,
     316 + http_context: &HttpContext,
    310 317   ndc_request: types::NDCRequest,
    311 318   data_connector: &resolved::data_connector::DataConnectorLink,
    312 319  ) -> types::NDCExplainResponse {
    skipped 8 lines
    321 328   base_path: data_connector.url.get_url(ast::OperationType::Query),
    322 329   user_agent: None,
    323 330   // This isn't expensive, reqwest::Client is behind an Arc
    324  - client: http_client.clone(),
     331 + client: http_context.client.clone(),
    325 332   headers: data_connector.headers.0.clone(),
     333 + response_size_limit: http_context.ndc_response_size_limit,
    326 334   };
    327 335   {
    328 336   // TODO: use capabilities from the data connector context
    skipped 72 lines
  • ■ ■ ■ ■ ■
    v3/crates/engine/src/execute/ndc/client.rs
     1 +use super::response::handle_response_with_size_limit;
    1 2  use ndc_client::models as ndc_models;
    2 3  use reqwest::header::{HeaderMap, HeaderValue};
    3 4  use serde::{de::DeserializeOwned, Deserialize};
    skipped 9 lines
    13 14   pub user_agent: Option<String>,
    14 15   pub client: reqwest::Client,
    15 16   pub headers: HeaderMap<HeaderValue>,
     17 + pub response_size_limit: Option<usize>,
    16 18  }
    17 19   
    18 20  /// Error type for the NDC API client interactions
    skipped 5 lines
    24 26   ConnectorError(ConnectorError),
    25 27   InvalidConnectorError(InvalidConnectorError),
    26 28   InvalidBaseURL,
     29 + ResponseTooLarge(String),
    27 30  }
    28 31   
    29 32  impl fmt::Display for Error {
    skipped 5 lines
    35 38   Error::ConnectorError(e) => ("response", format!("status code {}", e.status)),
    36 39   Error::InvalidConnectorError(e) => ("response", format!("status code {}", e.status)),
    37 40   Error::InvalidBaseURL => ("url", "invalid base URL".into()),
     41 + Error::ResponseTooLarge(message) => ("response", format!("too large: {}", message)),
    38 42   };
    39 43   write!(f, "error in {}: {}", module, e)
    40 44   }
    skipped 197 lines
    238 242   let resp = configuration.client.execute(request).await?;
    239 243   
    240 244   let response_status = resp.status();
    241  - let response_content = resp.json().await?;
     245 + 
     246 + let response_content = match configuration.response_size_limit {
     247 + None => resp.json().await?,
     248 + Some(size_limit) => handle_response_with_size_limit(resp, size_limit).await?,
     249 + };
    242 250   
    243 251   if !response_status.is_client_error() && !response_status.is_server_error() {
    244 252   serde_json::from_value(response_content).map_err(Error::from)
    skipped 71 lines
  • ■ ■ ■ ■ ■ ■
    v3/crates/engine/src/execute/ndc/response.rs
     1 +use super::client as ndc_client;
     2 + 
     3 +/// Handle response return from an NDC request by applying the size limit and
     4 +/// deserializing into a JSON value
     5 +pub(crate) async fn handle_response_with_size_limit(
     6 + response: reqwest::Response,
     7 + size_limit: usize,
     8 +) -> Result<serde_json::Value, ndc_client::Error> {
     9 + if let Some(content_length) = &response.content_length() {
     10 + // Check with content length
     11 + if *content_length > size_limit as u64 {
     12 + Err(ndc_client::Error::ResponseTooLarge(format!(
     13 + "Received content length {} exceeds the limit {}",
     14 + content_length, size_limit
     15 + )))
     16 + } else {
     17 + Ok(response.json().await?)
     18 + }
     19 + } else {
     20 + // If no content length found, then check chunk-by-chunk
     21 + handle_response_by_chunks_with_size_limit(response, size_limit).await
     22 + }
     23 +}
     24 + 
     25 +/// Handle response by chunks. For each chunk consumed, check if the total size exceeds the limit.
     26 +///
     27 +/// This logic is separated in a function to allow testing.
     28 +async fn handle_response_by_chunks_with_size_limit(
     29 + response: reqwest::Response,
     30 + size_limit: usize,
     31 +) -> Result<serde_json::Value, ndc_client::Error> {
     32 + let mut size = 0;
     33 + let mut buf = bytes::BytesMut::new();
     34 + let mut response = response;
     35 + while let Some(chunk) = response.chunk().await? {
     36 + size += chunk.len();
     37 + if size > size_limit {
     38 + return Err(ndc_client::Error::ResponseTooLarge(format!(
     39 + "Size exceeds the limit {}",
     40 + size_limit
     41 + )));
     42 + } else {
     43 + buf.extend_from_slice(&chunk);
     44 + }
     45 + }
     46 + Ok(serde_json::from_slice(&buf)?)
     47 +}
     48 + 
     49 +#[cfg(test)]
     50 +mod test {
     51 + use pretty_assertions::assert_eq;
     52 + 
     53 + #[tokio::test]
     54 + async fn test_content_length() {
     55 + let mut server = mockito::Server::new_async().await;
     56 + let test_api = server
     57 + .mock("GET", "/test")
     58 + .with_status(200)
     59 + .with_header("content-type", "application/json")
     60 + .with_body(r#"{"message": "hello"}"#)
     61 + .create();
     62 + let response = reqwest::get(server.url() + "/test").await.unwrap();
     63 + test_api.assert();
     64 + let err = super::handle_response_with_size_limit(response, 10)
     65 + .await
     66 + .unwrap_err();
     67 + assert_eq!(
     68 + err.to_string(),
     69 + "error in response: too large: Received content length 20 exceeds the limit 10"
     70 + )
     71 + }
     72 + 
     73 + #[tokio::test]
     74 + async fn test_chunk_by_chunk() {
     75 + let mut server = mockito::Server::new_async().await;
     76 + let test_api = server
     77 + .mock("GET", "/test")
     78 + .with_status(200)
     79 + .with_header("content-type", "application/json")
     80 + .with_body(r#"{"message": "hello"}"#)
     81 + .create();
     82 + let response = reqwest::get(server.url() + "/test").await.unwrap();
     83 + test_api.assert();
     84 + let err = super::handle_response_by_chunks_with_size_limit(response, 5)
     85 + .await
     86 + .unwrap_err();
     87 + assert_eq!(
     88 + err.to_string(),
     89 + "error in response: too large: Size exceeds the limit 5"
     90 + )
     91 + }
     92 + 
     93 + #[tokio::test]
     94 + async fn test_success() {
     95 + let json = serde_json::json!(
     96 + [
     97 + {"name": "Alice"},
     98 + {"name": "Bob"},
     99 + {"name": "Charlie"}
     100 + ]
     101 + );
     102 + let mut server = mockito::Server::new_async().await;
     103 + let test_api = server
     104 + .mock("GET", "/test")
     105 + .with_status(200)
     106 + .with_header("content-type", "application/json")
     107 + .with_body(serde_json::to_vec(&json).unwrap())
     108 + .create();
     109 + let response = reqwest::get(server.url() + "/test").await.unwrap();
     110 + test_api.assert();
     111 + let res = super::handle_response_with_size_limit(response, 100)
     112 + .await
     113 + .unwrap();
     114 + assert_eq!(json, res)
     115 + }
     116 + 
     117 + #[tokio::test]
     118 + async fn test_success_by_chunks() {
     119 + let json = serde_json::json!(
     120 + [
     121 + {"name": "Alice"},
     122 + {"name": "Bob"},
     123 + {"name": "Charlie"}
     124 + ]
     125 + );
     126 + let mut server = mockito::Server::new_async().await;
     127 + let test_api = server
     128 + .mock("GET", "/test")
     129 + .with_status(200)
     130 + .with_header("content-type", "application/json")
     131 + .with_body(serde_json::to_vec(&json).unwrap())
     132 + .create();
     133 + let response = reqwest::get(server.url() + "/test").await.unwrap();
     134 + test_api.assert();
     135 + let res = super::handle_response_by_chunks_with_size_limit(response, 100)
     136 + .await
     137 + .unwrap();
     138 + assert_eq!(json, res)
     139 + }
     140 +}
     141 + 
  • ■ ■ ■ ■ ■ ■
    v3/crates/engine/src/execute/ndc.rs
     1 +pub mod response;
     2 + 
    1 3  use axum::http::HeaderMap;
    2 4  use serde_json as json;
    3 5   
    skipped 5 lines
    9 11   
    10 12  use super::plan::ProcessResponseAs;
    11 13  use super::process_response::process_command_mutation_response;
    12  -use super::{error, ProjectId};
     14 +use super::{error, HttpContext, ProjectId};
    13 15  use crate::metadata::resolved;
    14 16  use crate::schema::GDS;
    15 17   
    skipped 3 lines
    19 21   
    20 22  /// Executes a NDC operation
    21 23  pub async fn execute_ndc_query<'n, 's>(
    22  - http_client: &reqwest::Client,
     24 + http_context: &HttpContext,
    23 25   query: ndc_models::QueryRequest,
    24 26   data_connector: &resolved::data_connector::DataConnectorLink,
    25 27   execution_span_attribute: String,
    skipped 18 lines
    44 46   field_span_attribute,
    45 47   );
    46 48   let connector_response =
    47  - fetch_from_data_connector(http_client, query, data_connector, project_id)
     49 + fetch_from_data_connector(http_context, query, data_connector, project_id)
    48 50   .await?;
    49 51   Ok(connector_response.0)
    50 52   })
    skipped 3 lines
    54 56  }
    55 57   
    56 58  pub(crate) async fn fetch_from_data_connector<'s>(
    57  - http_client: &reqwest::Client,
     59 + http_context: &HttpContext,
    58 60   query_request: ndc_models::QueryRequest,
    59 61   data_connector: &resolved::data_connector::DataConnectorLink,
    60 62   project_id: Option<ProjectId>,
    skipped 11 lines
    72 74   base_path: data_connector.url.get_url(ast::OperationType::Query),
    73 75   user_agent: None,
    74 76   // This isn't expensive, reqwest::Client is behind an Arc
    75  - client: http_client.clone(),
     77 + client: http_context.client.clone(),
    76 78   headers,
     79 + response_size_limit: http_context.ndc_response_size_limit,
    77 80   };
    78 81   client::query_post(&ndc_config, query_request)
    79 82   .await
    skipped 24 lines
    104 107   
    105 108  /// Executes a NDC mutation
    106 109  pub(crate) async fn execute_ndc_mutation<'n, 's, 'ir>(
    107  - http_client: &reqwest::Client,
     110 + http_context: &HttpContext,
    108 111   query: ndc_models::MutationRequest,
    109 112   data_connector: &resolved::data_connector::DataConnectorLink,
    110 113   selection_set: &'n normalized_ast::SelectionSet<'s, GDS>,
    skipped 20 lines
    131 134   field_span_attribute,
    132 135   );
    133 136   let connector_response = fetch_from_data_connector_mutation(
    134  - http_client,
     137 + http_context,
    135 138   query,
    136 139   data_connector,
    137 140   project_id,
    skipped 35 lines
    173 176  }
    174 177   
    175 178  pub(crate) async fn fetch_from_data_connector_mutation<'s>(
    176  - http_client: &reqwest::Client,
     179 + http_context: &HttpContext,
    177 180   query_request: ndc_models::MutationRequest,
    178 181   data_connector: &resolved::data_connector::DataConnectorLink,
    179 182   project_id: Option<ProjectId>,
    skipped 11 lines
    191 194   base_path: data_connector.url.get_url(ast::OperationType::Mutation),
    192 195   user_agent: None,
    193 196   // This isn't expensive, reqwest::Client is behind an Arc
    194  - client: http_client.clone(),
     197 + client: http_context.client.clone(),
    195 198   headers,
     199 + response_size_limit: http_context.ndc_response_size_limit,
    196 200   };
    197 201   client::mutation_post(&ndc_config, query_request)
    198 202   .await
    skipped 7 lines
  • ■ ■ ■ ■ ■ ■
    v3/crates/engine/src/execute/plan.rs
    skipped 20 lines
    21 21  use super::remote_joins::types::{
    22 22   JoinId, JoinLocations, JoinNode, Location, LocationKind, MonotonicCounter, RemoteJoin,
    23 23  };
    24  -use super::ProjectId;
     24 +use super::{HttpContext, ProjectId};
    25 25  use crate::metadata::resolved::{self, subgraph};
    26 26  use crate::schema::GDS;
    27 27   
    skipped 519 lines
    547 547   
    548 548  /// Execute a single root field's query plan to produce a result.
    549 549  async fn execute_query_field_plan<'n, 's, 'ir>(
    550  - http_client: &reqwest::Client,
     550 + http_context: &HttpContext,
    551 551   query_plan: NodeQueryPlan<'n, 's, 'ir>,
    552 552   project_id: Option<ProjectId>,
    553 553  ) -> RootFieldResult {
    skipped 49 lines
    603 603   }
    604 604   NodeQueryPlan::NDCQueryExecution(ndc_query) => RootFieldResult::new(
    605 605   &ndc_query.process_response_as.is_nullable(),
    606  - resolve_ndc_query_execution(http_client, ndc_query, project_id).await,
     606 + resolve_ndc_query_execution(http_context, ndc_query, project_id).await,
    607 607   ),
    608 608   NodeQueryPlan::RelayNodeSelect(optional_query) => RootFieldResult::new(
    609 609   &optional_query.as_ref().map_or(true, |ndc_query| {
    610 610   ndc_query.process_response_as.is_nullable()
    611 611   }),
    612  - resolve_optional_ndc_select(http_client, optional_query, project_id)
     612 + resolve_optional_ndc_select(http_context, optional_query, project_id)
    613 613   .await,
    614 614   ),
    615 615   NodeQueryPlan::ApolloFederationSelect(
    skipped 6 lines
    622 622   // To run the field plans in parallel, we will need to use tokio::spawn for each field plan.
    623 623   let task = async {
    624 624   (resolve_optional_ndc_select(
    625  - http_client,
     625 + http_context,
    626 626   Some(query),
    627 627   project_id.clone(),
    628 628   )
    skipped 65 lines
    694 694   
    695 695  /// Execute a single root field's mutation plan to produce a result.
    696 696  async fn execute_mutation_field_plan<'n, 's, 'ir>(
    697  - http_client: &reqwest::Client,
     697 + http_context: &HttpContext,
    698 698   mutation_plan: NDCMutationExecution<'n, 's, 'ir>,
    699 699   project_id: Option<ProjectId>,
    700 700  ) -> RootFieldResult {
    skipped 6 lines
    707 707   Box::pin(async {
    708 708   RootFieldResult::new(
    709 709   &mutation_plan.process_response_as.is_nullable(),
    710  - resolve_ndc_mutation_execution(http_client, mutation_plan, project_id)
     710 + resolve_ndc_mutation_execution(http_context, mutation_plan, project_id)
    711 711   .await,
    712 712   )
    713 713   })
    skipped 6 lines
    720 720  /// root fields of the mutation sequentially rather than concurrently, in the order defined by the
    721 721  /// `IndexMap`'s keys.
    722 722  pub async fn execute_mutation_plan<'n, 's, 'ir>(
    723  - http_client: &reqwest::Client,
     723 + http_context: &HttpContext,
    724 724   mutation_plan: MutationPlan<'n, 's, 'ir>,
    725 725   project_id: Option<ProjectId>,
    726 726  ) -> ExecuteQueryResult {
    skipped 16 lines
    743 743   for (alias, field_plan) in mutation_group {
    744 744   executed_root_fields.push((
    745 745   alias,
    746  - execute_mutation_field_plan(http_client, field_plan, project_id.clone()).await,
     746 + execute_mutation_field_plan(http_context, field_plan, project_id.clone()).await,
    747 747   ));
    748 748   }
    749 749   }
    skipped 9 lines
    759 759  /// Given an entire plan for a query, produce a result. We do this by executing all the singular
    760 760  /// root fields of the query in parallel, and joining the results back together.
    761 761  pub async fn execute_query_plan<'n, 's, 'ir>(
    762  - http_client: &reqwest::Client,
     762 + http_context: &HttpContext,
    763 763   query_plan: QueryPlan<'n, 's, 'ir>,
    764 764   project_id: Option<ProjectId>,
    765 765  ) -> ExecuteQueryResult {
    skipped 7 lines
    773 773   let task = async {
    774 774   (
    775 775   alias,
    776  - execute_query_field_plan(http_client, field_plan, project_id.clone()).await,
     776 + execute_query_field_plan(http_context, field_plan, project_id.clone()).await,
    777 777   )
    778 778   };
    779 779   
    skipped 44 lines
    824 824  }
    825 825   
    826 826  async fn resolve_ndc_query_execution(
    827  - http_client: &reqwest::Client,
     827 + http_context: &HttpContext,
    828 828   ndc_query: NDCQueryExecution<'_, '_>,
    829 829   project_id: Option<ProjectId>,
    830 830  ) -> Result<json::Value, error::Error> {
    skipped 5 lines
    836 836   process_response_as,
    837 837   } = ndc_query;
    838 838   let mut response = ndc::execute_ndc_query(
    839  - http_client,
     839 + http_context,
    840 840   execution_tree.root_node.query,
    841 841   execution_tree.root_node.data_connector,
    842 842   execution_span_attribute.clone(),
    skipped 4 lines
    847 847   // TODO: Failures in remote joins should result in partial response
    848 848   // https://github.com/hasura/v3-engine/issues/229
    849 849   execute_join_locations(
    850  - http_client,
     850 + http_context,
    851 851   execution_span_attribute,
    852 852   field_span_attribute,
    853 853   &mut response,
    skipped 7 lines
    861 861  }
    862 862   
    863 863  async fn resolve_ndc_mutation_execution(
    864  - http_client: &reqwest::Client,
     864 + http_context: &HttpContext,
    865 865   ndc_query: NDCMutationExecution<'_, '_, '_>,
    866 866   project_id: Option<ProjectId>,
    867 867  ) -> Result<json::Value, error::Error> {
    skipped 8 lines
    876 876   join_locations: _,
    877 877   } = ndc_query;
    878 878   let response = ndc::execute_ndc_mutation(
    879  - http_client,
     879 + http_context,
    880 880   query,
    881 881   data_connector,
    882 882   selection_set,
    skipped 7 lines
    890 890  }
    891 891   
    892 892  async fn resolve_optional_ndc_select(
    893  - http_client: &reqwest::Client,
     893 + http_context: &HttpContext,
    894 894   optional_query: Option<NDCQueryExecution<'_, '_>>,
    895 895   project_id: Option<ProjectId>,
    896 896  ) -> Result<json::Value, error::Error> {
    897 897   match optional_query {
    898 898   None => Ok(json::Value::Null),
    899  - Some(ndc_query) => resolve_ndc_query_execution(http_client, ndc_query, project_id).await,
     899 + Some(ndc_query) => resolve_ndc_query_execution(http_context, ndc_query, project_id).await,
    900 900   }
    901 901  }
    902 902   
  • ■ ■ ■ ■ ■ ■
    v3/crates/engine/src/execute/remote_joins.rs
    skipped 83 lines
    84 84   
    85 85  use super::ndc::execute_ndc_query;
    86 86  use super::plan::ProcessResponseAs;
    87  -use super::{error, ProjectId};
     87 +use super::{error, HttpContext, ProjectId};
    88 88   
    89 89  use self::collect::CollectArgumentResult;
    90 90  use types::{Argument, JoinId, JoinLocations, RemoteJoin};
    skipped 6 lines
    97 97  /// for the top-level query, and executes further remote joins recursively.
    98 98  #[async_recursion]
    99 99  pub(crate) async fn execute_join_locations<'ir>(
    100  - http_client: &reqwest::Client,
     100 + http_context: &HttpContext,
    101 101   execution_span_attribute: String,
    102 102   field_span_attribute: String,
    103 103   lhs_response: &mut Vec<ndc_models::RowSet>,
    skipped 34 lines
    138 138   SpanVisibility::Internal,
    139 139   || {
    140 140   Box::pin(execute_ndc_query(
    141  - http_client,
     141 + http_context,
    142 142   join_node.target_ndc_ir,
    143 143   join_node.target_data_connector,
    144 144   execution_span_attribute.clone(),
    skipped 8 lines
    153 153   // will modify the `target_response` with all joins down the tree
    154 154   if !location.rest.locations.is_empty() {
    155 155   execute_join_locations(
    156  - http_client,
     156 + http_context,
    157 157   execution_span_attribute.clone(),
    158 158   // TODO: is this field span correct?
    159 159   field_span_attribute.clone(),
    skipped 20 lines
  • ■ ■ ■ ■ ■ ■
    v3/crates/engine/src/execute.rs
    skipped 27 lines
    28 28  pub mod process_response;
    29 29  pub mod remote_joins;
    30 30   
     31 +/// Context for making HTTP requests
     32 +pub struct HttpContext {
     33 + /// The HTTP client to use for making requests
     34 + pub client: reqwest::Client,
     35 + /// Response size limit for NDC requests
     36 + pub ndc_response_size_limit: Option<usize>,
     37 +}
     38 + 
    31 39  #[derive(Debug)]
    32 40  /// A simple wrapper around a reference of GraphQL errors
    33 41  pub struct GraphQLErrors<'a>(pub &'a nonempty::NonEmpty<gql::http::GraphQLError>);
    skipped 49 lines
    83 91  pub struct ProjectId(pub String);
    84 92   
    85 93  pub async fn execute_query(
    86  - http_client: &reqwest::Client,
     94 + http_context: &HttpContext,
    87 95   schema: &Schema<GDS>,
    88 96   session: &Session,
    89 97   request: RawRequest,
    90 98   project_id: Option<ProjectId>,
    91 99  ) -> GraphQLResponse {
    92  - execute_query_internal(http_client, schema, session, request, project_id)
     100 + execute_query_internal(http_context, schema, session, request, project_id)
    93 101   .await
    94 102   .unwrap_or_else(|e| GraphQLResponse(Response::error(e.to_graphql_error(None))))
    95 103  }
    skipped 19 lines
    115 123   
    116 124  /// Executes a GraphQL query
    117 125  pub async fn execute_query_internal(
    118  - http_client: &reqwest::Client,
     126 + http_context: &HttpContext,
    119 127   schema: &gql::schema::Schema<GDS>,
    120 128   session: &Session,
    121 129   raw_request: gql::http::RawRequest,
    122 130   project_id: Option<ProjectId>,
    123 131  ) -> Result<GraphQLResponse, error::Error> {
    124 132   let query_response = execute_request_internal(
    125  - http_client,
     133 + http_context,
    126 134   schema,
    127 135   session,
    128 136   raw_request,
    skipped 13 lines
    142 150   
    143 151  /// Executes or explains (query plan) a GraphQL query
    144 152  pub async fn execute_request_internal(
    145  - http_client: &reqwest::Client,
     153 + http_context: &HttpContext,
    146 154   schema: &gql::schema::Schema<GDS>,
    147 155   session: &Session,
    148 156   raw_request: gql::http::RawRequest,
    skipped 78 lines
    227 235   let execute_query_result = match request_plan {
    228 236   plan::RequestPlan::MutationPlan(mutation_plan) => {
    229 237   plan::execute_mutation_plan(
    230  - http_client,
     238 + http_context,
    231 239   mutation_plan,
    232 240   project_id,
    233 241   )
    skipped 1 lines
    235 243   }
    236 244   plan::RequestPlan::QueryPlan(query_plan) => {
    237 245   plan::execute_query_plan(
    238  - http_client,
     246 + http_context,
    239 247   query_plan,
    240 248   project_id,
    241 249   )
    skipped 15 lines
    257 265   let request_result = match request_plan {
    258 266   plan::RequestPlan::MutationPlan(mutation_plan) => {
    259 267   crate::execute::explain::explain_mutation_plan(
    260  - http_client,
     268 + http_context,
    261 269   mutation_plan,
    262 270   )
    263 271   .await
    264 272   }
    265 273   plan::RequestPlan::QueryPlan(query_plan) => {
    266 274   crate::execute::explain::explain_query_plan(
    267  - http_client,
     275 + http_context,
    268 276   query_plan,
    269 277   )
    270 278   .await
    skipped 140 lines
  • ■ ■ ■ ■ ■ ■
    v3/crates/engine/tests/common.rs
    skipped 10 lines
    11 11   path::PathBuf,
    12 12  };
    13 13   
    14  -use engine::execute::execute_query;
     14 +use engine::execute::{execute_query, HttpContext};
    15 15  use engine::schema::GDS;
    16 16   
    17 17  extern crate json_value_merge;
    skipped 1 lines
    19 19  use serde_json::Value;
    20 20   
    21 21  pub struct GoldenTestContext {
    22  - http_client: reqwest::Client,
     22 + http_context: HttpContext,
    23 23   mint: Mint,
    24 24  }
    25 25   
    26 26  pub fn setup(test_dir: &Path) -> GoldenTestContext {
    27  - let http_client = reqwest::Client::new();
     27 + let http_context = HttpContext {
     28 + client: reqwest::Client::new(),
     29 + ndc_response_size_limit: None,
     30 + };
    28 31   let mint = Mint::new(test_dir);
    29  - GoldenTestContext { http_client, mint }
     32 + GoldenTestContext { http_context, mint }
    30 33  }
    31 34   
    32 35  fn resolve_session(
    skipped 64 lines
    97 100   // Execute the test
    98 101   
    99 102   let response =
    100  - execute_query(&test_ctx.http_client, &schema, &session, raw_request, None).await;
     103 + execute_query(&test_ctx.http_context, &schema, &session, raw_request, None).await;
    101 104   
    102 105   let mut expected = test_ctx.mint.new_goldenfile_with_differ(
    103 106   response_path,
    skipped 85 lines
    189 192   let mut responses = Vec::new();
    190 193   for session in sessions.iter() {
    191 194   let response = execute_query(
    192  - &test_ctx.http_client,
     195 + &test_ctx.http_context,
    193 196   &schema,
    194 197   session,
    195 198   raw_request.clone(),
    skipped 92 lines
    288 291   let mut responses = Vec::new();
    289 292   for session in sessions.iter() {
    290 293   let response = execute_query(
    291  - &test_ctx.http_client,
     294 + &test_ctx.http_context,
    292 295   &schema,
    293 296   session,
    294 297   raw_request.clone(),
    skipped 75 lines
    370 373   variables: None,
    371 374   };
    372 375   let raw_response = engine::execute::explain::execute_explain(
    373  - &test_ctx.http_client,
     376 + &test_ctx.http_context,
    374 377   &schema,
    375 378   &session,
    376 379   raw_request,
    skipped 22 lines
Please wait...
Page is in error, reload to recover