This article is reproduced from serverfault.com.

Streaming case class objects from a Scala Play server using chunked transfer encoding

Posted on 2020-11-23 21:56:39

I'm using Play framework 2.7 to set up a streaming server. What I'm trying to do is stream about 500 custom case class objects that are all of similar size.

This is part of the controller that generates the stream -

def generate: Action[AnyContent] = Action {
    val products = (1 to 500).map(Product(_, "some random string")).toList
    Ok.chunked[Product](Source(products))
}

where Product is the custom case class I'm using. An implicit Writeable serialises this object to JSON.

This is the part of the controller that processes the stream -

def process(): Action[AnyContent] = Action.async {
    val request = ws.url(STREAMING_URL).withRequestTimeout(Duration.Inf).withMethod("GET")
    request.stream().flatMap {
      _.bodyAsSource
        .map(_.utf8String)
        .map { x => println(x); x }
        .fold(0) { (acc, _) => acc + 1 }
        .runWith(Sink.last)
        .andThen {
          case Success(v) => println(s"Total count - $v")
          case Failure(_) => println("Error encountered")
        }
    }.map(_ => Ok)
}

What I expected is that each instance of my case class is transmitted as a single chunk and received likewise, so that each one can be deserialised individually and used by the receiver. With the code above I therefore expected to receive exactly 500 chunks, but the count always comes out higher than that.

What I can see is that exactly one object among these 500 is split and transmitted in 2 chunks instead of 1.

This is a normal object, as seen on the receiving side -

{
  "id" : 494,
  "name" : "some random string"
}

and this is an object that's split in two -

{
  "id" : 463,
  "name" : "some random strin
g"
}

As such, this cannot be deserialised back into an instance of my Product case class.

However, if I have some sort of throttling on the source in the sender controller, I receive the chunks just as expected.

For instance, this works completely fine where I stream only 5 elements per second -

def generate: Action[AnyContent] = Action {
    val products = (1 to 500).map(Product(_, "some random string")).toList
    Ok.chunked[Product](Source(products).throttle(5, 1.second))
}

Can anyone help me understand why this happens?

Questioner
gravetii
Dmytro Maslenko 2020-12-03 10:57:21

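As described in the Akka Streams documentation, JsonFraming separates valid JSON objects from an incoming ByteString stream. HTTP chunk boundaries are not guaranteed to line up with the elements you emit, so the receiver should re-frame the bytes instead of treating each received chunk as one object.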
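To illustrate the idea of framing, here is a simplified, dependency-free sketch of brace counting. It is not Akka's implementation, and the name NaiveJsonFramer is made up for this example. It works on text fragments instead of a ByteString stream and only tracks brace depth, string state and backslash escapes.

```scala
object NaiveJsonFramer {
  // Splits a sequence of text fragments into complete top-level JSON objects.
  // Hypothetical helper for illustration only; it does no JSON validation.
  def frame(fragments: Seq[String]): Vector[String] = {
    val out = Vector.newBuilder[String]
    val current = new StringBuilder
    var depth = 0          // number of currently open '{'
    var inString = false   // true while inside a JSON string literal
    var escaped = false    // true right after a backslash inside a string

    for (fragment <- fragments; ch <- fragment) {
      // Keep every character of an open object; drop anything between objects.
      if (depth > 0 || ch == '{') current.append(ch)
      if (inString) {
        if (escaped) escaped = false
        else if (ch == '\\') escaped = true
        else if (ch == '"') inString = false
      } else ch match {
        case '"' if depth > 0 => inString = true
        case '{'              => depth += 1
        case '}' if depth > 0 =>
          depth -= 1
          if (depth == 0) { out += current.toString; current.clear() }
        case _                =>
      }
    }
    out.result()
  }
}
```

Because the state survives across fragments, an object whose bytes arrive in two pieces is still emitted as a single element once its closing brace is seen.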

In your case, you can try the following, where the argument to objectScanner is the maximum allowed length of a single object:

  _.bodyAsSource.via(JsonFraming.objectScanner(Int.MaxValue)).map(_.utf8String)
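A minimal standalone sketch of the same operator, assuming only akka-stream on the classpath. The object name JsonFramingDemo, the 1024 byte limit and the sample fragments are made up for this example; the fragments imitate the split object from the question.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{JsonFraming, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object JsonFramingDemo {
  // Re-frames arbitrary byte fragments into one string per complete JSON object.
  def frame(fragments: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("json-framing-demo")
    // Akka 2.5 (used by Play 2.7) needs an explicit materializer. On Akka 2.6+
    // the implicit ActorSystem is enough and this line can be dropped.
    implicit val mat: Materializer = ActorMaterializer()
    try {
      val result = Source(fragments.map(ByteString(_)))
        .via(JsonFraming.objectScanner(1024)) // 1024 = max bytes per object (arbitrary)
        .map(_.utf8String)
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    // The second object is deliberately split across two fragments.
    val fragments = List(
      """{"id":462,"name":"some random string"}{"id":463,"name":"some random strin""",
      """g"}"""
    )
    frame(fragments).foreach(println)
  }
}
```

Each element coming out of the framing stage should be one complete object, so it can then be parsed on its own, for example with play-json's Json.parse followed by .as[Product], assuming an implicit Reads[Product] is in scope.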