Skip to content

Commit

Permalink
GH-600: Resolve blockchain scanning
Browse files Browse the repository at this point in the history
* Starts Accountant
* Adds Receivables scan timeout (may become a configuration parameter in another card)
* Stores `max_block_count` in the config table when processing error messages providing a limit
* Adds Max Block Count to configuration response
* Increments CURRENT_SCHEMA_VERSION
* Adds schema migration for `max_block_count`
* Improves blockchain scan error handling and logging
  • Loading branch information
masqrauder committed Sep 6, 2023
1 parent eb8d891 commit 849d8be
Show file tree
Hide file tree
Showing 14 changed files with 155 additions and 7 deletions.
12 changes: 12 additions & 0 deletions masq/src/commands/configuration_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ impl ConfigurationCommand {
&Self::interpret_option(&configuration.earning_wallet_address_opt),
);
dump_parameter_line(stream, "Gas price:", &configuration.gas_price.to_string());
dump_parameter_line(
stream,
"Max block count:",
&configuration
.max_block_count_opt
.map(|m| m.separate_with_commas())
.unwrap_or_else(|| "[Unlimited]".to_string()),
);
dump_parameter_line(
stream,
"Neighborhood mode:",
Expand Down Expand Up @@ -306,6 +314,7 @@ mod tests {
chain_name: "ropsten".to_string(),
gas_price: 2345,
neighborhood_mode: "standard".to_string(),
max_block_count_opt: None,
consuming_wallet_private_key_opt: Some("consuming wallet private key".to_string()),
consuming_wallet_address_opt: Some("consuming wallet address".to_string()),
earning_wallet_address_opt: Some("earning address".to_string()),
Expand Down Expand Up @@ -367,6 +376,7 @@ mod tests {
|Current schema version: schema version\n\
|Earning wallet address: earning address\n\
|Gas price: 2345\n\
|Max block count: [Unlimited]\n\
|Neighborhood mode: standard\n\
|Port mapping protocol: PCP\n\
|Start block: 3456\n\
Expand Down Expand Up @@ -403,6 +413,7 @@ mod tests {
clandestine_port: 1234,
chain_name: "mumbai".to_string(),
gas_price: 2345,
max_block_count_opt: Some(100_000),
neighborhood_mode: "zero-hop".to_string(),
consuming_wallet_address_opt: None,
consuming_wallet_private_key_opt: None,
Expand Down Expand Up @@ -463,6 +474,7 @@ mod tests {
|Current schema version: schema version\n\
|Earning wallet address: earning wallet\n\
|Gas price: 2345\n\
|Max block count: 100,000\n\
|Neighborhood mode: zero-hop\n\
|Port mapping protocol: PCP\n\
|Start block: 3456\n\
Expand Down
2 changes: 1 addition & 1 deletion masq_lib/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::data_version::DataVersion;
use const_format::concatcp;

pub const DEFAULT_CHAIN: Chain = Chain::PolyMainnet;
pub const CURRENT_SCHEMA_VERSION: usize = 8;
pub const CURRENT_SCHEMA_VERSION: usize = 9;

pub const HIGHEST_RANDOM_CLANDESTINE_PORT: u16 = 9999;
pub const HTTP_PORT: u16 = 80;
Expand Down
2 changes: 2 additions & 0 deletions masq_lib/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,8 @@ pub struct UiConfigurationResponse {
pub earning_wallet_address_opt: Option<String>,
#[serde(rename = "gasPrice")]
pub gas_price: u64,
#[serde(rename = "maxBlockCount")]
pub max_block_count_opt: Option<u64>,
#[serde(rename = "neighborhoodMode")]
pub neighborhood_mode: String,
#[serde(rename = "portMappingProtocol")]
Expand Down
13 changes: 11 additions & 2 deletions node/src/accountant/scanners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,15 @@ where
pub struct ScannerCommon {
initiated_at_opt: Option<SystemTime>,
pub payment_thresholds: Rc<PaymentThresholds>,
timeout: Duration,
}

impl ScannerCommon {
fn new(payment_thresholds: Rc<PaymentThresholds>) -> Self {
Self {
initiated_at_opt: None,
payment_thresholds,
timeout: Duration::from_secs(30u64),
}
}

Expand Down Expand Up @@ -730,8 +732,15 @@ impl Scanner<RetrieveTransactions, ReceivedPayments> for ReceivableScanner {
response_skeleton_opt: Option<ResponseSkeleton>,
logger: &Logger,
) -> Result<RetrieveTransactions, BeginScanError> {
if let Some(timestamp) = self.scan_started_at() {
return Err(BeginScanError::ScanAlreadyRunning(timestamp));
if let Some(scan_started_at) = self.scan_started_at() {
return match scan_started_at.elapsed() {
Ok(elapsed) if elapsed.as_secs() >= self.common.timeout.as_secs() => {
info!(logger, "Receivables scan timed out");
self.mark_as_ended(logger);
Err(BeginScanError::NothingToProcess)
}
_ => Err(BeginScanError::ScanAlreadyRunning(scan_started_at)),
};
}
self.mark_as_started(timestamp);
info!(
Expand Down
1 change: 1 addition & 0 deletions node/src/actor_system_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ impl ActorSystemFactoryTools for ActorSystemFactoryToolsReal {

//after we've bound all the actors, send start messages to any actors that need it
send_start_message!(peer_actors.neighborhood);
send_start_message!(peer_actors.accountant);

stream_handler_pool_subs
}
Expand Down
13 changes: 11 additions & 2 deletions node/src/blockchain/blockchain_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,19 @@ impl BlockchainBridge {
}
Err(e) => {
if let Some(max_block_count) = self.extract_max_block_count(e.clone()) {
info!(self.logger, "Writing max_block_count({})", max_block_count);
debug!(self.logger, "Writing max_block_count({})", max_block_count);
self.persistent_config
.set_max_block_count(max_block_count)
.map_or_else(|_| Ok(()), |_| Ok(()))
.map_or_else(
|_| {
warning!(self.logger, "{} update max_block_count to {}. Scheduling next scan with that limit.", e, max_block_count);
Err(format!("{} updated max_block_count to {}. Scheduling next scan with that limit.", e, max_block_count))
},
|e| {
warning!(self.logger, "Writing max_block_count failed: {:?}", e);
Err(format!("Writing max_block_count failed: {:?}", e))
},
)
} else {
warning!(
self.logger,
Expand Down
4 changes: 3 additions & 1 deletion node/src/database/db_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ impl DbInitializerReal {
false,
"scan intervals",
);
Self::set_config_value(conn, "max_block_count", None, false, "maximum block count");
}

fn create_pending_payable_table(&self, conn: &Connection) {
Expand Down Expand Up @@ -764,7 +765,7 @@ mod tests {
#[test]
fn constants_have_correct_values() {
assert_eq!(DATABASE_FILE, "node-data.db");
assert_eq!(CURRENT_SCHEMA_VERSION, 8);
assert_eq!(CURRENT_SCHEMA_VERSION, 9);
}

#[test]
Expand Down Expand Up @@ -1039,6 +1040,7 @@ mod tests {
false,
);
verify(&mut config_vec, "mapping_protocol", None, false);
verify(&mut config_vec, "max_block_count", None, false);
verify(&mut config_vec, "min_hops", Some("3"), false);
verify(
&mut config_vec,
Expand Down
2 changes: 2 additions & 0 deletions node/src/database/db_migrations/db_migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::database::db_migrations::migrations::migration_4_to_5::Migrate_4_to_5
use crate::database::db_migrations::migrations::migration_5_to_6::Migrate_5_to_6;
use crate::database::db_migrations::migrations::migration_6_to_7::Migrate_6_to_7;
use crate::database::db_migrations::migrations::migration_7_to_8::Migrate_7_to_8;
use crate::database::db_migrations::migrations::migration_8_to_9::Migrate_8_to_9;
use crate::database::db_migrations::migrator_utils::{
DBMigDeclarator, DBMigrationUtilities, DBMigrationUtilitiesReal, DBMigratorInnerConfiguration,
};
Expand Down Expand Up @@ -77,6 +78,7 @@ impl DbMigratorReal {
&Migrate_5_to_6,
&Migrate_6_to_7,
&Migrate_7_to_8,
&Migrate_8_to_9,
]
}

Expand Down
70 changes: 70 additions & 0 deletions node/src/database/db_migrations/migrations/migration_8_to_9.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use crate::database::db_migrations::db_migrator::DatabaseMigration;
use crate::database::db_migrations::migrator_utils::DBMigDeclarator;

#[allow(non_camel_case_types)]
pub struct Migrate_8_to_9;

impl DatabaseMigration for Migrate_8_to_9 {
fn migrate<'a>(
&self,
declaration_utils: Box<dyn DBMigDeclarator + 'a>,
) -> rusqlite::Result<()> {
declaration_utils.execute_upon_transaction(&[
&"INSERT INTO config (name, value, encrypted) VALUES ('max_block_count', null, 0)",
])
}

fn old_version(&self) -> usize {
8
}
}

#[cfg(test)]
mod tests {
use crate::database::db_initializer::{
DbInitializationConfig, DbInitializer, DbInitializerReal, DATABASE_FILE,
};
use crate::test_utils::database_utils::{
bring_db_0_back_to_life_and_return_connection, make_external_data, retrieve_config_row,
};
use masq_lib::test_utils::logging::{init_test_logging, TestLogHandler};
use masq_lib::test_utils::utils::ensure_node_home_directory_exists;
use std::fs::create_dir_all;

#[test]
fn migration_from_8_to_9_is_properly_set() {
init_test_logging();
let dir_path = ensure_node_home_directory_exists(
"db_migrations",
"migration_from_8_to_9_is_properly_set",
);
create_dir_all(&dir_path).unwrap();
let db_path = dir_path.join(DATABASE_FILE);
let _ = bring_db_0_back_to_life_and_return_connection(&db_path);
let subject = DbInitializerReal::default();

let result = subject.initialize_to_version(
&dir_path,
9,
DbInitializationConfig::create_or_migrate(make_external_data()),
);
let connection = result.unwrap();
let (mp_value, mp_encrypted) = retrieve_config_row(connection.as_ref(), "max_block_count");
let (cs_value, cs_encrypted) = retrieve_config_row(connection.as_ref(), "schema_version");
assert_eq!(mp_value, None);
assert_eq!(mp_encrypted, false);
assert_eq!(cs_value, Some("9".to_string()));
assert_eq!(cs_encrypted, false);
TestLogHandler::new().assert_logs_contain_in_order(vec![
"DbMigrator: Database successfully migrated from version 0 to 1",
"DbMigrator: Database successfully migrated from version 1 to 2",
"DbMigrator: Database successfully migrated from version 2 to 3",
"DbMigrator: Database successfully migrated from version 3 to 4",
"DbMigrator: Database successfully migrated from version 4 to 5",
"DbMigrator: Database successfully migrated from version 5 to 6",
"DbMigrator: Database successfully migrated from version 6 to 7",
"DbMigrator: Database successfully migrated from version 7 to 8",
"DbMigrator: Database successfully migrated from version 8 to 9",
]);
}
}
1 change: 1 addition & 0 deletions node/src/database/db_migrations/migrations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pub mod migration_4_to_5;
pub mod migration_5_to_6;
pub mod migration_6_to_7;
pub mod migration_7_to_8;
pub mod migration_8_to_9;
12 changes: 12 additions & 0 deletions node/src/db_config/config_dao.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,16 @@ mod tests {
let result = subject.get("schema_version").unwrap();
assert_eq!(result, ConfigDaoRecord::new("schema_version", None, false));
}

#[test]
fn test_handle_update_execution() {
let result = handle_update_execution(Err(rusqlite::Error::ExecuteReturnedResults));

assert_eq!(
result,
Err(ConfigDaoError::DatabaseError(
"Execute returned results - did you mean to call query?".to_string()
))
)
}
}
2 changes: 2 additions & 0 deletions node/src/db_config/config_dao_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl Default for ConfigDaoNull {
"scan_intervals".to_string(),
(Some(DEFAULT_SCAN_INTERVALS.to_string()), false),
);
data.insert("max_block_count".to_string(), (None, false));
Self { data }
}
}
Expand Down Expand Up @@ -273,6 +274,7 @@ mod tests {
"schema_version",
Some(format!("{}", CURRENT_SCHEMA_VERSION).as_str()),
),
("max_block_count", None),
]
.into_iter()
.map(|(k, v_opt)| (k.to_string(), v_opt.map(|v| v.to_string())))
Expand Down
17 changes: 16 additions & 1 deletion node/src/db_config/persistent_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,14 @@ impl PersistentConfiguration for PersistentConfigurationReal {
}

fn max_block_count(&self) -> Result<u64, PersistentConfigError> {
self.simple_get_method(decode_u64, "max_block_count")
match self.get("max_block_count") {
Ok(max_block_count) => match decode_u64(max_block_count) {
Ok(Some(mbc)) => Ok(mbc),
Err(e) => Err(PersistentConfigError::from(e)),
Ok(None) => Err(PersistentConfigError::NotPresent),
},
Err(e) => Err(PersistentConfigError::from(e)),
}
}

fn set_max_block_count(&mut self, value: u64) -> Result<(), PersistentConfigError> {
Expand Down Expand Up @@ -1945,6 +1952,14 @@ mod tests {
);
}

#[test]
fn max_block_count_set_method_works() {
persistent_config_plain_data_assertions_for_simple_set_method!(
"max_block_count",
100000u64
);
}

#[test]
#[should_panic(
expected = "ever-supplied value missing: payment_thresholds; database is corrupt!"
Expand Down
11 changes: 11 additions & 0 deletions node/src/node_configurator/configurator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,11 @@ impl Configurator {
"earningWalletAddressOpt",
)?;
let start_block = Self::value_required(persistent_config.start_block(), "startBlock")?;
let max_block_count_opt =
match Self::value_required(persistent_config.max_block_count(), "maxBlockCount") {
Ok(value) => Some(value),
_ => None,
};
let neighborhood_mode =
Self::value_required(persistent_config.neighborhood_mode(), "neighborhoodMode")?
.to_string();
Expand Down Expand Up @@ -616,6 +621,7 @@ impl Configurator {
clandestine_port,
chain_name,
gas_price,
max_block_count_opt,
neighborhood_mode,
consuming_wallet_private_key_opt,
consuming_wallet_address_opt,
Expand Down Expand Up @@ -2211,6 +2217,7 @@ mod tests {
.gas_price_result(Ok(2345))
.consuming_wallet_private_key_result(Ok(Some(consuming_wallet_private_key)))
.mapping_protocol_result(Ok(Some(AutomapProtocol::Igdp)))
.max_block_count_result(Ok(100000))
.neighborhood_mode_result(Ok(NeighborhoodModeLight::Standard))
.past_neighbors_result(Ok(Some(vec![node_descriptor.clone()])))
.earning_wallet_address_result(Ok(Some(earning_wallet_address.clone())))
Expand All @@ -2236,6 +2243,7 @@ mod tests {
clandestine_port: 1234,
chain_name: "ropsten".to_string(),
gas_price: 2345,
max_block_count_opt: Some(100000),
neighborhood_mode: String::from("standard"),
consuming_wallet_private_key_opt: None,
consuming_wallet_address_opt: None,
Expand Down Expand Up @@ -2339,6 +2347,7 @@ mod tests {
.consuming_wallet_private_key_params(&consuming_wallet_private_key_params_arc)
.consuming_wallet_private_key_result(Ok(Some(consuming_wallet_private_key.clone())))
.mapping_protocol_result(Ok(Some(AutomapProtocol::Igdp)))
.max_block_count_result(Ok(100000))
.neighborhood_mode_result(Ok(NeighborhoodModeLight::ConsumeOnly))
.past_neighbors_params(&past_neighbors_params_arc)
.past_neighbors_result(Ok(Some(vec![node_descriptor.clone()])))
Expand Down Expand Up @@ -2366,6 +2375,7 @@ mod tests {
clandestine_port: 1234,
chain_name: "ropsten".to_string(),
gas_price: 2345,
max_block_count_opt: Some(100000),
neighborhood_mode: String::from("consume-only"),
consuming_wallet_private_key_opt: Some(consuming_wallet_private_key),
consuming_wallet_address_opt: Some(consuming_wallet_address),
Expand Down Expand Up @@ -2441,6 +2451,7 @@ mod tests {
"0x0123456789012345678901234567890123456789".to_string(),
)))
.start_block_result(Ok(3456))
.max_block_count_result(Ok(100000))
.neighborhood_mode_result(Ok(NeighborhoodModeLight::ConsumeOnly))
.mapping_protocol_result(Ok(Some(AutomapProtocol::Igdp)))
.consuming_wallet_private_key_result(cwpk);
Expand Down

0 comments on commit 849d8be

Please sign in to comment.