Note: This article is reproduced from serverfault.com.

Can you implement Flink's AggregateFunction with Generic Types?

Posted on 2020-08-12 16:11:37

My goal is to provide an interface for a stream processing module in Flink 1.10. The pipeline contains an AggregateFunction among other operators. All operators have generic types, but the problem lies with the AggregateFunction, which cannot determine the output type.

Note: The actual pipeline has a sliding event-time window assigner and a WindowFunction passed along with the AggregateFunction, but the error can be reproduced much more easily with the code below.

This is a simple test case that reproduces the error:

    @Test
    public void aggregateFunction_genericType() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String,Integer>> source = env.fromElements(Tuple2.of("0",1), Tuple2.of("0",2), Tuple2.of("0",3));

        ConfigAPI cfg = new ConfigAPI();

        source
                .keyBy(k -> k.f0)
                .countWindow(5, 1)
                .aggregate(new GenericAggregateFunc<>(cfg))
                .print();


        env.execute();
    }

As you can see, a configuration class (ConfigAPI) is passed as an argument to the custom AggregateFunction. This is what the user would implement:

    public static class ConfigAPI implements BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String,Integer>> {
        @Override
        public Tuple2<String, Integer> createAcc() {
            return new Tuple2<>("0", 0);
        }

        @Override
        public Tuple2<String, Integer> addAccumulators(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
            acc.f1 += in.f1;
            return acc;
        }
    }

The provided interface is:

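    // Serializable, because Flink serializes the function together with its cfg field
    public interface BaseConfigAPI<In, Acc> extends java.io.Serializable {
        Acc createAcc();
        Acc addAccumulators(In in, Acc acc);
        // other methods to override
    }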

The GenericAggregateFunc:

    public static class GenericAggregateFunc<In, Acc> implements AggregateFunction<In, Acc, Acc> {

        private BaseConfigAPI<In, Acc> cfg;
        GenericAggregateFunc(BaseConfigAPI<In, Acc> cfg) {
            this.cfg = cfg;
        }
        @Override
        public Acc createAccumulator() {
            return cfg.createAcc();
        }
        @Override
        public Acc add(In in, Acc acc) {
            return cfg.addAccumulators(in, acc);
        }
        @Override
        public Acc getResult(Acc acc) {
            return acc;
        }
        @Override
        public Acc merge(Acc acc, Acc acc1) {
            return null;
        }
    }
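Setting Flink's type extraction aside, the aggregation logic itself is plain Java. The stdlib-only sketch below replays it over the three source elements so the delegation to ConfigAPI can be checked in isolation. AggFn and Pair are hypothetical stand-ins for Flink's AggregateFunction and Tuple2, and aggregate(...) is a hypothetical driver that folds one window's elements in order; it is not Flink's window operator.

```java
import java.util.Arrays;
import java.util.List;

// Stdlib-only sketch. AggFn and Pair are hypothetical stand-ins for Flink's
// AggregateFunction<IN, ACC, OUT> and Tuple2<String, Integer>.
class AggregateDriverSketch {

    interface AggFn<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Mutable pair with fields f0/f1, mimicking Tuple2.
    static class Pair {
        String f0;
        int f1;
        Pair(String f0, int f1) { this.f0 = f0; this.f1 = f1; }
        @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
    }

    interface BaseConfigAPI<In, Acc> {
        Acc createAcc();
        Acc addAccumulators(In in, Acc acc);
    }

    // The user's config from the question, on Pair instead of Tuple2.
    static class ConfigAPI implements BaseConfigAPI<Pair, Pair> {
        public Pair createAcc() { return new Pair("0", 0); }
        public Pair addAccumulators(Pair in, Pair acc) { acc.f1 += in.f1; return acc; }
    }

    // Same delegation as the question's GenericAggregateFunc.
    static class GenericAggregateFunc<In, Acc> implements AggFn<In, Acc, Acc> {
        private final BaseConfigAPI<In, Acc> cfg;
        GenericAggregateFunc(BaseConfigAPI<In, Acc> cfg) { this.cfg = cfg; }
        public Acc createAccumulator() { return cfg.createAcc(); }
        public Acc add(In in, Acc acc) { return cfg.addAccumulators(in, acc); }
        public Acc getResult(Acc acc) { return acc; }
        public Acc merge(Acc a, Acc b) { return null; } // unused here, as in the question
    }

    // Hypothetical driver: folds one window's elements through the function in order.
    static <IN, ACC, OUT> OUT aggregate(AggFn<IN, ACC, OUT> fn, List<IN> window) {
        ACC acc = fn.createAccumulator();
        for (IN element : window) {
            acc = fn.add(element, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        AggFn<Pair, Pair, Pair> fn = new GenericAggregateFunc<>(new ConfigAPI());
        List<Pair> window = Arrays.asList(new Pair("0", 1), new Pair("0", 2), new Pair("0", 3));
        System.out.println(aggregate(fn, window)); // prints (0,6)
    }
}
```

The logic compiles and runs fine in plain Java; the failure in Flink comes only from recovering the types, not from the function's behavior.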

The output log:

    org.apache.flink.api.common.functions.InvalidTypesException:
    Type of TypeVariable 'Acc' in 'class misc.SlidingWindow$GenericAggregateFunc' could not be determined. This is most likely a type erasure problem.
    The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
    Otherwise the type has to be specified explicitly using type information.
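The message points at type erasure. Reflection on GenericAggregateFunc only reveals the type variables from its implements clause. In can be matched against the input stream's type, but Acc appears only in the accumulator and result positions, so nothing tells Flink what it is. The stdlib-only sketch below shows what reflection can see; AggFn is a hypothetical stand-in for Flink's AggregateFunction, and this is not Flink's actual extractor.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Stdlib-only sketch. AggFn is a hypothetical stand-in for Flink's
// AggregateFunction<IN, ACC, OUT>; no Flink classes are used.
class ErasureSketch {

    interface AggFn<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Same type-parameter shape as the question's GenericAggregateFunc.
    static class GenericAggregateFunc<In, Acc> implements AggFn<In, Acc, Acc> {
        public Acc createAccumulator() { return null; }
        public Acc add(In in, Acc acc) { return acc; }
        public Acc getResult(Acc acc) { return acc; }
        public Acc merge(Acc a, Acc b) { return null; }
    }

    // Type arguments that a class declares for AggFn in its implements clause.
    static Type[] declaredAggFnArgs(Class<?> cls) {
        for (Type t : cls.getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == AggFn.class) {
                return ((ParameterizedType) t).getActualTypeArguments();
            }
        }
        throw new IllegalArgumentException(cls + " does not directly implement AggFn");
    }

    public static void main(String[] args) {
        // The <String, Integer> written here is erased; only the class survives at runtime.
        Object fn = new GenericAggregateFunc<String, Integer>();
        for (Type arg : declaredAggFnArgs(fn.getClass())) {
            // Prints In, Acc, Acc, each reported as a type variable.
            System.out.println(arg.getTypeName() + " -> type variable? " + (arg instanceof TypeVariable));
        }
    }
}
```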

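Solution 1 (not working): At first I thought this was the usual "return type cannot be determined" case, so I tried adding

    .returns(Types.TUPLE(Types.STRING, Types.INT))

after .aggregate(...), but without success. (Presumably this is because the exception is thrown inside .aggregate(...) itself, while it extracts the accumulator and result types, before .returns(...) can take effect.)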

Solution 2 (working): I created a generic wrapper class, Accumulator<Acc>, and used it as both the accumulator and the result type, i.e. AggregateFunction<In, Accumulator<Acc>, Accumulator<Acc>>. This seems to work.
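The question does not show the wrapper, so the stdlib-only sketch below is one plausible shape of it. Accumulator's field and accessor names, SumConfig, and the AggFn stand-in for Flink's AggregateFunction are all hypothetical. What it illustrates: the accumulator position now names a concrete class (Accumulator), with only its inner Acc left unresolved. That is presumably why Flink can settle on an accumulator type here, though how Flink then serializes the unresolved field is not shown.

```java
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Stdlib-only sketch of the Accumulator<Acc> wrapper idea. AggFn is a hypothetical
// stand-in for Flink's AggregateFunction; all other names are hypothetical too.
class WrapperSketch {

    interface AggFn<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    interface BaseConfigAPI<In, Acc> extends Serializable {
        Acc createAcc();
        Acc addAccumulators(In in, Acc acc);
    }

    // Generic holder with a no-arg constructor and getter/setter.
    public static class Accumulator<Acc> implements Serializable {
        private Acc value;
        public Accumulator() {}
        public Accumulator(Acc value) { this.value = value; }
        public Acc getValue() { return value; }
        public void setValue(Acc value) { this.value = value; }
    }

    // The ACC and OUT positions now name the concrete class Accumulator.
    static class GenericAggregateFunc<In, Acc>
            implements AggFn<In, Accumulator<Acc>, Accumulator<Acc>> {
        private final BaseConfigAPI<In, Acc> cfg;
        GenericAggregateFunc(BaseConfigAPI<In, Acc> cfg) { this.cfg = cfg; }
        public Accumulator<Acc> createAccumulator() { return new Accumulator<>(cfg.createAcc()); }
        public Accumulator<Acc> add(In in, Accumulator<Acc> acc) {
            acc.setValue(cfg.addAccumulators(in, acc.getValue()));
            return acc;
        }
        public Accumulator<Acc> getResult(Accumulator<Acc> acc) { return acc; }
        public Accumulator<Acc> merge(Accumulator<Acc> a, Accumulator<Acc> b) { return null; } // not needed here
    }

    // Raw class in the ACC position, as declared in the implements clause.
    static Type accumulatorRawType(Class<?> cls) {
        for (Type t : cls.getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == AggFn.class) {
                Type acc = ((ParameterizedType) t).getActualTypeArguments()[1];
                return acc instanceof ParameterizedType ? ((ParameterizedType) acc).getRawType() : acc;
            }
        }
        throw new IllegalArgumentException(cls + " does not directly implement AggFn");
    }

    // Hypothetical integer-summing config used in the demo.
    static class SumConfig implements BaseConfigAPI<Integer, Integer> {
        public Integer createAcc() { return 0; }
        public Integer addAccumulators(Integer in, Integer acc) { return acc + in; }
    }

    public static void main(String[] args) {
        GenericAggregateFunc<Integer, Integer> fn = new GenericAggregateFunc<>(new SumConfig());
        Accumulator<Integer> acc = fn.createAccumulator();
        for (int x : new int[] {1, 2, 3}) {
            acc = fn.add(x, acc);
        }
        System.out.println(fn.getResult(acc).getValue());                                 // prints 6
        System.out.println(accumulatorRawType(GenericAggregateFunc.class) == Accumulator.class); // prints true
    }
}
```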

This does not look very elegant, though, and it is not very consistent with the rest of the interface. Is there any other solution to this problem?

Edit: Thanks, @deduper, for your time and insight; I think I found a solution.

Solution 3 (working): I created a new interface which extends my BaseConfigAPI and the AggregateFunction in the following manner:

    public interface MergedConfigAPI<In, Acc, Out> extends BaseConfigAPI, AggregateFunction<In, Acc, Out> {}

    public interface BaseConfigAPI extends Serializable {
        // These will be implemented directly from the AggregateFunction interface:
        // Acc createAcc();
        // Acc addAccumulators(In in, Acc acc);

        // other methods to override
    }

Now the user only has to implement MergedConfigAPI<In, Acc, Out>, with concrete type arguments, and pass that implementation to .aggregate(...).
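Solution 3 presumably works because the user's class is no longer generic: it names concrete type arguments in its own implements clause, and Java keeps those in the class file, where reflection can read them. The stdlib-only sketch below shows this; AggFn is a hypothetical stand-in for Flink's AggregateFunction and SumConfig is a hypothetical user implementation. How Flink walks the interface hierarchy from there is not modeled.

```java
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

// Stdlib-only sketch of Solution 3. AggFn is a hypothetical stand-in for Flink's
// AggregateFunction<IN, ACC, OUT>; SumConfig is a hypothetical user implementation.
class MergedConfigSketch {

    interface AggFn<IN, ACC, OUT> extends Serializable {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    interface BaseConfigAPI extends Serializable {
        // other methods to override
    }

    interface MergedConfigAPI<In, Acc, Out> extends BaseConfigAPI, AggFn<In, Acc, Out> {}

    // The user writes the concrete types directly in the implements clause.
    static class SumConfig implements MergedConfigAPI<Integer, Integer, Integer> {
        public Integer createAccumulator() { return 0; }
        public Integer add(Integer in, Integer acc) { return acc + in; }
        public Integer getResult(Integer acc) { return acc; }
        public Integer merge(Integer a, Integer b) { return a + b; }
    }

    // Type arguments a class declares for MergedConfigAPI, read via reflection.
    static Type[] declaredArgs(Class<?> cls) {
        for (Type t : cls.getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == MergedConfigAPI.class) {
                return ((ParameterizedType) t).getActualTypeArguments();
            }
        }
        throw new IllegalArgumentException(cls + " does not directly implement MergedConfigAPI");
    }

    public static void main(String[] args) {
        // Unlike GenericAggregateFunc<In, Acc>, these are concrete classes, not type variables.
        System.out.println(Arrays.toString(declaredArgs(SumConfig.class)));
        // prints [class java.lang.Integer, class java.lang.Integer, class java.lang.Integer]
    }
}
```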

UPDATE: I tested @deduper's third solution against the framework, and it didn't work either. It seems that the exception is caused by the Acc type, not the Out type. Taking a closer look at the internals of the .aggregate operator, I realized that there is an overloaded aggregate method that takes two more arguments: a TypeInformation<ACC> accumulatorType and a TypeInformation<R> returnType.

This led to the simplest solution, which needs no code refactoring at all.

Solution 4 (working):

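    @Test
    public void aggregateFunction_genericType() throws Exception {
        ...
                // pass the accumulator and result types explicitly to the overloaded aggregate
                .aggregate(
                        new GenericAggregateFunc<>(cfg),
                        Types.TUPLE(Types.STRING, Types.INT),  // accumulator type
                        Types.TUPLE(Types.STRING, Types.INT))  // result type
        ...
    }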

Note: As of Flink 1.10.1 the aggregate methods are annotated with @PublicEvolving.

Asked by Edward Ep

Answer by deduper, 2020-08-13 23:17:37

Can you implement Flink's AggregateFunction with Generic Types?

Yes, you can, as you have already done yourself. Your error is a result of how you used it (as "use-site generics") rather than how you implemented it.

...Is there any other solution to this problem?...

I propose the following three candidate solutions, starting with the simplest:

    ...
    source
           .keyBy(k -> k.f0)
           .countWindow(5, 1)
           /* filling in the diamond will aid type inference */
           .aggregate(new GenericAggregateFunc<Tuple2<String, Integer>, Tuple2<String, Integer>>(cfg))
           .print();
    ...

The above is the simplest because you wouldn't have to refactor your original GenericAggregateFunc; simply fill in the diamond with the specific type arguments you want to instantiate your generic class with.
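One caveat worth checking: Java erases type arguments written at the instantiation site, so new GenericAggregateFunc<Tuple2<String, Integer>, Tuple2<String, Integer>>(cfg) and the diamond form produce exactly the same runtime class, and a reflection-based extractor gets no extra information from either. Appending {} to create an anonymous subclass is different, because the subclass records its generic superclass, arguments included, in class metadata. The stdlib-only sketch below demonstrates both points with a hypothetical empty GenericAggregateFunc shell; whether a given Flink version resolves Acc through such a subclass is an assumption to verify.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Stdlib-only sketch. GenericAggregateFunc here is a hypothetical empty shell with the
// same type parameters as the question's class; no Flink classes are involved.
class DiamondSketch {

    static class GenericAggregateFunc<In, Acc> {}

    // True if the diamond and explicit type arguments yield the same runtime class.
    static boolean explicitArgsAreErased() {
        GenericAggregateFunc<String, Integer> inferred = new GenericAggregateFunc<>();
        GenericAggregateFunc<String, Integer> explicit = new GenericAggregateFunc<String, Integer>();
        return inferred.getClass() == explicit.getClass();
    }

    // First type argument recorded in an anonymous subclass's generic superclass.
    static Type anonymousSubclassFirstArg() {
        Object anonymous = new GenericAggregateFunc<String, Integer>() {}; // note the braces
        ParameterizedType sup = (ParameterizedType) anonymous.getClass().getGenericSuperclass();
        return sup.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        System.out.println(explicitArgsAreErased());     // prints true
        System.out.println(anonymousSubclassFirstArg()); // prints class java.lang.String
    }
}
```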

There is also a slightly less simple solution:

    public static class GenericAggregateFunc implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {

        private BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg;
        GenericAggregateFunc(BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg) {
            this.cfg = cfg;
        }
        @Override
        public Tuple2<String, Integer> createAccumulator() {
            return cfg.createAcc();
        }
        @Override
        public Tuple2<String, Integer> add(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
            return cfg.addAccumulators(in, acc);
        }
        @Override
        public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
            return acc;
        }
        @Override
        public Tuple2<String, Integer> merge(Tuple2<String, Integer> acc, Tuple2<String, Integer> acc1) {
            return null;
        }
    }

Although this one involves a minor refactor, in my opinion it simplifies your entire application more than the first proposed solution.

Flink already handles the "complicated" generic polymorphism for you. To plug into Flink, all you have to do is instantiate its built-in generic AggregateFunction<IN, ACC, OUT> with the specific type arguments you want, which in your case are all Tuple2<String, Integer>.

So you're still "using generics" with the second solution, but in a much simpler way.

Another option, closer to your original implementation but with a couple of minor refactors:

    public static class GenericAggregateFunc<In, Acc, Out> implements AggregateFunction<In, Acc, Out> {

        ...
        @Override
        public Out getResult(Acc acc) {
            return ...;
        }
        ...
    }

Also, to enforce the precondition that the user's config implements an interface compatible with your function:

    public interface BaseConfigAPI<In, Acc, Out> { ... }

In my experiment, I confirmed that also adding the Out type parameter to BaseConfigAPI makes it compatible.

I did have a more complicated alternative solution in mind. But since simpler is almost always better, I'll leave that more complicated solution for somebody else to propose.