/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.pbcast;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.EmptyMessage;
import org.jgroups.Event;
import org.jgroups.Membership;
import org.jgroups.MergeView;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.ViewId;
import org.jgroups.logging.Log;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.MergeData;
import org.jgroups.stack.Protocol;
import org.jgroups.util.BoundedList;
import org.jgroups.util.Digest;
import org.jgroups.util.MergeId;
import org.jgroups.util.MutableDigest;
import org.jgroups.util.ResponseCollector;
import org.jgroups.util.Util;

public class Merger {
    protected final GMS gms;
    protected final Log log;
    protected final MergeTask merge_task = new MergeTask();
    protected final ResponseCollector<MergeData> merge_rsps = new ResponseCollector();
    protected final ResponseCollector<Digest> digest_collector = new ResponseCollector();
    protected MergeId merge_id = null;
    protected final BoundedList<MergeId> merge_id_history = new BoundedList(20);
    protected Future<?> merge_killer = null;

    public Merger(GMS gms) {
        this.gms = gms;
        this.log = gms.getLog();
    }

    public String getMergeIdAsString() {
        return this.merge_id != null ? this.merge_id.toString() : null;
    }

    public String getMergeIdHistory() {
        return this.merge_id_history.toString();
    }

    public boolean isMergeTaskRunning() {
        return this.merge_task.isRunning();
    }

    public boolean isMergeKillerTaskRunning() {
        return this.merge_killer != null && !this.merge_killer.isDone();
    }

    public synchronized MergeId getMergeId() {
        return this.merge_id;
    }

    public synchronized boolean isMergeInProgress() {
        return this.merge_id != null;
    }

    public synchronized boolean matchMergeId(MergeId id) {
        return Util.match(this.merge_id, id);
    }

    public synchronized boolean setMergeId(MergeId expected, MergeId new_value) {
        boolean match = Util.match(this.merge_id, expected);
        if (match) {
            if (new_value != null && this.merge_id_history.contains(new_value)) {
                return false;
            }
            this.merge_id_history.add(new_value);
            this.merge_id = new_value;
            if (this.merge_id != null) {
                this.gms.getViewHandler().suspend();
                ((Protocol)this.gms.getDownProtocol()).down(new Event(65, 20000));
                this.startMergeKiller();
            }
        }
        return match;
    }

    public void merge(Map<Address, View> views) {
        if (views == null || views.isEmpty()) {
            this.log.warn("the views passed with the MERGE event were empty (or null); ignoring MERGE event");
            return;
        }
        if (View.sameViews(views.values())) {
            this.log.debug("MERGE event is ignored because of identical views: %s", Util.printListWithDelimiter(views.values(), ", "));
            return;
        }
        if (this.isMergeInProgress()) {
            this.log.trace("%s: merge is already running (merge_id=%s)", this.gms.getAddress(), this.merge_id);
            return;
        }
        Address merge_leader = this.determineMergeLeader(views);
        if (merge_leader == null) {
            return;
        }
        if (merge_leader.equals(this.gms.getAddress())) {
            this.log.debug("%s: I will be the merge leader. Starting the merge task. Views: %s", this.gms.getAddress(), views);
            this.merge_task.start(views);
        } else {
            this.log.trace("%s: I'm not the merge leader, waiting for merge leader (%s) to start merge", this.gms.getAddress(), merge_leader);
        }
    }

    public void handleMergeRequest(Address sender, MergeId merge_id, Collection<? extends Address> mbrs) {
        try {
            this._handleMergeRequest(sender, merge_id, mbrs);
        }
        catch (Throwable t) {
            this.log.error("%s: failure handling the merge request: %s", this.gms.getAddress(), t.getMessage());
            this.cancelMerge(merge_id);
            this.sendMergeRejectedResponse(sender, merge_id);
        }
    }

    public void handleMergeResponse(MergeData data, MergeId merge_id) {
        if (!this.matchMergeId(merge_id)) {
            this.log.trace("%s: this.merge_id (%s) is different from merge_id %s sent by %s as merge response, discarding it", this.gms.getAddress(), this.merge_id, merge_id, data.getSender());
            return;
        }
        this.merge_rsps.add(data.getSender(), data);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleMergeView(MergeData data, MergeId merge_id) {
        if (!this.matchMergeId(merge_id)) {
            this.log.trace("%s: merge_ids (mine: %s, received: %s) don't match; merge view %s is discarded", this.gms.getAddress(), this.merge_id, merge_id, data.view.getViewId());
            return;
        }
        List<Address> expected_acks = this.gms.members.getMembers();
        try {
            this.gms.castViewChangeAndSendJoinRsps(data.view, data.digest, expected_acks, null, null);
        }
        finally {
            this.cancelMerge(merge_id);
        }
    }

    public void handleMergeCancelled(MergeId merge_id) {
        this.log.trace("%s: merge %s is cancelled", this.gms.getAddress(), merge_id);
        this.cancelMerge(merge_id);
    }

    public void handleDigestResponse(Address sender, Digest digest) {
        this.digest_collector.add(sender, digest);
    }

    public static void sanitizeViews(Map<Address, View> map) {
        if (map == null) {
            return;
        }
        for (Map.Entry<Address, View> entry : map.entrySet()) {
            Address key = entry.getKey();
            ArrayList<Address> members = new ArrayList<Address>(entry.getValue().getMembers());
            boolean modified = false;
            Iterator it = members.iterator();
            while (it.hasNext()) {
                View view;
                List<Address> tmp_mbrs;
                Address val = (Address)it.next();
                if (val.equals(key) || (tmp_mbrs = (view = map.get(val)) != null ? view.getMembers() : null) == null || tmp_mbrs.contains(key)) continue;
                it.remove();
                modified = true;
            }
            if (!modified) continue;
            View old_view = entry.getValue();
            entry.setValue(new View(old_view.getViewId(), members));
        }
    }

    protected Address determineMergeLeader(Map<Address, View> views) {
        Collection<Address> coords = Util.determineActualMergeCoords(views);
        if (coords.isEmpty()) {
            coords = Util.determineMergeCoords(views);
        }
        if (coords.isEmpty()) {
            this.log.error("%s: unable to determine merge leader from %s; not starting a merge", this.gms.getAddress(), views);
            return null;
        }
        return new Membership(coords).sort().elementAt(0);
    }

    protected static Map<Address, Collection<Address>> determineMergeCoords(Map<Address, View> views) {
        HashMap<Address, Collection<Address>> retval = new HashMap<Address, Collection<Address>>();
        for (View view : views.values()) {
            Address coord = view.getCreator();
            Collection members = retval.computeIfAbsent(coord, k -> new ArrayList());
            for (Address mbr : view.getMembersRaw()) {
                if (members.contains(mbr)) continue;
                members.add(mbr);
            }
        }
        Collection<Address> merge_participants = Util.determineMergeParticipants(views);
        merge_participants.removeAll(retval.keySet());
        merge_participants.stream().filter(merge_participant -> !retval.containsKey(merge_participant)).forEach(merge_participant -> retval.put((Address)merge_participant, (Collection<Address>)Collections.singletonList(merge_participant)));
        return retval;
    }

    protected void _handleMergeRequest(Address sender, MergeId merge_id, Collection<? extends Address> mbrs) throws Exception {
        ViewId tmp_vid;
        boolean success;
        MergeId current_merge_id = this.merge_id;
        boolean bl = success = this.matchMergeId(merge_id) || this.setMergeId(null, merge_id);
        if (!success) {
            this.log.trace("%s: merge %s is already in progress, received merge-id=%s", this.gms.getAddress(), current_merge_id, merge_id);
            return;
        }
        this.log.trace("%s: got merge request from %s, merge_id=%s, mbrs=%s", this.gms.getAddress(), sender, merge_id, mbrs);
        ArrayList<Address> members = new ArrayList<Address>(mbrs != null ? mbrs.size() : 32);
        if (mbrs != null) {
            mbrs.stream().filter(mbr -> !members.contains(mbr)).forEach(members::add);
        }
        members.retainAll(this.gms.view().getMembers());
        if (!members.contains(this.gms.getAddress())) {
            members.add(this.gms.getAddress());
        }
        if ((tmp_vid = this.gms.getViewId()) == null) {
            throw new Exception("view ID is null; cannot return merge response");
        }
        View view = new View(tmp_vid, members);
        Digest digest = this.fetchDigestsFromAllMembersInSubPartition(view, merge_id);
        if (digest == null || digest.capacity() == 0) {
            throw new Exception("failed fetching digests from subpartition members; dropping merge response");
        }
        this.sendMergeResponse(sender, view, digest, merge_id);
    }

    protected void sendMergeResponse(Address sender, View view, Digest digest, MergeId merge_id) {
        Message msg = new BytesMessage(sender).setArray(GMS.marshal(view, digest)).setFlag(Message.Flag.OOB).putHeader(this.gms.getId(), new GMS.GmsHeader(7).mergeId(merge_id));
        ((Protocol)this.gms.getDownProtocol()).down(msg);
    }

    protected void sendMergeView(Collection<Address> coords, MergeData combined_merge_data, MergeId merge_id) {
        if (coords == null || coords.isEmpty() || combined_merge_data == null) {
            return;
        }
        View view = combined_merge_data.view;
        Digest digest = combined_merge_data.digest;
        if (view == null || digest == null) {
            this.log.error(Util.getMessage("ViewOrDigestIsNullCannotSendConsolidatedMergeView/Digest"));
            return;
        }
        Event install_merge_view_evt = new Event(114, view);
        ((Protocol)this.gms.getUpProtocol()).up(install_merge_view_evt);
        ((Protocol)this.gms.getDownProtocol()).down(install_merge_view_evt);
        ArrayList<Address> coords_copy = new ArrayList<Address>(coords);
        if (coords_copy.remove(this.gms.getAddress())) {
            coords_copy.add(this.gms.getAddress());
        }
        this.log.debug("%s: installing merge view %s in %s", this.gms.getAddress(), combined_merge_data.view.getViewId(), coords_copy);
        for (Address coord : coords_copy) {
            Message msg = new BytesMessage(coord).setArray(GMS.marshal(view, digest)).putHeader(this.gms.getId(), new GMS.GmsHeader(8).mergeId(merge_id));
            ((Protocol)this.gms.getDownProtocol()).down(msg);
        }
    }

    protected void sendMergeRejectedResponse(Address sender, MergeId merge_id) {
        Message msg = new EmptyMessage(sender).setFlag(Message.Flag.OOB).putHeader(this.gms.getId(), new GMS.GmsHeader(7).mergeId(merge_id).mergeRejected(true));
        ((Protocol)this.gms.getDownProtocol()).down(msg);
    }

    protected void sendMergeCancelledMessage(Collection<Address> coords, MergeId merge_id) {
        if (coords == null || merge_id == null) {
            return;
        }
        for (Address coord : coords) {
            Message msg = new EmptyMessage(coord).putHeader(this.gms.getId(), new GMS.GmsHeader(9).mergeId(merge_id));
            ((Protocol)this.gms.getDownProtocol()).down(msg);
        }
    }

    protected Digest fetchDigestsFromAllMembersInSubPartition(View view, MergeId merge_id) {
        List<Address> current_mbrs = view.getMembers();
        if (current_mbrs == null || current_mbrs.size() == 1 && current_mbrs.get(0).equals(this.gms.getAddress())) {
            return new MutableDigest(view.getMembersRaw()).set((Digest)((Protocol)this.gms.getDownProtocol()).down(new Event(39, this.gms.getAddress())));
        }
        Message get_digest_req = new EmptyMessage().setFlag(Message.Flag.OOB).setFlag(Message.TransientFlag.DONT_LOOPBACK).putHeader(this.gms.getId(), new GMS.GmsHeader(13).mergeId(merge_id));
        long max_wait_time = this.gms.merge_timeout / 2L;
        this.digest_collector.reset(current_mbrs);
        long start = System.currentTimeMillis();
        ((Protocol)this.gms.getDownProtocol()).down(get_digest_req);
        Digest digest = (Digest)((Protocol)this.gms.getDownProtocol()).down(new Event(39, this.gms.getAddress()));
        this.digest_collector.add(this.gms.getAddress(), digest);
        this.digest_collector.waitForAllResponses(max_wait_time);
        if (this.log.isTraceEnabled()) {
            if (this.digest_collector.hasAllResponses()) {
                this.log.trace("%s: fetched all digests for %s in %d ms", this.gms.getAddress(), current_mbrs, System.currentTimeMillis() - start);
            } else {
                this.log.trace("%s: fetched incomplete digests (after timeout of %d) ms for %s", this.gms.getAddress(), max_wait_time, current_mbrs);
            }
        }
        ArrayList<Address> valid_rsps = new ArrayList<Address>(current_mbrs);
        valid_rsps.removeAll(this.digest_collector.getMissing());
        Address[] tmp = new Address[valid_rsps.size()];
        valid_rsps.toArray(tmp);
        MutableDigest retval = new MutableDigest(tmp);
        HashMap<Address, Digest> responses = new HashMap<Address, Digest>(this.digest_collector.getResults());
        responses.values().forEach(retval::set);
        return retval;
    }

    protected void fixDigests() {
        Digest digest = this.fetchDigestsFromAllMembersInSubPartition(this.gms.view, null);
        Message msg = new BytesMessage().putHeader(this.gms.getId(), new GMS.GmsHeader(15)).setArray(GMS.marshal(null, digest));
        ((Protocol)this.gms.getDownProtocol()).down(msg);
    }

    protected void stop() {
        this.merge_task.stop();
    }

    protected synchronized void cancelMerge(MergeId id) {
        if (this.setMergeId(id, null)) {
            this.merge_task.stop();
            this.stopMergeKiller();
            this.merge_rsps.reset();
            this.gms.getViewHandler().resume();
            ((Protocol)this.gms.getDownProtocol()).down(new Event(66));
        }
    }

    protected synchronized void forceCancelMerge() {
        if (this.merge_id != null) {
            this.cancelMerge(this.merge_id);
        }
    }

    protected synchronized void startMergeKiller() {
        if (this.merge_killer == null || this.merge_killer.isDone()) {
            MergeKiller task = new MergeKiller(this.merge_id);
            this.merge_killer = this.gms.timer.schedule(task, this.gms.merge_timeout * 2L, TimeUnit.MILLISECONDS, false);
        }
    }

    protected synchronized void stopMergeKiller() {
        if (this.merge_killer != null) {
            this.merge_killer.cancel(false);
            this.merge_killer = null;
        }
    }

    protected class MergeKiller
    implements Runnable {
        protected final MergeId my_merge_id;

        MergeKiller(MergeId my_merge_id) {
            this.my_merge_id = my_merge_id;
        }

        @Override
        public void run() {
            Merger.this.cancelMerge(this.my_merge_id);
        }

        public String toString() {
            return Merger.class.getSimpleName() + ": " + this.getClass().getSimpleName();
        }
    }

    protected class MergeTask
    implements Runnable {
        protected Thread thread = null;
        protected final ConcurrentMap<Address, Collection<Address>> coords = Util.createConcurrentMap(8, 0.75f, 8);
        protected final Set<View> subviews = new HashSet<View>();

        protected MergeTask() {
        }

        public synchronized void start(Map<Address, View> views) {
            if (this.thread != null && this.thread.isAlive()) {
                return;
            }
            this.coords.clear();
            this.subviews.clear();
            this.subviews.addAll(views.values());
            Merger.sanitizeViews(views);
            Map<Address, Collection<Address>> tmp_coords = Merger.determineMergeCoords(views);
            this.coords.putAll(tmp_coords);
            this.thread = Merger.this.gms.getThreadFactory().newThread(this, "MergeTask");
            this.thread.setDaemon(true);
            this.thread.start();
        }

        public synchronized void stop() {
            Thread tmp = this.thread;
            if (this.thread != null && this.thread.isAlive()) {
                tmp.interrupt();
            }
            this.thread = null;
        }

        public synchronized boolean isRunning() {
            return this.thread != null && this.thread.isAlive();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            MergeId new_merge_id = MergeId.create(Merger.this.gms.getAddress());
            ArrayList<Address> coordsCopy = new ArrayList<Address>(this.coords.keySet());
            long start = System.currentTimeMillis();
            try {
                this._run(new_merge_id, coordsCopy);
            }
            catch (Throwable ex) {
                if (ex instanceof Error || ex instanceof RuntimeException) {
                    Merger.this.log.warn(Merger.this.gms.getAddress() + ": merge is cancelled", ex);
                } else {
                    Merger.this.log.warn("%s: merge is cancelled: %s", Merger.this.gms.getAddress(), ex.getMessage());
                }
                Merger.this.sendMergeCancelledMessage(coordsCopy, new_merge_id);
                Merger.this.cancelMerge(new_merge_id);
            }
            finally {
                this.thread = null;
            }
            long diff = System.currentTimeMillis() - start;
            Merger.this.log.debug("%s: merge %s took %d ms", Merger.this.gms.getAddress(), new_merge_id, diff);
        }

        protected void _run(MergeId new_merge_id, Collection<Address> coordsCopy) throws Exception {
            boolean success = Merger.this.setMergeId(null, new_merge_id);
            if (!success) {
                Merger.this.log.warn("%s: failed to set my own merge_id (%s) to %s", Merger.this.gms.getAddress(), Merger.this.merge_id, new_merge_id);
                return;
            }
            Merger.this.log.debug("%s: merge task %s started with %d participants", Merger.this.gms.getAddress(), Merger.this.merge_id, this.coords.keySet().size());
            success = this.getMergeDataFromSubgroupCoordinators(this.coords, new_merge_id, Merger.this.gms.merge_timeout);
            List<Address> missing = null;
            if (!success) {
                missing = Merger.this.merge_rsps.getMissing();
                Merger.this.log.debug("%s: merge leader %s did not get responses from all %d partition coordinators; missing responses from %d members, removing them from the merge", Merger.this.gms.getAddress(), Merger.this.gms.getAddress(), this.coords.keySet().size(), missing.size());
                Merger.this.merge_rsps.remove(missing);
            }
            if (missing != null && !missing.isEmpty()) {
                missing.forEach(this.coords.keySet()::remove);
                coordsCopy.removeAll(missing);
            }
            this.removeRejectedMergeRequests(this.coords.keySet());
            if (Merger.this.merge_rsps.size() == 0) {
                throw new Exception("did not get any merge responses from partition coordinators");
            }
            if (!this.coords.containsKey(Merger.this.gms.getAddress())) {
                throw new Exception("merge leader rejected merge request");
            }
            ArrayList<MergeData> merge_data = new ArrayList<MergeData>(Merger.this.merge_rsps.getResults().values());
            MergeData combined_merge_data = this.consolidateMergeData(merge_data, new ArrayList<View>(this.subviews));
            if (combined_merge_data == null) {
                throw new Exception("could not consolidate merge");
            }
            Merger.this.sendMergeView(this.coords.keySet(), combined_merge_data, new_merge_id);
        }

        protected boolean getMergeDataFromSubgroupCoordinators(Map<Address, Collection<Address>> coords, MergeId new_merge_id, long timeout) {
            long start = System.currentTimeMillis();
            Merger.this.merge_rsps.reset(coords.keySet());
            Merger.this.log.trace("%s: sending MERGE_REQ to %s", Merger.this.gms.getAddress(), coords.keySet());
            for (Map.Entry<Address, Collection<Address>> entry : coords.entrySet()) {
                Address coord = entry.getKey();
                Collection<Address> mbrs = entry.getValue();
                Message msg = new BytesMessage(coord).setFlag(Message.Flag.OOB).putHeader(Merger.this.gms.getId(), new GMS.GmsHeader(6).mbr(Merger.this.gms.getAddress()).mergeId(new_merge_id)).setArray(GMS.marshal(mbrs));
                ((Protocol)Merger.this.gms.getDownProtocol()).down(msg);
            }
            Merger.this.merge_rsps.waitForAllResponses(timeout);
            boolean gotAllResponses = Merger.this.merge_rsps.hasAllResponses();
            long time = System.currentTimeMillis() - start;
            Merger.this.log.trace("%s: collected %d merge response(s) in %d ms", Merger.this.gms.getAddress(), Merger.this.merge_rsps.numberOfValidResponses(), time);
            return gotAllResponses;
        }

        protected void removeRejectedMergeRequests(Collection<Address> coords) {
            int num_removed = 0;
            Iterator<Map.Entry<Address, MergeData>> it = Merger.this.merge_rsps.getResults().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Address, MergeData> entry = it.next();
                MergeData data = entry.getValue();
                if (!data.merge_rejected) continue;
                if (data.getSender() != null) {
                    coords.remove(data.getSender());
                }
                it.remove();
                ++num_removed;
            }
            if (num_removed > 0) {
                Merger.this.log.trace("%s: removed %d rejected merge responses", Merger.this.gms.getAddress(), num_removed);
            }
        }

        protected MergeData consolidateMergeData(List<MergeData> merge_rsps, List<View> subviews) {
            Address new_coord;
            long logical_time = 0L;
            ArrayList<Collection<Address>> sub_mbrships = new ArrayList<Collection<Address>>();
            HashSet<Address> digest_membership = new HashSet<Address>();
            for (MergeData mergeData : merge_rsps) {
                Digest digest;
                View tmp_view = mergeData.getView();
                if (tmp_view != null) {
                    ViewId tmp_vid = tmp_view.getViewId();
                    if (tmp_vid != null) {
                        logical_time = Math.max(logical_time, tmp_vid.getId());
                    }
                    sub_mbrships.add(new ArrayList<Address>(tmp_view.getMembers()));
                }
                if ((digest = mergeData.getDigest()) == null) continue;
                for (Digest.Entry entry : digest) {
                    digest_membership.add(entry.getMember());
                }
            }
            if (!digest_membership.isEmpty()) {
                for (Collection collection : sub_mbrships) {
                    collection.retainAll(digest_membership);
                }
            }
            List<Address> merged_mbrs = Merger.this.gms.computeNewMembership(sub_mbrships);
            HashSet hashSet = new HashSet();
            sub_mbrships.forEach(hashSet::addAll);
            merged_mbrs.retainAll(hashSet);
            Address address = new_coord = merged_mbrs.isEmpty() ? null : merged_mbrs.get(0);
            if (new_coord == null) {
                return null;
            }
            Iterator<View> it = subviews.iterator();
            while (it.hasNext()) {
                View v = it.next();
                logical_time = Math.max(logical_time, v.getViewId().getId());
                Address creator = v.getCreator();
                if (creator == null || merged_mbrs.contains(creator)) continue;
                it.remove();
            }
            MergeView new_view = new MergeView(new_coord, logical_time + 1L, merged_mbrs, subviews);
            MutableDigest new_digest = this.consolidateDigests(new_view, merge_rsps);
            if (new_digest == null || !new_digest.allSet()) {
                return null;
            }
            Merger.this.log.trace("%s: consolidated view=%s\nconsolidated digest=%s", Merger.this.gms.getAddress(), new_view, new_digest);
            return new MergeData(Merger.this.gms.getAddress(), new_view, new_digest);
        }

        protected MutableDigest consolidateDigests(View new_view, List<MergeData> merge_rsps) {
            MutableDigest retval = new MutableDigest(new_view.getMembersRaw());
            for (MergeData data : merge_rsps) {
                Digest tmp_digest = data.getDigest();
                if (tmp_digest == null) continue;
                retval.merge(tmp_digest);
            }
            return retval;
        }
    }
}

