package org.redisson.command;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/redisson/command/CommandReactiveService.class */
/**
 * Adapts the future-based command methods inherited from {@link CommandAsyncService}
 * to Reactive Streams {@link Publisher}s.
 *
 * <p>Every publisher returned by this class is lazy: the underlying command is only
 * started once a subscriber signals demand, and it is started at most once per
 * subscription.
 */
public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor {

    /**
     * Creates the service on top of the given connection manager.
     *
     * @param connectionManager connection manager handed to the superclass constructor
     */
    public CommandReactiveService(ConnectionManager connectionManager) {
        super(connectionManager);
    }

    /**
     * Wraps a deferred {@link RFuture} into a publisher that emits at most one element.
     *
     * <p>The supplier is invoked on the first demand signal of each subscription. When the
     * resulting future completes, its value is emitted (a {@code null} value is skipped)
     * followed by completion; a failed future terminates the publisher with that error.
     *
     * @param supplier produces the future to execute; invoked once per subscription
     * @param <R> type of the emitted element
     * @return a publisher mirroring the outcome of the supplied future
     */
    public <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier) {
        return Flux.<R>create(sink -> {
            // The onRequest callback fires for every request(n) signal. Without this
            // guard, a subscriber that requests more than once before completion
            // would run the command again for each additional request.
            AtomicBoolean started = new AtomicBoolean();
            sink.onRequest(requested -> {
                if (!started.compareAndSet(false, true)) {
                    return;
                }

                RFuture<R> future;
                try {
                    future = supplier.get();
                } catch (RuntimeException e) {
                    // Deliver the failure through the stream instead of letting it
                    // escape out of Subscription.request().
                    sink.error(e);
                    return;
                }

                future.whenComplete((value, error) -> {
                    if (error != null) {
                        sink.error(error);
                        return;
                    }
                    // Reactive Streams forbids null elements, so a null result is
                    // represented as an empty (complete-only) stream.
                    if (value != null) {
                        sink.next(value);
                    }
                    sink.complete();
                });
            });
        });
    }

    /**
     * Reactive counterpart of {@code writeAsync}: the call is deferred until subscription.
     *
     * @param str first argument forwarded to {@code writeAsync}
     *            (presumably the key the command targets - confirm against {@code writeAsync})
     * @param codec codec forwarded to {@code writeAsync}
     * @param redisCommand command forwarded to {@code writeAsync}
     * @param objArr command arguments forwarded to {@code writeAsync}
     * @return publisher of the command result
     */
    @Override
    public <T, R> Publisher<R> writeReactive(String str, Codec codec, RedisCommand<T> redisCommand, Object... objArr) {
        return reactive(() -> writeAsync(str, codec, redisCommand, objArr));
    }

    /**
     * Reactive counterpart of {@code readAsync}: the call is deferred until subscription.
     *
     * @param str first argument forwarded to {@code readAsync}
     *            (presumably the key the command targets - confirm against {@code readAsync})
     * @param codec codec forwarded to {@code readAsync}
     * @param redisCommand command forwarded to {@code readAsync}
     * @param objArr command arguments forwarded to {@code readAsync}
     * @return publisher of the command result
     */
    @Override
    public <T, R> Publisher<R> readReactive(String str, Codec codec, RedisCommand<T> redisCommand, Object... objArr) {
        return reactive(() -> readAsync(str, codec, redisCommand, objArr));
    }

    /**
     * Reactive counterpart of {@code evalWriteAsync}: the call is deferred until subscription.
     *
     * @param str first argument forwarded to {@code evalWriteAsync}
     * @param codec codec forwarded to {@code evalWriteAsync}
     * @param redisCommand command forwarded to {@code evalWriteAsync}
     * @param str2 second string argument forwarded to {@code evalWriteAsync}
     *             (presumably the script source - confirm against {@code evalWriteAsync})
     * @param list list argument forwarded to {@code evalWriteAsync}
     *             (presumably the script keys - confirm against {@code evalWriteAsync})
     * @param objArr remaining arguments forwarded to {@code evalWriteAsync}
     * @return publisher of the evaluation result
     */
    @Override
    public <T, R> Publisher<R> evalWriteReactive(String str, Codec codec, RedisCommand<T> redisCommand, String str2, List<Object> list, Object... objArr) {
        return reactive(() -> evalWriteAsync(str, codec, redisCommand, str2, list, objArr));
    }
}
