JVM上高性能數(shù)據(jù)格式庫包ApacheArrow入門和架構詳

Apache Arrow是是各種大數(shù)據(jù)工具(包括BigQuery)使用的一種流行格式,它是平面和分層數(shù)據(jù)的存儲格式。它是一種加快應用程序內(nèi)存密集型。

梅河口網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)!從網(wǎng)頁設計、網(wǎng)站建設、微信開發(fā)、APP開發(fā)、響應式網(wǎng)站建設等網(wǎng)站項目制作,到程序開發(fā),運營維護。創(chuàng)新互聯(lián)于2013年成立到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設就選創(chuàng)新互聯(lián)。

數(shù)據(jù)處理和數(shù)據(jù)科學領域中的常用庫: 。諸如Apache Parquet,Apache Spark,pandas之類的開放源代碼項目以及許多商業(yè)或封閉源代碼服務都使用Arrow。它提供以下功能:

內(nèi)存計算 標準化的柱狀存儲格式 一個IPC和RPC框架,分別用于進程和節(jié)點之間的數(shù)據(jù)交換

讓我們看一看在Arrow出現(xiàn)之前事物是如何工作的:

我們可以看到,為了使Spark從Parquet文件中讀取數(shù)據(jù),我們需要以Parquet格式讀取和反序列化數(shù)據(jù)。這要求我們通過將數(shù)據(jù)加載到內(nèi)存中來制作數(shù)據(jù)的完整副本。首先,我們將數(shù)據(jù)讀入內(nèi)存緩沖區(qū),然后使用Parquet的轉換方法將數(shù)據(jù)(例如字符串或數(shù)字)轉換為我們的編程語言的表示形式。這是必需的,因為Parquet表示的數(shù)字與Python編程語言表示的數(shù)字不同。

由于許多原因,這對于性能來說是一個很大的問題:

我們正在復制數(shù)據(jù)并在其上運行轉換步驟。數(shù)據(jù)的格式不同,我們需要對所有數(shù)據(jù)進行讀取和轉換,然后再對數(shù)據(jù)進行任何計算。 我們正在加載的數(shù)據(jù)必須放入內(nèi)存中。您只有8GB的RAM,數(shù)據(jù)是10GB嗎?你真倒霉!

現(xiàn)在,讓我們看一下Apache Arrow如何改進這一點:

Arrow無需復制和轉換數(shù)據(jù),而是了解如何直接讀取和操作數(shù)據(jù)。為此,Arrow社區(qū)定義了一種新的文件格式以及直接對序列化數(shù)據(jù)起作用的操作??梢灾苯訌拇疟P讀取此數(shù)據(jù)格式,而無需將其加載到內(nèi)存中并轉換/反序列化數(shù)據(jù)。當然,部分數(shù)據(jù)仍將被加載到RAM中,但您的數(shù)據(jù)不必放入內(nèi)存中。Arrow使用其文件的內(nèi)存映射功能,僅在必要和可能的情況下將盡可能多的數(shù)據(jù)加載到內(nèi)存中。

Apache Arrow支持以下語言:

C++ C# Go Java JavaScript Rust Python (through the C++ library) Ruby (through the C++ library) R (through the C++ library) MATLAB (through the C++ library).
Arrow特點

Arrow首先是提供用于內(nèi)存計算的列式數(shù)據(jù)結構的庫,可以將任何數(shù)據(jù)解壓縮并解碼為Arrow柱狀數(shù)據(jù)結構,以便隨后可以對解碼后的數(shù)據(jù)進行內(nèi)存內(nèi)分析。Arrow列格式具有一些不錯的屬性:隨機訪問為O(1),每個值單元格在內(nèi)存中的前一個和后一個相鄰,因此進行迭代非常有效。

Apache Arrow定義了一種二進制“序列化”協(xié)議,用于安排Arrow列數(shù)組的集合(稱為“記錄批處理”),該數(shù)組可用于消息傳遞和進程間通信。您可以將協(xié)議放在任何地方,包括磁盤上,以后可以對其進行內(nèi)存映射或讀入內(nèi)存并發(fā)送到其他地方。

Arrow協(xié)議的設計目的是使您可以“映射”一個Arrow數(shù)據(jù)塊而不進行任何反序列化,因此對磁盤上的Arrow協(xié)議數(shù)據(jù)執(zhí)行分析可以使用內(nèi)存映射并有效地支付零成本。該協(xié)議用于很多事情,例如Spark SQL和Python之間的流數(shù)據(jù),用于針對Spark SQL數(shù)據(jù)塊運行pandas函數(shù),這些被稱為“ pandas udfs”。

Arrow是為內(nèi)存而設計的(但是您可以將其放在磁盤上,然后再進行內(nèi)存映射)。它們旨在相互兼容,并在應用程序中一起使用,而其競爭對手Apache Parquet文件是為磁盤存儲而設計的。

優(yōu)點:Apache Arrow為平面和分層數(shù)據(jù)定義了一種獨立于語言的列式存儲格式,該格式組織為在CPU和GPU等現(xiàn)代硬件上進行高效的分析操作而組織。Arrow存儲器格式還支持零拷貝讀取,以實現(xiàn)閃電般的數(shù)據(jù)訪問,而無需序列化開銷。

Java的Apache Arrow

導入庫:

<dependency> <groupId>org.apache.arrow</groupId> <artifactId>arrow-memory-netty</artifactId> <version>${arrow.version}</version></dependency><dependency> <groupId>org.apache.arrow</groupId> <artifactId>arrow-vector</artifactId> <version>${arrow.version}</version></dependency>

在開始之前,必須了解對于Arrow的讀/寫操作,使用了字節(jié)緩沖區(qū)。諸如讀取和寫入之類的操作是字節(jié)的連續(xù)交換。為了提高效率,Arrow附帶了一個緩沖區(qū)分配器,該緩沖區(qū)分配器可以具有一定的大小,也可以具有自動擴展功能。支持分配管理的庫是arrow-memory-netty和arrow-memory-unsafe。我們這里使用netty。

用Arrow存儲數(shù)據(jù)需要一個模式,模式可以通過編程定義:

package com.gkatzioura.arrow;import java.io.IOException;import java.util.List;import org.apache.arrow.vector.types.pojo.ArrowType;import org.apache.arrow.vector.types.pojo.Field;import org.apache.arrow.vector.types.pojo.FieldType;import org.apache.arrow.vector.types.pojo.Schema;public class SchemaFactory {public static Schema DEFAULT_SCHEMA = createDefault();public static Schema createDefault() {var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null);var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null);return new Schema(List.of(strField, intField));}public static Schema schemaWithChildren() {var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));return new Schema(List.of(itemField));}public static Schema fromJson(String jsonString) {try {return Schema.fromJSON(jsonString);} catch (IOException e) {throw new ArrowExampleException(e);}}}

他們也有一個可解析的json表示形式:

{ "fields" : [ { "name" : "col1", "nullable" : true, "type" : { "name" : "utf8" }, "children" : [ ] }, { "name" : "col2", "nullable" : true, "type" : { "name" : "int", "bitWidth" : 32, "isSigned" : true }, "children" : [ ] } ]}

另外,就像Avro一樣,您可以在字段上設計復雜的架構和嵌入式值:

public static Schema schemaWithChildren() { var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null); var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null); var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency)); return new Schema(List.of(itemField));}

基于上面的的Schema,我們將為我們的類創(chuàng)建一個DTO:

package com.gkatzioura.arrow; import lombok.Builder;import lombok.Data; @Data@Builderpublic class DefaultArrowEntry { private String col1; private Integer col2; }

我們的目標是將這些Java對象轉換為Arrow字節(jié)流。

1. 使用分配器創(chuàng)建

這些緩沖區(qū)是 的 。您確實需要釋放所使用的內(nèi)存,但是對于庫用戶而言,這是通過在分配器上執(zhí)行 操作來完成的。在我們的例子中,我們的類將實現(xiàn) ,該接口將執(zhí)行分配器關閉操作。

通過使用流api,數(shù)據(jù)將被流傳輸?shù)绞褂肁rrow格式提交的OutPutStream:

package com.gkatzioura.arrow; import java.io.Closeable;import java.io.IOException;import java.nio.channels.WritableByteChannel;import java.util.List; import org.apache.arrow.memory.RootAllocator;import org.apache.arrow.vector.IntVector;import org.apache.arrow.vector.VarCharVector;import org.apache.arrow.vector.VectorSchemaRoot;import org.apache.arrow.vector.dictionary.DictionaryProvider;import org.apache.arrow.vector.ipc.ArrowStreamWriter;import org.apache.arrow.vector.util.Text; import static com.gkatzioura.arrow.SchemaFactory.DEFAULT_SCHEMA; public class DefaultEntriesWriter implements Closeable { private final RootAllocator rootAllocator; private final VectorSchemaRoot vectorSchemaRoot;//向量分配器創(chuàng)建: public DefaultEntriesWriter() { rootAllocator = new RootAllocator(); vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator); } public void write(List<DefaultArrowEntry> defaultArrowEntries, int batchSize, WritableByteChannel out) { if (batchSize <= 0) { batchSize = defaultArrowEntries.size(); } DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider(); try(ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) { writer.start(); VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0); IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1); childVector1.reset(); childVector2.reset(); boolean exactBatches = defaultArrowEntries.size()%batchSize == 0; int batchCounter = 0; for(int i=0; i < defaultArrowEntries.size(); i++) { childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1())); childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2()); batchCounter++; if(batchCounter == batchSize) { vectorSchemaRoot.setRowCount(batchSize); writer.writeBatch(); batchCounter = 0; } } if(!exactBatches) { vectorSchemaRoot.setRowCount(batchCounter); writer.writeBatch(); } writer.end(); } catch (IOException e) { throw new ArrowExampleException(e); } } @Override public void close() throws IOException { vectorSchemaRoot.close(); rootAllocator.close(); } }

為了在Arrow上顯示批處理的支持,已在函數(shù)中實現(xiàn)了簡單的批處理算法。對于我們的示例,只需考慮將數(shù)據(jù)分批寫入。

讓我們深入了解上面代碼功能:

向量分配器創(chuàng)建:

public DefaultEntriesToBytesConverter() { rootAllocator = new RootAllocator(); vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);}

然后在寫入流時,實現(xiàn)并啟動了Arrow流編寫器

ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, Channels.newChannel(out));writer.start();

我們將數(shù)據(jù)填充向量,然后還重置它們,但讓預分配的緩沖區(qū) 存在 :

VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);childVector1.reset();childVector2.reset();

寫入數(shù)據(jù)時,我們使用 setSafe 操作。如果需要分配更多的緩沖區(qū),應采用這種方式。對于此示例,此操作在每次寫入時都完成,但是在考慮了所需的操作和緩沖區(qū)大小后可以避免:

childVector1.setSafe(i, new Text(defaultArrowEntries.get(i).getCol1()));childVector2.setSafe(i, defaultArrowEntries.get(i).getCol2());

然后,將批處理寫入流中:

vectorSchemaRoot.setRowCount(batchSize);writer.writeBatch();

最后但并非最不重要的一點是,我們關閉了writer:

@Overridepublic void close() throws IOException { vectorSchemaRoot.close(); rootAllocator.close();}

以上就是JVM上高性能數(shù)據(jù)格式庫包Apache Arrow入門和架構詳解(Gkatziouras)的詳細內(nèi)容,更多關于Apache Arrow入門的資料請關注腳本之家其它相關文章!

當前文章:JVM上高性能數(shù)據(jù)格式庫包ApacheArrow入門和架構詳
網(wǎng)站鏈接:http://muchs.cn/article0/sgpoo.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設計網(wǎng)站導航、靜態(tài)網(wǎng)站ChatGPT、動態(tài)網(wǎng)站虛擬主機

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

小程序開發(fā)