PrabhaT
PrabhaT

Reputation: 888

jersey - StreamingOutput as Response entity

I had implemented streaming output in my Jersey Resource class.

@GET
@Path("xxxxx")
@Produces(BulkConstants.TEXT_XML_MEDIA_TYPE})   
public Response getFile() {

    FeedReturnStreamingOutput sout = new FeedReturnStreamingOutput();
    response = Response.ok(sout).build();
    return response;
}

class FeedReturnStreamingOutput implements StreamingOutput {

    public FeedReturnStreamingOutput()

    @Override
    public void write(OutputStream outputStream)  {
        //write into Output Stream
    }
}

The problem is eventhough a response is sent back from the resource before FeedReturnStreamingOutput is called Jersey client waits until FeedReturnStreamingOutput execution is completed.

Client Code :

Client client = Client.create();

ClientResponse response = webResource
    //headers
    .get(ClientResponse.class);

//The codes underneath executes after FeedReturnStreamingOutput is executed which undermines the necessity of streaming

OutputStream os = new FileOutputStream("c:\\test\\feedoutput5.txt");
System.out.println(new Date() + " : Reached point A");

if (response.getStatus() == 200) {
    System.out.println(new Date() + " : Reached point B");
    InputStream io = response.getEntityInputStream();

    byte[] buff = new byte[1024000];
    int count = 0;

    while ((count = io.read(buff, 0, buff.length)) != -1) {
        os.write(buff, 0, count);
    }

    os.close();
    io.close();

} else {
    System.out.println("Response code :" + response.getStatus());
}

System.out.println("Time taken -->> "+(System.currentTimeMillis()-startTime)+" ms");

Upvotes: 30

Views: 40661

Answers (4)

kylejmcintyre
kylejmcintyre

Reputation: 2006

Consider using @Context to inject the underlying HttpServletResponse object and writing/flushing its stream directly. This seems to bypass the OUTBOUND_CONTENT_LENGTH_BUFFER setting that is problematically configured at the application level.

Here's a Scala take:

  @POST
  @Path("/internal/stream-test")
  def testStream(@Context response: HttpServletResponse): Unit = {

    val stream = response.getOutputStream
    Range(1, 10).foreach(i => {
      val data = s"Hello $i".getBytes
      stream.write(data)
      stream.flush()
      Thread.sleep(2000)
    })
  }

I also prefer this over using Jersey's ChunkedOutput abstraction for returning progressive results, because ChunkedOutput forces you to introduce a separate thread that writes the chunks instead of just adding to the stream as you process. This example returns 204 instead of 200, hopefully that doesn't bother you.

Upvotes: 0

Paul Samsotha
Paul Samsotha

Reputation: 209012

The problem is the buffering OutputStream that Jersey uses to buffer the entity in order to determine the Content-Length header. The size of the buffer default to 8 kb. You disable the buffering if you want, or just change the size of the buffer, with the property

ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER

An integer value that defines the buffer size used to buffer server-side response entity in order to determine its size and set the value of HTTP "Content-Length" header.

If the entity size exceeds the configured buffer size, the buffering would be cancelled and the entity size would not be determined. Value less or equal to zero disable the buffering of the entity at all.

This property can be used on the server side to override the outbound message buffer size value - default or the global custom value set using the "jersey.config.contentLength.buffer" global property.

The default value is 8192.

Here's an example

@Path("streaming")
public class StreamingResource {

    @GET
    @Produces("application/octet-stream")
    public Response getStream() {
        return Response.ok(new FeedReturnStreamingOutput()).build();
    }

    public static class FeedReturnStreamingOutput implements StreamingOutput {

        @Override
        public void write(OutputStream output)
                throws IOException, WebApplicationException {
            try {
                for (int i = 0; i < 10; i++) {
                    output.write(String.format("Hello %d\n", i).getBytes());
                    output.flush();
                    TimeUnit.MILLISECONDS.sleep(500);
                }
            } catch (InterruptedException e) {  throw new RuntimeException(e); }
        }
    }
}

Here's the result without setting the property

enter image description here

And here's the result after setting the property value to 0

public class AppConfig extends ResourceConfig {
    public AppConfig() {
        ...
        property(ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 0);
    }
}

enter image description here

Upvotes: 53

Daniel Sperry
Daniel Sperry

Reputation: 4491

Either your response is too small and never gets chunked so the server flushes the entire request at once. Or you have a server side issue were your jax-rs library is awaiting to have the complete stream before flushing.

However this looks more like a client problem. And you seem to be using an old version of jersey-client.

Also that .get(ClientResponse.class) looks fishy.

Try using the JAX-RS standard as it is today (at least in the client):

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;

Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target("http://localhost:8080/");
Response response = target.path("path/to/resource").request().get();

While having jersey client 2.17 in the classpath:

<dependency>
    <groupId>org.glassfish.jersey.core</groupId>
    <artifactId>jersey-client</artifactId>
    <version>2.17</version>
</dependency>

Upvotes: 1

Montecarlo
Montecarlo

Reputation: 1324

Try invoking outputStream.flush() from the method FeedReturnStreamingOutput.write(...) every X number of bytes written to the output stream or something like that.

I guess the buffer of the connection is not filled with the data you are returning. So the service does not return anything until Jersey invokes outputStream.close().

In my case, I have a service that streams data and I am doing it exactly as you: by returning Response.ok(<instance of StreamingOutput>).build();.

My service returns data from a database and I invoke outputStream.flush() after writing each row to the output stream.

I know that the service streams data because I can see the client begins receiving data before the service has finished sending the entire result.

Upvotes: 1

Related Questions