问题 两个排序整数数组的快速交集


我需要找到两个排序整数数组的交集并快速完成。

现在,我使用以下代码:

int i = 0, j = 0;

while (i < arr1.Count && j < arr2.Count)
{
    if (arr1[i] < arr2[j])
    {
        i++;
    }
    else
    {
        if (arr2[j] < arr1[i])
        {
            j++;
        }
        else 
        {
            intersect.Add(arr2[j]);
            j++;
            i++;
        }
    }
}

不幸的是,完成所有工作可能需要数小时。

怎么做得更快?我发现 本文 使用SIMD指令的地方。是否可以在.NET中使用SIMD?

你有什么想法:

http://docs.go-mono.com/index.aspx?link=N:Mono.Simd Mono.SIMD

http://netasm.codeplex.com/ NetASM(将asm代码注入托管)

和类似的东西 http://www.atrevido.net/blog/PermaLink.aspx?guid=ac03f447-d487-45a6-8119-dc4fa1e932e1

编辑:

当我说数千我的意思是跟随(在代码​​中)

for(var i=0;i<arrCollection1.Count-1;i++)
{
    for(var j=i+1;j<arrCollection2.Count;j++)
    {
        Intersect(arrCollection1[i],arrCollection2[j])  
    }
}

6140
2018-06-02 23:36


起源

你不想 break 找到交叉路口后从循环中找到? - Brendan
@Brendan但我怎么能发现这一刻呢? - Neir0
你的标题是“两个”,但你的问题是“数千”。你能描述一下你想做什么吗?可能有更好的方法,而不是一次比较两个。 - Mark Byers
那是什么考虑 intersection?位置i的第一个数组中的值大于第二个数组中的值的那一刻? - 如果数千,SIMD,可能是要走的路 - Brendan
也许HashSet是更好的数据结构 - Lukasz Madon


答案:


UPDATE

我得到的最快的是200ms,阵列大小为10mil,带有不安全版本(最后一段代码)。

我做过的测试:

var arr1 = new int[10000000];
var arr2 = new int[10000000];

for (var i = 0; i < 10000000; i++)
{
    arr1[i] = i;
    arr2[i] = i * 2;
}

var sw = Stopwatch.StartNew();

var result = arr1.IntersectSorted(arr2);

sw.Stop();

Console.WriteLine(sw.Elapsed); // 00:00:00.1926156

全职:

我已经测试了各种方法,并发现这非常好:

public static List<int> IntersectSorted(this int[] source, int[] target)
{
    // Set initial capacity to a "full-intersection" size
    // This prevents multiple re-allocations
    var ints = new List<int>(Math.Min(source.Length, target.Length));

    var i = 0;
    var j = 0;

    while (i < source.Length && j < target.Length)
    {
        // Compare only once and let compiler optimize the switch-case
        switch (source[i].CompareTo(target[j]))
        {
            case -1:
                i++;

                // Saves us a JMP instruction
                continue;
            case 1:
                j++;

                // Saves us a JMP instruction
                continue;
            default:
                ints.Add(source[i++]);
                j++;

                // Saves us a JMP instruction
                continue;
        }
    }

    // Free unused memory (sets capacity to actual count)
    ints.TrimExcess();

    return ints;
}

为了进一步改进,您可以删除 ints.TrimExcess();,这也会产生很大的不同,但你应该考虑一下你是否需要那种记忆。

此外,如果您知道可能会破坏使用交叉点的循环,并且您不必将结果作为数组/列表,则应将实现更改为迭代器:

public static IEnumerable<int> IntersectSorted(this int[] source, int[] target)
{
    var i = 0;
    var j = 0;

    while (i < source.Length && j < target.Length)
    {
        // Compare only once and let compiler optimize the switch-case
        switch (source[i].CompareTo(target[j]))
        {
            case -1:
                i++;

                // Saves us a JMP instruction
                continue;
            case 1:
                j++;

                // Saves us a JMP instruction
                continue;
            default:
                yield return source[i++];
                j++;

                // Saves us a JMP instruction
                continue;
        }
    }
}

另一个改进是使用不安全的代码:

public static unsafe List<int> IntersectSorted(this int[] source, int[] target)
{
    var ints = new List<int>(Math.Min(source.Length, target.Length));

    fixed (int* ptSrc = source)
    {
        var maxSrcAdr = ptSrc + source.Length;

        fixed (int* ptTar = target)
        {
            var maxTarAdr = ptTar + target.Length;

            var currSrc = ptSrc;
            var currTar = ptTar;

            while (currSrc < maxSrcAdr && currTar < maxTarAdr)
            {
                switch ((*currSrc).CompareTo(*currTar))
                {
                    case -1:
                        currSrc++;
                        continue;
                    case 1:
                        currTar++;
                        continue;
                    default:
                        ints.Add(*currSrc);
                        currSrc++;
                        currTar++;
                        continue;
                }
            }
        }
    }

    ints.TrimExcess();
    return ints;
}

综上所述,最重要的表现是在if-else中。 把它变成一个开关盒有很大的不同(大约快2倍)。


9
2018-06-03 00:21



var i = 0;使用var关键字IMO不是最好的情况。你可以发布/链接到时间和测试吗? - Lukasz Madon
@lukas添加了我在帖子开头使用的测试。你能告诉我为什么var不合适吗?你在谈论可读性吗? - SimpleVar
我质疑你的测量结果......至少你的第一个例子的时间高度依赖于匹配的整数。 - Ilia G
@IliaG它实际上产生了5000000的列表。但我也会用randoms进行测试。结果确实不同,但330ms仍然很棒!您可以看到随机生成的测试 这里。 - SimpleVar
@KemalCanKara乍一看,我发现了一个问题 var maxTarAdr = ptTar + source.Length; 应该是 var maxTarAdr = ptTar + target.Length;  - 哎呀! - SimpleVar


你尝试过像这样简单的东西:

var a = Enumerable.Range(1, int.MaxValue/100).ToList();
var b = Enumerable.Range(50, int.MaxValue/100 - 50).ToList();

//var c = a.Intersect(b).ToList();
List<int> c = new List<int>();

var t1 = DateTime.Now;

foreach (var item in a)
{
    if (b.BinarySearch(item) >= 0)
        c.Add(item);
}

var t2 = DateTime.Now;

var tres = t2 - t1;

这段代码需要1个数组 21474836 元素和另一个 21474786

如果我使用 var c = a.Intersect(b).ToList(); 我得到了 OutOfMemoryException

结果产品将是 461,167,507,485,096 使用嵌套foreach的迭代

但是使用这个简单的代码,交叉点发生在 TotalSeconds = 7.3960529 (使用一个核心)

现在我仍然不开心,所以我试图通过并行打破这个来提高性能,一旦我完成,我会发布它


2
2018-06-03 00:48



当我接近O(m + n)时,二进制搜索给出O(m * lg(n))。因此二进制搜索适用于非常长的数组。在我的情况下,我有短阵列(15-30元素) - Neir0
嗯,但是你说成千上万......起初大声笑我刚刚检查了你的编辑...正如你在我的例子中看到的那样我正在谈论的是一个20百万的交叉点与另一个20千万的数组+没有内存异常的交叉点可掠夺的时间(7秒) - Jupaol
+1。如果有人感兴趣,这确实是一个很好的方法来计算排序数组之间的交集,其中m << n,但可以通过重新使用返回的插入索引来显着改善 二分查找 作为一个论据,它从那里重新开始搜索,而不是再次搜索整个集合。 - sharky


Yorye Nathan用最后一个“不安全的代码”方法给了我两个数组中最快的交集。不幸的是,它对我来说仍然太慢,我需要组合阵列交叉点,最多可达2 ^ 32种组合,几乎没有?我做了以下修改和调整,时间缩短到2.6倍。您需要先进行一些预优化,确保您可以采取某种方式。我只使用索引而不是实际的对象或id或其他一些抽象的比较。因此,举例来说,如果你必须像这样交叉大数字

ARR1: 103344,234566,789900,1947890, ARR2: 150034,234566,845465,23849854

将所有内容放入阵列中

ARR1: 103344,234566,789900,1947890,150034,845465,23849854

并且,对于交集,使用结果数组的有序索引

Arr1Index: 0,1,2,3 Arr2Index: 1,4,5,6

现在我们有更小的数字,我们可以建立一些其他好的数组。从Yorye获取方法后我做了什么,我把Arr2Index扩展为理论上的布尔数组,实际上是字节数组,因为内存大小的含义,对于以下内容:

Arr2IndexCheck:0,1,0,0,1,1,1

这或多或少是一个字典,告诉我任何索引,如果第二个数组包含它。 下一步我没有使用内存分配也需要时间,而是在调用方法之前预先创建了结果数组,所以在查找我的组合的过程中,我从不实例化任何东西。当然你必须分别处理这个数组的长度,所以也许你需要将它存储在某个地方。

最后代码如下:

    public static unsafe int IntersectSorted2(int[] arr1, byte[] arr2Check, int[] result)
    {
        int length;

        fixed (int* pArr1 = arr1, pResult = result)
        fixed (byte* pArr2Check = arr2Check)
        {
            int* maxArr1Adr = pArr1 + arr1.Length;
            int* arr1Value = pArr1;
            int* resultValue = pResult;

            while (arr1Value < maxArr1Adr)
            {
                if (*(pArr2Check + *arr1Value) == 1)
                {
                    *resultValue = *arr1Value;
                    resultValue++;
                }

                arr1Value++;
            }

            length = (int)(resultValue - pResult);
        }

        return length;
    }

您可以看到函数返回结果数组大小,然后按照您的意愿执行(调整大小,保留它)。显然,结果数组必须至少具有arr1和arr2的最小大小。

最大的改进是,我只迭代第一个数组,最好是比第二个数组更小,所以你有更少的迭代。减少迭代意味着更少的CPU周期吗?

所以这里是两个有序数组的真正快速交集,如果你需要一个可靠的高性能;)。


1
2017-10-20 14:27





arrCollection1 和 arrCollection2 整数数组的集合?在这种情况下,你应该通过启动第二个循环来获得一些显着的改进 i+1 而不是 0


0
2018-06-03 00:15



是的,对不起我的坏 - Neir0
我编辑了代码 - Neir0


C#不支持SIMD。另外,我还没弄清楚为什么,使用SSE的DLL在从C#调用时不比非SSE等效函数快。此外,我所知道的所有SIMD扩展都不支持分支,即你的“if”语句。

如果您使用的是.net 4.0,如果您有多个内核,则可以使用Parallel For来获得速度。否则,如果您的.net 3.5或更低版本,则可以编写多线程版本。

这是一个类似于你的方法:

    IList<int> intersect(int[] arr1, int[] arr2)
    {
        IList<int> intersect = new List<int>();
        int i = 0, j = 0;
        int iMax = arr1.Length - 1, jMax = arr2.Length - 1;
        while (i < iMax && j < jMax)
        {
            while (i < iMax && arr1[i] < arr2[j]) i++;
            if (arr1[i] == arr2[j]) intersect.Add(arr1[i]);
            while (i < iMax && arr1[i] == arr2[j]) i++; //prevent reduntant entries
            while (j < jMax && arr2[j] < arr1[i]) j++;
            if (arr1[i] == arr2[j]) intersect.Add(arr1[i]);
            while (j < jMax && arr2[j] == arr1[i]) j++; //prevent redundant entries
        }
        return intersect;
    }

这个也可以防止任何条目出现两次。有2个大小为1000万的排序数组,它在大约一秒钟内完成。如果在For语句中使用array.Length,编译器应该删除数组边界检查,但我不知道它是否在while语句中有效。


0
2018-06-03 00:17



实际上我已经使用了TPL(不是在交叉函数中,而是在遍历所有数组时)。为什么“使用SSE的DLL在从C#调用时比非SSE等效函数更快”? - Neir0
这对于Intersect + Distinct来说非常棒,但如果他只想要相交,那么10mil元素的1秒就相当慢。我的实现在150ms内完成(用2个数组测试,每个10mil元素,首先是{0,1,2,3,...},第二个是{0,2,4,6,8,...}) 。 - SimpleVar
我还没弄清楚为什么DLL在从C#调用时速度较慢但是所有其他条件相同,SSE调用在c ++中快3倍,从C#调用时速度大约相同。它可能与pinvoke有关,但数据集非常大。因人而异。 - HypnoToad
Yorye如果你有一个更快的方法你应该发布它。 - HypnoToad
@Doctor Zero好的,我明白了。我不仅可以将2个数组传递给外部函数,还可以将所有数组传递给外部dll。因此,一个Pinvoke呼叫的成本将很低。 - Neir0