Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 80 additions & 7 deletions node/machine_passport_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,65 @@ def require_admin():
return None


def _is_admin_request() -> bool:
"""Non-erroring admin check: True iff a valid admin key is present. Used to
decide whether a public GET returns the full record or the redacted view."""
admin_key = request.headers.get('X-Admin-Key', '') or request.headers.get('X-API-Key', '')
expected = os.environ.get('ADMIN_KEY', '')
if not expected or not admin_key:
return False
return hmac.compare_digest(admin_key.encode('utf-8'), expected.encode('utf-8'))


# Machine-passport GETs stay PUBLIC (provenance/showcase data, like the Green
# Tracker), but the genuinely-sensitive fields are stripped from the public view;
# an admin-keyed request gets the full record. Sensitive in two ways:
# PRIVACY: repair-log technician / free-text notes / cost.
# ANTI-SPOOFING: the benchmark signatures ARE the raw hardware fingerprint
# (cache-timing profile, SIMD identity, thermal curve, entropy throughput,
# compute/bandwidth) — publishing them would hand an attacker the exact
# profile to mimic, so they're admin-only. The attestation entropy_score is
# a derived summary of the same and is stripped too.
# Public still sees: name, architecture, year, photos, provenance, restoration
# story (repair type/description/parts), earnings stats, ownership lineage, and
# that a benchmark exists (timestamp) — none of which reveals a fingerprint.
_REPAIR_PRIVATE_FIELDS = ('technician', 'notes', 'cost_rtc')
_ATTEST_PRIVATE_FIELDS = ('entropy_score',)
_BENCHMARK_PRIVATE_FIELDS = (
'cache_timing_profile', 'simd_identity', 'thermal_curve',
'memory_bandwidth', 'compute_score', 'entropy_throughput',
)


def _redact_entries(entries, private_fields):
"""Return a copy of a list-of-dicts with private fields removed (public view)."""
if not isinstance(entries, list):
return entries
out = []
for e in entries:
if isinstance(e, dict):
out.append({k: v for k, v in e.items() if k not in private_fields})
else:
out.append(e)
return out


def _public_passport_full(data):
"""Redact private fields from a full passport export for the public view."""
if not isinstance(data, dict):
return data
redacted = dict(data)
if 'repair_log' in redacted:
redacted['repair_log'] = _redact_entries(redacted['repair_log'], _REPAIR_PRIVATE_FIELDS)
for key in ('attestations', 'attestation_history'):
if key in redacted:
redacted[key] = _redact_entries(redacted[key], _ATTEST_PRIVATE_FIELDS)
for key in ('benchmarks', 'benchmark_signatures'):
if key in redacted:
redacted[key] = _redact_entries(redacted[key], _BENCHMARK_PRIVATE_FIELDS)
return redacted


def get_optional_json_object():
"""Return an optional JSON object body or an error response."""
data = request.get_json(silent=True)
Expand Down Expand Up @@ -121,8 +180,10 @@ def get_passport(machine_id: str):
"""
ledger = get_ledger()
data = ledger.export_passport_full(machine_id)

if data:
if not _is_admin_request():
data = _public_passport_full(data)
return jsonify({
'ok': True,
'passport': data,
Expand Down Expand Up @@ -185,11 +246,14 @@ def get_repair_log(machine_id: str):

if not passport:
return jsonify({'ok': False, 'error': 'passport_not_found'}), 404


repair_log = ledger.get_repair_log(machine_id)
if not _is_admin_request():
repair_log = _redact_entries(repair_log, _REPAIR_PRIVATE_FIELDS)
return jsonify({
'ok': True,
'machine_id': machine_id,
'repair_log': ledger.get_repair_log(machine_id),
'repair_log': repair_log,
})


Expand All @@ -204,11 +268,14 @@ def get_attestations(machine_id: str):

if not passport:
return jsonify({'ok': False, 'error': 'passport_not_found'}), 404


attestations = ledger.get_attestation_history(machine_id)
if not _is_admin_request():
attestations = _redact_entries(attestations, _ATTEST_PRIVATE_FIELDS)
return jsonify({
'ok': True,
'machine_id': machine_id,
'attestations': ledger.get_attestation_history(machine_id),
'attestations': attestations,
})


Expand All @@ -223,11 +290,17 @@ def get_benchmarks(machine_id: str):

if not passport:
return jsonify({'ok': False, 'error': 'passport_not_found'}), 404


benchmarks = ledger.get_benchmark_signatures(machine_id)
if not _is_admin_request():
# The benchmark measurements ARE the raw hardware fingerprint — strip
# them from the public view (a timestamped, measurement-less record
# remains so the public can see a benchmark exists).
benchmarks = _redact_entries(benchmarks, _BENCHMARK_PRIVATE_FIELDS)
return jsonify({
'ok': True,
'machine_id': machine_id,
'benchmarks': ledger.get_benchmark_signatures(machine_id),
'benchmarks': benchmarks,
})


Expand Down
106 changes: 106 additions & 0 deletions node/tests/test_machine_passport.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,3 +900,109 @@ def run_tests():
else:
print("\n❌ Some tests failed")
sys.exit(1)


class TestPassportPublicRedaction(unittest.TestCase):
"""Passport GETs stay public but redact private fields without an admin key."""

def setUp(self):
from flask import Flask
import machine_passport_api
from machine_passport_api import machine_passport_bp

self._prev_admin_key = os.environ.get('ADMIN_KEY')
os.environ.pop('ADMIN_KEY', None)

self.app = Flask(__name__)
self.app.config['TESTING'] = True
self.app.register_blueprint(machine_passport_bp)
tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.db')
tmp.close()
machine_passport_api.PASSPORT_DB_PATH = tmp.name
machine_passport_api._ledger = None
self.client = self.app.test_client()
self.mp_api = machine_passport_api

ledger = machine_passport_api.get_ledger()
ledger.create_passport(MachinePassport(
machine_id='redact_test', name='Old Faithful', owner_miner_id='miner-1',
manufacture_year=2001, architecture='pentium4'))
ledger.add_repair_entry(
machine_id='redact_test', repair_date=1, repair_type='recap',
description='replaced caps', technician='Tech Shop',
cost_rtc=50_000_000, notes='private internal note')
ledger.add_attestation(
machine_id='redact_test', attestation_ts=1, epoch=5,
total_rtc_earned=1_000_000, benchmark_hash='abc', entropy_score=0.91)
ledger.add_benchmark(
machine_id='redact_test', benchmark_ts=1,
cache_timing_profile='L1:3ns L2:9ns', simd_identity='altivec-bias-7',
thermal_curve='cold:40 warm:62', memory_bandwidth=2.5,
compute_score=88.0, entropy_throughput=1.3)

def tearDown(self):
if os.path.exists(self.mp_api.PASSPORT_DB_PATH):
os.unlink(self.mp_api.PASSPORT_DB_PATH)
self.mp_api._ledger = None
if self._prev_admin_key is None:
os.environ.pop('ADMIN_KEY', None)
else:
os.environ['ADMIN_KEY'] = self._prev_admin_key

def _admin(self):
os.environ['ADMIN_KEY'] = 'k'
return {'X-Admin-Key': 'k'}

def test_repair_log_public_redacts_private_fields(self):
entry = json.loads(self.client.get('/api/machine-passport/redact_test/repair-log').data)['repair_log'][0]
for f in ('technician', 'notes', 'cost_rtc'):
self.assertNotIn(f, entry)
self.assertIn('description', entry) # showcase field stays

def test_repair_log_admin_sees_full(self):
entry = json.loads(self.client.get('/api/machine-passport/redact_test/repair-log',
headers=self._admin()).data)['repair_log'][0]
self.assertEqual(entry['technician'], 'Tech Shop')
self.assertEqual(entry['cost_rtc'], 50_000_000)

def test_attestations_public_redacts_entropy(self):
a = json.loads(self.client.get('/api/machine-passport/redact_test/attestations').data)['attestations'][0]
self.assertNotIn('entropy_score', a)
self.assertIn('benchmark_hash', a) # hash stays public

def test_attestations_admin_sees_entropy(self):
a = json.loads(self.client.get('/api/machine-passport/redact_test/attestations',
headers=self._admin()).data)['attestations'][0]
self.assertEqual(a['entropy_score'], 0.91)

def test_listing_stays_public(self):
resp = self.client.get('/api/machine-passport')
self.assertEqual(resp.status_code, 200) # NOT 401 — keep-public preserved
self.assertEqual(json.loads(resp.data)['count'], 1)

def test_benchmarks_public_strips_fingerprint(self):
b = json.loads(self.client.get('/api/machine-passport/redact_test/benchmarks').data)['benchmarks'][0]
for f in ('cache_timing_profile', 'simd_identity', 'thermal_curve',
'memory_bandwidth', 'compute_score', 'entropy_throughput'):
self.assertNotIn(f, b) # raw fingerprint must not leak publicly
self.assertIn('benchmark_ts', b) # existence/timestamp stays public

def test_benchmarks_admin_sees_fingerprint(self):
b = json.loads(self.client.get('/api/machine-passport/redact_test/benchmarks',
headers=self._admin()).data)['benchmarks'][0]
self.assertEqual(b['cache_timing_profile'], 'L1:3ns L2:9ns')
self.assertEqual(b['simd_identity'], 'altivec-bias-7')

def test_full_export_public_redacts_all_sensitive(self):
p = json.loads(self.client.get('/api/machine-passport/redact_test').data)['passport']
self.assertNotIn('technician', p['repair_log'][0])
self.assertNotIn('entropy_score', p['attestation_history'][0])
self.assertNotIn('cache_timing_profile', p['benchmark_signatures'][0])

def test_full_export_admin_sees_all(self):
p = json.loads(self.client.get('/api/machine-passport/redact_test',
headers=self._admin()).data)['passport']
self.assertEqual(p['repair_log'][0]['technician'], 'Tech Shop')
self.assertEqual(p['benchmark_signatures'][0]['simd_identity'], 'altivec-bias-7')


Loading