我正在尝试测试使用Akka HTTP编写的端点。该端点将某物的来源转换为流HTTP响应。我希望能够测试此响应的时机,例如,我想表达以下内容:]
Open a request on /events/
If no events, I should have no line on the body of that opened http connection
Make a event happen in my application:
Now I should have something on that connection
这里是我所想到的最小代码示例:
import akka.http.javadsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshalling.{Marshaller, Marshalling}
import akka.http.scaladsl.model.{ContentTypes}
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.duration._
object StupidStreamDirectives extends Directives {
implicit val entityStreamingSupport = EntityStreamingSupport.csv()
implicit val stringMarshaller = Marshaller.strict[String, ByteString] {
string => Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => ByteString(string))
}
def route = pathPrefix("stream") {
pathEndOrSingleSlash {
get {
parameter("take", "wait_in_ms") { (take, waitInMs) =>
complete(Source.repeat[String]("hello").take(take.toInt).throttle(1, waitInMs.toInt.millisecond))
}
}
}
}
}
import java.util.concurrent.TimeoutException
import akka.http.javadsl.common.EntityStreamingSupport
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{MediaRange, MediaTypes, StatusCodes}
import akka.http.scaladsl.testkit.{ ScalatestRouteTest}
import akka.stream.scaladsl.{Framing, Sink}
import akka.util.ByteString
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.duration.DurationInt
class StupidStreamDirectivesTest extends FlatSpec with
ScalatestRouteTest with
Matchers with
ScalaFutures {
test =>
implicit val entityStreamingSupport = EntityStreamingSupport.csv()
val AcceptCsv = Accept(MediaRange(MediaTypes.`text/csv`))
"stupidStream" should "stream only one line if take=1" in {
Get("/stream?take=1&wait_in_ms=1").addHeader(AcceptCsv) ~> StupidStreamDirectives.route ~> check {
status shouldEqual StatusCodes.OK
responseAs[String] shouldEqual "hello\n"
}
}
it should "stream only three lines if take=3" in {
Get("/stream?take=3&wait_in_ms=1").addHeader(AcceptCsv) ~> StupidStreamDirectives.route ~> check {
status shouldEqual StatusCodes.OK
responseAs[String] shouldEqual "hello\n".repeat(3)
}
}
it should "not produce line too fast" in {
Get("/stream?take=4&wait_in_ms=100").addHeader(AcceptCsv) ~> StupidStreamDirectives.route ~> check {
status shouldEqual StatusCodes.OK
response
.entity
.dataBytes
//.throttle(1, 100.millisecond) /* this line make the test work */
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.takeWithin(250.millisecond)
.runWith(Sink.seq).futureValue(Timeout(1.second)) shouldEqual List.fill(3)("hello") :+ "end"
}
}
}
最后一次测试将失败,发生的情况是~>check
将在执行任何检查之前等待完全完成http请求。因此response.entity.dataBytes
不能反映http响应的时间,我想是缓存结果的流式传输(因此非常快)。
也是最后一次测试,如果我使响应的总时间超过一秒。我的测试在进行任何测试之前将失败,因为check
将超时。这意味着不可能测试较长的轮询http响应。
我看过Akka doc /代码时发现,使用Akka http testkit确实无法进行这种测试。如果有人知道该怎么做,我想听听。
否则,有人会说我可以开始启动真正的Akka http服务器...但是我认为这很可悲。
您可能正在寻找runRoute
helper
它的文档说:
可以用作
~> runRoute
的假人,可以运行路线,但不会阻塞结果。流水线的结果是可以稍后使用check
检查的结果。见ScalatestRouteTestSpec.scala中的“将运行路线与检查分开”示例。
您的样本可以这样写:
it should "not produce line too fast" in {
Get("/stream?take=4&wait_in_ms=100").addHeader(AcceptCsv) ~> StupidStreamDirectives.route ~> check {
status shouldEqual StatusCodes.OK
response
.entity
.dataBytes
//.throttle(1, 100.millisecond) /* this line make the test work */
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.takeWithin(250.millisecond)
.runWith(Sink.seq).futureValue(Timeout(1.second)) shouldEqual List.fill(3)("hello") :+ "end"
}
}