Skip to content

Commit 026ef12

Browse files
committed
feat(pace-83): handles paging
1 parent e7f0cf3 commit 026ef12

File tree

35 files changed

+294
-184
lines changed

35 files changed

+294
-184
lines changed

app/src/main/graphql/collibra/ListAssets.graphql

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ query ListDatabases($skip: Int!, $pageSize: Int!) {
3838
domain {
3939
name
4040
}
41-
42-
4341
}
4442
}
4543

@@ -69,6 +67,16 @@ query ListTablesInSchema($schemaId: UUID!, $skip: Int!, $pageSize: Int!) {
6967
}
7068
}
7169

70+
query GetSchema($schemaId: UUID!) {
71+
assets(
72+
where: {id: {eq: $schemaId}}
73+
){
74+
id
75+
displayName
76+
fullName
77+
}
78+
}
79+
7280
query ListSchemaIds($databaseId: UUID!, $skip: Int!, $pageSize: Int!) {
7381
assets(
7482
where: {domain: {type: {publicId: {eq: "PhysicalDataDictionary"}}}, id: {eq: $databaseId}, type: {publicId: {eq: "Database"}}}
@@ -90,3 +98,21 @@ query ListSchemaIds($databaseId: UUID!, $skip: Int!, $pageSize: Int!) {
9098
}
9199
}
92100

101+
query GetDataBase($databaseId: UUID!) {
102+
assets(
103+
where: {id: {eq: $databaseId}}
104+
){
105+
displayName
106+
fullName
107+
id
108+
domain {
109+
name
110+
}
111+
stringAttributes {
112+
stringValue
113+
type {
114+
publicId
115+
}
116+
}
117+
}
118+
}
Lines changed: 17 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,23 @@
11

2-
query TableWithColumns($id: UUID!) {
3-
tables: assets(
4-
where: {id: {eq: $id}}
2+
query ColumnTypesAndTags($tableId: UUID!, $pageSize: Int!, $skip: Int!) {
3+
columns: assets(
4+
where: { _and: [
5+
{type: {publicId: {eq: "Column"}}}
6+
{outgoingRelations: {typePublicId: "ColumnIsPartOfTable",
7+
all: {target: {id: {eq: $tableId}}}}}
8+
]
9+
}
10+
# the main advantage of this query is that offset and limit are not on the nested
11+
# where statement, so we can loop over the results if needed.
12+
# also it seeems like we use less nodes on Collibra, and don't trigger the
13+
# node size limit of 100000 so easily.
14+
offset: $skip
15+
limit: $pageSize
516
) {
617
displayName
7-
8-
schema: incomingRelations(where: {source: {type: {publicId: {eq: "Schema"}}}}) {
9-
schemaDetails: source {
10-
displayName
11-
12-
database: incomingRelations(
13-
where: {source: {type: {publicId: {eq: "Database"}}}}
14-
) {
15-
databaseDetails: source {
16-
displayName
17-
domain {
18-
name
19-
}
20-
}
21-
}
22-
}
23-
}
24-
columns: incomingRelations(
25-
# TODO we need handle paging!
26-
limit: 400,
27-
where: {type: {publicId: {eq: "ColumnIsPartOfTable"}}}
28-
) {
29-
columnDetails: source {
30-
displayName
31-
dataType: stringAttributes(
32-
where: {type: {name: {eq: "Technical Data Type"}}}
33-
) {
34-
value: stringValue
35-
}
36-
tags {
37-
name
38-
}
39-
40-
41-
42-
}
18+
dataType: stringAttributes(where: {type: {name: {eq: "Technical Data Type"}}}) {
19+
value: stringValue
4320
}
21+
tags { name }
4422
}
4523
}

app/src/main/kotlin/com/getstrm/pace/api/DataPoliciesApi.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class DataPoliciesApi(
2323
) : DataPoliciesServiceGrpcKt.DataPoliciesServiceCoroutineImplBase() {
2424
override suspend fun listDataPolicies(request: ListDataPoliciesRequest): ListDataPoliciesResponse {
2525
return ListDataPoliciesResponse.newBuilder()
26-
.addAllDataPolicies(dataPolicyService.listDataPolicies())
26+
.addAllDataPolicies(dataPolicyService.listDataPolicies(request))
2727
.build()
2828
}
2929

app/src/main/kotlin/com/getstrm/pace/api/ProcessingPlatformsApi.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ class ProcessingPlatformsApi(
2222

2323
override suspend fun listTables(request: ListTablesRequest): ListTablesResponse =
2424
ListTablesResponse.newBuilder().addAllTables(
25-
processingPlatformsService.listProcessingPlatformTables(request.platformId).map { it.fullName },
25+
processingPlatformsService.listProcessingPlatformTables(request).map { it.fullName },
2626
).build()
2727

2828
override suspend fun listGroups(request: ListGroupsRequest): ListGroupsResponse =
2929
ListGroupsResponse.newBuilder().addAllGroups(
30-
processingPlatformsService.listProcessingPlatformGroups(request.platformId).map { it.name },
30+
processingPlatformsService.listProcessingPlatformGroups(request).map { it.name },
3131
).build()
3232

3333
override suspend fun getBlueprintPolicy(request: GetBlueprintPolicyRequest) =

app/src/main/kotlin/com/getstrm/pace/catalogs/Collibra.kt

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@ package com.getstrm.pace.catalogs
33
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy
44
import build.buf.gen.getstrm.pace.api.paging.v1alpha.PageParameters
55
import com.apollographql.apollo3.ApolloClient
6-
import com.collibra.generated.ListPhysicalDataAssetsQuery
7-
import com.collibra.generated.ListSchemaIdsQuery
8-
import com.collibra.generated.ListTablesInSchemaQuery
9-
import com.collibra.generated.TableWithColumnsQuery
6+
import com.collibra.generated.*
107
import com.getstrm.pace.config.CatalogConfiguration
8+
import com.getstrm.pace.exceptions.BadRequestException
119
import com.getstrm.pace.util.normalizeType
10+
import com.google.rpc.BadRequest
1211
import java.util.*
1312

1413
class CollibraCatalog(config: CatalogConfiguration) : DataCatalog(config) {
@@ -23,52 +22,82 @@ class CollibraCatalog(config: CatalogConfiguration) : DataCatalog(config) {
2322
Database(this, it.id.toString(), it.getDataSourceType(), it.displayName?:"")
2423
}
2524

25+
override suspend fun getDatabase(databaseId: String): DataCatalog.Database {
26+
val database = client.query(GetDataBaseQuery(databaseId)).execute().data!!.assets!!.filterNotNull().first()
27+
return Database(
28+
catalog = this,
29+
id = database.id.toString(),
30+
dbType = database.getDataSourceType(),
31+
displayName = database.displayName ?: ""
32+
)
33+
}
34+
2635
class Database(override val catalog: CollibraCatalog, id: String, dbType: String, displayName: String) :
2736
DataCatalog.Database(catalog, id, dbType, displayName) {
2837

2938
override suspend fun listSchemas(pageParameters: PageParameters): List<DataCatalog.Schema> {
30-
val assets = catalog.client.query(ListSchemaIdsQuery(id, 0, 10)).execute().data!!.assets!!.filterNotNull()
39+
val assets = catalog.client.query(ListSchemaIdsQuery(id, pageParameters.skip, pageParameters.pageSize))
40+
.execute().data!!.assets!!.filterNotNull()
3141
.flatMap { schema ->
3242
schema.schemas
3343
}
3444
return assets.map {
3545
Schema(catalog, this, it.target.id.toString(), it.target.fullName)
3646
}
3747
}
48+
49+
override suspend fun getSchema(schemaId: String): DataCatalog.Schema {
50+
val schemaAsset = catalog.client.query(GetSchemaQuery(schemaId)).execute().data!!.assets!!.filterNotNull().first()
51+
return Schema(catalog, this, schemaAsset.id.toString(), schemaAsset.fullName)
52+
}
3853
}
3954

4055
class Schema(private val catalog: CollibraCatalog, database: DataCatalog.Database, id: String, name: String) :
4156
DataCatalog.Schema(database, id, name) {
4257
override suspend fun listTables(pageParameters: PageParameters): List<DataCatalog.Table> =
43-
catalog.client.query(ListTablesInSchemaQuery(id, 0, 10)).execute().data!!.assets!!.filterNotNull()
58+
catalog.client.query(ListTablesInSchemaQuery(id, pageParameters.skip, pageParameters.pageSize)).execute().data!!.assets!!.filterNotNull()
4459
.flatMap { table ->
4560
table.tables.map { Table(catalog, this, it.target.id.toString(), it.target.fullName) }
4661
}
62+
63+
// TODO create graphql specific query
64+
override suspend fun getTable(tableId: String): DataCatalog.Table {
65+
return super.getTable(tableId)
66+
}
4767
}
4868

4969
class Table(private val catalog: CollibraCatalog, schema: DataCatalog.Schema, id: String, name: String) :
5070
DataCatalog.Table(schema, id, name) {
71+
5172
override suspend fun getDataPolicy(): DataPolicy? {
52-
val response = catalog.client.query(TableWithColumnsQuery(id = id)).execute()
53-
return response.data?.tables?.firstOrNull()?.let { table ->
54-
val systemName =
55-
table.schema.firstOrNull()?.schemaDetails?.database?.firstOrNull()?.databaseDetails?.domain?.name
56-
57-
val builder = DataPolicy.newBuilder()
58-
builder.metadataBuilder.title = table.displayName
59-
builder.metadataBuilder.description = systemName
60-
builder.sourceBuilder.addAllFields(table.columns.map { it.toField() })
61-
builder.build()
73+
// TODO loop when more results
74+
val query = ColumnTypesAndTagsQuery(tableId = id, pageSize = 1000, skip = 0)
75+
val response = catalog.client.query(query).execute()
76+
if(!response.errors.isNullOrEmpty()){
77+
throw BadRequestException(
78+
BadRequestException.Code.INVALID_ARGUMENT,
79+
BadRequest.newBuilder()
80+
.build(),
81+
errorMessage = response.errors!!.joinToString { it.message }
82+
)
83+
6284
}
85+
val builder = DataPolicy.newBuilder()
86+
builder.metadataBuilder.title = name
87+
builder.metadataBuilder.description = schema.database.displayName
88+
response.data?.columns?.filterNotNull()?.forEach { column ->
89+
builder.sourceBuilder.addFields(column.toField())
90+
}
91+
return builder.build()
6392
}
6493

65-
private fun TableWithColumnsQuery.Column.toField(): DataPolicy.Field =
94+
private fun ColumnTypesAndTagsQuery.Column.toField(): DataPolicy.Field =
6695
with(DataPolicy.Field.newBuilder()) {
67-
addNameParts(columnDetails.displayName)
68-
val sourceType = columnDetails.dataType.firstOrNull()?.value ?: "unknown"
96+
addNameParts(displayName)
97+
val sourceType = dataType.firstOrNull()?.value ?: "unknown"
6998
// source type mapping
7099
type = sourceType
71-
addAllTags(columnDetails.tags.map { it.name })
100+
addAllTags(tags.map { it.name })
72101
build().normalizeType()
73102
}
74103
}
@@ -87,6 +116,10 @@ class CollibraCatalog(config: CatalogConfiguration) : DataCatalog(config) {
87116

88117
private fun ListPhysicalDataAssetsQuery.Asset.getDataSourceType(): String =
89118
stringAttributes.find { it.type.publicId == "DataSourceType" }?.stringValue ?: "unknown"
119+
private fun GetDataBaseQuery.Asset.getDataSourceType(): String =
120+
stringAttributes.find { it.type.publicId == "DataSourceType" }?.stringValue ?: "unknown"
121+
122+
90123
}
91124

92125
enum class AssetTypes(val assetName: String) {

app/src/main/kotlin/com/getstrm/pace/catalogs/DataCatalog.kt

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ package com.getstrm.pace.catalogs
33
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy
44
import build.buf.gen.getstrm.pace.api.paging.v1alpha.PageParameters
55
import com.getstrm.pace.config.CatalogConfiguration
6+
import com.getstrm.pace.exceptions.ResourceException
67
import com.getstrm.pace.util.DEFAULT_PAGE_PARAMETERS
8+
import com.getstrm.pace.util.THOUSAND_RECORDS
9+
import com.google.rpc.ResourceInfo
710
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataCatalog as ApiCatalog
811
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataCatalog.Database as ApiDatabase
912
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataCatalog.Schema as ApiSchema
@@ -23,6 +26,24 @@ abstract class DataCatalog(
2326
get() = ApiCatalog.newBuilder().setId(id).setType(config.type).build()
2427

2528
abstract suspend fun listDatabases(pageParameters: PageParameters = DEFAULT_PAGE_PARAMETERS): List<Database>
29+
30+
// TODO Should be overridden by implementations to avoid query size constraints
31+
// TODO make more efficient implementation that does not list all the databases
32+
open suspend fun getDatabase(databaseId: String): Database {
33+
return listDatabases(PageParameters.newBuilder().setPageSize(1000).build()).find{it.id == databaseId}?:
34+
throw ResourceException(
35+
ResourceException.Code.NOT_FOUND,
36+
ResourceInfo.newBuilder()
37+
.setResourceType("Database")
38+
.setResourceName(databaseId)
39+
.setDescription("Database $databaseId not found in catalog $id")
40+
.setOwner("Catalog: $id")
41+
.build()
42+
43+
44+
)
45+
}
46+
2647

2748
/**
2849
* A table is a collection of columns.
@@ -52,6 +73,21 @@ abstract class DataCatalog(
5273
.setName(name)
5374
.setDatabase(database.apiDatabase)
5475
.build()
76+
// TODO Should be overridden by implementations to avoid query size constraints
77+
// TODO make more efficient implementation that does not list all the tables
78+
open suspend fun getTable(tableId: String): Table {
79+
return listTables(PageParameters.newBuilder().setPageSize(1000).build()).firstOrNull { it.id == tableId }
80+
?: throw ResourceException(
81+
ResourceException.Code.NOT_FOUND,
82+
ResourceInfo.newBuilder()
83+
.setResourceType("Table")
84+
.setResourceName(tableId)
85+
.setDescription("Table $tableId not found in schema $id")
86+
.setOwner("Schema: $id")
87+
.build()
88+
)
89+
90+
}
5591
}
5692

5793
/** meta information database */
@@ -65,6 +101,20 @@ abstract class DataCatalog(
65101
open suspend fun getTags(): List<String> = emptyList()
66102
override fun toString() = dbType?.let { "Database($id, $dbType, $displayName)" } ?: "Database($id)"
67103

104+
// TODO Should be overridden by implementations to avoid query size constraints
105+
// TODO make more efficient implementation that does not list all the schemas
106+
open suspend fun getSchema(schemaId: String): Schema {
107+
return listSchemas(THOUSAND_RECORDS).firstOrNull { it.id == schemaId } ?: throw ResourceException(
108+
ResourceException.Code.NOT_FOUND,
109+
ResourceInfo.newBuilder()
110+
.setResourceType("Catalog Database Schema")
111+
.setResourceName(schemaId)
112+
.setDescription("Schema $schemaId not found in database $id of catalog $catalog.id")
113+
.setOwner("Database: $id")
114+
.build()
115+
)
116+
}
117+
68118
val apiDatabase: ApiDatabase
69119
get() = ApiDatabase.newBuilder()
70120
.setCatalog(catalog.apiCatalog)

app/src/main/kotlin/com/getstrm/pace/dao/DataPolicyDao.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.getstrm.pace.dao
22

33
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy
4+
import build.buf.gen.getstrm.pace.api.paging.v1alpha.PageParameters
45
import com.getstrm.jooq.generated.tables.DataPolicies.Companion.DATA_POLICIES
56
import com.getstrm.jooq.generated.tables.records.DataPoliciesRecord
67
import com.getstrm.pace.exceptions.BadRequestException
@@ -19,9 +20,13 @@ class DataPolicyDao(
1920
private val jooq: DSLContext,
2021
) {
2122

22-
fun listDataPolicies(): List<DataPoliciesRecord> = jooq.select()
23+
fun listDataPolicies(pageParameters: PageParameters): List<DataPoliciesRecord> =
24+
jooq.select()
2325
.from(DATA_POLICIES)
2426
.where(DATA_POLICIES.ACTIVE.isTrue)
27+
.orderBy(DATA_POLICIES.ID, DATA_POLICIES.PLATFORM_ID, DATA_POLICIES.VERSION)
28+
.offset(pageParameters.skip)
29+
.limit(pageParameters.pageSize)
2530
.fetchInto(DATA_POLICIES)
2631

2732
fun upsertDataPolicy(dataPolicy: DataPolicy): DataPoliciesRecord {

app/src/main/kotlin/com/getstrm/pace/processing_platforms/ProcessingPlatformClient.kt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
package com.getstrm.pace.processing_platforms
22

33
import build.buf.gen.getstrm.pace.api.entities.v1alpha.DataPolicy
4+
import build.buf.gen.getstrm.pace.api.paging.v1alpha.PageParameters
45
import com.getstrm.pace.exceptions.ResourceException
6+
import com.getstrm.pace.util.THOUSAND_RECORDS
57
import com.google.rpc.ResourceInfo
68
import org.jooq.Field
79

810
interface ProcessingPlatformClient {
911
val id: String
1012
val type: DataPolicy.ProcessingPlatform.PlatformType
11-
suspend fun listGroups(): List<Group>
12-
suspend fun listTables(): List<Table>
13+
suspend fun listGroups(pageParameters: PageParameters): List<Group>
14+
suspend fun listTables(pageParameters: PageParameters): List<Table>
1315
suspend fun applyPolicy(dataPolicy: DataPolicy)
1416

15-
suspend fun getTable(tableName: String): Table =
16-
listTables().find { it.fullName == tableName } ?: throw ResourceException(
17+
// TODO Should be overridden by implementations to avoid query size constraints
18+
// TODO make more efficient implementation that does not list all the databases
19+
open suspend fun getTable(tableName: String): Table =
20+
listTables(THOUSAND_RECORDS).find { it.fullName == tableName } ?: throw ResourceException(
1721
ResourceException.Code.NOT_FOUND,
1822
ResourceInfo.newBuilder()
1923
.setResourceType("Table")

0 commit comments

Comments
 (0)