Warm tip: This article is reproduced from serverfault.com, please click

playframework-使用分块传输编码从Scala Play服务器流式传输案例类对象

(playframework - Streaming case class objects from a scala Play server using chunked transfer encoding)

发布于 2020-11-23 21:56:39

因此,我正在使用Play框架2.7设置流服务器。我想做的是流大约500个大小相似的自定义案例类对象。

这是生成流的控制器的一部分-

def generate: Action[AnyContent] = Action {
    val products = (1 to 500).map(Product(_, "some random string")).toList
    Ok.chunked[Product](Source(products))
  }

Product我正在使用的自定义案例类在哪里隐式writable将此对象反序列化为json。

这是处理此流的控制器的一部分-

def process(): Action[AnyContent] = Action.async {
    val request = ws.url(STREAMING_URL).withRequestTimeout(Duration.Inf).withMethod("GET")
    request.stream().flatMap {
      _.bodyAsSource
        .map(_.utf8String)
        .map { x => println(x); x }
        .fold(0) { (acc, _) => acc + 1 }
        .runWith(Sink.last)
        .andThen {
          case Success(v) => println(s"Total count - $v")
          case Failure(_) => println("Error encountered")
        }
    }.map(_ => Ok)
  }

我期望的是,我的案例类的每个对象都作为单个块发送并同样接收,因此接收者可以分别对其进行序列化和使用。这意味着,使用上面的代码,我的期望是我应该准确地接收500个块,但是这个值总是比这个更大。

我看到的是,这500个对象中只有一个对象被拆分传输,并以2个块而不是1个块传输。

从接收方来看,这是一个正常的对象-

{
  "id" : 494,
  "name" : "some random string"
}

这是一分为二的对象-

{
  "id" : 463,
  "name" : "some random strin
g"
}

因此,无法将其序列化回我的Productcase类的实例

但是,如果我在发送方控制器中对源进行某种限制,那么我将按预期接收到这些块。

例如,在我每秒仅流式传输5个元素的情况下,这完全可以正常工作-

def generate: Action[AnyContent] = Action {
    val products = (1 to 500).map(Product(_, "some random string")).toList
    Ok.chunked[Product](Source(products).throttle(5, 1.second))
  }

谁能帮助我了解为什么会这样?

Questioner
gravetii
Viewed
11
Dmytro Maslenko 2020-12-03 10:57:21

如此处所述有一个JsonFraming用于将有效的JSON对象与传入ByteString分开

在你的情况下,你可以尝试这种方式

  _.bodyAsSource.via(JsonFraming.objectScanner(Int.MaxValue)).map(_.utf8String)