WordCount在大数据领域就像学习一门语言时的hello world,得益于Storm的开源以及Storm.Net.Adapter,现在我们也可以像Java或Python一样,使用Csharp创建原生支持的Storm Topologies。下面我将通过介绍wordcount来展示如何使用Csharp开发Storm拓扑。


首先,我们创建一个控制台应用程序(使用控制台是方便调用) StormSample;使用Nuget添加Storm.Net.Adapter(该类库的namespace为Storm)。



// Spout setup hook; receives the Storm configuration and the topology context.
// NOTE(review): when the runtime invokes this is not shown in this article — confirm against Storm.Net.Adapter.
void Open(Config stormConf, TopologyContext context);
// Emits one or more tuples; should return without emitting when there is nothing to send.
void NextTuple();
// Called with the sequence id of a tuple that was acked (only when the ack mechanism is enabled).
void Ack(long seqId);
// Called with the sequence id of a tuple that failed (only when the ack mechanism is enabled).
void Fail(long seqId);


private Context ctx;

/// <summary>
/// Creates the spout and declares its stream schema: no input streams and a
/// single "default" output stream carrying one string field.
/// </summary>
/// <param name="ctx">Context instance; stored so the spout can declare its schema and emit tuples.</param>
public Generator(Context ctx)
{
    Context.Logger.Info("Generator constructor called");
    this.ctx = ctx;

    // Declare Output schema
    Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
    outputSchema.Add("default", new List<Type>() { typeof(string) });
    this.ctx.DeclareComponentSchema(new ComponentStreamSchema(null, outputSchema));
}


我使用了一个私有变量ctx来保存实例化时传入的Context对象。Context有一个静态的Logger,用于日志的发送,我们无需实例化即可使用它;根据日志级别不同,它包含 Trace、Debug、Info、Warn、Error 五个级别。另外,我们在构造函数里还需要定义输入和输出的参数的数量和类型,本例子中输入为null,输出为一个字符串。此外,我们还创建一个静态方法来直接返回实例化后的类:

/// <summary>
/// Implementation of the delegate "newPlugin", which is used to create an instance of this spout/bolt.
/// </summary>
/// <param name="ctx">Context instance</param>
/// <returns>A new <see cref="Generator"/> constructed with <paramref name="ctx"/>.</returns>
public static Generator Get(Context ctx)
{
    return new Generator(ctx);
}



// Maximum number of emitted-but-not-yet-acked tuples kept in the cache before the spout pauses.
// NOTE(review): the literal was lost from the article text; 10 is a reconstructed value — confirm.
private const int MAX_PENDING_TUPLE_NUM = 10;

// Sequence id of the most recently emitted tuple; advanced before every emit so ids are unique.
private long lastSeqId = 0;

// Sentences that were emitted and not yet acked, keyed by sequence id (used for re-emit on failure).
private Dictionary<long, string> cachedTuples = new Dictionary<long, string>();
private Random rand = new Random();

string[] sentences = new string[] {
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
    "four score and seven years ago",
    "snow white and the seven dwarfs",
    "i am at two with nature"};

/// <summary>
/// This method is used to emit one or more tuples. If there is nothing to emit, this method should return without emitting anything.
/// It should be noted that NextTuple(), Ack(), and Fail() are all called in a tight loop in a single thread in C# process.
/// When there are no tuples to emit, it is courteous to have NextTuple sleep for a short amount of time (such as 10 milliseconds), so as not to waste too much CPU.
/// </summary>
public void NextTuple()
{
    Context.Logger.Info("NextTuple enter");

    if (cachedTuples.Count <= MAX_PENDING_TUPLE_NUM)
    {
        // Each tuple needs its own id; otherwise the cache never holds more than one entry
        // and Ack/Fail cannot tell tuples apart.
        lastSeqId++;

        // Random.Next(max) treats max as exclusive, so every sentence (including the last) can be picked.
        string sentence = sentences[rand.Next(sentences.Length)];

        Context.Logger.Info("Generator Emit: {0}, seqId: {1}", sentence, lastSeqId);
        this.ctx.Emit("default", new List<object>() { sentence }, lastSeqId);
        cachedTuples[lastSeqId] = sentence;
    }
    else
    {
        // if have nothing to emit, then sleep for a little while to release CPU
        // NOTE(review): the sleep statement and its duration were lost from the article text; 50 ms is reconstructed — confirm.
        System.Threading.Thread.Sleep(50);
    }

    Context.Logger.Info("cached tuple num: {0}", cachedTuples.Count);
    Context.Logger.Info("Generator NextTx exit");
}

this.ctx.Emit 即用来把Tuple发送给下一个Bolt。


/// <summary>
/// Ack() will be called only when ack mechanism is enabled in spec file.
/// If ack is not supported in non-transactional topology, the Ack() can be left as empty function.
/// Removes the acked tuple from the pending cache and warns when it was not cached.
/// </summary>
/// <param name="seqId">Sequence Id of the tuple which is acked.</param>
public void Ack(long seqId)
{
    Context.Logger.Info("Ack, seqId: {0}", seqId);

    bool result = cachedTuples.Remove(seqId);
    if (!result)
    {
        Context.Logger.Warn("Ack(), remove cached tuple for seqId {0} fail!", seqId);
    }
}

/// <summary>
/// Fail() will be called only when ack mechanism is enabled in spec file.
/// If ack is not supported in non-transactional topology, the Fail() can be left as empty function.
/// Re-emits the cached sentence under the same sequence id, or warns when nothing is cached for it.
/// </summary>
/// <param name="seqId">Sequence Id of the tuple which is failed.</param>
public void Fail(long seqId)
{
    Context.Logger.Info("Fail, seqId: {0}", seqId);

    string sentence;
    if (cachedTuples.TryGetValue(seqId, out sentence))
    {
        Context.Logger.Info("Re-Emit: {0}, seqId: {1}", sentence, seqId);
        this.ctx.Emit("default", new List<object>() { sentence }, seqId);
    }
    else
    {
        // Only warn when the tuple is really missing; a successful re-emit is not a failure.
        Context.Logger.Warn("Fail(), can't find cached tuple for seqId {0}!", seqId);
    }
}





private Context ctx;

// Value of "topology.message.timeout.secs" from the Storm configuration; stays 0 when the key is absent.
private int msgTimeoutSecs;

/// <summary>
/// Creates the bolt and declares its stream schemas: the "default" input stream carries one
/// string, and the "default" output stream carries a string and a char.
/// </summary>
/// <param name="ctx">Context instance; stored so the bolt can declare its schema and emit tuples.</param>
public Splitter(Context ctx)
{
    Context.Logger.Info("Splitter constructor called");
    this.ctx = ctx;

    // Declare Input and Output schemas
    Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
    inputSchema.Add("default", new List<Type>() { typeof(string) });
    Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
    outputSchema.Add("default", new List<Type>() { typeof(string), typeof(char) });
    this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema));

    // Demo how to get stormConf info
    if (Context.Config.StormConf.ContainsKey("topology.message.timeout.secs"))
    {
        msgTimeoutSecs = Convert.ToInt32(Context.Config.StormConf["topology.message.timeout.secs"]);
    }
    Context.Logger.Info("msgTimeoutSecs: {0}", msgTimeoutSecs);
}

/// <summary>
/// Implementation of the delegate "newPlugin", which is used to create an instance of this spout/bolt.
/// </summary>
/// <param name="ctx">Context instance</param>
/// <returns>A new <see cref="Splitter"/> constructed with <paramref name="ctx"/>.</returns>
public static Splitter Get(Context ctx)
{
    return new Splitter(ctx);
}



// Bolt setup hook; receives the Storm configuration and the topology context.
// NOTE(review): when the runtime invokes this is not shown in this article — confirm against Storm.Net.Adapter.
void Prepare(Config stormConf, TopologyContext context);
// Called when a new tuple is available for the bolt to process.
void Execute(StormTuple tuple);


/// <summary>
/// The Execute() function will be called, when a new tuple is available.
/// Splits the incoming sentence on spaces and emits one (word, first character) tuple per word,
/// anchored to the incoming tuple.
/// </summary>
/// <param name="tuple">Incoming tuple whose first field is the sentence to split.</param>
public void Execute(StormTuple tuple)
{
    Context.Logger.Info("Execute enter");

    string sentence = tuple.GetString(0);

    // Drop empty tokens so consecutive spaces cannot produce "" and make word[0] throw.
    string[] words = sentence.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
    foreach (string word in words)
    {
        Context.Logger.Info("Splitter Emit: {0}", word);
        this.ctx.Emit("default", new List<StormTuple> { tuple }, new List<object> { word, word[0] });
    }

    Context.Logger.Info("Splitter Execute exit");
}

/// <summary>
/// Required by the bolt interface; this bolt needs no extra preparation.
/// </summary>
/// <param name="stormConf">Storm configuration (unused).</param>
/// <param name="context">Topology context (unused).</param>
public void Prepare(Config stormConf, TopologyContext context)
{
    // NOTE(review): the body was lost from the article text; reconstructed as a no-op — confirm.
}


using Storm;
using System;
using System.Collections.Generic;

namespace StormSample
{
    /// <summary>
    /// The bolt "counter" uses a dictionary to record the occurrence number of each word.
    /// </summary>
    public class Counter : IBasicBolt
    {
        private Context ctx;

        // Running occurrence count per word seen by this bolt instance.
        private Dictionary<string, int> counts = new Dictionary<string, int>();

        /// <summary>
        /// Creates the bolt and declares its stream schemas: the "default" input stream carries a
        /// string and a char, and the "default" output stream carries a string and an int.
        /// </summary>
        /// <param name="ctx">Context instance; stored so the bolt can declare its schema and emit tuples.</param>
        public Counter(Context ctx)
        {
            Context.Logger.Info("Counter constructor called");
            this.ctx = ctx;

            // Declare Input and Output schemas
            Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
            inputSchema.Add("default", new List<Type>() { typeof(string), typeof(char) });

            Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
            outputSchema.Add("default", new List<Type>() { typeof(string), typeof(int) });
            this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema));
        }

        /// <summary>
        /// The Execute() function will be called, when a new tuple is available.
        /// Increments the count for the incoming word and emits (word, count) anchored to the incoming tuple.
        /// </summary>
        /// <param name="tuple">Incoming tuple whose first field is the word to count.</param>
        public void Execute(StormTuple tuple)
        {
            Context.Logger.Info("Execute enter");

            string word = tuple.GetString(0);

            // A word not seen before starts from 0; TryGetValue leaves count at 0 in that case.
            int count;
            counts.TryGetValue(word, out count);
            count++;
            counts[word] = count;

            Context.Logger.Info("Counter Emit: {0}, count: {1}", word, count);
            this.ctx.Emit("default", new List<StormTuple> { tuple }, new List<object> { word, count });

            Context.Logger.Info("Counter Execute exit");
        }

        /// <summary>
        /// Implementation of the delegate "newPlugin", which is used to create an instance of this spout/bolt.
        /// </summary>
        /// <param name="ctx">Context instance</param>
        /// <returns>A new <see cref="Counter"/> constructed with <paramref name="ctx"/>.</returns>
        public static Counter Get(Context ctx)
        {
            return new Counter(ctx);
        }

        /// <summary>
        /// Required by <see cref="IBasicBolt"/>; this bolt needs no extra preparation.
        /// </summary>
        /// <param name="stormConf">Storm configuration (unused).</param>
        /// <param name="context">Topology context (unused).</param>
        public void Prepare(Config stormConf, TopologyContext context)
        {
            // NOTE(review): the body was lost from the article text; reconstructed as a no-op — confirm.
        }
    }
}


using Storm;
using System;
using System.Linq;

namespace StormSample
{
    /// <summary>
    /// Entry point: launches the spout or bolt named by the first command-line argument.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Dispatches on the component name ("generator", "splitter" or "counter") and hands the
        /// matching factory to <c>ApacheStorm.LaunchPlugin</c>. Without arguments it only logs
        /// that local mode is not supported.
        /// </summary>
        /// <param name="args">Command-line arguments; args[0] is the component name.</param>
        static void Main(string[] args)
        {
            if (args.Length == 0)
            {
                // No component name was passed on the command line.
                Context.Logger.Error("Not support local model.");
                return;
            }

            string compName = args[0];
            try
            {
                if ("generator".Equals(compName))
                {
                    ApacheStorm.LaunchPlugin(new newPlugin(Generator.Get));
                }
                else if ("splitter".Equals(compName))
                {
                    ApacheStorm.LaunchPlugin(new newPlugin(Splitter.Get));
                }
                else if ("counter".Equals(compName))
                {
                    ApacheStorm.LaunchPlugin(new newPlugin(Counter.Get));
                }
                else
                {
                    // Only an unknown component name is an error; known names must not fall through to here.
                    throw new ArgumentException(string.Format("unexpected compName: {0}", compName));
                }
            }
            catch (Exception ex)
            {
                // NOTE(review): the catch body was lost from the article text; logging the exception is a reconstruction — confirm.
                Context.Logger.Error(ex.ToString());
            }
        }
    }
}





