diff --git a/ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala b/ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala index 31381bc33352..2c80ecbb8526 100644 --- a/ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala +++ b/ledger-service/http-json/src/itlib/scala/http/AbstractWebsocketServiceIntegrationTest.scala @@ -268,6 +268,112 @@ abstract class AbstractWebsocketServiceIntegrationTest ).toJson } + "query should return matchedQueries appropriately" in withHttpService { (uri, _, _, _) => + import spray.json._ + + val (party, headers) = getUniquePartyAndAuthHeaders("Alice") + val initialCreate = initialIouCreate(uri, party, headers) + + val query = + """[ + {"templateIds": ["Iou:Iou"], "query": {"currency": "USD"}} + ]""" + + @nowarn("msg=pattern var evtsWrapper .* is never used") + def resp( + iouCid: domain.ContractId, + kill: UniqueKillSwitch, + ): Sink[JsValue, Future[ShouldHaveEnded]] = { + val dslSyntax = Consume.syntax[JsValue] + import dslSyntax._ + Consume + .interpret( + for { + ContractDelta(Vector((ctid, _)), Vector(), None) <- readOne + _ = (ctid: String) shouldBe (iouCid.unwrap: String) + _ <- liftF( + postJsonRequest( + uri.withPath(Uri.Path("/v1/exercise")), + exercisePayload(domain.ContractId(ctid)), + headers, + ) map { case (statusCode, _) => + statusCode.isSuccess shouldBe true + } + ) + + ContractDelta(Vector(), _, Some(offset)) <- readOne + + (preOffset, consumedCtid) = (offset, ctid) + evtsWrapper @ ContractDelta( + Vector((fstId, fst), (sndId, snd)), + Vector(observeConsumed), + Some(lastSeenOffset), + ) <- readOne + (liveStartOffset, msgCount) = { + observeConsumed.contractId should ===(consumedCtid) + Set(fstId, sndId, consumedCtid) should have size 3 + inside(evtsWrapper) { case JsObject(obj) => + inside(obj get "events") { + case Some( + JsArray( + Vector( + Archived(_, _), + Created(IouAmount(amt1), MatchedQueries(NumList(ixes1), _)), + Created(IouAmount(amt2), MatchedQueries(NumList(ixes2), _)), + ) + ) + ) => + Set((amt1, ixes1), (amt2, ixes2)) should ===( + Set( + (BigDecimal("42.42"), Vector(BigDecimal(0))), + (BigDecimal("957.57"), Vector(BigDecimal(0))), + ) + ) + } + } + (preOffset, 2) + } + + _ = kill.shutdown() + heartbeats <- drain + hbCount = (heartbeats.iterator.map(heartbeatOffset).toSet + lastSeenOffset).size - 1 + } yield + // don't count empty events block if lastSeenOffset does not change + ShouldHaveEnded( + liveStartOffset = liveStartOffset, + msgCount = msgCount + hbCount, + lastSeenOffset = lastSeenOffset, + ) + ) + } + + def queryWithAndWithoutOffsets(offset: domain.Offset) = + s"""[ + {"templateIds": ["Iou:Iou"], "query": {"currency": "USD"}, "offset": "${offset.toString}"}, + {"templateIds": ["Iou:Iou"]} + ]""" + for { + creation <- initialCreate + _ = creation._1 shouldBe a[StatusCodes.Success] + iouCid = getContractId(getResult(creation._2)) + jwt = jwtForParties(List(party.unwrap), List(), testId) + (kill, source) = singleClientQueryStream(jwt, uri, query) + .viaMat(KillSwitches.single)(Keep.right) + .preMaterialize() + lastState <- source via parseResp runWith resp(iouCid, kill) + liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 2, lastSeen) => + lastSeen.unwrap should be > liveStart.unwrap + liveStart + } + clientMsg <- singleClientQueryStream(jwt, uri, queryWithAndWithoutOffsets(liveOffset)) + .take(3) + .runWith(collectResultsAsTextMessageSkipOffsetTicks) + } yield inside(clientMsg) { case _ +: result +: _ => + //we should expect to have matchedQueries [1] to indicate a match for the new template query + result should include(s""""matchedQueries":[1]""") + } + } + "query should receive deltas as contracts are archived/created" in withHttpService { (uri, _, _, _) => import spray.json._ diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala index b0950e433c32..52d00cbbbbc1 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala @@ -283,23 +283,23 @@ object WebSocketService { } def fn( - q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), Int)]] + q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), (Int, Int))]] )(a: domain.ActiveContract[LfV], o: Option[domain.Offset]): Option[Positive] = { q.get(a.templateId).flatMap { preds => - preds.collect(Function unlift { case ((_, p), ix) => + preds.collect(Function unlift { case ((_, p), (ix, pos)) => val matchesPredicate = p(a.payload) - (matchesPredicate && matchesOffset(ix, o)).option(ix) + (matchesPredicate && matchesOffset(ix, o)).option(pos) }) } } def dbQueriesPlan( - q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), Int)]] + q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), (Int, Int))]] )(implicit sjd: dbbackend.SupportedJdbcDriver.TC ): (Seq[(domain.TemplateId.RequiredPkg, doobie.Fragment)], Map[Int, Int]) = { val annotated = q.toSeq.flatMap { case (tpid, nel) => - nel.toVector.map { case ((vp, _), pos) => (tpid, vp.toSqlWhereClause, pos) } + nel.toVector.map { case ((vp, _), (_, pos)) => (tpid, vp.toSqlWhereClause, pos) } } val posMap = annotated.iterator.zipWithIndex.map { case ((_, _, pos), ix) => (ix, pos) @@ -307,7 +307,7 @@ object WebSocketService { (annotated map { case (tpid, sql, _) => (tpid, sql) }, posMap) } - val query = (gacr: domain.SearchForeverQuery, ix: Int) => + val query = (gacr: domain.SearchForeverQuery, ix: Int, pos: Int) => for { res <- gacr.templateIds.toList @@ -322,10 +322,12 @@ object WebSocketService { ) (resolved, unresolved) = res q = prepareFilters(resolved, gacr.query, lookupType): CompiledQueries - } yield (resolved, unresolved, q transform ((_, p) => NonEmptyList((p, ix)))) + } yield (resolved, unresolved, q transform ((_, p) => NonEmptyList((p, (ix, pos))))) for { res <- request.queries.zipWithIndex + .zip(request.pos) + .map(x => (x._1._1, x._1._2, x._2)) .foldMapM(query.tupled) (resolved, unresolved, q) = res } yield StreamPredicate( @@ -363,8 +365,12 @@ object WebSocketService { request: SearchForeverRequest, ): Option[SearchForeverRequest] = { import scalaz.std.list - val withoutOffset = request.queries.toList.filter(_.offset.isEmpty) - list.toNel(withoutOffset).map(SearchForeverRequest(_)) + val (withoutOffset, ixes) = + request.queries.zip(request.pos).toList.filter { case (q, _) => q.offset.isEmpty }.unzip + for { + q <- list.toNel(withoutOffset) + ix <- list.toNel(ixes) + } yield SearchForeverRequest(q, ix) } override def adjustRequest( diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala index 36168fda017f..0515d92fa8cc 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala @@ -116,7 +116,8 @@ object domain extends com.daml.fetchcontracts.domain.Aliases { ) final case class SearchForeverRequest( - queries: NonEmptyList[SearchForeverQuery] + queries: NonEmptyList[SearchForeverQuery], + pos: NonEmptyList[Int], ) final case class PartyDetails(identifier: Party, displayName: Option[String], isLocal: Boolean) diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala index e3bbfc6fe024..f40e931f8dab 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/json/JsonProtocol.scala @@ -288,9 +288,14 @@ object JsonProtocol extends DefaultJsonProtocol with ExtraFormats { implicit val SearchForeverRequestFormat: RootJsonReader[domain.SearchForeverRequest] = { case multi @ JsArray(_) => - domain.SearchForeverRequest(multi.convertTo[NonEmptyList[domain.SearchForeverQuery]]) + val (queries, pos) = + multi.convertTo[NonEmptyList[domain.SearchForeverQuery]].zipWithIndex.unzip + domain.SearchForeverRequest(queries, pos) case single => - domain.SearchForeverRequest(NonEmptyList(single.convertTo[domain.SearchForeverQuery])) + domain.SearchForeverRequest( + NonEmptyList(single.convertTo[domain.SearchForeverQuery]), + NonEmptyList(0), + ) } implicit val CommandMetaFormat: RootJsonFormat[domain.CommandMeta] = jsonFormat1(