我最近正在深入研究F#源代码。
在Seq.fs中:
// Binding.
//
// We use a type defintion to apply a local dynamic optimization.
// We automatically right-associate binding, i.e. push the continuations to the right.
// That is, bindG (bindG G1 cont1) cont2 --> bindG G1 (cont1 o cont2)
// This makes constructs such as the following linear rather than quadratic:
//
// let rec rwalk n = { if n > 0 then
// yield! rwalk (n-1)
// yield n }
看到上面的代码后,我测试了两个代码:
let rec rwalk n = seq { if n > 0 then
yield n
yield! rwalk (n-1)
}
和
let rec rwalk n = seq { if n > 0 then
yield! rwalk (n-1)
yield n
}
我发现第一个非常快,而第二个非常慢。如果n = 10000,我的机器上生成此序列需要3秒,因此是二次时间。
二次时间是合理的,例如,
seq { yield! {1; 2; ...; n-1}; yield n }
翻译成
Seq.append {1; 2; ...; n-1} {n}
我想这个追加操作应该是线性时间。在第一个代码中,append操作是这样的: seq { yield n; yield! {n-1; n-2; ...; 1} }
,这需要花费不变的时间。
代码中的注释说它是 linear
(也许这个线性不是线性时间)。也许这个 linear
关于使用序列的定制实现而不是Moand / F#计算表达式(如F#规范中所述,但是规范没有提到这样做的原因......)。
谁能澄清这里的模糊性?非常感谢!
(因为这是一个语言设计和优化问题,我还附上了Haskell标签,看看那里的人是否有见解。)
什么时候 yield!
出现在 非尾部呼叫位置,它本质上意味着同样的事情:
for v in <expr> do yield v
这个的问题(以及为什么是二次方的原因)是对于递归调用,这会创建一个嵌套的迭代器链 for
循环。您需要迭代生成的整个序列 <expr>
对于每个单独的元素,因此如果迭代是线性的,则得到二次时间(因为线性迭代发生在每个元素上)。
让我们说吧 rwalk
功能生成 [ 9; 2; 3; 7 ]
。在第一次迭代中,递归生成的序列有4个元素,因此您将迭代4个元素并添加1.在递归调用中,您将迭代3个元素并添加1等。使用图表,您可以看看那是二次的:
x
x x
x x x
x x x x
此外,每个递归调用都会创建一个新的对象实例(IEnumerator
)所以也有一些内存成本(虽然只是线性)。
在一个 尾部呼叫位置,F#compiler / librar进行优化。它“取代”了当前 IEnumerable
与递归调用返回的一个,所以它不需要迭代它来生成所有元素 - 它只是返回(这也消除了内存成本)。
有关。 在C#lanaugage设计中已经讨论过同样的问题,并且有一个问题 有趣的论文 (他们的名字为 yield!
是 yield foreach
)。
我不确定你在寻找什么样的答案。您已经注意到,注释与编译器的行为不匹配。我不能说这是一个与实现不同步的评论实例,还是它实际上是一个性能错误(例如,规范似乎没有提出任何特定的性能要求)。
但是,从理论上讲,编译器的机器应该可以生成一个在线性时间内对您的示例进行操作的实现。实际上,甚至可以使用计算表达式在库中构建这样的实现。这是一个粗略的例子,主要基于Tomas引用的论文:
open System.Collections
open System.Collections.Generic
type 'a nestedState =
/// Nothing to yield
| Done
/// Yield a single value before proceeding
| Val of 'a
/// Yield the results from a nested iterator before proceeding
| Enum of (unit -> 'a nestedState)
/// Yield just the results from a nested iterator
| Tail of (unit -> 'a nestedState)
type nestedSeq<'a>(ntor) =
let getEnumerator() : IEnumerator<'a> =
let stack = ref [ntor]
let curr = ref Unchecked.defaultof<'a>
let rec moveNext() =
match !stack with
| [] -> false
| e::es as l ->
match e() with
| Done -> stack := es; moveNext()
| Val(a) -> curr := a; true
| Enum(e) -> stack := e :: l; moveNext()
| Tail(e) -> stack := e :: es; moveNext()
{ new IEnumerator<'a> with
member x.Current = !curr
interface System.IDisposable with
member x.Dispose() = ()
interface IEnumerator with
member x.MoveNext() = moveNext()
member x.Current = box !curr
member x.Reset() = failwith "Reset not supported" }
member x.NestedEnumerator = ntor
interface IEnumerable<'a> with
member x.GetEnumerator() = getEnumerator()
interface IEnumerable with
member x.GetEnumerator() = upcast getEnumerator()
let getNestedEnumerator : 'a seq -> _ = function
| :? ('a nestedSeq) as n -> n.NestedEnumerator
| s ->
let e = s.GetEnumerator()
fun () ->
if e.MoveNext() then
Val e.Current
else
Done
let states (arr : Lazy<_[]>) =
let state = ref -1
nestedSeq (fun () -> incr state; arr.Value.[!state]) :> seq<_>
type SeqBuilder() =
member s.Yield(x) =
states (lazy [| Val x; Done |])
member s.Combine(x:'a seq, y:'a seq) =
states (lazy [| Enum (getNestedEnumerator x); Tail (getNestedEnumerator y) |])
member s.Zero() =
states (lazy [| Done |])
member s.Delay(f) =
states (lazy [| Tail (f() |> getNestedEnumerator) |])
member s.YieldFrom(x) = x
member s.Bind(x:'a seq, f) =
let e = x.GetEnumerator()
nestedSeq (fun () ->
if e.MoveNext() then
Enum (f e.Current |> getNestedEnumerator)
else
Done) :> seq<_>
let seq = SeqBuilder()
let rec walkr n = seq {
if n > 0 then
return! walkr (n-1)
return n
}
let rec walkl n = seq {
if n > 0 then
return n
return! walkl (n-1)
}
let time =
let watch = System.Diagnostics.Stopwatch.StartNew()
walkr 10000 |> Seq.iter ignore
watch.Stop()
watch.Elapsed
请注意我的 SeqBuilder
不稳健;它缺少几个工作流成员,并且它没有做任何有关对象处理或错误处理的事情。但是,它确实证明了这一点 SequenceBuilder
不要 需要 在像你这样的例子上展示二次运行时间。
还要注意,这里有一个时空权衡 - 嵌套迭代器 walkr n
将在O(n)时间内迭代序列,但它需要O(n)空间才能这样做。