-
Notifications
You must be signed in to change notification settings - Fork 0
/
liveness_2_safety.py
executable file
·347 lines (261 loc) · 10.5 KB
/
liveness_2_safety.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
#!/usr/bin/env python3
import argparse
from inspect import cleandoc
import sys
import aiger_swig.aiger_wrap as aiglib
from shell import execute_shell
def to_str_ret_out_err(ret, out, err):
res = ''
res += "return:%i" % ret + '\n'
res += "out:\n"
res += out + '\n'
res += "err:\n"
res += err
return res
def get_counter_aig(k): # -> (int,int,int)
template = \
"""
MODULE main -- count-up till k
VAR
state: 0..{k};
IVAR
reset: boolean;
inc: boolean;
DEFINE
beep := (state={k});
ASSIGN
init(state) := 0;
next(state) := case
reset: 0;
inc: state + 1;
TRUE:state;
esac;
SPEC
AG(!beep) -- then we use the bad signal
"""
global reset, inc
counter_smv = cleandoc(template).format(k=str(k))
ret, out, err = execute_shell('smvflatten | smvtoaig | aigmove | aigtoaig -a', input=counter_smv + "\n")
assert ret == 0, to_str_ret_out_err(ret, out, err)
out_lines = out.splitlines()
aag, m, i, l, o, a = out_lines[0].split()
# assert i == '1' and o == '1'
assert i == '2' and o == '1'
for i in filter(lambda l: l.startswith('i'), out_lines):
i = i.split()
if i[1] == "reset":
reset = int(i[0][1:])
if i[1] == "inc":
inc = int(i[0][1:])
reset = (reset + 1) * 2
inc = (inc + 1) * 2
ret, out, err = execute_shell('aigstrip | aigtoaig -a', input=out)
out_lines = out.splitlines()
aag, m, i, l, o, a = out_lines[0].split()
out_def_line = out_lines[int(i) + 1 + int(l)]
assert len(out_def_line.split()) == 1, out_def_line
out_signal = int(out_def_line)
return out, out_signal
def is_negated(l):
return (l & 1) == 1
def negate(lit):
return lit ^ 1
def strip_lit(l):
return l & ~1
def get_lit_type(l):
input_ = aiglib.aiger_is_input(spec, strip_lit(l))
latch_ = aiglib.aiger_is_latch(spec, strip_lit(l))
and_ = aiglib.aiger_is_and(spec, strip_lit(l))
return input_, latch_, and_
#: :type: aiglib.aiger
spec = None
symbol_by_ulit = set()
counter_and_u_new_lits, counter_latch_u_new_lits = None, None # _u_ stands for 'unsigned', _s_ -- for 'signed'
shift = None
out = None
new_format = None
reset = None
inc = None
#
def get_add_symbol(s_new_lit):
assert counter_and_u_new_lits or counter_latch_u_new_lits, \
'not initialized'
u_new_lit = strip_lit(s_new_lit)
if u_new_lit == 0:
input_, latch_, and_ = get_lit_type(u_new_lit)
return input_ or latch_ or and_
if u_new_lit in symbol_by_ulit:
input_, latch_, and_ = get_lit_type(u_new_lit)
return input_ or latch_ or and_
if u_new_lit in [strip_lit(get_new_s_lit(reset)), strip_lit(get_new_s_lit(inc))]:
# previously input literal, it was not AND nor latch in the counter
input_, latch_, and_ = get_lit_type(u_new_lit)
return input_ or latch_ or and_
assert u_new_lit in counter_and_u_new_lits or \
u_new_lit in counter_latch_u_new_lits, "%s\n%s\n%s" % (u_new_lit, counter_and_u_new_lits, counter_latch_u_new_lits)
if u_new_lit in counter_and_u_new_lits:
aiglib.aiger_add_and(spec, u_new_lit, 1, 1)
else: # u_new_lit in counter_latch_u_new_lits:
aiglib.aiger_add_latch(spec, u_new_lit, 1, 'counter_latch')
symbol_by_ulit.add(u_new_lit)
input_, latch_, and_ = get_lit_type(u_new_lit)
return input_ or latch_ or and_
def get_new_s_lit(old_lit):
""" :return: _signed_ literal """
# 2 is the input of a counter aig
# 2 -> spec.justice[0].lits[0], 3 -> negate(spec.fairness.lit)
# counter_other_lit -> counter_other_lit + shift
if strip_lit(old_lit) == reset:
res = aiglib.get_justice_lit(spec, 0, 0)
if is_negated(old_lit):
res = negate(res)
return res
if strip_lit(old_lit) == inc:
if not spec.num_fairness:
# Here we have something like GF true -> GF just
# so we always increment our counter
if is_negated(old_lit):
return 0
return 1
# here we have GF fair -> GF just
res = aiglib.get_ith_fairness(spec, 0).lit
if is_negated(old_lit):
res = negate(res)
return res
return old_lit + shift
#
# old_beep_lit = get_counter_aig_output_lit(counter_aig)
# new_beep_lit = old_beep_lit + shift
def define_counter_new_lits(counter_aig):
global counter_and_u_new_lits, counter_latch_u_new_lits
counter_latch_u_new_lits = set()
counter_and_u_new_lits = set()
# print counter_aig
for l in counter_aig.splitlines()[3:]: # ignore header and input
tokens = l.split()
if len(tokens) == 1: # output, ignore
continue
if len(tokens) == 2: # latch
old_l = int(tokens[0])
new_l = get_new_s_lit(old_l)
counter_latch_u_new_lits.add(new_l)
elif len(tokens) == 3: # AND gate
old_l = int(tokens[0])
new_l = get_new_s_lit(old_l)
counter_and_u_new_lits.add(new_l)
else:
assert 0, l
def define_shift(): # should go before any additions to the spec
global shift
# two inputs: reset and inc
# each input is used twice (as-is and negated)
next_lit = spec.maxvar * 2 + 4
# 6 is the first literal, which is not an input
shift = next_lit - 6
def add_counter_to_spec(k):
counter_aig, out_overflow_signal = get_counter_aig(k)
define_shift()
define_counter_new_lits(counter_aig)
for l in counter_aig.splitlines()[3:]: # ignore header and input
tokens = l.split()
if len(tokens) == 1: # output, ignore
continue
if len(tokens) == 2: # latch
old_l, old_next = int(tokens[0]), int(tokens[1])
new_l, new_next = get_new_s_lit(old_l), get_new_s_lit(old_next)
#: :type: aiglib.aiger_symbol
l_symbol = get_add_symbol(new_l)
get_add_symbol(new_next)
l_symbol.next = new_next
elif len(tokens) == 3: # AND gate
old_and, old_rhs0, old_rhs1 = (int(tokens[0]),
int(tokens[1]),
int(tokens[2]))
new_and, new_rhs0, new_rhs1 = (get_new_s_lit(old_and),
get_new_s_lit(old_rhs0),
get_new_s_lit(old_rhs1))
#: :type: aiglib.aiger_and
and_symbol = get_add_symbol(new_and)
get_add_symbol(new_rhs0)
get_add_symbol(new_rhs1)
and_symbol.rhs0 = new_rhs0
and_symbol.rhs1 = new_rhs1
# print 'and: old_and, old_rhs0, old_rhs1 -> ' \
# 'new_and, new_rhs0, new_rhs1', \
# old_and, old_rhs0, old_rhs1, \
# new_and, new_rhs0, new_rhs1
else:
assert 0, l
# every literal gets defined above, so now goes
aiglib.aiger_add_bad(spec, get_new_s_lit(out_overflow_signal), 'k-liveness')
def aiger_hacky_remove_jf(spec):
""" 'Remove' all justice signals from spec. """
# aiger uses num_justice to store the length of the justice section
# and relies on it when printing
just = spec.num_justice
spec.num_justice = 0
fair = spec.num_fairness
spec.num_fairness = 0
# we return the old value, so that we can reset num_justice afterwards
return just, fair
def write_and_die():
global spec, out, new_format
(j, f) = aiger_hacky_remove_jf(spec)
res, string = aiglib.aiger_write_to_string(spec,
aiglib.aiger_ascii_mode,
2147483648)
# maybe useful for gc?
spec.num_justice = j
spec.num_fairness = f
assert res != 0, 'writing failure'
if not new_format:
# post-process
ret, out_string, err = execute_shell(
'aigmove -i | aigor | aigtoaig -a',
input=string)
assert ret == 0, 'post-processing failure: ' + \
to_str_ret_out_err(ret, string, err) + \
'input was:\n' + string
string = out_string
out.write(string)
exit(0)
def main(spec_filename, k):
assert k > 0, str(k)
global spec
#: :type: aiglib.aiger
spec = aiglib.aiger_init()
aiglib.aiger_open_and_read_from_file(spec, spec_filename)
assert spec.num_fairness <= 1, 'more than one fairness property is not supported yet'
if spec.num_justice == 0:
write_and_die()
assert spec.num_justice == 1, 'more than one justice property is not supported yet: ' + str(spec.num_justice)
assert spec.justice.size == 1, 'the justice section should contain exactly one literal: ' + str(spec.justice.size)
add_counter_to_spec(k)
write_and_die()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Convert justice liveness into k-safety. '
'Requires: smvflatten, and aiger tools in your $PATH. '
'By default, the format is with the single bad output. '
'NOTE: justice signal is replaced with True.')
parser.add_argument('aiger', metavar='aiger', nargs='?',
type=str, default='/dev/stdin',
help='input AIGER file to be transformed')
parser.add_argument('out', metavar='out',
default=sys.stdout,
nargs='?',
type=argparse.FileType('w'),
help='output file name (default stdout)')
parser.add_argument('-k', '--k', type=int, default=2,
help='(default 2) '
'Max value of the counter. Counting starts from 0.'
'Reaching this value produces spec violation.')
parser.add_argument('--new', '-n',
action='store_true',
default=False,
help='Produce new format (BCJF) -- '
'it adds new k-liveness Bad property '
'but it keeps the justice signal too.')
args = parser.parse_args()
out = args.out
new_format = args.new
exit(main(args.aiger, args.k))