- 积分
- 10647
- 明经币
- 个
- 注册时间
- 2015-8-18
- 在线时间
- 小时
- 威望
-
- 金钱
- 个
- 贡献
-
- 激情
-
|
本帖最后由 你有种再说一遍 于 2024-11-14 20:36 编辑
# Parallel Linq
看了C#版本的十亿行天文台数据挑战,发现多线程没有锁.
而代码上用了一个Linq聚合函数Aggregate,
聚合函数本身是顺序执行的,只有.AsParallel()后才转为并行,
那么它聚合的时候是并发才对啊?
怎么会不需要锁呢?
研究后发现,即使并行转并发也不一定需要锁,这种叫做无锁编程.
但是无锁编程并非完全无锁,
例如我很喜欢并发容器跳表(Redis/Java都有提供),它的实现是利用原子锁.
原子锁是硬件层实现的.
其实所有锁都是原子锁,只是封装层的不同.
例如读写锁,
只是原子性讲进入读取线程数+1,就放行,
写入时候就堵塞,并原子性-1,直到为0才放行写入.
不过!!连原子锁也尽可能减少呢?
并行的Linq利用了无锁队列实现,
它的无锁原理是:
1,同线程就顺序执行,这和单线程任务一样.
2,非同线程会等待全部结束并行之后再由主线程合并.
当有子线程结束时候,队列会把节点移除,并将内容合并到主线程,
这样就只有队列结构修改原子锁,没有数据处理的锁.
这是Aggregate的并行重载版本
```csharp
var result = list.AsParallel().Aggregate(
seed, // 初始值
(accumulator, item) => { /* 累加函数 */ }, // func
acc => { /* 最终结果选择器 */ }, // resultSelector
ex => { /* 异常处理 */ } // exceptionHandler
);
```
它们执行顺序就是:
切割多个块,
并行任务:线程1聚合块1,线程2聚合块2...
主线程,最终聚合.
那么重点是线程和业务分离了,
使用PLinq编程时候就不需要考虑锁
在聚合函数上面,提供线程不安全容器,也可以实现不同线程的合并.
(除非你进行多线程计数,以此判断第一个线程执行剩余处理,十亿行挑战就有这个)
若你需要并发排序,然而这也是没有推荐的容器.
因为这个世界没有并行排序的东西的,
只有并发排序,然而并发排序就是跳表/红黑树/B+树,
它们着重点不一样,需要独立优化.
## 例子1
我们来看一个最简单的例子,它并行了也不需要锁.
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
class Program {
static void Main() {
// 创建一个字符串列表
List<string> words = new List<string>
{ "hello", "world", "linq", "parallel", "example" };
// 使用PLINQ并行处理
var uppercasedWords = words.AsParallel().Select(word => {
// 模拟处理,转为大写
return word.ToUpper();
});
// 输出结果是乱序的
foreach (var word in uppercasedWords) {
Console.WriteLine(word);
}
}
}
```
## 例子2 统计单词重复数量
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
class Program {
static void Main() {
// 示例文本,包含多个句子
string[] sentences = {
"The quick brown fox jumps over the lazy dog",
"The quick brown fox is very quick and jumps over the lazy dog",
"The dog was not as quick as the fox"
};
// 统计每个句子中单词的出现次数
var wordCounts = sentences
.Select(SentenceWordCount)
.Aggregate((dict1, dict2) => {
foreach (var pair in dict2) {
if (dict1.ContainsKey(pair.Key)) {
dict1[pair.Key] += pair.Value;
} else {
dict1.Add(pair.Key, pair.Value);
}
}
return dict1;
});
// 打印单词出现的次数
foreach (var pair in wordCounts)
{
Console.WriteLine($"Word: {pair.Key}, Count: {pair.Value}");
}
}
// 提取出来的函数,用于统计单个句子中单词的出现次数
static Dictionary<string, int> SentenceWordCount(string sentence)
{
return sentence.ToLower()
.Split(new char[] { ' ', '.', '!', '?' }, StringSplitOptions.RemoveEmptyEntries)
.GroupBy(word => word)
.ToDictionary(group => group.Key, group => group.Count());
}
}
```
## 例子3 mapReduce
```csharp
var result = numbers
.AsParallel()
.Select(n => n * 2)
.Aggregate(0, (acc, n) => acc + n);
```
在这个例子中,
.Aggregate() 方法会在
.Select(n => n * 2) 操作完成后开始执行,
并在所有元素处理完毕后合并结果.
.Aggregate() 本身是顺序执行,
只是这改为了并行,返回一个结果,这使用了PLinq原理保证.
如果不是末尾,那么再次.AsParallel,
此时根据聚合方式实现不一样的并行
```
var partialSums = numbers
.AsParallel()
.Select(n => n * 2)
.Aggregate(0, (acc, n) => acc + n) //这个也是并行的,共线程执行,不同线程等
.AsParallel() // 有没有这句都一样,因为上面只会有一个结果
.Select(n => n / 2); // 如果不是唯一值才可以并行
```
## 例子4 顺序执行
利用 .AsSequential() 将后续操作转换为顺序执行.
```
using System;
using System.Collections.Generic;
using System.Linq;
class Order {
public int OrderId { get; set; }
public double TotalAmount { get; set; }
// 假设这是个非线程安全的方法,需要顺序调用
// 更新数据库记录的逻辑
public void UpdateDatabaseRecord()
{
Console.WriteLine($"Order {OrderId} updated in database.");
}
}
class Program {
static void Main() {
// 从数据库加载的订单列表
List<Order> orders = Enumerable.Range(1, 1000)
.Select(i => new Order { OrderId = i, TotalAmount = 100.0 * i })
.ToList();
// 并行查询
var parallelQuery = orders.AsParallel();
// 并行执行一些操作,比如过滤和选择
var largeOrders = parallelQuery
.Where(o => o.TotalAmount > 500)
.Select(o => new { o.OrderId, o.TotalAmount });
// 此处需要转为顺序执行操作,
// 因为是非线程安全的方法!!
// 顺序更新每个订单的数据库记录
largeOrders.AsSequential().ForEach(order =>
{
order.UpdateDatabaseRecord();
});
}
}
```
# 热路径优化
## 案例1
在这个例子中,Select和Where操作符都被频繁调用,
但并没有特别针对它们进行优化.
我们识别出Where操作符是一个热路径,
没有有效地利用缓存机制.
无优化例子:
```csharp
using System;
using System.Linq;
class Program {
static void Main() {
int[] numbers = Enumerable.Range(1, 10000).ToArray();
// 未优化的PLINQ查询
var query = numbers.AsParallel()
.Select(n => n * n) // 计算平方
.Where(n => n > 100); // 筛选大于100的结果
// 执行查询并输出结果
long sum = query.Sum(); // 计算总和
Console.WriteLine($"Sum of squares greater than 100: {sum}");
}
}
```
优化后例子:
```csharp
using System;
using System.Linq;
class Program {
static void Main() {
int[] numbers = Enumerable.Range(1, 10000).ToArray();
// 优化后的PLINQ查询,利用数组存入,可以利用CPU缓存,不过内存压力就大了
var squares = numbers.AsParallel().Select(n => n * n).ToArray();
var query = squares.AsParallel().Where(n => n > 100);
// 执行查询并输出结果
long sum = query.Sum(); // 计算总和
Console.WriteLine($"Sum of squares greater than 100: {sum}");
}
}
```
## 案例2
以下是一个简单的PLinq(Parallel LINQ)热路径优化的示例,
展示了如何通过一些常见技巧来提升性能:
示例场景:
假设我们有一个包含大量整数的列表,
我们想要对这个列表中的每个元素进行某种计算,
比如计算每个元素的平方,
并且希望利用多核处理器来并行执行这个操作以提高速度。
初始代码(未优化的PLinq使用)
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
class Program{
static void Main() {
List<int> numbers = Enumerable
.Range(1, 1000000).ToList();
// 使用PLinq进行计算
var result = numbers.AsParallel()
.Select(n => n * n).ToList();
Console.WriteLine($"计算完成,结果数量: {result.Count}");
}
}
```
在上述代码中,我们简单地将一个整数列表转换为并行查询,
然后使用 Select 操作符来计算每个元素的平方,并最终将结果收集到一个新的列表中.
热路径优化步骤
### 设置合适的并行度
默认情况下,PLinq会根据系统的处理器核心数自动确定并行度.
但在某些情况下,你可能需要根据具体的硬件环境和任务特性来手动设置并行度.
例如,如果你的任务是I/O密集型的,可能不需要过高的并行度,
因为过多的并发I/O操作可能会导致性能下降.
而对于计算密集型任务,可能可以适当提高并行度.
以下是设置并行度的示例修改
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
class Program{
static void Main() {
List<int> numbers = Enumerable
.Range(1, 1000000).ToList();
// 设置并行度为4(这里只是示例,可根据实际情况调整)
var result = numbers.AsParallel()
.WithDegreeOfParallelism(4)
.Select(n => n * n).ToList();
Console.WriteLine($"计算完成,结果数量: {result.Count}");
}
}
```
在这个修改后的代码中,
我们通过 WithDegreeOfParallelism 方法将并行度设置为4,
你可以根据实际测试情况在你的机器上找到最适合的并行度值
### 避免不必要的排序
PLinq在某些情况下可能会对结果进行排序,这可能会增加额外的开销,
尤其是当排序不是我们所需要的操作时
例如,如果我们只是关心计算结果本身,
而不关心它们的顺序是否与原始列表中的顺序相对应,
我们可以通过指定 AsOrdered(false) 来告诉PLinq不需要保持顺序
修改后的代码如下:
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
class Program {
static void Main() {
List<int> numbers = Enumerable
.Range(1, 1000000).ToList();
// 设置并行度为4并避免不必要的排序
var result = numbers.AsParallel()
.WithDegreeOfParallelism(4)
.AsOrdered(false)
.Select(n => n * n).ToList();
Console.WriteLine($"计算完成,结果数量: {result.Count}");
}
}
``` |
|