今天就跟大家聊聊有關(guān)如何實(shí)現(xiàn) LoggingMetricsConsumer將指標(biāo)值輸出到metric.log日志文件,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
公司主營(yíng)業(yè)務(wù):成都做網(wǎng)站、成都網(wǎng)站制作、移動(dòng)網(wǎng)站開(kāi)發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。成都創(chuàng)新互聯(lián)是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開(kāi)放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來(lái)驚喜。成都創(chuàng)新互聯(lián)推出濂溪免費(fèi)做網(wǎng)站回饋大家。
前提說(shuō)明:
storm從0.9.0開(kāi)始,增加了指標(biāo)統(tǒng)計(jì)框架,用來(lái)收集應(yīng)用程序的特定指標(biāo),并將其輸出到外部系統(tǒng)。
一般來(lái)說(shuō),您只需要去實(shí)現(xiàn) LoggingMetricsConsumer,統(tǒng)計(jì)將指標(biāo)值輸出到metric.log日志文件之中。
當(dāng)然,您也可以自定義一個(gè)監(jiān)聽(tīng)的類:只需要去實(shí)現(xiàn)IMetricsConsumer接口就可以了。這些類可以在代碼里注冊(cè)(registerMetricsConsumer),也可以在 storm.yaml配置文件中注冊(cè):
package com.digitalpebble.storm.crawler; import backtype.storm.Config; import backtype.storm.metric.MetricsConsumerBolt; import backtype.storm.metric.api.IMetricsConsumer; import backtype.storm.task.IErrorReporter; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.utils.Utils; import com.google.common.base.Joiner; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectWriter; import org.mortbay.jetty.Server; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.ServletHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; /** * @author Enno Shioji (enno.shioji@peerindex.com) */ public class DebugMetricConsumer implements IMetricsConsumer { private static final Logger log = LoggerFactory .getLogger(DebugMetricConsumer.class); private IErrorReporter errorReporter; private Server server; // Make visible to servlet threads private volatile TopologyContext context; private volatile ConcurrentMap<String, Number> metrics; private volatile ConcurrentMap<String, Map<String, Object>> metrics_metadata; public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { this.context = context; this.errorReporter = errorReporter; this.metrics = new ConcurrentHashMap<String, Number>(); this.metrics_metadata = new ConcurrentHashMap<String, Map<String, Object>>(); try { // TODO Config file not tested final String PORT_CONFIG_STRING = "topology.metrics.consumers.debug.servlet.port"; Integer port = (Integer) stormConf.get(PORT_CONFIG_STRING); if (port == null) { log.warn("Metrics debug servlet's port not specified, defaulting to 7070. You can specify it via " + PORT_CONFIG_STRING + " in storm.yaml"); port = 7070; } server = startServlet(port); } catch (Exception e) { log.error("Failed to start metrics server", e); throw new AssertionError(e); } } private static final Joiner ON_COLONS = Joiner.on("::"); public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { // In order String componentId = taskInfo.srcComponentId; Integer taskId = taskInfo.srcTaskId; Integer updateInterval = taskInfo.updateIntervalSecs; Long timestamp = taskInfo.timestamp; for (DataPoint point : dataPoints) { String metric_name = point.name; try { Map<String, Number> metric = (Map<String, Number>) point.value; for (Map.Entry<String, Number> entry : metric.entrySet()) { String metricId = ON_COLONS.join(componentId, taskId, metric_name, entry.getKey()); Number val = entry.getValue(); metrics.put(metricId, val); metrics_metadata.put(metricId, ImmutableMap .<String, Object> of("updateInterval", updateInterval, "lastreported", timestamp)); } } catch (RuntimeException e) { // One can easily send something else than a Map<String,Number> // down the __metrics stream and make this part break. // If you ask me either the message should carry type // information or there should be different stream per message // type // This is one of the reasons why I want to write a further // abstraction on this facility errorReporter.reportError(e); metrics_metadata .putIfAbsent("ERROR_METRIC_CONSUMER_" + e.getClass().getSimpleName(), ImmutableMap .of("offending_message_sample", point.value)); } } } private static final ObjectMapper OM = new ObjectMapper(); private Server startServlet(int serverPort) throws Exception { // Setup HTTP server Server server = new Server(serverPort); Context root = new Context(server, "/"); server.start(); HttpServlet servlet = new HttpServlet() { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { SortedMap<String, Number> metrics = ImmutableSortedMap .copyOf(DebugMetricConsumer.this.metrics); SortedMap<String, Map<String, Object>> metrics_metadata = ImmutableSortedMap .copyOf(DebugMetricConsumer.this.metrics_metadata); Map<String, Object> toplevel = ImmutableMap .of("retrieved", new Date(), // TODO this call fails with mysterious // exception // "java.lang.IllegalArgumentException: Could not find component common for __metrics" // Mailing list suggests it's a library version // issue but couldn't find anything suspicious // Need to eventually investigate // "sources", // context.getThisSources().toString(), "metrics", metrics, "metric_metadata", metrics_metadata); ObjectWriter prettyPrinter = OM .writerWithDefaultPrettyPrinter(); prettyPrinter.writeValue(resp.getWriter(), toplevel); } }; root.addServlet(new ServletHolder(servlet), "/metrics"); log.info("Started metric server..."); return server; } public void cleanup() { try { server.stop(); } catch (Exception e) { throw new AssertionError(e); } } }
看完上述內(nèi)容,你們對(duì)如何實(shí)現(xiàn) LoggingMetricsConsumer將指標(biāo)值輸出到metric.log日志文件有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。
網(wǎng)頁(yè)名稱:如何實(shí)現(xiàn)LoggingMetricsConsumer將指標(biāo)值輸出到metric.log日志文件
標(biāo)題網(wǎng)址:http://muchs.cn/article18/pgojdp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)頁(yè)設(shè)計(jì)公司、軟件開(kāi)發(fā)、App設(shè)計(jì)、響應(yīng)式網(wǎng)站、ChatGPT、虛擬主機(jī)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)