Warm tip: This article is reproduced from serverfault.com, please click

kotlin-科达

(kotlin - Corda)

发布于 2020-12-31 09:30:30

考虑以下 Corda 流对:

发件人流程

class SendFlow(val message: String, val session: FlowSession) : FlowLogic<Boolean>() {

    @Suspendable
    override fun call(): Boolean {
        return session.sendAndReceive<Boolean>(message)
    }

    @StartableByRPC
    @StartableByService
    @InitiatingFlow(version = 1)
    class Initiator(val message: String, val recipient: Party) : FlowLogic<Boolean>() {
    
        @Suspendable
        override fun call(): Boolean {
            return subFlow(SendFlow(message, initiateFlow(party)))
        }
    }
}

接收流

class ReceiveFlow(val session: FlowSession) : FlowLogic<String>() {
    
    @Suspendable
    override fun call(): String {
        val result = session.receive<String>().unwrap { it }
        session.send(result == "Hello, World!")
        return result
    }

    @InitiatedBy(SendFlow.Initator::class)
    class Handler(val session: FlowSession) : FlowLogic<String>() {
    
        @Suspendable
        override fun call(): String {
            return subFlow(ReceiveFlow(session))
        }
    }
}

从 RPC 上的启动流中获得结果是有据可查的和微不足道的......

rpc
    .startFlow(SendFlow::Initiator, "Hello, World!", someParty)
    .returnValue
    .getOrThrow() // expect to be true

我如何注册一些东西来监听@InitiatedBy(...)的输出- 也就是说,我希望能够观察从接收节点接收到的消息。

Questioner
Matthew Layton
Viewed
0
Dimos 2021-01-06 19:12:27

你可以使用stateMachinesFeedRPC API 来观察新流的创建和完成时的结果。

可以通过以下方式使用此 API(Kotlin 中的代码):

val rpcClient: CordaRpcOps // initialise this properly depending on your environment
rpcClient.stateMachinesFeed().updates.subscribe {
    when(it) {
        is StateMachineUpdate.Added -> {
            val flowId = it.id
            val flowInfo = it.stateMachineInfo
            // code to react to new flow initiation
        }
        is StateMachineUpdate.Removed -> {
            val flowId = it.id
            val flowResult = it.result
            flowResult.doOnSuccess(Consumer { result -> /* code to process result of successful completion */ })
            flowResult.doOnFailure(Consumer { error -> /* code to process error from failed completion */ })
        }
    }
}