Skip to content

Commit

Permalink
Update code and docs about CCNP (#189)
Browse files Browse the repository at this point in the history
CCNP SDK has been updated, update related code and docs accordingly

Signed-off-by: Hao, Ruomeng <[email protected]>
  • Loading branch information
ruomengh authored May 30, 2024
1 parent c6e39c8 commit c7eb278
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 35 deletions.
24 changes: 12 additions & 12 deletions cnap/core/eventlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@
import base64
from hashlib import sha1, sha256, sha384, sha512

from ccnp.eventlog.eventlog_sdk import CCEventLogEntry, CCAlgorithms
from ccnp import Measurement, MeasurementType
from ccnp import CcnpSdk
from cctrusted_base.tcg import TcgAlgorithmRegistry

LOG = logging.getLogger(__name__)

IMR_VERIFY_COUNT = 3

def replay_event_log(event_logs: list[CCEventLogEntry]) -> dict:
def replay_event_log(event_logs) -> dict:
"""Replay event logs by Integrated Measurement Register (IMR) index.
Args:
event_logs (list[CCEventLogEntry]): Event logs fetched by CCNP.
event_logs: Event logs fetched by CCNP.
Returns:
dict: A dictionary containing the replay result displayed by IMR index and hash algorithm.
Expand All @@ -47,13 +47,13 @@ def replay_event_log(event_logs: list[CCEventLogEntry]) -> dict:

alg_id = event_log.alg_id.algo_id
# Check algorithm type and prepare for replay
if alg_id == CCAlgorithms.ALG_SHA1:
if alg_id == TcgAlgorithmRegistry.TPM_ALG_SHA1:
algo = sha1()
elif alg_id == CCAlgorithms.ALG_SHA384:
elif alg_id == TcgAlgorithmRegistry.TPM_ALG_SHA384:
algo = sha384()
elif alg_id == CCAlgorithms.ALG_SHA256:
elif alg_id == TcgAlgorithmRegistry.TPM_ALG_SHA256:
algo = sha256()
elif alg_id == CCAlgorithms.ALG_SHA512:
elif alg_id == TcgAlgorithmRegistry.TPM_ALG_SHA512:
algo = sha512()
else:
LOG.error("Unsupported hash algorithm %d", alg_id)
Expand Down Expand Up @@ -89,9 +89,9 @@ def verify_event_log(measurement_dict: dict) -> bool:
for index in range(IMR_VERIFY_COUNT):
# Fectch IMR measurement
LOG.info("Fetch measurements in IMR[%d]", index)
imr_measurement = base64.b64decode(Measurement.get_platform_measurement(
MeasurementType.TYPE_TDX_RTMR, None, index))
LOG.info("IMR[%d](measurement): %s", index, imr_measurement.hex())
imr_measurement = base64.b64decode(CcnpSdk.inst().get_cc_measurement(
[index, 12]))
LOG.info("IMR[%d](measurement): %s", index, imr_measurement.hash.hex())

# Get IMR value from replayed event log
if index not in measurement_dict or measurement_dict[index] == {}:
Expand All @@ -102,7 +102,7 @@ def verify_event_log(measurement_dict: dict) -> bool:
imr_replayed = value
break

LOG.info("IMR[%d](replayed): %s", index, imr_replayed.hex())
LOG.info("IMR[%d](replayed): %s", index, imr_replayed.hash.hex())
if imr_measurement == imr_replayed:
LOG.info("IMR[%d] passed the verification.", index)
else:
Expand Down
10 changes: 5 additions & 5 deletions cnap/core/keybroker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
import struct
import requests

from ccnp import Eventlog, Quote
from ccnp import CcnpSdk
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from core.eventlog import replay_event_log, verify_event_log
from core.eventlog import verify_event_log

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -120,10 +120,10 @@ def get_key(self, server_url: str, key_id: str) -> bytes: # pylint: disable=too-
# Get and verify event logs before get quote.
# The exectuion environment judgment will be implemented by ccnp in the future.
LOG.debug("Getting event log by CCNP")
event_logs = Eventlog.get_platform_eventlog()
event_logs = CcnpSdk.inst().get_cc_eventlog()
if event_logs is None:
raise RuntimeError("Get event log failed")
measurement_dict = replay_event_log(event_logs)
measurement_dict = CcnpSdk.inst().replay_cc_eventlog(event_logs)
if verify_event_log(measurement_dict):
LOG.info("Event log verify successfully.\n")
else:
Expand All @@ -137,7 +137,7 @@ def get_key(self, server_url: str, key_id: str) -> bytes: # pylint: disable=too-

LOG.debug("Getting TDX Quote by CCNP")
user_data = base64.b64encode(pubkey_der).decode('utf-8')
quote = Quote.get_quote(user_data=user_data)
quote = CcnpSdk.inst().get_cc_report(data=user_data).dump()
if quote is None:
raise RuntimeError("Get TDX Quote failed")
quote = base64.b64encode(quote.quote).decode('utf-8')
Expand Down
3 changes: 2 additions & 1 deletion cnap/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
ccnp==0.0.2
ccnp
cctrusted_base
Flask==3.0.0
Flask-Cors==4.0.1
kafka-python==2.0.2
Expand Down
35 changes: 18 additions & 17 deletions docs/How_to_Protect_AI_Models_in_Cloud_Native_Environments.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,27 @@ The service supports attestation, measurement fetching and event logs collecting
CCNP is a good choice to fetch these evidences including measurements and event logs, which hides the complexity of the underlying platforms and increase the usability of the APIs. Here's the sample code using CCNP:

```Python
from ccnp import Eventlog
event_logs = Eventlog.get_platform_eventlog()
from ccnp import Ccnpsdk
event_logs = CcnpSdk.inst().get_cc_eventlog()
```

To verify that the event logs have not been tampered with, we can compare the measurement replayed from event logs with the IMR (Integrated Measurement Register) values fetched using CCNP.
Here's the sample code using CCNP to fetch IMR values (use Intel TDX RTMR as example):

```Python
from ccnp import Measurement, MeasurementType
imr_measurement = Measurement.get_platform_measurement(MeasurementType.TYPE_TDX_RTMR, None, 1)
from ccnp import Ccnpsdk
imr_measurement = CcnpSdk.inst().get_cc_measurement([index, 12])
```

CCNP API detail documentation can be found [here](https://intel.github.io/confidential-cloud-native-primitives/).
CCNP API detail documentation can be found [here](https://cc-api.github.io/confidential-cloud-native-primitives/).

### 1.3 Attestation by using Confidential Cloud-Native Primitives (CCNP)

To get the key to decrypt the model, we need provide the quote of TEE for attestation, CCNP is a good choice to get the quote and it hides the complexity and is easy to use, sample code from CCNP:

```Python
from ccnp import Quote
quote=Quote.get_quote()
from ccnp import Ccnpsdk
quote = CcnpSdk.inst().get_cc_report().dump()
```

### 1.4 AI Model Decryption
Expand Down Expand Up @@ -221,20 +221,20 @@ We can fetch, replay and verify event logs before attestation, the sample code:
```Python
import logging

from ccnp import Eventlog, Measurement, MeasurementType
from ccnp import Ccnpsdk

LOG = logging.getLogger(__name__)
IMR_VERIFY_COUNT = 3

# Fetch event logs using CCNP and replay.
event_logs = Eventlog.get_platform_eventlog()
measurement_dict = replay(event_logs)
event_logs = CcnpSdk.inst().get_cc_eventlog()
measurement_dict = CcnpSdk.inst().replay_cc_eventlog(event_logs)

# Fetch IMR measurement (use Intel TDX RTMR as example) and verify with replayed value.
for index in range(IMR_VERIFY_COUNT):
# Fectch IMR measurement
imr_measurement = base64.b64decode(Measurement.get_platform_measurement(
MeasurementType.TYPE_TDX_RTMR, None, index))
imr_measurement = base64.b64decode(CcnpSdk.inst().get_cc_measurement(
[index, 12])

# Get IMR value from replayed event logs
for value in measurement_dict[index].values():
Expand All @@ -261,15 +261,16 @@ The sample code to get the quote with user data:
```Python
import base64

from ccnp import Quote
from ccnp import Ccnpsdk
from cryptography.hazmat.primitives.asymmetric import rsa

private_key = rsa.generate_private_key(public_exponent=65537, key_size=3072)
pubkey = private_key.public_key()
pubkey_der = pubkey.public_bytes(encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
user_data = base64.b64encode(pubkey_der).decode('utf-8')
quote = Quote.get_quote(user_data=user_data)
quote = Ccnpsdk.get_cc_report(data=user_data).dump()
quote = base64.b64encode(quote.quote).decode('utf-8')
```

### 2.7 AI Model Decryption
Expand All @@ -286,8 +287,7 @@ uint32 IV length | uint32 tag length | uint32 data length

To decrypt the data, here are some sample code:

```
```Python
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

def decrypt_data(encrypted_data, key) -> bytes:
Expand Down Expand Up @@ -317,4 +317,5 @@ Intel’s TDX technology can provide a TEE running environment, and CCNP can sim
1. Model Provider: https://github.com/intel/cloud-native-ai-pipeline/blob/main/cnap/core/modelprovider.py
2. Key Broker Client: https://github.com/intel/cloud-native-ai-pipeline/blob/main/cnap/core/keybroker.py
3. CCNP: https://github.com/cc-api/confidential-cloud-native-primitives
4. TCG_PCClient Spec: https://trustedcomputinggroup.org/wp-content/uploads/TCG_PCClientSpecPlat_TPM_2p0_1p04_pub.pdf
4. CC Trusted API: https://github.com/cc-api/cc-trusted-api
5. TCG_PCClient Spec: https://trustedcomputinggroup.org/wp-content/uploads/TCG_PCClientSpecPlat_TPM_2p0_1p04_pub.pdf
Binary file modified docs/secure-model-design.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit c7eb278

Please sign in to comment.