Consider the following pair of Corda flows:
Sender flow
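import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.utilities.unwrap

class SendFlow(val message: String, val session: FlowSession) : FlowLogic<Boolean>() {
    @Suspendable
    override fun call(): Boolean {
        // sendAndReceive returns UntrustworthyData<Boolean>, so unwrap it before returning
        return session.sendAndReceive<Boolean>(message).unwrap { it }
    }

    @StartableByRPC
    @StartableByService
    @InitiatingFlow(version = 1)
    class Initiator(val message: String, val recipient: Party) : FlowLogic<Boolean>() {
        @Suspendable
        override fun call(): Boolean {
            // open a session with the recipient and delegate to the inlined SendFlow
            return subFlow(SendFlow(message, initiateFlow(recipient)))
        }
    }
}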
Receiver flow
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.*
import net.corda.core.utilities.unwrap

class ReceiveFlow(val session: FlowSession) : FlowLogic<String>() {
    @Suspendable
    override fun call(): String {
        // receive the message, then reply with whether it matched the expected text
        val result = session.receive<String>().unwrap { it }
        session.send(result == "Hello, World!")
        return result
    }

    @InitiatedBy(SendFlow.Initiator::class)
    class Handler(val session: FlowSession) : FlowLogic<String>() {
        @Suspendable
        override fun call(): String {
            return subFlow(ReceiveFlow(session))
        }
    }
}
Getting the result of a flow started over RPC is well documented and trivial:
rpc
    .startFlow(SendFlow::Initiator, "Hello, World!", someParty)
    .returnValue
    .getOrThrow() // expected to be true
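How can I register something that listens for the output of the @InitiatedBy(...) flow? That is, I want to be able to observe the message received on the receiving node.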
You can use the stateMachinesFeed RPC API to observe the creation of new flows and their results on completion.
This API can be used as follows (code in Kotlin):
import java.util.function.Consumer
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineUpdate

val rpcClient: CordaRPCOps = TODO("initialise this properly depending on your environment")
rpcClient.stateMachinesFeed().updates.subscribe {
    when (it) {
        is StateMachineUpdate.Added -> {
            val flowId = it.id
            val flowInfo = it.stateMachineInfo
            // code to react to new flow initiation
        }
        is StateMachineUpdate.Removed -> {
            val flowId = it.id
            val flowResult = it.result
            // result is a Try: exactly one of the two callbacks below fires
            flowResult.doOnSuccess(Consumer { result -> /* code to process result of successful completion */ })
            flowResult.doOnFailure(Consumer { error -> /* code to process error from failed completion */ })
        }
    }
}
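One thing to watch for: a Removed update carries only the flow id and the result, not the flow class. To pick out the results of the Handler flow specifically, you would probably need to remember the ids seen in Added updates for that class and match them up later. Below is a minimal sketch of that bookkeeping in plain Kotlin, with no Corda dependency so it can be run on its own. FlowResultTracker and the package name com.example are hypothetical names made up for this illustration, and the exact class-name string the node reports for a nested flow is an assumption worth checking by printing it once.

```kotlin
// Hypothetical helper, not part of Corda: remembers which flow ids belong to
// the flow class of interest so that a later "removed" event can be matched.
class FlowResultTracker<R>(private val flowClassName: String) {
    private val trackedIds = mutableSetOf<String>()
    val results = mutableListOf<R>()

    // Call for every StateMachineUpdate.Added (and for each snapshot entry).
    fun onAdded(flowId: String, className: String) {
        if (className == flowClassName) trackedIds += flowId
    }

    // Call for every StateMachineUpdate.Removed.
    // Returns true only if the flow was one of the tracked ones.
    fun onRemoved(flowId: String, result: R): Boolean {
        if (!trackedIds.remove(flowId)) return false
        results += result
        return true
    }
}

fun main() {
    // Assumed class name; "com.example" is a placeholder package.
    val handler = "com.example.ReceiveFlow\$Handler"
    val tracker = FlowResultTracker<String>(handler)

    tracker.onAdded("id-1", handler)
    tracker.onAdded("id-2", "com.example.SomeOtherFlow")
    tracker.onRemoved("id-2", "ignored")        // not tracked, dropped
    tracker.onRemoved("id-1", "Hello, World!")  // tracked, recorded

    println(tracker.results) // prints [Hello, World!]
}
```

In the subscription above, the Added branch would call something like tracker.onAdded(it.id.toString(), it.stateMachineInfo.flowLogicClassName), and the doOnSuccess callback in the Removed branch would call tracker.onRemoved(it.id.toString(), result). The RPC client has to be connected to the receiving node, since that is where the Handler flow runs.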