spark怎么編寫udaf函數(shù)求中位數(shù)

本篇內(nèi)容主要講解“spark怎么編寫udaf函數(shù)求中位數(shù)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“spark怎么編寫udaf函數(shù)求中位數(shù)”吧!

順河網(wǎng)站建設(shè)公司成都創(chuàng)新互聯(lián),順河網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為順河上1000家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站建設(shè)要多少錢,請找那個售后服務(wù)好的順河做網(wǎng)站的公司定做!

package com.frank.sparktest.java;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MedianUdaf extends UserDefinedAggregateFunction {

    private StructType inputSchema;
    private StructType bufferSchema;

    public MedianUdaf(){
        List<StructField> inputFields = new ArrayList<>();
        inputFields.add(DataTypes.createStructField("nums",DataTypes.IntegerType,true));
        inputSchema=DataTypes.createStructType(inputFields);
        List<StructField> bufferFields = new ArrayList<>();
        bufferFields.add(DataTypes.createStructField("datas",DataTypes.StringType,true));
        bufferSchema=DataTypes.createStructType(bufferFields);
    }

    @Override
    public StructType inputSchema() {
        return inputSchema;
    }

    @Override
    public StructType bufferSchema() {
        return bufferSchema;
    }

    @Override
    public DataType dataType() {
        return DataTypes.DoubleType;
    }

    @Override
    public boolean deterministic() {
        return true;
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0,0);
        buffer.update(1,0);
    }

    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        if (!input.isNullAt(0)){
            buffer.update(0,buffer.getString(0)+","+input.getInt(0));
        }
    }

    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        buffer1.update(0,buffer1.getString(0)+","+buffer2.getInt(0));
    }

    @Override
    public Object evaluate(Row buffer) {
        List<Integer> list = new ArrayList<Integer>();
        List<String> stringList = Arrays.asList(buffer.getString(0).split(","));
        for (String s : stringList){
            list.add(Integer.valueOf(s));
        }
        Collections.sort(list);
        int size = list.size();
        int num=0;
        if(size % 2 == 1) {
            num = list.get((size / 2)+1);
        }
        if(size %2  == 0) {
            num = (list.get(size / 2)+list.get((size / 2)+1))/2;
        }
        return num;
    }

}

上面是代碼段,可以直接拿來使用

下面是測試程序

package com.frank.sparktest.java;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

import java.io.IOException;
import java.util.stream.IntStream;

public class DemoUDAF {

    public static void main(String[] args) throws IOException {
        SQLContext sqlContext = SparkSession.builder().master("local").getOrCreate().sqlContext();
        sqlContext.udf().register("generate", (Integer start, Integer end)-> IntStream.range(start, end+1).boxed().toArray(), DataTypes.createArrayType(DataTypes.IntegerType));
        sqlContext.udf().register("media",new MedianUdaf());
        sqlContext.sql("select generate(1,10)").show();
    }
}

到此,相信大家對“spark怎么編寫udaf函數(shù)求中位數(shù)”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

網(wǎng)頁題目:spark怎么編寫udaf函數(shù)求中位數(shù)
轉(zhuǎn)載注明:http://muchs.cn/article32/jpghpc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供手機網(wǎng)站建設(shè)、商城網(wǎng)站、Google關(guān)鍵詞優(yōu)化、網(wǎng)站策劃、響應(yīng)式網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

手機網(wǎng)站建設(shè)