Skip to content

Commit 026c376

Browse files
committed
Add machine section for global state.
Add networking instructions. Add functions with different set of arguments.
1 parent 0e44c3d commit 026c376

10 files changed

Lines changed: 286 additions & 41 deletions

File tree

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use serde::Deserialize;
33
use std::{fmt::Display, net::Ipv4Addr};
44
use syscalls::Sysno;
55

6+
pub mod machine;
67
pub mod script;
78
pub mod worker;
89

src/machine.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
use crate::script::ast::MachineInstruction;
2+
3+
use log::{debug, trace};
4+
use std::{
5+
io::{prelude::*, BufReader},
6+
net::TcpListener,
7+
thread,
8+
};
9+
10+
#[derive(Debug)]
11+
pub enum MachineError {
12+
Internal,
13+
}
14+
15+
fn start_server(addr: String, target_port: u16) -> Result<(), MachineError> {
16+
debug!("Starting server at {:?}:{:?}", addr, target_port);
17+
18+
let listener = TcpListener::bind((addr, target_port)).unwrap();
19+
20+
for stream in listener.incoming() {
21+
let mut stream = stream.unwrap();
22+
23+
// As a simplest solution to keep a connection open, spawn a
24+
// thread. It's not the best one though, as we waste resources.
25+
// For the purpose of only keeping connections open we could e.g.
26+
// spawn only two threads, where the first one receives connections
27+
// and adds streams into the list of active, and the second iterates
28+
// through streams and replies. This way the connections will have
29+
// high latency, but for the purpose of networking workload it
30+
// doesn't matter.
31+
thread::spawn(move || loop {
32+
let mut buf_reader = BufReader::new(&stream);
33+
let mut buffer = String::new();
34+
35+
match buf_reader.read_line(&mut buffer) {
36+
Ok(0) => {
37+
// EOF, exit
38+
trace!("EOF");
39+
return;
40+
}
41+
Ok(_n) => {
42+
trace!("Received {:?}", buffer);
43+
44+
let response = "hello\n";
45+
match stream.write_all(response.as_bytes()) {
46+
Ok(_) => {
47+
// Response is sent, handle the next one
48+
}
49+
Err(e) => {
50+
trace!("ERROR: sending response, {}", e);
51+
break;
52+
}
53+
}
54+
}
55+
Err(e) => {
56+
trace!("ERROR: reading a line, {}", e)
57+
}
58+
}
59+
});
60+
}
61+
62+
Ok(())
63+
}
64+
65+
pub fn apply(instr: MachineInstruction) -> Result<(), MachineError> {
66+
match instr {
67+
MachineInstruction::Server { port } => {
68+
start_server("127.0.0.1".to_string(), port)
69+
}
70+
}
71+
}

src/main.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use core_affinity::CoreId;
2222
use docopt::Docopt;
2323
use fork::{fork, Fork};
2424
use itertools::iproduct;
25+
use itertools::{Either, Itertools};
2526
use nix::errno::Errno;
2627
use nix::sys::signal::{kill, Signal};
2728
use nix::sys::wait::waitpid;
@@ -30,6 +31,7 @@ use serde::Deserialize;
3031
use std::time::SystemTime;
3132
use std::{thread, time};
3233

34+
use berserker::machine::apply;
3335
use berserker::script::{ast::Node, parser::parse_instructions};
3436
use berserker::{
3537
worker::new_script_worker, worker::new_worker, WorkloadConfig,
@@ -58,15 +60,35 @@ fn run_script(script_path: String) -> Vec<Option<i32>> {
5860
parse_instructions(&std::fs::read_to_string(script_path).unwrap())
5961
.unwrap();
6062

61-
ast.iter().for_each(|node| {
63+
let (machine, works): (Vec<_>, Vec<_>) =
64+
ast.iter().partition_map(|node| match node {
65+
Node::Work { .. } => Either::Right(node),
66+
Node::Machine { .. } => Either::Left(node),
67+
});
68+
69+
if let Some(m) = machine.into_iter().next() {
70+
let Node::Machine { m_instructions } = m.clone() else {
71+
unreachable!()
72+
};
73+
74+
for instr in m_instructions {
75+
debug!("INSTR {:?}", instr);
76+
thread::spawn(move || apply(instr.clone()));
77+
}
78+
};
79+
80+
works.into_iter().for_each(|node| {
6281
debug!("AST NODE: {:?}", node);
6382

6483
let Node::Work {
6584
name: _,
6685
args,
6786
instructions: _,
6887
dist: _,
69-
} = node;
88+
} = node
89+
else {
90+
unreachable!()
91+
};
7092

7193
let workers: u32 = args
7294
.get("workers")

src/script/ast.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,21 @@ pub enum Instruction {
1414
Debug { text: String },
1515
}
1616

17+
#[derive(Debug, Clone)]
18+
pub enum MachineInstruction {
19+
Server { port: u16 },
20+
}
21+
1722
#[derive(Debug, Clone)]
1823
pub enum Dist {
1924
Exp { rate: f64 },
2025
}
2126

2227
#[derive(Debug, Clone)]
2328
pub enum Node {
29+
Machine {
30+
m_instructions: Vec<MachineInstruction>,
31+
},
2432
Work {
2533
name: String,
2634
args: HashMap<String, String>,

src/script/grammar.peg

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ ident = @{ident_char ~ (ASCII_DIGIT | ident_char)*}
66

77
constant = {
88
"\"" ~ value ~ "\""
9-
| ASCII_DIGIT
9+
| ASCII_DIGIT*
1010
}
1111

1212
arg = {
@@ -15,7 +15,7 @@ arg = {
1515
}
1616

1717
args = {"(" ~ (arg ~ ("," ~ arg)* ~ ","?)? ~ ")"}
18-
value = {(ASCII_ALPHANUMERIC| "." | " " | "/")*}
18+
value = {(ASCII_ALPHANUMERIC| "." | " " | "/" | ":")*}
1919

2020
param = {ident ~ "=" ~ value}
2121
params = {"(" ~ (param ~ ("," ~ param)* ~ ","?)? ~ ")"}
@@ -24,13 +24,15 @@ task = { "task" }
2424
network = { "network" }
2525
port = { "port" }
2626
open = { "open" }
27+
ping = { "ping" }
2728
debug = { "debug" }
2829

2930
funcName = {
3031
task
3132
| network
3233
| port
3334
| open
35+
| ping
3436
| debug
3537
}
3638

@@ -65,6 +67,20 @@ function = {
6567
~ dist?
6668
}
6769

70+
server = { "server" }
71+
load = { "load" }
72+
memory = { "memory" }
73+
74+
mInstrName = {
75+
server
76+
| load
77+
| memory
78+
}
79+
80+
machineInstruction = {mInstrName ~ args? ~ ";"}
81+
82+
machine = {"machine" ~ "{" ~ machineInstruction* ~ "}"}
83+
6884
file = {
69-
SOI ~ (expr*) ~ EOI
85+
SOI ~ machine? ~ (expr*) ~ EOI
7086
}

src/script/parser.rs

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use log::debug;
22
use pest::{self, error::Error, Parser};
33
use std::collections::HashMap;
44

5-
use crate::script::ast::{Arg, Dist, Instruction, Node};
5+
use crate::script::ast::{Arg, Dist, Instruction, MachineInstruction, Node};
66

77
// ANCHOR: parser
88
#[derive(pest_derive::Parser)]
@@ -18,9 +18,17 @@ pub fn parse_instructions(source: &str) -> Result<Vec<Node>, Error<Rule>> {
1818
pest::set_error_detail(true);
1919
let mut ast = vec![];
2020
let pairs = InstructionParser::parse(Rule::file, source)?;
21+
let expr_rules = [Rule::expr, Rule::machine];
22+
2123
for pair in pairs {
22-
if let Rule::file = pair.as_rule() {
23-
ast.push(build_ast_from_expr(pair.into_inner().next().unwrap()));
24+
if pair.as_rule() != Rule::file {
25+
continue;
26+
}
27+
28+
for i in pair.into_inner() {
29+
if expr_rules.contains(&i.as_rule()) {
30+
ast.push(build_ast_from_expr(i));
31+
}
2432
}
2533
}
2634
debug!("AST {:?}", ast);
@@ -30,6 +38,9 @@ pub fn parse_instructions(source: &str) -> Result<Vec<Node>, Error<Rule>> {
3038
fn build_ast_from_expr(pair: pest::iterators::Pair<Rule>) -> Node {
3139
match pair.as_rule() {
3240
Rule::expr => build_ast_from_expr(pair.into_inner().next().unwrap()),
41+
Rule::machine => Node::Machine {
42+
m_instructions: build_ast_from_minstr(pair.into_inner()),
43+
},
3344
Rule::function => {
3445
let mut inner = pair.into_inner();
3546
let mut work = inner.next().unwrap().into_inner();
@@ -65,6 +76,49 @@ fn build_ast_from_expr(pair: pest::iterators::Pair<Rule>) -> Node {
6576
}
6677
}
6778

79+
fn build_ast_from_minstr(
80+
pair: pest::iterators::Pairs<Rule>,
81+
) -> Vec<MachineInstruction> {
82+
let mut instr = vec![] as Vec<MachineInstruction>;
83+
84+
for i in pair {
85+
let mut inner = i.into_inner();
86+
let name = inner.next().unwrap();
87+
match name.into_inner().next().unwrap().as_rule() {
88+
Rule::server => {
89+
let port: u16 = inner
90+
.next()
91+
.unwrap()
92+
.into_inner()
93+
.next()
94+
.unwrap()
95+
.as_span()
96+
.as_str()
97+
.to_string()
98+
.parse()
99+
.unwrap();
100+
instr.push(MachineInstruction::Server { port });
101+
}
102+
unknown => panic!("Unknown machine instruction: {unknown:?}"),
103+
}
104+
105+
//debug!("PAIR {:?}", inner.next().unwrap().as_span());
106+
//debug!("PAIR {:?}", inner.next().unwrap().into_inner().next().unwrap().as_span());
107+
//match i.as_rule() {
108+
//Rule::mInstrName => {
109+
//debug!("INSTR {:?}", i.into_inner());
110+
////let mut instrs = i.into_inner().next().unwrap().into_inner();
111+
////let name = instrs.next().unwrap().as_span().as_str().to_string();
112+
113+
//instr.push(MachineInstruction::Server { port: 80 });
114+
//}
115+
//unknown => panic!("Unknown machine instruction: {unknown:?}"),
116+
//}
117+
}
118+
119+
instr
120+
}
121+
68122
fn build_ast_from_instr(pair: pest::iterators::Pair<Rule>) -> Vec<Instruction> {
69123
let mut instr = vec![] as Vec<Instruction>;
70124

0 commit comments

Comments
 (0)