Skip to content

Commit

Permalink
Fix parallel state execution with Salt-SSH
Browse files Browse the repository at this point in the history
  • Loading branch information
lkubb committed May 16, 2024
1 parent 12fe5c1 commit 65f7168
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
1 change: 1 addition & 0 deletions changelog/66514.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed parallel state execution with Salt-SSH
21 changes: 19 additions & 2 deletions salt/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import salt.utils.files
import salt.utils.hashutils
import salt.utils.immutabletypes as immutabletypes
import salt.utils.jid
import salt.utils.msgpack
import salt.utils.platform
import salt.utils.process
Expand Down Expand Up @@ -757,7 +758,21 @@ def __init__(
loader="states",
initial_pillar=None,
file_client=None,
__invocation_id=None,
):
"""
When instantiating an object of this class, do not pass
``__invocation_id``. It is an internal field for tracking
parallel executions where no jid is available (Salt-SSH) and
only exposed as an init argument to work on spawning platforms.
"""
if jid is not None:
__invocation_id = jid
if __invocation_id is None:
# For salt-ssh parallel states, we need a unique identifier
# for a single execution. self.jid should not be set there
# since it's used for other purposes as well.
__invocation_id = salt.utils.jid.gen_jid(opts)
self._init_kwargs = {
"opts": opts,
"pillar_override": pillar_override,
Expand All @@ -768,6 +783,7 @@ def __init__(
"mocked": mocked,
"loader": loader,
"initial_pillar": initial_pillar,
"__invocation_id": __invocation_id,
}
self.states_loader = loader
if "grains" not in opts:
Expand Down Expand Up @@ -814,6 +830,7 @@ def __init__(
self.pre = {}
self.__run_num = 0
self.jid = jid
self.invocation_id = __invocation_id
self.instance_id = str(id(self))
self.inject_globals = {}
self.mocked = mocked
Expand Down Expand Up @@ -2236,7 +2253,7 @@ def _call_parallel_target(cls, instance, init_kwargs, name, cdata, low):
]
)

troot = os.path.join(instance.opts["cachedir"], instance.jid)
troot = os.path.join(instance.opts["cachedir"], instance.invocation_id)
tfile = os.path.join(troot, salt.utils.hashutils.sha1_digest(tag))
if not os.path.isdir(troot):
try:
Expand Down Expand Up @@ -2820,7 +2837,7 @@ def reconcile_procs(self, running):
if not proc.is_alive():
ret_cache = os.path.join(
self.opts["cachedir"],
self.jid,
self.invocation_id,
salt.utils.hashutils.sha1_digest(tag),
)
if not os.path.isfile(ret_cache):
Expand Down

0 comments on commit 65f7168

Please sign in to comment.