My goal is to provide an interface for a stream processing module in Flink 1.10. The pipeline contains an AggregateFunction among other operators. All operators have generic types, but the problem lies with the AggregateFunction: Flink cannot determine its output type.
Note: The actual pipeline has a slidingEventTimeWindow assigner and a WindowFunction passed along with the AggregateFunction, but the error can be reproduced much more easily with the code below.
This is a simple test case that reproduces the error:
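```java
// Imports assume Flink 1.10 and JUnit 4.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

@Test
public void aggregateFunction_genericType() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Tuple2<String, Integer>> source =
            env.fromElements(Tuple2.of("0", 1), Tuple2.of("0", 2), Tuple2.of("0", 3));

    ConfigAPI cfg = new ConfigAPI();

    source
            .keyBy(k -> k.f0)
            .countWindow(5, 1)                          // sliding count window: size 5, slide 1
            .aggregate(new GenericAggregateFunc<>(cfg)) // throws InvalidTypesException (see log below)
            .print();

    env.execute();
}
```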
As you can see, a configuration class (ConfigAPI) is passed as an argument to the custom AggregateFunction. This is the class the user would implement.
```java
public static class ConfigAPI implements BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> createAcc() {
        return new Tuple2<>("0", 0);
    }

    @Override
    public Tuple2<String, Integer> addAccumulators(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
        acc.f1 += in.f1;
        return acc;
    }
}
```
The provided interface is:
```java
public interface BaseConfigAPI<In, Acc> {
    Acc createAcc();
    Acc addAccumulators(In in, Acc acc);
    // other methods to override
}
```
The GenericAggregateFunc:
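```java
import org.apache.flink.api.common.functions.AggregateFunction;

public static class GenericAggregateFunc<In, Acc> implements AggregateFunction<In, Acc, Acc> {
    private BaseConfigAPI<In, Acc> cfg;

    GenericAggregateFunc(BaseConfigAPI<In, Acc> cfg) {
        this.cfg = cfg;
    }

    @Override
    public Acc createAccumulator() {
        return cfg.createAcc(); // delegate to the user's config
    }

    @Override
    public Acc add(In in, Acc acc) {
        return cfg.addAccumulators(in, acc);
    }

    @Override
    public Acc getResult(Acc acc) {
        return acc; // the accumulator doubles as the result
    }

    @Override
    public Acc merge(Acc acc, Acc acc1) {
        // Only needed by merging window assigners (e.g. session windows); left unimplemented here.
        return null;
    }
}
```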
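To see the delegation in isolation, here is a minimal plain-Java sketch that runs the same wrapper without Flink. AggregateLike is a hypothetical stand-in for Flink's AggregateFunction (merge omitted), Integer sums stand in for the Tuple2 accumulator, and the run helper only roughly approximates how a non-merging window drives the function; none of these helper names come from Flink.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Flink's AggregateFunction<IN, ACC, OUT>; merge() is omitted.
interface AggregateLike<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
}

// Mirrors the question's BaseConfigAPI<In, Acc>.
interface BaseConfigAPI<In, Acc> {
    Acc createAcc();
    Acc addAccumulators(In in, Acc acc);
}

// Mirrors the question's GenericAggregateFunc: every call is forwarded to the user's config.
class GenericAggregateFunc<In, Acc> implements AggregateLike<In, Acc, Acc> {
    private final BaseConfigAPI<In, Acc> cfg;

    GenericAggregateFunc(BaseConfigAPI<In, Acc> cfg) {
        this.cfg = cfg;
    }

    @Override
    public Acc createAccumulator() { return cfg.createAcc(); }

    @Override
    public Acc add(In in, Acc acc) { return cfg.addAccumulators(in, acc); }

    @Override
    public Acc getResult(Acc acc) { return acc; }
}

class DelegationDemo {
    // A user config that sums integers, standing in for the question's ConfigAPI.
    static BaseConfigAPI<Integer, Integer> sumConfig() {
        return new BaseConfigAPI<Integer, Integer>() {
            @Override
            public Integer createAcc() { return 0; }

            @Override
            public Integer addAccumulators(Integer in, Integer acc) { return acc + in; }
        };
    }

    // Rough approximation of a non-merging window:
    // one accumulator, one add() per element, then getResult().
    static <In, Acc, Out> Out run(AggregateLike<In, Acc, Out> fn, List<In> elements) {
        Acc acc = fn.createAccumulator();
        for (In element : elements) {
            acc = fn.add(element, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        Integer sum = run(new GenericAggregateFunc<Integer, Integer>(sumConfig()), Arrays.asList(1, 2, 3));
        System.out.println(sum); // prints 6
    }
}
```

The wrapper itself is type-correct; the trouble in Flink comes only from recovering Acc at runtime.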
The output log:
org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'Acc' in 'class misc.SlidingWindow$GenericAggregateFunc' could not be determined. This is most likely a type erasure problem.
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
Otherwise the type has to be specified explicitly using type information.
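The error message can be illustrated in plain Java, again with a hypothetical stand-in interface (AggregateLike) instead of Flink's AggregateFunction. As far as reflection can tell, the function's class only declares type variables for the interface's three parameters. In can be bound from the input stream's type, but Acc occurs only in the accumulator and result positions, so there is nothing to bind it to. This is a sketch of the general mechanism, not of Flink's TypeExtractor itself.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for Flink's AggregateFunction<IN, ACC, OUT>.
interface AggregateLike<IN, ACC, OUT> {}

// Same shape as the question's GenericAggregateFunc<In, Acc>.
class GenericFunc<In, Acc> implements AggregateLike<In, Acc, Acc> {}

class TypeVariableDemo {
    // Lists what reflection sees for the AggregateLike type arguments of a class,
    // marking each one as an unresolved type variable ("var") or a concrete type ("type").
    static String[] declaredArgs(Class<?> clazz) {
        ParameterizedType iface = (ParameterizedType) clazz.getGenericInterfaces()[0];
        Type[] args = iface.getActualTypeArguments();
        String[] described = new String[args.length];
        for (int i = 0; i < args.length; i++) {
            String kind = (args[i] instanceof TypeVariable) ? "var " : "type ";
            described[i] = kind + args[i].getTypeName();
        }
        return described;
    }

    public static void main(String[] args) {
        // prints: var In, var Acc, var Acc
        System.out.println(String.join(", ", declaredArgs(GenericFunc.class)));
    }
}
```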
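Solution 1 (not working): At first I thought this was the usual case of "return type cannot be determined", so I tried adding .returns(Types.TUPLE(Types.STRING, Types.INT)) after .aggregate(...), but without success. (Presumably .returns(...) cannot help here because the exception is already thrown inside .aggregate(...), while Flink extracts the accumulator type.)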
Solution 2 (working): I created a wrapper class with a generic type, named Accumulator<Acc>, which is then passed as the accumulator and output type, AggregateFunction<In, Accumulator<Acc>, Accumulator<Acc>>, and it seems to work.
This does not look very elegant, though, and it is not very consistent with the rest of the interface. Is there any other solution to this problem?
Edit: Thanks @deduper for your time and insight; I think I found a solution.
Solution 3 (working): I created a new interface which extends both my BaseConfigAPI and the AggregateFunction, in the following manner:
```java
public interface MergedConfigAPI<In, Acc, Out> extends BaseConfigAPI, AggregateFunction<In, Acc, Out> {}

public interface BaseConfigAPI extends Serializable {
    // These will be implemented directly from the AggregateFunction interface:
    // Acc createAcc();
    // Acc addAccumulators(In in, Acc acc);
    // other methods to override
}
```
Now the user only has to implement MergedConfigAPI<In, Acc, Out> and pass it as a parameter to the .aggregate(...) function.
UPDATE: I tested @deduper's third solution against the framework and it didn't work either. It seems that the exception is thrown for the Acc type, not the Out type. Taking a closer look at the internals of the .aggregate operator, I realized that there is an overloaded aggregate method that takes two more arguments: a TypeInformation<ACC> accumulatorType and a TypeInformation<R> returnType.
This is how the simplest solution emerged without any code refactoring.
Solution 4 (working):
```java
import org.apache.flink.api.common.typeinfo.Types;

@Test
public void aggregateFunction_genericType() throws Exception {
    ...
    .aggregate(
            new GenericAggregateFunc<>(cfg),
            Types.TUPLE(Types.STRING, Types.INT),  // accumulatorType
            Types.TUPLE(Types.STRING, Types.INT))  // returnType
    ...
}
```
Note: As of Flink 1.10.1 the aggregate methods are annotated with @PublicEvolving.
„Can you implement Flink's AggregateFunction with Generic Types?“
Yes. You can. As you've done yourself already. Your error is a result of how you used it (as in „use-site generics“) rather than how you implemented it.
„...Is there any other solution to this problem?...“
I propose the following three candidate solutions, in order from simplest to least simple…
```java
...
source
        .keyBy(k -> k.f0)
        .countWindow(5, 1)
        .aggregate(new GenericAggregateFunc<Tuple2<String, Integer>, Tuple2<String, Integer>>(cfg)) /* filling in the diamond will aid type inference */
        .print();
...
```
The above is the simplest because you wouldn't have to refactor your original GenericAggregateFunc; simply fill in the diamond with the specific type arguments you want to instantiate your generic class with.
There is also another slightly less simple solution…
```java
public static class GenericAggregateFunc implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
    private BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg;

    GenericAggregateFunc(BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg) {
        this.cfg = cfg;
    }

    @Override
    public Tuple2<String, Integer> createAccumulator() {
        return cfg.createAcc();
    }

    @Override
    public Tuple2<String, Integer> add(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
        return cfg.addAccumulators(in, acc);
    }

    @Override
    public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
        return acc;
    }

    @Override
    public Tuple2<String, Integer> merge(Tuple2<String, Integer> acc, Tuple2<String, Integer> acc1) {
        return null;
    }
}
```
Although this one involves a minor refactor, it simplifies your entire application more than the first proposed solution does, in my opinion.
Flink already handles the „complicated“ generic polymorphism for you. All you have to do to plug into Flink is implement its built-in generic AggregateFunction<IN, ACC, OUT> interface with the specific type arguments you want, which are Tuple2<String, Integer> in your case.
So you're still „using Generics“ with the second solution, but you're doing so in a much simpler way.
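Why the hardcoded version works can be sketched in plain Java: when the type arguments are written into the class declaration, they are recorded in the class file and can be read back by reflection, which is presumably what lets Flink's extractor succeed here. AggregateLike and ConcreteFunc are hypothetical stand-ins, and String/Integer replace Tuple2<String, Integer> to keep the sketch free of Flink dependencies.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for Flink's AggregateFunction<IN, ACC, OUT>.
interface AggregateLike<IN, ACC, OUT> {}

// Like the answer's second solution: the type arguments are fixed in the declaration.
// String/Integer replace Tuple2<String, Integer> to avoid a Flink dependency.
class ConcreteFunc implements AggregateLike<String, Integer, Integer> {}

class ConcreteTypesDemo {
    // Reads the AggregateLike type arguments recorded for the class.
    static Type[] declaredArgs(Class<?> clazz) {
        ParameterizedType iface = (ParameterizedType) clazz.getGenericInterfaces()[0];
        return iface.getActualTypeArguments();
    }

    public static void main(String[] args) {
        // prints java.lang.String, java.lang.Integer, java.lang.Integer (one per line)
        for (Type t : declaredArgs(ConcreteFunc.class)) {
            System.out.println(t.getTypeName());
        }
    }
}
```

The trade-off, as the comments below point out, is that the types are fixed by whoever writes the class.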
Another option, closer to your original implementation but with a couple of minor refactors…
```java
public static class GenericAggregateFunc<In, Acc, Out> implements AggregateFunction<In, Acc, Out> {
    ...
    @Override
    public Out getResult(Acc acc) {
        return ...;
    }
    ...
}
```
Also, to enforce the precondition that the user's config implements an interface that's compatible with your function…
```java
public interface BaseConfigAPI<In, Acc, Out> { ... }
```
In my experiment I've confirmed that adding the Out type parameter to BaseConfigAPI too makes it compatible.
I did have a more complicated alternative solution in mind. But since simpler is almost always better, I'll leave that more complicated solution for somebody else to propose.
Thanks for your answer! I tried the first solution myself, as I stated in the comment above, but it didn't work; filling in the diamond seems to be redundant. The second solution looks very logical and it definitely simplifies the code, but that means the user has to implement 2 interfaces instead of 1: BaseConfigAPI and the AggregateFunction.
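The observation that filling in the diamond is redundant is consistent with Java's type erasure: type arguments written at the use site exist only at compile time, so every instance shares one runtime class, and that class only knows the names of its own type parameters. A minimal plain-Java sketch with hypothetical stand-ins (AggregateLike for Flink's AggregateFunction, GenericFunc for GenericAggregateFunc):

```java
import java.lang.reflect.TypeVariable;

// Hypothetical stand-ins: AggregateLike for Flink's AggregateFunction,
// GenericFunc for the question's GenericAggregateFunc<In, Acc>.
interface AggregateLike<IN, ACC, OUT> {}

class GenericFunc<In, Acc> implements AggregateLike<In, Acc, Acc> {}

class DiamondDemo {
    // True if explicit and inferred type arguments produce the same runtime class.
    static boolean sameRuntimeClass() {
        AggregateLike<String, Integer, Integer> explicit = new GenericFunc<String, Integer>();
        AggregateLike<String, Integer, Integer> inferred = new GenericFunc<>();
        return explicit.getClass() == inferred.getClass();
    }

    // What the runtime class knows about its own type parameters: only their names.
    static String[] typeParameterNames(Object fn) {
        TypeVariable<?>[] params = fn.getClass().getTypeParameters();
        String[] names = new String[params.length];
        for (int i = 0; i < params.length; i++) {
            names[i] = params[i].getName();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass()); // true
        // prints: In, Acc (not String, Integer)
        System.out.println(String.join(", ", typeParameterNames(new GenericFunc<String, Integer>())));
    }
}
```

By contrast, the overloaded aggregate(...) in Solution 4 receives the TypeInformation explicitly, so nothing has to be recovered from the class.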
Thanks @EdwardEp! „...the user has to implement 2 interfaces instead of 1. BaseConfigAPI and the AggregateFunction...“ It's not clear what you mean, so I did an experiment to try to figure it out. I copied the classes in your Q but replaced your GenericAggregateFunc class with the simplified one in my answer. To use the simpler GenericAggregateFunc class I simply instantiated it: GenericAggregateFunc simple = new GenericAggregateFunc(). Your class implements the Flink interface. The user just has to create your class. No?

Notice that the second code snippet, ConfigAPI, is implemented by the user, so I would not know beforehand which types they would use. Therefore, they would also have to implement Flink's AggregateFunction interface besides the BaseConfigAPI. This is what I mean by saying "the user has to implement 2 interfaces". My goal was to have a generic implementation of the AggregateFunction which internally would call methods from the BaseConfigAPI interface. I feel like the solution is somewhere in the middle; I just can't grasp it yet.

Thanks @EdwardEp. What error do you get when you use the implementation of GenericAggregateFunc from my answer in your SlidingWindow.aggregateFunction_genericType() test?

When using your GenericAggregateFunc I get no errors, but its types are hardcoded to Tuple2<String, Integer>.