| | | 1 | | using pva.SuperV.Common; |
| | | 2 | | using pva.SuperV.Engine.Exceptions; |
| | | 3 | | using pva.SuperV.Engine.HistoryRetrieval; |
| | | 4 | | using pva.SuperV.Engine.Processing; |
| | | 5 | | using TDengine.Driver; |
| | | 6 | | using TDengine.Driver.Client; |
| | | 7 | | |
| | | 8 | | namespace pva.SuperV.Engine.HistoryStorage |
| | | 9 | | { |
| | | 10 | | /// <summary> |
| | | 11 | | /// TDengine histiory storage engine. |
| | | 12 | | /// </summary> |
| | | 13 | | public class TDengineHistoryStorage : IHistoryStorageEngine |
| | | 14 | | { |
| | | 15 | | /// <summary> |
| | | 16 | | /// TDengine history storage string. |
| | | 17 | | /// </summary> |
| | | 18 | | public const string Prefix = "TDengine"; |
| | | 19 | | |
| | | 20 | | /// <summary> |
| | | 21 | | /// Contains the equivalence between .Net and TDengine data types for the types being handled. |
| | | 22 | | /// </summary> |
| | 3 | 23 | | private static readonly Dictionary<Type, string> DotnetToDbTypes = new() |
| | 3 | 24 | | { |
| | 3 | 25 | | { typeof(DateTime), "TIMESTAMP" }, |
| | 3 | 26 | | { typeof(short), "SMALLINT"}, |
| | 3 | 27 | | { typeof(int), "INT" }, |
| | 3 | 28 | | { typeof(long), "BIGINT" }, |
| | 3 | 29 | | { typeof(TimeSpan), "BIGINT" }, |
| | 3 | 30 | | { typeof(uint), "INT UNSIGNED" }, |
| | 3 | 31 | | { typeof(ulong), "BIGINT UNSIGNED" }, |
| | 3 | 32 | | { typeof(float), "FLOAT" }, |
| | 3 | 33 | | { typeof(double), "DOUBLE" }, |
| | 3 | 34 | | { typeof(bool), "BOOL" }, |
| | 3 | 35 | | { typeof(string), "NCHAR(132)" }, |
| | 3 | 36 | | { typeof(sbyte), "TINYINT" }, |
| | 3 | 37 | | { typeof(byte), "TINYINT UNSIGNED" }, |
| | 3 | 38 | | { typeof(ushort), "SMALLINT UNSIGNED" } |
| | 3 | 39 | | /* |
| | 3 | 40 | | BINARY byte[] |
| | 3 | 41 | | JSON byte[] |
| | 3 | 42 | | VARBINARY byte[] |
| | 3 | 43 | | GEOMETRY byte[] |
| | 3 | 44 | | */ |
| | 3 | 45 | | }; |
| | | 46 | | |
| | | 47 | | /// <summary> |
| | | 48 | | /// The connection string to the TDengine backend. |
| | | 49 | | /// </summary> |
| | | 50 | | private readonly string connectionString; |
| | | 51 | | |
| | | 52 | | /// <summary> |
| | | 53 | | /// The TDengine clinet. |
| | | 54 | | /// </summary> |
| | | 55 | | private ITDengineClient? tdEngineClient; |
| | | 56 | | |
| | | 57 | | /// <summary> |
| | | 58 | | /// Builds a TDengine connection from connection stirng. |
| | | 59 | | /// </summary> |
| | | 60 | | /// <param name="tdEngineConnectionString">The TDengine connection string.</param> |
| | 12 | 61 | | public TDengineHistoryStorage(string tdEngineConnectionString) |
| | 12 | 62 | | { |
| | 12 | 63 | | connectionString = tdEngineConnectionString; |
| | 12 | 64 | | Connect(); |
| | 12 | 65 | | } |
| | | 66 | | |
| | | 67 | | /// <summary> |
| | | 68 | | /// Connects to TDengine. |
| | | 69 | | /// </summary> |
| | | 70 | | /// <exception cref="TdEngineException"></exception> |
| | | 71 | | private void Connect() |
| | 12 | 72 | | { |
| | 12 | 73 | | var builder = new ConnectionStringBuilder(connectionString); |
| | | 74 | | try |
| | 12 | 75 | | { |
| | | 76 | | // Open connection with using block, it will close the connection automatically |
| | 12 | 77 | | tdEngineClient = DbDriver.Open(builder); |
| | 12 | 78 | | } |
| | 0 | 79 | | catch (Exception e) |
| | 0 | 80 | | { |
| | 0 | 81 | | throw new TdEngineException($"connect to {connectionString}", e); |
| | | 82 | | } |
| | 12 | 83 | | } |
| | | 84 | | |
| | | 85 | | /// <summary> |
| | | 86 | | /// Upsert a history repository in storage engine. |
| | | 87 | | /// </summary> |
| | | 88 | | /// <param name="projectName">Project name to hwich the repository belongs.</param> |
| | | 89 | | /// <param name="repository">History repository</param> |
| | | 90 | | /// <returns>ID of repository in storage engine.</returns> |
| | | 91 | | /// <exception cref="TdEngineException">TDengine error</exception> |
| | | 92 | | public string UpsertRepository(string projectName, HistoryRepository repository) |
| | 12 | 93 | | { |
| | 12 | 94 | | string repositoryName = GetRepositoryName(projectName, repository.Name); |
| | | 95 | | try |
| | 12 | 96 | | { |
| | 12 | 97 | | tdEngineClient?.Exec($"CREATE DATABASE IF NOT EXISTS {repositoryName} PRECISION 'ns' KEEP 3650 DURATION |
| | 12 | 98 | | } |
| | 0 | 99 | | catch (Exception e) |
| | 0 | 100 | | { |
| | 0 | 101 | | throw new TdEngineException($"upsert repository {repositoryName} on {connectionString}", e); |
| | | 102 | | } |
| | 12 | 103 | | return repositoryName; |
| | 12 | 104 | | } |
| | | 105 | | |
| | | 106 | | /// <summary> |
| | | 107 | | /// Deletes a history repository from storage engine. |
| | | 108 | | /// </summary> |
| | | 109 | | /// <param name="projectName">Project name to zhich the repository belongs.</param> |
| | | 110 | | /// <param name="repositoryName">History repository name.</param> |
| | | 111 | | /// <exception cref="TdEngineException">TDengine error</exception> |
| | | 112 | | public void DeleteRepository(string projectName, string repositoryName) |
| | 0 | 113 | | { |
| | 0 | 114 | | string repositoryActualName = GetRepositoryName(projectName, repositoryName); |
| | | 115 | | try |
| | 0 | 116 | | { |
| | 0 | 117 | | tdEngineClient?.Exec($"DROP DATABASE {repositoryActualName};"); |
| | 0 | 118 | | } |
| | 0 | 119 | | catch (Exception e) |
| | 0 | 120 | | { |
| | 0 | 121 | | throw new TdEngineException($"delete repository {repositoryActualName} on {connectionString}", e); |
| | | 122 | | } |
| | 0 | 123 | | } |
| | | 124 | | |
| | | 125 | | /// <summary> |
| | | 126 | | /// Upsert a class time series in storage engine |
| | | 127 | | /// </summary> |
| | | 128 | | /// <param name="repositoryStorageId">History repository in which the time series should be created.</param> |
| | | 129 | | /// <param name="projectName">Project name to zhich the time series belongs.</param> |
| | | 130 | | /// <param name="className">Class name</param> |
| | | 131 | | /// <param name="historizationProcessing">History processing for which the time series should be created.</param |
| | | 132 | | /// <returns>Time series ID in storage engine.</returns> |
| | | 133 | | /// <exception cref="TdEngineException">TDengine error</exception> |
| | | 134 | | public string UpsertClassTimeSerie(string repositoryStorageId, string projectName, string className, IHistorizat |
| | 151 | 135 | | { |
| | 151 | 136 | | string classTimeSerieId = GetClassTimeSerieId(projectName, className, historizationProcessing); |
| | | 137 | | try |
| | 151 | 138 | | { |
| | 151 | 139 | | tdEngineClient?.Exec($"USE {repositoryStorageId};"); |
| | 151 | 140 | | string fieldNames = "TS TIMESTAMP, QUALITY NCHAR(10),"; |
| | 151 | 141 | | fieldNames += |
| | 151 | 142 | | historizationProcessing.FieldsToHistorize |
| | 164 | 143 | | .Select(field => $"_{field.Name} {GetFieldDbType(field)}") |
| | 164 | 144 | | .Aggregate((a, b) => $"{a},{b}"); |
| | 150 | 145 | | string command = $"CREATE STABLE IF NOT EXISTS {classTimeSerieId} ({fieldNames}) TAGS (instance varchar( |
| | 150 | 146 | | tdEngineClient?.Exec(command); |
| | 150 | 147 | | } |
| | 1 | 148 | | catch (SuperVException) |
| | 1 | 149 | | { |
| | 1 | 150 | | throw; |
| | | 151 | | } |
| | 0 | 152 | | catch (Exception e) |
| | 0 | 153 | | { |
| | 0 | 154 | | throw new TdEngineException($"upsert class time series {classTimeSerieId} on {connectionString}", e); |
| | | 155 | | } |
| | 150 | 156 | | return classTimeSerieId; |
| | 150 | 157 | | } |
| | | 158 | | |
| | | 159 | | /// <summary> |
| | | 160 | | /// Historize instance values in storage engine |
| | | 161 | | /// </summary> |
| | | 162 | | /// <param name="repositoryStorageId">The history repository ID.</param> |
| | | 163 | | /// <param name="historizationProcessingName">The historization processing name.</param> |
| | | 164 | | /// <param name="classTimeSerieId">The time series ID.</param> |
| | | 165 | | /// <param name="instanceName">The instance name.</param> |
| | | 166 | | /// <param name="timestamp">the timestamp of the values</param> |
| | | 167 | | /// <param name="quality">The quality level of the values.</param> |
| | | 168 | | /// <param name="fieldsToHistorize">List of fields to be historized.</param> |
| | | 169 | | /// <exception cref="TdEngineException">TDengine error</exception> |
| | | 170 | | public void HistorizeValues(string repositoryStorageId, string historizationProcessingName, string classTimeSeri |
| | 65 | 171 | | { |
| | 65 | 172 | | string instanceTableName = $"{instanceName}_{historizationProcessingName}".ToLowerInvariant(); |
| | 65 | 173 | | tdEngineClient!.Exec($"USE {repositoryStorageId};"); |
| | 65 | 174 | | using var stmt = tdEngineClient!.StmtInit(); |
| | | 175 | | try |
| | 65 | 176 | | { |
| | 186 | 177 | | string fieldToHistorizeNames = fieldsToHistorize.Select(field => $"_{field.FieldDefinition!.Name}") |
| | 121 | 178 | | .Aggregate((a, b) => $"{a},{b}"); |
| | 65 | 179 | | string fieldValuesPlaceholders = Enumerable.Repeat("?", fieldsToHistorize.Count + 2) |
| | 251 | 180 | | .Aggregate((a, b) => $"{a},{b}"); |
| | 65 | 181 | | string sql = $@"INSERT INTO ? USING {classTimeSerieId} (instance) TAGS(?) |
| | 65 | 182 | | (TS, QUALITY, {fieldToHistorizeNames}) VALUES ({fieldValuesPlaceholders}); |
| | 65 | 183 | | "; |
| | 65 | 184 | | List<object> rowValues = new(fieldsToHistorize.Count + 2) |
| | 65 | 185 | | { |
| | 65 | 186 | | timestamp.ToLocalTime(), |
| | 65 | 187 | | (quality ?? QualityLevel.Good).ToString() |
| | 65 | 188 | | }; |
| | 65 | 189 | | fieldsToHistorize.ForEach(field => |
| | 186 | 190 | | rowValues.Add(ConvertFieldValueToDb(field))); |
| | 65 | 191 | | stmt.Prepare(sql); |
| | | 192 | | // set table name |
| | 65 | 193 | | stmt.SetTableName(instanceTableName); |
| | | 194 | | // set tags |
| | 65 | 195 | | stmt.SetTags([instanceTableName]); |
| | | 196 | | // bind row values |
| | 65 | 197 | | stmt.BindRow([.. rowValues]); |
| | | 198 | | // add batch |
| | 65 | 199 | | stmt.AddBatch(); |
| | | 200 | | // execute |
| | 65 | 201 | | stmt.Exec(); |
| | 65 | 202 | | } |
| | 0 | 203 | | catch (Exception e) |
| | 0 | 204 | | { |
| | 0 | 205 | | throw new TdEngineException($"insert to table {classTimeSerieId} on {connectionString}", e); |
| | | 206 | | } |
| | 130 | 207 | | } |
| | | 208 | | |
| | | 209 | | /// <summary> |
| | | 210 | | /// Gets instance values historized between 2 timestamps. |
| | | 211 | | /// </summary> |
| | | 212 | | /// <param name="instanceName">The instance name.</param> |
| | | 213 | | /// <param name="timeRange">Time range for querying.</param> |
| | | 214 | | /// <param name="instanceTimeSerieParameters">Parameters defining the time serie.</param> |
| | | 215 | | /// <param name="fields">List of fields to be retrieved. One of them should have the <see cref="HistorizationPro |
| | | 216 | | /// <returns>List of history rows.</returns> |
| | | 217 | | /// <exception cref="TdEngineException">TDengine error</exception> |
| | | 218 | | public List<HistoryRow> GetHistoryValues(string instanceName, HistoryTimeRange timeRange, InstanceTimeSerieParam |
| | 30 | 219 | | { |
| | 30 | 220 | | string instanceTableName = GetInstanceTableName(instanceName, instanceTimeSerieParameters); |
| | 30 | 221 | | List<HistoryRow> rows = []; |
| | | 222 | | try |
| | 30 | 223 | | { |
| | 30 | 224 | | tdEngineClient!.Exec($"USE {instanceTimeSerieParameters.HistorizationProcessing!.HistoryRepository!.Hist |
| | 82 | 225 | | string fieldNames = fields.Select(field => $"_{field.Name}") |
| | 52 | 226 | | .Aggregate((a, b) => $"{a},{b}"); |
| | 30 | 227 | | string sqlQuery = |
| | 30 | 228 | | $""" |
| | 30 | 229 | | SELECT {fieldNames}, TS, QUALITY FROM {instanceTableName} |
| | 30 | 230 | | WHERE TS between "{FormatToSqlDate(timeRange.From)}" and "{FormatToSqlDate(timeRange.To)}"; |
| | 30 | 231 | | |
| | 30 | 232 | | """; |
| | 30 | 233 | | using IRows row = tdEngineClient!.Query(sqlQuery); |
| | 64 | 234 | | while (row.Read()) |
| | 34 | 235 | | { |
| | 34 | 236 | | rows.Add(new HistoryRow(row, fields, true)); |
| | 34 | 237 | | } |
| | 30 | 238 | | } |
| | 0 | 239 | | catch (Exception e) |
| | 0 | 240 | | { |
| | 0 | 241 | | throw new TdEngineException($"select from table {instanceTableName} on {connectionString}", e); |
| | | 242 | | } |
| | 30 | 243 | | return rows; |
| | 30 | 244 | | } |
| | | 245 | | |
| | | 246 | | /// <summary> |
| | | 247 | | /// Gets instance statistic values historized between 2 timestamps. |
| | | 248 | | /// </summary> |
| | | 249 | | /// <param name="instanceName">The instance name.</param> |
| | | 250 | | /// <param name="timeRange">Query containing time range parameters.</param> |
| | | 251 | | /// <param name="instanceTimeSerieParameters">Parameters defining the time serie.</param> |
| | | 252 | | /// <param name="fields">List of fields to be retrieved. One of them should have the <see cref="HistorizationPro |
| | | 253 | | /// <returns>List of history rows.</returns> |
| | | 254 | | /// <exception cref="TdEngineException">TDengine error</exception> |
| | | 255 | | public List<HistoryStatisticRow> GetHistoryStatistics(string instanceName, HistoryStatisticTimeRange timeRange, |
| | | 256 | | InstanceTimeSerieParameters instanceTimeSerieParameters, List<HistoryStatisticField> fields) |
| | 25 | 257 | | { |
| | 25 | 258 | | string instanceTableName = GetInstanceTableName(instanceName, instanceTimeSerieParameters); |
| | 25 | 259 | | List<HistoryStatisticRow> rows = []; |
| | | 260 | | try |
| | 25 | 261 | | { |
| | 25 | 262 | | tdEngineClient!.Exec($"USE {instanceTimeSerieParameters.HistorizationProcessing!.HistoryRepository!.Hist |
| | 58 | 263 | | string fieldNames = fields.Select(field => field.StatisticFunction == HistoryStatFunction.NONE ? |
| | 58 | 264 | | $"_{field.Field.Name}" : $"{field.StatisticFunction}(_{field.Field.Name})") |
| | 33 | 265 | | .Aggregate((a, b) => $"{a},{b}"); |
| | 25 | 266 | | string fillClause = ""; |
| | 25 | 267 | | if (timeRange.FillMode is not null) |
| | 25 | 268 | | { |
| | 25 | 269 | | fillClause = $"FILL({timeRange.FillMode})"; |
| | 25 | 270 | | } |
| | 25 | 271 | | string sqlQuery = |
| | 25 | 272 | | $""" |
| | 25 | 273 | | SELECT {fieldNames}, _WSTART, _WEND, _WDURATION, _WSTART, MAX(QUALITY) FROM {instanceTableName} |
| | 25 | 274 | | WHERE TS between "{FormatToSqlDate(timeRange.From)}" and "{FormatToSqlDate(timeRange.To)}" |
| | 25 | 275 | | INTERVAL({FormatInterval(timeRange.Interval)}) SLIDING({FormatInterval(timeRange.Interval)}) {fillClause}; |
| | 25 | 276 | | |
| | 25 | 277 | | """; |
| | 25 | 278 | | using IRows row = tdEngineClient!.Query(sqlQuery); |
| | 51 | 279 | | while (row.Read()) |
| | 26 | 280 | | { |
| | 26 | 281 | | rows.Add(new HistoryStatisticRow(row, fields, false)); |
| | 26 | 282 | | } |
| | 25 | 283 | | } |
| | 0 | 284 | | catch (Exception e) |
| | 0 | 285 | | { |
| | 0 | 286 | | throw new TdEngineException($"select from table {instanceTableName} on {connectionString}", e); |
| | | 287 | | } |
| | 25 | 288 | | return rows; |
| | 25 | 289 | | } |
| | | 290 | | |
| | | 291 | | /// <summary> |
| | | 292 | | /// Disposes the instance. |
| | | 293 | | /// </summary> |
| | | 294 | | public void Dispose() |
| | 10 | 295 | | { |
| | 10 | 296 | | Dispose(true); |
| | 10 | 297 | | GC.SuppressFinalize(this); |
| | 10 | 298 | | } |
| | | 299 | | |
| | | 300 | | /// <summary> |
| | | 301 | | /// Disposes the instance. Dispose the TDengine connection. |
| | | 302 | | /// </summary> |
| | | 303 | | /// <param name="disposing">Indicates if called from Dispose()</param> |
| | | 304 | | protected virtual void Dispose(bool disposing) |
| | 10 | 305 | | { |
| | 10 | 306 | | tdEngineClient?.Dispose(); |
| | 10 | 307 | | } |
| | | 308 | | |
| | | 309 | | /// <summary> |
| | | 310 | | /// Gets the TDengine data type for a field definition. |
| | | 311 | | /// </summary> |
| | | 312 | | /// <param name="field">Field for which the TDengine data type should be retrieved.</param> |
| | | 313 | | /// <returns>TDengine data type.</returns> |
| | | 314 | | /// <exception cref="UnhandledHistoryFieldTypeException"></exception> |
| | | 315 | | private static string GetFieldDbType(IFieldDefinition field) |
| | 164 | 316 | | { |
| | 164 | 317 | | if (DotnetToDbTypes.TryGetValue(field.Type, out var dbType)) |
| | 163 | 318 | | { |
| | 163 | 319 | | return dbType; |
| | | 320 | | } |
| | 1 | 321 | | throw new UnhandledHistoryFieldTypeException(field.Name, field.Type); |
| | 163 | 322 | | } |
| | | 323 | | |
| | | 324 | | private static string GetRepositoryName(string projectName, string repositoryName) |
| | 12 | 325 | | => $"{projectName}_{repositoryName}".ToLowerInvariant(); |
| | | 326 | | |
| | | 327 | | private static string GetClassTimeSerieId(string projectName, string className, IHistorizationProcessing histori |
| | 151 | 328 | | => $"{projectName}_{className}_{historizationProcessing.Name}".ToLowerInvariant(); |
| | | 329 | | |
| | | 330 | | private static string GetInstanceTableName(string instanceName, InstanceTimeSerieParameters instanceTimeSeriePar |
| | 55 | 331 | | => $"{instanceName}_{instanceTimeSerieParameters!.HistorizationProcessing!.Name}".ToLowerInvariant(); |
| | | 332 | | |
| | | 333 | | /// <summary> |
| | | 334 | | /// Formats a DateTime to SQL format used by TDengine. |
| | | 335 | | /// </summary> |
| | | 336 | | /// <param name="dateTime">The date time to be formatted.</param> |
| | | 337 | | /// <returns>SQL string for date time.</returns> |
| | | 338 | | private static string FormatToSqlDate(DateTime dateTime) |
| | 110 | 339 | | { |
| | 110 | 340 | | return $"{dateTime.ToUniversalTime():yyyy-MM-dd HH:mm:ss.fffK}"; |
| | 110 | 341 | | } |
| | | 342 | | |
| | | 343 | | private static string FormatInterval(TimeSpan interval) |
| | 50 | 344 | | { |
| | 50 | 345 | | TimeSpan timespan = interval; |
| | 50 | 346 | | string intervalText = ""; |
| | 50 | 347 | | intervalText += GetIntervalPeriod(timespan.Days / 365, 'y'); |
| | 50 | 348 | | intervalText += GetIntervalPeriod((timespan.Days % 365) / 30, 'm'); |
| | 50 | 349 | | intervalText += GetIntervalPeriod((timespan.Days % 365 % 30) / 7, 'w'); |
| | 50 | 350 | | intervalText += GetIntervalPeriod(timespan.Days % 365 % 30 % 7, 'd'); |
| | 50 | 351 | | intervalText += GetIntervalPeriod(timespan.Hours, 'h'); |
| | 50 | 352 | | intervalText += GetIntervalPeriod(timespan.Minutes, 'm'); |
| | 50 | 353 | | intervalText += GetIntervalPeriod(timespan.Seconds, 's'); |
| | 50 | 354 | | intervalText += GetIntervalPeriod(timespan.Milliseconds, 'a'); |
| | 50 | 355 | | intervalText += GetIntervalPeriod(timespan.Nanoseconds, 'b'); |
| | | 356 | | // Remove last comma and space |
| | 50 | 357 | | return intervalText.TrimEnd()[..^1]; |
| | 50 | 358 | | } |
| | | 359 | | |
| | | 360 | | private static string GetIntervalPeriod(int value, char periodLetter) |
| | 450 | 361 | | => value > 0 ? $"{value}{periodLetter}, " : ""; |
| | | 362 | | |
| | | 363 | | private static object ConvertFieldValueToDb(IField field) |
| | 121 | 364 | | => field switch |
| | 121 | 365 | | { |
| | 8 | 366 | | Field<bool> typedField => typedField.Value, |
| | 4 | 367 | | Field<DateTime> typedField => typedField.Value.ToLocalTime(), |
| | 14 | 368 | | Field<double> typedField => typedField.Value, |
| | 8 | 369 | | Field<float> typedField => typedField.Value, |
| | 33 | 370 | | Field<int> typedField => typedField.Value, |
| | 8 | 371 | | Field<long> typedField => typedField.Value, |
| | 8 | 372 | | Field<short> typedField => typedField.Value, |
| | 8 | 373 | | Field<string> typedField => typedField.Value, |
| | 6 | 374 | | Field<TimeSpan> typedField => typedField.Value.Ticks, |
| | 8 | 375 | | Field<uint> typedField => typedField.Value, |
| | 8 | 376 | | Field<ulong> typedField => typedField.Value, |
| | 8 | 377 | | Field<ushort> typedField => typedField.Value, |
| | 0 | 378 | | _ => throw new UnhandledMappingException(nameof(TDengineHistoryStorage), field.Type.ToString()) |
| | 121 | 379 | | }; |
| | | 380 | | |
| | | 381 | | } |
| | | 382 | | } |