Skip to content

Commit

Permalink
Merge pull request #1 from akash-akya/fifo
Browse files Browse the repository at this point in the history
Use named fifo as communication medium between elixir and command
  • Loading branch information
akash-akya committed Mar 25, 2020
2 parents 76ffc57 + 6fccec6 commit 3d3303c
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 120 deletions.
46 changes: 34 additions & 12 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package main

import (
"io"
"io/ioutil"
"os"
"os/exec"
"time"
)

func executor(workdir string, maxChunkSize int, args []string) error {
func executor(workdir string, inputFifoPath string, outputFifoPath string, args []string) error {
const stdoutMarker = 0x00
// const stderrMarker = 0x01

Expand All @@ -16,14 +17,17 @@ func executor(workdir string, maxChunkSize int, args []string) error {

logger.Printf("Command path: %v\n", proc.Path)

inputFifo := openFifo(inputFifoPath, os.O_RDONLY)
outputFifo := openFifo(outputFifoPath, os.O_WRONLY)

signal := make(chan bool)
go startPipeline(proc, os.Stdin, os.Stdout, maxChunkSize, signal)
go startPipeline(proc, inputFifo, outputFifo, signal)

// wait pipeline to start
<-signal

err := proc.Start()
fatal_if(err)
fatalIf(err)

// wait for pipeline exit
<-signal
Expand All @@ -40,31 +44,29 @@ func executor(workdir string, maxChunkSize int, args []string) error {
return err
}

func startPipeline(proc *exec.Cmd, stdin io.Reader, outstream io.Writer, maxChunkSize int, signal chan bool) {
func startPipeline(proc *exec.Cmd, inputFifo *os.File, outputFifo *os.File, signal chan bool) {
// some commands expect stdin to be connected
cmdInput, err := proc.StdinPipe()
fatal_if(err)
fatalIf(err)

cmdOutput, err := proc.StdoutPipe()
fatal_if(err)
fatalIf(err)

logger.Println("Starting pipeline")

demand, consumerExit := startCommandConsumer(stdin)
outputStreamerExit := startOutputStreamer(cmdOutput, outstream, maxChunkSize, demand)
startInputConsumer(cmdInput, inputFifo)
outputStreamerExit := startOutputStreamer(cmdOutput, outputFifo)
commandExit := createCommandExitChan(os.Stdin)

// signal that pipline is setup
signal <- true

// wait for pipline to exit
select {
case <-consumerExit:
case <-outputStreamerExit:
case <-commandExit:
}

cmdOutput.Close()
cmdInput.Close()

// signal pipeline shutdown
signal <- true
}
Expand All @@ -85,3 +87,23 @@ func safeExit(proc *exec.Cmd) error {
return err
}
}

func openFifo(fifoPath string, mode int) *os.File {
fifo, err := os.OpenFile(fifoPath, mode, 0600)
if err != nil {
fatal(err)
}
return fifo
}

func createCommandExitChan(stdin io.ReadCloser) <-chan struct{} {
exitSignal := make(chan struct{})
go func() {
defer close(exitSignal)

_, err := io.Copy(ioutil.Discard, stdin)
fatalIf(err)
}()

return exitSignal
}
137 changes: 39 additions & 98 deletions io.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,136 +6,77 @@ import (
"os"
)

var outBuf [1 << 16]byte
var buf [1 << 16]byte

func startOutputStreamer(pipe io.ReadCloser, outstream io.Writer, maxChunkSize int, demand <-chan int) <-chan struct{} {
func startOutputStreamer(pipe io.ReadCloser, fifo *os.File) <-chan struct{} {
exit := make(chan struct{})
const stdoutMarker = 0x00

go func() {
defer close(exit)

buf := outBuf
buf[2] = stdoutMarker
pending := 0
defer func() {
pipe.Close()
fifo.Close()
close(exit)
}()

for {
if pending == 0 {
d, ok := <-demand
if !ok {
return
}
pending = d
} else if pending < maxChunkSize {
select {
case d, ok := <-demand:
if !ok {
bytesRead, readErr := pipe.Read(buf[2:])
if bytesRead > 0 {
write16Be(buf[:2], bytesRead)
bytesWritten, writeErr := fifo.Write(buf[:bytesRead+2])
if writeErr != nil {
switch writeErr.(type) {
// ignore broken pipe or closed pipe errors
case *os.PathError:
return
default:
fatal(writeErr)
}
pending += d
default:
}
}
logger.Printf("[cmd_out] written bytes: %v\n", bytesWritten)

chunkSize := 0
if pending > maxChunkSize {
chunkSize = maxChunkSize
} else {
chunkSize = pending
}

readBytes, readErr := pipe.Read(buf[3 : 3+chunkSize])

if readBytes > 0 {
write16Be(buf[:2], readBytes+1)
bytesWritten, writeErr := outstream.Write(buf[:2+readBytes+1])
logger.Printf("out: written bytes: %v\n", bytesWritten)
fatal_if(writeErr)
pending -= readBytes
} else if readErr == io.EOF || readBytes == 0 {
// From io.Reader docs:
//
// Implementations of Read are discouraged from returning a zero
// byte count with a nil error, and callers should treat that
// situation as a no-op.
//
// In this case it appears that 0 bytes may sometimes be returned
// indefinitely. Therefore we close the pipe.
if readErr == io.EOF {
logger.Println("Encountered EOF when reading from stdout")
} else {
logger.Println("Read 0 bytes with no error")
}
} else if readErr == io.EOF && bytesRead == 0 {
return
} else {
switch readErr.(type) {
case *os.PathError:
return
default:
fatal(readErr)
}
fatal(readErr)
}
}
logger.Println("Exiting output streamer")
}()
return exit
}

func startCommandConsumer(stdin io.Reader) (<-chan int, <-chan struct{}) {
demand := make(chan int)
exitSignal := make(chan struct{})
buf := make([]byte, 4)
func startInputConsumer(pipe io.WriteCloser, fifo *os.File) {
buf := make([]byte, 2)

go func() {
defer close(exitSignal)
defer close(demand)
defer func() {
fifo.Close()
pipe.Close()
}()

for {
bytesRead, readErr := io.ReadFull(stdin, buf[:2])
logger.Printf("READ stdin %v bytes", bytesRead)
bytesRead, readErr := io.ReadFull(fifo, buf)
if readErr == io.EOF && bytesRead == 0 {
logger.Printf("[STDIN] EOF")
return
}
fatal_if(readErr)

length := read16Be(buf[:2])
fatalIf(readErr)

length := read16Be(buf)
logger.Printf("[cmd_in] read packet length = %v\n", length)
if length == 0 {
return
}

// TODO: must be 2 since no commands are supported now
if length != 4 {
fatal("Invalid data length")
}

bytesRead, readErr = io.ReadFull(stdin, buf[:4])
fatal_if(readErr)

demand <- int(read32Be(buf[:4]))
}
logger.Println("Exiting command consumer")
}()

return aggregate(demand), exitSignal
}

func aggregate(demand <-chan int) <-chan int {
out := make(chan int, 1)
go func() {
defer close(out)
for d := range demand {
select {
case existing := <-out:
out <- (existing + d)
default:
out <- d
_, writeErr := io.CopyN(pipe, fifo, int64(length))
if writeErr != nil {
switch writeErr.(type) {
// ignore broken pipe or closed pipe errors
case *os.PathError:
return
default:
fatal(writeErr)
}
}
}
logger.Println("Exiting aggregator")
}()
return out
}

func read16Be(data []byte) uint16 {
Expand Down
4 changes: 1 addition & 3 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ func (w dummyWriter) Write(p []byte) (n int, err error) {
}

func initLogger(flag string) {
const kFileMode = 0666

var file io.Writer
switch flag {
case "":
Expand All @@ -28,7 +26,7 @@ func initLogger(flag string) {
default:
var err error
file, err = os.OpenFile(flag, os.O_CREATE|os.O_WRONLY, 0666)
fatal_if(err)
fatalIf(err)
}
logger = log.New(file, "[odu]: ", log.Lmicroseconds)
}
21 changes: 16 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"os"
)

// VERSION of the odu
const VERSION = "0.1.0"

const usage = "Usage: odu [options] -- <program> [<arg>...]"

var dirFlag = flag.String("dir", ".", "working directory for the spawned process")
var logFlag = flag.String("log", "", "enable logging")
var chunkSizeFlag = flag.Int("chunk-size", 65035, "maximum chunk size (depends on operating system)")
var inputFlag = flag.String("input", "", "path to input fifo")
var outputFlag = flag.String("output", "", "path to output fifo")
var versionFlag = flag.Bool("v", false, "print version and exit")

func main() {
Expand All @@ -23,25 +25,34 @@ func main() {
os.Exit(0)
}

if *chunkSizeFlag <= 0 {
die_usage("chunk-size should be a valid positive integer.")
if pipeExists(*outputFlag) {
dieUsage("output is not a pipe")
}

if pipeExists(*inputFlag) {
dieUsage("input is not a pipe")
}

initLogger(*logFlag)

args := flag.Args()
validateArgs(args)

err := executor(*dirFlag, *chunkSizeFlag, args)
err := executor(*dirFlag, *inputFlag, *outputFlag, args)
if err != nil {
os.Exit(getExitStatus(err))
}
}

func validateArgs(args []string) {
if len(args) < 1 {
die_usage("Not enough arguments.")
dieUsage("Not enough arguments.")
}

logger.Printf("Flag values:\n dir: %v\nArgs: %v\n", *dirFlag, args)
}

func pipeExists(path string) bool {
info, err := os.Stat(path)
return !os.IsNotExist(err) && info.Mode()&os.ModeNamedPipe == 0
}
4 changes: 2 additions & 2 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func die(reason string) {
os.Exit(-1)
}

func die_usage(reason string) {
func dieUsage(reason string) {
if logger != nil {
logger.Printf("dying: %v\n", reason)
}
Expand All @@ -29,7 +29,7 @@ func fatal(any interface{}) {
logger.Panicf("%v\n", any)
}

func fatal_if(any interface{}) {
func fatalIf(any interface{}) {
if logger == nil {
fmt.Fprintf(os.Stderr, "%v\n", any)
os.Exit(-1)
Expand Down

0 comments on commit 3d3303c

Please sign in to comment.