Skip to content

Commit 9860659

Browse files
author
Bob van den Hoogen
authored
feat(pace-81): add aggregation field transform (#116)
1 parent f9d79c3 commit 9860659

File tree

31 files changed

+658
-58
lines changed

31 files changed

+658
-58
lines changed

app/src/main/kotlin/com/getstrm/pace/processing_platforms/ProcessingPlatformTransformer.kt

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.getstrm.pace.processing_platforms
22

33
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy
4+
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform.Aggregation
45
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform.Detokenize
56
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform.Fixed
67
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform.Hash
@@ -9,6 +10,7 @@ import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldT
910
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform.SqlStatement
1011
import com.getstrm.pace.exceptions.BadRequestException
1112
import com.getstrm.pace.exceptions.InternalException
13+
import com.getstrm.pace.exceptions.PaceStatusException
1214
import com.getstrm.pace.exceptions.PaceStatusException.Companion.BUG_REPORT
1315
import com.getstrm.pace.util.fullName
1416
import com.getstrm.pace.util.sqlDataType
@@ -19,6 +21,7 @@ import com.google.rpc.DebugInfo
1921
import org.jooq.Parser
2022
import org.jooq.impl.DSL
2123
import org.jooq.impl.ParserException
24+
import org.jooq.impl.SQLDataType
2225
import org.jooq.Field as JooqField
2326

2427
val CAPTURING_GROUP_REGEX = Regex("""\$(\d+)""")
@@ -128,6 +131,41 @@ interface ProcessingPlatformTransformer : ProcessingPlatformRenderer {
128131
)
129132
}
130133

134+
fun aggregation(field: DataPolicy.Field, aggregation: Aggregation): JooqField<*> {
135+
val jooqField = DSL.field(field.fullName(), Float::class.java)
136+
137+
val jooqAggregation = when (aggregation.aggregationTypeCase) {
138+
Aggregation.AggregationTypeCase.SUM -> DSL.sum(jooqField)
139+
Aggregation.AggregationTypeCase.AVG -> DSL.avg(
140+
aggregation.avg.castTo.ifEmpty { null }
141+
?.let { DSL.field("cast({0} as {1})", Float::class.java, jooqField, DSL.unquotedName(it)) }
142+
?: DSL.cast(
143+
jooqField,
144+
SQLDataType.DECIMAL
145+
)
146+
)
147+
148+
Aggregation.AggregationTypeCase.MIN -> DSL.min(jooqField)
149+
Aggregation.AggregationTypeCase.MAX -> DSL.max(jooqField)
150+
else -> throw InternalException(
151+
InternalException.Code.INTERNAL,
152+
DebugInfo.newBuilder()
153+
.setDetail("Aggregation type ${aggregation.aggregationTypeCase.name} is not supported or does not exist. ${PaceStatusException.UNIMPLEMENTED}")
154+
.build()
155+
)
156+
}
157+
val aggregationField = DSL.field(
158+
"{0} over({1})",
159+
Float::class.java,
160+
jooqAggregation,
161+
DSL.partitionBy(aggregation.partitionByList.map { DSL.field(it.fullName()) })
162+
)
163+
164+
return aggregation.avg?.takeIf { it.hasPrecision() }?.let {
165+
DSL.round(aggregationField, it.precision)
166+
} ?: aggregationField
167+
}
168+
131169
companion object {
132170
private val typeParser: TypeParser = TypeParser.newBuilder().build()
133171

app/src/main/kotlin/com/getstrm/pace/processing_platforms/ProcessingPlatformViewGenerator.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.getstrm.pace.processing_platforms
22

33
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy
4+
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform
5+
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform.TransformCase.AGGREGATION
46
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform.TransformCase.DETOKENIZE
57
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform.TransformCase.FIXED
68
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform.TransformCase.HASH
@@ -12,7 +14,6 @@ import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldT
1214
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.FieldTransform.Transform.TransformCase.TRANSFORM_NOT_SET
1315
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.Filter.FilterCase.GENERIC_FILTER
1416
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy.RuleSet.Filter.FilterCase.RETENTION_FILTER
15-
import com.getstrm.pace.processing_platforms.DefaultProcessingPlatformTransformer.renderName
1617
import com.getstrm.pace.util.defaultJooqSettings
1718
import com.getstrm.pace.util.fullName
1819
import com.getstrm.pace.util.headTailFold
@@ -161,6 +162,7 @@ abstract class ProcessingPlatformViewGenerator(
161162
if (retention.conditionsList.size == 1) {
162163
// If there is only one filter it should be the only option
163164
// create retention sql
165+
return condition(retention.conditionsList.first().toRetentionCondition(retention.field))
164166
}
165167

166168
val whereCondition = retention.conditionsList.headTailFold(
@@ -232,7 +234,9 @@ abstract class ProcessingPlatformViewGenerator(
232234
transform.detokenize,
233235
dataPolicy.source.ref,
234236
)
237+
235238
NUMERIC_ROUNDING -> transformer.numericRounding(field, transform.numericRounding)
239+
AGGREGATION -> transformer.aggregation(field, transform.aggregation)
236240
TRANSFORM_NOT_SET, IDENTITY, null -> transformer.identity(field)
237241
}
238242
return memberCheck to (statement as JooqField<Any>)

app/src/main/kotlin/com/getstrm/pace/processing_platforms/synapse/SynapseClient.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@ import com.getstrm.pace.config.SynapseConfig
66
import com.getstrm.pace.processing_platforms.Group
77
import com.getstrm.pace.processing_platforms.ProcessingPlatformClient
88
import com.getstrm.pace.processing_platforms.Table
9-
import com.getstrm.pace.processing_platforms.databricks.SynapseViewGenerator
109
import com.getstrm.pace.util.normalizeType
1110
import com.zaxxer.hikari.HikariConfig
1211
import com.zaxxer.hikari.HikariDataSource
1312
import kotlinx.coroutines.Dispatchers
1413
import kotlinx.coroutines.withContext
1514
import org.jooq.DSLContext
16-
import org.jooq.Field
1715
import org.jooq.SQLDialect
1816
import org.jooq.impl.DSL
1917

app/src/main/kotlin/com/getstrm/pace/processing_platforms/synapse/SynapseTransformer.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy
22
import com.getstrm.pace.exceptions.BadRequestException
3-
import com.getstrm.pace.exceptions.InternalException
43
import com.getstrm.pace.exceptions.PaceStatusException
54
import com.getstrm.pace.processing_platforms.ProcessingPlatformTransformer
65
import com.getstrm.pace.util.fullName
76
import com.google.rpc.BadRequest
8-
import com.google.rpc.DebugInfo
97
import org.jooq.Field
108
import org.jooq.impl.DSL
119

app/src/main/kotlin/com/getstrm/pace/processing_platforms/synapse/SynapseViewGenerator.kt

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.getstrm.pace.processing_platforms.databricks
1+
package com.getstrm.pace.processing_platforms.synapse
22

33
import SynapseTransformer
44
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy
@@ -114,30 +114,35 @@ class SynapseViewGenerator(
114114
}
115115

116116
override fun toCondition(retention: DataPolicy.RuleSet.Filter.RetentionFilter): Condition {
117-
val whereCondition = retention.conditionsList.headTailFold(
118-
headOperation = { condition ->
119-
DSL.`when`(
120-
toPrincipalCondition(condition.principalsList),
121-
condition.toSynapseRetentionCondition(),
122-
)
123-
},
124-
bodyOperation = { conditionStep, condition ->
125-
conditionStep.`when`(
126-
toPrincipalCondition(condition.principalsList),
127-
condition.toSynapseRetentionCondition(),
128-
)
129-
},
130-
tailOperation = { conditionStep, condition ->
131-
conditionStep.otherwise(condition.toSynapseRetentionCondition())
132-
},
133-
)
117+
val retentionCondition = if (retention.conditionsList.size == 1) {
118+
// If there is only one filter it should be the only option
119+
retention.conditionsList.first().toSynapseRetentionCondition()
120+
} else {
121+
retention.conditionsList.headTailFold(
122+
headOperation = { condition ->
123+
DSL.`when`(
124+
toPrincipalCondition(condition.principalsList),
125+
condition.toSynapseRetentionCondition(),
126+
)
127+
},
128+
bodyOperation = { conditionStep, condition ->
129+
conditionStep.`when`(
130+
toPrincipalCondition(condition.principalsList),
131+
condition.toSynapseRetentionCondition(),
132+
)
133+
},
134+
tailOperation = { conditionStep, condition ->
135+
conditionStep.otherwise(condition.toSynapseRetentionCondition())
136+
},
137+
)
138+
}
134139

135140
val retentionClause = DSL.field(
136141
"{0} > {1}",
137142
Boolean::class.java,
138143
DSL.timestampAdd(
139144
DSL.field(DSL.unquotedName(retention.field.fullName()), Timestamp::class.java),
140-
DSL.field("(${whereCondition})", Int::class.java),
145+
DSL.field("(${retentionCondition})", Int::class.java),
141146
DatePart.DAY
142147
),
143148
DSL.currentTimestamp()

app/src/main/resources/jsonschema/getstrm.pace.api.data_catalogs.v1alpha/GetBlueprintPolicyResponse.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)