Majority element - parts of an array Majority element - parts of an array arrays arrays

Majority element - parts of an array


O(log n) queries and O(n log n) preprocessing/space could be achieved by finding and using majority intervals with following properties:

  1. For each value from input array there may be one or several majority intervals (or there may be none if elements with these values are too sparse; we don't need majority intervals of length 1 because they may be useful only for query intervals of size 1 which are better handled as a special case).
  2. If query interval lies completely inside one of these majority intervals, corresponding value may be the majority element of this query interval.
  3. If there is no majority interval completely containing query interval, corresponding value cannot be the majority element of this query interval.
  4. Each element of input array is covered by O(log n) majority intervals.

In other words, the only purpose of majority intervals is to provide O(log n) majority element candidates for any query interval.

This algorithm uses following data structures:

  1. List of positions for each value from input array (map<Value, vector<Position>>). Alternatively unordered_map may be used here to improve performance (but we'll need to extract all keys and sort them so that structure #3 is filled in proper order).
  2. List of majority intervals for each value (vector<Interval>).
  3. Data structure for handling queries (vector<small_map<Value, Data>>). Where Data contains two indexes of appropriate vector from structure #1 pointing to next/previous positions of elements with given value. Update: Thanks to @justhalf, it is better to store in Data cumulative frequencies of elements with given value. small_map may be implemented as sorted vector of pairs - preprocessing will append elements already in sorted order and query will use small_map only for linear search.

Preprocessing:

  1. Scan input array and push current position to appropriate vector in structure #1.
  2. Perform steps 3 .. 4 for every vector in structure #1.
  3. Transform list of positions into list of majority intervals. See details below.
  4. For each index of input array covered by one of majority intervals, insert data to appropriate element of structure #3: value and positions of previous/next elements with this value (or cumulative frequency of this value).

Query:

  1. If query interval length is 1, return corresponding element of source array.
  2. For starting point of query interval get corresponding element of 3rd structure's vector. For each element of the map perform step 3. Scan all elements of the map corresponding to ending point of query interval in parallel with this map to allow O(1) complexity for step 3 (instead of O(log log n)).
  3. If the map corresponding to ending point of query interval contains matching value, compute s3[stop][value].prev - s3[start][value].next + 1. If it is greater than half of the query interval, return value. If cumulative frequencies are used instead of next/previous indexes, compute s3[stop+1][value].freq - s3[start][value].freq instead.
  4. If nothing found on step 3, return "Nothing".

Main part of the algorithm is getting majority intervals from list of positions:

  1. Assign weight to each position in the list: number_of_matching_values_to_the_left - number_of_nonmatching_values_to_the_left.
  2. Filter only weights in strictly decreasing order (greedily) to the "prefix" array: for (auto x: positions) if (x < prefix.back()) prefix.push_back(x);.
  3. Filter only weights in strictly increasing order (greedily, backwards) to the "suffix" array: reverse(positions); for (auto x: positions) if (x > suffix.back()) suffix.push_back(x);.
  4. Scan "prefix" and "suffix" arrays together and find intervals from every "prefix" element to corresponding place in "suffix" array and from every "suffix" element to corresponding place in "prefix" array. (If all "suffix" elements' weights are less than given "prefix" element or their position is not to the right of it, no interval generated; if there is no "suffix" element with exactly the weight of given "prefix" element, get nearest "suffix" element with larger weight and extend interval with this weight difference to the right).
  5. Merge overlapping intervals.

Properties 1 .. 3 for majority intervals are guaranteed by this algorithm. As for property #4, the only way I could imagine to cover some element with maximum number of majority intervals is like this: 11111111222233455666677777777. Here element 4 is covered by 2 * log n intervals, so this property seems to be satisfied. See more formal proof of this property at the end of this post.

Example:

For input array "0 1 2 0 0 1 1 0" the following lists of positions would be generated:

value  positions    0  0 3 4 7    1  1 5 6    2  2

Positions for value 0 will get the following properties:

weights:    0:1 3:0 4:1 7:0prefix:     0:1 3:0          (strictly decreasing)suffix:     4:1 7:0          (strictly increasing when scanning backwards)intervals:  0->4 3->7 4->0 7->3merged intervals: 0-7

Positions for value 1 will get the following properties:

weights:    1:0  5:-2  6:-1prefix:     1:0  5:-2suffix:     1:0  6:-1intervals:  1->none 5->6+1 6->5-1 1->nonemerged intervals: 4-7

Query data structure:

positions value next prev        0     0    0    x     1..2     0    1    0        3     0    1    1        4     0    2    2        4     1    1    x        5     0    3    2    ...

Query [0,4]:

prev[4][0]-next[0][0]+1=2-0+1=3query size=53>2.5, returned result 0

Query [2,5]:

prev[5][0]-next[2][0]+1=2-1+1=2query size=42=2, returned result "none"

Note that there is no attempt to inspect element "1" because its majority interval does not include either of these intervals.

Proof of property #4:

Majority intervals are constructed in such a way that strictly more than 1/3 of all their elements have corresponding value. This ratio is nearest to 1/3 for sub-arrays like any*(m-1) value*m any*m, for example, 01234444456789.

To make this proof more obvious, we could represent each interval as a point in 2D: every possible starting point represented by horizontal axis and every possible ending point represented by vertical axis (see diagram below).

enter image description here

All valid intervals are located on or above diagonal. White rectangle represents all intervals covering some array element (represented as unit-size interval on its lower right corner).

Let's cover this white rectangle with squares of size 1, 2, 4, 8, 16, ... sharing the same lower right corner. This divides white area into O(log n) areas similar to yellow one (and single square of size 1 containing single interval of size 1 which is ignored by this algorithm).

Let's count how many majority intervals may be placed into yellow area. One interval (located at the nearest to diagonal corner) occupies 1/4 of elements belonging to interval at the farthest from diagonal corner (and this largest interval contains all elements belonging to any interval in yellow area). This means that smallest interval contains strictly more than 1/12 values available for whole yellow area. So if we try to place 12 intervals to yellow area, we have not enough elements for different values. So yellow area cannot contain more than 11 majority intervals. And white rectangle cannot contain more than 11 * log n majority intervals. Proof completed.

11 * log n is overestimation. As I said earlier, it's hard to imagine more than 2 * log n majority intervals covering some element. And even this value is much greater than average number of covering majority intervals.

C++11 implementation. See it either at ideone or here:

#include <iostream>#include <vector>#include <map>#include <algorithm>#include <functional>#include <random>constexpr int SrcSize = 1000000;constexpr int NQueries = 100000;using src_vec_t = std::vector<int>;using index_vec_t = std::vector<int>;using weight_vec_t = std::vector<int>;using pair_vec_t = std::vector<std::pair<int, int>>;using index_map_t = std::map<int, index_vec_t>;using interval_t = std::pair<int, int>;using interval_vec_t = std::vector<interval_t>;using small_map_t = std::vector<std::pair<int, int>>;using query_vec_t = std::vector<small_map_t>;constexpr int None = -1;constexpr int Junk = -2;src_vec_t generate_e(){ // good query length = 3    src_vec_t src;    std::random_device rd;    std::default_random_engine eng{rd()};    auto exp = std::bind(std::exponential_distribution<>{0.4}, eng);    for (int i = 0; i < SrcSize; ++i)    {        int x = exp();        src.push_back(x);        //std::cout << x << ' ';    }    return src;}src_vec_t generate_ep(){ // good query length = 500    src_vec_t src;    std::random_device rd;    std::default_random_engine eng{rd()};    auto exp = std::bind(std::exponential_distribution<>{0.4}, eng);    auto poisson = std::bind(std::poisson_distribution<int>{100}, eng);    while (int(src.size()) < SrcSize)    {        int x = exp();        int n = poisson();        for (int i = 0; i < n; ++i)        {            src.push_back(x);            //std::cout << x << ' ';        }    }    return src;}src_vec_t generate(){    //return generate_e();    return generate_ep();}int trivial(const src_vec_t& src, interval_t qi){    int count = 0;    int majorityElement = 0; // will be assigned before use for valid args    for (int i = qi.first; i <= qi.second; ++i)    {        if (count == 0)            majorityElement = src[i];        if (src[i] == majorityElement)            ++count;        else            --count;    }    count = 0;    for (int i = qi.first; i <= qi.second; ++i)    {        if (src[i] == majorityElement)            count++;    }    if (2 * count > qi.second + 1 - qi.first)        return majorityElement;    else        return None;}index_map_t sort_ind(const src_vec_t& src){    int ind = 0;    index_map_t im;    for (auto x: src)        im[x].push_back(ind++);    return im;}weight_vec_t get_weights(const index_vec_t& indexes){    weight_vec_t weights;    for (int i = 0; i != int(indexes.size()); ++i)        weights.push_back(2 * i - indexes[i]);    return weights;}pair_vec_t get_prefix(const index_vec_t& indexes, const weight_vec_t& weights){    pair_vec_t prefix;    for (int i = 0; i != int(indexes.size()); ++i)        if (prefix.empty() || weights[i] < prefix.back().second)            prefix.emplace_back(indexes[i], weights[i]);    return prefix;}pair_vec_t get_suffix(const index_vec_t& indexes, const weight_vec_t& weights){    pair_vec_t suffix;    for (int i = indexes.size() - 1; i >= 0; --i)        if (suffix.empty() || weights[i] > suffix.back().second)            suffix.emplace_back(indexes[i], weights[i]);    std::reverse(suffix.begin(), suffix.end());    return suffix;}interval_vec_t get_intervals(const pair_vec_t& prefix, const pair_vec_t& suffix){    interval_vec_t intervals;    int prev_suffix_index = 0; // will be assigned before use for correct args    int prev_suffix_weight = 0; // same assumptions    for (int ind_pref = 0, ind_suff = 0; ind_pref != int(prefix.size());)    {        auto i_pref = prefix[ind_pref].first;        auto w_pref = prefix[ind_pref].second;        if (ind_suff != int(suffix.size()))        {            auto i_suff = suffix[ind_suff].first;            auto w_suff = suffix[ind_suff].second;            if (w_pref <= w_suff)            {                auto beg = std::max(0, i_pref + w_pref - w_suff);                if (i_pref < i_suff)                    intervals.emplace_back(beg, i_suff + 1);                if (w_pref == w_suff)                    ++ind_pref;                ++ind_suff;                prev_suffix_index = i_suff;                prev_suffix_weight = w_suff;                continue;            }        }        // ind_suff out of bounds or w_pref > w_suff:        auto end = prev_suffix_index + prev_suffix_weight - w_pref + 1;        // end may be out-of-bounds; that's OK if overflow is not possible        intervals.emplace_back(i_pref, end);        ++ind_pref;    }    return intervals;}interval_vec_t merge(const interval_vec_t& from){    using endpoints_t = std::vector<std::pair<int, bool>>;    endpoints_t ep(2 * from.size());    std::transform(from.begin(), from.end(), ep.begin(),                   [](interval_t x){ return std::make_pair(x.first, true); });    std::transform(from.begin(), from.end(), ep.begin() + from.size(),                   [](interval_t x){ return std::make_pair(x.second, false); });    std::sort(ep.begin(), ep.end());    interval_vec_t to;    int start; // will be assigned before use for correct args    int overlaps = 0;    for (auto& x: ep)    {        if (x.second) // begin        {            if (overlaps++ == 0)                start = x.first;        }        else // end        {            if (--overlaps == 0)                to.emplace_back(start, x.first);        }    }    return to;}interval_vec_t get_intervals(const index_vec_t& indexes){    auto weights = get_weights(indexes);    auto prefix = get_prefix(indexes, weights);    auto suffix = get_suffix(indexes, weights);    auto intervals = get_intervals(prefix, suffix);    return merge(intervals);}void update_qv(    query_vec_t& qv,    int value,    const interval_vec_t& intervals,    const index_vec_t& iv){    int iv_ind = 0;    int qv_ind = 0;    int accum = 0;    for (auto& interval: intervals)    {        int i_begin = interval.first;        int i_end = std::min<int>(interval.second, qv.size() - 1);        while (iv[iv_ind] < i_begin)        {            ++accum;            ++iv_ind;        }        qv_ind = std::max(qv_ind, i_begin);        while (qv_ind <= i_end)        {            qv[qv_ind].emplace_back(value, accum);            if (iv[iv_ind] == qv_ind)            {                ++accum;                ++iv_ind;            }            ++qv_ind;        }    }}void print_preprocess_stat(const index_map_t& im, const query_vec_t& qv){    double sum_coverage = 0.;    int max_coverage = 0;    for (auto& x: qv)    {        sum_coverage += x.size();        max_coverage = std::max<int>(max_coverage, x.size());    }    std::cout << "             size = " << qv.size() - 1 << '\n';    std::cout << "           values = " << im.size() << '\n';    std::cout << "     max coverage = " << max_coverage << '\n';    std::cout << "     avg coverage = " << sum_coverage / qv.size() << '\n';}query_vec_t preprocess(const src_vec_t& src){    query_vec_t qv(src.size() + 1);    auto im = sort_ind(src);    for (auto& val: im)    {        auto intervals = get_intervals(val.second);        update_qv(qv, val.first, intervals, val.second);    }    print_preprocess_stat(im, qv);    return qv;}int do_query(const src_vec_t& src, const query_vec_t& qv, interval_t qi){    if (qi.first == qi.second)        return src[qi.first];    auto b = qv[qi.first].begin();    auto e = qv[qi.second + 1].begin();    while (b != qv[qi.first].end() && e != qv[qi.second + 1].end())    {        if (b->first < e->first)        {            ++b;        }        else if (e->first < b->first)        {            ++e;        }        else // if (e->first == b->first)        {            // hope this doesn't overflow            if (2 * (e->second - b->second) > qi.second + 1 - qi.first)                return b->first;            ++b;            ++e;        }    }    return None;}int main(){    std::random_device rd;    std::default_random_engine eng{rd()};    auto poisson = std::bind(std::poisson_distribution<int>{500}, eng);    int majority = 0;    int nonzero = 0;    int failed = 0;    auto src = generate();    auto qv = preprocess(src);    for (int i = 0; i < NQueries; ++i)    {        int size = poisson();        auto ud = std::uniform_int_distribution<int>(0, src.size() - size - 1);        int start = ud(eng);        int stop = start + size;        auto res1 = do_query(src, qv, {start, stop});        auto res2 = trivial(src, {start, stop});        //std::cout << size << ": " << res1 << ' ' << res2 << '\n';        if (res2 != res1)            ++failed;        if (res2 != None)        {            ++majority;            if (res2 != 0)                ++nonzero;        }    }    std::cout << "majority elements = " << 100. * majority / NQueries << "%\n";    std::cout << " nonzero elements = " << 100. * nonzero / NQueries << "%\n";    std::cout << "          queries = " << NQueries << '\n';    std::cout << "           failed = " << failed << '\n';    return 0;}

Related work:

As pointed in other answer to this question, there is other work where this problem is already solved: "Range majority in constant time and linear space" by S. Durocher, M. He, I Munro, P.K. Nicholson, M. Skala.

Algorithm presented in this paper has better asymptotic complexities for query time: O(1) instead of O(log n) and for space: O(n) instead of O(n log n).

Better space complexity allows this algorithm to process larger data sets (comparing to the algorithm proposed in this answer). Less memory needed for preprocessed data and more regular data access pattern, most likely, allow this algorithm to preprocess data more quickly. But it is not so easy with query time...

Let's suppose we have input data most favorable to algorithm from the paper: n=1000000000 (it's hard to imagine a system with more than 10..30 gigabytes of memory, in year 2013).

Algorithm proposed in this answer needs to process up to 120 (or 2 query boundaries * 2 * log n) elements for each query. But it performs very simple operations, similar to linear search. And it sequentially accesses two contiguous memory areas, so it is cache-friendly.

Algorithm from the paper needs to perform up to 20 operations (or 2 query boundaries * 5 candidates * 2 wavelet tree levels) for each query. This is 6 times less. But each operation is more complex. Each query for succinct representation of bit counters itself contains a linear search (which means 20 linear searches instead of one). Worst of all, each such operation should access several independent memory areas (unless query size and therefore quadruple size is very small), so query is cache-unfriendly. Which means each query (while is a constant-time operation) is pretty slow, probably slower than in algorithm proposed here. If we decrease input array size, increased are the chances that proposed here algorithm is quicker.

Practical disadvantage of algorithm in the paper is wavelet tree and succinct bit counter implementation. Implementing them from scratch may be pretty time consuming. Using a pre-existing implementation is not always convenient.


the trick

When looking for a majority element, you may discard intervals that do not have a majority element. See Find the majority element in array. This allows you to solve this quite simply.

preparation

At preparation time, recursively keep dividing the array into two halves and store these array intervals in a binary tree. For each node, count the occurrence of each element in the array interval. You need a data structure that offers O(1) inserts and reads. I suggest using an unsorted_multiset, which on average behaves as needed (but worst case inserts are linear). Also check if the interval has a majority element and store it if it does.

runtime

At runtime, when asked to compute the majority element for a range, dive into the tree to compute the set of intervals that covers the given range exactly. Use the trick to combine these intervals.

If we have array interval 7 5 5 7 7 7, with majority element 7, we can split off and discard 5 5 7 7 since it has no majority element. Effectively the fives have gobbled up two of the sevens. What's left is an array 7 7, or 2x7. Call this number 2 the majority count of the majority element 7:

The majority count of a majority element of an array interval is the occurrence count of the majority element minus the combined occurrence of all other elements.

Use the following rules to combine intervals to find the potential majority element:

  • Discard the intervals that have no majority element
  • Combining two arrays with the same majority element is easy, just add up the element's majority counts. 2x7 and 3x7 become 5x7
  • When combining two arrays with different majority elements, the higher majority count wins. Subtract the lower majority count from the higher to find the resulting majority count. 3x7 and 2x3 become 1x7.
  • If their majority elements are different but have have equal majority counts, disregard both arrays. 3x7 and 3x5 cancel each other out.

When all intervals have been either discarded or combined, you are either left with nothing, in which case there is no majority element. Or you have one combined interval containing a potential majority element. Lookup and add this element's occurrence counts in all array intervals (also the previously discarded ones) to check if it really is the majority element.

example

For the array 1,1,1,2,2,3,3,2,2,2,3,2,2, you get the tree (majority count x majority element listed in brackets)

                        1,1,1,2,2,3,3,2,2,2,3,2,2                                      (1x2)                      /                           \             1,1,1,2,2,3,3                       2,2,2,3,2,2                                                    (4x2)            /              \                   /            \        1,1,1,2           2,3,3            2,2,2             3,2,2         (2x1)            (1x3)            (3x2)             (1x2)        /     \          /    \            /    \            /    \     1,1      1,2       2,3     3        2,2     2        3,2      2    (1x1)                     (1x3)     (2x2)  (1x2)             (1x2)    /   \     /  \     /   \            /  \             /   \   1     1   1   2    2    3           2    2           3     2(1x1) (1x1)(1x1)(1x2)(1x2)(1x3)       (1x2)(1x2)       (1x3) (1x2)     

Range [5,10] (1-indexed) is covered by the set of intervals 2,3,3 (1x3), 2,2,2 (3x2). They have different majority elements. Subtract their majority counts, you're left with 2x2. So 2 is the potential majority element. Lookup and sum the actual occurrence counts of 2 in the arrays: 1+3 = 4 out of 6. 2 is the majority element.

Range [1,10] is covered by the set of intervals 1,1,1,2,2,3,3 (no majority element) and 2,2,2 (3x2). Disregard the first interval since it has no majority element, so 2 is the potential majority element. Sum the occurrence counts of 2 in all intervals: 2+3 = 5 out of 10. There is no majority element.


Actually, it can be done in constant time and linear space(!)

See https://cs.stackexchange.com/questions/16671/range-majority-queries-most-freqent-element-in-range and S. Durocher, M. He, I Munro, P.K. Nicholson, M. Skala, Range majority in constant time and linear space, Information and Computation 222 (2013) 169–179, Elsevier.

Their preparation time is O(n log n), the space needed is O(n) and queries are O(1). It is a theoretical paper and I don't claim to understand all of it but it seems far from impossible to implement. They're using wavelet trees.

For an implementation of wavelet trees, see https://github.com/fclaude/libcds