Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expired and failed cypher transactions #156

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 62 additions & 41 deletions lib/neo4j-server/cypher_response.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Neo4j
module Server
class CypherResponse
attr_reader :data, :columns, :error_msg, :error_status, :error_code, :response
attr_reader :data, :columns, :response

class ResponseError < StandardError
attr_reader :status, :code
Expand All @@ -13,7 +13,6 @@ def initialize(msg, status, code)
end
end


class HashEnumeration
include Enumerable
extend Forwardable
Expand Down Expand Up @@ -76,13 +75,13 @@ def map_row_value(value, session)
def hash_value_as_object(value, session)
return value unless value['labels'] || value['type'] || transaction_response?

is_node, data = if transaction_response?
add_transaction_entity_id
[!mapped_rest_data['start'], mapped_rest_data]
elsif value['labels'] || value['type']
add_entity_id(value)
[value['labels'], value]
end
is_node, data = if transaction_response?
add_transaction_entity_id
[!mapped_rest_data['start'], mapped_rest_data]
elsif value['labels'] || value['type']
add_entity_id(value)
[value['labels'], value]
end
(is_node ? CypherNode : CypherRelationship).new(session, data).wrapper
end

Expand All @@ -91,6 +90,7 @@ def hash_value_as_object(value, session)
def initialize(response, uncommited = false)
@response = response
@uncommited = uncommited
set_data_from_request if response
end

def entity_data(id = nil)
Expand Down Expand Up @@ -121,8 +121,39 @@ def add_transaction_entity_id
mapped_rest_data.merge!('id' => mapped_rest_data['self'].split('/').last.to_i)
end

def errors
transaction_response? ? transaction_errors : non_transaction_errors
end

def transaction_errors
Array(response.body['errors']).map do |error|
ResponseError.new(error['message'], error['status'], error['code'])
end
end

def non_transaction_errors
return [] unless response.status == 400
Array(ResponseError.new(response.body['message'], response.body['exception'], response.body['fullname']))
end

def error
errors.first
end

def error_msg
error && error.message
end

def error_status
error && error.status
end

def error_code
error && error.code
end

def error?
!!@error
errors.any?
end

def data?
Expand All @@ -148,55 +179,45 @@ def set_data(data, columns)
self
end

def set_error(error_msg, error_status, error_core)
@error = true
@error_msg = error_msg
@error_status = error_status
@error_code = error_core
self
def set_data_from_request
return if error?
if transaction_response? && response.body['results']
set_data(response.body['results'][0]['data'], response.body['results'][0]['columns'])
else
set_data(response.body['data'], response.body['columns'])
end
end

def raise_error
fail 'Tried to raise error without an error' unless @error
fail ResponseError.new(@error_msg, @error_status, @error_code)
fail 'Tried to raise error without an error' unless error?
fail error
end

def raise_cypher_error
fail 'Tried to raise error without an error' unless @error
fail Neo4j::Session::CypherError.new(@error_msg, @error_code, @error_status)
fail 'Tried to raise error without an error' unless error?
fail Neo4j::Session::CypherError.new(error.message, error.code, error.status)
end


def self.create_with_no_tx(response)
case response.status
when 200
CypherResponse.new(response).set_data(response.body['data'], response.body['columns'])
when 400
CypherResponse.new(response).set_error(response.body['message'], response.body['exception'], response.body['fullname'])
else
fail "Unknown response code #{response.status} for #{response.env[:url]}"
end
CypherResponse.new(response)
end

def self.create_with_tx(response)
fail "Unknown response code #{response.status} for #{response.request_uri}" unless response.status == 200

first_result = response.body['results'][0]
cr = CypherResponse.new(response, true)

if response.body['errors'].empty?
cr.set_data(first_result['data'], first_result['columns'])
else
first_error = response.body['errors'].first
cr.set_error(first_error['message'], first_error['status'], first_error['code'])
end
cr
CypherResponse.new(response, true)
end

def transaction_response?
response.respond_to?('body') && !response.body['commit'].nil?
end

def transaction_failed?
errors.any? { |e| e.code =~ /Neo\.DatabaseError/ }
end

def transaction_not_found?
errors.any? { |e| e.code == 'Neo.ClientError.Transaction.UnknownId' }
end

def rest_data
@result_index = @row_index = 0
mapped_rest_data
Expand Down
76 changes: 60 additions & 16 deletions lib/neo4j-server/cypher_transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,36 @@ def initialize(url, session_connection)
end

ROW_REST = %w(row REST)

def _query(cypher_query, params = nil)
fail 'Transaction expired, unable to perform query' if expired?
statement = {statement: cypher_query, parameters: params, resultDataContents: ROW_REST}
body = {statements: [statement]}

response = exec_url && commit_url ? connection.post(exec_url, body) : register_urls(body)
_create_cypher_response(response)
_create_cypher_response(response).tap do |cypher_response|
handle_transaction_errors(cypher_response)
end
end

def _create_cypher_response(response)
CypherResponse.create_with_tx(response)
end

# Replaces current transaction with invalid transaction indicating it was rolled back or expired on the server side. http://neo4j.com/docs/stable/status-codes.html#_classifications
def handle_transaction_errors(response)
tx_class = if response.transaction_not_found?
ExpiredCypherTransaction
elsif response.transaction_failed?
InvalidCypherTransaction
end

register_invalid_transaction(tx_class) if tx_class
end

def register_invalid_transaction(tx_class)
tx = tx_class.new(Neo4j::Transaction.current)
Neo4j::Transaction.unregister_current
tx.register_instance
end

def _delete_tx
Expand Down Expand Up @@ -57,23 +80,44 @@ def register_urls(body)
response
end

def _create_cypher_response(response)
first_result = response.body['results'][0]

cr = CypherResponse.new(response, true)
if response.body['errors'].empty?
cr.set_data(first_result['data'], first_result['columns'])
else
first_error = response.body['errors'].first
expired if first_error['message'].match(/Unrecognized transaction id/)
cr.set_error(first_error['message'], first_error['code'], first_error['code'])
end
cr
end

def empty_response
OpenStruct.new(status: 200, body: '')
end

def valid?
!invalid?
end

def expired?
is_a? ExpiredCypherTransaction
end

def invalid?
is_a? InvalidCypherTransaction
end
end

class InvalidCypherTransaction < CypherTransaction
attr_accessor :original_transaction

def initialize(transaction)
self.original_transaction = transaction
mark_failed
end

def close
Neo4j::Transaction.unregister(self)
end

def _query(cypher_query, params = nil)
fail 'Transaction is invalid, unable to perform query'
end
end

class ExpiredCypherTransaction < InvalidCypherTransaction
def _query(cypher_query, params = nil)
fail 'Transaction expired, unable to perform query'
end
end
end
end
11 changes: 1 addition & 10 deletions lib/neo4j/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def register_instance
Neo4j::Transaction.register(self)
end

# Marks this transaction as failed, which means that it will unconditionally be rolled back when close() is called. Aliased for legacy purposes.
# Marks this transaction as failed on the client side, which means that it will unconditionally be rolled back when close() is called. Aliased for legacy purposes.
def mark_failed
@failure = true
end
Expand All @@ -21,15 +21,6 @@ def failed?
end
alias_method :failure?, :failed?

def mark_expired
@expired = true
end
alias_method :expired, :mark_expired

def expired?
!!@expired
end

# @private
def push_nested!
@pushed_nested += 1
Expand Down
12 changes: 10 additions & 2 deletions spec/neo4j-server/e2e/cypher_transaction_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ module Server
expect(r.error?).to be true

expect(r.error_msg).to match(/Invalid input/)
expect(r.error_status).to match(/Syntax/)
expect(r.error_code).to match(/Syntax/)
end

it 'can rollback' do
Expand All @@ -73,11 +73,19 @@ module Server
it 'cannot continue operations if a transaction is expired' do
node = Neo4j::Node.create(name: 'andreas')
Neo4j::Transaction.run do |tx|
tx.expired
tx.register_invalid_transaction(Neo4j::Server::ExpiredCypherTransaction)
expect { node[:name] = 'foo' }.to raise_error 'Transaction expired, unable to perform query'
end
end

it 'cannot continue operations if a transaction is invalid' do
node = Neo4j::Node.create(name: 'andreas')
Neo4j::Transaction.run do |tx|
tx.register_invalid_transaction(Neo4j::Server::InvalidCypherTransaction)
expect { node[:name] = 'foo' }.to raise_error 'Transaction is invalid, unable to perform query'
end
end

it 'can use Transaction block style' do
node = Neo4j::Transaction.run { Neo4j::Node.create(name: 'andreas') }
expect(node['name']).to eq('andreas')
Expand Down
24 changes: 24 additions & 0 deletions spec/neo4j-server/unit/cypher_response_unit_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,30 @@ def successful_response(response)
end

skip 'returns hydrated CypherPath objects?'

describe '#errors' do
let(:cypher_response) { CypherResponse.new(response, true) }

context 'without transaction' do
let(:response) do
double('tx_response', status: 400, body: {'message' => 'Some error', 'exception' => 'SomeError', 'fullname' => 'SomeError'})
end

it 'returns an array of errors' do
expect(cypher_response.errors).to be_a(Array)
end
end

context 'using transaction' do
let(:response) do
double('non-tx_response', status: 200, body: {'errors' => ['message' => 'Some error', 'status' => 'SomeError', 'code' => 'SomeError'], 'commit' => 'commit_uri'})
end

it 'returns an array of errors' do
expect(cypher_response.errors).to be_a(Array)
end
end
end
end
end
end
Expand Down
Loading