001package org.unix4j.unix.sort;
002
003import java.util.Comparator;
004import java.util.List;
005
006import org.unix4j.context.ExecutionContext;
007import org.unix4j.io.Input;
008import org.unix4j.line.Line;
009import org.unix4j.processor.LineProcessor;
010
011class MergeProcessor extends AbstractSortProcessor {
012
013        private final List<? extends Input> inputs;
014        
015        public MergeProcessor(SortCommand command, ExecutionContext context, LineProcessor output, List<? extends Input> inputs) {
016                super(command, context, output);
017                this.inputs = inputs;
018        }
019
020        @Override
021        public boolean processLine(Line line) {
022                //if lines come from standard input, there is nothing to merge
023                return getOutput().processLine(line);
024        }
025
026        @Override
027        public void finish() {
028                final int len = inputs.size();
029                final Line[] lines = new Line[len];
030                for (int i = 0; i < len; i++) {
031                        final Input input = inputs.get(i); 
032                        lines[i] = input.hasMoreLines() ? input.readLine() : null;
033                }
034                final LineProcessor output = getOutput();
035                final Comparator<? super Line> comparator = getComparator();
036                while (true) {
037                        Line line = null;
038                        int inputIndexOfLine = -1;
039                        for (int i = 0; i < len; i++) {
040                                final Line cur = lines[i];
041                                if (cur != null) {
042                                        if (inputIndexOfLine < 0 || 0 < comparator.compare(line, cur)) {
043                                                line = cur;
044                                                inputIndexOfLine = i;
045                                        }
046                                }
047                        }
048                        if (line != null) {
049                                //write the winner line
050                                if (!output.processLine(line)) {
051                                        output.finish();
052                                        closeInputs();
053                                        return;
054                                }
055                                //move to next line for winner input
056                                final Input input = inputs.get(inputIndexOfLine); 
057                                lines[inputIndexOfLine] = input.hasMoreLines() ? input.readLine() : null;
058                        } else {
059                                //no more lines
060                                output.finish();
061                                closeInputs();
062                                return;
063                        }
064                }
065        }
066
067        private void closeInputs() {
068                for (final Input input : inputs) {
069                        input.close();
070                }
071        }
072
073}