/*************************************************************************
 * File: InplaceMerge.hh
 * Author: Keith Schwarz (htiek@cs.stanford.edu)
 *
 * An implementation of the fast inplace merge algorithm described by Huang
 * and Langston in "Practical In-Place Merging," Communications of the ACM,
 * Volume 31, Issue 3, March 1988, pages 348-352.
 *
 * The algorithm works by splitting the input into sqrt(n) blocks of size
 * sqrt(n), choosing one block to use as a buffer, and then merging the
 * other blocks together with the buffer as scratch space.  The main complexity
 * of the algorithm comes from the case where the input is not a perfect
 * square in length.  When this happens, the input needs to be heavily
 * preprocessed into a sequence of blocks that are of suitable size.
 *
 * A summary of the algorithm is as follows.  First, we compute the size of
 * each block to be s = ceil(sqrt(n)).  Next, we locate the largest s elements
 * of the input, which are some combination of the last elements of the two
 * sequences to be merged.  The paper refers to the group of largest elements
 * in the first sequence as A and the group in the second sequence as B.
 * Next, a group of elements C is chosen which directly precedes A and has
 * size equal to B, and a group of elements D is chosen which directly precedes
 * B and has size K - |B| mod s, where K is the length of either list.  The
 * reason for this choice is that the second list, with B and D removed, has
 * a perfect multiple of s elements.  A group F is then chosen at the front
 * of the first list with size K mod s, again to make the first list have
 * a size multiple of s.  The input now looks like this:
 *
 * F s s s ... s s C A s s s ... s D B
 *
 * Here, each s refers to an arbitrary block of size s.  We ned swap regions
 * C and B, which have the same size.  This puts B and A next to each other,
 * and since they were chosen to be the largest elements of the entire
 * sequence, we will treat them together as a buffer block, referred to by
 * Z.  This is seen here:
 *
 * F s s s ... s s Z s s s ... s D B
 *
 * Everything here except for F, D, and B are in their proper place.  Without
 * any optimizations, we could now correctly sort all of these elements
 * in-place in O(n) as follows.  First we apply the subroutine described
 * below to sort all of the s blocks together, giving us the following
 * sequence:
 *
 * F [all s blocks in sorted order] Z D B
 *
 * Since |F| < s and |Z| = s, we could then merge F and the sorted sequence
 * using Z as a buffer in O(n) time, then repeat this process to merge
 * C and B into the sequence as well.  However, the paper details several
 * optimizations which yield an O(n) algorithm with much smaller runtime
 * as follows.  Recall that the elements are ordered as follows:
 *
 * F s s s ... s s Z s s s ... s D B
 *
 * We begin by merging together the elements of D and B together using Z as
 * a temporary buffer.  This produces a new group E, as seen here:
 *
 * F s s s ... s s Z s s s ... s E
 *
 * At this point we slightly diverge from the original text.  Let the first
 * block of the second range be G, and then swap that block with the first
 * actual block of the first list (H):
 *
 * F G s s ... s s Z H s ... s E
 *
 * Notice that F and G have the smallest elements from the range, so if we
 * merge the two of them together, the first |F| elements of the result are
 * the smallest elements of the entire range.  We merge them together
 * using the buffer as scratch space, yielding two new groups F' and G',
 * as seen here:
 *
 * F' G' s s ... s s Z H s ... s E
 *
 * Finally, exchange G' and H to get
 *
 * F' H s s ... s s Z G' s ... s E
 *
 * Now, the only parts of this setup that aren't perfect multiples of s are
 * F', which we don't need to touch (it's already in the right place), and
 * E.  Once we've run the subroutine described below, we can merge E into the
 * sequence quite easily; we'll see how later.
 *
 * The core of the algorithm is a block merging step in which all of the
 * size-s blocks are merged into sorted order, using a special buffer block
 * as scratch space.  This buffer must consist of the largest elements of the
 * sequence for reasons that will become clearer later on.  Thus the input to
 * this subroutine looks like this:
 *
 * s s s ... s Z s ... s s
 *
 * Where Z is the buffer block.  Note that because the input was initially two
 * sorted sequences, each of these blocks is sorted in ascending order.
 *
 * The first step of this algorithm is to swap the buffer to the front of the
 * sequence in O(s) time, as seen here:
 *
 * Z s s s ... s s
 *
 * Next, we sort the remaining blocks in ascending order by their last element.
 * To do so, we use naive selection sort.  Selection sorting O(s) blocks will
 * make O(s^2) = O(n) comparisons, and will perform a total of O(s) block
 * swaps, each of which takes O(s) time, for a total of O(s^2) = O(n) swap
 * time.
 *
 * Finally, we begin the merge.  Starting at the leftmost block, we scan for
 * the largest increasing sequence we can find by picking a string of blocks
 * where each block's last element is less than the next block's first element.
 * This gives us an O(s) way of locating increasing sequences, since each
 * block is internally sorted.  Once we have located this sequence, we merge
 * it in with the next block, using the buffer as temporary space.  We stop
 * the merge as soon as the last element of the first sequence is moved into
 * the correct place.  Since the blocks are stored in sorted ascending order,
 * the merge will terminate with the buffer elements moved up past the first
 * sequence and before some remaining portion of the second sequence.  This
 * process terminates when the buffer comes directly before a nondecreasing
 * sequence.  When this happens, we use the rotate algorithm to push the 
 * buffer past the last elements.  This gives us a sequence of sorted blocks 
 * followed by an unsorted block containing the largest elements of the 
 * sequence, which are all in the correct region of the input.  We can then 
 * sort the buffer to finish the merge.
 *
 * Once we've finished this step, we have everything in place except for the
 * elements in E.  We can fix this easily as follows.  First, swap |E| of the
 * smallest elements from the sorted sequence and E.  We now have this setup:
 *
 * E {...sorted sequence...} {|E| of the smallest elements}
 *
 * Now, do a backwards merge of E and the sorted sequence in a fashion that
 * moves the last |E| elements to the front of the array, then sort the first
 * E elements in-place.  Voila!  Everything is sorted!
 */

#ifndef InplaceMerge_Included
#define InplaceMerge_Included

#include <algorithm>  // For rotate, swap_ranges, make_heap, sort_heap, iter_swap
#include <iterator>   // For iterator_traits, distance
#include <stdexcept>  // For invalid_argument
#include <cmath>      // For ceil, sqrt
#include <utility>    // For pair
#include <functional> // For less

/**
 * Function: InplaceMerge(RandomIterator start, RandomIterator end);
 * -------------------------------------------------------------------
 * Given the sequence [start, end) such that the first half of the
 * elements form an ascending sequence and the second half of the
 * elements is also an ascending sequences, sorts the entire range
 * in ascending order in O(n) time and O(1) auxiliary storage space.
 */

template <typename RandomIterator>
void InplaceMerge(RandomIterator start, RandomIterator end);

/**
 * Function: InplaceMerge(RandomIterator start, RandomIterator end,
 *                        Comparator comp);
 * -------------------------------------------------------------------
 * Given the sequence [start, end) such that the first half of the
 * elements form an ascending sequence and the second half of the
 * elements is also an ascending sequences, sorts the entire range
 * in ascending order in O(n) time and O(1) auxiliary storage space.
 * The ranges are assumed to be sorted in ascending order according
 * to comp.
 */

template <typename RandomIterator, typename Comparator>
void InplaceMerge(RandomIterator start, RandomIterator end,
                  Comparator comp);

/* * * * * Implementation Below This Point * * * * */
namespace inplacemerge_detail {
  /**
   * Function: pair<RandomIterator, RandomIterator> 
   *              LocateMaxElems(RandomIterator begin, RandomIterator mid,
   *                             RandomIterator end, size_t s,
   *                             Comparator comp);
   * ---------------------------------------------------------------------
   * Given two sorted ranges [begin, mid) and [mid, end) which are sorted
   * according to comp and a parameter s, returns iterators (A, B) such
   * that [A, mid) and [B, end) are the s largest elements of the two
   * sorted ranges.
   */

  template <typename RandomIterator, typename Comparator>
  std::pair<RandomIterator, RandomIterator>
  LocateMaxElems(RandomIterator begin, RandomIterator mid,
                 RandomIterator end, size_t s, Comparator comp) {
    /* This operation is essentially a merge step in which we don't move
     * anything around.  We keep two pointers, one to the end of each range,
     * then continuously move them inward by skipping past the larger of
     * the two at each step.  Once we've visited s elements, we're done.
     */

    std::pair<RandomIterator, RandomIterator> result(mid, end);

    /* Locate the largest s elements one at a time. */
    for (size_t i = 0; i < s; ++i) {
      /* The elements we're interested in comparing are one step before
       * the current iterator positions.  This is because iterators define
       * half-open ranges.
       */

      if (comp(*(result.first - 1), *(result.second - 1))) // Second > First
        --result.second;
      else // First >= Second
        --result.first;
    }

    return result;
  }

  /**
   * Function: BufferedInplaceMerge(RandomIterator begin, RandomIterator mid,
   *                                RandomIterator end, RandomIterator bStart,
   *                                RandomIterator bEnd, Comparator comp);
   * ----------------------------------------------------------------------
   * Utility function which, given adjacent ranges [begin, mid) and [mid, end)
   * to be merged, does a standard merge using the buffer [bStart, bEnd) for
   * auxiliary storage space.  The input ranges are assumed to be sorted
   * using comp and will ultimately be sorted by comp.  It is assumed that
   * the range [begin, mid) fits in the buffer, though [mid, end) does not
   * need to.
   */

  template <typename RandomIterator, typename Comparator>
  void BufferedInplaceMerge(RandomIterator begin, RandomIterator mid,
                            RandomIterator end, RandomIterator bStart,
                            RandomIterator bEnd, Comparator comp) {
    /* The algorithm works as follows.  We begin by swapping the first range
     * into the buffer.  Next, we maintain two pointers to the start points
     * of the ranges, and use a standard merge technique to select the
     * smaller of the two, which is then swapped into the proper position.
     * If the second range is exhausted, what's left of the first range
     * is swapped into place.  If the first range is exhausted, then we do
     * not need to take any action because the second range is already
     * at the end of the destination.
     */


    /* Move the first elements into the buffer. */
    std::swap_ranges(begin, mid, bStart);

    /* Maintain pointers to the first elements of both ranges and the next
     * write position.
     */

    RandomIterator one = bStart, two = mid, nextWrite = begin;

    /* Cache the end point of the first range. */
    const RandomIterator oneEnd = bStart + (mid - begin);

    /* While neither range has been exhausted, do the standard merge step. */
    while (one != oneEnd && two != end) {
      /* Determine which element comes next. */
      if (comp(*one, *two)) { // First element is smaller
        std::iter_swap(one, nextWrite);
        ++one;
      } else { // Second element is smaller
        std::iter_swap(two, nextWrite);
        ++two;
      }

      /* Bump the next write position. */
      ++nextWrite;
    }

    /* If we ran out of elements from the first range, we're done. */
    if (one == oneEnd) return;

    /* Otherwise, we need to exchange the rest of the first range with
     * the accumulated buffer elements at the end of the merge range.
     */

    std::swap_ranges(one, oneEnd, nextWrite);
  }

  /**
   * Function: SortBlocks(RandomIterator begin, RandomIterator end,
   *                      size_t blockSize, Comparator comp);
   * ----------------------------------------------------------------------
   * Given a range of blocks [begin, end) of size blockSize, sorts them
   * in ascending order by comp.  This step uses naive selection sort to
   * ensure that the sorting takes time O(n).
   */

  template <typename RandomIterator, typename Comparator>
  void SortBlocks(RandomIterator begin, RandomIterator end,
                  size_t blockSize, Comparator comp) {
    for (RandomIterator itr = begin; itr != end; itr += blockSize) {
      /* Scan over all the blocks from here forward, finding the smallest. */
      RandomIterator minBlock = itr;
      for (RandomIterator curr = itr + blockSize; curr != end; curr += blockSize) {
        /* If this is outright smaller, pick it. */
        if (comp(*(curr + blockSize - 1), *(minBlock + blockSize - 1)))
          minBlock = curr;
        /* Otherwise if it's equal but has a smaller first element, pick it. */
        else if (!comp(*(minBlock + blockSize - 1), *(curr + blockSize - 1)) &&
                 comp(*curr, *minBlock))
          minBlock = curr;
      }

      /* If the element is in the right place, do nothing.  Otherwise, swap
       * the current block and the best block.
       */

      if (minBlock != itr)
        std::swap_ranges(itr, itr + blockSize, minBlock);
    }
  }

  /**
   * Function: FindEndOfRange(RandomIterator begin, RandomIterator end,
   *                          size_t blockSize, Comparator comp);
   * ---------------------------------------------------------------------
   * Given a range of blocks [begin, end) of size blockSize, returns an 
   * iterator to the start of the first block whose head is smaller than
   * its predecessor's tail.  If no such block exists, end is returned.
   */

  template <typename RandomIterator, typename Comparator>
  RandomIterator FindEndOfRange(RandomIterator begin, RandomIterator end,
                                size_t blockSize, Comparator comp) {
    /* Scan over the blocks after the first, checking whether the previous
     * block end is greater than the current block start.
     */

    for (RandomIterator itr = begin + blockSize; itr != end; itr += blockSize)
      if (comp(*itr, *(itr - 1)))
        return itr;

    /* All sorted; hand back end as a sentinel. */
    return end;
  }

  /**
   * Function: BufferMovingMerge(RandomIterator begin, RandomIterator mid,
   *                             RandomIterator end, RandomIterator buffer,
   *                             Comparator comp);
   * ----------------------------------------------------------------------
   * Given a range of elements [buffer, begin), [begin, mid), [mid, end),
   * merges the elements into a sorted range such that the resulting
   * sequence looks like
   *
   * [-merged elements-] [buffer] [unconsumed second sequence]
   *
   * This function then returns an iterator to the new start position of
   * the buffer.
   */

  template <typename RandomIterator, typename Comparator>
  RandomIterator BufferMovingMerge(RandomIterator begin, RandomIterator mid,
                                   RandomIterator end, RandomIterator buffer,
                                   Comparator comp) {
    /* This function works by doing a standard merge operation to consume
     * [begin, mid).  We maintain iterators to the start of the buffer,
     * the start of the first sequence, and the start of the second sequence.
     * We then continously swap the smaller of the two sequence elements
     * with the next buffer element and bump the pointers forward.
     */

    RandomIterator one = begin, two = mid, output = buffer;

    /* Keep writing until we exhaust the first sequence.  We are guaranteed
     * that we won't exhaust the second because its last element is bigger
     * than the last element of the first sequence.
     */

    while (one != mid) {
      /* Move the smaller element forward. */
      if (!comp(*two, *one)) { // First element is <=
        std::iter_swap(one, output);
        ++one;
      } else { // Second element is smaller
        std::iter_swap(two, output);
        ++two;
      }

      /* Advance the output pointer to the next spot. */
      ++output;
    }

    /* Return the start of the new buffer. */
    return output;
  }

  /**
   * Function: MergeBlocks(RandomIterator begin, RandomIterator end,
   *                       size_t blockSize, Comparator comp)
   * ----------------------------------------------------------------------
   * Given a range of blocks of size blockSize that evenly partition
   * [begin, end) into a sequence of blocks, does a merge step to convert the
   * range into a sorted range.  It is assumed that the elements 
   * [begin, begin + blockSize) are buffer elements, and these elements will
   * end up occupying the last block of the sequence.
   */

  template <typename RandomIterator, typename Comparator>
  void MergeBlocks(RandomIterator begin, RandomIterator end,
                   size_t blockSize, Comparator comp) {
    /* Begin by sorting all of the blocks in ascending order by their last
     * elements.
     */

    SortBlocks(begin + blockSize, end, blockSize, comp);

    /* Now, we need to start scanning over the blocks, looking for increasing
     * sequences that can be merged.  We'll do this by maintaining a pointer
     * to the start of the first block to merge, where in that block to
     * pick up from, and where the buffer is.
     */

    RandomIterator buffer = begin, currBlock = begin + blockSize,
      currPos = begin + blockSize;
    while (true) {
      /* Find the first block past the end of the current sequence. */
      RandomIterator rangeEnd = FindEndOfRange(currBlock, end,
                                               blockSize, 
                                               comp);

      /* If we didn't find anything, we're done. */
      if (rangeEnd == end) break;

      /* Otherwise, do a buffer-moving merge of the range up through the
       * start of the last block with the last block, and remember where
       * the buffer ended up getting repositioned.
       */

      buffer = BufferMovingMerge(currPos, rangeEnd, rangeEnd + blockSize,
                                 buffer, comp);

      /* Update the current block to be the end of the range since we
       * didn't finish moving everything out of it.
       */

      currBlock = rangeEnd;

      /* Update the write position to be one block past the buffer start. */
      currPos = buffer + blockSize;
    }

    /* When we get here, we know that the range [currPos, end) is sorted.
     * To finish up this step, we sort the buffer using an in-place sort
     * (here, heapsort), then rotate it through the rest of the range to
     * finish up the sequence.
     */

    std::make_heap(buffer, buffer + blockSize, comp);
    std::sort_heap(buffer, buffer + blockSize, comp);
    std::rotate(buffer, buffer + blockSize, end);
  }
}

/* Actual implementation of InplaceMerge */
template <typename RandomIterator, typename Comparator>
void InplaceMerge(RandomIterator begin, RandomIterator end,
                  Comparator comp) {
  /* Grant access to the utility functions we've written. */
  using namespace inplacemerge_detail;

  /* Confirm that the input has an even number of elements. */
  if (std::distance(begin, end) % 2 != 0)
    throw std::invalid_argument("Range to merge must have an even number of elements.");

  /* In order to guarantee that the input range has sufficiently many elements
   * that the named blocks exist, there should be at least 50 elements in
   * the range.  Otherwise, each half is not guaranteed to have at least
   * three blocks.  If there are fewer than 50 elements, we'll just use an
   * in-place heap sort to do the work.  This doesn't affect the overall
   * asymptotic runtime of the algorithm, since big-O only considers behavior
   * in the limit.
   */

  if (std::distance(begin, end) < 50) {
    std::make_heap(begin, end, comp);
    std::sort_heap(begin, end, comp);
    return;
  }

  /* Compute s, the block size.  The casts are necessary to resolve which
   * overload to use.
   */

  const size_t s = (size_t)std::ceil(std::sqrt(double(end - begin)));

  /* Cache the number of elements in each subrange. */
  const size_t listSize = (end - begin) / 2;

  /* Compute the midpoint, which is where the second list starts and
   * the first list stops.
   */

  RandomIterator mid = begin + listSize;

  /* Get back iterators to the start of blocks A and B. */
  std::pair<RandomIterator, RandomIterator> maxElems =
    LocateMaxElems(begin, mid, end, s, comp);

  /* Compute the groups C and D.  Group C is formed from the group of elements
   * of size |B| that directly precede A.
   */

  RandomIterator cStart = maxElems.first - std::distance(maxElems.second, end);

  /* Group D is formed by taking the K - |B| % s elements that precede B. */
  RandomIterator dStart = maxElems.second - ((listSize - std::distance(maxElems.second, end)) % s);

  /* Exchange C and B.  This makes the range [cStart, mid) the buffer. */
  std::swap_ranges(cStart, maxElems.first, maxElems.second);

  /* Now, we need to merge D and B together using the buffer as the temporary
   * storage space.  We don't want to move the buffer when doing this, so we'll
   * do an in-place merge using the buffer as a temporary.
   */

  BufferedInplaceMerge(dStart, maxElems.second, end, // D B
                       cStart, mid,                  // Buffer location
                       comp);
  
  /* Next, we need to fix up the leftover elements from the front of the first
   * list.  The first list is as it originally was, except that the last block
   * has been swapped with the buffer.  This means that the number of elements
   * that are beyond what's necessary is K mod s.  Let's see what this is.
   */

  const size_t firstSlack = listSize % s;

  /* There are two cases to consider.  First, if s == 0, then there is no
   * leftover slack and we can just run the main algorithm.  Otherwise,
   * we need to do a merge of these elements with the smallest elements from
   * the second list.
   */

  if (firstSlack != 0) {
    /* Swap the first block from the second list and the second block from
     * the first list.  This puts the smallest elements from the two lists
     * adjacent to one another.
     */

    std::swap_ranges(begin + firstSlack, begin + firstSlack + s, mid);

    /* Merge the slack space and this block using the buffer for scratch. */
    BufferedInplaceMerge(begin, begin + firstSlack, begin + firstSlack + s,
                         cStart, mid, comp);

    /* Move the blocks back. */
    std::swap_ranges(begin + firstSlack, begin + firstSlack + s, mid);
  }
 
  /* Okay!  At this point we're ready to begin the main algorithm.  We do
   * this by swapping the buffer and the first actual block.
   */

  std::swap_ranges(begin + firstSlack, begin + firstSlack + s, cStart);

  /* Run the block-merging step to eat everything from the first block up to
   * the start of E.
   */

  MergeBlocks(begin + firstSlack, dStart, s, comp);

  /* At this point, all that's left to do is merge E into this resulting
   * sequence.  We do this by swapping E and the smallest elements of the
   * range, doing an in-place merge, then rotating the buffer back and
   * sorting it.
   */

  std::swap_ranges(dStart, end, begin);
  BufferedInplaceMerge(begin, begin + distance(dStart, end), dStart,
                       dStart, end, comp);

  /* Sort the buffer using heapsort. */
  std::make_heap(dStart, end, comp);
  std::sort_heap(dStart, end, comp);

  /* Rotate the buffer to the front. */
  std::rotate(begin, dStart, end);
}

/* Non-comparator version calls comparator version. */
template <typename RandomIterator>
void InplaceMerge(RandomIterator begin, RandomIterator end) {
  InplaceMerge(begin, end,
               std::less<typename std::iterator_traits<RandomIterator>::value_type>());
}

#endif