Skip to content

Commit 996c540

Browse files
committed
Fix inventory flushing
- it's a combo of handling a tuple and pickling of raw data, and is triggered under rare occasions
1 parent a9cc560 commit 996c540

4 files changed

Lines changed: 166 additions & 3 deletions

File tree

src/helper_startup.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232

3333
logger = logging.getLogger('default')
3434

35+
#: The latest version of the keys.dat settings schema. Bump this
36+
#: when adding a new migration step in :func:`updateConfig` or
37+
#: :class:`class_sqlThread.sqlThread`.
38+
LATEST_SETTINGS_VERSION = 10
39+
3540
# The user may de-select Portable Mode in the settings if they want
3641
# the config files to stay in the application data folder.
3742
StoreConfigFilesInSameDirectoryAsProgramByDefault = False
@@ -72,7 +77,9 @@ def loadConfig():
7277
# no config file (or it cannot be accessed). Create config file.
7378
# config.add_section('bitmessagesettings')
7479
config.read()
75-
config.set('bitmessagesettings', 'settingsversion', '10')
80+
config.set(
81+
'bitmessagesettings', 'settingsversion',
82+
str(LATEST_SETTINGS_VERSION))
7683
if 'linux' in sys.platform:
7784
config.set('bitmessagesettings', 'minimizetotray', 'false')
7885
# This isn't implimented yet and when True on

src/storage/sqlite.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,12 @@ def flush(self):
109109
for objectHash, value in self._inventory.items():
110110
sql.execute(
111111
'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
112-
sqlite3.Binary(objectHash), *value)
112+
sqlite3.Binary(objectHash),
113+
value.type,
114+
value.stream,
115+
sqlite3.Binary(value.payload),
116+
value.expires,
117+
sqlite3.Binary(value.tag))
113118
self._inventory.clear()
114119

115120
def clean(self):

src/tests/test_config_process.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os
66
import tempfile
77
from pybitmessage.bmconfigparser import config
8+
from pybitmessage.helper_startup import LATEST_SETTINGS_VERSION
89
from .test_process import TestProcessProto
910
from .common import skip_python3
1011

@@ -22,7 +23,8 @@ def test_config_defaults(self):
2223
config.read(os.path.join(self.home, 'keys.dat'))
2324

2425
self.assertEqual(config.safeGetInt(
25-
'bitmessagesettings', 'settingsversion'), 10)
26+
'bitmessagesettings', 'settingsversion'),
27+
LATEST_SETTINGS_VERSION)
2628
self.assertEqual(config.safeGetInt(
2729
'bitmessagesettings', 'port'), 8444)
2830
# don't connect

src/tests/test_inventory_flush.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
"""Tests for SqliteInventory.flush()"""
2+
# pylint: disable=protected-access,wrong-import-order,wrong-import-position
3+
# pylint: disable=import-outside-toplevel
4+
5+
import os
6+
import struct
7+
import tempfile
8+
import threading
9+
import time
10+
11+
from .common import skip_python3
12+
from .partial import TestPartialRun
13+
14+
skip_python3()
15+
16+
17+
class TestInventoryFlush(TestPartialRun):
18+
"""
19+
Integration test: exercises flush() end-to-end with the real sqlThread
20+
consumer running, so that type errors in parameter binding surface here
21+
rather than silently killing a production thread.
22+
"""
23+
24+
@classmethod
25+
def setUpClass(cls):
26+
os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir()
27+
super(TestInventoryFlush, cls).setUpClass()
28+
29+
from bmconfigparser import config, config_ready
30+
from class_sqlThread import sqlThread
31+
from helper_sql import sql_ready, sqlStoredProcedure
32+
from helper_startup import LATEST_SETTINGS_VERSION
33+
from storage.sqlite import SqliteInventory
34+
35+
cls._sqlStoredProcedure = staticmethod(sqlStoredProcedure)
36+
37+
# sqlThread.run() waits on config_ready and then reads
38+
# settingsversion; normally helper_startup.loadConfig() handles
39+
# both, but TestPartialRun only calls config.read() which loads
40+
# default.ini (no settingsversion). Set the minimum the
41+
# sqlThread needs so it can initialise the database.
42+
if not config.has_option(
43+
'bitmessagesettings', 'settingsversion'):
44+
config.set(
45+
'bitmessagesettings', 'settingsversion',
46+
str(LATEST_SETTINGS_VERSION))
47+
config_ready.set()
48+
49+
sql_lookup = sqlThread()
50+
sql_lookup.daemon = True
51+
sql_lookup.start()
52+
sql_ready.wait()
53+
cls.inventory = SqliteInventory()
54+
55+
@classmethod
56+
def tearDownClass(cls):
57+
cls._sqlStoredProcedure('exit')
58+
for thread in threading.enumerate():
59+
if thread.name == "SQL":
60+
thread.join(timeout=10)
61+
super(TestInventoryFlush, cls).tearDownClass()
62+
63+
# -- helpers ----------------------------------------------------------
64+
65+
@staticmethod
66+
def _make_hash(seed):
67+
"""Return a 32-byte hash derived from *seed*."""
68+
return (b'\x00' * 31 + bytes([seed & 0xFF]))[-32:]
69+
70+
def _flush_and_check(self, obj_hash):
71+
"""
72+
Flush the inventory to the database, clear the _objects lookup
73+
cache so that __contains__ is forced to hit sqlite, then verify
74+
the hash is found via the normal inventory API.
75+
"""
76+
self.inventory.flush()
77+
self.inventory._objects.clear()
78+
self.assertIn(obj_hash, self.inventory)
79+
80+
# -- test cases -------------------------------------------------------
81+
82+
def test_flush_with_bytes_payload(self):
83+
"""Baseline: payload and tag are plain bytes."""
84+
h = self._make_hash(1)
85+
self.inventory[h] = (
86+
2, 1, b'\x80\x01' + os.urandom(64),
87+
int(time.time()) + 3600, b'\xff' * 32)
88+
self._flush_and_check(h)
89+
90+
def test_flush_with_memoryview_payload(self):
91+
"""
92+
Reproduce the production crash: payload and tag as memoryview
93+
cause 'Error binding parameter 3 - probably unsupported type.'
94+
"""
95+
h = self._make_hash(2)
96+
self.inventory[h] = (
97+
2, 1, memoryview(b'\x80\x02' + os.urandom(64)),
98+
int(time.time()) + 3600, memoryview(b'\xee' * 32))
99+
self._flush_and_check(h)
100+
101+
def test_flush_with_bytearray_payload(self):
102+
"""bytearray is another bytes-like type that could trip sqlite3."""
103+
h = self._make_hash(3)
104+
self.inventory[h] = (
105+
2, 1, bytearray(b'\x80\x03' + os.urandom(64)),
106+
int(time.time()) + 3600, bytearray(b'\xdd' * 32))
107+
self._flush_and_check(h)
108+
109+
def test_flush_with_empty_tag(self):
110+
"""Empty tag (b'') must not break the INSERT."""
111+
h = self._make_hash(4)
112+
self.inventory[h] = (
113+
2, 1, b'\x80\x04' + os.urandom(64),
114+
int(time.time()) + 3600, b'')
115+
self._flush_and_check(h)
116+
117+
# pylint: disable=redefined-variable-type
118+
def test_flush_multiple_mixed_types(self):
119+
"""Flush a batch of items with mixed blob types."""
120+
count = 20
121+
hashes = [self._make_hash(0x10 + i) for i in range(count)]
122+
expires = int(time.time()) + 3600
123+
124+
for i, h in enumerate(hashes):
125+
payload = struct.pack('>I', i) + os.urandom(60)
126+
tag = struct.pack('>I', i) + b'\x00' * 28
127+
if i % 3 == 0:
128+
payload = memoryview(payload)
129+
tag = memoryview(tag)
130+
elif i % 3 == 1:
131+
payload = bytearray(payload)
132+
tag = bytearray(tag)
133+
self.inventory[h] = (2, 1, payload, expires, tag)
134+
135+
self.inventory.flush()
136+
self.inventory._objects.clear()
137+
138+
for i, h in enumerate(hashes):
139+
self.assertIn(
140+
h, self.inventory,
141+
"Item {} missing after batch flush".format(i))
142+
143+
def test_flush_clears_memory_cache(self):
144+
"""After flush the in-memory _inventory dict must be empty."""
145+
h = self._make_hash(0xF0)
146+
self.inventory[h] = (
147+
2, 1, b'\x00' * 32, int(time.time()) + 3600, b'')
148+
self.inventory.flush()
149+
self.assertEqual(len(self.inventory._inventory), 0)

0 commit comments

Comments
 (0)