mad

Reputation: 31

Using Goroutines to Annotate and Fetch Data in Background – Semaphore Acquisition Error within Route Handler

I am implementing a file annotation process in a Go web application. The process reads a file, stores the IDs in a slice, splits the IDs into chunks, and then fetches data from a database using an IN clause for each chunk. The code works fine when it is called directly. However, when I run the annotation process inside a goroutine started from my route handler (HandleAnnotations), the call to sem.Acquire fails and the following line is executed: fmt.Println("Error acquiring semaphore:", err).

Code Context: In my route handler (HandleAnnotations), I initiate the file annotation process in a goroutine to return a 202 Accepted status and keep the operation running in the background. Here is the relevant part of the handler:

// HandleAnnotations will handle the annotation process
func (a *BatchHandler) HandleAnnotations(c echo.Context) error {
    header, err := c.FormFile("file-upload")
    if err != nil {
        return err
    }

    // ... ctx and batchData are set up here (omitted) ...

    go func() {
        // I want this long-running task to finish in the background
        variants, err := a.BatchRepo.AnnotateFile(ctx, batchData)
        // ... variants and err are handled here (omitted) ...
    }()

    // I want to return immediately
    return c.JSON(http.StatusAccepted, "file uploaded successfully, the annotation process has started in the background")
}

This is my annotation function:

func (p *psqlBatchRepository) AnnotateFile(ctx context.Context, batchData domain.BatchInput) (res []domain.BatchVariant, err error) {
    // Open the file and ensure it's closed after the function returns
    file, err := batchData.Header.Open()
    if err != nil {
        return nil, err
    }
    defer file.Close()

    // Read the file line by line and populate variantVcfs
    variantVcfs := make([]string, 0, batchData.LineCount)
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := scanner.Text()

        if len(line) == 0 {
            continue
        }

        variantVcfs = append(variantVcfs, line)
    }

    // Set the maximum size for each chunk
    maxChunkSize := 100

    // Create a 2D slice to store the chunks
    var chunks [][]string

    // Iterate through the variantVcfs to create chunks
    for len(variantVcfs) > 0 {
        // Determine the current chunk size (up to maxChunkSize)
        currentChunkSize := len(variantVcfs)
        if currentChunkSize > maxChunkSize {
            currentChunkSize = maxChunkSize
        }

        // Create the chunk
        chunk := variantVcfs[:currentChunkSize]
        variantVcfs = variantVcfs[currentChunkSize:]

        // Append the chunk to the chunks slice
        chunks = append(chunks, chunk)
    }

    // Set up concurrency control
    maxWorkers := 5
    sem := semaphore.NewWeighted(int64(maxWorkers))
    var wg sync.WaitGroup

    // Create an output channel to collect results
    outputChan := make(chan []domain.BatchVariant, len(chunks))

    // Iterate through the chunks and run the query concurrently
    for i, chunk := range chunks {
        // Acquire a semaphore slot
        if err := sem.Acquire(ctx, 1); err != nil {
            // Handle acquisition error
            fmt.Println("Error acquiring semaphore:", err)
            break
        }

        // Increment the WaitGroup counter
        wg.Add(1)

        // Run the query function in a goroutine
        go func(chunk []string, workerIndex int) {
            defer func() {
                // Release the semaphore slot when done
                sem.Release(1)
                // Decrement the WaitGroup counter
                wg.Done()
            }()

            // Call the query function and handle errors
            p.queryFunction(chunk, outputChan)
        }(chunk, i)
    }

    // Close the output channel when all goroutines are done
    go func() {
        wg.Wait()
        close(outputChan)
    }()

    // Collect the results from the output channel
    for result := range outputChan {
        res = append(res, result...)
    }

    return res, nil
}

Issue: The code within AnnotateFile runs successfully when not inside another goroutine. However, running it concurrently within the HandleAnnotations handler results in an error related to semaphore acquisition.

Error: the output is "Error acquiring semaphore:" followed by the value of err, printed by fmt.Println("Error acquiring semaphore:", err).

Expected Behavior: I aim to run the annotation process concurrently in the background while returning a 202 Accepted status from the route handler.

I would appreciate insights from anyone with experience building a similar system.

Upvotes: 1

Views: 41

Answers (1)

mad

Reputation: 31

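Fixed it by no longer passing the request context to AnnotateFile. The request context is cancelled as soon as the handler returns, so sem.Acquire(ctx, 1) in the background goroutine fails with a context error. The background work needs a context that is not tied to the lifetime of the request.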

Upvotes: 0
