| | | 1 | | using pva.SuperV.Engine.Exceptions; |
| | | 2 | | using pva.SuperV.Engine.HistoryRetrieval; |
| | | 3 | | using pva.SuperV.Engine.Processing; |
| | | 4 | | using TDengine.Driver; |
| | | 5 | | using TDengine.Driver.Client; |
| | | 6 | | |
| | | 7 | | namespace pva.SuperV.Engine.HistoryStorage |
| | | 8 | | { |
| | | 9 | | /// <summary> |
| | | 10 | | /// TDengine histiory storage engine. |
| | | 11 | | /// </summary> |
| | | 12 | | public class TDengineHistoryStorage : IHistoryStorageEngine |
| | | 13 | | { |
| | | 14 | | /// <summary> |
| | | 15 | | /// TDengine history storage string. |
| | | 16 | | /// </summary> |
| | | 17 | | public const string Prefix = "TDengine"; |
| | | 18 | | |
| | | 19 | | /// <summary> |
| | | 20 | | /// Contains the equivalence between .Net and TDengine data types for the types being handled. |
| | | 21 | | /// </summary> |
| | 3 | 22 | | private static readonly Dictionary<Type, string> DotnetToDbTypes = new() |
| | 3 | 23 | | { |
| | 3 | 24 | | { typeof(DateTime), "TIMESTAMP" }, |
| | 3 | 25 | | { typeof(short), "SMALLINT"}, |
| | 3 | 26 | | { typeof(int), "INT" }, |
| | 3 | 27 | | { typeof(long), "BIGINT" }, |
| | 3 | 28 | | { typeof(TimeSpan), "BIGINT" }, |
| | 3 | 29 | | { typeof(uint), "INT UNSIGNED" }, |
| | 3 | 30 | | { typeof(ulong), "BIGINT UNSIGNED" }, |
| | 3 | 31 | | { typeof(float), "FLOAT" }, |
| | 3 | 32 | | { typeof(double), "DOUBLE" }, |
| | 3 | 33 | | { typeof(bool), "BOOL" }, |
| | 3 | 34 | | { typeof(string), "NCHAR(132)" }, |
| | 3 | 35 | | { typeof(sbyte), "TINYINT" }, |
| | 3 | 36 | | { typeof(byte), "TINYINT UNSIGNED" }, |
| | 3 | 37 | | { typeof(ushort), "SMALLINT UNSIGNED" } |
| | 3 | 38 | | /* |
| | 3 | 39 | | BINARY byte[] |
| | 3 | 40 | | JSON byte[] |
| | 3 | 41 | | VARBINARY byte[] |
| | 3 | 42 | | GEOMETRY byte[] |
| | 3 | 43 | | */ |
| | 3 | 44 | | }; |
| | | 45 | | |
| | | 46 | | /// <summary> |
| | | 47 | | /// The connection string to the TDengine backend. |
| | | 48 | | /// </summary> |
| | | 49 | | private readonly string connectionString; |
| | | 50 | | |
| | | 51 | | /// <summary> |
| | | 52 | | /// The TDengine clinet. |
| | | 53 | | /// </summary> |
| | | 54 | | private ITDengineClient? tdEngineClient; |
| | | 55 | | |
| | | 56 | | /// <summary> |
| | | 57 | | /// Builds a TDengine connection from connection stirng. |
| | | 58 | | /// </summary> |
| | | 59 | | /// <param name="tdEngineConnectionString">The TDengine connection string.</param> |
| | 14 | 60 | | public TDengineHistoryStorage(string tdEngineConnectionString) |
| | 14 | 61 | | { |
| | 14 | 62 | | connectionString = tdEngineConnectionString; |
| | 14 | 63 | | Connect(); |
| | 14 | 64 | | } |
| | | 65 | | |
| | | 66 | | /// <summary> |
| | | 67 | | /// Connects to TDengine. |
| | | 68 | | /// </summary> |
| | | 69 | | /// <exception cref="TdEngineException"></exception> |
| | | 70 | | private void Connect() |
| | 14 | 71 | | { |
| | 14 | 72 | | var builder = new ConnectionStringBuilder(connectionString); |
| | | 73 | | try |
| | 14 | 74 | | { |
| | | 75 | | // Open connection with using block, it will close the connection automatically |
| | 14 | 76 | | tdEngineClient = DbDriver.Open(builder); |
| | 14 | 77 | | } |
| | 0 | 78 | | catch (Exception e) |
| | 0 | 79 | | { |
| | 0 | 80 | | throw new TdEngineException($"connect to {connectionString}", e); |
| | | 81 | | } |
| | 14 | 82 | | } |
| | | 83 | | |
| | | 84 | | /// <summary> |
| | | 85 | | /// Upsert a history repository in storage engine. |
| | | 86 | | /// </summary> |
| | | 87 | | /// <param name="projectName">Project name to zhich the repository belongs.</param> |
| | | 88 | | /// <param name="repository">History repository</param> |
| | | 89 | | /// <returns>ID of repository in storqge engine.</returns> |
| | | 90 | | public string UpsertRepository(string projectName, HistoryRepository repository) |
| | 14 | 91 | | { |
| | 14 | 92 | | string repositoryName = $"{projectName}{repository.Name}".ToLowerInvariant(); |
| | | 93 | | try |
| | 14 | 94 | | { |
| | 14 | 95 | | tdEngineClient?.Exec($"CREATE DATABASE IF NOT EXISTS {repositoryName} PRECISION 'ns' KEEP 3650 DURATION |
| | 14 | 96 | | } |
| | 0 | 97 | | catch (Exception e) |
| | 0 | 98 | | { |
| | 0 | 99 | | throw new TdEngineException($"upsert repository {repositoryName} on {connectionString}", e); |
| | | 100 | | } |
| | 14 | 101 | | return repositoryName; |
| | 14 | 102 | | } |
| | | 103 | | |
| | | 104 | | /// <summary> |
| | | 105 | | /// Deletes a history repository from storage engine. |
| | | 106 | | /// </summary> |
| | | 107 | | /// <param name="projectName">Project name to zhich the repository belongs.</param> |
| | | 108 | | /// <param name="repositoryName">History repository name.</param> |
| | | 109 | | public void DeleteRepository(string projectName, string repositoryName) |
| | 0 | 110 | | { |
| | 0 | 111 | | string repositoryActualName = $"{projectName}{repositoryName}".ToLowerInvariant(); |
| | | 112 | | try |
| | 0 | 113 | | { |
| | 0 | 114 | | tdEngineClient?.Exec($"DROP DATABASE {repositoryActualName};"); |
| | 0 | 115 | | } |
| | 0 | 116 | | catch (Exception e) |
| | 0 | 117 | | { |
| | 0 | 118 | | throw new TdEngineException($"delete repository {repositoryActualName} on {connectionString}", e); |
| | | 119 | | } |
| | 0 | 120 | | } |
| | | 121 | | |
| | | 122 | | /// <summary> |
| | | 123 | | /// Upsert a class time series in storage engine |
| | | 124 | | /// </summary> |
| | | 125 | | /// <typeparam name="T"></typeparam> |
| | | 126 | | /// <param name="repositoryStorageId">History repository in which the time series should be created.</param> |
| | | 127 | | /// <param name="projectName">Project name to zhich the time series belongs.</param> |
| | | 128 | | /// <param name="className">Class name</param> |
| | | 129 | | /// <param name="historizationProcessing">History processing for which the time series should be created.</param |
| | | 130 | | /// <returns>Time series ID in storage engine.</returns> |
| | | 131 | | public string UpsertClassTimeSerie<T>(string repositoryStorageId, string projectName, string className, Historiz |
| | 28 | 132 | | { |
| | 28 | 133 | | string tableName = $"{projectName}{className}{historizationProcessing.Name}".ToLowerInvariant(); |
| | | 134 | | try |
| | 28 | 135 | | { |
| | 28 | 136 | | tdEngineClient?.Exec($"USE {repositoryStorageId};"); |
| | 28 | 137 | | string fieldNames = "TS TIMESTAMP, QUALITY NCHAR(10),"; |
| | 28 | 138 | | fieldNames += |
| | 28 | 139 | | historizationProcessing.FieldsToHistorize |
| | 173 | 140 | | .Select(field => $"_{field.Name} {GetFieldDbType(field)}") |
| | 173 | 141 | | .Aggregate((a, b) => $"{a},{b}"); |
| | 27 | 142 | | string command = $"CREATE STABLE IF NOT EXISTS {tableName} ({fieldNames}) TAGS (instance varchar(64));"; |
| | 27 | 143 | | tdEngineClient?.Exec(command); |
| | 27 | 144 | | } |
| | 1 | 145 | | catch (SuperVException) |
| | 1 | 146 | | { |
| | 1 | 147 | | throw; |
| | | 148 | | } |
| | 0 | 149 | | catch (Exception e) |
| | 0 | 150 | | { |
| | 0 | 151 | | throw new TdEngineException($"upsert class time series {tableName} on {connectionString}", e); |
| | | 152 | | } |
| | 27 | 153 | | return tableName; |
| | 27 | 154 | | } |
| | | 155 | | |
| | | 156 | | /// <summary> |
| | | 157 | | /// Historize instance values in storage engine |
| | | 158 | | /// </summary> |
| | | 159 | | /// <param name="repositoryStorageId">The history repository ID.</param> |
| | | 160 | | /// <param name="classTimeSerieId">The time series ID.</param> |
| | | 161 | | /// <param name="instanceName">The instance name.</param> |
| | | 162 | | /// <param name="timestamp">the timestamp of the values</param> |
| | | 163 | | /// <param name="fieldsToHistorize">List of fields to be historized.</param> |
| | | 164 | | public void HistorizeValues(string repositoryStorageId, string classTimeSerieId, string instanceName, DateTime t |
| | 19 | 165 | | { |
| | 19 | 166 | | string instanceTableName = instanceName.ToLowerInvariant(); |
| | 19 | 167 | | tdEngineClient!.Exec($"USE {repositoryStorageId};"); |
| | 19 | 168 | | using var stmt = tdEngineClient!.StmtInit(); |
| | | 169 | | try |
| | 19 | 170 | | { |
| | 108 | 171 | | string fieldToHistorizeNames = fieldsToHistorize.Select(field => $"_{field.FieldDefinition!.Name}") |
| | 89 | 172 | | .Aggregate((a, b) => $"{a},{b}"); |
| | 19 | 173 | | string fieldsPlaceholders = Enumerable.Repeat("?", fieldsToHistorize.Count + 2) |
| | 127 | 174 | | .Aggregate((a, b) => $"{a},{b}"); |
| | 19 | 175 | | string sql = $@"INSERT INTO ? USING {classTimeSerieId} (instance) TAGS(?) |
| | 19 | 176 | | (TS, QUALITY, {fieldToHistorizeNames}) VALUES ({fieldsPlaceholders}); |
| | 19 | 177 | | "; |
| | 19 | 178 | | List<object> rowValues = new(fieldsToHistorize.Count + 2) |
| | 19 | 179 | | { |
| | 19 | 180 | | timestamp.ToLocalTime(), |
| | 19 | 181 | | (quality ?? QualityLevel.Good).ToString() |
| | 19 | 182 | | }; |
| | 19 | 183 | | fieldsToHistorize.ForEach(field => |
| | 108 | 184 | | rowValues.Add(ConvertFieldValueToDb(field))); |
| | 19 | 185 | | stmt.Prepare(sql); |
| | | 186 | | // set table name |
| | 19 | 187 | | stmt.SetTableName($"{instanceTableName}"); |
| | | 188 | | // set tags |
| | 19 | 189 | | stmt.SetTags([instanceTableName]); |
| | | 190 | | // bind row values |
| | 19 | 191 | | stmt.BindRow([.. rowValues]); |
| | | 192 | | // add batch |
| | 19 | 193 | | stmt.AddBatch(); |
| | | 194 | | // execute |
| | 19 | 195 | | stmt.Exec(); |
| | 19 | 196 | | } |
| | 0 | 197 | | catch (Exception e) |
| | 0 | 198 | | { |
| | 0 | 199 | | throw new TdEngineException($"insert to table {classTimeSerieId} on {connectionString}", e); |
| | | 200 | | } |
| | 38 | 201 | | } |
| | | 202 | | |
| | | 203 | | private static object ConvertFieldValueToDb(IField field) |
| | 89 | 204 | | { |
| | 89 | 205 | | return field switch |
| | 89 | 206 | | { |
| | 6 | 207 | | Field<bool> typedField => typedField.Value, |
| | 0 | 208 | | Field<DateTime> typedField => typedField.Value.ToLocalTime(), |
| | 8 | 209 | | Field<double> typedField => typedField.Value, |
| | 6 | 210 | | Field<float> typedField => typedField.Value, |
| | 27 | 211 | | Field<int> typedField => typedField.Value, |
| | 6 | 212 | | Field<long> typedField => typedField.Value, |
| | 6 | 213 | | Field<short> typedField => typedField.Value, |
| | 6 | 214 | | Field<string> typedField => typedField.Value, |
| | 6 | 215 | | Field<TimeSpan> typedField => typedField.Value.Ticks, |
| | 6 | 216 | | Field<uint> typedField => typedField.Value, |
| | 6 | 217 | | Field<ulong> typedField => typedField.Value, |
| | 6 | 218 | | Field<ushort> typedField => typedField.Value, |
| | 0 | 219 | | _ => throw new UnhandledMappingException(nameof(TDengineHistoryStorage), field.Type.ToString()) |
| | 89 | 220 | | }; |
| | 89 | 221 | | } |
| | | 222 | | |
| | | 223 | | /// <summary> |
| | | 224 | | /// Gets instance values historized between 2 timestamps. |
| | | 225 | | /// </summary> |
| | | 226 | | /// <param name="repositoryStorageId">The history repository ID.</param> |
| | | 227 | | /// <param name="classTimeSerieId">The time series ID.</param> |
| | | 228 | | /// <param name="instanceName">The instance name.</param> |
| | | 229 | | /// <param name="timeRange">Time range for querying.</param> |
| | | 230 | | /// <param name="fields">List of fields to be retrieved. One of them should have the <see cref="HistorizationPro |
| | | 231 | | /// <returns>List of history rows.</returns> |
| | | 232 | | public List<HistoryRow> GetHistoryValues(string repositoryStorageId, string classTimeSerieId, string instanceNam |
| | 8 | 233 | | { |
| | 8 | 234 | | string instanceTableName = instanceName.ToLowerInvariant(); |
| | 8 | 235 | | List<HistoryRow> rows = []; |
| | | 236 | | try |
| | 8 | 237 | | { |
| | 8 | 238 | | tdEngineClient!.Exec($"USE {repositoryStorageId};"); |
| | 60 | 239 | | string fieldNames = fields.Select(field => $"_{field.Name}") |
| | 52 | 240 | | .Aggregate((a, b) => $"{a},{b}"); |
| | 8 | 241 | | string sqlQuery = |
| | 8 | 242 | | $@" |
| | 8 | 243 | | SELECT {fieldNames}, TS, QUALITY FROM {instanceTableName} |
| | 8 | 244 | | WHERE TS between ""{FormatToSqlDate(timeRange.From)}"" and ""{FormatToSqlDate(timeRange.To)}""; |
| | 8 | 245 | | "; |
| | 8 | 246 | | using IRows row = tdEngineClient!.Query(sqlQuery); |
| | 20 | 247 | | while (row.Read()) |
| | 12 | 248 | | { |
| | 12 | 249 | | rows.Add(new HistoryRow(row, fields, true)); |
| | 12 | 250 | | } |
| | 8 | 251 | | } |
| | 0 | 252 | | catch (Exception e) |
| | 0 | 253 | | { |
| | 0 | 254 | | throw new TdEngineException($"select from table {instanceTableName} on {connectionString}", e); |
| | | 255 | | } |
| | 8 | 256 | | return rows; |
| | 8 | 257 | | } |
| | | 258 | | |
| | | 259 | | /// <summary> |
| | | 260 | | /// Gets instance statistic values historized between 2 timestamps. |
| | | 261 | | /// </summary> |
| | | 262 | | /// <param name="repositoryStorageId">The history repository ID.</param> |
| | | 263 | | /// <param name="classTimeSerieId">The time series ID.</param> |
| | | 264 | | /// <param name="instanceName">The instance name.</param> |
| | | 265 | | /// <param name="timeRange">Query containing time range parameters.</param> |
| | | 266 | | /// <param name="fields">List of fields to be retrieved. One of them should have the <see cref="HistorizationPro |
| | | 267 | | /// <returns>List of history rows.</returns> |
| | | 268 | | public List<HistoryStatisticRow> GetHistoryStatistics(string repositoryStorageId, string classTimeSerieId, strin |
| | | 269 | | HistoryStatisticTimeRange timeRange, List<HistoryStatisticField> fields) |
| | 5 | 270 | | { |
| | 5 | 271 | | string instanceTableName = instanceName.ToLowerInvariant(); |
| | 5 | 272 | | List<HistoryStatisticRow> rows = []; |
| | | 273 | | try |
| | 5 | 274 | | { |
| | 5 | 275 | | tdEngineClient!.Exec($"USE {repositoryStorageId};"); |
| | 18 | 276 | | string fieldNames = fields.Select(field => $"{field.StatisticFunction}(_{field.Field.Name})") |
| | 13 | 277 | | .Aggregate((a, b) => $"{a},{b}"); |
| | 5 | 278 | | string fillClause = ""; |
| | 5 | 279 | | if (timeRange.FillMode is not null) |
| | 5 | 280 | | { |
| | 5 | 281 | | fillClause = $"FILL({timeRange.FillMode})"; |
| | 5 | 282 | | } |
| | 5 | 283 | | string sqlQuery = |
| | 5 | 284 | | $@" |
| | 5 | 285 | | SELECT {fieldNames}, _WSTART, _WEND, _WDURATION, _WSTART, MAX(QUALITY) FROM {instanceTableName} |
| | 5 | 286 | | WHERE TS between ""{FormatToSqlDate(timeRange.From)}"" and ""{FormatToSqlDate(timeRange.To)}"" |
| | 5 | 287 | | INTERVAL({FormatInterval(timeRange.Interval)}) SLIDING({FormatInterval(timeRange.Interval)}) {fillClause}; |
| | 5 | 288 | | "; |
| | 5 | 289 | | using IRows row = tdEngineClient!.Query(sqlQuery); |
| | 11 | 290 | | while (row.Read()) |
| | 6 | 291 | | { |
| | 6 | 292 | | rows.Add(new HistoryStatisticRow(row, fields)); |
| | 6 | 293 | | } |
| | 5 | 294 | | } |
| | 0 | 295 | | catch (Exception e) |
| | 0 | 296 | | { |
| | 0 | 297 | | throw new TdEngineException($"select from table {instanceTableName} on {connectionString}", e); |
| | | 298 | | } |
| | 5 | 299 | | return rows; |
| | 5 | 300 | | } |
| | | 301 | | |
| | | 302 | | /// <summary> |
| | | 303 | | /// Formats a DateTime to SQL format used by TDengine. |
| | | 304 | | /// </summary> |
| | | 305 | | /// <param name="dateTime">The date time to be formatted.</param> |
| | | 306 | | /// <returns>SQL string for date time.</returns> |
| | | 307 | | private static string FormatToSqlDate(DateTime dateTime) |
| | 26 | 308 | | { |
| | 26 | 309 | | return $"{dateTime.ToUniversalTime():yyyy-MM-dd HH:mm:ss.fffK}"; |
| | 26 | 310 | | } |
| | | 311 | | |
| | | 312 | | private static string FormatInterval(TimeSpan interval) |
| | 10 | 313 | | { |
| | 10 | 314 | | TimeSpan timespan = interval; |
| | 10 | 315 | | string intervalText = ""; |
| | 10 | 316 | | intervalText += GetIntervalPeriod(timespan.Days / 365, 'y'); |
| | 10 | 317 | | intervalText += GetIntervalPeriod((timespan.Days % 365) / 30, 'm'); |
| | 10 | 318 | | intervalText += GetIntervalPeriod(((timespan.Days % 365) % 30) / 7, 'w'); |
| | 10 | 319 | | intervalText += GetIntervalPeriod(((timespan.Days % 365) % 30) % 7, 'd'); |
| | 10 | 320 | | intervalText += GetIntervalPeriod(timespan.Hours, 'h'); |
| | 10 | 321 | | intervalText += GetIntervalPeriod(timespan.Minutes, 'm'); |
| | 10 | 322 | | intervalText += GetIntervalPeriod(timespan.Seconds, 's'); |
| | 10 | 323 | | intervalText += GetIntervalPeriod(timespan.Milliseconds, 'a'); |
| | 10 | 324 | | intervalText += GetIntervalPeriod(timespan.Nanoseconds, 'b'); |
| | | 325 | | // Remove last comma and space |
| | 10 | 326 | | return intervalText.TrimEnd()[..^1]; |
| | 10 | 327 | | } |
| | | 328 | | |
| | | 329 | | private static string GetIntervalPeriod(int value, char periodLetter) |
| | 90 | 330 | | { |
| | 90 | 331 | | if (value > 0) |
| | 10 | 332 | | { |
| | 10 | 333 | | return $"{value}{periodLetter}, "; |
| | | 334 | | } |
| | | 335 | | |
| | 80 | 336 | | return ""; |
| | 90 | 337 | | } |
| | | 338 | | |
| | | 339 | | /// <summary> |
| | | 340 | | /// Disposes the instance. |
| | | 341 | | /// </summary> |
| | | 342 | | public void Dispose() |
| | 13 | 343 | | { |
| | 13 | 344 | | Dispose(true); |
| | 13 | 345 | | GC.SuppressFinalize(this); |
| | 13 | 346 | | } |
| | | 347 | | |
| | | 348 | | /// <summary> |
| | | 349 | | /// Disposes the instance. Dispose the TDengine connection. |
| | | 350 | | /// </summary> |
| | | 351 | | /// <param name="disposing"></param> |
| | | 352 | | protected virtual void Dispose(bool disposing) |
| | 13 | 353 | | { |
| | 13 | 354 | | tdEngineClient?.Dispose(); |
| | 13 | 355 | | } |
| | | 356 | | |
| | | 357 | | /// <summary> |
| | | 358 | | /// Gets the TDengine data type for a field definition. |
| | | 359 | | /// </summary> |
| | | 360 | | /// <param name="field">Field for which the TDengine data type should be retrieved.</param> |
| | | 361 | | /// <returns>TDengine data type.</returns> |
| | | 362 | | /// <exception cref="UnhandledHistoryFieldTypeException"></exception> |
| | | 363 | | private static string GetFieldDbType(IFieldDefinition field) |
| | 173 | 364 | | { |
| | 173 | 365 | | if (DotnetToDbTypes.TryGetValue(field.Type, out var dbType)) |
| | 172 | 366 | | { |
| | 172 | 367 | | return dbType; |
| | | 368 | | } |
| | 1 | 369 | | throw new UnhandledHistoryFieldTypeException(field.Name, field.Type); |
| | 172 | 370 | | } |
| | | 371 | | } |
| | | 372 | | } |