MdTp

Reputation: 377

Print spooler concept / API & Channels: issue passing jobs to a queue from serveHTTP

I already got some help here that moved this concept forward, but it still isn't quite working: I've hit a conflict that I can't seem to get around.

I have tried to illustrate what I want in a flowchart. Please note that there can be many clients sending in print jobs, so we cannot rely on the worker to be processing our job at that exact moment. Most of the time it will be, but at peak times it won't, since processing a print job can take a while.

[flowchart: many clients submit print jobs to a queue, which a worker processes]

type QueueElement struct {
    jobid    string
    rw       http.ResponseWriter
    doneChan chan struct{}
}

type GlobalVars struct {
    db   *sql.DB
    wg   sync.WaitGroup
    jobs chan QueueElement
}

func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.URL.Path {
    case "/StartJob":
        fmt.Printf("incoming\r\n")

        doneC := make(chan struct{}, 1) // buffered channel, so the worker routine is never blocked on the send
        newPrintJob := QueueElement{
            doneChan: doneC,
            jobid:    "jobid",
        }

        gv.jobs <- newPrintJob
        func(doneChan chan struct{}, w http.ResponseWriter) {
            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            defer cancel()
            select {
            // If this triggers first, this waiting goroutine exits and
            // nobody is listening on doneChan - which is why it has to be buffered.
            case <-ctx.Done():
                fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
                fmt.Printf("took longer than 5 secs\r\n")
            case <-doneChan:
                fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
                fmt.Printf("instant\r\n")
            }
        }(doneC, w)

    default:
        fmt.Fprintf(w, "No such Api")
    }
}

func worker(jobs <-chan QueueElement) {
    for {
        job := <-jobs
        processExec("START /i /b try.cmd")
        fmt.Printf("job done")
        //  processExec("START /i /b processAndPrint.exe -" + job.jobid)
        job.doneChan <- struct{}{}
    }
}

func main() {
    db, err := sql.Open("sqlite3", "jobs.sqlite")
    if err != nil {
        log.Fatal(err)
    }
    if err := db.Ping(); err != nil {
        log.Fatal(err)
    }

    db.SetMaxOpenConns(1) // prevents locked database error
    _, err = db.Exec(setupSQL)
    if err != nil {
        log.Fatal(err)
    }

    // create a GlobalVars instance
    gv := GlobalVars{
        db:   db,
        jobs: make(chan QueueElement),
    }
    go worker(gv.jobs)
    // create an http.Server instance and specify our job manager as
    // the handler for requests.
    server := http.Server{
        Handler: &gv,
        Addr:    ":8888",
    }
    // start server and accept connections.
    log.Fatal(server.ListenAndServe())
}

The above code shows the ServeHTTP method and the worker, built with help from here. Initially the func inside ServeHTTP was a goroutine, and that is where the whole conflict arises for me. The concept is that ServeHTTP hands the job to the worker and gets a reply back if the worker was able to process the job in time, within 5 seconds.

If the job finishes in 1 second, I want to reply to the client right after that 1 second; if it takes 3, I want to reply after 3; if it takes more than 5, I will reply after 5 seconds (if the job takes 13 seconds, I still want to reply after 5). From then on the client has to poll on the job. But the conflict is:

a) When ServeHTTP exits, the ResponseWriter closes - and to reply to the client at all, we have to write the answer to the ResponseWriter.

b) If I block ServeHTTP (as in the code example above, where I don't call the func as a goroutine), it affects not only that single API call: all the calls after it seem to be affected too. The first call in will be served correctly and in time, but calls arriving at the same time or just after the first will be sequentially delayed by the blocking operation.

c) If I don't block it, e.g. by changing it to a goroutine:

    gv.jobs <- newPrintJob
    go func(doneChan chan struct{},w http.ResponseWriter) {

Then there's no delay - I can call many APIs in parallel - but the problem is that ServeHTTP exits right away, which kills the ResponseWriter, and then I'm not able to reply to the client.

I am not sure how to get around this conflict: I don't want to block ServeHTTP, so that all requests are handled in parallel, but I still need to be able to reply to the ResponseWriter in question.

Is there any way to prevent ServeHTTP from shutting down the ResponseWriter even though the function exits?

Upvotes: 0

Views: 248

Answers (3)

user5715068

Reputation:

I've made some updates to your code. Now it works as you've described.

package main

import (
    "database/sql"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "sync"
    "time"
)

type QueueElement struct {
    jobid    string
    rw       http.ResponseWriter
    doneChan chan struct{}
}

type GlobalVars struct {
    db   *sql.DB
    wg   sync.WaitGroup
    jobs chan QueueElement
}

func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        fmt.Printf("incoming\r\n")

        doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine

        go func() {
            gv.jobs <- QueueElement{
                doneChan: doneC,
                jobid:    "jobid",
            }
        }()

        select {
        case <-time.After(time.Second * 5): // time.After, not time.Tick: a Tick per request leaks its ticker
            fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
            fmt.Printf("took longer than 5 secs\r\n")
        case <-doneC:
            fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
            fmt.Printf("instant\r\n")
        }
    default:
        fmt.Fprintf(w, "No such Api")
    }
}

func worker(jobs <-chan QueueElement) {
    for {
        job := <-jobs
        fmt.Println("START /i /b try.cmd")

        // simulate a job that takes between 0 and 6 seconds
        randTimeDuration := time.Second * time.Duration(rand.Intn(7))
        time.Sleep(randTimeDuration)

        fmt.Printf("job done")
        //  processExec("START /i /b processAndPrint.exe -" + job.jobid)
        job.doneChan <- struct{}{}
    }
}

func main() {

    // create a GlobalVars instance
    gv := GlobalVars{
        //db:   db,
        jobs: make(chan QueueElement),
    }
    go worker(gv.jobs)
    // create an http.Server instance and specify our job manager as
    // the handler for requests.
    server := http.Server{
        Handler: &gv,
        Addr:    ":8888",
    }
    // start server and accept connections.
    log.Fatal(server.ListenAndServe())
}

Upvotes: 1

Avinash Dhinwa

Reputation: 390

Yes, you are right with your point "c) if I don't block it".

In order to keep the ResponseWriter usable, you shouldn't spawn a goroutine inside the handler. Rather, ServeHTTP itself is called as a goroutine, which most HTTP server implementations (including net/http) already do.
This way you won't block other API calls: each API call runs in its own goroutine, blocked only by its own work.

Since your `jobs chan QueueElement` is an unbuffered channel, all your requests block at `gv.jobs <- newPrintJob` until the single worker is ready to receive.
You should use a buffered channel instead, so that all API calls can add their job to the queue and then get a response depending on work completion or timeout.

Having a buffered channel also models the real-world memory limit of printers (queue length 1 being a special case).

Upvotes: 2

user5715068
user5715068

Reputation:

The select statement should be outside the goroutine function; it then blocks the request until the job finishes or the timeout is reached.

Upvotes: 1
