package com.tc.async.impl;

import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import com.tc.async.api.AddPredicate;
import com.tc.async.api.EventContext;
import com.tc.async.api.Sink;
import com.tc.async.api.Source;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLogger;
import com.tc.logging.TCLoggerProvider;
import com.tc.stats.Stats;
import com.tc.util.Assert;
import com.tc.util.State;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/* loaded from: input_file:com/tc/async/impl/StageQueueImpl.class */
/**
 * Queue for a single named stage. Producers hand {@link EventContext}s in through the {@link Sink} methods and
 * the consumer pulls them out through the {@link Source} methods; both sides share one util.concurrent
 * {@link Channel}.
 *
 * <p>The queue is either RUNNING or PAUSED. While PAUSED, {@link #add(EventContext)} drops every event it is
 * given. An optional {@link AddPredicate} can additionally veto individual events.
 *
 * <p>Queue-depth statistics are delegated to a {@link StageQueueStatsCollector}; a no-op collector is installed
 * until {@link #enableStatsCollection(boolean)} is called with {@code true}.
 */
public class StageQueueImpl implements Sink, Source {
    private static final State RUNNING = new State("RUNNING");
    private static final State PAUSED = new State("PAUSED");
    private final Channel queue;
    private final String stage;
    private final TCLogger logger;
    private AddPredicate predicate = DefaultAddPredicate.getInstance();
    private volatile State state = RUNNING;
    private volatile StageQueueStatsCollector statsCollector;

    /**
     * Collector used while statistics are disabled: every callback is a no-op and {@link #getDetails()} reports
     * the stage as not monitored.
     */
    public static class NullStageQueueStatsCollector extends StageQueueStatsCollector {
        private final String name;

        /**
         * @param str stage name; padded or truncated to 40 characters for display
         */
        public NullStageQueueStatsCollector(String str) {
            this.name = makeWidth(str, 40);
        }

        @Override // com.tc.stats.Stats
        public String getDetails() {
            return this.name + " : Not Monitored";
        }

        @Override // com.tc.async.impl.StageQueueImpl.StageQueueStatsCollector
        public void contextAdded() {
            // statistics disabled: nothing to record
        }

        @Override // com.tc.async.impl.StageQueueImpl.StageQueueStatsCollector
        public void contextRemoved() {
            // statistics disabled: nothing to record
        }

        @Override // com.tc.async.impl.StageQueueImpl.StageQueueStatsCollector
        public void reset() {
            // statistics disabled: nothing to reset
        }
    }

    /**
     * Base type for the queue's statistics collectors. Subclasses are told when a context enters or leaves the
     * queue and when the queue has been drained.
     */
    public static abstract class StageQueueStatsCollector implements Stats {
        /**
         * Writes {@link #getDetails()} to the given logger at info level.
         */
        @Override // com.tc.stats.Stats
        public void logDetails(TCLogger tCLogger) {
            tCLogger.info(getDetails());
        }

        /** Called when a context is placed on the queue. */
        public abstract void contextAdded();

        /** Called when the queue has been emptied in bulk. */
        public abstract void reset();

        /** Called when a context is taken off the queue. */
        public abstract void contextRemoved();

        /**
         * Forces a string to an exact display width.
         *
         * @param str text to fit
         * @param i   target width in characters
         * @return {@code str} unchanged when it is already {@code i} characters long, its first {@code i}
         *         characters when it is longer, otherwise {@code str} right-padded with spaces to {@code i}
         */
        protected String makeWidth(String str, int i) {
            int length = str.length();
            if (length == i) {
                return str;
            }
            if (length > i) {
                return str.substring(0, i);
            }
            // The builder never leaves this method, so the unsynchronized StringBuilder is sufficient.
            StringBuilder padded = new StringBuilder(i);
            padded.append(str);
            for (int k = length; k < i; k++) {
                padded.append(' ');
            }
            return padded.toString();
        }
    }

    /**
     * Collector used while statistics are enabled. Keeps a running count that is incremented on add,
     * decremented on remove and zeroed on reset; all access is synchronized on the collector.
     */
    public static class StageQueueStatsCollectorImpl extends StageQueueStatsCollector {
        private int count = 0;
        private final String name;

        /**
         * @param str stage name; padded or truncated to 40 characters for display
         */
        public StageQueueStatsCollectorImpl(String str) {
            this.name = makeWidth(str, 40);
        }

        @Override // com.tc.stats.Stats
        public synchronized String getDetails() {
            return this.name + " : " + this.count;
        }

        @Override // com.tc.async.impl.StageQueueImpl.StageQueueStatsCollector
        public synchronized void contextAdded() {
            this.count++;
        }

        @Override // com.tc.async.impl.StageQueueImpl.StageQueueStatsCollector
        public synchronized void contextRemoved() {
            this.count--;
        }

        @Override // com.tc.async.impl.StageQueueImpl.StageQueueStatsCollector
        public synchronized void reset() {
            this.count = 0;
        }
    }

    /**
     * Creates a running queue with statistics collection disabled.
     *
     * @param tCLoggerProvider source of the logger; the logger is named after {@link Sink} plus the stage name
     * @param str              stage name, used in log messages, statistics and {@link #toString()}
     * @param channel          channel that stores the queued contexts
     */
    public StageQueueImpl(TCLoggerProvider tCLoggerProvider, String str, Channel channel) {
        this.queue = channel;
        this.logger = tCLoggerProvider.getLogger(Sink.class.getName() + ": " + str);
        this.stage = str;
        this.statsCollector = new NullStageQueueStatsCollector(str);
    }

    /**
     * Hands the context to {@link #add(EventContext)} only when the queue is currently empty.
     *
     * @return {@code false} when the queue was non-empty and nothing was attempted; {@code true} when
     *         {@code add} was invoked (which may itself still drop the event while paused or when the
     *         predicate rejects it)
     * @throws AssertionError if the channel is neither a {@link BoundedLinkedQueue} nor a {@link LinkedQueue}
     */
    @Override // com.tc.async.api.Sink
    public boolean addLossy(EventContext eventContext) {
        if (!isEmpty()) {
            return false;
        }
        add(eventContext);
        return true;
    }

    /**
     * Reports whether the channel holds no elements. {@link Channel} itself declares no emptiness query, so the
     * reference has to be narrowed to one of the two concrete queue types that do.
     */
    private boolean isEmpty() {
        if (this.queue instanceof BoundedLinkedQueue) {
            return ((BoundedLinkedQueue) this.queue).isEmpty();
        }
        if (this.queue instanceof LinkedQueue) {
            return ((LinkedQueue) this.queue).isEmpty();
        }
        throw new AssertionError("Unsupported channel " + this.queue.getClass().getName() + " in " + getClass().getName());
    }

    /**
     * Adds every element of the collection, in iteration order, via {@link #add(EventContext)}.
     *
     * @param collection contexts to add; each element must be an {@link EventContext}
     */
    @Override // com.tc.async.api.Sink
    public void addMany(Collection collection) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Added many:" + collection + " to:" + this.stage);
        }
        for (Object context : collection) {
            add((EventContext) context);
        }
    }

    /**
     * Places the context on the queue unless the queue is paused or the add predicate rejects it; in both of
     * those cases the event is dropped and only a log line is written.
     *
     * <p>If the calling thread is interrupted while waiting on the channel, the event is dropped, the failure is
     * logged, the statistics increment is undone and the thread's interrupt status is restored so that callers
     * further up the stack can still observe the interruption.
     *
     * @param eventContext context to enqueue; must not be {@code null}
     */
    @Override // com.tc.async.api.Sink
    public void add(EventContext eventContext) {
        Assert.assertNotNull(eventContext);
        if (this.state == PAUSED) {
            this.logger.info("Ignoring event while PAUSED: " + eventContext);
            return;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Added:" + eventContext + " to:" + this.stage);
        }
        if (!this.predicate.accept(eventContext)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Predicate caused skip add for:" + eventContext + " to:" + this.stage);
            }
            return;
        }

        // Read the volatile field once so the increment and a possible rollback hit the same collector even if
        // statistics collection is toggled concurrently.
        StageQueueStatsCollector collector = this.statsCollector;
        // Count before the put so a consumer that removes the element immediately never drives the count negative.
        collector.contextAdded();
        try {
            this.queue.put(eventContext);
        } catch (InterruptedException e) {
            // The element never reached the queue: undo the optimistic increment.
            collector.contextRemoved();
            // Do not swallow the interruption; re-assert it for the caller.
            Thread.currentThread().interrupt();
            this.logger.error("Interrupted while adding " + eventContext + " to:" + this.stage + "; event dropped", e);
        }
    }

    /**
     * Retrieves the next context by polling with a timeout of {@link Long#MAX_VALUE} milliseconds.
     *
     * @throws InterruptedException if the underlying channel poll is interrupted
     */
    @Override // com.tc.async.api.Source
    public EventContext get() throws InterruptedException {
        return poll(Long.MAX_VALUE);
    }

    /**
     * Polls the channel for the next context, recording the removal in the statistics when one is returned.
     *
     * @param j timeout passed straight to the channel's {@code poll}
     * @return the next context, or {@code null} if the channel returned none within the timeout
     * @throws InterruptedException if the underlying channel poll is interrupted
     */
    @Override // com.tc.async.api.Source
    public EventContext poll(long j) throws InterruptedException {
        EventContext next = (EventContext) this.queue.poll(j);
        if (next != null) {
            this.statsCollector.contextRemoved();
        }
        return next;
    }

    /**
     * Returns the number of queued elements when the channel is a {@link BoundedLinkedQueue}; for any other
     * channel type the size is not available and {@code 0} is returned.
     */
    @Override // com.tc.async.api.Sink
    public int size() {
        if (this.queue instanceof BoundedLinkedQueue) {
            // Channel declares no size(); only the bounded implementation exposes one.
            return ((BoundedLinkedQueue) this.queue).size();
        }
        return 0;
    }

    /**
     * Blocks until at least one element is available, then drains everything that can be polled without
     * waiting and resets the statistics.
     *
     * @return the drained elements in the order they were taken; never empty
     * @throws InterruptedException if the channel's take or poll is interrupted
     */
    @Override // com.tc.async.api.Source
    public Collection getAll() throws InterruptedException {
        Collection<Object> drained = new LinkedList<Object>();
        drained.add(this.queue.take());
        for (Object next = this.queue.poll(0L); next != null; next = this.queue.poll(0L)) {
            drained.add(next);
        }
        this.statsCollector.reset();
        return drained;
    }

    /**
     * Replaces the predicate consulted by {@link #add(EventContext)}.
     *
     * @param addPredicate new predicate; must not be {@code null}
     */
    @Override // com.tc.async.api.Sink
    public void setAddPredicate(AddPredicate addPredicate) {
        Assert.eval(addPredicate != null);
        this.predicate = addPredicate;
    }

    /**
     * @return the predicate currently consulted by {@link #add(EventContext)}
     */
    @Override // com.tc.async.api.Sink
    public AddPredicate getPredicate() {
        return this.predicate;
    }

    @Override
    public String toString() {
        return "StageQueue(" + this.stage + ")";
    }

    /**
     * Discards every element that can be polled without waiting, resets the statistics and logs how many
     * elements were discarded.
     *
     * @throws TCRuntimeException wrapping the {@link InterruptedException} if polling is interrupted; the
     *                            thread's interrupt status is restored before throwing
     */
    @Override // com.tc.async.api.Sink
    public void clear() {
        int cleared = 0;
        try {
            while (poll(0L) != null) {
                cleared++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TCRuntimeException(e);
        }
        this.statsCollector.reset();
        this.logger.info("Cleared " + cleared);
    }

    /**
     * Switches the queue to PAUSED, clears it and then places the given elements directly on the channel,
     * bypassing the paused check and the add predicate.
     *
     * @param list elements to leave on the queue while paused, enqueued in iteration order
     * @throws AssertionError     if the queue is not currently RUNNING
     * @throws TCRuntimeException wrapping the {@link InterruptedException} if clearing or enqueueing is
     *                            interrupted; the thread's interrupt status is restored before throwing
     */
    @Override // com.tc.async.api.Sink
    public void pause(List list) {
        if (this.state != RUNNING) {
            throw new AssertionError("Attempt to pause while not running: " + this.state);
        }
        this.state = PAUSED;
        clear();
        try {
            for (Object context : list) {
                this.queue.put(context);
                this.statsCollector.contextAdded();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TCRuntimeException(e);
        }
    }

    /**
     * Switches the queue back to RUNNING so that {@link #add(EventContext)} accepts events again.
     *
     * @throws AssertionError if the queue is not currently PAUSED
     */
    @Override // com.tc.async.api.Sink
    public void unpause() {
        if (this.state != PAUSED) {
            throw new AssertionError("Attempt to unpause while not paused: " + this.state);
        }
        this.state = RUNNING;
    }

    /**
     * Installs a fresh collector: a counting one when {@code z} is {@code true}, a no-op one otherwise. The new
     * counting collector starts at zero regardless of how many elements are already queued.
     */
    @Override // com.tc.stats.Monitorable
    public void enableStatsCollection(boolean z) {
        if (z) {
            this.statsCollector = new StageQueueStatsCollectorImpl(this.stage);
        } else {
            this.statsCollector = new NullStageQueueStatsCollector(this.stage);
        }
    }

    /**
     * @param j ignored by this implementation
     * @return the live collector itself, not a snapshot
     */
    @Override // com.tc.stats.Monitorable
    public Stats getStats(long j) {
        return this.statsCollector;
    }

    /**
     * Same as {@link #getStats(long)}. NOTE(review): despite the name, nothing is reset here - presumably
     * because the collector tracks current queue depth rather than a rate; confirm against callers.
     */
    @Override // com.tc.stats.Monitorable
    public Stats getStatsAndReset(long j) {
        return getStats(j);
    }

    /**
     * @return {@code true} when the installed collector is a {@link StageQueueStatsCollectorImpl}
     */
    @Override // com.tc.stats.Monitorable
    public boolean isStatsCollectionEnabled() {
        return this.statsCollector instanceof StageQueueStatsCollectorImpl;
    }

    /**
     * Intentionally does nothing in this implementation; the collector is only reset when the queue is drained
     * by {@link #getAll()} or {@link #clear()}.
     */
    @Override // com.tc.stats.Monitorable
    public void resetStats() {
        // no-op
    }
}
