Dataflow components in Task Parallel Library
Sometimes during your work you stumble upon components, structures, or techniques that turn out to be gems. Not every developer uses them daily, but in your position they can save you a lot of time. I have found such a gem that helps me in my current position.
This gem is the TPL Dataflow library: a set of dataflow components built on top of the Task Parallel Library (TPL). With these components, you can create multi-threaded, parallel processes that communicate with each other through in-process message passing. Each process lives inside a so-called block, and the parallel processing happens within these blocks.
How does this programming model work and what are blocks?
There are three types of blocks: source, target, and propagator blocks. The names already tell you what they are used for. A source block acts as a source of data and can be read from. A target block acts as a data receiver and can be written to. A propagator block acts as both a source block and a target block, so it can be read from and written to.
Together they can be linked to form a pipeline in which you can process your data.
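As a rough sketch of how the three roles show up in code: the library exposes them as the interfaces ISourceBlock<TOutput>, ITargetBlock<TInput>, and IPropagatorBlock<TInput, TOutput> in the System.Threading.Tasks.Dataflow namespace. The names BlockRolesSketch and RoundTrip below are made up for illustration.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class BlockRolesSketch
{
    // Hypothetical helper: writes a value through the target side of a block
    // and reads it back through the source side.
    public static int RoundTrip(int value)
    {
        // BufferBlock<T> is a propagator, so it implements both roles.
        IPropagatorBlock<int, int> propagator = new BufferBlock<int>();

        ITargetBlock<int> target = propagator; // write side
        ISourceBlock<int> source = propagator; // read side

        target.Post(value);      // extension method from DataflowBlock
        return source.Receive(); // blocks until a message is available
    }

    public static void Main()
    {
        Console.WriteLine(RoundTrip(42)); // prints 42
    }
}
```

Typing a variable as ITargetBlock<int> or ISourceBlock<int> is a handy way to hand only one side of a block to other code.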
Here is a small example of a BufferBlock, a propagator block that stores messages in first-in, first-out (FIFO) order.
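using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class DataFlowExample
{
    public BufferBlock<int> BufferBlock { get; }

    public DataFlowExample()
    {
        BufferBlock = new BufferBlock<int>();
    }

    public async Task BuildUpBufferAsync()
    {
        // Post several messages to the block.
        for (var i = 0; i < 30; i++)
        {
            BufferBlock.Post(i);
            Console.WriteLine("add number");
            await Task.Delay(500);
        }

        // Signal that no more messages will arrive, so the consumer loop can end.
        BufferBlock.Complete();
    }
}

internal static class Program
{
    private static void Main(string[] args)
    {
        var dataFlow = new DataFlowExample();

        // Fill the buffer on a separate task.
        _ = Task.Run(dataFlow.BuildUpBufferAsync);

        // Consume the buffer on another task while it is still being filled.
        _ = Task.Run(async () =>
        {
            // OutputAvailableAsync returns false once the block is completed and empty.
            while (await dataFlow.BufferBlock.OutputAvailableAsync())
            {
                var number = await dataFlow.BufferBlock.ReceiveAsync();
                Console.WriteLine($"received number: {number}");
                await Task.Delay(500);
            }
        });

        // Keep the process alive while the background tasks run.
        Console.ReadKey();
    }
}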
One Task posts numbers into the BufferBlock while a second Task reads from that same block as the data arrives.
Now this is not spectacular, but if data comes in faster than you can process it, a buffer like this serves you well. You can also link blocks together. Let's link our BufferBlock to another block, an ActionBlock, like so.
public class DataFlowExample
{
    private readonly BufferBlock<int> _bufferBlock;

    // The async lambda runs once for every message the block receives.
    private readonly ActionBlock<int> _actionBlock = new(async number =>
    {
        Console.WriteLine($"Linked agent 1 gets number {number}");
        await Task.Delay(500);
    });

    public DataFlowExample()
    {
        _bufferBlock = new BufferBlock<int>();
        // Link the action block to the buffer block.
        _bufferBlock.LinkTo(_actionBlock);
    }

    public void BuildUpBuffer()
    {
        // Post several messages to the block.
        for (var i = 0; i < 30; i++)
        {
            _bufferBlock.Post(i);
        }

        // No more messages will be posted; messages already buffered are still delivered.
        _bufferBlock.Complete();
    }
}
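Calling Complete on the buffer block does not by itself tell a caller when the linked ActionBlock has finished its work. One way to wait for that, sketched here under the assumption that completion should flow down the link, is to pass a DataflowLinkOptions with PropagateCompletion set to true and await the Completion task of the last block. The names CompletionSketch and RunAsync are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionSketch
{
    // Hypothetical helper: pushes 'count' numbers through buffer -> action
    // and returns them in the order the action block handled them.
    public static async Task<List<int>> RunAsync(int count)
    {
        var handled = new List<int>();
        var bufferBlock = new BufferBlock<int>();

        // An ActionBlock handles one message at a time by default,
        // so the list needs no extra locking here.
        var actionBlock = new ActionBlock<int>(number => handled.Add(number));

        // Without PropagateCompletion, completing the buffer block
        // would not complete the action block.
        bufferBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < count; i++)
        {
            bufferBlock.Post(i);
        }

        bufferBlock.Complete();       // no more input
        await actionBlock.Completion; // finishes after every message is handled
        return handled;
    }

    public static async Task Main()
    {
        var handled = await RunAsync(5);
        Console.WriteLine(string.Join(", ", handled));
    }
}
```

Awaiting Completion also replaces tricks such as Console.ReadKey for keeping the process alive until the pipeline has drained.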
A BufferBlock can be linked to several targets, but it hands each message to only one of them: the first linked target that accepts it. Now I hear you thinking, what if I want to send the same message to multiple blocks? For that scenario, you can use the BroadcastBlock. Its main purpose is to broadcast messages to other blocks.
public class DataFlowExample
{
    private readonly BroadcastBlock<int> _broadcastBlock;

    private readonly ActionBlock<int> _actionBlock1 = new(async number =>
    {
        Console.WriteLine($"Linked agent 1 gets number {number}");
        await Task.Delay(500);
    });

    private readonly ActionBlock<int> _actionBlock2 = new(async number =>
    {
        Console.WriteLine($"Linked agent 2 gets number {number}");
        await Task.Delay(500);
    });

    public DataFlowExample()
    {
        // No cloning function: every target receives the same value.
        _broadcastBlock = new BroadcastBlock<int>(null);
        // The broadcast block will send the same message to all linked blocks.
        _broadcastBlock.LinkTo(_actionBlock1);
        _broadcastBlock.LinkTo(_actionBlock2);
    }

    public void BuildUpBuffer()
    {
        // Post several messages to the block.
        for (var i = 0; i < 30; i++)
        {
            _broadcastBlock.Post(i);
        }

        _broadcastBlock.Complete();
    }
}
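For contrast with the BroadcastBlock, the sketch below links a plain BufferBlock to two targets. It assumes both ActionBlocks keep their default, unbounded capacity, in which case the first linked target accepts every message and the second one receives nothing. The names BufferFanOutSketch and RunAsync are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferFanOutSketch
{
    // Hypothetical helper: returns what each of two linked targets received.
    public static async Task<(List<int> First, List<int> Second)> RunAsync(int count)
    {
        var first = new List<int>();
        var second = new List<int>();

        var bufferBlock = new BufferBlock<int>();
        var actionBlock1 = new ActionBlock<int>(number => first.Add(number));
        var actionBlock2 = new ActionBlock<int>(number => second.Add(number));

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        // Targets are offered each message in the order they were linked.
        bufferBlock.LinkTo(actionBlock1, linkOptions);
        bufferBlock.LinkTo(actionBlock2, linkOptions);

        for (var i = 0; i < count; i++)
        {
            bufferBlock.Post(i);
        }

        bufferBlock.Complete();
        await Task.WhenAll(actionBlock1.Completion, actionBlock2.Completion);
        return (first, second);
    }

    public static async Task Main()
    {
        var (first, second) = await RunAsync(10);
        Console.WriteLine($"first target: {first.Count}, second target: {second.Count}");
    }
}
```

Swapping the BufferBlock for a BroadcastBlock is what makes both targets see every message.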
The BroadcastBlock also has a downside. Unlike the BufferBlock, it does not queue messages: it holds only the most recent one and overwrites it when a new message arrives, so a target that is not ready in time never sees the older message. With linking, we can place a BufferBlock in front of the BroadcastBlock to take in all the messages, like so.
public DataFlowExample()
{
    _bufferBlock = new BufferBlock<int>();
    _broadcastBlock = new BroadcastBlock<int>(null);

    // We link the buffer block to the broadcast block.
    _bufferBlock.LinkTo(_broadcastBlock);

    // The broadcast block will send the same message to all linked blocks.
    _broadcastBlock.LinkTo(_actionBlock1);
    _broadcastBlock.LinkTo(_actionBlock2);
}

public void BuildUpBuffer()
{
    // Post several messages to the first block.
    for (var i = 0; i < 30; i++)
    {
        _bufferBlock.Post(i);
    }

    // Complete signals that no more messages will be posted;
    // messages already in the buffer are still passed on.
    _bufferBlock.Complete();
}
This means a large load of incoming messages can be buffered inside the BufferBlock, which can be written to and read from asynchronously at the same time, and then broadcast by the BroadcastBlock one at a time to multiple targets.
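To make the whole buffer-to-broadcast pipeline observable, here is a minimal sketch that collects what each target receives and waits for both targets to finish. It assumes the ActionBlocks keep their default, unbounded capacity, so they accept every message the BroadcastBlock offers. BroadcastPipelineSketch and RunAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastPipelineSketch
{
    // Hypothetical helper: runs buffer -> broadcast -> two action blocks
    // and returns the messages each action block handled.
    public static async Task<(List<int> First, List<int> Second)> RunAsync(int count)
    {
        var first = new List<int>();
        var second = new List<int>();

        var bufferBlock = new BufferBlock<int>();
        // Cloning function: ints are values, so returning the input is enough.
        var broadcastBlock = new BroadcastBlock<int>(number => number);
        var actionBlock1 = new ActionBlock<int>(number => first.Add(number));
        var actionBlock2 = new ActionBlock<int>(number => second.Add(number));

        // Let completion flow from the buffer all the way to both targets.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        bufferBlock.LinkTo(broadcastBlock, linkOptions);
        broadcastBlock.LinkTo(actionBlock1, linkOptions);
        broadcastBlock.LinkTo(actionBlock2, linkOptions);

        for (var i = 0; i < count; i++)
        {
            bufferBlock.Post(i);
        }

        bufferBlock.Complete();
        await Task.WhenAll(actionBlock1.Completion, actionBlock2.Completion);
        return (first, second);
    }

    public static async Task Main()
    {
        var (first, second) = await RunAsync(10);
        Console.WriteLine($"agent 1: {string.Join(", ", first)}");
        Console.WriteLine($"agent 2: {string.Join(", ", second)}");
    }
}
```

If a target were given a small BoundedCapacity instead, it could miss messages that the BroadcastBlock overwrites while that target is busy.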
Another block worth mentioning is the TransformBlock. You give it a function (a Func rather than an Action), which lets you transform each message as it passes through the block.
Let's also link it into our pipeline of blocks, like so.
public DataFlowExample()
{
    _bufferBlock = new BufferBlock<int>();
    _broadcastBlock = new BroadcastBlock<int>(null);

    // Assumes _transformBlock is declared as a TransformBlock<int, int> field.
    _transformBlock = new(async number =>
    {
        Console.WriteLine($"Transform block gets number {number}");
        await Task.Delay(500);
        // The number is passed on unchanged here; return another value to transform it.
        return number;
    });

    // We link the buffer block to the transform block.
    _bufferBlock.LinkTo(_transformBlock);

    // We link the transform block to the broadcast block.
    _transformBlock.LinkTo(_broadcastBlock);

    // The broadcast block will send the same message to all linked blocks.
    _broadcastBlock.LinkTo(_actionBlock1);
    _broadcastBlock.LinkTo(_actionBlock2);
}
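The transform above returns its input untouched. A small sketch of a TransformBlock that really changes the data, here turning each int into a string, could look like this. The names TransformSketch and RunAsync are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransformSketch
{
    // Hypothetical helper: squares each number and returns the formatted results.
    public static async Task<List<string>> RunAsync(int count)
    {
        var results = new List<string>();

        // Input type int, output type string: the block changes the message type.
        var transformBlock = new TransformBlock<int, string>(
            number => $"{number} squared is {number * number}");

        var actionBlock = new ActionBlock<string>(text => results.Add(text));

        transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < count; i++)
        {
            transformBlock.Post(i);
        }

        transformBlock.Complete();
        await actionBlock.Completion;
        return results;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync(4))
        {
            Console.WriteLine(line);
        }
    }
}
```

Because the output type differs from the input type, the next block in the chain must accept string rather than int.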
Fun, right?
Check it out: there are more blocks to choose from.