ordered-concurrently

Open source, BSD-3-Clause, Go
46 stars, 8 forks, 3 issues, 1 watcher, last commit 3 years ago

About ordered-concurrently

Ordered-concurrently is a Go library for concurrent processing with ordered output. It processes work concurrently and returns the output on a channel in the order of the input. It is useful for processing items from a queue concurrently while receiving the results in the order the queue provided them.

Platforms

Web Self-hosted

Languages

Go

Ordered Concurrently

A library for parallel processing with ordered output in Go. This module processes work concurrently (in parallel) and returns the output on a channel in the order of the input. It is useful for processing items from a queue concurrently while still receiving the results in the order the queue provided them.

Usage

Get Module

go get github.com/tejzpr/ordered-concurrently/v3

Import Module in your source code

import concurrently "github.com/tejzpr/ordered-concurrently/v3" 

Create a work function by implementing the WorkFunction interface

// Create a type based on your input to the work function
type loadWorker int

// The work that needs to be performed
// The input type should implement the WorkFunction interface
func (w loadWorker) Run(ctx context.Context) interface{} {
    time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
    return w * 2
}

Demo

Go Playground

Run

Example - 1

func main() {
    max := 10
    inputChan := make(chan concurrently.WorkFunction)
    ctx := context.Background()
    output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
    go func() {
        for work := 0; work < max; work++ {
            inputChan <- loadWorker(work)
        }
        close(inputChan)
    }()
    for out := range output {
        log.Println(out.Value)
    }
}

Example - 2 - Process unknown number of inputs

func main() {
    inputChan := make(chan concurrently.WorkFunction, 10)
    ctx := context.Background()
    output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})

    ticker := time.NewTicker(100 * time.Millisecond)
    done := make(chan bool)
    wg := &sync.WaitGroup{}
    go func() {
        input := 0
        for {
            select {
            case <-done:
                return
            case <-ticker.C:
                // Register the item before sending it, so wg.Done can never run before wg.Add
                wg.Add(1)
                inputChan <- loadWorker(input)
                input++
            }
        }
    }()

    var res []loadWorker
    go func() {
        for out := range output {
            res = append(res, out.Value.(loadWorker))
            wg.Done()
        }
    }()

    time.Sleep(1600 * time.Millisecond)
    ticker.Stop()
    done <- true
    close(inputChan)
    wg.Wait()

    // Check if output is sorted
    isSorted := sort.SliceIsSorted(res, func(i, j int) bool {
        return res[i] < res[j]
    })
    if !isSorted {
        log.Println("output is not sorted")
    }
}

Credits

  1. u/justinisrael for inputs on improving resource usage.
  2. mh-cbon for identifying potential deadlocks.