This article explains how to write a custom EventSerializer class for the HDFS sink in Flume.
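I had already written a serializer for the HBase sink, so I expected the HDFS version to be simple, but it works differently. The HDFS sink does not apply one serializer setting to every file type: the serializer is read by the writer class selected through hdfs.fileType. We use the DataStream type, whose writer class is HDFSDataStream, and that class defaults to the TEXT serializer (a constant of an enum):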
serializerType = context.getString("serializer", "TEXT");
The enum is defined as follows, with our custom type added:
package org.apache.flume.serialization; // EventSerializerType lives in this Flume package

public enum EventSerializerType {
  TEXT(BodyTextEventSerializer.Builder.class),
  HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class),
  AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class),
  CUSTOM(CUSTOMEventSerializer.Builder.class), // our custom serializer class
  OTHER(null);

  private final Class<? extends EventSerializer.Builder> builderClass;

  EventSerializerType(Class<? extends EventSerializer.Builder> builderClass) {
    this.builderClass = builderClass;
  }

  public Class<? extends EventSerializer.Builder> getBuilderClass() {
    return builderClass;
  }
}
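The enum is only half of the mechanism. As far as I know, Flume's `EventSerializerFactory` upper-cases the configured `serializer` string, looks it up with `EventSerializerType.valueOf`, and if that fails treats it as `OTHER` and loads the string as a fully qualified `EventSerializer.Builder` class name. That fallback suggests a class name such as `com.example.CUSTOMEventSerializer$Builder` should also work without patching the enum. The sketch below reproduces the lookup with hypothetical stand-in types (`SerializerKind`, `Builder`, `TextBuilder`, `CustomBuilder`) so it runs without Flume on the classpath; it illustrates the idea and is not Flume's own code.

```java
import java.util.Locale;

public class SerializerLookupSketch {

  // Hypothetical stand-in for EventSerializer.Builder
  interface Builder {
    String describe();
  }

  public static class TextBuilder implements Builder {
    public String describe() { return "text"; }
  }

  public static class CustomBuilder implements Builder {
    public String describe() { return "custom"; }
  }

  // Stand-in for EventSerializerType: each constant maps to a builder class; OTHER has none
  enum SerializerKind {
    TEXT(TextBuilder.class),
    CUSTOM(CustomBuilder.class),
    OTHER(null);

    private final Class<? extends Builder> builderClass;

    SerializerKind(Class<? extends Builder> builderClass) {
      this.builderClass = builderClass;
    }

    Class<? extends Builder> getBuilderClass() {
      return builderClass;
    }
  }

  /** Resolves a configured serializer string: enum alias first, then class name. */
  static Builder resolve(String serializerType) throws ReflectiveOperationException {
    SerializerKind kind;
    try {
      // aliases match case-insensitively because the string is upper-cased first
      kind = SerializerKind.valueOf(serializerType.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException notAnAlias) {
      kind = SerializerKind.OTHER;
    }
    Class<? extends Builder> builderClass = kind.getBuilderClass();
    if (builderClass == null) {
      // not a known alias: treat the string as a fully qualified class name
      Class<?> c = Class.forName(serializerType);
      if (!Builder.class.isAssignableFrom(c)) {
        throw new IllegalArgumentException(serializerType + " is not a Builder");
      }
      builderClass = c.asSubclass(Builder.class);
    }
    return builderClass.getDeclaredConstructor().newInstance();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(resolve("TEXT").describe());   // text
    System.out.println(resolve("custom").describe()); // custom
    // a class name also resolves, without any enum entry for it
    System.out.println(resolve(CustomBuilder.class.getName()).describe()); // custom
  }
}
```

Adding a constant keeps the agent configuration short (serializer = CUSTOM), at the cost of rebuilding a Flume jar; the class-name route keeps the custom code in its own jar.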
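We added the custom constant to the enum and wrote the class it points to. When configuring the agent, set the file type (hdfs.fileType) and the serializer accordingly. As with the HBase serializer, the code must then be compiled and the jar uploaded to the agent.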
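For reference, the relevant agent lines might look like the ones embedded below. The agent and component names (`a1`, `k1`, `c1`) and the HDFS path are hypothetical placeholders. The sketch loads them with `java.util.Properties` and strips the `a1.sinks.k1.` prefix, roughly mimicking the context the sink and HDFSDataStream see; keys under `serializer.` (such as `serializer.appendNewline`, read by the class below) are, to my understanding, handed to the serializer's own Context. It does not use Flume's configuration classes.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class AgentConfigSketch {

  // Hypothetical agent configuration; names and paths are placeholders
  static final String AGENT_CONF = String.join("\n",
      "a1.sinks.k1.type = hdfs",
      "a1.sinks.k1.channel = c1",
      "a1.sinks.k1.hdfs.path = hdfs://namenode/flume/logs",
      "a1.sinks.k1.hdfs.fileType = DataStream",
      "a1.sinks.k1.serializer = CUSTOM",
      "a1.sinks.k1.serializer.appendNewline = true");

  /** Returns the entries under a prefix with the prefix stripped, like a component's sub-context. */
  static Map<String, String> subProperties(Properties props, String prefix) {
    Map<String, String> result = new TreeMap<>();
    for (String key : props.stringPropertyNames()) {
      if (key.startsWith(prefix)) {
        result.put(key.substring(prefix.length()), props.getProperty(key));
      }
    }
    return result;
  }

  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.load(new StringReader(AGENT_CONF));

    Map<String, String> sinkCtx = subProperties(props, "a1.sinks.k1.");
    // same default as HDFSDataStream: fall back to TEXT when "serializer" is absent
    String serializerType = sinkCtx.getOrDefault("serializer", "TEXT");
    System.out.println(sinkCtx.get("hdfs.fileType") + " / " + serializerType); // DataStream / CUSTOM
  }
}
```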
The custom serializer class:
package org.apache.flume.serialization; // assumed: placed alongside EventSerializerType

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Assumes Jackson 2; with Jackson 1 the class is org.codehaus.jackson.map.ObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper;

public class CUSTOMEventSerializer implements EventSerializer {

  private static final Logger logger = LoggerFactory.getLogger(CUSTOMEventSerializer.class);
  // ObjectMapper is thread-safe once configured, so one shared instance serves all events
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  private static final String SPLITCHAR = "\001"; // column separator

  // for legacy reasons, by default, append a newline to each event written out
  private static final String APPEND_NEWLINE = "appendNewline";
  private static final boolean APPEND_NEWLINE_DFLT = true;

  private final OutputStream out;
  private final boolean appendNewline;

  private CUSTOMEventSerializer(OutputStream out, Context ctx) {
    this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT);
    this.out = out;
  }

  @Override
  public boolean supportsReopen() {
    return true;
  }

  @Override
  public void afterCreate() {
    // noop
  }

  @Override
  public void afterReopen() {
    // noop
  }

  @Override
  public void beforeClose() {
    // noop
  }

  @Override
  public void write(Event e) throws IOException {
    // the event body is one JSON-formatted log line
    String log = new String(e.getBody(), StandardCharsets.UTF_8);
    // the headers carry the project code (pcode) and host of the log
    Map<String, String> headers = e.getHeaders();
    String parsedLog = parseJson2Value(log, headers);
    logger.debug("log: {} -> values: {}", log, parsedLog);
    out.write(parsedLog.getBytes(StandardCharsets.UTF_8));
    if (appendNewline) {
      out.write('\n');
    }
  }

  /**
   * Extracts the values from a JSON log line, then appends the request path,
   * project code, host and time, each followed by SPLITCHAR.
   *
   * @param log JSON-formatted log line
   * @param headers event headers
   * @return the parsed log line
   */
  private String parseJson2Value(String log, Map<String, String> headers) {
    String time = "";
    String path = "";
    StringBuilder values = new StringBuilder();
    try {
      @SuppressWarnings("unchecked")
      Map<String, Object> m = OBJECT_MAPPER.readValue(log, Map.class);
      for (Map.Entry<String, Object> entry : m.entrySet()) {
        Object value = entry.getValue();
        if ("uri".equals(entry.getKey()) && value != null) {
          path = pasreUriToPath(value.toString()); // resolve the access path
        }
        if ("time".equals(entry.getKey()) && value != null && value.toString().length() >= 10) {
          time = value.toString().substring(10);
        }
        values.append(value).append(SPLITCHAR);
      }
    } catch (IOException ex) {
      // JsonParseException and JsonMappingException are both IOExceptions
      logger.warn("Could not parse log as JSON: {}", log, ex);
    }
    // project code and service host come from the headers
    values.append(path).append(SPLITCHAR)
        .append(headers.get("pcode")).append(SPLITCHAR)
        .append(headers.get("host")).append(SPLITCHAR)
        .append(time).append(SPLITCHAR);
    return values.toString();
  }

  @Override
  public void flush() throws IOException {
    // noop
  }

  public static class Builder implements EventSerializer.Builder {
    @Override
    public EventSerializer build(Context context, OutputStream out) {
      return new CUSTOMEventSerializer(out, context);
    }
  }

  /**
   * Converts a request uri into the access path.
   *
   * @param uri request uri
   * @return access path
   */
  protected String pasreUriToPath(String uri) {
    if (uri == null || "".equals(uri.trim())) {
      return uri;
    }
    int index = uri.indexOf("/");
    if (index > -1) {
      uri = uri.substring(index);
    }
    index = uri.indexOf("?");
    if (index > -1) {
      uri = uri.substring(0, index);
    }
    index = uri.indexOf(";");
    if (index > -1) {
      uri = uri.substring(0, index);
    }
    index = uri.indexOf(" HTTP/1.1");
    if (index > -1) {
      uri = uri.substring(0, index);
    }
    index = uri.indexOf("HTTP/1.1");
    if (index > -1) {
      uri = uri.substring(0, index);
    }
    return uri;
  }
}
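To check the output layout without a Flume build, the sketch below reimplements the same row assembly on plain maps: every JSON field value in field order, then the request path, pcode, host and the time suffix, each followed by `\001`. It assumes the JSON has already been parsed into an insertion-ordered map (Jackson returns a `LinkedHashMap` when asked for `Map.class`); `RowFormatSketch`, `toRow` and `uriToPath` are hypothetical names, with `uriToPath` applying the same trimming steps as `pasreUriToPath`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RowFormatSketch {

  static final String SPLITCHAR = "\001";

  /** Same steps as pasreUriToPath: start at the first '/', cut at '?', ';' or "HTTP/1.1". */
  static String uriToPath(String uri) {
    if (uri == null || uri.trim().isEmpty()) {
      return uri;
    }
    int index = uri.indexOf('/');
    if (index > -1) {
      uri = uri.substring(index);
    }
    for (String stop : new String[] {"?", ";", " HTTP/1.1", "HTTP/1.1"}) {
      index = uri.indexOf(stop);
      if (index > -1) {
        uri = uri.substring(0, index);
      }
    }
    return uri;
  }

  /** Builds one output row from parsed JSON fields and event headers. */
  static String toRow(Map<String, Object> fields, Map<String, String> headers) {
    StringBuilder row = new StringBuilder();
    String path = "";
    String time = "";
    for (Map.Entry<String, Object> entry : fields.entrySet()) {
      Object value = entry.getValue();
      if ("uri".equals(entry.getKey()) && value != null) {
        path = uriToPath(value.toString());
      }
      if ("time".equals(entry.getKey()) && value != null && value.toString().length() >= 10) {
        // keeps what follows the first 10 chars, e.g. the time of a "yyyy-MM-dd HH:mm:ss" value
        time = value.toString().substring(10);
      }
      row.append(value).append(SPLITCHAR);
    }
    row.append(path).append(SPLITCHAR)
        .append(headers.get("pcode")).append(SPLITCHAR)
        .append(headers.get("host")).append(SPLITCHAR)
        .append(time).append(SPLITCHAR);
    return row.toString();
  }

  public static void main(String[] args) {
    Map<String, Object> fields = new LinkedHashMap<>();
    fields.put("uri", "GET /api/users?id=1 HTTP/1.1");
    fields.put("time", "2017-06-01 12:30:00");
    Map<String, String> headers = new LinkedHashMap<>();
    headers.put("pcode", "p01");
    headers.put("host", "web-1");
    // print with '|' in place of the invisible \001 separator
    System.out.println(toRow(fields, headers).replace(SPLITCHAR, "|"));
    // GET /api/users?id=1 HTTP/1.1|2017-06-01 12:30:00|/api/users|p01|web-1| 12:30:00|
  }
}
```

Note that substring(10) keeps the space between date and time, so the last column starts with a blank.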
Article title: How to write a custom EventSerializer class for the HDFS sink in Flume
Reposted from: http://www.muchs.cn/article18/ippjgp.html