Reputation: 31
I am implementing a file annotation process in a Go web application. The process reads a file, stores the IDs in a slice, splits the IDs into chunks, and then fetches data from a database using an IN clause for each chunk. The code works fine when executed on its own. However, when I run the annotation process inside a goroutine within my route handler (HandleAnnotations), sem.Acquire returns an error and my code prints it via fmt.Println("Error acquiring semaphore:", err).
Code Context: In my route handler (HandleAnnotations), I initiate the file annotation process in a goroutine to return a 202 Accepted status and keep the operation running in the background. Here is the relevant part of the handler:
// HandleAnnotations will handle the annotation process
func (a *BatchHandler) HandleAnnotations(c echo.Context) error {
	header, err := c.FormFile("file-upload")
	go func() {
		// I want this long running task to finish in the background
		variants, err := a.BatchRepo.AnnotateFile(ctx, batchData)
	}()
	// I want to return immediately
	return c.JSON(http.StatusAccepted, "file uploaded successfully, the annotation process has started in the background")
}
This is my annotation function:
func (p *psqlBatchRepository) AnnotateFile(ctx context.Context, batchData domain.BatchInput) (res []domain.BatchVariant, err error) {
	// Open the file and ensure it's closed after the function returns
	file, err := batchData.Header.Open()
	if err != nil {
		return nil, err
	}
	defer file.Close()
	// Read the file line by line and populate variantVcfs
	variantVcfs := make([]string, 0, batchData.LineCount)
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		line := scanner.Text()
		if len(line) == 0 {
			continue
		}
		variantVcfs = append(variantVcfs, line)
	}
	// Set the maximum size for each chunk
	maxChunkSize := 100
	// Create a 2D slice to store the chunks
	var chunks [][]string
	// Iterate through the variantVcfs to create chunks
	for len(variantVcfs) > 0 {
		// Determine the current chunk size (up to maxChunkSize)
		currentChunkSize := len(variantVcfs)
		if currentChunkSize > maxChunkSize {
			currentChunkSize = maxChunkSize
		}
		// Create the chunk
		chunk := variantVcfs[:currentChunkSize]
		variantVcfs = variantVcfs[currentChunkSize:]
		// Append the chunk to the chunks slice
		chunks = append(chunks, chunk)
	}
	// Set up concurrency control
	maxWorkers := 5
	sem := semaphore.NewWeighted(int64(maxWorkers))
	var wg sync.WaitGroup
	// Create an output channel to collect results
	outputChan := make(chan []domain.BatchVariant, len(chunks))
	// Iterate through the chunks and run the query concurrently
	for i, chunk := range chunks {
		// Acquire a semaphore slot
		if err := sem.Acquire(ctx, 1); err != nil {
			// Handle acquisition error
			fmt.Println("Error acquiring semaphore:", err)
			break
		}
		// Increment the WaitGroup counter
		wg.Add(1)
		// Run the query function in a goroutine
		go func(chunk []string, workerIndex int) {
			defer func() {
				// Release the semaphore slot when done
				sem.Release(1)
				// Decrement the WaitGroup counter
				wg.Done()
			}()
			// Call the query function and handle errors
			p.queryFunction(chunk, outputChan)
		}(chunk, i)
	}
	// Close the output channel when all goroutines are done
	go func() {
		wg.Wait()
		close(outputChan)
	}()
	// Collect the results from the output channel
	for result := range outputChan {
		res = append(res, result...)
	}
	return res, nil
}
Issue: AnnotateFile runs successfully when it is called directly. When it is called from the goroutine started in HandleAnnotations, sem.Acquire(ctx, 1) returns an error and the chunk loop stops.
Error: the message printed by fmt.Println("Error acquiring semaphore:", err)
Expected Behavior: I aim to run the annotation process concurrently in the background while returning a 202 Accepted status from the route handler.
I would appreciate insights from anyone with experience building a similar system.
Upvotes: 1
Views: 41
Reputation: 31
Fixed it by no longer passing the request context to AnnotateFile. The request context is cancelled as soon as the handler returns, so sem.Acquire(ctx, 1) in the background goroutine fails with that cancellation error.
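For anyone hitting the same thing, here is a minimal sketch of the behaviour using only the standard library. The acquire helper is a hypothetical, simplified stand-in for a context-aware call such as sem.Acquire(ctx, 1), not the real x/sync/semaphore code, and runDemo, reqCtx and bgCtx are names made up for the illustration. It assumes the background work gets its own context derived from context.Background() with a timeout; the one-minute value is arbitrary.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// acquire is a hypothetical stand-in for a context-aware acquire call.
// It refuses to hand out a slot once ctx has been cancelled.
func acquire(ctx context.Context, slots chan struct{}) error {
	// Give up immediately if the context is already done.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	// Otherwise wait for a free slot or for cancellation.
	select {
	case slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runDemo simulates a handler that returns before the background work
// tries to acquire a slot.
func runDemo() (withRequestCtx, withBackgroundCtx error) {
	// reqCtx plays the role of the request context.
	reqCtx, finishRequest := context.WithCancel(context.Background())

	// bgCtx is independent of the request and has its own deadline
	// (assumed value, pick one that fits the job).
	bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// The handler returns here, which cancels the request context.
	finishRequest()

	slots := make(chan struct{}, 5)
	withRequestCtx = acquire(reqCtx, slots)
	withBackgroundCtx = acquire(bgCtx, slots)
	return withRequestCtx, withBackgroundCtx
}

func main() {
	reqErr, bgErr := runDemo()
	fmt.Println("request context:", reqErr)
	fmt.Println("background context:", bgErr)
}
```

In the handler this would mean creating the background context inside the goroutine (or just before starting it) and passing that to AnnotateFile instead of the request's context. On Go 1.21 or newer, context.WithoutCancel(requestCtx) may be an alternative if the request-scoped values need to be kept.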
Upvotes: 0