So here we go: given a Confluent.Kafka IConsumer<>, the function below wraps it in a dedicated async CE and consumes as long as cancellation hasn't been requested. This piece of code also defends itself against OperationCanceledException and runs a finally block to ensure graceful termination of the consumer.
let private consumeUntilCancelled callback (consumer: IConsumer<'key, 'value>) =
    async {
        let! ct = Async.CancellationToken
        try
            try
                while not ct.IsCancellationRequested do
                    let consumeResult = consumer.Consume(ct)
                    if not consumeResult.IsPartitionEOF then do! (callback consumeResult)
            with
            | :? OperationCanceledException -> return ()
        finally
            consumer.Close()
            consumer.Dispose()
    }
Question #1: is this code correct or am I abusing async?
So far so good. In my app I have to deal with lots of consumers that must all die together. So, assuming that consumers: seq<Async<unit>> represents them, the following code is what I came up with:
async {
    for consumer in consumers do
        do! (Async.StartChild consumer |> Async.Ignore)
}
I expect this code to chain the children to the parent's cancellation context, so that once the parent is cancelled, the children are cancelled as well.
Question #2: is my finally block guaranteed to run even if the child gets cancelled?
I have two observations about your code:
Your use of Async.StartChild is correct - all child computations will inherit the same cancellation token and they will all get cancelled when the main token is cancelled.
The async workflow can be cancelled after you call consumer.Consume(ct) and before you call callback. I'm not sure what this means for your specific problem, but if it removes some data from a queue, the data could be lost before it is processed. If that's an issue, then I think you'll need to make callback non-asynchronous, or invoke it differently.
In your consumeUntilCancelled function, you do not need to check explicitly whether ct.IsCancellationRequested is true. The async workflow does this automatically at every do! or let!, so you can replace the condition with a plain while true loop.
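To make the last point concrete, here is a minimal sketch of the loop without the explicit check. It does not use Confluent.Kafka: consumeNext and close are hypothetical stand-ins for consumer.Consume(ct) and for consumer.Close() plus consumer.Dispose(), and fakeConsume is a made-up source that blocks briefly and honours the token. Treat it as an illustration under those assumptions rather than a drop-in replacement.

```fsharp
open System
open System.Threading

// Hypothetical stand-ins for the Kafka consumer: `consumeNext` plays the role of
// consumer.Consume(ct), `close` the role of consumer.Close() and consumer.Dispose().
let consumeLoop (consumeNext: CancellationToken -> 'msg)
                (close: unit -> unit)
                (callback: 'msg -> Async<unit>) =
    async {
        let! ct = Async.CancellationToken
        try
            try
                while true do   // no explicit IsCancellationRequested check
                    let msg = consumeNext ct
                    do! callback msg
            with
            | :? OperationCanceledException -> return ()
        finally
            close ()
    }

// Usage: a fake source that honours the token, and an event that records cleanup.
let loopClosed = new ManualResetEventSlim(false)

let fakeConsume (ct: CancellationToken) =
    ct.WaitHandle.WaitOne(10) |> ignore   // simulate waiting for a message
    ct.ThrowIfCancellationRequested()
    "message"

let loopCts = new CancellationTokenSource()
Async.Start(consumeLoop fakeConsume (fun () -> loopClosed.Set()) (fun _ -> Async.Sleep 10), loopCts.Token)
loopCts.CancelAfter(200)

// Wait (with a timeout) for the finally block to signal that cleanup ran.
let cleanedUp = loopClosed.Wait(TimeSpan.FromSeconds 5.0)
printfn "cleanup ran: %b" cleanedUp
```

Whether cancellation is noticed inside the blocking call (which throws) or at the do!, the finally block is where the cleanup happens, so close is the only place that needs to know about shutdown.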
Here is a minimal stand-alone demo:
let consume s = async {
    try
        while true do
            do! Async.Sleep 1000
            printfn "%s did work" s
    finally
        printfn "%s finalized" s }

let work =
    async {
        for c in ["A"; "B"; "C"; "D"] do
            do! Async.StartChild (consume c) |> Async.Ignore }
Now we create the computation with a cancellation token:
// Run this in F# interactive
let ct = new System.Threading.CancellationTokenSource()
Async.Start(work, ct.Token)
// Run this sometime later
ct.Cancel()
Once you call ct.Cancel, all the finally blocks will be called and all the loops will stop.
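If you would rather check this in a script than watch the console, the demo can be made self-checking. The sketch below is a variation on the demo above, not part of it: the two CountdownEvent values and the names consumeTracked, trackedWork and demoCts are my own additions for illustration. It cancels only after every child has entered its try block and then waits, with a timeout, for every finally block to signal.

```fsharp
open System
open System.Threading

let names = ["A"; "B"; "C"; "D"]

// Hypothetical bookkeeping: one event counts children that have entered
// their try block, the other counts finally blocks that have run.
let started = new CountdownEvent(names.Length)
let finalized = new CountdownEvent(names.Length)

let consumeTracked (name: string) = async {
    try
        started.Signal() |> ignore
        while true do
            do! Async.Sleep 50
    finally
        printfn "%s finalized" name
        finalized.Signal() |> ignore }

let trackedWork = async {
    for name in names do
        do! Async.StartChild (consumeTracked name) |> Async.Ignore }

let demoCts = new CancellationTokenSource()
Async.Start(trackedWork, demoCts.Token)

// Cancel only once every child is inside its try block.
let allStarted = started.Wait(TimeSpan.FromSeconds 5.0)
demoCts.Cancel()

// Wait (with a timeout) until every finally block has signalled.
let allFinalized = finalized.Wait(TimeSpan.FromSeconds 5.0)
printfn "all started: %b, all finalized: %b" allStarted allFinalized
```

Waiting on started before cancelling matters: a child that is cancelled before it ever enters the try block has no finally block to run.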