当前位置 :首页 >> 电视

大数据培训Spark SQLcodice_血缘扩展实战案例

2023-03-11   来源 : 电视

lan(sqlText: String): LogicalPlan = {

val lineAgeEnabled = SparkSession.getActiveSession

.get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean

logDebug(s"SqlText: $sqlText")

if(sqlText.toLowerCase().contains("insert")){

if(lineAgeEnabled){

if(FIELD_LINE_AGE_SQL_COULD_SET.get()){

//寄存器本地codice_在这里

FIELD_LINE_AGE_SQL.set(sqlText)

}

FIELD_LINE_AGE_SQL_COULD_SET.remove()

}

}

delegate.parsePlan(sqlText)

}

//加载更早的sqlparser

override def parseExpression(sqlText: String): Expression = {

delegate.parseExpression(sqlText)

}

//加载更早的sqlparser

override def parseTableIdentifier(sqlText: String): TableIdentifier = {

delegate.parseTableIdentifier(sqlText)

}

//加载更早的sqlparser

override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {

delegate.parseFunctionIdentifier(sqlText)

}

//加载更早的sqlparser

override def parseTableSchema(sqlText: String): StructType = {

delegate.parseTableSchema(sqlText)

}

//加载更早的sqlparser

override def parseDataType(sqlText: String): DataType = {

delegate.parseDataType(sqlText)

}

}

3.3 构建的法则类

case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) {

val executor: ThreadPoolExecutor =

ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)

override def apply(plan: LogicalPlan): Unit = {

val sql = FIELD_LINE_AGE_SQL.get

FIELD_LINE_AGE_SQL.remove()

if(sql != null){

//这里我们取得sql然后激活一个寄存器花钱剩余的重构护航

val task = new FieldLineageRunnableV3(sparkSession,sql)

executor.execute(task)

}

}

}

很简单,我们只是取得了 SQL 然后就让激活了一个寄存器去受益 SparkPlan,实际直觉在

FieldLineageRunnableV3。

3.4 具体的花钱到统计分析方法

3.4.1 受益 SparkPlan

我们在 run 统计分析方法当中受益 SparkPlan:

override def run(): Unit = {

val parser = sparkSession.sessionState.sqlParser

val analyzer = sparkSession.sessionState.analyzer

val optimizer = sparkSession.sessionState.optimizer

val planner = sparkSession.sessionState.planner

val newPlan = parser.parsePlan(sql)

PASS_TABLE_AUTH.set(true)

val analyzedPlan = analyzer.executeAndCheck(newPlan)

val optimizerPlan = optimizer.execute(analyzedPlan)

//受益sparkPlan

val sparkPlan = planner.plan(optimizerPlan).next()

if(targetTable != null){

val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()

val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()

//projection

projectionLineAge(levelProject, sparkPlan.child)

//predication

predicationLineAge(predicates, sparkPlan.child)

为什么要应用于 SparkPlan 呢?原本我们选择的时候,科专修计划案拿取URL关联的时候是非常准的,且信令非常稍短也更直接。

在这里多余一下 Spark SQL 重构的全过程如下:

经过SqlParser后才会受益直觉计划案,此时此表名、线性等都并未重构,还不用可执行;经过Analyzer才会统计分析一些绑定个人信息,例如此表验证、URL个人信息、线性个人信息;经过Optimizer 后直觉计划案才会根据既定法则被细化,这里的法则是RBO,当然 Spark 还背书CBO的细化;经过SparkPlanner后就并成了可可执行的科专修计划案。

我们看一个直觉计划案与科专修计划案对比的比如说:

一个 SQL 语义:

select item_id,TYPE,v_value,imei from t1

union all

select item_id,TYPE,v_value,imei from t2

union all

select item_id,TYPE,v_value,imei from t3

直觉计划案是这样的:

科专修计划案是这样的:

或许细化了很多。

受益 SparkPlan 后,我们就可以根据各有不同的SparkPlan节点花钱乘积处理。

我们将URL祖先分为两种各种类型:projection(select查阅URL)、predication(wehre查阅条件)。

这两种是一种点对点的关联,即从更早此表【关心尚为硅谷,轻松专修IT】的URL生并成远距离此表的URL的相关联关联。

一心象一个查阅是一棵栽,那么乘积关联才会如下从栽的顶端开始乘积,直到栽的叶子节点,叶子节点即为更早此表:

那么我们乘积查阅的结果应该为

id ->tab1.id ,

name->tab1.name,tabb2.name,

age→tabb2.age。

说明了有该codice_

val levelProject = new ArrayBuffer

[ArrayBuffer[NameExpressionHolder]](),通过projecti-onLineAge 乘积后 levelProject 加载了顶层id,name,age相关联的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。

当然也不是简单的递归乘积,还必须选择特殊性状况例如:Join、ExplandExec、Aggregate、Explode、GenerateExec等都必须特殊性选择。

比如说及效果:

SQL:

with A as (select id,name,age from tab1 where id> 100 ) ,

C as (select id,name,max(age) from A group by A.id,A.name) ,

B as (select id,name,age from tabb2 where age> 28)

insert into tab3

select C.id,concat(C.name,B.name) as name, B.age from

B,C where C.id = B.id

效果:

{

"edges": [

{

"sources": [

3

],

"targets": [

0

],

"expression": "id",

"edgeType": "PROJECTION"

},

{

"sources": [

4,

7

],

"targets": [

1

],

"expression": "name",

"edgeType": "PROJECTION"

},

{

"sources": [

5

],

"targets": [

2

],

"expression": "age",

"edgeType": "PROJECTION"

},

{

"sources": [

6,

3

],

"targets": [

0,

1,

2

],

"expression": "INNER",

"edgeType": "PREDICATE"

},

{

"sources": [

6,

5

],

"targets": [

0,

1,

2

],

"expression": "((((default.tabb2.MLT-ageMLT- IS NOT NULL) AND (CAST(default.tabb2.MLT-ageMLT- AS INT)> 28)) AND (B.MLT-idMLT-> 100)) AND (B.MLT-idMLT- IS NOT NULL))",

"edgeType": "PREDICATE"

},

{

"sources": [

3

],

"targets": [

0,

1,

2

],

"expression": "((default.tab1.MLT-idMLT- IS NOT NULL) AND (default.tab1.MLT-idMLT-> 100))",

"edgeType": "PREDICATE"

}

],

"vertices": [

{

"id": 0,

"vertexType": "COLUMN",

"vertexId": "default.tab3.id"

},

{

"id": 1,

"vertexType": "COLUMN",

"vertexId": "default.tab3.name"

},

{

"id": 2,

"vertexType": "COLUMN",

"vertexId": "default.tab3.age"

},

{

"id": 3,

"vertexType": "COLUMN",

"vertexId": "default.tab1.id"

},

{

"id": 4,

"vertexType": "COLUMN",

"vertexId": "default.tab1.name"

},

{

"id": 5,

"vertexType": "COLUMN",

"vertexId": "default.tabb2.age"

},

{

"id": 6,

"vertexType": "COLUMN",

"vertexId": "default.tabb2.id"

},

{

"id": 7,

"vertexType": "COLUMN",

"vertexId": "default.tabb2.name"

}

]

}

四、回顾

在 Spark SQL 的URL祖先花钱到当中,我们通过其自构建,首先取得了 insert 语义,在我们自己的检测法则当中取得

SQL 语义,通过SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最终受益了科专修计划案。

我们通过乘积科专修计划案,根据各有不同可执行计划案花钱相关联的类比,www.atguigu.com然后就受益了URL错综复杂的相关联关联。也就是说的花钱到是非常简单的,URL错综复杂是直线的相关联关联,当上端全过程被比如说,如果一心花钱到URL的类比的整个全过程也是并未情况的。

文章刊发起源于图表仓库与Python大图表

推荐阅读:

大图表开发计划 Spark 计算机系统之SparkSQL

Spark SQL之RDD类比DataFrame的统计分析方法

大图表开发计划之SparkSQL面试篇

大图表开发计划之Spark SQL可执行机动性的改善

甘肃男科医院排名
长沙比较好的牛皮癣医院
长沙看白癜风哪个医院最好
浙江男科医院预约挂号
易克和英太青作用一样吗
一胎盼女儿,结果又生了个儿子,宝爸当场的表情太过于真实了

女婴国策的新开让很多家庭成员都多了一个宝宝,但对于夙女婴这件坏事,每对祖父母期欣然的结果都是不一样的,之所以这样说道主要是因为,大部分的爹妈都期欣然一夙能凑个好字,就比如一家人现在有妻子,那么女...

友情链接