| skipped 255 lines |
256 | 256 | | step.Parallel = 1 |
257 | 257 | | } |
258 | 258 | | |
259 | | - | // skip concurrency part |
260 | | - | if step.Parallel == 1 { |
261 | | - | for index, line := range data { |
262 | | - | customParams := make(map[string]string) |
263 | | - | customParams["line"] = line |
264 | | - | customParams["line_id"] = fmt.Sprintf("%v-%v", path.Base(line), index) |
265 | | - | customParams["_id_"] = fmt.Sprintf("%v", index) |
266 | | - | customParams["_line_"] = execution.StripName(line) |
267 | | - | |
268 | | - | if len(step.Commands) > 0 { |
269 | | - | r.RunCommands(step.Commands, step.Std) |
270 | | - | } |
271 | | - | |
272 | | - | if len(step.Ose) > 0 { |
273 | | - | for _, ose := range step.Ose { |
274 | | - | r.RunOse(ose) |
275 | | - | } |
276 | | - | } |
277 | | - | |
278 | | - | if len(step.Scripts) > 0 { |
279 | | - | r.RunScripts(step.Scripts) |
280 | | - | } |
281 | | - | |
282 | | - | // post scripts |
283 | | - | if len(step.PConditions) > 0 || len(step.PScripts) > 0 { |
284 | | - | err := r.CheckCondition(step.PConditions) |
285 | | - | if err == nil { |
286 | | - | if len(step.PScripts) > 0 { |
287 | | - | r.RunScripts(step.PScripts) |
288 | | - | } |
289 | | - | } |
290 | | - | } |
291 | | - | |
292 | | - | } |
293 | | - | |
294 | | - | if step.Label != "" { |
295 | | - | utils.BlockF("Done-Step", color.HiCyanString(step.Label)) |
296 | | - | } |
297 | | - | return out, nil |
298 | | - | } |
299 | | - | |
300 | | - | ///////////// |
301 | | - | // run multiple steps in concurrency mode |
302 | | - | |
303 | | - | utils.DebugF("Run step in Parallel: %v", step.Parallel) |
304 | | - | var wg sync.WaitGroup |
305 | | - | p, _ := ants.NewPoolWithFunc(step.Parallel, func(i interface{}) { |
306 | | - | r.startStepJob(i) |
307 | | - | wg.Done() |
308 | | - | }, ants.WithPreAlloc(true)) |
309 | | - | defer p.Release() |
310 | | - | |
311 | | - | //var mu sync.Mutex |
| 259 | + | // prepare the data first |
| 260 | + | var newGeneratedSteps []libs.Step |
312 | 261 | | for index, line := range data { |
313 | | - | //mu.Lock() |
314 | 262 | | customParams := make(map[string]string) |
315 | | - | //localOptions := libs.Options{} |
316 | 263 | | customParams["line"] = line |
317 | 264 | | customParams["line_id"] = fmt.Sprintf("%v-%v", path.Base(line), index) |
318 | 265 | | customParams["_id_"] = fmt.Sprintf("%v", index) |
| skipped 30 lines |
349 | 296 | | localStep.PScripts = append(localStep.PScripts, AltResolveVariable(script, customParams)) |
350 | 297 | | } |
351 | 298 | | |
| 299 | + | newGeneratedSteps = append(newGeneratedSteps, localStep) |
| 300 | + | } |
| 301 | + | |
| 302 | + | // skip concurrency part |
| 303 | + | if step.Parallel == 1 { |
| 304 | + | for _, newGeneratedStep := range newGeneratedSteps { |
| 305 | + | out, err = r.RunStep(newGeneratedStep) |
| 306 | + | if err != nil { |
| 307 | + | continue |
| 308 | + | } |
| 309 | + | } |
| 310 | + | if step.Label != "" { |
| 311 | + | utils.BlockF("Done-Step", color.HiCyanString(step.Label)) |
| 312 | + | } |
| 313 | + | } |
| 314 | + | |
| 315 | + | ///////////// |
| 316 | + | // run multiple steps in concurrency mode |
| 317 | + | |
| 318 | + | utils.DebugF("Run step in Parallel: %v", step.Parallel) |
| 319 | + | var wg sync.WaitGroup |
| 320 | + | p, _ := ants.NewPoolWithFunc(step.Parallel, func(i interface{}) { |
| 321 | + | r.startStepJob(i) |
| 322 | + | wg.Done() |
| 323 | + | }, ants.WithPreAlloc(true)) |
| 324 | + | defer p.Release() |
| 325 | + | |
| 326 | + | for _, newGeneratedStep := range newGeneratedSteps { |
352 | 327 | | wg.Add(1) |
353 | | - | err = p.Invoke(localStep) |
| 328 | + | err = p.Invoke(newGeneratedStep) |
354 | 329 | | if err != nil { |
355 | 330 | | utils.ErrorF("Error in parallel: %v", err) |
356 | 331 | | } |
357 | | - | //mu.Unlock() |
358 | 332 | | } |
359 | 333 | | |
360 | 334 | | wg.Wait() |
| skipped 46 lines |