From 3a852847ecb5d7be9cfc1d8a63b505257cc4d8c2 Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Fri, 6 Dec 2024 09:19:08 +0000 Subject: [PATCH 1/6] wip: get container stats --- src/components/c2d/compute_engine_docker.ts | 102 +++++++++++++++++++- 1 file changed, 99 insertions(+), 3 deletions(-) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index ec48c3d42..c19527ada 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -19,7 +19,11 @@ import { C2DDatabase } from '../database/C2DDatabase.js' import { create256Hash } from '../../utils/crypt.js' import { Storage } from '../storage/index.js' import Dockerode from 'dockerode' -import type { ContainerCreateOptions, VolumeCreateOptions } from 'dockerode' +import type { + ContainerCreateOptions, + // ContainerStats, + VolumeCreateOptions +} from 'dockerode' import * as tar from 'tar' import { createWriteStream, @@ -41,6 +45,7 @@ import { Service } from '../../@types/DDO/Service.js' import { decryptFilesObject, omitDBComputeFieldsFromComputeJob } from './index.js' import * as drc from 'docker-registry-client' import { ValidateParams } from '../httpRoutes/validateCommands.js' +import { streamToString } from '../../utils/util.js' export class C2DEngineDocker extends C2DEngine { private envs: ComputeEnvironment[] = [] @@ -202,7 +207,7 @@ export class C2DEngineDocker extends C2DEngine { const jobId = generateUniqueID() - // TO DO C2D - Check image, check arhitecture, etc + // C2D - Check image, check arhitecture, etc const image = getAlgorithmImage(algorithm) // ex: node@sha256:1155995dda741e93afe4b1c6ced2d01734a6ec69865cc0997daf1f4db7259a36 if (!image) { @@ -219,6 +224,7 @@ export class C2DEngineDocker extends C2DEngine { envIdWithHash ? environment : null, environment ) + const validation = await C2DEngineDocker.checkDockerImage(image, env.platform) if (!validation.valid) throw new Error(`Unable to validate docker image ${image}: ${validation.reason}`) @@ -533,7 +539,9 @@ export class C2DEngineDocker extends C2DEngine { try { const container = await this.docker.createContainer(containerInfo) - console.log('container: ', container) + // TODO + this.checkResources(job, container) + // console.log('container: ', container) job.status = C2DStatusNumber.Provisioning job.statusText = C2DStatusText.Provisioning await this.db.updateJob(job) @@ -589,6 +597,8 @@ export class C2DEngineDocker extends C2DEngine { } } } else { + // TODO + this.checkResources(job, container) // is running, we need to stop it.. console.log('running, need to stop it?') const timeNow = Date.now() / 1000 @@ -646,6 +656,92 @@ export class C2DEngineDocker extends C2DEngine { } } + private async checkResources(job: DBComputeJob, container: Dockerode.Container) { + // need to have this mapped maybe already + const environments: ComputeEnvironment[] = await ( + await this.getComputeEnvironments() + ).filter((env: ComputeEnvironment) => env.id === job.environment) + // TODO + if (environments.length === 1) { + const environment = environments[0] + const statsRaw: any = await container.stats({ stream: true }) + const stats = await JSON.parse(await streamToString(statsRaw as Readable)) + console.log('container stats1: ', stats) + const memory = stats.memory_stats + console.log('memory stats: ', memory) + const cpu = stats.cpu_stats + console.log('cpu stats: ', cpu) + console.log('got environment:', environment) + /** + * stats: + * { + "read": "0001-01-01T00:00:00Z", + "preread": "0001-01-01T00:00:00Z", + "pids_stats": {}, + "blkio_stats": { + "io_service_bytes_recursive": null, + "io_serviced_recursive": null, + "io_queue_recursive": null, + "io_service_time_recursive": null, + "io_wait_time_recursive": null, + "io_merged_recursive": null, + "io_time_recursive": null, + "sectors_recursive": null + }, + "num_procs": 0, + "storage_stats": {}, + "cpu_stats": { + "cpu_usage": { + "total_usage": 0, + "usage_in_kernelmode": 0, + "usage_in_usermode": 0 + }, + "throttling_data": { + "periods": 0, + "throttled_periods": 0, + "throttled_time": 0 + } + }, + "precpu_stats": { + "cpu_usage": { + "total_usage": 0, + "usage_in_kernelmode": 0, + "usage_in_usermode": 0 + }, + "throttling_data": { + "periods": 0, + "throttled_periods": 0, + "throttled_time": 0 + } + }, + "memory_stats": {}, + "name": "/8a44fe02-b3dc-4677-90e8-9f29090ad279-algoritm", + "id": "47b13b90ac02709adb7ac03140ca9ec2cd9731ec084f2eaa5c0b1c046551d6d3" +} + * got environment: { +id: '0x46f61c90309fcffa02e887e1a8a1ebdfeabe4f1ff279e306de2803df36bd46f7-free', +cpuNumber: 1, +cpuType: '', +gpuNumber: 0, +ramGB: 1, +diskGB: 1, +priceMin: 0, +desc: 'Free', +currentJobs: 0, +maxJobs: 1, +consumerAddress: '0x8F292046bb73595A978F4e7A131b4EBd03A15e8a', +storageExpiry: 600, +maxJobDuration: 600, +feeToken: '0x0000000000000000000000000000000000000000', +chainId: 8996, +free: true, +platform: { architecture: 'x86_64', os: 'linux' } +} + + */ + } + } + // eslint-disable-next-line require-await private async cleanupJob(job: DBComputeJob) { // cleaning up From 9687a972460694a43e1dfd49a2188b087a1d8e6c Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Thu, 12 Dec 2024 15:51:37 +0000 Subject: [PATCH 2/6] add hostconfig options for mem and cpu --- src/components/c2d/compute_engine_base.ts | 12 ++ src/components/c2d/compute_engine_docker.ts | 124 +++++--------------- src/utils/util.ts | 10 ++ 3 files changed, 54 insertions(+), 92 deletions(-) diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 1b79b8724..68d1d3245 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -111,6 +111,18 @@ export abstract class C2DEngine { public getStreamableLogs(jobId: string): Promise { throw new Error(`Not implemented for this engine type`) } + + protected async getJobEnvironment(job: DBComputeJob): Promise { + const environments: ComputeEnvironment[] = await ( + await this.getComputeEnvironments() + ).filter((env: ComputeEnvironment) => env.id === job.environment) + // found it + if (environments.length === 1) { + const environment = environments[0] + return environment + } + return null + } } export class C2DEngineLocal extends C2DEngine { diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index c19527ada..3d3dff881 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -21,6 +21,7 @@ import { Storage } from '../storage/index.js' import Dockerode from 'dockerode' import type { ContainerCreateOptions, + HostConfig, // ContainerStats, VolumeCreateOptions } from 'dockerode' @@ -45,7 +46,7 @@ import { Service } from '../../@types/DDO/Service.js' import { decryptFilesObject, omitDBComputeFieldsFromComputeJob } from './index.js' import * as drc from 'docker-registry-client' import { ValidateParams } from '../httpRoutes/validateCommands.js' -import { streamToString } from '../../utils/util.js' +import { convertGigabytesToBytes } from '../../utils/util.js' export class C2DEngineDocker extends C2DEngine { private envs: ComputeEnvironment[] = [] @@ -504,9 +505,11 @@ export class C2DEngineDocker extends C2DEngine { await this.db.updateJob(job) await this.cleanupJob(job) } + // get env info + const environment = await this.getJobEnvironment(job) // create the container const mountVols: any = { '/data': {} } - const hostConfig: any = { + const hostConfig: HostConfig = { Mounts: [ { Type: 'volume', @@ -516,6 +519,15 @@ export class C2DEngineDocker extends C2DEngine { } ] } + if (environment != null) { + // limit container CPU & Memory usage according to env specs + hostConfig.CpuCount = 1 || environment.cpuNumber + hostConfig.CpusetCpus = environment.cpuNumber + ? `0-${environment.cpuNumber}` + : '0-1' + hostConfig.Memory = 0 || convertGigabytesToBytes(environment.ramGB) + } + // console.log('host config: ', hostConfig) const containerInfo: ContainerCreateOptions = { name: job.jobId + '-algoritm', Image: job.containerImage, @@ -539,9 +551,8 @@ export class C2DEngineDocker extends C2DEngine { try { const container = await this.docker.createContainer(containerInfo) - // TODO - this.checkResources(job, container) - // console.log('container: ', container) + // this.checkResources(job, container) + console.log('container: ', container) job.status = C2DStatusNumber.Provisioning job.statusText = C2DStatusText.Provisioning await this.db.updateJob(job) @@ -580,6 +591,7 @@ export class C2DEngineDocker extends C2DEngine { if (details.State.Running === false) { try { await container.start() + // this.checkResources(job, container) job.isStarted = true await this.db.updateJob(job) return @@ -597,8 +609,7 @@ export class C2DEngineDocker extends C2DEngine { } } } else { - // TODO - this.checkResources(job, container) + // this.checkResources(job, container) // is running, we need to stop it.. console.log('running, need to stop it?') const timeNow = Date.now() / 1000 @@ -656,91 +667,20 @@ export class C2DEngineDocker extends C2DEngine { } } - private async checkResources(job: DBComputeJob, container: Dockerode.Container) { - // need to have this mapped maybe already - const environments: ComputeEnvironment[] = await ( - await this.getComputeEnvironments() - ).filter((env: ComputeEnvironment) => env.id === job.environment) - // TODO - if (environments.length === 1) { - const environment = environments[0] - const statsRaw: any = await container.stats({ stream: true }) - const stats = await JSON.parse(await streamToString(statsRaw as Readable)) - console.log('container stats1: ', stats) - const memory = stats.memory_stats - console.log('memory stats: ', memory) - const cpu = stats.cpu_stats - console.log('cpu stats: ', cpu) - console.log('got environment:', environment) - /** - * stats: - * { - "read": "0001-01-01T00:00:00Z", - "preread": "0001-01-01T00:00:00Z", - "pids_stats": {}, - "blkio_stats": { - "io_service_bytes_recursive": null, - "io_serviced_recursive": null, - "io_queue_recursive": null, - "io_service_time_recursive": null, - "io_wait_time_recursive": null, - "io_merged_recursive": null, - "io_time_recursive": null, - "sectors_recursive": null - }, - "num_procs": 0, - "storage_stats": {}, - "cpu_stats": { - "cpu_usage": { - "total_usage": 0, - "usage_in_kernelmode": 0, - "usage_in_usermode": 0 - }, - "throttling_data": { - "periods": 0, - "throttled_periods": 0, - "throttled_time": 0 - } - }, - "precpu_stats": { - "cpu_usage": { - "total_usage": 0, - "usage_in_kernelmode": 0, - "usage_in_usermode": 0 - }, - "throttling_data": { - "periods": 0, - "throttled_periods": 0, - "throttled_time": 0 - } - }, - "memory_stats": {}, - "name": "/8a44fe02-b3dc-4677-90e8-9f29090ad279-algoritm", - "id": "47b13b90ac02709adb7ac03140ca9ec2cd9731ec084f2eaa5c0b1c046551d6d3" -} - * got environment: { -id: '0x46f61c90309fcffa02e887e1a8a1ebdfeabe4f1ff279e306de2803df36bd46f7-free', -cpuNumber: 1, -cpuType: '', -gpuNumber: 0, -ramGB: 1, -diskGB: 1, -priceMin: 0, -desc: 'Free', -currentJobs: 0, -maxJobs: 1, -consumerAddress: '0x8F292046bb73595A978F4e7A131b4EBd03A15e8a', -storageExpiry: 600, -maxJobDuration: 600, -feeToken: '0x0000000000000000000000000000000000000000', -chainId: 8996, -free: true, -platform: { architecture: 'x86_64', os: 'linux' } -} - - */ - } - } + // Seems like monitoring container stats is useles... everything related with cpu/mem is at zeros + // private async checkResources(job: DBComputeJob, container: Dockerode.Container) { + // const environment = await this.getJobEnvironment(job) + // try { + // const statsRaw: any = await container.stats({ stream: false }) + // const stats = await JSON.parse( + // JSON.stringify(statsRaw) // await streamToString(statsRaw as Readable)) + // ) + // const memory = stats.memory_stats + // const cpu = stats.cpu_stats + // } catch (e) { + // console.error('error getting stats: ', e) + // } + // } // eslint-disable-next-line require-await private async cleanupJob(job: DBComputeJob) { diff --git a/src/utils/util.ts b/src/utils/util.ts index b4fff9e76..a6c4b1339 100644 --- a/src/utils/util.ts +++ b/src/utils/util.ts @@ -154,3 +154,13 @@ export function deleteKeysFromObject(source: any, keys: string[]): any { }) return source } + +export function convertGigabytesToBytes(gigabytes: number): number { + if (gigabytes < 0) { + throw new Error('Input must be a non-negative number') + } + + const bytesInAGigabyte = 1024 ** 3 // 1 gigabyte = 1024^3 bytes + const bytes = gigabytes * bytesInAGigabyte + return bytes +} From d8201c39e537b0e1502e17ec9cb5e52e783b4eb7 Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Fri, 13 Dec 2024 10:00:00 +0000 Subject: [PATCH 3/6] set swap same value, disable it --- src/components/c2d/compute_engine_docker.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 3d3dff881..c1c1939ff 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -526,6 +526,8 @@ export class C2DEngineDocker extends C2DEngine { ? `0-${environment.cpuNumber}` : '0-1' hostConfig.Memory = 0 || convertGigabytesToBytes(environment.ramGB) + // set swap to same memory value means no swap (otherwise it use like 2X mem) + hostConfig.MemorySwap = hostConfig.Memory } // console.log('host config: ', hostConfig) const containerInfo: ContainerCreateOptions = { From 63e2987d496663d7a12af4005a585306d8bd1e93 Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Fri, 13 Dec 2024 10:26:05 +0000 Subject: [PATCH 4/6] small refactor --- src/components/c2d/compute_engine_docker.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index c1c1939ff..a77623385 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -522,9 +522,10 @@ export class C2DEngineDocker extends C2DEngine { if (environment != null) { // limit container CPU & Memory usage according to env specs hostConfig.CpuCount = 1 || environment.cpuNumber - hostConfig.CpusetCpus = environment.cpuNumber - ? `0-${environment.cpuNumber}` - : '0-1' + // if more than 1 CPU + if (hostConfig.CpuCount > 1) { + hostConfig.CpusetCpus = `0-${hostConfig.CpuCount - 1}` + } hostConfig.Memory = 0 || convertGigabytesToBytes(environment.ramGB) // set swap to same memory value means no swap (otherwise it use like 2X mem) hostConfig.MemorySwap = hostConfig.Memory From 910c5dcdb8e86bc0a4e73b63f8de671c83da1fb4 Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Fri, 13 Dec 2024 10:27:07 +0000 Subject: [PATCH 5/6] small refactor --- src/components/c2d/compute_engine_docker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index a77623385..750a37e48 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -521,7 +521,7 @@ export class C2DEngineDocker extends C2DEngine { } if (environment != null) { // limit container CPU & Memory usage according to env specs - hostConfig.CpuCount = 1 || environment.cpuNumber + hostConfig.CpuCount = environment.cpuNumber || 1 // if more than 1 CPU if (hostConfig.CpuCount > 1) { hostConfig.CpusetCpus = `0-${hostConfig.CpuCount - 1}` From 5651337fe6083d0c3efb1f7dc42a6961a65f2c53 Mon Sep 17 00:00:00 2001 From: paulo-ocean Date: Fri, 13 Dec 2024 12:47:24 +0000 Subject: [PATCH 6/6] remove some commented lines --- src/components/c2d/compute_engine_docker.ts | 26 +-------------------- 1 file changed, 1 insertion(+), 25 deletions(-) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 750a37e48..93767a025 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -12,19 +12,13 @@ import type { ComputeResult, DockerPlatform } from '../../@types/C2D/C2D.js' -// import { getProviderFeeToken } from '../../components/core/utils/feesHandler.js' import { getConfiguration } from '../../utils/config.js' import { C2DEngine } from './compute_engine_base.js' import { C2DDatabase } from '../database/C2DDatabase.js' import { create256Hash } from '../../utils/crypt.js' import { Storage } from '../storage/index.js' import Dockerode from 'dockerode' -import type { - ContainerCreateOptions, - HostConfig, - // ContainerStats, - VolumeCreateOptions -} from 'dockerode' +import type { ContainerCreateOptions, HostConfig, VolumeCreateOptions } from 'dockerode' import * as tar from 'tar' import { createWriteStream, @@ -554,7 +548,6 @@ export class C2DEngineDocker extends C2DEngine { try { const container = await this.docker.createContainer(containerInfo) - // this.checkResources(job, container) console.log('container: ', container) job.status = C2DStatusNumber.Provisioning job.statusText = C2DStatusText.Provisioning @@ -594,7 +587,6 @@ export class C2DEngineDocker extends C2DEngine { if (details.State.Running === false) { try { await container.start() - // this.checkResources(job, container) job.isStarted = true await this.db.updateJob(job) return @@ -612,7 +604,6 @@ export class C2DEngineDocker extends C2DEngine { } } } else { - // this.checkResources(job, container) // is running, we need to stop it.. console.log('running, need to stop it?') const timeNow = Date.now() / 1000 @@ -670,21 +661,6 @@ export class C2DEngineDocker extends C2DEngine { } } - // Seems like monitoring container stats is useles... everything related with cpu/mem is at zeros - // private async checkResources(job: DBComputeJob, container: Dockerode.Container) { - // const environment = await this.getJobEnvironment(job) - // try { - // const statsRaw: any = await container.stats({ stream: false }) - // const stats = await JSON.parse( - // JSON.stringify(statsRaw) // await streamToString(statsRaw as Readable)) - // ) - // const memory = stats.memory_stats - // const cpu = stats.cpu_stats - // } catch (e) { - // console.error('error getting stats: ', e) - // } - // } - // eslint-disable-next-line require-await private async cleanupJob(job: DBComputeJob) { // cleaning up