-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathRamCloudClient.java
More file actions
514 lines (463 loc) · 18.2 KB
/
RamCloudClient.java
File metadata and controls
514 lines (463 loc) · 18.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
/**
* Copyright (c) 2013 Stanford University. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/*
* This file implements a RAMCloud client DB layer for YCSB (Yahoo!'s Cloud
* Storage Benchmark suite).
*
* See https://github.com/brianfrankcooper/YCSB/wiki/Adding-a-Database and the
* com.yahoo.ycsb.DB abstract class if you're starting to hack on this.
*/
package com.yahoo.ycsb.db;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.StringByteIterator;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.ArrayList;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.BufferUnderflowException;
import edu.stanford.ramcloud.RAMCloud;
import edu.stanford.ramcloud.RAMCloudObject;
public class RamCloudClient extends DB {
/// Handle to the RAMCloud cluster. Created in init() and cleared again in
/// cleanup().
private RAMCloud ramcloud;
/// Cache mapping table names to RAMCloud table identifiers; filled lazily
/// by getTableId(). Created in init() and cleared again in cleanup().
private HashMap<String, Long> tableIds;
/// Property naming the coordinator locator string. init() fails with a
/// DBException when this property is missing.
public static final String LOCATOR_PROPERTY = "ramcloud.coordinatorLocator";
/// Optional property overriding the default value of tableServerSpan.
public static final String TABLE_SERVER_SPAN_PROPERTY = "ramcloud.tableServerSpan";
/// Optional property that switches on debug output (see the debug field).
public static final String DEBUG_PROPERTY = "ramcloud.debug";
/// Success is always 0.
public static final int OK = 0;
/// YCSB interprets anything non-0 as an error, but doesn't interpret the
/// specific value.
public static final int ERROR = 1;
/// The number of servers each table should be spread across (i.e. each
/// table will be split into this many tablets). This is passed to
/// createTable() and is set via the TABLE_SERVER_SPAN_PROPERTY, if given.
private int tableServerSpan = 1;
/// If true, print various messages to stderr that give some minor insight
/// into what's going on. init() sets this to true whenever DEBUG_PROPERTY
/// is present, regardless of the property's value. The flag is static, so
/// it is shared by every RamCloudClient instance.
private static boolean debug = false;
/**
 * Resolve a table name to its 64-bit RAMCloud table identifier.
 *
 * Identifiers are remembered in the tableIds cache. On a cache miss the
 * table is requested from RAMCloud via createTable(), using the configured
 * tableServerSpan, and the returned identifier is cached for later calls.
 *
 * @param tableName The name of the table.
 * @return The table identifier associated with tableName.
 */
private long
getTableId(String tableName)
{
    // Fast path: the identifier is already known.
    if (tableIds.containsKey(tableName)) {
        return tableIds.get(tableName);
    }

    long freshId = ramcloud.createTable(tableName, tableServerSpan);
    tableIds.put(tableName, freshId);
    return freshId;
}
/**
 * Encode the fields and values of one record into a byte[] using Java's
 * built-in Object serialization. Kept for reference only; serialize() is
 * the hand-coded replacement.
 *
 * Serialization failures are not propagated: they are reported on stderr
 * when debugging is enabled, and whatever bytes reached the underlying
 * stream are returned.
 *
 * @param values The field/value pairs of the record.
 * @return The serialized form of the record.
 */
private static byte[]
serializeUNUSED(HashMap<String, ByteIterator> values)
{
    // Convert to a plain String map first, outside the try block, so that
    // failures in the conversion itself are not swallowed below.
    Object stringMap = StringByteIterator.getStringMap(values);
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try {
        new ObjectOutputStream(sink).writeObject(stringMap);
    } catch (Exception e) {
        if (debug) {
            System.err.println("RamCloudClient serialization failed: " + e);
        }
    }
    return sink.toByteArray();
}
/**
 * Deserialize the fields and values stored in a RAMCloud object blob
 * into the given HashMap. This method uses Java's built-in Object
 * deserialization functionality and is the counterpart of
 * serializeUNUSED(); deserialize() is the hand-coded replacement.
 *
 * @param bytes The blob previously produced by serializeUNUSED().
 * @param into  The map that receives the decoded field/value pairs.
 * @throws DBException If the blob cannot be deserialized, or if it does
 *                     not contain a HashMap.
 */
private static void
deserializeUNUSED(byte[] bytes, HashMap<String, ByteIterator> into) throws DBException
{
    Object object;
    ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
    try {
        ObjectInputStream objectStream = new ObjectInputStream(byteStream);
        object = objectStream.readObject();
    } catch (Exception e) {
        if (debug)
            System.err.println("RamCloudClient deserialization failed: " + e);
        // Report the failure to the caller instead of continuing with a
        // null map, which would only fail later with a less useful error.
        throw new DBException("RamCloudClient deserialization failed: " + e);
    }
    // Reject anything that is not the map type we are about to cast to
    // (this also covers a serialized null).
    if (!(object instanceof HashMap)) {
        throw new DBException("RamCloudClient deserialization failed: " +
            "blob does not contain a HashMap");
    }
    // The key/value types cannot be verified at runtime because of type
    // erasure; the blob is expected to come from serializeUNUSED().
    @SuppressWarnings("unchecked")
    HashMap<String, String> stringMap = (HashMap<String, String>)object;
    StringByteIterator.putAllAsByteIterators(into, stringMap);
}
/**
 * Encode the fields and values of one record into a byte[] to be written
 * to RAMCloud, using a hand-coded format. According to the original
 * author this is about 1.2-3x as fast as Java's object serializer.
 *
 * Layout, repeated once per field, with all integers little-endian:
 *   [4-byte name length][name bytes][4-byte value length][value bytes]
 *
 * Names and values are converted with String.getBytes(), i.e. using the
 * platform's default charset.
 *
 * @param values The field/value pairs of the record.
 * @return The encoded record.
 */
private static byte[]
serialize(HashMap<String, ByteIterator> values)
{
    final int fieldCount = values.size();
    byte[][] names = new byte[fieldCount][];
    byte[][] payloads = new byte[fieldCount][];

    // Every field contributes two 4-byte length prefixes plus its bytes.
    int blobSize = 8 * fieldCount;
    int slot = 0;
    for (Map.Entry<String, ByteIterator> field : values.entrySet()) {
        names[slot] = field.getKey().getBytes();
        payloads[slot] = field.getValue().toString().getBytes();
        blobSize += names[slot].length + payloads[slot].length;
        slot++;
    }

    ByteBuffer blob = ByteBuffer.allocate(blobSize).order(ByteOrder.LITTLE_ENDIAN);
    for (slot = 0; slot < fieldCount; slot++) {
        blob.putInt(names[slot].length).put(names[slot]);
        blob.putInt(payloads[slot].length).put(payloads[slot]);
    }
    return blob.array();
}
/**
 * Deserialize the fields and values stored in a RAMCloud object blob
 * into the given HashMap. This method uses a hand-coded deserializer.
 * According to the original author it is about 3-5x as fast as Java's
 * object deserializer.
 *
 * The blob is expected to be a sequence of fields in the layout produced
 * by serialize(), with all integers little-endian:
 *   [4-byte name length][name bytes][4-byte value length][value bytes]
 *
 * Fields decoded before a malformed one is encountered will already have
 * been stored in the map when the exception is thrown.
 *
 * @param bytes The blob to decode.
 * @param into  The map that receives the decoded field/value pairs.
 * @throws DBException If the blob is truncated or contains a length
 *                     prefix that is negative or exceeds the remaining
 *                     bytes.
 */
private static void
deserialize(byte[] bytes, HashMap<String, ByteIterator> into) throws DBException
{
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    buf.order(ByteOrder.LITTLE_ENDIAN);
    // Stop cleanly at the end of the blob rather than relying on a
    // BufferUnderflowException to terminate the loop.
    while (buf.hasRemaining()) {
        String key = readLengthPrefixedString(bytes, buf);
        String value = readLengthPrefixedString(bytes, buf);
        into.put(key, new StringByteIterator(value));
    }
}

/**
 * Read one [4-byte length][bytes] item from the buffer's current position
 * and advance the position past it.
 *
 * @param bytes The array backing buf (buf must wrap it at offset 0).
 * @param buf   The little-endian buffer being parsed.
 * @return The decoded string (platform default charset).
 * @throws DBException If fewer than 4 bytes remain for the length prefix,
 *                     or the length is negative or larger than what is
 *                     left in the buffer.
 */
private static String
readLengthPrefixedString(byte[] bytes, ByteBuffer buf) throws DBException
{
    if (buf.remaining() < 4) {
        throw new DBException("deserialize: ByteBuffer not parsed " +
            "properly! Had " + buf.remaining() + " bytes left over!");
    }
    int length = buf.getInt();
    // Validate before touching the array so corrupt blobs surface as a
    // DBException instead of an unchecked index/argument exception.
    if (length < 0 || length > buf.remaining()) {
        throw new DBException("deserialize: invalid length " + length +
            " with only " + buf.remaining() + " bytes remaining!");
    }
    String decoded = new String(bytes, buf.position(), length);
    buf.position(buf.position() + length);
    return decoded;
}
/**
 * Initialize any state for this DB.
 * Called once per DB instance; there is one DB instance per client thread.
 *
 * Reads the coordinator locator (required), the table server span
 * (optional) and the debug flag (optional, enabled by the mere presence of
 * the property) from the DB properties, then connects to RAMCloud.
 *
 * @throws DBException If the locator property is missing or the table
 *                     server span property is not a valid integer.
 */
public void
init() throws DBException
{
    assert ramcloud == null;
    assert tableIds == null;
    Properties props = getProperties();
    String locator = props.getProperty(LOCATOR_PROPERTY);
    if (locator == null)
        throw new DBException("Missing property " + LOCATOR_PROPERTY);
    String tableServerSpanString = props.getProperty(TABLE_SERVER_SPAN_PROPERTY);
    if (tableServerSpanString != null) {
        try {
            // trim() tolerates stray whitespace around the number, which
            // is easy to introduce in a properties file.
            tableServerSpan = Integer.parseInt(tableServerSpanString.trim());
        } catch (NumberFormatException e) {
            // Surface bad configuration through the declared exception
            // type rather than an unchecked NumberFormatException.
            throw new DBException("Invalid value for property " +
                TABLE_SERVER_SPAN_PROPERTY + ": '" + tableServerSpanString + "'");
        }
    }
    if (props.getProperty(DEBUG_PROPERTY) != null)
        debug = true;
    if (debug)
        System.err.println("RamCloudClient connecting to " + locator + " ...");
    ramcloud = new RAMCloud(locator);
    tableIds = new HashMap<String, Long>();
}
/**
 * Cleanup any state for this DB.
 * Called once per DB instance; there is one DB instance per client thread.
 *
 * Disconnects from RAMCloud and drops the table identifier cache. Safe to
 * call when init() never completed or when cleanup() has already run.
 */
public void
cleanup() throws DBException
{
    if (debug)
        System.err.println("RamCloudClient disconnecting ...");
    // ramcloud is null if init() failed before connecting or if cleanup()
    // was already called; there is nothing to disconnect in that case.
    if (ramcloud != null)
        ramcloud.disconnect();
    ramcloud = null;
    tableIds = null;
}
/**
 * Remove a record from the database.
 *
 * Any exception raised while resolving the table or removing the object
 * is reported as ERROR (and printed to stderr when debugging is enabled).
 *
 * @param table The name of the table
 * @param key The record key of the record to delete.
 * @return Zero on success, a non-zero error code on error.
 */
@Override
public int
delete(String table, String key) {
    int status = OK;
    try {
        ramcloud.remove(getTableId(table), key);
    } catch (Exception e) {
        if (debug) {
            System.err.println("RamCloudClient delete threw: " + e);
        }
        status = ERROR;
    }
    return status;
}
/**
 * Write a record to the database. All field/value pairs in the given
 * HashMap are serialized into a single blob and stored under the given
 * record key.
 *
 * Any exception raised while resolving the table or writing the object is
 * reported as ERROR (and printed to stderr when debugging is enabled).
 *
 * @param table The name of the table
 * @param key The record key of the record to insert.
 * @param values A HashMap of field/value pairs to insert in the record
 * @return Zero on success, a non-zero error code on error.
 */
@Override
public int
insert(String table, String key, HashMap<String, ByteIterator> values)
{
    // Encoding happens outside the try block: only storage failures are
    // translated into an error code.
    byte[] blob = serialize(values);
    try {
        ramcloud.write(getTableId(table), key, blob);
        return OK;
    } catch (Exception e) {
        if (debug) {
            System.err.println("RamCloudClient insert threw: " + e);
        }
        return ERROR;
    }
}
/**
 * Fetch a record from the database and copy its field/value pairs into
 * the caller's HashMap.
 *
 * The whole record is always fetched and decoded; the requested fields are
 * then picked out of it. If a requested field is missing from the record,
 * ERROR is returned, and fields copied before the missing one remain in
 * result.
 *
 * @param table The name of the table
 * @param key The record key of the record to read.
 * @param fields The list of fields to read, or null for all of them
 * @param result A HashMap of field/value pairs for the result
 * @return Zero on success, a non-zero error code on error or "not found".
 */
@Override
public int
read(String table,
     String key,
     Set<String> fields,
     HashMap<String, ByteIterator> result)
{
    // Step 1: fetch the raw object.
    RAMCloudObject stored;
    try {
        stored = ramcloud.read(getTableId(table), key);
    } catch (Exception e) {
        if (debug) {
            System.err.println("RamCloudClient read threw: " + e);
        }
        return ERROR;
    }

    // Step 2: decode every field of the record.
    HashMap<String, ByteIterator> decoded = new HashMap<String, ByteIterator>();
    try {
        deserialize(stored.getValueBytes(), decoded);
    } catch (DBException e) {
        if (debug) {
            System.err.println("RamCloudClient deserializer threw: " + e);
        }
        return ERROR;
    }

    // Step 3: hand back either the requested subset or everything.
    if (fields != null) {
        for (String wanted : fields) {
            if (!decoded.containsKey(wanted)) {
                return ERROR;
            }
            result.put(wanted, decoded.get(wanted));
        }
        return OK;
    }
    result.putAll(decoded);
    return OK;
}
/**
 * Range scan over a set of records in the database. Not implemented by
 * this binding: the method never touches its arguments and always returns
 * ERROR, printing a warning to stderr when debugging is enabled.
 *
 * @param table The name of the table
 * @param startkey The record key of the first record to read.
 * @param recordcount The number of records to read
 * @param fields The list of fields to read, or null for all of them
 * @param result A Vector of HashMaps, where each HashMap is a set field/value
 *               pairs for one record
 * @return Always a non-zero error code.
 */
@Override
public int
scan(String table,
     String startkey,
     int recordcount,
     Set<String> fields,
     Vector<HashMap<String, ByteIterator>> result)
{
    if (debug) {
        System.err.println("Warning: RAMCloud doesn't support range scans yet.");
    }
    return ERROR;
}
/**
 * Update a record in the database. The field/value pairs in the given
 * HashMap are written into the record with the given key, replacing any
 * existing values that have the same field name.
 *
 * The YCSB documentation makes no explicit mention of this, but the
 * original author's reading is that an update is akin to a SQL update:
 * values that aren't being updated must be preserved. For RAMCloud this
 * means a read-modify-write.
 *
 * The status of the initial read is deliberately not checked: if the read
 * fails, whatever was copied into the working map (possibly nothing) is
 * merged with the new values and written back.
 *
 * @param table The name of the table
 * @param key The record key of the record to write.
 * @param values A HashMap of field/value pairs to update in the record
 * @return Zero on success, a non-zero error code on error.
 */
@Override
public int
update(String table, String key, HashMap<String, ByteIterator> values)
{
    // XXX- Should we use conditional ops to ensure the RMW is atomic?
    HashMap<String, ByteIterator> merged = new HashMap<String, ByteIterator>();

    // Read: start from the record's current contents.
    read(table, key, null, merged);
    // Modify: new values win over existing ones with the same name.
    merged.putAll(values);
    // Write: store the combined record.
    return insert(table, key, merged);
}
/***************************************************************************
* The following methods are for debugging / benchmarking this class as a
* standalone application.
**************************************************************************/
/**
 * Build a synthetic record for the standalone benchmarks.
 *
 * The record has numFields fields named "field0", "field1", ... Each
 * value is the field's index left-padded with '!' characters to
 * fieldWidth characters. If the index has more digits than fieldWidth,
 * the value is just the index (and is therefore wider than fieldWidth).
 *
 * @param numFields  The number of fields to generate.
 * @param fieldWidth The desired width of each value, in characters.
 * @return The generated field/value pairs.
 */
private static HashMap<String, ByteIterator>
generateValues(int numFields, int fieldWidth)
{
    HashMap<String, ByteIterator> values = new HashMap<String, ByteIterator>();
    // Build the padding once with a StringBuilder instead of repeated
    // String concatenation.
    StringBuilder padding = new StringBuilder();
    for (int i = 0; i < fieldWidth; i++)
        padding.append('!');
    String fieldValuePrototype = padding.toString();
    for (int i = 0; i < numFields; i++) {
        String suffix = Integer.toString(i);
        // Clamp at zero so a width narrower than the index does not make
        // substring() throw.
        int padLength = Math.max(0, fieldWidth - suffix.length());
        String fieldValue = fieldValuePrototype.substring(0, padLength) + suffix;
        values.put("field" + i, new StringByteIterator(fieldValue));
    }
    return values;
}
/**
 * Measure the average latency of reading one record through the client.
 *
 * A synthetic record is stored under "theKey" in "theTable", read back
 * 100000 times, and then deleted.
 *
 * @param client     An initialized client.
 * @param numFields  The number of fields in the test record.
 * @param fieldWidth The width of each field value, in characters.
 * @return The average time per read, in microseconds.
 */
private static double
measureReadLatency(RamCloudClient client, int numFields, int fieldWidth)
{
    final int rounds = 100000;
    HashMap<String, ByteIterator> record = generateValues(numFields, fieldWidth);
    client.insert("theTable", "theKey", record);

    HashMap<String, ByteIterator> scratch = new HashMap<String, ByteIterator>();
    long start = System.nanoTime();
    for (int round = 0; round < rounds; round++) {
        client.read("theTable", "theKey", null, scratch);
        scratch.clear();
    }
    long stop = System.nanoTime();

    client.delete("theTable", "theKey");
    // nanoseconds -> per-operation -> microseconds
    double elapsedNanos = (double)(stop - start);
    return elapsedNanos / rounds / 1000;
}
/**
 * Measure the average latency of writing one record through the client.
 *
 * A synthetic record is written to "theKey" in "theTable" 100000 times and
 * then deleted.
 *
 * @param client     An initialized client.
 * @param numFields  The number of fields in the test record.
 * @param fieldWidth The width of each field value, in characters.
 * @return The average time per write, in microseconds.
 */
private static double
measureWriteLatency(RamCloudClient client, int numFields, int fieldWidth)
{
    final int rounds = 100000;
    HashMap<String, ByteIterator> record = generateValues(numFields, fieldWidth);

    long start = System.nanoTime();
    for (int round = 0; round < rounds; round++) {
        client.insert("theTable", "theKey", record);
    }
    long stop = System.nanoTime();

    client.delete("theTable", "theKey");
    // nanoseconds -> per-operation -> microseconds
    double elapsedNanos = (double)(stop - start);
    return elapsedNanos / rounds / 1000;
}
/**
 * Measure the average time serialize() needs to encode one record.
 *
 * A synthetic record is encoded 100000 times; the encoded bytes are
 * discarded.
 *
 * @param numFields  The number of fields in the test record.
 * @param fieldWidth The width of each field value, in characters.
 * @return The average time per serialization, in microseconds.
 */
private static double
measureSerializerLatency(int numFields, int fieldWidth)
{
    final int rounds = 100000;
    HashMap<String, ByteIterator> record = generateValues(numFields, fieldWidth);

    long start = System.nanoTime();
    for (int round = 0; round < rounds; round++) {
        serialize(record);
    }
    long stop = System.nanoTime();

    // nanoseconds -> per-operation -> microseconds
    double elapsedNanos = (double)(stop - start);
    return elapsedNanos / rounds / 1000;
}
/**
 * Measure the average time deserialize() needs to decode one record.
 *
 * A synthetic record is encoded once and then decoded 100000 times into a
 * map that is cleared after every round. Decoding errors are ignored.
 *
 * @param numFields  The number of fields in the test record.
 * @param fieldWidth The width of each field value, in characters.
 * @return The average time per deserialization, in microseconds.
 */
private static double
measureDeserializerLatency(int numFields, int fieldWidth)
{
    final int rounds = 100000;
    byte[] blob = serialize(generateValues(numFields, fieldWidth));
    HashMap<String, ByteIterator> scratch = new HashMap<String, ByteIterator>();

    long start = System.nanoTime();
    for (int round = 0; round < rounds; round++) {
        try {
            deserialize(blob, scratch);
        } catch (DBException ignored) {
            // Benchmark only: a failed decode is not reported.
        }
        scratch.clear();
    }
    long stop = System.nanoTime();

    // nanoseconds -> per-operation -> microseconds
    double elapsedNanos = (double)(stop - start);
    return elapsedNanos / rounds / 1000;
}
/**
 * Standalone benchmark entry point. Connects to the RAMCloud cluster named
 * by the single command-line argument, then prints average read, write,
 * serialization and deserialization latencies for records with various
 * numbers of 100-character fields. The client is always cleaned up before
 * returning once it has been initialized.
 *
 * @param argv Exactly one element: the coordinator ServiceLocator string.
 */
public static void
main(String argv[])
{
    if (argv.length != 1) {
        System.err.println("Error: first argument must be the " +
            "coordinator ServiceLocator string");
        return;
    }
    // argv[0] is the coordinatorLocator
    RamCloudClient client = new RamCloudClient();
    Properties props = client.getProperties();
    props.setProperty(LOCATOR_PROPERTY, argv[0]);
    try {
        client.init();
    } catch (DBException e) {
        System.err.println("Failed to initialize RamCloudClient: " + e);
        return;
    }
    try {
        // Warm up first.
        measureReadLatency(client, 5, 10);
        int[] fieldCounts = { 1, 2, 3, 5, 10, 20, 50, 100 };
        // Measure read performance. The RAMCloud bindings are good for about
        // 7.5us reads on Infiniband, so we need to be careful that our
        // field serialisation is fast.
        for (int fields : fieldCounts) {
            System.out.println("Avg read latency (" + fields +
                " field(s)): " + measureReadLatency(client, fields, 100) + " us");
        }
        // And now write performance...
        for (int fields : fieldCounts) {
            System.out.println("Avg write latency (" + fields +
                " field(s)): " + measureWriteLatency(client, fields, 100) + " us");
        }
        // And let's see how fast our serializer and deserializer are...
        for (int fields : fieldCounts) {
            System.out.println("Avg serialization / deserialization latency (" + fields +
                " field(s)): " + measureSerializerLatency(fields, 100) + " / " +
                measureDeserializerLatency(fields, 100) + " us");
        }
    } finally {
        // Release the RAMCloud connection even if a benchmark throws.
        try {
            client.cleanup();
        } catch (DBException e) {
            System.err.println("Failed to clean up RamCloudClient: " + e);
        }
    }
}
}