皮皮网
皮皮网

【svn 拷贝源码】【saas会员源码】【macdroc指标源码】baserichspout源码

来源:寄生虫软件可以跟踪源码吗 发表时间:2024-12-22 10:03:37

1.如何在eclipse调试storm程序

baserichspout源码

如何在eclipse调试storm程序

       ä¸€ã€ä»‹ç»

        storm提供了两种运行模式:本地模式和分布式模式。本地模式针对开发调试storm topologies非常有用。

        Storm has two modes of operation: local mode and distributed mode.

        In local mode,svn 拷贝源码 Storm executes completely in process by simulating

       worker nodes with threads. Local mode is useful for testing and

       development of topologies

        因为多数程序开发者都是使用windows系统进行程序开发,如果在本机不安装storm环境的情况下,开发、调试storm程序。如果你正在为此问题而烦恼,请使用本文提供的方法。

        二、实施步骤

        如何基于eclipse+maven调试storm程序,步骤如下:

        1.搭建好开发环境(eclipse+maven,本人使用的是eclipse Kepler 与maven3.1.1)

        2.创建maven项目,并修改pom.xml,内容如pom.xml(机器联网,下载所需的依赖jar)

        Github上的pom.xml,引入的依赖太多,有些不需要,

        3. 编写storm程序,指定为本地模式运行。本文提供的程序是wordcount

        重要的是LocalCluster cluster = new LocalCluster();这一句

       Config conf = new Config();

       conf.setDebug(true);

       conf.setNumWorkers(2);

       LocalCluster cluster = new LocalCluster();

       cluster.submitTopology("test", conf, builder.createTopology());

       Utils.sleep();

       cluster.killTopology("test");

       cluster.shutdown();

       pom.xml文件

       <project xmlns="mons-collections</groupId>

        <artifactId>commons-collections</artifactId>

        <version>3.2.1</version>

        </dependency>

        </dependencies>

       </project>

       storm程序

       package storm.starter;

       import java.util.HashMap;

       import java.util.Map;

       import storm.starter.spout.RandomSentenceSpout;

       import backtype.storm.Config;

       import backtype.storm.LocalCluster;

       import backtype.storm.StormSubmitter;

       import backtype.storm.topology.BasicOutputCollector;

       import backtype.storm.topology.OutputFieldsDeclarer;

       import backtype.storm.topology.TopologyBuilder;

       import backtype.storm.topology.base.BaseBasicBolt;

       import backtype.storm.tuple.Fields;

       import backtype.storm.tuple.Tuple;

       import backtype.storm.tuple.Values;

       /

**

        * This topology demonstrates Storm's stream groupings and multilang

        * capabilities.

        */

       public class WordCountTopology {

        public static class SplitSentence extends BaseBasicBolt {

        @Override

        public void execute(Tuple input, BasicOutputCollector collector) {

        try {

        String msg = input.getString(0);

        System.out.println(msg + "-------------------");

        if (msg != null) {

        String[] s = msg.split(" ");

        for (String string : s) {

        collector.emit(new Values(string));

        }

        }

        } catch (Exception e) {

        e.printStackTrace();

        }

        }

        @Override

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

        }

        }

        public static class WordCount extends BaseBasicBolt {

        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override

        public void execute(Tuple tuple, BasicOutputCollector collector) {

        String word = tuple.getString(0);

        Integer count = counts.get(word);

        if (count == null)

        count = 0;

        count++;

        counts.put(word, count);

        collector.emit(new Values(word, count));

        }

        @Override

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word", "count"));

        }

        }

        public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 5);

        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping(

        "spout");

        builder.setBolt("count", new WordCount(), ).fieldsGrouping("split",

        new Fields("word"));

        Config conf = new Config();

        conf.setDebug(true);

        if (args != null && args.length > 0) {

        conf.setNumWorkers(3);

        StormSubmitter.submitTopology(args[0], conf,

        builder.createTopology());

        } else {

        conf.setMaxTaskParallelism(3);

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("word-count", conf, builder.createTopology());

        Thread.sleep();

        cluster.shutdown();

        }

        }

       }

       package storm.starter.spout;

       import backtype.storm.spout.SpoutOutputCollector;

       import backtype.storm.task.TopologyContext;

       import backtype.storm.topology.OutputFieldsDeclarer;

       import backtype.storm.topology.base.BaseRichSpout;

       import backtype.storm.tuple.Fields;

       import backtype.storm.tuple.Values;

       import backtype.storm.utils.Utils;

       import java.util.Map;

       import java.util.Random;

       public class RandomSentenceSpout extends BaseRichSpout {

        SpoutOutputCollector _collector;

        Random _rand;

       @Override

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

        _collector = collector;

        _rand = new Random();

        }

        @Override

        public void nextTuple() {

        Utils.sleep();

        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",

        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };

        String sentence = sentences[_rand.nextInt(sentences.length)];

        _collector.emit(new Values(sentence));

        }

        @Override

        public void ack(Object id) {

        }

        @Override

        public void fail(Object id) {

        }

        @Override

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

        }

       }

相关栏目:焦点