Skip to content

Commit

Permalink
Merge pull request #490 from sarmis/rmq-stream-merge-branch
Browse files Browse the repository at this point in the history
add stream-based rabbit mq bus
  • Loading branch information
Memoyu committed Sep 19, 2023
2 parents 930d9f0 + f26e1a0 commit a39a5bf
Show file tree
Hide file tree
Showing 7 changed files with 421 additions and 2 deletions.
9 changes: 8 additions & 1 deletion EasyCaching.sln
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.Bus.Zookeeper",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.FasterKv", "src\EasyCaching.FasterKv\EasyCaching.FasterKv.csproj", "{7191E567-38DF-4879-82E1-73EC618AFCAC}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EasyCaching.Serialization.MemoryPack", "serialization\EasyCaching.Serialization.MemoryPack\EasyCaching.Serialization.MemoryPack.csproj", "{EEF22C21-F380-4980-B72C-F14488369333}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EasyCaching.Bus.RabbitMQStream", "bus\EasyCaching.Bus.RabbitMQStream\EasyCaching.Bus.RabbitMQStream.csproj", "{3C9D5E40-B3A5-4649-8B40-08094644B0FB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EasyCaching.Serialization.MemoryPack", "serialization\EasyCaching.Serialization.MemoryPack\EasyCaching.Serialization.MemoryPack.csproj", "{EEF22C21-F380-4980-B72C-F14488369333}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EasyCaching.Demo.Locks", "sample\EasyCaching.Demo.Locks\EasyCaching.Demo.Locks.csproj", "{9B15A0A0-BD6B-40B0-90D4-848BC3E4AF98}"
EndProject
Expand Down Expand Up @@ -211,6 +213,10 @@ Global
{EEF22C21-F380-4980-B72C-F14488369333}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EEF22C21-F380-4980-B72C-F14488369333}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EEF22C21-F380-4980-B72C-F14488369333}.Release|Any CPU.Build.0 = Release|Any CPU
{3C9D5E40-B3A5-4649-8B40-08094644B0FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3C9D5E40-B3A5-4649-8B40-08094644B0FB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3C9D5E40-B3A5-4649-8B40-08094644B0FB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3C9D5E40-B3A5-4649-8B40-08094644B0FB}.Release|Any CPU.Build.0 = Release|Any CPU
{9B15A0A0-BD6B-40B0-90D4-848BC3E4AF98}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9B15A0A0-BD6B-40B0-90D4-848BC3E4AF98}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9B15A0A0-BD6B-40B0-90D4-848BC3E4AF98}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -251,6 +257,7 @@ Global
{5E488583-391E-4E15-83C1-7301B4FE79AE} = {B337509B-75F9-4851-821F-9BBE87C4E4BC}
{7191E567-38DF-4879-82E1-73EC618AFCAC} = {A0F5CC7E-155F-4726-8DEB-E966950B3FE9}
{EEF22C21-F380-4980-B72C-F14488369333} = {15070C49-A507-4844-BCFE-D319CFBC9A63}
{3C9D5E40-B3A5-4649-8B40-08094644B0FB} = {B337509B-75F9-4851-821F-9BBE87C4E4BC}
{9B15A0A0-BD6B-40B0-90D4-848BC3E4AF98} = {F88D727A-9F9C-43D9-90B1-D4A02BF8BC98}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
Expand Down
1 change: 1 addition & 0 deletions build/version.props
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<EasyCachingRedisBusPackageVersion>1.9.1</EasyCachingRedisBusPackageVersion>
<EasyCachingCSRedisBusPackageVersion>1.9.1</EasyCachingCSRedisBusPackageVersion>
<EasyCachingRabbitBusPackageVersion>1.9.1</EasyCachingRabbitBusPackageVersion>
<EasyCachingRabbitStreamBusPackageVersion>1.9.1</EasyCachingRabbitStreamBusPackageVersion>
<EasyCachingKafkaBusPackageVersion>1.9.1</EasyCachingKafkaBusPackageVersion>
<EasyCachingZookeeperBusPackageVersion>1.9.1</EasyCachingZookeeperBusPackageVersion>
<EasyCachingDiskPackageVersion>1.9.1</EasyCachingDiskPackageVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
/// RabbitMQ Bus options.
/// </summary>
public class RabbitMQBusOptions : BaseRabbitMQOptions
{
{
/// <summary>
/// Gets or sets the name of the queue.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
namespace Microsoft.Extensions.DependencyInjection
{
using System;
using EasyCaching.Bus.RabbitMQ;
using EasyCaching.Bus.RabbitMQStream;
using EasyCaching.Core;
using EasyCaching.Core.Configurations;
using Microsoft.Extensions.Configuration;

/// <summary>
/// EasyCaching options extensions.
/// </summary>
public static class EasyCachingOptionsExtensions
{
/// <summary>
/// Withs the RabbitMQStream bus (specify the config via hard code).
/// </summary>
/// <param name="options">Options.</param>
/// <param name="configure">Configure bus settings.</param>

public static EasyCachingOptions WithRabbitMQStreamBus(
this EasyCachingOptions options
, Action<RabbitMQBusOptions> configure
)
{
ArgumentCheck.NotNull(configure, nameof(configure));

options.RegisterExtension(new RabbitMQStreamBusOptionsExtension(configure));
return options;
}

/// <summary>
/// Withs the RabbitMQStream bus (read config from configuration file).
/// </summary>
/// <param name="options">Options.</param>
/// <param name="configuration">The configuration.</param>
/// <param name="sectionName">The section name in the configuration file.</param>
public static EasyCachingOptions WithRabbitMQStreamBus(
this EasyCachingOptions options
, IConfiguration configuration
, string sectionName = EasyCachingConstValue.RabbitMQBusSection
)
{
var dbConfig = configuration.GetSection(sectionName);
var busOptions = new RabbitMQBusOptions();
dbConfig.Bind(busOptions);

void configure(RabbitMQBusOptions x)
{
x.HostName = busOptions.HostName;
x.Password = busOptions.Password;
x.Port = busOptions.Port;
x.QueueMessageExpires = busOptions.QueueMessageExpires;
x.RequestedConnectionTimeout = busOptions.RequestedConnectionTimeout;
//x.RouteKey = busOptions.RouteKey;
x.SocketReadTimeout = busOptions.SocketReadTimeout;
x.SocketWriteTimeout = busOptions.SocketWriteTimeout;
x.TopicExchangeName = busOptions.TopicExchangeName;
x.UserName = busOptions.UserName;
x.VirtualHost = busOptions.VirtualHost;
x.QueueName = busOptions.QueueName;
}

options.RegisterExtension(new RabbitMQStreamBusOptionsExtension(configure));
return options;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
namespace EasyCaching.Bus.RabbitMQStream
{
using System;
using EasyCaching.Bus.RabbitMQ;
using EasyCaching.Core.Bus;
using EasyCaching.Core.Configurations;
using global::RabbitMQ.Client;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.ObjectPool;

/// <summary>
/// RabbitMQ Bus options extension.
/// </summary>
public class RabbitMQStreamBusOptionsExtension : IEasyCachingOptionsExtension
{
/// <summary>
/// The configure.
/// </summary>
private readonly Action<RabbitMQBusOptions> configure;

/// <summary>
/// Initializes a new instance of the <see cref="T:EasyCaching.Bus.RabbitMQ.RabbitMQBusOptionsExtension"/> class.
/// </summary>
/// <param name="configure">Configure.</param>
public RabbitMQStreamBusOptionsExtension(Action<RabbitMQBusOptions> configure)
{
this.configure = configure;
}

/// <summary>
/// Adds the services.
/// </summary>
/// <param name="services">Services.</param>
public void AddServices(IServiceCollection services)
{
services.AddOptions();
services.Configure(configure);

services.AddSingleton<IPooledObjectPolicy<IModel>, ModelPooledObjectPolicy>();
services.AddSingleton<IEasyCachingBus, DefaultRabbitMQStreamBus>();
}
}
}
Loading

0 comments on commit a39a5bf

Please sign in to comment.